mz_persist_client/internal/state.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use anyhow::ensure;
use async_stream::{stream, try_stream};
use differential_dataflow::difference::Semigroup;
use mz_persist::metrics::ColumnarMetrics;
use proptest::prelude::{Arbitrary, Strategy};
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use std::ops::ControlFlow::{self, Break, Continue};
use std::ops::{Deref, DerefMut};
use std::time::Duration;

use arrow::array::{Array, ArrayData, make_array};
use arrow::datatypes::DataType;
use bytes::Bytes;
use differential_dataflow::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use differential_dataflow::trace::implementations::BatchContainer;
use futures::Stream;
use futures_util::StreamExt;
use itertools::Itertools;
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::now::EpochMillis;
use mz_ore::soft_panic_or_log;
use mz_ore::vec::PartialOrdVecExt;
use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceUpdates};
use mz_persist::location::{Blob, SeqNo};
use mz_persist_types::arrow::{ArrayBound, ProtoArrayData};
use mz_persist_types::columnar::{ColumnEncoder, Schema};
use mz_persist_types::schema::{SchemaId, backward_compatible};
use mz_persist_types::{Codec, Codec64, Opaque};
use mz_proto::ProtoType;
use mz_proto::RustType;
use proptest_derive::Arbitrary;
use semver::Version;
use serde::ser::SerializeStruct;
use serde::{Serialize, Serializer};
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::info;
use uuid::Uuid;

use crate::critical::CriticalReaderId;
use crate::error::InvalidUsage;
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, parse_id};
use crate::internal::gc::GcReq;
use crate::internal::machine::retry_external;
use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
use crate::internal::trace::{
    ActiveCompaction, ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace,
};
use crate::metrics::Metrics;
use crate::read::LeasedReaderId;
use crate::schema::CaESchema;
use crate::write::WriterId;
use crate::{PersistConfig, ShardId};

include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.state.rs"
));

include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.diff.rs"
));

/// Determines how often to write rollups, assigning a maintenance task after
/// `rollup_threshold` seqnos have passed since the last rollup.
///
/// Tuning note: in the absence of a long reader seqno hold, and with
/// incremental GC, this threshold will determine about how many live diffs are
/// held in Consensus. Lowering this value decreases the live diff count at the
/// cost of more maintenance work + blob writes.
pub(crate) const ROLLUP_THRESHOLD: Config<usize> = Config::new(
    "persist_rollup_threshold",
    128,
    "The number of seqnos between rollups.",
);

/// Determines how long to wait before an active rollup is considered
/// "stuck" and a new rollup is started.
pub(crate) const ROLLUP_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_rollup_fallback_threshold_ms",
    5000,
    "The number of milliseconds before a worker claims an already claimed rollup.",
);

/// Feature flag for the new active rollup tracking mechanism.
/// We mustn't enable this until we are fully deployed on the new version.
pub(crate) const ROLLUP_USE_ACTIVE_ROLLUP: Config<bool> = Config::new(
    "persist_rollup_use_active_rollup",
    false,
    "Whether to use the new active rollup tracking mechanism.",
);

/// Determines how long to wait before an active GC is considered
/// "stuck" and a new GC is started.
pub(crate) const GC_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_gc_fallback_threshold_ms",
    900000,
    "The number of milliseconds before a worker claims an already claimed GC.",
);

/// See the config description string.
pub(crate) const GC_MIN_VERSIONS: Config<usize> = Config::new(
    "persist_gc_min_versions",
    32,
    "The number of un-GCd versions that may exist in state before we'll trigger a GC.",
);

/// See the config description string.
pub(crate) const GC_MAX_VERSIONS: Config<usize> = Config::new(
    "persist_gc_max_versions",
    128_000,
    "The maximum number of versions to GC in a single GC run.",
);

/// Feature flag for the new active GC tracking mechanism.
/// We mustn't enable this until we are fully deployed on the new version.
pub(crate) const GC_USE_ACTIVE_GC: Config<bool> = Config::new(
    "persist_gc_use_active_gc",
    false,
    "Whether to use the new active GC tracking mechanism.",
);

pub(crate) const ENABLE_INCREMENTAL_COMPACTION: Config<bool> = Config::new(
    "persist_enable_incremental_compaction",
    false,
    "Whether to enable incremental compaction.",
);

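// Illustrative sketch (editor's addition, not from the original source): how the
// fallback thresholds above are intended to be used. `is_stuck`, `now_ms`, and
// `fallback_threshold_ms` are hypothetical names; the real decision logic lives
// elsewhere in this crate.
//
//     // A rollup (or GC) claimed by another worker is considered "stuck" once
//     // more than the fallback threshold has elapsed since it was claimed, at
//     // which point a new worker may claim it.
//     fn is_stuck(active: &ActiveRollup, now_ms: u64, fallback_threshold_ms: u64) -> bool {
//         now_ms.saturating_sub(active.start_ms) > fallback_threshold_ms
//     }
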
/// A token to disambiguate state commands that could not otherwise be
/// idempotent.
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
#[serde(into = "String")]
pub struct IdempotencyToken(pub(crate) [u8; 16]);

impl std::fmt::Display for IdempotencyToken {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "i{}", Uuid::from_bytes(self.0))
    }
}

impl std::fmt::Debug for IdempotencyToken {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "IdempotencyToken({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for IdempotencyToken {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id("i", "IdempotencyToken", s).map(IdempotencyToken)
    }
}

impl From<IdempotencyToken> for String {
    fn from(x: IdempotencyToken) -> Self {
        x.to_string()
    }
}

impl IdempotencyToken {
    pub(crate) fn new() -> Self {
        IdempotencyToken(*Uuid::new_v4().as_bytes())
    }
    pub(crate) const SENTINEL: IdempotencyToken = IdempotencyToken([17u8; 16]);
}

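// Illustrative sketch (editor's addition, not from the original source): the
// string form round-trips through `Display`/`FromStr`, using the "i" prefix
// handled by `parse_id` above.
//
//     let token = IdempotencyToken::new();
//     let s = token.to_string(); // e.g. "i3b2f...-..."
//     let parsed: IdempotencyToken = s.parse().expect("valid token string");
//     assert_eq!(parsed, token);
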
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct LeasedReaderState<T> {
    /// The seqno capability of this reader.
    pub seqno: SeqNo,
    /// The since capability of this reader.
    pub since: Antichain<T>,
    /// UNIX_EPOCH timestamp (in millis) of this reader's most recent heartbeat
    pub last_heartbeat_timestamp_ms: u64,
    /// Duration (in millis) allowed after [Self::last_heartbeat_timestamp_ms]
    /// after which this reader may be expired
    pub lease_duration_ms: u64,
    /// For debugging.
    pub debug: HandleDebugState,
}

#[derive(Arbitrary, Clone, Debug, PartialEq, Serialize)]
#[serde(into = "u64")]
pub struct OpaqueState(pub [u8; 8]);

impl From<OpaqueState> for u64 {
    fn from(value: OpaqueState) -> Self {
        u64::from_le_bytes(value.0)
    }
}

#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct CriticalReaderState<T> {
    /// The since capability of this reader.
    pub since: Antichain<T>,
    /// An opaque token matched on by compare_and_downgrade_since.
    pub opaque: OpaqueState,
    /// The [Codec64] used to encode [Self::opaque].
    pub opaque_codec: String,
    /// For debugging.
    pub debug: HandleDebugState,
}

#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct WriterState<T> {
    /// UNIX_EPOCH timestamp (in millis) of this writer's most recent heartbeat
    pub last_heartbeat_timestamp_ms: u64,
    /// Duration (in millis) allowed after [Self::last_heartbeat_timestamp_ms]
    /// after which this writer may be expired
    pub lease_duration_ms: u64,
    /// The idempotency token of the most recent successful compare_and_append
    /// by this writer.
    pub most_recent_write_token: IdempotencyToken,
    /// The upper of the most recent successful compare_and_append by this
    /// writer.
    pub most_recent_write_upper: Antichain<T>,
    /// For debugging.
    pub debug: HandleDebugState,
}

/// Debugging info for a reader or writer.
#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize)]
pub struct HandleDebugState {
    /// Hostname of the persist user that registered this writer or reader. For
    /// critical readers, this is the _most recent_ registration.
    pub hostname: String,
    /// Plaintext description of this writer or reader's intent.
    pub purpose: String,
}

/// Part of the updates in a Batch.
///
/// Either a pointer to ones stored in Blob or the updates themselves inlined.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(tag = "type")]
pub enum BatchPart<T> {
    Hollow(HollowBatchPart<T>),
    Inline {
        updates: LazyInlineBatchPart,
        ts_rewrite: Option<Antichain<T>>,
        schema_id: Option<SchemaId>,

        /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
        deprecated_schema_id: Option<SchemaId>,
    },
}

fn decode_structured_lower(lower: &LazyProto<ProtoArrayData>) -> Option<ArrayBound> {
    let try_decode = |lower: &LazyProto<ProtoArrayData>| {
        let proto = lower.decode()?;
        let data = ArrayData::from_proto(proto)?;
        ensure!(data.len() == 1);
        Ok(ArrayBound::new(make_array(data), 0))
    };

    let decoded: anyhow::Result<ArrayBound> = try_decode(lower);

    match decoded {
        Ok(bound) => Some(bound),
        Err(e) => {
            soft_panic_or_log!("failed to decode bound: {e:#?}");
            None
        }
    }
}

impl<T> BatchPart<T> {
    pub fn hollow_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(x) => x.encoded_size_bytes,
            BatchPart::Inline { .. } => 0,
        }
    }

    pub fn is_inline(&self) -> bool {
        matches!(self, BatchPart::Inline { .. })
    }

    pub fn inline_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(_) => 0,
            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
        }
    }

    pub fn writer_key(&self) -> Option<WriterKey> {
        match self {
            BatchPart::Hollow(x) => x.key.split().map(|(writer, _part)| writer),
            BatchPart::Inline { .. } => None,
        }
    }

    pub fn encoded_size_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(x) => x.encoded_size_bytes,
            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
        }
    }

    // A user-interpretable identifier or description of the part (for logs and
    // such).
    pub fn printable_name(&self) -> &str {
        match self {
            BatchPart::Hollow(x) => x.key.0.as_str(),
            BatchPart::Inline { .. } => "<inline>",
        }
    }

    pub fn stats(&self) -> Option<&LazyPartStats> {
        match self {
            BatchPart::Hollow(x) => x.stats.as_ref(),
            BatchPart::Inline { .. } => None,
        }
    }

    pub fn key_lower(&self) -> &[u8] {
        match self {
            BatchPart::Hollow(x) => x.key_lower.as_slice(),
            // We don't duplicate the lowest key because this can be
            // considerable overhead for small parts.
            //
            // The empty key might not be a tight lower bound, but it is a valid
            // lower bound. If a caller is interested in a tighter lower bound,
            // the data is inline.
            BatchPart::Inline { .. } => &[],
        }
    }

    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
        let part = match self {
            BatchPart::Hollow(part) => part,
            BatchPart::Inline { .. } => return None,
        };

        decode_structured_lower(part.structured_key_lower.as_ref()?)
    }

    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
        match self {
            BatchPart::Hollow(x) => x.ts_rewrite.as_ref(),
            BatchPart::Inline { ts_rewrite, .. } => ts_rewrite.as_ref(),
        }
    }

    pub fn schema_id(&self) -> Option<SchemaId> {
        match self {
            BatchPart::Hollow(x) => x.schema_id,
            BatchPart::Inline { schema_id, .. } => *schema_id,
        }
    }

    pub fn deprecated_schema_id(&self) -> Option<SchemaId> {
        match self {
            BatchPart::Hollow(x) => x.deprecated_schema_id,
            BatchPart::Inline {
                deprecated_schema_id,
                ..
            } => *deprecated_schema_id,
        }
    }
}

impl<T: Timestamp + Codec64> BatchPart<T> {
    pub fn is_structured_only(&self, metrics: &ColumnarMetrics) -> bool {
        match self {
            BatchPart::Hollow(x) => matches!(x.format, Some(BatchColumnarFormat::Structured)),
            BatchPart::Inline { updates, .. } => {
                let inline_part = updates.decode::<T>(metrics).expect("valid inline part");
                matches!(inline_part.updates, BlobTraceUpdates::Structured { .. })
            }
        }
    }

    pub fn diffs_sum<D: Codec64 + Semigroup>(&self, metrics: &ColumnarMetrics) -> Option<D> {
        match self {
            BatchPart::Hollow(x) => x.diffs_sum.map(D::decode),
            BatchPart::Inline { updates, .. } => updates
                .decode::<T>(metrics)
                .expect("valid inline part")
                .updates
                .diffs_sum(),
        }
    }
}

/// An ordered list of parts, generally stored as part of a larger run.
#[derive(Debug, Clone)]
pub struct HollowRun<T> {
    /// Pointers usable to retrieve the updates.
    pub(crate) parts: Vec<RunPart<T>>,
}

/// A reference to a [HollowRun], including the key in the blob store and some denormalized
/// metadata.
#[derive(Debug, Eq, PartialEq, Clone, Serialize)]
pub struct HollowRunRef<T> {
    pub key: PartialBatchKey,

    /// The size of the referenced run object, plus all of the hollow objects it contains.
    pub hollow_bytes: usize,

    /// The size of the largest individual part in the run; useful for sizing compaction.
    pub max_part_bytes: usize,

    /// The lower bound of the data in this part, ordered by the codec ordering.
    pub key_lower: Vec<u8>,

    /// The lower bound of the data in this part, ordered by the structured ordering.
    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,

    pub diffs_sum: Option<[u8; 8]>,

    pub(crate) _phantom_data: PhantomData<T>,
}
impl<T: Eq> PartialOrd<Self> for HollowRunRef<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Eq> Ord for HollowRunRef<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.key.cmp(&other.key)
    }
}

impl<T> HollowRunRef<T> {
    pub fn writer_key(&self) -> Option<WriterKey> {
        Some(self.key.split()?.0)
    }
}

impl<T: Timestamp + Codec64> HollowRunRef<T> {
    /// Stores the given run and returns a [HollowRunRef] that points to it.
    pub async fn set<D: Codec64 + Semigroup>(
        shard_id: ShardId,
        blob: &dyn Blob,
        writer: &WriterKey,
        data: HollowRun<T>,
        metrics: &Metrics,
    ) -> Self {
        let hollow_bytes = data.parts.iter().map(|p| p.hollow_bytes()).sum();
        let max_part_bytes = data
            .parts
            .iter()
            .map(|p| p.max_part_bytes())
            .max()
            .unwrap_or(0);
        let key_lower = data
            .parts
            .first()
            .map_or(vec![], |p| p.key_lower().to_vec());
        let structured_key_lower = match data.parts.first() {
            Some(RunPart::Many(r)) => r.structured_key_lower.clone(),
            Some(RunPart::Single(BatchPart::Hollow(p))) => p.structured_key_lower.clone(),
            Some(RunPart::Single(BatchPart::Inline { .. })) | None => None,
        };
        let diffs_sum = data
            .parts
            .iter()
            .map(|p| {
                p.diffs_sum::<D>(&metrics.columnar)
                    .expect("valid diffs sum")
            })
            .reduce(|mut a, b| {
                a.plus_equals(&b);
                a
            })
            .expect("valid diffs sum")
            .encode();

        let key = PartialBatchKey::new(writer, &PartId::new());
        let blob_key = key.complete(&shard_id);
        let bytes = Bytes::from(prost::Message::encode_to_vec(&data.into_proto()));
        let () = retry_external(&metrics.retries.external.hollow_run_set, || {
            blob.set(&blob_key, bytes.clone())
        })
        .await;
        Self {
            key,
            hollow_bytes,
            max_part_bytes,
            key_lower,
            structured_key_lower,
            diffs_sum: Some(diffs_sum),
            _phantom_data: Default::default(),
        }
    }

    /// Retrieve the [HollowRun] that this reference points to.
    /// The caller is expected to ensure that this ref is the result of calling [HollowRunRef::set]
    /// with the same shard id and backing store.
    pub async fn get(
        &self,
        shard_id: ShardId,
        blob: &dyn Blob,
        metrics: &Metrics,
    ) -> Option<HollowRun<T>> {
        let blob_key = self.key.complete(&shard_id);
        let mut bytes = retry_external(&metrics.retries.external.hollow_run_get, || {
            blob.get(&blob_key)
        })
        .await?;
        let proto_runs: ProtoHollowRun =
            prost::Message::decode(&mut bytes).expect("illegal state: invalid proto bytes");
        let runs = proto_runs
            .into_rust()
            .expect("illegal state: invalid encoded runs proto");
        Some(runs)
    }
}

/// Part of the updates in a run.
///
/// Either a pointer to ones stored in Blob or a single part stored inline.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(untagged)]
pub enum RunPart<T> {
    Single(BatchPart<T>),
    Many(HollowRunRef<T>),
}

impl<T: Ord> PartialOrd<Self> for RunPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for RunPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            (RunPart::Single(a), RunPart::Single(b)) => a.cmp(b),
            (RunPart::Single(_), RunPart::Many(_)) => Ordering::Less,
            (RunPart::Many(_), RunPart::Single(_)) => Ordering::Greater,
            (RunPart::Many(a), RunPart::Many(b)) => a.cmp(b),
        }
    }
}

impl<T> RunPart<T> {
    #[cfg(test)]
    pub fn expect_hollow_part(&self) -> &HollowBatchPart<T> {
        match self {
            RunPart::Single(BatchPart::Hollow(hollow)) => hollow,
            _ => panic!("expected hollow part!"),
        }
    }

    pub fn hollow_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.hollow_bytes(),
            Self::Many(r) => r.hollow_bytes,
        }
    }

    pub fn is_inline(&self) -> bool {
        match self {
            Self::Single(p) => p.is_inline(),
            Self::Many(_) => false,
        }
    }

    pub fn inline_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.inline_bytes(),
            Self::Many(_) => 0,
        }
    }

    pub fn max_part_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.encoded_size_bytes(),
            Self::Many(r) => r.max_part_bytes,
        }
    }

    pub fn writer_key(&self) -> Option<WriterKey> {
        match self {
            Self::Single(p) => p.writer_key(),
            Self::Many(r) => r.writer_key(),
        }
    }

    pub fn encoded_size_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.encoded_size_bytes(),
            Self::Many(r) => r.hollow_bytes,
        }
    }

    pub fn schema_id(&self) -> Option<SchemaId> {
        match self {
            Self::Single(p) => p.schema_id(),
            Self::Many(_) => None,
        }
    }

    // A user-interpretable identifier or description of the part (for logs and
    // such).
    pub fn printable_name(&self) -> &str {
        match self {
            Self::Single(p) => p.printable_name(),
            Self::Many(r) => r.key.0.as_str(),
        }
    }

    pub fn stats(&self) -> Option<&LazyPartStats> {
        match self {
            Self::Single(p) => p.stats(),
            // TODO: if we kept stats we could avoid fetching the metadata here.
            Self::Many(_) => None,
        }
    }

    pub fn key_lower(&self) -> &[u8] {
        match self {
            Self::Single(p) => p.key_lower(),
            Self::Many(r) => r.key_lower.as_slice(),
        }
    }

    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
        match self {
            Self::Single(p) => p.structured_key_lower(),
            Self::Many(_) => None,
        }
    }

    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
        match self {
            Self::Single(p) => p.ts_rewrite(),
            Self::Many(_) => None,
        }
    }
}

impl<T> RunPart<T>
where
    T: Timestamp + Codec64,
{
    pub fn diffs_sum<D: Codec64 + Semigroup>(&self, metrics: &ColumnarMetrics) -> Option<D> {
        match self {
            Self::Single(p) => p.diffs_sum(metrics),
            Self::Many(hollow_run) => hollow_run.diffs_sum.map(D::decode),
        }
    }
}

/// A blob was missing!
#[derive(Clone, Debug)]
pub struct MissingBlob(BlobKey);

impl std::fmt::Display for MissingBlob {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "unexpectedly missing key: {}", self.0)
    }
}

impl std::error::Error for MissingBlob {}

impl<T: Timestamp + Codec64 + Sync> RunPart<T> {
    pub fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + Send + 'a {
        try_stream! {
            match self {
                RunPart::Single(p) => {
                    yield Cow::Borrowed(p);
                }
                RunPart::Many(r) => {
                    let fetched = r.get(shard_id, blob, metrics).await.ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
                    for run_part in fetched.parts {
                        for await batch_part in run_part.part_stream(shard_id, blob, metrics).boxed() {
                            yield Cow::Owned(batch_part?.into_owned());
                        }
                    }
                }
            }
        }
    }
}

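// Illustrative sketch (editor's addition, not from the original source): consuming
// `part_stream` from an async context. `run_part`, `shard_id`, `blob`, and
// `metrics` are assumed to be in scope; the stream recursively flattens any
// `RunPart::Many` references into individual `BatchPart`s.
//
//     let parts = run_part.part_stream(shard_id, blob, &metrics);
//     futures::pin_mut!(parts);
//     while let Some(part) = parts.next().await {
//         // Err(MissingBlob) if a referenced blob has been deleted out from under us.
//         let part = part.expect("referenced blob exists");
//         tracing::info!("visiting part {}", part.printable_name());
//     }
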
impl<T: Ord> PartialOrd for BatchPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for BatchPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o),
            (
                BatchPart::Inline {
                    updates: s_updates,
                    ts_rewrite: s_ts_rewrite,
                    schema_id: s_schema_id,
                    deprecated_schema_id: s_deprecated_schema_id,
                },
                BatchPart::Inline {
                    updates: o_updates,
                    ts_rewrite: o_ts_rewrite,
                    schema_id: o_schema_id,
                    deprecated_schema_id: o_deprecated_schema_id,
                },
            ) => (
                s_updates,
                s_ts_rewrite.as_ref().map(|x| x.elements()),
                s_schema_id,
                s_deprecated_schema_id,
            )
                .cmp(&(
                    o_updates,
                    o_ts_rewrite.as_ref().map(|x| x.elements()),
                    o_schema_id,
                    o_deprecated_schema_id,
                )),
            (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less,
            (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater,
        }
    }
}

/// What order are the parts in this run in?
#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub(crate) enum RunOrder {
    /// They're in no particular order.
    Unordered,
    /// They're ordered based on the codec-encoded K/V bytes.
    Codec,
    /// They're ordered by the natural ordering of the structured data.
    Structured,
}

#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Serialize, Copy, Hash)]
pub struct RunId(pub(crate) [u8; 16]);

impl std::fmt::Display for RunId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "ri{}", Uuid::from_bytes(self.0))
    }
}

impl std::fmt::Debug for RunId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "RunId({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for RunId {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id("ri", "RunId", s).map(RunId)
    }
}

impl From<RunId> for String {
    fn from(x: RunId) -> Self {
        x.to_string()
    }
}

impl RunId {
    pub(crate) fn new() -> Self {
        RunId(*Uuid::new_v4().as_bytes())
    }
}

impl Arbitrary for RunId {
    type Parameters = ();
    type Strategy = proptest::strategy::BoxedStrategy<Self>;
    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        Strategy::prop_map(proptest::prelude::any::<u128>(), |n| {
            RunId(*Uuid::from_u128(n).as_bytes())
        })
        .boxed()
    }
}

/// Metadata shared across a run.
#[derive(Clone, Debug, Default, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub struct RunMeta {
    /// If none, Persist should infer the order based on the proto metadata.
    pub(crate) order: Option<RunOrder>,
    /// All parts in a run should have the same schema.
    pub(crate) schema: Option<SchemaId>,

    /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
    pub(crate) deprecated_schema: Option<SchemaId>,

    /// If set, a UUID that uniquely identifies this run.
    pub(crate) id: Option<RunId>,

    /// The number of updates in this run, or `None` if the number is unknown.
    pub(crate) len: Option<usize>,
}

/// A subset of a [HollowBatch] corresponding 1:1 to a blob.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct HollowBatchPart<T> {
    /// Pointer usable to retrieve the updates.
    pub key: PartialBatchKey,
    /// The encoded size of this part.
    pub encoded_size_bytes: usize,
    /// A lower bound on the keys in the part. (By default, this is the minimum
    /// possible key: `vec![]`.)
    #[serde(serialize_with = "serialize_part_bytes")]
    pub key_lower: Vec<u8>,
    /// A lower bound on the keys in the part, stored as structured data.
    #[serde(serialize_with = "serialize_lazy_proto")]
    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
    /// Aggregate statistics about data contained in this part.
    #[serde(serialize_with = "serialize_part_stats")]
    pub stats: Option<LazyPartStats>,
    /// A frontier to which timestamps in this part are advanced on read, if
    /// set.
    ///
    /// A value of `Some([T::minimum()])` is functionally the same as `None`,
    /// but we maintain the distinction between the two for some internal sanity
    /// checking of invariants as well as metrics. If this ever becomes an
    /// issue, everything still works with this as just `Antichain<T>`.
    pub ts_rewrite: Option<Antichain<T>>,
    /// A Codec64 encoded sum of all diffs in this part, if known.
    ///
    /// This is `None` if this part was written before we started storing this
    /// information, or if it was written when the dyncfg was off.
    ///
    /// It could also make sense to model this as part of the pushdown stats, if
    /// we later decide that's of some benefit.
    #[serde(serialize_with = "serialize_diffs_sum")]
    pub diffs_sum: Option<[u8; 8]>,
    /// Columnar format that this batch was written in.
    ///
    /// This is `None` if this part was written before we started writing structured
    /// columnar data.
    pub format: Option<BatchColumnarFormat>,
    /// The schemas used to encode the data in this batch part.
    ///
    /// Or None for historical data written before the schema registry was
    /// added.
    pub schema_id: Option<SchemaId>,

    /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
    pub deprecated_schema_id: Option<SchemaId>,
}

/// A [Batch] but with the updates themselves stored externally.
///
/// [Batch]: differential_dataflow::trace::BatchReader
#[derive(Clone, PartialEq, Eq)]
pub struct HollowBatch<T> {
    /// Describes the times of the updates in the batch.
    pub desc: Description<T>,
    /// The number of updates in the batch.
    pub len: usize,
    /// Pointers usable to retrieve the updates.
    pub(crate) parts: Vec<RunPart<T>>,
    /// Runs of sequential sorted batch parts, stored as indices into `parts`.
    /// ex.
    /// ```text
    ///     parts=[p1, p2, p3], runs=[]     --> run  is  [p1, p2, p3]
    ///     parts=[p1, p2, p3], runs=[1]    --> runs are [p1] and [p2, p3]
    ///     parts=[p1, p2, p3], runs=[1, 2] --> runs are [p1], [p2], [p3]
    /// ```
    pub(crate) run_splits: Vec<usize>,
    /// Run-level metadata: the first entry has metadata for the first run, and so on.
    /// If there's no corresponding entry for a particular run, it's assumed to be [RunMeta::default()].
    pub(crate) run_meta: Vec<RunMeta>,
}

impl<T: Debug> Debug for HollowBatch<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let HollowBatch {
            desc,
            parts,
            len,
            run_splits: runs,
            run_meta,
        } = self;
        f.debug_struct("HollowBatch")
            .field(
                "desc",
                &(
                    desc.lower().elements(),
                    desc.upper().elements(),
                    desc.since().elements(),
                ),
            )
            .field("parts", &parts)
            .field("len", &len)
            .field("runs", &runs)
            .field("run_meta", &run_meta)
            .finish()
    }
}

impl<T: Serialize> serde::Serialize for HollowBatch<T> {
    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        let HollowBatch {
            desc,
            len,
            // Both parts and runs are covered by the self.runs call.
            parts: _,
            run_splits: _,
            run_meta: _,
        } = self;
        let mut s = s.serialize_struct("HollowBatch", 5)?;
        let () = s.serialize_field("lower", &desc.lower().elements())?;
        let () = s.serialize_field("upper", &desc.upper().elements())?;
        let () = s.serialize_field("since", &desc.since().elements())?;
        let () = s.serialize_field("len", len)?;
        let () = s.serialize_field("part_runs", &self.runs().collect::<Vec<_>>())?;
        s.end()
    }
}

impl<T: Ord> PartialOrd for HollowBatch<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for HollowBatch<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        // Deconstruct self and other so we get a compile failure if new fields
        // are added.
        let HollowBatch {
            desc: self_desc,
            parts: self_parts,
            len: self_len,
            run_splits: self_runs,
            run_meta: self_run_meta,
        } = self;
        let HollowBatch {
            desc: other_desc,
            parts: other_parts,
            len: other_len,
            run_splits: other_runs,
            run_meta: other_run_meta,
        } = other;
        (
            self_desc.lower().elements(),
            self_desc.upper().elements(),
            self_desc.since().elements(),
            self_parts,
            self_len,
            self_runs,
            self_run_meta,
        )
            .cmp(&(
                other_desc.lower().elements(),
                other_desc.upper().elements(),
                other_desc.since().elements(),
                other_parts,
                other_len,
                other_runs,
                other_run_meta,
            ))
    }
}

impl<T: Timestamp + Codec64 + Sync> HollowBatch<T> {
    pub(crate) fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + 'a {
        stream! {
            for part in &self.parts {
                for await part in part.part_stream(shard_id, blob, metrics) {
                    yield part;
                }
            }
        }
    }
}
impl<T> HollowBatch<T> {
    /// Construct an in-memory hollow batch from the given metadata.
    ///
    /// This method debug-asserts that `run_splits` is a sequence of valid indices into `parts`;
    /// the caller is responsible for ensuring that the defined runs are themselves valid.
    ///
    /// `len` should represent the number of valid updates in the referenced parts.
    pub(crate) fn new(
        desc: Description<T>,
        parts: Vec<RunPart<T>>,
        len: usize,
        run_meta: Vec<RunMeta>,
        run_splits: Vec<usize>,
    ) -> Self {
        debug_assert!(
            run_splits.is_strictly_sorted(),
            "run indices should be strictly increasing"
        );
        debug_assert!(
            run_splits.first().map_or(true, |i| *i > 0),
            "run indices should be positive"
        );
        debug_assert!(
            run_splits.last().map_or(true, |i| *i < parts.len()),
            "run indices should be valid indices into parts"
        );
        debug_assert!(
            parts.is_empty() || run_meta.len() == run_splits.len() + 1,
            "all metadata should correspond to a run"
        );

        Self {
            desc,
            len,
            parts,
            run_splits,
            run_meta,
        }
    }

    /// Construct a batch of a single run with default metadata. Mostly interesting for tests.
    pub(crate) fn new_run(desc: Description<T>, parts: Vec<RunPart<T>>, len: usize) -> Self {
        let run_meta = if parts.is_empty() {
            vec![]
        } else {
            vec![RunMeta::default()]
        };
        Self {
            desc,
            len,
            parts,
            run_splits: vec![],
            run_meta,
        }
    }

    #[cfg(test)]
    pub(crate) fn new_run_for_test(
        desc: Description<T>,
        parts: Vec<RunPart<T>>,
        len: usize,
        run_id: RunId,
    ) -> Self {
        let run_meta = if parts.is_empty() {
            vec![]
        } else {
            let mut meta = RunMeta::default();
            meta.id = Some(run_id);
            vec![meta]
        };
        Self {
            desc,
            len,
            parts,
            run_splits: vec![],
            run_meta,
        }
    }

    /// An empty hollow batch, representing no updates over the given desc.
    pub(crate) fn empty(desc: Description<T>) -> Self {
        Self {
            desc,
            len: 0,
            parts: vec![],
            run_splits: vec![],
            run_meta: vec![],
        }
    }

    pub(crate) fn runs(&self) -> impl Iterator<Item = (&RunMeta, &[RunPart<T>])> {
        let run_ends = self
            .run_splits
            .iter()
            .copied()
            .chain(std::iter::once(self.parts.len()));
        let run_metas = self.run_meta.iter();
        let run_parts = run_ends
            .scan(0, |start, end| {
                let range = *start..end;
                *start = end;
                Some(range)
            })
            .filter(|range| !range.is_empty())
            .map(|range| &self.parts[range]);
        run_metas.zip_eq(run_parts)
    }

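    // Illustrative sketch (editor's addition, not from the original source): how
    // `run_splits` carves `parts` into the runs yielded by `runs()`. `desc` and
    // `p1`..`p3` stand in for hypothetical values.
    //
    //     // parts = [p1, p2, p3], run_splits = [1] => two runs: [p1] and [p2, p3],
    //     // so `run_meta` needs one entry per run (two here).
    //     let batch = HollowBatch::new(
    //         desc,
    //         vec![p1, p2, p3],
    //         3,
    //         vec![RunMeta::default(), RunMeta::default()],
    //         vec![1],
    //     );
    //     let run_lens: Vec<usize> = batch.runs().map(|(_meta, parts)| parts.len()).collect();
    //     assert_eq!(run_lens, vec![1, 2]);
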
    pub(crate) fn inline_bytes(&self) -> usize {
        self.parts.iter().map(|x| x.inline_bytes()).sum()
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.parts.is_empty()
    }

    pub(crate) fn part_count(&self) -> usize {
        self.parts.len()
    }

    /// The sum of the encoded sizes of all parts in the batch.
    pub fn encoded_size_bytes(&self) -> usize {
        self.parts.iter().map(|p| p.encoded_size_bytes()).sum()
    }
}

// See the comment on [Batch::rewrite_ts] for why this is TotalOrder.
impl<T: Timestamp + TotalOrder> HollowBatch<T> {
    pub(crate) fn rewrite_ts(
        &mut self,
        frontier: &Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<(), String> {
        if !PartialOrder::less_than(frontier, &new_upper) {
            return Err(format!(
                "rewrite frontier {:?} !< rewrite upper {:?}",
                frontier.elements(),
                new_upper.elements(),
            ));
        }
        if PartialOrder::less_than(&new_upper, self.desc.upper()) {
            return Err(format!(
                "rewrite upper {:?} < batch upper {:?}",
                new_upper.elements(),
                self.desc.upper().elements(),
            ));
        }

        // The following are things that it seems like we could support, but
        // initially we don't because we don't have a use case for them.
        if PartialOrder::less_than(frontier, self.desc.lower()) {
            return Err(format!(
                "rewrite frontier {:?} < batch lower {:?}",
                frontier.elements(),
                self.desc.lower().elements(),
            ));
        }
        if self.desc.since() != &Antichain::from_elem(T::minimum()) {
            return Err(format!(
                "batch since {:?} != minimum antichain {:?}",
                self.desc.since().elements(),
                &[T::minimum()],
            ));
        }
        for part in self.parts.iter() {
            let Some(ts_rewrite) = part.ts_rewrite() else {
                continue;
            };
            if PartialOrder::less_than(frontier, ts_rewrite) {
                return Err(format!(
                    "rewrite frontier {:?} < batch rewrite {:?}",
                    frontier.elements(),
                    ts_rewrite.elements(),
                ));
            }
        }

        self.desc = Description::new(
            self.desc.lower().clone(),
            new_upper,
            self.desc.since().clone(),
        );
        for part in &mut self.parts {
            match part {
                RunPart::Single(BatchPart::Hollow(part)) => {
                    part.ts_rewrite = Some(frontier.clone())
                }
                RunPart::Single(BatchPart::Inline { ts_rewrite, .. }) => {
                    *ts_rewrite = Some(frontier.clone())
                }
                RunPart::Many(runs) => {
                    // Currently unreachable: we only apply rewrites to user batches, and we don't
                    // ever generate runs of >1 part for those.
                    panic!("unexpected rewrite of a hollow runs ref: {runs:?}");
                }
            }
        }
        Ok(())
    }
}

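// Illustrative sketch (editor's addition, not from the original source): the
// preconditions `rewrite_ts` enforces, using u64 timestamps. `batch` is assumed
// to be a `HollowBatch<u64>` with desc lower=[0], upper=[5], since=[0].
//
//     // Ok: frontier >= lower, frontier < new_upper, and new_upper >= the old upper.
//     batch
//         .rewrite_ts(&Antichain::from_elem(5), Antichain::from_elem(10))
//         .expect("valid rewrite");
//
//     // Err: the new upper must not regress below the batch's current upper (now [10]).
//     assert!(
//         batch
//             .rewrite_ts(&Antichain::from_elem(2), Antichain::from_elem(4))
//             .is_err()
//     );
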
impl<T: Ord> PartialOrd for HollowBatchPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for HollowBatchPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        // Deconstruct self and other so we get a compile failure if new fields
        // are added.
        let HollowBatchPart {
            key: self_key,
            encoded_size_bytes: self_encoded_size_bytes,
            key_lower: self_key_lower,
            structured_key_lower: self_structured_key_lower,
            stats: self_stats,
            ts_rewrite: self_ts_rewrite,
            diffs_sum: self_diffs_sum,
            format: self_format,
            schema_id: self_schema_id,
            deprecated_schema_id: self_deprecated_schema_id,
        } = self;
        let HollowBatchPart {
            key: other_key,
            encoded_size_bytes: other_encoded_size_bytes,
            key_lower: other_key_lower,
            structured_key_lower: other_structured_key_lower,
            stats: other_stats,
            ts_rewrite: other_ts_rewrite,
            diffs_sum: other_diffs_sum,
            format: other_format,
            schema_id: other_schema_id,
            deprecated_schema_id: other_deprecated_schema_id,
        } = other;
        (
            self_key,
            self_encoded_size_bytes,
            self_key_lower,
            self_structured_key_lower,
            self_stats,
            self_ts_rewrite.as_ref().map(|x| x.elements()),
            self_diffs_sum,
            self_format,
            self_schema_id,
            self_deprecated_schema_id,
        )
            .cmp(&(
                other_key,
                other_encoded_size_bytes,
                other_key_lower,
                other_structured_key_lower,
                other_stats,
                other_ts_rewrite.as_ref().map(|x| x.elements()),
                other_diffs_sum,
                other_format,
                other_schema_id,
                other_deprecated_schema_id,
            ))
    }
}

/// A pointer to a rollup stored externally.
#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub struct HollowRollup {
    /// Pointer usable to retrieve the rollup.
    pub key: PartialRollupKey,
    /// The encoded size of this rollup, if known.
    pub encoded_size_bytes: Option<usize>,
}

/// A pointer to a blob stored externally.
#[derive(Debug)]
pub enum HollowBlobRef<'a, T> {
    Batch(&'a HollowBatch<T>),
    Rollup(&'a HollowRollup),
}

/// A rollup that is currently being computed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
pub struct ActiveRollup {
    pub seqno: SeqNo,
    pub start_ms: u64,
}

/// A garbage collection request that is currently being computed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
pub struct ActiveGc {
    pub seqno: SeqNo,
    pub start_ms: u64,
}

/// A sentinel for a state transition that was a no-op.
///
/// Critically, this also indicates that the no-op state transition was not
/// committed through compare_and_append and thus is _not linearized_.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct NoOpStateTransition<T>(pub T);

// TODO: Document invariants.
#[derive(Debug, Clone)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct StateCollections<T> {
    // - Invariant: `<= all reader.since`
    // - Invariant: Doesn't regress across state versions.
    pub(crate) last_gc_req: SeqNo,

    // - Invariant: There is a rollup with `seqno <= self.seqno_since`.
    pub(crate) rollups: BTreeMap<SeqNo, HollowRollup>,

    /// The rollup that is currently being computed.
    pub(crate) active_rollup: Option<ActiveRollup>,
    /// The gc request that is currently being computed.
    pub(crate) active_gc: Option<ActiveGc>,

    pub(crate) leased_readers: BTreeMap<LeasedReaderId, LeasedReaderState<T>>,
    pub(crate) critical_readers: BTreeMap<CriticalReaderId, CriticalReaderState<T>>,
    pub(crate) writers: BTreeMap<WriterId, WriterState<T>>,
    pub(crate) schemas: BTreeMap<SchemaId, EncodedSchemas>,

    // - Invariant: `trace.since == meet(all reader.since)`
    // - Invariant: `trace.since` doesn't regress across state versions.
    // - Invariant: `trace.upper` doesn't regress across state versions.
    // - Invariant: `trace` upholds its own invariants.
    pub(crate) trace: Trace<T>,
}

/// A key and val [Codec::Schema] encoded via [Codec::encode_schema].
///
/// This strategy of directly serializing the schema objects requires that
/// persist users do the right thing. Specifically, that an encoded schema
/// doesn't in some later version of mz decode to an in-mem object that acts
/// differently. In a sense, the current system (before the introduction of the
/// schema registry) where schemas are passed in unchecked to reader and writer
/// registration calls also has the same defect, so seems fine.
///
/// An alternative is to write down here some persist-specific representation of
/// the schema (e.g. the arrow DataType). This is a lot more work and also has
/// the potential to lead down a similar failure mode to the mz_persist_types
/// `Data` trait, where the boilerplate isn't worth the safety. Given that we
/// can always migrate later by rehydrating these, seems fine to start with the
/// easy thing.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct EncodedSchemas {
    /// A full in-mem `K::Schema` impl encoded via [Codec::encode_schema].
    pub key: Bytes,
    /// The arrow `DataType` produced by this `K::Schema` at the time it was
    /// registered, encoded as a `ProtoDataType`.
    pub key_data_type: Bytes,
    /// A full in-mem `V::Schema` impl encoded via [Codec::encode_schema].
    pub val: Bytes,
    /// The arrow `DataType` produced by this `V::Schema` at the time it was
    /// registered, encoded as a `ProtoDataType`.
    pub val_data_type: Bytes,
}

impl EncodedSchemas {
    pub(crate) fn decode_data_type(buf: &[u8]) -> DataType {
        let proto = prost::Message::decode(buf).expect("valid ProtoDataType");
        DataType::from_proto(proto).expect("valid DataType")
    }
}

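// Illustrative sketch (editor's addition, not from the original source): the
// encoding that `decode_data_type` expects, mirroring the private
// `encode_data_type` helper inside `register_schema` below.
//
//     let data_type = DataType::Int64;
//     let buf = prost::Message::encode_to_vec(&data_type.into_proto());
//     assert_eq!(EncodedSchemas::decode_data_type(&buf), data_type);
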
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum CompareAndAppendBreak<T> {
    AlreadyCommitted,
    Upper {
        shard_upper: Antichain<T>,
        writer_upper: Antichain<T>,
    },
    InvalidUsage(InvalidUsage<T>),
    InlineBackpressure,
}

#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum SnapshotErr<T> {
    AsOfNotYetAvailable(SeqNo, Upper<T>),
    AsOfHistoricalDistinctionsLost(Since<T>),
}

impl<T> StateCollections<T>
where
    T: Timestamp + Lattice + Codec64,
{
    pub fn add_rollup(
        &mut self,
        add_rollup: (SeqNo, &HollowRollup),
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        let (rollup_seqno, rollup) = add_rollup;
        let applied = match self.rollups.get(&rollup_seqno) {
            Some(x) => x.key == rollup.key,
            None => {
                self.active_rollup = None;
                self.rollups.insert(rollup_seqno, rollup.to_owned());
                true
            }
        };
        // This state transition is a no-op if applied is false but we
        // still commit the state change so that this gets linearized
        // (maybe we're looking at old state).
        Continue(applied)
    }

    pub fn remove_rollups(
        &mut self,
        remove_rollups: &[(SeqNo, PartialRollupKey)],
    ) -> ControlFlow<NoOpStateTransition<Vec<SeqNo>>, Vec<SeqNo>> {
        if remove_rollups.is_empty() || self.is_tombstone() {
            return Break(NoOpStateTransition(vec![]));
        }

        // This state transition is called at the end of the GC process, so we
        // need to unset the `active_gc` field.
        self.active_gc = None;

        let mut removed = vec![];
        for (seqno, key) in remove_rollups {
            let removed_key = self.rollups.remove(seqno);
            debug_assert!(
                removed_key.as_ref().map_or(true, |x| &x.key == key),
                "{} vs {:?}",
                key,
                removed_key
            );

            if removed_key.is_some() {
                removed.push(*seqno);
            }
        }

        Continue(removed)
    }

    pub fn register_leased_reader(
        &mut self,
        hostname: &str,
        reader_id: &LeasedReaderId,
        purpose: &str,
        seqno: SeqNo,
        lease_duration: Duration,
        heartbeat_timestamp_ms: u64,
        use_critical_since: bool,
    ) -> ControlFlow<
        NoOpStateTransition<(LeasedReaderState<T>, SeqNo)>,
        (LeasedReaderState<T>, SeqNo),
    > {
        let since = if use_critical_since {
            self.critical_since()
                .unwrap_or_else(|| self.trace.since().clone())
        } else {
            self.trace.since().clone()
        };
        let reader_state = LeasedReaderState {
            debug: HandleDebugState {
                hostname: hostname.to_owned(),
                purpose: purpose.to_owned(),
            },
            seqno,
            since,
            last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
            lease_duration_ms: u64::try_from(lease_duration.as_millis())
                .expect("lease duration as millis should fit within u64"),
        };

        // If the shard-global upper and since are both the empty antichain,
        // then no further writes can ever commit and no further reads can be
        // served. Optimize this by no-op-ing reader registration so that we can
        // settle the shard into a final unchanging tombstone state.
        if self.is_tombstone() {
            return Break(NoOpStateTransition((reader_state, self.seqno_since(seqno))));
        }

        // TODO: Handle if the reader or writer already exists.
        self.leased_readers
            .insert(reader_id.clone(), reader_state.clone());
        Continue((reader_state, self.seqno_since(seqno)))
    }

    pub fn register_critical_reader<O: Opaque + Codec64>(
        &mut self,
        hostname: &str,
        reader_id: &CriticalReaderId,
        purpose: &str,
    ) -> ControlFlow<NoOpStateTransition<CriticalReaderState<T>>, CriticalReaderState<T>> {
        let state = CriticalReaderState {
            debug: HandleDebugState {
                hostname: hostname.to_owned(),
                purpose: purpose.to_owned(),
            },
            since: self.trace.since().clone(),
            opaque: OpaqueState(Codec64::encode(&O::initial())),
            opaque_codec: O::codec_name(),
        };

        // We expire all readers if the upper and since both advance to the
        // empty antichain. Gracefully handle this. At the same time,
        // short-circuit the cmd application so we don't needlessly create new
        // SeqNos.
        if self.is_tombstone() {
            return Break(NoOpStateTransition(state));
        }

        let state = match self.critical_readers.get_mut(reader_id) {
            Some(existing_state) => {
                existing_state.debug = state.debug;
                existing_state.clone()
            }
            None => {
                self.critical_readers
                    .insert(reader_id.clone(), state.clone());
                state
            }
        };
        Continue(state)
    }

    pub fn register_schema<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> ControlFlow<NoOpStateTransition<Option<SchemaId>>, Option<SchemaId>> {
        fn encode_data_type(data_type: &DataType) -> Bytes {
            let proto = data_type.into_proto();
            prost::Message::encode_to_vec(&proto).into()
        }

        // Look for an existing registered SchemaId for these schemas.
        //
        // The common case is that this should be a recent one, so as a minor
        // optimization, do this search in reverse order.
        //
        // TODO: Note that this impl is `O(schemas)`. Combined with the
        // possibility of cmd retries, it's possible but unlikely for this to
        // get expensive. We could maintain a reverse map to speed this up if
        // necessary. This would either need to work on the encoded
        // representation (in which case we'd have to fall back to the linear
        // scan) or we'd need to add a Hash/Ord bound to Schema.
1541        let existing_id = self.schemas.iter().rev().find(|(_, x)| {
1542            K::decode_schema(&x.key) == *key_schema && V::decode_schema(&x.val) == *val_schema
1543        });
1544        match existing_id {
1545            Some((schema_id, _)) => {
1546                // TODO: Validate that the decoded schemas still produce records
1547                // of the recorded DataType, to detect shenanigans. Probably
1548                // best to wait until we've turned on Schema2 in prod and thus
1549                // committed to the current mappings.
1550                Break(NoOpStateTransition(Some(*schema_id)))
1551            }
1552            None if self.is_tombstone() => {
1553                // TODO: Is this right?
1554                Break(NoOpStateTransition(None))
1555            }
1556            None if self.schemas.is_empty() => {
1557                // We'll have to do something more sophisticated here to
1558                // generate the next id if/when we start supporting the removal
1559                // of schemas.
1560                let id = SchemaId(self.schemas.len());
1561                let key_data_type = mz_persist_types::columnar::data_type::<K>(key_schema)
1562                    .expect("valid key schema");
1563                let val_data_type = mz_persist_types::columnar::data_type::<V>(val_schema)
1564                    .expect("valid val schema");
1565                let prev = self.schemas.insert(
1566                    id,
1567                    EncodedSchemas {
1568                        key: K::encode_schema(key_schema),
1569                        key_data_type: encode_data_type(&key_data_type),
1570                        val: V::encode_schema(val_schema),
1571                        val_data_type: encode_data_type(&val_data_type),
1572                    },
1573                );
1574                assert_eq!(prev, None);
1575                Continue(Some(id))
1576            }
1577            None => {
1578                info!(
1579                    "register_schemas got {:?} expected {:?}",
1580                    key_schema,
1581                    self.schemas
1582                        .iter()
1583                        .map(|(id, x)| (id, K::decode_schema(&x.key)))
1584                        .collect::<Vec<_>>()
1585                );
1586                // Until we implement persist schema changes, allow at most one
1587                // registered schema.
1588                Break(NoOpStateTransition(None))
1589            }
1590        }
1591    }
1592
1593    pub fn compare_and_evolve_schema<K: Codec, V: Codec>(
1594        &mut self,
1595        expected: SchemaId,
1596        key_schema: &K::Schema,
1597        val_schema: &V::Schema,
1598    ) -> ControlFlow<NoOpStateTransition<CaESchema<K, V>>, CaESchema<K, V>> {
1599        fn data_type<T>(schema: &impl Schema<T>) -> DataType {
1600            // To be defensive, create an empty batch and inspect the resulting
1601            // data type (as opposed to something like allowing the `Schema` to
1602            // declare the DataType).
1603            let array = Schema::encoder(schema).expect("valid schema").finish();
1604            Array::data_type(&array).clone()
1605        }
1606
1607        let (current_id, current) = self
1608            .schemas
1609            .last_key_value()
1610            .expect("all shards have a schema");
1611        if *current_id != expected {
1612            return Break(NoOpStateTransition(CaESchema::ExpectedMismatch {
1613                schema_id: *current_id,
1614                key: K::decode_schema(&current.key),
1615                val: V::decode_schema(&current.val),
1616            }));
1617        }
1618
1619        let current_key = K::decode_schema(&current.key);
1620        let current_key_dt = EncodedSchemas::decode_data_type(&current.key_data_type);
1621        let current_val = V::decode_schema(&current.val);
1622        let current_val_dt = EncodedSchemas::decode_data_type(&current.val_data_type);
1623
1624        let key_dt = data_type(key_schema);
1625        let val_dt = data_type(val_schema);
1626
1627        // If the schema is exactly the same as the current one, no-op.
1628        if current_key == *key_schema
1629            && current_key_dt == key_dt
1630            && current_val == *val_schema
1631            && current_val_dt == val_dt
1632        {
1633            return Break(NoOpStateTransition(CaESchema::Ok(*current_id)));
1634        }
1635
1636        let key_fn = backward_compatible(&current_key_dt, &key_dt);
1637        let val_fn = backward_compatible(&current_val_dt, &val_dt);
1638        let (Some(key_fn), Some(val_fn)) = (key_fn, val_fn) else {
1639            return Break(NoOpStateTransition(CaESchema::Incompatible));
1640        };
1641        // Persist initially disallows dropping columns. This would require a
1642        // bunch more work (e.g. not safe to use the latest schema in
1643        // compaction) and isn't initially necessary in mz.
1644        if key_fn.contains_drop() || val_fn.contains_drop() {
1645            return Break(NoOpStateTransition(CaESchema::Incompatible));
1646        }
1647
1648        // We'll have to do something more sophisticated here to
1649        // generate the next id if/when we start supporting the removal
1650        // of schemas.
1651        let id = SchemaId(self.schemas.len());
1652        self.schemas.insert(
1653            id,
1654            EncodedSchemas {
1655                key: K::encode_schema(key_schema),
1656                key_data_type: prost::Message::encode_to_vec(&key_dt.into_proto()).into(),
1657                val: V::encode_schema(val_schema),
1658                val_data_type: prost::Message::encode_to_vec(&val_dt.into_proto()).into(),
1659            },
1660        );
1661        Continue(CaESchema::Ok(id))
1662    }
1663
1664    pub fn compare_and_append(
1665        &mut self,
1666        batch: &HollowBatch<T>,
1667        writer_id: &WriterId,
1668        heartbeat_timestamp_ms: u64,
1669        lease_duration_ms: u64,
1670        idempotency_token: &IdempotencyToken,
1671        debug_info: &HandleDebugState,
1672        inline_writes_total_max_bytes: usize,
1673        claim_compaction_percent: usize,
1674        claim_compaction_min_version: Option<&Version>,
1675    ) -> ControlFlow<CompareAndAppendBreak<T>, Vec<FueledMergeReq<T>>> {
1676        // We expire all writers if the upper and since both advance to the
1677        // empty antichain. Gracefully handle this. At the same time,
1678        // short-circuit the cmd application so we don't needlessly create new
1679        // SeqNos.
1680        if self.is_tombstone() {
1681            assert_eq!(self.trace.upper(), &Antichain::new());
1682            return Break(CompareAndAppendBreak::Upper {
1683                shard_upper: Antichain::new(),
1684                // This writer might have been registered before the shard upper
1685                // was advanced, which makes this writer_upper pessimistic for the
1686                // Indeterminate handling of compare_and_append at the machine
1687                // level, but that's fine.
1688                writer_upper: Antichain::new(),
1689            });
1690        }
1691
1692        let writer_state = self
1693            .writers
1694            .entry(writer_id.clone())
1695            .or_insert_with(|| WriterState {
1696                last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1697                lease_duration_ms,
1698                most_recent_write_token: IdempotencyToken::SENTINEL,
1699                most_recent_write_upper: Antichain::from_elem(T::minimum()),
1700                debug: debug_info.clone(),
1701            });
1702
1703        if PartialOrder::less_than(batch.desc.upper(), batch.desc.lower()) {
1704            return Break(CompareAndAppendBreak::InvalidUsage(
1705                InvalidUsage::InvalidBounds {
1706                    lower: batch.desc.lower().clone(),
1707                    upper: batch.desc.upper().clone(),
1708                },
1709            ));
1710        }
1711
1712        // If the time interval is empty, the list of updates must also be
1713        // empty.
1714        if batch.desc.upper() == batch.desc.lower() && !batch.is_empty() {
1715            return Break(CompareAndAppendBreak::InvalidUsage(
1716                InvalidUsage::InvalidEmptyTimeInterval {
1717                    lower: batch.desc.lower().clone(),
1718                    upper: batch.desc.upper().clone(),
1719                    keys: batch
1720                        .parts
1721                        .iter()
1722                        .map(|x| x.printable_name().to_owned())
1723                        .collect(),
1724                },
1725            ));
1726        }
1727
1728        if idempotency_token == &writer_state.most_recent_write_token {
1729            // If the last write had the same idempotency_token, then this must
1730            // have already committed. Sanity check that the most recent write
1731            // upper matches and that the shard upper is at least the write
1732            // upper; if it's not, something very suspect is going on.
1733            assert_eq!(batch.desc.upper(), &writer_state.most_recent_write_upper);
1734            assert!(
1735                PartialOrder::less_equal(batch.desc.upper(), self.trace.upper()),
1736                "{:?} vs {:?}",
1737                batch.desc.upper(),
1738                self.trace.upper()
1739            );
1740            return Break(CompareAndAppendBreak::AlreadyCommitted);
1741        }
1742
1743        let shard_upper = self.trace.upper();
1744        if shard_upper != batch.desc.lower() {
1745            return Break(CompareAndAppendBreak::Upper {
1746                shard_upper: shard_upper.clone(),
1747                writer_upper: writer_state.most_recent_write_upper.clone(),
1748            });
1749        }
1750
1751        let new_inline_bytes = batch.inline_bytes();
1752        if new_inline_bytes > 0 {
1753            let mut existing_inline_bytes = 0;
1754            self.trace
1755                .map_batches(|x| existing_inline_bytes += x.inline_bytes());
1756            // TODO: For very small batches, it may actually _increase_ the size
1757            // of state to flush them out. Consider another threshold under
1758            // which an inline part can be appended no matter what.
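            // A sketch of that idea, assuming a hypothetical
            // `inline_writes_single_min_bytes` knob (not wired up here): skip
            // backpressure entirely for sufficiently small inline parts.
            //
            //     let tiny = new_inline_bytes <= inline_writes_single_min_bytes;
            //     if !tiny && existing_inline_bytes + new_inline_bytes >= inline_writes_total_max_bytes {
            //         return Break(CompareAndAppendBreak::InlineBackpressure);
            //     }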
1759            if existing_inline_bytes + new_inline_bytes >= inline_writes_total_max_bytes {
1760                return Break(CompareAndAppendBreak::InlineBackpressure);
1761            }
1762        }
1763
1764        let mut merge_reqs = if batch.desc.upper() != batch.desc.lower() {
1765            self.trace.push_batch(batch.clone())
1766        } else {
1767            Vec::new()
1768        };
1769
1770        // NB: we don't claim unclaimed compactions when the recording flag is off, even if we'd
1771        // otherwise be allowed to, to avoid triggering the same compactions in every writer.
1772        let all_empty_reqs = merge_reqs
1773            .iter()
1774            .all(|req| req.inputs.iter().all(|b| b.batch.is_empty()));
1775        if all_empty_reqs && !batch.is_empty() {
1776            let mut reqs_to_take = claim_compaction_percent / 100;
1777            if (usize::cast_from(idempotency_token.hashed()) % 100)
1778                < (claim_compaction_percent % 100)
1779            {
1780                reqs_to_take += 1;
1781            }
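            // For example (illustrative only): claim_compaction_percent = 250
            // yields reqs_to_take = 2, plus a third claimed req for roughly
            // half of all idempotency tokens, i.e. ~2.5 claims per append.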
1782            let threshold_ms = heartbeat_timestamp_ms.saturating_sub(lease_duration_ms);
1783            let min_writer = claim_compaction_min_version.map(WriterKey::for_version);
1784            merge_reqs.extend(
1785                // We keep the oldest `reqs_to_take` batches, under the theory that they're least
1786                // likely to be compacted soon for other reasons.
1787                self.trace
1788                    .fueled_merge_reqs_before_ms(threshold_ms, min_writer)
1789                    .take(reqs_to_take),
1790            )
1791        }
1792
1793        for req in &merge_reqs {
1794            self.trace.claim_compaction(
1795                req.id,
1796                ActiveCompaction {
1797                    start_ms: heartbeat_timestamp_ms,
1798                },
1799            )
1800        }
1801
1802        debug_assert_eq!(self.trace.upper(), batch.desc.upper());
1803        writer_state.most_recent_write_token = idempotency_token.clone();
1804        // The writer's most recent upper should only go forward.
1805        assert!(
1806            PartialOrder::less_equal(&writer_state.most_recent_write_upper, batch.desc.upper()),
1807            "{:?} vs {:?}",
1808            &writer_state.most_recent_write_upper,
1809            batch.desc.upper()
1810        );
1811        writer_state
1812            .most_recent_write_upper
1813            .clone_from(batch.desc.upper());
1814
1815        // Heartbeat the writer state to keep our idempotency token alive.
1816        writer_state.last_heartbeat_timestamp_ms = std::cmp::max(
1817            heartbeat_timestamp_ms,
1818            writer_state.last_heartbeat_timestamp_ms,
1819        );
1820
1821        Continue(merge_reqs)
1822    }
1823
1824    pub fn apply_merge_res<D: Codec64 + Semigroup + PartialEq>(
1825        &mut self,
1826        res: &FueledMergeRes<T>,
1827        metrics: &ColumnarMetrics,
1828    ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
1829        // We expire all writers if the upper and since both advance to the
1830        // empty antichain. Gracefully handle this. At the same time,
1831        // short-circuit the cmd application so we don't needlessly create new
1832        // SeqNos.
1833        if self.is_tombstone() {
1834            return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
1835        }
1836
1837        let apply_merge_result = self.trace.apply_merge_res_checked::<D>(res, metrics);
1838        Continue(apply_merge_result)
1839    }
1840
1841    pub fn apply_merge_res_classic<D: Codec64 + Semigroup + PartialEq>(
1842        &mut self,
1843        res: &FueledMergeRes<T>,
1844        metrics: &ColumnarMetrics,
1845    ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
1846        // We expire all writers if the upper and since both advance to the
1847        // empty antichain. Gracefully handle this. At the same time,
1848        // short-circuit the cmd application so we don't needlessly create new
1849        // SeqNos.
1850        if self.is_tombstone() {
1851            return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
1852        }
1853
1854        let apply_merge_result = self
1855            .trace
1856            .apply_merge_res_checked_classic::<D>(res, metrics);
1857        Continue(apply_merge_result)
1858    }
1859
1860    pub fn spine_exert(
1861        &mut self,
1862        fuel: usize,
1863    ) -> ControlFlow<NoOpStateTransition<Vec<FueledMergeReq<T>>>, Vec<FueledMergeReq<T>>> {
1864        let (merge_reqs, did_work) = self.trace.exert(fuel);
1865        if did_work {
1866            Continue(merge_reqs)
1867        } else {
1868            assert!(merge_reqs.is_empty());
1869            // Break if we have nothing useful to do to save the seqno (and
1870            // resulting crdb traffic)
1871            Break(NoOpStateTransition(Vec::new()))
1872        }
1873    }
1874
1875    pub fn downgrade_since(
1876        &mut self,
1877        reader_id: &LeasedReaderId,
1878        seqno: SeqNo,
1879        outstanding_seqno: Option<SeqNo>,
1880        new_since: &Antichain<T>,
1881        heartbeat_timestamp_ms: u64,
1882    ) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
1883        // We expire all readers if the upper and since both advance to the
1884        // empty antichain. Gracefully handle this. At the same time,
1885        // short-circuit the cmd application so we don't needlessly create new
1886        // SeqNos.
1887        if self.is_tombstone() {
1888            return Break(NoOpStateTransition(Since(Antichain::new())));
1889        }
1890
1891        // The only way to have a missing reader in state is if it's been expired... and in that
1892        // case, we behave the same as though that reader had been downgraded to the empty antichain.
1893        let Some(reader_state) = self.leased_reader(reader_id) else {
1894            tracing::warn!(
1895                "Leased reader {reader_id} was expired due to inactivity. Did the machine go to sleep?",
1896            );
1897            return Break(NoOpStateTransition(Since(Antichain::new())));
1898        };
1899
1900        // Also use this as an opportunity to heartbeat the reader and downgrade
1901        // the seqno capability.
1902        reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
1903            heartbeat_timestamp_ms,
1904            reader_state.last_heartbeat_timestamp_ms,
1905        );
1906
1907        let seqno = match outstanding_seqno {
1908            Some(outstanding_seqno) => {
1909                assert!(
1910                    outstanding_seqno >= reader_state.seqno,
1911                    "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
1912                    is behind current reader_state ({:?})",
1913                    outstanding_seqno,
1914                    reader_state.seqno,
1915                );
1916                std::cmp::min(outstanding_seqno, seqno)
1917            }
1918            None => seqno,
1919        };
1920
1921        reader_state.seqno = seqno;
1922
1923        let reader_current_since = if PartialOrder::less_than(&reader_state.since, new_since) {
1924            reader_state.since.clone_from(new_since);
1925            self.update_since();
1926            new_since.clone()
1927        } else {
1928            // No-op, but still commit the state change so that this gets
1929            // linearized.
1930            reader_state.since.clone()
1931        };
1932
1933        Continue(Since(reader_current_since))
1934    }
1935
1936    pub fn compare_and_downgrade_since<O: Opaque + Codec64>(
1937        &mut self,
1938        reader_id: &CriticalReaderId,
1939        expected_opaque: &O,
1940        (new_opaque, new_since): (&O, &Antichain<T>),
1941    ) -> ControlFlow<
1942        NoOpStateTransition<Result<Since<T>, (O, Since<T>)>>,
1943        Result<Since<T>, (O, Since<T>)>,
1944    > {
1945        // We expire all readers if the upper and since both advance to the
1946        // empty antichain. Gracefully handle this. At the same time,
1947        // short-circuit the cmd application so we don't needlessly create new
1948        // SeqNos.
1949        if self.is_tombstone() {
1950            // Match the idempotence behavior below of ignoring the token if
1951            // since is already advanced enough (in this case, because it's a
1952            // tombstone, we know it's the empty antichain).
1953            return Break(NoOpStateTransition(Ok(Since(Antichain::new()))));
1954        }
1955
1956        let reader_state = self.critical_reader(reader_id);
1957        assert_eq!(reader_state.opaque_codec, O::codec_name());
1958
1959        if &O::decode(reader_state.opaque.0) != expected_opaque {
1960            // No-op, but still commit the state change so that this gets
1961            // linearized.
1962            return Continue(Err((
1963                Codec64::decode(reader_state.opaque.0),
1964                Since(reader_state.since.clone()),
1965            )));
1966        }
1967
1968        reader_state.opaque = OpaqueState(Codec64::encode(new_opaque));
1969        if PartialOrder::less_equal(&reader_state.since, new_since) {
1970            reader_state.since.clone_from(new_since);
1971            self.update_since();
1972            Continue(Ok(Since(new_since.clone())))
1973        } else {
1974            // no work to be done -- the reader state's `since` is already sufficiently
1975            // advanced. we may someday need to revisit this branch when it's possible
1976            // for two `since` frontiers to be incomparable.
1977            Continue(Ok(Since(reader_state.since.clone())))
1978        }
1979    }
1980
1981    pub fn heartbeat_leased_reader(
1982        &mut self,
1983        reader_id: &LeasedReaderId,
1984        heartbeat_timestamp_ms: u64,
1985    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1986        // We expire all readers if the upper and since both advance to the
1987        // empty antichain. Gracefully handle this. At the same time,
1988        // short-circuit the cmd application so we don't needlessly create new
1989        // SeqNos.
1990        if self.is_tombstone() {
1991            return Break(NoOpStateTransition(false));
1992        }
1993
1994        match self.leased_readers.get_mut(reader_id) {
1995            Some(reader_state) => {
1996                reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
1997                    heartbeat_timestamp_ms,
1998                    reader_state.last_heartbeat_timestamp_ms,
1999                );
2000                Continue(true)
2001            }
2002            // No-op, but we still commit the state change so that this gets
2003            // linearized (maybe we're looking at old state).
2004            None => Continue(false),
2005        }
2006    }
2007
2008    pub fn expire_leased_reader(
2009        &mut self,
2010        reader_id: &LeasedReaderId,
2011    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2012        // We expire all readers if the upper and since both advance to the
2013        // empty antichain. Gracefully handle this. At the same time,
2014        // short-circuit the cmd application so we don't needlessly create new
2015        // SeqNos.
2016        if self.is_tombstone() {
2017            return Break(NoOpStateTransition(false));
2018        }
2019
2020        let existed = self.leased_readers.remove(reader_id).is_some();
2021        if existed {
2022            // TODO(database-issues#6885): Re-enable this
2023            //
2024            // Temporarily disabling this because we think it might be the cause
2025            // of the remap since bug. Specifically, a clusterd process has a
2026            // ReadHandle for maintaining the since and one inside a Listen. If
2027            // we crash and stay down for longer than the read lease duration,
2028            // it's possible that an expiry of them both in quick succession
2029            // jumps the since forward to the Listen one.
2030            //
2031            // Don't forget to update the downgrade_since when this gets
2032            // switched back on.
2033            //
2034            // self.update_since();
2035        }
2036        // No-op if existed is false, but still commit the state change so that
2037        // this gets linearized.
2038        Continue(existed)
2039    }
2040
2041    pub fn expire_critical_reader(
2042        &mut self,
2043        reader_id: &CriticalReaderId,
2044    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2045        // We expire all readers if the upper and since both advance to the
2046        // empty antichain. Gracefully handle this. At the same time,
2047        // short-circuit the cmd application so we don't needlessly create new
2048        // SeqNos.
2049        if self.is_tombstone() {
2050            return Break(NoOpStateTransition(false));
2051        }
2052
2053        let existed = self.critical_readers.remove(reader_id).is_some();
2054        if existed {
2055            // TODO(database-issues#6885): Re-enable this
2056            //
2057            // Temporarily disabling this because we think it might be the cause
2058            // of the remap since bug. Specifically, a clusterd process has a
2059            // ReadHandle for maintaining the since and one inside a Listen. If
2060            // we crash and stay down for longer than the read lease duration,
2061            // it's possible that an expiry of them both in quick succession
2062            // jumps the since forward to the Listen one.
2063            //
2064            // Don't forget to update the downgrade_since when this gets
2065            // switched back on.
2066            //
2067            // self.update_since();
2068        }
2069        // This state transition is a no-op if existed is false, but we still
2070        // commit the state change so that this gets linearized (maybe we're
2071        // looking at old state).
2072        Continue(existed)
2073    }
2074
2075    pub fn expire_writer(
2076        &mut self,
2077        writer_id: &WriterId,
2078    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2079        // We expire all writers if the upper and since both advance to the
2080        // empty antichain. Gracefully handle this. At the same time,
2081        // short-circuit the cmd application so we don't needlessly create new
2082        // SeqNos.
2083        if self.is_tombstone() {
2084            return Break(NoOpStateTransition(false));
2085        }
2086
2087        let existed = self.writers.remove(writer_id).is_some();
2088        // This state transition is a no-op if existed is false, but we still
2089        // commit the state change so that this gets linearized (maybe we're
2090        // looking at old state).
2091        Continue(existed)
2092    }
2093
2094    fn leased_reader(&mut self, id: &LeasedReaderId) -> Option<&mut LeasedReaderState<T>> {
2095        self.leased_readers.get_mut(id)
2096    }
2097
2098    fn critical_reader(&mut self, id: &CriticalReaderId) -> &mut CriticalReaderState<T> {
2099        self.critical_readers
2100            .get_mut(id)
2101            .unwrap_or_else(|| {
2102                panic!(
2103                    "Unknown CriticalReaderId({}). It was either never registered, or has been manually expired.",
2104                    id
2105                )
2106            })
2107    }
2108
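    // Illustrative example: for a totally ordered time type, critical readers
    // with sinces [3] and [5] yield Some(Antichain::from_elem(3)); with no
    // critical readers registered, this returns None.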
2109    fn critical_since(&self) -> Option<Antichain<T>> {
2110        let mut critical_sinces = self.critical_readers.values().map(|r| &r.since);
2111        let mut since = critical_sinces.next().cloned()?;
2112        for s in critical_sinces {
2113            since.meet_assign(s);
2114        }
2115        Some(since)
2116    }
2117
2118    fn update_since(&mut self) {
2119        let mut sinces_iter = self
2120            .leased_readers
2121            .values()
2122            .map(|x| &x.since)
2123            .chain(self.critical_readers.values().map(|x| &x.since));
2124        let mut since = match sinces_iter.next() {
2125            Some(since) => since.clone(),
2126            None => {
2127                // If there are no current readers, leave `since` unchanged so
2128                // it doesn't regress.
2129                return;
2130            }
2131        };
2132        while let Some(s) = sinces_iter.next() {
2133            since.meet_assign(s);
2134        }
2135        self.trace.downgrade_since(&since);
2136    }
2137
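    // Illustrative example: with leased readers holding seqno capabilities 7
    // and 9, `seqno_since(SeqNo(12))` returns `SeqNo(7)`; with no leased
    // readers, the given seqno is returned unchanged.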
2138    fn seqno_since(&self, seqno: SeqNo) -> SeqNo {
2139        let mut seqno_since = seqno;
2140        for cap in self.leased_readers.values() {
2141            seqno_since = std::cmp::min(seqno_since, cap.seqno);
2142        }
2143        // critical_readers don't hold a seqno capability.
2144        seqno_since
2145    }
2146
2147    fn tombstone_batch() -> HollowBatch<T> {
2148        HollowBatch::empty(Description::new(
2149            Antichain::from_elem(T::minimum()),
2150            Antichain::new(),
2151            Antichain::new(),
2152        ))
2153    }
2154
2155    pub(crate) fn is_tombstone(&self) -> bool {
2156        self.trace.upper().is_empty()
2157            && self.trace.since().is_empty()
2158            && self.writers.is_empty()
2159            && self.leased_readers.is_empty()
2160            && self.critical_readers.is_empty()
2161    }
2162
2163    pub(crate) fn is_single_empty_batch(&self) -> bool {
2164        let mut batch_count = 0;
2165        let mut is_empty = true;
2166        self.trace.map_batches(|b| {
2167            batch_count += 1;
2168            is_empty &= b.is_empty()
2169        });
2170        batch_count <= 1 && is_empty
2171    }
2172
2173    pub fn become_tombstone_and_shrink(&mut self) -> ControlFlow<NoOpStateTransition<()>, ()> {
2174        assert_eq!(self.trace.upper(), &Antichain::new());
2175        assert_eq!(self.trace.since(), &Antichain::new());
2176
2177        // Remember our current state, so we can decide whether we have to
2178        // record a transition in durable state.
2179        let was_tombstone = self.is_tombstone();
2180
2181        // Enter the "tombstone" state, if we're not in it already.
2182        self.writers.clear();
2183        self.leased_readers.clear();
2184        self.critical_readers.clear();
2185
2186        debug_assert!(self.is_tombstone());
2187
2188        // Now that we're in a "tombstone" state -- ie. nobody can read the data from a shard or write to
2189        // it -- the actual contents of our batches no longer matter.
2190        // This method progressively replaces batches in our state with simpler versions, to allow
2191        // freeing up resources and to reduce the state size. (Since the state is unreadable, this
2192        // is not visible to clients.) We do this a little bit at a time to avoid really large state
2193        // transitions... most operations happen incrementally, and large single writes can overwhelm
2194        // a backing store. See comments for why we believe the relevant diffs are reasonably small.
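        // Illustrative progression across successive calls: first each
        // nonempty batch is swapped for an empty batch with the same desc,
        // then any remaining pile of empty batches is collapsed into a single
        // tombstone batch, and from then on further calls are no-ops.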
2195
2196        let mut to_replace = None;
2197        let mut batch_count = 0;
2198        self.trace.map_batches(|b| {
2199            batch_count += 1;
2200            if !b.is_empty() && to_replace.is_none() {
2201                to_replace = Some(b.desc.clone());
2202            }
2203        });
2204        if let Some(desc) = to_replace {
2205            // We have a nonempty batch: replace it with an empty batch and return.
2206            // This should not produce an excessively large diff: if it did, we wouldn't have been
2207            // able to append that batch in the first place.
2208            let result = self.trace.apply_tombstone_merge(&desc);
2209            assert!(
2210                result.matched(),
2211                "merge with a matching desc should always match"
2212            );
2213            Continue(())
2214        } else if batch_count > 1 {
2215            // All our batches are empty, but we have more than one of them. Replace the whole set
2216            // with a new single-batch trace.
2217            // This produces a diff with a size proportional to the number of batches, but since
2218            // Spine keeps a logarithmic number of batches this should never be excessively large.
2219            let mut new_trace = Trace::default();
2220            new_trace.downgrade_since(&Antichain::new());
2221            let merge_reqs = new_trace.push_batch(Self::tombstone_batch());
2222            assert_eq!(merge_reqs, Vec::new());
2223            self.trace = new_trace;
2224            Continue(())
2225        } else if !was_tombstone {
2226            // We were not tombstoned before, so have to make sure this state
2227            // transition is recorded.
2228            Continue(())
2229        } else {
2230            // All our batches are empty, and there's only one... there's no shrinking this
2231            // tombstone further.
2232            Break(NoOpStateTransition(()))
2233        }
2234    }
2235}
2236
2237// TODO: Document invariants.
2238#[derive(Debug)]
2239#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
2240pub struct State<T> {
2241    pub(crate) applier_version: semver::Version,
2242    pub(crate) shard_id: ShardId,
2243
2244    pub(crate) seqno: SeqNo,
2245    /// A strictly increasing wall time of when this state was written, in
2246    /// milliseconds since the unix epoch.
2247    pub(crate) walltime_ms: u64,
2248    /// Hostname of the persist user that created this version of state. For
2249    /// debugging.
2250    pub(crate) hostname: String,
2251    pub(crate) collections: StateCollections<T>,
2252}
2253
2254/// A newtype wrapper of State that guarantees the K, V, and D codecs match the
2255/// ones in durable storage.
2256pub struct TypedState<K, V, T, D> {
2257    pub(crate) state: State<T>,
2258
2259    // According to the docs, PhantomData is to "mark things that act like they
2260    // own a T". State doesn't actually own K, V, or D, just the ability to
2261    // produce them. Using the `fn() -> T` pattern gets us the same variance as
2262    // T [1], but also allows State to correctly derive Send+Sync.
2263    //
2264    // [1]:
2265    //     https://doc.rust-lang.org/nomicon/phantom-data.html#table-of-phantomdata-patterns
2266    pub(crate) _phantom: PhantomData<fn() -> (K, V, D)>,
2267}
2268
2269impl<K, V, T: Clone, D> TypedState<K, V, T, D> {
2270    #[cfg(any(test, debug_assertions))]
2271    pub(crate) fn clone(&self, applier_version: Version, hostname: String) -> Self {
2272        TypedState {
2273            state: State {
2274                applier_version,
2275                shard_id: self.shard_id.clone(),
2276                seqno: self.seqno.clone(),
2277                walltime_ms: self.walltime_ms,
2278                hostname,
2279                collections: self.collections.clone(),
2280            },
2281            _phantom: PhantomData,
2282        }
2283    }
2284
2285    pub(crate) fn clone_for_rollup(&self) -> Self {
2286        TypedState {
2287            state: State {
2288                applier_version: self.applier_version.clone(),
2289                shard_id: self.shard_id.clone(),
2290                seqno: self.seqno.clone(),
2291                walltime_ms: self.walltime_ms,
2292                hostname: self.hostname.clone(),
2293                collections: self.collections.clone(),
2294            },
2295            _phantom: PhantomData,
2296        }
2297    }
2298}
2299
2300impl<K, V, T: Debug, D> Debug for TypedState<K, V, T, D> {
2301    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2302        // Deconstruct self so we get a compile failure if new fields
2303        // are added.
2304        let TypedState { state, _phantom } = self;
2305        f.debug_struct("TypedState").field("state", state).finish()
2306    }
2307}
2308
2309// Impl PartialEq regardless of the type params.
2310#[cfg(any(test, debug_assertions))]
2311impl<K, V, T: PartialEq, D> PartialEq for TypedState<K, V, T, D> {
2312    fn eq(&self, other: &Self) -> bool {
2313        // Deconstruct self and other so we get a compile failure if new fields
2314        // are added.
2315        let TypedState {
2316            state: self_state,
2317            _phantom,
2318        } = self;
2319        let TypedState {
2320            state: other_state,
2321            _phantom,
2322        } = other;
2323        self_state == other_state
2324    }
2325}
2326
2327impl<K, V, T, D> Deref for TypedState<K, V, T, D> {
2328    type Target = State<T>;
2329
2330    fn deref(&self) -> &Self::Target {
2331        &self.state
2332    }
2333}
2334
2335impl<K, V, T, D> DerefMut for TypedState<K, V, T, D> {
2336    fn deref_mut(&mut self) -> &mut Self::Target {
2337        &mut self.state
2338    }
2339}
2340
2341impl<K, V, T, D> TypedState<K, V, T, D>
2342where
2343    K: Codec,
2344    V: Codec,
2345    T: Timestamp + Lattice + Codec64,
2346    D: Codec64,
2347{
2348    pub fn new(
2349        applier_version: Version,
2350        shard_id: ShardId,
2351        hostname: String,
2352        walltime_ms: u64,
2353    ) -> Self {
2354        let state = State {
2355            applier_version,
2356            shard_id,
2357            seqno: SeqNo::minimum(),
2358            walltime_ms,
2359            hostname,
2360            collections: StateCollections {
2361                last_gc_req: SeqNo::minimum(),
2362                rollups: BTreeMap::new(),
2363                active_rollup: None,
2364                active_gc: None,
2365                leased_readers: BTreeMap::new(),
2366                critical_readers: BTreeMap::new(),
2367                writers: BTreeMap::new(),
2368                schemas: BTreeMap::new(),
2369                trace: Trace::default(),
2370            },
2371        };
2372        TypedState {
2373            state,
2374            _phantom: PhantomData,
2375        }
2376    }
2377
2378    pub fn clone_apply<R, E, WorkFn>(
2379        &self,
2380        cfg: &PersistConfig,
2381        work_fn: &mut WorkFn,
2382    ) -> ControlFlow<E, (R, Self)>
2383    where
2384        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
2385    {
2386        // Now that we support one minor version of forward compatibility, tag
2387        // each version of state with the _max_ version of code that has ever
2388        // contributed to it. Otherwise, we'd erroneously allow rolling back an
2389        // arbitrary number of versions if they were done one-by-one.
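        // For example (illustrative only): if this state was last touched by
        // code at version v0.(N+1) and this process runs v0.N (one minor
        // version of forward compatibility), applier_version stays at
        // v0.(N+1). Without taking the max, repeated one-version-at-a-time
        // applications could walk applier_version arbitrarily far backwards.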
2390        let new_applier_version = std::cmp::max(&self.applier_version, &cfg.build_version);
2391        let mut new_state = State {
2392            applier_version: new_applier_version.clone(),
2393            shard_id: self.shard_id,
2394            seqno: self.seqno.next(),
2395            walltime_ms: (cfg.now)(),
2396            hostname: cfg.hostname.clone(),
2397            collections: self.collections.clone(),
2398        };
2399        // Make sure walltime_ms is strictly increasing, in case clocks are
2400        // offset.
2401        if new_state.walltime_ms <= self.walltime_ms {
2402            new_state.walltime_ms = self.walltime_ms + 1;
2403        }
2404
2405        let work_ret = work_fn(new_state.seqno, cfg, &mut new_state.collections)?;
2406        let new_state = TypedState {
2407            state: new_state,
2408            _phantom: PhantomData,
2409        };
2410        Continue((work_ret, new_state))
2411    }
2412}
2413
2414#[derive(Copy, Clone, Debug)]
2415pub struct GcConfig {
2416    pub use_active_gc: bool,
2417    pub fallback_threshold_ms: u64,
2418    pub min_versions: usize,
2419    pub max_versions: usize,
2420}
2421
2422impl<T> State<T>
2423where
2424    T: Timestamp + Lattice + Codec64,
2425{
2426    pub fn shard_id(&self) -> ShardId {
2427        self.shard_id
2428    }
2429
2430    pub fn seqno(&self) -> SeqNo {
2431        self.seqno
2432    }
2433
2434    pub fn since(&self) -> &Antichain<T> {
2435        self.collections.trace.since()
2436    }
2437
2438    pub fn upper(&self) -> &Antichain<T> {
2439        self.collections.trace.upper()
2440    }
2441
2442    pub fn spine_batch_count(&self) -> usize {
2443        self.collections.trace.num_spine_batches()
2444    }
2445
2446    pub fn size_metrics(&self) -> StateSizeMetrics {
2447        let mut ret = StateSizeMetrics::default();
2448        self.blobs().for_each(|x| match x {
2449            HollowBlobRef::Batch(x) => {
2450                ret.hollow_batch_count += 1;
2451                ret.batch_part_count += x.part_count();
2452                ret.num_updates += x.len;
2453
2454                let batch_size = x.encoded_size_bytes();
2455                for x in x.parts.iter() {
2456                    if x.ts_rewrite().is_some() {
2457                        ret.rewrite_part_count += 1;
2458                    }
2459                    if x.is_inline() {
2460                        ret.inline_part_count += 1;
2461                        ret.inline_part_bytes += x.inline_bytes();
2462                    }
2463                }
2464                ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size);
2465                ret.state_batches_bytes += batch_size;
2466            }
2467            HollowBlobRef::Rollup(x) => {
2468                ret.state_rollup_count += 1;
2469                ret.state_rollups_bytes += x.encoded_size_bytes.unwrap_or_default()
2470            }
2471        });
2472        ret
2473    }
2474
2475    pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
2476        // We maintain the invariant that every version of state has at least
2477        // one rollup.
2478        self.collections
2479            .rollups
2480            .iter()
2481            .rev()
2482            .next()
2483            .expect("State should have at least one rollup if seqno > minimum")
2484    }
2485
2486    pub(crate) fn seqno_since(&self) -> SeqNo {
2487        self.collections.seqno_since(self.seqno)
2488    }
2489
2490    // Returns whether the cmd proposing this state has been selected to perform
2491    // background garbage collection work.
2492    //
2493    // If it was selected, this information is recorded in the state itself for
2494    // commit along with the cmd's state transition. This helps us to avoid
2495    // redundant work.
2496    //
2497    // Correctness does not depend on a gc assignment being executed, nor on
2498    // them being executed in the order they are given. But it is expected that
2499    // gc assignments are best-effort respected. In practice, cmds like
2500    // register_foo or expire_foo, where it would be awkward, ignore gc.
2501    pub fn maybe_gc(&mut self, is_write: bool, now: u64, cfg: GcConfig) -> Option<GcReq> {
2502        let GcConfig {
2503            use_active_gc,
2504            fallback_threshold_ms,
2505            min_versions,
2506            max_versions,
2507        } = cfg;
2508        // This is an arbitrary-ish threshold that scales with seqno, but never
2509        // gets particularly big. It probably could be much bigger and certainly
2510        // could use a tuning pass at some point.
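        // For example (illustrative only), in the non-active-gc branch below:
        // seqno 1_000 has next_power_of_two() = 1_024 = 2^10, so gc_threshold
        // is 10; seqno 1_000_000 gives 2^20, so gc_threshold is 20.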
2511        let gc_threshold = if use_active_gc {
2512            u64::cast_from(min_versions)
2513        } else {
2514            std::cmp::max(
2515                1,
2516                u64::cast_from(self.seqno.0.next_power_of_two().trailing_zeros()),
2517            )
2518        };
2519        let new_seqno_since = self.seqno_since();
2520        // Collect until the new seqno since... or the old since plus the max number of versions,
2521        // whichever is less.
2522        let gc_until_seqno = new_seqno_since.min(SeqNo(
2523            self.collections
2524                .last_gc_req
2525                .0
2526                .saturating_add(u64::cast_from(max_versions)),
2527        ));
2528        let should_gc = new_seqno_since
2529            .0
2530            .saturating_sub(self.collections.last_gc_req.0)
2531            >= gc_threshold;
2532
2533        // If we wouldn't otherwise gc, check if we have an active gc. If we do, and
2534        // it's been a while since it started, we should gc.
2535        let should_gc = if use_active_gc && !should_gc {
2536            match self.collections.active_gc {
2537                Some(active_gc) => now.saturating_sub(active_gc.start_ms) > fallback_threshold_ms,
2538                None => false,
2539            }
2540        } else {
2541            should_gc
2542        };
2543        // Assign GC traffic preferentially to writers, falling back to anyone
2544        // generating new state versions if there are no writers.
2545        let should_gc = should_gc && (is_write || self.collections.writers.is_empty());
2546        // Always assign GC work to a tombstoned shard to have the chance to
2547        // clean up any residual blobs. This is safe (won't cause excess gc)
2548        // as the only allowed command after becoming a tombstone is to write
2549        // the final rollup.
2550        let tombstone_needs_gc = self.collections.is_tombstone();
2551        let should_gc = should_gc || tombstone_needs_gc;
2552        let should_gc = if use_active_gc {
2553            // If we have an active gc, we should only gc if the active gc is
2554            // sufficiently old. This is to avoid doing more gc work than
2555            // necessary.
2556            should_gc
2557                && match self.collections.active_gc {
2558                    Some(active) => now.saturating_sub(active.start_ms) > fallback_threshold_ms,
2559                    None => true,
2560                }
2561        } else {
2562            should_gc
2563        };
2564        if should_gc {
2565            self.collections.last_gc_req = gc_until_seqno;
2566            Some(GcReq {
2567                shard_id: self.shard_id,
2568                new_seqno_since: gc_until_seqno,
2569            })
2570        } else {
2571            None
2572        }
2573    }
2574
2575    /// Return the number of gc-ineligible state versions.
2576    pub fn seqnos_held(&self) -> usize {
2577        usize::cast_from(self.seqno.0.saturating_sub(self.seqno_since().0))
2578    }
2579
2580    /// Expire all readers and writers up to the given walltime_ms.
2581    pub fn expire_at(&mut self, walltime_ms: EpochMillis) -> ExpiryMetrics {
2582        let mut metrics = ExpiryMetrics::default();
2583        let shard_id = self.shard_id();
2584        self.collections.leased_readers.retain(|id, state| {
2585            let retain = state.last_heartbeat_timestamp_ms + state.lease_duration_ms >= walltime_ms;
2586            if !retain {
2587                info!(
2588                    "Force expiring reader {id} ({}) of shard {shard_id} due to inactivity",
2589                    state.debug.purpose
2590                );
2591                metrics.readers_expired += 1;
2592            }
2593            retain
2594        });
2595        // critical_readers don't need forced expiration. (In fact, that's the point!)
2596        self.collections.writers.retain(|id, state| {
2597            let retain =
2598                (state.last_heartbeat_timestamp_ms + state.lease_duration_ms) >= walltime_ms;
2599            if !retain {
2600                info!(
2601                    "Force expiring writer {id} ({}) of shard {shard_id} due to inactivity",
2602                    state.debug.purpose
2603                );
2604                metrics.writers_expired += 1;
2605            }
2606            retain
2607        });
2608        metrics
2609    }
2610
2611    /// Returns the batches that contain updates up to (and including) the given `as_of`. The
2612    /// result `Vec` contains [`HollowBatch`]es, each of which references blob keys along with a
2613    /// [`Description`] of what updates in the referenced parts are valid to read.
2614    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
2615        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2616            return Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
2617                self.collections.trace.since().clone(),
2618            )));
2619        }
2620        let upper = self.collections.trace.upper();
2621        if PartialOrder::less_equal(upper, as_of) {
2622            return Err(SnapshotErr::AsOfNotYetAvailable(
2623                self.seqno,
2624                Upper(upper.clone()),
2625            ));
2626        }
2627
2628        let batches = self
2629            .collections
2630            .trace
2631            .batches()
2632            .filter(|b| !PartialOrder::less_than(as_of, b.desc.lower()))
2633            .cloned()
2634            .collect();
2635        Ok(batches)
2636    }
2637
2638    // NB: Unlike the other methods here, this one is read-only.
2639    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
2640        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2641            return Err(Since(self.collections.trace.since().clone()));
2642        }
2643        Ok(())
2644    }
2645
2646    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
2647        // TODO: Avoid the O(n^2) here: `next_listen_batch` is called once per
2648        // batch and this iterates through all batches to find the next one.
2649        self.collections
2650            .trace
2651            .batches()
2652            .find(|b| {
2653                PartialOrder::less_equal(b.desc.lower(), frontier)
2654                    && PartialOrder::less_than(frontier, b.desc.upper())
2655            })
2656            .cloned()
2657            .ok_or(self.seqno)
2658    }
2659
2660    pub fn active_rollup(&self) -> Option<ActiveRollup> {
2661        self.collections.active_rollup
2662    }
2663
2664    pub fn need_rollup(
2665        &self,
2666        threshold: usize,
2667        use_active_rollup: bool,
2668        fallback_threshold_ms: u64,
2669        now: u64,
2670    ) -> Option<SeqNo> {
2671        let (latest_rollup_seqno, _) = self.latest_rollup();
2672
2673        // Tombstoned shards require one final rollup. However, because we
2674        // write a rollup as of SeqNo X and then link it in using a state
2675        // transition (in this case from X to X+1), the minimum number of
2676        // live diffs is actually two. Detect when we're in this minimal
2677        // two diff state and stop the (otherwise) infinite iteration.
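        // For example (illustrative only): the final rollup is written as of
        // SeqNo X and linked in by the transition from X to X+1. Once
        // latest_rollup_seqno = X and self.seqno = X+1, the check below stops
        // requesting further rollups.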
2678        if self.collections.is_tombstone() && latest_rollup_seqno.next() < self.seqno {
2679            return Some(self.seqno);
2680        }
2681
2682        let seqnos_since_last_rollup = self.seqno.0.saturating_sub(latest_rollup_seqno.0);
2683
2684        if use_active_rollup {
2685            // If seqnos_since_last_rollup > threshold and there is no existing rollup in progress,
2686            // we should start a new rollup.
2687            // If there is an active rollup, we should check if it has been running too long.
2688            // If it has, we should start a new rollup.
2689            // This is to guard against a worker dying/taking too long/etc.
2690            if seqnos_since_last_rollup > u64::cast_from(threshold) {
2691                match self.active_rollup() {
2692                    Some(active_rollup) => {
2693                        if now.saturating_sub(active_rollup.start_ms) > fallback_threshold_ms {
2694                            return Some(self.seqno);
2695                        }
2696                    }
2697                    None => {
2698                        return Some(self.seqno);
2699                    }
2700                }
2701            }
2702        } else {
2703            // every `threshold` seqnos since the latest rollup, assign rollup maintenance.
2704            // we avoid assigning rollups to every seqno past the threshold to avoid handles
2705            // racing / performing redundant work.
2706            if seqnos_since_last_rollup > 0
2707                && seqnos_since_last_rollup % u64::cast_from(threshold) == 0
2708            {
2709                return Some(self.seqno);
2710            }
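            // For example (illustrative only): with threshold = 128 and the
            // latest rollup at seqno 1_000, rollup maintenance is assigned at
            // seqnos 1_128, 1_256, and so on.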
2711
2712            // however, since maintenance is best-effort and could fail, do assign rollup
2713            // work to every seqno after a fallback threshold to ensure one is written.
2714            if seqnos_since_last_rollup
2715                > u64::cast_from(
2716                    threshold * PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER,
2717                )
2718            {
2719                return Some(self.seqno);
2720            }
2721        }
2722
2723        None
2724    }
2725
2726    pub(crate) fn blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
2727        let batches = self.collections.trace.batches().map(HollowBlobRef::Batch);
2728        let rollups = self.collections.rollups.values().map(HollowBlobRef::Rollup);
2729        batches.chain(rollups)
2730    }
2731}
2732
2733fn serialize_part_bytes<S: Serializer>(val: &[u8], s: S) -> Result<S::Ok, S::Error> {
2734    let val = hex::encode(val);
2735    val.serialize(s)
2736}
2737
2738fn serialize_lazy_proto<S: Serializer, T: prost::Message + Default>(
2739    val: &Option<LazyProto<T>>,
2740    s: S,
2741) -> Result<S::Ok, S::Error> {
2742    val.as_ref()
2743        .map(|lazy| hex::encode(&lazy.into_proto()))
2744        .serialize(s)
2745}
2746
2747fn serialize_part_stats<S: Serializer>(
2748    val: &Option<LazyPartStats>,
2749    s: S,
2750) -> Result<S::Ok, S::Error> {
2751    let val = val.as_ref().map(|x| x.decode().key);
2752    val.serialize(s)
2753}
2754
2755fn serialize_diffs_sum<S: Serializer>(val: &Option<[u8; 8]>, s: S) -> Result<S::Ok, S::Error> {
2756    // This is only used for debugging, so hack to assume that D is i64.
2757    let val = val.map(i64::decode);
2758    val.serialize(s)
2759}
2760
2761// This Serialize impl is used for debugging/testing and exposed via SQL. It's
2762// intentionally gated from users, so not strictly subject to our backward
2763// compatibility guarantees, but still probably best to be thoughtful about
2764// making unnecessary changes. Additionally, it's worth making the output as
2765// pleasant to use as possible without tying our hands for the actual code usages.
2766impl<T: Serialize + Timestamp + Lattice> Serialize for State<T> {
2767    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2768        let State {
2769            applier_version,
2770            shard_id,
2771            seqno,
2772            walltime_ms,
2773            hostname,
2774            collections:
2775                StateCollections {
2776                    last_gc_req,
2777                    rollups,
2778                    active_rollup,
2779                    active_gc,
2780                    leased_readers,
2781                    critical_readers,
2782                    writers,
2783                    schemas,
2784                    trace,
2785                },
2786        } = self;
2787        let mut s = s.serialize_struct("State", 19)?;
2788        let () = s.serialize_field("applier_version", &applier_version.to_string())?;
2789        let () = s.serialize_field("shard_id", shard_id)?;
2790        let () = s.serialize_field("seqno", seqno)?;
2791        let () = s.serialize_field("walltime_ms", walltime_ms)?;
2792        let () = s.serialize_field("hostname", hostname)?;
2793        let () = s.serialize_field("last_gc_req", last_gc_req)?;
2794        let () = s.serialize_field("rollups", rollups)?;
2795        let () = s.serialize_field("active_rollup", active_rollup)?;
2796        let () = s.serialize_field("active_gc", active_gc)?;
2797        let () = s.serialize_field("leased_readers", leased_readers)?;
2798        let () = s.serialize_field("critical_readers", critical_readers)?;
2799        let () = s.serialize_field("writers", writers)?;
2800        let () = s.serialize_field("schemas", schemas)?;
2801        let () = s.serialize_field("since", &trace.since().elements())?;
2802        let () = s.serialize_field("upper", &trace.upper().elements())?;
2803        let trace = trace.flatten();
2804        let () = s.serialize_field("batches", &trace.legacy_batches.keys().collect::<Vec<_>>())?;
2805        let () = s.serialize_field("hollow_batches", &trace.hollow_batches)?;
2806        let () = s.serialize_field("spine_batches", &trace.spine_batches)?;
2807        let () = s.serialize_field("merges", &trace.merges)?;
2808        s.end()
2809    }
2810}
2811
2812#[derive(Debug, Default)]
2813pub struct StateSizeMetrics {
2814    pub hollow_batch_count: usize,
2815    pub batch_part_count: usize,
2816    pub rewrite_part_count: usize,
2817    pub num_updates: usize,
2818    pub largest_batch_bytes: usize,
2819    pub state_batches_bytes: usize,
2820    pub state_rollups_bytes: usize,
2821    pub state_rollup_count: usize,
2822    pub inline_part_count: usize,
2823    pub inline_part_bytes: usize,
2824}
2825
2826#[derive(Default)]
2827pub struct ExpiryMetrics {
2828    pub(crate) readers_expired: usize,
2829    pub(crate) writers_expired: usize,
2830}
2831
2832/// Wrapper for Antichain that represents a Since
2833#[derive(Debug, Clone, PartialEq)]
2834pub struct Since<T>(pub Antichain<T>);
2835
2836/// Wrapper for Antichain that represents an Upper
2837#[derive(Debug, PartialEq)]
2838pub struct Upper<T>(pub Antichain<T>);
2839
2840#[cfg(test)]
2841pub(crate) mod tests {
2842    use std::ops::Range;
2843    use std::str::FromStr;
2844
2845    use bytes::Bytes;
2846    use mz_build_info::DUMMY_BUILD_INFO;
2847    use mz_dyncfg::ConfigUpdates;
2848    use mz_ore::now::SYSTEM_TIME;
2849    use mz_ore::{assert_none, assert_ok};
2850    use mz_proto::RustType;
2851    use proptest::prelude::*;
2852    use proptest::strategy::ValueTree;
2853
2854    use crate::InvalidUsage::{InvalidBounds, InvalidEmptyTimeInterval};
2855    use crate::PersistLocation;
2856    use crate::cache::PersistClientCache;
2857    use crate::internal::encoding::any_some_lazy_part_stats;
2858    use crate::internal::paths::RollupId;
2859    use crate::internal::trace::tests::any_trace;
2860    use crate::tests::new_test_client_cache;
2861
2862    use super::*;
2863
2864    const LEASE_DURATION_MS: u64 = 900 * 1000;
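    /// Returns a [HandleDebugState] fixture used by the tests below.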
2865    fn debug_state() -> HandleDebugState {
2866        HandleDebugState {
2867            hostname: "debug".to_owned(),
2868            purpose: "finding the bugs".to_owned(),
2869        }
2870    }
2871
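    /// Returns a [Strategy] that generates a [HollowBatch] whose parts are split
    /// into exactly `num_runs` runs, each tagged with a freshly generated [RunId].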
2872    pub fn any_hollow_batch_with_exact_runs<T: Arbitrary + Timestamp>(
2873        num_runs: usize,
2874    ) -> impl Strategy<Value = HollowBatch<T>> {
2875        (
2876            any::<T>(),
2877            any::<T>(),
2878            any::<T>(),
2879            proptest::collection::vec(any_run_part::<T>(), num_runs + 1..20),
2880            any::<usize>(),
2881        )
2882            .prop_map(move |(t0, t1, since, parts, len)| {
2883                let (lower, upper) = if t0 <= t1 {
2884                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
2885                } else {
2886                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
2887                };
2888                let since = Antichain::from_elem(since);
2889
2890                let run_splits = (1..num_runs)
2891                    .map(|i| i * parts.len() / num_runs)
2892                    .collect::<Vec<_>>();
2893
2894                let run_meta = (0..num_runs)
2895                    .map(|_| {
2896                        let mut meta = RunMeta::default();
2897                        meta.id = Some(RunId::new());
2898                        meta
2899                    })
2900                    .collect::<Vec<_>>();
2901
2902                HollowBatch::new(
2903                    Description::new(lower, upper, since),
2904                    parts,
2905                    len % 10,
2906                    run_meta,
2907                    run_splits,
2908                )
2909            })
2910    }
2911
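    /// Returns a [Strategy] that generates an arbitrary [HollowBatch], either with
    /// several runs (when enough parts are generated) or as a single test run.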
2912    pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
2913        Strategy::prop_map(
2914            (
2915                any::<T>(),
2916                any::<T>(),
2917                any::<T>(),
2918                proptest::collection::vec(any_run_part::<T>(), 0..20),
2919                any::<usize>(),
2920                0..=10usize,
2921                proptest::collection::vec(any::<RunId>(), 10),
2922            ),
2923            |(t0, t1, since, parts, len, num_runs, run_ids)| {
2924                let (lower, upper) = if t0 <= t1 {
2925                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
2926                } else {
2927                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
2928                };
2929                let since = Antichain::from_elem(since);
2930                if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() {
2931                    let run_splits = (1..num_runs)
2932                        .map(|i| i * parts.len() / num_runs)
2933                        .collect::<Vec<_>>();
2934
2935                    let run_meta = (0..num_runs)
2936                        .enumerate()
2937                        .map(|(i, _)| {
2938                            let mut meta = RunMeta::default();
2939                            meta.id = Some(run_ids[i]);
2940                            meta
2941                        })
2942                        .collect::<Vec<_>>();
2943
2944                    HollowBatch::new(
2945                        Description::new(lower, upper, since),
2946                        parts,
2947                        len % 10,
2948                        run_meta,
2949                        run_splits,
2950                    )
2951                } else {
2952                    HollowBatch::new_run_for_test(
2953                        Description::new(lower, upper, since),
2954                        parts,
2955                        len % 10,
2956                        run_ids[0],
2957                    )
2958                }
2959            },
2960        )
2961    }
2962
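    /// Returns a [Strategy] that generates an arbitrary [BatchPart]: either a
    /// hollow part or an inline part with empty updates.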
2963    pub fn any_batch_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = BatchPart<T>> {
2964        Strategy::prop_map(
2965            (
2966                any::<bool>(),
2967                any_hollow_batch_part(),
2968                any::<Option<T>>(),
2969                any::<Option<SchemaId>>(),
2970                any::<Option<SchemaId>>(),
2971            ),
2972            |(is_hollow, hollow, ts_rewrite, schema_id, deprecated_schema_id)| {
2973                if is_hollow {
2974                    BatchPart::Hollow(hollow)
2975                } else {
2976                    let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap();
2977                    let ts_rewrite = ts_rewrite.map(Antichain::from_elem);
2978                    BatchPart::Inline {
2979                        updates,
2980                        ts_rewrite,
2981                        schema_id,
2982                        deprecated_schema_id,
2983                    }
2984                }
2985            },
2986        )
2987    }
2988
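    /// Returns a [Strategy] that generates a [RunPart] wrapping a single
    /// arbitrary [BatchPart].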
2989    pub fn any_run_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = RunPart<T>> {
2990        Strategy::prop_map(any_batch_part(), |part| RunPart::Single(part))
2991    }
2992
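    /// Returns a [Strategy] that generates an arbitrary [HollowBatchPart].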
2993    pub fn any_hollow_batch_part<T: Arbitrary + Timestamp>()
2994    -> impl Strategy<Value = HollowBatchPart<T>> {
2995        Strategy::prop_map(
2996            (
2997                any::<PartialBatchKey>(),
2998                any::<usize>(),
2999                any::<Vec<u8>>(),
3000                any_some_lazy_part_stats(),
3001                any::<Option<T>>(),
3002                any::<[u8; 8]>(),
3003                any::<Option<BatchColumnarFormat>>(),
3004                any::<Option<SchemaId>>(),
3005                any::<Option<SchemaId>>(),
3006            ),
3007            |(
3008                key,
3009                encoded_size_bytes,
3010                key_lower,
3011                stats,
3012                ts_rewrite,
3013                diffs_sum,
3014                format,
3015                schema_id,
3016                deprecated_schema_id,
3017            )| {
3018                HollowBatchPart {
3019                    key,
3020                    encoded_size_bytes,
3021                    key_lower,
3022                    structured_key_lower: None,
3023                    stats,
3024                    ts_rewrite: ts_rewrite.map(Antichain::from_elem),
3025                    diffs_sum: Some(diffs_sum),
3026                    format,
3027                    schema_id,
3028                    deprecated_schema_id,
3029                }
3030            },
3031        )
3032    }
3033
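    /// Returns a [Strategy] that generates a [LeasedReaderState] with a nonzero
    /// lease duration, avoiding the old-state migration in the decode path.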
3034    pub fn any_leased_reader_state<T: Arbitrary>() -> impl Strategy<Value = LeasedReaderState<T>> {
3035        Strategy::prop_map(
3036            (
3037                any::<SeqNo>(),
3038                any::<Option<T>>(),
3039                any::<u64>(),
3040                any::<u64>(),
3041                any::<HandleDebugState>(),
3042            ),
3043            |(seqno, since, last_heartbeat_timestamp_ms, mut lease_duration_ms, debug)| {
3044                // lease_duration_ms of 0 means this state was written by an old
3045                // version of code, which means we'll migrate it in the decode
3046                // path. Avoid.
3047                if lease_duration_ms == 0 {
3048                    lease_duration_ms += 1;
3049                }
3050                LeasedReaderState {
3051                    seqno,
3052                    since: since.map_or_else(Antichain::new, Antichain::from_elem),
3053                    last_heartbeat_timestamp_ms,
3054                    lease_duration_ms,
3055                    debug,
3056                }
3057            },
3058        )
3059    }
3060
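    /// Returns a [Strategy] that generates an arbitrary [CriticalReaderState].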
3061    pub fn any_critical_reader_state<T: Arbitrary>() -> impl Strategy<Value = CriticalReaderState<T>>
3062    {
3063        Strategy::prop_map(
3064            (
3065                any::<Option<T>>(),
3066                any::<OpaqueState>(),
3067                any::<String>(),
3068                any::<HandleDebugState>(),
3069            ),
3070            |(since, opaque, opaque_codec, debug)| CriticalReaderState {
3071                since: since.map_or_else(Antichain::new, Antichain::from_elem),
3072                opaque,
3073                opaque_codec,
3074                debug,
3075            },
3076        )
3077    }
3078
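    /// Returns a [Strategy] that generates an arbitrary [WriterState].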
3079    pub fn any_writer_state<T: Arbitrary>() -> impl Strategy<Value = WriterState<T>> {
3080        Strategy::prop_map(
3081            (
3082                any::<u64>(),
3083                any::<u64>(),
3084                any::<IdempotencyToken>(),
3085                any::<Option<T>>(),
3086                any::<HandleDebugState>(),
3087            ),
3088            |(
3089                last_heartbeat_timestamp_ms,
3090                lease_duration_ms,
3091                most_recent_write_token,
3092                most_recent_write_upper,
3093                debug,
3094            )| WriterState {
3095                last_heartbeat_timestamp_ms,
3096                lease_duration_ms,
3097                most_recent_write_token,
3098                most_recent_write_upper: most_recent_write_upper
3099                    .map_or_else(Antichain::new, Antichain::from_elem),
3100                debug,
3101            },
3102        )
3103    }
3104
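    /// Returns a [Strategy] that generates an [EncodedSchemas] from arbitrary bytes.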
3105    pub fn any_encoded_schemas() -> impl Strategy<Value = EncodedSchemas> {
3106        Strategy::prop_map(
3107            (
3108                any::<Vec<u8>>(),
3109                any::<Vec<u8>>(),
3110                any::<Vec<u8>>(),
3111                any::<Vec<u8>>(),
3112            ),
3113            |(key, key_data_type, val, val_data_type)| EncodedSchemas {
3114                key: Bytes::from(key),
3115                key_data_type: Bytes::from(key_data_type),
3116                val: Bytes::from(val),
3117                val_data_type: Bytes::from(val_data_type),
3118            },
3119        )
3120    }
3121
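    /// Returns a [Strategy] that generates an arbitrary [State] whose trace
    /// contains a number of batches in the `num_trace_batches` range.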
3122    pub fn any_state<T: Arbitrary + Timestamp + Lattice>(
3123        num_trace_batches: Range<usize>,
3124    ) -> impl Strategy<Value = State<T>> {
3125        let part1 = (
3126            any::<ShardId>(),
3127            any::<SeqNo>(),
3128            any::<u64>(),
3129            any::<String>(),
3130            any::<SeqNo>(),
3131            proptest::collection::btree_map(any::<SeqNo>(), any::<HollowRollup>(), 1..3),
3132            proptest::option::of(any::<ActiveRollup>()),
3133        );
3134
3135        let part2 = (
3136            proptest::option::of(any::<ActiveGc>()),
3137            proptest::collection::btree_map(
3138                any::<LeasedReaderId>(),
3139                any_leased_reader_state::<T>(),
3140                1..3,
3141            ),
3142            proptest::collection::btree_map(
3143                any::<CriticalReaderId>(),
3144                any_critical_reader_state::<T>(),
3145                1..3,
3146            ),
3147            proptest::collection::btree_map(any::<WriterId>(), any_writer_state::<T>(), 0..3),
3148            proptest::collection::btree_map(any::<SchemaId>(), any_encoded_schemas(), 0..3),
3149            any_trace::<T>(num_trace_batches),
3150        );
3151
3152        (part1, part2).prop_map(
3153            |(
3154                (shard_id, seqno, walltime_ms, hostname, last_gc_req, rollups, active_rollup),
3155                (active_gc, leased_readers, critical_readers, writers, schemas, trace),
3156            )| State {
3157                applier_version: semver::Version::new(1, 2, 3),
3158                shard_id,
3159                seqno,
3160                walltime_ms,
3161                hostname,
3162                collections: StateCollections {
3163                    last_gc_req,
3164                    rollups,
3165                    active_rollup,
3166                    active_gc,
3167                    leased_readers,
3168                    critical_readers,
3169                    writers,
3170                    schemas,
3171                    trace,
3172                },
3173            },
3174        )
3175    }
3176
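    /// Constructs a [HollowBatch] over `[lower, upper)` with one hollow part per
    /// key and the given `len`, as a convenient fixture for the tests below.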
3177    pub(crate) fn hollow<T: Timestamp>(
3178        lower: T,
3179        upper: T,
3180        keys: &[&str],
3181        len: usize,
3182    ) -> HollowBatch<T> {
3183        HollowBatch::new_run(
3184            Description::new(
3185                Antichain::from_elem(lower),
3186                Antichain::from_elem(upper),
3187                Antichain::from_elem(T::minimum()),
3188            ),
3189            keys.iter()
3190                .map(|x| {
3191                    RunPart::Single(BatchPart::Hollow(HollowBatchPart {
3192                        key: PartialBatchKey((*x).to_owned()),
3193                        encoded_size_bytes: 0,
3194                        key_lower: vec![],
3195                        structured_key_lower: None,
3196                        stats: None,
3197                        ts_rewrite: None,
3198                        diffs_sum: None,
3199                        format: None,
3200                        schema_id: None,
3201                        deprecated_schema_id: None,
3202                    }))
3203                })
3204                .collect(),
3205            len,
3206        )
3207    }
3208
3209    #[mz_ore::test]
3210    fn downgrade_since() {
3211        let mut state = TypedState::<(), (), u64, i64>::new(
3212            DUMMY_BUILD_INFO.semver_version(),
3213            ShardId::new(),
3214            "".to_owned(),
3215            0,
3216        );
3217        let reader = LeasedReaderId::new();
3218        let seqno = SeqNo::minimum();
3219        let now = SYSTEM_TIME.clone();
3220        let _ = state.collections.register_leased_reader(
3221            "",
3222            &reader,
3223            "",
3224            seqno,
3225            Duration::from_secs(10),
3226            now(),
3227            false,
3228        );
3229
3230        // The shard global since == 0 initially.
3231        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3232
3233        // Greater
3234        assert_eq!(
3235            state.collections.downgrade_since(
3236                &reader,
3237                seqno,
3238                None,
3239                &Antichain::from_elem(2),
3240                now()
3241            ),
3242            Continue(Since(Antichain::from_elem(2)))
3243        );
3244        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3245        // Equal (no-op)
3246        assert_eq!(
3247            state.collections.downgrade_since(
3248                &reader,
3249                seqno,
3250                None,
3251                &Antichain::from_elem(2),
3252                now()
3253            ),
3254            Continue(Since(Antichain::from_elem(2)))
3255        );
3256        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3257        // Less (no-op)
3258        assert_eq!(
3259            state.collections.downgrade_since(
3260                &reader,
3261                seqno,
3262                None,
3263                &Antichain::from_elem(1),
3264                now()
3265            ),
3266            Continue(Since(Antichain::from_elem(2)))
3267        );
3268        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3269
3270        // Create a second reader.
3271        let reader2 = LeasedReaderId::new();
3272        let _ = state.collections.register_leased_reader(
3273            "",
3274            &reader2,
3275            "",
3276            seqno,
3277            Duration::from_secs(10),
3278            now(),
3279            false,
3280        );
3281
3282        // Shard since doesn't change until the meet (min) of all reader sinces changes.
3283        assert_eq!(
3284            state.collections.downgrade_since(
3285                &reader2,
3286                seqno,
3287                None,
3288                &Antichain::from_elem(3),
3289                now()
3290            ),
3291            Continue(Since(Antichain::from_elem(3)))
3292        );
3293        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3294        // Shard since == 3 when all readers have since >= 3.
3295        assert_eq!(
3296            state.collections.downgrade_since(
3297                &reader,
3298                seqno,
3299                None,
3300                &Antichain::from_elem(5),
3301                now()
3302            ),
3303            Continue(Since(Antichain::from_elem(5)))
3304        );
3305        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3306
3307        // Shard since is unaffected by readers with since > shard since expiring.
3308        assert_eq!(
3309            state.collections.expire_leased_reader(&reader),
3310            Continue(true)
3311        );
3312        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3313
3314        // Create a third reader.
3315        let reader3 = LeasedReaderId::new();
3316        let _ = state.collections.register_leased_reader(
3317            "",
3318            &reader3,
3319            "",
3320            seqno,
3321            Duration::from_secs(10),
3322            now(),
3323            false,
3324        );
3325
3326        // Shard since doesn't change until the meet (min) of all reader sinces changes.
3327        assert_eq!(
3328            state.collections.downgrade_since(
3329                &reader3,
3330                seqno,
3331                None,
3332                &Antichain::from_elem(10),
3333                now()
3334            ),
3335            Continue(Since(Antichain::from_elem(10)))
3336        );
3337        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3338
3339        // Shard since advances when the reader with the minimal since expires.
3340        assert_eq!(
3341            state.collections.expire_leased_reader(&reader2),
3342            Continue(true)
3343        );
3344        // TODO(database-issues#6885): expiry temporarily doesn't advance since
3345        // Switch this assertion back when we re-enable this.
3346        //
3347        // assert_eq!(state.collections.trace.since(), &Antichain::from_elem(10));
3348        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3349
3350        // Shard since unaffected when all readers are expired.
3351        assert_eq!(
3352            state.collections.expire_leased_reader(&reader3),
3353            Continue(true)
3354        );
3355        // TODO(database-issues#6885): expiry temporarily doesn't advance since
3356        // Switch this assertion back when we re-enable this.
3357        //
3358        // assert_eq!(state.collections.trace.since(), &Antichain::from_elem(10));
3359        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3360    }
3361
3362    #[mz_ore::test]
3363    fn compare_and_downgrade_since() {
3364        let mut state = TypedState::<(), (), u64, i64>::new(
3365            DUMMY_BUILD_INFO.semver_version(),
3366            ShardId::new(),
3367            "".to_owned(),
3368            0,
3369        );
3370        let reader = CriticalReaderId::new();
3371        let _ = state
3372            .collections
3373            .register_critical_reader::<u64>("", &reader, "");
3374
3375        // The shard global since == 0 initially.
3376        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3377        // The initial opaque value should be set.
3378        assert_eq!(
3379            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3380            u64::initial()
3381        );
3382
3383        // Greater
3384        assert_eq!(
3385            state.collections.compare_and_downgrade_since::<u64>(
3386                &reader,
3387                &u64::initial(),
3388                (&1, &Antichain::from_elem(2)),
3389            ),
3390            Continue(Ok(Since(Antichain::from_elem(2))))
3391        );
3392        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3393        assert_eq!(
3394            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3395            1
3396        );
3397        // Equal (no-op)
3398        assert_eq!(
3399            state.collections.compare_and_downgrade_since::<u64>(
3400                &reader,
3401                &1,
3402                (&2, &Antichain::from_elem(2)),
3403            ),
3404            Continue(Ok(Since(Antichain::from_elem(2))))
3405        );
3406        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3407        assert_eq!(
3408            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3409            2
3410        );
3411        // Less (no-op)
3412        assert_eq!(
3413            state.collections.compare_and_downgrade_since::<u64>(
3414                &reader,
3415                &2,
3416                (&3, &Antichain::from_elem(1)),
3417            ),
3418            Continue(Ok(Since(Antichain::from_elem(2))))
3419        );
3420        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3421        assert_eq!(
3422            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3423            3
3424        );
3425    }
3426
3427    #[mz_ore::test]
3428    fn compare_and_append() {
3429        let state = &mut TypedState::<String, String, u64, i64>::new(
3430            DUMMY_BUILD_INFO.semver_version(),
3431            ShardId::new(),
3432            "".to_owned(),
3433            0,
3434        )
3435        .collections;
3436
3437        let writer_id = WriterId::new();
3438        let now = SYSTEM_TIME.clone();
3439
3440        // State is initially empty.
3441        assert_eq!(state.trace.num_spine_batches(), 0);
3442        assert_eq!(state.trace.num_hollow_batches(), 0);
3443        assert_eq!(state.trace.num_updates(), 0);
3444
3445        // Cannot insert a batch with a lower != current shard upper.
3446        assert_eq!(
3447            state.compare_and_append(
3448                &hollow(1, 2, &["key1"], 1),
3449                &writer_id,
3450                now(),
3451                LEASE_DURATION_MS,
3452                &IdempotencyToken::new(),
3453                &debug_state(),
3454                0,
3455                100,
3456                None
3457            ),
3458            Break(CompareAndAppendBreak::Upper {
3459                shard_upper: Antichain::from_elem(0),
3460                writer_upper: Antichain::from_elem(0)
3461            })
3462        );
3463
3464        // Insert an empty batch with an upper > lower.
3465        assert!(
3466            state
3467                .compare_and_append(
3468                    &hollow(0, 5, &[], 0),
3469                    &writer_id,
3470                    now(),
3471                    LEASE_DURATION_MS,
3472                    &IdempotencyToken::new(),
3473                    &debug_state(),
3474                    0,
3475                    100,
3476                    None
3477                )
3478                .is_continue()
3479        );
3480
3481        // Cannot insert a batch with an upper less than the lower.
3482        assert_eq!(
3483            state.compare_and_append(
3484                &hollow(5, 4, &["key1"], 1),
3485                &writer_id,
3486                now(),
3487                LEASE_DURATION_MS,
3488                &IdempotencyToken::new(),
3489                &debug_state(),
3490                0,
3491                100,
3492                None
3493            ),
3494            Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds {
3495                lower: Antichain::from_elem(5),
3496                upper: Antichain::from_elem(4)
3497            }))
3498        );
3499
3500        // Cannot insert a nonempty batch with an upper equal to lower.
3501        assert_eq!(
3502            state.compare_and_append(
3503                &hollow(5, 5, &["key1"], 1),
3504                &writer_id,
3505                now(),
3506                LEASE_DURATION_MS,
3507                &IdempotencyToken::new(),
3508                &debug_state(),
3509                0,
3510                100,
3511                None
3512            ),
3513            Break(CompareAndAppendBreak::InvalidUsage(
3514                InvalidEmptyTimeInterval {
3515                    lower: Antichain::from_elem(5),
3516                    upper: Antichain::from_elem(5),
3517                    keys: vec!["key1".to_owned()],
3518                }
3519            ))
3520        );
3521
3522        // Can insert an empty batch with an upper equal to lower.
3523        assert!(
3524            state
3525                .compare_and_append(
3526                    &hollow(5, 5, &[], 0),
3527                    &writer_id,
3528                    now(),
3529                    LEASE_DURATION_MS,
3530                    &IdempotencyToken::new(),
3531                    &debug_state(),
3532                    0,
3533                    100,
3534                    None
3535                )
3536                .is_continue()
3537        );
3538    }
3539
3540    #[mz_ore::test]
3541    fn snapshot() {
3542        let now = SYSTEM_TIME.clone();
3543
3544        let mut state = TypedState::<String, String, u64, i64>::new(
3545            DUMMY_BUILD_INFO.semver_version(),
3546            ShardId::new(),
3547            "".to_owned(),
3548            0,
3549        );
3550        // Cannot take a snapshot with as_of == shard upper.
3551        assert_eq!(
3552            state.snapshot(&Antichain::from_elem(0)),
3553            Err(SnapshotErr::AsOfNotYetAvailable(
3554                SeqNo(0),
3555                Upper(Antichain::from_elem(0))
3556            ))
3557        );
3558
3559        // Cannot take a snapshot with as_of > shard upper.
3560        assert_eq!(
3561            state.snapshot(&Antichain::from_elem(5)),
3562            Err(SnapshotErr::AsOfNotYetAvailable(
3563                SeqNo(0),
3564                Upper(Antichain::from_elem(0))
3565            ))
3566        );
3567
3568        let writer_id = WriterId::new();
3569
3570        // Advance upper to 5.
3571        assert!(
3572            state
3573                .collections
3574                .compare_and_append(
3575                    &hollow(0, 5, &["key1"], 1),
3576                    &writer_id,
3577                    now(),
3578                    LEASE_DURATION_MS,
3579                    &IdempotencyToken::new(),
3580                    &debug_state(),
3581                    0,
3582                    100,
3583                    None
3584                )
3585                .is_continue()
3586        );
3587
3588        // Can take a snapshot with as_of < upper.
3589        assert_eq!(
3590            state.snapshot(&Antichain::from_elem(0)),
3591            Ok(vec![hollow(0, 5, &["key1"], 1)])
3592        );
3593
3594        // Can take a snapshot with as_of >= shard since, as long as as_of < shard_upper.
3595        assert_eq!(
3596            state.snapshot(&Antichain::from_elem(4)),
3597            Ok(vec![hollow(0, 5, &["key1"], 1)])
3598        );
3599
3600        // Cannot take a snapshot with as_of >= upper.
3601        assert_eq!(
3602            state.snapshot(&Antichain::from_elem(5)),
3603            Err(SnapshotErr::AsOfNotYetAvailable(
3604                SeqNo(0),
3605                Upper(Antichain::from_elem(5))
3606            ))
3607        );
3608        assert_eq!(
3609            state.snapshot(&Antichain::from_elem(6)),
3610            Err(SnapshotErr::AsOfNotYetAvailable(
3611                SeqNo(0),
3612                Upper(Antichain::from_elem(5))
3613            ))
3614        );
3615
3616        let reader = LeasedReaderId::new();
3617        // Advance the since to 2.
3618        let _ = state.collections.register_leased_reader(
3619            "",
3620            &reader,
3621            "",
3622            SeqNo::minimum(),
3623            Duration::from_secs(10),
3624            now(),
3625            false,
3626        );
3627        assert_eq!(
3628            state.collections.downgrade_since(
3629                &reader,
3630                SeqNo::minimum(),
3631                None,
3632                &Antichain::from_elem(2),
3633                now()
3634            ),
3635            Continue(Since(Antichain::from_elem(2)))
3636        );
3637        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3638        // Cannot take a snapshot with as_of < shard_since.
3639        assert_eq!(
3640            state.snapshot(&Antichain::from_elem(1)),
3641            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
3642                Antichain::from_elem(2)
3643            )))
3644        );
3645
3646        // Advance the upper to 10 via an empty batch.
3647        assert!(
3648            state
3649                .collections
3650                .compare_and_append(
3651                    &hollow(5, 10, &[], 0),
3652                    &writer_id,
3653                    now(),
3654                    LEASE_DURATION_MS,
3655                    &IdempotencyToken::new(),
3656                    &debug_state(),
3657                    0,
3658                    100,
3659                    None
3660                )
3661                .is_continue()
3662        );
3663
3664        // Can still take snapshots at times < upper.
3665        assert_eq!(
3666            state.snapshot(&Antichain::from_elem(7)),
3667            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3668        );
3669
3670        // Cannot take snapshots with as_of >= upper.
3671        assert_eq!(
3672            state.snapshot(&Antichain::from_elem(10)),
3673            Err(SnapshotErr::AsOfNotYetAvailable(
3674                SeqNo(0),
3675                Upper(Antichain::from_elem(10))
3676            ))
3677        );
3678
3679        // Advance upper to 15.
3680        assert!(
3681            state
3682                .collections
3683                .compare_and_append(
3684                    &hollow(10, 15, &["key2"], 1),
3685                    &writer_id,
3686                    now(),
3687                    LEASE_DURATION_MS,
3688                    &IdempotencyToken::new(),
3689                    &debug_state(),
3690                    0,
3691                    100,
3692                    None
3693                )
3694                .is_continue()
3695        );
3696
3697        // Filter out batches whose lowers are greater than the requested as_of (the
3698        // batches that are too far in the future for the requested as_of).
3699        assert_eq!(
3700            state.snapshot(&Antichain::from_elem(9)),
3701            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3702        );
3703
3704        // Don't filter out batches whose lowers are <= the requested as_of.
3705        assert_eq!(
3706            state.snapshot(&Antichain::from_elem(10)),
3707            Ok(vec![
3708                hollow(0, 5, &["key1"], 1),
3709                hollow(5, 10, &[], 0),
3710                hollow(10, 15, &["key2"], 1)
3711            ])
3712        );
3713
3714        assert_eq!(
3715            state.snapshot(&Antichain::from_elem(11)),
3716            Ok(vec![
3717                hollow(0, 5, &["key1"], 1),
3718                hollow(5, 10, &[], 0),
3719                hollow(10, 15, &["key2"], 1)
3720            ])
3721        );
3722    }
3723
3724    #[mz_ore::test]
3725    fn next_listen_batch() {
3726        let mut state = TypedState::<String, String, u64, i64>::new(
3727            DUMMY_BUILD_INFO.semver_version(),
3728            ShardId::new(),
3729            "".to_owned(),
3730            0,
3731        );
3732
3733        // Empty collection never has any batches to listen for, regardless of the
3734        // current frontier.
3735        assert_eq!(
3736            state.next_listen_batch(&Antichain::from_elem(0)),
3737            Err(SeqNo(0))
3738        );
3739        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3740
3741        let writer_id = WriterId::new();
3742        let now = SYSTEM_TIME.clone();
3743
3744        // Add two batches of data, one from [0, 5) and then another from [5, 10).
3745        assert!(
3746            state
3747                .collections
3748                .compare_and_append(
3749                    &hollow(0, 5, &["key1"], 1),
3750                    &writer_id,
3751                    now(),
3752                    LEASE_DURATION_MS,
3753                    &IdempotencyToken::new(),
3754                    &debug_state(),
3755                    0,
3756                    100,
3757                    None
3758                )
3759                .is_continue()
3760        );
3761        assert!(
3762            state
3763                .collections
3764                .compare_and_append(
3765                    &hollow(5, 10, &["key2"], 1),
3766                    &writer_id,
3767                    now(),
3768                    LEASE_DURATION_MS,
3769                    &IdempotencyToken::new(),
3770                    &debug_state(),
3771                    0,
3772                    100,
3773                    None
3774                )
3775                .is_continue()
3776        );
3777
3778        // All frontiers in [0, 5) return the first batch.
3779        for t in 0..=4 {
3780            assert_eq!(
3781                state.next_listen_batch(&Antichain::from_elem(t)),
3782                Ok(hollow(0, 5, &["key1"], 1))
3783            );
3784        }
3785
3786        // All frontiers in [5, 10) return the second batch.
3787        for t in 5..=9 {
3788            assert_eq!(
3789                state.next_listen_batch(&Antichain::from_elem(t)),
3790                Ok(hollow(5, 10, &["key2"], 1))
3791            );
3792        }
3793
3794        // There is no batch currently available for t = 10.
3795        assert_eq!(
3796            state.next_listen_batch(&Antichain::from_elem(10)),
3797            Err(SeqNo(0))
3798        );
3799
3800        // By definition, there is never a batch at the empty antichain, which
3801        // is the time after all possible times.
3802        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3803    }
3804
3805    #[mz_ore::test]
3806    fn expire_writer() {
3807        let mut state = TypedState::<String, String, u64, i64>::new(
3808            DUMMY_BUILD_INFO.semver_version(),
3809            ShardId::new(),
3810            "".to_owned(),
3811            0,
3812        );
3813        let now = SYSTEM_TIME.clone();
3814
3815        let writer_id_one = WriterId::new();
3816
3817        let writer_id_two = WriterId::new();
3818
3819        // Writer is eligible to write
3820        assert!(
3821            state
3822                .collections
3823                .compare_and_append(
3824                    &hollow(0, 2, &["key1"], 1),
3825                    &writer_id_one,
3826                    now(),
3827                    LEASE_DURATION_MS,
3828                    &IdempotencyToken::new(),
3829                    &debug_state(),
3830                    0,
3831                    100,
3832                    None
3833                )
3834                .is_continue()
3835        );
3836
3837        assert!(
3838            state
3839                .collections
3840                .expire_writer(&writer_id_one)
3841                .is_continue()
3842        );
3843
3844        // Other writers should still be able to write
3845        assert!(
3846            state
3847                .collections
3848                .compare_and_append(
3849                    &hollow(2, 5, &["key2"], 1),
3850                    &writer_id_two,
3851                    now(),
3852                    LEASE_DURATION_MS,
3853                    &IdempotencyToken::new(),
3854                    &debug_state(),
3855                    0,
3856                    100,
3857                    None
3858                )
3859                .is_continue()
3860        );
3861    }
3862
3863    #[mz_ore::test]
3864    fn maybe_gc_active_gc() {
3865        const GC_CONFIG: GcConfig = GcConfig {
3866            use_active_gc: true,
3867            fallback_threshold_ms: 5000,
3868            min_versions: 99,
3869            max_versions: 500,
3870        };
3871        let now_fn = SYSTEM_TIME.clone();
3872
3873        let mut state = TypedState::<String, String, u64, i64>::new(
3874            DUMMY_BUILD_INFO.semver_version(),
3875            ShardId::new(),
3876            "".to_owned(),
3877            0,
3878        );
3879
3880        let now = now_fn();
3881        // Empty state doesn't need gc, regardless of is_write.
3882        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
3883        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);
3884
3885        // Artificially advance the seqno so the seqno_since advances past our
3886        // internal gc_threshold.
3887        state.seqno = SeqNo(100);
3888        assert_eq!(state.seqno_since(), SeqNo(100));
3889
3890        // When a writer is present, non-writes don't gc.
3891        let writer_id = WriterId::new();
3892        let _ = state.collections.compare_and_append(
3893            &hollow(1, 2, &["key1"], 1),
3894            &writer_id,
3895            now,
3896            LEASE_DURATION_MS,
3897            &IdempotencyToken::new(),
3898            &debug_state(),
3899            0,
3900            100,
3901            None,
3902        );
3903        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);
3904
3905        // A write will gc though.
3906        assert_eq!(
3907            state.maybe_gc(true, now, GC_CONFIG),
3908            Some(GcReq {
3909                shard_id: state.shard_id,
3910                new_seqno_since: SeqNo(100)
3911            })
3912        );
3913
3914        // But if we write down an active gc, we won't gc.
3915        state.collections.active_gc = Some(ActiveGc {
3916            seqno: state.seqno,
3917            start_ms: now,
3918        });
3919
3920        state.seqno = SeqNo(200);
3921        assert_eq!(state.seqno_since(), SeqNo(200));
3922
3923        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
3924
3925        state.seqno = SeqNo(300);
3926        assert_eq!(state.seqno_since(), SeqNo(300));
3927        // But if we advance the time past the threshold, we will gc.
3928        let new_now = now + GC_CONFIG.fallback_threshold_ms + 1;
3929        assert_eq!(
3930            state.maybe_gc(true, new_now, GC_CONFIG),
3931            Some(GcReq {
3932                shard_id: state.shard_id,
3933                new_seqno_since: SeqNo(300)
3934            })
3935        );
3936
3937        // Even if the sequence number doesn't pass the threshold, if the
3938        // active gc is expired, we will gc.
3939
3940        state.seqno = SeqNo(301);
3941        assert_eq!(state.seqno_since(), SeqNo(301));
3942        assert_eq!(
3943            state.maybe_gc(true, new_now, GC_CONFIG),
3944            Some(GcReq {
3945                shard_id: state.shard_id,
3946                new_seqno_since: SeqNo(301)
3947            })
3948        );
3949
3950        state.collections.active_gc = None;
3951
3952        // Artificially advance the seqno (again) so the seqno_since advances
3953        // past our internal gc_threshold (again).
3954        state.seqno = SeqNo(400);
3955        assert_eq!(state.seqno_since(), SeqNo(400));
3956
3957        let now = now_fn();
3958
3959        // If there are no writers, even a non-write will gc.
3960        let _ = state.collections.expire_writer(&writer_id);
3961        assert_eq!(
3962            state.maybe_gc(false, now, GC_CONFIG),
3963            Some(GcReq {
3964                shard_id: state.shard_id,
3965                new_seqno_since: SeqNo(400)
3966            })
3967        );
3968
3969        // Upper-bound the number of seqnos we'll attempt to collect in one go.
3970        let previous_seqno = state.seqno;
3971        state.seqno = SeqNo(10_000);
3972        assert_eq!(state.seqno_since(), SeqNo(10_000));
3973
3974        let now = now_fn();
3975        assert_eq!(
3976            state.maybe_gc(true, now, GC_CONFIG),
3977            Some(GcReq {
3978                shard_id: state.shard_id,
3979                new_seqno_since: SeqNo(previous_seqno.0 + u64::cast_from(GC_CONFIG.max_versions))
3980            })
3981        );
3982    }
3983
3984    #[mz_ore::test]
3985    fn maybe_gc_classic() {
3986        const GC_CONFIG: GcConfig = GcConfig {
3987            use_active_gc: false,
3988            fallback_threshold_ms: 5000,
3989            min_versions: 16,
3990            max_versions: 128,
3991        };
3992        const NOW_MS: u64 = 0;
3993
3994        let mut state = TypedState::<String, String, u64, i64>::new(
3995            DUMMY_BUILD_INFO.semver_version(),
3996            ShardId::new(),
3997            "".to_owned(),
3998            0,
3999        );
4000
4001        // Empty state doesn't need gc, regardless of is_write.
4002        assert_eq!(state.maybe_gc(true, NOW_MS, GC_CONFIG), None);
4003        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
4004
4005        // Artificially advance the seqno so the seqno_since advances past our
4006        // internal gc_threshold.
4007        state.seqno = SeqNo(100);
4008        assert_eq!(state.seqno_since(), SeqNo(100));
4009
4010        // When a writer is present, non-writes don't gc.
4011        let writer_id = WriterId::new();
4012        let now = SYSTEM_TIME.clone();
4013        let _ = state.collections.compare_and_append(
4014            &hollow(1, 2, &["key1"], 1),
4015            &writer_id,
4016            now(),
4017            LEASE_DURATION_MS,
4018            &IdempotencyToken::new(),
4019            &debug_state(),
4020            0,
4021            100,
4022            None,
4023        );
4024        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
4025
4026        // A write will gc though.
4027        assert_eq!(
4028            state.maybe_gc(true, NOW_MS, GC_CONFIG),
4029            Some(GcReq {
4030                shard_id: state.shard_id,
4031                new_seqno_since: SeqNo(100)
4032            })
4033        );
4034
4035        // Artificially advance the seqno (again) so the seqno_since advances
4036        // past our internal gc_threshold (again).
4037        state.seqno = SeqNo(200);
4038        assert_eq!(state.seqno_since(), SeqNo(200));
4039
4040        // If there are no writers, even a non-write will gc.
4041        let _ = state.collections.expire_writer(&writer_id);
4042        assert_eq!(
4043            state.maybe_gc(false, NOW_MS, GC_CONFIG),
4044            Some(GcReq {
4045                shard_id: state.shard_id,
4046                new_seqno_since: SeqNo(200)
4047            })
4048        );
4049    }
4050
4051    #[mz_ore::test]
4052    fn need_rollup_active_rollup() {
4053        const ROLLUP_THRESHOLD: usize = 3;
4054        const ROLLUP_USE_ACTIVE_ROLLUP: bool = true;
4055        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 5000;
4056        let now = SYSTEM_TIME.clone();
4057
4058        mz_ore::test::init_logging();
4059        let mut state = TypedState::<String, String, u64, i64>::new(
4060            DUMMY_BUILD_INFO.semver_version(),
4061            ShardId::new(),
4062            "".to_owned(),
4063            0,
4064        );
4065
4066        let rollup_seqno = SeqNo(5);
4067        let rollup = HollowRollup {
4068            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4069            encoded_size_bytes: None,
4070        };
4071
4072        assert!(
4073            state
4074                .collections
4075                .add_rollup((rollup_seqno, &rollup))
4076                .is_continue()
4077        );
4078
4079        // shouldn't need a rollup at the seqno of the rollup
4080        state.seqno = SeqNo(5);
4081        assert_none!(state.need_rollup(
4082            ROLLUP_THRESHOLD,
4083            ROLLUP_USE_ACTIVE_ROLLUP,
4084            ROLLUP_FALLBACK_THRESHOLD_MS,
4085            now()
4086        ));
4087
4088        // shouldn't need a rollup at seqnos less than our threshold
4089        state.seqno = SeqNo(6);
4090        assert_none!(state.need_rollup(
4091            ROLLUP_THRESHOLD,
4092            ROLLUP_USE_ACTIVE_ROLLUP,
4093            ROLLUP_FALLBACK_THRESHOLD_MS,
4094            now()
4095        ));
4096        state.seqno = SeqNo(7);
4097        assert_none!(state.need_rollup(
4098            ROLLUP_THRESHOLD,
4099            ROLLUP_USE_ACTIVE_ROLLUP,
4100            ROLLUP_FALLBACK_THRESHOLD_MS,
4101            now()
4102        ));
4103        state.seqno = SeqNo(8);
4104        assert_none!(state.need_rollup(
4105            ROLLUP_THRESHOLD,
4106            ROLLUP_USE_ACTIVE_ROLLUP,
4107            ROLLUP_FALLBACK_THRESHOLD_MS,
4108            now()
4109        ));
4110
4111        let mut current_time = now();
4112        // hit our threshold! we should need a rollup
4113        state.seqno = SeqNo(9);
4114        assert_eq!(
4115            state
4116                .need_rollup(
4117                    ROLLUP_THRESHOLD,
4118                    ROLLUP_USE_ACTIVE_ROLLUP,
4119                    ROLLUP_FALLBACK_THRESHOLD_MS,
4120                    current_time
4121                )
4122                .expect("rollup"),
4123            SeqNo(9)
4124        );
4125
4126        state.collections.active_rollup = Some(ActiveRollup {
4127            seqno: SeqNo(9),
4128            start_ms: current_time,
4129        });
4130
4131        // There is now an active rollup, so we shouldn't need a rollup.
4132        assert_none!(state.need_rollup(
4133            ROLLUP_THRESHOLD,
4134            ROLLUP_USE_ACTIVE_ROLLUP,
4135            ROLLUP_FALLBACK_THRESHOLD_MS,
4136            current_time
4137        ));
4138
4139        state.seqno = SeqNo(10);
4140        // We still don't need a rollup, even though the seqno is greater than
4141        // the rollup threshold.
4142        assert_none!(state.need_rollup(
4143            ROLLUP_THRESHOLD,
4144            ROLLUP_USE_ACTIVE_ROLLUP,
4145            ROLLUP_FALLBACK_THRESHOLD_MS,
4146            current_time
4147        ));
4148
4149        // But if we wait long enough, we should need a rollup again.
4150        current_time += u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS) + 1;
4151        assert_eq!(
4152            state
4153                .need_rollup(
4154                    ROLLUP_THRESHOLD,
4155                    ROLLUP_USE_ACTIVE_ROLLUP,
4156                    ROLLUP_FALLBACK_THRESHOLD_MS,
4157                    current_time
4158                )
4159                .expect("rollup"),
4160            SeqNo(10)
4161        );
4162
4163        state.seqno = SeqNo(9);
4164        // Clear the active rollup and ensure we need a rollup again.
4165        state.collections.active_rollup = None;
4166        let rollup_seqno = SeqNo(9);
4167        let rollup = HollowRollup {
4168            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4169            encoded_size_bytes: None,
4170        };
4171        assert!(
4172            state
4173                .collections
4174                .add_rollup((rollup_seqno, &rollup))
4175                .is_continue()
4176        );
4177
4178        state.seqno = SeqNo(11);
4179        // We shouldn't need a rollup at seqnos less than our threshold
4180        assert_none!(state.need_rollup(
4181            ROLLUP_THRESHOLD,
4182            ROLLUP_USE_ACTIVE_ROLLUP,
4183            ROLLUP_FALLBACK_THRESHOLD_MS,
4184            current_time
4185        ));
4186        // hit our threshold! we should need a rollup
4187        state.seqno = SeqNo(13);
4188        assert_eq!(
4189            state
4190                .need_rollup(
4191                    ROLLUP_THRESHOLD,
4192                    ROLLUP_USE_ACTIVE_ROLLUP,
4193                    ROLLUP_FALLBACK_THRESHOLD_MS,
4194                    current_time
4195                )
4196                .expect("rollup"),
4197            SeqNo(13)
4198        );
4199    }
4200
4201    #[mz_ore::test]
4202    fn need_rollup_classic() {
4203        const ROLLUP_THRESHOLD: usize = 3;
4204        const ROLLUP_USE_ACTIVE_ROLLUP: bool = false;
4205        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 0;
4206        const NOW: u64 = 0;
4207
4208        mz_ore::test::init_logging();
4209        let mut state = TypedState::<String, String, u64, i64>::new(
4210            DUMMY_BUILD_INFO.semver_version(),
4211            ShardId::new(),
4212            "".to_owned(),
4213            0,
4214        );
4215
4216        let rollup_seqno = SeqNo(5);
4217        let rollup = HollowRollup {
4218            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4219            encoded_size_bytes: None,
4220        };
4221
4222        assert!(
4223            state
4224                .collections
4225                .add_rollup((rollup_seqno, &rollup))
4226                .is_continue()
4227        );
4228
4229        // shouldn't need a rollup at the seqno of the rollup
4230        state.seqno = SeqNo(5);
4231        assert_none!(state.need_rollup(
4232            ROLLUP_THRESHOLD,
4233            ROLLUP_USE_ACTIVE_ROLLUP,
4234            ROLLUP_FALLBACK_THRESHOLD_MS,
4235            NOW
4236        ));
4237
4238        // shouldn't need a rollup at seqnos less than our threshold
4239        state.seqno = SeqNo(6);
4240        assert_none!(state.need_rollup(
4241            ROLLUP_THRESHOLD,
4242            ROLLUP_USE_ACTIVE_ROLLUP,
4243            ROLLUP_FALLBACK_THRESHOLD_MS,
4244            NOW
4245        ));
4246        state.seqno = SeqNo(7);
4247        assert_none!(state.need_rollup(
4248            ROLLUP_THRESHOLD,
4249            ROLLUP_USE_ACTIVE_ROLLUP,
4250            ROLLUP_FALLBACK_THRESHOLD_MS,
4251            NOW
4252        ));
4253
4254        // hit our threshold! we should need a rollup
4255        state.seqno = SeqNo(8);
4256        assert_eq!(
4257            state
4258                .need_rollup(
4259                    ROLLUP_THRESHOLD,
4260                    ROLLUP_USE_ACTIVE_ROLLUP,
4261                    ROLLUP_FALLBACK_THRESHOLD_MS,
4262                    NOW
4263                )
4264                .expect("rollup"),
4265            SeqNo(8)
4266        );
4267
4268        // but we don't need rollups for every seqno > the threshold
4269        state.seqno = SeqNo(9);
4270        assert_none!(state.need_rollup(
4271            ROLLUP_THRESHOLD,
4272            ROLLUP_USE_ACTIVE_ROLLUP,
4273            ROLLUP_FALLBACK_THRESHOLD_MS,
4274            NOW
4275        ));
4276
4277        // we only need a rollup every `ROLLUP_THRESHOLD` seqnos beyond the latest rollup
4278        state.seqno = SeqNo(11);
4279        assert_eq!(
4280            state
4281                .need_rollup(
4282                    ROLLUP_THRESHOLD,
4283                    ROLLUP_USE_ACTIVE_ROLLUP,
4284                    ROLLUP_FALLBACK_THRESHOLD_MS,
4285                    NOW
4286                )
4287                .expect("rollup"),
4288            SeqNo(11)
4289        );
4290
4291        // add another rollup and ensure we're always picking the latest
4292        let rollup_seqno = SeqNo(6);
4293        let rollup = HollowRollup {
4294            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4295            encoded_size_bytes: None,
4296        };
4297        assert!(
4298            state
4299                .collections
4300                .add_rollup((rollup_seqno, &rollup))
4301                .is_continue()
4302        );
4303
4304        state.seqno = SeqNo(8);
4305        assert_none!(state.need_rollup(
4306            ROLLUP_THRESHOLD,
4307            ROLLUP_USE_ACTIVE_ROLLUP,
4308            ROLLUP_FALLBACK_THRESHOLD_MS,
4309            NOW
4310        ));
4311        state.seqno = SeqNo(9);
4312        assert_eq!(
4313            state
4314                .need_rollup(
4315                    ROLLUP_THRESHOLD,
4316                    ROLLUP_USE_ACTIVE_ROLLUP,
4317                    ROLLUP_FALLBACK_THRESHOLD_MS,
4318                    NOW
4319                )
4320                .expect("rollup"),
4321            SeqNo(9)
4322        );
4323
4324        // and ensure that past the fallback threshold, we assign rollup work to every seqno
4325        let fallback_seqno = SeqNo(
4326            rollup_seqno.0
4327                * u64::cast_from(PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER),
4328        );
4329        state.seqno = fallback_seqno;
4330        assert_eq!(
4331            state
4332                .need_rollup(
4333                    ROLLUP_THRESHOLD,
4334                    ROLLUP_USE_ACTIVE_ROLLUP,
4335                    ROLLUP_FALLBACK_THRESHOLD_MS,
4336                    NOW
4337                )
4338                .expect("rollup"),
4339            fallback_seqno
4340        );
4341        state.seqno = fallback_seqno.next();
4342        assert_eq!(
4343            state
4344                .need_rollup(
4345                    ROLLUP_THRESHOLD,
4346                    ROLLUP_USE_ACTIVE_ROLLUP,
4347                    ROLLUP_FALLBACK_THRESHOLD_MS,
4348                    NOW
4349                )
4350                .expect("rollup"),
4351            fallback_seqno.next()
4352        );
4353    }
4354
4355    #[mz_ore::test]
4356    fn idempotency_token_sentinel() {
4357        assert_eq!(
4358            IdempotencyToken::SENTINEL.to_string(),
4359            "i11111111-1111-1111-1111-111111111111"
4360        );
4361    }
4362
4363    /// This test generates an "arbitrary" State, but uses a fixed seed for the
4364    /// randomness, so that it's deterministic. This lets us assert the
4365    /// serialization of that State against a golden file that's committed,
4366    /// making it easy to see what the serialization (used in an upcoming
4367    /// INSPECT feature) looks like.
4368    ///
4369    /// This golden will have to be updated each time we change State, but
4370    /// that's a feature, not a bug.
4371    #[mz_ore::test]
4372    #[cfg_attr(miri, ignore)] // too slow
4373    fn state_inspect_serde_json() {
4374        const STATE_SERDE_JSON: &str = include_str!("state_serde.json");
4375        let mut runner = proptest::test_runner::TestRunner::deterministic();
4376        let tree = any_state::<u64>(6..8).new_tree(&mut runner).unwrap();
4377        let json = serde_json::to_string_pretty(&tree.current()).unwrap();
4378        assert_eq!(
4379            json.trim(),
4380            STATE_SERDE_JSON.trim(),
4381            "\n\nNEW GOLDEN\n{}\n",
4382            json
4383        );
4384    }
4385
4386    #[mz_persist_proc::test(tokio::test)]
4387    #[cfg_attr(miri, ignore)] // too slow
4388    async fn sneaky_downgrades(dyncfgs: ConfigUpdates) {
4389        let mut clients = new_test_client_cache(&dyncfgs);
4390        let shard_id = ShardId::new();
4391
4392        async fn open_and_write(
4393            clients: &mut PersistClientCache,
4394            version: semver::Version,
4395            shard_id: ShardId,
4396        ) -> Result<(), tokio::task::JoinError> {
4397            clients.cfg.build_version = version.clone();
4398            clients.clear_state_cache();
4399            let client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
4400            // Run in a task so we can catch the panic.
4401            mz_ore::task::spawn(|| version.to_string(), async move {
4402                let (mut write, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
4403                let current = *write.upper().as_option().unwrap();
4404                // Do a write so that we tag the state with the version.
4405                write
4406                    .expect_compare_and_append_batch(&mut [], current, current + 1)
4407                    .await;
4408            })
4409            .await
4410        }
4411
4412        // Start at v0.10.0.
4413        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4414        assert_ok!(res);
4415
4416        // Upgrade to v0.11.0 is allowed.
4417        let res = open_and_write(&mut clients, Version::new(0, 11, 0), shard_id).await;
4418        assert_ok!(res);
4419
4420        // Downgrade to v0.10.0 is allowed.
4421        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4422        assert_ok!(res);
4423
4424        // Downgrade to v0.9.0 is _NOT_ allowed.
4425        let res = open_and_write(&mut clients, Version::new(0, 9, 0), shard_id).await;
4426        assert!(res.unwrap_err().is_panic());
4427    }
4428
4429    #[mz_ore::test]
4430    fn runid_roundtrip() {
4431        proptest!(|(runid: RunId)| {
4432            let runid_str = runid.to_string();
4433            let parsed = RunId::from_str(&runid_str);
4434            prop_assert_eq!(parsed, Ok(runid));
4435        });
4436    }
4437}