mz_persist_client/internal/
state.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use anyhow::ensure;
11use async_stream::{stream, try_stream};
12use differential_dataflow::difference::Monoid;
13use mz_persist::metrics::ColumnarMetrics;
14use proptest::prelude::{Arbitrary, Strategy};
15use std::borrow::Cow;
16use std::cmp::Ordering;
17use std::collections::BTreeMap;
18use std::fmt::{Debug, Formatter};
19use std::marker::PhantomData;
20use std::ops::ControlFlow::{self, Break, Continue};
21use std::ops::{Deref, DerefMut};
22use std::time::Duration;
23
24use arrow::array::{Array, ArrayData, make_array};
25use arrow::datatypes::DataType;
26use bytes::Bytes;
27use differential_dataflow::Hashable;
28use differential_dataflow::lattice::Lattice;
29use differential_dataflow::trace::Description;
30use differential_dataflow::trace::implementations::BatchContainer;
31use futures::Stream;
32use futures_util::StreamExt;
33use itertools::Itertools;
34use mz_dyncfg::Config;
35use mz_ore::cast::CastFrom;
36use mz_ore::now::EpochMillis;
37use mz_ore::soft_panic_or_log;
38use mz_ore::vec::PartialOrdVecExt;
39use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceUpdates};
40use mz_persist::location::{Blob, SeqNo};
41use mz_persist_types::arrow::{ArrayBound, ProtoArrayData};
42use mz_persist_types::columnar::{ColumnEncoder, Schema};
43use mz_persist_types::schema::{SchemaId, backward_compatible};
44use mz_persist_types::{Codec, Codec64, Opaque};
45use mz_proto::ProtoType;
46use mz_proto::RustType;
47use proptest_derive::Arbitrary;
48use semver::Version;
49use serde::ser::SerializeStruct;
50use serde::{Serialize, Serializer};
51use timely::PartialOrder;
52use timely::order::TotalOrder;
53use timely::progress::{Antichain, Timestamp};
54use tracing::info;
55use uuid::Uuid;
56
57use crate::critical::CriticalReaderId;
58use crate::error::InvalidUsage;
59use crate::internal::encoding::{
60    LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, parse_id,
61};
62use crate::internal::gc::GcReq;
63use crate::internal::machine::retry_external;
64use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
65use crate::internal::trace::{
66    ActiveCompaction, ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace,
67};
68use crate::metrics::Metrics;
69use crate::read::LeasedReaderId;
70use crate::schema::CaESchema;
71use crate::write::WriterId;
72use crate::{PersistConfig, ShardId};
73
// Splice in the Rust source found under the build's `OUT_DIR` for the
// `mz_persist_client.internal.state` module.
// NOTE(review): presumably prost-generated protobuf types (e.g. the
// `ProtoHollowRun` decoded further down) — confirm against the build script.
include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.state.rs"
));

// Likewise for the `mz_persist_client.internal.diff` module.
include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.diff.rs"
));
83
/// Determines how often to write rollups, assigning a maintenance task after
/// `rollup_threshold` seqnos have passed since the last rollup.
///
/// Tuning note: in the absence of a long reader seqno hold, and with
/// incremental GC, this threshold will determine about how many live diffs are
/// held in Consensus. Lowering this value decreases the live diff count at the
/// cost of more maintenance work + blob writes.
///
/// Defaults to 128 seqnos.
pub(crate) const ROLLUP_THRESHOLD: Config<usize> = Config::new(
    "persist_rollup_threshold",
    128,
    "The number of seqnos between rollups.",
);
96
/// Determines how long to wait before an active rollup is considered
/// "stuck" and a new rollup is started.
///
/// Expressed in milliseconds; defaults to 5000 (5 seconds).
pub(crate) const ROLLUP_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_rollup_fallback_threshold_ms",
    5000,
    "The number of milliseconds before a worker claims an already claimed rollup.",
);
104
/// Feature flag for the new active rollup tracking mechanism.
///
/// We mustn't enable this until we are fully deployed on the new version.
/// Defaults to `false`.
pub(crate) const ROLLUP_USE_ACTIVE_ROLLUP: Config<bool> = Config::new(
    "persist_rollup_use_active_rollup",
    false,
    "Whether to use the new active rollup tracking mechanism.",
);
112
/// Determines how long to wait before an active GC is considered
/// "stuck" and a new GC is started.
///
/// Expressed in milliseconds; defaults to 900000 (15 minutes).
pub(crate) const GC_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_gc_fallback_threshold_ms",
    900000,
    "The number of milliseconds before a worker claims an already claimed GC.",
);
120
/// The number of un-GCd versions that may exist in state before a GC is
/// triggered.
///
/// Defaults to 32.
pub(crate) const GC_MIN_VERSIONS: Config<usize> = Config::new(
    "persist_gc_min_versions",
    32,
    "The number of un-GCd versions that may exist in state before we'll trigger a GC.",
);
127
/// The maximum number of versions to GC in a single GC run.
///
/// Defaults to 128,000.
pub(crate) const GC_MAX_VERSIONS: Config<usize> = Config::new(
    "persist_gc_max_versions",
    128_000,
    "The maximum number of versions to GC in a single GC run.",
);
134
/// Feature flag for the new active GC tracking mechanism.
///
/// We mustn't enable this until we are fully deployed on the new version.
/// Defaults to `false`.
pub(crate) const GC_USE_ACTIVE_GC: Config<bool> = Config::new(
    "persist_gc_use_active_gc",
    false,
    "Whether to use the new active GC tracking mechanism.",
);
142
/// Feature flag controlling whether incremental compaction is enabled.
///
/// Defaults to `false`.
pub(crate) const ENABLE_INCREMENTAL_COMPACTION: Config<bool> = Config::new(
    "persist_enable_incremental_compaction",
    false,
    "Whether to enable incremental compaction.",
);
148
/// A token to disambiguate state commands that could not otherwise be
/// idempotent.
///
/// Wraps 16 raw bytes. When serialized with serde the token is first
/// converted into a `String` (see the `From<IdempotencyToken> for String`
/// impl).
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
#[serde(into = "String")]
pub struct IdempotencyToken(pub(crate) [u8; 16]);
154
155impl std::fmt::Display for IdempotencyToken {
156    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157        write!(f, "i{}", Uuid::from_bytes(self.0))
158    }
159}
160
161impl std::fmt::Debug for IdempotencyToken {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        write!(f, "IdempotencyToken({})", Uuid::from_bytes(self.0))
164    }
165}
166
167impl std::str::FromStr for IdempotencyToken {
168    type Err = String;
169
170    fn from_str(s: &str) -> Result<Self, Self::Err> {
171        parse_id("i", "IdempotencyToken", s).map(IdempotencyToken)
172    }
173}
174
175impl From<IdempotencyToken> for String {
176    fn from(x: IdempotencyToken) -> Self {
177        x.to_string()
178    }
179}
180
181impl IdempotencyToken {
182    pub(crate) fn new() -> Self {
183        IdempotencyToken(*Uuid::new_v4().as_bytes())
184    }
185    pub(crate) const SENTINEL: IdempotencyToken = IdempotencyToken([17u8; 16]);
186}
187
/// State recorded for a leased reader: the capabilities it holds plus the
/// heartbeat/lease bookkeeping used to decide when it may be expired.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct LeasedReaderState<T> {
    /// The seqno capability of this reader.
    pub seqno: SeqNo,
    /// The since capability of this reader.
    pub since: Antichain<T>,
    /// UNIX_EPOCH timestamp (in millis) of this reader's most recent heartbeat
    pub last_heartbeat_timestamp_ms: u64,
    /// Duration (in millis) allowed after [Self::last_heartbeat_timestamp_ms]
    /// after which this reader may be expired
    pub lease_duration_ms: u64,
    /// For debugging.
    pub debug: HandleDebugState,
}
202
/// Eight raw bytes holding an encoded opaque value.
///
/// When serialized with serde the bytes are first converted into a `u64`
/// (see the `From<OpaqueState> for u64` impl).
#[derive(Arbitrary, Clone, Debug, PartialEq, Serialize)]
#[serde(into = "u64")]
pub struct OpaqueState(pub [u8; 8]);
206
207impl From<OpaqueState> for u64 {
208    fn from(value: OpaqueState) -> Self {
209        u64::from_le_bytes(value.0)
210    }
211}
212
/// State recorded for a critical reader: its since capability together with
/// the opaque token (and the name of the codec used to encode it).
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct CriticalReaderState<T> {
    /// The since capability of this reader.
    pub since: Antichain<T>,
    /// An opaque token matched on by compare_and_downgrade_since.
    pub opaque: OpaqueState,
    /// The [Codec64] used to encode [Self::opaque].
    pub opaque_codec: String,
    /// For debugging.
    pub debug: HandleDebugState,
}
224
/// State recorded for a writer: heartbeat/lease bookkeeping plus the token
/// and upper of its most recent successful compare_and_append.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct WriterState<T> {
    /// UNIX_EPOCH timestamp (in millis) of this writer's most recent heartbeat
    pub last_heartbeat_timestamp_ms: u64,
    /// Duration (in millis) allowed after [Self::last_heartbeat_timestamp_ms]
    /// after which this writer may be expired
    pub lease_duration_ms: u64,
    /// The idempotency token of the most recent successful compare_and_append
    /// by this writer.
    pub most_recent_write_token: IdempotencyToken,
    /// The upper of the most recent successful compare_and_append by this
    /// writer.
    pub most_recent_write_upper: Antichain<T>,
    /// For debugging.
    pub debug: HandleDebugState,
}
241
/// Debugging info for a reader or writer.
///
/// Both fields are free-form strings; the derived `Default` leaves them
/// empty.
#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize)]
pub struct HandleDebugState {
    /// Hostname of the persist user that registered this writer or reader. For
    /// critical readers, this is the _most recent_ registration.
    pub hostname: String,
    /// Plaintext description of this writer or reader's intent.
    pub purpose: String,
}
251
/// Part of the updates in a Batch.
///
/// Either a pointer to ones stored in Blob or the updates themselves inlined.
///
/// Serialized by serde with an internal `"type"` tag naming the variant.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(tag = "type")]
pub enum BatchPart<T> {
    /// A pointer to updates stored in Blob.
    Hollow(HollowBatchPart<T>),
    /// The updates themselves, carried inline.
    Inline {
        /// The inlined updates.
        updates: LazyInlineBatchPart,
        /// Optional timestamp-rewrite frontier for this part.
        // NOTE(review): presumably the same semantics as
        // `HollowBatchPart::ts_rewrite` — confirm.
        ts_rewrite: Option<Antichain<T>>,
        /// ID of the schema associated with this part, if any.
        schema_id: Option<SchemaId>,

        /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
        deprecated_schema_id: Option<SchemaId>,
    },
}
268
269fn decode_structured_lower(lower: &LazyProto<ProtoArrayData>) -> Option<ArrayBound> {
270    let try_decode = |lower: &LazyProto<ProtoArrayData>| {
271        let proto = lower.decode()?;
272        let data = ArrayData::from_proto(proto)?;
273        ensure!(data.len() == 1);
274        Ok(ArrayBound::new(make_array(data), 0))
275    };
276
277    let decoded: anyhow::Result<ArrayBound> = try_decode(lower);
278
279    match decoded {
280        Ok(bound) => Some(bound),
281        Err(e) => {
282            soft_panic_or_log!("failed to decode bound: {e:#?}");
283            None
284        }
285    }
286}
287
288impl<T> BatchPart<T> {
289    pub fn hollow_bytes(&self) -> usize {
290        match self {
291            BatchPart::Hollow(x) => x.encoded_size_bytes,
292            BatchPart::Inline { .. } => 0,
293        }
294    }
295
296    pub fn is_inline(&self) -> bool {
297        matches!(self, BatchPart::Inline { .. })
298    }
299
300    pub fn inline_bytes(&self) -> usize {
301        match self {
302            BatchPart::Hollow(_) => 0,
303            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
304        }
305    }
306
307    pub fn writer_key(&self) -> Option<WriterKey> {
308        match self {
309            BatchPart::Hollow(x) => x.key.split().map(|(writer, _part)| writer),
310            BatchPart::Inline { .. } => None,
311        }
312    }
313
314    pub fn encoded_size_bytes(&self) -> usize {
315        match self {
316            BatchPart::Hollow(x) => x.encoded_size_bytes,
317            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
318        }
319    }
320
321    // A user-interpretable identifier or description of the part (for logs and
322    // such).
323    pub fn printable_name(&self) -> &str {
324        match self {
325            BatchPart::Hollow(x) => x.key.0.as_str(),
326            BatchPart::Inline { .. } => "<inline>",
327        }
328    }
329
330    pub fn stats(&self) -> Option<&LazyPartStats> {
331        match self {
332            BatchPart::Hollow(x) => x.stats.as_ref(),
333            BatchPart::Inline { .. } => None,
334        }
335    }
336
337    pub fn key_lower(&self) -> &[u8] {
338        match self {
339            BatchPart::Hollow(x) => x.key_lower.as_slice(),
340            // We don't duplicate the lowest key because this can be
341            // considerable overhead for small parts.
342            //
343            // The empty key might not be a tight lower bound, but it is a valid
344            // lower bound. If a caller is interested in a tighter lower bound,
345            // the data is inline.
346            BatchPart::Inline { .. } => &[],
347        }
348    }
349
350    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
351        let part = match self {
352            BatchPart::Hollow(part) => part,
353            BatchPart::Inline { .. } => return None,
354        };
355
356        decode_structured_lower(part.structured_key_lower.as_ref()?)
357    }
358
359    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
360        match self {
361            BatchPart::Hollow(x) => x.ts_rewrite.as_ref(),
362            BatchPart::Inline { ts_rewrite, .. } => ts_rewrite.as_ref(),
363        }
364    }
365
366    pub fn schema_id(&self) -> Option<SchemaId> {
367        match self {
368            BatchPart::Hollow(x) => x.schema_id,
369            BatchPart::Inline { schema_id, .. } => *schema_id,
370        }
371    }
372
373    pub fn deprecated_schema_id(&self) -> Option<SchemaId> {
374        match self {
375            BatchPart::Hollow(x) => x.deprecated_schema_id,
376            BatchPart::Inline {
377                deprecated_schema_id,
378                ..
379            } => *deprecated_schema_id,
380        }
381    }
382}
383
384impl<T: Timestamp + Codec64> BatchPart<T> {
385    pub fn is_structured_only(&self, metrics: &ColumnarMetrics) -> bool {
386        match self {
387            BatchPart::Hollow(x) => matches!(x.format, Some(BatchColumnarFormat::Structured)),
388            BatchPart::Inline { updates, .. } => {
389                let inline_part = updates.decode::<T>(metrics).expect("valid inline part");
390                matches!(inline_part.updates, BlobTraceUpdates::Structured { .. })
391            }
392        }
393    }
394
395    pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
396        match self {
397            BatchPart::Hollow(x) => x.diffs_sum.map(D::decode),
398            BatchPart::Inline { updates, .. } => Some(
399                updates
400                    .decode::<T>(metrics)
401                    .expect("valid inline part")
402                    .updates
403                    .diffs_sum(),
404            ),
405        }
406    }
407}
408
/// An ordered list of parts, generally stored as part of a larger run.
///
/// This is the payload that [HollowRunRef::set] encodes and writes to Blob
/// and that [HollowRunRef::get] reads back.
#[derive(Debug, Clone)]
pub struct HollowRun<T> {
    /// Pointers usable to retrieve the updates.
    pub(crate) parts: Vec<RunPart<T>>,
}
415
/// A reference to a [HollowRun], including the key in the blob store and some denormalized
/// metadata.
#[derive(Debug, Eq, PartialEq, Clone, Serialize)]
pub struct HollowRunRef<T> {
    /// Pointer usable to retrieve the referenced run.
    pub key: PartialBatchKey,

    /// The size of the referenced run object, plus all of the hollow objects it contains.
    pub hollow_bytes: usize,

    /// The size of the largest individual part in the run; useful for sizing compaction.
    pub max_part_bytes: usize,

    /// The lower bound of the data in this run, ordered by the codec ordering.
    pub key_lower: Vec<u8>,

    /// The lower bound of the data in this run, ordered by the structured ordering.
    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,

    /// A Codec64 encoded sum of all diffs in the referenced run, if known.
    pub diffs_sum: Option<[u8; 8]>,

    /// Ties the reference to the timestamp type `T` without storing a value.
    pub(crate) _phantom_data: PhantomData<T>,
}
438impl<T: Eq> PartialOrd<Self> for HollowRunRef<T> {
439    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
440        Some(self.cmp(other))
441    }
442}
443
444impl<T: Eq> Ord for HollowRunRef<T> {
445    fn cmp(&self, other: &Self) -> Ordering {
446        self.key.cmp(&other.key)
447    }
448}
449
450impl<T> HollowRunRef<T> {
451    pub fn writer_key(&self) -> Option<WriterKey> {
452        Some(self.key.split()?.0)
453    }
454}
455
456impl<T: Timestamp + Codec64> HollowRunRef<T> {
457    /// Stores the given runs and returns a [HollowRunRef] that points to them.
458    pub async fn set<D: Codec64 + Monoid>(
459        shard_id: ShardId,
460        blob: &dyn Blob,
461        writer: &WriterKey,
462        data: HollowRun<T>,
463        metrics: &Metrics,
464    ) -> Self {
465        let hollow_bytes = data.parts.iter().map(|p| p.hollow_bytes()).sum();
466        let max_part_bytes = data
467            .parts
468            .iter()
469            .map(|p| p.max_part_bytes())
470            .max()
471            .unwrap_or(0);
472        let key_lower = data
473            .parts
474            .first()
475            .map_or(vec![], |p| p.key_lower().to_vec());
476        let structured_key_lower = match data.parts.first() {
477            Some(RunPart::Many(r)) => r.structured_key_lower.clone(),
478            Some(RunPart::Single(BatchPart::Hollow(p))) => p.structured_key_lower.clone(),
479            Some(RunPart::Single(BatchPart::Inline { .. })) | None => None,
480        };
481        let diffs_sum = data
482            .parts
483            .iter()
484            .map(|p| {
485                p.diffs_sum::<D>(&metrics.columnar)
486                    .expect("valid diffs sum")
487            })
488            .reduce(|mut a, b| {
489                a.plus_equals(&b);
490                a
491            })
492            .expect("valid diffs sum")
493            .encode();
494
495        let key = PartialBatchKey::new(writer, &PartId::new());
496        let blob_key = key.complete(&shard_id);
497        let bytes = Bytes::from(prost::Message::encode_to_vec(&data.into_proto()));
498        let () = retry_external(&metrics.retries.external.hollow_run_set, || {
499            blob.set(&blob_key, bytes.clone())
500        })
501        .await;
502        Self {
503            key,
504            hollow_bytes,
505            max_part_bytes,
506            key_lower,
507            structured_key_lower,
508            diffs_sum: Some(diffs_sum),
509            _phantom_data: Default::default(),
510        }
511    }
512
513    /// Retrieve the [HollowRun] that this reference points to.
514    /// The caller is expected to ensure that this ref is the result of calling [HollowRunRef::set]
515    /// with the same shard id and backing store.
516    pub async fn get(
517        &self,
518        shard_id: ShardId,
519        blob: &dyn Blob,
520        metrics: &Metrics,
521    ) -> Option<HollowRun<T>> {
522        let blob_key = self.key.complete(&shard_id);
523        let mut bytes = retry_external(&metrics.retries.external.hollow_run_get, || {
524            blob.get(&blob_key)
525        })
526        .await?;
527        let proto_runs: ProtoHollowRun =
528            prost::Message::decode(&mut bytes).expect("illegal state: invalid proto bytes");
529        let runs = proto_runs
530            .into_rust()
531            .expect("illegal state: invalid encoded runs proto");
532        Some(runs)
533    }
534}
535
/// Part of the updates in a run.
///
/// Either a single batch part, or a reference to a whole run of parts that
/// is itself stored in Blob.
///
/// Serialized by serde untagged, i.e. as the content of whichever variant is
/// present.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(untagged)]
pub enum RunPart<T> {
    /// One batch part (hollow or inline).
    Single(BatchPart<T>),
    /// A reference to a [HollowRun] holding further parts.
    Many(HollowRunRef<T>),
}
545
546impl<T: Ord> PartialOrd<Self> for RunPart<T> {
547    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
548        Some(self.cmp(other))
549    }
550}
551
552impl<T: Ord> Ord for RunPart<T> {
553    fn cmp(&self, other: &Self) -> Ordering {
554        match (self, other) {
555            (RunPart::Single(a), RunPart::Single(b)) => a.cmp(b),
556            (RunPart::Single(_), RunPart::Many(_)) => Ordering::Less,
557            (RunPart::Many(_), RunPart::Single(_)) => Ordering::Greater,
558            (RunPart::Many(a), RunPart::Many(b)) => a.cmp(b),
559        }
560    }
561}
562
563impl<T> RunPart<T> {
564    #[cfg(test)]
565    pub fn expect_hollow_part(&self) -> &HollowBatchPart<T> {
566        match self {
567            RunPart::Single(BatchPart::Hollow(hollow)) => hollow,
568            _ => panic!("expected hollow part!"),
569        }
570    }
571
572    pub fn hollow_bytes(&self) -> usize {
573        match self {
574            Self::Single(p) => p.hollow_bytes(),
575            Self::Many(r) => r.hollow_bytes,
576        }
577    }
578
579    pub fn is_inline(&self) -> bool {
580        match self {
581            Self::Single(p) => p.is_inline(),
582            Self::Many(_) => false,
583        }
584    }
585
586    pub fn inline_bytes(&self) -> usize {
587        match self {
588            Self::Single(p) => p.inline_bytes(),
589            Self::Many(_) => 0,
590        }
591    }
592
593    pub fn max_part_bytes(&self) -> usize {
594        match self {
595            Self::Single(p) => p.encoded_size_bytes(),
596            Self::Many(r) => r.max_part_bytes,
597        }
598    }
599
600    pub fn writer_key(&self) -> Option<WriterKey> {
601        match self {
602            Self::Single(p) => p.writer_key(),
603            Self::Many(r) => r.writer_key(),
604        }
605    }
606
607    pub fn encoded_size_bytes(&self) -> usize {
608        match self {
609            Self::Single(p) => p.encoded_size_bytes(),
610            Self::Many(r) => r.hollow_bytes,
611        }
612    }
613
614    pub fn schema_id(&self) -> Option<SchemaId> {
615        match self {
616            Self::Single(p) => p.schema_id(),
617            Self::Many(_) => None,
618        }
619    }
620
621    // A user-interpretable identifier or description of the part (for logs and
622    // such).
623    pub fn printable_name(&self) -> &str {
624        match self {
625            Self::Single(p) => p.printable_name(),
626            Self::Many(r) => r.key.0.as_str(),
627        }
628    }
629
630    pub fn stats(&self) -> Option<&LazyPartStats> {
631        match self {
632            Self::Single(p) => p.stats(),
633            // TODO: if we kept stats we could avoid fetching the metadata here.
634            Self::Many(_) => None,
635        }
636    }
637
638    pub fn key_lower(&self) -> &[u8] {
639        match self {
640            Self::Single(p) => p.key_lower(),
641            Self::Many(r) => r.key_lower.as_slice(),
642        }
643    }
644
645    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
646        match self {
647            Self::Single(p) => p.structured_key_lower(),
648            Self::Many(_) => None,
649        }
650    }
651
652    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
653        match self {
654            Self::Single(p) => p.ts_rewrite(),
655            Self::Many(_) => None,
656        }
657    }
658}
659
660impl<T> RunPart<T>
661where
662    T: Timestamp + Codec64,
663{
664    pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
665        match self {
666            Self::Single(p) => p.diffs_sum(metrics),
667            Self::Many(hollow_run) => hollow_run.diffs_sum.map(D::decode),
668        }
669    }
670}
671
672/// A blob was missing!
673#[derive(Clone, Debug)]
674pub struct MissingBlob(BlobKey);
675
676impl std::fmt::Display for MissingBlob {
677    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
678        write!(f, "unexpectedly missing key: {}", self.0)
679    }
680}
681
682impl std::error::Error for MissingBlob {}
683
impl<T: Timestamp + Codec64 + Sync> RunPart<T> {
    /// Streams every [BatchPart] reachable from this run part.
    ///
    /// A `Single` part is yielded as-is, borrowed from `self`. A `Many`
    /// reference is fetched from `blob` and each of the fetched run's parts
    /// is streamed in turn (recursively, so nested references are followed);
    /// those parts are yielded as owned values.
    ///
    /// The stream yields a [MissingBlob] error, carrying the complete blob
    /// key, if a referenced run cannot be fetched.
    pub fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + Send + 'a {
        try_stream! {
            match self {
                RunPart::Single(p) => {
                    yield Cow::Borrowed(p);
                }
                RunPart::Many(r) => {
                    let fetched = r.get(shard_id, blob, metrics).await.ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
                    for run_part in fetched.parts {
                        // NOTE(review): the nested stream is boxed, presumably
                        // because this call is recursive — confirm.
                        for await batch_part in run_part.part_stream(shard_id, blob, metrics).boxed() {
                            // `run_part` is local to this loop, so anything
                            // borrowed from it is converted to an owned value.
                            yield Cow::Owned(batch_part?.into_owned());
                        }
                    }
                }
            }
        }
    }
}
708
709impl<T: Ord> PartialOrd for BatchPart<T> {
710    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
711        Some(self.cmp(other))
712    }
713}
714
715impl<T: Ord> Ord for BatchPart<T> {
716    fn cmp(&self, other: &Self) -> Ordering {
717        match (self, other) {
718            (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o),
719            (
720                BatchPart::Inline {
721                    updates: s_updates,
722                    ts_rewrite: s_ts_rewrite,
723                    schema_id: s_schema_id,
724                    deprecated_schema_id: s_deprecated_schema_id,
725                },
726                BatchPart::Inline {
727                    updates: o_updates,
728                    ts_rewrite: o_ts_rewrite,
729                    schema_id: o_schema_id,
730                    deprecated_schema_id: o_deprecated_schema_id,
731                },
732            ) => (
733                s_updates,
734                s_ts_rewrite.as_ref().map(|x| x.elements()),
735                s_schema_id,
736                s_deprecated_schema_id,
737            )
738                .cmp(&(
739                    o_updates,
740                    o_ts_rewrite.as_ref().map(|x| x.elements()),
741                    o_schema_id,
742                    o_deprecated_schema_id,
743                )),
744            (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less,
745            (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater,
746        }
747    }
748}
749
/// What order are the parts in this run in?
///
/// The derived ordering follows declaration order:
/// `Unordered < Codec < Structured`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub(crate) enum RunOrder {
    /// They're in no particular order.
    Unordered,
    /// They're ordered based on the codec-encoded K/V bytes.
    Codec,
    /// They're ordered by the natural ordering of the structured data.
    Structured,
}
760
/// A 16-byte identifier for a run.
///
/// Its string form is the prefix `ri` followed by the bytes formatted as a
/// UUID (see the `Display` and `FromStr` impls).
#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Serialize, Copy, Hash)]
pub struct RunId(pub(crate) [u8; 16]);
763
764impl std::fmt::Display for RunId {
765    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
766        write!(f, "ri{}", Uuid::from_bytes(self.0))
767    }
768}
769
770impl std::fmt::Debug for RunId {
771    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
772        write!(f, "RunId({})", Uuid::from_bytes(self.0))
773    }
774}
775
776impl std::str::FromStr for RunId {
777    type Err = String;
778
779    fn from_str(s: &str) -> Result<Self, Self::Err> {
780        parse_id("ri", "RunId", s).map(RunId)
781    }
782}
783
784impl From<RunId> for String {
785    fn from(x: RunId) -> Self {
786        x.to_string()
787    }
788}
789
790impl RunId {
791    pub(crate) fn new() -> Self {
792        RunId(*Uuid::new_v4().as_bytes())
793    }
794}
795
796impl Arbitrary for RunId {
797    type Parameters = ();
798    type Strategy = proptest::strategy::BoxedStrategy<Self>;
799    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
800        Strategy::prop_map(proptest::prelude::any::<u128>(), |n| {
801            RunId(*Uuid::from_u128(n).as_bytes())
802        })
803        .boxed()
804    }
805}
806
/// Metadata shared across a run.
///
/// Every field except `meta` is optional; `meta` is left out of the
/// serialized form when it is empty.
#[derive(Clone, Debug, Default, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub struct RunMeta {
    /// If none, Persist should infer the order based on the proto metadata.
    pub(crate) order: Option<RunOrder>,
    /// All parts in a run should have the same schema.
    pub(crate) schema: Option<SchemaId>,

    /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
    pub(crate) deprecated_schema: Option<SchemaId>,

    /// If set, a UUID that uniquely identifies this run.
    pub(crate) id: Option<RunId>,

    /// The number of updates in this run, or `None` if the number is unknown.
    pub(crate) len: Option<usize>,

    /// Additional unstructured metadata.
    #[serde(skip_serializing_if = "MetadataMap::is_empty")]
    pub(crate) meta: MetadataMap,
}
828
/// A subset of a [HollowBatch] corresponding 1:1 to a blob.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct HollowBatchPart<T> {
    /// Pointer usable to retrieve the updates.
    pub key: PartialBatchKey,
    /// Miscellaneous metadata. Omitted from the serialized form when empty.
    #[serde(skip_serializing_if = "MetadataMap::is_empty")]
    pub meta: MetadataMap,
    /// The encoded size of this part.
    pub encoded_size_bytes: usize,
    /// A lower bound on the keys in the part. (By default, this is the minimum
    /// possible key: `vec![]`.)
    #[serde(serialize_with = "serialize_part_bytes")]
    pub key_lower: Vec<u8>,
    /// A lower bound on the keys in the part, stored as structured data.
    #[serde(serialize_with = "serialize_lazy_proto")]
    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
    /// Aggregate statistics about data contained in this part.
    #[serde(serialize_with = "serialize_part_stats")]
    pub stats: Option<LazyPartStats>,
    /// A frontier to which timestamps in this part are advanced on read, if
    /// set.
    ///
    /// A value of `Some([T::minimum()])` is functionally the same as `None`,
    /// but we maintain the distinction between the two for some internal sanity
    /// checking of invariants as well as metrics. If this ever becomes an
    /// issue, everything still works with this as just `Antichain<T>`.
    pub ts_rewrite: Option<Antichain<T>>,
    /// A Codec64 encoded sum of all diffs in this part, if known.
    ///
    /// This is `None` if this part was written before we started storing this
    /// information, or if it was written when the dyncfg was off.
    ///
    /// It could also make sense to model this as part of the pushdown stats, if
    /// we later decide that's of some benefit.
    #[serde(serialize_with = "serialize_diffs_sum")]
    pub diffs_sum: Option<[u8; 8]>,
    /// Columnar format that this batch was written in.
    ///
    /// This is `None` if this part was written before we started writing structured
    /// columnar data.
    pub format: Option<BatchColumnarFormat>,
    /// The schemas used to encode the data in this batch part.
    ///
    /// Or None for historical data written before the schema registry was
    /// added.
    pub schema_id: Option<SchemaId>,

    /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
    pub deprecated_schema_id: Option<SchemaId>,
}
880
/// A [Batch] but with the updates themselves stored externally.
///
/// [Batch]: differential_dataflow::trace::BatchReader
#[derive(Clone, PartialEq, Eq)]
pub struct HollowBatch<T> {
    /// Describes the times of the updates in the batch.
    pub desc: Description<T>,
    /// The number of updates in the batch.
    pub len: usize,
    /// Pointers usable to retrieve the updates.
    pub(crate) parts: Vec<RunPart<T>>,
    /// Runs of sequential sorted batch parts, stored as indices into `parts`.
    /// Each index marks where a new run begins.
    /// ex.
    /// ```text
    ///     parts=[p1, p2, p3], runs=[]     --> run  is  [p1, p2, p3]
    ///     parts=[p1, p2, p3], runs=[1]    --> runs are [p1] and [p2, p3]
    ///     parts=[p1, p2, p3], runs=[1, 2] --> runs are [p1], [p2], [p3]
    /// ```
    pub(crate) run_splits: Vec<usize>,
    /// Run-level metadata: the first entry has metadata for the first run, and so on.
    /// If there's no corresponding entry for a particular run, it's assumed to be [RunMeta::default()].
    pub(crate) run_meta: Vec<RunMeta>,
}
904
905impl<T: Debug> Debug for HollowBatch<T> {
906    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
907        let HollowBatch {
908            desc,
909            parts,
910            len,
911            run_splits: runs,
912            run_meta,
913        } = self;
914        f.debug_struct("HollowBatch")
915            .field(
916                "desc",
917                &(
918                    desc.lower().elements(),
919                    desc.upper().elements(),
920                    desc.since().elements(),
921                ),
922            )
923            .field("parts", &parts)
924            .field("len", &len)
925            .field("runs", &runs)
926            .field("run_meta", &run_meta)
927            .finish()
928    }
929}
930
931impl<T: Serialize> serde::Serialize for HollowBatch<T> {
932    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
933        let HollowBatch {
934            desc,
935            len,
936            // Both parts and runs are covered by the self.runs call.
937            parts: _,
938            run_splits: _,
939            run_meta: _,
940        } = self;
941        let mut s = s.serialize_struct("HollowBatch", 5)?;
942        let () = s.serialize_field("lower", &desc.lower().elements())?;
943        let () = s.serialize_field("upper", &desc.upper().elements())?;
944        let () = s.serialize_field("since", &desc.since().elements())?;
945        let () = s.serialize_field("len", len)?;
946        let () = s.serialize_field("part_runs", &self.runs().collect::<Vec<_>>())?;
947        s.end()
948    }
949}
950
951impl<T: Ord> PartialOrd for HollowBatch<T> {
952    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
953        Some(self.cmp(other))
954    }
955}
956
957impl<T: Ord> Ord for HollowBatch<T> {
958    fn cmp(&self, other: &Self) -> Ordering {
959        // Deconstruct self and other so we get a compile failure if new fields
960        // are added.
961        let HollowBatch {
962            desc: self_desc,
963            parts: self_parts,
964            len: self_len,
965            run_splits: self_runs,
966            run_meta: self_run_meta,
967        } = self;
968        let HollowBatch {
969            desc: other_desc,
970            parts: other_parts,
971            len: other_len,
972            run_splits: other_runs,
973            run_meta: other_run_meta,
974        } = other;
975        (
976            self_desc.lower().elements(),
977            self_desc.upper().elements(),
978            self_desc.since().elements(),
979            self_parts,
980            self_len,
981            self_runs,
982            self_run_meta,
983        )
984            .cmp(&(
985                other_desc.lower().elements(),
986                other_desc.upper().elements(),
987                other_desc.since().elements(),
988                other_parts,
989                other_len,
990                other_runs,
991                other_run_meta,
992            ))
993    }
994}
995
996impl<T: Timestamp + Codec64 + Sync> HollowBatch<T> {
997    pub(crate) fn part_stream<'a>(
998        &'a self,
999        shard_id: ShardId,
1000        blob: &'a dyn Blob,
1001        metrics: &'a Metrics,
1002    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + 'a {
1003        stream! {
1004            for part in &self.parts {
1005                for await part in part.part_stream(shard_id, blob, metrics) {
1006                    yield part;
1007                }
1008            }
1009        }
1010    }
1011}
1012impl<T> HollowBatch<T> {
1013    /// Construct an in-memory hollow batch from the given metadata.
1014    ///
1015    /// This method checks that `runs` is a sequence of valid indices into `parts`. The caller
1016    /// is responsible for ensuring that the defined runs are valid.
1017    ///
1018    /// `len` should represent the number of valid updates in the referenced parts.
1019    pub(crate) fn new(
1020        desc: Description<T>,
1021        parts: Vec<RunPart<T>>,
1022        len: usize,
1023        run_meta: Vec<RunMeta>,
1024        run_splits: Vec<usize>,
1025    ) -> Self {
1026        debug_assert!(
1027            run_splits.is_strictly_sorted(),
1028            "run indices should be strictly increasing"
1029        );
1030        debug_assert!(
1031            run_splits.first().map_or(true, |i| *i > 0),
1032            "run indices should be positive"
1033        );
1034        debug_assert!(
1035            run_splits.last().map_or(true, |i| *i < parts.len()),
1036            "run indices should be valid indices into parts"
1037        );
1038        debug_assert!(
1039            parts.is_empty() || run_meta.len() == run_splits.len() + 1,
1040            "all metadata should correspond to a run"
1041        );
1042
1043        Self {
1044            desc,
1045            len,
1046            parts,
1047            run_splits,
1048            run_meta,
1049        }
1050    }
1051
1052    /// Construct a batch of a single run with default metadata. Mostly interesting for tests.
1053    pub(crate) fn new_run(desc: Description<T>, parts: Vec<RunPart<T>>, len: usize) -> Self {
1054        let run_meta = if parts.is_empty() {
1055            vec![]
1056        } else {
1057            vec![RunMeta::default()]
1058        };
1059        Self {
1060            desc,
1061            len,
1062            parts,
1063            run_splits: vec![],
1064            run_meta,
1065        }
1066    }
1067
1068    #[cfg(test)]
1069    pub(crate) fn new_run_for_test(
1070        desc: Description<T>,
1071        parts: Vec<RunPart<T>>,
1072        len: usize,
1073        run_id: RunId,
1074    ) -> Self {
1075        let run_meta = if parts.is_empty() {
1076            vec![]
1077        } else {
1078            let mut meta = RunMeta::default();
1079            meta.id = Some(run_id);
1080            vec![meta]
1081        };
1082        Self {
1083            desc,
1084            len,
1085            parts,
1086            run_splits: vec![],
1087            run_meta,
1088        }
1089    }
1090
1091    /// An empty hollow batch, representing no updates over the given desc.
1092    pub(crate) fn empty(desc: Description<T>) -> Self {
1093        Self {
1094            desc,
1095            len: 0,
1096            parts: vec![],
1097            run_splits: vec![],
1098            run_meta: vec![],
1099        }
1100    }
1101
1102    pub(crate) fn runs(&self) -> impl Iterator<Item = (&RunMeta, &[RunPart<T>])> {
1103        let run_ends = self
1104            .run_splits
1105            .iter()
1106            .copied()
1107            .chain(std::iter::once(self.parts.len()));
1108        let run_metas = self.run_meta.iter();
1109        let run_parts = run_ends
1110            .scan(0, |start, end| {
1111                let range = *start..end;
1112                *start = end;
1113                Some(range)
1114            })
1115            .filter(|range| !range.is_empty())
1116            .map(|range| &self.parts[range]);
1117        run_metas.zip_eq(run_parts)
1118    }
1119
1120    pub(crate) fn inline_bytes(&self) -> usize {
1121        self.parts.iter().map(|x| x.inline_bytes()).sum()
1122    }
1123
1124    pub(crate) fn is_empty(&self) -> bool {
1125        self.parts.is_empty()
1126    }
1127
1128    pub(crate) fn part_count(&self) -> usize {
1129        self.parts.len()
1130    }
1131
1132    /// The sum of the encoded sizes of all parts in the batch.
1133    pub fn encoded_size_bytes(&self) -> usize {
1134        self.parts.iter().map(|p| p.encoded_size_bytes()).sum()
1135    }
1136}
1137
1138// See the comment on [Batch::rewrite_ts] for why this is TotalOrder.
1139impl<T: Timestamp + TotalOrder> HollowBatch<T> {
1140    pub(crate) fn rewrite_ts(
1141        &mut self,
1142        frontier: &Antichain<T>,
1143        new_upper: Antichain<T>,
1144    ) -> Result<(), String> {
1145        if !PartialOrder::less_than(frontier, &new_upper) {
1146            return Err(format!(
1147                "rewrite frontier {:?} !< rewrite upper {:?}",
1148                frontier.elements(),
1149                new_upper.elements(),
1150            ));
1151        }
1152        if PartialOrder::less_than(&new_upper, self.desc.upper()) {
1153            return Err(format!(
1154                "rewrite upper {:?} < batch upper {:?}",
1155                new_upper.elements(),
1156                self.desc.upper().elements(),
1157            ));
1158        }
1159
1160        // The following are things that it seems like we could support, but
1161        // initially we don't because we don't have a use case for them.
1162        if PartialOrder::less_than(frontier, self.desc.lower()) {
1163            return Err(format!(
1164                "rewrite frontier {:?} < batch lower {:?}",
1165                frontier.elements(),
1166                self.desc.lower().elements(),
1167            ));
1168        }
1169        if self.desc.since() != &Antichain::from_elem(T::minimum()) {
1170            return Err(format!(
1171                "batch since {:?} != minimum antichain {:?}",
1172                self.desc.since().elements(),
1173                &[T::minimum()],
1174            ));
1175        }
1176        for part in self.parts.iter() {
1177            let Some(ts_rewrite) = part.ts_rewrite() else {
1178                continue;
1179            };
1180            if PartialOrder::less_than(frontier, ts_rewrite) {
1181                return Err(format!(
1182                    "rewrite frontier {:?} < batch rewrite {:?}",
1183                    frontier.elements(),
1184                    ts_rewrite.elements(),
1185                ));
1186            }
1187        }
1188
1189        self.desc = Description::new(
1190            self.desc.lower().clone(),
1191            new_upper,
1192            self.desc.since().clone(),
1193        );
1194        for part in &mut self.parts {
1195            match part {
1196                RunPart::Single(BatchPart::Hollow(part)) => {
1197                    part.ts_rewrite = Some(frontier.clone())
1198                }
1199                RunPart::Single(BatchPart::Inline { ts_rewrite, .. }) => {
1200                    *ts_rewrite = Some(frontier.clone())
1201                }
1202                RunPart::Many(runs) => {
1203                    // Currently unreachable: we only apply rewrites to user batches, and we don't
1204                    // ever generate runs of >1 part for those.
1205                    panic!("unexpected rewrite of a hollow runs ref: {runs:?}");
1206                }
1207            }
1208        }
1209        Ok(())
1210    }
1211}
1212
1213impl<T: Ord> PartialOrd for HollowBatchPart<T> {
1214    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1215        Some(self.cmp(other))
1216    }
1217}
1218
1219impl<T: Ord> Ord for HollowBatchPart<T> {
1220    fn cmp(&self, other: &Self) -> Ordering {
1221        // Deconstruct self and other so we get a compile failure if new fields
1222        // are added.
1223        let HollowBatchPart {
1224            key: self_key,
1225            meta: self_meta,
1226            encoded_size_bytes: self_encoded_size_bytes,
1227            key_lower: self_key_lower,
1228            structured_key_lower: self_structured_key_lower,
1229            stats: self_stats,
1230            ts_rewrite: self_ts_rewrite,
1231            diffs_sum: self_diffs_sum,
1232            format: self_format,
1233            schema_id: self_schema_id,
1234            deprecated_schema_id: self_deprecated_schema_id,
1235        } = self;
1236        let HollowBatchPart {
1237            key: other_key,
1238            meta: other_meta,
1239            encoded_size_bytes: other_encoded_size_bytes,
1240            key_lower: other_key_lower,
1241            structured_key_lower: other_structured_key_lower,
1242            stats: other_stats,
1243            ts_rewrite: other_ts_rewrite,
1244            diffs_sum: other_diffs_sum,
1245            format: other_format,
1246            schema_id: other_schema_id,
1247            deprecated_schema_id: other_deprecated_schema_id,
1248        } = other;
1249        (
1250            self_key,
1251            self_meta,
1252            self_encoded_size_bytes,
1253            self_key_lower,
1254            self_structured_key_lower,
1255            self_stats,
1256            self_ts_rewrite.as_ref().map(|x| x.elements()),
1257            self_diffs_sum,
1258            self_format,
1259            self_schema_id,
1260            self_deprecated_schema_id,
1261        )
1262            .cmp(&(
1263                other_key,
1264                other_meta,
1265                other_encoded_size_bytes,
1266                other_key_lower,
1267                other_structured_key_lower,
1268                other_stats,
1269                other_ts_rewrite.as_ref().map(|x| x.elements()),
1270                other_diffs_sum,
1271                other_format,
1272                other_schema_id,
1273                other_deprecated_schema_id,
1274            ))
1275    }
1276}
1277
/// A pointer to a rollup stored externally.
///
/// Only the key and (when known) the encoded size are held here, not the
/// rollup contents.
#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub struct HollowRollup {
    /// Pointer usable to retrieve the rollup.
    pub key: PartialRollupKey,
    /// The encoded size of this rollup, if known.
    pub encoded_size_bytes: Option<usize>,
}
1286
/// A pointer to a blob stored externally.
///
/// Borrows either a [HollowBatch] or a [HollowRollup].
#[derive(Debug)]
pub enum HollowBlobRef<'a, T> {
    /// A reference to a batch.
    Batch(&'a HollowBatch<T>),
    /// A reference to a rollup.
    Rollup(&'a HollowRollup),
}
1293
/// A rollup that is currently being computed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
pub struct ActiveRollup {
    // NOTE(review): presumably the SeqNo the in-progress rollup is being
    // computed for -- confirm against where this is set.
    pub seqno: SeqNo,
    // NOTE(review): presumably the wall-clock start time of the rollup in
    // milliseconds since the epoch -- confirm against where this is set.
    pub start_ms: u64,
}
1300
/// A garbage collection request that is currently being computed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
pub struct ActiveGc {
    // NOTE(review): presumably the SeqNo associated with the in-progress gc
    // request -- confirm against where this is set.
    pub seqno: SeqNo,
    // NOTE(review): presumably the wall-clock start time of the gc request in
    // milliseconds since the epoch -- confirm against where this is set.
    pub start_ms: u64,
}
1307
/// A sentinel for a state transition that was a no-op.
///
/// Critically, this also indicates that the no-op state transition was not
/// committed through compare_and_append and thus is _not linearized_.
///
/// The wrapped value is the result that the state transition function hands
/// back alongside the no-op signal.
// `PartialEq` is only derived in test builds and builds with debug assertions.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct NoOpStateTransition<T>(pub T);
1315
// TODO: Document invariants.
// `PartialEq` is only derived in test builds and builds with debug assertions.
#[derive(Debug, Clone)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct StateCollections<T> {
    /// The version of this state. This is typically identical to the version of the code
    /// that wrote it, but may diverge during 0dt upgrades and similar operations when a
    /// new version of code is intentionally interoperating with an older state format.
    pub(crate) version: Version,

    // - Invariant: `<= all reader.since`
    // - Invariant: Doesn't regress across state versions.
    // NOTE(review): presumably the SeqNo of the most recent gc request -- confirm.
    pub(crate) last_gc_req: SeqNo,

    // - Invariant: There is a rollup with `seqno <= self.seqno_since`.
    /// Pointers to the known rollups, keyed by the [SeqNo] they were added under.
    pub(crate) rollups: BTreeMap<SeqNo, HollowRollup>,

    /// The rollup that is currently being computed.
    pub(crate) active_rollup: Option<ActiveRollup>,
    /// The gc request that is currently being computed.
    pub(crate) active_gc: Option<ActiveGc>,

    /// State of each registered leased reader, keyed by reader id.
    pub(crate) leased_readers: BTreeMap<LeasedReaderId, LeasedReaderState<T>>,
    /// State of each registered critical reader, keyed by reader id.
    pub(crate) critical_readers: BTreeMap<CriticalReaderId, CriticalReaderState<T>>,
    /// State of each known writer, keyed by writer id.
    pub(crate) writers: BTreeMap<WriterId, WriterState<T>>,
    /// The registered schemas in their encoded form, keyed by schema id.
    pub(crate) schemas: BTreeMap<SchemaId, EncodedSchemas>,

    // - Invariant: `trace.since == meet(all reader.since)`
    // - Invariant: `trace.since` doesn't regress across state versions.
    // - Invariant: `trace.upper` doesn't regress across state versions.
    // - Invariant: `trace` upholds its own invariants.
    pub(crate) trace: Trace<T>,
}
1348
/// A key and val [Codec::Schema] encoded via [Codec::encode_schema].
///
/// This strategy of directly serializing the schema objects requires that
/// persist users do the right thing. Specifically, that an encoded schema
/// doesn't in some later version of mz decode to an in-mem object that acts
/// differently. In a sense, the current system (before the introduction of the
/// schema registry) where schemas are passed in unchecked to reader and writer
/// registration calls also has the same defect, so seems fine.
///
/// An alternative is to write down here some persist-specific representation of
/// the schema (e.g. the arrow DataType). This is a lot more work and also has
/// the potential to lead down a similar failure mode to the mz_persist_types
/// `Data` trait, where the boilerplate isn't worth the safety. Given that we
/// can always migrate later by rehydrating these, seems fine to start with the
/// easy thing.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct EncodedSchemas {
    /// A full in-mem `K::Schema` impl encoded via [Codec::encode_schema].
    pub key: Bytes,
    /// The arrow `DataType` produced by this `K::Schema` at the time it was
    /// registered, encoded as a `ProtoDataType`.
    ///
    /// Can be decoded with [EncodedSchemas::decode_data_type].
    pub key_data_type: Bytes,
    /// A full in-mem `V::Schema` impl encoded via [Codec::encode_schema].
    pub val: Bytes,
    /// The arrow `DataType` produced by this `V::Schema` at the time it was
    /// registered, encoded as a `ProtoDataType`.
    ///
    /// Can be decoded with [EncodedSchemas::decode_data_type].
    pub val_data_type: Bytes,
}
1377
1378impl EncodedSchemas {
1379    pub(crate) fn decode_data_type(buf: &[u8]) -> DataType {
1380        let proto = prost::Message::decode(buf).expect("valid ProtoDataType");
1381        DataType::from_proto(proto).expect("valid DataType")
1382    }
1383}
1384
/// The ways [StateCollections::compare_and_append] can break out instead of
/// continuing with an applied append.
// `PartialEq` is only derived in test builds.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum CompareAndAppendBreak<T> {
    // NOTE(review): presumably signals that the write identified by the
    // idempotency token was already applied -- confirm against the body of
    // compare_and_append.
    AlreadyCommitted,
    /// Carries the shard's upper and the writer's upper. Returned, for
    /// example, with both set to the empty antichain when the shard is a
    /// tombstone.
    Upper {
        shard_upper: Antichain<T>,
        writer_upper: Antichain<T>,
    },
    /// The caller supplied an invalid batch, e.g. one whose upper is less
    /// than its lower.
    InvalidUsage(InvalidUsage<T>),
    // NOTE(review): presumably related to the `inline_writes_total_max_bytes`
    // limit passed to compare_and_append -- confirm.
    InlineBackpressure,
}
1396
// NOTE(review): presumably the ways a snapshot request at some `as_of` can
// fail -- the variants are named after the as_of being either not yet
// available or no longer distinguishable; confirm against the snapshot code.
// `PartialEq` is only derived in test builds.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum SnapshotErr<T> {
    AsOfNotYetAvailable(SeqNo, Upper<T>),
    AsOfHistoricalDistinctionsLost(Since<T>),
}
1403
1404impl<T> StateCollections<T>
1405where
1406    T: Timestamp + Lattice + Codec64,
1407{
1408    pub fn add_rollup(
1409        &mut self,
1410        add_rollup: (SeqNo, &HollowRollup),
1411    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1412        let (rollup_seqno, rollup) = add_rollup;
1413        let applied = match self.rollups.get(&rollup_seqno) {
1414            Some(x) => x.key == rollup.key,
1415            None => {
1416                self.active_rollup = None;
1417                self.rollups.insert(rollup_seqno, rollup.to_owned());
1418                true
1419            }
1420        };
1421        // This state transition is a no-op if applied is false but we
1422        // still commit the state change so that this gets linearized
1423        // (maybe we're looking at old state).
1424        Continue(applied)
1425    }
1426
1427    pub fn remove_rollups(
1428        &mut self,
1429        remove_rollups: &[(SeqNo, PartialRollupKey)],
1430    ) -> ControlFlow<NoOpStateTransition<Vec<SeqNo>>, Vec<SeqNo>> {
1431        if remove_rollups.is_empty() || self.is_tombstone() {
1432            return Break(NoOpStateTransition(vec![]));
1433        }
1434
1435        //This state transition is called at the end of the GC process, so we
1436        //need to unset the `active_gc` field.
1437        self.active_gc = None;
1438
1439        let mut removed = vec![];
1440        for (seqno, key) in remove_rollups {
1441            let removed_key = self.rollups.remove(seqno);
1442            debug_assert!(
1443                removed_key.as_ref().map_or(true, |x| &x.key == key),
1444                "{} vs {:?}",
1445                key,
1446                removed_key
1447            );
1448
1449            if removed_key.is_some() {
1450                removed.push(*seqno);
1451            }
1452        }
1453
1454        Continue(removed)
1455    }
1456
1457    pub fn register_leased_reader(
1458        &mut self,
1459        hostname: &str,
1460        reader_id: &LeasedReaderId,
1461        purpose: &str,
1462        seqno: SeqNo,
1463        lease_duration: Duration,
1464        heartbeat_timestamp_ms: u64,
1465        use_critical_since: bool,
1466    ) -> ControlFlow<
1467        NoOpStateTransition<(LeasedReaderState<T>, SeqNo)>,
1468        (LeasedReaderState<T>, SeqNo),
1469    > {
1470        let since = if use_critical_since {
1471            self.critical_since()
1472                .unwrap_or_else(|| self.trace.since().clone())
1473        } else {
1474            self.trace.since().clone()
1475        };
1476        let reader_state = LeasedReaderState {
1477            debug: HandleDebugState {
1478                hostname: hostname.to_owned(),
1479                purpose: purpose.to_owned(),
1480            },
1481            seqno,
1482            since,
1483            last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1484            lease_duration_ms: u64::try_from(lease_duration.as_millis())
1485                .expect("lease duration as millis should fit within u64"),
1486        };
1487
1488        // If the shard-global upper and since are both the empty antichain,
1489        // then no further writes can ever commit and no further reads can be
1490        // served. Optimize this by no-op-ing reader registration so that we can
1491        // settle the shard into a final unchanging tombstone state.
1492        if self.is_tombstone() {
1493            return Break(NoOpStateTransition((reader_state, self.seqno_since(seqno))));
1494        }
1495
1496        // TODO: Handle if the reader or writer already exists.
1497        self.leased_readers
1498            .insert(reader_id.clone(), reader_state.clone());
1499        Continue((reader_state, self.seqno_since(seqno)))
1500    }
1501
1502    pub fn register_critical_reader<O: Opaque + Codec64>(
1503        &mut self,
1504        hostname: &str,
1505        reader_id: &CriticalReaderId,
1506        purpose: &str,
1507    ) -> ControlFlow<NoOpStateTransition<CriticalReaderState<T>>, CriticalReaderState<T>> {
1508        let state = CriticalReaderState {
1509            debug: HandleDebugState {
1510                hostname: hostname.to_owned(),
1511                purpose: purpose.to_owned(),
1512            },
1513            since: self.trace.since().clone(),
1514            opaque: OpaqueState(Codec64::encode(&O::initial())),
1515            opaque_codec: O::codec_name(),
1516        };
1517
1518        // We expire all readers if the upper and since both advance to the
1519        // empty antichain. Gracefully handle this. At the same time,
1520        // short-circuit the cmd application so we don't needlessly create new
1521        // SeqNos.
1522        if self.is_tombstone() {
1523            return Break(NoOpStateTransition(state));
1524        }
1525
1526        let state = match self.critical_readers.get_mut(reader_id) {
1527            Some(existing_state) => {
1528                existing_state.debug = state.debug;
1529                existing_state.clone()
1530            }
1531            None => {
1532                self.critical_readers
1533                    .insert(reader_id.clone(), state.clone());
1534                state
1535            }
1536        };
1537        Continue(state)
1538    }
1539
1540    pub fn register_schema<K: Codec, V: Codec>(
1541        &mut self,
1542        key_schema: &K::Schema,
1543        val_schema: &V::Schema,
1544    ) -> ControlFlow<NoOpStateTransition<Option<SchemaId>>, Option<SchemaId>> {
1545        fn encode_data_type(data_type: &DataType) -> Bytes {
1546            let proto = data_type.into_proto();
1547            prost::Message::encode_to_vec(&proto).into()
1548        }
1549
1550        // Look for an existing registered SchemaId for these schemas.
1551        //
1552        // The common case is that this should be a recent one, so as a minor
1553        // optimization, do this search in reverse order.
1554        //
1555        // TODO: Note that this impl is `O(schemas)`. Combined with the
1556        // possibility of cmd retries, it's possible but unlikely for this to
1557        // get expensive. We could maintain a reverse map to speed this up in
1558        // necessary. This would either need to work on the encoded
1559        // representation (which, we'd have to fall back to the linear scan) or
1560        // we'd need to add a Hash/Ord bound to Schema.
1561        let existing_id = self.schemas.iter().rev().find(|(_, x)| {
1562            K::decode_schema(&x.key) == *key_schema && V::decode_schema(&x.val) == *val_schema
1563        });
1564        match existing_id {
1565            Some((schema_id, _)) => {
1566                // TODO: Validate that the decoded schemas still produce records
1567                // of the recorded DataType, to detect shenanigans. Probably
1568                // best to wait until we've turned on Schema2 in prod and thus
1569                // committed to the current mappings.
1570                Break(NoOpStateTransition(Some(*schema_id)))
1571            }
1572            None if self.is_tombstone() => {
1573                // TODO: Is this right?
1574                Break(NoOpStateTransition(None))
1575            }
1576            None if self.schemas.is_empty() => {
1577                // We'll have to do something more sophisticated here to
1578                // generate the next id if/when we start supporting the removal
1579                // of schemas.
1580                let id = SchemaId(self.schemas.len());
1581                let key_data_type = mz_persist_types::columnar::data_type::<K>(key_schema)
1582                    .expect("valid key schema");
1583                let val_data_type = mz_persist_types::columnar::data_type::<V>(val_schema)
1584                    .expect("valid val schema");
1585                let prev = self.schemas.insert(
1586                    id,
1587                    EncodedSchemas {
1588                        key: K::encode_schema(key_schema),
1589                        key_data_type: encode_data_type(&key_data_type),
1590                        val: V::encode_schema(val_schema),
1591                        val_data_type: encode_data_type(&val_data_type),
1592                    },
1593                );
1594                assert_eq!(prev, None);
1595                Continue(Some(id))
1596            }
1597            None => {
1598                info!(
1599                    "register_schemas got {:?} expected {:?}",
1600                    key_schema,
1601                    self.schemas
1602                        .iter()
1603                        .map(|(id, x)| (id, K::decode_schema(&x.key)))
1604                        .collect::<Vec<_>>()
1605                );
1606                // Until we implement persist schema changes, only allow at most
1607                // one registered schema.
1608                Break(NoOpStateTransition(None))
1609            }
1610        }
1611    }
1612
1613    pub fn compare_and_evolve_schema<K: Codec, V: Codec>(
1614        &mut self,
1615        expected: SchemaId,
1616        key_schema: &K::Schema,
1617        val_schema: &V::Schema,
1618    ) -> ControlFlow<NoOpStateTransition<CaESchema<K, V>>, CaESchema<K, V>> {
1619        fn data_type<T>(schema: &impl Schema<T>) -> DataType {
1620            // To be defensive, create an empty batch and inspect the resulting
1621            // data type (as opposed to something like allowing the `Schema` to
1622            // declare the DataType).
1623            let array = Schema::encoder(schema).expect("valid schema").finish();
1624            Array::data_type(&array).clone()
1625        }
1626
1627        let (current_id, current) = self
1628            .schemas
1629            .last_key_value()
1630            .expect("all shards have a schema");
1631        if *current_id != expected {
1632            return Break(NoOpStateTransition(CaESchema::ExpectedMismatch {
1633                schema_id: *current_id,
1634                key: K::decode_schema(&current.key),
1635                val: V::decode_schema(&current.val),
1636            }));
1637        }
1638
1639        let current_key = K::decode_schema(&current.key);
1640        let current_key_dt = EncodedSchemas::decode_data_type(&current.key_data_type);
1641        let current_val = V::decode_schema(&current.val);
1642        let current_val_dt = EncodedSchemas::decode_data_type(&current.val_data_type);
1643
1644        let key_dt = data_type(key_schema);
1645        let val_dt = data_type(val_schema);
1646
1647        // If the schema is exactly the same as the current one, no-op.
1648        if current_key == *key_schema
1649            && current_key_dt == key_dt
1650            && current_val == *val_schema
1651            && current_val_dt == val_dt
1652        {
1653            return Break(NoOpStateTransition(CaESchema::Ok(*current_id)));
1654        }
1655
1656        let key_fn = backward_compatible(&current_key_dt, &key_dt);
1657        let val_fn = backward_compatible(&current_val_dt, &val_dt);
1658        let (Some(key_fn), Some(val_fn)) = (key_fn, val_fn) else {
1659            return Break(NoOpStateTransition(CaESchema::Incompatible));
1660        };
1661        // Persist initially disallows dropping columns. This would require a
1662        // bunch more work (e.g. not safe to use the latest schema in
1663        // compaction) and isn't initially necessary in mz.
1664        if key_fn.contains_drop() || val_fn.contains_drop() {
1665            return Break(NoOpStateTransition(CaESchema::Incompatible));
1666        }
1667
1668        // We'll have to do something more sophisticated here to
1669        // generate the next id if/when we start supporting the removal
1670        // of schemas.
1671        let id = SchemaId(self.schemas.len());
1672        self.schemas.insert(
1673            id,
1674            EncodedSchemas {
1675                key: K::encode_schema(key_schema),
1676                key_data_type: prost::Message::encode_to_vec(&key_dt.into_proto()).into(),
1677                val: V::encode_schema(val_schema),
1678                val_data_type: prost::Message::encode_to_vec(&val_dt.into_proto()).into(),
1679            },
1680        );
1681        Continue(CaESchema::Ok(id))
1682    }
1683
1684    pub fn compare_and_append(
1685        &mut self,
1686        batch: &HollowBatch<T>,
1687        writer_id: &WriterId,
1688        heartbeat_timestamp_ms: u64,
1689        lease_duration_ms: u64,
1690        idempotency_token: &IdempotencyToken,
1691        debug_info: &HandleDebugState,
1692        inline_writes_total_max_bytes: usize,
1693        claim_compaction_percent: usize,
1694        claim_compaction_min_version: Option<&Version>,
1695    ) -> ControlFlow<CompareAndAppendBreak<T>, Vec<FueledMergeReq<T>>> {
1696        // We expire all writers if the upper and since both advance to the
1697        // empty antichain. Gracefully handle this. At the same time,
1698        // short-circuit the cmd application so we don't needlessly create new
1699        // SeqNos.
1700        if self.is_tombstone() {
1701            assert_eq!(self.trace.upper(), &Antichain::new());
1702            return Break(CompareAndAppendBreak::Upper {
1703                shard_upper: Antichain::new(),
1704                // This writer might have been registered before the shard upper
1705                // was advanced, which would make this pessimistic in the
1706                // Indeterminate handling of compare_and_append at the machine
1707                // level, but that's fine.
1708                writer_upper: Antichain::new(),
1709            });
1710        }
1711
1712        let writer_state = self
1713            .writers
1714            .entry(writer_id.clone())
1715            .or_insert_with(|| WriterState {
1716                last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1717                lease_duration_ms,
1718                most_recent_write_token: IdempotencyToken::SENTINEL,
1719                most_recent_write_upper: Antichain::from_elem(T::minimum()),
1720                debug: debug_info.clone(),
1721            });
1722
1723        if PartialOrder::less_than(batch.desc.upper(), batch.desc.lower()) {
1724            return Break(CompareAndAppendBreak::InvalidUsage(
1725                InvalidUsage::InvalidBounds {
1726                    lower: batch.desc.lower().clone(),
1727                    upper: batch.desc.upper().clone(),
1728                },
1729            ));
1730        }
1731
1732        // If the time interval is empty, the list of updates must also be
1733        // empty.
1734        if batch.desc.upper() == batch.desc.lower() && !batch.is_empty() {
1735            return Break(CompareAndAppendBreak::InvalidUsage(
1736                InvalidUsage::InvalidEmptyTimeInterval {
1737                    lower: batch.desc.lower().clone(),
1738                    upper: batch.desc.upper().clone(),
1739                    keys: batch
1740                        .parts
1741                        .iter()
1742                        .map(|x| x.printable_name().to_owned())
1743                        .collect(),
1744                },
1745            ));
1746        }
1747
1748        if idempotency_token == &writer_state.most_recent_write_token {
1749            // If the last write had the same idempotency_token, then this must
1750            // have already committed. Sanity check that the most recent write
1751            // upper matches and that the shard upper is at least the write
1752            // upper, if it's not something very suspect is going on.
1753            assert_eq!(batch.desc.upper(), &writer_state.most_recent_write_upper);
1754            assert!(
1755                PartialOrder::less_equal(batch.desc.upper(), self.trace.upper()),
1756                "{:?} vs {:?}",
1757                batch.desc.upper(),
1758                self.trace.upper()
1759            );
1760            return Break(CompareAndAppendBreak::AlreadyCommitted);
1761        }
1762
1763        let shard_upper = self.trace.upper();
1764        if shard_upper != batch.desc.lower() {
1765            return Break(CompareAndAppendBreak::Upper {
1766                shard_upper: shard_upper.clone(),
1767                writer_upper: writer_state.most_recent_write_upper.clone(),
1768            });
1769        }
1770
1771        let new_inline_bytes = batch.inline_bytes();
1772        if new_inline_bytes > 0 {
1773            let mut existing_inline_bytes = 0;
1774            self.trace
1775                .map_batches(|x| existing_inline_bytes += x.inline_bytes());
1776            // TODO: For very small batches, it may actually _increase_ the size
1777            // of state to flush them out. Consider another threshold under
1778            // which an inline part can be appended no matter what.
1779            if existing_inline_bytes + new_inline_bytes >= inline_writes_total_max_bytes {
1780                return Break(CompareAndAppendBreak::InlineBackpressure);
1781            }
1782        }
1783
1784        let mut merge_reqs = if batch.desc.upper() != batch.desc.lower() {
1785            self.trace.push_batch(batch.clone())
1786        } else {
1787            Vec::new()
1788        };
1789
1790        // NB: we don't claim unclaimed compactions when the recording flag is off, even if we'd
1791        // otherwise be allowed to, to avoid triggering the same compactions in every writer.
1792        let all_empty_reqs = merge_reqs
1793            .iter()
1794            .all(|req| req.inputs.iter().all(|b| b.batch.is_empty()));
1795        if all_empty_reqs && !batch.is_empty() {
1796            let mut reqs_to_take = claim_compaction_percent / 100;
1797            if (usize::cast_from(idempotency_token.hashed()) % 100)
1798                < (claim_compaction_percent % 100)
1799            {
1800                reqs_to_take += 1;
1801            }
1802            let threshold_ms = heartbeat_timestamp_ms.saturating_sub(lease_duration_ms);
1803            let min_writer = claim_compaction_min_version.map(WriterKey::for_version);
1804            merge_reqs.extend(
1805                // We keep the oldest `reqs_to_take` batches, under the theory that they're least
1806                // likely to be compacted soon for other reasons.
1807                self.trace
1808                    .fueled_merge_reqs_before_ms(threshold_ms, min_writer)
1809                    .take(reqs_to_take),
1810            )
1811        }
1812
1813        for req in &merge_reqs {
1814            self.trace.claim_compaction(
1815                req.id,
1816                ActiveCompaction {
1817                    start_ms: heartbeat_timestamp_ms,
1818                },
1819            )
1820        }
1821
1822        debug_assert_eq!(self.trace.upper(), batch.desc.upper());
1823        writer_state.most_recent_write_token = idempotency_token.clone();
1824        // The writer's most recent upper should only go forward.
1825        assert!(
1826            PartialOrder::less_equal(&writer_state.most_recent_write_upper, batch.desc.upper()),
1827            "{:?} vs {:?}",
1828            &writer_state.most_recent_write_upper,
1829            batch.desc.upper()
1830        );
1831        writer_state
1832            .most_recent_write_upper
1833            .clone_from(batch.desc.upper());
1834
1835        // Heartbeat the writer state to keep our idempotency token alive.
1836        writer_state.last_heartbeat_timestamp_ms = std::cmp::max(
1837            heartbeat_timestamp_ms,
1838            writer_state.last_heartbeat_timestamp_ms,
1839        );
1840
1841        Continue(merge_reqs)
1842    }
1843
1844    pub fn apply_merge_res<D: Codec64 + Monoid + PartialEq>(
1845        &mut self,
1846        res: &FueledMergeRes<T>,
1847        metrics: &ColumnarMetrics,
1848    ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
1849        // We expire all writers if the upper and since both advance to the
1850        // empty antichain. Gracefully handle this. At the same time,
1851        // short-circuit the cmd application so we don't needlessly create new
1852        // SeqNos.
1853        if self.is_tombstone() {
1854            return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
1855        }
1856
1857        let apply_merge_result = self.trace.apply_merge_res_checked::<D>(res, metrics);
1858        Continue(apply_merge_result)
1859    }
1860
1861    pub fn spine_exert(
1862        &mut self,
1863        fuel: usize,
1864    ) -> ControlFlow<NoOpStateTransition<Vec<FueledMergeReq<T>>>, Vec<FueledMergeReq<T>>> {
1865        let (merge_reqs, did_work) = self.trace.exert(fuel);
1866        if did_work {
1867            Continue(merge_reqs)
1868        } else {
1869            assert!(merge_reqs.is_empty());
1870            // Break if we have nothing useful to do to save the seqno (and
1871            // resulting crdb traffic)
1872            Break(NoOpStateTransition(Vec::new()))
1873        }
1874    }
1875
1876    pub fn downgrade_since(
1877        &mut self,
1878        reader_id: &LeasedReaderId,
1879        seqno: SeqNo,
1880        outstanding_seqno: Option<SeqNo>,
1881        new_since: &Antichain<T>,
1882        heartbeat_timestamp_ms: u64,
1883    ) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
1884        // We expire all readers if the upper and since both advance to the
1885        // empty antichain. Gracefully handle this. At the same time,
1886        // short-circuit the cmd application so we don't needlessly create new
1887        // SeqNos.
1888        if self.is_tombstone() {
1889            return Break(NoOpStateTransition(Since(Antichain::new())));
1890        }
1891
1892        // The only way to have a missing reader in state is if it's been expired... and in that
1893        // case, we behave the same as though that reader had been downgraded to the empty antichain.
1894        let Some(reader_state) = self.leased_reader(reader_id) else {
1895            tracing::warn!(
1896                "Leased reader {reader_id} was expired due to inactivity. Did the machine go to sleep?",
1897            );
1898            return Break(NoOpStateTransition(Since(Antichain::new())));
1899        };
1900
1901        // Also use this as an opportunity to heartbeat the reader and downgrade
1902        // the seqno capability.
1903        reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
1904            heartbeat_timestamp_ms,
1905            reader_state.last_heartbeat_timestamp_ms,
1906        );
1907
1908        let seqno = match outstanding_seqno {
1909            Some(outstanding_seqno) => {
1910                assert!(
1911                    outstanding_seqno >= reader_state.seqno,
1912                    "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
1913                    is behind current reader_state ({:?})",
1914                    outstanding_seqno,
1915                    reader_state.seqno,
1916                );
1917                std::cmp::min(outstanding_seqno, seqno)
1918            }
1919            None => seqno,
1920        };
1921
1922        reader_state.seqno = seqno;
1923
1924        let reader_current_since = if PartialOrder::less_than(&reader_state.since, new_since) {
1925            reader_state.since.clone_from(new_since);
1926            self.update_since();
1927            new_since.clone()
1928        } else {
1929            // No-op, but still commit the state change so that this gets
1930            // linearized.
1931            reader_state.since.clone()
1932        };
1933
1934        Continue(Since(reader_current_since))
1935    }
1936
1937    pub fn compare_and_downgrade_since<O: Opaque + Codec64>(
1938        &mut self,
1939        reader_id: &CriticalReaderId,
1940        expected_opaque: &O,
1941        (new_opaque, new_since): (&O, &Antichain<T>),
1942    ) -> ControlFlow<
1943        NoOpStateTransition<Result<Since<T>, (O, Since<T>)>>,
1944        Result<Since<T>, (O, Since<T>)>,
1945    > {
1946        // We expire all readers if the upper and since both advance to the
1947        // empty antichain. Gracefully handle this. At the same time,
1948        // short-circuit the cmd application so we don't needlessly create new
1949        // SeqNos.
1950        if self.is_tombstone() {
1951            // Match the idempotence behavior below of ignoring the token if
1952            // since is already advanced enough (in this case, because it's a
1953            // tombstone, we know it's the empty antichain).
1954            return Break(NoOpStateTransition(Ok(Since(Antichain::new()))));
1955        }
1956
1957        let reader_state = self.critical_reader(reader_id);
1958        assert_eq!(reader_state.opaque_codec, O::codec_name());
1959
1960        if &O::decode(reader_state.opaque.0) != expected_opaque {
1961            // No-op, but still commit the state change so that this gets
1962            // linearized.
1963            return Continue(Err((
1964                Codec64::decode(reader_state.opaque.0),
1965                Since(reader_state.since.clone()),
1966            )));
1967        }
1968
1969        reader_state.opaque = OpaqueState(Codec64::encode(new_opaque));
1970        if PartialOrder::less_equal(&reader_state.since, new_since) {
1971            reader_state.since.clone_from(new_since);
1972            self.update_since();
1973            Continue(Ok(Since(new_since.clone())))
1974        } else {
1975            // no work to be done -- the reader state's `since` is already sufficiently
1976            // advanced. we may someday need to revisit this branch when it's possible
1977            // for two `since` frontiers to be incomparable.
1978            Continue(Ok(Since(reader_state.since.clone())))
1979        }
1980    }
1981
1982    pub fn heartbeat_leased_reader(
1983        &mut self,
1984        reader_id: &LeasedReaderId,
1985        heartbeat_timestamp_ms: u64,
1986    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1987        // We expire all readers if the upper and since both advance to the
1988        // empty antichain. Gracefully handle this. At the same time,
1989        // short-circuit the cmd application so we don't needlessly create new
1990        // SeqNos.
1991        if self.is_tombstone() {
1992            return Break(NoOpStateTransition(false));
1993        }
1994
1995        match self.leased_readers.get_mut(reader_id) {
1996            Some(reader_state) => {
1997                reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
1998                    heartbeat_timestamp_ms,
1999                    reader_state.last_heartbeat_timestamp_ms,
2000                );
2001                Continue(true)
2002            }
2003            // No-op, but we still commit the state change so that this gets
2004            // linearized (maybe we're looking at old state).
2005            None => Continue(false),
2006        }
2007    }
2008
2009    pub fn expire_leased_reader(
2010        &mut self,
2011        reader_id: &LeasedReaderId,
2012    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2013        // We expire all readers if the upper and since both advance to the
2014        // empty antichain. Gracefully handle this. At the same time,
2015        // short-circuit the cmd application so we don't needlessly create new
2016        // SeqNos.
2017        if self.is_tombstone() {
2018            return Break(NoOpStateTransition(false));
2019        }
2020
2021        let existed = self.leased_readers.remove(reader_id).is_some();
2022        if existed {
2023            // TODO(database-issues#6885): Re-enable this
2024            //
2025            // Temporarily disabling this because we think it might be the cause
2026            // of the remap since bug. Specifically, a clusterd process has a
2027            // ReadHandle for maintaining the once and one inside a Listen. If
2028            // we crash and stay down for longer than the read lease duration,
2029            // it's possible that an expiry of them both in quick succession
2030            // jumps the since forward to the Listen one.
2031            //
2032            // Don't forget to update the downgrade_since when this gets
2033            // switched back on.
2034            //
2035            // self.update_since();
2036        }
2037        // No-op if existed is false, but still commit the state change so that
2038        // this gets linearized.
2039        Continue(existed)
2040    }
2041
2042    pub fn expire_critical_reader(
2043        &mut self,
2044        reader_id: &CriticalReaderId,
2045    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2046        // We expire all readers if the upper and since both advance to the
2047        // empty antichain. Gracefully handle this. At the same time,
2048        // short-circuit the cmd application so we don't needlessly create new
2049        // SeqNos.
2050        if self.is_tombstone() {
2051            return Break(NoOpStateTransition(false));
2052        }
2053
2054        let existed = self.critical_readers.remove(reader_id).is_some();
2055        if existed {
2056            // TODO(database-issues#6885): Re-enable this
2057            //
2058            // Temporarily disabling this because we think it might be the cause
2059            // of the remap since bug. Specifically, a clusterd process has a
2060            // ReadHandle for maintaining the once and one inside a Listen. If
2061            // we crash and stay down for longer than the read lease duration,
2062            // it's possible that an expiry of them both in quick succession
2063            // jumps the since forward to the Listen one.
2064            //
2065            // Don't forget to update the downgrade_since when this gets
2066            // switched back on.
2067            //
2068            // self.update_since();
2069        }
2070        // This state transition is a no-op if existed is false, but we still
2071        // commit the state change so that this gets linearized (maybe we're
2072        // looking at old state).
2073        Continue(existed)
2074    }
2075
2076    pub fn expire_writer(
2077        &mut self,
2078        writer_id: &WriterId,
2079    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2080        // We expire all writers if the upper and since both advance to the
2081        // empty antichain. Gracefully handle this. At the same time,
2082        // short-circuit the cmd application so we don't needlessly create new
2083        // SeqNos.
2084        if self.is_tombstone() {
2085            return Break(NoOpStateTransition(false));
2086        }
2087
2088        let existed = self.writers.remove(writer_id).is_some();
2089        // This state transition is a no-op if existed is false, but we still
2090        // commit the state change so that this gets linearized (maybe we're
2091        // looking at old state).
2092        Continue(existed)
2093    }
2094
2095    fn leased_reader(&mut self, id: &LeasedReaderId) -> Option<&mut LeasedReaderState<T>> {
2096        self.leased_readers.get_mut(id)
2097    }
2098
2099    fn critical_reader(&mut self, id: &CriticalReaderId) -> &mut CriticalReaderState<T> {
2100        self.critical_readers
2101            .get_mut(id)
2102            .unwrap_or_else(|| {
2103                panic!(
2104                    "Unknown CriticalReaderId({}). It was either never registered, or has been manually expired.",
2105                    id
2106                )
2107            })
2108    }
2109
2110    fn critical_since(&self) -> Option<Antichain<T>> {
2111        let mut critical_sinces = self.critical_readers.values().map(|r| &r.since);
2112        let mut since = critical_sinces.next().cloned()?;
2113        for s in critical_sinces {
2114            since.meet_assign(s);
2115        }
2116        Some(since)
2117    }
2118
2119    fn update_since(&mut self) {
2120        let mut sinces_iter = self
2121            .leased_readers
2122            .values()
2123            .map(|x| &x.since)
2124            .chain(self.critical_readers.values().map(|x| &x.since));
2125        let mut since = match sinces_iter.next() {
2126            Some(since) => since.clone(),
2127            None => {
2128                // If there are no current readers, leave `since` unchanged so
2129                // it doesn't regress.
2130                return;
2131            }
2132        };
2133        while let Some(s) = sinces_iter.next() {
2134            since.meet_assign(s);
2135        }
2136        self.trace.downgrade_since(&since);
2137    }
2138
2139    fn seqno_since(&self, seqno: SeqNo) -> SeqNo {
2140        let mut seqno_since = seqno;
2141        for cap in self.leased_readers.values() {
2142            seqno_since = std::cmp::min(seqno_since, cap.seqno);
2143        }
2144        // critical_readers don't hold a seqno capability.
2145        seqno_since
2146    }
2147
2148    fn tombstone_batch() -> HollowBatch<T> {
2149        HollowBatch::empty(Description::new(
2150            Antichain::from_elem(T::minimum()),
2151            Antichain::new(),
2152            Antichain::new(),
2153        ))
2154    }
2155
2156    pub(crate) fn is_tombstone(&self) -> bool {
2157        self.trace.upper().is_empty()
2158            && self.trace.since().is_empty()
2159            && self.writers.is_empty()
2160            && self.leased_readers.is_empty()
2161            && self.critical_readers.is_empty()
2162    }
2163
2164    pub(crate) fn is_single_empty_batch(&self) -> bool {
2165        let mut batch_count = 0;
2166        let mut is_empty = true;
2167        self.trace.map_batches(|b| {
2168            batch_count += 1;
2169            is_empty &= b.is_empty()
2170        });
2171        batch_count <= 1 && is_empty
2172    }
2173
2174    pub fn become_tombstone_and_shrink(&mut self) -> ControlFlow<NoOpStateTransition<()>, ()> {
2175        assert_eq!(self.trace.upper(), &Antichain::new());
2176        assert_eq!(self.trace.since(), &Antichain::new());
2177
2178        // Remember our current state, so we can decide whether we have to
2179        // record a transition in durable state.
2180        let was_tombstone = self.is_tombstone();
2181
2182        // Enter the "tombstone" state, if we're not in it already.
2183        self.writers.clear();
2184        self.leased_readers.clear();
2185        self.critical_readers.clear();
2186
2187        debug_assert!(self.is_tombstone());
2188
2189        // Now that we're in a "tombstone" state -- ie. nobody can read the data from a shard or write to
2190        // it -- the actual contents of our batches no longer matter.
2191        // This method progressively replaces batches in our state with simpler versions, to allow
2192        // freeing up resources and to reduce the state size. (Since the state is unreadable, this
2193        // is not visible to clients.) We do this a little bit at a time to avoid really large state
2194        // transitions... most operations happen incrementally, and large single writes can overwhelm
2195        // a backing store. See comments for why we believe the relevant diffs are reasonably small.
2196
2197        let mut to_replace = None;
2198        let mut batch_count = 0;
2199        self.trace.map_batches(|b| {
2200            batch_count += 1;
2201            if !b.is_empty() && to_replace.is_none() {
2202                to_replace = Some(b.desc.clone());
2203            }
2204        });
2205        if let Some(desc) = to_replace {
2206            // We have a nonempty batch: replace it with an empty batch and return.
2207            // This should not produce an excessively large diff: if it did, we wouldn't have been
2208            // able to append that batch in the first place.
2209            let result = self.trace.apply_tombstone_merge(&desc);
2210            assert!(
2211                result.matched(),
2212                "merge with a matching desc should always match"
2213            );
2214            Continue(())
2215        } else if batch_count > 1 {
2216            // All our batches are empty, but we have more than one of them. Replace the whole set
2217            // with a new single-batch trace.
2218            // This produces a diff with a size proportional to the number of batches, but since
2219            // Spine keeps a logarithmic number of batches this should never be excessively large.
2220            let mut new_trace = Trace::default();
2221            new_trace.downgrade_since(&Antichain::new());
2222            let merge_reqs = new_trace.push_batch(Self::tombstone_batch());
2223            assert_eq!(merge_reqs, Vec::new());
2224            self.trace = new_trace;
2225            Continue(())
2226        } else if !was_tombstone {
2227            // We were not tombstoned before, so have to make sure this state
2228            // transition is recorded.
2229            Continue(())
2230        } else {
2231            // All our batches are empty, and there's only one... there's no shrinking this
2232            // tombstone further.
2233            Break(NoOpStateTransition(()))
2234        }
2235    }
2236}
2237
// TODO: Document invariants.
/// One version of a shard's state: identifying metadata plus the
/// `StateCollections` that state transitions operate on.
///
/// `TypedState::clone_apply` derives each successor version by advancing
/// `seqno`, refreshing `walltime_ms` and `hostname`, and running a work
/// function against a clone of `collections`.
///
/// `Clone` and `PartialEq` are only derived in test and debug-assertion
/// builds.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct State<T> {
    /// The shard this state belongs to.
    pub(crate) shard_id: ShardId,

    /// The sequence number of this version of state.
    pub(crate) seqno: SeqNo,
    /// A strictly increasing wall time of when this state was written, in
    /// milliseconds since the unix epoch.
    pub(crate) walltime_ms: u64,
    /// Hostname of the persist user that created this version of state. For
    /// debugging.
    pub(crate) hostname: String,
    /// The collections (readers, writers, trace, ...) that state transitions
    /// read and mutate.
    pub(crate) collections: StateCollections<T>,
}
2253
/// A newtype wrapper of State that guarantees the K, V, and D codecs match the
/// ones in durable storage.
///
/// Dereferences to the wrapped [`State`], so its fields and methods are
/// reachable directly on a `TypedState`.
pub struct TypedState<K, V, T, D> {
    /// The wrapped, untyped state.
    pub(crate) state: State<T>,

    // According to the docs, PhantomData is to "mark things that act like they
    // own a T". State doesn't actually own K, V, or D, just the ability to
    // produce them. Using the `fn() -> T` pattern gets us the same variance as
    // T [1], but also allows State to correctly derive Send+Sync.
    //
    // [1]:
    //     https://doc.rust-lang.org/nomicon/phantom-data.html#table-of-phantomdata-patterns
    pub(crate) _phantom: PhantomData<fn() -> (K, V, D)>,
}
2268
2269impl<K, V, T: Clone, D> TypedState<K, V, T, D> {
2270    #[cfg(any(test, debug_assertions))]
2271    pub(crate) fn clone(&self, hostname: String) -> Self {
2272        TypedState {
2273            state: State {
2274                shard_id: self.shard_id.clone(),
2275                seqno: self.seqno.clone(),
2276                walltime_ms: self.walltime_ms,
2277                hostname,
2278                collections: self.collections.clone(),
2279            },
2280            _phantom: PhantomData,
2281        }
2282    }
2283
2284    pub(crate) fn clone_for_rollup(&self) -> Self {
2285        TypedState {
2286            state: State {
2287                shard_id: self.shard_id.clone(),
2288                seqno: self.seqno.clone(),
2289                walltime_ms: self.walltime_ms,
2290                hostname: self.hostname.clone(),
2291                collections: self.collections.clone(),
2292            },
2293            _phantom: PhantomData,
2294        }
2295    }
2296}
2297
2298impl<K, V, T: Debug, D> Debug for TypedState<K, V, T, D> {
2299    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2300        // Deconstruct self so we get a compile failure if new fields
2301        // are added.
2302        let TypedState { state, _phantom } = self;
2303        f.debug_struct("TypedState").field("state", state).finish()
2304    }
2305}
2306
2307// Impl PartialEq regardless of the type params.
2308#[cfg(any(test, debug_assertions))]
2309impl<K, V, T: PartialEq, D> PartialEq for TypedState<K, V, T, D> {
2310    fn eq(&self, other: &Self) -> bool {
2311        // Deconstruct self and other so we get a compile failure if new fields
2312        // are added.
2313        let TypedState {
2314            state: self_state,
2315            _phantom,
2316        } = self;
2317        let TypedState {
2318            state: other_state,
2319            _phantom,
2320        } = other;
2321        self_state == other_state
2322    }
2323}
2324
2325impl<K, V, T, D> Deref for TypedState<K, V, T, D> {
2326    type Target = State<T>;
2327
2328    fn deref(&self) -> &Self::Target {
2329        &self.state
2330    }
2331}
2332
2333impl<K, V, T, D> DerefMut for TypedState<K, V, T, D> {
2334    fn deref_mut(&mut self) -> &mut Self::Target {
2335        &mut self.state
2336    }
2337}
2338
2339impl<K, V, T, D> TypedState<K, V, T, D>
2340where
2341    K: Codec,
2342    V: Codec,
2343    T: Timestamp + Lattice + Codec64,
2344    D: Codec64,
2345{
2346    pub fn new(
2347        applier_version: Version,
2348        shard_id: ShardId,
2349        hostname: String,
2350        walltime_ms: u64,
2351    ) -> Self {
2352        let state = State {
2353            shard_id,
2354            seqno: SeqNo::minimum(),
2355            walltime_ms,
2356            hostname,
2357            collections: StateCollections {
2358                version: applier_version,
2359                last_gc_req: SeqNo::minimum(),
2360                rollups: BTreeMap::new(),
2361                active_rollup: None,
2362                active_gc: None,
2363                leased_readers: BTreeMap::new(),
2364                critical_readers: BTreeMap::new(),
2365                writers: BTreeMap::new(),
2366                schemas: BTreeMap::new(),
2367                trace: Trace::default(),
2368            },
2369        };
2370        TypedState {
2371            state,
2372            _phantom: PhantomData,
2373        }
2374    }
2375
2376    pub fn clone_apply<R, E, WorkFn>(
2377        &self,
2378        cfg: &PersistConfig,
2379        work_fn: &mut WorkFn,
2380    ) -> ControlFlow<E, (R, Self)>
2381    where
2382        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
2383    {
2384        // We do not increment the version by default, though work_fn can if it chooses to.
2385        let mut new_state = State {
2386            shard_id: self.shard_id,
2387            seqno: self.seqno.next(),
2388            walltime_ms: (cfg.now)(),
2389            hostname: cfg.hostname.clone(),
2390            collections: self.collections.clone(),
2391        };
2392
2393        // Make sure walltime_ms is strictly increasing, in case clocks are
2394        // offset.
2395        if new_state.walltime_ms <= self.walltime_ms {
2396            new_state.walltime_ms = self.walltime_ms + 1;
2397        }
2398
2399        let work_ret = work_fn(new_state.seqno, cfg, &mut new_state.collections)?;
2400        let new_state = TypedState {
2401            state: new_state,
2402            _phantom: PhantomData,
2403        };
2404        Continue((work_ret, new_state))
2405    }
2406}
2407
/// Garbage-collection settings handed to `State::maybe_gc`, which
/// destructures all four fields.
// NOTE(review): the exact meaning of each field is defined by `maybe_gc`;
// confirm against its body before relying on the field docs below.
#[derive(Debug, Clone, Copy)]
pub struct GcConfig {
    /// Presumably toggles use of the `active_gc` bookkeeping in state.
    pub use_active_gc: bool,
    /// A fallback threshold, in milliseconds.
    pub fallback_threshold_ms: u64,
    /// Presumably a lower bound on a count of state versions.
    pub min_versions: usize,
    /// Presumably an upper bound on a count of state versions.
    pub max_versions: usize,
}
2415
2416impl<T> State<T>
2417where
2418    T: Timestamp + Lattice + Codec64,
2419{
2420    pub fn shard_id(&self) -> ShardId {
2421        self.shard_id
2422    }
2423
2424    pub fn seqno(&self) -> SeqNo {
2425        self.seqno
2426    }
2427
2428    pub fn since(&self) -> &Antichain<T> {
2429        self.collections.trace.since()
2430    }
2431
2432    pub fn upper(&self) -> &Antichain<T> {
2433        self.collections.trace.upper()
2434    }
2435
2436    pub fn spine_batch_count(&self) -> usize {
2437        self.collections.trace.num_spine_batches()
2438    }
2439
2440    pub fn size_metrics(&self) -> StateSizeMetrics {
2441        let mut ret = StateSizeMetrics::default();
2442        self.blobs().for_each(|x| match x {
2443            HollowBlobRef::Batch(x) => {
2444                ret.hollow_batch_count += 1;
2445                ret.batch_part_count += x.part_count();
2446                ret.num_updates += x.len;
2447
2448                let batch_size = x.encoded_size_bytes();
2449                for x in x.parts.iter() {
2450                    if x.ts_rewrite().is_some() {
2451                        ret.rewrite_part_count += 1;
2452                    }
2453                    if x.is_inline() {
2454                        ret.inline_part_count += 1;
2455                        ret.inline_part_bytes += x.inline_bytes();
2456                    }
2457                }
2458                ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size);
2459                ret.state_batches_bytes += batch_size;
2460            }
2461            HollowBlobRef::Rollup(x) => {
2462                ret.state_rollup_count += 1;
2463                ret.state_rollups_bytes += x.encoded_size_bytes.unwrap_or_default()
2464            }
2465        });
2466        ret
2467    }
2468
2469    pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
2470        // We maintain the invariant that every version of state has at least
2471        // one rollup.
2472        self.collections
2473            .rollups
2474            .iter()
2475            .rev()
2476            .next()
2477            .expect("State should have at least one rollup if seqno > minimum")
2478    }
2479
    /// Returns the seqno-since of this state, as computed by the state
    /// collections for this state's current seqno.
    pub(crate) fn seqno_since(&self) -> SeqNo {
        self.collections.seqno_since(self.seqno)
    }
2483
    /// Returns whether the cmd proposing this state has been selected to perform
    /// background garbage collection work.
    ///
    /// If it was selected, this information is recorded in the state itself for
    /// commit along with the cmd's state transition. This helps us to avoid
    /// redundant work.
    ///
    /// Correctness does not depend on a gc assignment being executed, nor on
    /// them being executed in the order they are given. But it is expected that
    /// gc assignments are best-effort respected. In practice, cmds like
    /// register_foo or expire_foo, where it would be awkward, ignore gc.
    ///
    /// Parameters:
    /// - `is_write`: whether the proposing cmd is a write; writers are
    ///   preferred for gc work.
    /// - `now`: current time, compared against `active_gc.start_ms`
    ///   (presumably epoch milliseconds; confirm against callers).
    /// - `cfg`: thresholds and mode selection, see [`GcConfig`].
    ///
    /// Returns `Some(GcReq)` when gc work is assigned, in which case
    /// `last_gc_req` is advanced to the seqno carried by the request, and
    /// `None` otherwise (state is left untouched).
    pub fn maybe_gc(&mut self, is_write: bool, now: u64, cfg: GcConfig) -> Option<GcReq> {
        let GcConfig {
            use_active_gc,
            fallback_threshold_ms,
            min_versions,
            max_versions,
        } = cfg;
        // This is an arbitrary-ish threshold that scales with seqno, but never
        // gets particularly big. It probably could be much bigger and certainly
        // could use a tuning pass at some point.
        //
        // With active gc enabled the threshold is instead the configured
        // `min_versions`.
        let gc_threshold = if use_active_gc {
            u64::cast_from(min_versions)
        } else {
            std::cmp::max(
                1,
                u64::cast_from(self.seqno.0.next_power_of_two().trailing_zeros()),
            )
        };
        let new_seqno_since = self.seqno_since();
        // Collect until the new seqno since... or the old since plus the max number of versions,
        // whatever is less.
        let gc_until_seqno = new_seqno_since.min(SeqNo(
            self.collections
                .last_gc_req
                .0
                .saturating_add(u64::cast_from(max_versions)),
        ));
        // Base trigger: the seqno since has advanced at least `gc_threshold`
        // past the last gc request.
        let should_gc = new_seqno_since
            .0
            .saturating_sub(self.collections.last_gc_req.0)
            >= gc_threshold;

        // If we wouldn't otherwise gc, check if we have an active gc. If we do, and
        // it's been a while since it started, we should gc.
        let should_gc = if use_active_gc && !should_gc {
            match self.collections.active_gc {
                Some(active_gc) => now.saturating_sub(active_gc.start_ms) > fallback_threshold_ms,
                None => false,
            }
        } else {
            should_gc
        };
        // Assign GC traffic preferentially to writers, falling back to anyone
        // generating new state versions if there are no writers.
        let should_gc = should_gc && (is_write || self.collections.writers.is_empty());
        // Always assign GC work to a tombstoned shard to have the chance to
        // clean up any residual blobs. This is safe (won't cause excess gc)
        // as the only allowed command after becoming a tombstone is to write
        // the final rollup.
        let tombstone_needs_gc = self.collections.is_tombstone();
        let should_gc = should_gc || tombstone_needs_gc;
        let should_gc = if use_active_gc {
            // If we have an active gc, we should only gc if the active gc is
            // sufficiently old. This is to avoid doing more gc work than
            // necessary.
            should_gc
                && match self.collections.active_gc {
                    Some(active) => now.saturating_sub(active.start_ms) > fallback_threshold_ms,
                    None => true,
                }
        } else {
            should_gc
        };
        if should_gc {
            // Record the assignment in the state so it is committed together
            // with the cmd's state transition.
            self.collections.last_gc_req = gc_until_seqno;
            Some(GcReq {
                shard_id: self.shard_id,
                new_seqno_since: gc_until_seqno,
            })
        } else {
            None
        }
    }
2568
2569    /// Return the number of gc-ineligible state versions.
2570    pub fn seqnos_held(&self) -> usize {
2571        usize::cast_from(self.seqno.0.saturating_sub(self.seqno_since().0))
2572    }
2573
2574    /// Expire all readers and writers up to the given walltime_ms.
2575    pub fn expire_at(&mut self, walltime_ms: EpochMillis) -> ExpiryMetrics {
2576        let mut metrics = ExpiryMetrics::default();
2577        let shard_id = self.shard_id();
2578        self.collections.leased_readers.retain(|id, state| {
2579            let retain = state.last_heartbeat_timestamp_ms + state.lease_duration_ms >= walltime_ms;
2580            if !retain {
2581                info!(
2582                    "Force expiring reader {id} ({}) of shard {shard_id} due to inactivity",
2583                    state.debug.purpose
2584                );
2585                metrics.readers_expired += 1;
2586            }
2587            retain
2588        });
2589        // critical_readers don't need forced expiration. (In fact, that's the point!)
2590        self.collections.writers.retain(|id, state| {
2591            let retain =
2592                (state.last_heartbeat_timestamp_ms + state.lease_duration_ms) >= walltime_ms;
2593            if !retain {
2594                info!(
2595                    "Force expiring writer {id} ({}) of shard {shard_id} due to inactivity",
2596                    state.debug.purpose
2597                );
2598                metrics.writers_expired += 1;
2599            }
2600            retain
2601        });
2602        metrics
2603    }
2604
2605    /// Returns the batches that contain updates up to (and including) the given `as_of`. The
2606    /// result `Vec` contains blob keys, along with a [`Description`] of what updates in the
2607    /// referenced parts are valid to read.
2608    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
2609        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2610            return Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
2611                self.collections.trace.since().clone(),
2612            )));
2613        }
2614        let upper = self.collections.trace.upper();
2615        if PartialOrder::less_equal(upper, as_of) {
2616            return Err(SnapshotErr::AsOfNotYetAvailable(
2617                self.seqno,
2618                Upper(upper.clone()),
2619            ));
2620        }
2621
2622        let batches = self
2623            .collections
2624            .trace
2625            .batches()
2626            .filter(|b| !PartialOrder::less_than(as_of, b.desc.lower()))
2627            .cloned()
2628            .collect();
2629        Ok(batches)
2630    }
2631
2632    // NB: Unlike the other methods here, this one is read-only.
2633    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
2634        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2635            return Err(Since(self.collections.trace.since().clone()));
2636        }
2637        Ok(())
2638    }
2639
2640    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
2641        // TODO: Avoid the O(n^2) here: `next_listen_batch` is called once per
2642        // batch and this iterates through all batches to find the next one.
2643        self.collections
2644            .trace
2645            .batches()
2646            .find(|b| {
2647                PartialOrder::less_equal(b.desc.lower(), frontier)
2648                    && PartialOrder::less_than(frontier, b.desc.upper())
2649            })
2650            .cloned()
2651            .ok_or(self.seqno)
2652    }
2653
    /// Returns the active rollup recorded in this state, if any.
    pub fn active_rollup(&self) -> Option<ActiveRollup> {
        self.collections.active_rollup
    }
2657
2658    pub fn need_rollup(
2659        &self,
2660        threshold: usize,
2661        use_active_rollup: bool,
2662        fallback_threshold_ms: u64,
2663        now: u64,
2664    ) -> Option<SeqNo> {
2665        let (latest_rollup_seqno, _) = self.latest_rollup();
2666
2667        // Tombstoned shards require one final rollup. However, because we
2668        // write a rollup as of SeqNo X and then link it in using a state
2669        // transition (in this case from X to X+1), the minimum number of
2670        // live diffs is actually two. Detect when we're in this minimal
2671        // two diff state and stop the (otherwise) infinite iteration.
2672        if self.collections.is_tombstone() && latest_rollup_seqno.next() < self.seqno {
2673            return Some(self.seqno);
2674        }
2675
2676        let seqnos_since_last_rollup = self.seqno.0.saturating_sub(latest_rollup_seqno.0);
2677
2678        if use_active_rollup {
2679            // If sequnos_since_last_rollup>threshold, and there is no existing rollup in progress,
2680            // we should start a new rollup.
2681            // If there is an active rollup, we should check if it has been running too long.
2682            // If it has, we should start a new rollup.
2683            // This is to guard against a worker dying/taking too long/etc.
2684            if seqnos_since_last_rollup > u64::cast_from(threshold) {
2685                match self.active_rollup() {
2686                    Some(active_rollup) => {
2687                        if now.saturating_sub(active_rollup.start_ms) > fallback_threshold_ms {
2688                            return Some(self.seqno);
2689                        }
2690                    }
2691                    None => {
2692                        return Some(self.seqno);
2693                    }
2694                }
2695            }
2696        } else {
2697            // every `threshold` seqnos since the latest rollup, assign rollup maintenance.
2698            // we avoid assigning rollups to every seqno past the threshold to avoid handles
2699            // racing / performing redundant work.
2700            if seqnos_since_last_rollup > 0
2701                && seqnos_since_last_rollup % u64::cast_from(threshold) == 0
2702            {
2703                return Some(self.seqno);
2704            }
2705
2706            // however, since maintenance is best-effort and could fail, do assign rollup
2707            // work to every seqno after a fallback threshold to ensure one is written.
2708            if seqnos_since_last_rollup
2709                > u64::cast_from(
2710                    threshold * PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER,
2711                )
2712            {
2713                return Some(self.seqno);
2714            }
2715        }
2716
2717        None
2718    }
2719
2720    pub(crate) fn blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
2721        let batches = self.collections.trace.batches().map(HollowBlobRef::Batch);
2722        let rollups = self.collections.rollups.values().map(HollowBlobRef::Rollup);
2723        batches.chain(rollups)
2724    }
2725}
2726
2727fn serialize_part_bytes<S: Serializer>(val: &[u8], s: S) -> Result<S::Ok, S::Error> {
2728    let val = hex::encode(val);
2729    val.serialize(s)
2730}
2731
2732fn serialize_lazy_proto<S: Serializer, T: prost::Message + Default>(
2733    val: &Option<LazyProto<T>>,
2734    s: S,
2735) -> Result<S::Ok, S::Error> {
2736    val.as_ref()
2737        .map(|lazy| hex::encode(&lazy.into_proto()))
2738        .serialize(s)
2739}
2740
2741fn serialize_part_stats<S: Serializer>(
2742    val: &Option<LazyPartStats>,
2743    s: S,
2744) -> Result<S::Ok, S::Error> {
2745    let val = val.as_ref().map(|x| x.decode().key);
2746    val.serialize(s)
2747}
2748
2749fn serialize_diffs_sum<S: Serializer>(val: &Option<[u8; 8]>, s: S) -> Result<S::Ok, S::Error> {
2750    // This is only used for debugging, so hack to assume that D is i64.
2751    let val = val.map(i64::decode);
2752    val.serialize(s)
2753}
2754
2755// This Serialize impl is used for debugging/testing and exposed via SQL. It's
2756// intentionally gated from users, so not strictly subject to our backward
2757// compatibility guarantees, but still probably best to be thoughtful about
2758// making unnecessary changes. Additionally, it's nice to make the output as
2759// nice to use as possible without tying our hands for the actual code usages.
2760impl<T: Serialize + Timestamp + Lattice> Serialize for State<T> {
2761    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2762        let State {
2763            shard_id,
2764            seqno,
2765            walltime_ms,
2766            hostname,
2767            collections:
2768                StateCollections {
2769                    version: applier_version,
2770                    last_gc_req,
2771                    rollups,
2772                    active_rollup,
2773                    active_gc,
2774                    leased_readers,
2775                    critical_readers,
2776                    writers,
2777                    schemas,
2778                    trace,
2779                },
2780        } = self;
2781        let mut s = s.serialize_struct("State", 13)?;
2782        let () = s.serialize_field("applier_version", &applier_version.to_string())?;
2783        let () = s.serialize_field("shard_id", shard_id)?;
2784        let () = s.serialize_field("seqno", seqno)?;
2785        let () = s.serialize_field("walltime_ms", walltime_ms)?;
2786        let () = s.serialize_field("hostname", hostname)?;
2787        let () = s.serialize_field("last_gc_req", last_gc_req)?;
2788        let () = s.serialize_field("rollups", rollups)?;
2789        let () = s.serialize_field("active_rollup", active_rollup)?;
2790        let () = s.serialize_field("active_gc", active_gc)?;
2791        let () = s.serialize_field("leased_readers", leased_readers)?;
2792        let () = s.serialize_field("critical_readers", critical_readers)?;
2793        let () = s.serialize_field("writers", writers)?;
2794        let () = s.serialize_field("schemas", schemas)?;
2795        let () = s.serialize_field("since", &trace.since().elements())?;
2796        let () = s.serialize_field("upper", &trace.upper().elements())?;
2797        let trace = trace.flatten();
2798        let () = s.serialize_field("batches", &trace.legacy_batches.keys().collect::<Vec<_>>())?;
2799        let () = s.serialize_field("hollow_batches", &trace.hollow_batches)?;
2800        let () = s.serialize_field("spine_batches", &trace.spine_batches)?;
2801        let () = s.serialize_field("merges", &trace.merges)?;
2802        s.end()
2803    }
2804}
2805
/// Counts and encoded sizes of the blobs referenced by a state, as computed by
/// `State::size_metrics`.
#[derive(Debug, Default)]
pub struct StateSizeMetrics {
    /// Number of hollow batches referenced by the state.
    pub hollow_batch_count: usize,
    /// Sum of each batch's `part_count()`.
    pub batch_part_count: usize,
    /// Number of batch parts that carry a timestamp rewrite.
    pub rewrite_part_count: usize,
    /// Sum of each batch's `len`.
    pub num_updates: usize,
    /// Largest `encoded_size_bytes()` of any single batch.
    pub largest_batch_bytes: usize,
    /// Sum of `encoded_size_bytes()` over all batches.
    pub state_batches_bytes: usize,
    /// Sum of the recorded encoded sizes of all rollups (rollups without a
    /// recorded size count as zero).
    pub state_rollups_bytes: usize,
    /// Number of rollups referenced by the state.
    pub state_rollup_count: usize,
    /// Number of batch parts that are stored inline.
    pub inline_part_count: usize,
    /// Sum of `inline_bytes()` over all inline parts.
    pub inline_part_bytes: usize,
}
2819
/// Counts of handles removed by a call to `State::expire_at`.
#[derive(Default)]
pub struct ExpiryMetrics {
    /// Number of leased readers that were force expired.
    pub(crate) readers_expired: usize,
    /// Number of writers that were force expired.
    pub(crate) writers_expired: usize,
}
2825
/// Wrapper for Antichain that represents a Since.
///
/// Used, for example, as the error payload when a requested `as_of` is
/// strictly less than the shard's since.
#[derive(Debug, Clone, PartialEq)]
pub struct Since<T>(pub Antichain<T>);
2829
/// Wrapper for Antichain that represents an Upper.
///
/// Used, for example, in the error returned when a snapshot's `as_of` has not
/// yet been passed by the shard's upper.
#[derive(Debug, PartialEq)]
pub struct Upper<T>(pub Antichain<T>);
2833
2834#[cfg(test)]
2835pub(crate) mod tests {
2836    use std::ops::Range;
2837    use std::str::FromStr;
2838
2839    use bytes::Bytes;
2840    use mz_build_info::DUMMY_BUILD_INFO;
2841    use mz_dyncfg::ConfigUpdates;
2842    use mz_ore::now::SYSTEM_TIME;
2843    use mz_ore::{assert_none, assert_ok};
2844    use mz_proto::RustType;
2845    use proptest::prelude::*;
2846    use proptest::strategy::ValueTree;
2847
2848    use crate::InvalidUsage::{InvalidBounds, InvalidEmptyTimeInterval};
2849    use crate::cache::PersistClientCache;
2850    use crate::internal::encoding::any_some_lazy_part_stats;
2851    use crate::internal::paths::RollupId;
2852    use crate::internal::trace::tests::any_trace;
2853    use crate::tests::new_test_client_cache;
2854    use crate::{Diagnostics, PersistLocation};
2855
2856    use super::*;
2857
    // 900 seconds (15 minutes) expressed in milliseconds. Presumably the lease
    // duration used by tests in this module; confirm against its uses.
    const LEASE_DURATION_MS: u64 = 900 * 1000;
2859    fn debug_state() -> HandleDebugState {
2860        HandleDebugState {
2861            hostname: "debug".to_owned(),
2862            purpose: "finding the bugs".to_owned(),
2863        }
2864    }
2865
2866    pub fn any_hollow_batch_with_exact_runs<T: Arbitrary + Timestamp>(
2867        num_runs: usize,
2868    ) -> impl Strategy<Value = HollowBatch<T>> {
2869        (
2870            any::<T>(),
2871            any::<T>(),
2872            any::<T>(),
2873            proptest::collection::vec(any_run_part::<T>(), num_runs + 1..20),
2874            any::<usize>(),
2875        )
2876            .prop_map(move |(t0, t1, since, parts, len)| {
2877                let (lower, upper) = if t0 <= t1 {
2878                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
2879                } else {
2880                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
2881                };
2882                let since = Antichain::from_elem(since);
2883
2884                let run_splits = (1..num_runs)
2885                    .map(|i| i * parts.len() / num_runs)
2886                    .collect::<Vec<_>>();
2887
2888                let run_meta = (0..num_runs)
2889                    .map(|_| {
2890                        let mut meta = RunMeta::default();
2891                        meta.id = Some(RunId::new());
2892                        meta
2893                    })
2894                    .collect::<Vec<_>>();
2895
2896                HollowBatch::new(
2897                    Description::new(lower, upper, since),
2898                    parts,
2899                    len % 10,
2900                    run_meta,
2901                    run_splits,
2902                )
2903            })
2904    }
2905
2906    pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
2907        Strategy::prop_map(
2908            (
2909                any::<T>(),
2910                any::<T>(),
2911                any::<T>(),
2912                proptest::collection::vec(any_run_part::<T>(), 0..20),
2913                any::<usize>(),
2914                0..=10usize,
2915                proptest::collection::vec(any::<RunId>(), 10),
2916            ),
2917            |(t0, t1, since, parts, len, num_runs, run_ids)| {
2918                let (lower, upper) = if t0 <= t1 {
2919                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
2920                } else {
2921                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
2922                };
2923                let since = Antichain::from_elem(since);
2924                if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() {
2925                    let run_splits = (1..num_runs)
2926                        .map(|i| i * parts.len() / num_runs)
2927                        .collect::<Vec<_>>();
2928
2929                    let run_meta = (0..num_runs)
2930                        .enumerate()
2931                        .map(|(i, _)| {
2932                            let mut meta = RunMeta::default();
2933                            meta.id = Some(run_ids[i]);
2934                            meta
2935                        })
2936                        .collect::<Vec<_>>();
2937
2938                    HollowBatch::new(
2939                        Description::new(lower, upper, since),
2940                        parts,
2941                        len % 10,
2942                        run_meta,
2943                        run_splits,
2944                    )
2945                } else {
2946                    HollowBatch::new_run_for_test(
2947                        Description::new(lower, upper, since),
2948                        parts,
2949                        len % 10,
2950                        run_ids[0],
2951                    )
2952                }
2953            },
2954        )
2955    }
2956
2957    pub fn any_batch_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = BatchPart<T>> {
2958        Strategy::prop_map(
2959            (
2960                any::<bool>(),
2961                any_hollow_batch_part(),
2962                any::<Option<T>>(),
2963                any::<Option<SchemaId>>(),
2964                any::<Option<SchemaId>>(),
2965            ),
2966            |(is_hollow, hollow, ts_rewrite, schema_id, deprecated_schema_id)| {
2967                if is_hollow {
2968                    BatchPart::Hollow(hollow)
2969                } else {
2970                    let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap();
2971                    let ts_rewrite = ts_rewrite.map(Antichain::from_elem);
2972                    BatchPart::Inline {
2973                        updates,
2974                        ts_rewrite,
2975                        schema_id,
2976                        deprecated_schema_id,
2977                    }
2978                }
2979            },
2980        )
2981    }
2982
2983    pub fn any_run_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = RunPart<T>> {
2984        Strategy::prop_map(any_batch_part(), |part| RunPart::Single(part))
2985    }
2986
2987    pub fn any_hollow_batch_part<T: Arbitrary + Timestamp>()
2988    -> impl Strategy<Value = HollowBatchPart<T>> {
2989        Strategy::prop_map(
2990            (
2991                any::<PartialBatchKey>(),
2992                any::<usize>(),
2993                any::<Vec<u8>>(),
2994                any_some_lazy_part_stats(),
2995                any::<Option<T>>(),
2996                any::<[u8; 8]>(),
2997                any::<Option<BatchColumnarFormat>>(),
2998                any::<Option<SchemaId>>(),
2999                any::<Option<SchemaId>>(),
3000            ),
3001            |(
3002                key,
3003                encoded_size_bytes,
3004                key_lower,
3005                stats,
3006                ts_rewrite,
3007                diffs_sum,
3008                format,
3009                schema_id,
3010                deprecated_schema_id,
3011            )| {
3012                HollowBatchPart {
3013                    key,
3014                    meta: Default::default(),
3015                    encoded_size_bytes,
3016                    key_lower,
3017                    structured_key_lower: None,
3018                    stats,
3019                    ts_rewrite: ts_rewrite.map(Antichain::from_elem),
3020                    diffs_sum: Some(diffs_sum),
3021                    format,
3022                    schema_id,
3023                    deprecated_schema_id,
3024                }
3025            },
3026        )
3027    }
3028
3029    pub fn any_leased_reader_state<T: Arbitrary>() -> impl Strategy<Value = LeasedReaderState<T>> {
3030        Strategy::prop_map(
3031            (
3032                any::<SeqNo>(),
3033                any::<Option<T>>(),
3034                any::<u64>(),
3035                any::<u64>(),
3036                any::<HandleDebugState>(),
3037            ),
3038            |(seqno, since, last_heartbeat_timestamp_ms, mut lease_duration_ms, debug)| {
3039                // lease_duration_ms of 0 means this state was written by an old
3040                // version of code, which means we'll migrate it in the decode
3041                // path. Avoid.
3042                if lease_duration_ms == 0 {
3043                    lease_duration_ms += 1;
3044                }
3045                LeasedReaderState {
3046                    seqno,
3047                    since: since.map_or_else(Antichain::new, Antichain::from_elem),
3048                    last_heartbeat_timestamp_ms,
3049                    lease_duration_ms,
3050                    debug,
3051                }
3052            },
3053        )
3054    }
3055
3056    pub fn any_critical_reader_state<T: Arbitrary>() -> impl Strategy<Value = CriticalReaderState<T>>
3057    {
3058        Strategy::prop_map(
3059            (
3060                any::<Option<T>>(),
3061                any::<OpaqueState>(),
3062                any::<String>(),
3063                any::<HandleDebugState>(),
3064            ),
3065            |(since, opaque, opaque_codec, debug)| CriticalReaderState {
3066                since: since.map_or_else(Antichain::new, Antichain::from_elem),
3067                opaque,
3068                opaque_codec,
3069                debug,
3070            },
3071        )
3072    }
3073
3074    pub fn any_writer_state<T: Arbitrary>() -> impl Strategy<Value = WriterState<T>> {
3075        Strategy::prop_map(
3076            (
3077                any::<u64>(),
3078                any::<u64>(),
3079                any::<IdempotencyToken>(),
3080                any::<Option<T>>(),
3081                any::<HandleDebugState>(),
3082            ),
3083            |(
3084                last_heartbeat_timestamp_ms,
3085                lease_duration_ms,
3086                most_recent_write_token,
3087                most_recent_write_upper,
3088                debug,
3089            )| WriterState {
3090                last_heartbeat_timestamp_ms,
3091                lease_duration_ms,
3092                most_recent_write_token,
3093                most_recent_write_upper: most_recent_write_upper
3094                    .map_or_else(Antichain::new, Antichain::from_elem),
3095                debug,
3096            },
3097        )
3098    }
3099
3100    pub fn any_encoded_schemas() -> impl Strategy<Value = EncodedSchemas> {
3101        Strategy::prop_map(
3102            (
3103                any::<Vec<u8>>(),
3104                any::<Vec<u8>>(),
3105                any::<Vec<u8>>(),
3106                any::<Vec<u8>>(),
3107            ),
3108            |(key, key_data_type, val, val_data_type)| EncodedSchemas {
3109                key: Bytes::from(key),
3110                key_data_type: Bytes::from(key_data_type),
3111                val: Bytes::from(val),
3112                val_data_type: Bytes::from(val_data_type),
3113            },
3114        )
3115    }
3116
3117    pub fn any_state<T: Arbitrary + Timestamp + Lattice>(
3118        num_trace_batches: Range<usize>,
3119    ) -> impl Strategy<Value = State<T>> {
3120        let part1 = (
3121            any::<ShardId>(),
3122            any::<SeqNo>(),
3123            any::<u64>(),
3124            any::<String>(),
3125            any::<SeqNo>(),
3126            proptest::collection::btree_map(any::<SeqNo>(), any::<HollowRollup>(), 1..3),
3127            proptest::option::of(any::<ActiveRollup>()),
3128        );
3129
3130        let part2 = (
3131            proptest::option::of(any::<ActiveGc>()),
3132            proptest::collection::btree_map(
3133                any::<LeasedReaderId>(),
3134                any_leased_reader_state::<T>(),
3135                1..3,
3136            ),
3137            proptest::collection::btree_map(
3138                any::<CriticalReaderId>(),
3139                any_critical_reader_state::<T>(),
3140                1..3,
3141            ),
3142            proptest::collection::btree_map(any::<WriterId>(), any_writer_state::<T>(), 0..3),
3143            proptest::collection::btree_map(any::<SchemaId>(), any_encoded_schemas(), 0..3),
3144            any_trace::<T>(num_trace_batches),
3145        );
3146
3147        (part1, part2).prop_map(
3148            |(
3149                (shard_id, seqno, walltime_ms, hostname, last_gc_req, rollups, active_rollup),
3150                (active_gc, leased_readers, critical_readers, writers, schemas, trace),
3151            )| State {
3152                shard_id,
3153                seqno,
3154                walltime_ms,
3155                hostname,
3156                collections: StateCollections {
3157                    version: Version::new(1, 2, 3),
3158                    last_gc_req,
3159                    rollups,
3160                    active_rollup,
3161                    active_gc,
3162                    leased_readers,
3163                    critical_readers,
3164                    writers,
3165                    schemas,
3166                    trace,
3167                },
3168            },
3169        )
3170    }
3171
3172    pub(crate) fn hollow<T: Timestamp>(
3173        lower: T,
3174        upper: T,
3175        keys: &[&str],
3176        len: usize,
3177    ) -> HollowBatch<T> {
3178        HollowBatch::new_run(
3179            Description::new(
3180                Antichain::from_elem(lower),
3181                Antichain::from_elem(upper),
3182                Antichain::from_elem(T::minimum()),
3183            ),
3184            keys.iter()
3185                .map(|x| {
3186                    RunPart::Single(BatchPart::Hollow(HollowBatchPart {
3187                        key: PartialBatchKey((*x).to_owned()),
3188                        meta: Default::default(),
3189                        encoded_size_bytes: 0,
3190                        key_lower: vec![],
3191                        structured_key_lower: None,
3192                        stats: None,
3193                        ts_rewrite: None,
3194                        diffs_sum: None,
3195                        format: None,
3196                        schema_id: None,
3197                        deprecated_schema_id: None,
3198                    }))
3199                })
3200                .collect(),
3201            len,
3202        )
3203    }
3204
3205    #[mz_ore::test]
3206    fn downgrade_since() {
3207        let mut state = TypedState::<(), (), u64, i64>::new(
3208            DUMMY_BUILD_INFO.semver_version(),
3209            ShardId::new(),
3210            "".to_owned(),
3211            0,
3212        );
3213        let reader = LeasedReaderId::new();
3214        let seqno = SeqNo::minimum();
3215        let now = SYSTEM_TIME.clone();
3216        let _ = state.collections.register_leased_reader(
3217            "",
3218            &reader,
3219            "",
3220            seqno,
3221            Duration::from_secs(10),
3222            now(),
3223            false,
3224        );
3225
3226        // The shard global since == 0 initially.
3227        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3228
3229        // Greater
3230        assert_eq!(
3231            state.collections.downgrade_since(
3232                &reader,
3233                seqno,
3234                None,
3235                &Antichain::from_elem(2),
3236                now()
3237            ),
3238            Continue(Since(Antichain::from_elem(2)))
3239        );
3240        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3241        // Equal (no-op)
3242        assert_eq!(
3243            state.collections.downgrade_since(
3244                &reader,
3245                seqno,
3246                None,
3247                &Antichain::from_elem(2),
3248                now()
3249            ),
3250            Continue(Since(Antichain::from_elem(2)))
3251        );
3252        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3253        // Less (no-op)
3254        assert_eq!(
3255            state.collections.downgrade_since(
3256                &reader,
3257                seqno,
3258                None,
3259                &Antichain::from_elem(1),
3260                now()
3261            ),
3262            Continue(Since(Antichain::from_elem(2)))
3263        );
3264        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3265
3266        // Create a second reader.
3267        let reader2 = LeasedReaderId::new();
3268        let _ = state.collections.register_leased_reader(
3269            "",
3270            &reader2,
3271            "",
3272            seqno,
3273            Duration::from_secs(10),
3274            now(),
3275            false,
3276        );
3277
3278        // Shard since doesn't change until the meet (min) of all reader sinces changes.
3279        assert_eq!(
3280            state.collections.downgrade_since(
3281                &reader2,
3282                seqno,
3283                None,
3284                &Antichain::from_elem(3),
3285                now()
3286            ),
3287            Continue(Since(Antichain::from_elem(3)))
3288        );
3289        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3290        // Shard since == 3 when all readers have since >= 3.
3291        assert_eq!(
3292            state.collections.downgrade_since(
3293                &reader,
3294                seqno,
3295                None,
3296                &Antichain::from_elem(5),
3297                now()
3298            ),
3299            Continue(Since(Antichain::from_elem(5)))
3300        );
3301        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3302
3303        // Shard since unaffected readers with since > shard since expiring.
3304        assert_eq!(
3305            state.collections.expire_leased_reader(&reader),
3306            Continue(true)
3307        );
3308        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3309
3310        // Create a third reader.
3311        let reader3 = LeasedReaderId::new();
3312        let _ = state.collections.register_leased_reader(
3313            "",
3314            &reader3,
3315            "",
3316            seqno,
3317            Duration::from_secs(10),
3318            now(),
3319            false,
3320        );
3321
3322        // Shard since doesn't change until the meet (min) of all reader sinces changes.
3323        assert_eq!(
3324            state.collections.downgrade_since(
3325                &reader3,
3326                seqno,
3327                None,
3328                &Antichain::from_elem(10),
3329                now()
3330            ),
3331            Continue(Since(Antichain::from_elem(10)))
3332        );
3333        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3334
3335        // Shard since advances when reader with the minimal since expires.
3336        assert_eq!(
3337            state.collections.expire_leased_reader(&reader2),
3338            Continue(true)
3339        );
3340        // TODO(database-issues#6885): expiry temporarily doesn't advance since
3341        // Switch this assertion back when we re-enable this.
3342        //
3343        // assert_eq!(state.collections.trace.since(), &Antichain::from_elem(10));
3344        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3345
3346        // Shard since unaffected when all readers are expired.
3347        assert_eq!(
3348            state.collections.expire_leased_reader(&reader3),
3349            Continue(true)
3350        );
3351        // TODO(database-issues#6885): expiry temporarily doesn't advance since
3352        // Switch this assertion back when we re-enable this.
3353        //
3354        // assert_eq!(state.collections.trace.since(), &Antichain::from_elem(10));
3355        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3356    }
3357
3358    #[mz_ore::test]
3359    fn compare_and_downgrade_since() {
3360        let mut state = TypedState::<(), (), u64, i64>::new(
3361            DUMMY_BUILD_INFO.semver_version(),
3362            ShardId::new(),
3363            "".to_owned(),
3364            0,
3365        );
3366        let reader = CriticalReaderId::new();
3367        let _ = state
3368            .collections
3369            .register_critical_reader::<u64>("", &reader, "");
3370
3371        // The shard global since == 0 initially.
3372        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3373        // The initial opaque value should be set.
3374        assert_eq!(
3375            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3376            u64::initial()
3377        );
3378
3379        // Greater
3380        assert_eq!(
3381            state.collections.compare_and_downgrade_since::<u64>(
3382                &reader,
3383                &u64::initial(),
3384                (&1, &Antichain::from_elem(2)),
3385            ),
3386            Continue(Ok(Since(Antichain::from_elem(2))))
3387        );
3388        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3389        assert_eq!(
3390            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3391            1
3392        );
3393        // Equal (no-op)
3394        assert_eq!(
3395            state.collections.compare_and_downgrade_since::<u64>(
3396                &reader,
3397                &1,
3398                (&2, &Antichain::from_elem(2)),
3399            ),
3400            Continue(Ok(Since(Antichain::from_elem(2))))
3401        );
3402        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3403        assert_eq!(
3404            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3405            2
3406        );
3407        // Less (no-op)
3408        assert_eq!(
3409            state.collections.compare_and_downgrade_since::<u64>(
3410                &reader,
3411                &2,
3412                (&3, &Antichain::from_elem(1)),
3413            ),
3414            Continue(Ok(Since(Antichain::from_elem(2))))
3415        );
3416        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3417        assert_eq!(
3418            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3419            3
3420        );
3421    }
3422
3423    #[mz_ore::test]
3424    fn compare_and_append() {
3425        let state = &mut TypedState::<String, String, u64, i64>::new(
3426            DUMMY_BUILD_INFO.semver_version(),
3427            ShardId::new(),
3428            "".to_owned(),
3429            0,
3430        )
3431        .collections;
3432
3433        let writer_id = WriterId::new();
3434        let now = SYSTEM_TIME.clone();
3435
3436        // State is initially empty.
3437        assert_eq!(state.trace.num_spine_batches(), 0);
3438        assert_eq!(state.trace.num_hollow_batches(), 0);
3439        assert_eq!(state.trace.num_updates(), 0);
3440
3441        // Cannot insert a batch with a lower != current shard upper.
3442        assert_eq!(
3443            state.compare_and_append(
3444                &hollow(1, 2, &["key1"], 1),
3445                &writer_id,
3446                now(),
3447                LEASE_DURATION_MS,
3448                &IdempotencyToken::new(),
3449                &debug_state(),
3450                0,
3451                100,
3452                None
3453            ),
3454            Break(CompareAndAppendBreak::Upper {
3455                shard_upper: Antichain::from_elem(0),
3456                writer_upper: Antichain::from_elem(0)
3457            })
3458        );
3459
3460        // Insert an empty batch with an upper > lower..
3461        assert!(
3462            state
3463                .compare_and_append(
3464                    &hollow(0, 5, &[], 0),
3465                    &writer_id,
3466                    now(),
3467                    LEASE_DURATION_MS,
3468                    &IdempotencyToken::new(),
3469                    &debug_state(),
3470                    0,
3471                    100,
3472                    None
3473                )
3474                .is_continue()
3475        );
3476
3477        // Cannot insert a batch with a upper less than the lower.
3478        assert_eq!(
3479            state.compare_and_append(
3480                &hollow(5, 4, &["key1"], 1),
3481                &writer_id,
3482                now(),
3483                LEASE_DURATION_MS,
3484                &IdempotencyToken::new(),
3485                &debug_state(),
3486                0,
3487                100,
3488                None
3489            ),
3490            Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds {
3491                lower: Antichain::from_elem(5),
3492                upper: Antichain::from_elem(4)
3493            }))
3494        );
3495
3496        // Cannot insert a nonempty batch with an upper equal to lower.
3497        assert_eq!(
3498            state.compare_and_append(
3499                &hollow(5, 5, &["key1"], 1),
3500                &writer_id,
3501                now(),
3502                LEASE_DURATION_MS,
3503                &IdempotencyToken::new(),
3504                &debug_state(),
3505                0,
3506                100,
3507                None
3508            ),
3509            Break(CompareAndAppendBreak::InvalidUsage(
3510                InvalidEmptyTimeInterval {
3511                    lower: Antichain::from_elem(5),
3512                    upper: Antichain::from_elem(5),
3513                    keys: vec!["key1".to_owned()],
3514                }
3515            ))
3516        );
3517
3518        // Can insert an empty batch with an upper equal to lower.
3519        assert!(
3520            state
3521                .compare_and_append(
3522                    &hollow(5, 5, &[], 0),
3523                    &writer_id,
3524                    now(),
3525                    LEASE_DURATION_MS,
3526                    &IdempotencyToken::new(),
3527                    &debug_state(),
3528                    0,
3529                    100,
3530                    None
3531                )
3532                .is_continue()
3533        );
3534    }
3535
3536    #[mz_ore::test]
3537    fn snapshot() {
3538        let now = SYSTEM_TIME.clone();
3539
3540        let mut state = TypedState::<String, String, u64, i64>::new(
3541            DUMMY_BUILD_INFO.semver_version(),
3542            ShardId::new(),
3543            "".to_owned(),
3544            0,
3545        );
3546        // Cannot take a snapshot with as_of == shard upper.
3547        assert_eq!(
3548            state.snapshot(&Antichain::from_elem(0)),
3549            Err(SnapshotErr::AsOfNotYetAvailable(
3550                SeqNo(0),
3551                Upper(Antichain::from_elem(0))
3552            ))
3553        );
3554
3555        // Cannot take a snapshot with as_of > shard upper.
3556        assert_eq!(
3557            state.snapshot(&Antichain::from_elem(5)),
3558            Err(SnapshotErr::AsOfNotYetAvailable(
3559                SeqNo(0),
3560                Upper(Antichain::from_elem(0))
3561            ))
3562        );
3563
3564        let writer_id = WriterId::new();
3565
3566        // Advance upper to 5.
3567        assert!(
3568            state
3569                .collections
3570                .compare_and_append(
3571                    &hollow(0, 5, &["key1"], 1),
3572                    &writer_id,
3573                    now(),
3574                    LEASE_DURATION_MS,
3575                    &IdempotencyToken::new(),
3576                    &debug_state(),
3577                    0,
3578                    100,
3579                    None
3580                )
3581                .is_continue()
3582        );
3583
3584        // Can take a snapshot with as_of < upper.
3585        assert_eq!(
3586            state.snapshot(&Antichain::from_elem(0)),
3587            Ok(vec![hollow(0, 5, &["key1"], 1)])
3588        );
3589
3590        // Can take a snapshot with as_of >= shard since, as long as as_of < shard_upper.
3591        assert_eq!(
3592            state.snapshot(&Antichain::from_elem(4)),
3593            Ok(vec![hollow(0, 5, &["key1"], 1)])
3594        );
3595
3596        // Cannot take a snapshot with as_of >= upper.
3597        assert_eq!(
3598            state.snapshot(&Antichain::from_elem(5)),
3599            Err(SnapshotErr::AsOfNotYetAvailable(
3600                SeqNo(0),
3601                Upper(Antichain::from_elem(5))
3602            ))
3603        );
3604        assert_eq!(
3605            state.snapshot(&Antichain::from_elem(6)),
3606            Err(SnapshotErr::AsOfNotYetAvailable(
3607                SeqNo(0),
3608                Upper(Antichain::from_elem(5))
3609            ))
3610        );
3611
3612        let reader = LeasedReaderId::new();
3613        // Advance the since to 2.
3614        let _ = state.collections.register_leased_reader(
3615            "",
3616            &reader,
3617            "",
3618            SeqNo::minimum(),
3619            Duration::from_secs(10),
3620            now(),
3621            false,
3622        );
3623        assert_eq!(
3624            state.collections.downgrade_since(
3625                &reader,
3626                SeqNo::minimum(),
3627                None,
3628                &Antichain::from_elem(2),
3629                now()
3630            ),
3631            Continue(Since(Antichain::from_elem(2)))
3632        );
3633        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3634        // Cannot take a snapshot with as_of < shard_since.
3635        assert_eq!(
3636            state.snapshot(&Antichain::from_elem(1)),
3637            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
3638                Antichain::from_elem(2)
3639            )))
3640        );
3641
3642        // Advance the upper to 10 via an empty batch.
3643        assert!(
3644            state
3645                .collections
3646                .compare_and_append(
3647                    &hollow(5, 10, &[], 0),
3648                    &writer_id,
3649                    now(),
3650                    LEASE_DURATION_MS,
3651                    &IdempotencyToken::new(),
3652                    &debug_state(),
3653                    0,
3654                    100,
3655                    None
3656                )
3657                .is_continue()
3658        );
3659
3660        // Can still take snapshots at times < upper.
3661        assert_eq!(
3662            state.snapshot(&Antichain::from_elem(7)),
3663            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3664        );
3665
3666        // Cannot take snapshots with as_of >= upper.
3667        assert_eq!(
3668            state.snapshot(&Antichain::from_elem(10)),
3669            Err(SnapshotErr::AsOfNotYetAvailable(
3670                SeqNo(0),
3671                Upper(Antichain::from_elem(10))
3672            ))
3673        );
3674
3675        // Advance upper to 15.
3676        assert!(
3677            state
3678                .collections
3679                .compare_and_append(
3680                    &hollow(10, 15, &["key2"], 1),
3681                    &writer_id,
3682                    now(),
3683                    LEASE_DURATION_MS,
3684                    &IdempotencyToken::new(),
3685                    &debug_state(),
3686                    0,
3687                    100,
3688                    None
3689                )
3690                .is_continue()
3691        );
3692
3693        // Filter out batches whose lowers are less than the requested as of (the
3694        // batches that are too far in the future for the requested as_of).
3695        assert_eq!(
3696            state.snapshot(&Antichain::from_elem(9)),
3697            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3698        );
3699
3700        // Don't filter out batches whose lowers are <= the requested as_of.
3701        assert_eq!(
3702            state.snapshot(&Antichain::from_elem(10)),
3703            Ok(vec![
3704                hollow(0, 5, &["key1"], 1),
3705                hollow(5, 10, &[], 0),
3706                hollow(10, 15, &["key2"], 1)
3707            ])
3708        );
3709
3710        assert_eq!(
3711            state.snapshot(&Antichain::from_elem(11)),
3712            Ok(vec![
3713                hollow(0, 5, &["key1"], 1),
3714                hollow(5, 10, &[], 0),
3715                hollow(10, 15, &["key2"], 1)
3716            ])
3717        );
3718    }
3719
3720    #[mz_ore::test]
3721    fn next_listen_batch() {
3722        let mut state = TypedState::<String, String, u64, i64>::new(
3723            DUMMY_BUILD_INFO.semver_version(),
3724            ShardId::new(),
3725            "".to_owned(),
3726            0,
3727        );
3728
3729        // Empty collection never has any batches to listen for, regardless of the
3730        // current frontier.
3731        assert_eq!(
3732            state.next_listen_batch(&Antichain::from_elem(0)),
3733            Err(SeqNo(0))
3734        );
3735        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3736
3737        let writer_id = WriterId::new();
3738        let now = SYSTEM_TIME.clone();
3739
3740        // Add two batches of data, one from [0, 5) and then another from [5, 10).
3741        assert!(
3742            state
3743                .collections
3744                .compare_and_append(
3745                    &hollow(0, 5, &["key1"], 1),
3746                    &writer_id,
3747                    now(),
3748                    LEASE_DURATION_MS,
3749                    &IdempotencyToken::new(),
3750                    &debug_state(),
3751                    0,
3752                    100,
3753                    None
3754                )
3755                .is_continue()
3756        );
3757        assert!(
3758            state
3759                .collections
3760                .compare_and_append(
3761                    &hollow(5, 10, &["key2"], 1),
3762                    &writer_id,
3763                    now(),
3764                    LEASE_DURATION_MS,
3765                    &IdempotencyToken::new(),
3766                    &debug_state(),
3767                    0,
3768                    100,
3769                    None
3770                )
3771                .is_continue()
3772        );
3773
3774        // All frontiers in [0, 5) return the first batch.
3775        for t in 0..=4 {
3776            assert_eq!(
3777                state.next_listen_batch(&Antichain::from_elem(t)),
3778                Ok(hollow(0, 5, &["key1"], 1))
3779            );
3780        }
3781
3782        // All frontiers in [5, 10) return the second batch.
3783        for t in 5..=9 {
3784            assert_eq!(
3785                state.next_listen_batch(&Antichain::from_elem(t)),
3786                Ok(hollow(5, 10, &["key2"], 1))
3787            );
3788        }
3789
3790        // There is no batch currently available for t = 10.
3791        assert_eq!(
3792            state.next_listen_batch(&Antichain::from_elem(10)),
3793            Err(SeqNo(0))
3794        );
3795
3796        // By definition, there is no frontier ever at the empty antichain which
3797        // is the time after all possible times.
3798        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3799    }
3800
3801    #[mz_ore::test]
3802    fn expire_writer() {
3803        let mut state = TypedState::<String, String, u64, i64>::new(
3804            DUMMY_BUILD_INFO.semver_version(),
3805            ShardId::new(),
3806            "".to_owned(),
3807            0,
3808        );
3809        let now = SYSTEM_TIME.clone();
3810
3811        let writer_id_one = WriterId::new();
3812
3813        let writer_id_two = WriterId::new();
3814
3815        // Writer is eligible to write
3816        assert!(
3817            state
3818                .collections
3819                .compare_and_append(
3820                    &hollow(0, 2, &["key1"], 1),
3821                    &writer_id_one,
3822                    now(),
3823                    LEASE_DURATION_MS,
3824                    &IdempotencyToken::new(),
3825                    &debug_state(),
3826                    0,
3827                    100,
3828                    None
3829                )
3830                .is_continue()
3831        );
3832
3833        assert!(
3834            state
3835                .collections
3836                .expire_writer(&writer_id_one)
3837                .is_continue()
3838        );
3839
3840        // Other writers should still be able to write
3841        assert!(
3842            state
3843                .collections
3844                .compare_and_append(
3845                    &hollow(2, 5, &["key2"], 1),
3846                    &writer_id_two,
3847                    now(),
3848                    LEASE_DURATION_MS,
3849                    &IdempotencyToken::new(),
3850                    &debug_state(),
3851                    0,
3852                    100,
3853                    None
3854                )
3855                .is_continue()
3856        );
3857    }
3858
3859    #[mz_ore::test]
3860    fn maybe_gc_active_gc() {
3861        const GC_CONFIG: GcConfig = GcConfig {
3862            use_active_gc: true,
3863            fallback_threshold_ms: 5000,
3864            min_versions: 99,
3865            max_versions: 500,
3866        };
3867        let now_fn = SYSTEM_TIME.clone();
3868
3869        let mut state = TypedState::<String, String, u64, i64>::new(
3870            DUMMY_BUILD_INFO.semver_version(),
3871            ShardId::new(),
3872            "".to_owned(),
3873            0,
3874        );
3875
3876        let now = now_fn();
3877        // Empty state doesn't need gc, regardless of is_write.
3878        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
3879        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);
3880
3881        // Artificially advance the seqno so the seqno_since advances past our
3882        // internal gc_threshold.
3883        state.seqno = SeqNo(100);
3884        assert_eq!(state.seqno_since(), SeqNo(100));
3885
3886        // When a writer is present, non-writes don't gc.
3887        let writer_id = WriterId::new();
3888        let _ = state.collections.compare_and_append(
3889            &hollow(1, 2, &["key1"], 1),
3890            &writer_id,
3891            now,
3892            LEASE_DURATION_MS,
3893            &IdempotencyToken::new(),
3894            &debug_state(),
3895            0,
3896            100,
3897            None,
3898        );
3899        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);
3900
3901        // A write will gc though.
3902        assert_eq!(
3903            state.maybe_gc(true, now, GC_CONFIG),
3904            Some(GcReq {
3905                shard_id: state.shard_id,
3906                new_seqno_since: SeqNo(100)
3907            })
3908        );
3909
3910        // But if we write down an active gc, we won't gc.
3911        state.collections.active_gc = Some(ActiveGc {
3912            seqno: state.seqno,
3913            start_ms: now,
3914        });
3915
3916        state.seqno = SeqNo(200);
3917        assert_eq!(state.seqno_since(), SeqNo(200));
3918
3919        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
3920
3921        state.seqno = SeqNo(300);
3922        assert_eq!(state.seqno_since(), SeqNo(300));
3923        // But if we advance the time past the threshold, we will gc.
3924        let new_now = now + GC_CONFIG.fallback_threshold_ms + 1;
3925        assert_eq!(
3926            state.maybe_gc(true, new_now, GC_CONFIG),
3927            Some(GcReq {
3928                shard_id: state.shard_id,
3929                new_seqno_since: SeqNo(300)
3930            })
3931        );
3932
3933        // Even if the sequence number doesn't pass the threshold, if the
3934        // active gc is expired, we will gc.
3935
3936        state.seqno = SeqNo(301);
3937        assert_eq!(state.seqno_since(), SeqNo(301));
3938        assert_eq!(
3939            state.maybe_gc(true, new_now, GC_CONFIG),
3940            Some(GcReq {
3941                shard_id: state.shard_id,
3942                new_seqno_since: SeqNo(301)
3943            })
3944        );
3945
3946        state.collections.active_gc = None;
3947
3948        // Artificially advance the seqno (again) so the seqno_since advances
3949        // past our internal gc_threshold (again).
3950        state.seqno = SeqNo(400);
3951        assert_eq!(state.seqno_since(), SeqNo(400));
3952
3953        let now = now_fn();
3954
3955        // If there are no writers, even a non-write will gc.
3956        let _ = state.collections.expire_writer(&writer_id);
3957        assert_eq!(
3958            state.maybe_gc(false, now, GC_CONFIG),
3959            Some(GcReq {
3960                shard_id: state.shard_id,
3961                new_seqno_since: SeqNo(400)
3962            })
3963        );
3964
3965        // Upper-bound the number of seqnos we'll attempt to collect in one go.
3966        let previous_seqno = state.seqno;
3967        state.seqno = SeqNo(10_000);
3968        assert_eq!(state.seqno_since(), SeqNo(10_000));
3969
3970        let now = now_fn();
3971        assert_eq!(
3972            state.maybe_gc(true, now, GC_CONFIG),
3973            Some(GcReq {
3974                shard_id: state.shard_id,
3975                new_seqno_since: SeqNo(previous_seqno.0 + u64::cast_from(GC_CONFIG.max_versions))
3976            })
3977        );
3978    }
3979
3980    #[mz_ore::test]
3981    fn maybe_gc_classic() {
3982        const GC_CONFIG: GcConfig = GcConfig {
3983            use_active_gc: false,
3984            fallback_threshold_ms: 5000,
3985            min_versions: 16,
3986            max_versions: 128,
3987        };
3988        const NOW_MS: u64 = 0;
3989
3990        let mut state = TypedState::<String, String, u64, i64>::new(
3991            DUMMY_BUILD_INFO.semver_version(),
3992            ShardId::new(),
3993            "".to_owned(),
3994            0,
3995        );
3996
3997        // Empty state doesn't need gc, regardless of is_write.
3998        assert_eq!(state.maybe_gc(true, NOW_MS, GC_CONFIG), None);
3999        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
4000
4001        // Artificially advance the seqno so the seqno_since advances past our
4002        // internal gc_threshold.
4003        state.seqno = SeqNo(100);
4004        assert_eq!(state.seqno_since(), SeqNo(100));
4005
4006        // When a writer is present, non-writes don't gc.
4007        let writer_id = WriterId::new();
4008        let now = SYSTEM_TIME.clone();
4009        let _ = state.collections.compare_and_append(
4010            &hollow(1, 2, &["key1"], 1),
4011            &writer_id,
4012            now(),
4013            LEASE_DURATION_MS,
4014            &IdempotencyToken::new(),
4015            &debug_state(),
4016            0,
4017            100,
4018            None,
4019        );
4020        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
4021
4022        // A write will gc though.
4023        assert_eq!(
4024            state.maybe_gc(true, NOW_MS, GC_CONFIG),
4025            Some(GcReq {
4026                shard_id: state.shard_id,
4027                new_seqno_since: SeqNo(100)
4028            })
4029        );
4030
4031        // Artificially advance the seqno (again) so the seqno_since advances
4032        // past our internal gc_threshold (again).
4033        state.seqno = SeqNo(200);
4034        assert_eq!(state.seqno_since(), SeqNo(200));
4035
4036        // If there are no writers, even a non-write will gc.
4037        let _ = state.collections.expire_writer(&writer_id);
4038        assert_eq!(
4039            state.maybe_gc(false, NOW_MS, GC_CONFIG),
4040            Some(GcReq {
4041                shard_id: state.shard_id,
4042                new_seqno_since: SeqNo(200)
4043            })
4044        );
4045    }
4046
4047    #[mz_ore::test]
4048    fn need_rollup_active_rollup() {
4049        const ROLLUP_THRESHOLD: usize = 3;
4050        const ROLLUP_USE_ACTIVE_ROLLUP: bool = true;
4051        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 5000;
4052        let now = SYSTEM_TIME.clone();
4053
4054        mz_ore::test::init_logging();
4055        let mut state = TypedState::<String, String, u64, i64>::new(
4056            DUMMY_BUILD_INFO.semver_version(),
4057            ShardId::new(),
4058            "".to_owned(),
4059            0,
4060        );
4061
4062        let rollup_seqno = SeqNo(5);
4063        let rollup = HollowRollup {
4064            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4065            encoded_size_bytes: None,
4066        };
4067
4068        assert!(
4069            state
4070                .collections
4071                .add_rollup((rollup_seqno, &rollup))
4072                .is_continue()
4073        );
4074
4075        // shouldn't need a rollup at the seqno of the rollup
4076        state.seqno = SeqNo(5);
4077        assert_none!(state.need_rollup(
4078            ROLLUP_THRESHOLD,
4079            ROLLUP_USE_ACTIVE_ROLLUP,
4080            ROLLUP_FALLBACK_THRESHOLD_MS,
4081            now()
4082        ));
4083
4084        // shouldn't need a rollup at seqnos less than our threshold
4085        state.seqno = SeqNo(6);
4086        assert_none!(state.need_rollup(
4087            ROLLUP_THRESHOLD,
4088            ROLLUP_USE_ACTIVE_ROLLUP,
4089            ROLLUP_FALLBACK_THRESHOLD_MS,
4090            now()
4091        ));
4092        state.seqno = SeqNo(7);
4093        assert_none!(state.need_rollup(
4094            ROLLUP_THRESHOLD,
4095            ROLLUP_USE_ACTIVE_ROLLUP,
4096            ROLLUP_FALLBACK_THRESHOLD_MS,
4097            now()
4098        ));
4099        state.seqno = SeqNo(8);
4100        assert_none!(state.need_rollup(
4101            ROLLUP_THRESHOLD,
4102            ROLLUP_USE_ACTIVE_ROLLUP,
4103            ROLLUP_FALLBACK_THRESHOLD_MS,
4104            now()
4105        ));
4106
4107        let mut current_time = now();
4108        // hit our threshold! we should need a rollup
4109        state.seqno = SeqNo(9);
4110        assert_eq!(
4111            state
4112                .need_rollup(
4113                    ROLLUP_THRESHOLD,
4114                    ROLLUP_USE_ACTIVE_ROLLUP,
4115                    ROLLUP_FALLBACK_THRESHOLD_MS,
4116                    current_time
4117                )
4118                .expect("rollup"),
4119            SeqNo(9)
4120        );
4121
4122        state.collections.active_rollup = Some(ActiveRollup {
4123            seqno: SeqNo(9),
4124            start_ms: current_time,
4125        });
4126
4127        // There is now an active rollup, so we shouldn't need a rollup.
4128        assert_none!(state.need_rollup(
4129            ROLLUP_THRESHOLD,
4130            ROLLUP_USE_ACTIVE_ROLLUP,
4131            ROLLUP_FALLBACK_THRESHOLD_MS,
4132            current_time
4133        ));
4134
4135        state.seqno = SeqNo(10);
4136        // We still don't need a rollup, even though the seqno is greater than
4137        // the rollup threshold.
4138        assert_none!(state.need_rollup(
4139            ROLLUP_THRESHOLD,
4140            ROLLUP_USE_ACTIVE_ROLLUP,
4141            ROLLUP_FALLBACK_THRESHOLD_MS,
4142            current_time
4143        ));
4144
4145        // But if we wait long enough, we should need a rollup again.
4146        current_time += u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS) + 1;
4147        assert_eq!(
4148            state
4149                .need_rollup(
4150                    ROLLUP_THRESHOLD,
4151                    ROLLUP_USE_ACTIVE_ROLLUP,
4152                    ROLLUP_FALLBACK_THRESHOLD_MS,
4153                    current_time
4154                )
4155                .expect("rollup"),
4156            SeqNo(10)
4157        );
4158
4159        state.seqno = SeqNo(9);
4160        // Clear the active rollup and ensure we need a rollup again.
4161        state.collections.active_rollup = None;
4162        let rollup_seqno = SeqNo(9);
4163        let rollup = HollowRollup {
4164            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4165            encoded_size_bytes: None,
4166        };
4167        assert!(
4168            state
4169                .collections
4170                .add_rollup((rollup_seqno, &rollup))
4171                .is_continue()
4172        );
4173
4174        state.seqno = SeqNo(11);
4175        // We shouldn't need a rollup at seqnos less than our threshold
4176        assert_none!(state.need_rollup(
4177            ROLLUP_THRESHOLD,
4178            ROLLUP_USE_ACTIVE_ROLLUP,
4179            ROLLUP_FALLBACK_THRESHOLD_MS,
4180            current_time
4181        ));
4182        // hit our threshold! we should need a rollup
4183        state.seqno = SeqNo(13);
4184        assert_eq!(
4185            state
4186                .need_rollup(
4187                    ROLLUP_THRESHOLD,
4188                    ROLLUP_USE_ACTIVE_ROLLUP,
4189                    ROLLUP_FALLBACK_THRESHOLD_MS,
4190                    current_time
4191                )
4192                .expect("rollup"),
4193            SeqNo(13)
4194        );
4195    }
4196
4197    #[mz_ore::test]
4198    fn need_rollup_classic() {
4199        const ROLLUP_THRESHOLD: usize = 3;
4200        const ROLLUP_USE_ACTIVE_ROLLUP: bool = false;
4201        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 0;
4202        const NOW: u64 = 0;
4203
4204        mz_ore::test::init_logging();
4205        let mut state = TypedState::<String, String, u64, i64>::new(
4206            DUMMY_BUILD_INFO.semver_version(),
4207            ShardId::new(),
4208            "".to_owned(),
4209            0,
4210        );
4211
4212        let rollup_seqno = SeqNo(5);
4213        let rollup = HollowRollup {
4214            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4215            encoded_size_bytes: None,
4216        };
4217
4218        assert!(
4219            state
4220                .collections
4221                .add_rollup((rollup_seqno, &rollup))
4222                .is_continue()
4223        );
4224
4225        // shouldn't need a rollup at the seqno of the rollup
4226        state.seqno = SeqNo(5);
4227        assert_none!(state.need_rollup(
4228            ROLLUP_THRESHOLD,
4229            ROLLUP_USE_ACTIVE_ROLLUP,
4230            ROLLUP_FALLBACK_THRESHOLD_MS,
4231            NOW
4232        ));
4233
4234        // shouldn't need a rollup at seqnos less than our threshold
4235        state.seqno = SeqNo(6);
4236        assert_none!(state.need_rollup(
4237            ROLLUP_THRESHOLD,
4238            ROLLUP_USE_ACTIVE_ROLLUP,
4239            ROLLUP_FALLBACK_THRESHOLD_MS,
4240            NOW
4241        ));
4242        state.seqno = SeqNo(7);
4243        assert_none!(state.need_rollup(
4244            ROLLUP_THRESHOLD,
4245            ROLLUP_USE_ACTIVE_ROLLUP,
4246            ROLLUP_FALLBACK_THRESHOLD_MS,
4247            NOW
4248        ));
4249
4250        // hit our threshold! we should need a rollup
4251        state.seqno = SeqNo(8);
4252        assert_eq!(
4253            state
4254                .need_rollup(
4255                    ROLLUP_THRESHOLD,
4256                    ROLLUP_USE_ACTIVE_ROLLUP,
4257                    ROLLUP_FALLBACK_THRESHOLD_MS,
4258                    NOW
4259                )
4260                .expect("rollup"),
4261            SeqNo(8)
4262        );
4263
4264        // but we don't need rollups for every seqno > the threshold
4265        state.seqno = SeqNo(9);
4266        assert_none!(state.need_rollup(
4267            ROLLUP_THRESHOLD,
4268            ROLLUP_USE_ACTIVE_ROLLUP,
4269            ROLLUP_FALLBACK_THRESHOLD_MS,
4270            NOW
4271        ));
4272
4273        // we only need a rollup each `ROLLUP_THRESHOLD` beyond our current seqno
4274        state.seqno = SeqNo(11);
4275        assert_eq!(
4276            state
4277                .need_rollup(
4278                    ROLLUP_THRESHOLD,
4279                    ROLLUP_USE_ACTIVE_ROLLUP,
4280                    ROLLUP_FALLBACK_THRESHOLD_MS,
4281                    NOW
4282                )
4283                .expect("rollup"),
4284            SeqNo(11)
4285        );
4286
4287        // add another rollup and ensure we're always picking the latest
4288        let rollup_seqno = SeqNo(6);
4289        let rollup = HollowRollup {
4290            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4291            encoded_size_bytes: None,
4292        };
4293        assert!(
4294            state
4295                .collections
4296                .add_rollup((rollup_seqno, &rollup))
4297                .is_continue()
4298        );
4299
4300        state.seqno = SeqNo(8);
4301        assert_none!(state.need_rollup(
4302            ROLLUP_THRESHOLD,
4303            ROLLUP_USE_ACTIVE_ROLLUP,
4304            ROLLUP_FALLBACK_THRESHOLD_MS,
4305            NOW
4306        ));
4307        state.seqno = SeqNo(9);
4308        assert_eq!(
4309            state
4310                .need_rollup(
4311                    ROLLUP_THRESHOLD,
4312                    ROLLUP_USE_ACTIVE_ROLLUP,
4313                    ROLLUP_FALLBACK_THRESHOLD_MS,
4314                    NOW
4315                )
4316                .expect("rollup"),
4317            SeqNo(9)
4318        );
4319
4320        // and ensure that after a fallback point, we assign every seqno work
4321        let fallback_seqno = SeqNo(
4322            rollup_seqno.0
4323                * u64::cast_from(PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER),
4324        );
4325        state.seqno = fallback_seqno;
4326        assert_eq!(
4327            state
4328                .need_rollup(
4329                    ROLLUP_THRESHOLD,
4330                    ROLLUP_USE_ACTIVE_ROLLUP,
4331                    ROLLUP_FALLBACK_THRESHOLD_MS,
4332                    NOW
4333                )
4334                .expect("rollup"),
4335            fallback_seqno
4336        );
4337        state.seqno = fallback_seqno.next();
4338        assert_eq!(
4339            state
4340                .need_rollup(
4341                    ROLLUP_THRESHOLD,
4342                    ROLLUP_USE_ACTIVE_ROLLUP,
4343                    ROLLUP_FALLBACK_THRESHOLD_MS,
4344                    NOW
4345                )
4346                .expect("rollup"),
4347            fallback_seqno.next()
4348        );
4349    }
4350
4351    #[mz_ore::test]
4352    fn idempotency_token_sentinel() {
4353        assert_eq!(
4354            IdempotencyToken::SENTINEL.to_string(),
4355            "i11111111-1111-1111-1111-111111111111"
4356        );
4357    }
4358
4359    /// This test generates an "arbitrary" State, but uses a fixed seed for the
4360    /// randomness, so that it's deterministic. This lets us assert the
4361    /// serialization of that State against a golden file that's committed,
4362    /// making it easy to see what the serialization (used in an upcoming
4363    /// INSPECT feature) looks like.
4364    ///
4365    /// This golden will have to be updated each time we change State, but
4366    /// that's a feature, not a bug.
4367    #[mz_ore::test]
4368    #[cfg_attr(miri, ignore)] // too slow
4369    fn state_inspect_serde_json() {
4370        const STATE_SERDE_JSON: &str = include_str!("state_serde.json");
4371        let mut runner = proptest::test_runner::TestRunner::deterministic();
4372        let tree = any_state::<u64>(6..8).new_tree(&mut runner).unwrap();
4373        let json = serde_json::to_string_pretty(&tree.current()).unwrap();
4374        assert_eq!(
4375            json.trim(),
4376            STATE_SERDE_JSON.trim(),
4377            "\n\nNEW GOLDEN\n{}\n",
4378            json
4379        );
4380    }
4381
4382    #[mz_persist_proc::test(tokio::test)]
4383    #[cfg_attr(miri, ignore)] // too slow
4384    async fn sneaky_downgrades(dyncfgs: ConfigUpdates) {
4385        let mut clients = new_test_client_cache(&dyncfgs);
4386        let shard_id = ShardId::new();
4387
4388        async fn open_and_write(
4389            clients: &mut PersistClientCache,
4390            version: semver::Version,
4391            shard_id: ShardId,
4392        ) -> Result<(), tokio::task::JoinError> {
4393            clients.cfg.build_version = version.clone();
4394            clients.clear_state_cache();
4395            let client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
4396            // Run in a task so we can catch the panic.
4397            mz_ore::task::spawn(|| version.to_string(), async move {
4398                let () = client
4399                    .upgrade_version::<String, (), u64, i64>(shard_id, Diagnostics::for_tests())
4400                    .await
4401                    .expect("valid usage");
4402                let (mut write, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
4403                let current = *write.upper().as_option().unwrap();
4404                // Do a write so that we tag the state with the version.
4405                write
4406                    .expect_compare_and_append_batch(&mut [], current, current + 1)
4407                    .await;
4408            })
4409            .await
4410        }
4411
4412        // Start at v0.10.0.
4413        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4414        assert_ok!(res);
4415
4416        // Upgrade to v0.11.0 is allowed.
4417        let res = open_and_write(&mut clients, Version::new(0, 11, 0), shard_id).await;
4418        assert_ok!(res);
4419
4420        // Downgrade to v0.10.0 is no longer allowed.
4421        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4422        assert!(res.unwrap_err().is_panic());
4423
4424        // Downgrade to v0.9.0 is _NOT_ allowed.
4425        let res = open_and_write(&mut clients, Version::new(0, 9, 0), shard_id).await;
4426        assert!(res.unwrap_err().is_panic());
4427    }
4428
4429    #[mz_ore::test]
4430    fn runid_roundtrip() {
4431        proptest!(|(runid: RunId)| {
4432            let runid_str = runid.to_string();
4433            let parsed = RunId::from_str(&runid_str);
4434            prop_assert_eq!(parsed, Ok(runid));
4435        });
4436    }
4437}