mz_persist_client/internal/state.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use anyhow::ensure;
11use async_stream::{stream, try_stream};
12use differential_dataflow::difference::Semigroup;
13use mz_persist::metrics::ColumnarMetrics;
14use proptest::prelude::{Arbitrary, Strategy};
15use std::borrow::Cow;
16use std::cmp::Ordering;
17use std::collections::BTreeMap;
18use std::fmt::{Debug, Formatter};
19use std::marker::PhantomData;
20use std::ops::ControlFlow::{self, Break, Continue};
21use std::ops::{Deref, DerefMut};
22use std::time::Duration;
23
24use arrow::array::{Array, ArrayData, make_array};
25use arrow::datatypes::DataType;
26use bytes::Bytes;
27use differential_dataflow::Hashable;
28use differential_dataflow::lattice::Lattice;
29use differential_dataflow::trace::Description;
30use differential_dataflow::trace::implementations::BatchContainer;
31use futures::Stream;
32use futures_util::StreamExt;
33use mz_dyncfg::Config;
34use mz_ore::cast::CastFrom;
35use mz_ore::now::EpochMillis;
36use mz_ore::soft_panic_or_log;
37use mz_ore::vec::PartialOrdVecExt;
38use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceUpdates};
39use mz_persist::location::{Blob, SeqNo};
40use mz_persist_types::arrow::{ArrayBound, ProtoArrayData};
41use mz_persist_types::columnar::{ColumnEncoder, Schema};
42use mz_persist_types::schema::{SchemaId, backward_compatible};
43use mz_persist_types::{Codec, Codec64, Opaque};
44use mz_proto::ProtoType;
45use mz_proto::RustType;
46use proptest_derive::Arbitrary;
47use semver::Version;
48use serde::ser::SerializeStruct;
49use serde::{Serialize, Serializer};
50use timely::order::TotalOrder;
51use timely::progress::{Antichain, Timestamp};
52use timely::{Container, PartialOrder};
53use tracing::info;
54use uuid::Uuid;
55
56use crate::critical::CriticalReaderId;
57use crate::error::InvalidUsage;
58use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, parse_id};
59use crate::internal::gc::GcReq;
60use crate::internal::machine::retry_external;
61use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
62use crate::internal::trace::{
63    ActiveCompaction, ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace,
64};
65use crate::metrics::Metrics;
66use crate::read::LeasedReaderId;
67use crate::schema::CaESchema;
68use crate::write::WriterId;
69use crate::{PersistConfig, ShardId};
70
71include!(concat!(
72    env!("OUT_DIR"),
73    "/mz_persist_client.internal.state.rs"
74));
75
76include!(concat!(
77    env!("OUT_DIR"),
78    "/mz_persist_client.internal.diff.rs"
79));
80
81/// Determines how often to write rollups, assigning a maintenance task after
82/// `rollup_threshold` seqnos have passed since the last rollup.
83///
84/// Tuning note: in the absence of a long reader seqno hold, and with
85/// incremental GC, this threshold roughly determines how many live diffs are
86/// held in Consensus. Lowering this value decreases the live diff count at the
87/// cost of more maintenance work + blob writes.
88pub(crate) const ROLLUP_THRESHOLD: Config<usize> = Config::new(
89    "persist_rollup_threshold",
90    128,
91    "The number of seqnos between rollups.",
92);
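// A rough sketch (not the actual maintenance code) of how this threshold is
// intended to be applied: request a rollup once enough seqnos have elapsed
// since the newest recorded rollup, e.g.
//
//     let should_rollup = seqno.0.saturating_sub(latest_rollup_seqno.0)
//         >= u64::cast_from(ROLLUP_THRESHOLD.get(cfg));
//
// where `cfg`, `seqno`, and `latest_rollup_seqno` are hypothetical bindings
// for the dyncfg set, the current seqno, and the seqno of the newest entry in
// `rollups`.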
93
94/// Determines how long to wait before an active rollup is considered
95/// "stuck" and a new rollup is started.
96pub(crate) const ROLLUP_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
97    "persist_rollup_fallback_threshold_ms",
98    5000,
99    "The number of milliseconds before a worker claims an already claimed rollup.",
100);
101
102/// Feature flag for the new active rollup tracking mechanism.
103/// We mustn't enable this until we are fully deployed on the new version.
104pub(crate) const ROLLUP_USE_ACTIVE_ROLLUP: Config<bool> = Config::new(
105    "persist_rollup_use_active_rollup",
106    false,
107    "Whether to use the new active rollup tracking mechanism.",
108);
109
110/// Determines how long to wait before an active GC is considered
111/// "stuck" and a new GC is started.
112pub(crate) const GC_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
113    "persist_gc_fallback_threshold_ms",
114    900000,
115    "The number of milliseconds before a worker claims an already claimed GC.",
116);
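// A hedged sketch of how the fallback thresholds above are intended to be
// used together with [ActiveRollup]/[ActiveGc] (defined below): an in-flight
// task is only "stolen" by another worker once it looks stuck, e.g.
//
//     let stuck = now_ms.saturating_sub(active_gc.start_ms)
//         >= u64::cast_from(GC_FALLBACK_THRESHOLD_MS.get(cfg));
//
// where `now_ms`, `active_gc`, and `cfg` are hypothetical bindings; the
// rollup case is analogous with [ROLLUP_FALLBACK_THRESHOLD_MS].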
117
118/// See the config description string.
119pub(crate) const GC_MIN_VERSIONS: Config<usize> = Config::new(
120    "persist_gc_min_versions",
121    32,
122    "The number of un-GCd versions that may exist in state before we'll trigger a GC.",
123);
124
125/// See the config description string.
126pub(crate) const GC_MAX_VERSIONS: Config<usize> = Config::new(
127    "persist_gc_max_versions",
128    128_000,
129    "The maximum number of versions to GC in a single GC run.",
130);
131
132/// Feature flag for the new active GC tracking mechanism.
133/// We mustn't enable this until we are fully deployed on the new version.
134pub(crate) const GC_USE_ACTIVE_GC: Config<bool> = Config::new(
135    "persist_gc_use_active_gc",
136    false,
137    "Whether to use the new active GC tracking mechanism.",
138);
139
140pub(crate) const ENABLE_INCREMENTAL_COMPACTION: Config<bool> = Config::new(
141    "persist_enable_incremental_compaction",
142    false,
143    "Whether to enable incremental compaction.",
144);
145
146/// A token to disambiguate state commands that could not otherwise be
147/// idempotent.
148#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
149#[serde(into = "String")]
150pub struct IdempotencyToken(pub(crate) [u8; 16]);
151
152impl std::fmt::Display for IdempotencyToken {
153    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154        write!(f, "i{}", Uuid::from_bytes(self.0))
155    }
156}
157
158impl std::fmt::Debug for IdempotencyToken {
159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160        write!(f, "IdempotencyToken({})", Uuid::from_bytes(self.0))
161    }
162}
163
164impl std::str::FromStr for IdempotencyToken {
165    type Err = String;
166
167    fn from_str(s: &str) -> Result<Self, Self::Err> {
168        parse_id("i", "IdempotencyToken", s).map(IdempotencyToken)
169    }
170}
171
172impl From<IdempotencyToken> for String {
173    fn from(x: IdempotencyToken) -> Self {
174        x.to_string()
175    }
176}
177
178impl IdempotencyToken {
179    pub(crate) fn new() -> Self {
180        IdempotencyToken(*Uuid::new_v4().as_bytes())
181    }
182    pub(crate) const SENTINEL: IdempotencyToken = IdempotencyToken([17u8; 16]);
183}
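// An illustrative round-trip check (not part of the original test suite),
// assuming `parse_id` accepts the same `i<uuid>` format that the `Display`
// impl produces.
#[cfg(test)]
mod idempotency_token_example {
    use super::IdempotencyToken;

    #[test]
    fn display_from_str_roundtrip() {
        let tok = IdempotencyToken::new();
        let s = tok.to_string();
        assert!(s.starts_with('i'));
        assert_eq!(s.parse::<IdempotencyToken>().expect("valid token"), tok);
    }
}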
184
185#[derive(Clone, Debug, PartialEq, Serialize)]
186pub struct LeasedReaderState<T> {
187    /// The seqno capability of this reader.
188    pub seqno: SeqNo,
189    /// The since capability of this reader.
190    pub since: Antichain<T>,
191    /// UNIX_EPOCH timestamp (in millis) of this reader's most recent heartbeat
192    pub last_heartbeat_timestamp_ms: u64,
193    /// Duration (in millis), measured from [Self::last_heartbeat_timestamp_ms],
194    /// after which this reader may be expired
195    pub lease_duration_ms: u64,
196    /// For debugging.
197    pub debug: HandleDebugState,
198}
199
200#[derive(Arbitrary, Clone, Debug, PartialEq, Serialize)]
201#[serde(into = "u64")]
202pub struct OpaqueState(pub [u8; 8]);
203
204impl From<OpaqueState> for u64 {
205    fn from(value: OpaqueState) -> Self {
206        u64::from_le_bytes(value.0)
207    }
208}
209
210#[derive(Clone, Debug, PartialEq, Serialize)]
211pub struct CriticalReaderState<T> {
212    /// The since capability of this reader.
213    pub since: Antichain<T>,
214    /// An opaque token matched on by compare_and_downgrade_since.
215    pub opaque: OpaqueState,
216    /// The [Codec64] used to encode [Self::opaque].
217    pub opaque_codec: String,
218    /// For debugging.
219    pub debug: HandleDebugState,
220}
221
222#[derive(Clone, Debug, PartialEq, Serialize)]
223pub struct WriterState<T> {
224    /// UNIX_EPOCH timestamp (in millis) of this writer's most recent heartbeat
225    pub last_heartbeat_timestamp_ms: u64,
226    /// Duration (in millis), measured from [Self::last_heartbeat_timestamp_ms],
227    /// after which this writer may be expired
228    pub lease_duration_ms: u64,
229    /// The idempotency token of the most recent successful compare_and_append
230    /// by this writer.
231    pub most_recent_write_token: IdempotencyToken,
232    /// The upper of the most recent successful compare_and_append by this
233    /// writer.
234    pub most_recent_write_upper: Antichain<T>,
235    /// For debugging.
236    pub debug: HandleDebugState,
237}
238
239/// Debugging info for a reader or writer.
240#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize)]
241pub struct HandleDebugState {
242    /// Hostname of the persist user that registered this writer or reader. For
243    /// critical readers, this is the _most recent_ registration.
244    pub hostname: String,
245    /// Plaintext description of this writer or reader's intent.
246    pub purpose: String,
247}
248
249/// Part of the updates in a Batch.
250///
251/// Either a pointer to updates stored in Blob or the updates themselves stored inline.
252#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
253#[serde(tag = "type")]
254pub enum BatchPart<T> {
255    Hollow(HollowBatchPart<T>),
256    Inline {
257        updates: LazyInlineBatchPart,
258        ts_rewrite: Option<Antichain<T>>,
259        schema_id: Option<SchemaId>,
260
261        /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
262        deprecated_schema_id: Option<SchemaId>,
263    },
264}
265
266fn decode_structured_lower(lower: &LazyProto<ProtoArrayData>) -> Option<ArrayBound> {
267    let try_decode = |lower: &LazyProto<ProtoArrayData>| {
268        let proto = lower.decode()?;
269        let data = ArrayData::from_proto(proto)?;
270        ensure!(data.len() == 1);
271        Ok(ArrayBound::new(make_array(data), 0))
272    };
273
274    let decoded: anyhow::Result<ArrayBound> = try_decode(lower);
275
276    match decoded {
277        Ok(bound) => Some(bound),
278        Err(e) => {
279            soft_panic_or_log!("failed to decode bound: {e:#?}");
280            None
281        }
282    }
283}
284
285impl<T> BatchPart<T> {
286    pub fn hollow_bytes(&self) -> usize {
287        match self {
288            BatchPart::Hollow(x) => x.encoded_size_bytes,
289            BatchPart::Inline { .. } => 0,
290        }
291    }
292
293    pub fn is_inline(&self) -> bool {
294        matches!(self, BatchPart::Inline { .. })
295    }
296
297    pub fn inline_bytes(&self) -> usize {
298        match self {
299            BatchPart::Hollow(_) => 0,
300            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
301        }
302    }
303
304    pub fn writer_key(&self) -> Option<WriterKey> {
305        match self {
306            BatchPart::Hollow(x) => x.key.split().map(|(writer, _part)| writer),
307            BatchPart::Inline { .. } => None,
308        }
309    }
310
311    pub fn encoded_size_bytes(&self) -> usize {
312        match self {
313            BatchPart::Hollow(x) => x.encoded_size_bytes,
314            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
315        }
316    }
317
318    // A user-interpretable identifier or description of the part (for logs and
319    // such).
320    pub fn printable_name(&self) -> &str {
321        match self {
322            BatchPart::Hollow(x) => x.key.0.as_str(),
323            BatchPart::Inline { .. } => "<inline>",
324        }
325    }
326
327    pub fn stats(&self) -> Option<&LazyPartStats> {
328        match self {
329            BatchPart::Hollow(x) => x.stats.as_ref(),
330            BatchPart::Inline { .. } => None,
331        }
332    }
333
334    pub fn key_lower(&self) -> &[u8] {
335        match self {
336            BatchPart::Hollow(x) => x.key_lower.as_slice(),
337            // We don't duplicate the lowest key because doing so can add
338            // considerable overhead for small parts.
339            //
340            // The empty key might not be a tight lower bound, but it is a valid
341            // lower bound. If a caller is interested in a tighter lower bound,
342            // the data is inline.
343            BatchPart::Inline { .. } => &[],
344        }
345    }
346
347    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
348        let part = match self {
349            BatchPart::Hollow(part) => part,
350            BatchPart::Inline { .. } => return None,
351        };
352
353        decode_structured_lower(part.structured_key_lower.as_ref()?)
354    }
355
356    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
357        match self {
358            BatchPart::Hollow(x) => x.ts_rewrite.as_ref(),
359            BatchPart::Inline { ts_rewrite, .. } => ts_rewrite.as_ref(),
360        }
361    }
362
363    pub fn schema_id(&self) -> Option<SchemaId> {
364        match self {
365            BatchPart::Hollow(x) => x.schema_id,
366            BatchPart::Inline { schema_id, .. } => *schema_id,
367        }
368    }
369
370    pub fn deprecated_schema_id(&self) -> Option<SchemaId> {
371        match self {
372            BatchPart::Hollow(x) => x.deprecated_schema_id,
373            BatchPart::Inline {
374                deprecated_schema_id,
375                ..
376            } => *deprecated_schema_id,
377        }
378    }
379}
380
381impl<T: Timestamp + Codec64> BatchPart<T> {
382    pub fn is_structured_only(&self, metrics: &ColumnarMetrics) -> bool {
383        match self {
384            BatchPart::Hollow(x) => matches!(x.format, Some(BatchColumnarFormat::Structured)),
385            BatchPart::Inline { updates, .. } => {
386                let inline_part = updates.decode::<T>(metrics).expect("valid inline part");
387                matches!(inline_part.updates, BlobTraceUpdates::Structured { .. })
388            }
389        }
390    }
391
392    pub fn diffs_sum<D: Codec64 + Semigroup>(&self, metrics: &ColumnarMetrics) -> Option<D> {
393        match self {
394            BatchPart::Hollow(x) => x.diffs_sum.map(D::decode),
395            BatchPart::Inline { updates, .. } => updates
396                .decode::<T>(metrics)
397                .expect("valid inline part")
398                .updates
399                .diffs_sum(),
400        }
401    }
402}
403
404/// An ordered list of parts, generally stored as part of a larger run.
405#[derive(Debug, Clone)]
406pub struct HollowRun<T> {
407    /// Pointers usable to retrieve the updates.
408    pub(crate) parts: Vec<RunPart<T>>,
409}
410
411/// A reference to a [HollowRun], including the key in the blob store and some denormalized
412/// metadata.
413#[derive(Debug, Eq, PartialEq, Clone, Serialize)]
414pub struct HollowRunRef<T> {
415    pub key: PartialBatchKey,
416
417    /// The size of the referenced run object, plus all of the hollow objects it contains.
418    pub hollow_bytes: usize,
419
420    /// The size of the largest individual part in the run; useful for sizing compaction.
421    pub max_part_bytes: usize,
422
423    /// The lower bound of the data in this part, ordered by the codec ordering.
424    pub key_lower: Vec<u8>,
425
426    /// The lower bound of the data in this part, ordered by the structured ordering.
427    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
428
429    pub diffs_sum: Option<[u8; 8]>,
430
431    pub(crate) _phantom_data: PhantomData<T>,
432}
433impl<T: Eq> PartialOrd<Self> for HollowRunRef<T> {
434    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
435        Some(self.cmp(other))
436    }
437}
438
439impl<T: Eq> Ord for HollowRunRef<T> {
440    fn cmp(&self, other: &Self) -> Ordering {
441        self.key.cmp(&other.key)
442    }
443}
444
445impl<T> HollowRunRef<T> {
446    pub fn writer_key(&self) -> Option<WriterKey> {
447        Some(self.key.split()?.0)
448    }
449}
450
451impl<T: Timestamp + Codec64> HollowRunRef<T> {
452    /// Stores the given run and returns a [HollowRunRef] that points to it.
453    pub async fn set<D: Codec64 + Semigroup>(
454        shard_id: ShardId,
455        blob: &dyn Blob,
456        writer: &WriterKey,
457        data: HollowRun<T>,
458        metrics: &Metrics,
459    ) -> Self {
460        let hollow_bytes = data.parts.iter().map(|p| p.hollow_bytes()).sum();
461        let max_part_bytes = data
462            .parts
463            .iter()
464            .map(|p| p.max_part_bytes())
465            .max()
466            .unwrap_or(0);
467        let key_lower = data
468            .parts
469            .first()
470            .map_or(vec![], |p| p.key_lower().to_vec());
471        let structured_key_lower = match data.parts.first() {
472            Some(RunPart::Many(r)) => r.structured_key_lower.clone(),
473            Some(RunPart::Single(BatchPart::Hollow(p))) => p.structured_key_lower.clone(),
474            Some(RunPart::Single(BatchPart::Inline { .. })) | None => None,
475        };
476        let diffs_sum = data
477            .parts
478            .iter()
479            .map(|p| {
480                p.diffs_sum::<D>(&metrics.columnar)
481                    .expect("valid diffs sum")
482            })
483            .reduce(|mut a, b| {
484                a.plus_equals(&b);
485                a
486            })
487            .expect("valid diffs sum")
488            .encode();
489
490        let key = PartialBatchKey::new(writer, &PartId::new());
491        let blob_key = key.complete(&shard_id);
492        let bytes = Bytes::from(prost::Message::encode_to_vec(&data.into_proto()));
493        let () = retry_external(&metrics.retries.external.hollow_run_set, || {
494            blob.set(&blob_key, bytes.clone())
495        })
496        .await;
497        Self {
498            key,
499            hollow_bytes,
500            max_part_bytes,
501            key_lower,
502            structured_key_lower,
503            diffs_sum: Some(diffs_sum),
504            _phantom_data: Default::default(),
505        }
506    }
507
508    /// Retrieve the [HollowRun] that this reference points to.
509    /// The caller is expected to ensure that this ref is the result of calling [HollowRunRef::set]
510    /// with the same shard id and backing store.
511    pub async fn get(
512        &self,
513        shard_id: ShardId,
514        blob: &dyn Blob,
515        metrics: &Metrics,
516    ) -> Option<HollowRun<T>> {
517        let blob_key = self.key.complete(&shard_id);
518        let mut bytes = retry_external(&metrics.retries.external.hollow_run_get, || {
519            blob.get(&blob_key)
520        })
521        .await?;
522        let proto_runs: ProtoHollowRun =
523            prost::Message::decode(&mut bytes).expect("illegal state: invalid proto bytes");
524        let runs = proto_runs
525            .into_rust()
526            .expect("illegal state: invalid encoded runs proto");
527        Some(runs)
528    }
529}
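// A hedged usage sketch of the pair above, assuming in-scope `shard_id`,
// `blob`, `metrics`, and `writer_key` bindings and an `i64` diff type:
//
//     let r = HollowRunRef::set::<i64>(shard_id, blob, &writer_key, run, metrics).await;
//     let roundtripped = r.get(shard_id, blob, metrics).await.expect("just written");
//
// `get` returns `None` only when the backing blob is missing (e.g. already
// deleted), which is why callers below surface that case as [MissingBlob].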
530
531/// Part of the updates in a run.
532///
533/// Either a pointer to parts stored in Blob or a single part stored inline.
534#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
535#[serde(untagged)]
536pub enum RunPart<T> {
537    Single(BatchPart<T>),
538    Many(HollowRunRef<T>),
539}
540
541impl<T: Ord> PartialOrd<Self> for RunPart<T> {
542    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
543        Some(self.cmp(other))
544    }
545}
546
547impl<T: Ord> Ord for RunPart<T> {
548    fn cmp(&self, other: &Self) -> Ordering {
549        match (self, other) {
550            (RunPart::Single(a), RunPart::Single(b)) => a.cmp(b),
551            (RunPart::Single(_), RunPart::Many(_)) => Ordering::Less,
552            (RunPart::Many(_), RunPart::Single(_)) => Ordering::Greater,
553            (RunPart::Many(a), RunPart::Many(b)) => a.cmp(b),
554        }
555    }
556}
557
558impl<T> RunPart<T> {
559    #[cfg(test)]
560    pub fn expect_hollow_part(&self) -> &HollowBatchPart<T> {
561        match self {
562            RunPart::Single(BatchPart::Hollow(hollow)) => hollow,
563            _ => panic!("expected hollow part!"),
564        }
565    }
566
567    pub fn hollow_bytes(&self) -> usize {
568        match self {
569            Self::Single(p) => p.hollow_bytes(),
570            Self::Many(r) => r.hollow_bytes,
571        }
572    }
573
574    pub fn is_inline(&self) -> bool {
575        match self {
576            Self::Single(p) => p.is_inline(),
577            Self::Many(_) => false,
578        }
579    }
580
581    pub fn inline_bytes(&self) -> usize {
582        match self {
583            Self::Single(p) => p.inline_bytes(),
584            Self::Many(_) => 0,
585        }
586    }
587
588    pub fn max_part_bytes(&self) -> usize {
589        match self {
590            Self::Single(p) => p.encoded_size_bytes(),
591            Self::Many(r) => r.max_part_bytes,
592        }
593    }
594
595    pub fn writer_key(&self) -> Option<WriterKey> {
596        match self {
597            Self::Single(p) => p.writer_key(),
598            Self::Many(r) => r.writer_key(),
599        }
600    }
601
602    pub fn encoded_size_bytes(&self) -> usize {
603        match self {
604            Self::Single(p) => p.encoded_size_bytes(),
605            Self::Many(r) => r.hollow_bytes,
606        }
607    }
608
609    pub fn schema_id(&self) -> Option<SchemaId> {
610        match self {
611            Self::Single(p) => p.schema_id(),
612            Self::Many(_) => None,
613        }
614    }
615
616    // A user-interpretable identifier or description of the part (for logs and
617    // such).
618    pub fn printable_name(&self) -> &str {
619        match self {
620            Self::Single(p) => p.printable_name(),
621            Self::Many(r) => r.key.0.as_str(),
622        }
623    }
624
625    pub fn stats(&self) -> Option<&LazyPartStats> {
626        match self {
627            Self::Single(p) => p.stats(),
628            // TODO: if we kept stats we could avoid fetching the metadata here.
629            Self::Many(_) => None,
630        }
631    }
632
633    pub fn key_lower(&self) -> &[u8] {
634        match self {
635            Self::Single(p) => p.key_lower(),
636            Self::Many(r) => r.key_lower.as_slice(),
637        }
638    }
639
640    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
641        match self {
642            Self::Single(p) => p.structured_key_lower(),
643            Self::Many(_) => None,
644        }
645    }
646
647    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
648        match self {
649            Self::Single(p) => p.ts_rewrite(),
650            Self::Many(_) => None,
651        }
652    }
653}
654
655impl<T> RunPart<T>
656where
657    T: Timestamp + Codec64,
658{
659    pub fn diffs_sum<D: Codec64 + Semigroup>(&self, metrics: &ColumnarMetrics) -> Option<D> {
660        match self {
661            Self::Single(p) => p.diffs_sum(metrics),
662            Self::Many(hollow_run) => hollow_run.diffs_sum.map(D::decode),
663        }
664    }
665}
666
667/// A blob was missing!
668#[derive(Clone, Debug)]
669pub struct MissingBlob(BlobKey);
670
671impl std::fmt::Display for MissingBlob {
672    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
673        write!(f, "unexpectedly missing key: {}", self.0)
674    }
675}
676
677impl std::error::Error for MissingBlob {}
678
679impl<T: Timestamp + Codec64 + Sync> RunPart<T> {
680    pub fn part_stream<'a>(
681        &'a self,
682        shard_id: ShardId,
683        blob: &'a dyn Blob,
684        metrics: &'a Metrics,
685    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + Send + 'a {
686        try_stream! {
687            match self {
688                RunPart::Single(p) => {
689                    yield Cow::Borrowed(p);
690                }
691                RunPart::Many(r) => {
692                    let fetched = r.get(shard_id, blob, metrics).await.ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
693                    for run_part in fetched.parts {
694                        for await batch_part in run_part.part_stream(shard_id, blob, metrics).boxed() {
695                            yield Cow::Owned(batch_part?.into_owned());
696                        }
697                    }
698                }
699            }
700        }
701    }
702}
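// A hedged consumption sketch for the stream above (`StreamExt` is already
// imported in this module); `run_part`, `shard_id`, `blob`, and `metrics` are
// assumed to be in scope:
//
//     let mut parts = std::pin::pin!(run_part.part_stream(shard_id, blob, metrics));
//     while let Some(part) = parts.next().await {
//         let part: Cow<'_, BatchPart<_>> = part?;
//         // ... fetch or inspect the part ...
//     }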
703
704impl<T: Ord> PartialOrd for BatchPart<T> {
705    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
706        Some(self.cmp(other))
707    }
708}
709
710impl<T: Ord> Ord for BatchPart<T> {
711    fn cmp(&self, other: &Self) -> Ordering {
712        match (self, other) {
713            (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o),
714            (
715                BatchPart::Inline {
716                    updates: s_updates,
717                    ts_rewrite: s_ts_rewrite,
718                    schema_id: s_schema_id,
719                    deprecated_schema_id: s_deprecated_schema_id,
720                },
721                BatchPart::Inline {
722                    updates: o_updates,
723                    ts_rewrite: o_ts_rewrite,
724                    schema_id: o_schema_id,
725                    deprecated_schema_id: o_deprecated_schema_id,
726                },
727            ) => (
728                s_updates,
729                s_ts_rewrite.as_ref().map(|x| x.elements()),
730                s_schema_id,
731                s_deprecated_schema_id,
732            )
733                .cmp(&(
734                    o_updates,
735                    o_ts_rewrite.as_ref().map(|x| x.elements()),
736                    o_schema_id,
737                    o_deprecated_schema_id,
738                )),
739            (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less,
740            (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater,
741        }
742    }
743}
744
745/// What order are the parts in this run in?
746#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize)]
747pub(crate) enum RunOrder {
748    /// They're in no particular order.
749    Unordered,
750    /// They're ordered based on the codec-encoded K/V bytes.
751    Codec,
752    /// They're ordered by the natural ordering of the structured data.
753    Structured,
754}
755
756#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Serialize, Copy, Hash)]
757pub struct RunId(pub(crate) [u8; 16]);
758
759impl std::fmt::Display for RunId {
760    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
761        write!(f, "ri{}", Uuid::from_bytes(self.0))
762    }
763}
764
765impl std::fmt::Debug for RunId {
766    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
767        write!(f, "RunId({})", Uuid::from_bytes(self.0))
768    }
769}
770
771impl std::str::FromStr for RunId {
772    type Err = String;
773
774    fn from_str(s: &str) -> Result<Self, Self::Err> {
775        parse_id("ri", "RunId", s).map(RunId)
776    }
777}
778
779impl From<RunId> for String {
780    fn from(x: RunId) -> Self {
781        x.to_string()
782    }
783}
784
785impl RunId {
786    pub(crate) fn new() -> Self {
787        RunId(*Uuid::new_v4().as_bytes())
788    }
789}
790
791impl Arbitrary for RunId {
792    type Parameters = ();
793    type Strategy = proptest::strategy::BoxedStrategy<Self>;
794    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
795        Strategy::prop_map(proptest::prelude::any::<u128>(), |n| {
796            RunId(*Uuid::from_u128(n).as_bytes())
797        })
798        .boxed()
799    }
800}
801
802/// Metadata shared across a run.
803#[derive(Clone, Debug, Default, PartialEq, Eq, Ord, PartialOrd, Serialize)]
804pub struct RunMeta {
805    /// If none, Persist should infer the order based on the proto metadata.
806    pub(crate) order: Option<RunOrder>,
807    /// All parts in a run should have the same schema.
808    pub(crate) schema: Option<SchemaId>,
809
810    /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
811    pub(crate) deprecated_schema: Option<SchemaId>,
812
813    /// If set, a UUID that uniquely identifies this run.
814    pub(crate) id: Option<RunId>,
815
816    /// The number of updates in this run, or `None` if the number is unknown.
817    pub(crate) len: Option<usize>,
818}
819
820/// A subset of a [HollowBatch] corresponding 1:1 to a blob.
821#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
822pub struct HollowBatchPart<T> {
823    /// Pointer usable to retrieve the updates.
824    pub key: PartialBatchKey,
825    /// The encoded size of this part.
826    pub encoded_size_bytes: usize,
827    /// A lower bound on the keys in the part. (By default, this is the minimum
828    /// possible key: `vec![]`.)
829    #[serde(serialize_with = "serialize_part_bytes")]
830    pub key_lower: Vec<u8>,
831    /// A lower bound on the keys in the part, stored as structured data.
832    #[serde(serialize_with = "serialize_lazy_proto")]
833    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
834    /// Aggregate statistics about data contained in this part.
835    #[serde(serialize_with = "serialize_part_stats")]
836    pub stats: Option<LazyPartStats>,
837    /// A frontier to which timestamps in this part are advanced on read, if
838    /// set.
839    ///
840    /// A value of `Some([T::minimum()])` is functionally the same as `None`,
841    /// but we maintain the distinction between the two for some internal sanity
842    /// checking of invariants as well as metrics. If this ever becomes an
843    /// issue, everything still works with this as just `Antichain<T>`.
844    pub ts_rewrite: Option<Antichain<T>>,
845    /// A Codec64 encoded sum of all diffs in this part, if known.
846    ///
847    /// This is `None` if this part was written before we started storing this
848    /// information, or if it was written when the dyncfg was off.
849    ///
850    /// It could also make sense to model this as part of the pushdown stats, if
851    /// we later decide that's of some benefit.
852    #[serde(serialize_with = "serialize_diffs_sum")]
853    pub diffs_sum: Option<[u8; 8]>,
854    /// Columnar format that this batch was written in.
855    ///
856    /// This is `None` if this part was written before we started writing structured
857    /// columnar data.
858    pub format: Option<BatchColumnarFormat>,
859    /// The schemas used to encode the data in this batch part.
860    ///
861    /// Or None for historical data written before the schema registry was
862    /// added.
863    pub schema_id: Option<SchemaId>,
864
865    /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
866    pub deprecated_schema_id: Option<SchemaId>,
867}
868
869/// A [Batch] but with the updates themselves stored externally.
870///
871/// [Batch]: differential_dataflow::trace::BatchReader
872#[derive(Clone, PartialEq, Eq)]
873pub struct HollowBatch<T> {
874    /// Describes the times of the updates in the batch.
875    pub desc: Description<T>,
876    /// The number of updates in the batch.
877    pub len: usize,
878    /// Pointers usable to retrieve the updates.
879    pub(crate) parts: Vec<RunPart<T>>,
880    /// Runs of sequential sorted batch parts, stored as indices into `parts`.
881    /// E.g.:
882    /// ```text
883    ///     parts=[p1, p2, p3], runs=[]     --> run  is  [p1, p2, p3]
884    ///     parts=[p1, p2, p3], runs=[1]    --> runs are [p1] and [p2, p3]
885    ///     parts=[p1, p2, p3], runs=[1, 2] --> runs are [p1], [p2], [p3]
886    /// ```
887    pub(crate) run_splits: Vec<usize>,
888    /// Run-level metadata: the first entry has metadata for the first run, and so on.
889    /// If there's no corresponding entry for a particular run, it's assumed to be [RunMeta::default()].
890    pub(crate) run_meta: Vec<RunMeta>,
891}
892
893impl<T: Debug> Debug for HollowBatch<T> {
894    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
895        let HollowBatch {
896            desc,
897            parts,
898            len,
899            run_splits: runs,
900            run_meta,
901        } = self;
902        f.debug_struct("HollowBatch")
903            .field(
904                "desc",
905                &(
906                    desc.lower().elements(),
907                    desc.upper().elements(),
908                    desc.since().elements(),
909                ),
910            )
911            .field("parts", &parts)
912            .field("len", &len)
913            .field("runs", &runs)
914            .field("run_meta", &run_meta)
915            .finish()
916    }
917}
918
919impl<T: Serialize> serde::Serialize for HollowBatch<T> {
920    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
921        let HollowBatch {
922            desc,
923            len,
924            // Both parts and runs are covered by the self.runs call.
925            parts: _,
926            run_splits: _,
927            run_meta: _,
928        } = self;
929        let mut s = s.serialize_struct("HollowBatch", 5)?;
930        let () = s.serialize_field("lower", &desc.lower().elements())?;
931        let () = s.serialize_field("upper", &desc.upper().elements())?;
932        let () = s.serialize_field("since", &desc.since().elements())?;
933        let () = s.serialize_field("len", len)?;
934        let () = s.serialize_field("part_runs", &self.runs().collect::<Vec<_>>())?;
935        s.end()
936    }
937}
938
939impl<T: Ord> PartialOrd for HollowBatch<T> {
940    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
941        Some(self.cmp(other))
942    }
943}
944
945impl<T: Ord> Ord for HollowBatch<T> {
946    fn cmp(&self, other: &Self) -> Ordering {
947        // Deconstruct self and other so we get a compile failure if new fields
948        // are added.
949        let HollowBatch {
950            desc: self_desc,
951            parts: self_parts,
952            len: self_len,
953            run_splits: self_runs,
954            run_meta: self_run_meta,
955        } = self;
956        let HollowBatch {
957            desc: other_desc,
958            parts: other_parts,
959            len: other_len,
960            run_splits: other_runs,
961            run_meta: other_run_meta,
962        } = other;
963        (
964            self_desc.lower().elements(),
965            self_desc.upper().elements(),
966            self_desc.since().elements(),
967            self_parts,
968            self_len,
969            self_runs,
970            self_run_meta,
971        )
972            .cmp(&(
973                other_desc.lower().elements(),
974                other_desc.upper().elements(),
975                other_desc.since().elements(),
976                other_parts,
977                other_len,
978                other_runs,
979                other_run_meta,
980            ))
981    }
982}
983
984impl<T: Timestamp + Codec64 + Sync> HollowBatch<T> {
985    pub(crate) fn part_stream<'a>(
986        &'a self,
987        shard_id: ShardId,
988        blob: &'a dyn Blob,
989        metrics: &'a Metrics,
990    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + 'a {
991        stream! {
992            for part in &self.parts {
993                for await part in part.part_stream(shard_id, blob, metrics) {
994                    yield part;
995                }
996            }
997        }
998    }
999}
1000impl<T> HollowBatch<T> {
1001    /// Construct an in-memory hollow batch from the given metadata.
1002    ///
1003    /// This method debug-asserts that `run_splits` is a strictly increasing sequence of valid
1004    /// indices into `parts`; the caller is responsible for ensuring that the defined runs are valid.
1005    ///
1006    /// `len` should represent the number of valid updates in the referenced parts.
1007    pub(crate) fn new(
1008        desc: Description<T>,
1009        parts: Vec<RunPart<T>>,
1010        len: usize,
1011        run_meta: Vec<RunMeta>,
1012        run_splits: Vec<usize>,
1013    ) -> Self {
1014        debug_assert!(
1015            run_splits.is_strictly_sorted(),
1016            "run indices should be strictly increasing"
1017        );
1018        debug_assert!(
1019            run_splits.first().map_or(true, |i| *i > 0),
1020            "run indices should be positive"
1021        );
1022        debug_assert!(
1023            run_splits.last().map_or(true, |i| *i < parts.len()),
1024            "run indices should be valid indices into parts"
1025        );
1026        debug_assert!(
1027            parts.is_empty() || run_meta.len() == run_splits.len() + 1,
1028            "all metadata should correspond to a run"
1029        );
1030
1031        Self {
1032            desc,
1033            len,
1034            parts,
1035            run_splits,
1036            run_meta,
1037        }
1038    }
1039
1040    /// Construct a batch of a single run with default metadata. Mostly interesting for tests.
1041    pub(crate) fn new_run(desc: Description<T>, parts: Vec<RunPart<T>>, len: usize) -> Self {
1042        let run_meta = if parts.is_empty() {
1043            vec![]
1044        } else {
1045            vec![RunMeta::default()]
1046        };
1047        Self {
1048            desc,
1049            len,
1050            parts,
1051            run_splits: vec![],
1052            run_meta,
1053        }
1054    }
1055
1056    #[cfg(test)]
1057    pub(crate) fn new_run_for_test(
1058        desc: Description<T>,
1059        parts: Vec<RunPart<T>>,
1060        len: usize,
1061        run_id: RunId,
1062    ) -> Self {
1063        let run_meta = if parts.is_empty() {
1064            vec![]
1065        } else {
1066            let mut meta = RunMeta::default();
1067            meta.id = Some(run_id);
1068            vec![meta]
1069        };
1070        Self {
1071            desc,
1072            len,
1073            parts,
1074            run_splits: vec![],
1075            run_meta,
1076        }
1077    }
1078
1079    /// An empty hollow batch, representing no updates over the given desc.
1080    pub(crate) fn empty(desc: Description<T>) -> Self {
1081        Self {
1082            desc,
1083            len: 0,
1084            parts: vec![],
1085            run_splits: vec![],
1086            run_meta: vec![],
1087        }
1088    }
1089
1090    pub(crate) fn runs(&self) -> impl Iterator<Item = (&RunMeta, &[RunPart<T>])> {
1091        let run_ends = self
1092            .run_splits
1093            .iter()
1094            .copied()
1095            .chain(std::iter::once(self.parts.len()));
1096        let run_metas = self.run_meta.iter();
1097        let run_parts = run_ends
1098            .scan(0, |start, end| {
1099                let range = *start..end;
1100                *start = end;
1101                Some(range)
1102            })
1103            .filter(|range| !range.is_empty())
1104            .map(|range| &self.parts[range]);
1105        run_metas.zip(run_parts)
1106    }
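    // An illustrative decoding of the `run_splits` encoding (hypothetical
    // values): with `parts = [p1, p2, p3]` and `run_splits = [1]`, `runs()`
    // yields `(run_meta[0], &[p1])` followed by `(run_meta[1], &[p2, p3])`;
    // with `run_splits = []` it yields a single run covering all three parts.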
1107
1108    pub(crate) fn inline_bytes(&self) -> usize {
1109        self.parts.iter().map(|x| x.inline_bytes()).sum()
1110    }
1111
1112    pub(crate) fn is_empty(&self) -> bool {
1113        self.parts.is_empty()
1114    }
1115
1116    pub(crate) fn part_count(&self) -> usize {
1117        self.parts.len()
1118    }
1119
1120    /// The sum of the encoded sizes of all parts in the batch.
1121    pub fn encoded_size_bytes(&self) -> usize {
1122        self.parts.iter().map(|p| p.encoded_size_bytes()).sum()
1123    }
1124}
1125
1126// See the comment on [Batch::rewrite_ts] for why this is TotalOrder.
1127impl<T: Timestamp + TotalOrder> HollowBatch<T> {
1128    pub(crate) fn rewrite_ts(
1129        &mut self,
1130        frontier: &Antichain<T>,
1131        new_upper: Antichain<T>,
1132    ) -> Result<(), String> {
1133        if !PartialOrder::less_than(frontier, &new_upper) {
1134            return Err(format!(
1135                "rewrite frontier {:?} !< rewrite upper {:?}",
1136                frontier.elements(),
1137                new_upper.elements(),
1138            ));
1139        }
1140        if PartialOrder::less_than(&new_upper, self.desc.upper()) {
1141            return Err(format!(
1142                "rewrite upper {:?} < batch upper {:?}",
1143                new_upper.elements(),
1144                self.desc.upper().elements(),
1145            ));
1146        }
1147
1148        // The following are things that it seems like we could support, but
1149        // initially we don't because we don't have a use case for them.
1150        if PartialOrder::less_than(frontier, self.desc.lower()) {
1151            return Err(format!(
1152                "rewrite frontier {:?} < batch lower {:?}",
1153                frontier.elements(),
1154                self.desc.lower().elements(),
1155            ));
1156        }
1157        if self.desc.since() != &Antichain::from_elem(T::minimum()) {
1158            return Err(format!(
1159                "batch since {:?} != minimum antichain {:?}",
1160                self.desc.since().elements(),
1161                &[T::minimum()],
1162            ));
1163        }
1164        for part in self.parts.iter() {
1165            let Some(ts_rewrite) = part.ts_rewrite() else {
1166                continue;
1167            };
1168            if PartialOrder::less_than(frontier, ts_rewrite) {
1169                return Err(format!(
1170                    "rewrite frontier {:?} < batch rewrite {:?}",
1171                    frontier.elements(),
1172                    ts_rewrite.elements(),
1173                ));
1174            }
1175        }
1176
1177        self.desc = Description::new(
1178            self.desc.lower().clone(),
1179            new_upper,
1180            self.desc.since().clone(),
1181        );
1182        for part in &mut self.parts {
1183            match part {
1184                RunPart::Single(BatchPart::Hollow(part)) => {
1185                    part.ts_rewrite = Some(frontier.clone())
1186                }
1187                RunPart::Single(BatchPart::Inline { ts_rewrite, .. }) => {
1188                    *ts_rewrite = Some(frontier.clone())
1189                }
1190                RunPart::Many(runs) => {
1191                    // Currently unreachable: we only apply rewrites to user batches, and we don't
1192                    // ever generate runs of >1 part for those.
1193                    panic!("unexpected rewrite of a hollow runs ref: {runs:?}");
1194                }
1195            }
1196        }
1197        Ok(())
1198    }
1199}
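// A hedged example of the `rewrite_ts` preconditions above, using u64
// timestamps and hypothetical values: for a batch with `lower=[0]`,
// `upper=[5]`, and `since=[0]` (the minimum), calling
// `rewrite_ts(&Antichain::from_elem(3), Antichain::from_elem(10))` succeeds,
// sets every part's `ts_rewrite` to `[3]`, and advances the batch upper to
// `[10]`. Passing a `new_upper` of `[4]` (less than the batch upper), or a
// frontier that is not strictly less than `new_upper`, returns an `Err`.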
1200
1201impl<T: Ord> PartialOrd for HollowBatchPart<T> {
1202    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1203        Some(self.cmp(other))
1204    }
1205}
1206
1207impl<T: Ord> Ord for HollowBatchPart<T> {
1208    fn cmp(&self, other: &Self) -> Ordering {
1209        // Deconstruct self and other so we get a compile failure if new fields
1210        // are added.
1211        let HollowBatchPart {
1212            key: self_key,
1213            encoded_size_bytes: self_encoded_size_bytes,
1214            key_lower: self_key_lower,
1215            structured_key_lower: self_structured_key_lower,
1216            stats: self_stats,
1217            ts_rewrite: self_ts_rewrite,
1218            diffs_sum: self_diffs_sum,
1219            format: self_format,
1220            schema_id: self_schema_id,
1221            deprecated_schema_id: self_deprecated_schema_id,
1222        } = self;
1223        let HollowBatchPart {
1224            key: other_key,
1225            encoded_size_bytes: other_encoded_size_bytes,
1226            key_lower: other_key_lower,
1227            structured_key_lower: other_structured_key_lower,
1228            stats: other_stats,
1229            ts_rewrite: other_ts_rewrite,
1230            diffs_sum: other_diffs_sum,
1231            format: other_format,
1232            schema_id: other_schema_id,
1233            deprecated_schema_id: other_deprecated_schema_id,
1234        } = other;
1235        (
1236            self_key,
1237            self_encoded_size_bytes,
1238            self_key_lower,
1239            self_structured_key_lower,
1240            self_stats,
1241            self_ts_rewrite.as_ref().map(|x| x.elements()),
1242            self_diffs_sum,
1243            self_format,
1244            self_schema_id,
1245            self_deprecated_schema_id,
1246        )
1247            .cmp(&(
1248                other_key,
1249                other_encoded_size_bytes,
1250                other_key_lower,
1251                other_structured_key_lower,
1252                other_stats,
1253                other_ts_rewrite.as_ref().map(|x| x.elements()),
1254                other_diffs_sum,
1255                other_format,
1256                other_schema_id,
1257                other_deprecated_schema_id,
1258            ))
1259    }
1260}
1261
1262/// A pointer to a rollup stored externally.
1263#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
1264pub struct HollowRollup {
1265    /// Pointer usable to retrieve the rollup.
1266    pub key: PartialRollupKey,
1267    /// The encoded size of this rollup, if known.
1268    pub encoded_size_bytes: Option<usize>,
1269}
1270
1271/// A pointer to a blob stored externally.
1272#[derive(Debug)]
1273pub enum HollowBlobRef<'a, T> {
1274    Batch(&'a HollowBatch<T>),
1275    Rollup(&'a HollowRollup),
1276}
1277
1278/// A rollup that is currently being computed.
1279#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
1280pub struct ActiveRollup {
1281    pub seqno: SeqNo,
1282    pub start_ms: u64,
1283}
1284
1285/// A garbage collection request that is currently being computed.
1286#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
1287pub struct ActiveGc {
1288    pub seqno: SeqNo,
1289    pub start_ms: u64,
1290}
1291
1292/// A sentinel for a state transition that was a no-op.
1293///
1294/// Critically, this also indicates that the no-op state transition was not
1295/// committed through compare_and_append and thus is _not linearized_.
1296#[derive(Debug)]
1297#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
1298pub struct NoOpStateTransition<T>(pub T);
1299
1300// TODO: Document invariants.
1301#[derive(Debug, Clone)]
1302#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
1303pub struct StateCollections<T> {
1304    // - Invariant: `<= all reader.since`
1305    // - Invariant: Doesn't regress across state versions.
1306    pub(crate) last_gc_req: SeqNo,
1307
1308    // - Invariant: There is a rollup with `seqno <= self.seqno_since`.
1309    pub(crate) rollups: BTreeMap<SeqNo, HollowRollup>,
1310
1311    /// The rollup that is currently being computed.
1312    pub(crate) active_rollup: Option<ActiveRollup>,
1313    /// The gc request that is currently being computed.
1314    pub(crate) active_gc: Option<ActiveGc>,
1315
1316    pub(crate) leased_readers: BTreeMap<LeasedReaderId, LeasedReaderState<T>>,
1317    pub(crate) critical_readers: BTreeMap<CriticalReaderId, CriticalReaderState<T>>,
1318    pub(crate) writers: BTreeMap<WriterId, WriterState<T>>,
1319    pub(crate) schemas: BTreeMap<SchemaId, EncodedSchemas>,
1320
1321    // - Invariant: `trace.since == meet(all reader.since)`
1322    // - Invariant: `trace.since` doesn't regress across state versions.
1323    // - Invariant: `trace.upper` doesn't regress across state versions.
1324    // - Invariant: `trace` upholds its own invariants.
1325    pub(crate) trace: Trace<T>,
1326}
1327
1328/// A key and val [Codec::Schema] encoded via [Codec::encode_schema].
1329///
1330/// This strategy of directly serializing the schema objects requires that
1331/// persist users do the right thing. Specifically, that an encoded schema
1332/// doesn't in some later version of mz decode to an in-mem object that acts
1333/// differently. In a sense, the current system (before the introduction of the
1334/// schema registry) where schemas are passed in unchecked to reader and writer
1335/// registration calls also has the same defect, so seems fine.
1336///
1337/// An alternative is to write down here some persist-specific representation of
1338/// the schema (e.g. the arrow DataType). This is a lot more work and also has
1339/// the potential to lead down a similar failure mode to the mz_persist_types
1340/// `Data` trait, where the boilerplate isn't worth the safety. Given that we
1341/// can always migrate later by rehydrating these, seems fine to start with the
1342/// easy thing.
1343#[derive(Debug, Clone, Serialize, PartialEq)]
1344pub struct EncodedSchemas {
1345    /// A full in-mem `K::Schema` impl encoded via [Codec::encode_schema].
1346    pub key: Bytes,
1347    /// The arrow `DataType` produced by this `K::Schema` at the time it was
1348    /// registered, encoded as a `ProtoDataType`.
1349    pub key_data_type: Bytes,
1350    /// A full in-mem `V::Schema` impl encoded via [Codec::encode_schema].
1351    pub val: Bytes,
1352    /// The arrow `DataType` produced by this `V::Schema` at the time it was
1353    /// registered, encoded as a `ProtoDataType`.
1354    pub val_data_type: Bytes,
1355}
1356
1357impl EncodedSchemas {
1358    pub(crate) fn decode_data_type(buf: &[u8]) -> DataType {
1359        let proto = prost::Message::decode(buf).expect("valid ProtoDataType");
1360        DataType::from_proto(proto).expect("valid DataType")
1361    }
1362}
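// A hedged note on the intended round trip: `decode_data_type` is the inverse
// of the `encode_data_type` helper defined inside `register_schema` below, so
// something like
//
//     EncodedSchemas::decode_data_type(&encode_data_type(&data_type))
//
// (for a hypothetical `data_type: DataType`) should return the original value.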
1363
1364#[derive(Debug)]
1365#[cfg_attr(test, derive(PartialEq))]
1366pub enum CompareAndAppendBreak<T> {
1367    AlreadyCommitted,
1368    Upper {
1369        shard_upper: Antichain<T>,
1370        writer_upper: Antichain<T>,
1371    },
1372    InvalidUsage(InvalidUsage<T>),
1373    InlineBackpressure,
1374}
1375
1376#[derive(Debug)]
1377#[cfg_attr(test, derive(PartialEq))]
1378pub enum SnapshotErr<T> {
1379    AsOfNotYetAvailable(SeqNo, Upper<T>),
1380    AsOfHistoricalDistinctionsLost(Since<T>),
1381}
1382
1383impl<T> StateCollections<T>
1384where
1385    T: Timestamp + Lattice + Codec64,
1386{
1387    pub fn add_rollup(
1388        &mut self,
1389        add_rollup: (SeqNo, &HollowRollup),
1390    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1391        let (rollup_seqno, rollup) = add_rollup;
1392        let applied = match self.rollups.get(&rollup_seqno) {
1393            Some(x) => x.key == rollup.key,
1394            None => {
1395                self.active_rollup = None;
1396                self.rollups.insert(rollup_seqno, rollup.to_owned());
1397                true
1398            }
1399        };
1400        // This state transition is a no-op if applied is false but we
1401        // still commit the state change so that this gets linearized
1402        // (maybe we're looking at old state).
1403        Continue(applied)
1404    }
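    // An illustrative consequence of the logic above (hypothetical values): if
    // seqno 42 already maps to a rollup with key "A", a concurrent attempt to
    // add `(42, "B")` returns `Continue(false)`: the state change still
    // commits (so the answer is linearized), but the caller learns that the
    // rollup was not applied.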
1405
1406    pub fn remove_rollups(
1407        &mut self,
1408        remove_rollups: &[(SeqNo, PartialRollupKey)],
1409    ) -> ControlFlow<NoOpStateTransition<Vec<SeqNo>>, Vec<SeqNo>> {
1410        if remove_rollups.is_empty() || self.is_tombstone() {
1411            return Break(NoOpStateTransition(vec![]));
1412        }
1413
1414        // This state transition is called at the end of the GC process, so we
1415        // need to unset the `active_gc` field.
1416        self.active_gc = None;
1417
1418        let mut removed = vec![];
1419        for (seqno, key) in remove_rollups {
1420            let removed_key = self.rollups.remove(seqno);
1421            debug_assert!(
1422                removed_key.as_ref().map_or(true, |x| &x.key == key),
1423                "{} vs {:?}",
1424                key,
1425                removed_key
1426            );
1427
1428            if removed_key.is_some() {
1429                removed.push(*seqno);
1430            }
1431        }
1432
1433        Continue(removed)
1434    }
1435
1436    pub fn register_leased_reader(
1437        &mut self,
1438        hostname: &str,
1439        reader_id: &LeasedReaderId,
1440        purpose: &str,
1441        seqno: SeqNo,
1442        lease_duration: Duration,
1443        heartbeat_timestamp_ms: u64,
1444        use_critical_since: bool,
1445    ) -> ControlFlow<
1446        NoOpStateTransition<(LeasedReaderState<T>, SeqNo)>,
1447        (LeasedReaderState<T>, SeqNo),
1448    > {
1449        let since = if use_critical_since {
1450            self.critical_since()
1451                .unwrap_or_else(|| self.trace.since().clone())
1452        } else {
1453            self.trace.since().clone()
1454        };
1455        let reader_state = LeasedReaderState {
1456            debug: HandleDebugState {
1457                hostname: hostname.to_owned(),
1458                purpose: purpose.to_owned(),
1459            },
1460            seqno,
1461            since,
1462            last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1463            lease_duration_ms: u64::try_from(lease_duration.as_millis())
1464                .expect("lease duration as millis should fit within u64"),
1465        };
1466
1467        // If the shard-global upper and since are both the empty antichain,
1468        // then no further writes can ever commit and no further reads can be
1469        // served. Optimize this by no-op-ing reader registration so that we can
1470        // settle the shard into a final unchanging tombstone state.
1471        if self.is_tombstone() {
1472            return Break(NoOpStateTransition((reader_state, self.seqno_since(seqno))));
1473        }
1474
1475        // TODO: Handle if the reader or writer already exists.
1476        self.leased_readers
1477            .insert(reader_id.clone(), reader_state.clone());
1478        Continue((reader_state, self.seqno_since(seqno)))
1479    }
1480
1481    pub fn register_critical_reader<O: Opaque + Codec64>(
1482        &mut self,
1483        hostname: &str,
1484        reader_id: &CriticalReaderId,
1485        purpose: &str,
1486    ) -> ControlFlow<NoOpStateTransition<CriticalReaderState<T>>, CriticalReaderState<T>> {
1487        let state = CriticalReaderState {
1488            debug: HandleDebugState {
1489                hostname: hostname.to_owned(),
1490                purpose: purpose.to_owned(),
1491            },
1492            since: self.trace.since().clone(),
1493            opaque: OpaqueState(Codec64::encode(&O::initial())),
1494            opaque_codec: O::codec_name(),
1495        };
1496
1497        // We expire all readers if the upper and since both advance to the
1498        // empty antichain. Gracefully handle this. At the same time,
1499        // short-circuit the cmd application so we don't needlessly create new
1500        // SeqNos.
1501        if self.is_tombstone() {
1502            return Break(NoOpStateTransition(state));
1503        }
1504
1505        let state = match self.critical_readers.get_mut(reader_id) {
1506            Some(existing_state) => {
1507                existing_state.debug = state.debug;
1508                existing_state.clone()
1509            }
1510            None => {
1511                self.critical_readers
1512                    .insert(reader_id.clone(), state.clone());
1513                state
1514            }
1515        };
1516        Continue(state)
1517    }
1518
1519    pub fn register_schema<K: Codec, V: Codec>(
1520        &mut self,
1521        key_schema: &K::Schema,
1522        val_schema: &V::Schema,
1523    ) -> ControlFlow<NoOpStateTransition<Option<SchemaId>>, Option<SchemaId>> {
1524        fn encode_data_type(data_type: &DataType) -> Bytes {
1525            let proto = data_type.into_proto();
1526            prost::Message::encode_to_vec(&proto).into()
1527        }
1528
1529        // Look for an existing registered SchemaId for these schemas.
1530        //
1531        // The common case is that this should be a recent one, so as a minor
1532        // optimization, do this search in reverse order.
1533        //
1534        // TODO: Note that this impl is `O(schemas)`. Combined with the
1535        // possibility of cmd retries, it's possible but unlikely for this to
1536        // get expensive. We could maintain a reverse map to speed this up if
1537        // necessary. This would either need to work on the encoded
1538        // representation (in which case we might still have to fall back to the linear scan) or
1539        // we'd need to add a Hash/Ord bound to Schema.
1540        let existing_id = self.schemas.iter().rev().find(|(_, x)| {
1541            K::decode_schema(&x.key) == *key_schema && V::decode_schema(&x.val) == *val_schema
1542        });
1543        match existing_id {
1544            Some((schema_id, _)) => {
1545                // TODO: Validate that the decoded schemas still produce records
1546                // of the recorded DataType, to detect shenanigans. Probably
1547                // best to wait until we've turned on Schema2 in prod and thus
1548                // committed to the current mappings.
1549                Break(NoOpStateTransition(Some(*schema_id)))
1550            }
1551            None if self.is_tombstone() => {
1552                // TODO: Is this right?
1553                Break(NoOpStateTransition(None))
1554            }
1555            None if self.schemas.is_empty() => {
1556                // We'll have to do something more sophisticated here to
1557                // generate the next id if/when we start supporting the removal
1558                // of schemas.
1559                let id = SchemaId(self.schemas.len());
1560                let key_data_type = mz_persist_types::columnar::data_type::<K>(key_schema)
1561                    .expect("valid key schema");
1562                let val_data_type = mz_persist_types::columnar::data_type::<V>(val_schema)
1563                    .expect("valid val schema");
1564                let prev = self.schemas.insert(
1565                    id,
1566                    EncodedSchemas {
1567                        key: K::encode_schema(key_schema),
1568                        key_data_type: encode_data_type(&key_data_type),
1569                        val: V::encode_schema(val_schema),
1570                        val_data_type: encode_data_type(&val_data_type),
1571                    },
1572                );
1573                assert_eq!(prev, None);
1574                Continue(Some(id))
1575            }
1576            None => {
1577                info!(
1578                    "register_schemas got {:?} expected {:?}",
1579                    key_schema,
1580                    self.schemas
1581                        .iter()
1582                        .map(|(id, x)| (id, K::decode_schema(&x.key)))
1583                        .collect::<Vec<_>>()
1584                );
1585                // Until we implement persist schema changes, only allow at most
1586                // one registered schema.
1587                Break(NoOpStateTransition(None))
1588            }
1589        }
1590    }
1591
1592    pub fn compare_and_evolve_schema<K: Codec, V: Codec>(
1593        &mut self,
1594        expected: SchemaId,
1595        key_schema: &K::Schema,
1596        val_schema: &V::Schema,
1597    ) -> ControlFlow<NoOpStateTransition<CaESchema<K, V>>, CaESchema<K, V>> {
1598        fn data_type<T>(schema: &impl Schema<T>) -> DataType {
1599            // To be defensive, create an empty batch and inspect the resulting
1600            // data type (as opposed to something like allowing the `Schema` to
1601            // declare the DataType).
1602            let array = Schema::encoder(schema).expect("valid schema").finish();
1603            Array::data_type(&array).clone()
1604        }
1605
1606        let (current_id, current) = self
1607            .schemas
1608            .last_key_value()
1609            .expect("all shards have a schema");
1610        if *current_id != expected {
1611            return Break(NoOpStateTransition(CaESchema::ExpectedMismatch {
1612                schema_id: *current_id,
1613                key: K::decode_schema(&current.key),
1614                val: V::decode_schema(&current.val),
1615            }));
1616        }
1617
1618        let current_key = K::decode_schema(&current.key);
1619        let current_key_dt = EncodedSchemas::decode_data_type(&current.key_data_type);
1620        let current_val = V::decode_schema(&current.val);
1621        let current_val_dt = EncodedSchemas::decode_data_type(&current.val_data_type);
1622
1623        let key_dt = data_type(key_schema);
1624        let val_dt = data_type(val_schema);
1625
1626        // If the schema is exactly the same as the current one, no-op.
1627        if current_key == *key_schema
1628            && current_key_dt == key_dt
1629            && current_val == *val_schema
1630            && current_val_dt == val_dt
1631        {
1632            return Break(NoOpStateTransition(CaESchema::Ok(*current_id)));
1633        }
1634
1635        let key_fn = backward_compatible(&current_key_dt, &key_dt);
1636        let val_fn = backward_compatible(&current_val_dt, &val_dt);
1637        let (Some(key_fn), Some(val_fn)) = (key_fn, val_fn) else {
1638            return Break(NoOpStateTransition(CaESchema::Incompatible));
1639        };
1640        // Persist initially disallows dropping columns. This would require a
1641        // bunch more work (e.g. not safe to use the latest schema in
1642        // compaction) and isn't initially necessary in mz.
1643        if key_fn.contains_drop() || val_fn.contains_drop() {
1644            return Break(NoOpStateTransition(CaESchema::Incompatible));
1645        }
1646
1647        // We'll have to do something more sophisticated here to
1648        // generate the next id if/when we start supporting the removal
1649        // of schemas.
1650        let id = SchemaId(self.schemas.len());
1651        self.schemas.insert(
1652            id,
1653            EncodedSchemas {
1654                key: K::encode_schema(key_schema),
1655                key_data_type: prost::Message::encode_to_vec(&key_dt.into_proto()).into(),
1656                val: V::encode_schema(val_schema),
1657                val_data_type: prost::Message::encode_to_vec(&val_dt.into_proto()).into(),
1658            },
1659        );
1660        Continue(CaESchema::Ok(id))
1661    }
1662
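    /// Appends `batch` to the shard, expecting `batch.desc.lower()` to match
    /// the current shard upper; on a mismatch the command breaks with
    /// `CompareAndAppendBreak::Upper` and callers typically retry from the
    /// reported frontier.
    ///
    /// A minimal sketch of the expectation check, using `u64` timestamps
    /// (illustrative only):
    ///
    /// ```ignore
    /// use timely::progress::Antichain;
    ///
    /// let shard_upper = Antichain::from_elem(5u64);
    /// let batch_lower = Antichain::from_elem(3u64);
    /// // A mismatch like this is reported back as an Upper break, not applied.
    /// assert_ne!(shard_upper, batch_lower);
    /// ```
    ///
    /// When a nonempty append produces only trivial merge requests, the writer
    /// may also claim some stale compactions: roughly
    /// `claim_compaction_percent / 100` of them, plus one more with probability
    /// `claim_compaction_percent % 100`. Sketch of that arithmetic (the hash
    /// bucket below is made up):
    ///
    /// ```ignore
    /// let claim_compaction_percent = 150usize;
    /// let hash_bucket = 42usize; // stands in for idempotency_token.hashed() % 100
    /// let mut reqs_to_take = claim_compaction_percent / 100;
    /// if hash_bucket < claim_compaction_percent % 100 {
    ///     reqs_to_take += 1;
    /// }
    /// assert_eq!(reqs_to_take, 2); // 1 guaranteed + 1 because 42 < 50
    /// ```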
1663    pub fn compare_and_append(
1664        &mut self,
1665        batch: &HollowBatch<T>,
1666        writer_id: &WriterId,
1667        heartbeat_timestamp_ms: u64,
1668        lease_duration_ms: u64,
1669        idempotency_token: &IdempotencyToken,
1670        debug_info: &HandleDebugState,
1671        inline_writes_total_max_bytes: usize,
1672        claim_compaction_percent: usize,
1673        claim_compaction_min_version: Option<&Version>,
1674    ) -> ControlFlow<CompareAndAppendBreak<T>, Vec<FueledMergeReq<T>>> {
1675        // We expire all writers if the upper and since both advance to the
1676        // empty antichain. Gracefully handle this. At the same time,
1677        // short-circuit the cmd application so we don't needlessly create new
1678        // SeqNos.
1679        if self.is_tombstone() {
1680            assert_eq!(self.trace.upper(), &Antichain::new());
1681            return Break(CompareAndAppendBreak::Upper {
1682                shard_upper: Antichain::new(),
1683                // This writer might have been registered before the shard upper
1684                // was advanced, which would make this pessimistic in the
1685                // Indeterminate handling of compare_and_append at the machine
1686                // level, but that's fine.
1687                writer_upper: Antichain::new(),
1688            });
1689        }
1690
1691        let writer_state = self
1692            .writers
1693            .entry(writer_id.clone())
1694            .or_insert_with(|| WriterState {
1695                last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1696                lease_duration_ms,
1697                most_recent_write_token: IdempotencyToken::SENTINEL,
1698                most_recent_write_upper: Antichain::from_elem(T::minimum()),
1699                debug: debug_info.clone(),
1700            });
1701
1702        if PartialOrder::less_than(batch.desc.upper(), batch.desc.lower()) {
1703            return Break(CompareAndAppendBreak::InvalidUsage(
1704                InvalidUsage::InvalidBounds {
1705                    lower: batch.desc.lower().clone(),
1706                    upper: batch.desc.upper().clone(),
1707                },
1708            ));
1709        }
1710
1711        // If the time interval is empty, the list of updates must also be
1712        // empty.
1713        if batch.desc.upper() == batch.desc.lower() && !batch.is_empty() {
1714            return Break(CompareAndAppendBreak::InvalidUsage(
1715                InvalidUsage::InvalidEmptyTimeInterval {
1716                    lower: batch.desc.lower().clone(),
1717                    upper: batch.desc.upper().clone(),
1718                    keys: batch
1719                        .parts
1720                        .iter()
1721                        .map(|x| x.printable_name().to_owned())
1722                        .collect(),
1723                },
1724            ));
1725        }
1726
1727        if idempotency_token == &writer_state.most_recent_write_token {
1728            // If the last write had the same idempotency_token, then this must
1729            // have already committed. Sanity check that the most recent write
1730            // upper matches and that the shard upper is at least the write
1731            // upper; if it's not, something very suspect is going on.
1732            assert_eq!(batch.desc.upper(), &writer_state.most_recent_write_upper);
1733            assert!(
1734                PartialOrder::less_equal(batch.desc.upper(), self.trace.upper()),
1735                "{:?} vs {:?}",
1736                batch.desc.upper(),
1737                self.trace.upper()
1738            );
1739            return Break(CompareAndAppendBreak::AlreadyCommitted);
1740        }
1741
1742        let shard_upper = self.trace.upper();
1743        if shard_upper != batch.desc.lower() {
1744            return Break(CompareAndAppendBreak::Upper {
1745                shard_upper: shard_upper.clone(),
1746                writer_upper: writer_state.most_recent_write_upper.clone(),
1747            });
1748        }
1749
1750        let new_inline_bytes = batch.inline_bytes();
1751        if new_inline_bytes > 0 {
1752            let mut existing_inline_bytes = 0;
1753            self.trace
1754                .map_batches(|x| existing_inline_bytes += x.inline_bytes());
1755            // TODO: For very small batches, it may actually _increase_ the size
1756            // of state to flush them out. Consider another threshold under
1757            // which an inline part can be appended no matter what.
1758            if existing_inline_bytes + new_inline_bytes >= inline_writes_total_max_bytes {
1759                return Break(CompareAndAppendBreak::InlineBackpressure);
1760            }
1761        }
1762
1763        let mut merge_reqs = if batch.desc.upper() != batch.desc.lower() {
1764            self.trace.push_batch(batch.clone())
1765        } else {
1766            Vec::new()
1767        };
1768
1769        // NB: we don't claim unclaimed compactions when the recording flag is off, even if we'd
1770        // otherwise be allowed to, to avoid triggering the same compactions in every writer.
1771        let all_empty_reqs = merge_reqs
1772            .iter()
1773            .all(|req| req.inputs.iter().all(|b| b.batch.is_empty()));
1774        if all_empty_reqs && !batch.is_empty() {
1775            let mut reqs_to_take = claim_compaction_percent / 100;
1776            if (usize::cast_from(idempotency_token.hashed()) % 100)
1777                < (claim_compaction_percent % 100)
1778            {
1779                reqs_to_take += 1;
1780            }
1781            let threshold_ms = heartbeat_timestamp_ms.saturating_sub(lease_duration_ms);
1782            let min_writer = claim_compaction_min_version.map(WriterKey::for_version);
1783            merge_reqs.extend(
1784                // We keep the oldest `reqs_to_take` batches, under the theory that they're least
1785                // likely to be compacted soon for other reasons.
1786                self.trace
1787                    .fueled_merge_reqs_before_ms(threshold_ms, min_writer)
1788                    .take(reqs_to_take),
1789            )
1790        }
1791
1792        for req in &merge_reqs {
1793            self.trace.claim_compaction(
1794                req.id,
1795                ActiveCompaction {
1796                    start_ms: heartbeat_timestamp_ms,
1797                },
1798            )
1799        }
1800
1801        debug_assert_eq!(self.trace.upper(), batch.desc.upper());
1802        writer_state.most_recent_write_token = idempotency_token.clone();
1803        // The writer's most recent upper should only go forward.
1804        assert!(
1805            PartialOrder::less_equal(&writer_state.most_recent_write_upper, batch.desc.upper()),
1806            "{:?} vs {:?}",
1807            &writer_state.most_recent_write_upper,
1808            batch.desc.upper()
1809        );
1810        writer_state
1811            .most_recent_write_upper
1812            .clone_from(batch.desc.upper());
1813
1814        // Heartbeat the writer state to keep our idempotency token alive.
1815        writer_state.last_heartbeat_timestamp_ms = std::cmp::max(
1816            heartbeat_timestamp_ms,
1817            writer_state.last_heartbeat_timestamp_ms,
1818        );
1819
1820        Continue(merge_reqs)
1821    }
1822
1823    pub fn apply_merge_res<D: Codec64 + Semigroup + PartialEq>(
1824        &mut self,
1825        res: &FueledMergeRes<T>,
1826        metrics: &ColumnarMetrics,
1827    ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
1828        // We expire all writers if the upper and since both advance to the
1829        // empty antichain. Gracefully handle this. At the same time,
1830        // short-circuit the cmd application so we don't needlessly create new
1831        // SeqNos.
1832        if self.is_tombstone() {
1833            return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
1834        }
1835
1836        let apply_merge_result = self.trace.apply_merge_res_checked::<D>(res, metrics);
1837        Continue(apply_merge_result)
1838    }
1839
1840    pub fn apply_merge_res_classic<D: Codec64 + Semigroup + PartialEq>(
1841        &mut self,
1842        res: &FueledMergeRes<T>,
1843        metrics: &ColumnarMetrics,
1844    ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
1845        // We expire all writers if the upper and since both advance to the
1846        // empty antichain. Gracefully handle this. At the same time,
1847        // short-circuit the cmd application so we don't needlessly create new
1848        // SeqNos.
1849        if self.is_tombstone() {
1850            return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
1851        }
1852
1853        let apply_merge_result = self
1854            .trace
1855            .apply_merge_res_checked_classic::<D>(res, metrics);
1856        Continue(apply_merge_result)
1857    }
1858
1859    pub fn spine_exert(
1860        &mut self,
1861        fuel: usize,
1862    ) -> ControlFlow<NoOpStateTransition<Vec<FueledMergeReq<T>>>, Vec<FueledMergeReq<T>>> {
1863        let (merge_reqs, did_work) = self.trace.exert(fuel);
1864        if did_work {
1865            Continue(merge_reqs)
1866        } else {
1867            assert!(merge_reqs.is_empty());
1868            // Break if we have nothing useful to do to save the seqno (and
1869            // resulting crdb traffic)
1870            Break(NoOpStateTransition(Vec::new()))
1871        }
1872    }
1873
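    /// Downgrades the given leased reader's since to `new_since`, heartbeating
    /// the lease and downgrading the reader's seqno capability along the way.
    ///
    /// The reader since never regresses: if `new_since` is not strictly ahead
    /// of the current value, the current value is kept. Illustrative sketch
    /// with `u64` timestamps:
    ///
    /// ```ignore
    /// use timely::PartialOrder;
    /// use timely::progress::Antichain;
    ///
    /// let current = Antichain::from_elem(5u64);
    /// let requested = Antichain::from_elem(3u64);
    /// // Not strictly ahead of the current since, so the downgrade is a no-op.
    /// assert!(!PartialOrder::less_than(&current, &requested));
    /// ```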
1874    pub fn downgrade_since(
1875        &mut self,
1876        reader_id: &LeasedReaderId,
1877        seqno: SeqNo,
1878        outstanding_seqno: Option<SeqNo>,
1879        new_since: &Antichain<T>,
1880        heartbeat_timestamp_ms: u64,
1881    ) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
1882        // We expire all readers if the upper and since both advance to the
1883        // empty antichain. Gracefully handle this. At the same time,
1884        // short-circuit the cmd application so we don't needlessly create new
1885        // SeqNos.
1886        if self.is_tombstone() {
1887            return Break(NoOpStateTransition(Since(Antichain::new())));
1888        }
1889
1890        // The only way to have a missing reader in state is if it's been expired... and in that
1891        // case, we behave the same as though that reader had been downgraded to the empty antichain.
1892        let Some(reader_state) = self.leased_reader(reader_id) else {
1893            tracing::warn!(
1894                "Leased reader {reader_id} was expired due to inactivity. Did the machine go to sleep?",
1895            );
1896            return Break(NoOpStateTransition(Since(Antichain::new())));
1897        };
1898
1899        // Also use this as an opportunity to heartbeat the reader and downgrade
1900        // the seqno capability.
1901        reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
1902            heartbeat_timestamp_ms,
1903            reader_state.last_heartbeat_timestamp_ms,
1904        );
1905
1906        let seqno = match outstanding_seqno {
1907            Some(outstanding_seqno) => {
1908                assert!(
1909                    outstanding_seqno >= reader_state.seqno,
1910                    "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
1911                    is behind current reader_state ({:?})",
1912                    outstanding_seqno,
1913                    reader_state.seqno,
1914                );
1915                std::cmp::min(outstanding_seqno, seqno)
1916            }
1917            None => seqno,
1918        };
1919
1920        reader_state.seqno = seqno;
1921
1922        let reader_current_since = if PartialOrder::less_than(&reader_state.since, new_since) {
1923            reader_state.since.clone_from(new_since);
1924            self.update_since();
1925            new_since.clone()
1926        } else {
1927            // No-op, but still commit the state change so that this gets
1928            // linearized.
1929            reader_state.since.clone()
1930        };
1931
1932        Continue(Since(reader_current_since))
1933    }
1934
1935    pub fn compare_and_downgrade_since<O: Opaque + Codec64>(
1936        &mut self,
1937        reader_id: &CriticalReaderId,
1938        expected_opaque: &O,
1939        (new_opaque, new_since): (&O, &Antichain<T>),
1940    ) -> ControlFlow<
1941        NoOpStateTransition<Result<Since<T>, (O, Since<T>)>>,
1942        Result<Since<T>, (O, Since<T>)>,
1943    > {
1944        // We expire all readers if the upper and since both advance to the
1945        // empty antichain. Gracefully handle this. At the same time,
1946        // short-circuit the cmd application so we don't needlessly create new
1947        // SeqNos.
1948        if self.is_tombstone() {
1949            // Match the idempotence behavior below of ignoring the token if
1950            // since is already advanced enough (in this case, because it's a
1951            // tombstone, we know it's the empty antichain).
1952            return Break(NoOpStateTransition(Ok(Since(Antichain::new()))));
1953        }
1954
1955        let reader_state = self.critical_reader(reader_id);
1956        assert_eq!(reader_state.opaque_codec, O::codec_name());
1957
1958        if &O::decode(reader_state.opaque.0) != expected_opaque {
1959            // No-op, but still commit the state change so that this gets
1960            // linearized.
1961            return Continue(Err((
1962                Codec64::decode(reader_state.opaque.0),
1963                Since(reader_state.since.clone()),
1964            )));
1965        }
1966
1967        reader_state.opaque = OpaqueState(Codec64::encode(new_opaque));
1968        if PartialOrder::less_equal(&reader_state.since, new_since) {
1969            reader_state.since.clone_from(new_since);
1970            self.update_since();
1971            Continue(Ok(Since(new_since.clone())))
1972        } else {
1973            // no work to be done -- the reader state's `since` is already sufficiently
1974            // advanced. we may someday need to revisit this branch when it's possible
1975            // for two `since` frontiers to be incomparable.
1976            Continue(Ok(Since(reader_state.since.clone())))
1977        }
1978    }
1979
1980    pub fn heartbeat_leased_reader(
1981        &mut self,
1982        reader_id: &LeasedReaderId,
1983        heartbeat_timestamp_ms: u64,
1984    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1985        // We expire all readers if the upper and since both advance to the
1986        // empty antichain. Gracefully handle this. At the same time,
1987        // short-circuit the cmd application so we don't needlessly create new
1988        // SeqNos.
1989        if self.is_tombstone() {
1990            return Break(NoOpStateTransition(false));
1991        }
1992
1993        match self.leased_readers.get_mut(reader_id) {
1994            Some(reader_state) => {
1995                reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
1996                    heartbeat_timestamp_ms,
1997                    reader_state.last_heartbeat_timestamp_ms,
1998                );
1999                Continue(true)
2000            }
2001            // No-op, but we still commit the state change so that this gets
2002            // linearized (maybe we're looking at old state).
2003            None => Continue(false),
2004        }
2005    }
2006
2007    pub fn expire_leased_reader(
2008        &mut self,
2009        reader_id: &LeasedReaderId,
2010    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2011        // We expire all readers if the upper and since both advance to the
2012        // empty antichain. Gracefully handle this. At the same time,
2013        // short-circuit the cmd application so we don't needlessly create new
2014        // SeqNos.
2015        if self.is_tombstone() {
2016            return Break(NoOpStateTransition(false));
2017        }
2018
2019        let existed = self.leased_readers.remove(reader_id).is_some();
2020        if existed {
2021            // TODO(database-issues#6885): Re-enable this
2022            //
2023            // Temporarily disabling this because we think it might be the cause
2024            // of the remap since bug. Specifically, a clusterd process has a
2025            // ReadHandle for maintaining the once and one inside a Listen. If
2026            // ReadHandle for maintaining the since and one inside a Listen. If
2027            // it's possible that an expiry of them both in quick succession
2028            // jumps the since forward to the Listen one.
2029            //
2030            // Don't forget to update the downgrade_since when this gets
2031            // switched back on.
2032            //
2033            // self.update_since();
2034        }
2035        // No-op if existed is false, but still commit the state change so that
2036        // this gets linearized.
2037        Continue(existed)
2038    }
2039
2040    pub fn expire_critical_reader(
2041        &mut self,
2042        reader_id: &CriticalReaderId,
2043    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2044        // We expire all readers if the upper and since both advance to the
2045        // empty antichain. Gracefully handle this. At the same time,
2046        // short-circuit the cmd application so we don't needlessly create new
2047        // SeqNos.
2048        if self.is_tombstone() {
2049            return Break(NoOpStateTransition(false));
2050        }
2051
2052        let existed = self.critical_readers.remove(reader_id).is_some();
2053        if existed {
2054            // TODO(database-issues#6885): Re-enable this
2055            //
2056            // Temporarily disabling this because we think it might be the cause
2057            // of the remap since bug. Specifically, a clusterd process has a
2058            // ReadHandle for maintaining the once and one inside a Listen. If
2059            // ReadHandle for maintaining the since and one inside a Listen. If
2060            // it's possible that an expiry of them both in quick succession
2061            // jumps the since forward to the Listen one.
2062            //
2063            // Don't forget to update the downgrade_since when this gets
2064            // switched back on.
2065            //
2066            // self.update_since();
2067        }
2068        // This state transition is a no-op if existed is false, but we still
2069        // commit the state change so that this gets linearized (maybe we're
2070        // looking at old state).
2071        Continue(existed)
2072    }
2073
2074    pub fn expire_writer(
2075        &mut self,
2076        writer_id: &WriterId,
2077    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2078        // We expire all writers if the upper and since both advance to the
2079        // empty antichain. Gracefully handle this. At the same time,
2080        // short-circuit the cmd application so we don't needlessly create new
2081        // SeqNos.
2082        if self.is_tombstone() {
2083            return Break(NoOpStateTransition(false));
2084        }
2085
2086        let existed = self.writers.remove(writer_id).is_some();
2087        // This state transition is a no-op if existed is false, but we still
2088        // commit the state change so that this gets linearized (maybe we're
2089        // looking at old state).
2090        Continue(existed)
2091    }
2092
2093    fn leased_reader(&mut self, id: &LeasedReaderId) -> Option<&mut LeasedReaderState<T>> {
2094        self.leased_readers.get_mut(id)
2095    }
2096
2097    fn critical_reader(&mut self, id: &CriticalReaderId) -> &mut CriticalReaderState<T> {
2098        self.critical_readers
2099            .get_mut(id)
2100            .unwrap_or_else(|| {
2101                panic!(
2102                    "Unknown CriticalReaderId({}). It was either never registered, or has been manually expired.",
2103                    id
2104                )
2105            })
2106    }
2107
2108    fn critical_since(&self) -> Option<Antichain<T>> {
2109        let mut critical_sinces = self.critical_readers.values().map(|r| &r.since);
2110        let mut since = critical_sinces.next().cloned()?;
2111        for s in critical_sinces {
2112            since.meet_assign(s);
2113        }
2114        Some(since)
2115    }
2116
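    /// Recomputes the shard-wide since as the meet of all leased and critical
    /// reader sinces and downgrades the trace to it; if there are no readers
    /// at all, the since is left unchanged so it never regresses.
    ///
    /// A small sketch of the meet on totally ordered times (illustrative
    /// only):
    ///
    /// ```ignore
    /// use differential_dataflow::lattice::Lattice;
    /// use timely::progress::Antichain;
    ///
    /// let mut since = Antichain::from_elem(7u64);
    /// since.meet_assign(&Antichain::from_elem(3u64));
    /// // For totally ordered times the meet is the pointwise minimum.
    /// assert_eq!(since, Antichain::from_elem(3u64));
    /// ```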
2117    fn update_since(&mut self) {
2118        let mut sinces_iter = self
2119            .leased_readers
2120            .values()
2121            .map(|x| &x.since)
2122            .chain(self.critical_readers.values().map(|x| &x.since));
2123        let mut since = match sinces_iter.next() {
2124            Some(since) => since.clone(),
2125            None => {
2126                // If there are no current readers, leave `since` unchanged so
2127                // it doesn't regress.
2128                return;
2129            }
2130        };
2131        for s in sinces_iter {
2132            since.meet_assign(s);
2133        }
2134        self.trace.downgrade_since(&since);
2135    }
2136
2137    fn seqno_since(&self, seqno: SeqNo) -> SeqNo {
2138        let mut seqno_since = seqno;
2139        for cap in self.leased_readers.values() {
2140            seqno_since = std::cmp::min(seqno_since, cap.seqno);
2141        }
2142        // critical_readers don't hold a seqno capability.
2143        seqno_since
2144    }
2145
2146    fn tombstone_batch() -> HollowBatch<T> {
2147        HollowBatch::empty(Description::new(
2148            Antichain::from_elem(T::minimum()),
2149            Antichain::new(),
2150            Antichain::new(),
2151        ))
2152    }
2153
2154    pub(crate) fn is_tombstone(&self) -> bool {
2155        self.trace.upper().is_empty()
2156            && self.trace.since().is_empty()
2157            && self.writers.is_empty()
2158            && self.leased_readers.is_empty()
2159            && self.critical_readers.is_empty()
2160    }
2161
2162    pub(crate) fn is_single_empty_batch(&self) -> bool {
2163        let mut batch_count = 0;
2164        let mut is_empty = true;
2165        self.trace.map_batches(|b| {
2166            batch_count += 1;
2167            is_empty &= b.is_empty()
2168        });
2169        batch_count <= 1 && is_empty
2170    }
2171
2172    pub fn become_tombstone_and_shrink(&mut self) -> ControlFlow<NoOpStateTransition<()>, ()> {
2173        assert_eq!(self.trace.upper(), &Antichain::new());
2174        assert_eq!(self.trace.since(), &Antichain::new());
2175
2176        // Remember our current state, so we can decide whether we have to
2177        // record a transition in durable state.
2178        let was_tombstone = self.is_tombstone();
2179
2180        // Enter the "tombstone" state, if we're not in it already.
2181        self.writers.clear();
2182        self.leased_readers.clear();
2183        self.critical_readers.clear();
2184
2185        debug_assert!(self.is_tombstone());
2186
2187        // Now that we're in a "tombstone" state -- i.e. nobody can read the data from a shard or write to
2188        // it -- the actual contents of our batches no longer matter.
2189        // This method progressively replaces batches in our state with simpler versions, to allow
2190        // freeing up resources and to reduce the state size. (Since the state is unreadable, this
2191        // is not visible to clients.) We do this a little bit at a time to avoid really large state
2192        // transitions... most operations happen incrementally, and large single writes can overwhelm
2193        // a backing store. See comments for why we believe the relevant diffs are reasonably small.
2194
2195        let mut to_replace = None;
2196        let mut batch_count = 0;
2197        self.trace.map_batches(|b| {
2198            batch_count += 1;
2199            if !b.is_empty() && to_replace.is_none() {
2200                to_replace = Some(b.desc.clone());
2201            }
2202        });
2203        if let Some(desc) = to_replace {
2204            // We have a nonempty batch: replace it with an empty batch and return.
2205            // This should not produce an excessively large diff: if it did, we wouldn't have been
2206            // able to append that batch in the first place.
2207            let result = self.trace.apply_tombstone_merge(&desc);
2208            assert!(
2209                result.matched(),
2210                "merge with a matching desc should always match"
2211            );
2212            Continue(())
2213        } else if batch_count > 1 {
2214            // All our batches are empty, but we have more than one of them. Replace the whole set
2215            // with a new single-batch trace.
2216            // This produces a diff with a size proportional to the number of batches, but since
2217            // Spine keeps a logarithmic number of batches this should never be excessively large.
2218            let mut new_trace = Trace::default();
2219            new_trace.downgrade_since(&Antichain::new());
2220            let merge_reqs = new_trace.push_batch(Self::tombstone_batch());
2221            assert_eq!(merge_reqs, Vec::new());
2222            self.trace = new_trace;
2223            Continue(())
2224        } else if !was_tombstone {
2225            // We were not tombstoned before, so we have to make sure this state
2226            // transition is recorded.
2227            Continue(())
2228        } else {
2229            // All our batches are empty, and there's only one... there's no shrinking this
2230            // tombstone further.
2231            Break(NoOpStateTransition(()))
2232        }
2233    }
2234}
2235
2236// TODO: Document invariants.
2237#[derive(Debug)]
2238#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
2239pub struct State<T> {
2240    pub(crate) applier_version: semver::Version,
2241    pub(crate) shard_id: ShardId,
2242
2243    pub(crate) seqno: SeqNo,
2244    /// A strictly increasing wall time of when this state was written, in
2245    /// milliseconds since the unix epoch.
2246    pub(crate) walltime_ms: u64,
2247    /// Hostname of the persist user that created this version of state. For
2248    /// debugging.
2249    pub(crate) hostname: String,
2250    pub(crate) collections: StateCollections<T>,
2251}
2252
2253/// A newtype wrapper of State that guarantees the K, V, and D codecs match the
2254/// ones in durable storage.
2255pub struct TypedState<K, V, T, D> {
2256    pub(crate) state: State<T>,
2257
2258    // According to the docs, PhantomData is to "mark things that act like they
2259    // own a T". State doesn't actually own K, V, or D, just the ability to
2260    // produce them. Using the `fn() -> T` pattern gets us the same variance as
2261    // T [1], but also allows State to correctly derive Send+Sync.
2262    //
2263    // [1]:
2264    //     https://doc.rust-lang.org/nomicon/phantom-data.html#table-of-phantomdata-patterns
2265    pub(crate) _phantom: PhantomData<fn() -> (K, V, D)>,
2266}
2267
2268impl<K, V, T: Clone, D> TypedState<K, V, T, D> {
2269    #[cfg(any(test, debug_assertions))]
2270    pub(crate) fn clone(&self, applier_version: Version, hostname: String) -> Self {
2271        TypedState {
2272            state: State {
2273                applier_version,
2274                shard_id: self.shard_id.clone(),
2275                seqno: self.seqno.clone(),
2276                walltime_ms: self.walltime_ms,
2277                hostname,
2278                collections: self.collections.clone(),
2279            },
2280            _phantom: PhantomData,
2281        }
2282    }
2283
2284    pub(crate) fn clone_for_rollup(&self) -> Self {
2285        TypedState {
2286            state: State {
2287                applier_version: self.applier_version.clone(),
2288                shard_id: self.shard_id.clone(),
2289                seqno: self.seqno.clone(),
2290                walltime_ms: self.walltime_ms,
2291                hostname: self.hostname.clone(),
2292                collections: self.collections.clone(),
2293            },
2294            _phantom: PhantomData,
2295        }
2296    }
2297}
2298
2299impl<K, V, T: Debug, D> Debug for TypedState<K, V, T, D> {
2300    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2301        // Deconstruct self so we get a compile failure if new fields
2302        // are added.
2303        let TypedState { state, _phantom } = self;
2304        f.debug_struct("TypedState").field("state", state).finish()
2305    }
2306}
2307
2308// Impl PartialEq regardless of the type params.
2309#[cfg(any(test, debug_assertions))]
2310impl<K, V, T: PartialEq, D> PartialEq for TypedState<K, V, T, D> {
2311    fn eq(&self, other: &Self) -> bool {
2312        // Deconstruct self and other so we get a compile failure if new fields
2313        // are added.
2314        let TypedState {
2315            state: self_state,
2316            _phantom,
2317        } = self;
2318        let TypedState {
2319            state: other_state,
2320            _phantom,
2321        } = other;
2322        self_state == other_state
2323    }
2324}
2325
2326impl<K, V, T, D> Deref for TypedState<K, V, T, D> {
2327    type Target = State<T>;
2328
2329    fn deref(&self) -> &Self::Target {
2330        &self.state
2331    }
2332}
2333
2334impl<K, V, T, D> DerefMut for TypedState<K, V, T, D> {
2335    fn deref_mut(&mut self) -> &mut Self::Target {
2336        &mut self.state
2337    }
2338}
2339
2340impl<K, V, T, D> TypedState<K, V, T, D>
2341where
2342    K: Codec,
2343    V: Codec,
2344    T: Timestamp + Lattice + Codec64,
2345    D: Codec64,
2346{
2347    pub fn new(
2348        applier_version: Version,
2349        shard_id: ShardId,
2350        hostname: String,
2351        walltime_ms: u64,
2352    ) -> Self {
2353        let state = State {
2354            applier_version,
2355            shard_id,
2356            seqno: SeqNo::minimum(),
2357            walltime_ms,
2358            hostname,
2359            collections: StateCollections {
2360                last_gc_req: SeqNo::minimum(),
2361                rollups: BTreeMap::new(),
2362                active_rollup: None,
2363                active_gc: None,
2364                leased_readers: BTreeMap::new(),
2365                critical_readers: BTreeMap::new(),
2366                writers: BTreeMap::new(),
2367                schemas: BTreeMap::new(),
2368                trace: Trace::default(),
2369            },
2370        };
2371        TypedState {
2372            state,
2373            _phantom: PhantomData,
2374        }
2375    }
2376
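    /// Applies `work_fn` to a copy of this state at the next seqno.
    ///
    /// A sketch of the walltime handling below: the new state's `walltime_ms`
    /// is kept strictly increasing even if the caller's clock regresses
    /// (illustrative only):
    ///
    /// ```ignore
    /// let (prev_walltime_ms, now_ms) = (1_000u64, 990u64);
    /// let mut new_walltime_ms = now_ms;
    /// if new_walltime_ms <= prev_walltime_ms {
    ///     new_walltime_ms = prev_walltime_ms + 1;
    /// }
    /// assert_eq!(new_walltime_ms, 1_001);
    /// ```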
2377    pub fn clone_apply<R, E, WorkFn>(
2378        &self,
2379        cfg: &PersistConfig,
2380        work_fn: &mut WorkFn,
2381    ) -> ControlFlow<E, (R, Self)>
2382    where
2383        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
2384    {
2385        // Now that we support one minor version of forward compatibility, tag
2386        // each version of state with the _max_ version of code that has ever
2387        // contributed to it. Otherwise, we'd erroneously allow rolling back an
2388        // arbitrary number of versions if they were done one-by-one.
2389        let new_applier_version = std::cmp::max(&self.applier_version, &cfg.build_version);
2390        let mut new_state = State {
2391            applier_version: new_applier_version.clone(),
2392            shard_id: self.shard_id,
2393            seqno: self.seqno.next(),
2394            walltime_ms: (cfg.now)(),
2395            hostname: cfg.hostname.clone(),
2396            collections: self.collections.clone(),
2397        };
2398        // Make sure walltime_ms is strictly increasing, in case clocks are
2399        // offset.
2400        if new_state.walltime_ms <= self.walltime_ms {
2401            new_state.walltime_ms = self.walltime_ms + 1;
2402        }
2403
2404        let work_ret = work_fn(new_state.seqno, cfg, &mut new_state.collections)?;
2405        let new_state = TypedState {
2406            state: new_state,
2407            _phantom: PhantomData,
2408        };
2409        Continue((work_ret, new_state))
2410    }
2411}
2412
2413#[derive(Copy, Clone, Debug)]
2414pub struct GcConfig {
2415    pub use_active_gc: bool,
2416    pub fallback_threshold_ms: u64,
2417    pub min_versions: usize,
2418    pub max_versions: usize,
2419}
2420
2421impl<T> State<T>
2422where
2423    T: Timestamp + Lattice + Codec64,
2424{
2425    pub fn shard_id(&self) -> ShardId {
2426        self.shard_id
2427    }
2428
2429    pub fn seqno(&self) -> SeqNo {
2430        self.seqno
2431    }
2432
2433    pub fn since(&self) -> &Antichain<T> {
2434        self.collections.trace.since()
2435    }
2436
2437    pub fn upper(&self) -> &Antichain<T> {
2438        self.collections.trace.upper()
2439    }
2440
2441    pub fn spine_batch_count(&self) -> usize {
2442        self.collections.trace.num_spine_batches()
2443    }
2444
2445    pub fn size_metrics(&self) -> StateSizeMetrics {
2446        let mut ret = StateSizeMetrics::default();
2447        self.blobs().for_each(|x| match x {
2448            HollowBlobRef::Batch(x) => {
2449                ret.hollow_batch_count += 1;
2450                ret.batch_part_count += x.part_count();
2451                ret.num_updates += x.len;
2452
2453                let batch_size = x.encoded_size_bytes();
2454                for x in x.parts.iter() {
2455                    if x.ts_rewrite().is_some() {
2456                        ret.rewrite_part_count += 1;
2457                    }
2458                    if x.is_inline() {
2459                        ret.inline_part_count += 1;
2460                        ret.inline_part_bytes += x.inline_bytes();
2461                    }
2462                }
2463                ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size);
2464                ret.state_batches_bytes += batch_size;
2465            }
2466            HollowBlobRef::Rollup(x) => {
2467                ret.state_rollup_count += 1;
2468                ret.state_rollups_bytes += x.encoded_size_bytes.unwrap_or_default()
2469            }
2470        });
2471        ret
2472    }
2473
2474    pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
2475        // We maintain the invariant that every version of state has at least
2476        // one rollup.
2477        self.collections
2478            .rollups
2479            .iter()
2480            .rev()
2481            .next()
2482            .expect("State should have at least one rollup if seqno > minimum")
2483    }
2484
2485    pub(crate) fn seqno_since(&self) -> SeqNo {
2486        self.collections.seqno_since(self.seqno)
2487    }
2488
2489    // Returns whether the cmd proposing this state has been selected to perform
2490    // background garbage collection work.
2491    //
2492    // If it was selected, this information is recorded in the state itself for
2493    // commit along with the cmd's state transition. This helps us to avoid
2494    // redundant work.
2495    //
2496    // Correctness does not depend on a gc assignment being executed, nor on
2497    // them being executed in the order they are given. But it is expected that
2498    // gc assignments are best-effort respected. In practice, cmds like
2499    // register_foo or expire_foo, where it would be awkward, ignore gc.
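    /// In the legacy (non-`use_active_gc`) mode, the GC threshold is roughly
    /// the base-2 log of the current seqno, so GC is assigned less often as
    /// the shard ages. Sketch of that computation (illustrative only):
    ///
    /// ```ignore
    /// let seqno = 1000u64;
    /// let gc_threshold = std::cmp::max(
    ///     1,
    ///     u64::from(seqno.next_power_of_two().trailing_zeros()),
    /// );
    /// assert_eq!(gc_threshold, 10); // 1000 rounds up to 1024 == 2^10
    /// ```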
2500    pub fn maybe_gc(&mut self, is_write: bool, now: u64, cfg: GcConfig) -> Option<GcReq> {
2501        let GcConfig {
2502            use_active_gc,
2503            fallback_threshold_ms,
2504            min_versions,
2505            max_versions,
2506        } = cfg;
2507        // This is an arbitrary-ish threshold that scales with seqno, but never
2508        // gets particularly big. It probably could be much bigger and certainly
2509        // could use a tuning pass at some point.
2510        let gc_threshold = if use_active_gc {
2511            u64::cast_from(min_versions)
2512        } else {
2513            std::cmp::max(
2514                1,
2515                u64::cast_from(self.seqno.0.next_power_of_two().trailing_zeros()),
2516            )
2517        };
2518        let new_seqno_since = self.seqno_since();
2519        // Collect until the new seqno since... or the last gc req plus the max number of
2520        // versions, whichever is less.
2521        let gc_until_seqno = new_seqno_since.min(SeqNo(
2522            self.collections
2523                .last_gc_req
2524                .0
2525                .saturating_add(u64::cast_from(max_versions)),
2526        ));
2527        let should_gc = new_seqno_since
2528            .0
2529            .saturating_sub(self.collections.last_gc_req.0)
2530            >= gc_threshold;
2531
2532        // If we wouldn't otherwise gc, check if we have an active gc. If we do, and
2533        // it's been a while since it started, we should gc.
2534        let should_gc = if use_active_gc && !should_gc {
2535            match self.collections.active_gc {
2536                Some(active_gc) => now.saturating_sub(active_gc.start_ms) > fallback_threshold_ms,
2537                None => false,
2538            }
2539        } else {
2540            should_gc
2541        };
2542        // Assign GC traffic preferentially to writers, falling back to anyone
2543        // generating new state versions if there are no writers.
2544        let should_gc = should_gc && (is_write || self.collections.writers.is_empty());
2545        // Always assign GC work to a tombstoned shard to have the chance to
2546        // clean up any residual blobs. This is safe (won't cause excess gc)
2547        // as the only allowed command after becoming a tombstone is to write
2548        // the final rollup.
2549        let tombstone_needs_gc = self.collections.is_tombstone();
2550        let should_gc = should_gc || tombstone_needs_gc;
2551        let should_gc = if use_active_gc {
2552            // If we have an active gc, we should only gc if the active gc is
2553            // sufficiently old. This is to avoid doing more gc work than
2554            // necessary.
2555            should_gc
2556                && match self.collections.active_gc {
2557                    Some(active) => now.saturating_sub(active.start_ms) > fallback_threshold_ms,
2558                    None => true,
2559                }
2560        } else {
2561            should_gc
2562        };
2563        if should_gc {
2564            self.collections.last_gc_req = gc_until_seqno;
2565            Some(GcReq {
2566                shard_id: self.shard_id,
2567                new_seqno_since: gc_until_seqno,
2568            })
2569        } else {
2570            None
2571        }
2572    }
2573
2574    /// Return the number of gc-ineligible state versions.
2575    pub fn seqnos_held(&self) -> usize {
2576        usize::cast_from(self.seqno.0.saturating_sub(self.seqno_since().0))
2577    }
2578
2579    /// Expire all readers and writers up to the given walltime_ms.
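    ///
    /// A handle is retained while its last heartbeat plus its lease duration
    /// has not yet fallen behind `walltime_ms`. Illustrative arithmetic only:
    ///
    /// ```ignore
    /// let (last_heartbeat_ms, lease_duration_ms, walltime_ms) = (1_000u64, 500, 1_600);
    /// let retain = last_heartbeat_ms + lease_duration_ms >= walltime_ms;
    /// assert!(!retain); // 1_500 < 1_600: this handle would be force-expired
    /// ```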
2580    pub fn expire_at(&mut self, walltime_ms: EpochMillis) -> ExpiryMetrics {
2581        let mut metrics = ExpiryMetrics::default();
2582        let shard_id = self.shard_id();
2583        self.collections.leased_readers.retain(|id, state| {
2584            let retain = state.last_heartbeat_timestamp_ms + state.lease_duration_ms >= walltime_ms;
2585            if !retain {
2586                info!(
2587                    "Force expiring reader {id} ({}) of shard {shard_id} due to inactivity",
2588                    state.debug.purpose
2589                );
2590                metrics.readers_expired += 1;
2591            }
2592            retain
2593        });
2594        // critical_readers don't need forced expiration. (In fact, that's the point!)
2595        self.collections.writers.retain(|id, state| {
2596            let retain =
2597                (state.last_heartbeat_timestamp_ms + state.lease_duration_ms) >= walltime_ms;
2598            if !retain {
2599                info!(
2600                    "Force expiring writer {id} ({}) of shard {shard_id} due to inactivity",
2601                    state.debug.purpose
2602                );
2603                metrics.writers_expired += 1;
2604            }
2605            retain
2606        });
2607        metrics
2608    }
2609
2610    /// Returns the batches that contain updates up to (and including) the given `as_of`. The
2611    /// result `Vec` contains blob keys, along with a [`Description`] of what updates in the
2612    /// referenced parts are valid to read.
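    ///
    /// The `as_of` is rejected if it is behind the shard since or if the shard
    /// upper has not yet advanced past it. Sketch of the two bounds checks
    /// (illustrative only):
    ///
    /// ```ignore
    /// use timely::PartialOrder;
    /// use timely::progress::Antichain;
    ///
    /// let (since, upper) = (Antichain::from_elem(2u64), Antichain::from_elem(8u64));
    /// let as_of = Antichain::from_elem(5u64);
    /// assert!(!PartialOrder::less_than(&as_of, &since)); // not behind the since
    /// assert!(!PartialOrder::less_equal(&upper, &as_of)); // upper is past the as_of
    /// ```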
2613    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
2614        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2615            return Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
2616                self.collections.trace.since().clone(),
2617            )));
2618        }
2619        let upper = self.collections.trace.upper();
2620        if PartialOrder::less_equal(upper, as_of) {
2621            return Err(SnapshotErr::AsOfNotYetAvailable(
2622                self.seqno,
2623                Upper(upper.clone()),
2624            ));
2625        }
2626
2627        let batches = self
2628            .collections
2629            .trace
2630            .batches()
2631            .filter(|b| !PartialOrder::less_than(as_of, b.desc.lower()))
2632            .cloned()
2633            .collect();
2634        Ok(batches)
2635    }
2636
2637    // NB: Unlike the other methods here, this one is read-only.
2638    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
2639        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2640            return Err(Since(self.collections.trace.since().clone()));
2641        }
2642        Ok(())
2643    }
2644
2645    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
2646        // TODO: Avoid the O(n^2) here: `next_listen_batch` is called once per
2647        // batch and this iterates through all batches to find the next one.
2648        self.collections
2649            .trace
2650            .batches()
2651            .find(|b| {
2652                PartialOrder::less_equal(b.desc.lower(), frontier)
2653                    && PartialOrder::less_than(frontier, b.desc.upper())
2654            })
2655            .cloned()
2656            .ok_or(self.seqno)
2657    }
2658
2659    pub fn active_rollup(&self) -> Option<ActiveRollup> {
2660        self.collections.active_rollup
2661    }
2662
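    /// Decides whether the command at the current seqno should also be
    /// assigned rollup maintenance, based on how many seqnos have elapsed
    /// since the latest rollup.
    ///
    /// A sketch of the legacy (non-`use_active_rollup`) cadence (illustrative
    /// only):
    ///
    /// ```ignore
    /// let threshold = 128u64;
    /// let seqnos_since_last_rollup = 256u64;
    /// // Maintenance is assigned on every `threshold`-th seqno past the latest
    /// // rollup, rather than on every one past it, to limit racing handles.
    /// assert!(seqnos_since_last_rollup > 0 && seqnos_since_last_rollup % threshold == 0);
    /// ```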
2663    pub fn need_rollup(
2664        &self,
2665        threshold: usize,
2666        use_active_rollup: bool,
2667        fallback_threshold_ms: u64,
2668        now: u64,
2669    ) -> Option<SeqNo> {
2670        let (latest_rollup_seqno, _) = self.latest_rollup();
2671
2672        // Tombstoned shards require one final rollup. However, because we
2673        // write a rollup as of SeqNo X and then link it in using a state
2674        // transition (in this case from X to X+1), the minimum number of
2675        // live diffs is actually two. Detect when we're in this minimal
2676        // two diff state and stop the (otherwise) infinite iteration.
2677        if self.collections.is_tombstone() && latest_rollup_seqno.next() < self.seqno {
2678            return Some(self.seqno);
2679        }
2680
2681        let seqnos_since_last_rollup = self.seqno.0.saturating_sub(latest_rollup_seqno.0);
2682
2683        if use_active_rollup {
2684            // If seqnos_since_last_rollup > threshold and there is no rollup already in progress,
2685            // we should start a new rollup.
2686            // If there is an active rollup, we should check if it has been running too long.
2687            // If it has, we should start a new rollup.
2688            // This is to guard against a worker dying/taking too long/etc.
2689            if seqnos_since_last_rollup > u64::cast_from(threshold) {
2690                match self.active_rollup() {
2691                    Some(active_rollup) => {
2692                        if now.saturating_sub(active_rollup.start_ms) > fallback_threshold_ms {
2693                            return Some(self.seqno);
2694                        }
2695                    }
2696                    None => {
2697                        return Some(self.seqno);
2698                    }
2699                }
2700            }
2701        } else {
2702            // every `threshold` seqnos since the latest rollup, assign rollup maintenance.
2703            // we avoid assigning rollups to every seqno past the threshold to avoid handles
2704            // racing / performing redundant work.
2705            if seqnos_since_last_rollup > 0
2706                && seqnos_since_last_rollup % u64::cast_from(threshold) == 0
2707            {
2708                return Some(self.seqno);
2709            }
2710
2711            // however, since maintenance is best-effort and could fail, do assign rollup
2712            // work to every seqno after a fallback threshold to ensure one is written.
2713            if seqnos_since_last_rollup
2714                > u64::cast_from(
2715                    threshold * PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER,
2716                )
2717            {
2718                return Some(self.seqno);
2719            }
2720        }
2721
2722        None
2723    }
2724
2725    pub(crate) fn blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
2726        let batches = self.collections.trace.batches().map(HollowBlobRef::Batch);
2727        let rollups = self.collections.rollups.values().map(HollowBlobRef::Rollup);
2728        batches.chain(rollups)
2729    }
2730}
2731
2732fn serialize_part_bytes<S: Serializer>(val: &[u8], s: S) -> Result<S::Ok, S::Error> {
2733    let val = hex::encode(val);
2734    val.serialize(s)
2735}
2736
2737fn serialize_lazy_proto<S: Serializer, T: prost::Message + Default>(
2738    val: &Option<LazyProto<T>>,
2739    s: S,
2740) -> Result<S::Ok, S::Error> {
2741    val.as_ref()
2742        .map(|lazy| hex::encode(&lazy.into_proto()))
2743        .serialize(s)
2744}
2745
2746fn serialize_part_stats<S: Serializer>(
2747    val: &Option<LazyPartStats>,
2748    s: S,
2749) -> Result<S::Ok, S::Error> {
2750    let val = val.as_ref().map(|x| x.decode().key);
2751    val.serialize(s)
2752}
2753
2754fn serialize_diffs_sum<S: Serializer>(val: &Option<[u8; 8]>, s: S) -> Result<S::Ok, S::Error> {
2755    // This is only used for debugging, so we hack it to assume that D is i64.
2756    let val = val.map(i64::decode);
2757    val.serialize(s)
2758}
2759
2760// This Serialize impl is used for debugging/testing and exposed via SQL. It's
2761// intentionally gated from users, so not strictly subject to our backward
2762// compatibility guarantees, but still probably best to be thoughtful about
2763// making unnecessary changes. Additionally, it's nice to make the output as
2764// nice to use as possible without tying our hands for the actual code usages.
2765impl<T: Serialize + Timestamp + Lattice> Serialize for State<T> {
2766    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2767        let State {
2768            applier_version,
2769            shard_id,
2770            seqno,
2771            walltime_ms,
2772            hostname,
2773            collections:
2774                StateCollections {
2775                    last_gc_req,
2776                    rollups,
2777                    active_rollup,
2778                    active_gc,
2779                    leased_readers,
2780                    critical_readers,
2781                    writers,
2782                    schemas,
2783                    trace,
2784                },
2785        } = self;
2786        let mut s = s.serialize_struct("State", 13)?;
2787        let () = s.serialize_field("applier_version", &applier_version.to_string())?;
2788        let () = s.serialize_field("shard_id", shard_id)?;
2789        let () = s.serialize_field("seqno", seqno)?;
2790        let () = s.serialize_field("walltime_ms", walltime_ms)?;
2791        let () = s.serialize_field("hostname", hostname)?;
2792        let () = s.serialize_field("last_gc_req", last_gc_req)?;
2793        let () = s.serialize_field("rollups", rollups)?;
2794        let () = s.serialize_field("active_rollup", active_rollup)?;
2795        let () = s.serialize_field("active_gc", active_gc)?;
2796        let () = s.serialize_field("leased_readers", leased_readers)?;
2797        let () = s.serialize_field("critical_readers", critical_readers)?;
2798        let () = s.serialize_field("writers", writers)?;
2799        let () = s.serialize_field("schemas", schemas)?;
2800        let () = s.serialize_field("since", &trace.since().elements())?;
2801        let () = s.serialize_field("upper", &trace.upper().elements())?;
2802        let trace = trace.flatten();
2803        let () = s.serialize_field("batches", &trace.legacy_batches.keys().collect::<Vec<_>>())?;
2804        let () = s.serialize_field("hollow_batches", &trace.hollow_batches)?;
2805        let () = s.serialize_field("spine_batches", &trace.spine_batches)?;
2806        let () = s.serialize_field("merges", &trace.merges)?;
2807        s.end()
2808    }
2809}
2810
2811#[derive(Debug, Default)]
2812pub struct StateSizeMetrics {
2813    pub hollow_batch_count: usize,
2814    pub batch_part_count: usize,
2815    pub rewrite_part_count: usize,
2816    pub num_updates: usize,
2817    pub largest_batch_bytes: usize,
2818    pub state_batches_bytes: usize,
2819    pub state_rollups_bytes: usize,
2820    pub state_rollup_count: usize,
2821    pub inline_part_count: usize,
2822    pub inline_part_bytes: usize,
2823}
2824
2825#[derive(Default)]
2826pub struct ExpiryMetrics {
2827    pub(crate) readers_expired: usize,
2828    pub(crate) writers_expired: usize,
2829}
2830
2831/// Wrapper for Antichain that represents a Since
2832#[derive(Debug, Clone, PartialEq)]
2833pub struct Since<T>(pub Antichain<T>);
2834
2835/// Wrapper for Antichain that represents an Upper
2836#[derive(Debug, PartialEq)]
2837pub struct Upper<T>(pub Antichain<T>);
2838
2839#[cfg(test)]
2840pub(crate) mod tests {
2841    use std::ops::Range;
2842    use std::str::FromStr;
2843
2844    use bytes::Bytes;
2845    use mz_build_info::DUMMY_BUILD_INFO;
2846    use mz_dyncfg::ConfigUpdates;
2847    use mz_ore::now::SYSTEM_TIME;
2848    use mz_ore::{assert_none, assert_ok};
2849    use mz_proto::RustType;
2850    use proptest::prelude::*;
2851    use proptest::strategy::ValueTree;
2852
2853    use crate::InvalidUsage::{InvalidBounds, InvalidEmptyTimeInterval};
2854    use crate::PersistLocation;
2855    use crate::cache::PersistClientCache;
2856    use crate::internal::encoding::any_some_lazy_part_stats;
2857    use crate::internal::paths::RollupId;
2858    use crate::internal::trace::tests::any_trace;
2859    use crate::tests::new_test_client_cache;
2860
2861    use super::*;
2862
2863    const LEASE_DURATION_MS: u64 = 900 * 1000;
2864    fn debug_state() -> HandleDebugState {
2865        HandleDebugState {
2866            hostname: "debug".to_owned(),
2867            purpose: "finding the bugs".to_owned(),
2868        }
2869    }
2870
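    /// Generates a [`HollowBatch`] whose parts are split into exactly
    /// `num_runs` runs, using evenly spaced run split points over the
    /// generated parts.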
2871    pub fn any_hollow_batch_with_exact_runs<T: Arbitrary + Timestamp>(
2872        num_runs: usize,
2873    ) -> impl Strategy<Value = HollowBatch<T>> {
2874        (
2875            any::<T>(),
2876            any::<T>(),
2877            any::<T>(),
2878            proptest::collection::vec(any_run_part::<T>(), num_runs + 1..20),
2879            any::<usize>(),
2880        )
2881            .prop_map(move |(t0, t1, since, parts, len)| {
2882                let (lower, upper) = if t0 <= t1 {
2883                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
2884                } else {
2885                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
2886                };
2887                let since = Antichain::from_elem(since);
2888
2889                let run_splits = (1..num_runs)
2890                    .map(|i| i * parts.len() / num_runs)
2891                    .collect::<Vec<_>>();
2892
                let run_meta = (0..num_runs)
                    .map(|_| RunMeta {
                        id: Some(RunId::new()),
                        ..RunMeta::default()
                    })
                    .collect::<Vec<_>>();
2900
2901                HollowBatch::new(
2902                    Description::new(lower, upper, since),
2903                    parts,
2904                    len % 10,
2905                    run_meta,
2906                    run_splits,
2907                )
2908            })
2909    }
2910
2911    pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
2912        Strategy::prop_map(
2913            (
2914                any::<T>(),
2915                any::<T>(),
2916                any::<T>(),
2917                proptest::collection::vec(any_run_part::<T>(), 0..20),
2918                any::<usize>(),
2919                0..=10usize,
2920                proptest::collection::vec(any::<RunId>(), 10),
2921            ),
2922            |(t0, t1, since, parts, len, num_runs, run_ids)| {
2923                let (lower, upper) = if t0 <= t1 {
2924                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
2925                } else {
2926                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
2927                };
2928                let since = Antichain::from_elem(since);
2929                if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() {
2930                    let run_splits = (1..num_runs)
2931                        .map(|i| i * parts.len() / num_runs)
2932                        .collect::<Vec<_>>();
2933
                    let run_meta = (0..num_runs)
                        .map(|i| RunMeta {
                            id: Some(run_ids[i]),
                            ..RunMeta::default()
                        })
                        .collect::<Vec<_>>();
2942
2943                    HollowBatch::new(
2944                        Description::new(lower, upper, since),
2945                        parts,
2946                        len % 10,
2947                        run_meta,
2948                        run_splits,
2949                    )
2950                } else {
2951                    HollowBatch::new_run_for_test(
2952                        Description::new(lower, upper, since),
2953                        parts,
2954                        len % 10,
2955                        run_ids[0],
2956                    )
2957                }
2958            },
2959        )
2960    }
2961
2962    pub fn any_batch_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = BatchPart<T>> {
2963        Strategy::prop_map(
2964            (
2965                any::<bool>(),
2966                any_hollow_batch_part(),
2967                any::<Option<T>>(),
2968                any::<Option<SchemaId>>(),
2969                any::<Option<SchemaId>>(),
2970            ),
2971            |(is_hollow, hollow, ts_rewrite, schema_id, deprecated_schema_id)| {
2972                if is_hollow {
2973                    BatchPart::Hollow(hollow)
2974                } else {
2975                    let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap();
2976                    let ts_rewrite = ts_rewrite.map(Antichain::from_elem);
2977                    BatchPart::Inline {
2978                        updates,
2979                        ts_rewrite,
2980                        schema_id,
2981                        deprecated_schema_id,
2982                    }
2983                }
2984            },
2985        )
2986    }
2987
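    /// Note: this strategy only ever generates the `RunPart::Single` variant.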
2988    pub fn any_run_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = RunPart<T>> {
2989        Strategy::prop_map(any_batch_part(), |part| RunPart::Single(part))
2990    }
2991
2992    pub fn any_hollow_batch_part<T: Arbitrary + Timestamp>()
2993    -> impl Strategy<Value = HollowBatchPart<T>> {
2994        Strategy::prop_map(
2995            (
2996                any::<PartialBatchKey>(),
2997                any::<usize>(),
2998                any::<Vec<u8>>(),
2999                any_some_lazy_part_stats(),
3000                any::<Option<T>>(),
3001                any::<[u8; 8]>(),
3002                any::<Option<BatchColumnarFormat>>(),
3003                any::<Option<SchemaId>>(),
3004                any::<Option<SchemaId>>(),
3005            ),
3006            |(
3007                key,
3008                encoded_size_bytes,
3009                key_lower,
3010                stats,
3011                ts_rewrite,
3012                diffs_sum,
3013                format,
3014                schema_id,
3015                deprecated_schema_id,
3016            )| {
3017                HollowBatchPart {
3018                    key,
3019                    encoded_size_bytes,
3020                    key_lower,
3021                    structured_key_lower: None,
3022                    stats,
3023                    ts_rewrite: ts_rewrite.map(Antichain::from_elem),
3024                    diffs_sum: Some(diffs_sum),
3025                    format,
3026                    schema_id,
3027                    deprecated_schema_id,
3028                }
3029            },
3030        )
3031    }
3032
3033    pub fn any_leased_reader_state<T: Arbitrary>() -> impl Strategy<Value = LeasedReaderState<T>> {
3034        Strategy::prop_map(
3035            (
3036                any::<SeqNo>(),
3037                any::<Option<T>>(),
3038                any::<u64>(),
3039                any::<u64>(),
3040                any::<HandleDebugState>(),
3041            ),
3042            |(seqno, since, last_heartbeat_timestamp_ms, mut lease_duration_ms, debug)| {
3043                // lease_duration_ms of 0 means this state was written by an old
3044                // version of code, which means we'll migrate it in the decode
3045                // path. Avoid.
3046                if lease_duration_ms == 0 {
3047                    lease_duration_ms += 1;
3048                }
3049                LeasedReaderState {
3050                    seqno,
3051                    since: since.map_or_else(Antichain::new, Antichain::from_elem),
3052                    last_heartbeat_timestamp_ms,
3053                    lease_duration_ms,
3054                    debug,
3055                }
3056            },
3057        )
3058    }
3059
3060    pub fn any_critical_reader_state<T: Arbitrary>() -> impl Strategy<Value = CriticalReaderState<T>>
3061    {
3062        Strategy::prop_map(
3063            (
3064                any::<Option<T>>(),
3065                any::<OpaqueState>(),
3066                any::<String>(),
3067                any::<HandleDebugState>(),
3068            ),
3069            |(since, opaque, opaque_codec, debug)| CriticalReaderState {
3070                since: since.map_or_else(Antichain::new, Antichain::from_elem),
3071                opaque,
3072                opaque_codec,
3073                debug,
3074            },
3075        )
3076    }
3077
3078    pub fn any_writer_state<T: Arbitrary>() -> impl Strategy<Value = WriterState<T>> {
3079        Strategy::prop_map(
3080            (
3081                any::<u64>(),
3082                any::<u64>(),
3083                any::<IdempotencyToken>(),
3084                any::<Option<T>>(),
3085                any::<HandleDebugState>(),
3086            ),
3087            |(
3088                last_heartbeat_timestamp_ms,
3089                lease_duration_ms,
3090                most_recent_write_token,
3091                most_recent_write_upper,
3092                debug,
3093            )| WriterState {
3094                last_heartbeat_timestamp_ms,
3095                lease_duration_ms,
3096                most_recent_write_token,
3097                most_recent_write_upper: most_recent_write_upper
3098                    .map_or_else(Antichain::new, Antichain::from_elem),
3099                debug,
3100            },
3101        )
3102    }
3103
3104    pub fn any_encoded_schemas() -> impl Strategy<Value = EncodedSchemas> {
3105        Strategy::prop_map(
3106            (
3107                any::<Vec<u8>>(),
3108                any::<Vec<u8>>(),
3109                any::<Vec<u8>>(),
3110                any::<Vec<u8>>(),
3111            ),
3112            |(key, key_data_type, val, val_data_type)| EncodedSchemas {
3113                key: Bytes::from(key),
3114                key_data_type: Bytes::from(key_data_type),
3115                val: Bytes::from(val),
3116                val_data_type: Bytes::from(val_data_type),
3117            },
3118        )
3119    }
3120
3121    pub fn any_state<T: Arbitrary + Timestamp + Lattice>(
3122        num_trace_batches: Range<usize>,
3123    ) -> impl Strategy<Value = State<T>> {
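        // The fields are generated as two separate tuples and zipped back
        // together below, presumably to stay under proptest's tuple-arity
        // limit for `Strategy` implementations.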
3124        let part1 = (
3125            any::<ShardId>(),
3126            any::<SeqNo>(),
3127            any::<u64>(),
3128            any::<String>(),
3129            any::<SeqNo>(),
3130            proptest::collection::btree_map(any::<SeqNo>(), any::<HollowRollup>(), 1..3),
3131            proptest::option::of(any::<ActiveRollup>()),
3132        );
3133
3134        let part2 = (
3135            proptest::option::of(any::<ActiveGc>()),
3136            proptest::collection::btree_map(
3137                any::<LeasedReaderId>(),
3138                any_leased_reader_state::<T>(),
3139                1..3,
3140            ),
3141            proptest::collection::btree_map(
3142                any::<CriticalReaderId>(),
3143                any_critical_reader_state::<T>(),
3144                1..3,
3145            ),
3146            proptest::collection::btree_map(any::<WriterId>(), any_writer_state::<T>(), 0..3),
3147            proptest::collection::btree_map(any::<SchemaId>(), any_encoded_schemas(), 0..3),
3148            any_trace::<T>(num_trace_batches),
3149        );
3150
3151        (part1, part2).prop_map(
3152            |(
3153                (shard_id, seqno, walltime_ms, hostname, last_gc_req, rollups, active_rollup),
3154                (active_gc, leased_readers, critical_readers, writers, schemas, trace),
3155            )| State {
3156                applier_version: semver::Version::new(1, 2, 3),
3157                shard_id,
3158                seqno,
3159                walltime_ms,
3160                hostname,
3161                collections: StateCollections {
3162                    last_gc_req,
3163                    rollups,
3164                    active_rollup,
3165                    active_gc,
3166                    leased_readers,
3167                    critical_readers,
3168                    writers,
3169                    schemas,
3170                    trace,
3171                },
3172            },
3173        )
3174    }
3175
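    /// Builds a [`HollowBatch`] over `[lower, upper)` with one hollow part per
    /// blob key and `len` logical updates, e.g. `hollow(0, 5, &["key1"], 1)`.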
3176    pub(crate) fn hollow<T: Timestamp>(
3177        lower: T,
3178        upper: T,
3179        keys: &[&str],
3180        len: usize,
3181    ) -> HollowBatch<T> {
3182        HollowBatch::new_run(
3183            Description::new(
3184                Antichain::from_elem(lower),
3185                Antichain::from_elem(upper),
3186                Antichain::from_elem(T::minimum()),
3187            ),
3188            keys.iter()
3189                .map(|x| {
3190                    RunPart::Single(BatchPart::Hollow(HollowBatchPart {
3191                        key: PartialBatchKey((*x).to_owned()),
3192                        encoded_size_bytes: 0,
3193                        key_lower: vec![],
3194                        structured_key_lower: None,
3195                        stats: None,
3196                        ts_rewrite: None,
3197                        diffs_sum: None,
3198                        format: None,
3199                        schema_id: None,
3200                        deprecated_schema_id: None,
3201                    }))
3202                })
3203                .collect(),
3204            len,
3205        )
3206    }
3207
3208    #[mz_ore::test]
3209    fn downgrade_since() {
3210        let mut state = TypedState::<(), (), u64, i64>::new(
3211            DUMMY_BUILD_INFO.semver_version(),
3212            ShardId::new(),
3213            "".to_owned(),
3214            0,
3215        );
3216        let reader = LeasedReaderId::new();
3217        let seqno = SeqNo::minimum();
3218        let now = SYSTEM_TIME.clone();
3219        let _ = state.collections.register_leased_reader(
3220            "",
3221            &reader,
3222            "",
3223            seqno,
3224            Duration::from_secs(10),
3225            now(),
3226            false,
3227        );
3228
3229        // The shard global since == 0 initially.
3230        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3231
3232        // Greater
3233        assert_eq!(
3234            state.collections.downgrade_since(
3235                &reader,
3236                seqno,
3237                None,
3238                &Antichain::from_elem(2),
3239                now()
3240            ),
3241            Continue(Since(Antichain::from_elem(2)))
3242        );
3243        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3244        // Equal (no-op)
3245        assert_eq!(
3246            state.collections.downgrade_since(
3247                &reader,
3248                seqno,
3249                None,
3250                &Antichain::from_elem(2),
3251                now()
3252            ),
3253            Continue(Since(Antichain::from_elem(2)))
3254        );
3255        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3256        // Less (no-op)
3257        assert_eq!(
3258            state.collections.downgrade_since(
3259                &reader,
3260                seqno,
3261                None,
3262                &Antichain::from_elem(1),
3263                now()
3264            ),
3265            Continue(Since(Antichain::from_elem(2)))
3266        );
3267        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3268
3269        // Create a second reader.
3270        let reader2 = LeasedReaderId::new();
3271        let _ = state.collections.register_leased_reader(
3272            "",
3273            &reader2,
3274            "",
3275            seqno,
3276            Duration::from_secs(10),
3277            now(),
3278            false,
3279        );
3280
3281        // Shard since doesn't change until the meet (min) of all reader sinces changes.
3282        assert_eq!(
3283            state.collections.downgrade_since(
3284                &reader2,
3285                seqno,
3286                None,
3287                &Antichain::from_elem(3),
3288                now()
3289            ),
3290            Continue(Since(Antichain::from_elem(3)))
3291        );
3292        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3293        // Shard since == 3 when all readers have since >= 3.
3294        assert_eq!(
3295            state.collections.downgrade_since(
3296                &reader,
3297                seqno,
3298                None,
3299                &Antichain::from_elem(5),
3300                now()
3301            ),
3302            Continue(Since(Antichain::from_elem(5)))
3303        );
3304        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3305
        // Shard since is unaffected by expiring a reader whose since is > the
        // shard since.
3307        assert_eq!(
3308            state.collections.expire_leased_reader(&reader),
3309            Continue(true)
3310        );
3311        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3312
3313        // Create a third reader.
3314        let reader3 = LeasedReaderId::new();
3315        let _ = state.collections.register_leased_reader(
3316            "",
3317            &reader3,
3318            "",
3319            seqno,
3320            Duration::from_secs(10),
3321            now(),
3322            false,
3323        );
3324
3325        // Shard since doesn't change until the meet (min) of all reader sinces changes.
3326        assert_eq!(
3327            state.collections.downgrade_since(
3328                &reader3,
3329                seqno,
3330                None,
3331                &Antichain::from_elem(10),
3332                now()
3333            ),
3334            Continue(Since(Antichain::from_elem(10)))
3335        );
3336        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3337
        // Shard since advances when the reader with the minimal since expires.
3339        assert_eq!(
3340            state.collections.expire_leased_reader(&reader2),
3341            Continue(true)
3342        );
3343        // TODO(database-issues#6885): expiry temporarily doesn't advance since
3344        // Switch this assertion back when we re-enable this.
3345        //
3346        // assert_eq!(state.collections.trace.since(), &Antichain::from_elem(10));
3347        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3348
        // Shard since is unaffected when all readers are expired.
3350        assert_eq!(
3351            state.collections.expire_leased_reader(&reader3),
3352            Continue(true)
3353        );
3354        // TODO(database-issues#6885): expiry temporarily doesn't advance since
3355        // Switch this assertion back when we re-enable this.
3356        //
3357        // assert_eq!(state.collections.trace.since(), &Antichain::from_elem(10));
3358        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3359    }
3360
3361    #[mz_ore::test]
3362    fn compare_and_downgrade_since() {
3363        let mut state = TypedState::<(), (), u64, i64>::new(
3364            DUMMY_BUILD_INFO.semver_version(),
3365            ShardId::new(),
3366            "".to_owned(),
3367            0,
3368        );
3369        let reader = CriticalReaderId::new();
3370        let _ = state
3371            .collections
3372            .register_critical_reader::<u64>("", &reader, "");
3373
3374        // The shard global since == 0 initially.
3375        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3376        // The initial opaque value should be set.
3377        assert_eq!(
3378            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3379            u64::initial()
3380        );
3381
3382        // Greater
3383        assert_eq!(
3384            state.collections.compare_and_downgrade_since::<u64>(
3385                &reader,
3386                &u64::initial(),
3387                (&1, &Antichain::from_elem(2)),
3388            ),
3389            Continue(Ok(Since(Antichain::from_elem(2))))
3390        );
3391        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3392        assert_eq!(
3393            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3394            1
3395        );
3396        // Equal (no-op)
3397        assert_eq!(
3398            state.collections.compare_and_downgrade_since::<u64>(
3399                &reader,
3400                &1,
3401                (&2, &Antichain::from_elem(2)),
3402            ),
3403            Continue(Ok(Since(Antichain::from_elem(2))))
3404        );
3405        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3406        assert_eq!(
3407            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3408            2
3409        );
3410        // Less (no-op)
3411        assert_eq!(
3412            state.collections.compare_and_downgrade_since::<u64>(
3413                &reader,
3414                &2,
3415                (&3, &Antichain::from_elem(1)),
3416            ),
3417            Continue(Ok(Since(Antichain::from_elem(2))))
3418        );
3419        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3420        assert_eq!(
3421            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3422            3
3423        );
3424    }
3425
3426    #[mz_ore::test]
3427    fn compare_and_append() {
3428        let state = &mut TypedState::<String, String, u64, i64>::new(
3429            DUMMY_BUILD_INFO.semver_version(),
3430            ShardId::new(),
3431            "".to_owned(),
3432            0,
3433        )
3434        .collections;
3435
3436        let writer_id = WriterId::new();
3437        let now = SYSTEM_TIME.clone();
3438
3439        // State is initially empty.
3440        assert_eq!(state.trace.num_spine_batches(), 0);
3441        assert_eq!(state.trace.num_hollow_batches(), 0);
3442        assert_eq!(state.trace.num_updates(), 0);
3443
3444        // Cannot insert a batch with a lower != current shard upper.
3445        assert_eq!(
3446            state.compare_and_append(
3447                &hollow(1, 2, &["key1"], 1),
3448                &writer_id,
3449                now(),
3450                LEASE_DURATION_MS,
3451                &IdempotencyToken::new(),
3452                &debug_state(),
3453                0,
3454                100,
3455                None
3456            ),
3457            Break(CompareAndAppendBreak::Upper {
3458                shard_upper: Antichain::from_elem(0),
3459                writer_upper: Antichain::from_elem(0)
3460            })
3461        );
3462
        // Insert an empty batch with an upper > lower.
3464        assert!(
3465            state
3466                .compare_and_append(
3467                    &hollow(0, 5, &[], 0),
3468                    &writer_id,
3469                    now(),
3470                    LEASE_DURATION_MS,
3471                    &IdempotencyToken::new(),
3472                    &debug_state(),
3473                    0,
3474                    100,
3475                    None
3476                )
3477                .is_continue()
3478        );
3479
        // Cannot insert a batch with an upper less than the lower.
3481        assert_eq!(
3482            state.compare_and_append(
3483                &hollow(5, 4, &["key1"], 1),
3484                &writer_id,
3485                now(),
3486                LEASE_DURATION_MS,
3487                &IdempotencyToken::new(),
3488                &debug_state(),
3489                0,
3490                100,
3491                None
3492            ),
3493            Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds {
3494                lower: Antichain::from_elem(5),
3495                upper: Antichain::from_elem(4)
3496            }))
3497        );
3498
3499        // Cannot insert a nonempty batch with an upper equal to lower.
3500        assert_eq!(
3501            state.compare_and_append(
3502                &hollow(5, 5, &["key1"], 1),
3503                &writer_id,
3504                now(),
3505                LEASE_DURATION_MS,
3506                &IdempotencyToken::new(),
3507                &debug_state(),
3508                0,
3509                100,
3510                None
3511            ),
3512            Break(CompareAndAppendBreak::InvalidUsage(
3513                InvalidEmptyTimeInterval {
3514                    lower: Antichain::from_elem(5),
3515                    upper: Antichain::from_elem(5),
3516                    keys: vec!["key1".to_owned()],
3517                }
3518            ))
3519        );
3520
3521        // Can insert an empty batch with an upper equal to lower.
3522        assert!(
3523            state
3524                .compare_and_append(
3525                    &hollow(5, 5, &[], 0),
3526                    &writer_id,
3527                    now(),
3528                    LEASE_DURATION_MS,
3529                    &IdempotencyToken::new(),
3530                    &debug_state(),
3531                    0,
3532                    100,
3533                    None
3534                )
3535                .is_continue()
3536        );
3537    }
3538
3539    #[mz_ore::test]
3540    fn snapshot() {
3541        let now = SYSTEM_TIME.clone();
3542
3543        let mut state = TypedState::<String, String, u64, i64>::new(
3544            DUMMY_BUILD_INFO.semver_version(),
3545            ShardId::new(),
3546            "".to_owned(),
3547            0,
3548        );
3549        // Cannot take a snapshot with as_of == shard upper.
3550        assert_eq!(
3551            state.snapshot(&Antichain::from_elem(0)),
3552            Err(SnapshotErr::AsOfNotYetAvailable(
3553                SeqNo(0),
3554                Upper(Antichain::from_elem(0))
3555            ))
3556        );
3557
3558        // Cannot take a snapshot with as_of > shard upper.
3559        assert_eq!(
3560            state.snapshot(&Antichain::from_elem(5)),
3561            Err(SnapshotErr::AsOfNotYetAvailable(
3562                SeqNo(0),
3563                Upper(Antichain::from_elem(0))
3564            ))
3565        );
3566
3567        let writer_id = WriterId::new();
3568
3569        // Advance upper to 5.
3570        assert!(
3571            state
3572                .collections
3573                .compare_and_append(
3574                    &hollow(0, 5, &["key1"], 1),
3575                    &writer_id,
3576                    now(),
3577                    LEASE_DURATION_MS,
3578                    &IdempotencyToken::new(),
3579                    &debug_state(),
3580                    0,
3581                    100,
3582                    None
3583                )
3584                .is_continue()
3585        );
3586
3587        // Can take a snapshot with as_of < upper.
3588        assert_eq!(
3589            state.snapshot(&Antichain::from_elem(0)),
3590            Ok(vec![hollow(0, 5, &["key1"], 1)])
3591        );
3592
3593        // Can take a snapshot with as_of >= shard since, as long as as_of < shard_upper.
3594        assert_eq!(
3595            state.snapshot(&Antichain::from_elem(4)),
3596            Ok(vec![hollow(0, 5, &["key1"], 1)])
3597        );
3598
3599        // Cannot take a snapshot with as_of >= upper.
3600        assert_eq!(
3601            state.snapshot(&Antichain::from_elem(5)),
3602            Err(SnapshotErr::AsOfNotYetAvailable(
3603                SeqNo(0),
3604                Upper(Antichain::from_elem(5))
3605            ))
3606        );
3607        assert_eq!(
3608            state.snapshot(&Antichain::from_elem(6)),
3609            Err(SnapshotErr::AsOfNotYetAvailable(
3610                SeqNo(0),
3611                Upper(Antichain::from_elem(5))
3612            ))
3613        );
3614
3615        let reader = LeasedReaderId::new();
3616        // Advance the since to 2.
3617        let _ = state.collections.register_leased_reader(
3618            "",
3619            &reader,
3620            "",
3621            SeqNo::minimum(),
3622            Duration::from_secs(10),
3623            now(),
3624            false,
3625        );
3626        assert_eq!(
3627            state.collections.downgrade_since(
3628                &reader,
3629                SeqNo::minimum(),
3630                None,
3631                &Antichain::from_elem(2),
3632                now()
3633            ),
3634            Continue(Since(Antichain::from_elem(2)))
3635        );
3636        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3637        // Cannot take a snapshot with as_of < shard_since.
3638        assert_eq!(
3639            state.snapshot(&Antichain::from_elem(1)),
3640            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
3641                Antichain::from_elem(2)
3642            )))
3643        );
3644
3645        // Advance the upper to 10 via an empty batch.
3646        assert!(
3647            state
3648                .collections
3649                .compare_and_append(
3650                    &hollow(5, 10, &[], 0),
3651                    &writer_id,
3652                    now(),
3653                    LEASE_DURATION_MS,
3654                    &IdempotencyToken::new(),
3655                    &debug_state(),
3656                    0,
3657                    100,
3658                    None
3659                )
3660                .is_continue()
3661        );
3662
3663        // Can still take snapshots at times < upper.
3664        assert_eq!(
3665            state.snapshot(&Antichain::from_elem(7)),
3666            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3667        );
3668
3669        // Cannot take snapshots with as_of >= upper.
3670        assert_eq!(
3671            state.snapshot(&Antichain::from_elem(10)),
3672            Err(SnapshotErr::AsOfNotYetAvailable(
3673                SeqNo(0),
3674                Upper(Antichain::from_elem(10))
3675            ))
3676        );
3677
3678        // Advance upper to 15.
3679        assert!(
3680            state
3681                .collections
3682                .compare_and_append(
3683                    &hollow(10, 15, &["key2"], 1),
3684                    &writer_id,
3685                    now(),
3686                    LEASE_DURATION_MS,
3687                    &IdempotencyToken::new(),
3688                    &debug_state(),
3689                    0,
3690                    100,
3691                    None
3692                )
3693                .is_continue()
3694        );
3695
        // Filter out batches whose lowers are greater than the requested as_of
        // (the batches that are too far in the future for the requested as_of).
3698        assert_eq!(
3699            state.snapshot(&Antichain::from_elem(9)),
3700            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3701        );
3702
3703        // Don't filter out batches whose lowers are <= the requested as_of.
3704        assert_eq!(
3705            state.snapshot(&Antichain::from_elem(10)),
3706            Ok(vec![
3707                hollow(0, 5, &["key1"], 1),
3708                hollow(5, 10, &[], 0),
3709                hollow(10, 15, &["key2"], 1)
3710            ])
3711        );
3712
3713        assert_eq!(
3714            state.snapshot(&Antichain::from_elem(11)),
3715            Ok(vec![
3716                hollow(0, 5, &["key1"], 1),
3717                hollow(5, 10, &[], 0),
3718                hollow(10, 15, &["key2"], 1)
3719            ])
3720        );
3721    }
3722
3723    #[mz_ore::test]
3724    fn next_listen_batch() {
3725        let mut state = TypedState::<String, String, u64, i64>::new(
3726            DUMMY_BUILD_INFO.semver_version(),
3727            ShardId::new(),
3728            "".to_owned(),
3729            0,
3730        );
3731
3732        // Empty collection never has any batches to listen for, regardless of the
3733        // current frontier.
3734        assert_eq!(
3735            state.next_listen_batch(&Antichain::from_elem(0)),
3736            Err(SeqNo(0))
3737        );
3738        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3739
3740        let writer_id = WriterId::new();
3741        let now = SYSTEM_TIME.clone();
3742
3743        // Add two batches of data, one from [0, 5) and then another from [5, 10).
3744        assert!(
3745            state
3746                .collections
3747                .compare_and_append(
3748                    &hollow(0, 5, &["key1"], 1),
3749                    &writer_id,
3750                    now(),
3751                    LEASE_DURATION_MS,
3752                    &IdempotencyToken::new(),
3753                    &debug_state(),
3754                    0,
3755                    100,
3756                    None
3757                )
3758                .is_continue()
3759        );
3760        assert!(
3761            state
3762                .collections
3763                .compare_and_append(
3764                    &hollow(5, 10, &["key2"], 1),
3765                    &writer_id,
3766                    now(),
3767                    LEASE_DURATION_MS,
3768                    &IdempotencyToken::new(),
3769                    &debug_state(),
3770                    0,
3771                    100,
3772                    None
3773                )
3774                .is_continue()
3775        );
3776
3777        // All frontiers in [0, 5) return the first batch.
3778        for t in 0..=4 {
3779            assert_eq!(
3780                state.next_listen_batch(&Antichain::from_elem(t)),
3781                Ok(hollow(0, 5, &["key1"], 1))
3782            );
3783        }
3784
3785        // All frontiers in [5, 10) return the second batch.
3786        for t in 5..=9 {
3787            assert_eq!(
3788                state.next_listen_batch(&Antichain::from_elem(t)),
3789                Ok(hollow(5, 10, &["key2"], 1))
3790            );
3791        }
3792
3793        // There is no batch currently available for t = 10.
3794        assert_eq!(
3795            state.next_listen_batch(&Antichain::from_elem(10)),
3796            Err(SeqNo(0))
3797        );
3798
        // By definition, there is never a batch available at the empty
        // antichain, which is the frontier after all possible times.
3801        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3802    }
3803
3804    #[mz_ore::test]
3805    fn expire_writer() {
3806        let mut state = TypedState::<String, String, u64, i64>::new(
3807            DUMMY_BUILD_INFO.semver_version(),
3808            ShardId::new(),
3809            "".to_owned(),
3810            0,
3811        );
3812        let now = SYSTEM_TIME.clone();
3813
3814        let writer_id_one = WriterId::new();
3815
3816        let writer_id_two = WriterId::new();
3817
3818        // Writer is eligible to write
3819        assert!(
3820            state
3821                .collections
3822                .compare_and_append(
3823                    &hollow(0, 2, &["key1"], 1),
3824                    &writer_id_one,
3825                    now(),
3826                    LEASE_DURATION_MS,
3827                    &IdempotencyToken::new(),
3828                    &debug_state(),
3829                    0,
3830                    100,
3831                    None
3832                )
3833                .is_continue()
3834        );
3835
3836        assert!(
3837            state
3838                .collections
3839                .expire_writer(&writer_id_one)
3840                .is_continue()
3841        );
3842
3843        // Other writers should still be able to write
3844        assert!(
3845            state
3846                .collections
3847                .compare_and_append(
3848                    &hollow(2, 5, &["key2"], 1),
3849                    &writer_id_two,
3850                    now(),
3851                    LEASE_DURATION_MS,
3852                    &IdempotencyToken::new(),
3853                    &debug_state(),
3854                    0,
3855                    100,
3856                    None
3857                )
3858                .is_continue()
3859        );
3860    }
3861
3862    #[mz_ore::test]
3863    fn maybe_gc_active_gc() {
3864        const GC_CONFIG: GcConfig = GcConfig {
3865            use_active_gc: true,
3866            fallback_threshold_ms: 5000,
3867            min_versions: 99,
3868            max_versions: 500,
3869        };
3870        let now_fn = SYSTEM_TIME.clone();
3871
3872        let mut state = TypedState::<String, String, u64, i64>::new(
3873            DUMMY_BUILD_INFO.semver_version(),
3874            ShardId::new(),
3875            "".to_owned(),
3876            0,
3877        );
3878
3879        let now = now_fn();
3880        // Empty state doesn't need gc, regardless of is_write.
3881        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
3882        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);
3883
3884        // Artificially advance the seqno so the seqno_since advances past our
3885        // internal gc_threshold.
3886        state.seqno = SeqNo(100);
3887        assert_eq!(state.seqno_since(), SeqNo(100));
3888
3889        // When a writer is present, non-writes don't gc.
3890        let writer_id = WriterId::new();
3891        let _ = state.collections.compare_and_append(
3892            &hollow(1, 2, &["key1"], 1),
3893            &writer_id,
3894            now,
3895            LEASE_DURATION_MS,
3896            &IdempotencyToken::new(),
3897            &debug_state(),
3898            0,
3899            100,
3900            None,
3901        );
3902        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);
3903
3904        // A write will gc though.
3905        assert_eq!(
3906            state.maybe_gc(true, now, GC_CONFIG),
3907            Some(GcReq {
3908                shard_id: state.shard_id,
3909                new_seqno_since: SeqNo(100)
3910            })
3911        );
3912
3913        // But if we write down an active gc, we won't gc.
3914        state.collections.active_gc = Some(ActiveGc {
3915            seqno: state.seqno,
3916            start_ms: now,
3917        });
3918
3919        state.seqno = SeqNo(200);
3920        assert_eq!(state.seqno_since(), SeqNo(200));
3921
3922        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
3923
3924        state.seqno = SeqNo(300);
3925        assert_eq!(state.seqno_since(), SeqNo(300));
3926        // But if we advance the time past the threshold, we will gc.
3927        let new_now = now + GC_CONFIG.fallback_threshold_ms + 1;
3928        assert_eq!(
3929            state.maybe_gc(true, new_now, GC_CONFIG),
3930            Some(GcReq {
3931                shard_id: state.shard_id,
3932                new_seqno_since: SeqNo(300)
3933            })
3934        );
3935
        // Even if the sequence number doesn't pass the threshold, we will gc
        // once the active gc has expired.
3938
3939        state.seqno = SeqNo(301);
3940        assert_eq!(state.seqno_since(), SeqNo(301));
3941        assert_eq!(
3942            state.maybe_gc(true, new_now, GC_CONFIG),
3943            Some(GcReq {
3944                shard_id: state.shard_id,
3945                new_seqno_since: SeqNo(301)
3946            })
3947        );
3948
3949        state.collections.active_gc = None;
3950
3951        // Artificially advance the seqno (again) so the seqno_since advances
3952        // past our internal gc_threshold (again).
3953        state.seqno = SeqNo(400);
3954        assert_eq!(state.seqno_since(), SeqNo(400));
3955
3956        let now = now_fn();
3957
3958        // If there are no writers, even a non-write will gc.
3959        let _ = state.collections.expire_writer(&writer_id);
3960        assert_eq!(
3961            state.maybe_gc(false, now, GC_CONFIG),
3962            Some(GcReq {
3963                shard_id: state.shard_id,
3964                new_seqno_since: SeqNo(400)
3965            })
3966        );
3967
3968        // Upper-bound the number of seqnos we'll attempt to collect in one go.
3969        let previous_seqno = state.seqno;
3970        state.seqno = SeqNo(10_000);
3971        assert_eq!(state.seqno_since(), SeqNo(10_000));
3972
3973        let now = now_fn();
3974        assert_eq!(
3975            state.maybe_gc(true, now, GC_CONFIG),
3976            Some(GcReq {
3977                shard_id: state.shard_id,
3978                new_seqno_since: SeqNo(previous_seqno.0 + u64::cast_from(GC_CONFIG.max_versions))
3979            })
3980        );
3981    }
3982
3983    #[mz_ore::test]
3984    fn maybe_gc_classic() {
3985        const GC_CONFIG: GcConfig = GcConfig {
3986            use_active_gc: false,
3987            fallback_threshold_ms: 5000,
3988            min_versions: 16,
3989            max_versions: 128,
3990        };
3991        const NOW_MS: u64 = 0;
3992
3993        let mut state = TypedState::<String, String, u64, i64>::new(
3994            DUMMY_BUILD_INFO.semver_version(),
3995            ShardId::new(),
3996            "".to_owned(),
3997            0,
3998        );
3999
4000        // Empty state doesn't need gc, regardless of is_write.
4001        assert_eq!(state.maybe_gc(true, NOW_MS, GC_CONFIG), None);
4002        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
4003
4004        // Artificially advance the seqno so the seqno_since advances past our
4005        // internal gc_threshold.
4006        state.seqno = SeqNo(100);
4007        assert_eq!(state.seqno_since(), SeqNo(100));
4008
4009        // When a writer is present, non-writes don't gc.
4010        let writer_id = WriterId::new();
4011        let now = SYSTEM_TIME.clone();
4012        let _ = state.collections.compare_and_append(
4013            &hollow(1, 2, &["key1"], 1),
4014            &writer_id,
4015            now(),
4016            LEASE_DURATION_MS,
4017            &IdempotencyToken::new(),
4018            &debug_state(),
4019            0,
4020            100,
4021            None,
4022        );
4023        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
4024
4025        // A write will gc though.
4026        assert_eq!(
4027            state.maybe_gc(true, NOW_MS, GC_CONFIG),
4028            Some(GcReq {
4029                shard_id: state.shard_id,
4030                new_seqno_since: SeqNo(100)
4031            })
4032        );
4033
4034        // Artificially advance the seqno (again) so the seqno_since advances
4035        // past our internal gc_threshold (again).
4036        state.seqno = SeqNo(200);
4037        assert_eq!(state.seqno_since(), SeqNo(200));
4038
4039        // If there are no writers, even a non-write will gc.
4040        let _ = state.collections.expire_writer(&writer_id);
4041        assert_eq!(
4042            state.maybe_gc(false, NOW_MS, GC_CONFIG),
4043            Some(GcReq {
4044                shard_id: state.shard_id,
4045                new_seqno_since: SeqNo(200)
4046            })
4047        );
4048    }
4049
4050    #[mz_ore::test]
4051    fn need_rollup_active_rollup() {
4052        const ROLLUP_THRESHOLD: usize = 3;
4053        const ROLLUP_USE_ACTIVE_ROLLUP: bool = true;
4054        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 5000;
4055        let now = SYSTEM_TIME.clone();
4056
4057        mz_ore::test::init_logging();
4058        let mut state = TypedState::<String, String, u64, i64>::new(
4059            DUMMY_BUILD_INFO.semver_version(),
4060            ShardId::new(),
4061            "".to_owned(),
4062            0,
4063        );
4064
4065        let rollup_seqno = SeqNo(5);
4066        let rollup = HollowRollup {
4067            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4068            encoded_size_bytes: None,
4069        };
4070
4071        assert!(
4072            state
4073                .collections
4074                .add_rollup((rollup_seqno, &rollup))
4075                .is_continue()
4076        );
4077
4078        // shouldn't need a rollup at the seqno of the rollup
4079        state.seqno = SeqNo(5);
4080        assert_none!(state.need_rollup(
4081            ROLLUP_THRESHOLD,
4082            ROLLUP_USE_ACTIVE_ROLLUP,
4083            ROLLUP_FALLBACK_THRESHOLD_MS,
4084            now()
4085        ));
4086
4087        // shouldn't need a rollup at seqnos less than our threshold
4088        state.seqno = SeqNo(6);
4089        assert_none!(state.need_rollup(
4090            ROLLUP_THRESHOLD,
4091            ROLLUP_USE_ACTIVE_ROLLUP,
4092            ROLLUP_FALLBACK_THRESHOLD_MS,
4093            now()
4094        ));
4095        state.seqno = SeqNo(7);
4096        assert_none!(state.need_rollup(
4097            ROLLUP_THRESHOLD,
4098            ROLLUP_USE_ACTIVE_ROLLUP,
4099            ROLLUP_FALLBACK_THRESHOLD_MS,
4100            now()
4101        ));
4102        state.seqno = SeqNo(8);
4103        assert_none!(state.need_rollup(
4104            ROLLUP_THRESHOLD,
4105            ROLLUP_USE_ACTIVE_ROLLUP,
4106            ROLLUP_FALLBACK_THRESHOLD_MS,
4107            now()
4108        ));
4109
4110        let mut current_time = now();
4111        // hit our threshold! we should need a rollup
4112        state.seqno = SeqNo(9);
4113        assert_eq!(
4114            state
4115                .need_rollup(
4116                    ROLLUP_THRESHOLD,
4117                    ROLLUP_USE_ACTIVE_ROLLUP,
4118                    ROLLUP_FALLBACK_THRESHOLD_MS,
4119                    current_time
4120                )
4121                .expect("rollup"),
4122            SeqNo(9)
4123        );
4124
4125        state.collections.active_rollup = Some(ActiveRollup {
4126            seqno: SeqNo(9),
4127            start_ms: current_time,
4128        });
4129
4130        // There is now an active rollup, so we shouldn't need a rollup.
4131        assert_none!(state.need_rollup(
4132            ROLLUP_THRESHOLD,
4133            ROLLUP_USE_ACTIVE_ROLLUP,
4134            ROLLUP_FALLBACK_THRESHOLD_MS,
4135            current_time
4136        ));
4137
4138        state.seqno = SeqNo(10);
4139        // We still don't need a rollup, even though the seqno is greater than
4140        // the rollup threshold.
4141        assert_none!(state.need_rollup(
4142            ROLLUP_THRESHOLD,
4143            ROLLUP_USE_ACTIVE_ROLLUP,
4144            ROLLUP_FALLBACK_THRESHOLD_MS,
4145            current_time
4146        ));
4147
4148        // But if we wait long enough, we should need a rollup again.
4149        current_time += u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS) + 1;
4150        assert_eq!(
4151            state
4152                .need_rollup(
4153                    ROLLUP_THRESHOLD,
4154                    ROLLUP_USE_ACTIVE_ROLLUP,
4155                    ROLLUP_FALLBACK_THRESHOLD_MS,
4156                    current_time
4157                )
4158                .expect("rollup"),
4159            SeqNo(10)
4160        );
4161
4162        state.seqno = SeqNo(9);
4163        // Clear the active rollup and ensure we need a rollup again.
4164        state.collections.active_rollup = None;
4165        let rollup_seqno = SeqNo(9);
4166        let rollup = HollowRollup {
4167            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4168            encoded_size_bytes: None,
4169        };
4170        assert!(
4171            state
4172                .collections
4173                .add_rollup((rollup_seqno, &rollup))
4174                .is_continue()
4175        );
4176
4177        state.seqno = SeqNo(11);
4178        // We shouldn't need a rollup at seqnos less than our threshold
4179        assert_none!(state.need_rollup(
4180            ROLLUP_THRESHOLD,
4181            ROLLUP_USE_ACTIVE_ROLLUP,
4182            ROLLUP_FALLBACK_THRESHOLD_MS,
4183            current_time
4184        ));
4185        // hit our threshold! we should need a rollup
4186        state.seqno = SeqNo(13);
4187        assert_eq!(
4188            state
4189                .need_rollup(
4190                    ROLLUP_THRESHOLD,
4191                    ROLLUP_USE_ACTIVE_ROLLUP,
4192                    ROLLUP_FALLBACK_THRESHOLD_MS,
4193                    current_time
4194                )
4195                .expect("rollup"),
4196            SeqNo(13)
4197        );
4198    }
4199
4200    #[mz_ore::test]
4201    fn need_rollup_classic() {
4202        const ROLLUP_THRESHOLD: usize = 3;
4203        const ROLLUP_USE_ACTIVE_ROLLUP: bool = false;
4204        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 0;
4205        const NOW: u64 = 0;
4206
4207        mz_ore::test::init_logging();
4208        let mut state = TypedState::<String, String, u64, i64>::new(
4209            DUMMY_BUILD_INFO.semver_version(),
4210            ShardId::new(),
4211            "".to_owned(),
4212            0,
4213        );
4214
4215        let rollup_seqno = SeqNo(5);
4216        let rollup = HollowRollup {
4217            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4218            encoded_size_bytes: None,
4219        };
4220
4221        assert!(
4222            state
4223                .collections
4224                .add_rollup((rollup_seqno, &rollup))
4225                .is_continue()
4226        );
4227
4228        // shouldn't need a rollup at the seqno of the rollup
4229        state.seqno = SeqNo(5);
4230        assert_none!(state.need_rollup(
4231            ROLLUP_THRESHOLD,
4232            ROLLUP_USE_ACTIVE_ROLLUP,
4233            ROLLUP_FALLBACK_THRESHOLD_MS,
4234            NOW
4235        ));
4236
4237        // shouldn't need a rollup at seqnos less than our threshold
4238        state.seqno = SeqNo(6);
4239        assert_none!(state.need_rollup(
4240            ROLLUP_THRESHOLD,
4241            ROLLUP_USE_ACTIVE_ROLLUP,
4242            ROLLUP_FALLBACK_THRESHOLD_MS,
4243            NOW
4244        ));
4245        state.seqno = SeqNo(7);
4246        assert_none!(state.need_rollup(
4247            ROLLUP_THRESHOLD,
4248            ROLLUP_USE_ACTIVE_ROLLUP,
4249            ROLLUP_FALLBACK_THRESHOLD_MS,
4250            NOW
4251        ));
4252
4253        // hit our threshold! we should need a rollup
4254        state.seqno = SeqNo(8);
4255        assert_eq!(
4256            state
4257                .need_rollup(
4258                    ROLLUP_THRESHOLD,
4259                    ROLLUP_USE_ACTIVE_ROLLUP,
4260                    ROLLUP_FALLBACK_THRESHOLD_MS,
4261                    NOW
4262                )
4263                .expect("rollup"),
4264            SeqNo(8)
4265        );
4266
4267        // but we don't need rollups for every seqno > the threshold
4268        state.seqno = SeqNo(9);
4269        assert_none!(state.need_rollup(
4270            ROLLUP_THRESHOLD,
4271            ROLLUP_USE_ACTIVE_ROLLUP,
4272            ROLLUP_FALLBACK_THRESHOLD_MS,
4273            NOW
4274        ));
4275
        // we only need a rollup once every `ROLLUP_THRESHOLD` seqnos past the latest rollup
4277        state.seqno = SeqNo(11);
4278        assert_eq!(
4279            state
4280                .need_rollup(
4281                    ROLLUP_THRESHOLD,
4282                    ROLLUP_USE_ACTIVE_ROLLUP,
4283                    ROLLUP_FALLBACK_THRESHOLD_MS,
4284                    NOW
4285                )
4286                .expect("rollup"),
4287            SeqNo(11)
4288        );
4289
4290        // add another rollup and ensure we're always picking the latest
4291        let rollup_seqno = SeqNo(6);
4292        let rollup = HollowRollup {
4293            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4294            encoded_size_bytes: None,
4295        };
4296        assert!(
4297            state
4298                .collections
4299                .add_rollup((rollup_seqno, &rollup))
4300                .is_continue()
4301        );
4302
4303        state.seqno = SeqNo(8);
4304        assert_none!(state.need_rollup(
4305            ROLLUP_THRESHOLD,
4306            ROLLUP_USE_ACTIVE_ROLLUP,
4307            ROLLUP_FALLBACK_THRESHOLD_MS,
4308            NOW
4309        ));
4310        state.seqno = SeqNo(9);
4311        assert_eq!(
4312            state
4313                .need_rollup(
4314                    ROLLUP_THRESHOLD,
4315                    ROLLUP_USE_ACTIVE_ROLLUP,
4316                    ROLLUP_FALLBACK_THRESHOLD_MS,
4317                    NOW
4318                )
4319                .expect("rollup"),
4320            SeqNo(9)
4321        );
4322
        // and ensure that past the fallback point, every seqno is assigned rollup work
4324        let fallback_seqno = SeqNo(
4325            rollup_seqno.0
4326                * u64::cast_from(PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER),
4327        );
4328        state.seqno = fallback_seqno;
4329        assert_eq!(
4330            state
4331                .need_rollup(
4332                    ROLLUP_THRESHOLD,
4333                    ROLLUP_USE_ACTIVE_ROLLUP,
4334                    ROLLUP_FALLBACK_THRESHOLD_MS,
4335                    NOW
4336                )
4337                .expect("rollup"),
4338            fallback_seqno
4339        );
4340        state.seqno = fallback_seqno.next();
4341        assert_eq!(
4342            state
4343                .need_rollup(
4344                    ROLLUP_THRESHOLD,
4345                    ROLLUP_USE_ACTIVE_ROLLUP,
4346                    ROLLUP_FALLBACK_THRESHOLD_MS,
4347                    NOW
4348                )
4349                .expect("rollup"),
4350            fallback_seqno.next()
4351        );
4352    }
4353
4354    #[mz_ore::test]
4355    fn idempotency_token_sentinel() {
4356        assert_eq!(
4357            IdempotencyToken::SENTINEL.to_string(),
4358            "i11111111-1111-1111-1111-111111111111"
4359        );
4360    }
4361
4362    /// This test generates an "arbitrary" State, but uses a fixed seed for the
4363    /// randomness, so that it's deterministic. This lets us assert the
4364    /// serialization of that State against a golden file that's committed,
4365    /// making it easy to see what the serialization (used in an upcoming
4366    /// INSPECT feature) looks like.
4367    ///
4368    /// This golden will have to be updated each time we change State, but
4369    /// that's a feature, not a bug.
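    ///
    /// On a mismatch, the assertion below prints the newly generated JSON
    /// under "NEW GOLDEN"; updating the golden is (presumably) a matter of
    /// copying that output into `state_serde.json`.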
4370    #[mz_ore::test]
4371    #[cfg_attr(miri, ignore)] // too slow
4372    fn state_inspect_serde_json() {
4373        const STATE_SERDE_JSON: &str = include_str!("state_serde.json");
4374        let mut runner = proptest::test_runner::TestRunner::deterministic();
4375        let tree = any_state::<u64>(6..8).new_tree(&mut runner).unwrap();
4376        let json = serde_json::to_string_pretty(&tree.current()).unwrap();
4377        assert_eq!(
4378            json.trim(),
4379            STATE_SERDE_JSON.trim(),
4380            "\n\nNEW GOLDEN\n{}\n",
4381            json
4382        );
4383    }
4384
4385    #[mz_persist_proc::test(tokio::test)]
4386    #[cfg_attr(miri, ignore)] // too slow
4387    async fn sneaky_downgrades(dyncfgs: ConfigUpdates) {
4388        let mut clients = new_test_client_cache(&dyncfgs);
4389        let shard_id = ShardId::new();
4390
4391        async fn open_and_write(
4392            clients: &mut PersistClientCache,
4393            version: semver::Version,
4394            shard_id: ShardId,
4395        ) -> Result<(), tokio::task::JoinError> {
4396            clients.cfg.build_version = version.clone();
4397            clients.clear_state_cache();
4398            let client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
4399            // Run in a task so we can catch the panic.
4400            mz_ore::task::spawn(|| version.to_string(), async move {
4401                let (mut write, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
4402                let current = *write.upper().as_option().unwrap();
4403                // Do a write so that we tag the state with the version.
4404                write
4405                    .expect_compare_and_append_batch(&mut [], current, current + 1)
4406                    .await;
4407            })
4408            .await
4409        }
4410
4411        // Start at v0.10.0.
4412        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4413        assert_ok!(res);
4414
4415        // Upgrade to v0.11.0 is allowed.
4416        let res = open_and_write(&mut clients, Version::new(0, 11, 0), shard_id).await;
4417        assert_ok!(res);
4418
4419        // Downgrade to v0.10.0 is allowed.
4420        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4421        assert_ok!(res);
4422
4423        // Downgrade to v0.9.0 is _NOT_ allowed.
4424        let res = open_and_write(&mut clients, Version::new(0, 9, 0), shard_id).await;
4425        assert!(res.unwrap_err().is_panic());
4426    }
4427
4428    #[mz_ore::test]
4429    fn runid_roundtrip() {
4430        proptest!(|(runid: RunId)| {
4431            let runid_str = runid.to_string();
4432            let parsed = RunId::from_str(&runid_str);
4433            prop_assert_eq!(parsed, Ok(runid));
4434        });
4435    }
4436}