mz_persist_client/internal/
state.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use anyhow::ensure;
11use async_stream::{stream, try_stream};
12use differential_dataflow::difference::Semigroup;
13use mz_persist::metrics::ColumnarMetrics;
14use proptest::prelude::{Arbitrary, Strategy};
15use std::borrow::Cow;
16use std::cmp::Ordering;
17use std::collections::BTreeMap;
18use std::fmt::{Debug, Formatter};
19use std::marker::PhantomData;
20use std::ops::ControlFlow::{self, Break, Continue};
21use std::ops::{Deref, DerefMut};
22use std::time::Duration;
23
24use arrow::array::{Array, ArrayData, make_array};
25use arrow::datatypes::DataType;
26use bytes::Bytes;
27use differential_dataflow::Hashable;
28use differential_dataflow::lattice::Lattice;
29use differential_dataflow::trace::Description;
30use differential_dataflow::trace::implementations::BatchContainer;
31use futures::Stream;
32use futures_util::StreamExt;
33use itertools::Itertools;
34use mz_dyncfg::Config;
35use mz_ore::cast::CastFrom;
36use mz_ore::now::EpochMillis;
37use mz_ore::soft_panic_or_log;
38use mz_ore::vec::PartialOrdVecExt;
39use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceUpdates};
40use mz_persist::location::{Blob, SeqNo};
41use mz_persist_types::arrow::{ArrayBound, ProtoArrayData};
42use mz_persist_types::columnar::{ColumnEncoder, Schema};
43use mz_persist_types::schema::{SchemaId, backward_compatible};
44use mz_persist_types::{Codec, Codec64, Opaque};
45use mz_proto::ProtoType;
46use mz_proto::RustType;
47use proptest_derive::Arbitrary;
48use semver::Version;
49use serde::ser::SerializeStruct;
50use serde::{Serialize, Serializer};
51use timely::PartialOrder;
52use timely::order::TotalOrder;
53use timely::progress::{Antichain, Timestamp};
54use tracing::info;
55use uuid::Uuid;
56
57use crate::critical::CriticalReaderId;
58use crate::error::InvalidUsage;
59use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, parse_id};
60use crate::internal::gc::GcReq;
61use crate::internal::machine::retry_external;
62use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
63use crate::internal::trace::{
64    ActiveCompaction, ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace,
65};
66use crate::metrics::Metrics;
67use crate::read::LeasedReaderId;
68use crate::schema::CaESchema;
69use crate::write::WriterId;
70use crate::{PersistConfig, ShardId};
71
72include!(concat!(
73    env!("OUT_DIR"),
74    "/mz_persist_client.internal.state.rs"
75));
76
77include!(concat!(
78    env!("OUT_DIR"),
79    "/mz_persist_client.internal.diff.rs"
80));
81
82/// Determines how often to write rollups, assigning a maintenance task after
83/// `rollup_threshold` seqnos have passed since the last rollup.
84///
85/// Tuning note: in the absence of a long reader seqno hold, and with
86/// incremental GC, this threshold determines roughly how many live diffs are
87/// held in Consensus. Lowering this value decreases the live diff count at the
88/// cost of more maintenance work + blob writes.
89pub(crate) const ROLLUP_THRESHOLD: Config<usize> = Config::new(
90    "persist_rollup_threshold",
91    128,
92    "The number of seqnos between rollups.",
93);
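
// Illustrative sketch (not part of the original source) of how a threshold
// like this is typically applied: a rollup is requested once the current
// seqno has advanced at least `rollup_threshold` seqnos past the seqno of the
// most recent rollup. The names below are hypothetical.
//
//     let should_rollup =
//         seqno.0.saturating_sub(latest_rollup_seqno.0) >= u64::cast_from(rollup_threshold);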
94
95/// Determines how long to wait before an active rollup is considered
96/// "stuck" and a new rollup is started.
97pub(crate) const ROLLUP_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
98    "persist_rollup_fallback_threshold_ms",
99    5000,
100    "The number of milliseconds before a worker claims an already claimed rollup.",
101);
102
103/// Feature flag for the new active rollup tracking mechanism.
104/// We mustn't enable this until we are fully deployed on the new version.
105pub(crate) const ROLLUP_USE_ACTIVE_ROLLUP: Config<bool> = Config::new(
106    "persist_rollup_use_active_rollup",
107    false,
108    "Whether to use the new active rollup tracking mechanism.",
109);
110
111/// Determines how long to wait before an active GC is considered
112/// "stuck" and a new GC is started.
113pub(crate) const GC_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
114    "persist_gc_fallback_threshold_ms",
115    900000,
116    "The number of milliseconds before a worker claims an already claimed GC.",
117);
118
119/// See the config description string.
120pub(crate) const GC_MIN_VERSIONS: Config<usize> = Config::new(
121    "persist_gc_min_versions",
122    32,
123    "The number of un-GCd versions that may exist in state before we'll trigger a GC.",
124);
125
126/// See the config description string.
127pub(crate) const GC_MAX_VERSIONS: Config<usize> = Config::new(
128    "persist_gc_max_versions",
129    128_000,
130    "The maximum number of versions to GC in a single GC run.",
131);
132
133/// Feature flag for the new active GC tracking mechanism.
134/// We mustn't enable this until we are fully deployed on the new version.
135pub(crate) const GC_USE_ACTIVE_GC: Config<bool> = Config::new(
136    "persist_gc_use_active_gc",
137    false,
138    "Whether to use the new active GC tracking mechanism.",
139);
140
141pub(crate) const ENABLE_INCREMENTAL_COMPACTION: Config<bool> = Config::new(
142    "persist_enable_incremental_compaction",
143    false,
144    "Whether to enable incremental compaction.",
145);
146
147/// A token to disambiguate state commands that could not otherwise be
148/// idempotent.
149#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
150#[serde(into = "String")]
151pub struct IdempotencyToken(pub(crate) [u8; 16]);
152
153impl std::fmt::Display for IdempotencyToken {
154    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155        write!(f, "i{}", Uuid::from_bytes(self.0))
156    }
157}
158
159impl std::fmt::Debug for IdempotencyToken {
160    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161        write!(f, "IdempotencyToken({})", Uuid::from_bytes(self.0))
162    }
163}
164
165impl std::str::FromStr for IdempotencyToken {
166    type Err = String;
167
168    fn from_str(s: &str) -> Result<Self, Self::Err> {
169        parse_id("i", "IdempotencyToken", s).map(IdempotencyToken)
170    }
171}
172
173impl From<IdempotencyToken> for String {
174    fn from(x: IdempotencyToken) -> Self {
175        x.to_string()
176    }
177}
178
179impl IdempotencyToken {
180    pub(crate) fn new() -> Self {
181        IdempotencyToken(*Uuid::new_v4().as_bytes())
182    }
183    pub(crate) const SENTINEL: IdempotencyToken = IdempotencyToken([17u8; 16]);
184}
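
// Illustrative only (not part of the original source): a minimal sketch of the
// intended `Display`/`FromStr` round-trip for `IdempotencyToken`, which renders
// as "i{uuid}".
#[cfg(test)]
mod idempotency_token_roundtrip_sketch {
    use super::IdempotencyToken;

    #[test]
    fn roundtrip() {
        let tok = IdempotencyToken::new();
        let s = tok.to_string();
        assert!(s.starts_with('i'));
        assert_eq!(s.parse::<IdempotencyToken>().expect("valid token"), tok);
    }
}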
185
186#[derive(Clone, Debug, PartialEq, Serialize)]
187pub struct LeasedReaderState<T> {
188    /// The seqno capability of this reader.
189    pub seqno: SeqNo,
190    /// The since capability of this reader.
191    pub since: Antichain<T>,
192    /// UNIX_EPOCH timestamp (in millis) of this reader's most recent heartbeat
193    pub last_heartbeat_timestamp_ms: u64,
194    /// The lease duration (in millis): once this much time has passed since
195    /// [Self::last_heartbeat_timestamp_ms], this reader may be expired.
196    pub lease_duration_ms: u64,
197    /// For debugging.
198    pub debug: HandleDebugState,
199}
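
// Illustrative sketch (not part of the original source) of how the lease
// fields above are typically interpreted; `now_ms` is hypothetical:
//
//     let expired = now_ms.saturating_sub(reader.last_heartbeat_timestamp_ms)
//         > reader.lease_duration_ms;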
200
201#[derive(Arbitrary, Clone, Debug, PartialEq, Serialize)]
202#[serde(into = "u64")]
203pub struct OpaqueState(pub [u8; 8]);
204
205impl From<OpaqueState> for u64 {
206    fn from(value: OpaqueState) -> Self {
207        u64::from_le_bytes(value.0)
208    }
209}
210
211#[derive(Clone, Debug, PartialEq, Serialize)]
212pub struct CriticalReaderState<T> {
213    /// The since capability of this reader.
214    pub since: Antichain<T>,
215    /// An opaque token matched on by compare_and_downgrade_since.
216    pub opaque: OpaqueState,
217    /// The [Codec64] used to encode [Self::opaque].
218    pub opaque_codec: String,
219    /// For debugging.
220    pub debug: HandleDebugState,
221}
222
223#[derive(Clone, Debug, PartialEq, Serialize)]
224pub struct WriterState<T> {
225    /// UNIX_EPOCH timestamp (in millis) of this writer's most recent heartbeat
226    pub last_heartbeat_timestamp_ms: u64,
227    /// The lease duration (in millis): once this much time has passed since
228    /// [Self::last_heartbeat_timestamp_ms], this writer may be expired.
229    pub lease_duration_ms: u64,
230    /// The idempotency token of the most recent successful compare_and_append
231    /// by this writer.
232    pub most_recent_write_token: IdempotencyToken,
233    /// The upper of the most recent successful compare_and_append by this
234    /// writer.
235    pub most_recent_write_upper: Antichain<T>,
236    /// For debugging.
237    pub debug: HandleDebugState,
238}
239
240/// Debugging info for a reader or writer.
241#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize)]
242pub struct HandleDebugState {
243    /// Hostname of the persist user that registered this writer or reader. For
244    /// critical readers, this is the _most recent_ registration.
245    pub hostname: String,
246    /// Plaintext description of this writer or reader's intent.
247    pub purpose: String,
248}
249
250/// Part of the updates in a Batch.
251///
252/// Either a pointer to ones stored in Blob or the updates themselves inlined.
253#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
254#[serde(tag = "type")]
255pub enum BatchPart<T> {
256    Hollow(HollowBatchPart<T>),
257    Inline {
258        updates: LazyInlineBatchPart,
259        ts_rewrite: Option<Antichain<T>>,
260        schema_id: Option<SchemaId>,
261
262        /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
263        deprecated_schema_id: Option<SchemaId>,
264    },
265}
266
267fn decode_structured_lower(lower: &LazyProto<ProtoArrayData>) -> Option<ArrayBound> {
268    let try_decode = |lower: &LazyProto<ProtoArrayData>| {
269        let proto = lower.decode()?;
270        let data = ArrayData::from_proto(proto)?;
271        ensure!(data.len() == 1);
272        Ok(ArrayBound::new(make_array(data), 0))
273    };
274
275    let decoded: anyhow::Result<ArrayBound> = try_decode(lower);
276
277    match decoded {
278        Ok(bound) => Some(bound),
279        Err(e) => {
280            soft_panic_or_log!("failed to decode bound: {e:#?}");
281            None
282        }
283    }
284}
285
286impl<T> BatchPart<T> {
287    pub fn hollow_bytes(&self) -> usize {
288        match self {
289            BatchPart::Hollow(x) => x.encoded_size_bytes,
290            BatchPart::Inline { .. } => 0,
291        }
292    }
293
294    pub fn is_inline(&self) -> bool {
295        matches!(self, BatchPart::Inline { .. })
296    }
297
298    pub fn inline_bytes(&self) -> usize {
299        match self {
300            BatchPart::Hollow(_) => 0,
301            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
302        }
303    }
304
305    pub fn writer_key(&self) -> Option<WriterKey> {
306        match self {
307            BatchPart::Hollow(x) => x.key.split().map(|(writer, _part)| writer),
308            BatchPart::Inline { .. } => None,
309        }
310    }
311
312    pub fn encoded_size_bytes(&self) -> usize {
313        match self {
314            BatchPart::Hollow(x) => x.encoded_size_bytes,
315            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
316        }
317    }
318
319    // A user-interpretable identifier or description of the part (for logs and
320    // such).
321    pub fn printable_name(&self) -> &str {
322        match self {
323            BatchPart::Hollow(x) => x.key.0.as_str(),
324            BatchPart::Inline { .. } => "<inline>",
325        }
326    }
327
328    pub fn stats(&self) -> Option<&LazyPartStats> {
329        match self {
330            BatchPart::Hollow(x) => x.stats.as_ref(),
331            BatchPart::Inline { .. } => None,
332        }
333    }
334
335    pub fn key_lower(&self) -> &[u8] {
336        match self {
337            BatchPart::Hollow(x) => x.key_lower.as_slice(),
338            // We don't duplicate the lowest key because this can be
339            // considerable overhead for small parts.
340            //
341            // The empty key might not be a tight lower bound, but it is a valid
342            // lower bound. If a caller is interested in a tighter lower bound,
343            // the data is inline.
344            BatchPart::Inline { .. } => &[],
345        }
346    }
347
348    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
349        let part = match self {
350            BatchPart::Hollow(part) => part,
351            BatchPart::Inline { .. } => return None,
352        };
353
354        decode_structured_lower(part.structured_key_lower.as_ref()?)
355    }
356
357    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
358        match self {
359            BatchPart::Hollow(x) => x.ts_rewrite.as_ref(),
360            BatchPart::Inline { ts_rewrite, .. } => ts_rewrite.as_ref(),
361        }
362    }
363
364    pub fn schema_id(&self) -> Option<SchemaId> {
365        match self {
366            BatchPart::Hollow(x) => x.schema_id,
367            BatchPart::Inline { schema_id, .. } => *schema_id,
368        }
369    }
370
371    pub fn deprecated_schema_id(&self) -> Option<SchemaId> {
372        match self {
373            BatchPart::Hollow(x) => x.deprecated_schema_id,
374            BatchPart::Inline {
375                deprecated_schema_id,
376                ..
377            } => *deprecated_schema_id,
378        }
379    }
380}
381
382impl<T: Timestamp + Codec64> BatchPart<T> {
383    pub fn is_structured_only(&self, metrics: &ColumnarMetrics) -> bool {
384        match self {
385            BatchPart::Hollow(x) => matches!(x.format, Some(BatchColumnarFormat::Structured)),
386            BatchPart::Inline { updates, .. } => {
387                let inline_part = updates.decode::<T>(metrics).expect("valid inline part");
388                matches!(inline_part.updates, BlobTraceUpdates::Structured { .. })
389            }
390        }
391    }
392
393    pub fn diffs_sum<D: Codec64 + Semigroup>(&self, metrics: &ColumnarMetrics) -> Option<D> {
394        match self {
395            BatchPart::Hollow(x) => x.diffs_sum.map(D::decode),
396            BatchPart::Inline { updates, .. } => updates
397                .decode::<T>(metrics)
398                .expect("valid inline part")
399                .updates
400                .diffs_sum(),
401        }
402    }
403}
404
405/// An ordered list of parts, generally stored as part of a larger run.
406#[derive(Debug, Clone)]
407pub struct HollowRun<T> {
408    /// Pointers usable to retrieve the updates.
409    pub(crate) parts: Vec<RunPart<T>>,
410}
411
412/// A reference to a [HollowRun], including the key in the blob store and some denormalized
413/// metadata.
414#[derive(Debug, Eq, PartialEq, Clone, Serialize)]
415pub struct HollowRunRef<T> {
416    pub key: PartialBatchKey,
417
418    /// The size of the referenced run object, plus all of the hollow objects it contains.
419    pub hollow_bytes: usize,
420
421    /// The size of the largest individual part in the run; useful for sizing compaction.
422    pub max_part_bytes: usize,
423
424    /// The lower bound of the data in this part, ordered by the codec ordering.
425    pub key_lower: Vec<u8>,
426
427    /// The lower bound of the data in this part, ordered by the structured ordering.
428    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
429
430    pub diffs_sum: Option<[u8; 8]>,
431
432    pub(crate) _phantom_data: PhantomData<T>,
433}
434impl<T: Eq> PartialOrd<Self> for HollowRunRef<T> {
435    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
436        Some(self.cmp(other))
437    }
438}
439
440impl<T: Eq> Ord for HollowRunRef<T> {
441    fn cmp(&self, other: &Self) -> Ordering {
442        self.key.cmp(&other.key)
443    }
444}
445
446impl<T> HollowRunRef<T> {
447    pub fn writer_key(&self) -> Option<WriterKey> {
448        Some(self.key.split()?.0)
449    }
450}
451
452impl<T: Timestamp + Codec64> HollowRunRef<T> {
453    /// Stores the given run and returns a [HollowRunRef] that points to it.
454    pub async fn set<D: Codec64 + Semigroup>(
455        shard_id: ShardId,
456        blob: &dyn Blob,
457        writer: &WriterKey,
458        data: HollowRun<T>,
459        metrics: &Metrics,
460    ) -> Self {
461        let hollow_bytes = data.parts.iter().map(|p| p.hollow_bytes()).sum();
462        let max_part_bytes = data
463            .parts
464            .iter()
465            .map(|p| p.max_part_bytes())
466            .max()
467            .unwrap_or(0);
468        let key_lower = data
469            .parts
470            .first()
471            .map_or(vec![], |p| p.key_lower().to_vec());
472        let structured_key_lower = match data.parts.first() {
473            Some(RunPart::Many(r)) => r.structured_key_lower.clone(),
474            Some(RunPart::Single(BatchPart::Hollow(p))) => p.structured_key_lower.clone(),
475            Some(RunPart::Single(BatchPart::Inline { .. })) | None => None,
476        };
477        let diffs_sum = data
478            .parts
479            .iter()
480            .map(|p| {
481                p.diffs_sum::<D>(&metrics.columnar)
482                    .expect("valid diffs sum")
483            })
484            .reduce(|mut a, b| {
485                a.plus_equals(&b);
486                a
487            })
488            .expect("valid diffs sum")
489            .encode();
490
491        let key = PartialBatchKey::new(writer, &PartId::new());
492        let blob_key = key.complete(&shard_id);
493        let bytes = Bytes::from(prost::Message::encode_to_vec(&data.into_proto()));
494        let () = retry_external(&metrics.retries.external.hollow_run_set, || {
495            blob.set(&blob_key, bytes.clone())
496        })
497        .await;
498        Self {
499            key,
500            hollow_bytes,
501            max_part_bytes,
502            key_lower,
503            structured_key_lower,
504            diffs_sum: Some(diffs_sum),
505            _phantom_data: Default::default(),
506        }
507    }
508
509    /// Retrieve the [HollowRun] that this reference points to.
510    /// The caller is expected to ensure that this ref is the result of calling [HollowRunRef::set]
511    /// with the same shard id and backing store.
512    pub async fn get(
513        &self,
514        shard_id: ShardId,
515        blob: &dyn Blob,
516        metrics: &Metrics,
517    ) -> Option<HollowRun<T>> {
518        let blob_key = self.key.complete(&shard_id);
519        let mut bytes = retry_external(&metrics.retries.external.hollow_run_get, || {
520            blob.get(&blob_key)
521        })
522        .await?;
523        let proto_runs: ProtoHollowRun =
524            prost::Message::decode(&mut bytes).expect("illegal state: invalid proto bytes");
525        let runs = proto_runs
526            .into_rust()
527            .expect("illegal state: invalid encoded runs proto");
528        Some(runs)
529    }
530}
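
// Usage sketch (illustrative, not part of the original source), assuming a
// `blob`, `metrics`, `writer_key`, and `run: HollowRun<T>` are in scope and
// that `i64` is the diff type:
//
//     let run_ref = HollowRunRef::set::<i64>(shard_id, blob, &writer_key, run, metrics).await;
//     let fetched = run_ref.get(shard_id, blob, metrics).await;
//     // `fetched` is `Some(..)` when read back with the same shard id and
//     // blob store that were used for the write.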
531
532/// Part of the updates in a run.
533///
534/// Either a pointer to ones stored in Blob or a single part stored inline.
535#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
536#[serde(untagged)]
537pub enum RunPart<T> {
538    Single(BatchPart<T>),
539    Many(HollowRunRef<T>),
540}
541
542impl<T: Ord> PartialOrd<Self> for RunPart<T> {
543    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
544        Some(self.cmp(other))
545    }
546}
547
548impl<T: Ord> Ord for RunPart<T> {
549    fn cmp(&self, other: &Self) -> Ordering {
550        match (self, other) {
551            (RunPart::Single(a), RunPart::Single(b)) => a.cmp(b),
552            (RunPart::Single(_), RunPart::Many(_)) => Ordering::Less,
553            (RunPart::Many(_), RunPart::Single(_)) => Ordering::Greater,
554            (RunPart::Many(a), RunPart::Many(b)) => a.cmp(b),
555        }
556    }
557}
558
559impl<T> RunPart<T> {
560    #[cfg(test)]
561    pub fn expect_hollow_part(&self) -> &HollowBatchPart<T> {
562        match self {
563            RunPart::Single(BatchPart::Hollow(hollow)) => hollow,
564            _ => panic!("expected hollow part!"),
565        }
566    }
567
568    pub fn hollow_bytes(&self) -> usize {
569        match self {
570            Self::Single(p) => p.hollow_bytes(),
571            Self::Many(r) => r.hollow_bytes,
572        }
573    }
574
575    pub fn is_inline(&self) -> bool {
576        match self {
577            Self::Single(p) => p.is_inline(),
578            Self::Many(_) => false,
579        }
580    }
581
582    pub fn inline_bytes(&self) -> usize {
583        match self {
584            Self::Single(p) => p.inline_bytes(),
585            Self::Many(_) => 0,
586        }
587    }
588
589    pub fn max_part_bytes(&self) -> usize {
590        match self {
591            Self::Single(p) => p.encoded_size_bytes(),
592            Self::Many(r) => r.max_part_bytes,
593        }
594    }
595
596    pub fn writer_key(&self) -> Option<WriterKey> {
597        match self {
598            Self::Single(p) => p.writer_key(),
599            Self::Many(r) => r.writer_key(),
600        }
601    }
602
603    pub fn encoded_size_bytes(&self) -> usize {
604        match self {
605            Self::Single(p) => p.encoded_size_bytes(),
606            Self::Many(r) => r.hollow_bytes,
607        }
608    }
609
610    pub fn schema_id(&self) -> Option<SchemaId> {
611        match self {
612            Self::Single(p) => p.schema_id(),
613            Self::Many(_) => None,
614        }
615    }
616
617    // A user-interpretable identifier or description of the part (for logs and
618    // such).
619    pub fn printable_name(&self) -> &str {
620        match self {
621            Self::Single(p) => p.printable_name(),
622            Self::Many(r) => r.key.0.as_str(),
623        }
624    }
625
626    pub fn stats(&self) -> Option<&LazyPartStats> {
627        match self {
628            Self::Single(p) => p.stats(),
629            // TODO: if we kept stats we could avoid fetching the metadata here.
630            Self::Many(_) => None,
631        }
632    }
633
634    pub fn key_lower(&self) -> &[u8] {
635        match self {
636            Self::Single(p) => p.key_lower(),
637            Self::Many(r) => r.key_lower.as_slice(),
638        }
639    }
640
641    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
642        match self {
643            Self::Single(p) => p.structured_key_lower(),
644            Self::Many(_) => None,
645        }
646    }
647
648    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
649        match self {
650            Self::Single(p) => p.ts_rewrite(),
651            Self::Many(_) => None,
652        }
653    }
654}
655
656impl<T> RunPart<T>
657where
658    T: Timestamp + Codec64,
659{
660    pub fn diffs_sum<D: Codec64 + Semigroup>(&self, metrics: &ColumnarMetrics) -> Option<D> {
661        match self {
662            Self::Single(p) => p.diffs_sum(metrics),
663            Self::Many(hollow_run) => hollow_run.diffs_sum.map(D::decode),
664        }
665    }
666}
667
668/// A blob was missing!
669#[derive(Clone, Debug)]
670pub struct MissingBlob(BlobKey);
671
672impl std::fmt::Display for MissingBlob {
673    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
674        write!(f, "unexpectedly missing key: {}", self.0)
675    }
676}
677
678impl std::error::Error for MissingBlob {}
679
680impl<T: Timestamp + Codec64 + Sync> RunPart<T> {
681    pub fn part_stream<'a>(
682        &'a self,
683        shard_id: ShardId,
684        blob: &'a dyn Blob,
685        metrics: &'a Metrics,
686    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + Send + 'a {
687        try_stream! {
688            match self {
689                RunPart::Single(p) => {
690                    yield Cow::Borrowed(p);
691                }
692                RunPart::Many(r) => {
693                    let fetched = r.get(shard_id, blob, metrics).await.ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
694                    for run_part in fetched.parts {
695                        for await batch_part in run_part.part_stream(shard_id, blob, metrics).boxed() {
696                            yield Cow::Owned(batch_part?.into_owned());
697                        }
698                    }
699                }
700            }
701        }
702    }
703}
704
705impl<T: Ord> PartialOrd for BatchPart<T> {
706    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
707        Some(self.cmp(other))
708    }
709}
710
711impl<T: Ord> Ord for BatchPart<T> {
712    fn cmp(&self, other: &Self) -> Ordering {
713        match (self, other) {
714            (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o),
715            (
716                BatchPart::Inline {
717                    updates: s_updates,
718                    ts_rewrite: s_ts_rewrite,
719                    schema_id: s_schema_id,
720                    deprecated_schema_id: s_deprecated_schema_id,
721                },
722                BatchPart::Inline {
723                    updates: o_updates,
724                    ts_rewrite: o_ts_rewrite,
725                    schema_id: o_schema_id,
726                    deprecated_schema_id: o_deprecated_schema_id,
727                },
728            ) => (
729                s_updates,
730                s_ts_rewrite.as_ref().map(|x| x.elements()),
731                s_schema_id,
732                s_deprecated_schema_id,
733            )
734                .cmp(&(
735                    o_updates,
736                    o_ts_rewrite.as_ref().map(|x| x.elements()),
737                    o_schema_id,
738                    o_deprecated_schema_id,
739                )),
740            (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less,
741            (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater,
742        }
743    }
744}
745
746/// What order are the parts in this run in?
747#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize)]
748pub(crate) enum RunOrder {
749    /// They're in no particular order.
750    Unordered,
751    /// They're ordered based on the codec-encoded K/V bytes.
752    Codec,
753    /// They're ordered by the natural ordering of the structured data.
754    Structured,
755}
756
757#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Serialize, Copy, Hash)]
758pub struct RunId(pub(crate) [u8; 16]);
759
760impl std::fmt::Display for RunId {
761    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
762        write!(f, "ri{}", Uuid::from_bytes(self.0))
763    }
764}
765
766impl std::fmt::Debug for RunId {
767    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
768        write!(f, "RunId({})", Uuid::from_bytes(self.0))
769    }
770}
771
772impl std::str::FromStr for RunId {
773    type Err = String;
774
775    fn from_str(s: &str) -> Result<Self, Self::Err> {
776        parse_id("ri", "RunId", s).map(RunId)
777    }
778}
779
780impl From<RunId> for String {
781    fn from(x: RunId) -> Self {
782        x.to_string()
783    }
784}
785
786impl RunId {
787    pub(crate) fn new() -> Self {
788        RunId(*Uuid::new_v4().as_bytes())
789    }
790}
791
792impl Arbitrary for RunId {
793    type Parameters = ();
794    type Strategy = proptest::strategy::BoxedStrategy<Self>;
795    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
796        Strategy::prop_map(proptest::prelude::any::<u128>(), |n| {
797            RunId(*Uuid::from_u128(n).as_bytes())
798        })
799        .boxed()
800    }
801}
802
803/// Metadata shared across a run.
804#[derive(Clone, Debug, Default, PartialEq, Eq, Ord, PartialOrd, Serialize)]
805pub struct RunMeta {
806    /// If `None`, Persist should infer the order based on the proto metadata.
807    pub(crate) order: Option<RunOrder>,
808    /// All parts in a run should have the same schema.
809    pub(crate) schema: Option<SchemaId>,
810
811    /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
812    pub(crate) deprecated_schema: Option<SchemaId>,
813
814    /// If set, a UUID that uniquely identifies this run.
815    pub(crate) id: Option<RunId>,
816
817    /// The number of updates in this run, or `None` if the number is unknown.
818    pub(crate) len: Option<usize>,
819}
820
821/// A subset of a [HollowBatch] corresponding 1:1 to a blob.
822#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
823pub struct HollowBatchPart<T> {
824    /// Pointer usable to retrieve the updates.
825    pub key: PartialBatchKey,
826    /// The encoded size of this part.
827    pub encoded_size_bytes: usize,
828    /// A lower bound on the keys in the part. (By default, this is the minimum
829    /// possible key: `vec![]`.)
830    #[serde(serialize_with = "serialize_part_bytes")]
831    pub key_lower: Vec<u8>,
832    /// A lower bound on the keys in the part, stored as structured data.
833    #[serde(serialize_with = "serialize_lazy_proto")]
834    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
835    /// Aggregate statistics about data contained in this part.
836    #[serde(serialize_with = "serialize_part_stats")]
837    pub stats: Option<LazyPartStats>,
838    /// A frontier to which timestamps in this part are advanced on read, if
839    /// set.
840    ///
841    /// A value of `Some([T::minimum()])` is functionally the same as `None`,
842    /// but we maintain the distinction between the two for some internal sanity
843    /// checking of invariants as well as metrics. If this ever becomes an
844    /// issue, everything still works with this as just `Antichain<T>`.
845    pub ts_rewrite: Option<Antichain<T>>,
846    /// A Codec64 encoded sum of all diffs in this part, if known.
847    ///
848    /// This is `None` if this part was written before we started storing this
849    /// information, or if it was written when the dyncfg was off.
850    ///
851    /// It could also make sense to model this as part of the pushdown stats, if
852    /// we later decide that's of some benefit.
853    #[serde(serialize_with = "serialize_diffs_sum")]
854    pub diffs_sum: Option<[u8; 8]>,
855    /// Columnar format that this batch was written in.
856    ///
857    /// This is `None` if this part was written before we started writing structured
858    /// columnar data.
859    pub format: Option<BatchColumnarFormat>,
860    /// The schemas used to encode the data in this batch part.
861    ///
862    /// Or None for historical data written before the schema registry was
863    /// added.
864    pub schema_id: Option<SchemaId>,
865
866    /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
867    pub deprecated_schema_id: Option<SchemaId>,
868}
869
870/// A [Batch] but with the updates themselves stored externally.
871///
872/// [Batch]: differential_dataflow::trace::BatchReader
873#[derive(Clone, PartialEq, Eq)]
874pub struct HollowBatch<T> {
875    /// Describes the times of the updates in the batch.
876    pub desc: Description<T>,
877    /// The number of updates in the batch.
878    pub len: usize,
879    /// Pointers usable to retrieve the updates.
880    pub(crate) parts: Vec<RunPart<T>>,
881    /// Runs of sequential sorted batch parts, stored as indices into `parts`.
882    /// ex.
883    /// ```text
884    ///     parts=[p1, p2, p3], runs=[]     --> run  is  [p1, p2, p3]
885    ///     parts=[p1, p2, p3], runs=[1]    --> runs are [p1] and [p2, p3]
886    ///     parts=[p1, p2, p3], runs=[1, 2] --> runs are [p1], [p2], [p3]
887    /// ```
888    pub(crate) run_splits: Vec<usize>,
889    /// Run-level metadata: the first entry has metadata for the first run, and so on.
890    /// If there's no corresponding entry for a particular run, it's assumed to be [RunMeta::default()].
891    pub(crate) run_meta: Vec<RunMeta>,
892}
893
894impl<T: Debug> Debug for HollowBatch<T> {
895    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
896        let HollowBatch {
897            desc,
898            parts,
899            len,
900            run_splits: runs,
901            run_meta,
902        } = self;
903        f.debug_struct("HollowBatch")
904            .field(
905                "desc",
906                &(
907                    desc.lower().elements(),
908                    desc.upper().elements(),
909                    desc.since().elements(),
910                ),
911            )
912            .field("parts", &parts)
913            .field("len", &len)
914            .field("runs", &runs)
915            .field("run_meta", &run_meta)
916            .finish()
917    }
918}
919
920impl<T: Serialize> serde::Serialize for HollowBatch<T> {
921    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
922        let HollowBatch {
923            desc,
924            len,
925            // parts, run_splits, and run_meta are all covered by the self.runs() call below.
926            parts: _,
927            run_splits: _,
928            run_meta: _,
929        } = self;
930        let mut s = s.serialize_struct("HollowBatch", 5)?;
931        let () = s.serialize_field("lower", &desc.lower().elements())?;
932        let () = s.serialize_field("upper", &desc.upper().elements())?;
933        let () = s.serialize_field("since", &desc.since().elements())?;
934        let () = s.serialize_field("len", len)?;
935        let () = s.serialize_field("part_runs", &self.runs().collect::<Vec<_>>())?;
936        s.end()
937    }
938}
939
940impl<T: Ord> PartialOrd for HollowBatch<T> {
941    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
942        Some(self.cmp(other))
943    }
944}
945
946impl<T: Ord> Ord for HollowBatch<T> {
947    fn cmp(&self, other: &Self) -> Ordering {
948        // Deconstruct self and other so we get a compile failure if new fields
949        // are added.
950        let HollowBatch {
951            desc: self_desc,
952            parts: self_parts,
953            len: self_len,
954            run_splits: self_runs,
955            run_meta: self_run_meta,
956        } = self;
957        let HollowBatch {
958            desc: other_desc,
959            parts: other_parts,
960            len: other_len,
961            run_splits: other_runs,
962            run_meta: other_run_meta,
963        } = other;
964        (
965            self_desc.lower().elements(),
966            self_desc.upper().elements(),
967            self_desc.since().elements(),
968            self_parts,
969            self_len,
970            self_runs,
971            self_run_meta,
972        )
973            .cmp(&(
974                other_desc.lower().elements(),
975                other_desc.upper().elements(),
976                other_desc.since().elements(),
977                other_parts,
978                other_len,
979                other_runs,
980                other_run_meta,
981            ))
982    }
983}
984
985impl<T: Timestamp + Codec64 + Sync> HollowBatch<T> {
986    pub(crate) fn part_stream<'a>(
987        &'a self,
988        shard_id: ShardId,
989        blob: &'a dyn Blob,
990        metrics: &'a Metrics,
991    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + 'a {
992        stream! {
993            for part in &self.parts {
994                for await part in part.part_stream(shard_id, blob, metrics) {
995                    yield part;
996                }
997            }
998        }
999    }
1000}
1001impl<T> HollowBatch<T> {
1002    /// Construct an in-memory hollow batch from the given metadata.
1003    ///
1004    /// This method debug-asserts that `run_splits` is a strictly increasing sequence of
1005    /// valid indices into `parts`; the caller is responsible for ensuring the runs are valid.
1006    ///
1007    /// `len` should represent the number of valid updates in the referenced parts.
1008    pub(crate) fn new(
1009        desc: Description<T>,
1010        parts: Vec<RunPart<T>>,
1011        len: usize,
1012        run_meta: Vec<RunMeta>,
1013        run_splits: Vec<usize>,
1014    ) -> Self {
1015        debug_assert!(
1016            run_splits.is_strictly_sorted(),
1017            "run indices should be strictly increasing"
1018        );
1019        debug_assert!(
1020            run_splits.first().map_or(true, |i| *i > 0),
1021            "run indices should be positive"
1022        );
1023        debug_assert!(
1024            run_splits.last().map_or(true, |i| *i < parts.len()),
1025            "run indices should be valid indices into parts"
1026        );
1027        debug_assert!(
1028            parts.is_empty() || run_meta.len() == run_splits.len() + 1,
1029            "all metadata should correspond to a run"
1030        );
1031
1032        Self {
1033            desc,
1034            len,
1035            parts,
1036            run_splits,
1037            run_meta,
1038        }
1039    }
1040
1041    /// Construct a batch of a single run with default metadata. Mostly interesting for tests.
1042    pub(crate) fn new_run(desc: Description<T>, parts: Vec<RunPart<T>>, len: usize) -> Self {
1043        let run_meta = if parts.is_empty() {
1044            vec![]
1045        } else {
1046            vec![RunMeta::default()]
1047        };
1048        Self {
1049            desc,
1050            len,
1051            parts,
1052            run_splits: vec![],
1053            run_meta,
1054        }
1055    }
1056
1057    #[cfg(test)]
1058    pub(crate) fn new_run_for_test(
1059        desc: Description<T>,
1060        parts: Vec<RunPart<T>>,
1061        len: usize,
1062        run_id: RunId,
1063    ) -> Self {
1064        let run_meta = if parts.is_empty() {
1065            vec![]
1066        } else {
1067            let mut meta = RunMeta::default();
1068            meta.id = Some(run_id);
1069            vec![meta]
1070        };
1071        Self {
1072            desc,
1073            len,
1074            parts,
1075            run_splits: vec![],
1076            run_meta,
1077        }
1078    }
1079
1080    /// An empty hollow batch, representing no updates over the given desc.
1081    pub(crate) fn empty(desc: Description<T>) -> Self {
1082        Self {
1083            desc,
1084            len: 0,
1085            parts: vec![],
1086            run_splits: vec![],
1087            run_meta: vec![],
1088        }
1089    }
1090
1091    pub(crate) fn runs(&self) -> impl Iterator<Item = (&RunMeta, &[RunPart<T>])> {
1092        let run_ends = self
1093            .run_splits
1094            .iter()
1095            .copied()
1096            .chain(std::iter::once(self.parts.len()));
1097        let run_metas = self.run_meta.iter();
1098        let run_parts = run_ends
1099            .scan(0, |start, end| {
1100                let range = *start..end;
1101                *start = end;
1102                Some(range)
1103            })
1104            .filter(|range| !range.is_empty())
1105            .map(|range| &self.parts[range]);
1106        run_metas.zip_eq(run_parts)
1107    }
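
    // Worked example (illustrative, not part of the original source): with
    // `parts = [p0, p1, p2]`, `run_splits = [1]`, and two `run_meta` entries,
    // `runs()` yields `(&run_meta[0], &parts[0..1])` followed by
    // `(&run_meta[1], &parts[1..3])`; an empty `run_splits` means all parts
    // form a single run.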
1108
1109    pub(crate) fn inline_bytes(&self) -> usize {
1110        self.parts.iter().map(|x| x.inline_bytes()).sum()
1111    }
1112
1113    pub(crate) fn is_empty(&self) -> bool {
1114        self.parts.is_empty()
1115    }
1116
1117    pub(crate) fn part_count(&self) -> usize {
1118        self.parts.len()
1119    }
1120
1121    /// The sum of the encoded sizes of all parts in the batch.
1122    pub fn encoded_size_bytes(&self) -> usize {
1123        self.parts.iter().map(|p| p.encoded_size_bytes()).sum()
1124    }
1125}
1126
1127// See the comment on [Batch::rewrite_ts] for why this is TotalOrder.
1128impl<T: Timestamp + TotalOrder> HollowBatch<T> {
1129    pub(crate) fn rewrite_ts(
1130        &mut self,
1131        frontier: &Antichain<T>,
1132        new_upper: Antichain<T>,
1133    ) -> Result<(), String> {
1134        if !PartialOrder::less_than(frontier, &new_upper) {
1135            return Err(format!(
1136                "rewrite frontier {:?} !< rewrite upper {:?}",
1137                frontier.elements(),
1138                new_upper.elements(),
1139            ));
1140        }
1141        if PartialOrder::less_than(&new_upper, self.desc.upper()) {
1142            return Err(format!(
1143                "rewrite upper {:?} < batch upper {:?}",
1144                new_upper.elements(),
1145                self.desc.upper().elements(),
1146            ));
1147        }
1148
1149        // The following are things that it seems like we could support, but
1150        // initially we don't because we don't have a use case for them.
1151        if PartialOrder::less_than(frontier, self.desc.lower()) {
1152            return Err(format!(
1153                "rewrite frontier {:?} < batch lower {:?}",
1154                frontier.elements(),
1155                self.desc.lower().elements(),
1156            ));
1157        }
1158        if self.desc.since() != &Antichain::from_elem(T::minimum()) {
1159            return Err(format!(
1160                "batch since {:?} != minimum antichain {:?}",
1161                self.desc.since().elements(),
1162                &[T::minimum()],
1163            ));
1164        }
1165        for part in self.parts.iter() {
1166            let Some(ts_rewrite) = part.ts_rewrite() else {
1167                continue;
1168            };
1169            if PartialOrder::less_than(frontier, ts_rewrite) {
1170                return Err(format!(
1171                    "rewrite frontier {:?} < batch rewrite {:?}",
1172                    frontier.elements(),
1173                    ts_rewrite.elements(),
1174                ));
1175            }
1176        }
1177
1178        self.desc = Description::new(
1179            self.desc.lower().clone(),
1180            new_upper,
1181            self.desc.since().clone(),
1182        );
1183        for part in &mut self.parts {
1184            match part {
1185                RunPart::Single(BatchPart::Hollow(part)) => {
1186                    part.ts_rewrite = Some(frontier.clone())
1187                }
1188                RunPart::Single(BatchPart::Inline { ts_rewrite, .. }) => {
1189                    *ts_rewrite = Some(frontier.clone())
1190                }
1191                RunPart::Many(runs) => {
1192                    // Currently unreachable: we only apply rewrites to user batches, and we don't
1193                    // ever generate runs of >1 part for those.
1194                    panic!("unexpected rewrite of a hollow runs ref: {runs:?}");
1195                }
1196            }
1197        }
1198        Ok(())
1199    }
1200}
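
// Illustrative summary (not part of the original source) of the checks that
// `rewrite_ts` enforces for a batch `b`, a rewrite `frontier`, and a
// `new_upper` (all comparisons are on antichains):
//
//     frontier < new_upper              // the rewrite must land below the new upper
//     !(new_upper < b.desc.upper())     // the upper may only advance
//     !(frontier < b.desc.lower())      // no rewriting below the batch's lower
//     b.desc.since() == Antichain::from_elem(T::minimum())
//     // and any pre-existing part ts_rewrite must not exceed `frontier`.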
1201
1202impl<T: Ord> PartialOrd for HollowBatchPart<T> {
1203    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1204        Some(self.cmp(other))
1205    }
1206}
1207
1208impl<T: Ord> Ord for HollowBatchPart<T> {
1209    fn cmp(&self, other: &Self) -> Ordering {
1210        // Deconstruct self and other so we get a compile failure if new fields
1211        // are added.
1212        let HollowBatchPart {
1213            key: self_key,
1214            encoded_size_bytes: self_encoded_size_bytes,
1215            key_lower: self_key_lower,
1216            structured_key_lower: self_structured_key_lower,
1217            stats: self_stats,
1218            ts_rewrite: self_ts_rewrite,
1219            diffs_sum: self_diffs_sum,
1220            format: self_format,
1221            schema_id: self_schema_id,
1222            deprecated_schema_id: self_deprecated_schema_id,
1223        } = self;
1224        let HollowBatchPart {
1225            key: other_key,
1226            encoded_size_bytes: other_encoded_size_bytes,
1227            key_lower: other_key_lower,
1228            structured_key_lower: other_structured_key_lower,
1229            stats: other_stats,
1230            ts_rewrite: other_ts_rewrite,
1231            diffs_sum: other_diffs_sum,
1232            format: other_format,
1233            schema_id: other_schema_id,
1234            deprecated_schema_id: other_deprecated_schema_id,
1235        } = other;
1236        (
1237            self_key,
1238            self_encoded_size_bytes,
1239            self_key_lower,
1240            self_structured_key_lower,
1241            self_stats,
1242            self_ts_rewrite.as_ref().map(|x| x.elements()),
1243            self_diffs_sum,
1244            self_format,
1245            self_schema_id,
1246            self_deprecated_schema_id,
1247        )
1248            .cmp(&(
1249                other_key,
1250                other_encoded_size_bytes,
1251                other_key_lower,
1252                other_structured_key_lower,
1253                other_stats,
1254                other_ts_rewrite.as_ref().map(|x| x.elements()),
1255                other_diffs_sum,
1256                other_format,
1257                other_schema_id,
1258                other_deprecated_schema_id,
1259            ))
1260    }
1261}
1262
1263/// A pointer to a rollup stored externally.
1264#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
1265pub struct HollowRollup {
1266    /// Pointer usable to retrieve the rollup.
1267    pub key: PartialRollupKey,
1268    /// The encoded size of this rollup, if known.
1269    pub encoded_size_bytes: Option<usize>,
1270}
1271
1272/// A pointer to a blob stored externally.
1273#[derive(Debug)]
1274pub enum HollowBlobRef<'a, T> {
1275    Batch(&'a HollowBatch<T>),
1276    Rollup(&'a HollowRollup),
1277}
1278
1279/// A rollup that is currently being computed.
1280#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
1281pub struct ActiveRollup {
1282    pub seqno: SeqNo,
1283    pub start_ms: u64,
1284}
1285
1286/// A garbage collection request that is currently being computed.
1287#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
1288pub struct ActiveGc {
1289    pub seqno: SeqNo,
1290    pub start_ms: u64,
1291}
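
// Illustrative sketch (not part of the original source): how a "stuck" claim
// is typically detected from `start_ms`, together with the
// `persist_rollup_fallback_threshold_ms` / `persist_gc_fallback_threshold_ms`
// configs above. The names below are hypothetical.
//
//     let stuck = now_ms.saturating_sub(active.start_ms) >= u64::cast_from(fallback_threshold_ms);
//     // If `stuck`, another worker may claim the rollup/GC and restart it.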
1292
1293/// A sentinel for a state transition that was a no-op.
1294///
1295/// Critically, this also indicates that the no-op state transition was not
1296/// committed through compare_and_append and thus is _not linearized_.
1297#[derive(Debug)]
1298#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
1299pub struct NoOpStateTransition<T>(pub T);
1300
1301// TODO: Document invariants.
1302#[derive(Debug, Clone)]
1303#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
1304pub struct StateCollections<T> {
1305    // - Invariant: `<= all reader.since`
1306    // - Invariant: Doesn't regress across state versions.
1307    pub(crate) last_gc_req: SeqNo,
1308
1309    // - Invariant: There is a rollup with `seqno <= self.seqno_since`.
1310    pub(crate) rollups: BTreeMap<SeqNo, HollowRollup>,
1311
1312    /// The rollup that is currently being computed.
1313    pub(crate) active_rollup: Option<ActiveRollup>,
1314    /// The GC request that is currently being computed.
1315    pub(crate) active_gc: Option<ActiveGc>,
1316
1317    pub(crate) leased_readers: BTreeMap<LeasedReaderId, LeasedReaderState<T>>,
1318    pub(crate) critical_readers: BTreeMap<CriticalReaderId, CriticalReaderState<T>>,
1319    pub(crate) writers: BTreeMap<WriterId, WriterState<T>>,
1320    pub(crate) schemas: BTreeMap<SchemaId, EncodedSchemas>,
1321
1322    // - Invariant: `trace.since == meet(all reader.since)`
1323    // - Invariant: `trace.since` doesn't regress across state versions.
1324    // - Invariant: `trace.upper` doesn't regress across state versions.
1325    // - Invariant: `trace` upholds its own invariants.
1326    pub(crate) trace: Trace<T>,
1327}
1328
1329/// A key and val [Codec::Schema] encoded via [Codec::encode_schema].
1330///
1331/// This strategy of directly serializing the schema objects requires that
1332/// persist users do the right thing. Specifically, that an encoded schema
1333/// doesn't in some later version of mz decode to an in-mem object that acts
1334/// differently. In a sense, the current system (before the introduction of the
1335/// schema registry) where schemas are passed in unchecked to reader and writer
1336/// registration calls also has the same defect, so seems fine.
1337///
1338/// An alternative is to write down here some persist-specific representation of
1339/// the schema (e.g. the arrow DataType). This is a lot more work and also has
1340/// the potential to lead down a similar failure mode to the mz_persist_types
1341/// `Data` trait, where the boilerplate isn't worth the safety. Given that we
1342/// can always migrate later by rehydrating these, seems fine to start with the
1343/// easy thing.
1344#[derive(Debug, Clone, Serialize, PartialEq)]
1345pub struct EncodedSchemas {
1346    /// A full in-mem `K::Schema` impl encoded via [Codec::encode_schema].
1347    pub key: Bytes,
1348    /// The arrow `DataType` produced by this `K::Schema` at the time it was
1349    /// registered, encoded as a `ProtoDataType`.
1350    pub key_data_type: Bytes,
1351    /// A full in-mem `V::Schema` impl encoded via [Codec::encode_schema].
1352    pub val: Bytes,
1353    /// The arrow `DataType` produced by this `V::Schema` at the time it was
1354    /// registered, encoded as a `ProtoDataType`.
1355    pub val_data_type: Bytes,
1356}
1357
1358impl EncodedSchemas {
1359    pub(crate) fn decode_data_type(buf: &[u8]) -> DataType {
1360        let proto = prost::Message::decode(buf).expect("valid ProtoDataType");
1361        DataType::from_proto(proto).expect("valid DataType")
1362    }
1363}
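
// Illustrative note (not part of the original source): `decode_data_type` is
// intended as the inverse of the proto encoding performed at registration
// time, i.e. for an arrow `DataType` `dt`:
//
//     let bytes = prost::Message::encode_to_vec(&dt.into_proto());
//     assert_eq!(EncodedSchemas::decode_data_type(&bytes), dt);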
1364
1365#[derive(Debug)]
1366#[cfg_attr(test, derive(PartialEq))]
1367pub enum CompareAndAppendBreak<T> {
1368    AlreadyCommitted,
1369    Upper {
1370        shard_upper: Antichain<T>,
1371        writer_upper: Antichain<T>,
1372    },
1373    InvalidUsage(InvalidUsage<T>),
1374    InlineBackpressure,
1375}
1376
1377#[derive(Debug)]
1378#[cfg_attr(test, derive(PartialEq))]
1379pub enum SnapshotErr<T> {
1380    AsOfNotYetAvailable(SeqNo, Upper<T>),
1381    AsOfHistoricalDistinctionsLost(Since<T>),
1382}
1383
1384impl<T> StateCollections<T>
1385where
1386    T: Timestamp + Lattice + Codec64,
1387{
1388    pub fn add_rollup(
1389        &mut self,
1390        add_rollup: (SeqNo, &HollowRollup),
1391    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1392        let (rollup_seqno, rollup) = add_rollup;
1393        let applied = match self.rollups.get(&rollup_seqno) {
1394            Some(x) => x.key == rollup.key,
1395            None => {
1396                self.active_rollup = None;
1397                self.rollups.insert(rollup_seqno, rollup.to_owned());
1398                true
1399            }
1400        };
1401        // This state transition is a no-op if `applied` is false, but we
1402        // still commit the state change so that it gets linearized
1403        // (maybe we're looking at old state).
1404        Continue(applied)
1405    }
1406
1407    pub fn remove_rollups(
1408        &mut self,
1409        remove_rollups: &[(SeqNo, PartialRollupKey)],
1410    ) -> ControlFlow<NoOpStateTransition<Vec<SeqNo>>, Vec<SeqNo>> {
1411        if remove_rollups.is_empty() || self.is_tombstone() {
1412            return Break(NoOpStateTransition(vec![]));
1413        }
1414
1415        // This state transition is called at the end of the GC process, so we
1416        // need to unset the `active_gc` field.
1417        self.active_gc = None;
1418
1419        let mut removed = vec![];
1420        for (seqno, key) in remove_rollups {
1421            let removed_key = self.rollups.remove(seqno);
1422            debug_assert!(
1423                removed_key.as_ref().map_or(true, |x| &x.key == key),
1424                "{} vs {:?}",
1425                key,
1426                removed_key
1427            );
1428
1429            if removed_key.is_some() {
1430                removed.push(*seqno);
1431            }
1432        }
1433
1434        Continue(removed)
1435    }
1436
1437    pub fn register_leased_reader(
1438        &mut self,
1439        hostname: &str,
1440        reader_id: &LeasedReaderId,
1441        purpose: &str,
1442        seqno: SeqNo,
1443        lease_duration: Duration,
1444        heartbeat_timestamp_ms: u64,
1445        use_critical_since: bool,
1446    ) -> ControlFlow<
1447        NoOpStateTransition<(LeasedReaderState<T>, SeqNo)>,
1448        (LeasedReaderState<T>, SeqNo),
1449    > {
1450        let since = if use_critical_since {
1451            self.critical_since()
1452                .unwrap_or_else(|| self.trace.since().clone())
1453        } else {
1454            self.trace.since().clone()
1455        };
1456        let reader_state = LeasedReaderState {
1457            debug: HandleDebugState {
1458                hostname: hostname.to_owned(),
1459                purpose: purpose.to_owned(),
1460            },
1461            seqno,
1462            since,
1463            last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1464            lease_duration_ms: u64::try_from(lease_duration.as_millis())
1465                .expect("lease duration as millis should fit within u64"),
1466        };
1467
1468        // If the shard-global upper and since are both the empty antichain,
1469        // then no further writes can ever commit and no further reads can be
1470        // served. Optimize this by no-op-ing reader registration so that we can
1471        // settle the shard into a final unchanging tombstone state.
1472        if self.is_tombstone() {
1473            return Break(NoOpStateTransition((reader_state, self.seqno_since(seqno))));
1474        }
1475
1476        // TODO: Handle if the reader or writer already exists.
1477        self.leased_readers
1478            .insert(reader_id.clone(), reader_state.clone());
1479        Continue((reader_state, self.seqno_since(seqno)))
1480    }
1481
1482    pub fn register_critical_reader<O: Opaque + Codec64>(
1483        &mut self,
1484        hostname: &str,
1485        reader_id: &CriticalReaderId,
1486        purpose: &str,
1487    ) -> ControlFlow<NoOpStateTransition<CriticalReaderState<T>>, CriticalReaderState<T>> {
1488        let state = CriticalReaderState {
1489            debug: HandleDebugState {
1490                hostname: hostname.to_owned(),
1491                purpose: purpose.to_owned(),
1492            },
1493            since: self.trace.since().clone(),
1494            opaque: OpaqueState(Codec64::encode(&O::initial())),
1495            opaque_codec: O::codec_name(),
1496        };
1497
1498        // We expire all readers if the upper and since both advance to the
1499        // empty antichain. Gracefully handle this. At the same time,
1500        // short-circuit the cmd application so we don't needlessly create new
1501        // SeqNos.
1502        if self.is_tombstone() {
1503            return Break(NoOpStateTransition(state));
1504        }
1505
1506        let state = match self.critical_readers.get_mut(reader_id) {
1507            Some(existing_state) => {
1508                existing_state.debug = state.debug;
1509                existing_state.clone()
1510            }
1511            None => {
1512                self.critical_readers
1513                    .insert(reader_id.clone(), state.clone());
1514                state
1515            }
1516        };
1517        Continue(state)
1518    }
1519
1520    pub fn register_schema<K: Codec, V: Codec>(
1521        &mut self,
1522        key_schema: &K::Schema,
1523        val_schema: &V::Schema,
1524    ) -> ControlFlow<NoOpStateTransition<Option<SchemaId>>, Option<SchemaId>> {
1525        fn encode_data_type(data_type: &DataType) -> Bytes {
1526            let proto = data_type.into_proto();
1527            prost::Message::encode_to_vec(&proto).into()
1528        }
1529
1530        // Look for an existing registered SchemaId for these schemas.
1531        //
1532        // The common case is that this should be a recent one, so as a minor
1533        // optimization, do this search in reverse order.
1534        //
1535        // TODO: Note that this impl is `O(schemas)`. Combined with the
1536        // possibility of cmd retries, it's possible but unlikely for this to
1537        // get expensive. We could maintain a reverse map to speed this up if
1538        // necessary. This would either need to work on the encoded
1539        // representation (in which case we'd have to fall back to the linear scan) or
1540        // we'd need to add a Hash/Ord bound to Schema.
1541        let existing_id = self.schemas.iter().rev().find(|(_, x)| {
1542            K::decode_schema(&x.key) == *key_schema && V::decode_schema(&x.val) == *val_schema
1543        });
1544        match existing_id {
1545            Some((schema_id, _)) => {
1546                // TODO: Validate that the decoded schemas still produce records
1547                // of the recorded DataType, to detect shenanigans. Probably
1548                // best to wait until we've turned on Schema2 in prod and thus
1549                // committed to the current mappings.
1550                Break(NoOpStateTransition(Some(*schema_id)))
1551            }
1552            None if self.is_tombstone() => {
1553                // TODO: Is this right?
1554                Break(NoOpStateTransition(None))
1555            }
1556            None if self.schemas.is_empty() => {
1557                // We'll have to do something more sophisticated here to
1558                // generate the next id if/when we start supporting the removal
1559                // of schemas.
1560                let id = SchemaId(self.schemas.len());
1561                let key_data_type = mz_persist_types::columnar::data_type::<K>(key_schema)
1562                    .expect("valid key schema");
1563                let val_data_type = mz_persist_types::columnar::data_type::<V>(val_schema)
1564                    .expect("valid val schema");
1565                let prev = self.schemas.insert(
1566                    id,
1567                    EncodedSchemas {
1568                        key: K::encode_schema(key_schema),
1569                        key_data_type: encode_data_type(&key_data_type),
1570                        val: V::encode_schema(val_schema),
1571                        val_data_type: encode_data_type(&val_data_type),
1572                    },
1573                );
1574                assert_eq!(prev, None);
1575                Continue(Some(id))
1576            }
1577            None => {
1578                info!(
1579                    "register_schemas got {:?} expected {:?}",
1580                    key_schema,
1581                    self.schemas
1582                        .iter()
1583                        .map(|(id, x)| (id, K::decode_schema(&x.key)))
1584                        .collect::<Vec<_>>()
1585                );
1586                // Until we implement persist schema changes, only allow at most
1587                // one registered schema.
1588                Break(NoOpStateTransition(None))
1589            }
1590        }
1591    }
1592
1593    pub fn compare_and_evolve_schema<K: Codec, V: Codec>(
1594        &mut self,
1595        expected: SchemaId,
1596        key_schema: &K::Schema,
1597        val_schema: &V::Schema,
1598    ) -> ControlFlow<NoOpStateTransition<CaESchema<K, V>>, CaESchema<K, V>> {
1599        fn data_type<T>(schema: &impl Schema<T>) -> DataType {
1600            // To be defensive, create an empty batch and inspect the resulting
1601            // data type (as opposed to something like allowing the `Schema` to
1602            // declare the DataType).
1603            let array = Schema::encoder(schema).expect("valid schema").finish();
1604            Array::data_type(&array).clone()
1605        }
1606
1607        let (current_id, current) = self
1608            .schemas
1609            .last_key_value()
1610            .expect("all shards have a schema");
1611        if *current_id != expected {
1612            return Break(NoOpStateTransition(CaESchema::ExpectedMismatch {
1613                schema_id: *current_id,
1614                key: K::decode_schema(&current.key),
1615                val: V::decode_schema(&current.val),
1616            }));
1617        }
1618
1619        let current_key = K::decode_schema(&current.key);
1620        let current_key_dt = EncodedSchemas::decode_data_type(&current.key_data_type);
1621        let current_val = V::decode_schema(&current.val);
1622        let current_val_dt = EncodedSchemas::decode_data_type(&current.val_data_type);
1623
1624        let key_dt = data_type(key_schema);
1625        let val_dt = data_type(val_schema);
1626
1627        // If the schema is exactly the same as the current one, no-op.
1628        if current_key == *key_schema
1629            && current_key_dt == key_dt
1630            && current_val == *val_schema
1631            && current_val_dt == val_dt
1632        {
1633            return Break(NoOpStateTransition(CaESchema::Ok(*current_id)));
1634        }
1635
1636        let key_fn = backward_compatible(&current_key_dt, &key_dt);
1637        let val_fn = backward_compatible(&current_val_dt, &val_dt);
1638        let (Some(key_fn), Some(val_fn)) = (key_fn, val_fn) else {
1639            return Break(NoOpStateTransition(CaESchema::Incompatible));
1640        };
1641        // Persist initially disallows dropping columns. This would require a
1642        // bunch more work (e.g. not safe to use the latest schema in
1643        // compaction) and isn't initially necessary in mz.
1644        if key_fn.contains_drop() || val_fn.contains_drop() {
1645            return Break(NoOpStateTransition(CaESchema::Incompatible));
1646        }
1647
1648        // We'll have to do something more sophisticated here to
1649        // generate the next id if/when we start supporting the removal
1650        // of schemas.
1651        let id = SchemaId(self.schemas.len());
1652        self.schemas.insert(
1653            id,
1654            EncodedSchemas {
1655                key: K::encode_schema(key_schema),
1656                key_data_type: prost::Message::encode_to_vec(&key_dt.into_proto()).into(),
1657                val: V::encode_schema(val_schema),
1658                val_data_type: prost::Message::encode_to_vec(&val_dt.into_proto()).into(),
1659            },
1660        );
1661        Continue(CaESchema::Ok(id))
1662    }
1663
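    /// Applies a compare-and-append of `batch` against the current shard upper.
    ///
    /// On success, the batch is appended (pushed into the trace when its time
    /// interval is non-empty) and the fueled merge requests it produces, plus
    /// any claimed previously-unclaimed compactions, are returned for the
    /// caller to compact. Otherwise this breaks with an upper mismatch, an
    /// already-committed acknowledgement for an idempotent replay, an
    /// invalid-usage error, or inline-write backpressure.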
1664    pub fn compare_and_append(
1665        &mut self,
1666        batch: &HollowBatch<T>,
1667        writer_id: &WriterId,
1668        heartbeat_timestamp_ms: u64,
1669        lease_duration_ms: u64,
1670        idempotency_token: &IdempotencyToken,
1671        debug_info: &HandleDebugState,
1672        inline_writes_total_max_bytes: usize,
1673        claim_compaction_percent: usize,
1674        claim_compaction_min_version: Option<&Version>,
1675    ) -> ControlFlow<CompareAndAppendBreak<T>, Vec<FueledMergeReq<T>>> {
1676        // We expire all writers if the upper and since both advance to the
1677        // empty antichain. Gracefully handle this. At the same time,
1678        // short-circuit the cmd application so we don't needlessly create new
1679        // SeqNos.
1680        if self.is_tombstone() {
1681            assert_eq!(self.trace.upper(), &Antichain::new());
1682            return Break(CompareAndAppendBreak::Upper {
1683                shard_upper: Antichain::new(),
1684                // This writer might have been registered before the shard upper
1685                // was advanced, which would make this pessimistic in the
1686                // Indeterminate handling of compare_and_append at the machine
1687                // level, but that's fine.
1688                writer_upper: Antichain::new(),
1689            });
1690        }
1691
1692        let writer_state = self
1693            .writers
1694            .entry(writer_id.clone())
1695            .or_insert_with(|| WriterState {
1696                last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1697                lease_duration_ms,
1698                most_recent_write_token: IdempotencyToken::SENTINEL,
1699                most_recent_write_upper: Antichain::from_elem(T::minimum()),
1700                debug: debug_info.clone(),
1701            });
1702
1703        if PartialOrder::less_than(batch.desc.upper(), batch.desc.lower()) {
1704            return Break(CompareAndAppendBreak::InvalidUsage(
1705                InvalidUsage::InvalidBounds {
1706                    lower: batch.desc.lower().clone(),
1707                    upper: batch.desc.upper().clone(),
1708                },
1709            ));
1710        }
1711
1712        // If the time interval is empty, the list of updates must also be
1713        // empty.
1714        if batch.desc.upper() == batch.desc.lower() && !batch.is_empty() {
1715            return Break(CompareAndAppendBreak::InvalidUsage(
1716                InvalidUsage::InvalidEmptyTimeInterval {
1717                    lower: batch.desc.lower().clone(),
1718                    upper: batch.desc.upper().clone(),
1719                    keys: batch
1720                        .parts
1721                        .iter()
1722                        .map(|x| x.printable_name().to_owned())
1723                        .collect(),
1724                },
1725            ));
1726        }
1727
1728        if idempotency_token == &writer_state.most_recent_write_token {
1729            // If the last write had the same idempotency_token, then this must
1730            // have already committed. Sanity check that the most recent write
1731            // upper matches and that the shard upper is at least the write
1732            // upper; if it's not, something very suspect is going on.
1733            assert_eq!(batch.desc.upper(), &writer_state.most_recent_write_upper);
1734            assert!(
1735                PartialOrder::less_equal(batch.desc.upper(), self.trace.upper()),
1736                "{:?} vs {:?}",
1737                batch.desc.upper(),
1738                self.trace.upper()
1739            );
1740            return Break(CompareAndAppendBreak::AlreadyCommitted);
1741        }
1742
1743        let shard_upper = self.trace.upper();
1744        if shard_upper != batch.desc.lower() {
1745            return Break(CompareAndAppendBreak::Upper {
1746                shard_upper: shard_upper.clone(),
1747                writer_upper: writer_state.most_recent_write_upper.clone(),
1748            });
1749        }
1750
1751        let new_inline_bytes = batch.inline_bytes();
1752        if new_inline_bytes > 0 {
1753            let mut existing_inline_bytes = 0;
1754            self.trace
1755                .map_batches(|x| existing_inline_bytes += x.inline_bytes());
1756            // TODO: For very small batches, it may actually _increase_ the size
1757            // of state to flush them out. Consider another threshold under
1758            // which an inline part can be appended no matter what.
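            // Illustrative numbers for the check below (not taken from real
            // config): with inline_writes_total_max_bytes = 4096, existing
            // inline parts totaling 4000 bytes, and a new 200-byte inline part,
            // 4000 + 200 >= 4096, so the append breaks with InlineBackpressure
            // and the caller is expected to flush the part to blob and retry.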
1759            if existing_inline_bytes + new_inline_bytes >= inline_writes_total_max_bytes {
1760                return Break(CompareAndAppendBreak::InlineBackpressure);
1761            }
1762        }
1763
1764        let mut merge_reqs = if batch.desc.upper() != batch.desc.lower() {
1765            self.trace.push_batch(batch.clone())
1766        } else {
1767            Vec::new()
1768        };
1769
1770        // NB: we don't claim unclaimed compactions when the recording flag is off, even if we'd
1771        // otherwise be allowed to, to avoid triggering the same compactions in every writer.
1772        let all_empty_reqs = merge_reqs
1773            .iter()
1774            .all(|req| req.inputs.iter().all(|b| b.batch.is_empty()));
1775        if all_empty_reqs && !batch.is_empty() {
1776            let mut reqs_to_take = claim_compaction_percent / 100;
1777            if (usize::cast_from(idempotency_token.hashed()) % 100)
1778                < (claim_compaction_percent % 100)
1779            {
1780                reqs_to_take += 1;
1781            }
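            // Worked example (illustrative): with claim_compaction_percent =
            // 250, each such append claims 2 of the oldest unclaimed reqs, plus
            // a 3rd for roughly half of idempotency tokens (those hashing to
            // less than 50 mod 100), i.e. 2.5 claimed reqs on average.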
1782            let threshold_ms = heartbeat_timestamp_ms.saturating_sub(lease_duration_ms);
1783            let min_writer = claim_compaction_min_version.map(WriterKey::for_version);
1784            merge_reqs.extend(
1785                // We keep the oldest `reqs_to_take` batches, under the theory that they're least
1786                // likely to be compacted soon for other reasons.
1787                self.trace
1788                    .fueled_merge_reqs_before_ms(threshold_ms, min_writer)
1789                    .take(reqs_to_take),
1790            )
1791        }
1792
1793        for req in &merge_reqs {
1794            self.trace.claim_compaction(
1795                req.id,
1796                ActiveCompaction {
1797                    start_ms: heartbeat_timestamp_ms,
1798                },
1799            )
1800        }
1801
1802        debug_assert_eq!(self.trace.upper(), batch.desc.upper());
1803        writer_state.most_recent_write_token = idempotency_token.clone();
1804        // The writer's most recent upper should only go forward.
1805        assert!(
1806            PartialOrder::less_equal(&writer_state.most_recent_write_upper, batch.desc.upper()),
1807            "{:?} vs {:?}",
1808            &writer_state.most_recent_write_upper,
1809            batch.desc.upper()
1810        );
1811        writer_state
1812            .most_recent_write_upper
1813            .clone_from(batch.desc.upper());
1814
1815        // Heartbeat the writer state to keep our idempotency token alive.
1816        writer_state.last_heartbeat_timestamp_ms = std::cmp::max(
1817            heartbeat_timestamp_ms,
1818            writer_state.last_heartbeat_timestamp_ms,
1819        );
1820
1821        Continue(merge_reqs)
1822    }
1823
1824    pub fn apply_merge_res<D: Codec64 + Semigroup + PartialEq>(
1825        &mut self,
1826        res: &FueledMergeRes<T>,
1827        metrics: &ColumnarMetrics,
1828    ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
1829        // We expire all writers if the upper and since both advance to the
1830        // empty antichain. Gracefully handle this. At the same time,
1831        // short-circuit the cmd application so we don't needlessly create new
1832        // SeqNos.
1833        if self.is_tombstone() {
1834            return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
1835        }
1836
1837        let apply_merge_result = self.trace.apply_merge_res_checked::<D>(res, metrics);
1838        Continue(apply_merge_result)
1839    }
1840
1841    pub fn spine_exert(
1842        &mut self,
1843        fuel: usize,
1844    ) -> ControlFlow<NoOpStateTransition<Vec<FueledMergeReq<T>>>, Vec<FueledMergeReq<T>>> {
1845        let (merge_reqs, did_work) = self.trace.exert(fuel);
1846        if did_work {
1847            Continue(merge_reqs)
1848        } else {
1849            assert!(merge_reqs.is_empty());
1850            // Break if we have nothing useful to do to save the seqno (and
1851            // resulting crdb traffic)
1852            Break(NoOpStateTransition(Vec::new()))
1853        }
1854    }
1855
1856    pub fn downgrade_since(
1857        &mut self,
1858        reader_id: &LeasedReaderId,
1859        seqno: SeqNo,
1860        outstanding_seqno: Option<SeqNo>,
1861        new_since: &Antichain<T>,
1862        heartbeat_timestamp_ms: u64,
1863    ) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
1864        // We expire all readers if the upper and since both advance to the
1865        // empty antichain. Gracefully handle this. At the same time,
1866        // short-circuit the cmd application so we don't needlessly create new
1867        // SeqNos.
1868        if self.is_tombstone() {
1869            return Break(NoOpStateTransition(Since(Antichain::new())));
1870        }
1871
1872        // The only way to have a missing reader in state is if it's been expired... and in that
1873        // case, we behave the same as though that reader had been downgraded to the empty antichain.
1874        let Some(reader_state) = self.leased_reader(reader_id) else {
1875            tracing::warn!(
1876                "Leased reader {reader_id} was expired due to inactivity. Did the machine go to sleep?",
1877            );
1878            return Break(NoOpStateTransition(Since(Antichain::new())));
1879        };
1880
1881        // Also use this as an opportunity to heartbeat the reader and downgrade
1882        // the seqno capability.
1883        reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
1884            heartbeat_timestamp_ms,
1885            reader_state.last_heartbeat_timestamp_ms,
1886        );
1887
1888        let seqno = match outstanding_seqno {
1889            Some(outstanding_seqno) => {
1890                assert!(
1891                    outstanding_seqno >= reader_state.seqno,
1892                    "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
1893                    is behind current reader_state ({:?})",
1894                    outstanding_seqno,
1895                    reader_state.seqno,
1896                );
1897                std::cmp::min(outstanding_seqno, seqno)
1898            }
1899            None => seqno,
1900        };
1901
1902        reader_state.seqno = seqno;
1903
1904        let reader_current_since = if PartialOrder::less_than(&reader_state.since, new_since) {
1905            reader_state.since.clone_from(new_since);
1906            self.update_since();
1907            new_since.clone()
1908        } else {
1909            // No-op, but still commit the state change so that this gets
1910            // linearized.
1911            reader_state.since.clone()
1912        };
1913
1914        Continue(Since(reader_current_since))
1915    }
1916
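    /// Compare-and-set style downgrade of a critical reader's since: the caller
    /// must present the opaque token currently stored for the reader, and on a
    /// mismatch the stored token and since are returned unchanged.
    ///
    /// Minimal usage sketch (not a doctest; `reader_id`, `expected`,
    /// `new_opaque`, and `new_since` are hypothetical values owned by the
    /// caller):
    ///
    /// ```ignore
    /// match state.compare_and_downgrade_since(&reader_id, &expected, (&new_opaque, &new_since)) {
    ///     Continue(Ok(since)) => { /* token matched; the reader since is now at least new_since */ }
    ///     Continue(Err((actual, since))) => { /* token mismatch; caller may retry with actual */ }
    ///     Break(_) => { /* shard is a tombstone; the since is the empty antichain */ }
    /// }
    /// ```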
1917    pub fn compare_and_downgrade_since<O: Opaque + Codec64>(
1918        &mut self,
1919        reader_id: &CriticalReaderId,
1920        expected_opaque: &O,
1921        (new_opaque, new_since): (&O, &Antichain<T>),
1922    ) -> ControlFlow<
1923        NoOpStateTransition<Result<Since<T>, (O, Since<T>)>>,
1924        Result<Since<T>, (O, Since<T>)>,
1925    > {
1926        // We expire all readers if the upper and since both advance to the
1927        // empty antichain. Gracefully handle this. At the same time,
1928        // short-circuit the cmd application so we don't needlessly create new
1929        // SeqNos.
1930        if self.is_tombstone() {
1931            // Match the idempotence behavior below of ignoring the token if
1932            // since is already advanced enough (in this case, because it's a
1933            // tombstone, we know it's the empty antichain).
1934            return Break(NoOpStateTransition(Ok(Since(Antichain::new()))));
1935        }
1936
1937        let reader_state = self.critical_reader(reader_id);
1938        assert_eq!(reader_state.opaque_codec, O::codec_name());
1939
1940        if &O::decode(reader_state.opaque.0) != expected_opaque {
1941            // No-op, but still commit the state change so that this gets
1942            // linearized.
1943            return Continue(Err((
1944                Codec64::decode(reader_state.opaque.0),
1945                Since(reader_state.since.clone()),
1946            )));
1947        }
1948
1949        reader_state.opaque = OpaqueState(Codec64::encode(new_opaque));
1950        if PartialOrder::less_equal(&reader_state.since, new_since) {
1951            reader_state.since.clone_from(new_since);
1952            self.update_since();
1953            Continue(Ok(Since(new_since.clone())))
1954        } else {
1955            // no work to be done -- the reader state's `since` is already sufficiently
1956            // advanced. we may someday need to revisit this branch when it's possible
1957            // for two `since` frontiers to be incomparable.
1958            Continue(Ok(Since(reader_state.since.clone())))
1959        }
1960    }
1961
1962    pub fn heartbeat_leased_reader(
1963        &mut self,
1964        reader_id: &LeasedReaderId,
1965        heartbeat_timestamp_ms: u64,
1966    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1967        // We expire all readers if the upper and since both advance to the
1968        // empty antichain. Gracefully handle this. At the same time,
1969        // short-circuit the cmd application so we don't needlessly create new
1970        // SeqNos.
1971        if self.is_tombstone() {
1972            return Break(NoOpStateTransition(false));
1973        }
1974
1975        match self.leased_readers.get_mut(reader_id) {
1976            Some(reader_state) => {
1977                reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
1978                    heartbeat_timestamp_ms,
1979                    reader_state.last_heartbeat_timestamp_ms,
1980                );
1981                Continue(true)
1982            }
1983            // No-op, but we still commit the state change so that this gets
1984            // linearized (maybe we're looking at old state).
1985            None => Continue(false),
1986        }
1987    }
1988
1989    pub fn expire_leased_reader(
1990        &mut self,
1991        reader_id: &LeasedReaderId,
1992    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1993        // We expire all readers if the upper and since both advance to the
1994        // empty antichain. Gracefully handle this. At the same time,
1995        // short-circuit the cmd application so we don't needlessly create new
1996        // SeqNos.
1997        if self.is_tombstone() {
1998            return Break(NoOpStateTransition(false));
1999        }
2000
2001        let existed = self.leased_readers.remove(reader_id).is_some();
2002        if existed {
2003            // TODO(database-issues#6885): Re-enable this
2004            //
2005            // Temporarily disabling this because we think it might be the cause
2006            // of the remap since bug. Specifically, a clusterd process has a
2007            // ReadHandle for maintaining the since and one inside a Listen. If
2008            // we crash and stay down for longer than the read lease duration,
2009            // it's possible that an expiry of them both in quick succession
2010            // jumps the since forward to the Listen one.
2011            //
2012            // Don't forget to update the downgrade_since when this gets
2013            // switched back on.
2014            //
2015            // self.update_since();
2016        }
2017        // No-op if existed is false, but still commit the state change so that
2018        // this gets linearized.
2019        Continue(existed)
2020    }
2021
2022    pub fn expire_critical_reader(
2023        &mut self,
2024        reader_id: &CriticalReaderId,
2025    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2026        // We expire all readers if the upper and since both advance to the
2027        // empty antichain. Gracefully handle this. At the same time,
2028        // short-circuit the cmd application so we don't needlessly create new
2029        // SeqNos.
2030        if self.is_tombstone() {
2031            return Break(NoOpStateTransition(false));
2032        }
2033
2034        let existed = self.critical_readers.remove(reader_id).is_some();
2035        if existed {
2036            // TODO(database-issues#6885): Re-enable this
2037            //
2038            // Temporarily disabling this because we think it might be the cause
2039            // of the remap since bug. Specifically, a clusterd process has a
2040            // ReadHandle for maintaining the since and one inside a Listen. If
2041            // we crash and stay down for longer than the read lease duration,
2042            // it's possible that an expiry of them both in quick succession
2043            // jumps the since forward to the Listen one.
2044            //
2045            // Don't forget to update the downgrade_since when this gets
2046            // switched back on.
2047            //
2048            // self.update_since();
2049        }
2050        // This state transition is a no-op if existed is false, but we still
2051        // commit the state change so that this gets linearized (maybe we're
2052        // looking at old state).
2053        Continue(existed)
2054    }
2055
2056    pub fn expire_writer(
2057        &mut self,
2058        writer_id: &WriterId,
2059    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2060        // We expire all writers if the upper and since both advance to the
2061        // empty antichain. Gracefully handle this. At the same time,
2062        // short-circuit the cmd application so we don't needlessly create new
2063        // SeqNos.
2064        if self.is_tombstone() {
2065            return Break(NoOpStateTransition(false));
2066        }
2067
2068        let existed = self.writers.remove(writer_id).is_some();
2069        // This state transition is a no-op if existed is false, but we still
2070        // commit the state change so that this gets linearized (maybe we're
2071        // looking at old state).
2072        Continue(existed)
2073    }
2074
2075    fn leased_reader(&mut self, id: &LeasedReaderId) -> Option<&mut LeasedReaderState<T>> {
2076        self.leased_readers.get_mut(id)
2077    }
2078
2079    fn critical_reader(&mut self, id: &CriticalReaderId) -> &mut CriticalReaderState<T> {
2080        self.critical_readers
2081            .get_mut(id)
2082            .unwrap_or_else(|| {
2083                panic!(
2084                    "Unknown CriticalReaderId({}). It was either never registered, or has been manually expired.",
2085                    id
2086                )
2087            })
2088    }
2089
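    /// The lattice meet (for totally ordered times, the minimum) of all
    /// critical readers' sinces, or `None` if no critical readers are
    /// registered.
    ///
    /// Worked example (illustrative): with critical sinces `{3}` and `{5}`,
    /// this returns `Some(Antichain::from_elem(3))`.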
2090    fn critical_since(&self) -> Option<Antichain<T>> {
2091        let mut critical_sinces = self.critical_readers.values().map(|r| &r.since);
2092        let mut since = critical_sinces.next().cloned()?;
2093        for s in critical_sinces {
2094            since.meet_assign(s);
2095        }
2096        Some(since)
2097    }
2098
2099    fn update_since(&mut self) {
2100        let mut sinces_iter = self
2101            .leased_readers
2102            .values()
2103            .map(|x| &x.since)
2104            .chain(self.critical_readers.values().map(|x| &x.since));
2105        let mut since = match sinces_iter.next() {
2106            Some(since) => since.clone(),
2107            None => {
2108                // If there are no current readers, leave `since` unchanged so
2109                // it doesn't regress.
2110                return;
2111            }
2112        };
2113        while let Some(s) = sinces_iter.next() {
2114            since.meet_assign(s);
2115        }
2116        self.trace.downgrade_since(&since);
2117    }
2118
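    /// The seqno that must remain readable on behalf of leased readers: the
    /// minimum of the current seqno and every leased reader's held seqno.
    ///
    /// Worked example (illustrative): with leased readers holding `SeqNo(7)`
    /// and `SeqNo(9)` and a current seqno of `SeqNo(12)`, this returns
    /// `SeqNo(7)`.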
2119    fn seqno_since(&self, seqno: SeqNo) -> SeqNo {
2120        let mut seqno_since = seqno;
2121        for cap in self.leased_readers.values() {
2122            seqno_since = std::cmp::min(seqno_since, cap.seqno);
2123        }
2124        // critical_readers don't hold a seqno capability.
2125        seqno_since
2126    }
2127
2128    fn tombstone_batch() -> HollowBatch<T> {
2129        HollowBatch::empty(Description::new(
2130            Antichain::from_elem(T::minimum()),
2131            Antichain::new(),
2132            Antichain::new(),
2133        ))
2134    }
2135
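    /// Whether this shard has been fully retired: the upper and since have both
    /// advanced to the empty antichain and all readers and writers have been
    /// expired, so no further reads or writes can ever be served.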
2136    pub(crate) fn is_tombstone(&self) -> bool {
2137        self.trace.upper().is_empty()
2138            && self.trace.since().is_empty()
2139            && self.writers.is_empty()
2140            && self.leased_readers.is_empty()
2141            && self.critical_readers.is_empty()
2142    }
2143
2144    pub(crate) fn is_single_empty_batch(&self) -> bool {
2145        let mut batch_count = 0;
2146        let mut is_empty = true;
2147        self.trace.map_batches(|b| {
2148            batch_count += 1;
2149            is_empty &= b.is_empty()
2150        });
2151        batch_count <= 1 && is_empty
2152    }
2153
2154    pub fn become_tombstone_and_shrink(&mut self) -> ControlFlow<NoOpStateTransition<()>, ()> {
2155        assert_eq!(self.trace.upper(), &Antichain::new());
2156        assert_eq!(self.trace.since(), &Antichain::new());
2157
2158        // Remember our current state, so we can decide whether we have to
2159        // record a transition in durable state.
2160        let was_tombstone = self.is_tombstone();
2161
2162        // Enter the "tombstone" state, if we're not in it already.
2163        self.writers.clear();
2164        self.leased_readers.clear();
2165        self.critical_readers.clear();
2166
2167        debug_assert!(self.is_tombstone());
2168
2169        // Now that we're in a "tombstone" state -- i.e. nobody can read the data from the shard or write to
2170        // it -- the actual contents of our batches no longer matter.
2171        // This method progressively replaces batches in our state with simpler versions, to allow
2172        // freeing up resources and to reduce the state size. (Since the state is unreadable, this
2173        // is not visible to clients.) We do this a little bit at a time to avoid really large state
2174        // transitions... most operations happen incrementally, and large single writes can overwhelm
2175        // a backing store. See comments for why we believe the relevant diffs are reasonably small.
2176
2177        let mut to_replace = None;
2178        let mut batch_count = 0;
2179        self.trace.map_batches(|b| {
2180            batch_count += 1;
2181            if !b.is_empty() && to_replace.is_none() {
2182                to_replace = Some(b.desc.clone());
2183            }
2184        });
2185        if let Some(desc) = to_replace {
2186            // We have a nonempty batch: replace it with an empty batch and return.
2187            // This should not produce an excessively large diff: if it did, we wouldn't have been
2188            // able to append that batch in the first place.
2189            let result = self.trace.apply_tombstone_merge(&desc);
2190            assert!(
2191                result.matched(),
2192                "merge with a matching desc should always match"
2193            );
2194            Continue(())
2195        } else if batch_count > 1 {
2196            // All our batches are empty, but we have more than one of them. Replace the whole set
2197            // with a new single-batch trace.
2198            // This produces a diff with a size proportional to the number of batches, but since
2199            // Spine keeps a logarithmic number of batches this should never be excessively large.
2200            let mut new_trace = Trace::default();
2201            new_trace.downgrade_since(&Antichain::new());
2202            let merge_reqs = new_trace.push_batch(Self::tombstone_batch());
2203            assert_eq!(merge_reqs, Vec::new());
2204            self.trace = new_trace;
2205            Continue(())
2206        } else if !was_tombstone {
2207            // We were not tombstoned before, so have to make sure this state
2208            // transition is recorded.
2209            Continue(())
2210        } else {
2211            // All our batches are empty, and there's only one... there's no shrinking this
2212            // tombstone further.
2213            Break(NoOpStateTransition(()))
2214        }
2215    }
2216}
2217
2218// TODO: Document invariants.
2219#[derive(Debug)]
2220#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
2221pub struct State<T> {
2222    pub(crate) applier_version: semver::Version,
2223    pub(crate) shard_id: ShardId,
2224
2225    pub(crate) seqno: SeqNo,
2226    /// A strictly increasing wall time of when this state was written, in
2227    /// milliseconds since the unix epoch.
2228    pub(crate) walltime_ms: u64,
2229    /// Hostname of the persist user that created this version of state. For
2230    /// debugging.
2231    pub(crate) hostname: String,
2232    pub(crate) collections: StateCollections<T>,
2233}
2234
2235/// A newtype wrapper of State that guarantees the K, V, and D codecs match the
2236/// ones in durable storage.
2237pub struct TypedState<K, V, T, D> {
2238    pub(crate) state: State<T>,
2239
2240    // According to the docs, PhantomData is to "mark things that act like they
2241    // own a T". State doesn't actually own K, V, or D, just the ability to
2242    // produce them. Using the `fn() -> T` pattern gets us the same variance as
2243    // T [1], but also allows State to correctly derive Send+Sync.
2244    //
2245    // [1]:
2246    //     https://doc.rust-lang.org/nomicon/phantom-data.html#table-of-phantomdata-patterns
2247    pub(crate) _phantom: PhantomData<fn() -> (K, V, D)>,
2248}
2249
2250impl<K, V, T: Clone, D> TypedState<K, V, T, D> {
2251    #[cfg(any(test, debug_assertions))]
2252    pub(crate) fn clone(&self, applier_version: Version, hostname: String) -> Self {
2253        TypedState {
2254            state: State {
2255                applier_version,
2256                shard_id: self.shard_id.clone(),
2257                seqno: self.seqno.clone(),
2258                walltime_ms: self.walltime_ms,
2259                hostname,
2260                collections: self.collections.clone(),
2261            },
2262            _phantom: PhantomData,
2263        }
2264    }
2265
2266    pub(crate) fn clone_for_rollup(&self) -> Self {
2267        TypedState {
2268            state: State {
2269                applier_version: self.applier_version.clone(),
2270                shard_id: self.shard_id.clone(),
2271                seqno: self.seqno.clone(),
2272                walltime_ms: self.walltime_ms,
2273                hostname: self.hostname.clone(),
2274                collections: self.collections.clone(),
2275            },
2276            _phantom: PhantomData,
2277        }
2278    }
2279}
2280
2281impl<K, V, T: Debug, D> Debug for TypedState<K, V, T, D> {
2282    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2283        // Deconstruct self so we get a compile failure if new fields
2284        // are added.
2285        let TypedState { state, _phantom } = self;
2286        f.debug_struct("TypedState").field("state", state).finish()
2287    }
2288}
2289
2290// Impl PartialEq regardless of the type params.
2291#[cfg(any(test, debug_assertions))]
2292impl<K, V, T: PartialEq, D> PartialEq for TypedState<K, V, T, D> {
2293    fn eq(&self, other: &Self) -> bool {
2294        // Deconstruct self and other so we get a compile failure if new fields
2295        // are added.
2296        let TypedState {
2297            state: self_state,
2298            _phantom,
2299        } = self;
2300        let TypedState {
2301            state: other_state,
2302            _phantom,
2303        } = other;
2304        self_state == other_state
2305    }
2306}
2307
2308impl<K, V, T, D> Deref for TypedState<K, V, T, D> {
2309    type Target = State<T>;
2310
2311    fn deref(&self) -> &Self::Target {
2312        &self.state
2313    }
2314}
2315
2316impl<K, V, T, D> DerefMut for TypedState<K, V, T, D> {
2317    fn deref_mut(&mut self) -> &mut Self::Target {
2318        &mut self.state
2319    }
2320}
2321
2322impl<K, V, T, D> TypedState<K, V, T, D>
2323where
2324    K: Codec,
2325    V: Codec,
2326    T: Timestamp + Lattice + Codec64,
2327    D: Codec64,
2328{
2329    pub fn new(
2330        applier_version: Version,
2331        shard_id: ShardId,
2332        hostname: String,
2333        walltime_ms: u64,
2334    ) -> Self {
2335        let state = State {
2336            applier_version,
2337            shard_id,
2338            seqno: SeqNo::minimum(),
2339            walltime_ms,
2340            hostname,
2341            collections: StateCollections {
2342                last_gc_req: SeqNo::minimum(),
2343                rollups: BTreeMap::new(),
2344                active_rollup: None,
2345                active_gc: None,
2346                leased_readers: BTreeMap::new(),
2347                critical_readers: BTreeMap::new(),
2348                writers: BTreeMap::new(),
2349                schemas: BTreeMap::new(),
2350                trace: Trace::default(),
2351            },
2352        };
2353        TypedState {
2354            state,
2355            _phantom: PhantomData,
2356        }
2357    }
2358
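    /// Clones this state with a bumped seqno and walltime and applies `work_fn`
    /// to the cloned collections; if `work_fn` breaks, the clone is discarded
    /// and no new state version is produced.
    ///
    /// Minimal usage sketch (not a doctest; `reader_id` and `now_ms` are
    /// hypothetical values owned by the caller):
    ///
    /// ```ignore
    /// let res = typed_state.clone_apply(&cfg, &mut |_seqno, _cfg, collections| {
    ///     collections.heartbeat_leased_reader(&reader_id, now_ms)
    /// });
    /// if let Continue((heartbeated, new_state)) = res {
    ///     // `new_state` is the candidate next version to compare-and-set.
    /// }
    /// ```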
2359    pub fn clone_apply<R, E, WorkFn>(
2360        &self,
2361        cfg: &PersistConfig,
2362        work_fn: &mut WorkFn,
2363    ) -> ControlFlow<E, (R, Self)>
2364    where
2365        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
2366    {
2367        // Now that we support one minor version of forward compatibility, tag
2368        // each version of state with the _max_ version of code that has ever
2369        // contributed to it. Otherwise, we'd erroneously allow rolling back an
2370        // arbitrary number of versions if they were done one-by-one.
2371        let new_applier_version = std::cmp::max(&self.applier_version, &cfg.build_version);
2372        let mut new_state = State {
2373            applier_version: new_applier_version.clone(),
2374            shard_id: self.shard_id,
2375            seqno: self.seqno.next(),
2376            walltime_ms: (cfg.now)(),
2377            hostname: cfg.hostname.clone(),
2378            collections: self.collections.clone(),
2379        };
2380        // Make sure walltime_ms is strictly increasing, in case clocks are
2381        // offset.
2382        if new_state.walltime_ms <= self.walltime_ms {
2383            new_state.walltime_ms = self.walltime_ms + 1;
2384        }
2385
2386        let work_ret = work_fn(new_state.seqno, cfg, &mut new_state.collections)?;
2387        let new_state = TypedState {
2388            state: new_state,
2389            _phantom: PhantomData,
2390        };
2391        Continue((work_ret, new_state))
2392    }
2393}
2394
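/// Tuning knobs that control how garbage collection work is scheduled; see
/// `State::maybe_gc` for how they are interpreted.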
2395#[derive(Copy, Clone, Debug)]
2396pub struct GcConfig {
2397    pub use_active_gc: bool,
2398    pub fallback_threshold_ms: u64,
2399    pub min_versions: usize,
2400    pub max_versions: usize,
2401}
2402
2403impl<T> State<T>
2404where
2405    T: Timestamp + Lattice + Codec64,
2406{
2407    pub fn shard_id(&self) -> ShardId {
2408        self.shard_id
2409    }
2410
2411    pub fn seqno(&self) -> SeqNo {
2412        self.seqno
2413    }
2414
2415    pub fn since(&self) -> &Antichain<T> {
2416        self.collections.trace.since()
2417    }
2418
2419    pub fn upper(&self) -> &Antichain<T> {
2420        self.collections.trace.upper()
2421    }
2422
2423    pub fn spine_batch_count(&self) -> usize {
2424        self.collections.trace.num_spine_batches()
2425    }
2426
2427    pub fn size_metrics(&self) -> StateSizeMetrics {
2428        let mut ret = StateSizeMetrics::default();
2429        self.blobs().for_each(|x| match x {
2430            HollowBlobRef::Batch(x) => {
2431                ret.hollow_batch_count += 1;
2432                ret.batch_part_count += x.part_count();
2433                ret.num_updates += x.len;
2434
2435                let batch_size = x.encoded_size_bytes();
2436                for x in x.parts.iter() {
2437                    if x.ts_rewrite().is_some() {
2438                        ret.rewrite_part_count += 1;
2439                    }
2440                    if x.is_inline() {
2441                        ret.inline_part_count += 1;
2442                        ret.inline_part_bytes += x.inline_bytes();
2443                    }
2444                }
2445                ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size);
2446                ret.state_batches_bytes += batch_size;
2447            }
2448            HollowBlobRef::Rollup(x) => {
2449                ret.state_rollup_count += 1;
2450                ret.state_rollups_bytes += x.encoded_size_bytes.unwrap_or_default()
2451            }
2452        });
2453        ret
2454    }
2455
2456    pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
2457        // We maintain the invariant that every version of state has at least
2458        // one rollup.
2459        self.collections
2460            .rollups
2461            .iter()
2462            .rev()
2463            .next()
2464            .expect("State should have at least one rollup if seqno > minimum")
2465    }
2466
2467    pub(crate) fn seqno_since(&self) -> SeqNo {
2468        self.collections.seqno_since(self.seqno)
2469    }
2470
2471    // Returns whether the cmd proposing this state has been selected to perform
2472    // background garbage collection work.
2473    //
2474    // If it was selected, this information is recorded in the state itself for
2475    // commit along with the cmd's state transition. This helps us to avoid
2476    // redundant work.
2477    //
2478    // Correctness does not depend on a gc assignment being executed, nor on
2479    // them being executed in the order they are given. But it is expected that
2480    // gc assignments are best-effort respected. In practice, cmds like
2481    // register_foo or expire_foo, where it would be awkward, ignore gc.
2482    pub fn maybe_gc(&mut self, is_write: bool, now: u64, cfg: GcConfig) -> Option<GcReq> {
2483        let GcConfig {
2484            use_active_gc,
2485            fallback_threshold_ms,
2486            min_versions,
2487            max_versions,
2488        } = cfg;
2489        // This is an arbitrary-ish threshold that scales with seqno, but never
2490        // gets particularly big. It probably could be much bigger and certainly
2491        // could use a tuning pass at some point.
2492        let gc_threshold = if use_active_gc {
2493            u64::cast_from(min_versions)
2494        } else {
2495            std::cmp::max(
2496                1,
2497                u64::cast_from(self.seqno.0.next_power_of_two().trailing_zeros()),
2498            )
2499        };
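        // Worked example (illustrative, non-active-gc path): at seqno 1000,
        // next_power_of_two() is 1024 and trailing_zeros() is 10, so GC is only
        // considered once the seqno_since has advanced at least 10 seqnos past
        // the last GC request.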
2500        let new_seqno_since = self.seqno_since();
2501        // Collect until the new seqno since... or the last GC request's seqno plus the max
2502        // number of versions, whichever is less.
2503        let gc_until_seqno = new_seqno_since.min(SeqNo(
2504            self.collections
2505                .last_gc_req
2506                .0
2507                .saturating_add(u64::cast_from(max_versions)),
2508        ));
2509        let should_gc = new_seqno_since
2510            .0
2511            .saturating_sub(self.collections.last_gc_req.0)
2512            >= gc_threshold;
2513
2514        // If we wouldn't otherwise gc, check if we have an active gc. If we do, and
2515        // it's been a while since it started, we should gc.
2516        let should_gc = if use_active_gc && !should_gc {
2517            match self.collections.active_gc {
2518                Some(active_gc) => now.saturating_sub(active_gc.start_ms) > fallback_threshold_ms,
2519                None => false,
2520            }
2521        } else {
2522            should_gc
2523        };
2524        // Assign GC traffic preferentially to writers, falling back to anyone
2525        // generating new state versions if there are no writers.
2526        let should_gc = should_gc && (is_write || self.collections.writers.is_empty());
2527        // Always assign GC work to a tombstoned shard to have the chance to
2528        // clean up any residual blobs. This is safe (won't cause excess gc)
2529        // as the only allowed command after becoming a tombstone is to write
2530        // the final rollup.
2531        let tombstone_needs_gc = self.collections.is_tombstone();
2532        let should_gc = should_gc || tombstone_needs_gc;
2533        let should_gc = if use_active_gc {
2534            // If we have an active gc, we should only gc if the active gc is
2535            // sufficiently old. This is to avoid doing more gc work than
2536            // necessary.
2537            should_gc
2538                && match self.collections.active_gc {
2539                    Some(active) => now.saturating_sub(active.start_ms) > fallback_threshold_ms,
2540                    None => true,
2541                }
2542        } else {
2543            should_gc
2544        };
2545        if should_gc {
2546            self.collections.last_gc_req = gc_until_seqno;
2547            Some(GcReq {
2548                shard_id: self.shard_id,
2549                new_seqno_since: gc_until_seqno,
2550            })
2551        } else {
2552            None
2553        }
2554    }
2555
2556    /// Return the number of gc-ineligible state versions.
2557    pub fn seqnos_held(&self) -> usize {
2558        usize::cast_from(self.seqno.0.saturating_sub(self.seqno_since().0))
2559    }
2560
2561    /// Expire all readers and writers up to the given walltime_ms.
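    ///
    /// A handle is retained only while `last_heartbeat_timestamp_ms +
    /// lease_duration_ms >= walltime_ms`. Illustrative numbers: a reader that
    /// last heartbeated at 1_000ms with a 900_000ms lease survives
    /// `expire_at(901_000)` but is force-expired by `expire_at(901_001)`.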
2562    pub fn expire_at(&mut self, walltime_ms: EpochMillis) -> ExpiryMetrics {
2563        let mut metrics = ExpiryMetrics::default();
2564        let shard_id = self.shard_id();
2565        self.collections.leased_readers.retain(|id, state| {
2566            let retain = state.last_heartbeat_timestamp_ms + state.lease_duration_ms >= walltime_ms;
2567            if !retain {
2568                info!(
2569                    "Force expiring reader {id} ({}) of shard {shard_id} due to inactivity",
2570                    state.debug.purpose
2571                );
2572                metrics.readers_expired += 1;
2573            }
2574            retain
2575        });
2576        // critical_readers don't need forced expiration. (In fact, that's the point!)
2577        self.collections.writers.retain(|id, state| {
2578            let retain =
2579                (state.last_heartbeat_timestamp_ms + state.lease_duration_ms) >= walltime_ms;
2580            if !retain {
2581                info!(
2582                    "Force expiring writer {id} ({}) of shard {shard_id} due to inactivity",
2583                    state.debug.purpose
2584                );
2585                metrics.writers_expired += 1;
2586            }
2587            retain
2588        });
2589        metrics
2590    }
2591
2592    /// Returns the batches that contain updates up to (and including) the given `as_of`. Each
2593    /// [`HollowBatch`] in the result carries blob keys along with a [`Description`] of what
2594    /// updates in the referenced parts are valid to read.
2595    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
2596        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2597            return Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
2598                self.collections.trace.since().clone(),
2599            )));
2600        }
2601        let upper = self.collections.trace.upper();
2602        if PartialOrder::less_equal(upper, as_of) {
2603            return Err(SnapshotErr::AsOfNotYetAvailable(
2604                self.seqno,
2605                Upper(upper.clone()),
2606            ));
2607        }
2608
2609        let batches = self
2610            .collections
2611            .trace
2612            .batches()
2613            .filter(|b| !PartialOrder::less_than(as_of, b.desc.lower()))
2614            .cloned()
2615            .collect();
2616        Ok(batches)
2617    }
2618
2619    // NB: Unlike the other methods here, this one is read-only.
2620    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
2621        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2622            return Err(Since(self.collections.trace.since().clone()));
2623        }
2624        Ok(())
2625    }
2626
2627    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
2628        // TODO: Avoid the O(n^2) here: `next_listen_batch` is called once per
2629        // batch and this iterates through all batches to find the next one.
2630        self.collections
2631            .trace
2632            .batches()
2633            .find(|b| {
2634                PartialOrder::less_equal(b.desc.lower(), frontier)
2635                    && PartialOrder::less_than(frontier, b.desc.upper())
2636            })
2637            .cloned()
2638            .ok_or(self.seqno)
2639    }
2640
2641    pub fn active_rollup(&self) -> Option<ActiveRollup> {
2642        self.collections.active_rollup
2643    }
2644
2645    pub fn need_rollup(
2646        &self,
2647        threshold: usize,
2648        use_active_rollup: bool,
2649        fallback_threshold_ms: u64,
2650        now: u64,
2651    ) -> Option<SeqNo> {
2652        let (latest_rollup_seqno, _) = self.latest_rollup();
2653
2654        // Tombstoned shards require one final rollup. However, because we
2655        // write a rollup as of SeqNo X and then link it in using a state
2656        // transition (in this case from X to X+1), the minimum number of
2657        // live diffs is actually two. Detect when we're in this minimal
2658        // two diff state and stop the (otherwise) infinite iteration.
2659        if self.collections.is_tombstone() && latest_rollup_seqno.next() < self.seqno {
2660            return Some(self.seqno);
2661        }
2662
2663        let seqnos_since_last_rollup = self.seqno.0.saturating_sub(latest_rollup_seqno.0);
2664
2665        if use_active_rollup {
2666            // If seqnos_since_last_rollup > threshold and there is no existing rollup in progress,
2667            // we should start a new rollup.
2668            // If there is an active rollup, we should check if it has been running too long.
2669            // If it has, we should start a new rollup.
2670            // This is to guard against a worker dying/taking too long/etc.
2671            if seqnos_since_last_rollup > u64::cast_from(threshold) {
2672                match self.active_rollup() {
2673                    Some(active_rollup) => {
2674                        if now.saturating_sub(active_rollup.start_ms) > fallback_threshold_ms {
2675                            return Some(self.seqno);
2676                        }
2677                    }
2678                    None => {
2679                        return Some(self.seqno);
2680                    }
2681                }
2682            }
2683        } else {
2684            // every `threshold` seqnos since the latest rollup, assign rollup maintenance.
2685            // we avoid assigning rollups to every seqno past the threshold to avoid handles
2686            // racing / performing redundant work.
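            // Worked example (illustrative): with threshold = 128 and the
            // latest rollup at seqno 1000, rollup maintenance is assigned at
            // seqnos 1128, 1256, 1384, and so on.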
2687            if seqnos_since_last_rollup > 0
2688                && seqnos_since_last_rollup % u64::cast_from(threshold) == 0
2689            {
2690                return Some(self.seqno);
2691            }
2692
2693            // however, since maintenance is best-effort and could fail, do assign rollup
2694            // work to every seqno after a fallback threshold to ensure one is written.
2695            if seqnos_since_last_rollup
2696                > u64::cast_from(
2697                    threshold * PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER,
2698                )
2699            {
2700                return Some(self.seqno);
2701            }
2702        }
2703
2704        None
2705    }
2706
2707    pub(crate) fn blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
2708        let batches = self.collections.trace.batches().map(HollowBlobRef::Batch);
2709        let rollups = self.collections.rollups.values().map(HollowBlobRef::Rollup);
2710        batches.chain(rollups)
2711    }
2712}
2713
2714fn serialize_part_bytes<S: Serializer>(val: &[u8], s: S) -> Result<S::Ok, S::Error> {
2715    let val = hex::encode(val);
2716    val.serialize(s)
2717}
2718
2719fn serialize_lazy_proto<S: Serializer, T: prost::Message + Default>(
2720    val: &Option<LazyProto<T>>,
2721    s: S,
2722) -> Result<S::Ok, S::Error> {
2723    val.as_ref()
2724        .map(|lazy| hex::encode(&lazy.into_proto()))
2725        .serialize(s)
2726}
2727
2728fn serialize_part_stats<S: Serializer>(
2729    val: &Option<LazyPartStats>,
2730    s: S,
2731) -> Result<S::Ok, S::Error> {
2732    let val = val.as_ref().map(|x| x.decode().key);
2733    val.serialize(s)
2734}
2735
2736fn serialize_diffs_sum<S: Serializer>(val: &Option<[u8; 8]>, s: S) -> Result<S::Ok, S::Error> {
2737    // This is only used for debugging, so hack to assume that D is i64.
2738    let val = val.map(i64::decode);
2739    val.serialize(s)
2740}
2741
2742// This Serialize impl is used for debugging/testing and exposed via SQL. It's
2743// intentionally gated from users, so not strictly subject to our backward
2744// compatibility guarantees, but still probably best to be thoughtful about
2745// making unnecessary changes. Additionally, it's nice to make the output as
2746// nice to use as possible without tying our hands for the actual code usages.
2747impl<T: Serialize + Timestamp + Lattice> Serialize for State<T> {
2748    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2749        let State {
2750            applier_version,
2751            shard_id,
2752            seqno,
2753            walltime_ms,
2754            hostname,
2755            collections:
2756                StateCollections {
2757                    last_gc_req,
2758                    rollups,
2759                    active_rollup,
2760                    active_gc,
2761                    leased_readers,
2762                    critical_readers,
2763                    writers,
2764                    schemas,
2765                    trace,
2766                },
2767        } = self;
2768        let mut s = s.serialize_struct("State", 19)?;
2769        let () = s.serialize_field("applier_version", &applier_version.to_string())?;
2770        let () = s.serialize_field("shard_id", shard_id)?;
2771        let () = s.serialize_field("seqno", seqno)?;
2772        let () = s.serialize_field("walltime_ms", walltime_ms)?;
2773        let () = s.serialize_field("hostname", hostname)?;
2774        let () = s.serialize_field("last_gc_req", last_gc_req)?;
2775        let () = s.serialize_field("rollups", rollups)?;
2776        let () = s.serialize_field("active_rollup", active_rollup)?;
2777        let () = s.serialize_field("active_gc", active_gc)?;
2778        let () = s.serialize_field("leased_readers", leased_readers)?;
2779        let () = s.serialize_field("critical_readers", critical_readers)?;
2780        let () = s.serialize_field("writers", writers)?;
2781        let () = s.serialize_field("schemas", schemas)?;
2782        let () = s.serialize_field("since", &trace.since().elements())?;
2783        let () = s.serialize_field("upper", &trace.upper().elements())?;
2784        let trace = trace.flatten();
2785        let () = s.serialize_field("batches", &trace.legacy_batches.keys().collect::<Vec<_>>())?;
2786        let () = s.serialize_field("hollow_batches", &trace.hollow_batches)?;
2787        let () = s.serialize_field("spine_batches", &trace.spine_batches)?;
2788        let () = s.serialize_field("merges", &trace.merges)?;
2789        s.end()
2790    }
2791}
2792
2793#[derive(Debug, Default)]
2794pub struct StateSizeMetrics {
2795    pub hollow_batch_count: usize,
2796    pub batch_part_count: usize,
2797    pub rewrite_part_count: usize,
2798    pub num_updates: usize,
2799    pub largest_batch_bytes: usize,
2800    pub state_batches_bytes: usize,
2801    pub state_rollups_bytes: usize,
2802    pub state_rollup_count: usize,
2803    pub inline_part_count: usize,
2804    pub inline_part_bytes: usize,
2805}
2806
2807#[derive(Default)]
2808pub struct ExpiryMetrics {
2809    pub(crate) readers_expired: usize,
2810    pub(crate) writers_expired: usize,
2811}
2812
2813/// Wrapper for Antichain that represents a Since
2814#[derive(Debug, Clone, PartialEq)]
2815pub struct Since<T>(pub Antichain<T>);
2816
2817/// Wrapper for Antichain that represents an Upper
2818#[derive(Debug, PartialEq)]
2819pub struct Upper<T>(pub Antichain<T>);
2820
2821#[cfg(test)]
2822pub(crate) mod tests {
2823    use std::ops::Range;
2824    use std::str::FromStr;
2825
2826    use bytes::Bytes;
2827    use mz_build_info::DUMMY_BUILD_INFO;
2828    use mz_dyncfg::ConfigUpdates;
2829    use mz_ore::now::SYSTEM_TIME;
2830    use mz_ore::{assert_none, assert_ok};
2831    use mz_proto::RustType;
2832    use proptest::prelude::*;
2833    use proptest::strategy::ValueTree;
2834
2835    use crate::InvalidUsage::{InvalidBounds, InvalidEmptyTimeInterval};
2836    use crate::PersistLocation;
2837    use crate::cache::PersistClientCache;
2838    use crate::internal::encoding::any_some_lazy_part_stats;
2839    use crate::internal::paths::RollupId;
2840    use crate::internal::trace::tests::any_trace;
2841    use crate::tests::new_test_client_cache;
2842
2843    use super::*;
2844
2845    const LEASE_DURATION_MS: u64 = 900 * 1000;
2846    fn debug_state() -> HandleDebugState {
2847        HandleDebugState {
2848            hostname: "debug".to_owned(),
2849            purpose: "finding the bugs".to_owned(),
2850        }
2851    }
2852
2853    pub fn any_hollow_batch_with_exact_runs<T: Arbitrary + Timestamp>(
2854        num_runs: usize,
2855    ) -> impl Strategy<Value = HollowBatch<T>> {
2856        (
2857            any::<T>(),
2858            any::<T>(),
2859            any::<T>(),
2860            proptest::collection::vec(any_run_part::<T>(), num_runs + 1..20),
2861            any::<usize>(),
2862        )
2863            .prop_map(move |(t0, t1, since, parts, len)| {
2864                let (lower, upper) = if t0 <= t1 {
2865                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
2866                } else {
2867                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
2868                };
2869                let since = Antichain::from_elem(since);
2870
2871                let run_splits = (1..num_runs)
2872                    .map(|i| i * parts.len() / num_runs)
2873                    .collect::<Vec<_>>();
2874
2875                let run_meta = (0..num_runs)
2876                    .map(|_| {
2877                        let mut meta = RunMeta::default();
2878                        meta.id = Some(RunId::new());
2879                        meta
2880                    })
2881                    .collect::<Vec<_>>();
2882
2883                HollowBatch::new(
2884                    Description::new(lower, upper, since),
2885                    parts,
2886                    len % 10,
2887                    run_meta,
2888                    run_splits,
2889                )
2890            })
2891    }
2892
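    /// Generates an arbitrary `HollowBatch`, splitting its parts into a random
    /// number of runs when there are enough parts to do so.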
2893    pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
2894        Strategy::prop_map(
2895            (
2896                any::<T>(),
2897                any::<T>(),
2898                any::<T>(),
2899                proptest::collection::vec(any_run_part::<T>(), 0..20),
2900                any::<usize>(),
2901                0..=10usize,
2902                proptest::collection::vec(any::<RunId>(), 10),
2903            ),
2904            |(t0, t1, since, parts, len, num_runs, run_ids)| {
2905                let (lower, upper) = if t0 <= t1 {
2906                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
2907                } else {
2908                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
2909                };
2910                let since = Antichain::from_elem(since);
2911                if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() {
2912                    let run_splits = (1..num_runs)
2913                        .map(|i| i * parts.len() / num_runs)
2914                        .collect::<Vec<_>>();
2915
2916                    let run_meta = (0..num_runs)
2917                        .enumerate()
2918                        .map(|(i, _)| {
2919                            let mut meta = RunMeta::default();
2920                            meta.id = Some(run_ids[i]);
2921                            meta
2922                        })
2923                        .collect::<Vec<_>>();
2924
2925                    HollowBatch::new(
2926                        Description::new(lower, upper, since),
2927                        parts,
2928                        len % 10,
2929                        run_meta,
2930                        run_splits,
2931                    )
2932                } else {
2933                    HollowBatch::new_run_for_test(
2934                        Description::new(lower, upper, since),
2935                        parts,
2936                        len % 10,
2937                        run_ids[0],
2938                    )
2939                }
2940            },
2941        )
2942    }
2943
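    /// Generates an arbitrary `BatchPart`: either a hollow part or an empty
    /// inline part.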
2944    pub fn any_batch_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = BatchPart<T>> {
2945        Strategy::prop_map(
2946            (
2947                any::<bool>(),
2948                any_hollow_batch_part(),
2949                any::<Option<T>>(),
2950                any::<Option<SchemaId>>(),
2951                any::<Option<SchemaId>>(),
2952            ),
2953            |(is_hollow, hollow, ts_rewrite, schema_id, deprecated_schema_id)| {
2954                if is_hollow {
2955                    BatchPart::Hollow(hollow)
2956                } else {
2957                    let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap();
2958                    let ts_rewrite = ts_rewrite.map(Antichain::from_elem);
2959                    BatchPart::Inline {
2960                        updates,
2961                        ts_rewrite,
2962                        schema_id,
2963                        deprecated_schema_id,
2964                    }
2965                }
2966            },
2967        )
2968    }
2969
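    /// Generates an arbitrary `RunPart` wrapping a single `BatchPart`.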
2970    pub fn any_run_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = RunPart<T>> {
2971        Strategy::prop_map(any_batch_part(), |part| RunPart::Single(part))
2972    }
2973
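    /// Generates an arbitrary `HollowBatchPart` (with `structured_key_lower`
    /// always `None`).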
2974    pub fn any_hollow_batch_part<T: Arbitrary + Timestamp>()
2975    -> impl Strategy<Value = HollowBatchPart<T>> {
2976        Strategy::prop_map(
2977            (
2978                any::<PartialBatchKey>(),
2979                any::<usize>(),
2980                any::<Vec<u8>>(),
2981                any_some_lazy_part_stats(),
2982                any::<Option<T>>(),
2983                any::<[u8; 8]>(),
2984                any::<Option<BatchColumnarFormat>>(),
2985                any::<Option<SchemaId>>(),
2986                any::<Option<SchemaId>>(),
2987            ),
2988            |(
2989                key,
2990                encoded_size_bytes,
2991                key_lower,
2992                stats,
2993                ts_rewrite,
2994                diffs_sum,
2995                format,
2996                schema_id,
2997                deprecated_schema_id,
2998            )| {
2999                HollowBatchPart {
3000                    key,
3001                    encoded_size_bytes,
3002                    key_lower,
3003                    structured_key_lower: None,
3004                    stats,
3005                    ts_rewrite: ts_rewrite.map(Antichain::from_elem),
3006                    diffs_sum: Some(diffs_sum),
3007                    format,
3008                    schema_id,
3009                    deprecated_schema_id,
3010                }
3011            },
3012        )
3013    }
3014
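    /// Generates an arbitrary `LeasedReaderState` with a nonzero lease
    /// duration, so decoding it never takes the migration path.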
3015    pub fn any_leased_reader_state<T: Arbitrary>() -> impl Strategy<Value = LeasedReaderState<T>> {
3016        Strategy::prop_map(
3017            (
3018                any::<SeqNo>(),
3019                any::<Option<T>>(),
3020                any::<u64>(),
3021                any::<u64>(),
3022                any::<HandleDebugState>(),
3023            ),
3024            |(seqno, since, last_heartbeat_timestamp_ms, mut lease_duration_ms, debug)| {
3025                // lease_duration_ms of 0 means this state was written by an old
3026                // version of code, which means we'll migrate it in the decode
3027                // path. Avoid.
3028                if lease_duration_ms == 0 {
3029                    lease_duration_ms += 1;
3030                }
3031                LeasedReaderState {
3032                    seqno,
3033                    since: since.map_or_else(Antichain::new, Antichain::from_elem),
3034                    last_heartbeat_timestamp_ms,
3035                    lease_duration_ms,
3036                    debug,
3037                }
3038            },
3039        )
3040    }
3041
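    /// Generates an arbitrary `CriticalReaderState`.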
3042    pub fn any_critical_reader_state<T: Arbitrary>() -> impl Strategy<Value = CriticalReaderState<T>>
3043    {
3044        Strategy::prop_map(
3045            (
3046                any::<Option<T>>(),
3047                any::<OpaqueState>(),
3048                any::<String>(),
3049                any::<HandleDebugState>(),
3050            ),
3051            |(since, opaque, opaque_codec, debug)| CriticalReaderState {
3052                since: since.map_or_else(Antichain::new, Antichain::from_elem),
3053                opaque,
3054                opaque_codec,
3055                debug,
3056            },
3057        )
3058    }
3059
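    /// Generates an arbitrary `WriterState`.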
3060    pub fn any_writer_state<T: Arbitrary>() -> impl Strategy<Value = WriterState<T>> {
3061        Strategy::prop_map(
3062            (
3063                any::<u64>(),
3064                any::<u64>(),
3065                any::<IdempotencyToken>(),
3066                any::<Option<T>>(),
3067                any::<HandleDebugState>(),
3068            ),
3069            |(
3070                last_heartbeat_timestamp_ms,
3071                lease_duration_ms,
3072                most_recent_write_token,
3073                most_recent_write_upper,
3074                debug,
3075            )| WriterState {
3076                last_heartbeat_timestamp_ms,
3077                lease_duration_ms,
3078                most_recent_write_token,
3079                most_recent_write_upper: most_recent_write_upper
3080                    .map_or_else(Antichain::new, Antichain::from_elem),
3081                debug,
3082            },
3083        )
3084    }
3085
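    /// Generates an `EncodedSchemas` from arbitrary bytes.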
3086    pub fn any_encoded_schemas() -> impl Strategy<Value = EncodedSchemas> {
3087        Strategy::prop_map(
3088            (
3089                any::<Vec<u8>>(),
3090                any::<Vec<u8>>(),
3091                any::<Vec<u8>>(),
3092                any::<Vec<u8>>(),
3093            ),
3094            |(key, key_data_type, val, val_data_type)| EncodedSchemas {
3095                key: Bytes::from(key),
3096                key_data_type: Bytes::from(key_data_type),
3097                val: Bytes::from(val),
3098                val_data_type: Bytes::from(val_data_type),
3099            },
3100        )
3101    }
3102
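    /// Generates an arbitrary `State` whose trace has a batch count drawn from
    /// `num_trace_batches`; the applier version is fixed at 1.2.3.
    ///
    /// A minimal usage sketch, assuming a deterministic proptest runner (as in
    /// `state_inspect_serde_json` below):
    ///
    /// ```ignore
    /// let mut runner = proptest::test_runner::TestRunner::deterministic();
    /// let state = any_state::<u64>(6..8).new_tree(&mut runner).unwrap().current();
    /// ```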
3103    pub fn any_state<T: Arbitrary + Timestamp + Lattice>(
3104        num_trace_batches: Range<usize>,
3105    ) -> impl Strategy<Value = State<T>> {
3106        let part1 = (
3107            any::<ShardId>(),
3108            any::<SeqNo>(),
3109            any::<u64>(),
3110            any::<String>(),
3111            any::<SeqNo>(),
3112            proptest::collection::btree_map(any::<SeqNo>(), any::<HollowRollup>(), 1..3),
3113            proptest::option::of(any::<ActiveRollup>()),
3114        );
3115
3116        let part2 = (
3117            proptest::option::of(any::<ActiveGc>()),
3118            proptest::collection::btree_map(
3119                any::<LeasedReaderId>(),
3120                any_leased_reader_state::<T>(),
3121                1..3,
3122            ),
3123            proptest::collection::btree_map(
3124                any::<CriticalReaderId>(),
3125                any_critical_reader_state::<T>(),
3126                1..3,
3127            ),
3128            proptest::collection::btree_map(any::<WriterId>(), any_writer_state::<T>(), 0..3),
3129            proptest::collection::btree_map(any::<SchemaId>(), any_encoded_schemas(), 0..3),
3130            any_trace::<T>(num_trace_batches),
3131        );
3132
3133        (part1, part2).prop_map(
3134            |(
3135                (shard_id, seqno, walltime_ms, hostname, last_gc_req, rollups, active_rollup),
3136                (active_gc, leased_readers, critical_readers, writers, schemas, trace),
3137            )| State {
3138                applier_version: semver::Version::new(1, 2, 3),
3139                shard_id,
3140                seqno,
3141                walltime_ms,
3142                hostname,
3143                collections: StateCollections {
3144                    last_gc_req,
3145                    rollups,
3146                    active_rollup,
3147                    active_gc,
3148                    leased_readers,
3149                    critical_readers,
3150                    writers,
3151                    schemas,
3152                    trace,
3153                },
3154            },
3155        )
3156    }
3157
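    /// Constructs a single-run `HollowBatch` with the given bounds, part keys,
    /// and length, for use in tests; e.g. `hollow(0, 5, &["key1"], 1)` describes
    /// a batch covering `[0, 5)` with one part and a single update.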
3158    pub(crate) fn hollow<T: Timestamp>(
3159        lower: T,
3160        upper: T,
3161        keys: &[&str],
3162        len: usize,
3163    ) -> HollowBatch<T> {
3164        HollowBatch::new_run(
3165            Description::new(
3166                Antichain::from_elem(lower),
3167                Antichain::from_elem(upper),
3168                Antichain::from_elem(T::minimum()),
3169            ),
3170            keys.iter()
3171                .map(|x| {
3172                    RunPart::Single(BatchPart::Hollow(HollowBatchPart {
3173                        key: PartialBatchKey((*x).to_owned()),
3174                        encoded_size_bytes: 0,
3175                        key_lower: vec![],
3176                        structured_key_lower: None,
3177                        stats: None,
3178                        ts_rewrite: None,
3179                        diffs_sum: None,
3180                        format: None,
3181                        schema_id: None,
3182                        deprecated_schema_id: None,
3183                    }))
3184                })
3185                .collect(),
3186            len,
3187        )
3188    }
3189
3190    #[mz_ore::test]
3191    fn downgrade_since() {
3192        let mut state = TypedState::<(), (), u64, i64>::new(
3193            DUMMY_BUILD_INFO.semver_version(),
3194            ShardId::new(),
3195            "".to_owned(),
3196            0,
3197        );
3198        let reader = LeasedReaderId::new();
3199        let seqno = SeqNo::minimum();
3200        let now = SYSTEM_TIME.clone();
3201        let _ = state.collections.register_leased_reader(
3202            "",
3203            &reader,
3204            "",
3205            seqno,
3206            Duration::from_secs(10),
3207            now(),
3208            false,
3209        );
3210
3211        // The shard global since == 0 initially.
3212        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3213
3214        // Greater
3215        assert_eq!(
3216            state.collections.downgrade_since(
3217                &reader,
3218                seqno,
3219                None,
3220                &Antichain::from_elem(2),
3221                now()
3222            ),
3223            Continue(Since(Antichain::from_elem(2)))
3224        );
3225        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3226        // Equal (no-op)
3227        assert_eq!(
3228            state.collections.downgrade_since(
3229                &reader,
3230                seqno,
3231                None,
3232                &Antichain::from_elem(2),
3233                now()
3234            ),
3235            Continue(Since(Antichain::from_elem(2)))
3236        );
3237        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3238        // Less (no-op)
3239        assert_eq!(
3240            state.collections.downgrade_since(
3241                &reader,
3242                seqno,
3243                None,
3244                &Antichain::from_elem(1),
3245                now()
3246            ),
3247            Continue(Since(Antichain::from_elem(2)))
3248        );
3249        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3250
3251        // Create a second reader.
3252        let reader2 = LeasedReaderId::new();
3253        let _ = state.collections.register_leased_reader(
3254            "",
3255            &reader2,
3256            "",
3257            seqno,
3258            Duration::from_secs(10),
3259            now(),
3260            false,
3261        );
3262
3263        // Shard since doesn't change until the meet (min) of all reader sinces changes.
3264        assert_eq!(
3265            state.collections.downgrade_since(
3266                &reader2,
3267                seqno,
3268                None,
3269                &Antichain::from_elem(3),
3270                now()
3271            ),
3272            Continue(Since(Antichain::from_elem(3)))
3273        );
3274        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3275        // Shard since == 3 when all readers have since >= 3.
3276        assert_eq!(
3277            state.collections.downgrade_since(
3278                &reader,
3279                seqno,
3280                None,
3281                &Antichain::from_elem(5),
3282                now()
3283            ),
3284            Continue(Since(Antichain::from_elem(5)))
3285        );
3286        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3287
3288        // Shard since is unaffected when readers with since > the shard since expire.
3289        assert_eq!(
3290            state.collections.expire_leased_reader(&reader),
3291            Continue(true)
3292        );
3293        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3294
3295        // Create a third reader.
3296        let reader3 = LeasedReaderId::new();
3297        let _ = state.collections.register_leased_reader(
3298            "",
3299            &reader3,
3300            "",
3301            seqno,
3302            Duration::from_secs(10),
3303            now(),
3304            false,
3305        );
3306
3307        // Shard since doesn't change until the meet (min) of all reader sinces changes.
3308        assert_eq!(
3309            state.collections.downgrade_since(
3310                &reader3,
3311                seqno,
3312                None,
3313                &Antichain::from_elem(10),
3314                now()
3315            ),
3316            Continue(Since(Antichain::from_elem(10)))
3317        );
3318        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3319
3320        // Shard since advances when the reader with the minimal since expires.
3321        assert_eq!(
3322            state.collections.expire_leased_reader(&reader2),
3323            Continue(true)
3324        );
3325        // TODO(database-issues#6885): expiry temporarily doesn't advance since
3326        // Switch this assertion back when we re-enable this.
3327        //
3328        // assert_eq!(state.collections.trace.since(), &Antichain::from_elem(10));
3329        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3330
3331        // Shard since unaffected when all readers are expired.
3332        assert_eq!(
3333            state.collections.expire_leased_reader(&reader3),
3334            Continue(true)
3335        );
3336        // TODO(database-issues#6885): expiry temporarily doesn't advance since
3337        // Switch this assertion back when we re-enable this.
3338        //
3339        // assert_eq!(state.collections.trace.since(), &Antichain::from_elem(10));
3340        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3341    }
3342
3343    #[mz_ore::test]
3344    fn compare_and_downgrade_since() {
3345        let mut state = TypedState::<(), (), u64, i64>::new(
3346            DUMMY_BUILD_INFO.semver_version(),
3347            ShardId::new(),
3348            "".to_owned(),
3349            0,
3350        );
3351        let reader = CriticalReaderId::new();
3352        let _ = state
3353            .collections
3354            .register_critical_reader::<u64>("", &reader, "");
3355
3356        // The shard global since == 0 initially.
3357        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3358        // The initial opaque value should be set.
3359        assert_eq!(
3360            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3361            u64::initial()
3362        );
3363
3364        // Greater
3365        assert_eq!(
3366            state.collections.compare_and_downgrade_since::<u64>(
3367                &reader,
3368                &u64::initial(),
3369                (&1, &Antichain::from_elem(2)),
3370            ),
3371            Continue(Ok(Since(Antichain::from_elem(2))))
3372        );
3373        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3374        assert_eq!(
3375            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3376            1
3377        );
3378        // Equal (no-op)
3379        assert_eq!(
3380            state.collections.compare_and_downgrade_since::<u64>(
3381                &reader,
3382                &1,
3383                (&2, &Antichain::from_elem(2)),
3384            ),
3385            Continue(Ok(Since(Antichain::from_elem(2))))
3386        );
3387        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3388        assert_eq!(
3389            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3390            2
3391        );
3392        // Less (no-op)
3393        assert_eq!(
3394            state.collections.compare_and_downgrade_since::<u64>(
3395                &reader,
3396                &2,
3397                (&3, &Antichain::from_elem(1)),
3398            ),
3399            Continue(Ok(Since(Antichain::from_elem(2))))
3400        );
3401        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3402        assert_eq!(
3403            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3404            3
3405        );
3406    }
3407
3408    #[mz_ore::test]
3409    fn compare_and_append() {
3410        let state = &mut TypedState::<String, String, u64, i64>::new(
3411            DUMMY_BUILD_INFO.semver_version(),
3412            ShardId::new(),
3413            "".to_owned(),
3414            0,
3415        )
3416        .collections;
3417
3418        let writer_id = WriterId::new();
3419        let now = SYSTEM_TIME.clone();
3420
3421        // State is initially empty.
3422        assert_eq!(state.trace.num_spine_batches(), 0);
3423        assert_eq!(state.trace.num_hollow_batches(), 0);
3424        assert_eq!(state.trace.num_updates(), 0);
3425
3426        // Cannot insert a batch with a lower != current shard upper.
3427        assert_eq!(
3428            state.compare_and_append(
3429                &hollow(1, 2, &["key1"], 1),
3430                &writer_id,
3431                now(),
3432                LEASE_DURATION_MS,
3433                &IdempotencyToken::new(),
3434                &debug_state(),
3435                0,
3436                100,
3437                None
3438            ),
3439            Break(CompareAndAppendBreak::Upper {
3440                shard_upper: Antichain::from_elem(0),
3441                writer_upper: Antichain::from_elem(0)
3442            })
3443        );
3444
3445        // Insert an empty batch with an upper > lower.
3446        assert!(
3447            state
3448                .compare_and_append(
3449                    &hollow(0, 5, &[], 0),
3450                    &writer_id,
3451                    now(),
3452                    LEASE_DURATION_MS,
3453                    &IdempotencyToken::new(),
3454                    &debug_state(),
3455                    0,
3456                    100,
3457                    None
3458                )
3459                .is_continue()
3460        );
3461
3462        // Cannot insert a batch with an upper less than the lower.
3463        assert_eq!(
3464            state.compare_and_append(
3465                &hollow(5, 4, &["key1"], 1),
3466                &writer_id,
3467                now(),
3468                LEASE_DURATION_MS,
3469                &IdempotencyToken::new(),
3470                &debug_state(),
3471                0,
3472                100,
3473                None
3474            ),
3475            Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds {
3476                lower: Antichain::from_elem(5),
3477                upper: Antichain::from_elem(4)
3478            }))
3479        );
3480
3481        // Cannot insert a nonempty batch with an upper equal to its lower.
3482        assert_eq!(
3483            state.compare_and_append(
3484                &hollow(5, 5, &["key1"], 1),
3485                &writer_id,
3486                now(),
3487                LEASE_DURATION_MS,
3488                &IdempotencyToken::new(),
3489                &debug_state(),
3490                0,
3491                100,
3492                None
3493            ),
3494            Break(CompareAndAppendBreak::InvalidUsage(
3495                InvalidEmptyTimeInterval {
3496                    lower: Antichain::from_elem(5),
3497                    upper: Antichain::from_elem(5),
3498                    keys: vec!["key1".to_owned()],
3499                }
3500            ))
3501        );
3502
3503        // Can insert an empty batch with an upper equal to lower.
3504        assert!(
3505            state
3506                .compare_and_append(
3507                    &hollow(5, 5, &[], 0),
3508                    &writer_id,
3509                    now(),
3510                    LEASE_DURATION_MS,
3511                    &IdempotencyToken::new(),
3512                    &debug_state(),
3513                    0,
3514                    100,
3515                    None
3516                )
3517                .is_continue()
3518        );
3519    }
3520
3521    #[mz_ore::test]
3522    fn snapshot() {
3523        let now = SYSTEM_TIME.clone();
3524
3525        let mut state = TypedState::<String, String, u64, i64>::new(
3526            DUMMY_BUILD_INFO.semver_version(),
3527            ShardId::new(),
3528            "".to_owned(),
3529            0,
3530        );
3531        // Cannot take a snapshot with as_of == shard upper.
3532        assert_eq!(
3533            state.snapshot(&Antichain::from_elem(0)),
3534            Err(SnapshotErr::AsOfNotYetAvailable(
3535                SeqNo(0),
3536                Upper(Antichain::from_elem(0))
3537            ))
3538        );
3539
3540        // Cannot take a snapshot with as_of > shard upper.
3541        assert_eq!(
3542            state.snapshot(&Antichain::from_elem(5)),
3543            Err(SnapshotErr::AsOfNotYetAvailable(
3544                SeqNo(0),
3545                Upper(Antichain::from_elem(0))
3546            ))
3547        );
3548
3549        let writer_id = WriterId::new();
3550
3551        // Advance upper to 5.
3552        assert!(
3553            state
3554                .collections
3555                .compare_and_append(
3556                    &hollow(0, 5, &["key1"], 1),
3557                    &writer_id,
3558                    now(),
3559                    LEASE_DURATION_MS,
3560                    &IdempotencyToken::new(),
3561                    &debug_state(),
3562                    0,
3563                    100,
3564                    None
3565                )
3566                .is_continue()
3567        );
3568
3569        // Can take a snapshot with as_of < upper.
3570        assert_eq!(
3571            state.snapshot(&Antichain::from_elem(0)),
3572            Ok(vec![hollow(0, 5, &["key1"], 1)])
3573        );
3574
3575        // Can take a snapshot with as_of >= shard since, as long as as_of < shard_upper.
3576        assert_eq!(
3577            state.snapshot(&Antichain::from_elem(4)),
3578            Ok(vec![hollow(0, 5, &["key1"], 1)])
3579        );
3580
3581        // Cannot take a snapshot with as_of >= upper.
3582        assert_eq!(
3583            state.snapshot(&Antichain::from_elem(5)),
3584            Err(SnapshotErr::AsOfNotYetAvailable(
3585                SeqNo(0),
3586                Upper(Antichain::from_elem(5))
3587            ))
3588        );
3589        assert_eq!(
3590            state.snapshot(&Antichain::from_elem(6)),
3591            Err(SnapshotErr::AsOfNotYetAvailable(
3592                SeqNo(0),
3593                Upper(Antichain::from_elem(5))
3594            ))
3595        );
3596
3597        let reader = LeasedReaderId::new();
3598        // Advance the since to 2.
3599        let _ = state.collections.register_leased_reader(
3600            "",
3601            &reader,
3602            "",
3603            SeqNo::minimum(),
3604            Duration::from_secs(10),
3605            now(),
3606            false,
3607        );
3608        assert_eq!(
3609            state.collections.downgrade_since(
3610                &reader,
3611                SeqNo::minimum(),
3612                None,
3613                &Antichain::from_elem(2),
3614                now()
3615            ),
3616            Continue(Since(Antichain::from_elem(2)))
3617        );
3618        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3619        // Cannot take a snapshot with as_of < shard_since.
3620        assert_eq!(
3621            state.snapshot(&Antichain::from_elem(1)),
3622            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
3623                Antichain::from_elem(2)
3624            )))
3625        );
3626
3627        // Advance the upper to 10 via an empty batch.
3628        assert!(
3629            state
3630                .collections
3631                .compare_and_append(
3632                    &hollow(5, 10, &[], 0),
3633                    &writer_id,
3634                    now(),
3635                    LEASE_DURATION_MS,
3636                    &IdempotencyToken::new(),
3637                    &debug_state(),
3638                    0,
3639                    100,
3640                    None
3641                )
3642                .is_continue()
3643        );
3644
3645        // Can still take snapshots at times < upper.
3646        assert_eq!(
3647            state.snapshot(&Antichain::from_elem(7)),
3648            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3649        );
3650
3651        // Cannot take snapshots with as_of >= upper.
3652        assert_eq!(
3653            state.snapshot(&Antichain::from_elem(10)),
3654            Err(SnapshotErr::AsOfNotYetAvailable(
3655                SeqNo(0),
3656                Upper(Antichain::from_elem(10))
3657            ))
3658        );
3659
3660        // Advance upper to 15.
3661        assert!(
3662            state
3663                .collections
3664                .compare_and_append(
3665                    &hollow(10, 15, &["key2"], 1),
3666                    &writer_id,
3667                    now(),
3668                    LEASE_DURATION_MS,
3669                    &IdempotencyToken::new(),
3670                    &debug_state(),
3671                    0,
3672                    100,
3673                    None
3674                )
3675                .is_continue()
3676        );
3677
3678        // Filter out batches whose lowers are greater than the requested as_of (the
3679        // batches that are too far in the future for the requested as_of).
3680        assert_eq!(
3681            state.snapshot(&Antichain::from_elem(9)),
3682            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3683        );
3684
3685        // Don't filter out batches whose lowers are <= the requested as_of.
3686        assert_eq!(
3687            state.snapshot(&Antichain::from_elem(10)),
3688            Ok(vec![
3689                hollow(0, 5, &["key1"], 1),
3690                hollow(5, 10, &[], 0),
3691                hollow(10, 15, &["key2"], 1)
3692            ])
3693        );
3694
3695        assert_eq!(
3696            state.snapshot(&Antichain::from_elem(11)),
3697            Ok(vec![
3698                hollow(0, 5, &["key1"], 1),
3699                hollow(5, 10, &[], 0),
3700                hollow(10, 15, &["key2"], 1)
3701            ])
3702        );
3703    }
3704
3705    #[mz_ore::test]
3706    fn next_listen_batch() {
3707        let mut state = TypedState::<String, String, u64, i64>::new(
3708            DUMMY_BUILD_INFO.semver_version(),
3709            ShardId::new(),
3710            "".to_owned(),
3711            0,
3712        );
3713
3714        // Empty collection never has any batches to listen for, regardless of the
3715        // current frontier.
3716        assert_eq!(
3717            state.next_listen_batch(&Antichain::from_elem(0)),
3718            Err(SeqNo(0))
3719        );
3720        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3721
3722        let writer_id = WriterId::new();
3723        let now = SYSTEM_TIME.clone();
3724
3725        // Add two batches of data, one from [0, 5) and then another from [5, 10).
3726        assert!(
3727            state
3728                .collections
3729                .compare_and_append(
3730                    &hollow(0, 5, &["key1"], 1),
3731                    &writer_id,
3732                    now(),
3733                    LEASE_DURATION_MS,
3734                    &IdempotencyToken::new(),
3735                    &debug_state(),
3736                    0,
3737                    100,
3738                    None
3739                )
3740                .is_continue()
3741        );
3742        assert!(
3743            state
3744                .collections
3745                .compare_and_append(
3746                    &hollow(5, 10, &["key2"], 1),
3747                    &writer_id,
3748                    now(),
3749                    LEASE_DURATION_MS,
3750                    &IdempotencyToken::new(),
3751                    &debug_state(),
3752                    0,
3753                    100,
3754                    None
3755                )
3756                .is_continue()
3757        );
3758
3759        // All frontiers in [0, 5) return the first batch.
3760        for t in 0..=4 {
3761            assert_eq!(
3762                state.next_listen_batch(&Antichain::from_elem(t)),
3763                Ok(hollow(0, 5, &["key1"], 1))
3764            );
3765        }
3766
3767        // All frontiers in [5, 10) return the second batch.
3768        for t in 5..=9 {
3769            assert_eq!(
3770                state.next_listen_batch(&Antichain::from_elem(t)),
3771                Ok(hollow(5, 10, &["key2"], 1))
3772            );
3773        }
3774
3775        // There is no batch currently available for t = 10.
3776        assert_eq!(
3777            state.next_listen_batch(&Antichain::from_elem(10)),
3778            Err(SeqNo(0))
3779        );
3780
3781        // By definition, no batch is ever returned for the empty antichain, which
3782        // is the time after all possible times.
3783        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3784    }
3785
3786    #[mz_ore::test]
3787    fn expire_writer() {
3788        let mut state = TypedState::<String, String, u64, i64>::new(
3789            DUMMY_BUILD_INFO.semver_version(),
3790            ShardId::new(),
3791            "".to_owned(),
3792            0,
3793        );
3794        let now = SYSTEM_TIME.clone();
3795
3796        let writer_id_one = WriterId::new();
3797
3798        let writer_id_two = WriterId::new();
3799
3800        // Writer is eligible to write
3801        assert!(
3802            state
3803                .collections
3804                .compare_and_append(
3805                    &hollow(0, 2, &["key1"], 1),
3806                    &writer_id_one,
3807                    now(),
3808                    LEASE_DURATION_MS,
3809                    &IdempotencyToken::new(),
3810                    &debug_state(),
3811                    0,
3812                    100,
3813                    None
3814                )
3815                .is_continue()
3816        );
3817
3818        assert!(
3819            state
3820                .collections
3821                .expire_writer(&writer_id_one)
3822                .is_continue()
3823        );
3824
3825        // Other writers should still be able to write
3826        assert!(
3827            state
3828                .collections
3829                .compare_and_append(
3830                    &hollow(2, 5, &["key2"], 1),
3831                    &writer_id_two,
3832                    now(),
3833                    LEASE_DURATION_MS,
3834                    &IdempotencyToken::new(),
3835                    &debug_state(),
3836                    0,
3837                    100,
3838                    None
3839                )
3840                .is_continue()
3841        );
3842    }
3843
3844    #[mz_ore::test]
3845    fn maybe_gc_active_gc() {
3846        const GC_CONFIG: GcConfig = GcConfig {
3847            use_active_gc: true,
3848            fallback_threshold_ms: 5000,
3849            min_versions: 99,
3850            max_versions: 500,
3851        };
3852        let now_fn = SYSTEM_TIME.clone();
3853
3854        let mut state = TypedState::<String, String, u64, i64>::new(
3855            DUMMY_BUILD_INFO.semver_version(),
3856            ShardId::new(),
3857            "".to_owned(),
3858            0,
3859        );
3860
3861        let now = now_fn();
3862        // Empty state doesn't need gc, regardless of is_write.
3863        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
3864        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);
3865
3866        // Artificially advance the seqno so the seqno_since advances past our
3867        // internal gc_threshold.
3868        state.seqno = SeqNo(100);
3869        assert_eq!(state.seqno_since(), SeqNo(100));
3870
3871        // When a writer is present, non-writes don't gc.
3872        let writer_id = WriterId::new();
3873        let _ = state.collections.compare_and_append(
3874            &hollow(1, 2, &["key1"], 1),
3875            &writer_id,
3876            now,
3877            LEASE_DURATION_MS,
3878            &IdempotencyToken::new(),
3879            &debug_state(),
3880            0,
3881            100,
3882            None,
3883        );
3884        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);
3885
3886        // A write will gc though.
3887        assert_eq!(
3888            state.maybe_gc(true, now, GC_CONFIG),
3889            Some(GcReq {
3890                shard_id: state.shard_id,
3891                new_seqno_since: SeqNo(100)
3892            })
3893        );
3894
3895        // But if we write down an active gc, we won't gc.
3896        state.collections.active_gc = Some(ActiveGc {
3897            seqno: state.seqno,
3898            start_ms: now,
3899        });
3900
3901        state.seqno = SeqNo(200);
3902        assert_eq!(state.seqno_since(), SeqNo(200));
3903
3904        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
3905
3906        state.seqno = SeqNo(300);
3907        assert_eq!(state.seqno_since(), SeqNo(300));
3908        // But if we advance the time past the threshold, we will gc.
3909        let new_now = now + GC_CONFIG.fallback_threshold_ms + 1;
3910        assert_eq!(
3911            state.maybe_gc(true, new_now, GC_CONFIG),
3912            Some(GcReq {
3913                shard_id: state.shard_id,
3914                new_seqno_since: SeqNo(300)
3915            })
3916        );
3917
3918        // Even if the sequence number doesn't pass the threshold, we will gc
3919        // once the active gc has expired.
3920
3921        state.seqno = SeqNo(301);
3922        assert_eq!(state.seqno_since(), SeqNo(301));
3923        assert_eq!(
3924            state.maybe_gc(true, new_now, GC_CONFIG),
3925            Some(GcReq {
3926                shard_id: state.shard_id,
3927                new_seqno_since: SeqNo(301)
3928            })
3929        );
3930
3931        state.collections.active_gc = None;
3932
3933        // Artificially advance the seqno (again) so the seqno_since advances
3934        // past our internal gc_threshold (again).
3935        state.seqno = SeqNo(400);
3936        assert_eq!(state.seqno_since(), SeqNo(400));
3937
3938        let now = now_fn();
3939
3940        // If there are no writers, even a non-write will gc.
3941        let _ = state.collections.expire_writer(&writer_id);
3942        assert_eq!(
3943            state.maybe_gc(false, now, GC_CONFIG),
3944            Some(GcReq {
3945                shard_id: state.shard_id,
3946                new_seqno_since: SeqNo(400)
3947            })
3948        );
3949
3950        // Upper-bound the number of seqnos we'll attempt to collect in one go.
3951        let previous_seqno = state.seqno;
3952        state.seqno = SeqNo(10_000);
3953        assert_eq!(state.seqno_since(), SeqNo(10_000));
3954
3955        let now = now_fn();
3956        assert_eq!(
3957            state.maybe_gc(true, now, GC_CONFIG),
3958            Some(GcReq {
3959                shard_id: state.shard_id,
3960                new_seqno_since: SeqNo(previous_seqno.0 + u64::cast_from(GC_CONFIG.max_versions))
3961            })
3962        );
3963    }
3964
3965    #[mz_ore::test]
3966    fn maybe_gc_classic() {
3967        const GC_CONFIG: GcConfig = GcConfig {
3968            use_active_gc: false,
3969            fallback_threshold_ms: 5000,
3970            min_versions: 16,
3971            max_versions: 128,
3972        };
3973        const NOW_MS: u64 = 0;
3974
3975        let mut state = TypedState::<String, String, u64, i64>::new(
3976            DUMMY_BUILD_INFO.semver_version(),
3977            ShardId::new(),
3978            "".to_owned(),
3979            0,
3980        );
3981
3982        // Empty state doesn't need gc, regardless of is_write.
3983        assert_eq!(state.maybe_gc(true, NOW_MS, GC_CONFIG), None);
3984        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
3985
3986        // Artificially advance the seqno so the seqno_since advances past our
3987        // internal gc_threshold.
3988        state.seqno = SeqNo(100);
3989        assert_eq!(state.seqno_since(), SeqNo(100));
3990
3991        // When a writer is present, non-writes don't gc.
3992        let writer_id = WriterId::new();
3993        let now = SYSTEM_TIME.clone();
3994        let _ = state.collections.compare_and_append(
3995            &hollow(1, 2, &["key1"], 1),
3996            &writer_id,
3997            now(),
3998            LEASE_DURATION_MS,
3999            &IdempotencyToken::new(),
4000            &debug_state(),
4001            0,
4002            100,
4003            None,
4004        );
4005        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
4006
4007        // A write will gc though.
4008        assert_eq!(
4009            state.maybe_gc(true, NOW_MS, GC_CONFIG),
4010            Some(GcReq {
4011                shard_id: state.shard_id,
4012                new_seqno_since: SeqNo(100)
4013            })
4014        );
4015
4016        // Artificially advance the seqno (again) so the seqno_since advances
4017        // past our internal gc_threshold (again).
4018        state.seqno = SeqNo(200);
4019        assert_eq!(state.seqno_since(), SeqNo(200));
4020
4021        // If there are no writers, even a non-write will gc.
4022        let _ = state.collections.expire_writer(&writer_id);
4023        assert_eq!(
4024            state.maybe_gc(false, NOW_MS, GC_CONFIG),
4025            Some(GcReq {
4026                shard_id: state.shard_id,
4027                new_seqno_since: SeqNo(200)
4028            })
4029        );
4030    }
4031
4032    #[mz_ore::test]
4033    fn need_rollup_active_rollup() {
4034        const ROLLUP_THRESHOLD: usize = 3;
4035        const ROLLUP_USE_ACTIVE_ROLLUP: bool = true;
4036        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 5000;
4037        let now = SYSTEM_TIME.clone();
4038
4039        mz_ore::test::init_logging();
4040        let mut state = TypedState::<String, String, u64, i64>::new(
4041            DUMMY_BUILD_INFO.semver_version(),
4042            ShardId::new(),
4043            "".to_owned(),
4044            0,
4045        );
4046
4047        let rollup_seqno = SeqNo(5);
4048        let rollup = HollowRollup {
4049            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4050            encoded_size_bytes: None,
4051        };
4052
4053        assert!(
4054            state
4055                .collections
4056                .add_rollup((rollup_seqno, &rollup))
4057                .is_continue()
4058        );
4059
4060        // shouldn't need a rollup at the seqno of the rollup
4061        state.seqno = SeqNo(5);
4062        assert_none!(state.need_rollup(
4063            ROLLUP_THRESHOLD,
4064            ROLLUP_USE_ACTIVE_ROLLUP,
4065            ROLLUP_FALLBACK_THRESHOLD_MS,
4066            now()
4067        ));
4068
4069        // shouldn't need a rollup at seqnos less than our threshold
4070        state.seqno = SeqNo(6);
4071        assert_none!(state.need_rollup(
4072            ROLLUP_THRESHOLD,
4073            ROLLUP_USE_ACTIVE_ROLLUP,
4074            ROLLUP_FALLBACK_THRESHOLD_MS,
4075            now()
4076        ));
4077        state.seqno = SeqNo(7);
4078        assert_none!(state.need_rollup(
4079            ROLLUP_THRESHOLD,
4080            ROLLUP_USE_ACTIVE_ROLLUP,
4081            ROLLUP_FALLBACK_THRESHOLD_MS,
4082            now()
4083        ));
4084        state.seqno = SeqNo(8);
4085        assert_none!(state.need_rollup(
4086            ROLLUP_THRESHOLD,
4087            ROLLUP_USE_ACTIVE_ROLLUP,
4088            ROLLUP_FALLBACK_THRESHOLD_MS,
4089            now()
4090        ));
4091
4092        let mut current_time = now();
4093        // hit our threshold! we should need a rollup
4094        state.seqno = SeqNo(9);
4095        assert_eq!(
4096            state
4097                .need_rollup(
4098                    ROLLUP_THRESHOLD,
4099                    ROLLUP_USE_ACTIVE_ROLLUP,
4100                    ROLLUP_FALLBACK_THRESHOLD_MS,
4101                    current_time
4102                )
4103                .expect("rollup"),
4104            SeqNo(9)
4105        );
4106
4107        state.collections.active_rollup = Some(ActiveRollup {
4108            seqno: SeqNo(9),
4109            start_ms: current_time,
4110        });
4111
4112        // There is now an active rollup, so we shouldn't need a rollup.
4113        assert_none!(state.need_rollup(
4114            ROLLUP_THRESHOLD,
4115            ROLLUP_USE_ACTIVE_ROLLUP,
4116            ROLLUP_FALLBACK_THRESHOLD_MS,
4117            current_time
4118        ));
4119
4120        state.seqno = SeqNo(10);
4121        // We still don't need a rollup, even though the seqno is greater than
4122        // the rollup threshold.
4123        assert_none!(state.need_rollup(
4124            ROLLUP_THRESHOLD,
4125            ROLLUP_USE_ACTIVE_ROLLUP,
4126            ROLLUP_FALLBACK_THRESHOLD_MS,
4127            current_time
4128        ));
4129
4130        // But if we wait long enough, we should need a rollup again.
4131        current_time += u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS) + 1;
4132        assert_eq!(
4133            state
4134                .need_rollup(
4135                    ROLLUP_THRESHOLD,
4136                    ROLLUP_USE_ACTIVE_ROLLUP,
4137                    ROLLUP_FALLBACK_THRESHOLD_MS,
4138                    current_time
4139                )
4140                .expect("rollup"),
4141            SeqNo(10)
4142        );
4143
4144        state.seqno = SeqNo(9);
4145        // Clear the active rollup and ensure we need a rollup again.
4146        state.collections.active_rollup = None;
4147        let rollup_seqno = SeqNo(9);
4148        let rollup = HollowRollup {
4149            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4150            encoded_size_bytes: None,
4151        };
4152        assert!(
4153            state
4154                .collections
4155                .add_rollup((rollup_seqno, &rollup))
4156                .is_continue()
4157        );
4158
4159        state.seqno = SeqNo(11);
4160        // We shouldn't need a rollup at seqnos less than our threshold
4161        assert_none!(state.need_rollup(
4162            ROLLUP_THRESHOLD,
4163            ROLLUP_USE_ACTIVE_ROLLUP,
4164            ROLLUP_FALLBACK_THRESHOLD_MS,
4165            current_time
4166        ));
4167        // hit our threshold! we should need a rollup
4168        state.seqno = SeqNo(13);
4169        assert_eq!(
4170            state
4171                .need_rollup(
4172                    ROLLUP_THRESHOLD,
4173                    ROLLUP_USE_ACTIVE_ROLLUP,
4174                    ROLLUP_FALLBACK_THRESHOLD_MS,
4175                    current_time
4176                )
4177                .expect("rollup"),
4178            SeqNo(13)
4179        );
4180    }
4181
4182    #[mz_ore::test]
4183    fn need_rollup_classic() {
4184        const ROLLUP_THRESHOLD: usize = 3;
4185        const ROLLUP_USE_ACTIVE_ROLLUP: bool = false;
4186        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 0;
4187        const NOW: u64 = 0;
4188
4189        mz_ore::test::init_logging();
4190        let mut state = TypedState::<String, String, u64, i64>::new(
4191            DUMMY_BUILD_INFO.semver_version(),
4192            ShardId::new(),
4193            "".to_owned(),
4194            0,
4195        );
4196
4197        let rollup_seqno = SeqNo(5);
4198        let rollup = HollowRollup {
4199            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4200            encoded_size_bytes: None,
4201        };
4202
4203        assert!(
4204            state
4205                .collections
4206                .add_rollup((rollup_seqno, &rollup))
4207                .is_continue()
4208        );
4209
4210        // shouldn't need a rollup at the seqno of the rollup
4211        state.seqno = SeqNo(5);
4212        assert_none!(state.need_rollup(
4213            ROLLUP_THRESHOLD,
4214            ROLLUP_USE_ACTIVE_ROLLUP,
4215            ROLLUP_FALLBACK_THRESHOLD_MS,
4216            NOW
4217        ));
4218
4219        // shouldn't need a rollup at seqnos less than our threshold
4220        state.seqno = SeqNo(6);
4221        assert_none!(state.need_rollup(
4222            ROLLUP_THRESHOLD,
4223            ROLLUP_USE_ACTIVE_ROLLUP,
4224            ROLLUP_FALLBACK_THRESHOLD_MS,
4225            NOW
4226        ));
4227        state.seqno = SeqNo(7);
4228        assert_none!(state.need_rollup(
4229            ROLLUP_THRESHOLD,
4230            ROLLUP_USE_ACTIVE_ROLLUP,
4231            ROLLUP_FALLBACK_THRESHOLD_MS,
4232            NOW
4233        ));
4234
4235        // hit our threshold! we should need a rollup
4236        state.seqno = SeqNo(8);
4237        assert_eq!(
4238            state
4239                .need_rollup(
4240                    ROLLUP_THRESHOLD,
4241                    ROLLUP_USE_ACTIVE_ROLLUP,
4242                    ROLLUP_FALLBACK_THRESHOLD_MS,
4243                    NOW
4244                )
4245                .expect("rollup"),
4246            SeqNo(8)
4247        );
4248
4249        // but we don't need rollups for every seqno > the threshold
4250        state.seqno = SeqNo(9);
4251        assert_none!(state.need_rollup(
4252            ROLLUP_THRESHOLD,
4253            ROLLUP_USE_ACTIVE_ROLLUP,
4254            ROLLUP_FALLBACK_THRESHOLD_MS,
4255            NOW
4256        ));
4257
4258        // we only need a rollup every `ROLLUP_THRESHOLD` seqnos beyond the latest rollup
4259        state.seqno = SeqNo(11);
4260        assert_eq!(
4261            state
4262                .need_rollup(
4263                    ROLLUP_THRESHOLD,
4264                    ROLLUP_USE_ACTIVE_ROLLUP,
4265                    ROLLUP_FALLBACK_THRESHOLD_MS,
4266                    NOW
4267                )
4268                .expect("rollup"),
4269            SeqNo(11)
4270        );
4271
4272        // add another rollup and ensure we're always picking the latest
4273        let rollup_seqno = SeqNo(6);
4274        let rollup = HollowRollup {
4275            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4276            encoded_size_bytes: None,
4277        };
4278        assert!(
4279            state
4280                .collections
4281                .add_rollup((rollup_seqno, &rollup))
4282                .is_continue()
4283        );
4284
4285        state.seqno = SeqNo(8);
4286        assert_none!(state.need_rollup(
4287            ROLLUP_THRESHOLD,
4288            ROLLUP_USE_ACTIVE_ROLLUP,
4289            ROLLUP_FALLBACK_THRESHOLD_MS,
4290            NOW
4291        ));
4292        state.seqno = SeqNo(9);
4293        assert_eq!(
4294            state
4295                .need_rollup(
4296                    ROLLUP_THRESHOLD,
4297                    ROLLUP_USE_ACTIVE_ROLLUP,
4298                    ROLLUP_FALLBACK_THRESHOLD_MS,
4299                    NOW
4300                )
4301                .expect("rollup"),
4302            SeqNo(9)
4303        );
4304
4305        // and ensure that past the fallback point, every seqno is assigned rollup work
4306        let fallback_seqno = SeqNo(
4307            rollup_seqno.0
4308                * u64::cast_from(PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER),
4309        );
4310        state.seqno = fallback_seqno;
4311        assert_eq!(
4312            state
4313                .need_rollup(
4314                    ROLLUP_THRESHOLD,
4315                    ROLLUP_USE_ACTIVE_ROLLUP,
4316                    ROLLUP_FALLBACK_THRESHOLD_MS,
4317                    NOW
4318                )
4319                .expect("rollup"),
4320            fallback_seqno
4321        );
4322        state.seqno = fallback_seqno.next();
4323        assert_eq!(
4324            state
4325                .need_rollup(
4326                    ROLLUP_THRESHOLD,
4327                    ROLLUP_USE_ACTIVE_ROLLUP,
4328                    ROLLUP_FALLBACK_THRESHOLD_MS,
4329                    NOW
4330                )
4331                .expect("rollup"),
4332            fallback_seqno.next()
4333        );
4334    }
4335
4336    #[mz_ore::test]
4337    fn idempotency_token_sentinel() {
4338        assert_eq!(
4339            IdempotencyToken::SENTINEL.to_string(),
4340            "i11111111-1111-1111-1111-111111111111"
4341        );
4342    }
4343
4344    /// This test generates an "arbitrary" State, but uses a fixed seed for the
4345    /// randomness, so that it's deterministic. This lets us assert the
4346    /// serialization of that State against a golden file that's committed,
4347    /// making it easy to see what the serialization (used in an upcoming
4348    /// INSPECT feature) looks like.
4349    ///
4350    /// This golden will have to be updated each time we change State, but
4351    /// that's a feature, not a bug.
4352    #[mz_ore::test]
4353    #[cfg_attr(miri, ignore)] // too slow
4354    fn state_inspect_serde_json() {
4355        const STATE_SERDE_JSON: &str = include_str!("state_serde.json");
4356        let mut runner = proptest::test_runner::TestRunner::deterministic();
4357        let tree = any_state::<u64>(6..8).new_tree(&mut runner).unwrap();
4358        let json = serde_json::to_string_pretty(&tree.current()).unwrap();
4359        assert_eq!(
4360            json.trim(),
4361            STATE_SERDE_JSON.trim(),
4362            "\n\nNEW GOLDEN\n{}\n",
4363            json
4364        );
4365    }
4366
4367    #[mz_persist_proc::test(tokio::test)]
4368    #[cfg_attr(miri, ignore)] // too slow
4369    async fn sneaky_downgrades(dyncfgs: ConfigUpdates) {
4370        let mut clients = new_test_client_cache(&dyncfgs);
4371        let shard_id = ShardId::new();
4372
4373        async fn open_and_write(
4374            clients: &mut PersistClientCache,
4375            version: semver::Version,
4376            shard_id: ShardId,
4377        ) -> Result<(), tokio::task::JoinError> {
4378            clients.cfg.build_version = version.clone();
4379            clients.clear_state_cache();
4380            let client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
4381            // Run in a task so we can catch the panic.
4382            mz_ore::task::spawn(|| version.to_string(), async move {
4383                let (mut write, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
4384                let current = *write.upper().as_option().unwrap();
4385                // Do a write so that we tag the state with the version.
4386                write
4387                    .expect_compare_and_append_batch(&mut [], current, current + 1)
4388                    .await;
4389            })
4390            .await
4391        }
4392
4393        // Start at v0.10.0.
4394        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4395        assert_ok!(res);
4396
4397        // Upgrade to v0.11.0 is allowed.
4398        let res = open_and_write(&mut clients, Version::new(0, 11, 0), shard_id).await;
4399        assert_ok!(res);
4400
4401        // Downgrade to v0.10.0 is allowed.
4402        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4403        assert_ok!(res);
4404
4405        // Downgrade to v0.9.0 is _NOT_ allowed.
4406        let res = open_and_write(&mut clients, Version::new(0, 9, 0), shard_id).await;
4407        assert!(res.unwrap_err().is_panic());
4408    }
4409
4410    #[mz_ore::test]
4411    fn runid_roundtrip() {
4412        proptest!(|(runid: RunId)| {
4413            let runid_str = runid.to_string();
4414            let parsed = RunId::from_str(&runid_str);
4415            prop_assert_eq!(parsed, Ok(runid));
4416        });
4417    }
4418}