mz_persist_client/internal/state.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use anyhow::ensure;
11use async_stream::{stream, try_stream};
12use differential_dataflow::difference::Monoid;
13use mz_persist::metrics::ColumnarMetrics;
14use proptest::prelude::{Arbitrary, Strategy};
15use std::borrow::Cow;
16use std::cmp::Ordering;
17use std::collections::BTreeMap;
18use std::fmt::{Debug, Formatter};
19use std::marker::PhantomData;
20use std::ops::ControlFlow::{self, Break, Continue};
21use std::ops::{Deref, DerefMut};
22use std::time::Duration;
23
24use arrow::array::{Array, ArrayData, make_array};
25use arrow::datatypes::DataType;
26use bytes::Bytes;
27use differential_dataflow::Hashable;
28use differential_dataflow::lattice::Lattice;
29use differential_dataflow::trace::Description;
30use differential_dataflow::trace::implementations::BatchContainer;
31use futures::Stream;
32use futures_util::StreamExt;
33use itertools::Itertools;
34use mz_dyncfg::Config;
35use mz_ore::cast::CastFrom;
36use mz_ore::now::EpochMillis;
37use mz_ore::soft_panic_or_log;
38use mz_ore::vec::PartialOrdVecExt;
39use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceUpdates};
40use mz_persist::location::{Blob, SeqNo};
41use mz_persist_types::arrow::{ArrayBound, ProtoArrayData};
42use mz_persist_types::columnar::{ColumnEncoder, Schema};
43use mz_persist_types::schema::{SchemaId, backward_compatible};
44use mz_persist_types::{Codec, Codec64, Opaque};
45use mz_proto::ProtoType;
46use mz_proto::RustType;
47use proptest_derive::Arbitrary;
48use semver::Version;
49use serde::ser::SerializeStruct;
50use serde::{Serialize, Serializer};
51use timely::PartialOrder;
52use timely::order::TotalOrder;
53use timely::progress::{Antichain, Timestamp};
54use tracing::info;
55use uuid::Uuid;
56
57use crate::critical::CriticalReaderId;
58use crate::error::InvalidUsage;
59use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, parse_id};
60use crate::internal::gc::GcReq;
61use crate::internal::machine::retry_external;
62use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
63use crate::internal::trace::{
64    ActiveCompaction, ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace,
65};
66use crate::metrics::Metrics;
67use crate::read::LeasedReaderId;
68use crate::schema::CaESchema;
69use crate::write::WriterId;
70use crate::{PersistConfig, ShardId};
71
72include!(concat!(
73    env!("OUT_DIR"),
74    "/mz_persist_client.internal.state.rs"
75));
76
77include!(concat!(
78    env!("OUT_DIR"),
79    "/mz_persist_client.internal.diff.rs"
80));
81
82/// Determines how often to write rollups, assigning a maintenance task after
83/// `rollup_threshold` seqnos have passed since the last rollup.
84///
85/// Tuning note: in the absence of a long reader seqno hold, and with
86/// incremental GC, this threshold roughly determines how many live diffs are
87/// held in Consensus. Lowering this value decreases the live diff count at the
88/// cost of more maintenance work + blob writes.
89pub(crate) const ROLLUP_THRESHOLD: Config<usize> = Config::new(
90    "persist_rollup_threshold",
91    128,
92    "The number of seqnos between rollups.",
93);
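
// The following example is not part of the original file: a hedged, simplified
// sketch of the seqno arithmetic the threshold above describes. A rollup is
// considered due once at least `threshold` seqnos have passed since the seqno
// of the most recent rollup. Plain `u64`s stand in for `SeqNo`, and the helper
// name is hypothetical; the real maintenance scheduling lives elsewhere.
#[cfg(test)]
mod rollup_threshold_example {
    fn rollup_due(current_seqno: u64, latest_rollup_seqno: u64, threshold: u64) -> bool {
        current_seqno.saturating_sub(latest_rollup_seqno) >= threshold
    }

    #[test]
    fn threshold_gates_rollups() {
        // With the default threshold of 128, a rollup is only assigned after
        // 128 seqnos have elapsed since the last one.
        assert!(!rollup_due(100, 0, 128));
        assert!(rollup_due(128, 0, 128));
        assert!(rollup_due(300, 128, 128));
    }
}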
94
95/// Determines how long to wait before an active rollup is considered
96/// "stuck" and a new rollup is started.
97pub(crate) const ROLLUP_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
98    "persist_rollup_fallback_threshold_ms",
99    5000,
100    "The number of milliseconds before a worker claims an already claimed rollup.",
101);
102
103/// Feature flag for the new active rollup tracking mechanism.
104/// We mustn't enable this until we are fully deployed on the new version.
105pub(crate) const ROLLUP_USE_ACTIVE_ROLLUP: Config<bool> = Config::new(
106    "persist_rollup_use_active_rollup",
107    false,
108    "Whether to use the new active rollup tracking mechanism.",
109);
110
111/// Determines how long to wait before an active GC is considered
112/// "stuck" and a new GC is started.
113pub(crate) const GC_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
114    "persist_gc_fallback_threshold_ms",
115    900000,
116    "The number of milliseconds before a worker claims an already claimed GC.",
117);
118
119/// See the config description string.
120pub(crate) const GC_MIN_VERSIONS: Config<usize> = Config::new(
121    "persist_gc_min_versions",
122    32,
123    "The number of un-GCd versions that may exist in state before we'll trigger a GC.",
124);
125
126/// See the config description string.
127pub(crate) const GC_MAX_VERSIONS: Config<usize> = Config::new(
128    "persist_gc_max_versions",
129    128_000,
130    "The maximum number of versions to GC in a single GC run.",
131);
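
// Not part of the original file: a hedged sketch of how the GC_MIN_VERSIONS /
// GC_MAX_VERSIONS pair above is intended to interact, per their description
// strings. GC is only triggered once enough un-GCd versions have accumulated,
// and a single run is capped at the max. The helper is hypothetical and uses
// plain integers; the real triggering logic lives in the GC code.
#[cfg(test)]
mod gc_version_bounds_example {
    fn versions_to_gc(un_gcd_versions: usize, min: usize, max: usize) -> Option<usize> {
        if un_gcd_versions < min {
            None
        } else {
            Some(un_gcd_versions.min(max))
        }
    }

    #[test]
    fn min_gates_and_max_caps() {
        assert_eq!(versions_to_gc(10, 32, 128_000), None);
        assert_eq!(versions_to_gc(50, 32, 128_000), Some(50));
        assert_eq!(versions_to_gc(200_000, 32, 128_000), Some(128_000));
    }
}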
132
133/// Feature flag for the new active GC tracking mechanism.
134/// We mustn't enable this until we are fully deployed on the new version.
135pub(crate) const GC_USE_ACTIVE_GC: Config<bool> = Config::new(
136    "persist_gc_use_active_gc",
137    false,
138    "Whether to use the new active GC tracking mechanism.",
139);
140
141pub(crate) const ENABLE_INCREMENTAL_COMPACTION: Config<bool> = Config::new(
142    "persist_enable_incremental_compaction",
143    false,
144    "Whether to enable incremental compaction.",
145);
146
147/// A token to disambiguate state commands that could not otherwise be
148/// idempotent.
149#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
150#[serde(into = "String")]
151pub struct IdempotencyToken(pub(crate) [u8; 16]);
152
153impl std::fmt::Display for IdempotencyToken {
154    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155        write!(f, "i{}", Uuid::from_bytes(self.0))
156    }
157}
158
159impl std::fmt::Debug for IdempotencyToken {
160    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161        write!(f, "IdempotencyToken({})", Uuid::from_bytes(self.0))
162    }
163}
164
165impl std::str::FromStr for IdempotencyToken {
166    type Err = String;
167
168    fn from_str(s: &str) -> Result<Self, Self::Err> {
169        parse_id("i", "IdempotencyToken", s).map(IdempotencyToken)
170    }
171}
172
173impl From<IdempotencyToken> for String {
174    fn from(x: IdempotencyToken) -> Self {
175        x.to_string()
176    }
177}
178
179impl IdempotencyToken {
180    pub(crate) fn new() -> Self {
181        IdempotencyToken(*Uuid::new_v4().as_bytes())
182    }
183    pub(crate) const SENTINEL: IdempotencyToken = IdempotencyToken([17u8; 16]);
184}
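
// Not part of the original file: a hedged sketch of how the Display/FromStr
// pair above is expected to roundtrip. Tokens render with an "i" prefix and
// `parse_id` is assumed to reverse that encoding; the mod/test names are
// hypothetical.
#[cfg(test)]
mod idempotency_token_example {
    use super::*;

    #[test]
    fn roundtrip_via_string() {
        let token = IdempotencyToken::new();
        let rendered = token.to_string();
        assert!(rendered.starts_with('i'));
        let parsed: IdempotencyToken = rendered.parse().expect("valid token string");
        assert_eq!(parsed, token);
    }
}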
185
186#[derive(Clone, Debug, PartialEq, Serialize)]
187pub struct LeasedReaderState<T> {
188    /// The seqno capability of this reader.
189    pub seqno: SeqNo,
190    /// The since capability of this reader.
191    pub since: Antichain<T>,
192    /// UNIX_EPOCH timestamp (in millis) of this reader's most recent heartbeat
193    pub last_heartbeat_timestamp_ms: u64,
194    /// Duration (in millis) allowed after [Self::last_heartbeat_timestamp_ms]
195    /// before this reader may be expired
196    pub lease_duration_ms: u64,
197    /// For debugging.
198    pub debug: HandleDebugState,
199}
200
201#[derive(Arbitrary, Clone, Debug, PartialEq, Serialize)]
202#[serde(into = "u64")]
203pub struct OpaqueState(pub [u8; 8]);
204
205impl From<OpaqueState> for u64 {
206    fn from(value: OpaqueState) -> Self {
207        u64::from_le_bytes(value.0)
208    }
209}
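
// Not part of the original file: a hedged sketch of the little-endian
// conversion above, which backs the `serde(into = "u64")` rendering of an
// OpaqueState. The mod/test names are hypothetical.
#[cfg(test)]
mod opaque_state_example {
    use super::*;

    #[test]
    fn opaque_state_is_little_endian() {
        let opaque = OpaqueState(42u64.to_le_bytes());
        assert_eq!(u64::from(opaque), 42);
    }
}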
210
211#[derive(Clone, Debug, PartialEq, Serialize)]
212pub struct CriticalReaderState<T> {
213    /// The since capability of this reader.
214    pub since: Antichain<T>,
215    /// An opaque token matched on by compare_and_downgrade_since.
216    pub opaque: OpaqueState,
217    /// The [Codec64] used to encode [Self::opaque].
218    pub opaque_codec: String,
219    /// For debugging.
220    pub debug: HandleDebugState,
221}
222
223#[derive(Clone, Debug, PartialEq, Serialize)]
224pub struct WriterState<T> {
225    /// UNIX_EPOCH timestamp (in millis) of this writer's most recent heartbeat
226    pub last_heartbeat_timestamp_ms: u64,
227    /// Duration (in millis) allowed after [Self::last_heartbeat_timestamp_ms]
228    /// before this writer may be expired
229    pub lease_duration_ms: u64,
230    /// The idempotency token of the most recent successful compare_and_append
231    /// by this writer.
232    pub most_recent_write_token: IdempotencyToken,
233    /// The upper of the most recent successful compare_and_append by this
234    /// writer.
235    pub most_recent_write_upper: Antichain<T>,
236    /// For debugging.
237    pub debug: HandleDebugState,
238}
239
240/// Debugging info for a reader or writer.
241#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize)]
242pub struct HandleDebugState {
243    /// Hostname of the persist user that registered this writer or reader. For
244    /// critical readers, this is the _most recent_ registration.
245    pub hostname: String,
246    /// Plaintext description of this writer or reader's intent.
247    pub purpose: String,
248}
249
250/// Part of the updates in a Batch.
251///
252/// Either a pointer to ones stored in Blob or the updates themselves inlined.
253#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
254#[serde(tag = "type")]
255pub enum BatchPart<T> {
256    Hollow(HollowBatchPart<T>),
257    Inline {
258        updates: LazyInlineBatchPart,
259        ts_rewrite: Option<Antichain<T>>,
260        schema_id: Option<SchemaId>,
261
262        /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
263        deprecated_schema_id: Option<SchemaId>,
264    },
265}
266
267fn decode_structured_lower(lower: &LazyProto<ProtoArrayData>) -> Option<ArrayBound> {
268    let try_decode = |lower: &LazyProto<ProtoArrayData>| {
269        let proto = lower.decode()?;
270        let data = ArrayData::from_proto(proto)?;
271        ensure!(data.len() == 1);
272        Ok(ArrayBound::new(make_array(data), 0))
273    };
274
275    let decoded: anyhow::Result<ArrayBound> = try_decode(lower);
276
277    match decoded {
278        Ok(bound) => Some(bound),
279        Err(e) => {
280            soft_panic_or_log!("failed to decode bound: {e:#?}");
281            None
282        }
283    }
284}
285
286impl<T> BatchPart<T> {
287    pub fn hollow_bytes(&self) -> usize {
288        match self {
289            BatchPart::Hollow(x) => x.encoded_size_bytes,
290            BatchPart::Inline { .. } => 0,
291        }
292    }
293
294    pub fn is_inline(&self) -> bool {
295        matches!(self, BatchPart::Inline { .. })
296    }
297
298    pub fn inline_bytes(&self) -> usize {
299        match self {
300            BatchPart::Hollow(_) => 0,
301            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
302        }
303    }
304
305    pub fn writer_key(&self) -> Option<WriterKey> {
306        match self {
307            BatchPart::Hollow(x) => x.key.split().map(|(writer, _part)| writer),
308            BatchPart::Inline { .. } => None,
309        }
310    }
311
312    pub fn encoded_size_bytes(&self) -> usize {
313        match self {
314            BatchPart::Hollow(x) => x.encoded_size_bytes,
315            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
316        }
317    }
318
319    // A user-interpretable identifier or description of the part (for logs and
320    // such).
321    pub fn printable_name(&self) -> &str {
322        match self {
323            BatchPart::Hollow(x) => x.key.0.as_str(),
324            BatchPart::Inline { .. } => "<inline>",
325        }
326    }
327
328    pub fn stats(&self) -> Option<&LazyPartStats> {
329        match self {
330            BatchPart::Hollow(x) => x.stats.as_ref(),
331            BatchPart::Inline { .. } => None,
332        }
333    }
334
335    pub fn key_lower(&self) -> &[u8] {
336        match self {
337            BatchPart::Hollow(x) => x.key_lower.as_slice(),
338            // We don't duplicate the lowest key because this can be
339            // considerable overhead for small parts.
340            //
341            // The empty key might not be a tight lower bound, but it is a valid
342            // lower bound. If a caller is interested in a tighter lower bound,
343            // the data is inline.
344            BatchPart::Inline { .. } => &[],
345        }
346    }
347
348    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
349        let part = match self {
350            BatchPart::Hollow(part) => part,
351            BatchPart::Inline { .. } => return None,
352        };
353
354        decode_structured_lower(part.structured_key_lower.as_ref()?)
355    }
356
357    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
358        match self {
359            BatchPart::Hollow(x) => x.ts_rewrite.as_ref(),
360            BatchPart::Inline { ts_rewrite, .. } => ts_rewrite.as_ref(),
361        }
362    }
363
364    pub fn schema_id(&self) -> Option<SchemaId> {
365        match self {
366            BatchPart::Hollow(x) => x.schema_id,
367            BatchPart::Inline { schema_id, .. } => *schema_id,
368        }
369    }
370
371    pub fn deprecated_schema_id(&self) -> Option<SchemaId> {
372        match self {
373            BatchPart::Hollow(x) => x.deprecated_schema_id,
374            BatchPart::Inline {
375                deprecated_schema_id,
376                ..
377            } => *deprecated_schema_id,
378        }
379    }
380}
381
382impl<T: Timestamp + Codec64> BatchPart<T> {
383    pub fn is_structured_only(&self, metrics: &ColumnarMetrics) -> bool {
384        match self {
385            BatchPart::Hollow(x) => matches!(x.format, Some(BatchColumnarFormat::Structured)),
386            BatchPart::Inline { updates, .. } => {
387                let inline_part = updates.decode::<T>(metrics).expect("valid inline part");
388                matches!(inline_part.updates, BlobTraceUpdates::Structured { .. })
389            }
390        }
391    }
392
393    pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
394        match self {
395            BatchPart::Hollow(x) => x.diffs_sum.map(D::decode),
396            BatchPart::Inline { updates, .. } => Some(
397                updates
398                    .decode::<T>(metrics)
399                    .expect("valid inline part")
400                    .updates
401                    .diffs_sum(),
402            ),
403        }
404    }
405}
406
407/// An ordered list of parts, generally stored as part of a larger run.
408#[derive(Debug, Clone)]
409pub struct HollowRun<T> {
410    /// Pointers usable to retrieve the updates.
411    pub(crate) parts: Vec<RunPart<T>>,
412}
413
414/// A reference to a [HollowRun], including the key in the blob store and some denormalized
415/// metadata.
416#[derive(Debug, Eq, PartialEq, Clone, Serialize)]
417pub struct HollowRunRef<T> {
418    pub key: PartialBatchKey,
419
420    /// The size of the referenced run object, plus all of the hollow objects it contains.
421    pub hollow_bytes: usize,
422
423    /// The size of the largest individual part in the run; useful for sizing compaction.
424    pub max_part_bytes: usize,
425
426    /// The lower bound of the data in this part, ordered by the codec ordering.
427    pub key_lower: Vec<u8>,
428
429    /// The lower bound of the data in this part, ordered by the structured ordering.
430    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
431
432    pub diffs_sum: Option<[u8; 8]>,
433
434    pub(crate) _phantom_data: PhantomData<T>,
435}
436impl<T: Eq> PartialOrd<Self> for HollowRunRef<T> {
437    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
438        Some(self.cmp(other))
439    }
440}
441
442impl<T: Eq> Ord for HollowRunRef<T> {
443    fn cmp(&self, other: &Self) -> Ordering {
444        self.key.cmp(&other.key)
445    }
446}
447
448impl<T> HollowRunRef<T> {
449    pub fn writer_key(&self) -> Option<WriterKey> {
450        Some(self.key.split()?.0)
451    }
452}
453
454impl<T: Timestamp + Codec64> HollowRunRef<T> {
455    /// Stores the given runs and returns a [HollowRunRef] that points to them.
456    pub async fn set<D: Codec64 + Monoid>(
457        shard_id: ShardId,
458        blob: &dyn Blob,
459        writer: &WriterKey,
460        data: HollowRun<T>,
461        metrics: &Metrics,
462    ) -> Self {
463        let hollow_bytes = data.parts.iter().map(|p| p.hollow_bytes()).sum();
464        let max_part_bytes = data
465            .parts
466            .iter()
467            .map(|p| p.max_part_bytes())
468            .max()
469            .unwrap_or(0);
470        let key_lower = data
471            .parts
472            .first()
473            .map_or(vec![], |p| p.key_lower().to_vec());
474        let structured_key_lower = match data.parts.first() {
475            Some(RunPart::Many(r)) => r.structured_key_lower.clone(),
476            Some(RunPart::Single(BatchPart::Hollow(p))) => p.structured_key_lower.clone(),
477            Some(RunPart::Single(BatchPart::Inline { .. })) | None => None,
478        };
479        let diffs_sum = data
480            .parts
481            .iter()
482            .map(|p| {
483                p.diffs_sum::<D>(&metrics.columnar)
484                    .expect("valid diffs sum")
485            })
486            .reduce(|mut a, b| {
487                a.plus_equals(&b);
488                a
489            })
490            .expect("valid diffs sum")
491            .encode();
492
493        let key = PartialBatchKey::new(writer, &PartId::new());
494        let blob_key = key.complete(&shard_id);
495        let bytes = Bytes::from(prost::Message::encode_to_vec(&data.into_proto()));
496        let () = retry_external(&metrics.retries.external.hollow_run_set, || {
497            blob.set(&blob_key, bytes.clone())
498        })
499        .await;
500        Self {
501            key,
502            hollow_bytes,
503            max_part_bytes,
504            key_lower,
505            structured_key_lower,
506            diffs_sum: Some(diffs_sum),
507            _phantom_data: Default::default(),
508        }
509    }
510
511    /// Retrieve the [HollowRun] that this reference points to.
512    /// The caller is expected to ensure that this ref is the result of calling [HollowRunRef::set]
513    /// with the same shard id and backing store.
514    pub async fn get(
515        &self,
516        shard_id: ShardId,
517        blob: &dyn Blob,
518        metrics: &Metrics,
519    ) -> Option<HollowRun<T>> {
520        let blob_key = self.key.complete(&shard_id);
521        let mut bytes = retry_external(&metrics.retries.external.hollow_run_get, || {
522            blob.get(&blob_key)
523        })
524        .await?;
525        let proto_runs: ProtoHollowRun =
526            prost::Message::decode(&mut bytes).expect("illegal state: invalid proto bytes");
527        let runs = proto_runs
528            .into_rust()
529            .expect("illegal state: invalid encoded runs proto");
530        Some(runs)
531    }
532}
533
534/// Part of the updates in a run.
535///
536/// Either a pointer to ones stored in Blob or a single part stored inline.
537#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
538#[serde(untagged)]
539pub enum RunPart<T> {
540    Single(BatchPart<T>),
541    Many(HollowRunRef<T>),
542}
543
544impl<T: Ord> PartialOrd<Self> for RunPart<T> {
545    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
546        Some(self.cmp(other))
547    }
548}
549
550impl<T: Ord> Ord for RunPart<T> {
551    fn cmp(&self, other: &Self) -> Ordering {
552        match (self, other) {
553            (RunPart::Single(a), RunPart::Single(b)) => a.cmp(b),
554            (RunPart::Single(_), RunPart::Many(_)) => Ordering::Less,
555            (RunPart::Many(_), RunPart::Single(_)) => Ordering::Greater,
556            (RunPart::Many(a), RunPart::Many(b)) => a.cmp(b),
557        }
558    }
559}
560
561impl<T> RunPart<T> {
562    #[cfg(test)]
563    pub fn expect_hollow_part(&self) -> &HollowBatchPart<T> {
564        match self {
565            RunPart::Single(BatchPart::Hollow(hollow)) => hollow,
566            _ => panic!("expected hollow part!"),
567        }
568    }
569
570    pub fn hollow_bytes(&self) -> usize {
571        match self {
572            Self::Single(p) => p.hollow_bytes(),
573            Self::Many(r) => r.hollow_bytes,
574        }
575    }
576
577    pub fn is_inline(&self) -> bool {
578        match self {
579            Self::Single(p) => p.is_inline(),
580            Self::Many(_) => false,
581        }
582    }
583
584    pub fn inline_bytes(&self) -> usize {
585        match self {
586            Self::Single(p) => p.inline_bytes(),
587            Self::Many(_) => 0,
588        }
589    }
590
591    pub fn max_part_bytes(&self) -> usize {
592        match self {
593            Self::Single(p) => p.encoded_size_bytes(),
594            Self::Many(r) => r.max_part_bytes,
595        }
596    }
597
598    pub fn writer_key(&self) -> Option<WriterKey> {
599        match self {
600            Self::Single(p) => p.writer_key(),
601            Self::Many(r) => r.writer_key(),
602        }
603    }
604
605    pub fn encoded_size_bytes(&self) -> usize {
606        match self {
607            Self::Single(p) => p.encoded_size_bytes(),
608            Self::Many(r) => r.hollow_bytes,
609        }
610    }
611
612    pub fn schema_id(&self) -> Option<SchemaId> {
613        match self {
614            Self::Single(p) => p.schema_id(),
615            Self::Many(_) => None,
616        }
617    }
618
619    // A user-interpretable identifier or description of the part (for logs and
620    // such).
621    pub fn printable_name(&self) -> &str {
622        match self {
623            Self::Single(p) => p.printable_name(),
624            Self::Many(r) => r.key.0.as_str(),
625        }
626    }
627
628    pub fn stats(&self) -> Option<&LazyPartStats> {
629        match self {
630            Self::Single(p) => p.stats(),
631            // TODO: if we kept stats we could avoid fetching the metadata here.
632            Self::Many(_) => None,
633        }
634    }
635
636    pub fn key_lower(&self) -> &[u8] {
637        match self {
638            Self::Single(p) => p.key_lower(),
639            Self::Many(r) => r.key_lower.as_slice(),
640        }
641    }
642
643    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
644        match self {
645            Self::Single(p) => p.structured_key_lower(),
646            Self::Many(_) => None,
647        }
648    }
649
650    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
651        match self {
652            Self::Single(p) => p.ts_rewrite(),
653            Self::Many(_) => None,
654        }
655    }
656}
657
658impl<T> RunPart<T>
659where
660    T: Timestamp + Codec64,
661{
662    pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
663        match self {
664            Self::Single(p) => p.diffs_sum(metrics),
665            Self::Many(hollow_run) => hollow_run.diffs_sum.map(D::decode),
666        }
667    }
668}
669
670/// A blob was missing!
671#[derive(Clone, Debug)]
672pub struct MissingBlob(BlobKey);
673
674impl std::fmt::Display for MissingBlob {
675    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
676        write!(f, "unexpectedly missing key: {}", self.0)
677    }
678}
679
680impl std::error::Error for MissingBlob {}
681
682impl<T: Timestamp + Codec64 + Sync> RunPart<T> {
683    pub fn part_stream<'a>(
684        &'a self,
685        shard_id: ShardId,
686        blob: &'a dyn Blob,
687        metrics: &'a Metrics,
688    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + Send + 'a {
689        try_stream! {
690            match self {
691                RunPart::Single(p) => {
692                    yield Cow::Borrowed(p);
693                }
694                RunPart::Many(r) => {
695                    let fetched = r.get(shard_id, blob, metrics).await.ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
696                    for run_part in fetched.parts {
697                        for await batch_part in run_part.part_stream(shard_id, blob, metrics).boxed() {
698                            yield Cow::Owned(batch_part?.into_owned());
699                        }
700                    }
701                }
702            }
703        }
704    }
705}
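
// Not part of the original file: a hedged sketch of consuming `part_stream`
// above. The stream is pinned before iteration and hollow-run references are
// expanded transparently into their constituent batch parts. The helper name
// is hypothetical and the function is illustrative only.
#[cfg(test)]
mod part_stream_example {
    use super::*;

    /// Sums the encoded sizes of every `BatchPart` reachable from `part`.
    #[allow(dead_code)]
    async fn total_encoded_bytes<T: Timestamp + Codec64 + Sync>(
        part: &RunPart<T>,
        shard_id: ShardId,
        blob: &dyn Blob,
        metrics: &Metrics,
    ) -> Result<usize, MissingBlob> {
        let mut total = 0;
        let stream = part.part_stream(shard_id, blob, metrics);
        futures::pin_mut!(stream);
        while let Some(batch_part) = stream.next().await {
            total += batch_part?.encoded_size_bytes();
        }
        Ok(total)
    }
}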
706
707impl<T: Ord> PartialOrd for BatchPart<T> {
708    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
709        Some(self.cmp(other))
710    }
711}
712
713impl<T: Ord> Ord for BatchPart<T> {
714    fn cmp(&self, other: &Self) -> Ordering {
715        match (self, other) {
716            (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o),
717            (
718                BatchPart::Inline {
719                    updates: s_updates,
720                    ts_rewrite: s_ts_rewrite,
721                    schema_id: s_schema_id,
722                    deprecated_schema_id: s_deprecated_schema_id,
723                },
724                BatchPart::Inline {
725                    updates: o_updates,
726                    ts_rewrite: o_ts_rewrite,
727                    schema_id: o_schema_id,
728                    deprecated_schema_id: o_deprecated_schema_id,
729                },
730            ) => (
731                s_updates,
732                s_ts_rewrite.as_ref().map(|x| x.elements()),
733                s_schema_id,
734                s_deprecated_schema_id,
735            )
736                .cmp(&(
737                    o_updates,
738                    o_ts_rewrite.as_ref().map(|x| x.elements()),
739                    o_schema_id,
740                    o_deprecated_schema_id,
741                )),
742            (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less,
743            (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater,
744        }
745    }
746}
747
748/// What order are the parts in this run in?
749#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize)]
750pub(crate) enum RunOrder {
751    /// They're in no particular order.
752    Unordered,
753    /// They're ordered based on the codec-encoded K/V bytes.
754    Codec,
755    /// They're ordered by the natural ordering of the structured data.
756    Structured,
757}
758
759#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Serialize, Copy, Hash)]
760pub struct RunId(pub(crate) [u8; 16]);
761
762impl std::fmt::Display for RunId {
763    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
764        write!(f, "ri{}", Uuid::from_bytes(self.0))
765    }
766}
767
768impl std::fmt::Debug for RunId {
769    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
770        write!(f, "RunId({})", Uuid::from_bytes(self.0))
771    }
772}
773
774impl std::str::FromStr for RunId {
775    type Err = String;
776
777    fn from_str(s: &str) -> Result<Self, Self::Err> {
778        parse_id("ri", "RunId", s).map(RunId)
779    }
780}
781
782impl From<RunId> for String {
783    fn from(x: RunId) -> Self {
784        x.to_string()
785    }
786}
787
788impl RunId {
789    pub(crate) fn new() -> Self {
790        RunId(*Uuid::new_v4().as_bytes())
791    }
792}
793
794impl Arbitrary for RunId {
795    type Parameters = ();
796    type Strategy = proptest::strategy::BoxedStrategy<Self>;
797    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
798        Strategy::prop_map(proptest::prelude::any::<u128>(), |n| {
799            RunId(*Uuid::from_u128(n).as_bytes())
800        })
801        .boxed()
802    }
803}
804
805/// Metadata shared across a run.
806#[derive(Clone, Debug, Default, PartialEq, Eq, Ord, PartialOrd, Serialize)]
807pub struct RunMeta {
808    /// If none, Persist should infer the order based on the proto metadata.
809    pub(crate) order: Option<RunOrder>,
810    /// All parts in a run should have the same schema.
811    pub(crate) schema: Option<SchemaId>,
812
813    /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
814    pub(crate) deprecated_schema: Option<SchemaId>,
815
816    /// If set, a UUID that uniquely identifies this run.
817    pub(crate) id: Option<RunId>,
818
819    /// The number of updates in this run, or `None` if the number is unknown.
820    pub(crate) len: Option<usize>,
821}
822
823/// A subset of a [HollowBatch] corresponding 1:1 to a blob.
824#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
825pub struct HollowBatchPart<T> {
826    /// Pointer usable to retrieve the updates.
827    pub key: PartialBatchKey,
828    /// The encoded size of this part.
829    pub encoded_size_bytes: usize,
830    /// A lower bound on the keys in the part. (By default, this is the minimum
831    /// possible key: `vec![]`.)
832    #[serde(serialize_with = "serialize_part_bytes")]
833    pub key_lower: Vec<u8>,
834    /// A lower bound on the keys in the part, stored as structured data.
835    #[serde(serialize_with = "serialize_lazy_proto")]
836    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
837    /// Aggregate statistics about data contained in this part.
838    #[serde(serialize_with = "serialize_part_stats")]
839    pub stats: Option<LazyPartStats>,
840    /// A frontier to which timestamps in this part are advanced on read, if
841    /// set.
842    ///
843    /// A value of `Some([T::minimum()])` is functionally the same as `None`,
844    /// but we maintain the distinction between the two for some internal sanity
845    /// checking of invariants as well as metrics. If this ever becomes an
846    /// issue, everything still works with this as just `Antichain<T>`.
847    pub ts_rewrite: Option<Antichain<T>>,
848    /// A Codec64 encoded sum of all diffs in this part, if known.
849    ///
850    /// This is `None` if this part was written before we started storing this
851    /// information, or if it was written when the dyncfg was off.
852    ///
853    /// It could also make sense to model this as part of the pushdown stats, if
854    /// we later decide that's of some benefit.
855    #[serde(serialize_with = "serialize_diffs_sum")]
856    pub diffs_sum: Option<[u8; 8]>,
857    /// Columnar format that this batch was written in.
858    ///
859    /// This is `None` if this part was written before we started writing structured
860    /// columnar data.
861    pub format: Option<BatchColumnarFormat>,
862    /// The schemas used to encode the data in this batch part.
863    ///
864    /// Or None for historical data written before the schema registry was
865    /// added.
866    pub schema_id: Option<SchemaId>,
867
868    /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
869    pub deprecated_schema_id: Option<SchemaId>,
870}
871
872/// A [Batch] but with the updates themselves stored externally.
873///
874/// [Batch]: differential_dataflow::trace::BatchReader
875#[derive(Clone, PartialEq, Eq)]
876pub struct HollowBatch<T> {
877    /// Describes the times of the updates in the batch.
878    pub desc: Description<T>,
879    /// The number of updates in the batch.
880    pub len: usize,
881    /// Pointers usable to retrieve the updates.
882    pub(crate) parts: Vec<RunPart<T>>,
883    /// Runs of sequential sorted batch parts, stored as indices into `parts`.
884    /// For example:
885    /// ```text
886    ///     parts=[p1, p2, p3], runs=[]     --> run  is  [p1, p2, p3]
887    ///     parts=[p1, p2, p3], runs=[1]    --> runs are [p1] and [p2, p3]
888    ///     parts=[p1, p2, p3], runs=[1, 2] --> runs are [p1], [p2], [p3]
889    /// ```
890    pub(crate) run_splits: Vec<usize>,
891    /// Run-level metadata: the first entry has metadata for the first run, and so on.
892    /// If there's no corresponding entry for a particular run, it's assumed to be [RunMeta::default()].
893    pub(crate) run_meta: Vec<RunMeta>,
894}
895
896impl<T: Debug> Debug for HollowBatch<T> {
897    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
898        let HollowBatch {
899            desc,
900            parts,
901            len,
902            run_splits: runs,
903            run_meta,
904        } = self;
905        f.debug_struct("HollowBatch")
906            .field(
907                "desc",
908                &(
909                    desc.lower().elements(),
910                    desc.upper().elements(),
911                    desc.since().elements(),
912                ),
913            )
914            .field("parts", &parts)
915            .field("len", &len)
916            .field("runs", &runs)
917            .field("run_meta", &run_meta)
918            .finish()
919    }
920}
921
922impl<T: Serialize> serde::Serialize for HollowBatch<T> {
923    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
924        let HollowBatch {
925            desc,
926            len,
927            // Both parts and runs are covered by the self.runs call.
928            parts: _,
929            run_splits: _,
930            run_meta: _,
931        } = self;
932        let mut s = s.serialize_struct("HollowBatch", 5)?;
933        let () = s.serialize_field("lower", &desc.lower().elements())?;
934        let () = s.serialize_field("upper", &desc.upper().elements())?;
935        let () = s.serialize_field("since", &desc.since().elements())?;
936        let () = s.serialize_field("len", len)?;
937        let () = s.serialize_field("part_runs", &self.runs().collect::<Vec<_>>())?;
938        s.end()
939    }
940}
941
942impl<T: Ord> PartialOrd for HollowBatch<T> {
943    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
944        Some(self.cmp(other))
945    }
946}
947
948impl<T: Ord> Ord for HollowBatch<T> {
949    fn cmp(&self, other: &Self) -> Ordering {
950        // Deconstruct self and other so we get a compile failure if new fields
951        // are added.
952        let HollowBatch {
953            desc: self_desc,
954            parts: self_parts,
955            len: self_len,
956            run_splits: self_runs,
957            run_meta: self_run_meta,
958        } = self;
959        let HollowBatch {
960            desc: other_desc,
961            parts: other_parts,
962            len: other_len,
963            run_splits: other_runs,
964            run_meta: other_run_meta,
965        } = other;
966        (
967            self_desc.lower().elements(),
968            self_desc.upper().elements(),
969            self_desc.since().elements(),
970            self_parts,
971            self_len,
972            self_runs,
973            self_run_meta,
974        )
975            .cmp(&(
976                other_desc.lower().elements(),
977                other_desc.upper().elements(),
978                other_desc.since().elements(),
979                other_parts,
980                other_len,
981                other_runs,
982                other_run_meta,
983            ))
984    }
985}
986
987impl<T: Timestamp + Codec64 + Sync> HollowBatch<T> {
988    pub(crate) fn part_stream<'a>(
989        &'a self,
990        shard_id: ShardId,
991        blob: &'a dyn Blob,
992        metrics: &'a Metrics,
993    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + 'a {
994        stream! {
995            for part in &self.parts {
996                for await part in part.part_stream(shard_id, blob, metrics) {
997                    yield part;
998                }
999            }
1000        }
1001    }
1002}
1003impl<T> HollowBatch<T> {
1004    /// Construct an in-memory hollow batch from the given metadata.
1005    ///
1006    /// This method debug-asserts that `run_splits` is a strictly increasing sequence of valid
1007    /// indices into `parts`; the caller is responsible for ensuring that the defined runs are valid.
1008    ///
1009    /// `len` should represent the number of valid updates in the referenced parts.
1010    pub(crate) fn new(
1011        desc: Description<T>,
1012        parts: Vec<RunPart<T>>,
1013        len: usize,
1014        run_meta: Vec<RunMeta>,
1015        run_splits: Vec<usize>,
1016    ) -> Self {
1017        debug_assert!(
1018            run_splits.is_strictly_sorted(),
1019            "run indices should be strictly increasing"
1020        );
1021        debug_assert!(
1022            run_splits.first().map_or(true, |i| *i > 0),
1023            "run indices should be positive"
1024        );
1025        debug_assert!(
1026            run_splits.last().map_or(true, |i| *i < parts.len()),
1027            "run indices should be valid indices into parts"
1028        );
1029        debug_assert!(
1030            parts.is_empty() || run_meta.len() == run_splits.len() + 1,
1031            "all metadata should correspond to a run"
1032        );
1033
1034        Self {
1035            desc,
1036            len,
1037            parts,
1038            run_splits,
1039            run_meta,
1040        }
1041    }
1042
1043    /// Construct a batch of a single run with default metadata. Mostly interesting for tests.
1044    pub(crate) fn new_run(desc: Description<T>, parts: Vec<RunPart<T>>, len: usize) -> Self {
1045        let run_meta = if parts.is_empty() {
1046            vec![]
1047        } else {
1048            vec![RunMeta::default()]
1049        };
1050        Self {
1051            desc,
1052            len,
1053            parts,
1054            run_splits: vec![],
1055            run_meta,
1056        }
1057    }
1058
1059    #[cfg(test)]
1060    pub(crate) fn new_run_for_test(
1061        desc: Description<T>,
1062        parts: Vec<RunPart<T>>,
1063        len: usize,
1064        run_id: RunId,
1065    ) -> Self {
1066        let run_meta = if parts.is_empty() {
1067            vec![]
1068        } else {
1069            let mut meta = RunMeta::default();
1070            meta.id = Some(run_id);
1071            vec![meta]
1072        };
1073        Self {
1074            desc,
1075            len,
1076            parts,
1077            run_splits: vec![],
1078            run_meta,
1079        }
1080    }
1081
1082    /// An empty hollow batch, representing no updates over the given desc.
1083    pub(crate) fn empty(desc: Description<T>) -> Self {
1084        Self {
1085            desc,
1086            len: 0,
1087            parts: vec![],
1088            run_splits: vec![],
1089            run_meta: vec![],
1090        }
1091    }
1092
1093    pub(crate) fn runs(&self) -> impl Iterator<Item = (&RunMeta, &[RunPart<T>])> {
1094        let run_ends = self
1095            .run_splits
1096            .iter()
1097            .copied()
1098            .chain(std::iter::once(self.parts.len()));
1099        let run_metas = self.run_meta.iter();
1100        let run_parts = run_ends
1101            .scan(0, |start, end| {
1102                let range = *start..end;
1103                *start = end;
1104                Some(range)
1105            })
1106            .filter(|range| !range.is_empty())
1107            .map(|range| &self.parts[range]);
1108        run_metas.zip_eq(run_parts)
1109    }
1110
1111    pub(crate) fn inline_bytes(&self) -> usize {
1112        self.parts.iter().map(|x| x.inline_bytes()).sum()
1113    }
1114
1115    pub(crate) fn is_empty(&self) -> bool {
1116        self.parts.is_empty()
1117    }
1118
1119    pub(crate) fn part_count(&self) -> usize {
1120        self.parts.len()
1121    }
1122
1123    /// The sum of the encoded sizes of all parts in the batch.
1124    pub fn encoded_size_bytes(&self) -> usize {
1125        self.parts.iter().map(|p| p.encoded_size_bytes()).sum()
1126    }
1127}
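
// Not part of the original file: a hedged, self-contained sketch of the
// `run_splits` convention that `HollowBatch::runs` above implements. Split
// indices partition `parts` into contiguous runs; plain strings stand in for
// parts and the mod/test names are hypothetical.
#[cfg(test)]
mod run_splits_example {
    #[test]
    fn splits_partition_parts() {
        let parts = ["p1", "p2", "p3"];
        let run_splits = [1usize];
        // Prepend 0 and append parts.len() to get run boundaries: [0, 1, 3].
        let bounds: Vec<usize> = std::iter::once(0)
            .chain(run_splits.iter().copied())
            .chain(std::iter::once(parts.len()))
            .collect();
        let runs: Vec<&[&str]> = bounds.windows(2).map(|w| &parts[w[0]..w[1]]).collect();
        assert_eq!(runs, vec![&["p1"][..], &["p2", "p3"][..]]);
    }
}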
1128
1129// See the comment on [Batch::rewrite_ts] for why this is TotalOrder.
1130impl<T: Timestamp + TotalOrder> HollowBatch<T> {
1131    pub(crate) fn rewrite_ts(
1132        &mut self,
1133        frontier: &Antichain<T>,
1134        new_upper: Antichain<T>,
1135    ) -> Result<(), String> {
1136        if !PartialOrder::less_than(frontier, &new_upper) {
1137            return Err(format!(
1138                "rewrite frontier {:?} !< rewrite upper {:?}",
1139                frontier.elements(),
1140                new_upper.elements(),
1141            ));
1142        }
1143        if PartialOrder::less_than(&new_upper, self.desc.upper()) {
1144            return Err(format!(
1145                "rewrite upper {:?} < batch upper {:?}",
1146                new_upper.elements(),
1147                self.desc.upper().elements(),
1148            ));
1149        }
1150
1151        // The following are things that it seems like we could support, but
1152        // initially we don't because we don't have a use case for them.
1153        if PartialOrder::less_than(frontier, self.desc.lower()) {
1154            return Err(format!(
1155                "rewrite frontier {:?} < batch lower {:?}",
1156                frontier.elements(),
1157                self.desc.lower().elements(),
1158            ));
1159        }
1160        if self.desc.since() != &Antichain::from_elem(T::minimum()) {
1161            return Err(format!(
1162                "batch since {:?} != minimum antichain {:?}",
1163                self.desc.since().elements(),
1164                &[T::minimum()],
1165            ));
1166        }
1167        for part in self.parts.iter() {
1168            let Some(ts_rewrite) = part.ts_rewrite() else {
1169                continue;
1170            };
1171            if PartialOrder::less_than(frontier, ts_rewrite) {
1172                return Err(format!(
1173                    "rewrite frontier {:?} < batch rewrite {:?}",
1174                    frontier.elements(),
1175                    ts_rewrite.elements(),
1176                ));
1177            }
1178        }
1179
1180        self.desc = Description::new(
1181            self.desc.lower().clone(),
1182            new_upper,
1183            self.desc.since().clone(),
1184        );
1185        for part in &mut self.parts {
1186            match part {
1187                RunPart::Single(BatchPart::Hollow(part)) => {
1188                    part.ts_rewrite = Some(frontier.clone())
1189                }
1190                RunPart::Single(BatchPart::Inline { ts_rewrite, .. }) => {
1191                    *ts_rewrite = Some(frontier.clone())
1192                }
1193                RunPart::Many(runs) => {
1194                    // Currently unreachable: we only apply rewrites to user batches, and we don't
1195                    // ever generate runs of >1 part for those.
1196                    panic!("unexpected rewrite of a hollow runs ref: {runs:?}");
1197                }
1198            }
1199        }
1200        Ok(())
1201    }
1202}
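
// Not part of the original file: a hedged sketch of `rewrite_ts` above applied
// to an empty batch with `u64` timestamps. The rewrite frontier must be less
// than the new upper, and the new upper must not regress the batch's existing
// upper. The mod/test names are hypothetical.
#[cfg(test)]
mod rewrite_ts_example {
    use super::*;

    #[test]
    fn rewrite_empty_batch() {
        let desc = Description::new(
            Antichain::from_elem(0u64),
            Antichain::from_elem(5u64),
            Antichain::from_elem(0u64),
        );
        let mut batch = HollowBatch::empty(desc);
        // Advancing the upper from 5 to 10 with a rewrite frontier of 3 is allowed.
        assert_eq!(
            batch.rewrite_ts(&Antichain::from_elem(3u64), Antichain::from_elem(10u64)),
            Ok(())
        );
        // A new upper that regresses the (now advanced) batch upper is rejected.
        assert!(
            batch
                .rewrite_ts(&Antichain::from_elem(3u64), Antichain::from_elem(7u64))
                .is_err()
        );
    }
}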
1203
1204impl<T: Ord> PartialOrd for HollowBatchPart<T> {
1205    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1206        Some(self.cmp(other))
1207    }
1208}
1209
1210impl<T: Ord> Ord for HollowBatchPart<T> {
1211    fn cmp(&self, other: &Self) -> Ordering {
1212        // Deconstruct self and other so we get a compile failure if new fields
1213        // are added.
1214        let HollowBatchPart {
1215            key: self_key,
1216            encoded_size_bytes: self_encoded_size_bytes,
1217            key_lower: self_key_lower,
1218            structured_key_lower: self_structured_key_lower,
1219            stats: self_stats,
1220            ts_rewrite: self_ts_rewrite,
1221            diffs_sum: self_diffs_sum,
1222            format: self_format,
1223            schema_id: self_schema_id,
1224            deprecated_schema_id: self_deprecated_schema_id,
1225        } = self;
1226        let HollowBatchPart {
1227            key: other_key,
1228            encoded_size_bytes: other_encoded_size_bytes,
1229            key_lower: other_key_lower,
1230            structured_key_lower: other_structured_key_lower,
1231            stats: other_stats,
1232            ts_rewrite: other_ts_rewrite,
1233            diffs_sum: other_diffs_sum,
1234            format: other_format,
1235            schema_id: other_schema_id,
1236            deprecated_schema_id: other_deprecated_schema_id,
1237        } = other;
1238        (
1239            self_key,
1240            self_encoded_size_bytes,
1241            self_key_lower,
1242            self_structured_key_lower,
1243            self_stats,
1244            self_ts_rewrite.as_ref().map(|x| x.elements()),
1245            self_diffs_sum,
1246            self_format,
1247            self_schema_id,
1248            self_deprecated_schema_id,
1249        )
1250            .cmp(&(
1251                other_key,
1252                other_encoded_size_bytes,
1253                other_key_lower,
1254                other_structured_key_lower,
1255                other_stats,
1256                other_ts_rewrite.as_ref().map(|x| x.elements()),
1257                other_diffs_sum,
1258                other_format,
1259                other_schema_id,
1260                other_deprecated_schema_id,
1261            ))
1262    }
1263}
1264
1265/// A pointer to a rollup stored externally.
1266#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
1267pub struct HollowRollup {
1268    /// Pointer usable to retrieve the rollup.
1269    pub key: PartialRollupKey,
1270    /// The encoded size of this rollup, if known.
1271    pub encoded_size_bytes: Option<usize>,
1272}
1273
1274/// A pointer to a blob stored externally.
1275#[derive(Debug)]
1276pub enum HollowBlobRef<'a, T> {
1277    Batch(&'a HollowBatch<T>),
1278    Rollup(&'a HollowRollup),
1279}
1280
1281/// A rollup that is currently being computed.
1282#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
1283pub struct ActiveRollup {
1284    pub seqno: SeqNo,
1285    pub start_ms: u64,
1286}
1287
1288/// A garbage collection request that is currently being computed.
1289#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
1290pub struct ActiveGc {
1291    pub seqno: SeqNo,
1292    pub start_ms: u64,
1293}
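
// Not part of the original file: a hedged sketch of the fallback-threshold
// idea behind ROLLUP_FALLBACK_THRESHOLD_MS and GC_FALLBACK_THRESHOLD_MS. An
// in-progress claim is only considered "stuck" (and thus claimable by another
// worker) once it has been outstanding longer than the threshold. The helper
// is hypothetical; the real claiming logic lives elsewhere.
#[cfg(test)]
mod active_claim_fallback_example {
    use super::*;

    fn claim_is_stuck(active: &ActiveRollup, now_ms: u64, fallback_threshold_ms: u64) -> bool {
        now_ms.saturating_sub(active.start_ms) > fallback_threshold_ms
    }

    #[test]
    fn only_old_claims_are_stuck() {
        let active = ActiveRollup {
            seqno: SeqNo(42),
            start_ms: 10_000,
        };
        assert!(!claim_is_stuck(&active, 12_000, 5_000));
        assert!(claim_is_stuck(&active, 16_000, 5_000));
    }
}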
1294
1295/// A sentinel for a state transition that was a no-op.
1296///
1297/// Critically, this also indicates that the no-op state transition was not
1298/// committed through compare_and_append and thus is _not linearized_.
1299#[derive(Debug)]
1300#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
1301pub struct NoOpStateTransition<T>(pub T);
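
// Not part of the original file: a hedged sketch of how the ControlFlow-based
// return convention used by the StateCollections methods below can be
// consumed. `Continue` means the transition should be committed via
// compare_and_append; `Break(NoOpStateTransition)` means the result can be
// returned without committing. The helper is hypothetical; the real handling
// lives in the machine/applier code.
#[cfg(test)]
mod no_op_state_transition_example {
    use super::*;

    fn needs_commit<R>(flow: ControlFlow<NoOpStateTransition<R>, R>) -> (bool, R) {
        match flow {
            Continue(res) => (true, res),
            Break(NoOpStateTransition(res)) => (false, res),
        }
    }

    #[test]
    fn continue_requires_commit() {
        assert_eq!(needs_commit::<u64>(Continue(7)), (true, 7));
        assert_eq!(needs_commit::<u64>(Break(NoOpStateTransition(7))), (false, 7));
    }
}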
1302
1303// TODO: Document invariants.
1304#[derive(Debug, Clone)]
1305#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
1306pub struct StateCollections<T> {
1307    // - Invariant: `<= all reader.since`
1308    // - Invariant: Doesn't regress across state versions.
1309    pub(crate) last_gc_req: SeqNo,
1310
1311    // - Invariant: There is a rollup with `seqno <= self.seqno_since`.
1312    pub(crate) rollups: BTreeMap<SeqNo, HollowRollup>,
1313
1314    /// The rollup that is currently being computed.
1315    pub(crate) active_rollup: Option<ActiveRollup>,
1316    /// The gc request that is currently being computed.
1317    pub(crate) active_gc: Option<ActiveGc>,
1318
1319    pub(crate) leased_readers: BTreeMap<LeasedReaderId, LeasedReaderState<T>>,
1320    pub(crate) critical_readers: BTreeMap<CriticalReaderId, CriticalReaderState<T>>,
1321    pub(crate) writers: BTreeMap<WriterId, WriterState<T>>,
1322    pub(crate) schemas: BTreeMap<SchemaId, EncodedSchemas>,
1323
1324    // - Invariant: `trace.since == meet(all reader.since)`
1325    // - Invariant: `trace.since` doesn't regress across state versions.
1326    // - Invariant: `trace.upper` doesn't regress across state versions.
1327    // - Invariant: `trace` upholds its own invariants.
1328    pub(crate) trace: Trace<T>,
1329}
1330
1331/// A key and val [Codec::Schema] encoded via [Codec::encode_schema].
1332///
1333/// This strategy of directly serializing the schema objects requires that
1334/// persist users do the right thing. Specifically, that an encoded schema
1335/// doesn't in some later version of mz decode to an in-mem object that acts
1336/// differently. In a sense, the current system (before the introduction of the
1337/// schema registry) where schemas are passed in unchecked to reader and writer
1338/// registration calls also has the same defect, so this seems fine.
1339///
1340/// An alternative is to write down here some persist-specific representation of
1341/// the schema (e.g. the arrow DataType). This is a lot more work and also has
1342/// the potential to lead to a failure mode similar to that of the
1343/// mz_persist_types `Data` trait, where the boilerplate isn't worth the safety.
1344/// Given that we can always migrate later by rehydrating these, it seems fine
1345/// to start with the easy thing.
1346#[derive(Debug, Clone, Serialize, PartialEq)]
1347pub struct EncodedSchemas {
1348    /// A full in-mem `K::Schema` impl encoded via [Codec::encode_schema].
1349    pub key: Bytes,
1350    /// The arrow `DataType` produced by this `K::Schema` at the time it was
1351    /// registered, encoded as a `ProtoDataType`.
1352    pub key_data_type: Bytes,
1353    /// A full in-mem `V::Schema` impl encoded via [Codec::encode_schema].
1354    pub val: Bytes,
1355    /// The arrow `DataType` produced by this `V::Schema` at the time it was
1356    /// registered, encoded as a `ProtoDataType`.
1357    pub val_data_type: Bytes,
1358}
1359
1360impl EncodedSchemas {
1361    pub(crate) fn decode_data_type(buf: &[u8]) -> DataType {
1362        let proto = prost::Message::decode(buf).expect("valid ProtoDataType");
1363        DataType::from_proto(proto).expect("valid DataType")
1364    }
1365}
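
// Not part of the original file: a hedged sketch of the encode/decode pairing
// for the `*_data_type` fields above. An arrow `DataType` is stored as
// proto-encoded bytes and recovered with `EncodedSchemas::decode_data_type`.
// The mod/test names are hypothetical.
#[cfg(test)]
mod encoded_data_type_example {
    use super::*;

    #[test]
    fn data_type_roundtrips_through_proto_bytes() {
        let data_type = DataType::Int64;
        let bytes = prost::Message::encode_to_vec(&data_type.into_proto());
        assert_eq!(EncodedSchemas::decode_data_type(&bytes), data_type);
    }
}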
1366
1367#[derive(Debug)]
1368#[cfg_attr(test, derive(PartialEq))]
1369pub enum CompareAndAppendBreak<T> {
1370    AlreadyCommitted,
1371    Upper {
1372        shard_upper: Antichain<T>,
1373        writer_upper: Antichain<T>,
1374    },
1375    InvalidUsage(InvalidUsage<T>),
1376    InlineBackpressure,
1377}
1378
1379#[derive(Debug)]
1380#[cfg_attr(test, derive(PartialEq))]
1381pub enum SnapshotErr<T> {
1382    AsOfNotYetAvailable(SeqNo, Upper<T>),
1383    AsOfHistoricalDistinctionsLost(Since<T>),
1384}
1385
1386impl<T> StateCollections<T>
1387where
1388    T: Timestamp + Lattice + Codec64,
1389{
1390    pub fn add_rollup(
1391        &mut self,
1392        add_rollup: (SeqNo, &HollowRollup),
1393    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1394        let (rollup_seqno, rollup) = add_rollup;
1395        let applied = match self.rollups.get(&rollup_seqno) {
1396            Some(x) => x.key == rollup.key,
1397            None => {
1398                self.active_rollup = None;
1399                self.rollups.insert(rollup_seqno, rollup.to_owned());
1400                true
1401            }
1402        };
1403        // This state transition is a no-op if `applied` is false, but we
1404        // still commit the state change so that it gets linearized
1405        // (maybe we're looking at old state).
1406        Continue(applied)
1407    }
1408
1409    pub fn remove_rollups(
1410        &mut self,
1411        remove_rollups: &[(SeqNo, PartialRollupKey)],
1412    ) -> ControlFlow<NoOpStateTransition<Vec<SeqNo>>, Vec<SeqNo>> {
1413        if remove_rollups.is_empty() || self.is_tombstone() {
1414            return Break(NoOpStateTransition(vec![]));
1415        }
1416
1417        // This state transition is called at the end of the GC process, so we
1418        // need to unset the `active_gc` field.
1419        self.active_gc = None;
1420
1421        let mut removed = vec![];
1422        for (seqno, key) in remove_rollups {
1423            let removed_key = self.rollups.remove(seqno);
1424            debug_assert!(
1425                removed_key.as_ref().map_or(true, |x| &x.key == key),
1426                "{} vs {:?}",
1427                key,
1428                removed_key
1429            );
1430
1431            if removed_key.is_some() {
1432                removed.push(*seqno);
1433            }
1434        }
1435
1436        Continue(removed)
1437    }
1438
1439    pub fn register_leased_reader(
1440        &mut self,
1441        hostname: &str,
1442        reader_id: &LeasedReaderId,
1443        purpose: &str,
1444        seqno: SeqNo,
1445        lease_duration: Duration,
1446        heartbeat_timestamp_ms: u64,
1447        use_critical_since: bool,
1448    ) -> ControlFlow<
1449        NoOpStateTransition<(LeasedReaderState<T>, SeqNo)>,
1450        (LeasedReaderState<T>, SeqNo),
1451    > {
1452        let since = if use_critical_since {
1453            self.critical_since()
1454                .unwrap_or_else(|| self.trace.since().clone())
1455        } else {
1456            self.trace.since().clone()
1457        };
1458        let reader_state = LeasedReaderState {
1459            debug: HandleDebugState {
1460                hostname: hostname.to_owned(),
1461                purpose: purpose.to_owned(),
1462            },
1463            seqno,
1464            since,
1465            last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1466            lease_duration_ms: u64::try_from(lease_duration.as_millis())
1467                .expect("lease duration as millis should fit within u64"),
1468        };
1469
1470        // If the shard-global upper and since are both the empty antichain,
1471        // then no further writes can ever commit and no further reads can be
1472        // served. Optimize this by no-op-ing reader registration so that we can
1473        // settle the shard into a final unchanging tombstone state.
1474        if self.is_tombstone() {
1475            return Break(NoOpStateTransition((reader_state, self.seqno_since(seqno))));
1476        }
1477
1478        // TODO: Handle if the reader or writer already exists.
1479        self.leased_readers
1480            .insert(reader_id.clone(), reader_state.clone());
1481        Continue((reader_state, self.seqno_since(seqno)))
1482    }
1483
1484    pub fn register_critical_reader<O: Opaque + Codec64>(
1485        &mut self,
1486        hostname: &str,
1487        reader_id: &CriticalReaderId,
1488        purpose: &str,
1489    ) -> ControlFlow<NoOpStateTransition<CriticalReaderState<T>>, CriticalReaderState<T>> {
1490        let state = CriticalReaderState {
1491            debug: HandleDebugState {
1492                hostname: hostname.to_owned(),
1493                purpose: purpose.to_owned(),
1494            },
1495            since: self.trace.since().clone(),
1496            opaque: OpaqueState(Codec64::encode(&O::initial())),
1497            opaque_codec: O::codec_name(),
1498        };
1499
1500        // We expire all readers if the upper and since both advance to the
1501        // empty antichain. Gracefully handle this. At the same time,
1502        // short-circuit the cmd application so we don't needlessly create new
1503        // SeqNos.
1504        if self.is_tombstone() {
1505            return Break(NoOpStateTransition(state));
1506        }
1507
1508        let state = match self.critical_readers.get_mut(reader_id) {
1509            Some(existing_state) => {
1510                existing_state.debug = state.debug;
1511                existing_state.clone()
1512            }
1513            None => {
1514                self.critical_readers
1515                    .insert(reader_id.clone(), state.clone());
1516                state
1517            }
1518        };
1519        Continue(state)
1520    }
1521
1522    pub fn register_schema<K: Codec, V: Codec>(
1523        &mut self,
1524        key_schema: &K::Schema,
1525        val_schema: &V::Schema,
1526    ) -> ControlFlow<NoOpStateTransition<Option<SchemaId>>, Option<SchemaId>> {
1527        fn encode_data_type(data_type: &DataType) -> Bytes {
1528            let proto = data_type.into_proto();
1529            prost::Message::encode_to_vec(&proto).into()
1530        }
1531
1532        // Look for an existing registered SchemaId for these schemas.
1533        //
1534        // The common case is that this should be a recent one, so as a minor
1535        // optimization, do this search in reverse order.
1536        //
1537        // TODO: Note that this impl is `O(schemas)`. Combined with the
1538        // possibility of cmd retries, it's possible but unlikely for this to
1539        // get expensive. We could maintain a reverse map to speed this up if
1540        // necessary. This would either need to work on the encoded
1541        // representation (in which case we'd have to fall back to the linear scan) or
1542        // we'd need to add a Hash/Ord bound to Schema.
1543        let existing_id = self.schemas.iter().rev().find(|(_, x)| {
1544            K::decode_schema(&x.key) == *key_schema && V::decode_schema(&x.val) == *val_schema
1545        });
1546        match existing_id {
1547            Some((schema_id, _)) => {
1548                // TODO: Validate that the decoded schemas still produce records
1549                // of the recorded DataType, to detect shenanigans. Probably
1550                // best to wait until we've turned on Schema2 in prod and thus
1551                // committed to the current mappings.
1552                Break(NoOpStateTransition(Some(*schema_id)))
1553            }
1554            None if self.is_tombstone() => {
1555                // TODO: Is this right?
1556                Break(NoOpStateTransition(None))
1557            }
1558            None if self.schemas.is_empty() => {
1559                // We'll have to do something more sophisticated here to
1560                // generate the next id if/when we start supporting the removal
1561                // of schemas.
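                // (With no removals the ids stay dense; since `schemas` is
                // empty in this branch, this is always SchemaId(0).)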
1562                let id = SchemaId(self.schemas.len());
1563                let key_data_type = mz_persist_types::columnar::data_type::<K>(key_schema)
1564                    .expect("valid key schema");
1565                let val_data_type = mz_persist_types::columnar::data_type::<V>(val_schema)
1566                    .expect("valid val schema");
1567                let prev = self.schemas.insert(
1568                    id,
1569                    EncodedSchemas {
1570                        key: K::encode_schema(key_schema),
1571                        key_data_type: encode_data_type(&key_data_type),
1572                        val: V::encode_schema(val_schema),
1573                        val_data_type: encode_data_type(&val_data_type),
1574                    },
1575                );
1576                assert_eq!(prev, None);
1577                Continue(Some(id))
1578            }
1579            None => {
1580                info!(
1581                    "register_schemas got {:?} expected {:?}",
1582                    key_schema,
1583                    self.schemas
1584                        .iter()
1585                        .map(|(id, x)| (id, K::decode_schema(&x.key)))
1586                        .collect::<Vec<_>>()
1587                );
1588                // Until we implement persist schema changes, only allow at most
1589                // one registered schema.
1590                Break(NoOpStateTransition(None))
1591            }
1592        }
1593    }
1594
1595    pub fn compare_and_evolve_schema<K: Codec, V: Codec>(
1596        &mut self,
1597        expected: SchemaId,
1598        key_schema: &K::Schema,
1599        val_schema: &V::Schema,
1600    ) -> ControlFlow<NoOpStateTransition<CaESchema<K, V>>, CaESchema<K, V>> {
1601        fn data_type<T>(schema: &impl Schema<T>) -> DataType {
1602            // To be defensive, create an empty batch and inspect the resulting
1603            // data type (as opposed to something like allowing the `Schema` to
1604            // declare the DataType).
1605            let array = Schema::encoder(schema).expect("valid schema").finish();
1606            Array::data_type(&array).clone()
1607        }
1608
1609        let (current_id, current) = self
1610            .schemas
1611            .last_key_value()
1612            .expect("all shards have a schema");
1613        if *current_id != expected {
1614            return Break(NoOpStateTransition(CaESchema::ExpectedMismatch {
1615                schema_id: *current_id,
1616                key: K::decode_schema(&current.key),
1617                val: V::decode_schema(&current.val),
1618            }));
1619        }
1620
1621        let current_key = K::decode_schema(&current.key);
1622        let current_key_dt = EncodedSchemas::decode_data_type(&current.key_data_type);
1623        let current_val = V::decode_schema(&current.val);
1624        let current_val_dt = EncodedSchemas::decode_data_type(&current.val_data_type);
1625
1626        let key_dt = data_type(key_schema);
1627        let val_dt = data_type(val_schema);
1628
1629        // If the schema is exactly the same as the current one, no-op.
1630        if current_key == *key_schema
1631            && current_key_dt == key_dt
1632            && current_val == *val_schema
1633            && current_val_dt == val_dt
1634        {
1635            return Break(NoOpStateTransition(CaESchema::Ok(*current_id)));
1636        }
1637
1638        let key_fn = backward_compatible(&current_key_dt, &key_dt);
1639        let val_fn = backward_compatible(&current_val_dt, &val_dt);
1640        let (Some(key_fn), Some(val_fn)) = (key_fn, val_fn) else {
1641            return Break(NoOpStateTransition(CaESchema::Incompatible));
1642        };
1643        // Persist initially disallows dropping columns. This would require a
1644        // bunch more work (e.g. not safe to use the latest schema in
1645        // compaction) and isn't initially necessary in mz.
1646        if key_fn.contains_drop() || val_fn.contains_drop() {
1647            return Break(NoOpStateTransition(CaESchema::Incompatible));
1648        }
1649
1650        // We'll have to do something more sophisticated here to
1651        // generate the next id if/when we start supporting the removal
1652        // of schemas.
1653        let id = SchemaId(self.schemas.len());
1654        self.schemas.insert(
1655            id,
1656            EncodedSchemas {
1657                key: K::encode_schema(key_schema),
1658                key_data_type: prost::Message::encode_to_vec(&key_dt.into_proto()).into(),
1659                val: V::encode_schema(val_schema),
1660                val_data_type: prost::Message::encode_to_vec(&val_dt.into_proto()).into(),
1661            },
1662        );
1663        Continue(CaESchema::Ok(id))
1664    }
1665
1666    pub fn compare_and_append(
1667        &mut self,
1668        batch: &HollowBatch<T>,
1669        writer_id: &WriterId,
1670        heartbeat_timestamp_ms: u64,
1671        lease_duration_ms: u64,
1672        idempotency_token: &IdempotencyToken,
1673        debug_info: &HandleDebugState,
1674        inline_writes_total_max_bytes: usize,
1675        claim_compaction_percent: usize,
1676        claim_compaction_min_version: Option<&Version>,
1677    ) -> ControlFlow<CompareAndAppendBreak<T>, Vec<FueledMergeReq<T>>> {
1678        // We expire all writers if the upper and since both advance to the
1679        // empty antichain. Gracefully handle this. At the same time,
1680        // short-circuit the cmd application so we don't needlessly create new
1681        // SeqNos.
1682        if self.is_tombstone() {
1683            assert_eq!(self.trace.upper(), &Antichain::new());
1684            return Break(CompareAndAppendBreak::Upper {
1685                shard_upper: Antichain::new(),
1686                // This writer might have been registered before the shard upper
1687                // was advanced, which would make this pessimistic in the
1688                // Indeterminate handling of compare_and_append at the machine
1689                // level, but that's fine.
1690                writer_upper: Antichain::new(),
1691            });
1692        }
1693
1694        let writer_state = self
1695            .writers
1696            .entry(writer_id.clone())
1697            .or_insert_with(|| WriterState {
1698                last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1699                lease_duration_ms,
1700                most_recent_write_token: IdempotencyToken::SENTINEL,
1701                most_recent_write_upper: Antichain::from_elem(T::minimum()),
1702                debug: debug_info.clone(),
1703            });
1704
1705        if PartialOrder::less_than(batch.desc.upper(), batch.desc.lower()) {
1706            return Break(CompareAndAppendBreak::InvalidUsage(
1707                InvalidUsage::InvalidBounds {
1708                    lower: batch.desc.lower().clone(),
1709                    upper: batch.desc.upper().clone(),
1710                },
1711            ));
1712        }
1713
1714        // If the time interval is empty, the list of updates must also be
1715        // empty.
1716        if batch.desc.upper() == batch.desc.lower() && !batch.is_empty() {
1717            return Break(CompareAndAppendBreak::InvalidUsage(
1718                InvalidUsage::InvalidEmptyTimeInterval {
1719                    lower: batch.desc.lower().clone(),
1720                    upper: batch.desc.upper().clone(),
1721                    keys: batch
1722                        .parts
1723                        .iter()
1724                        .map(|x| x.printable_name().to_owned())
1725                        .collect(),
1726                },
1727            ));
1728        }
1729
1730        if idempotency_token == &writer_state.most_recent_write_token {
1731            // If the last write had the same idempotency_token, then this must
1732            // have already committed. Sanity check that the most recent write
1733            // upper matches and that the shard upper is at least the write
1734            // upper; if it's not, something very suspect is going on.
1735            assert_eq!(batch.desc.upper(), &writer_state.most_recent_write_upper);
1736            assert!(
1737                PartialOrder::less_equal(batch.desc.upper(), self.trace.upper()),
1738                "{:?} vs {:?}",
1739                batch.desc.upper(),
1740                self.trace.upper()
1741            );
1742            return Break(CompareAndAppendBreak::AlreadyCommitted);
1743        }
1744
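        // The core compare_and_append contract: the batch's lower must exactly
        // match the current shard upper. E.g. with a shard upper of [5], only a
        // batch described as [5, upper) can commit; any mismatch returns an
        // Upper break so the caller can retry from the real frontier.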
1745        let shard_upper = self.trace.upper();
1746        if shard_upper != batch.desc.lower() {
1747            return Break(CompareAndAppendBreak::Upper {
1748                shard_upper: shard_upper.clone(),
1749                writer_upper: writer_state.most_recent_write_upper.clone(),
1750            });
1751        }
1752
1753        let new_inline_bytes = batch.inline_bytes();
1754        if new_inline_bytes > 0 {
1755            let mut existing_inline_bytes = 0;
1756            self.trace
1757                .map_batches(|x| existing_inline_bytes += x.inline_bytes());
1758            // TODO: For very small batches, it may actually _increase_ the size
1759            // of state to flush them out. Consider another threshold under
1760            // which an inline part can be appended no matter what.
1761            if existing_inline_bytes + new_inline_bytes >= inline_writes_total_max_bytes {
1762                return Break(CompareAndAppendBreak::InlineBackpressure);
1763            }
1764        }
1765
1766        let mut merge_reqs = if batch.desc.upper() != batch.desc.lower() {
1767            self.trace.push_batch(batch.clone())
1768        } else {
1769            Vec::new()
1770        };
1771
1772        // NB: we don't claim unclaimed compactions when the recording flag is off, even if we'd
1773        // otherwise be allowed to, to avoid triggering the same compactions in every writer.
1774        let all_empty_reqs = merge_reqs
1775            .iter()
1776            .all(|req| req.inputs.iter().all(|b| b.batch.is_empty()));
1777        if all_empty_reqs && !batch.is_empty() {
1778            let mut reqs_to_take = claim_compaction_percent / 100;
1779            if (usize::cast_from(idempotency_token.hashed()) % 100)
1780                < (claim_compaction_percent % 100)
1781            {
1782                reqs_to_take += 1;
1783            }
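            // E.g. claim_compaction_percent = 150 always claims one request and
            // claims a second for roughly half of writes, keyed off the
            // idempotency token's hash so the choice is stable across retries.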
1784            let threshold_ms = heartbeat_timestamp_ms.saturating_sub(lease_duration_ms);
1785            let min_writer = claim_compaction_min_version.map(WriterKey::for_version);
1786            merge_reqs.extend(
1787                // We claim the oldest `reqs_to_take` batches, under the theory that they're least
1788                // likely to be compacted soon for other reasons.
1789                self.trace
1790                    .fueled_merge_reqs_before_ms(threshold_ms, min_writer)
1791                    .take(reqs_to_take),
1792            )
1793        }
1794
1795        for req in &merge_reqs {
1796            self.trace.claim_compaction(
1797                req.id,
1798                ActiveCompaction {
1799                    start_ms: heartbeat_timestamp_ms,
1800                },
1801            )
1802        }
1803
1804        debug_assert_eq!(self.trace.upper(), batch.desc.upper());
1805        writer_state.most_recent_write_token = idempotency_token.clone();
1806        // The writer's most recent upper should only go forward.
1807        assert!(
1808            PartialOrder::less_equal(&writer_state.most_recent_write_upper, batch.desc.upper()),
1809            "{:?} vs {:?}",
1810            &writer_state.most_recent_write_upper,
1811            batch.desc.upper()
1812        );
1813        writer_state
1814            .most_recent_write_upper
1815            .clone_from(batch.desc.upper());
1816
1817        // Heartbeat the writer state to keep our idempotency token alive.
1818        writer_state.last_heartbeat_timestamp_ms = std::cmp::max(
1819            heartbeat_timestamp_ms,
1820            writer_state.last_heartbeat_timestamp_ms,
1821        );
1822
1823        Continue(merge_reqs)
1824    }
1825
1826    pub fn apply_merge_res<D: Codec64 + Monoid + PartialEq>(
1827        &mut self,
1828        res: &FueledMergeRes<T>,
1829        metrics: &ColumnarMetrics,
1830    ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
1831        // We expire all writers if the upper and since both advance to the
1832        // empty antichain. Gracefully handle this. At the same time,
1833        // short-circuit the cmd application so we don't needlessly create new
1834        // SeqNos.
1835        if self.is_tombstone() {
1836            return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
1837        }
1838
1839        let apply_merge_result = self.trace.apply_merge_res_checked::<D>(res, metrics);
1840        Continue(apply_merge_result)
1841    }
1842
1843    pub fn spine_exert(
1844        &mut self,
1845        fuel: usize,
1846    ) -> ControlFlow<NoOpStateTransition<Vec<FueledMergeReq<T>>>, Vec<FueledMergeReq<T>>> {
1847        let (merge_reqs, did_work) = self.trace.exert(fuel);
1848        if did_work {
1849            Continue(merge_reqs)
1850        } else {
1851            assert!(merge_reqs.is_empty());
1852            // Break if we have nothing useful to do, to save a seqno (and the
1853            // resulting crdb traffic).
1854            Break(NoOpStateTransition(Vec::new()))
1855        }
1856    }
1857
1858    pub fn downgrade_since(
1859        &mut self,
1860        reader_id: &LeasedReaderId,
1861        seqno: SeqNo,
1862        outstanding_seqno: Option<SeqNo>,
1863        new_since: &Antichain<T>,
1864        heartbeat_timestamp_ms: u64,
1865    ) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
1866        // We expire all readers if the upper and since both advance to the
1867        // empty antichain. Gracefully handle this. At the same time,
1868        // short-circuit the cmd application so we don't needlessly create new
1869        // SeqNos.
1870        if self.is_tombstone() {
1871            return Break(NoOpStateTransition(Since(Antichain::new())));
1872        }
1873
1874        // The only way to have a missing reader in state is if it's been expired... and in that
1875        // case, we behave the same as though that reader had been downgraded to the empty antichain.
1876        let Some(reader_state) = self.leased_reader(reader_id) else {
1877            tracing::warn!(
1878                "Leased reader {reader_id} was expired due to inactivity. Did the machine go to sleep?",
1879            );
1880            return Break(NoOpStateTransition(Since(Antichain::new())));
1881        };
1882
1883        // Also use this as an opportunity to heartbeat the reader and downgrade
1884        // the seqno capability.
1885        reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
1886            heartbeat_timestamp_ms,
1887            reader_state.last_heartbeat_timestamp_ms,
1888        );
1889
1890        let seqno = match outstanding_seqno {
1891            Some(outstanding_seqno) => {
1892                assert!(
1893                    outstanding_seqno >= reader_state.seqno,
1894                    "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
1895                    is behind current reader_state ({:?})",
1896                    outstanding_seqno,
1897                    reader_state.seqno,
1898                );
1899                std::cmp::min(outstanding_seqno, seqno)
1900            }
1901            None => seqno,
1902        };
1903
1904        reader_state.seqno = seqno;
1905
1906        let reader_current_since = if PartialOrder::less_than(&reader_state.since, new_since) {
1907            reader_state.since.clone_from(new_since);
1908            self.update_since();
1909            new_since.clone()
1910        } else {
1911            // No-op, but still commit the state change so that this gets
1912            // linearized.
1913            reader_state.since.clone()
1914        };
1915
1916        Continue(Since(reader_current_since))
1917    }
1918
1919    pub fn compare_and_downgrade_since<O: Opaque + Codec64>(
1920        &mut self,
1921        reader_id: &CriticalReaderId,
1922        expected_opaque: &O,
1923        (new_opaque, new_since): (&O, &Antichain<T>),
1924    ) -> ControlFlow<
1925        NoOpStateTransition<Result<Since<T>, (O, Since<T>)>>,
1926        Result<Since<T>, (O, Since<T>)>,
1927    > {
1928        // We expire all readers if the upper and since both advance to the
1929        // empty antichain. Gracefully handle this. At the same time,
1930        // short-circuit the cmd application so we don't needlessly create new
1931        // SeqNos.
1932        if self.is_tombstone() {
1933            // Match the idempotence behavior below of ignoring the token if
1934            // since is already advanced enough (in this case, because it's a
1935            // tombstone, we know it's the empty antichain).
1936            return Break(NoOpStateTransition(Ok(Since(Antichain::new()))));
1937        }
1938
1939        let reader_state = self.critical_reader(reader_id);
1940        assert_eq!(reader_state.opaque_codec, O::codec_name());
1941
1942        if &O::decode(reader_state.opaque.0) != expected_opaque {
1943            // No-op, but still commit the state change so that this gets
1944            // linearized.
1945            return Continue(Err((
1946                Codec64::decode(reader_state.opaque.0),
1947                Since(reader_state.since.clone()),
1948            )));
1949        }
1950
1951        reader_state.opaque = OpaqueState(Codec64::encode(new_opaque));
1952        if PartialOrder::less_equal(&reader_state.since, new_since) {
1953            reader_state.since.clone_from(new_since);
1954            self.update_since();
1955            Continue(Ok(Since(new_since.clone())))
1956        } else {
1957            // no work to be done -- the reader state's `since` is already sufficiently
1958            // advanced. we may someday need to revisit this branch when it's possible
1959            // for two `since` frontiers to be incomparable.
1960            Continue(Ok(Since(reader_state.since.clone())))
1961        }
1962    }
1963
1964    pub fn heartbeat_leased_reader(
1965        &mut self,
1966        reader_id: &LeasedReaderId,
1967        heartbeat_timestamp_ms: u64,
1968    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1969        // We expire all readers if the upper and since both advance to the
1970        // empty antichain. Gracefully handle this. At the same time,
1971        // short-circuit the cmd application so we don't needlessly create new
1972        // SeqNos.
1973        if self.is_tombstone() {
1974            return Break(NoOpStateTransition(false));
1975        }
1976
1977        match self.leased_readers.get_mut(reader_id) {
1978            Some(reader_state) => {
1979                reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
1980                    heartbeat_timestamp_ms,
1981                    reader_state.last_heartbeat_timestamp_ms,
1982                );
1983                Continue(true)
1984            }
1985            // No-op, but we still commit the state change so that this gets
1986            // linearized (maybe we're looking at old state).
1987            None => Continue(false),
1988        }
1989    }
1990
1991    pub fn expire_leased_reader(
1992        &mut self,
1993        reader_id: &LeasedReaderId,
1994    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1995        // We expire all readers if the upper and since both advance to the
1996        // empty antichain. Gracefully handle this. At the same time,
1997        // short-circuit the cmd application so we don't needlessly create new
1998        // SeqNos.
1999        if self.is_tombstone() {
2000            return Break(NoOpStateTransition(false));
2001        }
2002
2003        let existed = self.leased_readers.remove(reader_id).is_some();
2004        if existed {
2005            // TODO(database-issues#6885): Re-enable this
2006            //
2007            // Temporarily disabling this because we think it might be the cause
2008            // of the remap since bug. Specifically, a clusterd process has a
2009            // ReadHandle for maintaining the since and one inside a Listen. If
2010            // we crash and stay down for longer than the read lease duration,
2011            // it's possible that an expiry of them both in quick succession
2012            // jumps the since forward to the Listen one.
2013            //
2014            // Don't forget to update the downgrade_since when this gets
2015            // switched back on.
2016            //
2017            // self.update_since();
2018        }
2019        // No-op if existed is false, but still commit the state change so that
2020        // this gets linearized.
2021        Continue(existed)
2022    }
2023
2024    pub fn expire_critical_reader(
2025        &mut self,
2026        reader_id: &CriticalReaderId,
2027    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2028        // We expire all readers if the upper and since both advance to the
2029        // empty antichain. Gracefully handle this. At the same time,
2030        // short-circuit the cmd application so we don't needlessly create new
2031        // SeqNos.
2032        if self.is_tombstone() {
2033            return Break(NoOpStateTransition(false));
2034        }
2035
2036        let existed = self.critical_readers.remove(reader_id).is_some();
2037        if existed {
2038            // TODO(database-issues#6885): Re-enable this
2039            //
2040            // Temporarily disabling this because we think it might be the cause
2041            // of the remap since bug. Specifically, a clusterd process has a
2042            // ReadHandle for maintaining the since and one inside a Listen. If
2043            // we crash and stay down for longer than the read lease duration,
2044            // it's possible that an expiry of them both in quick succession
2045            // jumps the since forward to the Listen one.
2046            //
2047            // Don't forget to update the downgrade_since when this gets
2048            // switched back on.
2049            //
2050            // self.update_since();
2051        }
2052        // This state transition is a no-op if existed is false, but we still
2053        // commit the state change so that this gets linearized (maybe we're
2054        // looking at old state).
2055        Continue(existed)
2056    }
2057
2058    pub fn expire_writer(
2059        &mut self,
2060        writer_id: &WriterId,
2061    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2062        // We expire all writers if the upper and since both advance to the
2063        // empty antichain. Gracefully handle this. At the same time,
2064        // short-circuit the cmd application so we don't needlessly create new
2065        // SeqNos.
2066        if self.is_tombstone() {
2067            return Break(NoOpStateTransition(false));
2068        }
2069
2070        let existed = self.writers.remove(writer_id).is_some();
2071        // This state transition is a no-op if existed is false, but we still
2072        // commit the state change so that this gets linearized (maybe we're
2073        // looking at old state).
2074        Continue(existed)
2075    }
2076
2077    fn leased_reader(&mut self, id: &LeasedReaderId) -> Option<&mut LeasedReaderState<T>> {
2078        self.leased_readers.get_mut(id)
2079    }
2080
2081    fn critical_reader(&mut self, id: &CriticalReaderId) -> &mut CriticalReaderState<T> {
2082        self.critical_readers
2083            .get_mut(id)
2084            .unwrap_or_else(|| {
2085                panic!(
2086                    "Unknown CriticalReaderId({}). It was either never registered, or has been manually expired.",
2087                    id
2088                )
2089            })
2090    }
2091
2092    fn critical_since(&self) -> Option<Antichain<T>> {
2093        let mut critical_sinces = self.critical_readers.values().map(|r| &r.since);
2094        let mut since = critical_sinces.next().cloned()?;
2095        for s in critical_sinces {
2096            since.meet_assign(s);
2097        }
2098        Some(since)
2099    }
2100
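    // The shard-global since is the meet (pointwise minimum) of every leased
    // and critical reader's since; the trace is downgraded to that frontier.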
2101    fn update_since(&mut self) {
2102        let mut sinces_iter = self
2103            .leased_readers
2104            .values()
2105            .map(|x| &x.since)
2106            .chain(self.critical_readers.values().map(|x| &x.since));
2107        let mut since = match sinces_iter.next() {
2108            Some(since) => since.clone(),
2109            None => {
2110                // If there are no current readers, leave `since` unchanged so
2111                // it doesn't regress.
2112                return;
2113            }
2114        };
2115        for s in sinces_iter {
2116            since.meet_assign(s);
2117        }
2118        self.trace.downgrade_since(&since);
2119    }
2120
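    // The seqno-since is the minimum of the given seqno and every leased
    // reader's held seqno. E.g. if state is at seqno 10 but a leased reader
    // still holds seqno 7, the seqno-since is 7 and GC may not truncate the
    // diffs that reader still needs.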
2121    fn seqno_since(&self, seqno: SeqNo) -> SeqNo {
2122        let mut seqno_since = seqno;
2123        for cap in self.leased_readers.values() {
2124            seqno_since = std::cmp::min(seqno_since, cap.seqno);
2125        }
2126        // critical_readers don't hold a seqno capability.
2127        seqno_since
2128    }
2129
2130    fn tombstone_batch() -> HollowBatch<T> {
2131        HollowBatch::empty(Description::new(
2132            Antichain::from_elem(T::minimum()),
2133            Antichain::new(),
2134            Antichain::new(),
2135        ))
2136    }
2137
2138    pub(crate) fn is_tombstone(&self) -> bool {
2139        self.trace.upper().is_empty()
2140            && self.trace.since().is_empty()
2141            && self.writers.is_empty()
2142            && self.leased_readers.is_empty()
2143            && self.critical_readers.is_empty()
2144    }
2145
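    // True when the trace holds at most one batch and no updates at all: the
    // fully-shrunk shape that `become_tombstone_and_shrink` drives a tombstone
    // toward.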
2146    pub(crate) fn is_single_empty_batch(&self) -> bool {
2147        let mut batch_count = 0;
2148        let mut is_empty = true;
2149        self.trace.map_batches(|b| {
2150            batch_count += 1;
2151            is_empty &= b.is_empty()
2152        });
2153        batch_count <= 1 && is_empty
2154    }
2155
2156    pub fn become_tombstone_and_shrink(&mut self) -> ControlFlow<NoOpStateTransition<()>, ()> {
2157        assert_eq!(self.trace.upper(), &Antichain::new());
2158        assert_eq!(self.trace.since(), &Antichain::new());
2159
2160        // Remember our current state, so we can decide whether we have to
2161        // record a transition in durable state.
2162        let was_tombstone = self.is_tombstone();
2163
2164        // Enter the "tombstone" state, if we're not in it already.
2165        self.writers.clear();
2166        self.leased_readers.clear();
2167        self.critical_readers.clear();
2168
2169        debug_assert!(self.is_tombstone());
2170
2171        // Now that we're in a "tombstone" state -- i.e. nobody can read the data from a shard or write to
2172        // it -- the actual contents of our batches no longer matter.
2173        // This method progressively replaces batches in our state with simpler versions, to allow
2174        // freeing up resources and to reduce the state size. (Since the state is unreadable, this
2175        // is not visible to clients.) We do this a little bit at a time to avoid really large state
2176        // transitions... most operations happen incrementally, and large single writes can overwhelm
2177        // a backing store. See comments for why we believe the relevant diffs are reasonably small.
2178
2179        let mut to_replace = None;
2180        let mut batch_count = 0;
2181        self.trace.map_batches(|b| {
2182            batch_count += 1;
2183            if !b.is_empty() && to_replace.is_none() {
2184                to_replace = Some(b.desc.clone());
2185            }
2186        });
2187        if let Some(desc) = to_replace {
2188            // We have a nonempty batch: replace it with an empty batch and return.
2189            // This should not produce an excessively large diff: if it did, we wouldn't have been
2190            // able to append that batch in the first place.
2191            let result = self.trace.apply_tombstone_merge(&desc);
2192            assert!(
2193                result.matched(),
2194                "merge with a matching desc should always match"
2195            );
2196            Continue(())
2197        } else if batch_count > 1 {
2198            // All our batches are empty, but we have more than one of them. Replace the whole set
2199            // with a new single-batch trace.
2200            // This produces a diff with a size proportional to the number of batches, but since
2201            // Spine keeps a logarithmic number of batches this should never be excessively large.
2202            let mut new_trace = Trace::default();
2203            new_trace.downgrade_since(&Antichain::new());
2204            let merge_reqs = new_trace.push_batch(Self::tombstone_batch());
2205            assert_eq!(merge_reqs, Vec::new());
2206            self.trace = new_trace;
2207            Continue(())
2208        } else if !was_tombstone {
2209            // We were not tombstoned before, so we have to make sure this state
2210            // transition is recorded.
2211            Continue(())
2212        } else {
2213            // All our batches are empty, and there's only one... there's no shrinking this
2214            // tombstone further.
2215            Break(NoOpStateTransition(()))
2216        }
2217    }
2218}
2219
2220// TODO: Document invariants.
2221#[derive(Debug)]
2222#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
2223pub struct State<T> {
2224    pub(crate) applier_version: semver::Version,
2225    pub(crate) shard_id: ShardId,
2226
2227    pub(crate) seqno: SeqNo,
2228    /// A strictly increasing wall time of when this state was written, in
2229    /// milliseconds since the unix epoch.
2230    pub(crate) walltime_ms: u64,
2231    /// Hostname of the persist user that created this version of state. For
2232    /// debugging.
2233    pub(crate) hostname: String,
2234    pub(crate) collections: StateCollections<T>,
2235}
2236
2237/// A newtype wrapper of State that guarantees the K, V, and D codecs match the
2238/// ones in durable storage.
2239pub struct TypedState<K, V, T, D> {
2240    pub(crate) state: State<T>,
2241
2242    // According to the docs, PhantomData is to "mark things that act like they
2243    // own a T". State doesn't actually own K, V, or D, just the ability to
2244    // produce them. Using the `fn() -> T` pattern gets us the same variance as
2245    // T [1], but also allows State to correctly derive Send+Sync.
2246    //
2247    // [1]:
2248    //     https://doc.rust-lang.org/nomicon/phantom-data.html#table-of-phantomdata-patterns
2249    pub(crate) _phantom: PhantomData<fn() -> (K, V, D)>,
2250}
2251
2252impl<K, V, T: Clone, D> TypedState<K, V, T, D> {
2253    #[cfg(any(test, debug_assertions))]
2254    pub(crate) fn clone(&self, applier_version: Version, hostname: String) -> Self {
2255        TypedState {
2256            state: State {
2257                applier_version,
2258                shard_id: self.shard_id.clone(),
2259                seqno: self.seqno.clone(),
2260                walltime_ms: self.walltime_ms,
2261                hostname,
2262                collections: self.collections.clone(),
2263            },
2264            _phantom: PhantomData,
2265        }
2266    }
2267
2268    pub(crate) fn clone_for_rollup(&self) -> Self {
2269        TypedState {
2270            state: State {
2271                applier_version: self.applier_version.clone(),
2272                shard_id: self.shard_id.clone(),
2273                seqno: self.seqno.clone(),
2274                walltime_ms: self.walltime_ms,
2275                hostname: self.hostname.clone(),
2276                collections: self.collections.clone(),
2277            },
2278            _phantom: PhantomData,
2279        }
2280    }
2281}
2282
2283impl<K, V, T: Debug, D> Debug for TypedState<K, V, T, D> {
2284    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2285        // Deconstruct self so we get a compile failure if new fields
2286        // are added.
2287        let TypedState { state, _phantom } = self;
2288        f.debug_struct("TypedState").field("state", state).finish()
2289    }
2290}
2291
2292// Impl PartialEq regardless of the type params.
2293#[cfg(any(test, debug_assertions))]
2294impl<K, V, T: PartialEq, D> PartialEq for TypedState<K, V, T, D> {
2295    fn eq(&self, other: &Self) -> bool {
2296        // Deconstruct self and other so we get a compile failure if new fields
2297        // are added.
2298        let TypedState {
2299            state: self_state,
2300            _phantom,
2301        } = self;
2302        let TypedState {
2303            state: other_state,
2304            _phantom,
2305        } = other;
2306        self_state == other_state
2307    }
2308}
2309
2310impl<K, V, T, D> Deref for TypedState<K, V, T, D> {
2311    type Target = State<T>;
2312
2313    fn deref(&self) -> &Self::Target {
2314        &self.state
2315    }
2316}
2317
2318impl<K, V, T, D> DerefMut for TypedState<K, V, T, D> {
2319    fn deref_mut(&mut self) -> &mut Self::Target {
2320        &mut self.state
2321    }
2322}
2323
2324impl<K, V, T, D> TypedState<K, V, T, D>
2325where
2326    K: Codec,
2327    V: Codec,
2328    T: Timestamp + Lattice + Codec64,
2329    D: Codec64,
2330{
2331    pub fn new(
2332        applier_version: Version,
2333        shard_id: ShardId,
2334        hostname: String,
2335        walltime_ms: u64,
2336    ) -> Self {
2337        let state = State {
2338            applier_version,
2339            shard_id,
2340            seqno: SeqNo::minimum(),
2341            walltime_ms,
2342            hostname,
2343            collections: StateCollections {
2344                last_gc_req: SeqNo::minimum(),
2345                rollups: BTreeMap::new(),
2346                active_rollup: None,
2347                active_gc: None,
2348                leased_readers: BTreeMap::new(),
2349                critical_readers: BTreeMap::new(),
2350                writers: BTreeMap::new(),
2351                schemas: BTreeMap::new(),
2352                trace: Trace::default(),
2353            },
2354        };
2355        TypedState {
2356            state,
2357            _phantom: PhantomData,
2358        }
2359    }
2360
2361    pub fn clone_apply<R, E, WorkFn>(
2362        &self,
2363        cfg: &PersistConfig,
2364        work_fn: &mut WorkFn,
2365    ) -> ControlFlow<E, (R, Self)>
2366    where
2367        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
2368    {
2369        // Now that we support one minor version of forward compatibility, tag
2370        // each version of state with the _max_ version of code that has ever
2371        // contributed to it. Otherwise, we'd erroneously allow rolling back an
2372        // arbitrary number of versions if they were done one-by-one.
2373        let new_applier_version = std::cmp::max(&self.applier_version, &cfg.build_version);
2374        let mut new_state = State {
2375            applier_version: new_applier_version.clone(),
2376            shard_id: self.shard_id,
2377            seqno: self.seqno.next(),
2378            walltime_ms: (cfg.now)(),
2379            hostname: cfg.hostname.clone(),
2380            collections: self.collections.clone(),
2381        };
2382        // Make sure walltime_ms is strictly increasing, in case clocks are
2383        // offset.
2384        if new_state.walltime_ms <= self.walltime_ms {
2385            new_state.walltime_ms = self.walltime_ms + 1;
2386        }
2387
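        // NB: `?` on the returned ControlFlow propagates a Break straight back
        // to the caller, so on a no-op transition the speculative `new_state`
        // above is simply dropped and never committed.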
2388        let work_ret = work_fn(new_state.seqno, cfg, &mut new_state.collections)?;
2389        let new_state = TypedState {
2390            state: new_state,
2391            _phantom: PhantomData,
2392        };
2393        Continue((work_ret, new_state))
2394    }
2395}
2396
2397#[derive(Copy, Clone, Debug)]
2398pub struct GcConfig {
2399    pub use_active_gc: bool,
2400    pub fallback_threshold_ms: u64,
2401    pub min_versions: usize,
2402    pub max_versions: usize,
2403}
2404
2405impl<T> State<T>
2406where
2407    T: Timestamp + Lattice + Codec64,
2408{
2409    pub fn shard_id(&self) -> ShardId {
2410        self.shard_id
2411    }
2412
2413    pub fn seqno(&self) -> SeqNo {
2414        self.seqno
2415    }
2416
2417    pub fn since(&self) -> &Antichain<T> {
2418        self.collections.trace.since()
2419    }
2420
2421    pub fn upper(&self) -> &Antichain<T> {
2422        self.collections.trace.upper()
2423    }
2424
2425    pub fn spine_batch_count(&self) -> usize {
2426        self.collections.trace.num_spine_batches()
2427    }
2428
2429    pub fn size_metrics(&self) -> StateSizeMetrics {
2430        let mut ret = StateSizeMetrics::default();
2431        self.blobs().for_each(|x| match x {
2432            HollowBlobRef::Batch(x) => {
2433                ret.hollow_batch_count += 1;
2434                ret.batch_part_count += x.part_count();
2435                ret.num_updates += x.len;
2436
2437                let batch_size = x.encoded_size_bytes();
2438                for x in x.parts.iter() {
2439                    if x.ts_rewrite().is_some() {
2440                        ret.rewrite_part_count += 1;
2441                    }
2442                    if x.is_inline() {
2443                        ret.inline_part_count += 1;
2444                        ret.inline_part_bytes += x.inline_bytes();
2445                    }
2446                }
2447                ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size);
2448                ret.state_batches_bytes += batch_size;
2449            }
2450            HollowBlobRef::Rollup(x) => {
2451                ret.state_rollup_count += 1;
2452                ret.state_rollups_bytes += x.encoded_size_bytes.unwrap_or_default()
2453            }
2454        });
2455        ret
2456    }
2457
2458    pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
2459        // We maintain the invariant that every version of state has at least
2460        // one rollup.
2461        self.collections
2462            .rollups
2463            .iter()
2464            .rev()
2465            .next()
2466            .expect("State should have at least one rollup if seqno > minimum")
2467    }
2468
2469    pub(crate) fn seqno_since(&self) -> SeqNo {
2470        self.collections.seqno_since(self.seqno)
2471    }
2472
2473    // Returns whether the cmd proposing this state has been selected to perform
2474    // background garbage collection work.
2475    //
2476    // If it was selected, this information is recorded in the state itself for
2477    // commit along with the cmd's state transition. This helps us to avoid
2478    // redundant work.
2479    //
2480    // Correctness does not depend on a gc assignment being executed, nor on
2481    // them being executed in the order they are given. But it is expected that
2482    // gc assignments are best-effort respected. In practice, cmds where running
2483    // gc would be awkward (like register_foo or expire_foo) simply ignore it.
2484    pub fn maybe_gc(&mut self, is_write: bool, now: u64, cfg: GcConfig) -> Option<GcReq> {
2485        let GcConfig {
2486            use_active_gc,
2487            fallback_threshold_ms,
2488            min_versions,
2489            max_versions,
2490        } = cfg;
2491        // This is an arbitrary-ish threshold that scales with seqno, but never
2492        // gets particularly big. It probably could be much bigger and certainly
2493        // could use a tuning pass at some point.
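        // (Concretely, in the non-active-gc branch below: trailing_zeros of the
        // next power of two is roughly log2(seqno), so a shard at seqno ~1_000
        // gets a threshold of 10 and one at ~1_000_000 gets 20.)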
2494        let gc_threshold = if use_active_gc {
2495            u64::cast_from(min_versions)
2496        } else {
2497            std::cmp::max(
2498                1,
2499                u64::cast_from(self.seqno.0.next_power_of_two().trailing_zeros()),
2500            )
2501        };
2502        let new_seqno_since = self.seqno_since();
2503        // Collect until the new seqno since... or the last GC request plus the max
2504        // number of versions, whichever is less.
2505        let gc_until_seqno = new_seqno_since.min(SeqNo(
2506            self.collections
2507                .last_gc_req
2508                .0
2509                .saturating_add(u64::cast_from(max_versions)),
2510        ));
2511        let should_gc = new_seqno_since
2512            .0
2513            .saturating_sub(self.collections.last_gc_req.0)
2514            >= gc_threshold;
2515
2516        // If we wouldn't otherwise gc, check if we have an active gc. If we do, and
2517        // it's been a while since it started, we should gc.
2518        let should_gc = if use_active_gc && !should_gc {
2519            match self.collections.active_gc {
2520                Some(active_gc) => now.saturating_sub(active_gc.start_ms) > fallback_threshold_ms,
2521                None => false,
2522            }
2523        } else {
2524            should_gc
2525        };
2526        // Assign GC traffic preferentially to writers, falling back to anyone
2527        // generating new state versions if there are no writers.
2528        let should_gc = should_gc && (is_write || self.collections.writers.is_empty());
2529        // Always assign GC work to a tombstoned shard to have the chance to
2530        // clean up any residual blobs. This is safe (won't cause excess gc)
2531        // as the only allowed command after becoming a tombstone is to write
2532        // the final rollup.
2533        let tombstone_needs_gc = self.collections.is_tombstone();
2534        let should_gc = should_gc || tombstone_needs_gc;
2535        let should_gc = if use_active_gc {
2536            // If we have an active gc, we should only gc if the active gc is
2537            // sufficiently old. This is to avoid doing more gc work than
2538            // necessary.
2539            should_gc
2540                && match self.collections.active_gc {
2541                    Some(active) => now.saturating_sub(active.start_ms) > fallback_threshold_ms,
2542                    None => true,
2543                }
2544        } else {
2545            should_gc
2546        };
2547        if should_gc {
2548            self.collections.last_gc_req = gc_until_seqno;
2549            Some(GcReq {
2550                shard_id: self.shard_id,
2551                new_seqno_since: gc_until_seqno,
2552            })
2553        } else {
2554            None
2555        }
2556    }
2557
2558    /// Return the number of gc-ineligible state versions.
2559    pub fn seqnos_held(&self) -> usize {
2560        usize::cast_from(self.seqno.0.saturating_sub(self.seqno_since().0))
2561    }
2562
2563    /// Expire all readers and writers up to the given walltime_ms.
2564    pub fn expire_at(&mut self, walltime_ms: EpochMillis) -> ExpiryMetrics {
2565        let mut metrics = ExpiryMetrics::default();
2566        let shard_id = self.shard_id();
2567        self.collections.leased_readers.retain(|id, state| {
2568            let retain = state.last_heartbeat_timestamp_ms + state.lease_duration_ms >= walltime_ms;
2569            if !retain {
2570                info!(
2571                    "Force expiring reader {id} ({}) of shard {shard_id} due to inactivity",
2572                    state.debug.purpose
2573                );
2574                metrics.readers_expired += 1;
2575            }
2576            retain
2577        });
2578        // critical_readers don't need forced expiration. (In fact, that's the point!)
2579        self.collections.writers.retain(|id, state| {
2580            let retain =
2581                (state.last_heartbeat_timestamp_ms + state.lease_duration_ms) >= walltime_ms;
2582            if !retain {
2583                info!(
2584                    "Force expiring writer {id} ({}) of shard {shard_id} due to inactivity",
2585                    state.debug.purpose
2586                );
2587                metrics.writers_expired += 1;
2588            }
2589            retain
2590        });
2591        metrics
2592    }
2593
2594    /// Returns the batches that contain updates up to (and including) the given `as_of`. The
2595    /// result `Vec` contains [`HollowBatch`]es whose parts reference blob keys, along with a
2596    /// [`Description`] of which updates in the referenced parts are valid to read.
2597    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
2598        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2599            return Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
2600                self.collections.trace.since().clone(),
2601            )));
2602        }
2603        let upper = self.collections.trace.upper();
2604        if PartialOrder::less_equal(upper, as_of) {
2605            return Err(SnapshotErr::AsOfNotYetAvailable(
2606                self.seqno,
2607                Upper(upper.clone()),
2608            ));
2609        }
2610
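        // Skip any batch whose lower is already strictly beyond the as_of; such
        // a batch can contain no updates at or before the as_of.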
2611        let batches = self
2612            .collections
2613            .trace
2614            .batches()
2615            .filter(|b| !PartialOrder::less_than(as_of, b.desc.lower()))
2616            .cloned()
2617            .collect();
2618        Ok(batches)
2619    }
2620
2621    // NB: Unlike the other methods here, this one is read-only.
2622    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
2623        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2624            return Err(Since(self.collections.trace.since().clone()));
2625        }
2626        Ok(())
2627    }
2628
2629    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
2630        // TODO: Avoid the O(n^2) here: `next_listen_batch` is called once per
2631        // batch and this iterates through all batches to find the next one.
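        // A batch is "next" for the frontier iff its description straddles it:
        // lower <= frontier < upper.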
2632        self.collections
2633            .trace
2634            .batches()
2635            .find(|b| {
2636                PartialOrder::less_equal(b.desc.lower(), frontier)
2637                    && PartialOrder::less_than(frontier, b.desc.upper())
2638            })
2639            .cloned()
2640            .ok_or(self.seqno)
2641    }
2642
2643    pub fn active_rollup(&self) -> Option<ActiveRollup> {
2644        self.collections.active_rollup
2645    }
2646
2647    pub fn need_rollup(
2648        &self,
2649        threshold: usize,
2650        use_active_rollup: bool,
2651        fallback_threshold_ms: u64,
2652        now: u64,
2653    ) -> Option<SeqNo> {
2654        let (latest_rollup_seqno, _) = self.latest_rollup();
2655
2656        // Tombstoned shards require one final rollup. However, because we
2657        // write a rollup as of SeqNo X and then link it in using a state
2658        // transition (in this case from X to X+1), the minimum number of
2659        // live diffs is actually two. Detect when we're in this minimal
2660        // two diff state and stop the (otherwise) infinite iteration.
2661        if self.collections.is_tombstone() && latest_rollup_seqno.next() < self.seqno {
2662            return Some(self.seqno);
2663        }
2664
2665        let seqnos_since_last_rollup = self.seqno.0.saturating_sub(latest_rollup_seqno.0);
2666
2667        if use_active_rollup {
2668            // If seqnos_since_last_rollup > threshold and there is no existing rollup in
2669            // progress, we should start a new rollup.
2670            // If there is an active rollup, we should check if it has been running too long.
2671            // If it has, we should start a new rollup.
2672            // This is to guard against a worker dying/taking too long/etc.
2673            if seqnos_since_last_rollup > u64::cast_from(threshold) {
2674                match self.active_rollup() {
2675                    Some(active_rollup) => {
2676                        if now.saturating_sub(active_rollup.start_ms) > fallback_threshold_ms {
2677                            return Some(self.seqno);
2678                        }
2679                    }
2680                    None => {
2681                        return Some(self.seqno);
2682                    }
2683                }
2684            }
2685        } else {
2686            // every `threshold` seqnos since the latest rollup, assign rollup maintenance.
2687            // we avoid assigning rollups to every seqno past the threshold to avoid handles
2688            // racing / performing redundant work.
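            // E.g. with a threshold of 128, rollup work is assigned at exactly
            // 128, 256, 384, ... seqnos past the latest rollup.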
2689            if seqnos_since_last_rollup > 0
2690                && seqnos_since_last_rollup % u64::cast_from(threshold) == 0
2691            {
2692                return Some(self.seqno);
2693            }
2694
2695            // however, since maintenance is best-effort and could fail, do assign rollup
2696            // work to every seqno after a fallback threshold to ensure one is written.
2697            if seqnos_since_last_rollup
2698                > u64::cast_from(
2699                    threshold * PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER,
2700                )
2701            {
2702                return Some(self.seqno);
2703            }
2704        }
2705
2706        None
2707    }
2708
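    // Every blob reference reachable from this version of state: each batch in
    // the trace plus each live rollup.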
2709    pub(crate) fn blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
2710        let batches = self.collections.trace.batches().map(HollowBlobRef::Batch);
2711        let rollups = self.collections.rollups.values().map(HollowBlobRef::Rollup);
2712        batches.chain(rollups)
2713    }
2714}
2715
2716fn serialize_part_bytes<S: Serializer>(val: &[u8], s: S) -> Result<S::Ok, S::Error> {
2717    let val = hex::encode(val);
2718    val.serialize(s)
2719}
2720
2721fn serialize_lazy_proto<S: Serializer, T: prost::Message + Default>(
2722    val: &Option<LazyProto<T>>,
2723    s: S,
2724) -> Result<S::Ok, S::Error> {
2725    val.as_ref()
2726        .map(|lazy| hex::encode(&lazy.into_proto()))
2727        .serialize(s)
2728}
2729
2730fn serialize_part_stats<S: Serializer>(
2731    val: &Option<LazyPartStats>,
2732    s: S,
2733) -> Result<S::Ok, S::Error> {
2734    let val = val.as_ref().map(|x| x.decode().key);
2735    val.serialize(s)
2736}
2737
2738fn serialize_diffs_sum<S: Serializer>(val: &Option<[u8; 8]>, s: S) -> Result<S::Ok, S::Error> {
2739    // This is only used for debugging, so we hack it to assume that D is i64.
2740    let val = val.map(i64::decode);
2741    val.serialize(s)
2742}
2743
2744// This Serialize impl is used for debugging/testing and exposed via SQL. It's
2745// intentionally gated from users, so not strictly subject to our backward
2746// compatibility guarantees, but still probably best to be thoughtful about
2747// making unnecessary changes. Additionally, it's worth making the output as
2748// pleasant to use as possible without tying our hands for the actual code usages.
2749impl<T: Serialize + Timestamp + Lattice> Serialize for State<T> {
2750    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2751        let State {
2752            applier_version,
2753            shard_id,
2754            seqno,
2755            walltime_ms,
2756            hostname,
2757            collections:
2758                StateCollections {
2759                    last_gc_req,
2760                    rollups,
2761                    active_rollup,
2762                    active_gc,
2763                    leased_readers,
2764                    critical_readers,
2765                    writers,
2766                    schemas,
2767                    trace,
2768                },
2769        } = self;
2770        let mut s = s.serialize_struct("State", 19)?;
2771        let () = s.serialize_field("applier_version", &applier_version.to_string())?;
2772        let () = s.serialize_field("shard_id", shard_id)?;
2773        let () = s.serialize_field("seqno", seqno)?;
2774        let () = s.serialize_field("walltime_ms", walltime_ms)?;
2775        let () = s.serialize_field("hostname", hostname)?;
2776        let () = s.serialize_field("last_gc_req", last_gc_req)?;
2777        let () = s.serialize_field("rollups", rollups)?;
2778        let () = s.serialize_field("active_rollup", active_rollup)?;
2779        let () = s.serialize_field("active_gc", active_gc)?;
2780        let () = s.serialize_field("leased_readers", leased_readers)?;
2781        let () = s.serialize_field("critical_readers", critical_readers)?;
2782        let () = s.serialize_field("writers", writers)?;
2783        let () = s.serialize_field("schemas", schemas)?;
2784        let () = s.serialize_field("since", &trace.since().elements())?;
2785        let () = s.serialize_field("upper", &trace.upper().elements())?;
2786        let trace = trace.flatten();
2787        let () = s.serialize_field("batches", &trace.legacy_batches.keys().collect::<Vec<_>>())?;
2788        let () = s.serialize_field("hollow_batches", &trace.hollow_batches)?;
2789        let () = s.serialize_field("spine_batches", &trace.spine_batches)?;
2790        let () = s.serialize_field("merges", &trace.merges)?;
2791        s.end()
2792    }
2793}
2794
2795#[derive(Debug, Default)]
2796pub struct StateSizeMetrics {
2797    pub hollow_batch_count: usize,
2798    pub batch_part_count: usize,
2799    pub rewrite_part_count: usize,
2800    pub num_updates: usize,
2801    pub largest_batch_bytes: usize,
2802    pub state_batches_bytes: usize,
2803    pub state_rollups_bytes: usize,
2804    pub state_rollup_count: usize,
2805    pub inline_part_count: usize,
2806    pub inline_part_bytes: usize,
2807}
2808
2809#[derive(Default)]
2810pub struct ExpiryMetrics {
2811    pub(crate) readers_expired: usize,
2812    pub(crate) writers_expired: usize,
2813}
2814
2815/// Wrapper for Antichain that represents a Since
2816#[derive(Debug, Clone, PartialEq)]
2817pub struct Since<T>(pub Antichain<T>);
2818
2819/// Wrapper for Antichain that represents an Upper
2820#[derive(Debug, PartialEq)]
2821pub struct Upper<T>(pub Antichain<T>);
2822
2823#[cfg(test)]
2824pub(crate) mod tests {
2825    use std::ops::Range;
2826    use std::str::FromStr;
2827
2828    use bytes::Bytes;
2829    use mz_build_info::DUMMY_BUILD_INFO;
2830    use mz_dyncfg::ConfigUpdates;
2831    use mz_ore::now::SYSTEM_TIME;
2832    use mz_ore::{assert_none, assert_ok};
2833    use mz_proto::RustType;
2834    use proptest::prelude::*;
2835    use proptest::strategy::ValueTree;
2836
2837    use crate::InvalidUsage::{InvalidBounds, InvalidEmptyTimeInterval};
2838    use crate::PersistLocation;
2839    use crate::cache::PersistClientCache;
2840    use crate::internal::encoding::any_some_lazy_part_stats;
2841    use crate::internal::paths::RollupId;
2842    use crate::internal::trace::tests::any_trace;
2843    use crate::tests::new_test_client_cache;
2844
2845    use super::*;
2846
2847    const LEASE_DURATION_MS: u64 = 900 * 1000;
2848    fn debug_state() -> HandleDebugState {
2849        HandleDebugState {
2850            hostname: "debug".to_owned(),
2851            purpose: "finding the bugs".to_owned(),
2852        }
2853    }
2854
2855    pub fn any_hollow_batch_with_exact_runs<T: Arbitrary + Timestamp>(
2856        num_runs: usize,
2857    ) -> impl Strategy<Value = HollowBatch<T>> {
2858        (
2859            any::<T>(),
2860            any::<T>(),
2861            any::<T>(),
2862            proptest::collection::vec(any_run_part::<T>(), num_runs + 1..20),
2863            any::<usize>(),
2864        )
2865            .prop_map(move |(t0, t1, since, parts, len)| {
2866                let (lower, upper) = if t0 <= t1 {
2867                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
2868                } else {
2869                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
2870                };
2871                let since = Antichain::from_elem(since);
2872
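                // Pick `num_runs - 1` split indices spread evenly across the
                // parts, so the batch is divided into exactly `num_runs` runs.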
2873                let run_splits = (1..num_runs)
2874                    .map(|i| i * parts.len() / num_runs)
2875                    .collect::<Vec<_>>();
2876
2877                let run_meta = (0..num_runs)
2878                    .map(|_| {
2879                        let mut meta = RunMeta::default();
2880                        meta.id = Some(RunId::new());
2881                        meta
2882                    })
2883                    .collect::<Vec<_>>();
2884
2885                HollowBatch::new(
2886                    Description::new(lower, upper, since),
2887                    parts,
2888                    len % 10,
2889                    run_meta,
2890                    run_splits,
2891                )
2892            })
2893    }
2894
2895    pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
2896        Strategy::prop_map(
2897            (
2898                any::<T>(),
2899                any::<T>(),
2900                any::<T>(),
2901                proptest::collection::vec(any_run_part::<T>(), 0..20),
2902                any::<usize>(),
2903                0..=10usize,
2904                proptest::collection::vec(any::<RunId>(), 10),
2905            ),
2906            |(t0, t1, since, parts, len, num_runs, run_ids)| {
2907                let (lower, upper) = if t0 <= t1 {
2908                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
2909                } else {
2910                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
2911                };
2912                let since = Antichain::from_elem(since);
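                // Only build a multi-run batch when there are strictly more
                // parts than runs; otherwise fall back to a single test run.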
2913                if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() {
2914                    let run_splits = (1..num_runs)
2915                        .map(|i| i * parts.len() / num_runs)
2916                        .collect::<Vec<_>>();
2917
2918                    let run_meta = (0..num_runs)
2919                        .enumerate()
2920                        .map(|(i, _)| {
2921                            let mut meta = RunMeta::default();
2922                            meta.id = Some(run_ids[i]);
2923                            meta
2924                        })
2925                        .collect::<Vec<_>>();
2926
2927                    HollowBatch::new(
2928                        Description::new(lower, upper, since),
2929                        parts,
2930                        len % 10,
2931                        run_meta,
2932                        run_splits,
2933                    )
2934                } else {
2935                    HollowBatch::new_run_for_test(
2936                        Description::new(lower, upper, since),
2937                        parts,
2938                        len % 10,
2939                        run_ids[0],
2940                    )
2941                }
2942            },
2943        )
2944    }
2945
2946    pub fn any_batch_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = BatchPart<T>> {
2947        Strategy::prop_map(
2948            (
2949                any::<bool>(),
2950                any_hollow_batch_part(),
2951                any::<Option<T>>(),
2952                any::<Option<SchemaId>>(),
2953                any::<Option<SchemaId>>(),
2954            ),
2955            |(is_hollow, hollow, ts_rewrite, schema_id, deprecated_schema_id)| {
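                // Generate either a hollow part or an (empty) inline part,
                // chosen by the `is_hollow` coin flip.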
2956                if is_hollow {
2957                    BatchPart::Hollow(hollow)
2958                } else {
2959                    let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap();
2960                    let ts_rewrite = ts_rewrite.map(Antichain::from_elem);
2961                    BatchPart::Inline {
2962                        updates,
2963                        ts_rewrite,
2964                        schema_id,
2965                        deprecated_schema_id,
2966                    }
2967                }
2968            },
2969        )
2970    }
2971
2972    pub fn any_run_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = RunPart<T>> {
2973        Strategy::prop_map(any_batch_part(), |part| RunPart::Single(part))
2974    }
2975
2976    pub fn any_hollow_batch_part<T: Arbitrary + Timestamp>()
2977    -> impl Strategy<Value = HollowBatchPart<T>> {
2978        Strategy::prop_map(
2979            (
2980                any::<PartialBatchKey>(),
2981                any::<usize>(),
2982                any::<Vec<u8>>(),
2983                any_some_lazy_part_stats(),
2984                any::<Option<T>>(),
2985                any::<[u8; 8]>(),
2986                any::<Option<BatchColumnarFormat>>(),
2987                any::<Option<SchemaId>>(),
2988                any::<Option<SchemaId>>(),
2989            ),
2990            |(
2991                key,
2992                encoded_size_bytes,
2993                key_lower,
2994                stats,
2995                ts_rewrite,
2996                diffs_sum,
2997                format,
2998                schema_id,
2999                deprecated_schema_id,
3000            )| {
3001                HollowBatchPart {
3002                    key,
3003                    encoded_size_bytes,
3004                    key_lower,
3005                    structured_key_lower: None,
3006                    stats,
3007                    ts_rewrite: ts_rewrite.map(Antichain::from_elem),
3008                    diffs_sum: Some(diffs_sum),
3009                    format,
3010                    schema_id,
3011                    deprecated_schema_id,
3012                }
3013            },
3014        )
3015    }
3016
3017    pub fn any_leased_reader_state<T: Arbitrary>() -> impl Strategy<Value = LeasedReaderState<T>> {
3018        Strategy::prop_map(
3019            (
3020                any::<SeqNo>(),
3021                any::<Option<T>>(),
3022                any::<u64>(),
3023                any::<u64>(),
3024                any::<HandleDebugState>(),
3025            ),
3026            |(seqno, since, last_heartbeat_timestamp_ms, mut lease_duration_ms, debug)| {
3027                // lease_duration_ms of 0 means this state was written by an old
3028                // version of code, which means we'll migrate it in the decode
3029                // path. Avoid.
3030                if lease_duration_ms == 0 {
3031                    lease_duration_ms += 1;
3032                }
3033                LeasedReaderState {
3034                    seqno,
3035                    since: since.map_or_else(Antichain::new, Antichain::from_elem),
3036                    last_heartbeat_timestamp_ms,
3037                    lease_duration_ms,
3038                    debug,
3039                }
3040            },
3041        )
3042    }
3043
3044    pub fn any_critical_reader_state<T: Arbitrary>() -> impl Strategy<Value = CriticalReaderState<T>>
3045    {
3046        Strategy::prop_map(
3047            (
3048                any::<Option<T>>(),
3049                any::<OpaqueState>(),
3050                any::<String>(),
3051                any::<HandleDebugState>(),
3052            ),
3053            |(since, opaque, opaque_codec, debug)| CriticalReaderState {
3054                since: since.map_or_else(Antichain::new, Antichain::from_elem),
3055                opaque,
3056                opaque_codec,
3057                debug,
3058            },
3059        )
3060    }
3061
3062    pub fn any_writer_state<T: Arbitrary>() -> impl Strategy<Value = WriterState<T>> {
3063        Strategy::prop_map(
3064            (
3065                any::<u64>(),
3066                any::<u64>(),
3067                any::<IdempotencyToken>(),
3068                any::<Option<T>>(),
3069                any::<HandleDebugState>(),
3070            ),
3071            |(
3072                last_heartbeat_timestamp_ms,
3073                lease_duration_ms,
3074                most_recent_write_token,
3075                most_recent_write_upper,
3076                debug,
3077            )| WriterState {
3078                last_heartbeat_timestamp_ms,
3079                lease_duration_ms,
3080                most_recent_write_token,
3081                most_recent_write_upper: most_recent_write_upper
3082                    .map_or_else(Antichain::new, Antichain::from_elem),
3083                debug,
3084            },
3085        )
3086    }
3087
3088    pub fn any_encoded_schemas() -> impl Strategy<Value = EncodedSchemas> {
3089        Strategy::prop_map(
3090            (
3091                any::<Vec<u8>>(),
3092                any::<Vec<u8>>(),
3093                any::<Vec<u8>>(),
3094                any::<Vec<u8>>(),
3095            ),
3096            |(key, key_data_type, val, val_data_type)| EncodedSchemas {
3097                key: Bytes::from(key),
3098                key_data_type: Bytes::from(key_data_type),
3099                val: Bytes::from(val),
3100                val_data_type: Bytes::from(val_data_type),
3101            },
3102        )
3103    }
3104
3105    pub fn any_state<T: Arbitrary + Timestamp + Lattice>(
3106        num_trace_batches: Range<usize>,
3107    ) -> impl Strategy<Value = State<T>> {
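        // `State` has more fields than proptest's largest tuple `Strategy`
        // impl, so the generators are split into two tuples and recombined in
        // the `prop_map` below.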
3108        let part1 = (
3109            any::<ShardId>(),
3110            any::<SeqNo>(),
3111            any::<u64>(),
3112            any::<String>(),
3113            any::<SeqNo>(),
3114            proptest::collection::btree_map(any::<SeqNo>(), any::<HollowRollup>(), 1..3),
3115            proptest::option::of(any::<ActiveRollup>()),
3116        );
3117
3118        let part2 = (
3119            proptest::option::of(any::<ActiveGc>()),
3120            proptest::collection::btree_map(
3121                any::<LeasedReaderId>(),
3122                any_leased_reader_state::<T>(),
3123                1..3,
3124            ),
3125            proptest::collection::btree_map(
3126                any::<CriticalReaderId>(),
3127                any_critical_reader_state::<T>(),
3128                1..3,
3129            ),
3130            proptest::collection::btree_map(any::<WriterId>(), any_writer_state::<T>(), 0..3),
3131            proptest::collection::btree_map(any::<SchemaId>(), any_encoded_schemas(), 0..3),
3132            any_trace::<T>(num_trace_batches),
3133        );
3134
3135        (part1, part2).prop_map(
3136            |(
3137                (shard_id, seqno, walltime_ms, hostname, last_gc_req, rollups, active_rollup),
3138                (active_gc, leased_readers, critical_readers, writers, schemas, trace),
3139            )| State {
3140                applier_version: semver::Version::new(1, 2, 3),
3141                shard_id,
3142                seqno,
3143                walltime_ms,
3144                hostname,
3145                collections: StateCollections {
3146                    last_gc_req,
3147                    rollups,
3148                    active_rollup,
3149                    active_gc,
3150                    leased_readers,
3151                    critical_readers,
3152                    writers,
3153                    schemas,
3154                    trace,
3155                },
3156            },
3157        )
3158    }
3159
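    /// Test helper: a single-run [`HollowBatch`] over `[lower, upper)` with
    /// `since = T::minimum()`, one stats-free hollow part per blob key, and
    /// the given update count.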
3160    pub(crate) fn hollow<T: Timestamp>(
3161        lower: T,
3162        upper: T,
3163        keys: &[&str],
3164        len: usize,
3165    ) -> HollowBatch<T> {
3166        HollowBatch::new_run(
3167            Description::new(
3168                Antichain::from_elem(lower),
3169                Antichain::from_elem(upper),
3170                Antichain::from_elem(T::minimum()),
3171            ),
3172            keys.iter()
3173                .map(|x| {
3174                    RunPart::Single(BatchPart::Hollow(HollowBatchPart {
3175                        key: PartialBatchKey((*x).to_owned()),
3176                        encoded_size_bytes: 0,
3177                        key_lower: vec![],
3178                        structured_key_lower: None,
3179                        stats: None,
3180                        ts_rewrite: None,
3181                        diffs_sum: None,
3182                        format: None,
3183                        schema_id: None,
3184                        deprecated_schema_id: None,
3185                    }))
3186                })
3187                .collect(),
3188            len,
3189        )
3190    }
3191
3192    #[mz_ore::test]
3193    fn downgrade_since() {
3194        let mut state = TypedState::<(), (), u64, i64>::new(
3195            DUMMY_BUILD_INFO.semver_version(),
3196            ShardId::new(),
3197            "".to_owned(),
3198            0,
3199        );
3200        let reader = LeasedReaderId::new();
3201        let seqno = SeqNo::minimum();
3202        let now = SYSTEM_TIME.clone();
3203        let _ = state.collections.register_leased_reader(
3204            "",
3205            &reader,
3206            "",
3207            seqno,
3208            Duration::from_secs(10),
3209            now(),
3210            false,
3211        );
3212
3213        // The shard global since == 0 initially.
3214        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3215
3216        // Greater
3217        assert_eq!(
3218            state.collections.downgrade_since(
3219                &reader,
3220                seqno,
3221                None,
3222                &Antichain::from_elem(2),
3223                now()
3224            ),
3225            Continue(Since(Antichain::from_elem(2)))
3226        );
3227        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3228        // Equal (no-op)
3229        assert_eq!(
3230            state.collections.downgrade_since(
3231                &reader,
3232                seqno,
3233                None,
3234                &Antichain::from_elem(2),
3235                now()
3236            ),
3237            Continue(Since(Antichain::from_elem(2)))
3238        );
3239        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3240        // Less (no-op)
3241        assert_eq!(
3242            state.collections.downgrade_since(
3243                &reader,
3244                seqno,
3245                None,
3246                &Antichain::from_elem(1),
3247                now()
3248            ),
3249            Continue(Since(Antichain::from_elem(2)))
3250        );
3251        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3252
3253        // Create a second reader.
3254        let reader2 = LeasedReaderId::new();
3255        let _ = state.collections.register_leased_reader(
3256            "",
3257            &reader2,
3258            "",
3259            seqno,
3260            Duration::from_secs(10),
3261            now(),
3262            false,
3263        );
3264
3265        // Shard since doesn't change until the meet (min) of all reader sinces changes.
3266        assert_eq!(
3267            state.collections.downgrade_since(
3268                &reader2,
3269                seqno,
3270                None,
3271                &Antichain::from_elem(3),
3272                now()
3273            ),
3274            Continue(Since(Antichain::from_elem(3)))
3275        );
3276        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3277        // Shard since == 3 when all readers have since >= 3.
3278        assert_eq!(
3279            state.collections.downgrade_since(
3280                &reader,
3281                seqno,
3282                None,
3283                &Antichain::from_elem(5),
3284                now()
3285            ),
3286            Continue(Since(Antichain::from_elem(5)))
3287        );
3288        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3289
3290        // Shard since is unaffected by expiring a reader whose since is > the shard since.
3291        assert_eq!(
3292            state.collections.expire_leased_reader(&reader),
3293            Continue(true)
3294        );
3295        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3296
3297        // Create a third reader.
3298        let reader3 = LeasedReaderId::new();
3299        let _ = state.collections.register_leased_reader(
3300            "",
3301            &reader3,
3302            "",
3303            seqno,
3304            Duration::from_secs(10),
3305            now(),
3306            false,
3307        );
3308
3309        // Shard since doesn't change until the meet (min) of all reader sinces changes.
3310        assert_eq!(
3311            state.collections.downgrade_since(
3312                &reader3,
3313                seqno,
3314                None,
3315                &Antichain::from_elem(10),
3316                now()
3317            ),
3318            Continue(Since(Antichain::from_elem(10)))
3319        );
3320        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3321
3322        // Shard since advances when reader with the minimal since expires.
3323        assert_eq!(
3324            state.collections.expire_leased_reader(&reader2),
3325            Continue(true)
3326        );
3327        // TODO(database-issues#6885): expiry temporarily doesn't advance since
3328        // Switch this assertion back when we re-enable this.
3329        //
3330        // assert_eq!(state.collections.trace.since(), &Antichain::from_elem(10));
3331        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3332
3333        // Shard since unaffected when all readers are expired.
3334        assert_eq!(
3335            state.collections.expire_leased_reader(&reader3),
3336            Continue(true)
3337        );
3338        // TODO(database-issues#6885): expiry temporarily doesn't advance since
3339        // Switch this assertion back when we re-enable this.
3340        //
3341        // assert_eq!(state.collections.trace.since(), &Antichain::from_elem(10));
3342        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3343    }
3344
3345    #[mz_ore::test]
3346    fn compare_and_downgrade_since() {
3347        let mut state = TypedState::<(), (), u64, i64>::new(
3348            DUMMY_BUILD_INFO.semver_version(),
3349            ShardId::new(),
3350            "".to_owned(),
3351            0,
3352        );
3353        let reader = CriticalReaderId::new();
3354        let _ = state
3355            .collections
3356            .register_critical_reader::<u64>("", &reader, "");
3357
3358        // The shard global since == 0 initially.
3359        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3360        // The initial opaque value should be set.
3361        assert_eq!(
3362            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3363            u64::initial()
3364        );
3365
3366        // Greater
3367        assert_eq!(
3368            state.collections.compare_and_downgrade_since::<u64>(
3369                &reader,
3370                &u64::initial(),
3371                (&1, &Antichain::from_elem(2)),
3372            ),
3373            Continue(Ok(Since(Antichain::from_elem(2))))
3374        );
3375        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3376        assert_eq!(
3377            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3378            1
3379        );
3380        // Equal (no-op)
3381        assert_eq!(
3382            state.collections.compare_and_downgrade_since::<u64>(
3383                &reader,
3384                &1,
3385                (&2, &Antichain::from_elem(2)),
3386            ),
3387            Continue(Ok(Since(Antichain::from_elem(2))))
3388        );
3389        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3390        assert_eq!(
3391            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3392            2
3393        );
3394        // Less (no-op)
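        // (The opaque token is still swapped even though the since is not
        // downgraded.)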
3395        assert_eq!(
3396            state.collections.compare_and_downgrade_since::<u64>(
3397                &reader,
3398                &2,
3399                (&3, &Antichain::from_elem(1)),
3400            ),
3401            Continue(Ok(Since(Antichain::from_elem(2))))
3402        );
3403        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3404        assert_eq!(
3405            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3406            3
3407        );
3408    }
3409
3410    #[mz_ore::test]
3411    fn compare_and_append() {
3412        let state = &mut TypedState::<String, String, u64, i64>::new(
3413            DUMMY_BUILD_INFO.semver_version(),
3414            ShardId::new(),
3415            "".to_owned(),
3416            0,
3417        )
3418        .collections;
3419
3420        let writer_id = WriterId::new();
3421        let now = SYSTEM_TIME.clone();
3422
3423        // State is initially empty.
3424        assert_eq!(state.trace.num_spine_batches(), 0);
3425        assert_eq!(state.trace.num_hollow_batches(), 0);
3426        assert_eq!(state.trace.num_updates(), 0);
3427
3428        // Cannot insert a batch with a lower != current shard upper.
3429        assert_eq!(
3430            state.compare_and_append(
3431                &hollow(1, 2, &["key1"], 1),
3432                &writer_id,
3433                now(),
3434                LEASE_DURATION_MS,
3435                &IdempotencyToken::new(),
3436                &debug_state(),
3437                0,
3438                100,
3439                None
3440            ),
3441            Break(CompareAndAppendBreak::Upper {
3442                shard_upper: Antichain::from_elem(0),
3443                writer_upper: Antichain::from_elem(0)
3444            })
3445        );
3446
3447        // Insert an empty batch with an upper > lower.
3448        assert!(
3449            state
3450                .compare_and_append(
3451                    &hollow(0, 5, &[], 0),
3452                    &writer_id,
3453                    now(),
3454                    LEASE_DURATION_MS,
3455                    &IdempotencyToken::new(),
3456                    &debug_state(),
3457                    0,
3458                    100,
3459                    None
3460                )
3461                .is_continue()
3462        );
3463
3464        // Cannot insert a batch with an upper less than the lower.
3465        assert_eq!(
3466            state.compare_and_append(
3467                &hollow(5, 4, &["key1"], 1),
3468                &writer_id,
3469                now(),
3470                LEASE_DURATION_MS,
3471                &IdempotencyToken::new(),
3472                &debug_state(),
3473                0,
3474                100,
3475                None
3476            ),
3477            Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds {
3478                lower: Antichain::from_elem(5),
3479                upper: Antichain::from_elem(4)
3480            }))
3481        );
3482
3483        // Cannot insert a nonempty batch with an upper equal to lower.
3484        assert_eq!(
3485            state.compare_and_append(
3486                &hollow(5, 5, &["key1"], 1),
3487                &writer_id,
3488                now(),
3489                LEASE_DURATION_MS,
3490                &IdempotencyToken::new(),
3491                &debug_state(),
3492                0,
3493                100,
3494                None
3495            ),
3496            Break(CompareAndAppendBreak::InvalidUsage(
3497                InvalidEmptyTimeInterval {
3498                    lower: Antichain::from_elem(5),
3499                    upper: Antichain::from_elem(5),
3500                    keys: vec!["key1".to_owned()],
3501                }
3502            ))
3503        );
3504
3505        // Can insert an empty batch with an upper equal to lower.
3506        assert!(
3507            state
3508                .compare_and_append(
3509                    &hollow(5, 5, &[], 0),
3510                    &writer_id,
3511                    now(),
3512                    LEASE_DURATION_MS,
3513                    &IdempotencyToken::new(),
3514                    &debug_state(),
3515                    0,
3516                    100,
3517                    None
3518                )
3519                .is_continue()
3520        );
3521    }
3522
3523    #[mz_ore::test]
3524    fn snapshot() {
3525        let now = SYSTEM_TIME.clone();
3526
3527        let mut state = TypedState::<String, String, u64, i64>::new(
3528            DUMMY_BUILD_INFO.semver_version(),
3529            ShardId::new(),
3530            "".to_owned(),
3531            0,
3532        );
3533        // Cannot take a snapshot with as_of == shard upper.
3534        assert_eq!(
3535            state.snapshot(&Antichain::from_elem(0)),
3536            Err(SnapshotErr::AsOfNotYetAvailable(
3537                SeqNo(0),
3538                Upper(Antichain::from_elem(0))
3539            ))
3540        );
3541
3542        // Cannot take a snapshot with as_of > shard upper.
3543        assert_eq!(
3544            state.snapshot(&Antichain::from_elem(5)),
3545            Err(SnapshotErr::AsOfNotYetAvailable(
3546                SeqNo(0),
3547                Upper(Antichain::from_elem(0))
3548            ))
3549        );
3550
3551        let writer_id = WriterId::new();
3552
3553        // Advance upper to 5.
3554        assert!(
3555            state
3556                .collections
3557                .compare_and_append(
3558                    &hollow(0, 5, &["key1"], 1),
3559                    &writer_id,
3560                    now(),
3561                    LEASE_DURATION_MS,
3562                    &IdempotencyToken::new(),
3563                    &debug_state(),
3564                    0,
3565                    100,
3566                    None
3567                )
3568                .is_continue()
3569        );
3570
3571        // Can take a snapshot with as_of < upper.
3572        assert_eq!(
3573            state.snapshot(&Antichain::from_elem(0)),
3574            Ok(vec![hollow(0, 5, &["key1"], 1)])
3575        );
3576
3577        // Can take a snapshot with as_of >= shard since, as long as as_of < shard_upper.
3578        assert_eq!(
3579            state.snapshot(&Antichain::from_elem(4)),
3580            Ok(vec![hollow(0, 5, &["key1"], 1)])
3581        );
3582
3583        // Cannot take a snapshot with as_of >= upper.
3584        assert_eq!(
3585            state.snapshot(&Antichain::from_elem(5)),
3586            Err(SnapshotErr::AsOfNotYetAvailable(
3587                SeqNo(0),
3588                Upper(Antichain::from_elem(5))
3589            ))
3590        );
3591        assert_eq!(
3592            state.snapshot(&Antichain::from_elem(6)),
3593            Err(SnapshotErr::AsOfNotYetAvailable(
3594                SeqNo(0),
3595                Upper(Antichain::from_elem(5))
3596            ))
3597        );
3598
3599        let reader = LeasedReaderId::new();
3600        // Advance the since to 2.
3601        let _ = state.collections.register_leased_reader(
3602            "",
3603            &reader,
3604            "",
3605            SeqNo::minimum(),
3606            Duration::from_secs(10),
3607            now(),
3608            false,
3609        );
3610        assert_eq!(
3611            state.collections.downgrade_since(
3612                &reader,
3613                SeqNo::minimum(),
3614                None,
3615                &Antichain::from_elem(2),
3616                now()
3617            ),
3618            Continue(Since(Antichain::from_elem(2)))
3619        );
3620        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3621        // Cannot take a snapshot with as_of < shard_since.
3622        assert_eq!(
3623            state.snapshot(&Antichain::from_elem(1)),
3624            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
3625                Antichain::from_elem(2)
3626            )))
3627        );
3628
3629        // Advance the upper to 10 via an empty batch.
3630        assert!(
3631            state
3632                .collections
3633                .compare_and_append(
3634                    &hollow(5, 10, &[], 0),
3635                    &writer_id,
3636                    now(),
3637                    LEASE_DURATION_MS,
3638                    &IdempotencyToken::new(),
3639                    &debug_state(),
3640                    0,
3641                    100,
3642                    None
3643                )
3644                .is_continue()
3645        );
3646
3647        // Can still take snapshots at times < upper.
3648        assert_eq!(
3649            state.snapshot(&Antichain::from_elem(7)),
3650            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3651        );
3652
3653        // Cannot take snapshots with as_of >= upper.
3654        assert_eq!(
3655            state.snapshot(&Antichain::from_elem(10)),
3656            Err(SnapshotErr::AsOfNotYetAvailable(
3657                SeqNo(0),
3658                Upper(Antichain::from_elem(10))
3659            ))
3660        );
3661
3662        // Advance upper to 15.
3663        assert!(
3664            state
3665                .collections
3666                .compare_and_append(
3667                    &hollow(10, 15, &["key2"], 1),
3668                    &writer_id,
3669                    now(),
3670                    LEASE_DURATION_MS,
3671                    &IdempotencyToken::new(),
3672                    &debug_state(),
3673                    0,
3674                    100,
3675                    None
3676                )
3677                .is_continue()
3678        );
3679
3680        // Filter out batches whose lowers are greater than the requested as_of (the
3681        // batches that are too far in the future for the requested as_of).
3682        assert_eq!(
3683            state.snapshot(&Antichain::from_elem(9)),
3684            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3685        );
3686
3687        // Don't filter out batches whose lowers are <= the requested as_of.
3688        assert_eq!(
3689            state.snapshot(&Antichain::from_elem(10)),
3690            Ok(vec![
3691                hollow(0, 5, &["key1"], 1),
3692                hollow(5, 10, &[], 0),
3693                hollow(10, 15, &["key2"], 1)
3694            ])
3695        );
3696
3697        assert_eq!(
3698            state.snapshot(&Antichain::from_elem(11)),
3699            Ok(vec![
3700                hollow(0, 5, &["key1"], 1),
3701                hollow(5, 10, &[], 0),
3702                hollow(10, 15, &["key2"], 1)
3703            ])
3704        );
3705    }
3706
3707    #[mz_ore::test]
3708    fn next_listen_batch() {
3709        let mut state = TypedState::<String, String, u64, i64>::new(
3710            DUMMY_BUILD_INFO.semver_version(),
3711            ShardId::new(),
3712            "".to_owned(),
3713            0,
3714        );
3715
3716        // Empty collection never has any batches to listen for, regardless of the
3717        // current frontier.
3718        assert_eq!(
3719            state.next_listen_batch(&Antichain::from_elem(0)),
3720            Err(SeqNo(0))
3721        );
3722        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3723
3724        let writer_id = WriterId::new();
3725        let now = SYSTEM_TIME.clone();
3726
3727        // Add two batches of data, one from [0, 5) and then another from [5, 10).
3728        assert!(
3729            state
3730                .collections
3731                .compare_and_append(
3732                    &hollow(0, 5, &["key1"], 1),
3733                    &writer_id,
3734                    now(),
3735                    LEASE_DURATION_MS,
3736                    &IdempotencyToken::new(),
3737                    &debug_state(),
3738                    0,
3739                    100,
3740                    None
3741                )
3742                .is_continue()
3743        );
3744        assert!(
3745            state
3746                .collections
3747                .compare_and_append(
3748                    &hollow(5, 10, &["key2"], 1),
3749                    &writer_id,
3750                    now(),
3751                    LEASE_DURATION_MS,
3752                    &IdempotencyToken::new(),
3753                    &debug_state(),
3754                    0,
3755                    100,
3756                    None
3757                )
3758                .is_continue()
3759        );
3760
3761        // All frontiers in [0, 5) return the first batch.
3762        for t in 0..=4 {
3763            assert_eq!(
3764                state.next_listen_batch(&Antichain::from_elem(t)),
3765                Ok(hollow(0, 5, &["key1"], 1))
3766            );
3767        }
3768
3769        // All frontiers in [5, 10) return the second batch.
3770        for t in 5..=9 {
3771            assert_eq!(
3772                state.next_listen_batch(&Antichain::from_elem(t)),
3773                Ok(hollow(5, 10, &["key2"], 1))
3774            );
3775        }
3776
3777        // There is no batch currently available for t = 10.
3778        assert_eq!(
3779            state.next_listen_batch(&Antichain::from_elem(10)),
3780            Err(SeqNo(0))
3781        );
3782
3783        // By definition, there is never a batch at the empty antichain frontier,
3784        // which is the time after all possible times.
3785        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3786    }
3787
3788    #[mz_ore::test]
3789    fn expire_writer() {
3790        let mut state = TypedState::<String, String, u64, i64>::new(
3791            DUMMY_BUILD_INFO.semver_version(),
3792            ShardId::new(),
3793            "".to_owned(),
3794            0,
3795        );
3796        let now = SYSTEM_TIME.clone();
3797
3798        let writer_id_one = WriterId::new();
3799
3800        let writer_id_two = WriterId::new();
3801
3802        // Writer is eligible to write
3803        assert!(
3804            state
3805                .collections
3806                .compare_and_append(
3807                    &hollow(0, 2, &["key1"], 1),
3808                    &writer_id_one,
3809                    now(),
3810                    LEASE_DURATION_MS,
3811                    &IdempotencyToken::new(),
3812                    &debug_state(),
3813                    0,
3814                    100,
3815                    None
3816                )
3817                .is_continue()
3818        );
3819
3820        assert!(
3821            state
3822                .collections
3823                .expire_writer(&writer_id_one)
3824                .is_continue()
3825        );
3826
3827        // Other writers should still be able to write
3828        assert!(
3829            state
3830                .collections
3831                .compare_and_append(
3832                    &hollow(2, 5, &["key2"], 1),
3833                    &writer_id_two,
3834                    now(),
3835                    LEASE_DURATION_MS,
3836                    &IdempotencyToken::new(),
3837                    &debug_state(),
3838                    0,
3839                    100,
3840                    None
3841                )
3842                .is_continue()
3843        );
3844    }
3845
3846    #[mz_ore::test]
3847    fn maybe_gc_active_gc() {
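        // `min_versions` is the number of collectable seqnos that must
        // accumulate before gc is considered, and `max_versions` caps how many
        // seqnos a single GcReq may reclaim; both bounds are exercised below.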
3848        const GC_CONFIG: GcConfig = GcConfig {
3849            use_active_gc: true,
3850            fallback_threshold_ms: 5000,
3851            min_versions: 99,
3852            max_versions: 500,
3853        };
3854        let now_fn = SYSTEM_TIME.clone();
3855
3856        let mut state = TypedState::<String, String, u64, i64>::new(
3857            DUMMY_BUILD_INFO.semver_version(),
3858            ShardId::new(),
3859            "".to_owned(),
3860            0,
3861        );
3862
3863        let now = now_fn();
3864        // Empty state doesn't need gc, regardless of is_write.
3865        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
3866        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);
3867
3868        // Artificially advance the seqno so the seqno_since advances past our
3869        // internal gc_threshold.
3870        state.seqno = SeqNo(100);
3871        assert_eq!(state.seqno_since(), SeqNo(100));
3872
3873        // When a writer is present, non-writes don't gc.
3874        let writer_id = WriterId::new();
3875        let _ = state.collections.compare_and_append(
3876            &hollow(1, 2, &["key1"], 1),
3877            &writer_id,
3878            now,
3879            LEASE_DURATION_MS,
3880            &IdempotencyToken::new(),
3881            &debug_state(),
3882            0,
3883            100,
3884            None,
3885        );
3886        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);
3887
3888        // A write will gc though.
3889        assert_eq!(
3890            state.maybe_gc(true, now, GC_CONFIG),
3891            Some(GcReq {
3892                shard_id: state.shard_id,
3893                new_seqno_since: SeqNo(100)
3894            })
3895        );
3896
3897        // But if we write down an active gc, we won't gc.
3898        state.collections.active_gc = Some(ActiveGc {
3899            seqno: state.seqno,
3900            start_ms: now,
3901        });
3902
3903        state.seqno = SeqNo(200);
3904        assert_eq!(state.seqno_since(), SeqNo(200));
3905
3906        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
3907
3908        state.seqno = SeqNo(300);
3909        assert_eq!(state.seqno_since(), SeqNo(300));
3910        // But if we advance the time past the threshold, we will gc.
3911        let new_now = now + GC_CONFIG.fallback_threshold_ms + 1;
3912        assert_eq!(
3913            state.maybe_gc(true, new_now, GC_CONFIG),
3914            Some(GcReq {
3915                shard_id: state.shard_id,
3916                new_seqno_since: SeqNo(300)
3917            })
3918        );
3919
3920        // Even if the seqno doesn't pass the threshold, we will still gc once
3921        // the active gc entry has expired.
3922
3923        state.seqno = SeqNo(301);
3924        assert_eq!(state.seqno_since(), SeqNo(301));
3925        assert_eq!(
3926            state.maybe_gc(true, new_now, GC_CONFIG),
3927            Some(GcReq {
3928                shard_id: state.shard_id,
3929                new_seqno_since: SeqNo(301)
3930            })
3931        );
3932
3933        state.collections.active_gc = None;
3934
3935        // Artificially advance the seqno (again) so the seqno_since advances
3936        // past our internal gc_threshold (again).
3937        state.seqno = SeqNo(400);
3938        assert_eq!(state.seqno_since(), SeqNo(400));
3939
3940        let now = now_fn();
3941
3942        // If there are no writers, even a non-write will gc.
3943        let _ = state.collections.expire_writer(&writer_id);
3944        assert_eq!(
3945            state.maybe_gc(false, now, GC_CONFIG),
3946            Some(GcReq {
3947                shard_id: state.shard_id,
3948                new_seqno_since: SeqNo(400)
3949            })
3950        );
3951
3952        // Upper-bound the number of seqnos we'll attempt to collect in one go.
3953        let previous_seqno = state.seqno;
3954        state.seqno = SeqNo(10_000);
3955        assert_eq!(state.seqno_since(), SeqNo(10_000));
3956
3957        let now = now_fn();
3958        assert_eq!(
3959            state.maybe_gc(true, now, GC_CONFIG),
3960            Some(GcReq {
3961                shard_id: state.shard_id,
3962                new_seqno_since: SeqNo(previous_seqno.0 + u64::cast_from(GC_CONFIG.max_versions))
3963            })
3964        );
3965    }
3966
3967    #[mz_ore::test]
3968    fn maybe_gc_classic() {
3969        const GC_CONFIG: GcConfig = GcConfig {
3970            use_active_gc: false,
3971            fallback_threshold_ms: 5000,
3972            min_versions: 16,
3973            max_versions: 128,
3974        };
3975        const NOW_MS: u64 = 0;
3976
3977        let mut state = TypedState::<String, String, u64, i64>::new(
3978            DUMMY_BUILD_INFO.semver_version(),
3979            ShardId::new(),
3980            "".to_owned(),
3981            0,
3982        );
3983
3984        // Empty state doesn't need gc, regardless of is_write.
3985        assert_eq!(state.maybe_gc(true, NOW_MS, GC_CONFIG), None);
3986        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
3987
3988        // Artificially advance the seqno so the seqno_since advances past our
3989        // internal gc_threshold.
3990        state.seqno = SeqNo(100);
3991        assert_eq!(state.seqno_since(), SeqNo(100));
3992
3993        // When a writer is present, non-writes don't gc.
3994        let writer_id = WriterId::new();
3995        let now = SYSTEM_TIME.clone();
3996        let _ = state.collections.compare_and_append(
3997            &hollow(1, 2, &["key1"], 1),
3998            &writer_id,
3999            now(),
4000            LEASE_DURATION_MS,
4001            &IdempotencyToken::new(),
4002            &debug_state(),
4003            0,
4004            100,
4005            None,
4006        );
4007        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
4008
4009        // A write will gc though.
4010        assert_eq!(
4011            state.maybe_gc(true, NOW_MS, GC_CONFIG),
4012            Some(GcReq {
4013                shard_id: state.shard_id,
4014                new_seqno_since: SeqNo(100)
4015            })
4016        );
4017
4018        // Artificially advance the seqno (again) so the seqno_since advances
4019        // past our internal gc_threshold (again).
4020        state.seqno = SeqNo(200);
4021        assert_eq!(state.seqno_since(), SeqNo(200));
4022
4023        // If there are no writers, even a non-write will gc.
4024        let _ = state.collections.expire_writer(&writer_id);
4025        assert_eq!(
4026            state.maybe_gc(false, NOW_MS, GC_CONFIG),
4027            Some(GcReq {
4028                shard_id: state.shard_id,
4029                new_seqno_since: SeqNo(200)
4030            })
4031        );
4032    }
4033
4034    #[mz_ore::test]
4035    fn need_rollup_active_rollup() {
4036        const ROLLUP_THRESHOLD: usize = 3;
4037        const ROLLUP_USE_ACTIVE_ROLLUP: bool = true;
4038        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 5000;
4039        let now = SYSTEM_TIME.clone();
4040
4041        mz_ore::test::init_logging();
4042        let mut state = TypedState::<String, String, u64, i64>::new(
4043            DUMMY_BUILD_INFO.semver_version(),
4044            ShardId::new(),
4045            "".to_owned(),
4046            0,
4047        );
4048
4049        let rollup_seqno = SeqNo(5);
4050        let rollup = HollowRollup {
4051            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4052            encoded_size_bytes: None,
4053        };
4054
4055        assert!(
4056            state
4057                .collections
4058                .add_rollup((rollup_seqno, &rollup))
4059                .is_continue()
4060        );
4061
4062        // shouldn't need a rollup at the seqno of the rollup
4063        state.seqno = SeqNo(5);
4064        assert_none!(state.need_rollup(
4065            ROLLUP_THRESHOLD,
4066            ROLLUP_USE_ACTIVE_ROLLUP,
4067            ROLLUP_FALLBACK_THRESHOLD_MS,
4068            now()
4069        ));
4070
4071        // shouldn't need a rollup at seqnos less than our threshold
4072        state.seqno = SeqNo(6);
4073        assert_none!(state.need_rollup(
4074            ROLLUP_THRESHOLD,
4075            ROLLUP_USE_ACTIVE_ROLLUP,
4076            ROLLUP_FALLBACK_THRESHOLD_MS,
4077            now()
4078        ));
4079        state.seqno = SeqNo(7);
4080        assert_none!(state.need_rollup(
4081            ROLLUP_THRESHOLD,
4082            ROLLUP_USE_ACTIVE_ROLLUP,
4083            ROLLUP_FALLBACK_THRESHOLD_MS,
4084            now()
4085        ));
4086        state.seqno = SeqNo(8);
4087        assert_none!(state.need_rollup(
4088            ROLLUP_THRESHOLD,
4089            ROLLUP_USE_ACTIVE_ROLLUP,
4090            ROLLUP_FALLBACK_THRESHOLD_MS,
4091            now()
4092        ));
4093
4094        let mut current_time = now();
4095        // hit our threshold! we should need a rollup
4096        state.seqno = SeqNo(9);
4097        assert_eq!(
4098            state
4099                .need_rollup(
4100                    ROLLUP_THRESHOLD,
4101                    ROLLUP_USE_ACTIVE_ROLLUP,
4102                    ROLLUP_FALLBACK_THRESHOLD_MS,
4103                    current_time
4104                )
4105                .expect("rollup"),
4106            SeqNo(9)
4107        );
4108
4109        state.collections.active_rollup = Some(ActiveRollup {
4110            seqno: SeqNo(9),
4111            start_ms: current_time,
4112        });
4113
4114        // There is now an active rollup, so we shouldn't need a rollup.
4115        assert_none!(state.need_rollup(
4116            ROLLUP_THRESHOLD,
4117            ROLLUP_USE_ACTIVE_ROLLUP,
4118            ROLLUP_FALLBACK_THRESHOLD_MS,
4119            current_time
4120        ));
4121
4122        state.seqno = SeqNo(10);
4123        // We still don't need a rollup, even though the seqno is greater than
4124        // the rollup threshold.
4125        assert_none!(state.need_rollup(
4126            ROLLUP_THRESHOLD,
4127            ROLLUP_USE_ACTIVE_ROLLUP,
4128            ROLLUP_FALLBACK_THRESHOLD_MS,
4129            current_time
4130        ));
4131
4132        // But if we wait long enough, we should need a rollup again.
4133        current_time += u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS) + 1;
4134        assert_eq!(
4135            state
4136                .need_rollup(
4137                    ROLLUP_THRESHOLD,
4138                    ROLLUP_USE_ACTIVE_ROLLUP,
4139                    ROLLUP_FALLBACK_THRESHOLD_MS,
4140                    current_time
4141                )
4142                .expect("rollup"),
4143            SeqNo(10)
4144        );
4145
4146        state.seqno = SeqNo(9);
4147        // Clear the active rollup and ensure we need a rollup again.
4148        state.collections.active_rollup = None;
4149        let rollup_seqno = SeqNo(9);
4150        let rollup = HollowRollup {
4151            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4152            encoded_size_bytes: None,
4153        };
4154        assert!(
4155            state
4156                .collections
4157                .add_rollup((rollup_seqno, &rollup))
4158                .is_continue()
4159        );
4160
4161        state.seqno = SeqNo(11);
4162        // We shouldn't need a rollup at seqnos less than our threshold
4163        assert_none!(state.need_rollup(
4164            ROLLUP_THRESHOLD,
4165            ROLLUP_USE_ACTIVE_ROLLUP,
4166            ROLLUP_FALLBACK_THRESHOLD_MS,
4167            current_time
4168        ));
4169        // hit our threshold! we should need a rollup
4170        state.seqno = SeqNo(13);
4171        assert_eq!(
4172            state
4173                .need_rollup(
4174                    ROLLUP_THRESHOLD,
4175                    ROLLUP_USE_ACTIVE_ROLLUP,
4176                    ROLLUP_FALLBACK_THRESHOLD_MS,
4177                    current_time
4178                )
4179                .expect("rollup"),
4180            SeqNo(13)
4181        );
4182    }
4183
4184    #[mz_ore::test]
4185    fn need_rollup_classic() {
4186        const ROLLUP_THRESHOLD: usize = 3;
4187        const ROLLUP_USE_ACTIVE_ROLLUP: bool = false;
4188        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 0;
4189        const NOW: u64 = 0;
4190
4191        mz_ore::test::init_logging();
4192        let mut state = TypedState::<String, String, u64, i64>::new(
4193            DUMMY_BUILD_INFO.semver_version(),
4194            ShardId::new(),
4195            "".to_owned(),
4196            0,
4197        );
4198
4199        let rollup_seqno = SeqNo(5);
4200        let rollup = HollowRollup {
4201            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4202            encoded_size_bytes: None,
4203        };
4204
4205        assert!(
4206            state
4207                .collections
4208                .add_rollup((rollup_seqno, &rollup))
4209                .is_continue()
4210        );
4211
4212        // shouldn't need a rollup at the seqno of the rollup
4213        state.seqno = SeqNo(5);
4214        assert_none!(state.need_rollup(
4215            ROLLUP_THRESHOLD,
4216            ROLLUP_USE_ACTIVE_ROLLUP,
4217            ROLLUP_FALLBACK_THRESHOLD_MS,
4218            NOW
4219        ));
4220
4221        // shouldn't need a rollup at seqnos less than our threshold
4222        state.seqno = SeqNo(6);
4223        assert_none!(state.need_rollup(
4224            ROLLUP_THRESHOLD,
4225            ROLLUP_USE_ACTIVE_ROLLUP,
4226            ROLLUP_FALLBACK_THRESHOLD_MS,
4227            NOW
4228        ));
4229        state.seqno = SeqNo(7);
4230        assert_none!(state.need_rollup(
4231            ROLLUP_THRESHOLD,
4232            ROLLUP_USE_ACTIVE_ROLLUP,
4233            ROLLUP_FALLBACK_THRESHOLD_MS,
4234            NOW
4235        ));
4236
4237        // hit our threshold! we should need a rollup
4238        state.seqno = SeqNo(8);
4239        assert_eq!(
4240            state
4241                .need_rollup(
4242                    ROLLUP_THRESHOLD,
4243                    ROLLUP_USE_ACTIVE_ROLLUP,
4244                    ROLLUP_FALLBACK_THRESHOLD_MS,
4245                    NOW
4246                )
4247                .expect("rollup"),
4248            SeqNo(8)
4249        );
4250
4251        // but we don't need rollups for every seqno > the threshold
4252        state.seqno = SeqNo(9);
4253        assert_none!(state.need_rollup(
4254            ROLLUP_THRESHOLD,
4255            ROLLUP_USE_ACTIVE_ROLLUP,
4256            ROLLUP_FALLBACK_THRESHOLD_MS,
4257            NOW
4258        ));
4259
4260        // we only need a rollup each `ROLLUP_THRESHOLD` beyond our current seqno
4261        state.seqno = SeqNo(11);
4262        assert_eq!(
4263            state
4264                .need_rollup(
4265                    ROLLUP_THRESHOLD,
4266                    ROLLUP_USE_ACTIVE_ROLLUP,
4267                    ROLLUP_FALLBACK_THRESHOLD_MS,
4268                    NOW
4269                )
4270                .expect("rollup"),
4271            SeqNo(11)
4272        );
4273
4274        // add another rollup and ensure we're always picking the latest
4275        let rollup_seqno = SeqNo(6);
4276        let rollup = HollowRollup {
4277            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4278            encoded_size_bytes: None,
4279        };
4280        assert!(
4281            state
4282                .collections
4283                .add_rollup((rollup_seqno, &rollup))
4284                .is_continue()
4285        );
4286
4287        state.seqno = SeqNo(8);
4288        assert_none!(state.need_rollup(
4289            ROLLUP_THRESHOLD,
4290            ROLLUP_USE_ACTIVE_ROLLUP,
4291            ROLLUP_FALLBACK_THRESHOLD_MS,
4292            NOW
4293        ));
4294        state.seqno = SeqNo(9);
4295        assert_eq!(
4296            state
4297                .need_rollup(
4298                    ROLLUP_THRESHOLD,
4299                    ROLLUP_USE_ACTIVE_ROLLUP,
4300                    ROLLUP_FALLBACK_THRESHOLD_MS,
4301                    NOW
4302                )
4303                .expect("rollup"),
4304            SeqNo(9)
4305        );
4306
4307        // and ensure that past the fallback point, every seqno is assigned rollup work
4308        let fallback_seqno = SeqNo(
4309            rollup_seqno.0
4310                * u64::cast_from(PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER),
4311        );
4312        state.seqno = fallback_seqno;
4313        assert_eq!(
4314            state
4315                .need_rollup(
4316                    ROLLUP_THRESHOLD,
4317                    ROLLUP_USE_ACTIVE_ROLLUP,
4318                    ROLLUP_FALLBACK_THRESHOLD_MS,
4319                    NOW
4320                )
4321                .expect("rollup"),
4322            fallback_seqno
4323        );
4324        state.seqno = fallback_seqno.next();
4325        assert_eq!(
4326            state
4327                .need_rollup(
4328                    ROLLUP_THRESHOLD,
4329                    ROLLUP_USE_ACTIVE_ROLLUP,
4330                    ROLLUP_FALLBACK_THRESHOLD_MS,
4331                    NOW
4332                )
4333                .expect("rollup"),
4334            fallback_seqno.next()
4335        );
4336    }
4337
4338    #[mz_ore::test]
4339    fn idempotency_token_sentinel() {
4340        assert_eq!(
4341            IdempotencyToken::SENTINEL.to_string(),
4342            "i11111111-1111-1111-1111-111111111111"
4343        );
4344    }
4345
4346    /// This test generates an "arbitrary" State, but uses a fixed seed for the
4347    /// randomness, so that it's deterministic. This lets us assert the
4348    /// serialization of that State against a golden file that's committed,
4349    /// making it easy to see what the serialization (used in an upcoming
4350    /// INSPECT feature) looks like.
4351    ///
4352    /// This golden will have to be updated each time we change State, but
4353    /// that's a feature, not a bug.
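    ///
    /// To refresh it, copy the serialization printed after the "NEW GOLDEN"
    /// marker by the failing assertion into `state_serde.json`.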
4354    #[mz_ore::test]
4355    #[cfg_attr(miri, ignore)] // too slow
4356    fn state_inspect_serde_json() {
4357        const STATE_SERDE_JSON: &str = include_str!("state_serde.json");
4358        let mut runner = proptest::test_runner::TestRunner::deterministic();
4359        let tree = any_state::<u64>(6..8).new_tree(&mut runner).unwrap();
4360        let json = serde_json::to_string_pretty(&tree.current()).unwrap();
4361        assert_eq!(
4362            json.trim(),
4363            STATE_SERDE_JSON.trim(),
4364            "\n\nNEW GOLDEN\n{}\n",
4365            json
4366        );
4367    }
4368
4369    #[mz_persist_proc::test(tokio::test)]
4370    #[cfg_attr(miri, ignore)] // too slow
4371    async fn sneaky_downgrades(dyncfgs: ConfigUpdates) {
4372        let mut clients = new_test_client_cache(&dyncfgs);
4373        let shard_id = ShardId::new();
4374
4375        async fn open_and_write(
4376            clients: &mut PersistClientCache,
4377            version: semver::Version,
4378            shard_id: ShardId,
4379        ) -> Result<(), tokio::task::JoinError> {
4380            clients.cfg.build_version = version.clone();
4381            clients.clear_state_cache();
4382            let client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
4383            // Run in a task so we can catch the panic.
4384            mz_ore::task::spawn(|| version.to_string(), async move {
4385                let (mut write, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
4386                let current = *write.upper().as_option().unwrap();
4387                // Do a write so that we tag the state with the version.
4388                write
4389                    .expect_compare_and_append_batch(&mut [], current, current + 1)
4390                    .await;
4391            })
4392            .await
4393        }
4394
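        // As exercised below: once v0.11.0 has written the shard, v0.10.0 (one
        // minor version back) may still write to it, but v0.9.0 may not.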
4395        // Start at v0.10.0.
4396        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4397        assert_ok!(res);
4398
4399        // Upgrade to v0.11.0 is allowed.
4400        let res = open_and_write(&mut clients, Version::new(0, 11, 0), shard_id).await;
4401        assert_ok!(res);
4402
4403        // Downgrade to v0.10.0 is allowed.
4404        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4405        assert_ok!(res);
4406
4407        // Downgrade to v0.9.0 is _NOT_ allowed.
4408        let res = open_and_write(&mut clients, Version::new(0, 9, 0), shard_id).await;
4409        assert!(res.unwrap_err().is_panic());
4410    }
4411
4412    #[mz_ore::test]
4413    fn runid_roundtrip() {
4414        proptest!(|(runid: RunId)| {
4415            let runid_str = runid.to_string();
4416            let parsed = RunId::from_str(&runid_str);
4417            prop_assert_eq!(parsed, Ok(runid));
4418        });
4419    }
4420}