Skip to main content

mz_persist_client/internal/
state.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use anyhow::ensure;
11use async_stream::{stream, try_stream};
12use differential_dataflow::difference::Monoid;
13use mz_persist::metrics::ColumnarMetrics;
14use proptest::prelude::{Arbitrary, Strategy};
15use std::borrow::Cow;
16use std::cmp::Ordering;
17use std::collections::BTreeMap;
18use std::fmt::{Debug, Formatter};
19use std::marker::PhantomData;
20use std::ops::ControlFlow::{self, Break, Continue};
21use std::ops::{Deref, DerefMut};
22use std::time::Duration;
23
24use arrow::array::{Array, ArrayData, make_array};
25use arrow::datatypes::DataType;
26use bytes::Bytes;
27use differential_dataflow::Hashable;
28use differential_dataflow::lattice::Lattice;
29use differential_dataflow::trace::Description;
30use differential_dataflow::trace::implementations::BatchContainer;
31use futures::Stream;
32use futures_util::StreamExt;
33use itertools::Itertools;
34use mz_dyncfg::Config;
35use mz_ore::cast::CastFrom;
36use mz_ore::now::EpochMillis;
37use mz_ore::soft_panic_or_log;
38use mz_ore::vec::PartialOrdVecExt;
39use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceUpdates};
40use mz_persist::location::{Blob, SeqNo};
41use mz_persist_types::arrow::{ArrayBound, ProtoArrayData};
42use mz_persist_types::columnar::{ColumnEncoder, Schema};
43use mz_persist_types::schema::{SchemaId, backward_compatible};
44use mz_persist_types::{Codec, Codec64};
45use mz_proto::ProtoType;
46use mz_proto::RustType;
47use proptest_derive::Arbitrary;
48use semver::Version;
49use serde::ser::SerializeStruct;
50use serde::{Serialize, Serializer};
51use timely::PartialOrder;
52use timely::order::TotalOrder;
53use timely::progress::{Antichain, Timestamp};
54use tracing::info;
55use uuid::Uuid;
56
57use crate::critical::{CriticalReaderId, Opaque};
58use crate::error::InvalidUsage;
59use crate::internal::encoding::{
60    LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, parse_id,
61};
62use crate::internal::gc::GcReq;
63use crate::internal::machine::retry_external;
64use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
65use crate::internal::trace::{
66    ActiveCompaction, ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace,
67};
68use crate::metrics::Metrics;
69use crate::read::LeasedReaderId;
70use crate::schema::CaESchema;
71use crate::write::WriterId;
72use crate::{PersistConfig, ShardId};
73
74include!(concat!(
75    env!("OUT_DIR"),
76    "/mz_persist_client.internal.state.rs"
77));
78
79include!(concat!(
80    env!("OUT_DIR"),
81    "/mz_persist_client.internal.diff.rs"
82));
83
/// Determines how often to write rollups, assigning a maintenance task after
/// `rollup_threshold` seqnos have passed since the last rollup.
///
/// Tuning note: in the absence of a long reader seqno hold, and with
/// incremental GC, this threshold will determine about how many live diffs are
/// held in Consensus. Lowering this value decreases the live diff count at the
/// cost of more maintenance work + blob writes.
pub(crate) const ROLLUP_THRESHOLD: Config<usize> = Config::new(
    "persist_rollup_threshold",
    128,
    "The number of seqnos between rollups.",
);

/// Determines how long to wait before an active rollup is considered
/// "stuck" and a new rollup is started.
pub(crate) const ROLLUP_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_rollup_fallback_threshold_ms",
    5000,
    "The number of milliseconds before a worker claims an already claimed rollup.",
);

/// Feature flag for the new active rollup tracking mechanism.
/// We mustn't enable this until we are fully deployed on the new version.
///
/// NOTE(review): the default below is `true`, unlike `GC_USE_ACTIVE_GC`;
/// confirm the deployment caveat above has already been satisfied.
pub(crate) const ROLLUP_USE_ACTIVE_ROLLUP: Config<bool> = Config::new(
    "persist_rollup_use_active_rollup",
    true,
    "Whether to use the new active rollup tracking mechanism.",
);

/// Determines how long to wait before an active GC is considered
/// "stuck" and a new GC is started.
pub(crate) const GC_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_gc_fallback_threshold_ms",
    900000,
    "The number of milliseconds before a worker claims an already claimed GC.",
);

/// See the config description string.
pub(crate) const GC_MIN_VERSIONS: Config<usize> = Config::new(
    "persist_gc_min_versions",
    32,
    "The number of un-GCd versions that may exist in state before we'll trigger a GC.",
);

/// See the config description string.
pub(crate) const GC_MAX_VERSIONS: Config<usize> = Config::new(
    "persist_gc_max_versions",
    128_000,
    "The maximum number of versions to GC in a single GC run.",
);

/// Feature flag for the new active GC tracking mechanism.
/// We mustn't enable this until we are fully deployed on the new version.
pub(crate) const GC_USE_ACTIVE_GC: Config<bool> = Config::new(
    "persist_gc_use_active_gc",
    false,
    "Whether to use the new active GC tracking mechanism.",
);

/// See the config description string.
pub(crate) const ENABLE_INCREMENTAL_COMPACTION: Config<bool> = Config::new(
    "persist_enable_incremental_compaction",
    false,
    "Whether to enable incremental compaction.",
);
148
/// A token to disambiguate state commands that could not otherwise be
/// idempotent.
///
/// Wraps 16 raw bytes. Serde serialization goes through the conversion to
/// `String` (see the `serde(into = "String")` attribute).
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
#[serde(into = "String")]
pub struct IdempotencyToken(pub(crate) [u8; 16]);
154
155impl std::fmt::Display for IdempotencyToken {
156    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157        write!(f, "i{}", Uuid::from_bytes(self.0))
158    }
159}
160
161impl std::fmt::Debug for IdempotencyToken {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        write!(f, "IdempotencyToken({})", Uuid::from_bytes(self.0))
164    }
165}
166
167impl std::str::FromStr for IdempotencyToken {
168    type Err = String;
169
170    fn from_str(s: &str) -> Result<Self, Self::Err> {
171        parse_id("i", "IdempotencyToken", s).map(IdempotencyToken)
172    }
173}
174
175impl From<IdempotencyToken> for String {
176    fn from(x: IdempotencyToken) -> Self {
177        x.to_string()
178    }
179}
180
181impl IdempotencyToken {
182    pub(crate) fn new() -> Self {
183        IdempotencyToken(*Uuid::new_v4().as_bytes())
184    }
185    pub(crate) const SENTINEL: IdempotencyToken = IdempotencyToken([17u8; 16]);
186}
187
/// State recorded for a leased reader: its seqno and since capabilities plus
/// the heartbeat and lease-duration bookkeeping described on each field.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct LeasedReaderState<T> {
    /// The seqno capability of this reader.
    pub seqno: SeqNo,
    /// The since capability of this reader.
    pub since: Antichain<T>,
    /// UNIX_EPOCH timestamp (in millis) of this reader's most recent heartbeat
    pub last_heartbeat_timestamp_ms: u64,
    /// Duration (in millis) allowed after [Self::last_heartbeat_timestamp_ms]
    /// after which this reader may be expired
    pub lease_duration_ms: u64,
    /// For debugging.
    pub debug: HandleDebugState,
}
202
/// State recorded for a critical reader: its since capability and the opaque
/// token that guards downgrades of it.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct CriticalReaderState<T> {
    /// The since capability of this reader.
    pub since: Antichain<T>,
    /// An opaque token matched on by compare_and_downgrade_since.
    pub opaque: Opaque,
    /// For debugging.
    pub debug: HandleDebugState,
}
212
/// State recorded for a writer: heartbeat and lease bookkeeping plus the
/// token and upper of its most recent successful compare_and_append.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct WriterState<T> {
    /// UNIX_EPOCH timestamp (in millis) of this writer's most recent heartbeat
    pub last_heartbeat_timestamp_ms: u64,
    /// Duration (in millis) allowed after [Self::last_heartbeat_timestamp_ms]
    /// after which this writer may be expired
    pub lease_duration_ms: u64,
    /// The idempotency token of the most recent successful compare_and_append
    /// by this writer.
    pub most_recent_write_token: IdempotencyToken,
    /// The upper of the most recent successful compare_and_append by this
    /// writer.
    pub most_recent_write_upper: Antichain<T>,
    /// For debugging.
    pub debug: HandleDebugState,
}
229
/// Debugging info for a reader or writer.
///
/// Both fields are free-form strings; the `Default` value leaves them empty.
#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize)]
pub struct HandleDebugState {
    /// Hostname of the persist user that registered this writer or reader. For
    /// critical readers, this is the _most recent_ registration.
    pub hostname: String,
    /// Plaintext description of this writer or reader's intent.
    pub purpose: String,
}
239
/// Part of the updates in a Batch.
///
/// Either a pointer to ones stored in Blob or the updates themselves inlined.
///
/// When serialized with serde, the variant is recorded in a `type` tag.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(tag = "type")]
pub enum BatchPart<T> {
    /// A pointer to updates stored in Blob.
    Hollow(HollowBatchPart<T>),
    /// The updates themselves, stored inline.
    Inline {
        /// The inlined updates, kept in a lazily decoded form.
        updates: LazyInlineBatchPart,
        /// Optional rewrite frontier for this part.
        /// NOTE(review): presumably the same meaning as the `ts_rewrite` field
        /// of [HollowBatchPart] -- confirm.
        ts_rewrite: Option<Antichain<T>>,
        /// ID of the schema associated with this part, if any.
        schema_id: Option<SchemaId>,

        /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
        deprecated_schema_id: Option<SchemaId>,
    },
}
256
257fn decode_structured_lower(lower: &LazyProto<ProtoArrayData>) -> Option<ArrayBound> {
258    let try_decode = |lower: &LazyProto<ProtoArrayData>| {
259        let proto = lower.decode()?;
260        let data = ArrayData::from_proto(proto)?;
261        ensure!(data.len() == 1);
262        Ok(ArrayBound::new(make_array(data), 0))
263    };
264
265    let decoded: anyhow::Result<ArrayBound> = try_decode(lower);
266
267    match decoded {
268        Ok(bound) => Some(bound),
269        Err(e) => {
270            soft_panic_or_log!("failed to decode bound: {e:#?}");
271            None
272        }
273    }
274}
275
276impl<T> BatchPart<T> {
277    pub fn hollow_bytes(&self) -> usize {
278        match self {
279            BatchPart::Hollow(x) => x.encoded_size_bytes,
280            BatchPart::Inline { .. } => 0,
281        }
282    }
283
284    pub fn is_inline(&self) -> bool {
285        matches!(self, BatchPart::Inline { .. })
286    }
287
288    pub fn inline_bytes(&self) -> usize {
289        match self {
290            BatchPart::Hollow(_) => 0,
291            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
292        }
293    }
294
295    pub fn writer_key(&self) -> Option<WriterKey> {
296        match self {
297            BatchPart::Hollow(x) => x.key.split().map(|(writer, _part)| writer),
298            BatchPart::Inline { .. } => None,
299        }
300    }
301
302    pub fn encoded_size_bytes(&self) -> usize {
303        match self {
304            BatchPart::Hollow(x) => x.encoded_size_bytes,
305            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
306        }
307    }
308
309    // A user-interpretable identifier or description of the part (for logs and
310    // such).
311    pub fn printable_name(&self) -> &str {
312        match self {
313            BatchPart::Hollow(x) => x.key.0.as_str(),
314            BatchPart::Inline { .. } => "<inline>",
315        }
316    }
317
318    pub fn stats(&self) -> Option<&LazyPartStats> {
319        match self {
320            BatchPart::Hollow(x) => x.stats.as_ref(),
321            BatchPart::Inline { .. } => None,
322        }
323    }
324
325    pub fn key_lower(&self) -> &[u8] {
326        match self {
327            BatchPart::Hollow(x) => x.key_lower.as_slice(),
328            // We don't duplicate the lowest key because this can be
329            // considerable overhead for small parts.
330            //
331            // The empty key might not be a tight lower bound, but it is a valid
332            // lower bound. If a caller is interested in a tighter lower bound,
333            // the data is inline.
334            BatchPart::Inline { .. } => &[],
335        }
336    }
337
338    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
339        let part = match self {
340            BatchPart::Hollow(part) => part,
341            BatchPart::Inline { .. } => return None,
342        };
343
344        decode_structured_lower(part.structured_key_lower.as_ref()?)
345    }
346
347    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
348        match self {
349            BatchPart::Hollow(x) => x.ts_rewrite.as_ref(),
350            BatchPart::Inline { ts_rewrite, .. } => ts_rewrite.as_ref(),
351        }
352    }
353
354    pub fn schema_id(&self) -> Option<SchemaId> {
355        match self {
356            BatchPart::Hollow(x) => x.schema_id,
357            BatchPart::Inline { schema_id, .. } => *schema_id,
358        }
359    }
360
361    pub fn deprecated_schema_id(&self) -> Option<SchemaId> {
362        match self {
363            BatchPart::Hollow(x) => x.deprecated_schema_id,
364            BatchPart::Inline {
365                deprecated_schema_id,
366                ..
367            } => *deprecated_schema_id,
368        }
369    }
370}
371
372impl<T: Timestamp + Codec64> BatchPart<T> {
373    pub fn is_structured_only(&self, metrics: &ColumnarMetrics) -> bool {
374        match self {
375            BatchPart::Hollow(x) => matches!(x.format, Some(BatchColumnarFormat::Structured)),
376            BatchPart::Inline { updates, .. } => {
377                let inline_part = updates.decode::<T>(metrics).expect("valid inline part");
378                matches!(inline_part.updates, BlobTraceUpdates::Structured { .. })
379            }
380        }
381    }
382
383    pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
384        match self {
385            BatchPart::Hollow(x) => x.diffs_sum.map(D::decode),
386            BatchPart::Inline { updates, .. } => Some(
387                updates
388                    .decode::<T>(metrics)
389                    .expect("valid inline part")
390                    .updates
391                    .diffs_sum(),
392            ),
393        }
394    }
395}
396
/// An ordered list of parts, generally stored as part of a larger run.
///
/// This is the object that [HollowRunRef::set] writes to Blob and
/// [HollowRunRef::get] reads back.
#[derive(Debug, Clone)]
pub struct HollowRun<T> {
    /// Pointers usable to retrieve the updates.
    pub(crate) parts: Vec<RunPart<T>>,
}
403
/// A reference to a [HollowRun], including the key in the blob store and some denormalized
/// metadata.
#[derive(Debug, Eq, PartialEq, Clone, Serialize)]
pub struct HollowRunRef<T> {
    /// Partial key of the stored run object; it is completed with a shard id
    /// before being used against Blob.
    pub key: PartialBatchKey,

    /// The size of the referenced run object, plus all of the hollow objects it contains.
    pub hollow_bytes: usize,

    /// The size of the largest individual part in the run; useful for sizing compaction.
    pub max_part_bytes: usize,

    /// The lower bound of the data in this part, ordered by the codec ordering.
    pub key_lower: Vec<u8>,

    /// The lower bound of the data in this part, ordered by the structured ordering.
    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,

    /// The Codec64-encoded sum of the diffs of the parts in the run, if known.
    pub diffs_sum: Option<[u8; 8]>,

    /// Ties the otherwise unused type parameter `T` to this struct.
    pub(crate) _phantom_data: PhantomData<T>,
}
426impl<T: Eq> PartialOrd<Self> for HollowRunRef<T> {
427    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
428        Some(self.cmp(other))
429    }
430}
431
432impl<T: Eq> Ord for HollowRunRef<T> {
433    fn cmp(&self, other: &Self) -> Ordering {
434        self.key.cmp(&other.key)
435    }
436}
437
438impl<T> HollowRunRef<T> {
439    pub fn writer_key(&self) -> Option<WriterKey> {
440        Some(self.key.split()?.0)
441    }
442}
443
444impl<T: Timestamp + Codec64> HollowRunRef<T> {
445    /// Stores the given runs and returns a [HollowRunRef] that points to them.
446    pub async fn set<D: Codec64 + Monoid>(
447        shard_id: ShardId,
448        blob: &dyn Blob,
449        writer: &WriterKey,
450        data: HollowRun<T>,
451        metrics: &Metrics,
452    ) -> Self {
453        let hollow_bytes = data.parts.iter().map(|p| p.hollow_bytes()).sum();
454        let max_part_bytes = data
455            .parts
456            .iter()
457            .map(|p| p.max_part_bytes())
458            .max()
459            .unwrap_or(0);
460        let key_lower = data
461            .parts
462            .first()
463            .map_or(vec![], |p| p.key_lower().to_vec());
464        let structured_key_lower = match data.parts.first() {
465            Some(RunPart::Many(r)) => r.structured_key_lower.clone(),
466            Some(RunPart::Single(BatchPart::Hollow(p))) => p.structured_key_lower.clone(),
467            Some(RunPart::Single(BatchPart::Inline { .. })) | None => None,
468        };
469        let diffs_sum = data
470            .parts
471            .iter()
472            .map(|p| {
473                p.diffs_sum::<D>(&metrics.columnar)
474                    .expect("valid diffs sum")
475            })
476            .reduce(|mut a, b| {
477                a.plus_equals(&b);
478                a
479            })
480            .expect("valid diffs sum")
481            .encode();
482
483        let key = PartialBatchKey::new(writer, &PartId::new());
484        let blob_key = key.complete(&shard_id);
485        let bytes = Bytes::from(prost::Message::encode_to_vec(&data.into_proto()));
486        let () = retry_external(&metrics.retries.external.hollow_run_set, || {
487            blob.set(&blob_key, bytes.clone())
488        })
489        .await;
490        Self {
491            key,
492            hollow_bytes,
493            max_part_bytes,
494            key_lower,
495            structured_key_lower,
496            diffs_sum: Some(diffs_sum),
497            _phantom_data: Default::default(),
498        }
499    }
500
501    /// Retrieve the [HollowRun] that this reference points to.
502    /// The caller is expected to ensure that this ref is the result of calling [HollowRunRef::set]
503    /// with the same shard id and backing store.
504    pub async fn get(
505        &self,
506        shard_id: ShardId,
507        blob: &dyn Blob,
508        metrics: &Metrics,
509    ) -> Option<HollowRun<T>> {
510        let blob_key = self.key.complete(&shard_id);
511        let mut bytes = retry_external(&metrics.retries.external.hollow_run_get, || {
512            blob.get(&blob_key)
513        })
514        .await?;
515        let proto_runs: ProtoHollowRun =
516            prost::Message::decode(&mut bytes).expect("illegal state: invalid proto bytes");
517        let runs = proto_runs
518            .into_rust()
519            .expect("illegal state: invalid encoded runs proto");
520        Some(runs)
521    }
522}
523
/// Part of the updates in a run.
///
/// Either a pointer to ones stored in Blob or a single part stored inline.
///
/// Serialized with serde as an untagged enum.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(untagged)]
pub enum RunPart<T> {
    /// A single batch part held directly.
    Single(BatchPart<T>),
    /// A reference to a [HollowRun] of further parts stored in Blob.
    Many(HollowRunRef<T>),
}
533
534impl<T: Ord> PartialOrd<Self> for RunPart<T> {
535    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
536        Some(self.cmp(other))
537    }
538}
539
540impl<T: Ord> Ord for RunPart<T> {
541    fn cmp(&self, other: &Self) -> Ordering {
542        match (self, other) {
543            (RunPart::Single(a), RunPart::Single(b)) => a.cmp(b),
544            (RunPart::Single(_), RunPart::Many(_)) => Ordering::Less,
545            (RunPart::Many(_), RunPart::Single(_)) => Ordering::Greater,
546            (RunPart::Many(a), RunPart::Many(b)) => a.cmp(b),
547        }
548    }
549}
550
551impl<T> RunPart<T> {
552    #[cfg(test)]
553    pub fn expect_hollow_part(&self) -> &HollowBatchPart<T> {
554        match self {
555            RunPart::Single(BatchPart::Hollow(hollow)) => hollow,
556            _ => panic!("expected hollow part!"),
557        }
558    }
559
560    pub fn hollow_bytes(&self) -> usize {
561        match self {
562            Self::Single(p) => p.hollow_bytes(),
563            Self::Many(r) => r.hollow_bytes,
564        }
565    }
566
567    pub fn is_inline(&self) -> bool {
568        match self {
569            Self::Single(p) => p.is_inline(),
570            Self::Many(_) => false,
571        }
572    }
573
574    pub fn inline_bytes(&self) -> usize {
575        match self {
576            Self::Single(p) => p.inline_bytes(),
577            Self::Many(_) => 0,
578        }
579    }
580
581    pub fn max_part_bytes(&self) -> usize {
582        match self {
583            Self::Single(p) => p.encoded_size_bytes(),
584            Self::Many(r) => r.max_part_bytes,
585        }
586    }
587
588    pub fn writer_key(&self) -> Option<WriterKey> {
589        match self {
590            Self::Single(p) => p.writer_key(),
591            Self::Many(r) => r.writer_key(),
592        }
593    }
594
595    pub fn encoded_size_bytes(&self) -> usize {
596        match self {
597            Self::Single(p) => p.encoded_size_bytes(),
598            Self::Many(r) => r.hollow_bytes,
599        }
600    }
601
602    pub fn schema_id(&self) -> Option<SchemaId> {
603        match self {
604            Self::Single(p) => p.schema_id(),
605            Self::Many(_) => None,
606        }
607    }
608
609    // A user-interpretable identifier or description of the part (for logs and
610    // such).
611    pub fn printable_name(&self) -> &str {
612        match self {
613            Self::Single(p) => p.printable_name(),
614            Self::Many(r) => r.key.0.as_str(),
615        }
616    }
617
618    pub fn stats(&self) -> Option<&LazyPartStats> {
619        match self {
620            Self::Single(p) => p.stats(),
621            // TODO: if we kept stats we could avoid fetching the metadata here.
622            Self::Many(_) => None,
623        }
624    }
625
626    pub fn key_lower(&self) -> &[u8] {
627        match self {
628            Self::Single(p) => p.key_lower(),
629            Self::Many(r) => r.key_lower.as_slice(),
630        }
631    }
632
633    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
634        match self {
635            Self::Single(p) => p.structured_key_lower(),
636            Self::Many(_) => None,
637        }
638    }
639
640    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
641        match self {
642            Self::Single(p) => p.ts_rewrite(),
643            Self::Many(_) => None,
644        }
645    }
646}
647
648impl<T> RunPart<T>
649where
650    T: Timestamp + Codec64,
651{
652    pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
653        match self {
654            Self::Single(p) => p.diffs_sum(metrics),
655            Self::Many(hollow_run) => hollow_run.diffs_sum.map(D::decode),
656        }
657    }
658}
659
/// A blob was missing!
///
/// Carries the full key of the blob that could not be found.
#[derive(Clone, Debug)]
pub struct MissingBlob(BlobKey);
663
664impl std::fmt::Display for MissingBlob {
665    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
666        write!(f, "unexpectedly missing key: {}", self.0)
667    }
668}
669
// Marker impl: relies on the `Debug` and `Display` impls above and on the
// trait's default methods.
impl std::error::Error for MissingBlob {}
671
impl<T: Timestamp + Codec64 + Sync> RunPart<T> {
    /// Streams every [BatchPart] reachable from this run part.
    ///
    /// A [RunPart::Single] yields its part by reference. A [RunPart::Many] is
    /// fetched from `blob`, and each part it contains is streamed recursively
    /// and yielded as an owned value.
    ///
    /// The stream yields a [MissingBlob] error if a referenced run object
    /// cannot be found in `blob`.
    pub fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + Send + 'a {
        try_stream! {
            match self {
                RunPart::Single(p) => {
                    yield Cow::Borrowed(p);
                }
                RunPart::Many(r) => {
                    // `get` returns `None` when the object is absent; report
                    // that with the completed key.
                    let fetched = r.get(shard_id, blob, metrics).await
                        .ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
                    for run_part in fetched.parts {
                        // Nested parts borrow from the locally fetched run, so
                        // they are converted to owned values before yielding.
                        // The recursive stream is boxed, presumably so the
                        // recursive type stays nameable.
                        for await batch_part in
                            run_part.part_stream(shard_id, blob, metrics).boxed()
                        {
                            yield Cow::Owned(batch_part?.into_owned());
                        }
                    }
                }
            }
        }
    }
}
699
700impl<T: Ord> PartialOrd for BatchPart<T> {
701    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
702        Some(self.cmp(other))
703    }
704}
705
706impl<T: Ord> Ord for BatchPart<T> {
707    fn cmp(&self, other: &Self) -> Ordering {
708        match (self, other) {
709            (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o),
710            (
711                BatchPart::Inline {
712                    updates: s_updates,
713                    ts_rewrite: s_ts_rewrite,
714                    schema_id: s_schema_id,
715                    deprecated_schema_id: s_deprecated_schema_id,
716                },
717                BatchPart::Inline {
718                    updates: o_updates,
719                    ts_rewrite: o_ts_rewrite,
720                    schema_id: o_schema_id,
721                    deprecated_schema_id: o_deprecated_schema_id,
722                },
723            ) => (
724                s_updates,
725                s_ts_rewrite.as_ref().map(|x| x.elements()),
726                s_schema_id,
727                s_deprecated_schema_id,
728            )
729                .cmp(&(
730                    o_updates,
731                    o_ts_rewrite.as_ref().map(|x| x.elements()),
732                    o_schema_id,
733                    o_deprecated_schema_id,
734                )),
735            (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less,
736            (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater,
737        }
738    }
739}
740
/// What order are the parts in this run in?
///
/// The derived `Ord` follows declaration order: `Unordered`, then `Codec`,
/// then `Structured`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub(crate) enum RunOrder {
    /// They're in no particular order.
    Unordered,
    /// They're ordered based on the codec-encoded K/V bytes.
    Codec,
    /// They're ordered by the natural ordering of the structured data.
    Structured,
}
751
/// An identifier for a run, stored as 16 raw bytes.
///
/// Displayed as `ri` followed by the UUID rendering of the bytes.
#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Serialize, Copy, Hash)]
pub struct RunId(pub(crate) [u8; 16]);
754
755impl std::fmt::Display for RunId {
756    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
757        write!(f, "ri{}", Uuid::from_bytes(self.0))
758    }
759}
760
761impl std::fmt::Debug for RunId {
762    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
763        write!(f, "RunId({})", Uuid::from_bytes(self.0))
764    }
765}
766
767impl std::str::FromStr for RunId {
768    type Err = String;
769
770    fn from_str(s: &str) -> Result<Self, Self::Err> {
771        parse_id("ri", "RunId", s).map(RunId)
772    }
773}
774
775impl From<RunId> for String {
776    fn from(x: RunId) -> Self {
777        x.to_string()
778    }
779}
780
781impl RunId {
782    pub(crate) fn new() -> Self {
783        RunId(*Uuid::new_v4().as_bytes())
784    }
785}
786
787impl Arbitrary for RunId {
788    type Parameters = ();
789    type Strategy = proptest::strategy::BoxedStrategy<Self>;
790    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
791        Strategy::prop_map(proptest::prelude::any::<u128>(), |n| {
792            RunId(*Uuid::from_u128(n).as_bytes())
793        })
794        .boxed()
795    }
796}
797
/// Metadata shared across a run.
///
/// Every field is optional or may be empty, so the `Default` value carries no
/// information about the run.
#[derive(Clone, Debug, Default, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub struct RunMeta {
    /// If none, Persist should infer the order based on the proto metadata.
    pub(crate) order: Option<RunOrder>,
    /// All parts in a run should have the same schema.
    pub(crate) schema: Option<SchemaId>,

    /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
    pub(crate) deprecated_schema: Option<SchemaId>,

    /// If set, a UUID that uniquely identifies this run.
    pub(crate) id: Option<RunId>,

    /// The number of updates in this run, or `None` if the number is unknown.
    pub(crate) len: Option<usize>,

    /// Additional unstructured metadata. Omitted from serde output when empty.
    #[serde(skip_serializing_if = "MetadataMap::is_empty")]
    pub(crate) meta: MetadataMap,
}
819
/// A subset of a [HollowBatch] corresponding 1:1 to a blob.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct HollowBatchPart<T> {
    /// Pointer usable to retrieve the updates.
    pub key: PartialBatchKey,
    /// Miscellaneous metadata. Omitted from serde output when empty.
    #[serde(skip_serializing_if = "MetadataMap::is_empty")]
    pub meta: MetadataMap,
    /// The encoded size of this part.
    pub encoded_size_bytes: usize,
    /// A lower bound on the keys in the part. (By default, this is the minimum
    /// possible key: `vec![]`.)
    #[serde(serialize_with = "serialize_part_bytes")]
    pub key_lower: Vec<u8>,
    /// A lower bound on the keys in the part, stored as structured data.
    #[serde(serialize_with = "serialize_lazy_proto")]
    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
    /// Aggregate statistics about data contained in this part.
    #[serde(serialize_with = "serialize_part_stats")]
    pub stats: Option<LazyPartStats>,
    /// A frontier to which timestamps in this part are advanced on read, if
    /// set.
    ///
    /// A value of `Some([T::minimum()])` is functionally the same as `None`,
    /// but we maintain the distinction between the two for some internal sanity
    /// checking of invariants as well as metrics. If this ever becomes an
    /// issue, everything still works with this as just `Antichain<T>`.
    pub ts_rewrite: Option<Antichain<T>>,
    /// A Codec64 encoded sum of all diffs in this part, if known.
    ///
    /// This is `None` if this part was written before we started storing this
    /// information, or if it was written when the dyncfg was off.
    ///
    /// It could also make sense to model this as part of the pushdown stats, if
    /// we later decide that's of some benefit.
    #[serde(serialize_with = "serialize_diffs_sum")]
    pub diffs_sum: Option<[u8; 8]>,
    /// Columnar format that this batch was written in.
    ///
    /// This is `None` if this part was written before we started writing structured
    /// columnar data.
    pub format: Option<BatchColumnarFormat>,
    /// The schemas used to encode the data in this batch part.
    ///
    /// Or None for historical data written before the schema registry was
    /// added.
    pub schema_id: Option<SchemaId>,

    /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
    pub deprecated_schema_id: Option<SchemaId>,
}
871
/// A [Batch] but with the updates themselves stored externally.
///
/// [Batch]: differential_dataflow::trace::BatchReader
#[derive(Clone, PartialEq, Eq)]
pub struct HollowBatch<T> {
    /// Describes the times of the updates in the batch.
    pub desc: Description<T>,
    /// The number of updates in the batch.
    pub len: usize,
    /// Pointers usable to retrieve the updates.
    pub(crate) parts: Vec<RunPart<T>>,
    /// Runs of sequential sorted batch parts, stored as indices into `parts`.
    /// ex.
    /// ```text
    ///     parts=[p1, p2, p3], runs=[]     --> run  is  [p1, p2, p3]
    ///     parts=[p1, p2, p3], runs=[1]    --> runs are [p1] and [p2, p3]
    ///     parts=[p1, p2, p3], runs=[1, 2] --> runs are [p1], [p2], [p3]
    /// ```
    pub(crate) run_splits: Vec<usize>,
    /// Run-level metadata: the first entry has metadata for the first run, and so on.
    /// If there's no corresponding entry for a particular run, it's assumed to be [RunMeta::default()].
    pub(crate) run_meta: Vec<RunMeta>,
}
895
896impl<T: Debug> Debug for HollowBatch<T> {
897    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
898        let HollowBatch {
899            desc,
900            parts,
901            len,
902            run_splits: runs,
903            run_meta,
904        } = self;
905        f.debug_struct("HollowBatch")
906            .field(
907                "desc",
908                &(
909                    desc.lower().elements(),
910                    desc.upper().elements(),
911                    desc.since().elements(),
912                ),
913            )
914            .field("parts", &parts)
915            .field("len", &len)
916            .field("runs", &runs)
917            .field("run_meta", &run_meta)
918            .finish()
919    }
920}
921
922impl<T: Serialize> serde::Serialize for HollowBatch<T> {
923    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
924        let HollowBatch {
925            desc,
926            len,
927            // Both parts and runs are covered by the self.runs call.
928            parts: _,
929            run_splits: _,
930            run_meta: _,
931        } = self;
932        let mut s = s.serialize_struct("HollowBatch", 5)?;
933        let () = s.serialize_field("lower", &desc.lower().elements())?;
934        let () = s.serialize_field("upper", &desc.upper().elements())?;
935        let () = s.serialize_field("since", &desc.since().elements())?;
936        let () = s.serialize_field("len", len)?;
937        let () = s.serialize_field("part_runs", &self.runs().collect::<Vec<_>>())?;
938        s.end()
939    }
940}
941
942impl<T: Ord> PartialOrd for HollowBatch<T> {
943    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
944        Some(self.cmp(other))
945    }
946}
947
948impl<T: Ord> Ord for HollowBatch<T> {
949    fn cmp(&self, other: &Self) -> Ordering {
950        // Deconstruct self and other so we get a compile failure if new fields
951        // are added.
952        let HollowBatch {
953            desc: self_desc,
954            parts: self_parts,
955            len: self_len,
956            run_splits: self_runs,
957            run_meta: self_run_meta,
958        } = self;
959        let HollowBatch {
960            desc: other_desc,
961            parts: other_parts,
962            len: other_len,
963            run_splits: other_runs,
964            run_meta: other_run_meta,
965        } = other;
966        (
967            self_desc.lower().elements(),
968            self_desc.upper().elements(),
969            self_desc.since().elements(),
970            self_parts,
971            self_len,
972            self_runs,
973            self_run_meta,
974        )
975            .cmp(&(
976                other_desc.lower().elements(),
977                other_desc.upper().elements(),
978                other_desc.since().elements(),
979                other_parts,
980                other_len,
981                other_runs,
982                other_run_meta,
983            ))
984    }
985}
986
987impl<T: Timestamp + Codec64 + Sync> HollowBatch<T> {
988    pub(crate) fn part_stream<'a>(
989        &'a self,
990        shard_id: ShardId,
991        blob: &'a dyn Blob,
992        metrics: &'a Metrics,
993    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + 'a {
994        stream! {
995            for part in &self.parts {
996                for await part in part.part_stream(shard_id, blob, metrics) {
997                    yield part;
998                }
999            }
1000        }
1001    }
1002}
1003impl<T> HollowBatch<T> {
1004    /// Construct an in-memory hollow batch from the given metadata.
1005    ///
1006    /// This method checks that `runs` is a sequence of valid indices into `parts`. The caller
1007    /// is responsible for ensuring that the defined runs are valid.
1008    ///
1009    /// `len` should represent the number of valid updates in the referenced parts.
1010    pub(crate) fn new(
1011        desc: Description<T>,
1012        parts: Vec<RunPart<T>>,
1013        len: usize,
1014        run_meta: Vec<RunMeta>,
1015        run_splits: Vec<usize>,
1016    ) -> Self {
1017        debug_assert!(
1018            run_splits.is_strictly_sorted(),
1019            "run indices should be strictly increasing"
1020        );
1021        debug_assert!(
1022            run_splits.first().map_or(true, |i| *i > 0),
1023            "run indices should be positive"
1024        );
1025        debug_assert!(
1026            run_splits.last().map_or(true, |i| *i < parts.len()),
1027            "run indices should be valid indices into parts"
1028        );
1029        debug_assert!(
1030            parts.is_empty() || run_meta.len() == run_splits.len() + 1,
1031            "all metadata should correspond to a run"
1032        );
1033
1034        Self {
1035            desc,
1036            len,
1037            parts,
1038            run_splits,
1039            run_meta,
1040        }
1041    }
1042
1043    /// Construct a batch of a single run with default metadata. Mostly interesting for tests.
1044    pub(crate) fn new_run(desc: Description<T>, parts: Vec<RunPart<T>>, len: usize) -> Self {
1045        let run_meta = if parts.is_empty() {
1046            vec![]
1047        } else {
1048            vec![RunMeta::default()]
1049        };
1050        Self {
1051            desc,
1052            len,
1053            parts,
1054            run_splits: vec![],
1055            run_meta,
1056        }
1057    }
1058
1059    #[cfg(test)]
1060    pub(crate) fn new_run_for_test(
1061        desc: Description<T>,
1062        parts: Vec<RunPart<T>>,
1063        len: usize,
1064        run_id: RunId,
1065    ) -> Self {
1066        let run_meta = if parts.is_empty() {
1067            vec![]
1068        } else {
1069            let mut meta = RunMeta::default();
1070            meta.id = Some(run_id);
1071            vec![meta]
1072        };
1073        Self {
1074            desc,
1075            len,
1076            parts,
1077            run_splits: vec![],
1078            run_meta,
1079        }
1080    }
1081
1082    /// An empty hollow batch, representing no updates over the given desc.
1083    pub(crate) fn empty(desc: Description<T>) -> Self {
1084        Self {
1085            desc,
1086            len: 0,
1087            parts: vec![],
1088            run_splits: vec![],
1089            run_meta: vec![],
1090        }
1091    }
1092
1093    pub(crate) fn runs(&self) -> impl Iterator<Item = (&RunMeta, &[RunPart<T>])> {
1094        let run_ends = self
1095            .run_splits
1096            .iter()
1097            .copied()
1098            .chain(std::iter::once(self.parts.len()));
1099        let run_metas = self.run_meta.iter();
1100        let run_parts = run_ends
1101            .scan(0, |start, end| {
1102                let range = *start..end;
1103                *start = end;
1104                Some(range)
1105            })
1106            .filter(|range| !range.is_empty())
1107            .map(|range| &self.parts[range]);
1108        run_metas.zip_eq(run_parts)
1109    }
1110
1111    pub(crate) fn inline_bytes(&self) -> usize {
1112        self.parts.iter().map(|x| x.inline_bytes()).sum()
1113    }
1114
1115    pub(crate) fn is_empty(&self) -> bool {
1116        self.parts.is_empty()
1117    }
1118
1119    pub(crate) fn part_count(&self) -> usize {
1120        self.parts.len()
1121    }
1122
1123    /// The sum of the encoded sizes of all parts in the batch.
1124    pub fn encoded_size_bytes(&self) -> usize {
1125        self.parts.iter().map(|p| p.encoded_size_bytes()).sum()
1126    }
1127}
1128
1129// See the comment on [Batch::rewrite_ts] for why this is TotalOrder.
1130impl<T: Timestamp + TotalOrder> HollowBatch<T> {
1131    pub(crate) fn rewrite_ts(
1132        &mut self,
1133        frontier: &Antichain<T>,
1134        new_upper: Antichain<T>,
1135    ) -> Result<(), String> {
1136        if !PartialOrder::less_than(frontier, &new_upper) {
1137            return Err(format!(
1138                "rewrite frontier {:?} !< rewrite upper {:?}",
1139                frontier.elements(),
1140                new_upper.elements(),
1141            ));
1142        }
1143        if PartialOrder::less_than(&new_upper, self.desc.upper()) {
1144            return Err(format!(
1145                "rewrite upper {:?} < batch upper {:?}",
1146                new_upper.elements(),
1147                self.desc.upper().elements(),
1148            ));
1149        }
1150
1151        // The following are things that it seems like we could support, but
1152        // initially we don't because we don't have a use case for them.
1153        if PartialOrder::less_than(frontier, self.desc.lower()) {
1154            return Err(format!(
1155                "rewrite frontier {:?} < batch lower {:?}",
1156                frontier.elements(),
1157                self.desc.lower().elements(),
1158            ));
1159        }
1160        if self.desc.since() != &Antichain::from_elem(T::minimum()) {
1161            return Err(format!(
1162                "batch since {:?} != minimum antichain {:?}",
1163                self.desc.since().elements(),
1164                &[T::minimum()],
1165            ));
1166        }
1167        for part in self.parts.iter() {
1168            let Some(ts_rewrite) = part.ts_rewrite() else {
1169                continue;
1170            };
1171            if PartialOrder::less_than(frontier, ts_rewrite) {
1172                return Err(format!(
1173                    "rewrite frontier {:?} < batch rewrite {:?}",
1174                    frontier.elements(),
1175                    ts_rewrite.elements(),
1176                ));
1177            }
1178        }
1179
1180        self.desc = Description::new(
1181            self.desc.lower().clone(),
1182            new_upper,
1183            self.desc.since().clone(),
1184        );
1185        for part in &mut self.parts {
1186            match part {
1187                RunPart::Single(BatchPart::Hollow(part)) => {
1188                    part.ts_rewrite = Some(frontier.clone())
1189                }
1190                RunPart::Single(BatchPart::Inline { ts_rewrite, .. }) => {
1191                    *ts_rewrite = Some(frontier.clone())
1192                }
1193                RunPart::Many(runs) => {
1194                    // Currently unreachable: we only apply rewrites to user batches, and we don't
1195                    // ever generate runs of >1 part for those.
1196                    panic!("unexpected rewrite of a hollow runs ref: {runs:?}");
1197                }
1198            }
1199        }
1200        Ok(())
1201    }
1202}
1203
1204impl<T: Ord> PartialOrd for HollowBatchPart<T> {
1205    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1206        Some(self.cmp(other))
1207    }
1208}
1209
1210impl<T: Ord> Ord for HollowBatchPart<T> {
1211    fn cmp(&self, other: &Self) -> Ordering {
1212        // Deconstruct self and other so we get a compile failure if new fields
1213        // are added.
1214        let HollowBatchPart {
1215            key: self_key,
1216            meta: self_meta,
1217            encoded_size_bytes: self_encoded_size_bytes,
1218            key_lower: self_key_lower,
1219            structured_key_lower: self_structured_key_lower,
1220            stats: self_stats,
1221            ts_rewrite: self_ts_rewrite,
1222            diffs_sum: self_diffs_sum,
1223            format: self_format,
1224            schema_id: self_schema_id,
1225            deprecated_schema_id: self_deprecated_schema_id,
1226        } = self;
1227        let HollowBatchPart {
1228            key: other_key,
1229            meta: other_meta,
1230            encoded_size_bytes: other_encoded_size_bytes,
1231            key_lower: other_key_lower,
1232            structured_key_lower: other_structured_key_lower,
1233            stats: other_stats,
1234            ts_rewrite: other_ts_rewrite,
1235            diffs_sum: other_diffs_sum,
1236            format: other_format,
1237            schema_id: other_schema_id,
1238            deprecated_schema_id: other_deprecated_schema_id,
1239        } = other;
1240        (
1241            self_key,
1242            self_meta,
1243            self_encoded_size_bytes,
1244            self_key_lower,
1245            self_structured_key_lower,
1246            self_stats,
1247            self_ts_rewrite.as_ref().map(|x| x.elements()),
1248            self_diffs_sum,
1249            self_format,
1250            self_schema_id,
1251            self_deprecated_schema_id,
1252        )
1253            .cmp(&(
1254                other_key,
1255                other_meta,
1256                other_encoded_size_bytes,
1257                other_key_lower,
1258                other_structured_key_lower,
1259                other_stats,
1260                other_ts_rewrite.as_ref().map(|x| x.elements()),
1261                other_diffs_sum,
1262                other_format,
1263                other_schema_id,
1264                other_deprecated_schema_id,
1265            ))
1266    }
1267}
1268
/// A pointer to a rollup stored externally.
///
/// These are tracked in [StateCollections::rollups], keyed by [SeqNo].
#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub struct HollowRollup {
    /// Pointer usable to retrieve the rollup.
    pub key: PartialRollupKey,
    /// The encoded size of this rollup, if known.
    pub encoded_size_bytes: Option<usize>,
}
1277
/// A pointer to a blob stored externally.
///
/// This is a borrowed view over one of the two kinds of hollow pointers
/// defined in this file.
#[derive(Debug)]
pub enum HollowBlobRef<'a, T> {
    /// A borrowed [HollowBatch].
    Batch(&'a HollowBatch<T>),
    /// A borrowed [HollowRollup].
    Rollup(&'a HollowRollup),
}
1284
/// A rollup that is currently being computed.
///
/// Stored in [StateCollections::active_rollup], which is cleared when a new
/// rollup is added via `add_rollup`.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize
)]
pub struct ActiveRollup {
    /// NOTE(review): presumably the seqno the in-progress rollup is being
    /// computed for; confirm against the code that sets this.
    pub seqno: SeqNo,
    /// NOTE(review): presumably when the computation started, in milliseconds;
    /// confirm the clock and epoch against the code that sets this.
    pub start_ms: u64,
}
1293
/// A garbage collection request that is currently being computed.
///
/// Stored in [StateCollections::active_gc], which is unset by
/// `remove_rollups` at the end of the GC process.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize
)]
pub struct ActiveGc {
    /// NOTE(review): presumably the seqno associated with the in-progress GC
    /// request; confirm against the code that sets this.
    pub seqno: SeqNo,
    /// NOTE(review): presumably when the GC started, in milliseconds; confirm
    /// the clock and epoch against the code that sets this.
    pub start_ms: u64,
}
1302
/// A sentinel for a state transition that was a no-op.
///
/// Critically, this also indicates that the no-op state transition was not
/// committed through compare_and_append and thus is _not linearized_.
///
/// The wrapped value is the result handed back to the caller. In the state
/// transitions in this file it is used as the `Break` payload of a
/// `ControlFlow`, carrying the same type as the `Continue` payload.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct NoOpStateTransition<T>(pub T);
1310
// TODO: Document invariants.
/// The collections that make up the state of a shard: rollup pointers,
/// in-progress maintenance work, registered readers, writers and schemas, and
/// the trace itself.
#[derive(Debug, Clone)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct StateCollections<T> {
    /// The version of this state. This is typically identical to the version of the code
    /// that wrote it, but may diverge during 0dt upgrades and similar operations when a
    /// new version of code is intentionally interoperating with an older state format.
    pub(crate) version: Version,

    // NOTE(review): presumably the seqno of the most recent GC request;
    // confirm against the code that sets this.
    // - Invariant: `<= all reader.since`
    // - Invariant: Doesn't regress across state versions.
    pub(crate) last_gc_req: SeqNo,

    // Pointers to rollups, keyed by the seqno they were added with.
    // - Invariant: There is a rollup with `seqno <= self.seqno_since`.
    pub(crate) rollups: BTreeMap<SeqNo, HollowRollup>,

    /// The rollup that is currently being computed.
    pub(crate) active_rollup: Option<ActiveRollup>,
    /// The gc request that is currently being computed.
    pub(crate) active_gc: Option<ActiveGc>,

    /// State of each registered leased reader, keyed by reader id.
    pub(crate) leased_readers: BTreeMap<LeasedReaderId, LeasedReaderState<T>>,
    /// State of each registered critical reader, keyed by reader id.
    pub(crate) critical_readers: BTreeMap<CriticalReaderId, CriticalReaderState<T>>,
    /// State of each known writer, keyed by writer id.
    pub(crate) writers: BTreeMap<WriterId, WriterState<T>>,
    /// The encoded schemas registered for this shard, keyed by schema id.
    pub(crate) schemas: BTreeMap<SchemaId, EncodedSchemas>,

    // - Invariant: `trace.since == meet(all reader.since)`
    // - Invariant: `trace.since` doesn't regress across state versions.
    // - Invariant: `trace.upper` doesn't regress across state versions.
    // - Invariant: `trace` upholds its own invariants.
    pub(crate) trace: Trace<T>,
}
1343
/// A key and val [Codec::Schema] encoded via [Codec::encode_schema].
///
/// This strategy of directly serializing the schema objects requires that
/// persist users do the right thing. Specifically, that an encoded schema
/// doesn't in some later version of mz decode to an in-mem object that acts
/// differently. In a sense, the current system (before the introduction of the
/// schema registry) where schemas are passed in unchecked to reader and writer
/// registration calls also has the same defect, so seems fine.
///
/// An alternative is to write down here some persist-specific representation of
/// the schema (e.g. the arrow DataType). This is a lot more work and also has
/// the potential to lead down a similar failure mode to the mz_persist_types
/// `Data` trait, where the boilerplate isn't worth the safety. Given that we
/// can always migrate later by rehydrating these, seems fine to start with the
/// easy thing.
///
/// The `*_data_type` fields can be turned back into an arrow `DataType` with
/// [EncodedSchemas::decode_data_type].
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct EncodedSchemas {
    /// A full in-mem `K::Schema` impl encoded via [Codec::encode_schema].
    pub key: Bytes,
    /// The arrow `DataType` produced by this `K::Schema` at the time it was
    /// registered, encoded as a `ProtoDataType`.
    pub key_data_type: Bytes,
    /// A full in-mem `V::Schema` impl encoded via [Codec::encode_schema].
    pub val: Bytes,
    /// The arrow `DataType` produced by this `V::Schema` at the time it was
    /// registered, encoded as a `ProtoDataType`.
    pub val_data_type: Bytes,
}
1372
1373impl EncodedSchemas {
1374    pub(crate) fn decode_data_type(buf: &[u8]) -> DataType {
1375        let proto = prost::Message::decode(buf).expect("valid ProtoDataType");
1376        DataType::from_proto(proto).expect("valid DataType")
1377    }
1378}
1379
/// The reasons for which the `compare_and_append` state transition can stop
/// without appending the batch.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum CompareAndAppendBreak<T> {
    /// NOTE(review): presumably the write identified by the idempotency token
    /// was already applied; confirm against `compare_and_append`.
    AlreadyCommitted,
    /// The append was rejected because of the shard's upper. This is also what
    /// is returned (with both frontiers empty) when the shard is a tombstone.
    Upper {
        /// The upper of the shard.
        shard_upper: Antichain<T>,
        /// NOTE(review): presumably the upper of this writer's most recent
        /// write; confirm against `compare_and_append`.
        writer_upper: Antichain<T>,
    },
    /// The request itself was invalid, e.g. a batch whose upper is less than
    /// its lower.
    InvalidUsage(InvalidUsage<T>),
    /// NOTE(review): presumably signals that the inline-write size limit was
    /// hit; confirm against `compare_and_append`.
    InlineBackpressure,
}
1391
/// The reasons for which a snapshot cannot be served.
///
/// NOTE(review): the variant descriptions below are inferred from the variant
/// names and payload types only; confirm against the snapshot code path.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum SnapshotErr<T> {
    /// Presumably: the requested as-of is not yet covered by the shard's
    /// upper. Carries a seqno and an upper.
    AsOfNotYetAvailable(SeqNo, Upper<T>),
    /// Presumably: the requested as-of is before the shard's since. Carries a
    /// since.
    AsOfHistoricalDistinctionsLost(Since<T>),
}
1398
1399impl<T> StateCollections<T>
1400where
1401    T: Timestamp + Lattice + Codec64,
1402{
1403    pub fn add_rollup(
1404        &mut self,
1405        add_rollup: (SeqNo, &HollowRollup),
1406    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1407        let (rollup_seqno, rollup) = add_rollup;
1408        let applied = match self.rollups.get(&rollup_seqno) {
1409            Some(x) => x.key == rollup.key,
1410            None => {
1411                self.active_rollup = None;
1412                self.rollups.insert(rollup_seqno, rollup.to_owned());
1413                true
1414            }
1415        };
1416        // This state transition is a no-op if applied is false but we
1417        // still commit the state change so that this gets linearized
1418        // (maybe we're looking at old state).
1419        Continue(applied)
1420    }
1421
1422    pub fn remove_rollups(
1423        &mut self,
1424        remove_rollups: &[(SeqNo, PartialRollupKey)],
1425    ) -> ControlFlow<NoOpStateTransition<Vec<SeqNo>>, Vec<SeqNo>> {
1426        if self.is_tombstone() {
1427            return Break(NoOpStateTransition(vec![]));
1428        }
1429
1430        // This state transition is called at the end of the GC process, so we
1431        // need to unset the `active_gc` field.
1432        let active_gc_was_set = self.active_gc.take().is_some();
1433
1434        if remove_rollups.is_empty() {
1435            return if active_gc_was_set {
1436                Continue(vec![])
1437            } else {
1438                Break(NoOpStateTransition(vec![]))
1439            };
1440        }
1441
1442        let mut removed = vec![];
1443        for (seqno, key) in remove_rollups {
1444            let removed_key = self.rollups.remove(seqno);
1445            debug_assert!(
1446                removed_key.as_ref().map_or(true, |x| &x.key == key),
1447                "{} vs {:?}",
1448                key,
1449                removed_key
1450            );
1451
1452            if removed_key.is_some() {
1453                removed.push(*seqno);
1454            }
1455        }
1456
1457        Continue(removed)
1458    }
1459
1460    pub fn register_leased_reader(
1461        &mut self,
1462        hostname: &str,
1463        reader_id: &LeasedReaderId,
1464        purpose: &str,
1465        seqno: SeqNo,
1466        lease_duration: Duration,
1467        heartbeat_timestamp_ms: u64,
1468        use_critical_since: bool,
1469    ) -> ControlFlow<
1470        NoOpStateTransition<(LeasedReaderState<T>, SeqNo)>,
1471        (LeasedReaderState<T>, SeqNo),
1472    > {
1473        let since = if use_critical_since {
1474            self.critical_since()
1475                .unwrap_or_else(|| self.trace.since().clone())
1476        } else {
1477            self.trace.since().clone()
1478        };
1479        let reader_state = LeasedReaderState {
1480            debug: HandleDebugState {
1481                hostname: hostname.to_owned(),
1482                purpose: purpose.to_owned(),
1483            },
1484            seqno,
1485            since,
1486            last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1487            lease_duration_ms: u64::try_from(lease_duration.as_millis())
1488                .expect("lease duration as millis should fit within u64"),
1489        };
1490
1491        // If the shard-global upper and since are both the empty antichain,
1492        // then no further writes can ever commit and no further reads can be
1493        // served. Optimize this by no-op-ing reader registration so that we can
1494        // settle the shard into a final unchanging tombstone state.
1495        if self.is_tombstone() {
1496            return Break(NoOpStateTransition((reader_state, self.seqno_since(seqno))));
1497        }
1498
1499        // TODO: Handle if the reader or writer already exists.
1500        self.leased_readers
1501            .insert(reader_id.clone(), reader_state.clone());
1502        Continue((reader_state, self.seqno_since(seqno)))
1503    }
1504
1505    pub fn register_critical_reader(
1506        &mut self,
1507        hostname: &str,
1508        reader_id: &CriticalReaderId,
1509        opaque: Opaque,
1510        purpose: &str,
1511    ) -> ControlFlow<NoOpStateTransition<CriticalReaderState<T>>, CriticalReaderState<T>> {
1512        let state = CriticalReaderState {
1513            debug: HandleDebugState {
1514                hostname: hostname.to_owned(),
1515                purpose: purpose.to_owned(),
1516            },
1517            since: self.trace.since().clone(),
1518            opaque,
1519        };
1520
1521        // We expire all readers if the upper and since both advance to the
1522        // empty antichain. Gracefully handle this. At the same time,
1523        // short-circuit the cmd application so we don't needlessly create new
1524        // SeqNos.
1525        if self.is_tombstone() {
1526            return Break(NoOpStateTransition(state));
1527        }
1528
1529        let state = match self.critical_readers.get_mut(reader_id) {
1530            Some(existing_state) => {
1531                existing_state.debug = state.debug;
1532                existing_state.clone()
1533            }
1534            None => {
1535                self.critical_readers
1536                    .insert(reader_id.clone(), state.clone());
1537                state
1538            }
1539        };
1540        Continue(state)
1541    }
1542
1543    pub fn register_schema<K: Codec, V: Codec>(
1544        &mut self,
1545        key_schema: &K::Schema,
1546        val_schema: &V::Schema,
1547    ) -> ControlFlow<NoOpStateTransition<Option<SchemaId>>, Option<SchemaId>> {
1548        fn encode_data_type(data_type: &DataType) -> Bytes {
1549            let proto = data_type.into_proto();
1550            prost::Message::encode_to_vec(&proto).into()
1551        }
1552
1553        // Look for an existing registered SchemaId for these schemas.
1554        //
1555        // The common case is that this should be a recent one, so as a minor
1556        // optimization, do this search in reverse order.
1557        //
1558        // TODO: Note that this impl is `O(schemas)`. Combined with the
1559        // possibility of cmd retries, it's possible but unlikely for this to
1560        // get expensive. We could maintain a reverse map to speed this up in
1561        // necessary. This would either need to work on the encoded
1562        // representation (which, we'd have to fall back to the linear scan) or
1563        // we'd need to add a Hash/Ord bound to Schema.
1564        let existing_id = self.schemas.iter().rev().find(|(_, x)| {
1565            K::decode_schema(&x.key) == *key_schema && V::decode_schema(&x.val) == *val_schema
1566        });
1567        match existing_id {
1568            Some((schema_id, _)) => {
1569                // TODO: Validate that the decoded schemas still produce records
1570                // of the recorded DataType, to detect shenanigans. Probably
1571                // best to wait until we've turned on Schema2 in prod and thus
1572                // committed to the current mappings.
1573                Break(NoOpStateTransition(Some(*schema_id)))
1574            }
1575            None if self.is_tombstone() => {
1576                // TODO: Is this right?
1577                Break(NoOpStateTransition(None))
1578            }
1579            None if self.schemas.is_empty() => {
1580                // We'll have to do something more sophisticated here to
1581                // generate the next id if/when we start supporting the removal
1582                // of schemas.
1583                let id = SchemaId(self.schemas.len());
1584                let key_data_type = mz_persist_types::columnar::data_type::<K>(key_schema)
1585                    .expect("valid key schema");
1586                let val_data_type = mz_persist_types::columnar::data_type::<V>(val_schema)
1587                    .expect("valid val schema");
1588                let prev = self.schemas.insert(
1589                    id,
1590                    EncodedSchemas {
1591                        key: K::encode_schema(key_schema),
1592                        key_data_type: encode_data_type(&key_data_type),
1593                        val: V::encode_schema(val_schema),
1594                        val_data_type: encode_data_type(&val_data_type),
1595                    },
1596                );
1597                assert_eq!(prev, None);
1598                Continue(Some(id))
1599            }
1600            None => {
1601                info!(
1602                    "register_schemas got {:?} expected {:?}",
1603                    key_schema,
1604                    self.schemas
1605                        .iter()
1606                        .map(|(id, x)| (id, K::decode_schema(&x.key)))
1607                        .collect::<Vec<_>>()
1608                );
1609                // Until we implement persist schema changes, only allow at most
1610                // one registered schema.
1611                Break(NoOpStateTransition(None))
1612            }
1613        }
1614    }
1615
1616    pub fn compare_and_evolve_schema<K: Codec, V: Codec>(
1617        &mut self,
1618        expected: SchemaId,
1619        key_schema: &K::Schema,
1620        val_schema: &V::Schema,
1621    ) -> ControlFlow<NoOpStateTransition<CaESchema<K, V>>, CaESchema<K, V>> {
1622        fn data_type<T>(schema: &impl Schema<T>) -> DataType {
1623            // To be defensive, create an empty batch and inspect the resulting
1624            // data type (as opposed to something like allowing the `Schema` to
1625            // declare the DataType).
1626            let array = Schema::encoder(schema).expect("valid schema").finish();
1627            Array::data_type(&array).clone()
1628        }
1629
1630        let (current_id, current) = self
1631            .schemas
1632            .last_key_value()
1633            .expect("all shards have a schema");
1634        if *current_id != expected {
1635            return Break(NoOpStateTransition(CaESchema::ExpectedMismatch {
1636                schema_id: *current_id,
1637                key: K::decode_schema(&current.key),
1638                val: V::decode_schema(&current.val),
1639            }));
1640        }
1641
1642        let current_key = K::decode_schema(&current.key);
1643        let current_key_dt = EncodedSchemas::decode_data_type(&current.key_data_type);
1644        let current_val = V::decode_schema(&current.val);
1645        let current_val_dt = EncodedSchemas::decode_data_type(&current.val_data_type);
1646
1647        let key_dt = data_type(key_schema);
1648        let val_dt = data_type(val_schema);
1649
1650        // If the schema is exactly the same as the current one, no-op.
1651        if current_key == *key_schema
1652            && current_key_dt == key_dt
1653            && current_val == *val_schema
1654            && current_val_dt == val_dt
1655        {
1656            return Break(NoOpStateTransition(CaESchema::Ok(*current_id)));
1657        }
1658
1659        let key_fn = backward_compatible(&current_key_dt, &key_dt);
1660        let val_fn = backward_compatible(&current_val_dt, &val_dt);
1661        let (Some(key_fn), Some(val_fn)) = (key_fn, val_fn) else {
1662            return Break(NoOpStateTransition(CaESchema::Incompatible));
1663        };
1664        // Persist initially disallows dropping columns. This would require a
1665        // bunch more work (e.g. not safe to use the latest schema in
1666        // compaction) and isn't initially necessary in mz.
1667        if key_fn.contains_drop() || val_fn.contains_drop() {
1668            return Break(NoOpStateTransition(CaESchema::Incompatible));
1669        }
1670
1671        // We'll have to do something more sophisticated here to
1672        // generate the next id if/when we start supporting the removal
1673        // of schemas.
1674        let id = SchemaId(self.schemas.len());
1675        self.schemas.insert(
1676            id,
1677            EncodedSchemas {
1678                key: K::encode_schema(key_schema),
1679                key_data_type: prost::Message::encode_to_vec(&key_dt.into_proto()).into(),
1680                val: V::encode_schema(val_schema),
1681                val_data_type: prost::Message::encode_to_vec(&val_dt.into_proto()).into(),
1682            },
1683        );
1684        Continue(CaESchema::Ok(id))
1685    }
1686
    /// Attempts to append `batch` to the shard on behalf of `writer_id`.
    ///
    /// Returns `Break` without changing the upper when:
    /// - the shard is a tombstone, or the shard upper does not equal the
    ///   batch's lower (`CompareAndAppendBreak::Upper`),
    /// - the batch's bounds are invalid, or the batch is non-empty over an
    ///   empty time interval (`CompareAndAppendBreak::InvalidUsage`),
    /// - `idempotency_token` matches the writer's most recent write
    ///   (`CompareAndAppendBreak::AlreadyCommitted`),
    /// - the batch has inline bytes and, together with the inline bytes
    ///   already in the trace, reaches `inline_writes_total_max_bytes`
    ///   (`CompareAndAppendBreak::InlineBackpressure`).
    ///
    /// Otherwise the batch is pushed into the trace (unless its time interval
    /// is empty), the writer's token, upper, and heartbeat are updated, and
    /// the merge requests that were claimed for compaction are returned in
    /// `Continue`.
    ///
    /// If `writer_id` has no entry in `writers`, one is created from
    /// `heartbeat_timestamp_ms`, `lease_duration_ms`, and `debug_info` before
    /// any validation happens.
    ///
    /// `claim_compaction_percent` controls how many additional, previously
    /// unclaimed merge requests are picked up when the write itself produced
    /// no non-empty merge work: `claim_compaction_percent / 100` requests are
    /// always taken, plus one more when the hashed idempotency token modulo
    /// 100 is below `claim_compaction_percent % 100`.
    ///
    /// # Panics
    ///
    /// Panics if an already-committed write's upper disagrees with the
    /// recorded writer upper or exceeds the shard upper, or if the writer's
    /// most recent upper would move backward.
    pub fn compare_and_append(
        &mut self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        heartbeat_timestamp_ms: u64,
        lease_duration_ms: u64,
        idempotency_token: &IdempotencyToken,
        debug_info: &HandleDebugState,
        inline_writes_total_max_bytes: usize,
        claim_compaction_percent: usize,
        claim_compaction_min_version: Option<&Version>,
    ) -> ControlFlow<CompareAndAppendBreak<T>, Vec<FueledMergeReq<T>>> {
        // We expire all writers if the upper and since both advance to the
        // empty antichain. Gracefully handle this. At the same time,
        // short-circuit the cmd application so we don't needlessly create new
        // SeqNos.
        if self.is_tombstone() {
            assert_eq!(self.trace.upper(), &Antichain::new());
            return Break(CompareAndAppendBreak::Upper {
                shard_upper: Antichain::new(),
                // This writer might have been registered before the shard upper
                // was advanced, which would make this pessimistic in the
                // Indeterminate handling of compare_and_append at the machine
                // level, but that's fine.
                writer_upper: Antichain::new(),
            });
        }

        // Look up the writer's state, registering it on the fly if this is
        // the first time we see this writer id.
        let writer_state = self
            .writers
            .entry(writer_id.clone())
            .or_insert_with(|| WriterState {
                last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
                lease_duration_ms,
                most_recent_write_token: IdempotencyToken::SENTINEL,
                most_recent_write_upper: Antichain::from_elem(T::minimum()),
                debug: debug_info.clone(),
            });

        // The batch's upper must not be strictly less than its lower.
        if PartialOrder::less_than(batch.desc.upper(), batch.desc.lower()) {
            return Break(CompareAndAppendBreak::InvalidUsage(
                InvalidUsage::InvalidBounds {
                    lower: batch.desc.lower().clone(),
                    upper: batch.desc.upper().clone(),
                },
            ));
        }

        // If the time interval is empty, the list of updates must also be
        // empty.
        if batch.desc.upper() == batch.desc.lower() && !batch.is_empty() {
            return Break(CompareAndAppendBreak::InvalidUsage(
                InvalidUsage::InvalidEmptyTimeInterval {
                    lower: batch.desc.lower().clone(),
                    upper: batch.desc.upper().clone(),
                    keys: batch
                        .parts
                        .iter()
                        .map(|x| x.printable_name().to_owned())
                        .collect(),
                },
            ));
        }

        if idempotency_token == &writer_state.most_recent_write_token {
            // If the last write had the same idempotency_token, then this must
            // have already committed. Sanity check that the most recent write
            // upper matches and that the shard upper is at least the write
            // upper, if it's not something very suspect is going on.
            assert_eq!(batch.desc.upper(), &writer_state.most_recent_write_upper);
            assert!(
                PartialOrder::less_equal(batch.desc.upper(), self.trace.upper()),
                "{:?} vs {:?}",
                batch.desc.upper(),
                self.trace.upper()
            );
            return Break(CompareAndAppendBreak::AlreadyCommitted);
        }

        // The "compare" half: the batch must start exactly at the shard upper.
        let shard_upper = self.trace.upper();
        if shard_upper != batch.desc.lower() {
            return Break(CompareAndAppendBreak::Upper {
                shard_upper: shard_upper.clone(),
                writer_upper: writer_state.most_recent_write_upper.clone(),
            });
        }

        // Only batches that carry inline data are subject to the inline
        // bytes limit.
        let new_inline_bytes = batch.inline_bytes();
        if new_inline_bytes > 0 {
            let mut existing_inline_bytes = 0;
            self.trace
                .map_batches(|x| existing_inline_bytes += x.inline_bytes());
            // TODO: For very small batches, it may actually _increase_ the size
            // of state to flush them out. Consider another threshold under
            // which an inline part can be appended no matter what.
            if existing_inline_bytes + new_inline_bytes >= inline_writes_total_max_bytes {
                return Break(CompareAndAppendBreak::InlineBackpressure);
            }
        }

        // A batch over an empty time interval is not pushed into the trace.
        let mut merge_reqs = if batch.desc.upper() != batch.desc.lower() {
            self.trace.push_batch(batch.clone())
        } else {
            Vec::new()
        };

        // NB: we don't claim unclaimed compactions when the recording flag is off, even if we'd
        // otherwise be allowed to, to avoid triggering the same compactions in every writer.
        let all_empty_reqs = merge_reqs
            .iter()
            .all(|req| req.inputs.iter().all(|b| b.batch.is_empty()));
        if all_empty_reqs && !batch.is_empty() {
            // Take `percent / 100` requests unconditionally, and one more for
            // the fraction of tokens whose hash falls below `percent % 100`.
            let mut reqs_to_take = claim_compaction_percent / 100;
            if (usize::cast_from(idempotency_token.hashed()) % 100)
                < (claim_compaction_percent % 100)
            {
                reqs_to_take += 1;
            }
            let threshold_ms = heartbeat_timestamp_ms.saturating_sub(lease_duration_ms);
            let min_writer = claim_compaction_min_version.map(WriterKey::for_version);
            merge_reqs.extend(
                // We keep the oldest `reqs_to_take` batches, under the theory that they're least
                // likely to be compacted soon for other reasons.
                self.trace
                    .fueled_merge_reqs_before_ms(threshold_ms, min_writer)
                    .take(reqs_to_take),
            )
        }

        // Record every returned request as claimed, starting now.
        for req in &merge_reqs {
            self.trace.claim_compaction(
                req.id,
                ActiveCompaction {
                    start_ms: heartbeat_timestamp_ms,
                },
            )
        }

        debug_assert_eq!(self.trace.upper(), batch.desc.upper());
        writer_state.most_recent_write_token = idempotency_token.clone();
        // The writer's most recent upper should only go forward.
        assert!(
            PartialOrder::less_equal(&writer_state.most_recent_write_upper, batch.desc.upper()),
            "{:?} vs {:?}",
            &writer_state.most_recent_write_upper,
            batch.desc.upper()
        );
        writer_state
            .most_recent_write_upper
            .clone_from(batch.desc.upper());

        // Heartbeat the writer state to keep our idempotency token alive.
        writer_state.last_heartbeat_timestamp_ms = std::cmp::max(
            heartbeat_timestamp_ms,
            writer_state.last_heartbeat_timestamp_ms,
        );

        Continue(merge_reqs)
    }
1846
1847    pub fn apply_merge_res<D: Codec64 + Monoid + PartialEq>(
1848        &mut self,
1849        res: &FueledMergeRes<T>,
1850        metrics: &ColumnarMetrics,
1851    ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
1852        // We expire all writers if the upper and since both advance to the
1853        // empty antichain. Gracefully handle this. At the same time,
1854        // short-circuit the cmd application so we don't needlessly create new
1855        // SeqNos.
1856        if self.is_tombstone() {
1857            return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
1858        }
1859
1860        let apply_merge_result = self.trace.apply_merge_res_checked::<D>(res, metrics);
1861        Continue(apply_merge_result)
1862    }
1863
1864    pub fn spine_exert(
1865        &mut self,
1866        fuel: usize,
1867    ) -> ControlFlow<NoOpStateTransition<Vec<FueledMergeReq<T>>>, Vec<FueledMergeReq<T>>> {
1868        let (merge_reqs, did_work) = self.trace.exert(fuel);
1869        if did_work {
1870            Continue(merge_reqs)
1871        } else {
1872            assert!(merge_reqs.is_empty());
1873            // Break if we have nothing useful to do to save the seqno (and
1874            // resulting crdb traffic)
1875            Break(NoOpStateTransition(Vec::new()))
1876        }
1877    }
1878
1879    pub fn downgrade_since(
1880        &mut self,
1881        reader_id: &LeasedReaderId,
1882        seqno: SeqNo,
1883        outstanding_seqno: SeqNo,
1884        new_since: &Antichain<T>,
1885        heartbeat_timestamp_ms: u64,
1886    ) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
1887        // We expire all readers if the upper and since both advance to the
1888        // empty antichain. Gracefully handle this. At the same time,
1889        // short-circuit the cmd application so we don't needlessly create new
1890        // SeqNos.
1891        if self.is_tombstone() {
1892            return Break(NoOpStateTransition(Since(Antichain::new())));
1893        }
1894
1895        // The only way to have a missing reader in state is if it's been expired... and in that
1896        // case, we behave the same as though that reader had been downgraded to the empty antichain.
1897        let Some(reader_state) = self.leased_reader(reader_id) else {
1898            tracing::warn!(
1899                "Leased reader {reader_id} was expired due to inactivity. Did the machine go to sleep?",
1900            );
1901            return Break(NoOpStateTransition(Since(Antichain::new())));
1902        };
1903
1904        // Also use this as an opportunity to heartbeat the reader and downgrade
1905        // the seqno capability.
1906        reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
1907            heartbeat_timestamp_ms,
1908            reader_state.last_heartbeat_timestamp_ms,
1909        );
1910
1911        let seqno = {
1912            assert!(
1913                outstanding_seqno >= reader_state.seqno,
1914                "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
1915                    is behind current reader_state ({:?})",
1916                outstanding_seqno,
1917                reader_state.seqno,
1918            );
1919            std::cmp::min(outstanding_seqno, seqno)
1920        };
1921
1922        reader_state.seqno = seqno;
1923
1924        let reader_current_since = if PartialOrder::less_than(&reader_state.since, new_since) {
1925            reader_state.since.clone_from(new_since);
1926            self.update_since();
1927            new_since.clone()
1928        } else {
1929            // No-op, but still commit the state change so that this gets
1930            // linearized.
1931            reader_state.since.clone()
1932        };
1933
1934        Continue(Since(reader_current_since))
1935    }
1936
1937    pub fn compare_and_downgrade_since(
1938        &mut self,
1939        reader_id: &CriticalReaderId,
1940        expected_opaque: &Opaque,
1941        (new_opaque, new_since): (&Opaque, &Antichain<T>),
1942    ) -> ControlFlow<
1943        NoOpStateTransition<Result<Since<T>, (Opaque, Since<T>)>>,
1944        Result<Since<T>, (Opaque, Since<T>)>,
1945    > {
1946        // We expire all readers if the upper and since both advance to the
1947        // empty antichain. Gracefully handle this. At the same time,
1948        // short-circuit the cmd application so we don't needlessly create new
1949        // SeqNos.
1950        if self.is_tombstone() {
1951            // Match the idempotence behavior below of ignoring the token if
1952            // since is already advanced enough (in this case, because it's a
1953            // tombstone, we know it's the empty antichain).
1954            return Break(NoOpStateTransition(Ok(Since(Antichain::new()))));
1955        }
1956
1957        let reader_state = self.critical_reader(reader_id);
1958
1959        if reader_state.opaque != *expected_opaque {
1960            // No-op, but still commit the state change so that this gets
1961            // linearized.
1962            return Continue(Err((
1963                reader_state.opaque.clone(),
1964                Since(reader_state.since.clone()),
1965            )));
1966        }
1967
1968        reader_state.opaque = new_opaque.clone();
1969        if PartialOrder::less_equal(&reader_state.since, new_since) {
1970            reader_state.since.clone_from(new_since);
1971            self.update_since();
1972            Continue(Ok(Since(new_since.clone())))
1973        } else {
1974            // no work to be done -- the reader state's `since` is already sufficiently
1975            // advanced. we may someday need to revisit this branch when it's possible
1976            // for two `since` frontiers to be incomparable.
1977            Continue(Ok(Since(reader_state.since.clone())))
1978        }
1979    }
1980
1981    pub fn expire_leased_reader(
1982        &mut self,
1983        reader_id: &LeasedReaderId,
1984    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1985        // We expire all readers if the upper and since both advance to the
1986        // empty antichain. Gracefully handle this. At the same time,
1987        // short-circuit the cmd application so we don't needlessly create new
1988        // SeqNos.
1989        if self.is_tombstone() {
1990            return Break(NoOpStateTransition(false));
1991        }
1992
1993        let existed = self.leased_readers.remove(reader_id).is_some();
1994        if existed {
1995            // TODO(database-issues#6885): Re-enable this
1996            //
1997            // Temporarily disabling this because we think it might be the cause
1998            // of the remap since bug. Specifically, a clusterd process has a
1999            // ReadHandle for maintaining the once and one inside a Listen. If
2000            // we crash and stay down for longer than the read lease duration,
2001            // it's possible that an expiry of them both in quick succession
2002            // jumps the since forward to the Listen one.
2003            //
2004            // Don't forget to update the downgrade_since when this gets
2005            // switched back on.
2006            //
2007            // self.update_since();
2008        }
2009        // No-op if existed is false, but still commit the state change so that
2010        // this gets linearized.
2011        Continue(existed)
2012    }
2013
2014    pub fn expire_critical_reader(
2015        &mut self,
2016        reader_id: &CriticalReaderId,
2017    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2018        // We expire all readers if the upper and since both advance to the
2019        // empty antichain. Gracefully handle this. At the same time,
2020        // short-circuit the cmd application so we don't needlessly create new
2021        // SeqNos.
2022        if self.is_tombstone() {
2023            return Break(NoOpStateTransition(false));
2024        }
2025
2026        let existed = self.critical_readers.remove(reader_id).is_some();
2027        if existed {
2028            // TODO(database-issues#6885): Re-enable this
2029            //
2030            // Temporarily disabling this because we think it might be the cause
2031            // of the remap since bug. Specifically, a clusterd process has a
2032            // ReadHandle for maintaining the once and one inside a Listen. If
2033            // we crash and stay down for longer than the read lease duration,
2034            // it's possible that an expiry of them both in quick succession
2035            // jumps the since forward to the Listen one.
2036            //
2037            // Don't forget to update the downgrade_since when this gets
2038            // switched back on.
2039            //
2040            // self.update_since();
2041        }
2042        // This state transition is a no-op if existed is false, but we still
2043        // commit the state change so that this gets linearized (maybe we're
2044        // looking at old state).
2045        Continue(existed)
2046    }
2047
2048    pub fn expire_writer(
2049        &mut self,
2050        writer_id: &WriterId,
2051    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2052        // We expire all writers if the upper and since both advance to the
2053        // empty antichain. Gracefully handle this. At the same time,
2054        // short-circuit the cmd application so we don't needlessly create new
2055        // SeqNos.
2056        if self.is_tombstone() {
2057            return Break(NoOpStateTransition(false));
2058        }
2059
2060        let existed = self.writers.remove(writer_id).is_some();
2061        // This state transition is a no-op if existed is false, but we still
2062        // commit the state change so that this gets linearized (maybe we're
2063        // looking at old state).
2064        Continue(existed)
2065    }
2066
2067    fn leased_reader(&mut self, id: &LeasedReaderId) -> Option<&mut LeasedReaderState<T>> {
2068        self.leased_readers.get_mut(id)
2069    }
2070
2071    fn critical_reader(&mut self, id: &CriticalReaderId) -> &mut CriticalReaderState<T> {
2072        self.critical_readers
2073            .get_mut(id)
2074            .unwrap_or_else(|| {
2075                panic!(
2076                    "Unknown CriticalReaderId({}). It was either never registered, or has been manually expired.",
2077                    id
2078                )
2079            })
2080    }
2081
2082    fn critical_since(&self) -> Option<Antichain<T>> {
2083        let mut critical_sinces = self.critical_readers.values().map(|r| &r.since);
2084        let mut since = critical_sinces.next().cloned()?;
2085        for s in critical_sinces {
2086            since.meet_assign(s);
2087        }
2088        Some(since)
2089    }
2090
2091    fn update_since(&mut self) {
2092        let mut sinces_iter = self
2093            .leased_readers
2094            .values()
2095            .map(|x| &x.since)
2096            .chain(self.critical_readers.values().map(|x| &x.since));
2097        let mut since = match sinces_iter.next() {
2098            Some(since) => since.clone(),
2099            None => {
2100                // If there are no current readers, leave `since` unchanged so
2101                // it doesn't regress.
2102                return;
2103            }
2104        };
2105        while let Some(s) = sinces_iter.next() {
2106            since.meet_assign(s);
2107        }
2108        self.trace.downgrade_since(&since);
2109    }
2110
2111    fn seqno_since(&self, seqno: SeqNo) -> SeqNo {
2112        let mut seqno_since = seqno;
2113        for cap in self.leased_readers.values() {
2114            seqno_since = std::cmp::min(seqno_since, cap.seqno);
2115        }
2116        // critical_readers don't hold a seqno capability.
2117        seqno_since
2118    }
2119
2120    fn tombstone_batch() -> HollowBatch<T> {
2121        HollowBatch::empty(Description::new(
2122            Antichain::from_elem(T::minimum()),
2123            Antichain::new(),
2124            Antichain::new(),
2125        ))
2126    }
2127
2128    pub(crate) fn is_tombstone(&self) -> bool {
2129        self.trace.upper().is_empty()
2130            && self.trace.since().is_empty()
2131            && self.writers.is_empty()
2132            && self.leased_readers.is_empty()
2133            && self.critical_readers.is_empty()
2134    }
2135
2136    pub(crate) fn is_single_empty_batch(&self) -> bool {
2137        let mut batch_count = 0;
2138        let mut is_empty = true;
2139        self.trace.map_batches(|b| {
2140            batch_count += 1;
2141            is_empty &= b.is_empty()
2142        });
2143        batch_count <= 1 && is_empty
2144    }
2145
2146    pub fn become_tombstone_and_shrink(&mut self) -> ControlFlow<NoOpStateTransition<()>, ()> {
2147        assert_eq!(self.trace.upper(), &Antichain::new());
2148        assert_eq!(self.trace.since(), &Antichain::new());
2149
2150        // Remember our current state, so we can decide whether we have to
2151        // record a transition in durable state.
2152        let was_tombstone = self.is_tombstone();
2153
2154        // Enter the "tombstone" state, if we're not in it already.
2155        self.writers.clear();
2156        self.leased_readers.clear();
2157        self.critical_readers.clear();
2158
2159        debug_assert!(self.is_tombstone());
2160
2161        // Now that we're in a "tombstone" state -- ie. nobody can read the data from a shard or write to
2162        // it -- the actual contents of our batches no longer matter.
2163        // This method progressively replaces batches in our state with simpler versions, to allow
2164        // freeing up resources and to reduce the state size. (Since the state is unreadable, this
2165        // is not visible to clients.) We do this a little bit at a time to avoid really large state
2166        // transitions... most operations happen incrementally, and large single writes can overwhelm
2167        // a backing store. See comments for why we believe the relevant diffs are reasonably small.
2168
2169        let mut to_replace = None;
2170        let mut batch_count = 0;
2171        self.trace.map_batches(|b| {
2172            batch_count += 1;
2173            if !b.is_empty() && to_replace.is_none() {
2174                to_replace = Some(b.desc.clone());
2175            }
2176        });
2177        if let Some(desc) = to_replace {
2178            // We have a nonempty batch: replace it with an empty batch and return.
2179            // This should not produce an excessively large diff: if it did, we wouldn't have been
2180            // able to append that batch in the first place.
2181            let result = self.trace.apply_tombstone_merge(&desc);
2182            assert!(
2183                result.matched(),
2184                "merge with a matching desc should always match"
2185            );
2186            Continue(())
2187        } else if batch_count > 1 {
2188            // All our batches are empty, but we have more than one of them. Replace the whole set
2189            // with a new single-batch trace.
2190            // This produces a diff with a size proportional to the number of batches, but since
2191            // Spine keeps a logarithmic number of batches this should never be excessively large.
2192            let mut new_trace = Trace::default();
2193            new_trace.downgrade_since(&Antichain::new());
2194            let merge_reqs = new_trace.push_batch(Self::tombstone_batch());
2195            assert_eq!(merge_reqs, Vec::new());
2196            self.trace = new_trace;
2197            Continue(())
2198        } else if !was_tombstone {
2199            // We were not tombstoned before, so have to make sure this state
2200            // transition is recorded.
2201            Continue(())
2202        } else {
2203            // All our batches are empty, and there's only one... there's no shrinking this
2204            // tombstone further.
2205            Break(NoOpStateTransition(()))
2206        }
2207    }
2208}
2209
// TODO: Document invariants.
/// One version of a shard's state: identifying metadata about the version
/// plus the collections that make up the shard's contents.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct State<T> {
    /// Id of the shard this state describes.
    pub(crate) shard_id: ShardId,

    /// Sequence number of this version of state; every version produced by
    /// `TypedState::clone_apply` carries the successor of its parent's seqno.
    pub(crate) seqno: SeqNo,
    /// A strictly increasing wall time of when this state was written, in
    /// milliseconds since the unix epoch.
    pub(crate) walltime_ms: u64,
    /// Hostname of the persist user that created this version of state. For
    /// debugging.
    pub(crate) hostname: String,
    /// The shard's rollups, readers, writers, schemas, and trace.
    pub(crate) collections: StateCollections<T>,
}
2225
/// A newtype wrapper of State that guarantees the K, V, and D codecs match the
/// ones in durable storage.
pub struct TypedState<K, V, T, D> {
    /// The wrapped, codec-agnostic state. `TypedState` dereferences to it, so
    /// its fields and methods are reachable directly on the wrapper.
    pub(crate) state: State<T>,

    // According to the docs, PhantomData is to "mark things that act like they
    // own a T". State doesn't actually own K, V, or D, just the ability to
    // produce them. Using the `fn() -> T` pattern gets us the same variance as
    // T [1], but also allows State to correctly derive Send+Sync.
    //
    // [1]:
    //     https://doc.rust-lang.org/nomicon/phantom-data.html#table-of-phantomdata-patterns
    pub(crate) _phantom: PhantomData<fn() -> (K, V, D)>,
}
2240
2241impl<K, V, T: Clone, D> TypedState<K, V, T, D> {
2242    #[cfg(any(test, debug_assertions))]
2243    pub(crate) fn clone(&self, hostname: String) -> Self {
2244        TypedState {
2245            state: State {
2246                shard_id: self.shard_id.clone(),
2247                seqno: self.seqno.clone(),
2248                walltime_ms: self.walltime_ms,
2249                hostname,
2250                collections: self.collections.clone(),
2251            },
2252            _phantom: PhantomData,
2253        }
2254    }
2255
2256    pub(crate) fn clone_for_rollup(&self) -> Self {
2257        TypedState {
2258            state: State {
2259                shard_id: self.shard_id.clone(),
2260                seqno: self.seqno.clone(),
2261                walltime_ms: self.walltime_ms,
2262                hostname: self.hostname.clone(),
2263                collections: self.collections.clone(),
2264            },
2265            _phantom: PhantomData,
2266        }
2267    }
2268}
2269
2270impl<K, V, T: Debug, D> Debug for TypedState<K, V, T, D> {
2271    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2272        // Deconstruct self so we get a compile failure if new fields
2273        // are added.
2274        let TypedState { state, _phantom } = self;
2275        f.debug_struct("TypedState").field("state", state).finish()
2276    }
2277}
2278
2279// Impl PartialEq regardless of the type params.
2280#[cfg(any(test, debug_assertions))]
2281impl<K, V, T: PartialEq, D> PartialEq for TypedState<K, V, T, D> {
2282    fn eq(&self, other: &Self) -> bool {
2283        // Deconstruct self and other so we get a compile failure if new fields
2284        // are added.
2285        let TypedState {
2286            state: self_state,
2287            _phantom,
2288        } = self;
2289        let TypedState {
2290            state: other_state,
2291            _phantom,
2292        } = other;
2293        self_state == other_state
2294    }
2295}
2296
2297impl<K, V, T, D> Deref for TypedState<K, V, T, D> {
2298    type Target = State<T>;
2299
2300    fn deref(&self) -> &Self::Target {
2301        &self.state
2302    }
2303}
2304
2305impl<K, V, T, D> DerefMut for TypedState<K, V, T, D> {
2306    fn deref_mut(&mut self) -> &mut Self::Target {
2307        &mut self.state
2308    }
2309}
2310
2311impl<K, V, T, D> TypedState<K, V, T, D>
2312where
2313    K: Codec,
2314    V: Codec,
2315    T: Timestamp + Lattice + Codec64,
2316    D: Codec64,
2317{
2318    pub fn new(
2319        applier_version: Version,
2320        shard_id: ShardId,
2321        hostname: String,
2322        walltime_ms: u64,
2323    ) -> Self {
2324        let state = State {
2325            shard_id,
2326            seqno: SeqNo::minimum(),
2327            walltime_ms,
2328            hostname,
2329            collections: StateCollections {
2330                version: applier_version,
2331                last_gc_req: SeqNo::minimum(),
2332                rollups: BTreeMap::new(),
2333                active_rollup: None,
2334                active_gc: None,
2335                leased_readers: BTreeMap::new(),
2336                critical_readers: BTreeMap::new(),
2337                writers: BTreeMap::new(),
2338                schemas: BTreeMap::new(),
2339                trace: Trace::default(),
2340            },
2341        };
2342        TypedState {
2343            state,
2344            _phantom: PhantomData,
2345        }
2346    }
2347
2348    pub fn clone_apply<R, E, WorkFn>(
2349        &self,
2350        cfg: &PersistConfig,
2351        work_fn: &mut WorkFn,
2352    ) -> ControlFlow<E, (R, Self)>
2353    where
2354        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
2355    {
2356        // We do not increment the version by default, though work_fn can if it chooses to.
2357        let mut new_state = State {
2358            shard_id: self.shard_id,
2359            seqno: self.seqno.next(),
2360            walltime_ms: (cfg.now)(),
2361            hostname: cfg.hostname.clone(),
2362            collections: self.collections.clone(),
2363        };
2364
2365        // Make sure walltime_ms is strictly increasing, in case clocks are
2366        // offset.
2367        if new_state.walltime_ms <= self.walltime_ms {
2368            new_state.walltime_ms = self.walltime_ms + 1;
2369        }
2370
2371        let work_ret = work_fn(new_state.seqno, cfg, &mut new_state.collections)?;
2372        let new_state = TypedState {
2373            state: new_state,
2374            _phantom: PhantomData,
2375        };
2376        Continue((work_ret, new_state))
2377    }
2378}
2379
/// Tuning knobs consumed by `State::maybe_gc` when deciding whether (and how
/// far) to garbage collect.
#[derive(Copy, Clone, Debug)]
pub struct GcConfig {
    /// When set, `min_versions` is used as the gc threshold and an in-flight
    /// active gc is consulted as a fallback trigger; otherwise the threshold
    /// scales with the logarithm of the current seqno.
    pub use_active_gc: bool,
    /// With `use_active_gc`, how many milliseconds an active gc may have been
    /// running before gc is triggered even though the threshold isn't met.
    pub fallback_threshold_ms: u64,
    /// With `use_active_gc`, the number of seqnos the seqno-since must be
    /// ahead of the last gc request before gc is triggered.
    pub min_versions: usize,
    /// Upper bound on how many seqnos past the last gc request a single gc
    /// request may cover.
    pub max_versions: usize,
}
2387
2388impl<T> State<T>
2389where
2390    T: Timestamp + Lattice + Codec64,
2391{
2392    pub fn shard_id(&self) -> ShardId {
2393        self.shard_id
2394    }
2395
2396    pub fn seqno(&self) -> SeqNo {
2397        self.seqno
2398    }
2399
2400    pub fn since(&self) -> &Antichain<T> {
2401        self.collections.trace.since()
2402    }
2403
2404    pub fn upper(&self) -> &Antichain<T> {
2405        self.collections.trace.upper()
2406    }
2407
2408    pub fn spine_batch_count(&self) -> usize {
2409        self.collections.trace.num_spine_batches()
2410    }
2411
2412    pub fn size_metrics(&self) -> StateSizeMetrics {
2413        let mut ret = StateSizeMetrics::default();
2414        self.blobs().for_each(|x| match x {
2415            HollowBlobRef::Batch(x) => {
2416                ret.hollow_batch_count += 1;
2417                ret.batch_part_count += x.part_count();
2418                ret.num_updates += x.len;
2419
2420                let batch_size = x.encoded_size_bytes();
2421                for x in x.parts.iter() {
2422                    if x.ts_rewrite().is_some() {
2423                        ret.rewrite_part_count += 1;
2424                    }
2425                    if x.is_inline() {
2426                        ret.inline_part_count += 1;
2427                        ret.inline_part_bytes += x.inline_bytes();
2428                    }
2429                }
2430                ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size);
2431                ret.state_batches_bytes += batch_size;
2432            }
2433            HollowBlobRef::Rollup(x) => {
2434                ret.state_rollup_count += 1;
2435                ret.state_rollups_bytes += x.encoded_size_bytes.unwrap_or_default()
2436            }
2437        });
2438        ret
2439    }
2440
2441    pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
2442        // We maintain the invariant that every version of state has at least
2443        // one rollup.
2444        self.collections
2445            .rollups
2446            .iter()
2447            .rev()
2448            .next()
2449            .expect("State should have at least one rollup if seqno > minimum")
2450    }
2451
2452    pub(crate) fn seqno_since(&self) -> SeqNo {
2453        self.collections.seqno_since(self.seqno)
2454    }
2455
    /// Returns whether the cmd proposing this state has been selected to perform
    /// background garbage collection work.
    ///
    /// If it was selected, this information is recorded in the state itself for
    /// commit along with the cmd's state transition. This helps us to avoid
    /// redundant work.
    ///
    /// Correctness does not depend on a gc assignment being executed, nor on
    /// them being executed in the order they are given. But it is expected that
    /// gc assignments are best-effort respected. In practice, cmds like
    /// register_foo or expire_foo, where it would be awkward, ignore gc.
    ///
    /// Arguments:
    /// - `is_write`: whether the proposing cmd is a write. Non-writes are only
    ///   assigned (non-tombstone) gc work when there are no registered writers.
    /// - `now`: the current time, compared against `active_gc.start_ms`.
    /// - `cfg`: gc tuning knobs (see the destructuring below).
    ///
    /// Returns `Some(GcReq)` when gc work is assigned, in which case
    /// `last_gc_req` has been advanced to the request's `new_seqno_since`.
    /// Returns `None` (and leaves the state untouched) otherwise.
    pub fn maybe_gc(&mut self, is_write: bool, now: u64, cfg: GcConfig) -> Option<GcReq> {
        let GcConfig {
            use_active_gc,
            fallback_threshold_ms,
            min_versions,
            max_versions,
        } = cfg;
        // This is an arbitrary-ish threshold that scales with seqno, but never
        // gets particularly big. It probably could be much bigger and certainly
        // could use a tuning pass at some point.
        //
        // With active gc enabled the threshold is simply `min_versions`;
        // otherwise it is roughly log2(seqno), clamped to at least 1.
        let gc_threshold = if use_active_gc {
            u64::cast_from(min_versions)
        } else {
            std::cmp::max(
                1,
                u64::cast_from(self.seqno.0.next_power_of_two().trailing_zeros()),
            )
        };
        let new_seqno_since = self.seqno_since();
        // Collect until the new seqno since... or the old since plus the max number of versions,
        // whatever is less.
        let gc_until_seqno = new_seqno_since.min(SeqNo(
            self.collections
                .last_gc_req
                .0
                .saturating_add(u64::cast_from(max_versions)),
        ));
        // Base condition: enough seqnos have become collectable since the last
        // gc request was handed out.
        let should_gc = new_seqno_since
            .0
            .saturating_sub(self.collections.last_gc_req.0)
            >= gc_threshold;

        // If we wouldn't otherwise gc, check if we have an active gc. If we do, and
        // it's been a while since it started, we should gc.
        let should_gc = if use_active_gc && !should_gc {
            match self.collections.active_gc {
                Some(active_gc) => now.saturating_sub(active_gc.start_ms) > fallback_threshold_ms,
                None => false,
            }
        } else {
            should_gc
        };
        // Assign GC traffic preferentially to writers, falling back to anyone
        // generating new state versions if there are no writers.
        let should_gc = should_gc && (is_write || self.collections.writers.is_empty());
        // Always assign GC work to a tombstoned shard to have the chance to
        // clean up any residual blobs. This is safe (won't cause excess gc)
        // as the only allowed command after becoming a tombstone is to write
        // the final rollup.
        let tombstone_needs_gc = self.collections.is_tombstone();
        let should_gc = should_gc || tombstone_needs_gc;
        // Final gate for active gc. Note that it is applied after the
        // tombstone override above, so it can suppress tombstone gc as well.
        let should_gc = if use_active_gc {
            // If we have an active gc, we should only gc if the active gc is
            // sufficiently old. This is to avoid doing more gc work than
            // necessary.
            should_gc
                && match self.collections.active_gc {
                    Some(active) => now.saturating_sub(active.start_ms) > fallback_threshold_ms,
                    None => true,
                }
        } else {
            should_gc
        };
        if should_gc {
            // Record the assignment so later cmds measure progress from here.
            self.collections.last_gc_req = gc_until_seqno;
            Some(GcReq {
                shard_id: self.shard_id,
                new_seqno_since: gc_until_seqno,
            })
        } else {
            None
        }
    }
2540
2541    /// Return the number of gc-ineligible state versions.
2542    pub fn seqnos_held(&self) -> usize {
2543        usize::cast_from(self.seqno.0.saturating_sub(self.seqno_since().0))
2544    }
2545
2546    /// Expire all readers and writers up to the given walltime_ms.
2547    pub fn expire_at(&mut self, walltime_ms: EpochMillis) -> ExpiryMetrics {
2548        let mut metrics = ExpiryMetrics::default();
2549        let shard_id = self.shard_id();
2550        self.collections.leased_readers.retain(|id, state| {
2551            let retain = state.last_heartbeat_timestamp_ms + state.lease_duration_ms >= walltime_ms;
2552            if !retain {
2553                info!(
2554                    "Force expiring reader {id} ({}) of shard {shard_id} due to inactivity",
2555                    state.debug.purpose
2556                );
2557                metrics.readers_expired += 1;
2558            }
2559            retain
2560        });
2561        // critical_readers don't need forced expiration. (In fact, that's the point!)
2562        self.collections.writers.retain(|id, state| {
2563            let retain =
2564                (state.last_heartbeat_timestamp_ms + state.lease_duration_ms) >= walltime_ms;
2565            if !retain {
2566                info!(
2567                    "Force expiring writer {id} ({}) of shard {shard_id} due to inactivity",
2568                    state.debug.purpose
2569                );
2570                metrics.writers_expired += 1;
2571            }
2572            retain
2573        });
2574        metrics
2575    }
2576
2577    /// Returns the batches that contain updates up to (and including) the given `as_of`. The
2578    /// result `Vec` contains blob keys, along with a [`Description`] of what updates in the
2579    /// referenced parts are valid to read.
2580    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
2581        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2582            return Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
2583                self.collections.trace.since().clone(),
2584            )));
2585        }
2586        let upper = self.collections.trace.upper();
2587        if PartialOrder::less_equal(upper, as_of) {
2588            return Err(SnapshotErr::AsOfNotYetAvailable(
2589                self.seqno,
2590                Upper(upper.clone()),
2591            ));
2592        }
2593
2594        let batches = self
2595            .collections
2596            .trace
2597            .batches()
2598            .filter(|b| !PartialOrder::less_than(as_of, b.desc.lower()))
2599            .cloned()
2600            .collect();
2601        Ok(batches)
2602    }
2603
2604    // NB: Unlike the other methods here, this one is read-only.
2605    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
2606        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2607            return Err(Since(self.collections.trace.since().clone()));
2608        }
2609        Ok(())
2610    }
2611
2612    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
2613        // TODO: Avoid the O(n^2) here: `next_listen_batch` is called once per
2614        // batch and this iterates through all batches to find the next one.
2615        self.collections
2616            .trace
2617            .batches()
2618            .find(|b| {
2619                PartialOrder::less_equal(b.desc.lower(), frontier)
2620                    && PartialOrder::less_than(frontier, b.desc.upper())
2621            })
2622            .cloned()
2623            .ok_or(self.seqno)
2624    }
2625
2626    pub fn active_rollup(&self) -> Option<ActiveRollup> {
2627        self.collections.active_rollup
2628    }
2629
2630    pub fn need_rollup(
2631        &self,
2632        threshold: usize,
2633        use_active_rollup: bool,
2634        fallback_threshold_ms: u64,
2635        now: u64,
2636    ) -> Option<SeqNo> {
2637        let (latest_rollup_seqno, _) = self.latest_rollup();
2638
2639        // Tombstoned shards require one final rollup. However, because we
2640        // write a rollup as of SeqNo X and then link it in using a state
2641        // transition (in this case from X to X+1), the minimum number of
2642        // live diffs is actually two. Detect when we're in this minimal
2643        // two diff state and stop the (otherwise) infinite iteration.
2644        if self.collections.is_tombstone() && latest_rollup_seqno.next() < self.seqno {
2645            return Some(self.seqno);
2646        }
2647
2648        let seqnos_since_last_rollup = self.seqno.0.saturating_sub(latest_rollup_seqno.0);
2649
2650        if use_active_rollup {
2651            // If sequnos_since_last_rollup>threshold, and there is no existing rollup in progress,
2652            // we should start a new rollup.
2653            // If there is an active rollup, we should check if it has been running too long.
2654            // If it has, we should start a new rollup.
2655            // This is to guard against a worker dying/taking too long/etc.
2656            if seqnos_since_last_rollup > u64::cast_from(threshold) {
2657                match self.active_rollup() {
2658                    Some(active_rollup) => {
2659                        if now.saturating_sub(active_rollup.start_ms) > fallback_threshold_ms {
2660                            return Some(self.seqno);
2661                        }
2662                    }
2663                    None => {
2664                        return Some(self.seqno);
2665                    }
2666                }
2667            }
2668        } else {
2669            // every `threshold` seqnos since the latest rollup, assign rollup maintenance.
2670            // we avoid assigning rollups to every seqno past the threshold to avoid handles
2671            // racing / performing redundant work.
2672            if seqnos_since_last_rollup > 0
2673                && seqnos_since_last_rollup % u64::cast_from(threshold) == 0
2674            {
2675                return Some(self.seqno);
2676            }
2677
2678            // however, since maintenance is best-effort and could fail, do assign rollup
2679            // work to every seqno after a fallback threshold to ensure one is written.
2680            if seqnos_since_last_rollup
2681                > u64::cast_from(
2682                    threshold * PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER,
2683                )
2684            {
2685                return Some(self.seqno);
2686            }
2687        }
2688
2689        None
2690    }
2691
2692    pub(crate) fn blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
2693        let batches = self.collections.trace.batches().map(HollowBlobRef::Batch);
2694        let rollups = self.collections.rollups.values().map(HollowBlobRef::Rollup);
2695        batches.chain(rollups)
2696    }
2697}
2698
2699fn serialize_part_bytes<S: Serializer>(val: &[u8], s: S) -> Result<S::Ok, S::Error> {
2700    let val = hex::encode(val);
2701    val.serialize(s)
2702}
2703
2704fn serialize_lazy_proto<S: Serializer, T: prost::Message + Default>(
2705    val: &Option<LazyProto<T>>,
2706    s: S,
2707) -> Result<S::Ok, S::Error> {
2708    val.as_ref()
2709        .map(|lazy| hex::encode(&lazy.into_proto()))
2710        .serialize(s)
2711}
2712
2713fn serialize_part_stats<S: Serializer>(
2714    val: &Option<LazyPartStats>,
2715    s: S,
2716) -> Result<S::Ok, S::Error> {
2717    let val = val.as_ref().map(|x| x.decode().key);
2718    val.serialize(s)
2719}
2720
2721fn serialize_diffs_sum<S: Serializer>(val: &Option<[u8; 8]>, s: S) -> Result<S::Ok, S::Error> {
2722    // This is only used for debugging, so hack to assume that D is i64.
2723    let val = val.map(i64::decode);
2724    val.serialize(s)
2725}
2726
2727// This Serialize impl is used for debugging/testing and exposed via SQL. It's
2728// intentionally gated from users, so not strictly subject to our backward
2729// compatibility guarantees, but still probably best to be thoughtful about
2730// making unnecessary changes. Additionally, it's nice to make the output as
2731// nice to use as possible without tying our hands for the actual code usages.
2732impl<T: Serialize + Timestamp + Lattice> Serialize for State<T> {
2733    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2734        let State {
2735            shard_id,
2736            seqno,
2737            walltime_ms,
2738            hostname,
2739            collections:
2740                StateCollections {
2741                    version: applier_version,
2742                    last_gc_req,
2743                    rollups,
2744                    active_rollup,
2745                    active_gc,
2746                    leased_readers,
2747                    critical_readers,
2748                    writers,
2749                    schemas,
2750                    trace,
2751                },
2752        } = self;
2753        let mut s = s.serialize_struct("State", 13)?;
2754        let () = s.serialize_field("applier_version", &applier_version.to_string())?;
2755        let () = s.serialize_field("shard_id", shard_id)?;
2756        let () = s.serialize_field("seqno", seqno)?;
2757        let () = s.serialize_field("walltime_ms", walltime_ms)?;
2758        let () = s.serialize_field("hostname", hostname)?;
2759        let () = s.serialize_field("last_gc_req", last_gc_req)?;
2760        let () = s.serialize_field("rollups", rollups)?;
2761        let () = s.serialize_field("active_rollup", active_rollup)?;
2762        let () = s.serialize_field("active_gc", active_gc)?;
2763        let () = s.serialize_field("leased_readers", leased_readers)?;
2764        let () = s.serialize_field("critical_readers", critical_readers)?;
2765        let () = s.serialize_field("writers", writers)?;
2766        let () = s.serialize_field("schemas", schemas)?;
2767        let () = s.serialize_field("since", &trace.since().elements())?;
2768        let () = s.serialize_field("upper", &trace.upper().elements())?;
2769        let trace = trace.flatten();
2770        let () = s.serialize_field("batches", &trace.legacy_batches.keys().collect::<Vec<_>>())?;
2771        let () = s.serialize_field("hollow_batches", &trace.hollow_batches)?;
2772        let () = s.serialize_field("spine_batches", &trace.spine_batches)?;
2773        let () = s.serialize_field("merges", &trace.merges)?;
2774        s.end()
2775    }
2776}
2777
/// Counters accumulated while visiting a state's batch and rollup blob
/// references.
#[derive(Debug, Default)]
pub struct StateSizeMetrics {
    /// Number of hollow batches visited.
    pub hollow_batch_count: usize,
    /// Sum of each visited batch's `part_count()`.
    pub batch_part_count: usize,
    /// Number of batch parts for which `ts_rewrite()` is set.
    pub rewrite_part_count: usize,
    /// Sum of each visited batch's `len`.
    pub num_updates: usize,
    /// Largest `encoded_size_bytes()` of any single visited batch.
    pub largest_batch_bytes: usize,
    /// Sum of `encoded_size_bytes()` over all visited batches.
    pub state_batches_bytes: usize,
    /// Sum of the rollups' `encoded_size_bytes` (a missing size counts as 0).
    pub state_rollups_bytes: usize,
    /// Number of rollups visited.
    pub state_rollup_count: usize,
    /// Number of batch parts for which `is_inline()` is true.
    pub inline_part_count: usize,
    /// Sum of `inline_bytes()` over all inline batch parts.
    pub inline_part_bytes: usize,
}
2791
/// Counts of handles that were force-expired due to inactivity.
#[derive(Debug, Default)]
pub struct ExpiryMetrics {
    /// Number of leased readers that were removed.
    pub(crate) readers_expired: usize,
    /// Number of writers that were removed.
    pub(crate) writers_expired: usize,
}
2797
/// Wrapper for Antichain that represents a Since.
///
/// Used as the error value when a requested `as_of` is strictly less than the
/// trace's since (e.g. by `verify_listen` and, wrapped in
/// `SnapshotErr::AsOfHistoricalDistinctionsLost`, by `snapshot`).
#[derive(Debug, Clone, PartialEq)]
pub struct Since<T>(pub Antichain<T>);
2801
/// Wrapper for Antichain that represents an Upper.
///
/// Carried by `SnapshotErr::AsOfNotYetAvailable` when the trace's upper is
/// less than or equal to the requested `as_of`.
#[derive(Debug, PartialEq)]
pub struct Upper<T>(pub Antichain<T>);
2805
2806#[cfg(test)]
2807pub(crate) mod tests {
2808    use std::ops::Range;
2809    use std::str::FromStr;
2810
2811    use bytes::Bytes;
2812    use mz_build_info::DUMMY_BUILD_INFO;
2813    use mz_dyncfg::ConfigUpdates;
2814    use mz_ore::now::SYSTEM_TIME;
2815    use mz_ore::{assert_none, assert_ok};
2816    use mz_proto::RustType;
2817    use proptest::prelude::*;
2818    use proptest::strategy::ValueTree;
2819
2820    use crate::InvalidUsage::{InvalidBounds, InvalidEmptyTimeInterval};
2821    use crate::cache::PersistClientCache;
2822    use crate::internal::encoding::any_some_lazy_part_stats;
2823    use crate::internal::paths::RollupId;
2824    use crate::internal::trace::tests::any_trace;
2825    use crate::tests::new_test_client_cache;
2826    use crate::{Diagnostics, PersistLocation};
2827
2828    use super::*;
2829
    /// Lease duration used by the tests in this module: 900 * 1000 ms, i.e.
    /// 15 minutes.
    const LEASE_DURATION_MS: u64 = 900 * 1000;
2831    fn debug_state() -> HandleDebugState {
2832        HandleDebugState {
2833            hostname: "debug".to_owned(),
2834            purpose: "finding the bugs".to_owned(),
2835        }
2836    }
2837
2838    pub fn any_hollow_batch_with_exact_runs<T: Arbitrary + Timestamp>(
2839        num_runs: usize,
2840    ) -> impl Strategy<Value = HollowBatch<T>> {
2841        (
2842            any::<T>(),
2843            any::<T>(),
2844            any::<T>(),
2845            proptest::collection::vec(any_run_part::<T>(), num_runs + 1..20),
2846            any::<usize>(),
2847        )
2848            .prop_map(move |(t0, t1, since, parts, len)| {
2849                let (lower, upper) = if t0 <= t1 {
2850                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
2851                } else {
2852                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
2853                };
2854                let since = Antichain::from_elem(since);
2855
2856                let run_splits = (1..num_runs)
2857                    .map(|i| i * parts.len() / num_runs)
2858                    .collect::<Vec<_>>();
2859
2860                let run_meta = (0..num_runs)
2861                    .map(|_| {
2862                        let mut meta = RunMeta::default();
2863                        meta.id = Some(RunId::new());
2864                        meta
2865                    })
2866                    .collect::<Vec<_>>();
2867
2868                HollowBatch::new(
2869                    Description::new(lower, upper, since),
2870                    parts,
2871                    len % 10,
2872                    run_meta,
2873                    run_splits,
2874                )
2875            })
2876    }
2877
2878    pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
2879        Strategy::prop_map(
2880            (
2881                any::<T>(),
2882                any::<T>(),
2883                any::<T>(),
2884                proptest::collection::vec(any_run_part::<T>(), 0..20),
2885                any::<usize>(),
2886                0..=10usize,
2887                proptest::collection::vec(any::<RunId>(), 10),
2888            ),
2889            |(t0, t1, since, parts, len, num_runs, run_ids)| {
2890                let (lower, upper) = if t0 <= t1 {
2891                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
2892                } else {
2893                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
2894                };
2895                let since = Antichain::from_elem(since);
2896                if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() {
2897                    let run_splits = (1..num_runs)
2898                        .map(|i| i * parts.len() / num_runs)
2899                        .collect::<Vec<_>>();
2900
2901                    let run_meta = (0..num_runs)
2902                        .enumerate()
2903                        .map(|(i, _)| {
2904                            let mut meta = RunMeta::default();
2905                            meta.id = Some(run_ids[i]);
2906                            meta
2907                        })
2908                        .collect::<Vec<_>>();
2909
2910                    HollowBatch::new(
2911                        Description::new(lower, upper, since),
2912                        parts,
2913                        len % 10,
2914                        run_meta,
2915                        run_splits,
2916                    )
2917                } else {
2918                    HollowBatch::new_run_for_test(
2919                        Description::new(lower, upper, since),
2920                        parts,
2921                        len % 10,
2922                        run_ids[0],
2923                    )
2924                }
2925            },
2926        )
2927    }
2928
2929    pub fn any_batch_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = BatchPart<T>> {
2930        Strategy::prop_map(
2931            (
2932                any::<bool>(),
2933                any_hollow_batch_part(),
2934                any::<Option<T>>(),
2935                any::<Option<SchemaId>>(),
2936                any::<Option<SchemaId>>(),
2937            ),
2938            |(is_hollow, hollow, ts_rewrite, schema_id, deprecated_schema_id)| {
2939                if is_hollow {
2940                    BatchPart::Hollow(hollow)
2941                } else {
2942                    let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap();
2943                    let ts_rewrite = ts_rewrite.map(Antichain::from_elem);
2944                    BatchPart::Inline {
2945                        updates,
2946                        ts_rewrite,
2947                        schema_id,
2948                        deprecated_schema_id,
2949                    }
2950                }
2951            },
2952        )
2953    }
2954
2955    pub fn any_run_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = RunPart<T>> {
2956        Strategy::prop_map(any_batch_part(), |part| RunPart::Single(part))
2957    }
2958
2959    pub fn any_hollow_batch_part<T: Arbitrary + Timestamp>()
2960    -> impl Strategy<Value = HollowBatchPart<T>> {
2961        Strategy::prop_map(
2962            (
2963                any::<PartialBatchKey>(),
2964                any::<usize>(),
2965                any::<Vec<u8>>(),
2966                any_some_lazy_part_stats(),
2967                any::<Option<T>>(),
2968                any::<[u8; 8]>(),
2969                any::<Option<BatchColumnarFormat>>(),
2970                any::<Option<SchemaId>>(),
2971                any::<Option<SchemaId>>(),
2972            ),
2973            |(
2974                key,
2975                encoded_size_bytes,
2976                key_lower,
2977                stats,
2978                ts_rewrite,
2979                diffs_sum,
2980                format,
2981                schema_id,
2982                deprecated_schema_id,
2983            )| {
2984                HollowBatchPart {
2985                    key,
2986                    meta: Default::default(),
2987                    encoded_size_bytes,
2988                    key_lower,
2989                    structured_key_lower: None,
2990                    stats,
2991                    ts_rewrite: ts_rewrite.map(Antichain::from_elem),
2992                    diffs_sum: Some(diffs_sum),
2993                    format,
2994                    schema_id,
2995                    deprecated_schema_id,
2996                }
2997            },
2998        )
2999    }
3000
3001    pub fn any_leased_reader_state<T: Arbitrary>() -> impl Strategy<Value = LeasedReaderState<T>> {
3002        Strategy::prop_map(
3003            (
3004                any::<SeqNo>(),
3005                any::<Option<T>>(),
3006                any::<u64>(),
3007                any::<u64>(),
3008                any::<HandleDebugState>(),
3009            ),
3010            |(seqno, since, last_heartbeat_timestamp_ms, mut lease_duration_ms, debug)| {
3011                // lease_duration_ms of 0 means this state was written by an old
3012                // version of code, which means we'll migrate it in the decode
3013                // path. Avoid.
3014                if lease_duration_ms == 0 {
3015                    lease_duration_ms += 1;
3016                }
3017                LeasedReaderState {
3018                    seqno,
3019                    since: since.map_or_else(Antichain::new, Antichain::from_elem),
3020                    last_heartbeat_timestamp_ms,
3021                    lease_duration_ms,
3022                    debug,
3023                }
3024            },
3025        )
3026    }
3027
3028    pub fn any_critical_reader_state<T>() -> impl Strategy<Value = CriticalReaderState<T>>
3029    where
3030        T: Arbitrary,
3031    {
3032        Strategy::prop_map(
3033            (
3034                any::<Option<T>>(),
3035                any::<Opaque>(),
3036                any::<HandleDebugState>(),
3037            ),
3038            |(since, opaque, debug)| CriticalReaderState {
3039                since: since.map_or_else(Antichain::new, Antichain::from_elem),
3040                opaque,
3041                debug,
3042            },
3043        )
3044    }
3045
3046    pub fn any_writer_state<T: Arbitrary>() -> impl Strategy<Value = WriterState<T>> {
3047        Strategy::prop_map(
3048            (
3049                any::<u64>(),
3050                any::<u64>(),
3051                any::<IdempotencyToken>(),
3052                any::<Option<T>>(),
3053                any::<HandleDebugState>(),
3054            ),
3055            |(
3056                last_heartbeat_timestamp_ms,
3057                lease_duration_ms,
3058                most_recent_write_token,
3059                most_recent_write_upper,
3060                debug,
3061            )| WriterState {
3062                last_heartbeat_timestamp_ms,
3063                lease_duration_ms,
3064                most_recent_write_token,
3065                most_recent_write_upper: most_recent_write_upper
3066                    .map_or_else(Antichain::new, Antichain::from_elem),
3067                debug,
3068            },
3069        )
3070    }
3071
3072    pub fn any_encoded_schemas() -> impl Strategy<Value = EncodedSchemas> {
3073        Strategy::prop_map(
3074            (
3075                any::<Vec<u8>>(),
3076                any::<Vec<u8>>(),
3077                any::<Vec<u8>>(),
3078                any::<Vec<u8>>(),
3079            ),
3080            |(key, key_data_type, val, val_data_type)| EncodedSchemas {
3081                key: Bytes::from(key),
3082                key_data_type: Bytes::from(key_data_type),
3083                val: Bytes::from(val),
3084                val_data_type: Bytes::from(val_data_type),
3085            },
3086        )
3087    }
3088
3089    pub fn any_state<T: Arbitrary + Timestamp + Lattice>(
3090        num_trace_batches: Range<usize>,
3091    ) -> impl Strategy<Value = State<T>> {
3092        let part1 = (
3093            any::<ShardId>(),
3094            any::<SeqNo>(),
3095            any::<u64>(),
3096            any::<String>(),
3097            any::<SeqNo>(),
3098            proptest::collection::btree_map(any::<SeqNo>(), any::<HollowRollup>(), 1..3),
3099            proptest::option::of(any::<ActiveRollup>()),
3100        );
3101
3102        let part2 = (
3103            proptest::option::of(any::<ActiveGc>()),
3104            proptest::collection::btree_map(
3105                any::<LeasedReaderId>(),
3106                any_leased_reader_state::<T>(),
3107                1..3,
3108            ),
3109            proptest::collection::btree_map(
3110                any::<CriticalReaderId>(),
3111                any_critical_reader_state::<T>(),
3112                1..3,
3113            ),
3114            proptest::collection::btree_map(any::<WriterId>(), any_writer_state::<T>(), 0..3),
3115            proptest::collection::btree_map(any::<SchemaId>(), any_encoded_schemas(), 0..3),
3116            any_trace::<T>(num_trace_batches),
3117        );
3118
3119        (part1, part2).prop_map(
3120            |(
3121                (shard_id, seqno, walltime_ms, hostname, last_gc_req, rollups, active_rollup),
3122                (active_gc, leased_readers, critical_readers, writers, schemas, trace),
3123            )| State {
3124                shard_id,
3125                seqno,
3126                walltime_ms,
3127                hostname,
3128                collections: StateCollections {
3129                    version: Version::new(1, 2, 3),
3130                    last_gc_req,
3131                    rollups,
3132                    active_rollup,
3133                    active_gc,
3134                    leased_readers,
3135                    critical_readers,
3136                    writers,
3137                    schemas,
3138                    trace,
3139                },
3140            },
3141        )
3142    }
3143
3144    pub(crate) fn hollow<T: Timestamp>(
3145        lower: T,
3146        upper: T,
3147        keys: &[&str],
3148        len: usize,
3149    ) -> HollowBatch<T> {
3150        HollowBatch::new_run(
3151            Description::new(
3152                Antichain::from_elem(lower),
3153                Antichain::from_elem(upper),
3154                Antichain::from_elem(T::minimum()),
3155            ),
3156            keys.iter()
3157                .map(|x| {
3158                    RunPart::Single(BatchPart::Hollow(HollowBatchPart {
3159                        key: PartialBatchKey((*x).to_owned()),
3160                        meta: Default::default(),
3161                        encoded_size_bytes: 0,
3162                        key_lower: vec![],
3163                        structured_key_lower: None,
3164                        stats: None,
3165                        ts_rewrite: None,
3166                        diffs_sum: None,
3167                        format: None,
3168                        schema_id: None,
3169                        deprecated_schema_id: None,
3170                    }))
3171                })
3172                .collect(),
3173            len,
3174        )
3175    }
3176
3177    #[mz_ore::test]
3178    fn downgrade_since() {
3179        let mut state = TypedState::<(), (), u64, i64>::new(
3180            DUMMY_BUILD_INFO.semver_version(),
3181            ShardId::new(),
3182            "".to_owned(),
3183            0,
3184        );
3185        let reader = LeasedReaderId::new();
3186        let seqno = SeqNo::minimum();
3187        let now = SYSTEM_TIME.clone();
3188        let _ = state.collections.register_leased_reader(
3189            "",
3190            &reader,
3191            "",
3192            seqno,
3193            Duration::from_secs(10),
3194            now(),
3195            false,
3196        );
3197
3198        // The shard global since == 0 initially.
3199        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3200
3201        // Greater
3202        assert_eq!(
3203            state.collections.downgrade_since(
3204                &reader,
3205                seqno,
3206                seqno,
3207                &Antichain::from_elem(2),
3208                now()
3209            ),
3210            Continue(Since(Antichain::from_elem(2)))
3211        );
3212        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3213        // Equal (no-op)
3214        assert_eq!(
3215            state.collections.downgrade_since(
3216                &reader,
3217                seqno,
3218                seqno,
3219                &Antichain::from_elem(2),
3220                now()
3221            ),
3222            Continue(Since(Antichain::from_elem(2)))
3223        );
3224        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3225        // Less (no-op)
3226        assert_eq!(
3227            state.collections.downgrade_since(
3228                &reader,
3229                seqno,
3230                seqno,
3231                &Antichain::from_elem(1),
3232                now()
3233            ),
3234            Continue(Since(Antichain::from_elem(2)))
3235        );
3236        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3237
3238        // Create a second reader.
3239        let reader2 = LeasedReaderId::new();
3240        let _ = state.collections.register_leased_reader(
3241            "",
3242            &reader2,
3243            "",
3244            seqno,
3245            Duration::from_secs(10),
3246            now(),
3247            false,
3248        );
3249
3250        // Shard since doesn't change until the meet (min) of all reader sinces changes.
3251        assert_eq!(
3252            state.collections.downgrade_since(
3253                &reader2,
3254                seqno,
3255                seqno,
3256                &Antichain::from_elem(3),
3257                now()
3258            ),
3259            Continue(Since(Antichain::from_elem(3)))
3260        );
3261        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3262        // Shard since == 3 when all readers have since >= 3.
3263        assert_eq!(
3264            state.collections.downgrade_since(
3265                &reader,
3266                seqno,
3267                seqno,
3268                &Antichain::from_elem(5),
3269                now()
3270            ),
3271            Continue(Since(Antichain::from_elem(5)))
3272        );
3273        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3274
3275        // Shard since is unaffected when a reader with since > shard since expires.
3276        assert_eq!(
3277            state.collections.expire_leased_reader(&reader),
3278            Continue(true)
3279        );
3280        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3281
3282        // Create a third reader.
3283        let reader3 = LeasedReaderId::new();
3284        let _ = state.collections.register_leased_reader(
3285            "",
3286            &reader3,
3287            "",
3288            seqno,
3289            Duration::from_secs(10),
3290            now(),
3291            false,
3292        );
3293
3294        // Shard since doesn't change until the meet (min) of all reader sinces changes.
3295        assert_eq!(
3296            state.collections.downgrade_since(
3297                &reader3,
3298                seqno,
3299                seqno,
3300                &Antichain::from_elem(10),
3301                now()
3302            ),
3303            Continue(Since(Antichain::from_elem(10)))
3304        );
3305        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3306
3307        // Shard since advances when reader with the minimal since expires.
3308        assert_eq!(
3309            state.collections.expire_leased_reader(&reader2),
3310            Continue(true)
3311        );
3312        // TODO(database-issues#6885): expiry temporarily doesn't advance since
3313        // Switch this assertion back when we re-enable this.
3314        //
3315        // assert_eq!(state.collections.trace.since(), &Antichain::from_elem(10));
3316        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3317
3318        // Shard since unaffected when all readers are expired.
3319        assert_eq!(
3320            state.collections.expire_leased_reader(&reader3),
3321            Continue(true)
3322        );
3323        // TODO(database-issues#6885): expiry temporarily doesn't advance since
3324        // Switch this assertion back when we re-enable this.
3325        //
3326        // assert_eq!(state.collections.trace.since(), &Antichain::from_elem(10));
3327        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3328    }
3329
3330    #[mz_ore::test]
    // Exercises `compare_and_downgrade_since` for a single critical reader that
    // is registered with opaque value 0. In every call the second argument
    // equals the opaque currently stored for the reader (presumably the value
    // being compared against), followed by the new (opaque, since) pair. After
    // each call the test checks the returned since, the shard-wide since, and
    // the opaque now stored for the reader.
3331    fn compare_and_downgrade_since() {
3332        let mut state = TypedState::<(), (), u64, i64>::new(
3333            DUMMY_BUILD_INFO.semver_version(),
3334            ShardId::new(),
3335            "".to_owned(),
3336            0,
3337        );
3338        let reader = CriticalReaderId::new();
3339        let _ = state
3340            .collections
3341            .register_critical_reader("", &reader, Opaque::encode(&0u64), "");
3342
3343        // The shard global since == 0 initially.
3344        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3345        // The initial opaque value should be set.
3346        assert_eq!(
3347            state
3348                .collections
3349                .critical_reader(&reader)
3350                .opaque
3351                .decode::<u64>(),
3352            u64::MIN
3353        );
3354
3355        // Greater
3356        assert_eq!(
3357            state.collections.compare_and_downgrade_since(
3358                &reader,
3359                &Opaque::encode(&0u64),
3360                (&Opaque::encode(&1u64), &Antichain::from_elem(2)),
3361            ),
3362            Continue(Ok(Since(Antichain::from_elem(2))))
3363        );
3364        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3365        assert_eq!(
3366            state
3367                .collections
3368                .critical_reader(&reader)
3369                .opaque
3370                .decode::<u64>(),
3371            1
3372        );
3373        // Equal (no-op)
3374        assert_eq!(
3375            state.collections.compare_and_downgrade_since(
3376                &reader,
3377                &Opaque::encode(&1u64),
3378                (&Opaque::encode(&2u64), &Antichain::from_elem(2)),
3379            ),
3380            Continue(Ok(Since(Antichain::from_elem(2))))
3381        );
3382        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        // The since did not move, but the stored opaque is still replaced.
3383        assert_eq!(
3384            state
3385                .collections
3386                .critical_reader(&reader)
3387                .opaque
3388                .decode::<u64>(),
3389            2
3390        );
3391        // Less (no-op)
3392        assert_eq!(
3393            state.collections.compare_and_downgrade_since(
3394                &reader,
3395                &Opaque::encode(&2u64),
3396                (&Opaque::encode(&3u64), &Antichain::from_elem(1)),
3397            ),
3398            Continue(Ok(Since(Antichain::from_elem(2))))
3399        );
3400        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        // The since stays at 2 (it never regresses), yet the opaque advances to 3.
3401        assert_eq!(
3402            state
3403                .collections
3404                .critical_reader(&reader)
3405                .opaque
3406                .decode::<u64>(),
3407            3
3408        );
3409    }
3410
3411    #[mz_ore::test]
    // Exercises the bounds checks of `compare_and_append` on a freshly created
    // shard: a lower that does not match the shard upper, an upper below the
    // lower, and a non-empty batch over an empty time interval are all
    // rejected, while empty batches are accepted.
    //
    // Judging by the expected errors below, `hollow(lower, upper, keys, n)`
    // builds a batch with the given bounds and keys; the last argument is
    // presumably the number of updates in the batch (TODO confirm against
    // `hollow`).
3412    fn compare_and_append() {
3413        let state = &mut TypedState::<String, String, u64, i64>::new(
3414            DUMMY_BUILD_INFO.semver_version(),
3415            ShardId::new(),
3416            "".to_owned(),
3417            0,
3418        )
3419        .collections;
3420
3421        let writer_id = WriterId::new();
3422        let now = SYSTEM_TIME.clone();
3423
3424        // State is initially empty.
3425        assert_eq!(state.trace.num_spine_batches(), 0);
3426        assert_eq!(state.trace.num_hollow_batches(), 0);
3427        assert_eq!(state.trace.num_updates(), 0);
3428
3429        // Cannot insert a batch with a lower != current shard upper.
3430        assert_eq!(
3431            state.compare_and_append(
3432                &hollow(1, 2, &["key1"], 1),
3433                &writer_id,
3434                now(),
3435                LEASE_DURATION_MS,
3436                &IdempotencyToken::new(),
3437                &debug_state(),
3438                0,
3439                100,
3440                None
3441            ),
3442            Break(CompareAndAppendBreak::Upper {
3443                shard_upper: Antichain::from_elem(0),
3444                writer_upper: Antichain::from_elem(0)
3445            })
3446        );
3447
3448        // Insert an empty batch with an upper > lower.
3449        assert!(
3450            state
3451                .compare_and_append(
3452                    &hollow(0, 5, &[], 0),
3453                    &writer_id,
3454                    now(),
3455                    LEASE_DURATION_MS,
3456                    &IdempotencyToken::new(),
3457                    &debug_state(),
3458                    0,
3459                    100,
3460                    None
3461                )
3462                .is_continue()
3463        );
3464
3465        // Cannot insert a batch with an upper less than the lower.
3466        assert_eq!(
3467            state.compare_and_append(
3468                &hollow(5, 4, &["key1"], 1),
3469                &writer_id,
3470                now(),
3471                LEASE_DURATION_MS,
3472                &IdempotencyToken::new(),
3473                &debug_state(),
3474                0,
3475                100,
3476                None
3477            ),
3478            Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds {
3479                lower: Antichain::from_elem(5),
3480                upper: Antichain::from_elem(4)
3481            }))
3482        );
3483
3484        // Cannot insert a nonempty batch with an upper equal to lower.
3485        assert_eq!(
3486            state.compare_and_append(
3487                &hollow(5, 5, &["key1"], 1),
3488                &writer_id,
3489                now(),
3490                LEASE_DURATION_MS,
3491                &IdempotencyToken::new(),
3492                &debug_state(),
3493                0,
3494                100,
3495                None
3496            ),
3497            Break(CompareAndAppendBreak::InvalidUsage(
3498                InvalidEmptyTimeInterval {
3499                    lower: Antichain::from_elem(5),
3500                    upper: Antichain::from_elem(5),
3501                    keys: vec!["key1".to_owned()],
3502                }
3503            ))
3504        );
3505
3506        // Can insert an empty batch with an upper equal to lower.
3507        assert!(
3508            state
3509                .compare_and_append(
3510                    &hollow(5, 5, &[], 0),
3511                    &writer_id,
3512                    now(),
3513                    LEASE_DURATION_MS,
3514                    &IdempotencyToken::new(),
3515                    &debug_state(),
3516                    0,
3517                    100,
3518                    None
3519                )
3520                .is_continue()
3521        );
3522    }
3523
3524    #[mz_ore::test]
    // Steps a shard through several appends and one since downgrade. After
    // each step the test checks which `as_of` values `snapshot` rejects (and
    // with which error) and which batches it returns for the accepted ones.
3525    fn snapshot() {
3526        let now = SYSTEM_TIME.clone();
3527
3528        let mut state = TypedState::<String, String, u64, i64>::new(
3529            DUMMY_BUILD_INFO.semver_version(),
3530            ShardId::new(),
3531            "".to_owned(),
3532            0,
3533        );
3534        // Cannot take a snapshot with as_of == shard upper.
3535        assert_eq!(
3536            state.snapshot(&Antichain::from_elem(0)),
3537            Err(SnapshotErr::AsOfNotYetAvailable(
3538                SeqNo(0),
3539                Upper(Antichain::from_elem(0))
3540            ))
3541        );
3542
3543        // Cannot take a snapshot with as_of > shard upper.
3544        assert_eq!(
3545            state.snapshot(&Antichain::from_elem(5)),
3546            Err(SnapshotErr::AsOfNotYetAvailable(
3547                SeqNo(0),
3548                Upper(Antichain::from_elem(0))
3549            ))
3550        );
3551
3552        let writer_id = WriterId::new();
3553
3554        // Advance upper to 5.
3555        assert!(
3556            state
3557                .collections
3558                .compare_and_append(
3559                    &hollow(0, 5, &["key1"], 1),
3560                    &writer_id,
3561                    now(),
3562                    LEASE_DURATION_MS,
3563                    &IdempotencyToken::new(),
3564                    &debug_state(),
3565                    0,
3566                    100,
3567                    None
3568                )
3569                .is_continue()
3570        );
3571
3572        // Can take a snapshot with as_of < upper.
3573        assert_eq!(
3574            state.snapshot(&Antichain::from_elem(0)),
3575            Ok(vec![hollow(0, 5, &["key1"], 1)])
3576        );
3577
3578        // Can take a snapshot with as_of >= shard since, as long as as_of < shard_upper.
3579        assert_eq!(
3580            state.snapshot(&Antichain::from_elem(4)),
3581            Ok(vec![hollow(0, 5, &["key1"], 1)])
3582        );
3583
3584        // Cannot take a snapshot with as_of >= upper.
3585        assert_eq!(
3586            state.snapshot(&Antichain::from_elem(5)),
3587            Err(SnapshotErr::AsOfNotYetAvailable(
3588                SeqNo(0),
3589                Upper(Antichain::from_elem(5))
3590            ))
3591        );
3592        assert_eq!(
3593            state.snapshot(&Antichain::from_elem(6)),
3594            Err(SnapshotErr::AsOfNotYetAvailable(
3595                SeqNo(0),
3596                Upper(Antichain::from_elem(5))
3597            ))
3598        );
3599
3600        let reader = LeasedReaderId::new();
3601        // Advance the since to 2.
3602        let _ = state.collections.register_leased_reader(
3603            "",
3604            &reader,
3605            "",
3606            SeqNo::minimum(),
3607            Duration::from_secs(10),
3608            now(),
3609            false,
3610        );
3611        assert_eq!(
3612            state.collections.downgrade_since(
3613                &reader,
3614                SeqNo::minimum(),
3615                SeqNo::minimum(),
3616                &Antichain::from_elem(2),
3617                now()
3618            ),
3619            Continue(Since(Antichain::from_elem(2)))
3620        );
3621        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3622        // Cannot take a snapshot with as_of < shard_since.
3623        assert_eq!(
3624            state.snapshot(&Antichain::from_elem(1)),
3625            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
3626                Antichain::from_elem(2)
3627            )))
3628        );
3629
3630        // Advance the upper to 10 via an empty batch.
3631        assert!(
3632            state
3633                .collections
3634                .compare_and_append(
3635                    &hollow(5, 10, &[], 0),
3636                    &writer_id,
3637                    now(),
3638                    LEASE_DURATION_MS,
3639                    &IdempotencyToken::new(),
3640                    &debug_state(),
3641                    0,
3642                    100,
3643                    None
3644                )
3645                .is_continue()
3646        );
3647
3648        // Can still take snapshots at times < upper.
3649        assert_eq!(
3650            state.snapshot(&Antichain::from_elem(7)),
3651            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3652        );
3653
3654        // Cannot take snapshots with as_of >= upper.
3655        assert_eq!(
3656            state.snapshot(&Antichain::from_elem(10)),
3657            Err(SnapshotErr::AsOfNotYetAvailable(
3658                SeqNo(0),
3659                Upper(Antichain::from_elem(10))
3660            ))
3661        );
3662
3663        // Advance upper to 15.
3664        assert!(
3665            state
3666                .collections
3667                .compare_and_append(
3668                    &hollow(10, 15, &["key2"], 1),
3669                    &writer_id,
3670                    now(),
3671                    LEASE_DURATION_MS,
3672                    &IdempotencyToken::new(),
3673                    &debug_state(),
3674                    0,
3675                    100,
3676                    None
3677                )
3678                .is_continue()
3679        );
3680
3681        // Filter out batches whose lowers are greater than the requested as_of (the
3682        // batches that are too far in the future for the requested as_of).
3683        assert_eq!(
3684            state.snapshot(&Antichain::from_elem(9)),
3685            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3686        );
3687
3688        // Don't filter out batches whose lowers are <= the requested as_of.
3689        assert_eq!(
3690            state.snapshot(&Antichain::from_elem(10)),
3691            Ok(vec![
3692                hollow(0, 5, &["key1"], 1),
3693                hollow(5, 10, &[], 0),
3694                hollow(10, 15, &["key2"], 1)
3695            ])
3696        );
3697
        // An as_of strictly inside the newest batch returns all three batches too.
3698        assert_eq!(
3699            state.snapshot(&Antichain::from_elem(11)),
3700            Ok(vec![
3701                hollow(0, 5, &["key1"], 1),
3702                hollow(5, 10, &[], 0),
3703                hollow(10, 15, &["key2"], 1)
3704            ])
3705        );
3706    }
3707
3708    #[mz_ore::test]
    // Checks that `next_listen_batch` returns the batch covering the given
    // frontier, and `Err(SeqNo(0))` when no batch is available for it (empty
    // shard, frontier at the shard upper, or the empty antichain).
3709    fn next_listen_batch() {
3710        let mut state = TypedState::<String, String, u64, i64>::new(
3711            DUMMY_BUILD_INFO.semver_version(),
3712            ShardId::new(),
3713            "".to_owned(),
3714            0,
3715        );
3716
3717        // Empty collection never has any batches to listen for, regardless of the
3718        // current frontier.
3719        assert_eq!(
3720            state.next_listen_batch(&Antichain::from_elem(0)),
3721            Err(SeqNo(0))
3722        );
3723        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3724
3725        let writer_id = WriterId::new();
3726        let now = SYSTEM_TIME.clone();
3727
3728        // Add two batches of data, one from [0, 5) and then another from [5, 10).
3729        assert!(
3730            state
3731                .collections
3732                .compare_and_append(
3733                    &hollow(0, 5, &["key1"], 1),
3734                    &writer_id,
3735                    now(),
3736                    LEASE_DURATION_MS,
3737                    &IdempotencyToken::new(),
3738                    &debug_state(),
3739                    0,
3740                    100,
3741                    None
3742                )
3743                .is_continue()
3744        );
3745        assert!(
3746            state
3747                .collections
3748                .compare_and_append(
3749                    &hollow(5, 10, &["key2"], 1),
3750                    &writer_id,
3751                    now(),
3752                    LEASE_DURATION_MS,
3753                    &IdempotencyToken::new(),
3754                    &debug_state(),
3755                    0,
3756                    100,
3757                    None
3758                )
3759                .is_continue()
3760        );
3761
3762        // All frontiers in [0, 5) return the first batch.
3763        for t in 0..=4 {
3764            assert_eq!(
3765                state.next_listen_batch(&Antichain::from_elem(t)),
3766                Ok(hollow(0, 5, &["key1"], 1))
3767            );
3768        }
3769
3770        // All frontiers in [5, 10) return the second batch.
3771        for t in 5..=9 {
3772            assert_eq!(
3773                state.next_listen_batch(&Antichain::from_elem(t)),
3774                Ok(hollow(5, 10, &["key2"], 1))
3775            );
3776        }
3777
3778        // There is no batch currently available for t = 10.
3779        assert_eq!(
3780            state.next_listen_batch(&Antichain::from_elem(10)),
3781            Err(SeqNo(0))
3782        );
3783
3784        // By definition, there is no frontier ever at the empty antichain which
3785        // is the time after all possible times.
3786        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3787    }
3788
3789    #[mz_ore::test]
    // Checks that expiring one writer does not stop a different writer from
    // appending the next batch: writer one appends [0, 2) and is expired, then
    // writer two appends [2, 5).
3790    fn expire_writer() {
3791        let mut state = TypedState::<String, String, u64, i64>::new(
3792            DUMMY_BUILD_INFO.semver_version(),
3793            ShardId::new(),
3794            "".to_owned(),
3795            0,
3796        );
3797        let now = SYSTEM_TIME.clone();
3798
3799        let writer_id_one = WriterId::new();
3800
3801        let writer_id_two = WriterId::new();
3802
3803        // Writer is eligible to write
3804        assert!(
3805            state
3806                .collections
3807                .compare_and_append(
3808                    &hollow(0, 2, &["key1"], 1),
3809                    &writer_id_one,
3810                    now(),
3811                    LEASE_DURATION_MS,
3812                    &IdempotencyToken::new(),
3813                    &debug_state(),
3814                    0,
3815                    100,
3816                    None
3817                )
3818                .is_continue()
3819        );
3820
        // Expire the first writer; the call itself must succeed.
3821        assert!(
3822            state
3823                .collections
3824                .expire_writer(&writer_id_one)
3825                .is_continue()
3826        );
3827
3828        // Other writers should still be able to write
3829        assert!(
3830            state
3831                .collections
3832                .compare_and_append(
3833                    &hollow(2, 5, &["key2"], 1),
3834                    &writer_id_two,
3835                    now(),
3836                    LEASE_DURATION_MS,
3837                    &IdempotencyToken::new(),
3838                    &debug_state(),
3839                    0,
3840                    100,
3841                    None
3842                )
3843                .is_continue()
3844        );
3845    }
3846
3847    #[mz_ore::test]
    // Exercises `maybe_gc` with `use_active_gc: true`. `state.seqno` is bumped
    // by hand between steps. The assertions cover: nothing to gc on an empty
    // state, the interaction between a present writer and `is_write`,
    // suppression while an `active_gc` is recorded, the time-based fallback
    // once more than `fallback_threshold_ms` has elapsed, and the
    // `max_versions` cap on how far `new_seqno_since` may jump in one request.
3848    fn maybe_gc_active_gc() {
3849        const GC_CONFIG: GcConfig = GcConfig {
3850            use_active_gc: true,
3851            fallback_threshold_ms: 5000,
3852            min_versions: 99,
3853            max_versions: 500,
3854        };
3855        let now_fn = SYSTEM_TIME.clone();
3856
3857        let mut state = TypedState::<String, String, u64, i64>::new(
3858            DUMMY_BUILD_INFO.semver_version(),
3859            ShardId::new(),
3860            "".to_owned(),
3861            0,
3862        );
3863
3864        let now = now_fn();
3865        // Empty state doesn't need gc, regardless of is_write.
3866        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
3867        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);
3868
3869        // Artificially advance the seqno so the seqno_since advances past our
3870        // internal gc_threshold.
3871        state.seqno = SeqNo(100);
3872        assert_eq!(state.seqno_since(), SeqNo(100));
3873
3874        // When a writer is present, non-writes don't gc.
3875        let writer_id = WriterId::new();
        // NOTE(review): the result is discarded. On a fresh shard (upper 0) a
        // batch with lower 1 is presumably rejected, as the `compare_and_append`
        // test in this file shows for the same batch; the call appears to be
        // used only to make `writer_id` known to the state. Confirm.
3876        let _ = state.collections.compare_and_append(
3877            &hollow(1, 2, &["key1"], 1),
3878            &writer_id,
3879            now,
3880            LEASE_DURATION_MS,
3881            &IdempotencyToken::new(),
3882            &debug_state(),
3883            0,
3884            100,
3885            None,
3886        );
3887        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);
3888
3889        // A write will gc though.
3890        assert_eq!(
3891            state.maybe_gc(true, now, GC_CONFIG),
3892            Some(GcReq {
3893                shard_id: state.shard_id,
3894                new_seqno_since: SeqNo(100)
3895            })
3896        );
3897
3898        // But if we write down an active gc, we won't gc.
3899        state.collections.active_gc = Some(ActiveGc {
3900            seqno: state.seqno,
3901            start_ms: now,
3902        });
3903
3904        state.seqno = SeqNo(200);
3905        assert_eq!(state.seqno_since(), SeqNo(200));
3906
3907        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
3908
3909        state.seqno = SeqNo(300);
3910        assert_eq!(state.seqno_since(), SeqNo(300));
3911        // But if we advance the time past the threshold, we will gc.
3912        let new_now = now + GC_CONFIG.fallback_threshold_ms + 1;
3913        assert_eq!(
3914            state.maybe_gc(true, new_now, GC_CONFIG),
3915            Some(GcReq {
3916                shard_id: state.shard_id,
3917                new_seqno_since: SeqNo(300)
3918            })
3919        );
3920
3921        // Even if the sequence number doesn't pass the threshold, if the
3922        // active gc is expired, we will gc.
3923
3924        state.seqno = SeqNo(301);
3925        assert_eq!(state.seqno_since(), SeqNo(301));
3926        assert_eq!(
3927            state.maybe_gc(true, new_now, GC_CONFIG),
3928            Some(GcReq {
3929                shard_id: state.shard_id,
3930                new_seqno_since: SeqNo(301)
3931            })
3932        );
3933
3934        state.collections.active_gc = None;
3935
3936        // Artificially advance the seqno (again) so the seqno_since advances
3937        // past our internal gc_threshold (again).
3938        state.seqno = SeqNo(400);
3939        assert_eq!(state.seqno_since(), SeqNo(400));
3940
3941        let now = now_fn();
3942
3943        // If there are no writers, even a non-write will gc.
3944        let _ = state.collections.expire_writer(&writer_id);
3945        assert_eq!(
3946            state.maybe_gc(false, now, GC_CONFIG),
3947            Some(GcReq {
3948                shard_id: state.shard_id,
3949                new_seqno_since: SeqNo(400)
3950            })
3951        );
3952
3953        // Upper-bound the number of seqnos we'll attempt to collect in one go.
        // `previous_seqno` is the seqno (400) at the time of the last request above.
3954        let previous_seqno = state.seqno;
3955        state.seqno = SeqNo(10_000);
3956        assert_eq!(state.seqno_since(), SeqNo(10_000));
3957
3958        let now = now_fn();
3959        assert_eq!(
3960            state.maybe_gc(true, now, GC_CONFIG),
3961            Some(GcReq {
3962                shard_id: state.shard_id,
3963                new_seqno_since: SeqNo(previous_seqno.0 + u64::cast_from(GC_CONFIG.max_versions))
3964            })
3965        );
3966    }
3967
3968    #[mz_ore::test]
    // Exercises `maybe_gc` with `use_active_gc: false`, always passing the fixed
    // timestamp `NOW_MS`. The assertions cover: nothing to gc on an empty
    // state, non-writes not triggering gc while a writer is present, writes
    // triggering gc, and non-writes triggering gc once the writer is expired.
3969    fn maybe_gc_classic() {
3970        const GC_CONFIG: GcConfig = GcConfig {
3971            use_active_gc: false,
3972            fallback_threshold_ms: 5000,
3973            min_versions: 16,
3974            max_versions: 128,
3975        };
3976        const NOW_MS: u64 = 0;
3977
3978        let mut state = TypedState::<String, String, u64, i64>::new(
3979            DUMMY_BUILD_INFO.semver_version(),
3980            ShardId::new(),
3981            "".to_owned(),
3982            0,
3983        );
3984
3985        // Empty state doesn't need gc, regardless of is_write.
3986        assert_eq!(state.maybe_gc(true, NOW_MS, GC_CONFIG), None);
3987        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
3988
3989        // Artificially advance the seqno so the seqno_since advances past our
3990        // internal gc_threshold.
3991        state.seqno = SeqNo(100);
3992        assert_eq!(state.seqno_since(), SeqNo(100));
3993
3994        // When a writer is present, non-writes don't gc.
3995        let writer_id = WriterId::new();
3996        let now = SYSTEM_TIME.clone();
        // NOTE(review): the result is discarded. On a fresh shard (upper 0) a
        // batch with lower 1 is presumably rejected, as the `compare_and_append`
        // test in this file shows for the same batch; the call appears to be
        // used only to make `writer_id` known to the state. Confirm.
3997        let _ = state.collections.compare_and_append(
3998            &hollow(1, 2, &["key1"], 1),
3999            &writer_id,
4000            now(),
4001            LEASE_DURATION_MS,
4002            &IdempotencyToken::new(),
4003            &debug_state(),
4004            0,
4005            100,
4006            None,
4007        );
4008        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
4009
4010        // A write will gc though.
4011        assert_eq!(
4012            state.maybe_gc(true, NOW_MS, GC_CONFIG),
4013            Some(GcReq {
4014                shard_id: state.shard_id,
4015                new_seqno_since: SeqNo(100)
4016            })
4017        );
4018
4019        // Artificially advance the seqno (again) so the seqno_since advances
4020        // past our internal gc_threshold (again).
4021        state.seqno = SeqNo(200);
4022        assert_eq!(state.seqno_since(), SeqNo(200));
4023
4024        // If there are no writers, even a non-write will gc.
4025        let _ = state.collections.expire_writer(&writer_id);
4026        assert_eq!(
4027            state.maybe_gc(false, NOW_MS, GC_CONFIG),
4028            Some(GcReq {
4029                shard_id: state.shard_id,
4030                new_seqno_since: SeqNo(200)
4031            })
4032        );
4033    }
4034
4035    #[mz_ore::test]
    // Exercises `need_rollup` with active rollups enabled. Starting from a
    // rollup recorded at seqno 5, the test bumps `state.seqno` by hand and
    // checks when a rollup is requested, that a recorded `active_rollup`
    // suppresses the request, and that the request comes back once more than
    // `ROLLUP_FALLBACK_THRESHOLD_MS` has passed since the active rollup's
    // `start_ms`.
4036    fn need_rollup_active_rollup() {
4037        const ROLLUP_THRESHOLD: usize = 3;
4038        const ROLLUP_USE_ACTIVE_ROLLUP: bool = true;
4039        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 5000;
4040        let now = SYSTEM_TIME.clone();
4041
4042        mz_ore::test::init_logging();
4043        let mut state = TypedState::<String, String, u64, i64>::new(
4044            DUMMY_BUILD_INFO.semver_version(),
4045            ShardId::new(),
4046            "".to_owned(),
4047            0,
4048        );
4049
        // Record a rollup at seqno 5 as the starting point.
4050        let rollup_seqno = SeqNo(5);
4051        let rollup = HollowRollup {
4052            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4053            encoded_size_bytes: None,
4054        };
4055
4056        assert!(
4057            state
4058                .collections
4059                .add_rollup((rollup_seqno, &rollup))
4060                .is_continue()
4061        );
4062
4063        // shouldn't need a rollup at the seqno of the rollup
4064        state.seqno = SeqNo(5);
4065        assert_none!(state.need_rollup(
4066            ROLLUP_THRESHOLD,
4067            ROLLUP_USE_ACTIVE_ROLLUP,
4068            ROLLUP_FALLBACK_THRESHOLD_MS,
4069            now()
4070        ));
4071
4072        // shouldn't need a rollup at seqnos less than our threshold
4073        state.seqno = SeqNo(6);
4074        assert_none!(state.need_rollup(
4075            ROLLUP_THRESHOLD,
4076            ROLLUP_USE_ACTIVE_ROLLUP,
4077            ROLLUP_FALLBACK_THRESHOLD_MS,
4078            now()
4079        ));
4080        state.seqno = SeqNo(7);
4081        assert_none!(state.need_rollup(
4082            ROLLUP_THRESHOLD,
4083            ROLLUP_USE_ACTIVE_ROLLUP,
4084            ROLLUP_FALLBACK_THRESHOLD_MS,
4085            now()
4086        ));
4087        state.seqno = SeqNo(8);
4088        assert_none!(state.need_rollup(
4089            ROLLUP_THRESHOLD,
4090            ROLLUP_USE_ACTIVE_ROLLUP,
4091            ROLLUP_FALLBACK_THRESHOLD_MS,
4092            now()
4093        ));
4094
        // Pin the timestamp from here on so the fallback window below can be
        // advanced deterministically.
4095        let mut current_time = now();
4096        // hit our threshold! we should need a rollup
4097        state.seqno = SeqNo(9);
4098        assert_eq!(
4099            state
4100                .need_rollup(
4101                    ROLLUP_THRESHOLD,
4102                    ROLLUP_USE_ACTIVE_ROLLUP,
4103                    ROLLUP_FALLBACK_THRESHOLD_MS,
4104                    current_time
4105                )
4106                .expect("rollup"),
4107            SeqNo(9)
4108        );
4109
4110        state.collections.active_rollup = Some(ActiveRollup {
4111            seqno: SeqNo(9),
4112            start_ms: current_time,
4113        });
4114
4115        // There is now an active rollup, so we shouldn't need a rollup.
4116        assert_none!(state.need_rollup(
4117            ROLLUP_THRESHOLD,
4118            ROLLUP_USE_ACTIVE_ROLLUP,
4119            ROLLUP_FALLBACK_THRESHOLD_MS,
4120            current_time
4121        ));
4122
4123        state.seqno = SeqNo(10);
4124        // We still don't need a rollup, even though the seqno is greater than
4125        // the rollup threshold.
4126        assert_none!(state.need_rollup(
4127            ROLLUP_THRESHOLD,
4128            ROLLUP_USE_ACTIVE_ROLLUP,
4129            ROLLUP_FALLBACK_THRESHOLD_MS,
4130            current_time
4131        ));
4132
4133        // But if we wait long enough, we should need a rollup again.
4134        current_time += u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS) + 1;
4135        assert_eq!(
4136            state
4137                .need_rollup(
4138                    ROLLUP_THRESHOLD,
4139                    ROLLUP_USE_ACTIVE_ROLLUP,
4140                    ROLLUP_FALLBACK_THRESHOLD_MS,
4141                    current_time
4142                )
4143                .expect("rollup"),
4144            SeqNo(10)
4145        );
4146
4147        state.seqno = SeqNo(9);
4148        // Clear the active rollup and ensure we need a rollup again.
4149        state.collections.active_rollup = None;
        // Record a second rollup, this time at seqno 9.
4150        let rollup_seqno = SeqNo(9);
4151        let rollup = HollowRollup {
4152            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4153            encoded_size_bytes: None,
4154        };
4155        assert!(
4156            state
4157                .collections
4158                .add_rollup((rollup_seqno, &rollup))
4159                .is_continue()
4160        );
4161
4162        state.seqno = SeqNo(11);
4163        // We shouldn't need a rollup at seqnos less than our threshold
4164        assert_none!(state.need_rollup(
4165            ROLLUP_THRESHOLD,
4166            ROLLUP_USE_ACTIVE_ROLLUP,
4167            ROLLUP_FALLBACK_THRESHOLD_MS,
4168            current_time
4169        ));
4170        // hit our threshold! we should need a rollup
4171        state.seqno = SeqNo(13);
4172        assert_eq!(
4173            state
4174                .need_rollup(
4175                    ROLLUP_THRESHOLD,
4176                    ROLLUP_USE_ACTIVE_ROLLUP,
4177                    ROLLUP_FALLBACK_THRESHOLD_MS,
4178                    current_time
4179                )
4180                .expect("rollup"),
4181            SeqNo(13)
4182        );
4183    }
4184
4185    #[mz_ore::test]
4186    fn need_rollup_classic() {
4187        const ROLLUP_THRESHOLD: usize = 3;
4188        const ROLLUP_USE_ACTIVE_ROLLUP: bool = false;
4189        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 0;
4190        const NOW: u64 = 0;
4191
4192        mz_ore::test::init_logging();
4193        let mut state = TypedState::<String, String, u64, i64>::new(
4194            DUMMY_BUILD_INFO.semver_version(),
4195            ShardId::new(),
4196            "".to_owned(),
4197            0,
4198        );
4199
4200        let rollup_seqno = SeqNo(5);
4201        let rollup = HollowRollup {
4202            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4203            encoded_size_bytes: None,
4204        };
4205
4206        assert!(
4207            state
4208                .collections
4209                .add_rollup((rollup_seqno, &rollup))
4210                .is_continue()
4211        );
4212
4213        // shouldn't need a rollup at the seqno of the rollup
4214        state.seqno = SeqNo(5);
4215        assert_none!(state.need_rollup(
4216            ROLLUP_THRESHOLD,
4217            ROLLUP_USE_ACTIVE_ROLLUP,
4218            ROLLUP_FALLBACK_THRESHOLD_MS,
4219            NOW
4220        ));
4221
4222        // shouldn't need a rollup at seqnos less than our threshold
4223        state.seqno = SeqNo(6);
4224        assert_none!(state.need_rollup(
4225            ROLLUP_THRESHOLD,
4226            ROLLUP_USE_ACTIVE_ROLLUP,
4227            ROLLUP_FALLBACK_THRESHOLD_MS,
4228            NOW
4229        ));
4230        state.seqno = SeqNo(7);
4231        assert_none!(state.need_rollup(
4232            ROLLUP_THRESHOLD,
4233            ROLLUP_USE_ACTIVE_ROLLUP,
4234            ROLLUP_FALLBACK_THRESHOLD_MS,
4235            NOW
4236        ));
4237
4238        // hit our threshold! we should need a rollup
4239        state.seqno = SeqNo(8);
4240        assert_eq!(
4241            state
4242                .need_rollup(
4243                    ROLLUP_THRESHOLD,
4244                    ROLLUP_USE_ACTIVE_ROLLUP,
4245                    ROLLUP_FALLBACK_THRESHOLD_MS,
4246                    NOW
4247                )
4248                .expect("rollup"),
4249            SeqNo(8)
4250        );
4251
4252        // but we don't need rollups for every seqno > the threshold
4253        state.seqno = SeqNo(9);
4254        assert_none!(state.need_rollup(
4255            ROLLUP_THRESHOLD,
4256            ROLLUP_USE_ACTIVE_ROLLUP,
4257            ROLLUP_FALLBACK_THRESHOLD_MS,
4258            NOW
4259        ));
4260
4261        // we only need a rollup each `ROLLUP_THRESHOLD` beyond our current seqno
4262        state.seqno = SeqNo(11);
4263        assert_eq!(
4264            state
4265                .need_rollup(
4266                    ROLLUP_THRESHOLD,
4267                    ROLLUP_USE_ACTIVE_ROLLUP,
4268                    ROLLUP_FALLBACK_THRESHOLD_MS,
4269                    NOW
4270                )
4271                .expect("rollup"),
4272            SeqNo(11)
4273        );
4274
4275        // add another rollup and ensure we're always picking the latest
4276        let rollup_seqno = SeqNo(6);
4277        let rollup = HollowRollup {
4278            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4279            encoded_size_bytes: None,
4280        };
4281        assert!(
4282            state
4283                .collections
4284                .add_rollup((rollup_seqno, &rollup))
4285                .is_continue()
4286        );
4287
4288        state.seqno = SeqNo(8);
4289        assert_none!(state.need_rollup(
4290            ROLLUP_THRESHOLD,
4291            ROLLUP_USE_ACTIVE_ROLLUP,
4292            ROLLUP_FALLBACK_THRESHOLD_MS,
4293            NOW
4294        ));
4295        state.seqno = SeqNo(9);
4296        assert_eq!(
4297            state
4298                .need_rollup(
4299                    ROLLUP_THRESHOLD,
4300                    ROLLUP_USE_ACTIVE_ROLLUP,
4301                    ROLLUP_FALLBACK_THRESHOLD_MS,
4302                    NOW
4303                )
4304                .expect("rollup"),
4305            SeqNo(9)
4306        );
4307
4308        // and ensure that after a fallback point, we assign every seqno work
4309        let fallback_seqno = SeqNo(
4310            rollup_seqno.0
4311                * u64::cast_from(PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER),
4312        );
4313        state.seqno = fallback_seqno;
4314        assert_eq!(
4315            state
4316                .need_rollup(
4317                    ROLLUP_THRESHOLD,
4318                    ROLLUP_USE_ACTIVE_ROLLUP,
4319                    ROLLUP_FALLBACK_THRESHOLD_MS,
4320                    NOW
4321                )
4322                .expect("rollup"),
4323            fallback_seqno
4324        );
4325        state.seqno = fallback_seqno.next();
4326        assert_eq!(
4327            state
4328                .need_rollup(
4329                    ROLLUP_THRESHOLD,
4330                    ROLLUP_USE_ACTIVE_ROLLUP,
4331                    ROLLUP_FALLBACK_THRESHOLD_MS,
4332                    NOW
4333                )
4334                .expect("rollup"),
4335            fallback_seqno.next()
4336        );
4337    }
4338
4339    #[mz_ore::test]
4340    fn idempotency_token_sentinel() {
4341        assert_eq!(
4342            IdempotencyToken::SENTINEL.to_string(),
4343            "i11111111-1111-1111-1111-111111111111"
4344        );
4345    }
4346
4347    /// This test generates an "arbitrary" State, but uses a fixed seed for the
4348    /// randomness, so that it's deterministic. This lets us assert the
4349    /// serialization of that State against a golden file that's committed,
4350    /// making it easy to see what the serialization (used in an upcoming
4351    /// INSPECT feature) looks like.
4352    ///
4353    /// This golden will have to be updated each time we change State, but
4354    /// that's a feature, not a bug.
4355    #[mz_ore::test]
4356    #[cfg_attr(miri, ignore)] // too slow
4357    fn state_inspect_serde_json() {
4358        const STATE_SERDE_JSON: &str = include_str!("state_serde.json");
4359        let mut runner = proptest::test_runner::TestRunner::deterministic();
4360        let tree = any_state::<u64>(6..8).new_tree(&mut runner).unwrap();
4361        let json = serde_json::to_string_pretty(&tree.current()).unwrap();
4362        assert_eq!(
4363            json.trim(),
4364            STATE_SERDE_JSON.trim(),
4365            "\n\nNEW GOLDEN\n{}\n",
4366            json
4367        );
4368    }
4369
4370    #[mz_persist_proc::test(tokio::test)]
4371    #[cfg_attr(miri, ignore)] // too slow
4372    async fn sneaky_downgrades(dyncfgs: ConfigUpdates) {
4373        let mut clients = new_test_client_cache(&dyncfgs);
4374        let shard_id = ShardId::new();
4375
4376        async fn open_and_write(
4377            clients: &mut PersistClientCache,
4378            version: semver::Version,
4379            shard_id: ShardId,
4380        ) -> Result<(), tokio::task::JoinError> {
4381            clients.cfg.build_version = version.clone();
4382            clients.clear_state_cache();
4383            let client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
4384            // Run in a task so we can catch the panic.
4385            mz_ore::task::spawn(|| version.to_string(), async move {
4386                let () = client
4387                    .upgrade_version::<String, (), u64, i64>(shard_id, Diagnostics::for_tests())
4388                    .await
4389                    .expect("valid usage");
4390                let (mut write, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
4391                let current = *write.upper().as_option().unwrap();
4392                // Do a write so that we tag the state with the version.
4393                write
4394                    .expect_compare_and_append_batch(&mut [], current, current + 1)
4395                    .await;
4396            })
4397            .into_tokio_handle()
4398            .await
4399        }
4400
4401        // Start at v0.10.0.
4402        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4403        assert_ok!(res);
4404
4405        // Upgrade to v0.11.0 is allowed.
4406        let res = open_and_write(&mut clients, Version::new(0, 11, 0), shard_id).await;
4407        assert_ok!(res);
4408
4409        // Downgrade to v0.10.0 is no longer allowed.
4410        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4411        assert!(res.unwrap_err().is_panic());
4412
4413        // Downgrade to v0.9.0 is _NOT_ allowed.
4414        let res = open_and_write(&mut clients, Version::new(0, 9, 0), shard_id).await;
4415        assert!(res.unwrap_err().is_panic());
4416    }
4417
4418    #[mz_ore::test]
4419    fn runid_roundtrip() {
4420        proptest!(|(runid: RunId)| {
4421            let runid_str = runid.to_string();
4422            let parsed = RunId::from_str(&runid_str);
4423            prop_assert_eq!(parsed, Ok(runid));
4424        });
4425    }
4426}