1use anyhow::ensure;
11use async_stream::{stream, try_stream};
12use differential_dataflow::difference::Monoid;
13use mz_persist::metrics::ColumnarMetrics;
14use proptest::prelude::{Arbitrary, Strategy};
15use std::borrow::Cow;
16use std::cmp::Ordering;
17use std::collections::BTreeMap;
18use std::fmt::{Debug, Formatter};
19use std::marker::PhantomData;
20use std::ops::ControlFlow::{self, Break, Continue};
21use std::ops::{Deref, DerefMut};
22use std::time::Duration;
23
24use arrow::array::{Array, ArrayData, make_array};
25use arrow::datatypes::DataType;
26use bytes::Bytes;
27use differential_dataflow::Hashable;
28use differential_dataflow::lattice::Lattice;
29use differential_dataflow::trace::Description;
30use differential_dataflow::trace::implementations::BatchContainer;
31use futures::Stream;
32use futures_util::StreamExt;
33use itertools::Itertools;
34use mz_dyncfg::Config;
35use mz_ore::cast::CastFrom;
36use mz_ore::now::EpochMillis;
37use mz_ore::soft_panic_or_log;
38use mz_ore::vec::PartialOrdVecExt;
39use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceUpdates};
40use mz_persist::location::{Blob, SeqNo};
41use mz_persist_types::arrow::{ArrayBound, ProtoArrayData};
42use mz_persist_types::columnar::{ColumnEncoder, Schema};
43use mz_persist_types::schema::{SchemaId, backward_compatible};
44use mz_persist_types::{Codec, Codec64};
45use mz_proto::ProtoType;
46use mz_proto::RustType;
47use proptest_derive::Arbitrary;
48use semver::Version;
49use serde::ser::SerializeStruct;
50use serde::{Serialize, Serializer};
51use timely::PartialOrder;
52use timely::order::TotalOrder;
53use timely::progress::{Antichain, Timestamp};
54use tracing::info;
55use uuid::Uuid;
56
57use crate::critical::{CriticalReaderId, Opaque};
58use crate::error::InvalidUsage;
59use crate::internal::encoding::{
60 LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, parse_id,
61};
62use crate::internal::gc::GcReq;
63use crate::internal::machine::retry_external;
64use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
65use crate::internal::trace::{
66 ActiveCompaction, ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace,
67};
68use crate::metrics::Metrics;
69use crate::read::LeasedReaderId;
70use crate::schema::CaESchema;
71use crate::write::WriterId;
72use crate::{PersistConfig, ShardId};
73
// Generated protobuf types, included verbatim from `OUT_DIR` at compile time.
// Names used in this module without a `use` (e.g. `ProtoHollowRun` in
// `HollowRunRef::get`) presumably come from these files; the build step that
// produces them is not visible here — TODO confirm.
include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.state.rs"
));

// Generated protobuf types for the `diff` proto package, also read from `OUT_DIR`.
include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.diff.rs"
));
83
84pub(crate) const ROLLUP_THRESHOLD: Config<usize> = Config::new(
92 "persist_rollup_threshold",
93 128,
94 "The number of seqnos between rollups.",
95);
96
97pub(crate) const ROLLUP_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
100 "persist_rollup_fallback_threshold_ms",
101 5000,
102 "The number of milliseconds before a worker claims an already claimed rollup.",
103);
104
105pub(crate) const ROLLUP_USE_ACTIVE_ROLLUP: Config<bool> = Config::new(
108 "persist_rollup_use_active_rollup",
109 true,
110 "Whether to use the new active rollup tracking mechanism.",
111);
112
113pub(crate) const GC_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
116 "persist_gc_fallback_threshold_ms",
117 900000,
118 "The number of milliseconds before a worker claims an already claimed GC.",
119);
120
121pub(crate) const GC_MIN_VERSIONS: Config<usize> = Config::new(
123 "persist_gc_min_versions",
124 32,
125 "The number of un-GCd versions that may exist in state before we'll trigger a GC.",
126);
127
128pub(crate) const GC_MAX_VERSIONS: Config<usize> = Config::new(
130 "persist_gc_max_versions",
131 128_000,
132 "The maximum number of versions to GC in a single GC run.",
133);
134
135pub(crate) const GC_USE_ACTIVE_GC: Config<bool> = Config::new(
138 "persist_gc_use_active_gc",
139 false,
140 "Whether to use the new active GC tracking mechanism.",
141);
142
143pub(crate) const ENABLE_INCREMENTAL_COMPACTION: Config<bool> = Config::new(
144 "persist_enable_incremental_compaction",
145 false,
146 "Whether to enable incremental compaction.",
147);
148
/// A 16-byte token attached to writes (see `WriterState::most_recent_write_token`).
///
/// It is displayed as `i` followed by the token's UUID rendering. It is serialized
/// as that same string, because serde converts it with `From<IdempotencyToken> for String`.
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
#[serde(into = "String")]
pub struct IdempotencyToken(pub(crate) [u8; 16]);
154
155impl std::fmt::Display for IdempotencyToken {
156 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157 write!(f, "i{}", Uuid::from_bytes(self.0))
158 }
159}
160
161impl std::fmt::Debug for IdempotencyToken {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163 write!(f, "IdempotencyToken({})", Uuid::from_bytes(self.0))
164 }
165}
166
167impl std::str::FromStr for IdempotencyToken {
168 type Err = String;
169
170 fn from_str(s: &str) -> Result<Self, Self::Err> {
171 parse_id("i", "IdempotencyToken", s).map(IdempotencyToken)
172 }
173}
174
175impl From<IdempotencyToken> for String {
176 fn from(x: IdempotencyToken) -> Self {
177 x.to_string()
178 }
179}
180
181impl IdempotencyToken {
182 pub(crate) fn new() -> Self {
183 IdempotencyToken(*Uuid::new_v4().as_bytes())
184 }
185 pub(crate) const SENTINEL: IdempotencyToken = IdempotencyToken([17u8; 16]);
186}
187
/// State kept for a leased reader; stored in `StateCollections::leased_readers`,
/// keyed by `LeasedReaderId`.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct LeasedReaderState<T> {
    /// Seqno recorded for this reader — presumably the seqno it registered or last
    /// heartbeated at; TODO confirm.
    pub seqno: SeqNo,
    /// This reader's `since` frontier.
    pub since: Antichain<T>,
    /// Timestamp of the most recent heartbeat, in milliseconds (presumably epoch ms).
    pub last_heartbeat_timestamp_ms: u64,
    /// Duration of the reader's lease, in milliseconds.
    pub lease_duration_ms: u64,
    /// Hostname and purpose of the handle, for debugging.
    pub debug: HandleDebugState,
}

/// State kept for a critical reader; stored in `StateCollections::critical_readers`,
/// keyed by `CriticalReaderId`.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct CriticalReaderState<T> {
    /// This reader's `since` frontier.
    pub since: Antichain<T>,
    /// The reader's `Opaque` value (its semantics are defined in `crate::critical`).
    pub opaque: Opaque,
    /// Hostname and purpose of the handle, for debugging.
    pub debug: HandleDebugState,
}

/// State kept for a writer; stored in `StateCollections::writers`, keyed by `WriterId`.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct WriterState<T> {
    /// Timestamp of the most recent heartbeat, in milliseconds (presumably epoch ms).
    pub last_heartbeat_timestamp_ms: u64,
    /// Duration of the writer's lease, in milliseconds.
    pub lease_duration_ms: u64,
    /// Idempotency token of this writer's most recent write.
    pub most_recent_write_token: IdempotencyToken,
    /// Upper of this writer's most recent write.
    pub most_recent_write_upper: Antichain<T>,
    /// Hostname and purpose of the handle, for debugging.
    pub debug: HandleDebugState,
}

/// Free-form debugging metadata describing a reader or writer handle.
#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize)]
pub struct HandleDebugState {
    /// Hostname of the process that owns the handle.
    pub hostname: String,
    /// Caller-supplied description of what the handle is used for.
    pub purpose: String,
}
239
/// A single part of a batch.
///
/// A part either refers to its data by key (`Hollow`) or carries its encoded updates
/// directly (`Inline`). It is serialized as an internally tagged enum with a `type` field.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(tag = "type")]
pub enum BatchPart<T> {
    /// A part that references its data through `HollowBatchPart::key`.
    Hollow(HollowBatchPart<T>),
    /// A part that carries its encoded updates directly rather than by key.
    Inline {
        /// The encoded updates, decoded on demand (see `BatchPart::diffs_sum`).
        updates: LazyInlineBatchPart,
        /// Timestamp rewrite frontier, if any (set by `HollowBatch::rewrite_ts`).
        ts_rewrite: Option<Antichain<T>>,
        /// Schema the part was written with, if known.
        schema_id: Option<SchemaId>,

        /// Older schema id field kept next to `schema_id`; presumably retained for
        /// backward compatibility — TODO confirm.
        deprecated_schema_id: Option<SchemaId>,
    },
}
256
257fn decode_structured_lower(lower: &LazyProto<ProtoArrayData>) -> Option<ArrayBound> {
258 let try_decode = |lower: &LazyProto<ProtoArrayData>| {
259 let proto = lower.decode()?;
260 let data = ArrayData::from_proto(proto)?;
261 ensure!(data.len() == 1);
262 Ok(ArrayBound::new(make_array(data), 0))
263 };
264
265 let decoded: anyhow::Result<ArrayBound> = try_decode(lower);
266
267 match decoded {
268 Ok(bound) => Some(bound),
269 Err(e) => {
270 soft_panic_or_log!("failed to decode bound: {e:#?}");
271 None
272 }
273 }
274}
275
276impl<T> BatchPart<T> {
277 pub fn hollow_bytes(&self) -> usize {
278 match self {
279 BatchPart::Hollow(x) => x.encoded_size_bytes,
280 BatchPart::Inline { .. } => 0,
281 }
282 }
283
284 pub fn is_inline(&self) -> bool {
285 matches!(self, BatchPart::Inline { .. })
286 }
287
288 pub fn inline_bytes(&self) -> usize {
289 match self {
290 BatchPart::Hollow(_) => 0,
291 BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
292 }
293 }
294
295 pub fn writer_key(&self) -> Option<WriterKey> {
296 match self {
297 BatchPart::Hollow(x) => x.key.split().map(|(writer, _part)| writer),
298 BatchPart::Inline { .. } => None,
299 }
300 }
301
302 pub fn encoded_size_bytes(&self) -> usize {
303 match self {
304 BatchPart::Hollow(x) => x.encoded_size_bytes,
305 BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
306 }
307 }
308
309 pub fn printable_name(&self) -> &str {
312 match self {
313 BatchPart::Hollow(x) => x.key.0.as_str(),
314 BatchPart::Inline { .. } => "<inline>",
315 }
316 }
317
318 pub fn stats(&self) -> Option<&LazyPartStats> {
319 match self {
320 BatchPart::Hollow(x) => x.stats.as_ref(),
321 BatchPart::Inline { .. } => None,
322 }
323 }
324
325 pub fn key_lower(&self) -> &[u8] {
326 match self {
327 BatchPart::Hollow(x) => x.key_lower.as_slice(),
328 BatchPart::Inline { .. } => &[],
335 }
336 }
337
338 pub fn structured_key_lower(&self) -> Option<ArrayBound> {
339 let part = match self {
340 BatchPart::Hollow(part) => part,
341 BatchPart::Inline { .. } => return None,
342 };
343
344 decode_structured_lower(part.structured_key_lower.as_ref()?)
345 }
346
347 pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
348 match self {
349 BatchPart::Hollow(x) => x.ts_rewrite.as_ref(),
350 BatchPart::Inline { ts_rewrite, .. } => ts_rewrite.as_ref(),
351 }
352 }
353
354 pub fn schema_id(&self) -> Option<SchemaId> {
355 match self {
356 BatchPart::Hollow(x) => x.schema_id,
357 BatchPart::Inline { schema_id, .. } => *schema_id,
358 }
359 }
360
361 pub fn deprecated_schema_id(&self) -> Option<SchemaId> {
362 match self {
363 BatchPart::Hollow(x) => x.deprecated_schema_id,
364 BatchPart::Inline {
365 deprecated_schema_id,
366 ..
367 } => *deprecated_schema_id,
368 }
369 }
370}
371
372impl<T: Timestamp + Codec64> BatchPart<T> {
373 pub fn is_structured_only(&self, metrics: &ColumnarMetrics) -> bool {
374 match self {
375 BatchPart::Hollow(x) => matches!(x.format, Some(BatchColumnarFormat::Structured)),
376 BatchPart::Inline { updates, .. } => {
377 let inline_part = updates.decode::<T>(metrics).expect("valid inline part");
378 matches!(inline_part.updates, BlobTraceUpdates::Structured { .. })
379 }
380 }
381 }
382
383 pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
384 match self {
385 BatchPart::Hollow(x) => x.diffs_sum.map(D::decode),
386 BatchPart::Inline { updates, .. } => Some(
387 updates
388 .decode::<T>(metrics)
389 .expect("valid inline part")
390 .updates
391 .diffs_sum(),
392 ),
393 }
394 }
395}
396
/// A run of parts that is encoded and written to blob as a single value, and
/// referenced from state through a [`HollowRunRef`] (see `HollowRunRef::set`/`get`).
#[derive(Debug, Clone)]
pub struct HollowRun<T> {
    /// The run's parts. `HollowRunRef::set` takes the run's key lower from the first one.
    pub(crate) parts: Vec<RunPart<T>>,
}

/// A reference to a [`HollowRun`] stored in blob, plus summary metadata that
/// `HollowRunRef::set` computes from the run's parts when it writes the run.
#[derive(Debug, Eq, PartialEq, Clone, Serialize)]
pub struct HollowRunRef<T> {
    /// Key of the encoded run; completed with the shard id to form the blob key.
    pub key: PartialBatchKey,

    /// Sum of `RunPart::hollow_bytes` over the run's parts.
    pub hollow_bytes: usize,

    /// Largest `RunPart::max_part_bytes` among the run's parts (0 if there are none).
    pub max_part_bytes: usize,

    /// Key lower of the run's first part (empty if there are no parts).
    pub key_lower: Vec<u8>,

    /// Encoded structured key lower of the run's first part, if it has one.
    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,

    /// `Codec64`-encoded sum of the diffs across all of the run's parts.
    pub diffs_sum: Option<[u8; 8]>,

    pub(crate) _phantom_data: PhantomData<T>,
}
426impl<T: Eq> PartialOrd<Self> for HollowRunRef<T> {
427 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
428 Some(self.cmp(other))
429 }
430}
431
432impl<T: Eq> Ord for HollowRunRef<T> {
433 fn cmp(&self, other: &Self) -> Ordering {
434 self.key.cmp(&other.key)
435 }
436}
437
438impl<T> HollowRunRef<T> {
439 pub fn writer_key(&self) -> Option<WriterKey> {
440 Some(self.key.split()?.0)
441 }
442}
443
impl<T: Timestamp + Codec64> HollowRunRef<T> {
    /// Writes `data` to blob under a fresh key for `writer` and returns a reference to it.
    ///
    /// The returned ref summarizes the run:
    /// - `hollow_bytes` is summed over the parts.
    /// - `max_part_bytes` is the maximum over the parts.
    /// - `key_lower` and `structured_key_lower` come from the first part.
    /// - `diffs_sum` is the `Monoid` sum of the parts' diffs.
    ///
    /// # Panics
    ///
    /// Panics (`"valid diffs sum"`) if `data.parts` is empty, or if any part has no
    /// diffs sum (e.g. a hollow part whose `diffs_sum` is `None`). Inline parts are
    /// decoded to compute their sum, which also panics if decoding fails.
    pub async fn set<D: Codec64 + Monoid>(
        shard_id: ShardId,
        blob: &dyn Blob,
        writer: &WriterKey,
        data: HollowRun<T>,
        metrics: &Metrics,
    ) -> Self {
        let hollow_bytes = data.parts.iter().map(|p| p.hollow_bytes()).sum();
        let max_part_bytes = data
            .parts
            .iter()
            .map(|p| p.max_part_bytes())
            .max()
            .unwrap_or(0);
        let key_lower = data
            .parts
            .first()
            .map_or(vec![], |p| p.key_lower().to_vec());
        let structured_key_lower = match data.parts.first() {
            Some(RunPart::Many(r)) => r.structured_key_lower.clone(),
            Some(RunPart::Single(BatchPart::Hollow(p))) => p.structured_key_lower.clone(),
            Some(RunPart::Single(BatchPart::Inline { .. })) | None => None,
        };
        // NOTE(review): `reduce` yields `None` for an empty run, so the second `expect`
        // panics in that case; presumably callers never pass an empty run — confirm.
        let diffs_sum = data
            .parts
            .iter()
            .map(|p| {
                p.diffs_sum::<D>(&metrics.columnar)
                    .expect("valid diffs sum")
            })
            .reduce(|mut a, b| {
                a.plus_equals(&b);
                a
            })
            .expect("valid diffs sum")
            .encode();

        let key = PartialBatchKey::new(writer, &PartId::new());
        let blob_key = key.complete(&shard_id);
        let bytes = Bytes::from(prost::Message::encode_to_vec(&data.into_proto()));
        // `retry_external` re-invokes the closure on failure, so it needs its own copy of
        // `bytes` on each attempt. Cloning `Bytes` is presumably a cheap refcount bump.
        let () = retry_external(&metrics.retries.external.hollow_run_set, || {
            blob.set(&blob_key, bytes.clone())
        })
        .await;
        Self {
            key,
            hollow_bytes,
            max_part_bytes,
            key_lower,
            structured_key_lower,
            diffs_sum: Some(diffs_sum),
            _phantom_data: Default::default(),
        }
    }

    /// Fetches and decodes the run this reference points to.
    ///
    /// Returns `None` when the blob fetch yields no value, presumably because the key is
    /// absent (`RunPart::part_stream` reports this case as `MissingBlob`).
    ///
    /// # Panics
    ///
    /// Panics if the stored bytes are not a valid `ProtoHollowRun`, or if that proto
    /// does not convert into a `HollowRun`.
    pub async fn get(
        &self,
        shard_id: ShardId,
        blob: &dyn Blob,
        metrics: &Metrics,
    ) -> Option<HollowRun<T>> {
        let blob_key = self.key.complete(&shard_id);
        let mut bytes = retry_external(&metrics.retries.external.hollow_run_get, || {
            blob.get(&blob_key)
        })
        .await?;
        let proto_runs: ProtoHollowRun =
            prost::Message::decode(&mut bytes).expect("illegal state: invalid proto bytes");
        let runs = proto_runs
            .into_rust()
            .expect("illegal state: invalid encoded runs proto");
        Some(runs)
    }
}
523
/// One element of a run: either a single batch part, or a reference to a
/// [`HollowRun`] of further run parts stored in blob.
///
/// Serialized untagged, so the serialized form is that of the inner value with no
/// variant tag.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(untagged)]
pub enum RunPart<T> {
    /// A single batch part.
    Single(BatchPart<T>),
    /// A reference to a run of parts that must be fetched from blob.
    Many(HollowRunRef<T>),
}
533
534impl<T: Ord> PartialOrd<Self> for RunPart<T> {
535 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
536 Some(self.cmp(other))
537 }
538}
539
540impl<T: Ord> Ord for RunPart<T> {
541 fn cmp(&self, other: &Self) -> Ordering {
542 match (self, other) {
543 (RunPart::Single(a), RunPart::Single(b)) => a.cmp(b),
544 (RunPart::Single(_), RunPart::Many(_)) => Ordering::Less,
545 (RunPart::Many(_), RunPart::Single(_)) => Ordering::Greater,
546 (RunPart::Many(a), RunPart::Many(b)) => a.cmp(b),
547 }
548 }
549}
550
551impl<T> RunPart<T> {
552 #[cfg(test)]
553 pub fn expect_hollow_part(&self) -> &HollowBatchPart<T> {
554 match self {
555 RunPart::Single(BatchPart::Hollow(hollow)) => hollow,
556 _ => panic!("expected hollow part!"),
557 }
558 }
559
560 pub fn hollow_bytes(&self) -> usize {
561 match self {
562 Self::Single(p) => p.hollow_bytes(),
563 Self::Many(r) => r.hollow_bytes,
564 }
565 }
566
567 pub fn is_inline(&self) -> bool {
568 match self {
569 Self::Single(p) => p.is_inline(),
570 Self::Many(_) => false,
571 }
572 }
573
574 pub fn inline_bytes(&self) -> usize {
575 match self {
576 Self::Single(p) => p.inline_bytes(),
577 Self::Many(_) => 0,
578 }
579 }
580
581 pub fn max_part_bytes(&self) -> usize {
582 match self {
583 Self::Single(p) => p.encoded_size_bytes(),
584 Self::Many(r) => r.max_part_bytes,
585 }
586 }
587
588 pub fn writer_key(&self) -> Option<WriterKey> {
589 match self {
590 Self::Single(p) => p.writer_key(),
591 Self::Many(r) => r.writer_key(),
592 }
593 }
594
595 pub fn encoded_size_bytes(&self) -> usize {
596 match self {
597 Self::Single(p) => p.encoded_size_bytes(),
598 Self::Many(r) => r.hollow_bytes,
599 }
600 }
601
602 pub fn schema_id(&self) -> Option<SchemaId> {
603 match self {
604 Self::Single(p) => p.schema_id(),
605 Self::Many(_) => None,
606 }
607 }
608
609 pub fn printable_name(&self) -> &str {
612 match self {
613 Self::Single(p) => p.printable_name(),
614 Self::Many(r) => r.key.0.as_str(),
615 }
616 }
617
618 pub fn stats(&self) -> Option<&LazyPartStats> {
619 match self {
620 Self::Single(p) => p.stats(),
621 Self::Many(_) => None,
623 }
624 }
625
626 pub fn key_lower(&self) -> &[u8] {
627 match self {
628 Self::Single(p) => p.key_lower(),
629 Self::Many(r) => r.key_lower.as_slice(),
630 }
631 }
632
633 pub fn structured_key_lower(&self) -> Option<ArrayBound> {
634 match self {
635 Self::Single(p) => p.structured_key_lower(),
636 Self::Many(_) => None,
637 }
638 }
639
640 pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
641 match self {
642 Self::Single(p) => p.ts_rewrite(),
643 Self::Many(_) => None,
644 }
645 }
646}
647
648impl<T> RunPart<T>
649where
650 T: Timestamp + Codec64,
651{
652 pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
653 match self {
654 Self::Single(p) => p.diffs_sum(metrics),
655 Self::Many(hollow_run) => hollow_run.diffs_sum.map(D::decode),
656 }
657 }
658}
659
660#[derive(Clone, Debug)]
662pub struct MissingBlob(BlobKey);
663
664impl std::fmt::Display for MissingBlob {
665 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
666 write!(f, "unexpectedly missing key: {}", self.0)
667 }
668}
669
670impl std::error::Error for MissingBlob {}
671
impl<T: Timestamp + Codec64 + Sync> RunPart<T> {
    /// Streams the batch parts contained in this run part, in order.
    ///
    /// A `Single` part is yielded borrowed. A `Many` reference is first fetched from
    /// `blob`; each of its run parts is then streamed recursively and yielded as an owned
    /// value. A missing run blob yields `Err(MissingBlob)`. Because this is a
    /// `try_stream!`, the first error (from this level or a nested stream) ends the stream.
    pub fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + Send + 'a {
        try_stream! {
            match self {
                RunPart::Single(p) => {
                    yield Cow::Borrowed(p);
                }
                RunPart::Many(r) => {
                    let fetched = r.get(shard_id, blob, metrics).await
                        .ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
                    for run_part in fetched.parts {
                        // Boxing erases the type of the recursive call; otherwise the
                        // opaque return type would have to contain itself.
                        for await batch_part in
                            run_part.part_stream(shard_id, blob, metrics).boxed()
                        {
                            // `run_part` is owned by this loop, so anything borrowed from
                            // it must become owned before it can be yielded with lifetime 'a.
                            yield Cow::Owned(batch_part?.into_owned());
                        }
                    }
                }
            }
        }
    }
}
699
700impl<T: Ord> PartialOrd for BatchPart<T> {
701 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
702 Some(self.cmp(other))
703 }
704}
705
706impl<T: Ord> Ord for BatchPart<T> {
707 fn cmp(&self, other: &Self) -> Ordering {
708 match (self, other) {
709 (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o),
710 (
711 BatchPart::Inline {
712 updates: s_updates,
713 ts_rewrite: s_ts_rewrite,
714 schema_id: s_schema_id,
715 deprecated_schema_id: s_deprecated_schema_id,
716 },
717 BatchPart::Inline {
718 updates: o_updates,
719 ts_rewrite: o_ts_rewrite,
720 schema_id: o_schema_id,
721 deprecated_schema_id: o_deprecated_schema_id,
722 },
723 ) => (
724 s_updates,
725 s_ts_rewrite.as_ref().map(|x| x.elements()),
726 s_schema_id,
727 s_deprecated_schema_id,
728 )
729 .cmp(&(
730 o_updates,
731 o_ts_rewrite.as_ref().map(|x| x.elements()),
732 o_schema_id,
733 o_deprecated_schema_id,
734 )),
735 (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less,
736 (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater,
737 }
738 }
739}
740
/// The ordering, if any, of the updates within a run (recorded in `RunMeta::order`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub(crate) enum RunOrder {
    /// No ordering is guaranteed.
    Unordered,
    /// Presumably: sorted by codec-encoded data — TODO confirm.
    Codec,
    /// Presumably: sorted by structured (columnar) data — TODO confirm.
    Structured,
}
751
752#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Serialize, Copy, Hash)]
753pub struct RunId(pub(crate) [u8; 16]);
754
755impl std::fmt::Display for RunId {
756 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
757 write!(f, "ri{}", Uuid::from_bytes(self.0))
758 }
759}
760
761impl std::fmt::Debug for RunId {
762 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
763 write!(f, "RunId({})", Uuid::from_bytes(self.0))
764 }
765}
766
767impl std::str::FromStr for RunId {
768 type Err = String;
769
770 fn from_str(s: &str) -> Result<Self, Self::Err> {
771 parse_id("ri", "RunId", s).map(RunId)
772 }
773}
774
775impl From<RunId> for String {
776 fn from(x: RunId) -> Self {
777 x.to_string()
778 }
779}
780
781impl RunId {
782 pub(crate) fn new() -> Self {
783 RunId(*Uuid::new_v4().as_bytes())
784 }
785}
786
787impl Arbitrary for RunId {
788 type Parameters = ();
789 type Strategy = proptest::strategy::BoxedStrategy<Self>;
790 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
791 Strategy::prop_map(proptest::prelude::any::<u128>(), |n| {
792 RunId(*Uuid::from_u128(n).as_bytes())
793 })
794 .boxed()
795 }
796}
797
/// Metadata for one run of a [`HollowBatch`]. A batch has one entry per run; see
/// `HollowBatch::runs`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub struct RunMeta {
    /// Ordering of the run's updates, if known.
    pub(crate) order: Option<RunOrder>,
    /// Schema the run was written with, if known.
    pub(crate) schema: Option<SchemaId>,

    /// Older schema field kept next to `schema`; presumably retained for backward
    /// compatibility — TODO confirm.
    pub(crate) deprecated_schema: Option<SchemaId>,

    /// Identifier of the run, if assigned (`HollowBatch::new_run_for_test` sets it).
    pub(crate) id: Option<RunId>,

    /// Recorded length of the run, if any — presumably a count of updates; TODO confirm.
    pub(crate) len: Option<usize>,

    /// Additional key/value metadata; omitted from serialized output when empty.
    #[serde(skip_serializing_if = "MetadataMap::is_empty")]
    pub(crate) meta: MetadataMap,
}

/// A batch part that references its data by key rather than carrying it inline.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct HollowBatchPart<T> {
    /// Key of the part's data. `BatchPart::writer_key` splits it into its writer component.
    pub key: PartialBatchKey,
    /// Additional key/value metadata; omitted from serialized output when empty.
    #[serde(skip_serializing_if = "MetadataMap::is_empty")]
    pub meta: MetadataMap,
    /// Encoded size of the part in bytes (reported by `BatchPart::hollow_bytes`).
    pub encoded_size_bytes: usize,
    /// Encoded key lower of the part (returned by `BatchPart::key_lower`).
    #[serde(serialize_with = "serialize_part_bytes")]
    pub key_lower: Vec<u8>,
    /// Encoded single-row arrow key lower, decoded by `BatchPart::structured_key_lower`.
    #[serde(serialize_with = "serialize_lazy_proto")]
    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
    /// Lazily decoded part statistics, if recorded.
    #[serde(serialize_with = "serialize_part_stats")]
    pub stats: Option<LazyPartStats>,
    /// Timestamp rewrite frontier, if any (set by `HollowBatch::rewrite_ts`).
    pub ts_rewrite: Option<Antichain<T>>,
    /// `Codec64`-encoded sum of the part's diffs, decoded by `BatchPart::diffs_sum`.
    #[serde(serialize_with = "serialize_diffs_sum")]
    pub diffs_sum: Option<[u8; 8]>,
    /// Columnar format of the part. `Some(Structured)` makes
    /// `BatchPart::is_structured_only` return true.
    pub format: Option<BatchColumnarFormat>,
    /// Schema the part was written with, if known.
    pub schema_id: Option<SchemaId>,

    /// Older schema id field kept next to `schema_id`; presumably retained for backward
    /// compatibility — TODO confirm.
    pub deprecated_schema_id: Option<SchemaId>,
}

/// A batch whose data is referenced through its parts.
///
/// The parts of all runs are stored back to back in `parts`, and `run_splits` marks
/// where each run after the first begins.
#[derive(Clone, PartialEq, Eq)]
pub struct HollowBatch<T> {
    /// The batch's lower, upper, and since frontiers.
    pub desc: Description<T>,
    /// Length of the batch — presumably a count of updates; TODO confirm.
    pub len: usize,
    /// Parts of every run, concatenated in run order.
    pub(crate) parts: Vec<RunPart<T>>,
    /// Strictly increasing indices into `parts` at which each run after the first begins.
    pub(crate) run_splits: Vec<usize>,
    /// One metadata entry per run (empty when there are no parts).
    pub(crate) run_meta: Vec<RunMeta>,
}
895
896impl<T: Debug> Debug for HollowBatch<T> {
897 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
898 let HollowBatch {
899 desc,
900 parts,
901 len,
902 run_splits: runs,
903 run_meta,
904 } = self;
905 f.debug_struct("HollowBatch")
906 .field(
907 "desc",
908 &(
909 desc.lower().elements(),
910 desc.upper().elements(),
911 desc.since().elements(),
912 ),
913 )
914 .field("parts", &parts)
915 .field("len", &len)
916 .field("runs", &runs)
917 .field("run_meta", &run_meta)
918 .finish()
919 }
920}
921
922impl<T: Serialize> serde::Serialize for HollowBatch<T> {
923 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
924 let HollowBatch {
925 desc,
926 len,
927 parts: _,
929 run_splits: _,
930 run_meta: _,
931 } = self;
932 let mut s = s.serialize_struct("HollowBatch", 5)?;
933 let () = s.serialize_field("lower", &desc.lower().elements())?;
934 let () = s.serialize_field("upper", &desc.upper().elements())?;
935 let () = s.serialize_field("since", &desc.since().elements())?;
936 let () = s.serialize_field("len", len)?;
937 let () = s.serialize_field("part_runs", &self.runs().collect::<Vec<_>>())?;
938 s.end()
939 }
940}
941
impl<T: Ord> PartialOrd for HollowBatch<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Lexicographic order over (lower, upper, since) frontier elements, then parts, len,
/// run splits, and run metadata.
impl<T: Ord> Ord for HollowBatch<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        // Every field is named (no `..`), so adding a field to `HollowBatch` fails to
        // compile until it is included in this comparison.
        let HollowBatch {
            desc: self_desc,
            parts: self_parts,
            len: self_len,
            run_splits: self_runs,
            run_meta: self_run_meta,
        } = self;
        let HollowBatch {
            desc: other_desc,
            parts: other_parts,
            len: other_len,
            run_splits: other_runs,
            run_meta: other_run_meta,
        } = other;
        (
            self_desc.lower().elements(),
            self_desc.upper().elements(),
            self_desc.since().elements(),
            self_parts,
            self_len,
            self_runs,
            self_run_meta,
        )
            .cmp(&(
                other_desc.lower().elements(),
                other_desc.upper().elements(),
                other_desc.since().elements(),
                other_parts,
                other_len,
                other_runs,
                other_run_meta,
            ))
    }
}
986
impl<T: Timestamp + Codec64 + Sync> HollowBatch<T> {
    /// Streams every batch part of this batch, expanding `RunPart::Many` references
    /// through `RunPart::part_stream`.
    ///
    /// This uses `stream!` rather than `try_stream!`: items, including `Err` items, are
    /// forwarded as they are. An error ends only the inner stream for that run part;
    /// iteration then continues with the next element of `parts`.
    /// NOTE(review): confirm callers stop at the first `Err` if that is what they need.
    pub(crate) fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + 'a {
        stream! {
            for part in &self.parts {
                for await part in part.part_stream(shard_id, blob, metrics) {
                    yield part;
                }
            }
        }
    }
}
impl<T> HollowBatch<T> {
    /// Constructs a batch from its parts and run layout.
    ///
    /// `run_splits` lists the indices into `parts` at which each run after the first
    /// begins, and `run_meta` holds one entry per run. In debug builds this asserts:
    /// - the splits are strictly increasing, positive, and less than `parts.len()`;
    /// - for a non-empty batch, `run_meta` has exactly `run_splits.len() + 1` entries.
    pub(crate) fn new(
        desc: Description<T>,
        parts: Vec<RunPart<T>>,
        len: usize,
        run_meta: Vec<RunMeta>,
        run_splits: Vec<usize>,
    ) -> Self {
        debug_assert!(
            run_splits.is_strictly_sorted(),
            "run indices should be strictly increasing"
        );
        debug_assert!(
            run_splits.first().map_or(true, |i| *i > 0),
            "run indices should be positive"
        );
        debug_assert!(
            run_splits.last().map_or(true, |i| *i < parts.len()),
            "run indices should be valid indices into parts"
        );
        debug_assert!(
            parts.is_empty() || run_meta.len() == run_splits.len() + 1,
            "all metadata should correspond to a run"
        );

        Self {
            desc,
            len,
            parts,
            run_splits,
            run_meta,
        }
    }

    /// Constructs a batch whose parts form a single run with default metadata. If
    /// `parts` is empty, the batch has no runs.
    pub(crate) fn new_run(desc: Description<T>, parts: Vec<RunPart<T>>, len: usize) -> Self {
        let run_meta = if parts.is_empty() {
            vec![]
        } else {
            vec![RunMeta::default()]
        };
        Self {
            desc,
            len,
            parts,
            run_splits: vec![],
            run_meta,
        }
    }

    /// Test-only variant of `new_run` that sets the single run's `RunMeta::id` to `run_id`.
    #[cfg(test)]
    pub(crate) fn new_run_for_test(
        desc: Description<T>,
        parts: Vec<RunPart<T>>,
        len: usize,
        run_id: RunId,
    ) -> Self {
        let run_meta = if parts.is_empty() {
            vec![]
        } else {
            let mut meta = RunMeta::default();
            meta.id = Some(run_id);
            vec![meta]
        };
        Self {
            desc,
            len,
            parts,
            run_splits: vec![],
            run_meta,
        }
    }

    /// Constructs a batch with the given description and no parts.
    pub(crate) fn empty(desc: Description<T>) -> Self {
        Self {
            desc,
            len: 0,
            parts: vec![],
            run_splits: vec![],
            run_meta: vec![],
        }
    }

    /// Iterates over the runs of this batch as `(metadata, parts)` pairs.
    ///
    /// Run boundaries are the `run_splits` followed by `parts.len()`, and empty ranges are
    /// skipped. Iteration panics (via `zip_eq`) if the number of non-empty runs differs
    /// from the number of `run_meta` entries.
    pub(crate) fn runs(&self) -> impl Iterator<Item = (&RunMeta, &[RunPart<T>])> {
        let run_ends = self
            .run_splits
            .iter()
            .copied()
            .chain(std::iter::once(self.parts.len()));
        let run_metas = self.run_meta.iter();
        // Turn the sequence of end indices into `start..end` ranges; each range starts
        // where the previous one ended.
        let run_parts = run_ends
            .scan(0, |start, end| {
                let range = *start..end;
                *start = end;
                Some(range)
            })
            .filter(|range| !range.is_empty())
            .map(|range| &self.parts[range]);
        run_metas.zip_eq(run_parts)
    }

    /// Total inline bytes across the top-level parts.
    pub(crate) fn inline_bytes(&self) -> usize {
        self.parts.iter().map(|x| x.inline_bytes()).sum()
    }

    /// Whether the batch has no parts.
    pub(crate) fn is_empty(&self) -> bool {
        self.parts.is_empty()
    }

    /// Number of top-level parts; a `RunPart::Many` reference counts as one.
    pub(crate) fn part_count(&self) -> usize {
        self.parts.len()
    }

    /// Sum of `RunPart::encoded_size_bytes` over the top-level parts.
    pub fn encoded_size_bytes(&self) -> usize {
        self.parts.iter().map(|p| p.encoded_size_bytes()).sum()
    }
}
1128
impl<T: Timestamp + TotalOrder> HollowBatch<T> {
    /// Sets this batch's upper to `new_upper` and records `frontier` as the `ts_rewrite`
    /// of every part.
    ///
    /// # Errors
    ///
    /// Returns `Err` with a description, before modifying `self`, if any of these hold:
    /// - `frontier` is not less than `new_upper`;
    /// - `new_upper` is less than the batch upper;
    /// - `frontier` is less than the batch lower;
    /// - the batch since is not the minimum antichain;
    /// - `frontier` is less than an existing part's `ts_rewrite`.
    ///
    /// # Panics
    ///
    /// Panics if any part is a `RunPart::Many` reference.
    /// NOTE(review): this panic happens after `self.desc` has already been replaced.
    pub(crate) fn rewrite_ts(
        &mut self,
        frontier: &Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<(), String> {
        if !PartialOrder::less_than(frontier, &new_upper) {
            return Err(format!(
                "rewrite frontier {:?} !< rewrite upper {:?}",
                frontier.elements(),
                new_upper.elements(),
            ));
        }
        if PartialOrder::less_than(&new_upper, self.desc.upper()) {
            return Err(format!(
                "rewrite upper {:?} < batch upper {:?}",
                new_upper.elements(),
                self.desc.upper().elements(),
            ));
        }

        if PartialOrder::less_than(frontier, self.desc.lower()) {
            return Err(format!(
                "rewrite frontier {:?} < batch lower {:?}",
                frontier.elements(),
                self.desc.lower().elements(),
            ));
        }
        if self.desc.since() != &Antichain::from_elem(T::minimum()) {
            return Err(format!(
                "batch since {:?} != minimum antichain {:?}",
                self.desc.since().elements(),
                &[T::minimum()],
            ));
        }
        // A new rewrite must not move below a rewrite that has already been applied.
        for part in self.parts.iter() {
            let Some(ts_rewrite) = part.ts_rewrite() else {
                continue;
            };
            if PartialOrder::less_than(frontier, ts_rewrite) {
                return Err(format!(
                    "rewrite frontier {:?} < batch rewrite {:?}",
                    frontier.elements(),
                    ts_rewrite.elements(),
                ));
            }
        }

        self.desc = Description::new(
            self.desc.lower().clone(),
            new_upper,
            self.desc.since().clone(),
        );
        for part in &mut self.parts {
            match part {
                RunPart::Single(BatchPart::Hollow(part)) => {
                    part.ts_rewrite = Some(frontier.clone())
                }
                RunPart::Single(BatchPart::Inline { ts_rewrite, .. }) => {
                    *ts_rewrite = Some(frontier.clone())
                }
                RunPart::Many(runs) => {
                    panic!("unexpected rewrite of a hollow runs ref: {runs:?}");
                }
            }
        }
        Ok(())
    }
}
1203
impl<T: Ord> PartialOrd for HollowBatchPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Lexicographic order over every field, in declaration order. `ts_rewrite` is
/// compared through its antichain elements.
impl<T: Ord> Ord for HollowBatchPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        // Every field is named (no `..`), so adding a field to `HollowBatchPart` fails
        // to compile until it is included in this comparison.
        let HollowBatchPart {
            key: self_key,
            meta: self_meta,
            encoded_size_bytes: self_encoded_size_bytes,
            key_lower: self_key_lower,
            structured_key_lower: self_structured_key_lower,
            stats: self_stats,
            ts_rewrite: self_ts_rewrite,
            diffs_sum: self_diffs_sum,
            format: self_format,
            schema_id: self_schema_id,
            deprecated_schema_id: self_deprecated_schema_id,
        } = self;
        let HollowBatchPart {
            key: other_key,
            meta: other_meta,
            encoded_size_bytes: other_encoded_size_bytes,
            key_lower: other_key_lower,
            structured_key_lower: other_structured_key_lower,
            stats: other_stats,
            ts_rewrite: other_ts_rewrite,
            diffs_sum: other_diffs_sum,
            format: other_format,
            schema_id: other_schema_id,
            deprecated_schema_id: other_deprecated_schema_id,
        } = other;
        (
            self_key,
            self_meta,
            self_encoded_size_bytes,
            self_key_lower,
            self_structured_key_lower,
            self_stats,
            self_ts_rewrite.as_ref().map(|x| x.elements()),
            self_diffs_sum,
            self_format,
            self_schema_id,
            self_deprecated_schema_id,
        )
            .cmp(&(
                other_key,
                other_meta,
                other_encoded_size_bytes,
                other_key_lower,
                other_structured_key_lower,
                other_stats,
                other_ts_rewrite.as_ref().map(|x| x.elements()),
                other_diffs_sum,
                other_format,
                other_schema_id,
                other_deprecated_schema_id,
            ))
    }
}
1268
/// A rollup, referenced by its `PartialRollupKey`. `StateCollections::rollups` maps
/// seqnos to these.
#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub struct HollowRollup {
    /// Key of the rollup.
    pub key: PartialRollupKey,
    /// Encoded size of the rollup in bytes, if known.
    pub encoded_size_bytes: Option<usize>,
}

/// A borrowed reference to either a hollow batch or a hollow rollup.
#[derive(Debug)]
pub enum HollowBlobRef<'a, T> {
    Batch(&'a HollowBatch<T>),
    Rollup(&'a HollowRollup),
}

/// Records that a rollup is in progress. `add_rollup` clears it when it inserts a rollup.
/// `start_ms` is presumably epoch milliseconds (compare `ROLLUP_FALLBACK_THRESHOLD_MS`);
/// TODO confirm.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize
)]
pub struct ActiveRollup {
    pub seqno: SeqNo,
    pub start_ms: u64,
}

/// Records that a GC is in progress. `remove_rollups` clears it on non-tombstone state.
/// `start_ms` is presumably epoch milliseconds (compare `GC_FALLBACK_THRESHOLD_MS`);
/// TODO confirm.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize
)]
pub struct ActiveGc {
    pub seqno: SeqNo,
    pub start_ms: u64,
}

/// The `Break` value of a state transition that left state unchanged; it still carries
/// the transition's result (e.g. `remove_rollups` breaks with an empty `Vec`).
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct NoOpStateTransition<T>(pub T);

/// The collections that make up a shard's state. State transitions such as `add_rollup`
/// and `remove_rollups` mutate them.
#[derive(Debug, Clone)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct StateCollections<T> {
    /// Version recorded in state — presumably the code version of the last writer; TODO confirm.
    pub(crate) version: Version,

    /// Seqno of the most recent GC request — presumably; TODO confirm.
    pub(crate) last_gc_req: SeqNo,

    /// Rollups keyed by seqno. `add_rollup` does not insert below the smallest key.
    pub(crate) rollups: BTreeMap<SeqNo, HollowRollup>,

    /// In-progress rollup marker, if any.
    pub(crate) active_rollup: Option<ActiveRollup>,
    /// In-progress GC marker, if any.
    pub(crate) active_gc: Option<ActiveGc>,

    /// Registered handles and schemas, keyed by their ids.
    pub(crate) leased_readers: BTreeMap<LeasedReaderId, LeasedReaderState<T>>,
    pub(crate) critical_readers: BTreeMap<CriticalReaderId, CriticalReaderState<T>>,
    pub(crate) writers: BTreeMap<WriterId, WriterState<T>>,
    pub(crate) schemas: BTreeMap<SchemaId, EncodedSchemas>,

    /// The shard's trace (see `crate::internal::trace::Trace`).
    pub(crate) trace: Trace<T>,
}

/// Encoded key and value schemas, together with their encoded arrow data types.
/// `EncodedSchemas::decode_data_type` decodes the `*_data_type` buffers.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct EncodedSchemas {
    /// Encoded key schema.
    pub key: Bytes,
    /// Encoded arrow `DataType` of the key (a `ProtoDataType`).
    pub key_data_type: Bytes,
    /// Encoded value schema.
    pub val: Bytes,
    /// Encoded arrow `DataType` of the value (a `ProtoDataType`).
    pub val_data_type: Bytes,
}
1372
1373impl EncodedSchemas {
1374 pub(crate) fn decode_data_type(buf: &[u8]) -> DataType {
1375 let proto = prost::Message::decode(buf).expect("valid ProtoDataType");
1376 DataType::from_proto(proto).expect("valid DataType")
1377 }
1378}
1379
/// Reasons a compare-and-append did not apply. Presumably this is the `Break` value of
/// the compare-and-append state transition, which is not visible here; TODO confirm.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum CompareAndAppendBreak<T> {
    /// The write was already committed.
    AlreadyCommitted,
    /// The expected upper did not match; carries the shard's and the writer's uppers.
    Upper {
        shard_upper: Antichain<T>,
        writer_upper: Antichain<T>,
    },
    /// The request was invalid.
    InvalidUsage(InvalidUsage<T>),
    /// Rejected because of inline-data backpressure.
    InlineBackpressure,
}

/// Reasons a snapshot could not be served.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum SnapshotErr<T> {
    /// The as-of is not yet available; carries a seqno and the current upper.
    AsOfNotYetAvailable(SeqNo, Upper<T>),
    /// The as-of is below the since, so historical distinctions are lost; carries the since.
    AsOfHistoricalDistinctionsLost(Since<T>),
}
1398
1399impl<T> StateCollections<T>
1400where
1401 T: Timestamp + Lattice + Codec64,
1402{
1403 pub fn add_rollup(
1404 &mut self,
1405 add_rollup: (SeqNo, &HollowRollup),
1406 ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1407 let (rollup_seqno, rollup) = add_rollup;
1408 let applied = match self.rollups.get(&rollup_seqno) {
1409 Some(x) => x.key == rollup.key,
1410 None => {
1411 if let Some(min_kept) = self.rollups.keys().next() {
1432 if rollup_seqno < *min_kept {
1433 return Continue(false);
1434 }
1435 }
1436 self.active_rollup = None;
1437 self.rollups.insert(rollup_seqno, rollup.to_owned());
1438 true
1439 }
1440 };
1441 Continue(applied)
1445 }
1446
1447 pub fn remove_rollups(
1448 &mut self,
1449 remove_rollups: &[(SeqNo, PartialRollupKey)],
1450 ) -> ControlFlow<NoOpStateTransition<Vec<SeqNo>>, Vec<SeqNo>> {
1451 if self.is_tombstone() {
1452 return Break(NoOpStateTransition(vec![]));
1453 }
1454
1455 let active_gc_was_set = self.active_gc.take().is_some();
1458
1459 if remove_rollups.is_empty() {
1460 return if active_gc_was_set {
1461 Continue(vec![])
1462 } else {
1463 Break(NoOpStateTransition(vec![]))
1464 };
1465 }
1466
1467 let mut removed = vec![];
1468 for (seqno, key) in remove_rollups {
1469 let removed_key = self.rollups.remove(seqno);
1470 debug_assert!(
1471 removed_key.as_ref().map_or(true, |x| &x.key == key),
1472 "{} vs {:?}",
1473 key,
1474 removed_key
1475 );
1476
1477 if removed_key.is_some() {
1478 removed.push(*seqno);
1479 }
1480 }
1481
1482 Continue(removed)
1483 }
1484
1485 pub fn register_leased_reader(
1486 &mut self,
1487 hostname: &str,
1488 reader_id: &LeasedReaderId,
1489 purpose: &str,
1490 seqno: SeqNo,
1491 lease_duration: Duration,
1492 heartbeat_timestamp_ms: u64,
1493 use_critical_since: bool,
1494 ) -> ControlFlow<
1495 NoOpStateTransition<(LeasedReaderState<T>, SeqNo)>,
1496 (LeasedReaderState<T>, SeqNo),
1497 > {
1498 let since = if use_critical_since {
1499 self.critical_since()
1500 .unwrap_or_else(|| self.trace.since().clone())
1501 } else {
1502 self.trace.since().clone()
1503 };
1504 let reader_state = LeasedReaderState {
1505 debug: HandleDebugState {
1506 hostname: hostname.to_owned(),
1507 purpose: purpose.to_owned(),
1508 },
1509 seqno,
1510 since,
1511 last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1512 lease_duration_ms: u64::try_from(lease_duration.as_millis())
1513 .expect("lease duration as millis should fit within u64"),
1514 };
1515
1516 if self.is_tombstone() {
1521 return Break(NoOpStateTransition((reader_state, self.seqno_since(seqno))));
1522 }
1523
1524 self.leased_readers
1526 .insert(reader_id.clone(), reader_state.clone());
1527 Continue((reader_state, self.seqno_since(seqno)))
1528 }
1529
1530 pub fn register_critical_reader(
1531 &mut self,
1532 hostname: &str,
1533 reader_id: &CriticalReaderId,
1534 opaque: Opaque,
1535 purpose: &str,
1536 ) -> ControlFlow<NoOpStateTransition<CriticalReaderState<T>>, CriticalReaderState<T>> {
1537 let state = CriticalReaderState {
1538 debug: HandleDebugState {
1539 hostname: hostname.to_owned(),
1540 purpose: purpose.to_owned(),
1541 },
1542 since: self.trace.since().clone(),
1543 opaque,
1544 };
1545
1546 if self.is_tombstone() {
1551 return Break(NoOpStateTransition(state));
1552 }
1553
1554 let state = match self.critical_readers.get_mut(reader_id) {
1555 Some(existing_state) => {
1556 existing_state.debug = state.debug;
1557 existing_state.clone()
1558 }
1559 None => {
1560 self.critical_readers
1561 .insert(reader_id.clone(), state.clone());
1562 state
1563 }
1564 };
1565 Continue(state)
1566 }
1567
1568 pub fn register_schema<K: Codec, V: Codec>(
1569 &mut self,
1570 key_schema: &K::Schema,
1571 val_schema: &V::Schema,
1572 ) -> ControlFlow<NoOpStateTransition<Option<SchemaId>>, Option<SchemaId>> {
1573 fn encode_data_type(data_type: &DataType) -> Bytes {
1574 let proto = data_type.into_proto();
1575 prost::Message::encode_to_vec(&proto).into()
1576 }
1577
1578 let existing_id = self.schemas.iter().rev().find(|(_, x)| {
1590 K::decode_schema(&x.key) == *key_schema && V::decode_schema(&x.val) == *val_schema
1591 });
1592 match existing_id {
1593 Some((schema_id, _)) => {
1594 Break(NoOpStateTransition(Some(*schema_id)))
1599 }
1600 None if self.is_tombstone() => {
1601 Break(NoOpStateTransition(None))
1603 }
1604 None if self.schemas.is_empty() => {
1605 let id = SchemaId(self.schemas.len());
1609 let key_data_type = mz_persist_types::columnar::data_type::<K>(key_schema)
1610 .expect("valid key schema");
1611 let val_data_type = mz_persist_types::columnar::data_type::<V>(val_schema)
1612 .expect("valid val schema");
1613 let prev = self.schemas.insert(
1614 id,
1615 EncodedSchemas {
1616 key: K::encode_schema(key_schema),
1617 key_data_type: encode_data_type(&key_data_type),
1618 val: V::encode_schema(val_schema),
1619 val_data_type: encode_data_type(&val_data_type),
1620 },
1621 );
1622 assert_eq!(prev, None);
1623 Continue(Some(id))
1624 }
1625 None => {
1626 info!(
1627 "register_schemas got {:?} expected {:?}",
1628 key_schema,
1629 self.schemas
1630 .iter()
1631 .map(|(id, x)| (id, K::decode_schema(&x.key)))
1632 .collect::<Vec<_>>()
1633 );
1634 Break(NoOpStateTransition(None))
1637 }
1638 }
1639 }
1640
1641 pub fn compare_and_evolve_schema<K: Codec, V: Codec>(
1642 &mut self,
1643 expected: SchemaId,
1644 key_schema: &K::Schema,
1645 val_schema: &V::Schema,
1646 ) -> ControlFlow<NoOpStateTransition<CaESchema<K, V>>, CaESchema<K, V>> {
1647 fn data_type<T>(schema: &impl Schema<T>) -> DataType {
1648 let array = Schema::encoder(schema).expect("valid schema").finish();
1652 Array::data_type(&array).clone()
1653 }
1654
1655 let (current_id, current) = self
1656 .schemas
1657 .last_key_value()
1658 .expect("all shards have a schema");
1659 if *current_id != expected {
1660 return Break(NoOpStateTransition(CaESchema::ExpectedMismatch {
1661 schema_id: *current_id,
1662 key: K::decode_schema(¤t.key),
1663 val: V::decode_schema(¤t.val),
1664 }));
1665 }
1666
1667 let current_key = K::decode_schema(¤t.key);
1668 let current_key_dt = EncodedSchemas::decode_data_type(¤t.key_data_type);
1669 let current_val = V::decode_schema(¤t.val);
1670 let current_val_dt = EncodedSchemas::decode_data_type(¤t.val_data_type);
1671
1672 let key_dt = data_type(key_schema);
1673 let val_dt = data_type(val_schema);
1674
1675 if current_key == *key_schema
1677 && current_key_dt == key_dt
1678 && current_val == *val_schema
1679 && current_val_dt == val_dt
1680 {
1681 return Break(NoOpStateTransition(CaESchema::Ok(*current_id)));
1682 }
1683
1684 let key_fn = backward_compatible(¤t_key_dt, &key_dt);
1685 let val_fn = backward_compatible(¤t_val_dt, &val_dt);
1686 let (Some(key_fn), Some(val_fn)) = (key_fn, val_fn) else {
1687 return Break(NoOpStateTransition(CaESchema::Incompatible));
1688 };
1689 if key_fn.contains_drop() || val_fn.contains_drop() {
1693 return Break(NoOpStateTransition(CaESchema::Incompatible));
1694 }
1695
1696 let id = SchemaId(self.schemas.len());
1700 self.schemas.insert(
1701 id,
1702 EncodedSchemas {
1703 key: K::encode_schema(key_schema),
1704 key_data_type: prost::Message::encode_to_vec(&key_dt.into_proto()).into(),
1705 val: V::encode_schema(val_schema),
1706 val_data_type: prost::Message::encode_to_vec(&val_dt.into_proto()).into(),
1707 },
1708 );
1709 Continue(CaESchema::Ok(id))
1710 }
1711
1712 pub fn compare_and_append(
1713 &mut self,
1714 batch: &HollowBatch<T>,
1715 writer_id: &WriterId,
1716 heartbeat_timestamp_ms: u64,
1717 lease_duration_ms: u64,
1718 idempotency_token: &IdempotencyToken,
1719 debug_info: &HandleDebugState,
1720 inline_writes_total_max_bytes: usize,
1721 claim_compaction_percent: usize,
1722 claim_compaction_min_version: Option<&Version>,
1723 ) -> ControlFlow<CompareAndAppendBreak<T>, Vec<FueledMergeReq<T>>> {
1724 if self.is_tombstone() {
1729 assert_eq!(self.trace.upper(), &Antichain::new());
1730 return Break(CompareAndAppendBreak::Upper {
1731 shard_upper: Antichain::new(),
1732 writer_upper: Antichain::new(),
1737 });
1738 }
1739
1740 let writer_state = self
1741 .writers
1742 .entry(writer_id.clone())
1743 .or_insert_with(|| WriterState {
1744 last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1745 lease_duration_ms,
1746 most_recent_write_token: IdempotencyToken::SENTINEL,
1747 most_recent_write_upper: Antichain::from_elem(T::minimum()),
1748 debug: debug_info.clone(),
1749 });
1750
1751 if PartialOrder::less_than(batch.desc.upper(), batch.desc.lower()) {
1752 return Break(CompareAndAppendBreak::InvalidUsage(
1753 InvalidUsage::InvalidBounds {
1754 lower: batch.desc.lower().clone(),
1755 upper: batch.desc.upper().clone(),
1756 },
1757 ));
1758 }
1759
1760 if batch.desc.upper() == batch.desc.lower() && !batch.is_empty() {
1763 return Break(CompareAndAppendBreak::InvalidUsage(
1764 InvalidUsage::InvalidEmptyTimeInterval {
1765 lower: batch.desc.lower().clone(),
1766 upper: batch.desc.upper().clone(),
1767 keys: batch
1768 .parts
1769 .iter()
1770 .map(|x| x.printable_name().to_owned())
1771 .collect(),
1772 },
1773 ));
1774 }
1775
1776 if idempotency_token == &writer_state.most_recent_write_token {
1777 assert_eq!(batch.desc.upper(), &writer_state.most_recent_write_upper);
1782 assert!(
1783 PartialOrder::less_equal(batch.desc.upper(), self.trace.upper()),
1784 "{:?} vs {:?}",
1785 batch.desc.upper(),
1786 self.trace.upper()
1787 );
1788 return Break(CompareAndAppendBreak::AlreadyCommitted);
1789 }
1790
1791 let shard_upper = self.trace.upper();
1792 if shard_upper != batch.desc.lower() {
1793 return Break(CompareAndAppendBreak::Upper {
1794 shard_upper: shard_upper.clone(),
1795 writer_upper: writer_state.most_recent_write_upper.clone(),
1796 });
1797 }
1798
1799 let new_inline_bytes = batch.inline_bytes();
1800 if new_inline_bytes > 0 {
1801 let mut existing_inline_bytes = 0;
1802 self.trace
1803 .map_batches(|x| existing_inline_bytes += x.inline_bytes());
1804 if existing_inline_bytes + new_inline_bytes >= inline_writes_total_max_bytes {
1808 return Break(CompareAndAppendBreak::InlineBackpressure);
1809 }
1810 }
1811
1812 let mut merge_reqs = if batch.desc.upper() != batch.desc.lower() {
1813 self.trace.push_batch(batch.clone())
1814 } else {
1815 Vec::new()
1816 };
1817
1818 let all_empty_reqs = merge_reqs
1821 .iter()
1822 .all(|req| req.inputs.iter().all(|b| b.batch.is_empty()));
1823 if all_empty_reqs && !batch.is_empty() {
1824 let mut reqs_to_take = claim_compaction_percent / 100;
1825 if (usize::cast_from(idempotency_token.hashed()) % 100)
1826 < (claim_compaction_percent % 100)
1827 {
1828 reqs_to_take += 1;
1829 }
1830 let threshold_ms = heartbeat_timestamp_ms.saturating_sub(lease_duration_ms);
1831 let min_writer = claim_compaction_min_version.map(WriterKey::for_version);
1832 merge_reqs.extend(
1833 self.trace
1836 .fueled_merge_reqs_before_ms(threshold_ms, min_writer)
1837 .take(reqs_to_take),
1838 )
1839 }
1840
1841 for req in &merge_reqs {
1842 self.trace.claim_compaction(
1843 req.id,
1844 ActiveCompaction {
1845 start_ms: heartbeat_timestamp_ms,
1846 },
1847 )
1848 }
1849
1850 debug_assert_eq!(self.trace.upper(), batch.desc.upper());
1851 writer_state.most_recent_write_token = idempotency_token.clone();
1852 assert!(
1854 PartialOrder::less_equal(&writer_state.most_recent_write_upper, batch.desc.upper()),
1855 "{:?} vs {:?}",
1856 &writer_state.most_recent_write_upper,
1857 batch.desc.upper()
1858 );
1859 writer_state
1860 .most_recent_write_upper
1861 .clone_from(batch.desc.upper());
1862
1863 writer_state.last_heartbeat_timestamp_ms = std::cmp::max(
1865 heartbeat_timestamp_ms,
1866 writer_state.last_heartbeat_timestamp_ms,
1867 );
1868
1869 Continue(merge_reqs)
1870 }
1871
1872 pub fn apply_merge_res<D: Codec64 + Monoid + PartialEq>(
1873 &mut self,
1874 res: &FueledMergeRes<T>,
1875 metrics: &ColumnarMetrics,
1876 ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
1877 if self.is_tombstone() {
1882 return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
1883 }
1884
1885 let apply_merge_result = self.trace.apply_merge_res_checked::<D>(res, metrics);
1886 Continue(apply_merge_result)
1887 }
1888
1889 pub fn spine_exert(
1890 &mut self,
1891 fuel: usize,
1892 ) -> ControlFlow<NoOpStateTransition<Vec<FueledMergeReq<T>>>, Vec<FueledMergeReq<T>>> {
1893 let (merge_reqs, did_work) = self.trace.exert(fuel);
1894 if did_work {
1895 Continue(merge_reqs)
1896 } else {
1897 assert!(merge_reqs.is_empty());
1898 Break(NoOpStateTransition(Vec::new()))
1901 }
1902 }
1903
1904 pub fn downgrade_since(
1905 &mut self,
1906 reader_id: &LeasedReaderId,
1907 seqno: SeqNo,
1908 outstanding_seqno: SeqNo,
1909 new_since: &Antichain<T>,
1910 heartbeat_timestamp_ms: u64,
1911 ) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
1912 if self.is_tombstone() {
1917 return Break(NoOpStateTransition(Since(Antichain::new())));
1918 }
1919
1920 let Some(reader_state) = self.leased_reader(reader_id) else {
1923 tracing::warn!(
1924 "Leased reader {reader_id} was expired due to inactivity. Did the machine go to sleep?",
1925 );
1926 return Break(NoOpStateTransition(Since(Antichain::new())));
1927 };
1928
1929 reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
1932 heartbeat_timestamp_ms,
1933 reader_state.last_heartbeat_timestamp_ms,
1934 );
1935
1936 let seqno = {
1937 assert!(
1938 outstanding_seqno >= reader_state.seqno,
1939 "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
1940 is behind current reader_state ({:?})",
1941 outstanding_seqno,
1942 reader_state.seqno,
1943 );
1944 std::cmp::min(outstanding_seqno, seqno)
1945 };
1946
1947 reader_state.seqno = seqno;
1948
1949 let reader_current_since = if PartialOrder::less_than(&reader_state.since, new_since) {
1950 reader_state.since.clone_from(new_since);
1951 self.update_since();
1952 new_since.clone()
1953 } else {
1954 reader_state.since.clone()
1957 };
1958
1959 Continue(Since(reader_current_since))
1960 }
1961
1962 pub fn compare_and_downgrade_since(
1963 &mut self,
1964 reader_id: &CriticalReaderId,
1965 expected_opaque: &Opaque,
1966 (new_opaque, new_since): (&Opaque, &Antichain<T>),
1967 ) -> ControlFlow<
1968 NoOpStateTransition<Result<Since<T>, (Opaque, Since<T>)>>,
1969 Result<Since<T>, (Opaque, Since<T>)>,
1970 > {
1971 if self.is_tombstone() {
1976 return Break(NoOpStateTransition(Ok(Since(Antichain::new()))));
1980 }
1981
1982 let reader_state = self.critical_reader(reader_id);
1983
1984 if reader_state.opaque != *expected_opaque {
1985 return Continue(Err((
1988 reader_state.opaque.clone(),
1989 Since(reader_state.since.clone()),
1990 )));
1991 }
1992
1993 reader_state.opaque = new_opaque.clone();
1994 if PartialOrder::less_equal(&reader_state.since, new_since) {
1995 reader_state.since.clone_from(new_since);
1996 self.update_since();
1997 Continue(Ok(Since(new_since.clone())))
1998 } else {
1999 Continue(Ok(Since(reader_state.since.clone())))
2003 }
2004 }
2005
2006 pub fn expire_leased_reader(
2007 &mut self,
2008 reader_id: &LeasedReaderId,
2009 ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2010 if self.is_tombstone() {
2015 return Break(NoOpStateTransition(false));
2016 }
2017
2018 let existed = self.leased_readers.remove(reader_id).is_some();
2019 if existed {
2020 }
2034 Continue(existed)
2037 }
2038
2039 pub fn expire_critical_reader(
2040 &mut self,
2041 reader_id: &CriticalReaderId,
2042 ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2043 if self.is_tombstone() {
2048 return Break(NoOpStateTransition(false));
2049 }
2050
2051 let existed = self.critical_readers.remove(reader_id).is_some();
2052 if existed {
2053 }
2067 Continue(existed)
2071 }
2072
2073 pub fn expire_writer(
2074 &mut self,
2075 writer_id: &WriterId,
2076 ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2077 if self.is_tombstone() {
2082 return Break(NoOpStateTransition(false));
2083 }
2084
2085 let existed = self.writers.remove(writer_id).is_some();
2086 Continue(existed)
2090 }
2091
2092 fn leased_reader(&mut self, id: &LeasedReaderId) -> Option<&mut LeasedReaderState<T>> {
2093 self.leased_readers.get_mut(id)
2094 }
2095
2096 fn critical_reader(&mut self, id: &CriticalReaderId) -> &mut CriticalReaderState<T> {
2097 self.critical_readers
2098 .get_mut(id)
2099 .unwrap_or_else(|| {
2100 panic!(
2101 "Unknown CriticalReaderId({}). It was either never registered, or has been manually expired.",
2102 id
2103 )
2104 })
2105 }
2106
2107 fn critical_since(&self) -> Option<Antichain<T>> {
2108 let mut critical_sinces = self.critical_readers.values().map(|r| &r.since);
2109 let mut since = critical_sinces.next().cloned()?;
2110 for s in critical_sinces {
2111 since.meet_assign(s);
2112 }
2113 Some(since)
2114 }
2115
2116 fn update_since(&mut self) {
2117 let mut sinces_iter = self
2118 .leased_readers
2119 .values()
2120 .map(|x| &x.since)
2121 .chain(self.critical_readers.values().map(|x| &x.since));
2122 let mut since = match sinces_iter.next() {
2123 Some(since) => since.clone(),
2124 None => {
2125 return;
2128 }
2129 };
2130 while let Some(s) = sinces_iter.next() {
2131 since.meet_assign(s);
2132 }
2133 self.trace.downgrade_since(&since);
2134 }
2135
2136 fn seqno_since(&self, seqno: SeqNo) -> SeqNo {
2137 let mut seqno_since = seqno;
2138 for cap in self.leased_readers.values() {
2139 seqno_since = std::cmp::min(seqno_since, cap.seqno);
2140 }
2141 seqno_since
2143 }
2144
2145 fn tombstone_batch() -> HollowBatch<T> {
2146 HollowBatch::empty(Description::new(
2147 Antichain::from_elem(T::minimum()),
2148 Antichain::new(),
2149 Antichain::new(),
2150 ))
2151 }
2152
2153 pub(crate) fn is_tombstone(&self) -> bool {
2154 self.trace.upper().is_empty()
2155 && self.trace.since().is_empty()
2156 && self.writers.is_empty()
2157 && self.leased_readers.is_empty()
2158 && self.critical_readers.is_empty()
2159 }
2160
2161 pub(crate) fn is_single_empty_batch(&self) -> bool {
2162 let mut batch_count = 0;
2163 let mut is_empty = true;
2164 self.trace.map_batches(|b| {
2165 batch_count += 1;
2166 is_empty &= b.is_empty()
2167 });
2168 batch_count <= 1 && is_empty
2169 }
2170
2171 pub fn become_tombstone_and_shrink(&mut self) -> ControlFlow<NoOpStateTransition<()>, ()> {
2172 assert_eq!(self.trace.upper(), &Antichain::new());
2173 assert_eq!(self.trace.since(), &Antichain::new());
2174
2175 let was_tombstone = self.is_tombstone();
2178
2179 self.writers.clear();
2181 self.leased_readers.clear();
2182 self.critical_readers.clear();
2183
2184 debug_assert!(self.is_tombstone());
2185
2186 let mut to_replace = None;
2195 let mut batch_count = 0;
2196 self.trace.map_batches(|b| {
2197 batch_count += 1;
2198 if !b.is_empty() && to_replace.is_none() {
2199 to_replace = Some(b.desc.clone());
2200 }
2201 });
2202 if let Some(desc) = to_replace {
2203 let result = self.trace.apply_tombstone_merge(&desc);
2207 assert!(
2208 result.matched(),
2209 "merge with a matching desc should always match"
2210 );
2211 Continue(())
2212 } else if batch_count > 1 {
2213 let mut new_trace = Trace::default();
2218 new_trace.downgrade_since(&Antichain::new());
2219 let merge_reqs = new_trace.push_batch(Self::tombstone_batch());
2220 assert_eq!(merge_reqs, Vec::new());
2221 self.trace = new_trace;
2222 Continue(())
2223 } else if !was_tombstone {
2224 Continue(())
2227 } else {
2228 Break(NoOpStateTransition(()))
2231 }
2232 }
2233}
2234
2235#[derive(Debug)]
2237#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
2238pub struct State<T> {
2239 pub(crate) shard_id: ShardId,
2240
2241 pub(crate) seqno: SeqNo,
2242 pub(crate) walltime_ms: u64,
2245 pub(crate) hostname: String,
2248 pub(crate) collections: StateCollections<T>,
2249}
2250
2251pub struct TypedState<K, V, T, D> {
2254 pub(crate) state: State<T>,
2255
2256 pub(crate) _phantom: PhantomData<fn() -> (K, V, D)>,
2264}
2265
2266impl<K, V, T: Clone, D> TypedState<K, V, T, D> {
2267 #[cfg(any(test, debug_assertions))]
2268 pub(crate) fn clone(&self, hostname: String) -> Self {
2269 TypedState {
2270 state: State {
2271 shard_id: self.shard_id.clone(),
2272 seqno: self.seqno.clone(),
2273 walltime_ms: self.walltime_ms,
2274 hostname,
2275 collections: self.collections.clone(),
2276 },
2277 _phantom: PhantomData,
2278 }
2279 }
2280
2281 pub(crate) fn clone_for_rollup(&self) -> Self {
2282 TypedState {
2283 state: State {
2284 shard_id: self.shard_id.clone(),
2285 seqno: self.seqno.clone(),
2286 walltime_ms: self.walltime_ms,
2287 hostname: self.hostname.clone(),
2288 collections: self.collections.clone(),
2289 },
2290 _phantom: PhantomData,
2291 }
2292 }
2293}
2294
2295impl<K, V, T: Debug, D> Debug for TypedState<K, V, T, D> {
2296 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2297 let TypedState { state, _phantom } = self;
2300 f.debug_struct("TypedState").field("state", state).finish()
2301 }
2302}
2303
2304#[cfg(any(test, debug_assertions))]
2306impl<K, V, T: PartialEq, D> PartialEq for TypedState<K, V, T, D> {
2307 fn eq(&self, other: &Self) -> bool {
2308 let TypedState {
2311 state: self_state,
2312 _phantom,
2313 } = self;
2314 let TypedState {
2315 state: other_state,
2316 _phantom,
2317 } = other;
2318 self_state == other_state
2319 }
2320}
2321
2322impl<K, V, T, D> Deref for TypedState<K, V, T, D> {
2323 type Target = State<T>;
2324
2325 fn deref(&self) -> &Self::Target {
2326 &self.state
2327 }
2328}
2329
2330impl<K, V, T, D> DerefMut for TypedState<K, V, T, D> {
2331 fn deref_mut(&mut self) -> &mut Self::Target {
2332 &mut self.state
2333 }
2334}
2335
2336impl<K, V, T, D> TypedState<K, V, T, D>
2337where
2338 K: Codec,
2339 V: Codec,
2340 T: Timestamp + Lattice + Codec64,
2341 D: Codec64,
2342{
2343 pub fn new(
2344 applier_version: Version,
2345 shard_id: ShardId,
2346 hostname: String,
2347 walltime_ms: u64,
2348 ) -> Self {
2349 let state = State {
2350 shard_id,
2351 seqno: SeqNo::minimum(),
2352 walltime_ms,
2353 hostname,
2354 collections: StateCollections {
2355 version: applier_version,
2356 last_gc_req: SeqNo::minimum(),
2357 rollups: BTreeMap::new(),
2358 active_rollup: None,
2359 active_gc: None,
2360 leased_readers: BTreeMap::new(),
2361 critical_readers: BTreeMap::new(),
2362 writers: BTreeMap::new(),
2363 schemas: BTreeMap::new(),
2364 trace: Trace::default(),
2365 },
2366 };
2367 TypedState {
2368 state,
2369 _phantom: PhantomData,
2370 }
2371 }
2372
2373 pub fn clone_apply<R, E, WorkFn>(
2374 &self,
2375 cfg: &PersistConfig,
2376 work_fn: &mut WorkFn,
2377 ) -> ControlFlow<E, (R, Self)>
2378 where
2379 WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
2380 {
2381 let mut new_state = State {
2383 shard_id: self.shard_id,
2384 seqno: self.seqno.next(),
2385 walltime_ms: (cfg.now)(),
2386 hostname: cfg.hostname.clone(),
2387 collections: self.collections.clone(),
2388 };
2389
2390 if new_state.walltime_ms <= self.walltime_ms {
2393 new_state.walltime_ms = self.walltime_ms + 1;
2394 }
2395
2396 let work_ret = work_fn(new_state.seqno, cfg, &mut new_state.collections)?;
2397 let new_state = TypedState {
2398 state: new_state,
2399 _phantom: PhantomData,
2400 };
2401 Continue((work_ret, new_state))
2402 }
2403}
2404
/// Tuning knobs for `State::maybe_gc`.
#[derive(Clone, Copy, Debug)]
pub struct GcConfig {
    /// Whether to use the seqno-count threshold and the active-GC tracking
    /// (instead of the seqno-scaled threshold).
    pub use_active_gc: bool,
    /// How long (ms) an active GC may run before another GC may be requested.
    pub fallback_threshold_ms: u64,
    /// Minimum number of newly collectable seqnos before GC is requested
    /// (only used with `use_active_gc`).
    pub min_versions: usize,
    /// Maximum number of seqnos a single GC request may cover.
    pub max_versions: usize,
}
2412
2413impl<T> State<T>
2414where
2415 T: Timestamp + Lattice + Codec64,
2416{
2417 pub fn shard_id(&self) -> ShardId {
2418 self.shard_id
2419 }
2420
2421 pub fn seqno(&self) -> SeqNo {
2422 self.seqno
2423 }
2424
2425 pub fn since(&self) -> &Antichain<T> {
2426 self.collections.trace.since()
2427 }
2428
2429 pub fn upper(&self) -> &Antichain<T> {
2430 self.collections.trace.upper()
2431 }
2432
2433 pub fn spine_batch_count(&self) -> usize {
2434 self.collections.trace.num_spine_batches()
2435 }
2436
2437 pub fn size_metrics(&self) -> StateSizeMetrics {
2438 let mut ret = StateSizeMetrics::default();
2439 self.blobs().for_each(|x| match x {
2440 HollowBlobRef::Batch(x) => {
2441 ret.hollow_batch_count += 1;
2442 ret.batch_part_count += x.part_count();
2443 ret.num_updates += x.len;
2444
2445 let batch_size = x.encoded_size_bytes();
2446 for x in x.parts.iter() {
2447 if x.ts_rewrite().is_some() {
2448 ret.rewrite_part_count += 1;
2449 }
2450 if x.is_inline() {
2451 ret.inline_part_count += 1;
2452 ret.inline_part_bytes += x.inline_bytes();
2453 }
2454 }
2455 ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size);
2456 ret.state_batches_bytes += batch_size;
2457 }
2458 HollowBlobRef::Rollup(x) => {
2459 ret.state_rollup_count += 1;
2460 ret.state_rollups_bytes += x.encoded_size_bytes.unwrap_or_default()
2461 }
2462 });
2463 ret
2464 }
2465
2466 pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
2467 self.collections
2470 .rollups
2471 .iter()
2472 .rev()
2473 .next()
2474 .expect("State should have at least one rollup if seqno > minimum")
2475 }
2476
2477 pub(crate) fn seqno_since(&self) -> SeqNo {
2478 self.collections.seqno_since(self.seqno)
2479 }
2480
2481 pub fn maybe_gc(&mut self, is_write: bool, now: u64, cfg: GcConfig) -> Option<GcReq> {
2493 let GcConfig {
2494 use_active_gc,
2495 fallback_threshold_ms,
2496 min_versions,
2497 max_versions,
2498 } = cfg;
2499 let gc_threshold = if use_active_gc {
2503 u64::cast_from(min_versions)
2504 } else {
2505 std::cmp::max(
2506 1,
2507 u64::cast_from(self.seqno.0.next_power_of_two().trailing_zeros()),
2508 )
2509 };
2510 let new_seqno_since = self.seqno_since();
2511 let gc_until_seqno = new_seqno_since.min(SeqNo(
2514 self.collections
2515 .last_gc_req
2516 .0
2517 .saturating_add(u64::cast_from(max_versions)),
2518 ));
2519 let should_gc = new_seqno_since
2520 .0
2521 .saturating_sub(self.collections.last_gc_req.0)
2522 >= gc_threshold;
2523
2524 let should_gc = if use_active_gc && !should_gc {
2527 match self.collections.active_gc {
2528 Some(active_gc) => now.saturating_sub(active_gc.start_ms) > fallback_threshold_ms,
2529 None => false,
2530 }
2531 } else {
2532 should_gc
2533 };
2534 let should_gc = should_gc && (is_write || self.collections.writers.is_empty());
2537 let tombstone_needs_gc = self.collections.is_tombstone();
2542 let should_gc = should_gc || tombstone_needs_gc;
2543 let should_gc = if use_active_gc {
2544 should_gc
2548 && match self.collections.active_gc {
2549 Some(active) => now.saturating_sub(active.start_ms) > fallback_threshold_ms,
2550 None => true,
2551 }
2552 } else {
2553 should_gc
2554 };
2555 if should_gc {
2556 self.collections.last_gc_req = gc_until_seqno;
2557 Some(GcReq {
2558 shard_id: self.shard_id,
2559 new_seqno_since: gc_until_seqno,
2560 })
2561 } else {
2562 None
2563 }
2564 }
2565
2566 pub fn seqnos_held(&self) -> usize {
2568 usize::cast_from(self.seqno.0.saturating_sub(self.seqno_since().0))
2569 }
2570
2571 pub fn expire_at(&mut self, walltime_ms: EpochMillis) -> ExpiryMetrics {
2573 let mut metrics = ExpiryMetrics::default();
2574 let shard_id = self.shard_id();
2575 self.collections.leased_readers.retain(|id, state| {
2576 let retain = state.last_heartbeat_timestamp_ms + state.lease_duration_ms >= walltime_ms;
2577 if !retain {
2578 info!(
2579 "Force expiring reader {id} ({}) of shard {shard_id} due to inactivity",
2580 state.debug.purpose
2581 );
2582 metrics.readers_expired += 1;
2583 }
2584 retain
2585 });
2586 self.collections.writers.retain(|id, state| {
2588 let retain =
2589 (state.last_heartbeat_timestamp_ms + state.lease_duration_ms) >= walltime_ms;
2590 if !retain {
2591 info!(
2592 "Force expiring writer {id} ({}) of shard {shard_id} due to inactivity",
2593 state.debug.purpose
2594 );
2595 metrics.writers_expired += 1;
2596 }
2597 retain
2598 });
2599 metrics
2600 }
2601
2602 pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
2606 if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2607 return Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
2608 self.collections.trace.since().clone(),
2609 )));
2610 }
2611 let upper = self.collections.trace.upper();
2612 if PartialOrder::less_equal(upper, as_of) {
2613 return Err(SnapshotErr::AsOfNotYetAvailable(
2614 self.seqno,
2615 Upper(upper.clone()),
2616 ));
2617 }
2618
2619 let batches = self
2620 .collections
2621 .trace
2622 .batches()
2623 .filter(|b| !PartialOrder::less_than(as_of, b.desc.lower()))
2624 .cloned()
2625 .collect();
2626 Ok(batches)
2627 }
2628
2629 pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
2631 if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2632 return Err(Since(self.collections.trace.since().clone()));
2633 }
2634 Ok(())
2635 }
2636
2637 pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
2638 self.collections
2641 .trace
2642 .batches()
2643 .find(|b| {
2644 PartialOrder::less_equal(b.desc.lower(), frontier)
2645 && PartialOrder::less_than(frontier, b.desc.upper())
2646 })
2647 .cloned()
2648 .ok_or(self.seqno)
2649 }
2650
2651 pub fn active_rollup(&self) -> Option<ActiveRollup> {
2652 self.collections.active_rollup
2653 }
2654
2655 pub fn need_rollup(
2656 &self,
2657 threshold: usize,
2658 use_active_rollup: bool,
2659 fallback_threshold_ms: u64,
2660 now: u64,
2661 ) -> Option<SeqNo> {
2662 let (latest_rollup_seqno, _) = self.latest_rollup();
2663
2664 if self.collections.is_tombstone() && latest_rollup_seqno.next() < self.seqno {
2670 return Some(self.seqno);
2671 }
2672
2673 let seqnos_since_last_rollup = self.seqno.0.saturating_sub(latest_rollup_seqno.0);
2674
2675 if use_active_rollup {
2676 if seqnos_since_last_rollup > u64::cast_from(threshold) {
2682 match self.active_rollup() {
2683 Some(active_rollup) => {
2684 if now.saturating_sub(active_rollup.start_ms) > fallback_threshold_ms {
2685 return Some(self.seqno);
2686 }
2687 }
2688 None => {
2689 return Some(self.seqno);
2690 }
2691 }
2692 }
2693 } else {
2694 if seqnos_since_last_rollup > 0
2698 && seqnos_since_last_rollup % u64::cast_from(threshold) == 0
2699 {
2700 return Some(self.seqno);
2701 }
2702
2703 if seqnos_since_last_rollup
2706 > u64::cast_from(
2707 threshold * PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER,
2708 )
2709 {
2710 return Some(self.seqno);
2711 }
2712 }
2713
2714 None
2715 }
2716
2717 pub(crate) fn blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
2718 let batches = self.collections.trace.batches().map(HollowBlobRef::Batch);
2719 let rollups = self.collections.rollups.values().map(HollowBlobRef::Rollup);
2720 batches.chain(rollups)
2721 }
2722}
2723
2724fn serialize_part_bytes<S: Serializer>(val: &[u8], s: S) -> Result<S::Ok, S::Error> {
2725 let val = hex::encode(val);
2726 val.serialize(s)
2727}
2728
2729fn serialize_lazy_proto<S: Serializer, T: prost::Message + Default>(
2730 val: &Option<LazyProto<T>>,
2731 s: S,
2732) -> Result<S::Ok, S::Error> {
2733 val.as_ref()
2734 .map(|lazy| hex::encode(&lazy.into_proto()))
2735 .serialize(s)
2736}
2737
2738fn serialize_part_stats<S: Serializer>(
2739 val: &Option<LazyPartStats>,
2740 s: S,
2741) -> Result<S::Ok, S::Error> {
2742 let val = val.as_ref().map(|x| x.decode().key);
2743 val.serialize(s)
2744}
2745
2746fn serialize_diffs_sum<S: Serializer>(val: &Option<[u8; 8]>, s: S) -> Result<S::Ok, S::Error> {
2747 let val = val.map(i64::decode);
2749 val.serialize(s)
2750}
2751
2752impl<T: Serialize + Timestamp + Lattice> Serialize for State<T> {
2758 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2759 let State {
2760 shard_id,
2761 seqno,
2762 walltime_ms,
2763 hostname,
2764 collections:
2765 StateCollections {
2766 version: applier_version,
2767 last_gc_req,
2768 rollups,
2769 active_rollup,
2770 active_gc,
2771 leased_readers,
2772 critical_readers,
2773 writers,
2774 schemas,
2775 trace,
2776 },
2777 } = self;
2778 let mut s = s.serialize_struct("State", 13)?;
2779 let () = s.serialize_field("applier_version", &applier_version.to_string())?;
2780 let () = s.serialize_field("shard_id", shard_id)?;
2781 let () = s.serialize_field("seqno", seqno)?;
2782 let () = s.serialize_field("walltime_ms", walltime_ms)?;
2783 let () = s.serialize_field("hostname", hostname)?;
2784 let () = s.serialize_field("last_gc_req", last_gc_req)?;
2785 let () = s.serialize_field("rollups", rollups)?;
2786 let () = s.serialize_field("active_rollup", active_rollup)?;
2787 let () = s.serialize_field("active_gc", active_gc)?;
2788 let () = s.serialize_field("leased_readers", leased_readers)?;
2789 let () = s.serialize_field("critical_readers", critical_readers)?;
2790 let () = s.serialize_field("writers", writers)?;
2791 let () = s.serialize_field("schemas", schemas)?;
2792 let () = s.serialize_field("since", &trace.since().elements())?;
2793 let () = s.serialize_field("upper", &trace.upper().elements())?;
2794 let trace = trace.flatten();
2795 let () = s.serialize_field("batches", &trace.legacy_batches.keys().collect::<Vec<_>>())?;
2796 let () = s.serialize_field("hollow_batches", &trace.hollow_batches)?;
2797 let () = s.serialize_field("spine_batches", &trace.spine_batches)?;
2798 let () = s.serialize_field("merges", &trace.merges)?;
2799 s.end()
2800 }
2801}
2802
/// Size and count statistics, presumably computed over a shard's `State`.
///
/// NOTE(review): the code that fills in these fields is not visible here. Each
/// field's meaning is inferred from its name only; confirm against the producer
/// before relying on it.
#[derive(Debug, Default)]
pub struct StateSizeMetrics {
    pub hollow_batch_count: usize,
    pub batch_part_count: usize,
    pub rewrite_part_count: usize,
    pub num_updates: usize,
    pub largest_batch_bytes: usize,
    pub state_batches_bytes: usize,
    pub state_rollups_bytes: usize,
    pub state_rollup_count: usize,
    pub inline_part_count: usize,
    pub inline_part_bytes: usize,
}
2816
/// Counters for expired handles.
///
/// NOTE(review): presumably these count the reader and writer handles expired
/// by a single expiry pass. The code that fills them in is not visible here;
/// confirm.
#[derive(Default)]
pub struct ExpiryMetrics {
    pub(crate) readers_expired: usize,
    pub(crate) writers_expired: usize,
}
2822
/// Newtype wrapper around an [`Antichain`] that represents a since frontier.
///
/// For example, `downgrade_since` returns the frontier wrapped in this type,
/// and so does `SnapshotErr::AsOfHistoricalDistinctionsLost`.
#[derive(Debug, Clone, PartialEq)]
pub struct Since<T>(pub Antichain<T>);
2826
/// Newtype wrapper around an [`Antichain`] that represents an upper frontier.
///
/// For example, `SnapshotErr::AsOfNotYetAvailable` reports the shard's upper
/// wrapped in this type.
#[derive(Debug, PartialEq)]
pub struct Upper<T>(pub Antichain<T>);
2830
2831#[cfg(test)]
2832pub(crate) mod tests {
2833 use std::ops::Range;
2834 use std::str::FromStr;
2835
2836 use bytes::Bytes;
2837 use mz_build_info::DUMMY_BUILD_INFO;
2838 use mz_dyncfg::ConfigUpdates;
2839 use mz_ore::now::SYSTEM_TIME;
2840 use mz_ore::{assert_none, assert_ok};
2841 use mz_proto::RustType;
2842 use proptest::prelude::*;
2843 use proptest::strategy::ValueTree;
2844
2845 use crate::InvalidUsage::{InvalidBounds, InvalidEmptyTimeInterval};
2846 use crate::cache::PersistClientCache;
2847 use crate::internal::encoding::any_some_lazy_part_stats;
2848 use crate::internal::paths::RollupId;
2849 use crate::internal::trace::tests::any_trace;
2850 use crate::tests::new_test_client_cache;
2851 use crate::{Diagnostics, PersistLocation};
2852
2853 use super::*;
2854
2855 const LEASE_DURATION_MS: u64 = 900 * 1000;
2856 fn debug_state() -> HandleDebugState {
2857 HandleDebugState {
2858 hostname: "debug".to_owned(),
2859 purpose: "finding the bugs".to_owned(),
2860 }
2861 }
2862
2863 pub fn any_hollow_batch_with_exact_runs<T: Arbitrary + Timestamp>(
2864 num_runs: usize,
2865 ) -> impl Strategy<Value = HollowBatch<T>> {
2866 (
2867 any::<T>(),
2868 any::<T>(),
2869 any::<T>(),
2870 proptest::collection::vec(any_run_part::<T>(), num_runs + 1..20),
2871 any::<usize>(),
2872 )
2873 .prop_map(move |(t0, t1, since, parts, len)| {
2874 let (lower, upper) = if t0 <= t1 {
2875 (Antichain::from_elem(t0), Antichain::from_elem(t1))
2876 } else {
2877 (Antichain::from_elem(t1), Antichain::from_elem(t0))
2878 };
2879 let since = Antichain::from_elem(since);
2880
2881 let run_splits = (1..num_runs)
2882 .map(|i| i * parts.len() / num_runs)
2883 .collect::<Vec<_>>();
2884
2885 let run_meta = (0..num_runs)
2886 .map(|_| {
2887 let mut meta = RunMeta::default();
2888 meta.id = Some(RunId::new());
2889 meta
2890 })
2891 .collect::<Vec<_>>();
2892
2893 HollowBatch::new(
2894 Description::new(lower, upper, since),
2895 parts,
2896 len % 10,
2897 run_meta,
2898 run_splits,
2899 )
2900 })
2901 }
2902
2903 pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
2904 Strategy::prop_map(
2905 (
2906 any::<T>(),
2907 any::<T>(),
2908 any::<T>(),
2909 proptest::collection::vec(any_run_part::<T>(), 0..20),
2910 any::<usize>(),
2911 0..=10usize,
2912 proptest::collection::vec(any::<RunId>(), 10),
2913 ),
2914 |(t0, t1, since, parts, len, num_runs, run_ids)| {
2915 let (lower, upper) = if t0 <= t1 {
2916 (Antichain::from_elem(t0), Antichain::from_elem(t1))
2917 } else {
2918 (Antichain::from_elem(t1), Antichain::from_elem(t0))
2919 };
2920 let since = Antichain::from_elem(since);
2921 if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() {
2922 let run_splits = (1..num_runs)
2923 .map(|i| i * parts.len() / num_runs)
2924 .collect::<Vec<_>>();
2925
2926 let run_meta = (0..num_runs)
2927 .enumerate()
2928 .map(|(i, _)| {
2929 let mut meta = RunMeta::default();
2930 meta.id = Some(run_ids[i]);
2931 meta
2932 })
2933 .collect::<Vec<_>>();
2934
2935 HollowBatch::new(
2936 Description::new(lower, upper, since),
2937 parts,
2938 len % 10,
2939 run_meta,
2940 run_splits,
2941 )
2942 } else {
2943 HollowBatch::new_run_for_test(
2944 Description::new(lower, upper, since),
2945 parts,
2946 len % 10,
2947 run_ids[0],
2948 )
2949 }
2950 },
2951 )
2952 }
2953
2954 pub fn any_batch_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = BatchPart<T>> {
2955 Strategy::prop_map(
2956 (
2957 any::<bool>(),
2958 any_hollow_batch_part(),
2959 any::<Option<T>>(),
2960 any::<Option<SchemaId>>(),
2961 any::<Option<SchemaId>>(),
2962 ),
2963 |(is_hollow, hollow, ts_rewrite, schema_id, deprecated_schema_id)| {
2964 if is_hollow {
2965 BatchPart::Hollow(hollow)
2966 } else {
2967 let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap();
2968 let ts_rewrite = ts_rewrite.map(Antichain::from_elem);
2969 BatchPart::Inline {
2970 updates,
2971 ts_rewrite,
2972 schema_id,
2973 deprecated_schema_id,
2974 }
2975 }
2976 },
2977 )
2978 }
2979
2980 pub fn any_run_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = RunPart<T>> {
2981 Strategy::prop_map(any_batch_part(), |part| RunPart::Single(part))
2982 }
2983
2984 pub fn any_hollow_batch_part<T: Arbitrary + Timestamp>()
2985 -> impl Strategy<Value = HollowBatchPart<T>> {
2986 Strategy::prop_map(
2987 (
2988 any::<PartialBatchKey>(),
2989 any::<usize>(),
2990 any::<Vec<u8>>(),
2991 any_some_lazy_part_stats(),
2992 any::<Option<T>>(),
2993 any::<[u8; 8]>(),
2994 any::<Option<BatchColumnarFormat>>(),
2995 any::<Option<SchemaId>>(),
2996 any::<Option<SchemaId>>(),
2997 ),
2998 |(
2999 key,
3000 encoded_size_bytes,
3001 key_lower,
3002 stats,
3003 ts_rewrite,
3004 diffs_sum,
3005 format,
3006 schema_id,
3007 deprecated_schema_id,
3008 )| {
3009 HollowBatchPart {
3010 key,
3011 meta: Default::default(),
3012 encoded_size_bytes,
3013 key_lower,
3014 structured_key_lower: None,
3015 stats,
3016 ts_rewrite: ts_rewrite.map(Antichain::from_elem),
3017 diffs_sum: Some(diffs_sum),
3018 format,
3019 schema_id,
3020 deprecated_schema_id,
3021 }
3022 },
3023 )
3024 }
3025
3026 pub fn any_leased_reader_state<T: Arbitrary>() -> impl Strategy<Value = LeasedReaderState<T>> {
3027 Strategy::prop_map(
3028 (
3029 any::<SeqNo>(),
3030 any::<Option<T>>(),
3031 any::<u64>(),
3032 any::<u64>(),
3033 any::<HandleDebugState>(),
3034 ),
3035 |(seqno, since, last_heartbeat_timestamp_ms, mut lease_duration_ms, debug)| {
3036 if lease_duration_ms == 0 {
3040 lease_duration_ms += 1;
3041 }
3042 LeasedReaderState {
3043 seqno,
3044 since: since.map_or_else(Antichain::new, Antichain::from_elem),
3045 last_heartbeat_timestamp_ms,
3046 lease_duration_ms,
3047 debug,
3048 }
3049 },
3050 )
3051 }
3052
3053 pub fn any_critical_reader_state<T>() -> impl Strategy<Value = CriticalReaderState<T>>
3054 where
3055 T: Arbitrary,
3056 {
3057 Strategy::prop_map(
3058 (
3059 any::<Option<T>>(),
3060 any::<Opaque>(),
3061 any::<HandleDebugState>(),
3062 ),
3063 |(since, opaque, debug)| CriticalReaderState {
3064 since: since.map_or_else(Antichain::new, Antichain::from_elem),
3065 opaque,
3066 debug,
3067 },
3068 )
3069 }
3070
3071 pub fn any_writer_state<T: Arbitrary>() -> impl Strategy<Value = WriterState<T>> {
3072 Strategy::prop_map(
3073 (
3074 any::<u64>(),
3075 any::<u64>(),
3076 any::<IdempotencyToken>(),
3077 any::<Option<T>>(),
3078 any::<HandleDebugState>(),
3079 ),
3080 |(
3081 last_heartbeat_timestamp_ms,
3082 lease_duration_ms,
3083 most_recent_write_token,
3084 most_recent_write_upper,
3085 debug,
3086 )| WriterState {
3087 last_heartbeat_timestamp_ms,
3088 lease_duration_ms,
3089 most_recent_write_token,
3090 most_recent_write_upper: most_recent_write_upper
3091 .map_or_else(Antichain::new, Antichain::from_elem),
3092 debug,
3093 },
3094 )
3095 }
3096
3097 pub fn any_encoded_schemas() -> impl Strategy<Value = EncodedSchemas> {
3098 Strategy::prop_map(
3099 (
3100 any::<Vec<u8>>(),
3101 any::<Vec<u8>>(),
3102 any::<Vec<u8>>(),
3103 any::<Vec<u8>>(),
3104 ),
3105 |(key, key_data_type, val, val_data_type)| EncodedSchemas {
3106 key: Bytes::from(key),
3107 key_data_type: Bytes::from(key_data_type),
3108 val: Bytes::from(val),
3109 val_data_type: Bytes::from(val_data_type),
3110 },
3111 )
3112 }
3113
3114 pub fn any_state<T: Arbitrary + Timestamp + Lattice>(
3115 num_trace_batches: Range<usize>,
3116 ) -> impl Strategy<Value = State<T>> {
3117 let part1 = (
3118 any::<ShardId>(),
3119 any::<SeqNo>(),
3120 any::<u64>(),
3121 any::<String>(),
3122 any::<SeqNo>(),
3123 proptest::collection::btree_map(any::<SeqNo>(), any::<HollowRollup>(), 1..3),
3124 proptest::option::of(any::<ActiveRollup>()),
3125 );
3126
3127 let part2 = (
3128 proptest::option::of(any::<ActiveGc>()),
3129 proptest::collection::btree_map(
3130 any::<LeasedReaderId>(),
3131 any_leased_reader_state::<T>(),
3132 1..3,
3133 ),
3134 proptest::collection::btree_map(
3135 any::<CriticalReaderId>(),
3136 any_critical_reader_state::<T>(),
3137 1..3,
3138 ),
3139 proptest::collection::btree_map(any::<WriterId>(), any_writer_state::<T>(), 0..3),
3140 proptest::collection::btree_map(any::<SchemaId>(), any_encoded_schemas(), 0..3),
3141 any_trace::<T>(num_trace_batches),
3142 );
3143
3144 (part1, part2).prop_map(
3145 |(
3146 (shard_id, seqno, walltime_ms, hostname, last_gc_req, rollups, active_rollup),
3147 (active_gc, leased_readers, critical_readers, writers, schemas, trace),
3148 )| State {
3149 shard_id,
3150 seqno,
3151 walltime_ms,
3152 hostname,
3153 collections: StateCollections {
3154 version: Version::new(1, 2, 3),
3155 last_gc_req,
3156 rollups,
3157 active_rollup,
3158 active_gc,
3159 leased_readers,
3160 critical_readers,
3161 writers,
3162 schemas,
3163 trace,
3164 },
3165 },
3166 )
3167 }
3168
3169 pub(crate) fn hollow<T: Timestamp>(
3170 lower: T,
3171 upper: T,
3172 keys: &[&str],
3173 len: usize,
3174 ) -> HollowBatch<T> {
3175 HollowBatch::new_run(
3176 Description::new(
3177 Antichain::from_elem(lower),
3178 Antichain::from_elem(upper),
3179 Antichain::from_elem(T::minimum()),
3180 ),
3181 keys.iter()
3182 .map(|x| {
3183 RunPart::Single(BatchPart::Hollow(HollowBatchPart {
3184 key: PartialBatchKey((*x).to_owned()),
3185 meta: Default::default(),
3186 encoded_size_bytes: 0,
3187 key_lower: vec![],
3188 structured_key_lower: None,
3189 stats: None,
3190 ts_rewrite: None,
3191 diffs_sum: None,
3192 format: None,
3193 schema_id: None,
3194 deprecated_schema_id: None,
3195 }))
3196 })
3197 .collect(),
3198 len,
3199 )
3200 }
3201
    /// Tests how leased readers' `downgrade_since` calls move the trace since.
    ///
    /// As asserted below:
    /// - `downgrade_since` returns the calling reader's own since.
    /// - An attempt to move that since backwards leaves it unchanged.
    /// - The trace since tracks the smallest since among registered readers.
    /// - Expiring readers never moves the trace since, even when no readers remain.
    #[mz_ore::test]
    fn downgrade_since() {
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        let reader = LeasedReaderId::new();
        let seqno = SeqNo::minimum();
        let now = SYSTEM_TIME.clone();
        let _ = state.collections.register_leased_reader(
            "",
            &reader,
            "",
            seqno,
            Duration::from_secs(10),
            now(),
            false,
        );

        // A freshly created state starts with since = 0.
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));

        // Advancing the only reader's since advances the trace since too.
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                seqno,
                &Antichain::from_elem(2),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        // Repeating the same downgrade is a no-op.
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                seqno,
                &Antichain::from_elem(2),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        // Asking for an earlier since does not regress: the reader stays at 2.
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                seqno,
                &Antichain::from_elem(1),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));

        // Register a second reader.
        let reader2 = LeasedReaderId::new();
        let _ = state.collections.register_leased_reader(
            "",
            &reader2,
            "",
            seqno,
            Duration::from_secs(10),
            now(),
            false,
        );

        // reader2 moves to 3, but reader still holds the trace since at 2.
        assert_eq!(
            state.collections.downgrade_since(
                &reader2,
                seqno,
                seqno,
                &Antichain::from_elem(3),
                now()
            ),
            Continue(Since(Antichain::from_elem(3)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        // Once reader moves to 5, the trace since becomes reader2's 3.
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                seqno,
                &Antichain::from_elem(5),
                now()
            ),
            Continue(Since(Antichain::from_elem(5)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        // Expiring reader does not change the trace since.
        assert_eq!(
            state.collections.expire_leased_reader(&reader),
            Continue(true)
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        // Register a third reader.
        let reader3 = LeasedReaderId::new();
        let _ = state.collections.register_leased_reader(
            "",
            &reader3,
            "",
            seqno,
            Duration::from_secs(10),
            now(),
            false,
        );

        // reader3 moves to 10; reader2 (at 3) still holds the trace since.
        assert_eq!(
            state.collections.downgrade_since(
                &reader3,
                seqno,
                seqno,
                &Antichain::from_elem(10),
                now()
            ),
            Continue(Since(Antichain::from_elem(10)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        // Expiring reader2 leaves only reader3 (at 10), but expiry alone does not
        // advance the trace since.
        assert_eq!(
            state.collections.expire_leased_reader(&reader2),
            Continue(true)
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        // With no readers left, the trace since still stays at 3.
        assert_eq!(
            state.collections.expire_leased_reader(&reader3),
            Continue(true)
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
    }
3354
    /// Tests `compare_and_downgrade_since` for a critical reader.
    ///
    /// Each call checks the expected opaque and installs a new one. The since is
    /// moved forward only, while the opaque is replaced even when the requested
    /// since is earlier than the current one.
    #[mz_ore::test]
    fn compare_and_downgrade_since() {
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        let reader = CriticalReaderId::new();
        let _ = state
            .collections
            .register_critical_reader("", &reader, Opaque::encode(&0u64), "");

        // Initial since is 0, and the opaque is the one passed at registration.
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
        assert_eq!(
            state
                .collections
                .critical_reader(&reader)
                .opaque
                .decode::<u64>(),
            u64::MIN
        );

        // Matching opaque 0: swap to 1 and move since to 2.
        assert_eq!(
            state.collections.compare_and_downgrade_since(
                &reader,
                &Opaque::encode(&0u64),
                (&Opaque::encode(&1u64), &Antichain::from_elem(2)),
            ),
            Continue(Ok(Since(Antichain::from_elem(2))))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            state
                .collections
                .critical_reader(&reader)
                .opaque
                .decode::<u64>(),
            1
        );
        // Same since, new opaque: since stays at 2 and the opaque becomes 2.
        assert_eq!(
            state.collections.compare_and_downgrade_since(
                &reader,
                &Opaque::encode(&1u64),
                (&Opaque::encode(&2u64), &Antichain::from_elem(2)),
            ),
            Continue(Ok(Since(Antichain::from_elem(2))))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            state
                .collections
                .critical_reader(&reader)
                .opaque
                .decode::<u64>(),
            2
        );
        // An earlier since (1) does not regress the since, but the opaque is
        // still updated to 3.
        assert_eq!(
            state.collections.compare_and_downgrade_since(
                &reader,
                &Opaque::encode(&2u64),
                (&Opaque::encode(&3u64), &Antichain::from_elem(1)),
            ),
            Continue(Ok(Since(Antichain::from_elem(2))))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            state
                .collections
                .critical_reader(&reader)
                .opaque
                .decode::<u64>(),
            3
        );
    }
3435
    /// Tests which appends `StateCollections::compare_and_append` accepts or rejects.
    ///
    /// Rejected:
    /// - a batch whose lower does not match the shard upper,
    /// - a batch with inverted bounds,
    /// - a batch with an empty time interval that still contains parts.
    ///
    /// Accepted:
    /// - a contiguous batch,
    /// - an empty-interval batch with no parts.
    #[mz_ore::test]
    fn compare_and_append() {
        let state = &mut TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        )
        .collections;

        let writer_id = WriterId::new();
        let now = SYSTEM_TIME.clone();

        // A new shard's trace is empty.
        assert_eq!(state.trace.num_spine_batches(), 0);
        assert_eq!(state.trace.num_hollow_batches(), 0);
        assert_eq!(state.trace.num_updates(), 0);

        // Lower 1 does not match the shard upper 0, so the append is rejected
        // with the current shard and writer uppers.
        assert_eq!(
            state.compare_and_append(
                &hollow(1, 2, &["key1"], 1),
                &writer_id,
                now(),
                LEASE_DURATION_MS,
                &IdempotencyToken::new(),
                &debug_state(),
                0,
                100,
                None
            ),
            Break(CompareAndAppendBreak::Upper {
                shard_upper: Antichain::from_elem(0),
                writer_upper: Antichain::from_elem(0)
            })
        );

        // An (empty) batch [0, 5) that starts at the shard upper is accepted.
        assert!(
            state
                .compare_and_append(
                    &hollow(0, 5, &[], 0),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // Upper before lower is invalid usage.
        assert_eq!(
            state.compare_and_append(
                &hollow(5, 4, &["key1"], 1),
                &writer_id,
                now(),
                LEASE_DURATION_MS,
                &IdempotencyToken::new(),
                &debug_state(),
                0,
                100,
                None
            ),
            Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds {
                lower: Antichain::from_elem(5),
                upper: Antichain::from_elem(4)
            }))
        );

        // lower == upper with parts present is invalid usage, and the error
        // reports the offending keys.
        assert_eq!(
            state.compare_and_append(
                &hollow(5, 5, &["key1"], 1),
                &writer_id,
                now(),
                LEASE_DURATION_MS,
                &IdempotencyToken::new(),
                &debug_state(),
                0,
                100,
                None
            ),
            Break(CompareAndAppendBreak::InvalidUsage(
                InvalidEmptyTimeInterval {
                    lower: Antichain::from_elem(5),
                    upper: Antichain::from_elem(5),
                    keys: vec!["key1".to_owned()],
                }
            ))
        );

        // lower == upper with no parts is accepted.
        assert!(
            state
                .compare_and_append(
                    &hollow(5, 5, &[], 0),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );
    }
3548
    /// Tests `snapshot` against the shard's upper and since.
    ///
    /// As asserted below:
    /// - An as_of at or beyond the upper is reported as not yet available,
    ///   together with the current upper.
    /// - An as_of earlier than the since is reported as having lost historical
    ///   distinctions.
    /// - Otherwise, every batch whose lower is at or before the as_of is returned.
    ///
    /// `state.seqno` is never advanced here, so errors report `SeqNo(0)`.
    #[mz_ore::test]
    fn snapshot() {
        let now = SYSTEM_TIME.clone();

        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        // Nothing is readable while the upper is still 0.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(0)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(0))
            ))
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(5)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(0))
            ))
        );

        let writer_id = WriterId::new();

        // Append [0, 5); the upper becomes 5.
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(0, 5, &["key1"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // as_of values 0 through 4 are below the upper and see the batch.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(0)),
            Ok(vec![hollow(0, 5, &["key1"], 1)])
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(4)),
            Ok(vec![hollow(0, 5, &["key1"], 1)])
        );

        // as_of values at or past the upper (5) are not yet available.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(5)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(5))
            ))
        );
        assert_eq!(
            state.snapshot(&Antichain::from_elem(6)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(5))
            ))
        );

        // Move the since to 2 by registering a reader and downgrading it.
        let reader = LeasedReaderId::new();
        let _ = state.collections.register_leased_reader(
            "",
            &reader,
            "",
            SeqNo::minimum(),
            Duration::from_secs(10),
            now(),
            false,
        );
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                SeqNo::minimum(),
                SeqNo::minimum(),
                &Antichain::from_elem(2),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        // An as_of earlier than the since (2) can no longer be served.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(1)),
            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
                Antichain::from_elem(2)
            )))
        );

        // Append an empty batch [5, 10); the upper becomes 10.
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(5, 10, &[], 0),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(7)),
            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(10)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(10))
            ))
        );

        // Append [10, 15); the upper becomes 15.
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(10, 15, &["key2"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // With as_of 9, the batch starting at 10 is not included.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(9)),
            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
        );

        // From as_of 10 on, the batch starting at 10 is included.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(10)),
            Ok(vec![
                hollow(0, 5, &["key1"], 1),
                hollow(5, 10, &[], 0),
                hollow(10, 15, &["key2"], 1)
            ])
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(11)),
            Ok(vec![
                hollow(0, 5, &["key1"], 1),
                hollow(5, 10, &[], 0),
                hollow(10, 15, &["key2"], 1)
            ])
        );
    }
3732
    /// Tests `next_listen_batch`.
    ///
    /// For a frontier `t`, it returns the batch that contains `t`. It returns
    /// `Err(SeqNo(0))` when no such batch exists yet, including for the empty
    /// antichain. The `SeqNo(0)` is the state's seqno, which is never advanced
    /// here.
    #[mz_ore::test]
    fn next_listen_batch() {
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        // No batches yet.
        assert_eq!(
            state.next_listen_batch(&Antichain::from_elem(0)),
            Err(SeqNo(0))
        );
        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));

        let writer_id = WriterId::new();
        let now = SYSTEM_TIME.clone();

        // Append [0, 5) and then [5, 10).
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(0, 5, &["key1"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(5, 10, &["key2"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // Frontiers 0 through 4 fall inside the first batch.
        for t in 0..=4 {
            assert_eq!(
                state.next_listen_batch(&Antichain::from_elem(t)),
                Ok(hollow(0, 5, &["key1"], 1))
            );
        }

        // Frontiers 5 through 9 fall inside the second batch.
        for t in 5..=9 {
            assert_eq!(
                state.next_listen_batch(&Antichain::from_elem(t)),
                Ok(hollow(5, 10, &["key2"], 1))
            );
        }

        // The upper (10) has no batch yet.
        assert_eq!(
            state.next_listen_batch(&Antichain::from_elem(10)),
            Err(SeqNo(0))
        );

        // The empty antichain never gets a batch.
        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
    }
3813
    /// Tests that expiring one writer does not stop a different writer from
    /// appending the next contiguous batch.
    #[mz_ore::test]
    fn expire_writer() {
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        let now = SYSTEM_TIME.clone();

        let writer_id_one = WriterId::new();

        let writer_id_two = WriterId::new();

        // Writer one appends [0, 2).
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(0, 2, &["key1"], 1),
                    &writer_id_one,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        assert!(
            state
                .collections
                .expire_writer(&writer_id_one)
                .is_continue()
        );

        // Writer two can still append [2, 5) after writer one has expired.
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(2, 5, &["key2"], 1),
                    &writer_id_two,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );
    }
3871
    /// Tests `maybe_gc` with `use_active_gc: true`.
    ///
    /// As asserted below:
    /// - No GC request is made while an `ActiveGc` is recorded, until more than
    ///   `fallback_threshold_ms` has elapsed since it started.
    /// - The requested `new_seqno_since` is capped at `max_versions` past the
    ///   seqno of the previous request.
    #[mz_ore::test]
    fn maybe_gc_active_gc() {
        const GC_CONFIG: GcConfig = GcConfig {
            use_active_gc: true,
            fallback_threshold_ms: 5000,
            min_versions: 99,
            max_versions: 500,
        };
        let now_fn = SYSTEM_TIME.clone();

        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        // A fresh state never requests GC, with the flag either true or false.
        let now = now_fn();
        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);

        // In this test, seqno_since follows seqno.
        state.seqno = SeqNo(100);
        assert_eq!(state.seqno_since(), SeqNo(100));

        // NOTE(review): the result is ignored. Per the `compare_and_append` test,
        // appending [1, 2) to a shard whose upper is 0 returns `Break(Upper)`.
        // This call is presumably here for its effect on writer state; confirm.
        let writer_id = WriterId::new();
        let _ = state.collections.compare_and_append(
            &hollow(1, 2, &["key1"], 1),
            &writer_id,
            now,
            LEASE_DURATION_MS,
            &IdempotencyToken::new(),
            &debug_state(),
            0,
            100,
            None,
        );
        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);

        assert_eq!(
            state.maybe_gc(true, now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(100)
            })
        );

        // Record an in-progress GC that started at `now`.
        state.collections.active_gc = Some(ActiveGc {
            seqno: state.seqno,
            start_ms: now,
        });

        state.seqno = SeqNo(200);
        assert_eq!(state.seqno_since(), SeqNo(200));

        // While that GC is active and within the fallback window, no request is made.
        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);

        // Once the fallback threshold has passed, GC is requested again.
        state.seqno = SeqNo(300);
        assert_eq!(state.seqno_since(), SeqNo(300));
        let new_now = now + GC_CONFIG.fallback_threshold_ms + 1;
        assert_eq!(
            state.maybe_gc(true, new_now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(300)
            })
        );

        // While still past the threshold, the next seqno is requested as well.
        state.seqno = SeqNo(301);
        assert_eq!(state.seqno_since(), SeqNo(301));
        assert_eq!(
            state.maybe_gc(true, new_now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(301)
            })
        );

        state.collections.active_gc = None;

        state.seqno = SeqNo(400);
        assert_eq!(state.seqno_since(), SeqNo(400));

        let now = now_fn();

        // After the writer is expired, GC is requested with the flag false.
        let _ = state.collections.expire_writer(&writer_id);
        assert_eq!(
            state.maybe_gc(false, now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(400)
            })
        );

        // Jump far ahead. The request is capped at `max_versions` past the
        // previous request's seqno (400), not set to 10_000.
        let previous_seqno = state.seqno;
        state.seqno = SeqNo(10_000);
        assert_eq!(state.seqno_since(), SeqNo(10_000));

        let now = now_fn();
        assert_eq!(
            state.maybe_gc(true, now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(previous_seqno.0 + u64::cast_from(GC_CONFIG.max_versions))
            })
        );
    }
3992
    /// Tests `maybe_gc` with `use_active_gc: false`, using a fixed timestamp
    /// (`NOW_MS`) for every call.
    #[mz_ore::test]
    fn maybe_gc_classic() {
        const GC_CONFIG: GcConfig = GcConfig {
            use_active_gc: false,
            fallback_threshold_ms: 5000,
            min_versions: 16,
            max_versions: 128,
        };
        const NOW_MS: u64 = 0;

        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        // A fresh state never requests GC.
        assert_eq!(state.maybe_gc(true, NOW_MS, GC_CONFIG), None);
        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);

        // In this test, seqno_since follows seqno.
        state.seqno = SeqNo(100);
        assert_eq!(state.seqno_since(), SeqNo(100));

        // NOTE(review): the result is ignored. Per the `compare_and_append` test,
        // appending [1, 2) to a shard whose upper is 0 returns `Break(Upper)`.
        // This call is presumably here for its effect on writer state; confirm.
        let writer_id = WriterId::new();
        let now = SYSTEM_TIME.clone();
        let _ = state.collections.compare_and_append(
            &hollow(1, 2, &["key1"], 1),
            &writer_id,
            now(),
            LEASE_DURATION_MS,
            &IdempotencyToken::new(),
            &debug_state(),
            0,
            100,
            None,
        );
        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);

        assert_eq!(
            state.maybe_gc(true, NOW_MS, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(100)
            })
        );

        state.seqno = SeqNo(200);
        assert_eq!(state.seqno_since(), SeqNo(200));

        // After the writer is expired, GC is requested with the flag false.
        let _ = state.collections.expire_writer(&writer_id);
        assert_eq!(
            state.maybe_gc(false, NOW_MS, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(200)
            })
        );
    }
4059
    /// Tests `need_rollup` with active-rollup tracking enabled (threshold 3,
    /// fallback 5000ms).
    ///
    /// As asserted below:
    /// - With the latest rollup at seqno 5, seqnos 5 through 8 need no rollup,
    ///   but seqno 9 does.
    /// - While an `ActiveRollup` is recorded, no rollup is requested until more
    ///   than the fallback threshold has elapsed since it started.
    #[mz_ore::test]
    fn need_rollup_active_rollup() {
        const ROLLUP_THRESHOLD: usize = 3;
        const ROLLUP_USE_ACTIVE_ROLLUP: bool = true;
        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 5000;
        let now = SYSTEM_TIME.clone();

        mz_ore::test::init_logging();
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        // Record a rollup at seqno 5.
        let rollup_seqno = SeqNo(5);
        let rollup = HollowRollup {
            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
            encoded_size_bytes: None,
        };

        assert!(
            state
                .collections
                .add_rollup((rollup_seqno, &rollup))
                .is_continue()
        );

        // Seqnos 5 through 8: no rollup needed yet.
        state.seqno = SeqNo(5);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));

        state.seqno = SeqNo(6);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));
        state.seqno = SeqNo(7);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));
        state.seqno = SeqNo(8);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));

        // Seqno 9: a rollup at the current seqno is requested.
        let mut current_time = now();
        state.seqno = SeqNo(9);
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    current_time
                )
                .expect("rollup"),
            SeqNo(9)
        );

        // Mark a rollup as in progress, starting at `current_time`.
        state.collections.active_rollup = Some(ActiveRollup {
            seqno: SeqNo(9),
            start_ms: current_time,
        });

        // While that rollup is active, none is requested, even at seqno 10.
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            current_time
        ));

        state.seqno = SeqNo(10);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            current_time
        ));

        // Once more than the fallback threshold has passed, a rollup is
        // requested despite the active one.
        // NOTE(review): the constant is already `u64`, so this cast is a no-op.
        current_time += u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS) + 1;
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    current_time
                )
                .expect("rollup"),
            SeqNo(10)
        );

        // Clear the active rollup and record a finished rollup at seqno 9.
        state.seqno = SeqNo(9);
        state.collections.active_rollup = None;
        let rollup_seqno = SeqNo(9);
        let rollup = HollowRollup {
            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
            encoded_size_bytes: None,
        };
        assert!(
            state
                .collections
                .add_rollup((rollup_seqno, &rollup))
                .is_continue()
        );

        // Relative to the rollup at 9, seqno 11 needs none and seqno 13 does.
        state.seqno = SeqNo(11);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            current_time
        ));
        state.seqno = SeqNo(13);
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    current_time
                )
                .expect("rollup"),
            SeqNo(13)
        );
    }
4209
4210 #[mz_ore::test]
4211 fn need_rollup_classic() {
4212 const ROLLUP_THRESHOLD: usize = 3;
4213 const ROLLUP_USE_ACTIVE_ROLLUP: bool = false;
4214 const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 0;
4215 const NOW: u64 = 0;
4216
4217 mz_ore::test::init_logging();
4218 let mut state = TypedState::<String, String, u64, i64>::new(
4219 DUMMY_BUILD_INFO.semver_version(),
4220 ShardId::new(),
4221 "".to_owned(),
4222 0,
4223 );
4224
4225 let rollup_seqno = SeqNo(5);
4226 let rollup = HollowRollup {
4227 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4228 encoded_size_bytes: None,
4229 };
4230
4231 assert!(
4232 state
4233 .collections
4234 .add_rollup((rollup_seqno, &rollup))
4235 .is_continue()
4236 );
4237
4238 state.seqno = SeqNo(5);
4240 assert_none!(state.need_rollup(
4241 ROLLUP_THRESHOLD,
4242 ROLLUP_USE_ACTIVE_ROLLUP,
4243 ROLLUP_FALLBACK_THRESHOLD_MS,
4244 NOW
4245 ));
4246
4247 state.seqno = SeqNo(6);
4249 assert_none!(state.need_rollup(
4250 ROLLUP_THRESHOLD,
4251 ROLLUP_USE_ACTIVE_ROLLUP,
4252 ROLLUP_FALLBACK_THRESHOLD_MS,
4253 NOW
4254 ));
4255 state.seqno = SeqNo(7);
4256 assert_none!(state.need_rollup(
4257 ROLLUP_THRESHOLD,
4258 ROLLUP_USE_ACTIVE_ROLLUP,
4259 ROLLUP_FALLBACK_THRESHOLD_MS,
4260 NOW
4261 ));
4262
4263 state.seqno = SeqNo(8);
4265 assert_eq!(
4266 state
4267 .need_rollup(
4268 ROLLUP_THRESHOLD,
4269 ROLLUP_USE_ACTIVE_ROLLUP,
4270 ROLLUP_FALLBACK_THRESHOLD_MS,
4271 NOW
4272 )
4273 .expect("rollup"),
4274 SeqNo(8)
4275 );
4276
4277 state.seqno = SeqNo(9);
4279 assert_none!(state.need_rollup(
4280 ROLLUP_THRESHOLD,
4281 ROLLUP_USE_ACTIVE_ROLLUP,
4282 ROLLUP_FALLBACK_THRESHOLD_MS,
4283 NOW
4284 ));
4285
4286 state.seqno = SeqNo(11);
4288 assert_eq!(
4289 state
4290 .need_rollup(
4291 ROLLUP_THRESHOLD,
4292 ROLLUP_USE_ACTIVE_ROLLUP,
4293 ROLLUP_FALLBACK_THRESHOLD_MS,
4294 NOW
4295 )
4296 .expect("rollup"),
4297 SeqNo(11)
4298 );
4299
4300 let rollup_seqno = SeqNo(6);
4302 let rollup = HollowRollup {
4303 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4304 encoded_size_bytes: None,
4305 };
4306 assert!(
4307 state
4308 .collections
4309 .add_rollup((rollup_seqno, &rollup))
4310 .is_continue()
4311 );
4312
4313 state.seqno = SeqNo(8);
4314 assert_none!(state.need_rollup(
4315 ROLLUP_THRESHOLD,
4316 ROLLUP_USE_ACTIVE_ROLLUP,
4317 ROLLUP_FALLBACK_THRESHOLD_MS,
4318 NOW
4319 ));
4320 state.seqno = SeqNo(9);
4321 assert_eq!(
4322 state
4323 .need_rollup(
4324 ROLLUP_THRESHOLD,
4325 ROLLUP_USE_ACTIVE_ROLLUP,
4326 ROLLUP_FALLBACK_THRESHOLD_MS,
4327 NOW
4328 )
4329 .expect("rollup"),
4330 SeqNo(9)
4331 );
4332
4333 let fallback_seqno = SeqNo(
4335 rollup_seqno.0
4336 * u64::cast_from(PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER),
4337 );
4338 state.seqno = fallback_seqno;
4339 assert_eq!(
4340 state
4341 .need_rollup(
4342 ROLLUP_THRESHOLD,
4343 ROLLUP_USE_ACTIVE_ROLLUP,
4344 ROLLUP_FALLBACK_THRESHOLD_MS,
4345 NOW
4346 )
4347 .expect("rollup"),
4348 fallback_seqno
4349 );
4350 state.seqno = fallback_seqno.next();
4351 assert_eq!(
4352 state
4353 .need_rollup(
4354 ROLLUP_THRESHOLD,
4355 ROLLUP_USE_ACTIVE_ROLLUP,
4356 ROLLUP_FALLBACK_THRESHOLD_MS,
4357 NOW
4358 )
4359 .expect("rollup"),
4360 fallback_seqno.next()
4361 );
4362 }
4363
4364 #[mz_ore::test]
4365 fn idempotency_token_sentinel() {
4366 assert_eq!(
4367 IdempotencyToken::SENTINEL.to_string(),
4368 "i11111111-1111-1111-1111-111111111111"
4369 );
4370 }
4371
4372 #[mz_ore::test]
4381 #[cfg_attr(miri, ignore)] fn state_inspect_serde_json() {
4383 const STATE_SERDE_JSON: &str = include_str!("state_serde.json");
4384 let mut runner = proptest::test_runner::TestRunner::deterministic();
4385 let tree = any_state::<u64>(6..8).new_tree(&mut runner).unwrap();
4386 let json = serde_json::to_string_pretty(&tree.current()).unwrap();
4387 assert_eq!(
4388 json.trim(),
4389 STATE_SERDE_JSON.trim(),
4390 "\n\nNEW GOLDEN\n{}\n",
4391 json
4392 );
4393 }
4394
4395 #[mz_persist_proc::test(tokio::test)]
4396 #[cfg_attr(miri, ignore)] async fn sneaky_downgrades(dyncfgs: ConfigUpdates) {
4398 let mut clients = new_test_client_cache(&dyncfgs);
4399 let shard_id = ShardId::new();
4400
4401 async fn open_and_write(
4402 clients: &mut PersistClientCache,
4403 version: semver::Version,
4404 shard_id: ShardId,
4405 ) -> Result<(), tokio::task::JoinError> {
4406 clients.cfg.build_version = version.clone();
4407 clients.clear_state_cache();
4408 let client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
4409 mz_ore::task::spawn(|| version.to_string(), async move {
4411 let () = client
4412 .upgrade_version::<String, (), u64, i64>(shard_id, Diagnostics::for_tests())
4413 .await
4414 .expect("valid usage");
4415 let (mut write, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
4416 let current = *write.upper().as_option().unwrap();
4417 write
4419 .expect_compare_and_append_batch(&mut [], current, current + 1)
4420 .await;
4421 })
4422 .into_tokio_handle()
4423 .await
4424 }
4425
4426 let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4428 assert_ok!(res);
4429
4430 let res = open_and_write(&mut clients, Version::new(0, 11, 0), shard_id).await;
4432 assert_ok!(res);
4433
4434 let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4436 assert!(res.unwrap_err().is_panic());
4437
4438 let res = open_and_write(&mut clients, Version::new(0, 9, 0), shard_id).await;
4440 assert!(res.unwrap_err().is_panic());
4441 }
4442
4443 #[mz_ore::test]
4444 fn runid_roundtrip() {
4445 proptest!(|(runid: RunId)| {
4446 let runid_str = runid.to_string();
4447 let parsed = RunId::from_str(&runid_str);
4448 prop_assert_eq!(parsed, Ok(runid));
4449 });
4450 }
4451
4452 #[mz_ore::test]
4468 fn add_rollup_idempotent_across_gc_removal() {
4469 let mut state = TypedState::<String, String, u64, i64>::new(
4470 DUMMY_BUILD_INFO.semver_version(),
4471 ShardId::new(),
4472 "".to_owned(),
4473 0,
4474 );
4475
4476 let older_seqno = SeqNo(10);
4477 let older = HollowRollup {
4478 key: PartialRollupKey::new(older_seqno, &RollupId::new()),
4479 encoded_size_bytes: None,
4480 };
4481 let newer_seqno = SeqNo(20);
4482 let newer = HollowRollup {
4483 key: PartialRollupKey::new(newer_seqno, &RollupId::new()),
4484 encoded_size_bytes: None,
4485 };
4486 let add_older = |state: &mut StateCollections<u64>| state.add_rollup((older_seqno, &older));
4487
4488 assert_eq!(add_older(&mut state.collections), Continue(true));
4490 assert_eq!(add_older(&mut state.collections), Continue(true));
4493 assert_eq!(state.collections.rollups.len(), 1);
4494
4495 assert_eq!(
4499 state.collections.add_rollup((newer_seqno, &newer)),
4500 Continue(true),
4501 );
4502
4503 let _ = state
4507 .collections
4508 .remove_rollups(&[(older_seqno, older.key.clone())]);
4509 assert!(!state.collections.rollups.contains_key(&older_seqno));
4510 assert!(state.collections.rollups.contains_key(&newer_seqno));
4511
4512 assert_eq!(add_older(&mut state.collections), Continue(false));
4517 assert!(!state.collections.rollups.contains_key(&older_seqno));
4518 assert_eq!(state.collections.rollups.len(), 1);
4519 }
4520}