mz_persist_client/internal/state.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use anyhow::ensure;
11use async_stream::{stream, try_stream};
12use differential_dataflow::difference::Monoid;
13use mz_persist::metrics::ColumnarMetrics;
14use proptest::prelude::{Arbitrary, Strategy};
15use std::borrow::Cow;
16use std::cmp::Ordering;
17use std::collections::BTreeMap;
18use std::fmt::{Debug, Formatter};
19use std::marker::PhantomData;
20use std::ops::ControlFlow::{self, Break, Continue};
21use std::ops::{Deref, DerefMut};
22use std::time::Duration;
23
24use arrow::array::{Array, ArrayData, make_array};
25use arrow::datatypes::DataType;
26use bytes::Bytes;
27use differential_dataflow::Hashable;
28use differential_dataflow::lattice::Lattice;
29use differential_dataflow::trace::Description;
30use differential_dataflow::trace::implementations::BatchContainer;
31use futures::Stream;
32use futures_util::StreamExt;
33use itertools::Itertools;
34use mz_dyncfg::Config;
35use mz_ore::cast::CastFrom;
36use mz_ore::now::EpochMillis;
37use mz_ore::soft_panic_or_log;
38use mz_ore::vec::PartialOrdVecExt;
39use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceUpdates};
40use mz_persist::location::{Blob, SeqNo};
41use mz_persist_types::arrow::{ArrayBound, ProtoArrayData};
42use mz_persist_types::columnar::{ColumnEncoder, Schema};
43use mz_persist_types::schema::{SchemaId, backward_compatible};
44use mz_persist_types::{Codec, Codec64, Opaque};
45use mz_proto::ProtoType;
46use mz_proto::RustType;
47use proptest_derive::Arbitrary;
48use semver::Version;
49use serde::ser::SerializeStruct;
50use serde::{Serialize, Serializer};
51use timely::PartialOrder;
52use timely::order::TotalOrder;
53use timely::progress::{Antichain, Timestamp};
54use tracing::info;
55use uuid::Uuid;
56
57use crate::critical::CriticalReaderId;
58use crate::error::InvalidUsage;
59use crate::internal::encoding::{
60    LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, parse_id,
61};
62use crate::internal::gc::GcReq;
63use crate::internal::machine::retry_external;
64use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
65use crate::internal::trace::{
66    ActiveCompaction, ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace,
67};
68use crate::metrics::Metrics;
69use crate::read::LeasedReaderId;
70use crate::schema::CaESchema;
71use crate::write::WriterId;
72use crate::{PersistConfig, ShardId};
73
74include!(concat!(
75    env!("OUT_DIR"),
76    "/mz_persist_client.internal.state.rs"
77));
78
79include!(concat!(
80    env!("OUT_DIR"),
81    "/mz_persist_client.internal.diff.rs"
82));
83
84/// Determines how often to write rollups, assigning a maintenance task after
85/// `rollup_threshold` seqnos have passed since the last rollup.
86///
87/// Tuning note: in the absence of a long reader seqno hold, and with
88/// incremental GC, this threshold will determine about how many live diffs are
89/// held in Consensus. Lowering this value decreases the live diff count at the
90/// cost of more maintenance work + blob writes.
91pub(crate) const ROLLUP_THRESHOLD: Config<usize> = Config::new(
92    "persist_rollup_threshold",
93    128,
94    "The number of seqnos between rollups.",
95);
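
// A hedged illustration of the threshold semantics above: a rollup becomes due once at
// least `rollup_threshold` seqnos have elapsed since the seqno of the most recent rollup.
// `rollup_due_sketch` is a hypothetical helper, not used by the code below.
#[allow(dead_code)]
fn rollup_due_sketch(current_seqno: SeqNo, latest_rollup_seqno: SeqNo, rollup_threshold: usize) -> bool {
    current_seqno.0.saturating_sub(latest_rollup_seqno.0) >= u64::cast_from(rollup_threshold)
}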
96
97/// Determines how long to wait before an active rollup is considered
98/// "stuck" and a new rollup is started.
99pub(crate) const ROLLUP_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
100    "persist_rollup_fallback_threshold_ms",
101    5000,
102    "The number of milliseconds before a worker claims an already claimed rollup.",
103);
104
105/// Feature flag for the new active rollup tracking mechanism.
106/// We mustn't enable this until we are fully deployed on the new version.
107pub(crate) const ROLLUP_USE_ACTIVE_ROLLUP: Config<bool> = Config::new(
108    "persist_rollup_use_active_rollup",
109    true,
110    "Whether to use the new active rollup tracking mechanism.",
111);
112
113/// Determines how long to wait before an active GC is considered
114/// "stuck" and a new GC is started.
115pub(crate) const GC_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
116    "persist_gc_fallback_threshold_ms",
117    900000,
118    "The number of milliseconds before a worker claims an already claimed GC.",
119);
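
// A hedged sketch of the fallback check this threshold feeds: an in-flight GC whose
// `start_ms` is older than the threshold is considered stuck and may be reclaimed by
// another worker. `gc_is_stuck_sketch` is a hypothetical helper, not used below.
#[allow(dead_code)]
fn gc_is_stuck_sketch(active: &ActiveGc, now_ms: EpochMillis, fallback_threshold_ms: usize) -> bool {
    now_ms.saturating_sub(active.start_ms) >= u64::cast_from(fallback_threshold_ms)
}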
120
121/// See the config description string.
122pub(crate) const GC_MIN_VERSIONS: Config<usize> = Config::new(
123    "persist_gc_min_versions",
124    32,
125    "The number of un-GCd versions that may exist in state before we'll trigger a GC.",
126);
127
128/// See the config description string.
129pub(crate) const GC_MAX_VERSIONS: Config<usize> = Config::new(
130    "persist_gc_max_versions",
131    128_000,
132    "The maximum number of versions to GC in a single GC run.",
133);
134
135/// Feature flag for the new active GC tracking mechanism.
136/// We mustn't enable this until we are fully deployed on the new version.
137pub(crate) const GC_USE_ACTIVE_GC: Config<bool> = Config::new(
138    "persist_gc_use_active_gc",
139    false,
140    "Whether to use the new active GC tracking mechanism.",
141);
142
143pub(crate) const ENABLE_INCREMENTAL_COMPACTION: Config<bool> = Config::new(
144    "persist_enable_incremental_compaction",
145    false,
146    "Whether to enable incremental compaction.",
147);
148
149/// A token to disambiguate state commands that could not otherwise be
150/// idempotent.
151#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
152#[serde(into = "String")]
153pub struct IdempotencyToken(pub(crate) [u8; 16]);
154
155impl std::fmt::Display for IdempotencyToken {
156    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157        write!(f, "i{}", Uuid::from_bytes(self.0))
158    }
159}
160
161impl std::fmt::Debug for IdempotencyToken {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        write!(f, "IdempotencyToken({})", Uuid::from_bytes(self.0))
164    }
165}
166
167impl std::str::FromStr for IdempotencyToken {
168    type Err = String;
169
170    fn from_str(s: &str) -> Result<Self, Self::Err> {
171        parse_id("i", "IdempotencyToken", s).map(IdempotencyToken)
172    }
173}
174
175impl From<IdempotencyToken> for String {
176    fn from(x: IdempotencyToken) -> Self {
177        x.to_string()
178    }
179}
180
181impl IdempotencyToken {
182    pub(crate) fn new() -> Self {
183        IdempotencyToken(*Uuid::new_v4().as_bytes())
184    }
185    pub(crate) const SENTINEL: IdempotencyToken = IdempotencyToken([17u8; 16]);
186}
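
// A brief, hedged sketch of the token's string round trip: `Display` renders the token
// as `i<uuid>` and `FromStr` parses that form back. Test-only; uses nothing beyond the
// impls above.
#[cfg(test)]
mod idempotency_token_roundtrip_sketch {
    use super::*;

    #[test]
    fn display_then_parse_roundtrips() {
        let token = IdempotencyToken::new();
        let rendered = token.to_string();
        assert!(rendered.starts_with('i'));
        let parsed: IdempotencyToken = rendered.parse().expect("rendered token should parse");
        assert_eq!(parsed, token);
    }
}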
187
188#[derive(Clone, Debug, PartialEq, Serialize)]
189pub struct LeasedReaderState<T> {
190    /// The seqno capability of this reader.
191    pub seqno: SeqNo,
192    /// The since capability of this reader.
193    pub since: Antichain<T>,
194    /// UNIX_EPOCH timestamp (in millis) of this reader's most recent heartbeat
195    pub last_heartbeat_timestamp_ms: u64,
196    /// Duration (in millis) past [Self::last_heartbeat_timestamp_ms] after which
197    /// this reader may be expired
198    pub lease_duration_ms: u64,
199    /// For debugging.
200    pub debug: HandleDebugState,
201}
202
203#[derive(Arbitrary, Clone, Debug, PartialEq, Serialize)]
204#[serde(into = "u64")]
205pub struct OpaqueState(pub [u8; 8]);
206
207impl From<OpaqueState> for u64 {
208    fn from(value: OpaqueState) -> Self {
209        u64::from_le_bytes(value.0)
210    }
211}
212
213#[derive(Clone, Debug, PartialEq, Serialize)]
214pub struct CriticalReaderState<T> {
215    /// The since capability of this reader.
216    pub since: Antichain<T>,
217    /// An opaque token matched on by compare_and_downgrade_since.
218    pub opaque: OpaqueState,
219    /// The [Codec64] used to encode [Self::opaque].
220    pub opaque_codec: String,
221    /// For debugging.
222    pub debug: HandleDebugState,
223}
224
225#[derive(Clone, Debug, PartialEq, Serialize)]
226pub struct WriterState<T> {
227    /// UNIX_EPOCH timestamp (in millis) of this writer's most recent heartbeat
228    pub last_heartbeat_timestamp_ms: u64,
229    /// Duration (in millis) past [Self::last_heartbeat_timestamp_ms] after which
230    /// this writer may be expired
231    pub lease_duration_ms: u64,
232    /// The idempotency token of the most recent successful compare_and_append
233    /// by this writer.
234    pub most_recent_write_token: IdempotencyToken,
235    /// The upper of the most recent successful compare_and_append by this
236    /// writer.
237    pub most_recent_write_upper: Antichain<T>,
238    /// For debugging.
239    pub debug: HandleDebugState,
240}
241
242/// Debugging info for a reader or writer.
243#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize)]
244pub struct HandleDebugState {
245    /// Hostname of the persist user that registered this writer or reader. For
246    /// critical readers, this is the _most recent_ registration.
247    pub hostname: String,
248    /// Plaintext description of this writer or reader's intent.
249    pub purpose: String,
250}
251
252/// Part of the updates in a Batch.
253///
254/// Either a pointer to ones stored in Blob or the updates themselves inlined.
255#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
256#[serde(tag = "type")]
257pub enum BatchPart<T> {
258    Hollow(HollowBatchPart<T>),
259    Inline {
260        updates: LazyInlineBatchPart,
261        ts_rewrite: Option<Antichain<T>>,
262        schema_id: Option<SchemaId>,
263
264        /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
265        deprecated_schema_id: Option<SchemaId>,
266    },
267}
268
269fn decode_structured_lower(lower: &LazyProto<ProtoArrayData>) -> Option<ArrayBound> {
270    let try_decode = |lower: &LazyProto<ProtoArrayData>| {
271        let proto = lower.decode()?;
272        let data = ArrayData::from_proto(proto)?;
273        ensure!(data.len() == 1);
274        Ok(ArrayBound::new(make_array(data), 0))
275    };
276
277    let decoded: anyhow::Result<ArrayBound> = try_decode(lower);
278
279    match decoded {
280        Ok(bound) => Some(bound),
281        Err(e) => {
282            soft_panic_or_log!("failed to decode bound: {e:#?}");
283            None
284        }
285    }
286}
287
288impl<T> BatchPart<T> {
289    pub fn hollow_bytes(&self) -> usize {
290        match self {
291            BatchPart::Hollow(x) => x.encoded_size_bytes,
292            BatchPart::Inline { .. } => 0,
293        }
294    }
295
296    pub fn is_inline(&self) -> bool {
297        matches!(self, BatchPart::Inline { .. })
298    }
299
300    pub fn inline_bytes(&self) -> usize {
301        match self {
302            BatchPart::Hollow(_) => 0,
303            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
304        }
305    }
306
307    pub fn writer_key(&self) -> Option<WriterKey> {
308        match self {
309            BatchPart::Hollow(x) => x.key.split().map(|(writer, _part)| writer),
310            BatchPart::Inline { .. } => None,
311        }
312    }
313
314    pub fn encoded_size_bytes(&self) -> usize {
315        match self {
316            BatchPart::Hollow(x) => x.encoded_size_bytes,
317            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
318        }
319    }
320
321    // A user-interpretable identifier or description of the part (for logs and
322    // such).
323    pub fn printable_name(&self) -> &str {
324        match self {
325            BatchPart::Hollow(x) => x.key.0.as_str(),
326            BatchPart::Inline { .. } => "<inline>",
327        }
328    }
329
330    pub fn stats(&self) -> Option<&LazyPartStats> {
331        match self {
332            BatchPart::Hollow(x) => x.stats.as_ref(),
333            BatchPart::Inline { .. } => None,
334        }
335    }
336
337    pub fn key_lower(&self) -> &[u8] {
338        match self {
339            BatchPart::Hollow(x) => x.key_lower.as_slice(),
340            // We don't duplicate the lowest key because this can be
341            // considerable overhead for small parts.
342            //
343            // The empty key might not be a tight lower bound, but it is a valid
344            // lower bound. If a caller is interested in a tighter lower bound,
345            // the data is inline.
346            BatchPart::Inline { .. } => &[],
347        }
348    }
349
350    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
351        let part = match self {
352            BatchPart::Hollow(part) => part,
353            BatchPart::Inline { .. } => return None,
354        };
355
356        decode_structured_lower(part.structured_key_lower.as_ref()?)
357    }
358
359    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
360        match self {
361            BatchPart::Hollow(x) => x.ts_rewrite.as_ref(),
362            BatchPart::Inline { ts_rewrite, .. } => ts_rewrite.as_ref(),
363        }
364    }
365
366    pub fn schema_id(&self) -> Option<SchemaId> {
367        match self {
368            BatchPart::Hollow(x) => x.schema_id,
369            BatchPart::Inline { schema_id, .. } => *schema_id,
370        }
371    }
372
373    pub fn deprecated_schema_id(&self) -> Option<SchemaId> {
374        match self {
375            BatchPart::Hollow(x) => x.deprecated_schema_id,
376            BatchPart::Inline {
377                deprecated_schema_id,
378                ..
379            } => *deprecated_schema_id,
380        }
381    }
382}
383
384impl<T: Timestamp + Codec64> BatchPart<T> {
385    pub fn is_structured_only(&self, metrics: &ColumnarMetrics) -> bool {
386        match self {
387            BatchPart::Hollow(x) => matches!(x.format, Some(BatchColumnarFormat::Structured)),
388            BatchPart::Inline { updates, .. } => {
389                let inline_part = updates.decode::<T>(metrics).expect("valid inline part");
390                matches!(inline_part.updates, BlobTraceUpdates::Structured { .. })
391            }
392        }
393    }
394
395    pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
396        match self {
397            BatchPart::Hollow(x) => x.diffs_sum.map(D::decode),
398            BatchPart::Inline { updates, .. } => Some(
399                updates
400                    .decode::<T>(metrics)
401                    .expect("valid inline part")
402                    .updates
403                    .diffs_sum(),
404            ),
405        }
406    }
407}
408
409/// An ordered list of parts, generally stored as part of a larger run.
410#[derive(Debug, Clone)]
411pub struct HollowRun<T> {
412    /// Pointers usable to retrieve the updates.
413    pub(crate) parts: Vec<RunPart<T>>,
414}
415
416/// A reference to a [HollowRun], including the key in the blob store and some denormalized
417/// metadata.
418#[derive(Debug, Eq, PartialEq, Clone, Serialize)]
419pub struct HollowRunRef<T> {
420    pub key: PartialBatchKey,
421
422    /// The size of the referenced run object, plus all of the hollow objects it contains.
423    pub hollow_bytes: usize,
424
425    /// The size of the largest individual part in the run; useful for sizing compaction.
426    pub max_part_bytes: usize,
427
428    /// The lower bound of the data in this part, ordered by the codec ordering.
429    pub key_lower: Vec<u8>,
430
431    /// The lower bound of the data in this part, ordered by the structured ordering.
432    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
433
434    pub diffs_sum: Option<[u8; 8]>,
435
436    pub(crate) _phantom_data: PhantomData<T>,
437}
438impl<T: Eq> PartialOrd<Self> for HollowRunRef<T> {
439    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
440        Some(self.cmp(other))
441    }
442}
443
444impl<T: Eq> Ord for HollowRunRef<T> {
445    fn cmp(&self, other: &Self) -> Ordering {
446        self.key.cmp(&other.key)
447    }
448}
449
450impl<T> HollowRunRef<T> {
451    pub fn writer_key(&self) -> Option<WriterKey> {
452        Some(self.key.split()?.0)
453    }
454}
455
456impl<T: Timestamp + Codec64> HollowRunRef<T> {
457    /// Stores the given runs and returns a [HollowRunRef] that points to them.
458    pub async fn set<D: Codec64 + Monoid>(
459        shard_id: ShardId,
460        blob: &dyn Blob,
461        writer: &WriterKey,
462        data: HollowRun<T>,
463        metrics: &Metrics,
464    ) -> Self {
465        let hollow_bytes = data.parts.iter().map(|p| p.hollow_bytes()).sum();
466        let max_part_bytes = data
467            .parts
468            .iter()
469            .map(|p| p.max_part_bytes())
470            .max()
471            .unwrap_or(0);
472        let key_lower = data
473            .parts
474            .first()
475            .map_or(vec![], |p| p.key_lower().to_vec());
476        let structured_key_lower = match data.parts.first() {
477            Some(RunPart::Many(r)) => r.structured_key_lower.clone(),
478            Some(RunPart::Single(BatchPart::Hollow(p))) => p.structured_key_lower.clone(),
479            Some(RunPart::Single(BatchPart::Inline { .. })) | None => None,
480        };
481        let diffs_sum = data
482            .parts
483            .iter()
484            .map(|p| {
485                p.diffs_sum::<D>(&metrics.columnar)
486                    .expect("valid diffs sum")
487            })
488            .reduce(|mut a, b| {
489                a.plus_equals(&b);
490                a
491            })
492            .expect("valid diffs sum")
493            .encode();
494
495        let key = PartialBatchKey::new(writer, &PartId::new());
496        let blob_key = key.complete(&shard_id);
497        let bytes = Bytes::from(prost::Message::encode_to_vec(&data.into_proto()));
498        let () = retry_external(&metrics.retries.external.hollow_run_set, || {
499            blob.set(&blob_key, bytes.clone())
500        })
501        .await;
502        Self {
503            key,
504            hollow_bytes,
505            max_part_bytes,
506            key_lower,
507            structured_key_lower,
508            diffs_sum: Some(diffs_sum),
509            _phantom_data: Default::default(),
510        }
511    }
512
513    /// Retrieve the [HollowRun] that this reference points to.
514    /// The caller is expected to ensure that this ref is the result of calling [HollowRunRef::set]
515    /// with the same shard id and backing store.
516    pub async fn get(
517        &self,
518        shard_id: ShardId,
519        blob: &dyn Blob,
520        metrics: &Metrics,
521    ) -> Option<HollowRun<T>> {
522        let blob_key = self.key.complete(&shard_id);
523        let mut bytes = retry_external(&metrics.retries.external.hollow_run_get, || {
524            blob.get(&blob_key)
525        })
526        .await?;
527        let proto_runs: ProtoHollowRun =
528            prost::Message::decode(&mut bytes).expect("illegal state: invalid proto bytes");
529        let runs = proto_runs
530            .into_rust()
531            .expect("illegal state: invalid encoded runs proto");
532        Some(runs)
533    }
534}
535
536/// Part of the updates in a run.
537///
538/// Either a pointer to ones stored in Blob or a single part stored inline.
539#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
540#[serde(untagged)]
541pub enum RunPart<T> {
542    Single(BatchPart<T>),
543    Many(HollowRunRef<T>),
544}
545
546impl<T: Ord> PartialOrd<Self> for RunPart<T> {
547    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
548        Some(self.cmp(other))
549    }
550}
551
552impl<T: Ord> Ord for RunPart<T> {
553    fn cmp(&self, other: &Self) -> Ordering {
554        match (self, other) {
555            (RunPart::Single(a), RunPart::Single(b)) => a.cmp(b),
556            (RunPart::Single(_), RunPart::Many(_)) => Ordering::Less,
557            (RunPart::Many(_), RunPart::Single(_)) => Ordering::Greater,
558            (RunPart::Many(a), RunPart::Many(b)) => a.cmp(b),
559        }
560    }
561}
562
563impl<T> RunPart<T> {
564    #[cfg(test)]
565    pub fn expect_hollow_part(&self) -> &HollowBatchPart<T> {
566        match self {
567            RunPart::Single(BatchPart::Hollow(hollow)) => hollow,
568            _ => panic!("expected hollow part!"),
569        }
570    }
571
572    pub fn hollow_bytes(&self) -> usize {
573        match self {
574            Self::Single(p) => p.hollow_bytes(),
575            Self::Many(r) => r.hollow_bytes,
576        }
577    }
578
579    pub fn is_inline(&self) -> bool {
580        match self {
581            Self::Single(p) => p.is_inline(),
582            Self::Many(_) => false,
583        }
584    }
585
586    pub fn inline_bytes(&self) -> usize {
587        match self {
588            Self::Single(p) => p.inline_bytes(),
589            Self::Many(_) => 0,
590        }
591    }
592
593    pub fn max_part_bytes(&self) -> usize {
594        match self {
595            Self::Single(p) => p.encoded_size_bytes(),
596            Self::Many(r) => r.max_part_bytes,
597        }
598    }
599
600    pub fn writer_key(&self) -> Option<WriterKey> {
601        match self {
602            Self::Single(p) => p.writer_key(),
603            Self::Many(r) => r.writer_key(),
604        }
605    }
606
607    pub fn encoded_size_bytes(&self) -> usize {
608        match self {
609            Self::Single(p) => p.encoded_size_bytes(),
610            Self::Many(r) => r.hollow_bytes,
611        }
612    }
613
614    pub fn schema_id(&self) -> Option<SchemaId> {
615        match self {
616            Self::Single(p) => p.schema_id(),
617            Self::Many(_) => None,
618        }
619    }
620
621    // A user-interpretable identifier or description of the part (for logs and
622    // such).
623    pub fn printable_name(&self) -> &str {
624        match self {
625            Self::Single(p) => p.printable_name(),
626            Self::Many(r) => r.key.0.as_str(),
627        }
628    }
629
630    pub fn stats(&self) -> Option<&LazyPartStats> {
631        match self {
632            Self::Single(p) => p.stats(),
633            // TODO: if we kept stats we could avoid fetching the metadata here.
634            Self::Many(_) => None,
635        }
636    }
637
638    pub fn key_lower(&self) -> &[u8] {
639        match self {
640            Self::Single(p) => p.key_lower(),
641            Self::Many(r) => r.key_lower.as_slice(),
642        }
643    }
644
645    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
646        match self {
647            Self::Single(p) => p.structured_key_lower(),
648            Self::Many(_) => None,
649        }
650    }
651
652    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
653        match self {
654            Self::Single(p) => p.ts_rewrite(),
655            Self::Many(_) => None,
656        }
657    }
658}
659
660impl<T> RunPart<T>
661where
662    T: Timestamp + Codec64,
663{
664    pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
665        match self {
666            Self::Single(p) => p.diffs_sum(metrics),
667            Self::Many(hollow_run) => hollow_run.diffs_sum.map(D::decode),
668        }
669    }
670}
671
672/// A blob was missing!
673#[derive(Clone, Debug)]
674pub struct MissingBlob(BlobKey);
675
676impl std::fmt::Display for MissingBlob {
677    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
678        write!(f, "unexpectedly missing key: {}", self.0)
679    }
680}
681
682impl std::error::Error for MissingBlob {}
683
684impl<T: Timestamp + Codec64 + Sync> RunPart<T> {
685    pub fn part_stream<'a>(
686        &'a self,
687        shard_id: ShardId,
688        blob: &'a dyn Blob,
689        metrics: &'a Metrics,
690    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + Send + 'a {
691        try_stream! {
692            match self {
693                RunPart::Single(p) => {
694                    yield Cow::Borrowed(p);
695                }
696                RunPart::Many(r) => {
697                    let fetched = r.get(shard_id, blob, metrics).await
698                        .ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
699                    for run_part in fetched.parts {
700                        for await batch_part in
701                            run_part.part_stream(shard_id, blob, metrics).boxed()
702                        {
703                            yield Cow::Owned(batch_part?.into_owned());
704                        }
705                    }
706                }
707            }
708        }
709    }
710}
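
// A hedged usage sketch (hypothetical free function) for `part_stream`: flatten a run
// part, hollow-run references included, into its constituent batch parts. The caller
// supplies the blob handle and metrics, as in the impl above.
#[allow(dead_code)]
async fn collect_batch_parts_sketch<T: Timestamp + Codec64 + Sync>(
    run_part: &RunPart<T>,
    shard_id: ShardId,
    blob: &dyn Blob,
    metrics: &Metrics,
) -> Result<Vec<BatchPart<T>>, MissingBlob> {
    let mut parts = std::pin::pin!(run_part.part_stream(shard_id, blob, metrics));
    let mut collected = Vec::new();
    while let Some(part) = parts.next().await {
        // Each yielded item is a `Cow`; clone hollow parts into owned values.
        collected.push(part?.into_owned());
    }
    Ok(collected)
}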
711
712impl<T: Ord> PartialOrd for BatchPart<T> {
713    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
714        Some(self.cmp(other))
715    }
716}
717
718impl<T: Ord> Ord for BatchPart<T> {
719    fn cmp(&self, other: &Self) -> Ordering {
720        match (self, other) {
721            (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o),
722            (
723                BatchPart::Inline {
724                    updates: s_updates,
725                    ts_rewrite: s_ts_rewrite,
726                    schema_id: s_schema_id,
727                    deprecated_schema_id: s_deprecated_schema_id,
728                },
729                BatchPart::Inline {
730                    updates: o_updates,
731                    ts_rewrite: o_ts_rewrite,
732                    schema_id: o_schema_id,
733                    deprecated_schema_id: o_deprecated_schema_id,
734                },
735            ) => (
736                s_updates,
737                s_ts_rewrite.as_ref().map(|x| x.elements()),
738                s_schema_id,
739                s_deprecated_schema_id,
740            )
741                .cmp(&(
742                    o_updates,
743                    o_ts_rewrite.as_ref().map(|x| x.elements()),
744                    o_schema_id,
745                    o_deprecated_schema_id,
746                )),
747            (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less,
748            (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater,
749        }
750    }
751}
752
753/// What order are the parts in this run in?
754#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize)]
755pub(crate) enum RunOrder {
756    /// They're in no particular order.
757    Unordered,
758    /// They're ordered based on the codec-encoded K/V bytes.
759    Codec,
760    /// They're ordered by the natural ordering of the structured data.
761    Structured,
762}
763
764#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Serialize, Copy, Hash)]
765pub struct RunId(pub(crate) [u8; 16]);
766
767impl std::fmt::Display for RunId {
768    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
769        write!(f, "ri{}", Uuid::from_bytes(self.0))
770    }
771}
772
773impl std::fmt::Debug for RunId {
774    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
775        write!(f, "RunId({})", Uuid::from_bytes(self.0))
776    }
777}
778
779impl std::str::FromStr for RunId {
780    type Err = String;
781
782    fn from_str(s: &str) -> Result<Self, Self::Err> {
783        parse_id("ri", "RunId", s).map(RunId)
784    }
785}
786
787impl From<RunId> for String {
788    fn from(x: RunId) -> Self {
789        x.to_string()
790    }
791}
792
793impl RunId {
794    pub(crate) fn new() -> Self {
795        RunId(*Uuid::new_v4().as_bytes())
796    }
797}
798
799impl Arbitrary for RunId {
800    type Parameters = ();
801    type Strategy = proptest::strategy::BoxedStrategy<Self>;
802    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
803        Strategy::prop_map(proptest::prelude::any::<u128>(), |n| {
804            RunId(*Uuid::from_u128(n).as_bytes())
805        })
806        .boxed()
807    }
808}
809
810/// Metadata shared across a run.
811#[derive(Clone, Debug, Default, PartialEq, Eq, Ord, PartialOrd, Serialize)]
812pub struct RunMeta {
813    /// If none, Persist should infer the order based on the proto metadata.
814    pub(crate) order: Option<RunOrder>,
815    /// All parts in a run should have the same schema.
816    pub(crate) schema: Option<SchemaId>,
817
818    /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
819    pub(crate) deprecated_schema: Option<SchemaId>,
820
821    /// If set, a UUID that uniquely identifies this run.
822    pub(crate) id: Option<RunId>,
823
824    /// The number of updates in this run, or `None` if the number is unknown.
825    pub(crate) len: Option<usize>,
826
827    /// Additional unstructured metadata.
828    #[serde(skip_serializing_if = "MetadataMap::is_empty")]
829    pub(crate) meta: MetadataMap,
830}
831
832/// A subset of a [HollowBatch] corresponding 1:1 to a blob.
833#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
834pub struct HollowBatchPart<T> {
835    /// Pointer usable to retrieve the updates.
836    pub key: PartialBatchKey,
837    /// Miscellaneous metadata.
838    #[serde(skip_serializing_if = "MetadataMap::is_empty")]
839    pub meta: MetadataMap,
840    /// The encoded size of this part.
841    pub encoded_size_bytes: usize,
842    /// A lower bound on the keys in the part. (By default, this is the minimum
843    /// possible key: `vec![]`.)
844    #[serde(serialize_with = "serialize_part_bytes")]
845    pub key_lower: Vec<u8>,
846    /// A lower bound on the keys in the part, stored as structured data.
847    #[serde(serialize_with = "serialize_lazy_proto")]
848    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
849    /// Aggregate statistics about data contained in this part.
850    #[serde(serialize_with = "serialize_part_stats")]
851    pub stats: Option<LazyPartStats>,
852    /// A frontier to which timestamps in this part are advanced on read, if
853    /// set.
854    ///
855    /// A value of `Some([T::minimum()])` is functionally the same as `None`,
856    /// but we maintain the distinction between the two for some internal sanity
857    /// checking of invariants as well as metrics. If this ever becomes an
858    /// issue, everything still works with this as just `Antichain<T>`.
859    pub ts_rewrite: Option<Antichain<T>>,
860    /// A Codec64 encoded sum of all diffs in this part, if known.
861    ///
862    /// This is `None` if this part was written before we started storing this
863    /// information, or if it was written when the dyncfg was off.
864    ///
865    /// It could also make sense to model this as part of the pushdown stats, if
866    /// we later decide that's of some benefit.
867    #[serde(serialize_with = "serialize_diffs_sum")]
868    pub diffs_sum: Option<[u8; 8]>,
869    /// Columnar format that this batch was written in.
870    ///
871    /// This is `None` if this part was written before we started writing structured
872    /// columnar data.
873    pub format: Option<BatchColumnarFormat>,
874    /// The schemas used to encode the data in this batch part.
875    ///
876    /// Or None for historical data written before the schema registry was
877    /// added.
878    pub schema_id: Option<SchemaId>,
879
880    /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
881    pub deprecated_schema_id: Option<SchemaId>,
882}
883
884/// A [Batch] but with the updates themselves stored externally.
885///
886/// [Batch]: differential_dataflow::trace::BatchReader
887#[derive(Clone, PartialEq, Eq)]
888pub struct HollowBatch<T> {
889    /// Describes the times of the updates in the batch.
890    pub desc: Description<T>,
891    /// The number of updates in the batch.
892    pub len: usize,
893    /// Pointers usable to retrieve the updates.
894    pub(crate) parts: Vec<RunPart<T>>,
895    /// Runs of sequential sorted batch parts, stored as indices into `parts`.
896    /// ex.
897    /// ```text
898    ///     parts=[p1, p2, p3], runs=[]     --> run  is  [p1, p2, p3]
899    ///     parts=[p1, p2, p3], runs=[1]    --> runs are [p1] and [p2, p3]
900    ///     parts=[p1, p2, p3], runs=[1, 2] --> runs are [p1], [p2], [p3]
901    /// ```
902    pub(crate) run_splits: Vec<usize>,
903    /// Run-level metadata: the first entry has metadata for the first run, and so on.
904    /// If there's no corresponding entry for a particular run, it's assumed to be [RunMeta::default()].
905    pub(crate) run_meta: Vec<RunMeta>,
906}
907
908impl<T: Debug> Debug for HollowBatch<T> {
909    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
910        let HollowBatch {
911            desc,
912            parts,
913            len,
914            run_splits: runs,
915            run_meta,
916        } = self;
917        f.debug_struct("HollowBatch")
918            .field(
919                "desc",
920                &(
921                    desc.lower().elements(),
922                    desc.upper().elements(),
923                    desc.since().elements(),
924                ),
925            )
926            .field("parts", &parts)
927            .field("len", &len)
928            .field("runs", &runs)
929            .field("run_meta", &run_meta)
930            .finish()
931    }
932}
933
934impl<T: Serialize> serde::Serialize for HollowBatch<T> {
935    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
936        let HollowBatch {
937            desc,
938            len,
939            // Both parts and runs are covered by the self.runs call.
940            parts: _,
941            run_splits: _,
942            run_meta: _,
943        } = self;
944        let mut s = s.serialize_struct("HollowBatch", 5)?;
945        let () = s.serialize_field("lower", &desc.lower().elements())?;
946        let () = s.serialize_field("upper", &desc.upper().elements())?;
947        let () = s.serialize_field("since", &desc.since().elements())?;
948        let () = s.serialize_field("len", len)?;
949        let () = s.serialize_field("part_runs", &self.runs().collect::<Vec<_>>())?;
950        s.end()
951    }
952}
953
954impl<T: Ord> PartialOrd for HollowBatch<T> {
955    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
956        Some(self.cmp(other))
957    }
958}
959
960impl<T: Ord> Ord for HollowBatch<T> {
961    fn cmp(&self, other: &Self) -> Ordering {
962        // Deconstruct self and other so we get a compile failure if new fields
963        // are added.
964        let HollowBatch {
965            desc: self_desc,
966            parts: self_parts,
967            len: self_len,
968            run_splits: self_runs,
969            run_meta: self_run_meta,
970        } = self;
971        let HollowBatch {
972            desc: other_desc,
973            parts: other_parts,
974            len: other_len,
975            run_splits: other_runs,
976            run_meta: other_run_meta,
977        } = other;
978        (
979            self_desc.lower().elements(),
980            self_desc.upper().elements(),
981            self_desc.since().elements(),
982            self_parts,
983            self_len,
984            self_runs,
985            self_run_meta,
986        )
987            .cmp(&(
988                other_desc.lower().elements(),
989                other_desc.upper().elements(),
990                other_desc.since().elements(),
991                other_parts,
992                other_len,
993                other_runs,
994                other_run_meta,
995            ))
996    }
997}
998
999impl<T: Timestamp + Codec64 + Sync> HollowBatch<T> {
1000    pub(crate) fn part_stream<'a>(
1001        &'a self,
1002        shard_id: ShardId,
1003        blob: &'a dyn Blob,
1004        metrics: &'a Metrics,
1005    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + 'a {
1006        stream! {
1007            for part in &self.parts {
1008                for await part in part.part_stream(shard_id, blob, metrics) {
1009                    yield part;
1010                }
1011            }
1012        }
1013    }
1014}
1015impl<T> HollowBatch<T> {
1016    /// Construct an in-memory hollow batch from the given metadata.
1017    ///
1018    /// This method debug-asserts that `run_splits` is a strictly increasing sequence of valid
1019    /// indices into `parts`. The caller is responsible for ensuring that the defined runs are valid.
1020    ///
1021    /// `len` should represent the number of valid updates in the referenced parts.
1022    pub(crate) fn new(
1023        desc: Description<T>,
1024        parts: Vec<RunPart<T>>,
1025        len: usize,
1026        run_meta: Vec<RunMeta>,
1027        run_splits: Vec<usize>,
1028    ) -> Self {
1029        debug_assert!(
1030            run_splits.is_strictly_sorted(),
1031            "run indices should be strictly increasing"
1032        );
1033        debug_assert!(
1034            run_splits.first().map_or(true, |i| *i > 0),
1035            "run indices should be positive"
1036        );
1037        debug_assert!(
1038            run_splits.last().map_or(true, |i| *i < parts.len()),
1039            "run indices should be valid indices into parts"
1040        );
1041        debug_assert!(
1042            parts.is_empty() || run_meta.len() == run_splits.len() + 1,
1043            "all metadata should correspond to a run"
1044        );
1045
1046        Self {
1047            desc,
1048            len,
1049            parts,
1050            run_splits,
1051            run_meta,
1052        }
1053    }
1054
1055    /// Construct a batch of a single run with default metadata. Mostly interesting for tests.
1056    pub(crate) fn new_run(desc: Description<T>, parts: Vec<RunPart<T>>, len: usize) -> Self {
1057        let run_meta = if parts.is_empty() {
1058            vec![]
1059        } else {
1060            vec![RunMeta::default()]
1061        };
1062        Self {
1063            desc,
1064            len,
1065            parts,
1066            run_splits: vec![],
1067            run_meta,
1068        }
1069    }
1070
1071    #[cfg(test)]
1072    pub(crate) fn new_run_for_test(
1073        desc: Description<T>,
1074        parts: Vec<RunPart<T>>,
1075        len: usize,
1076        run_id: RunId,
1077    ) -> Self {
1078        let run_meta = if parts.is_empty() {
1079            vec![]
1080        } else {
1081            let mut meta = RunMeta::default();
1082            meta.id = Some(run_id);
1083            vec![meta]
1084        };
1085        Self {
1086            desc,
1087            len,
1088            parts,
1089            run_splits: vec![],
1090            run_meta,
1091        }
1092    }
1093
1094    /// An empty hollow batch, representing no updates over the given desc.
1095    pub(crate) fn empty(desc: Description<T>) -> Self {
1096        Self {
1097            desc,
1098            len: 0,
1099            parts: vec![],
1100            run_splits: vec![],
1101            run_meta: vec![],
1102        }
1103    }
1104
1105    pub(crate) fn runs(&self) -> impl Iterator<Item = (&RunMeta, &[RunPart<T>])> {
1106        let run_ends = self
1107            .run_splits
1108            .iter()
1109            .copied()
1110            .chain(std::iter::once(self.parts.len()));
1111        let run_metas = self.run_meta.iter();
1112        let run_parts = run_ends
1113            .scan(0, |start, end| {
1114                let range = *start..end;
1115                *start = end;
1116                Some(range)
1117            })
1118            .filter(|range| !range.is_empty())
1119            .map(|range| &self.parts[range]);
1120        run_metas.zip_eq(run_parts)
1121    }
1122
1123    pub(crate) fn inline_bytes(&self) -> usize {
1124        self.parts.iter().map(|x| x.inline_bytes()).sum()
1125    }
1126
1127    pub(crate) fn is_empty(&self) -> bool {
1128        self.parts.is_empty()
1129    }
1130
1131    pub(crate) fn part_count(&self) -> usize {
1132        self.parts.len()
1133    }
1134
1135    /// The sum of the encoded sizes of all parts in the batch.
1136    pub fn encoded_size_bytes(&self) -> usize {
1137        self.parts.iter().map(|p| p.encoded_size_bytes()).sum()
1138    }
1139}
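
// A hedged, standalone sketch of how `run_splits` partitions `parts` into runs: each
// split index starts a new run, mirroring the scan in `runs()` above. The strings and
// module name are hypothetical; only the index arithmetic is being illustrated.
#[cfg(test)]
mod run_splits_sketch {
    #[test]
    fn split_indices_partition_parts() {
        let parts = ["p1", "p2", "p3"];
        let run_splits = [1usize];
        let mut runs = Vec::new();
        let mut start = 0;
        for end in run_splits.iter().copied().chain(std::iter::once(parts.len())) {
            runs.push(&parts[start..end]);
            start = end;
        }
        // run_splits=[1] means two runs: [p1] and [p2, p3].
        assert_eq!(runs, vec![&parts[..1], &parts[1..]]);
    }
}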
1140
1141// See the comment on [Batch::rewrite_ts] for why this is TotalOrder.
1142impl<T: Timestamp + TotalOrder> HollowBatch<T> {
1143    pub(crate) fn rewrite_ts(
1144        &mut self,
1145        frontier: &Antichain<T>,
1146        new_upper: Antichain<T>,
1147    ) -> Result<(), String> {
1148        if !PartialOrder::less_than(frontier, &new_upper) {
1149            return Err(format!(
1150                "rewrite frontier {:?} !< rewrite upper {:?}",
1151                frontier.elements(),
1152                new_upper.elements(),
1153            ));
1154        }
1155        if PartialOrder::less_than(&new_upper, self.desc.upper()) {
1156            return Err(format!(
1157                "rewrite upper {:?} < batch upper {:?}",
1158                new_upper.elements(),
1159                self.desc.upper().elements(),
1160            ));
1161        }
1162
1163        // The following are things that it seems like we could support, but
1164        // initially we don't because we don't have a use case for them.
1165        if PartialOrder::less_than(frontier, self.desc.lower()) {
1166            return Err(format!(
1167                "rewrite frontier {:?} < batch lower {:?}",
1168                frontier.elements(),
1169                self.desc.lower().elements(),
1170            ));
1171        }
1172        if self.desc.since() != &Antichain::from_elem(T::minimum()) {
1173            return Err(format!(
1174                "batch since {:?} != minimum antichain {:?}",
1175                self.desc.since().elements(),
1176                &[T::minimum()],
1177            ));
1178        }
1179        for part in self.parts.iter() {
1180            let Some(ts_rewrite) = part.ts_rewrite() else {
1181                continue;
1182            };
1183            if PartialOrder::less_than(frontier, ts_rewrite) {
1184                return Err(format!(
1185                    "rewrite frontier {:?} < batch rewrite {:?}",
1186                    frontier.elements(),
1187                    ts_rewrite.elements(),
1188                ));
1189            }
1190        }
1191
1192        self.desc = Description::new(
1193            self.desc.lower().clone(),
1194            new_upper,
1195            self.desc.since().clone(),
1196        );
1197        for part in &mut self.parts {
1198            match part {
1199                RunPart::Single(BatchPart::Hollow(part)) => {
1200                    part.ts_rewrite = Some(frontier.clone())
1201                }
1202                RunPart::Single(BatchPart::Inline { ts_rewrite, .. }) => {
1203                    *ts_rewrite = Some(frontier.clone())
1204                }
1205                RunPart::Many(runs) => {
1206                    // Currently unreachable: we only apply rewrites to user batches, and we don't
1207                    // ever generate runs of >1 part for those.
1208                    panic!("unexpected rewrite of a hollow runs ref: {runs:?}");
1209                }
1210            }
1211        }
1212        Ok(())
1213    }
1214}
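
// A hedged, test-only sketch of the `rewrite_ts` contract checked above: the rewrite
// frontier must be strictly below the new upper, and the new upper must not regress
// the batch's existing upper. Module name and values are illustrative only.
#[cfg(test)]
mod rewrite_ts_contract_sketch {
    use super::*;

    #[test]
    fn frontier_must_be_below_new_upper() {
        let desc = Description::new(
            Antichain::from_elem(0u64),
            Antichain::from_elem(5u64),
            Antichain::from_elem(0u64),
        );
        let mut batch = HollowBatch::empty(desc);
        // Frontier 3 is below the new upper 10, which doesn't regress the old upper 5.
        assert!(
            batch
                .rewrite_ts(&Antichain::from_elem(3), Antichain::from_elem(10))
                .is_ok()
        );
        // A frontier equal to the new upper is rejected.
        assert!(
            batch
                .rewrite_ts(&Antichain::from_elem(10), Antichain::from_elem(10))
                .is_err()
        );
    }
}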
1215
1216impl<T: Ord> PartialOrd for HollowBatchPart<T> {
1217    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1218        Some(self.cmp(other))
1219    }
1220}
1221
1222impl<T: Ord> Ord for HollowBatchPart<T> {
1223    fn cmp(&self, other: &Self) -> Ordering {
1224        // Deconstruct self and other so we get a compile failure if new fields
1225        // are added.
1226        let HollowBatchPart {
1227            key: self_key,
1228            meta: self_meta,
1229            encoded_size_bytes: self_encoded_size_bytes,
1230            key_lower: self_key_lower,
1231            structured_key_lower: self_structured_key_lower,
1232            stats: self_stats,
1233            ts_rewrite: self_ts_rewrite,
1234            diffs_sum: self_diffs_sum,
1235            format: self_format,
1236            schema_id: self_schema_id,
1237            deprecated_schema_id: self_deprecated_schema_id,
1238        } = self;
1239        let HollowBatchPart {
1240            key: other_key,
1241            meta: other_meta,
1242            encoded_size_bytes: other_encoded_size_bytes,
1243            key_lower: other_key_lower,
1244            structured_key_lower: other_structured_key_lower,
1245            stats: other_stats,
1246            ts_rewrite: other_ts_rewrite,
1247            diffs_sum: other_diffs_sum,
1248            format: other_format,
1249            schema_id: other_schema_id,
1250            deprecated_schema_id: other_deprecated_schema_id,
1251        } = other;
1252        (
1253            self_key,
1254            self_meta,
1255            self_encoded_size_bytes,
1256            self_key_lower,
1257            self_structured_key_lower,
1258            self_stats,
1259            self_ts_rewrite.as_ref().map(|x| x.elements()),
1260            self_diffs_sum,
1261            self_format,
1262            self_schema_id,
1263            self_deprecated_schema_id,
1264        )
1265            .cmp(&(
1266                other_key,
1267                other_meta,
1268                other_encoded_size_bytes,
1269                other_key_lower,
1270                other_structured_key_lower,
1271                other_stats,
1272                other_ts_rewrite.as_ref().map(|x| x.elements()),
1273                other_diffs_sum,
1274                other_format,
1275                other_schema_id,
1276                other_deprecated_schema_id,
1277            ))
1278    }
1279}
1280
1281/// A pointer to a rollup stored externally.
1282#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
1283pub struct HollowRollup {
1284    /// Pointer usable to retrieve the rollup.
1285    pub key: PartialRollupKey,
1286    /// The encoded size of this rollup, if known.
1287    pub encoded_size_bytes: Option<usize>,
1288}
1289
1290/// A pointer to a blob stored externally.
1291#[derive(Debug)]
1292pub enum HollowBlobRef<'a, T> {
1293    Batch(&'a HollowBatch<T>),
1294    Rollup(&'a HollowRollup),
1295}
1296
1297/// A rollup that is currently being computed.
1298#[derive(
1299    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize
1300)]
1301pub struct ActiveRollup {
1302    pub seqno: SeqNo,
1303    pub start_ms: u64,
1304}
1305
1306/// A garbage collection request that is currently being computed.
1307#[derive(
1308    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize
1309)]
1310pub struct ActiveGc {
1311    pub seqno: SeqNo,
1312    pub start_ms: u64,
1313}
1314
1315/// A sentinel for a state transition that was a no-op.
1316///
1317/// Critically, this also indicates that the no-op state transition was not
1318/// committed through compare_and_append and thus is _not linearized_.
1319#[derive(Debug)]
1320#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
1321pub struct NoOpStateTransition<T>(pub T);
1322
1323// TODO: Document invariants.
1324#[derive(Debug, Clone)]
1325#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
1326pub struct StateCollections<T> {
1327    /// The version of this state. This is typically identical to the version of the code
1328    /// that wrote it, but may diverge during 0dt upgrades and similar operations when a
1329    /// new version of code is intentionally interoperating with an older state format.
1330    pub(crate) version: Version,
1331
1332    // - Invariant: `<= all reader.since`
1333    // - Invariant: Doesn't regress across state versions.
1334    pub(crate) last_gc_req: SeqNo,
1335
1336    // - Invariant: There is a rollup with `seqno <= self.seqno_since`.
1337    pub(crate) rollups: BTreeMap<SeqNo, HollowRollup>,
1338
1339    /// The rollup that is currently being computed.
1340    pub(crate) active_rollup: Option<ActiveRollup>,
1341    /// The gc request that is currently being computed.
1342    pub(crate) active_gc: Option<ActiveGc>,
1343
1344    pub(crate) leased_readers: BTreeMap<LeasedReaderId, LeasedReaderState<T>>,
1345    pub(crate) critical_readers: BTreeMap<CriticalReaderId, CriticalReaderState<T>>,
1346    pub(crate) writers: BTreeMap<WriterId, WriterState<T>>,
1347    pub(crate) schemas: BTreeMap<SchemaId, EncodedSchemas>,
1348
1349    // - Invariant: `trace.since == meet(all reader.since)`
1350    // - Invariant: `trace.since` doesn't regress across state versions.
1351    // - Invariant: `trace.upper` doesn't regress across state versions.
1352    // - Invariant: `trace` upholds its own invariants.
1353    pub(crate) trace: Trace<T>,
1354}
1355
1356/// A key and val [Codec::Schema] encoded via [Codec::encode_schema].
1357///
1358/// This strategy of directly serializing the schema objects requires that
1359/// persist users do the right thing. Specifically, that an encoded schema
1360/// doesn't in some later version of mz decode to an in-mem object that acts
1361/// differently. In a sense, the current system (before the introduction of the
1362/// schema registry) where schemas are passed in unchecked to reader and writer
1363/// registration calls also has the same defect, so seems fine.
1364///
1365/// An alternative is to write down here some persist-specific representation of
1366/// the schema (e.g. the arrow DataType). This is a lot more work and also has
1367/// the potential to lead down a similar failure mode to the mz_persist_types
1368/// `Data` trait, where the boilerplate isn't worth the safety. Given that we
1369/// can always migrate later by rehydrating these, seems fine to start with the
1370/// easy thing.
1371#[derive(Debug, Clone, Serialize, PartialEq)]
1372pub struct EncodedSchemas {
1373    /// A full in-mem `K::Schema` impl encoded via [Codec::encode_schema].
1374    pub key: Bytes,
1375    /// The arrow `DataType` produced by this `K::Schema` at the time it was
1376    /// registered, encoded as a `ProtoDataType`.
1377    pub key_data_type: Bytes,
1378    /// A full in-mem `V::Schema` impl encoded via [Codec::encode_schema].
1379    pub val: Bytes,
1380    /// The arrow `DataType` produced by this `V::Schema` at the time it was
1381    /// registered, encoded as a `ProtoDataType`.
1382    pub val_data_type: Bytes,
1383}
1384
1385impl EncodedSchemas {
1386    pub(crate) fn decode_data_type(buf: &[u8]) -> DataType {
1387        let proto = prost::Message::decode(buf).expect("valid ProtoDataType");
1388        DataType::from_proto(proto).expect("valid DataType")
1389    }
1390}
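
// A hedged round-trip sketch for the `DataType` encoding used by `EncodedSchemas`: the
// arrow `DataType` is converted to its proto representation and prost-encoded, and
// `decode_data_type` reverses that. Test-only and illustrative.
#[cfg(test)]
mod encoded_schemas_data_type_sketch {
    use super::*;

    #[test]
    fn data_type_roundtrips_through_proto_bytes() {
        let data_type = DataType::Int64;
        let proto = data_type.into_proto();
        let bytes = prost::Message::encode_to_vec(&proto);
        assert_eq!(EncodedSchemas::decode_data_type(&bytes), data_type);
    }
}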
1391
1392#[derive(Debug)]
1393#[cfg_attr(test, derive(PartialEq))]
1394pub enum CompareAndAppendBreak<T> {
1395    AlreadyCommitted,
1396    Upper {
1397        shard_upper: Antichain<T>,
1398        writer_upper: Antichain<T>,
1399    },
1400    InvalidUsage(InvalidUsage<T>),
1401    InlineBackpressure,
1402}
1403
1404#[derive(Debug)]
1405#[cfg_attr(test, derive(PartialEq))]
1406pub enum SnapshotErr<T> {
1407    AsOfNotYetAvailable(SeqNo, Upper<T>),
1408    AsOfHistoricalDistinctionsLost(Since<T>),
1409}
1410
1411impl<T> StateCollections<T>
1412where
1413    T: Timestamp + Lattice + Codec64,
1414{
1415    pub fn add_rollup(
1416        &mut self,
1417        add_rollup: (SeqNo, &HollowRollup),
1418    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1419        let (rollup_seqno, rollup) = add_rollup;
1420        let applied = match self.rollups.get(&rollup_seqno) {
1421            Some(x) => x.key == rollup.key,
1422            None => {
1423                self.active_rollup = None;
1424                self.rollups.insert(rollup_seqno, rollup.to_owned());
1425                true
1426            }
1427        };
1428        // This state transition is a no-op if applied is false but we
1429        // still commit the state change so that this gets linearized
1430        // (maybe we're looking at old state).
1431        Continue(applied)
1432    }
1433
1434    pub fn remove_rollups(
1435        &mut self,
1436        remove_rollups: &[(SeqNo, PartialRollupKey)],
1437    ) -> ControlFlow<NoOpStateTransition<Vec<SeqNo>>, Vec<SeqNo>> {
1438        if remove_rollups.is_empty() || self.is_tombstone() {
1439            return Break(NoOpStateTransition(vec![]));
1440        }
1441
1442        // This state transition is called at the end of the GC process, so we
1443        // need to unset the `active_gc` field.
1444        self.active_gc = None;
1445
1446        let mut removed = vec![];
1447        for (seqno, key) in remove_rollups {
1448            let removed_key = self.rollups.remove(seqno);
1449            debug_assert!(
1450                removed_key.as_ref().map_or(true, |x| &x.key == key),
1451                "{} vs {:?}",
1452                key,
1453                removed_key
1454            );
1455
1456            if removed_key.is_some() {
1457                removed.push(*seqno);
1458            }
1459        }
1460
1461        Continue(removed)
1462    }
1463
1464    pub fn register_leased_reader(
1465        &mut self,
1466        hostname: &str,
1467        reader_id: &LeasedReaderId,
1468        purpose: &str,
1469        seqno: SeqNo,
1470        lease_duration: Duration,
1471        heartbeat_timestamp_ms: u64,
1472        use_critical_since: bool,
1473    ) -> ControlFlow<
1474        NoOpStateTransition<(LeasedReaderState<T>, SeqNo)>,
1475        (LeasedReaderState<T>, SeqNo),
1476    > {
1477        let since = if use_critical_since {
1478            self.critical_since()
1479                .unwrap_or_else(|| self.trace.since().clone())
1480        } else {
1481            self.trace.since().clone()
1482        };
1483        let reader_state = LeasedReaderState {
1484            debug: HandleDebugState {
1485                hostname: hostname.to_owned(),
1486                purpose: purpose.to_owned(),
1487            },
1488            seqno,
1489            since,
1490            last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1491            lease_duration_ms: u64::try_from(lease_duration.as_millis())
1492                .expect("lease duration as millis should fit within u64"),
1493        };
1494
1495        // If the shard-global upper and since are both the empty antichain,
1496        // then no further writes can ever commit and no further reads can be
1497        // served. Optimize this by no-op-ing reader registration so that we can
1498        // settle the shard into a final unchanging tombstone state.
1499        if self.is_tombstone() {
1500            return Break(NoOpStateTransition((reader_state, self.seqno_since(seqno))));
1501        }
1502
1503        // TODO: Handle if the reader or writer already exists.
1504        self.leased_readers
1505            .insert(reader_id.clone(), reader_state.clone());
1506        Continue((reader_state, self.seqno_since(seqno)))
1507    }
1508
1509    pub fn register_critical_reader<O: Opaque + Codec64>(
1510        &mut self,
1511        hostname: &str,
1512        reader_id: &CriticalReaderId,
1513        purpose: &str,
1514    ) -> ControlFlow<NoOpStateTransition<CriticalReaderState<T>>, CriticalReaderState<T>> {
1515        let state = CriticalReaderState {
1516            debug: HandleDebugState {
1517                hostname: hostname.to_owned(),
1518                purpose: purpose.to_owned(),
1519            },
1520            since: self.trace.since().clone(),
1521            opaque: OpaqueState(Codec64::encode(&O::initial())),
1522            opaque_codec: O::codec_name(),
1523        };
1524
1525        // We expire all readers if the upper and since both advance to the
1526        // empty antichain. Gracefully handle this. At the same time,
1527        // short-circuit the cmd application so we don't needlessly create new
1528        // SeqNos.
1529        if self.is_tombstone() {
1530            return Break(NoOpStateTransition(state));
1531        }
1532
1533        let state = match self.critical_readers.get_mut(reader_id) {
1534            Some(existing_state) => {
1535                existing_state.debug = state.debug;
1536                existing_state.clone()
1537            }
1538            None => {
1539                self.critical_readers
1540                    .insert(reader_id.clone(), state.clone());
1541                state
1542            }
1543        };
1544        Continue(state)
1545    }
1546
1547    pub fn register_schema<K: Codec, V: Codec>(
1548        &mut self,
1549        key_schema: &K::Schema,
1550        val_schema: &V::Schema,
1551    ) -> ControlFlow<NoOpStateTransition<Option<SchemaId>>, Option<SchemaId>> {
1552        fn encode_data_type(data_type: &DataType) -> Bytes {
1553            let proto = data_type.into_proto();
1554            prost::Message::encode_to_vec(&proto).into()
1555        }
1556
1557        // Look for an existing registered SchemaId for these schemas.
1558        //
1559        // The common case is that this should be a recent one, so as a minor
1560        // optimization, do this search in reverse order.
1561        //
1562        // TODO: Note that this impl is `O(schemas)`. Combined with the
1563        // possibility of cmd retries, it's possible but unlikely for this to
1564        // get expensive. We could maintain a reverse map to speed this up if
1565        // necessary. This would either need to work on the encoded
1566        // representation (in which case we'd still have to fall back to the
1567        // linear scan) or we'd need to add a Hash/Ord bound to Schema.
1568        let existing_id = self.schemas.iter().rev().find(|(_, x)| {
1569            K::decode_schema(&x.key) == *key_schema && V::decode_schema(&x.val) == *val_schema
1570        });
1571        match existing_id {
1572            Some((schema_id, _)) => {
1573                // TODO: Validate that the decoded schemas still produce records
1574                // of the recorded DataType, to detect shenanigans. Probably
1575                // best to wait until we've turned on Schema2 in prod and thus
1576                // committed to the current mappings.
1577                Break(NoOpStateTransition(Some(*schema_id)))
1578            }
1579            None if self.is_tombstone() => {
1580                // TODO: Is this right?
1581                Break(NoOpStateTransition(None))
1582            }
1583            None if self.schemas.is_empty() => {
1584                // We'll have to do something more sophisticated here to
1585                // generate the next id if/when we start supporting the removal
1586                // of schemas.
1587                let id = SchemaId(self.schemas.len());
1588                let key_data_type = mz_persist_types::columnar::data_type::<K>(key_schema)
1589                    .expect("valid key schema");
1590                let val_data_type = mz_persist_types::columnar::data_type::<V>(val_schema)
1591                    .expect("valid val schema");
1592                let prev = self.schemas.insert(
1593                    id,
1594                    EncodedSchemas {
1595                        key: K::encode_schema(key_schema),
1596                        key_data_type: encode_data_type(&key_data_type),
1597                        val: V::encode_schema(val_schema),
1598                        val_data_type: encode_data_type(&val_data_type),
1599                    },
1600                );
1601                assert_eq!(prev, None);
1602                Continue(Some(id))
1603            }
1604            None => {
1605                info!(
1606                    "register_schemas got {:?} expected {:?}",
1607                    key_schema,
1608                    self.schemas
1609                        .iter()
1610                        .map(|(id, x)| (id, K::decode_schema(&x.key)))
1611                        .collect::<Vec<_>>()
1612                );
1613                // Until we implement persist schema changes, only allow at most
1614                // one registered schema.
1615                Break(NoOpStateTransition(None))
1616            }
1617        }
1618    }
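
    // Illustrative sketch only (hypothetical helper, not part of the real
    // registry): the reverse-order lookup described above, over a plain
    // BTreeMap. Iteration is ordered by id, so `.iter().rev()` visits the most
    // recently registered schemas first, which is the common hit.
    #[allow(dead_code)]
    fn find_latest_matching_schema_sketch(
        schemas: &BTreeMap<u64, Vec<u8>>,
        needle: &[u8],
    ) -> Option<u64> {
        schemas
            .iter()
            .rev()
            .find(|(_, encoded)| encoded.as_slice() == needle)
            .map(|(id, _)| *id)
    }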
1619
1620    pub fn compare_and_evolve_schema<K: Codec, V: Codec>(
1621        &mut self,
1622        expected: SchemaId,
1623        key_schema: &K::Schema,
1624        val_schema: &V::Schema,
1625    ) -> ControlFlow<NoOpStateTransition<CaESchema<K, V>>, CaESchema<K, V>> {
1626        fn data_type<T>(schema: &impl Schema<T>) -> DataType {
1627            // To be defensive, create an empty batch and inspect the resulting
1628            // data type (as opposed to something like allowing the `Schema` to
1629            // declare the DataType).
1630            let array = Schema::encoder(schema).expect("valid schema").finish();
1631            Array::data_type(&array).clone()
1632        }
1633
1634        let (current_id, current) = self
1635            .schemas
1636            .last_key_value()
1637            .expect("all shards have a schema");
1638        if *current_id != expected {
1639            return Break(NoOpStateTransition(CaESchema::ExpectedMismatch {
1640                schema_id: *current_id,
1641                key: K::decode_schema(&current.key),
1642                val: V::decode_schema(&current.val),
1643            }));
1644        }
1645
1646        let current_key = K::decode_schema(&current.key);
1647        let current_key_dt = EncodedSchemas::decode_data_type(&current.key_data_type);
1648        let current_val = V::decode_schema(&current.val);
1649        let current_val_dt = EncodedSchemas::decode_data_type(&current.val_data_type);
1650
1651        let key_dt = data_type(key_schema);
1652        let val_dt = data_type(val_schema);
1653
1654        // If the schema is exactly the same as the current one, no-op.
1655        if current_key == *key_schema
1656            && current_key_dt == key_dt
1657            && current_val == *val_schema
1658            && current_val_dt == val_dt
1659        {
1660            return Break(NoOpStateTransition(CaESchema::Ok(*current_id)));
1661        }
1662
1663        let key_fn = backward_compatible(&current_key_dt, &key_dt);
1664        let val_fn = backward_compatible(&current_val_dt, &val_dt);
1665        let (Some(key_fn), Some(val_fn)) = (key_fn, val_fn) else {
1666            return Break(NoOpStateTransition(CaESchema::Incompatible));
1667        };
1668        // Persist initially disallows dropping columns. This would require a
1669        // bunch more work (e.g. not safe to use the latest schema in
1670        // compaction) and isn't initially necessary in mz.
1671        if key_fn.contains_drop() || val_fn.contains_drop() {
1672            return Break(NoOpStateTransition(CaESchema::Incompatible));
1673        }
1674
1675        // We'll have to do something more sophisticated here to
1676        // generate the next id if/when we start supporting the removal
1677        // of schemas.
1678        let id = SchemaId(self.schemas.len());
1679        self.schemas.insert(
1680            id,
1681            EncodedSchemas {
1682                key: K::encode_schema(key_schema),
1683                key_data_type: prost::Message::encode_to_vec(&key_dt.into_proto()).into(),
1684                val: V::encode_schema(val_schema),
1685                val_data_type: prost::Message::encode_to_vec(&val_dt.into_proto()).into(),
1686            },
1687        );
1688        Continue(CaESchema::Ok(id))
1689    }
1690
1691    pub fn compare_and_append(
1692        &mut self,
1693        batch: &HollowBatch<T>,
1694        writer_id: &WriterId,
1695        heartbeat_timestamp_ms: u64,
1696        lease_duration_ms: u64,
1697        idempotency_token: &IdempotencyToken,
1698        debug_info: &HandleDebugState,
1699        inline_writes_total_max_bytes: usize,
1700        claim_compaction_percent: usize,
1701        claim_compaction_min_version: Option<&Version>,
1702    ) -> ControlFlow<CompareAndAppendBreak<T>, Vec<FueledMergeReq<T>>> {
1703        // We expire all writers if the upper and since both advance to the
1704        // empty antichain. Gracefully handle this. At the same time,
1705        // short-circuit the cmd application so we don't needlessly create new
1706        // SeqNos.
1707        if self.is_tombstone() {
1708            assert_eq!(self.trace.upper(), &Antichain::new());
1709            return Break(CompareAndAppendBreak::Upper {
1710                shard_upper: Antichain::new(),
1711                // This writer might have been registered before the shard upper
1712                // was advanced, which would make this pessimistic in the
1713                // Indeterminate handling of compare_and_append at the machine
1714                // level, but that's fine.
1715                writer_upper: Antichain::new(),
1716            });
1717        }
1718
1719        let writer_state = self
1720            .writers
1721            .entry(writer_id.clone())
1722            .or_insert_with(|| WriterState {
1723                last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1724                lease_duration_ms,
1725                most_recent_write_token: IdempotencyToken::SENTINEL,
1726                most_recent_write_upper: Antichain::from_elem(T::minimum()),
1727                debug: debug_info.clone(),
1728            });
1729
1730        if PartialOrder::less_than(batch.desc.upper(), batch.desc.lower()) {
1731            return Break(CompareAndAppendBreak::InvalidUsage(
1732                InvalidUsage::InvalidBounds {
1733                    lower: batch.desc.lower().clone(),
1734                    upper: batch.desc.upper().clone(),
1735                },
1736            ));
1737        }
1738
1739        // If the time interval is empty, the list of updates must also be
1740        // empty.
1741        if batch.desc.upper() == batch.desc.lower() && !batch.is_empty() {
1742            return Break(CompareAndAppendBreak::InvalidUsage(
1743                InvalidUsage::InvalidEmptyTimeInterval {
1744                    lower: batch.desc.lower().clone(),
1745                    upper: batch.desc.upper().clone(),
1746                    keys: batch
1747                        .parts
1748                        .iter()
1749                        .map(|x| x.printable_name().to_owned())
1750                        .collect(),
1751                },
1752            ));
1753        }
1754
1755        if idempotency_token == &writer_state.most_recent_write_token {
1756            // If the last write had the same idempotency_token, then this must
1757            // have already committed. Sanity check that the most recent write
1758            // upper matches and that the shard upper is at least the write
1759            // upper; if it's not, something very suspect is going on.
1760            assert_eq!(batch.desc.upper(), &writer_state.most_recent_write_upper);
1761            assert!(
1762                PartialOrder::less_equal(batch.desc.upper(), self.trace.upper()),
1763                "{:?} vs {:?}",
1764                batch.desc.upper(),
1765                self.trace.upper()
1766            );
1767            return Break(CompareAndAppendBreak::AlreadyCommitted);
1768        }
1769
1770        let shard_upper = self.trace.upper();
1771        if shard_upper != batch.desc.lower() {
1772            return Break(CompareAndAppendBreak::Upper {
1773                shard_upper: shard_upper.clone(),
1774                writer_upper: writer_state.most_recent_write_upper.clone(),
1775            });
1776        }
1777
1778        let new_inline_bytes = batch.inline_bytes();
1779        if new_inline_bytes > 0 {
1780            let mut existing_inline_bytes = 0;
1781            self.trace
1782                .map_batches(|x| existing_inline_bytes += x.inline_bytes());
1783            // TODO: For very small batches, it may actually _increase_ the size
1784            // of state to flush them out. Consider another threshold under
1785            // which an inline part can be appended no matter what.
1786            if existing_inline_bytes + new_inline_bytes >= inline_writes_total_max_bytes {
1787                return Break(CompareAndAppendBreak::InlineBackpressure);
1788            }
1789        }
1790
1791        let mut merge_reqs = if batch.desc.upper() != batch.desc.lower() {
1792            self.trace.push_batch(batch.clone())
1793        } else {
1794            Vec::new()
1795        };
1796
1797        // NB: we don't claim unclaimed compactions when the recording flag is off, even if we'd
1798        // otherwise be allowed to, to avoid triggering the same compactions in every writer.
1799        let all_empty_reqs = merge_reqs
1800            .iter()
1801            .all(|req| req.inputs.iter().all(|b| b.batch.is_empty()));
1802        if all_empty_reqs && !batch.is_empty() {
1803            let mut reqs_to_take = claim_compaction_percent / 100;
1804            if (usize::cast_from(idempotency_token.hashed()) % 100)
1805                < (claim_compaction_percent % 100)
1806            {
1807                reqs_to_take += 1;
1808            }
1809            let threshold_ms = heartbeat_timestamp_ms.saturating_sub(lease_duration_ms);
1810            let min_writer = claim_compaction_min_version.map(WriterKey::for_version);
1811            merge_reqs.extend(
1812                // We keep the oldest `reqs_to_take` batches, under the theory that they're least
1813                // likely to be compacted soon for other reasons.
1814                self.trace
1815                    .fueled_merge_reqs_before_ms(threshold_ms, min_writer)
1816                    .take(reqs_to_take),
1817            )
1818        }
1819
1820        for req in &merge_reqs {
1821            self.trace.claim_compaction(
1822                req.id,
1823                ActiveCompaction {
1824                    start_ms: heartbeat_timestamp_ms,
1825                },
1826            )
1827        }
1828
1829        debug_assert_eq!(self.trace.upper(), batch.desc.upper());
1830        writer_state.most_recent_write_token = idempotency_token.clone();
1831        // The writer's most recent upper should only go forward.
1832        assert!(
1833            PartialOrder::less_equal(&writer_state.most_recent_write_upper, batch.desc.upper()),
1834            "{:?} vs {:?}",
1835            &writer_state.most_recent_write_upper,
1836            batch.desc.upper()
1837        );
1838        writer_state
1839            .most_recent_write_upper
1840            .clone_from(batch.desc.upper());
1841
1842        // Heartbeat the writer state to keep our idempotency token alive.
1843        writer_state.last_heartbeat_timestamp_ms = std::cmp::max(
1844            heartbeat_timestamp_ms,
1845            writer_state.last_heartbeat_timestamp_ms,
1846        );
1847
1848        Continue(merge_reqs)
1849    }
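
    // Illustrative sketch only (hypothetical helper): the
    // claim_compaction_percent arithmetic above, with the idempotency token's
    // hash passed in directly. For example, a percent of 150 always claims one
    // request and claims a second for roughly half of all writes.
    #[allow(dead_code)]
    fn reqs_to_claim_sketch(claim_compaction_percent: usize, token_hash: usize) -> usize {
        let mut reqs_to_take = claim_compaction_percent / 100;
        if (token_hash % 100) < (claim_compaction_percent % 100) {
            reqs_to_take += 1;
        }
        reqs_to_take
    }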
1850
1851    pub fn apply_merge_res<D: Codec64 + Monoid + PartialEq>(
1852        &mut self,
1853        res: &FueledMergeRes<T>,
1854        metrics: &ColumnarMetrics,
1855    ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
1856        // We expire all writers if the upper and since both advance to the
1857        // empty antichain. Gracefully handle this. At the same time,
1858        // short-circuit the cmd application so we don't needlessly create new
1859        // SeqNos.
1860        if self.is_tombstone() {
1861            return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
1862        }
1863
1864        let apply_merge_result = self.trace.apply_merge_res_checked::<D>(res, metrics);
1865        Continue(apply_merge_result)
1866    }
1867
1868    pub fn spine_exert(
1869        &mut self,
1870        fuel: usize,
1871    ) -> ControlFlow<NoOpStateTransition<Vec<FueledMergeReq<T>>>, Vec<FueledMergeReq<T>>> {
1872        let (merge_reqs, did_work) = self.trace.exert(fuel);
1873        if did_work {
1874            Continue(merge_reqs)
1875        } else {
1876            assert!(merge_reqs.is_empty());
1877            // Break if we have nothing useful to do to save the seqno (and
1878            // resulting crdb traffic)
1879            Break(NoOpStateTransition(Vec::new()))
1880        }
1881    }
1882
1883    pub fn downgrade_since(
1884        &mut self,
1885        reader_id: &LeasedReaderId,
1886        seqno: SeqNo,
1887        outstanding_seqno: SeqNo,
1888        new_since: &Antichain<T>,
1889        heartbeat_timestamp_ms: u64,
1890    ) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
1891        // We expire all readers if the upper and since both advance to the
1892        // empty antichain. Gracefully handle this. At the same time,
1893        // short-circuit the cmd application so we don't needlessly create new
1894        // SeqNos.
1895        if self.is_tombstone() {
1896            return Break(NoOpStateTransition(Since(Antichain::new())));
1897        }
1898
1899        // The only way to have a missing reader in state is if it's been expired... and in that
1900        // case, we behave the same as though that reader had been downgraded to the empty antichain.
1901        let Some(reader_state) = self.leased_reader(reader_id) else {
1902            tracing::warn!(
1903                "Leased reader {reader_id} was expired due to inactivity. Did the machine go to sleep?",
1904            );
1905            return Break(NoOpStateTransition(Since(Antichain::new())));
1906        };
1907
1908        // Also use this as an opportunity to heartbeat the reader and downgrade
1909        // the seqno capability.
1910        reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
1911            heartbeat_timestamp_ms,
1912            reader_state.last_heartbeat_timestamp_ms,
1913        );
1914
1915        let seqno = {
1916            assert!(
1917                outstanding_seqno >= reader_state.seqno,
1918                "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
1919                    is behind current reader_state ({:?})",
1920                outstanding_seqno,
1921                reader_state.seqno,
1922            );
1923            std::cmp::min(outstanding_seqno, seqno)
1924        };
1925
1926        reader_state.seqno = seqno;
1927
1928        let reader_current_since = if PartialOrder::less_than(&reader_state.since, new_since) {
1929            reader_state.since.clone_from(new_since);
1930            self.update_since();
1931            new_since.clone()
1932        } else {
1933            // No-op, but still commit the state change so that this gets
1934            // linearized.
1935            reader_state.since.clone()
1936        };
1937
1938        Continue(Since(reader_current_since))
1939    }
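
    // Illustrative sketch only (hypothetical helper): the seqno capability
    // bookkeeping above. The reader's new capability is the older of its
    // oldest outstanding lease and the seqno it is downgrading to, so an
    // in-flight lease can never be garbage collected out from under it.
    #[allow(dead_code)]
    fn held_seqno_sketch(outstanding_seqno: SeqNo, downgrade_to: SeqNo) -> SeqNo {
        std::cmp::min(outstanding_seqno, downgrade_to)
    }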
1940
1941    pub fn compare_and_downgrade_since<O: Opaque + Codec64>(
1942        &mut self,
1943        reader_id: &CriticalReaderId,
1944        expected_opaque: &O,
1945        (new_opaque, new_since): (&O, &Antichain<T>),
1946    ) -> ControlFlow<
1947        NoOpStateTransition<Result<Since<T>, (O, Since<T>)>>,
1948        Result<Since<T>, (O, Since<T>)>,
1949    > {
1950        // We expire all readers if the upper and since both advance to the
1951        // empty antichain. Gracefully handle this. At the same time,
1952        // short-circuit the cmd application so we don't needlessly create new
1953        // SeqNos.
1954        if self.is_tombstone() {
1955            // Match the idempotence behavior below of ignoring the token if
1956            // since is already advanced enough (in this case, because it's a
1957            // tombstone, we know it's the empty antichain).
1958            return Break(NoOpStateTransition(Ok(Since(Antichain::new()))));
1959        }
1960
1961        let reader_state = self.critical_reader(reader_id);
1962        assert_eq!(reader_state.opaque_codec, O::codec_name());
1963
1964        if &O::decode(reader_state.opaque.0) != expected_opaque {
1965            // No-op, but still commit the state change so that this gets
1966            // linearized.
1967            return Continue(Err((
1968                Codec64::decode(reader_state.opaque.0),
1969                Since(reader_state.since.clone()),
1970            )));
1971        }
1972
1973        reader_state.opaque = OpaqueState(Codec64::encode(new_opaque));
1974        if PartialOrder::less_equal(&reader_state.since, new_since) {
1975            reader_state.since.clone_from(new_since);
1976            self.update_since();
1977            Continue(Ok(Since(new_since.clone())))
1978        } else {
1979            // no work to be done -- the reader state's `since` is already sufficiently
1980            // advanced. we may someday need to revisit this branch when it's possible
1981            // for two `since` frontiers to be incomparable.
1982            Continue(Ok(Since(reader_state.since.clone())))
1983        }
1984    }
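
    // Illustrative sketch only (hypothetical helper; a plain u64 stands in for
    // the Opaque + Codec64 token): the compare-and-set discipline above. The
    // stored fencing token must match the caller's expectation before either
    // the token or the since is allowed to change.
    #[allow(dead_code)]
    fn opaque_cas_sketch(stored: &mut u64, expected: u64, new: u64) -> Result<(), u64> {
        if *stored != expected {
            return Err(*stored);
        }
        *stored = new;
        Ok(())
    }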
1985
1986    pub fn expire_leased_reader(
1987        &mut self,
1988        reader_id: &LeasedReaderId,
1989    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1990        // We expire all readers if the upper and since both advance to the
1991        // empty antichain. Gracefully handle this. At the same time,
1992        // short-circuit the cmd application so we don't needlessly create new
1993        // SeqNos.
1994        if self.is_tombstone() {
1995            return Break(NoOpStateTransition(false));
1996        }
1997
1998        let existed = self.leased_readers.remove(reader_id).is_some();
1999        if existed {
2000            // TODO(database-issues#6885): Re-enable this
2001            //
2002            // Temporarily disabling this because we think it might be the cause
2003            // of the remap since bug. Specifically, a clusterd process has a
2004            // ReadHandle for maintaining the since and one inside a Listen. If
2005            // we crash and stay down for longer than the read lease duration,
2006            // it's possible that an expiry of them both in quick succession
2007            // jumps the since forward to the Listen one.
2008            //
2009            // Don't forget to update the downgrade_since when this gets
2010            // switched back on.
2011            //
2012            // self.update_since();
2013        }
2014        // No-op if existed is false, but still commit the state change so that
2015        // this gets linearized.
2016        Continue(existed)
2017    }
2018
2019    pub fn expire_critical_reader(
2020        &mut self,
2021        reader_id: &CriticalReaderId,
2022    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2023        // We expire all readers if the upper and since both advance to the
2024        // empty antichain. Gracefully handle this. At the same time,
2025        // short-circuit the cmd application so we don't needlessly create new
2026        // SeqNos.
2027        if self.is_tombstone() {
2028            return Break(NoOpStateTransition(false));
2029        }
2030
2031        let existed = self.critical_readers.remove(reader_id).is_some();
2032        if existed {
2033            // TODO(database-issues#6885): Re-enable this
2034            //
2035            // Temporarily disabling this because we think it might be the cause
2036            // of the remap since bug. Specifically, a clusterd process has a
2037            // ReadHandle for maintaining the since and one inside a Listen. If
2038            // we crash and stay down for longer than the read lease duration,
2039            // it's possible that an expiry of them both in quick succession
2040            // jumps the since forward to the Listen one.
2041            //
2042            // Don't forget to update the downgrade_since when this gets
2043            // switched back on.
2044            //
2045            // self.update_since();
2046        }
2047        // This state transition is a no-op if existed is false, but we still
2048        // commit the state change so that this gets linearized (maybe we're
2049        // looking at old state).
2050        Continue(existed)
2051    }
2052
2053    pub fn expire_writer(
2054        &mut self,
2055        writer_id: &WriterId,
2056    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2057        // We expire all writers if the upper and since both advance to the
2058        // empty antichain. Gracefully handle this. At the same time,
2059        // short-circuit the cmd application so we don't needlessly create new
2060        // SeqNos.
2061        if self.is_tombstone() {
2062            return Break(NoOpStateTransition(false));
2063        }
2064
2065        let existed = self.writers.remove(writer_id).is_some();
2066        // This state transition is a no-op if existed is false, but we still
2067        // commit the state change so that this gets linearized (maybe we're
2068        // looking at old state).
2069        Continue(existed)
2070    }
2071
2072    fn leased_reader(&mut self, id: &LeasedReaderId) -> Option<&mut LeasedReaderState<T>> {
2073        self.leased_readers.get_mut(id)
2074    }
2075
2076    fn critical_reader(&mut self, id: &CriticalReaderId) -> &mut CriticalReaderState<T> {
2077        self.critical_readers
2078            .get_mut(id)
2079            .unwrap_or_else(|| {
2080                panic!(
2081                    "Unknown CriticalReaderId({}). It was either never registered, or has been manually expired.",
2082                    id
2083                )
2084            })
2085    }
2086
2087    fn critical_since(&self) -> Option<Antichain<T>> {
2088        let mut critical_sinces = self.critical_readers.values().map(|r| &r.since);
2089        let mut since = critical_sinces.next().cloned()?;
2090        for s in critical_sinces {
2091            since.meet_assign(s);
2092        }
2093        Some(since)
2094    }
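
    // Illustrative sketch only (hypothetical helper): for a totally ordered
    // time such as u64, the meet of single-element frontiers is just their
    // minimum, so the since computed above can never be ahead of the most
    // conservative critical reader.
    #[allow(dead_code)]
    fn critical_since_sketch(reader_sinces: &[u64]) -> Option<u64> {
        reader_sinces.iter().copied().reduce(std::cmp::min)
    }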
2095
2096    fn update_since(&mut self) {
2097        let mut sinces_iter = self
2098            .leased_readers
2099            .values()
2100            .map(|x| &x.since)
2101            .chain(self.critical_readers.values().map(|x| &x.since));
2102        let mut since = match sinces_iter.next() {
2103            Some(since) => since.clone(),
2104            None => {
2105                // If there are no current readers, leave `since` unchanged so
2106                // it doesn't regress.
2107                return;
2108            }
2109        };
2110        while let Some(s) = sinces_iter.next() {
2111            since.meet_assign(s);
2112        }
2113        self.trace.downgrade_since(&since);
2114    }
2115
2116    fn seqno_since(&self, seqno: SeqNo) -> SeqNo {
2117        let mut seqno_since = seqno;
2118        for cap in self.leased_readers.values() {
2119            seqno_since = std::cmp::min(seqno_since, cap.seqno);
2120        }
2121        // critical_readers don't hold a seqno capability.
2122        seqno_since
2123    }
2124
2125    fn tombstone_batch() -> HollowBatch<T> {
2126        HollowBatch::empty(Description::new(
2127            Antichain::from_elem(T::minimum()),
2128            Antichain::new(),
2129            Antichain::new(),
2130        ))
2131    }
2132
2133    pub(crate) fn is_tombstone(&self) -> bool {
2134        self.trace.upper().is_empty()
2135            && self.trace.since().is_empty()
2136            && self.writers.is_empty()
2137            && self.leased_readers.is_empty()
2138            && self.critical_readers.is_empty()
2139    }
2140
2141    pub(crate) fn is_single_empty_batch(&self) -> bool {
2142        let mut batch_count = 0;
2143        let mut is_empty = true;
2144        self.trace.map_batches(|b| {
2145            batch_count += 1;
2146            is_empty &= b.is_empty()
2147        });
2148        batch_count <= 1 && is_empty
2149    }
2150
2151    pub fn become_tombstone_and_shrink(&mut self) -> ControlFlow<NoOpStateTransition<()>, ()> {
2152        assert_eq!(self.trace.upper(), &Antichain::new());
2153        assert_eq!(self.trace.since(), &Antichain::new());
2154
2155        // Remember our current state, so we can decide whether we have to
2156        // record a transition in durable state.
2157        let was_tombstone = self.is_tombstone();
2158
2159        // Enter the "tombstone" state, if we're not in it already.
2160        self.writers.clear();
2161        self.leased_readers.clear();
2162        self.critical_readers.clear();
2163
2164        debug_assert!(self.is_tombstone());
2165
2166        // Now that we're in a "tombstone" state -- i.e. nobody can read the data from a shard or write to
2167        // it -- the actual contents of our batches no longer matter.
2168        // This method progressively replaces batches in our state with simpler versions, to allow
2169        // freeing up resources and to reduce the state size. (Since the state is unreadable, this
2170        // is not visible to clients.) We do this a little bit at a time to avoid really large state
2171        // transitions... most operations happen incrementally, and large single writes can overwhelm
2172        // a backing store. See comments for why we believe the relevant diffs are reasonably small.
2173
2174        let mut to_replace = None;
2175        let mut batch_count = 0;
2176        self.trace.map_batches(|b| {
2177            batch_count += 1;
2178            if !b.is_empty() && to_replace.is_none() {
2179                to_replace = Some(b.desc.clone());
2180            }
2181        });
2182        if let Some(desc) = to_replace {
2183            // We have a nonempty batch: replace it with an empty batch and return.
2184            // This should not produce an excessively large diff: if it did, we wouldn't have been
2185            // able to append that batch in the first place.
2186            let result = self.trace.apply_tombstone_merge(&desc);
2187            assert!(
2188                result.matched(),
2189                "merge with a matching desc should always match"
2190            );
2191            Continue(())
2192        } else if batch_count > 1 {
2193            // All our batches are empty, but we have more than one of them. Replace the whole set
2194            // with a new single-batch trace.
2195            // This produces a diff with a size proportional to the number of batches, but since
2196            // Spine keeps a logarithmic number of batches this should never be excessively large.
2197            let mut new_trace = Trace::default();
2198            new_trace.downgrade_since(&Antichain::new());
2199            let merge_reqs = new_trace.push_batch(Self::tombstone_batch());
2200            assert_eq!(merge_reqs, Vec::new());
2201            self.trace = new_trace;
2202            Continue(())
2203        } else if !was_tombstone {
2204            // We were not tombstoned before, so have to make sure this state
2205            // transition is recorded.
2206            Continue(())
2207        } else {
2208            // All our batches are empty, and there's only one... there's no shrinking this
2209            // tombstone further.
2210            Break(NoOpStateTransition(()))
2211        }
2212    }
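
    // Illustrative sketch only (hypothetical driver): become_tombstone_and_shrink
    // is meant to be called repeatedly, committing one small state transition
    // per Continue, until it returns Break to signal that the tombstone cannot
    // shrink any further.
    #[allow(dead_code)]
    fn drive_shrink_to_fixpoint_sketch(
        mut step: impl FnMut() -> ControlFlow<NoOpStateTransition<()>, ()>,
    ) {
        // In the real code, each Continue corresponds to a small diff that is
        // committed before the next step runs.
        while step().is_continue() {}
    }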
2213}
2214
2215// TODO: Document invariants.
2216#[derive(Debug)]
2217#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
2218pub struct State<T> {
2219    pub(crate) shard_id: ShardId,
2220
2221    pub(crate) seqno: SeqNo,
2222    /// A strictly increasing wall time of when this state was written, in
2223    /// milliseconds since the unix epoch.
2224    pub(crate) walltime_ms: u64,
2225    /// Hostname of the persist user that created this version of state. For
2226    /// debugging.
2227    pub(crate) hostname: String,
2228    pub(crate) collections: StateCollections<T>,
2229}
2230
2231/// A newtype wrapper of State that guarantees the K, V, and D codecs match the
2232/// ones in durable storage.
2233pub struct TypedState<K, V, T, D> {
2234    pub(crate) state: State<T>,
2235
2236    // According to the docs, PhantomData is to "mark things that act like they
2237    // own a T". State doesn't actually own K, V, or D, just the ability to
2238    // produce them. Using the `fn() -> T` pattern gets us the same variance as
2239    // T [1], but also allows State to correctly derive Send+Sync.
2240    //
2241    // [1]:
2242    //     https://doc.rust-lang.org/nomicon/phantom-data.html#table-of-phantomdata-patterns
2243    pub(crate) _phantom: PhantomData<fn() -> (K, V, D)>,
2244}
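
// Illustrative sketch only (hypothetical type): the `fn() -> T` PhantomData
// pattern referenced above, in isolation. `PhantomSketch<K>` is Send + Sync
// regardless of K and stays covariant in K, which is the behavior TypedState
// wants for its codec type parameters.
#[allow(dead_code)]
struct PhantomSketch<K> {
    _marker: PhantomData<fn() -> K>,
}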
2245
2246impl<K, V, T: Clone, D> TypedState<K, V, T, D> {
2247    #[cfg(any(test, debug_assertions))]
2248    pub(crate) fn clone(&self, hostname: String) -> Self {
2249        TypedState {
2250            state: State {
2251                shard_id: self.shard_id.clone(),
2252                seqno: self.seqno.clone(),
2253                walltime_ms: self.walltime_ms,
2254                hostname,
2255                collections: self.collections.clone(),
2256            },
2257            _phantom: PhantomData,
2258        }
2259    }
2260
2261    pub(crate) fn clone_for_rollup(&self) -> Self {
2262        TypedState {
2263            state: State {
2264                shard_id: self.shard_id.clone(),
2265                seqno: self.seqno.clone(),
2266                walltime_ms: self.walltime_ms,
2267                hostname: self.hostname.clone(),
2268                collections: self.collections.clone(),
2269            },
2270            _phantom: PhantomData,
2271        }
2272    }
2273}
2274
2275impl<K, V, T: Debug, D> Debug for TypedState<K, V, T, D> {
2276    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2277        // Deconstruct self so we get a compile failure if new fields
2278        // are added.
2279        let TypedState { state, _phantom } = self;
2280        f.debug_struct("TypedState").field("state", state).finish()
2281    }
2282}
2283
2284// Impl PartialEq regardless of the type params.
2285#[cfg(any(test, debug_assertions))]
2286impl<K, V, T: PartialEq, D> PartialEq for TypedState<K, V, T, D> {
2287    fn eq(&self, other: &Self) -> bool {
2288        // Deconstruct self and other so we get a compile failure if new fields
2289        // are added.
2290        let TypedState {
2291            state: self_state,
2292            _phantom,
2293        } = self;
2294        let TypedState {
2295            state: other_state,
2296            _phantom,
2297        } = other;
2298        self_state == other_state
2299    }
2300}
2301
2302impl<K, V, T, D> Deref for TypedState<K, V, T, D> {
2303    type Target = State<T>;
2304
2305    fn deref(&self) -> &Self::Target {
2306        &self.state
2307    }
2308}
2309
2310impl<K, V, T, D> DerefMut for TypedState<K, V, T, D> {
2311    fn deref_mut(&mut self) -> &mut Self::Target {
2312        &mut self.state
2313    }
2314}
2315
2316impl<K, V, T, D> TypedState<K, V, T, D>
2317where
2318    K: Codec,
2319    V: Codec,
2320    T: Timestamp + Lattice + Codec64,
2321    D: Codec64,
2322{
2323    pub fn new(
2324        applier_version: Version,
2325        shard_id: ShardId,
2326        hostname: String,
2327        walltime_ms: u64,
2328    ) -> Self {
2329        let state = State {
2330            shard_id,
2331            seqno: SeqNo::minimum(),
2332            walltime_ms,
2333            hostname,
2334            collections: StateCollections {
2335                version: applier_version,
2336                last_gc_req: SeqNo::minimum(),
2337                rollups: BTreeMap::new(),
2338                active_rollup: None,
2339                active_gc: None,
2340                leased_readers: BTreeMap::new(),
2341                critical_readers: BTreeMap::new(),
2342                writers: BTreeMap::new(),
2343                schemas: BTreeMap::new(),
2344                trace: Trace::default(),
2345            },
2346        };
2347        TypedState {
2348            state,
2349            _phantom: PhantomData,
2350        }
2351    }
2352
2353    pub fn clone_apply<R, E, WorkFn>(
2354        &self,
2355        cfg: &PersistConfig,
2356        work_fn: &mut WorkFn,
2357    ) -> ControlFlow<E, (R, Self)>
2358    where
2359        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
2360    {
2361        // We do not increment the version by default, though work_fn can if it chooses to.
2362        let mut new_state = State {
2363            shard_id: self.shard_id,
2364            seqno: self.seqno.next(),
2365            walltime_ms: (cfg.now)(),
2366            hostname: cfg.hostname.clone(),
2367            collections: self.collections.clone(),
2368        };
2369
2370        // Make sure walltime_ms is strictly increasing, in case clocks are
2371        // offset.
2372        if new_state.walltime_ms <= self.walltime_ms {
2373            new_state.walltime_ms = self.walltime_ms + 1;
2374        }
2375
2376        let work_ret = work_fn(new_state.seqno, cfg, &mut new_state.collections)?;
2377        let new_state = TypedState {
2378            state: new_state,
2379            _phantom: PhantomData,
2380        };
2381        Continue((work_ret, new_state))
2382    }
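
    // Illustrative sketch only (hypothetical helper): the monotonic walltime
    // rule above, factored out. Even if the clock reads at or before the
    // previous state's walltime (e.g. due to clock skew), the new state gets a
    // strictly larger timestamp.
    #[allow(dead_code)]
    fn next_walltime_ms_sketch(prev_walltime_ms: u64, now_ms: u64) -> u64 {
        if now_ms <= prev_walltime_ms {
            prev_walltime_ms + 1
        } else {
            now_ms
        }
    }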
2383}
2384
2385#[derive(Copy, Clone, Debug)]
2386pub struct GcConfig {
2387    pub use_active_gc: bool,
2388    pub fallback_threshold_ms: u64,
2389    pub min_versions: usize,
2390    pub max_versions: usize,
2391}
2392
2393impl<T> State<T>
2394where
2395    T: Timestamp + Lattice + Codec64,
2396{
2397    pub fn shard_id(&self) -> ShardId {
2398        self.shard_id
2399    }
2400
2401    pub fn seqno(&self) -> SeqNo {
2402        self.seqno
2403    }
2404
2405    pub fn since(&self) -> &Antichain<T> {
2406        self.collections.trace.since()
2407    }
2408
2409    pub fn upper(&self) -> &Antichain<T> {
2410        self.collections.trace.upper()
2411    }
2412
2413    pub fn spine_batch_count(&self) -> usize {
2414        self.collections.trace.num_spine_batches()
2415    }
2416
2417    pub fn size_metrics(&self) -> StateSizeMetrics {
2418        let mut ret = StateSizeMetrics::default();
2419        self.blobs().for_each(|x| match x {
2420            HollowBlobRef::Batch(x) => {
2421                ret.hollow_batch_count += 1;
2422                ret.batch_part_count += x.part_count();
2423                ret.num_updates += x.len;
2424
2425                let batch_size = x.encoded_size_bytes();
2426                for x in x.parts.iter() {
2427                    if x.ts_rewrite().is_some() {
2428                        ret.rewrite_part_count += 1;
2429                    }
2430                    if x.is_inline() {
2431                        ret.inline_part_count += 1;
2432                        ret.inline_part_bytes += x.inline_bytes();
2433                    }
2434                }
2435                ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size);
2436                ret.state_batches_bytes += batch_size;
2437            }
2438            HollowBlobRef::Rollup(x) => {
2439                ret.state_rollup_count += 1;
2440                ret.state_rollups_bytes += x.encoded_size_bytes.unwrap_or_default()
2441            }
2442        });
2443        ret
2444    }
2445
2446    pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
2447        // We maintain the invariant that every version of state has at least
2448        // one rollup.
2449        self.collections
2450            .rollups
2451            .iter()
2452            .rev()
2453            .next()
2454            .expect("State should have at least one rollup if seqno > minimum")
2455    }
2456
2457    pub(crate) fn seqno_since(&self) -> SeqNo {
2458        self.collections.seqno_since(self.seqno)
2459    }
2460
2461    // Returns whether the cmd proposing this state has been selected to perform
2462    // background garbage collection work.
2463    //
2464    // If it was selected, this information is recorded in the state itself for
2465    // commit along with the cmd's state transition. This helps us to avoid
2466    // redundant work.
2467    //
2468    // Correctness does not depend on a gc assignment being executed, nor on
2469    // them being executed in the order they are given. But it is expected that
2470    // gc assignments are best-effort respected. In practice, cmds where it
2471    // would be awkward (like register_foo or expire_foo) ignore gc.
2472    pub fn maybe_gc(&mut self, is_write: bool, now: u64, cfg: GcConfig) -> Option<GcReq> {
2473        let GcConfig {
2474            use_active_gc,
2475            fallback_threshold_ms,
2476            min_versions,
2477            max_versions,
2478        } = cfg;
2479        // This is an arbitrary-ish threshold that scales with seqno, but never
2480        // gets particularly big. It probably could be much bigger and certainly
2481        // could use a tuning pass at some point.
2482        let gc_threshold = if use_active_gc {
2483            u64::cast_from(min_versions)
2484        } else {
2485            std::cmp::max(
2486                1,
2487                u64::cast_from(self.seqno.0.next_power_of_two().trailing_zeros()),
2488            )
2489        };
2490        let new_seqno_since = self.seqno_since();
2491        // Collect until the new seqno since... or the old since plus the max number of versions,
2492        // whichever is less.
2493        let gc_until_seqno = new_seqno_since.min(SeqNo(
2494            self.collections
2495                .last_gc_req
2496                .0
2497                .saturating_add(u64::cast_from(max_versions)),
2498        ));
2499        let should_gc = new_seqno_since
2500            .0
2501            .saturating_sub(self.collections.last_gc_req.0)
2502            >= gc_threshold;
2503
2504        // If we wouldn't otherwise gc, check if we have an active gc. If we do, and
2505        // it's been a while since it started, we should gc.
2506        let should_gc = if use_active_gc && !should_gc {
2507            match self.collections.active_gc {
2508                Some(active_gc) => now.saturating_sub(active_gc.start_ms) > fallback_threshold_ms,
2509                None => false,
2510            }
2511        } else {
2512            should_gc
2513        };
2514        // Assign GC traffic preferentially to writers, falling back to anyone
2515        // generating new state versions if there are no writers.
2516        let should_gc = should_gc && (is_write || self.collections.writers.is_empty());
2517        // Always assign GC work to a tombstoned shard to have the chance to
2518        // clean up any residual blobs. This is safe (won't cause excess gc)
2519        // as the only allowed command after becoming a tombstone is to write
2520        // the final rollup.
2521        let tombstone_needs_gc = self.collections.is_tombstone();
2522        let should_gc = should_gc || tombstone_needs_gc;
2523        let should_gc = if use_active_gc {
2524            // If we have an active gc, we should only gc if the active gc is
2525            // sufficiently old. This is to avoid doing more gc work than
2526            // necessary.
2527            should_gc
2528                && match self.collections.active_gc {
2529                    Some(active) => now.saturating_sub(active.start_ms) > fallback_threshold_ms,
2530                    None => true,
2531                }
2532        } else {
2533            should_gc
2534        };
2535        if should_gc {
2536            self.collections.last_gc_req = gc_until_seqno;
2537            Some(GcReq {
2538                shard_id: self.shard_id,
2539                new_seqno_since: gc_until_seqno,
2540            })
2541        } else {
2542            None
2543        }
2544    }
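
    // Illustrative sketch only (hypothetical helper): the legacy
    // (non-active-gc) threshold above grows with log2 of the seqno, e.g. a
    // seqno of 1_000 yields a threshold of 10 and 1_000_000 yields 20, so
    // per-shard GC assignment stays roughly logarithmic in shard age.
    #[allow(dead_code)]
    fn legacy_gc_threshold_sketch(seqno: u64) -> u64 {
        std::cmp::max(1, u64::from(seqno.next_power_of_two().trailing_zeros()))
    }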
2545
2546    /// Return the number of gc-ineligible state versions.
2547    pub fn seqnos_held(&self) -> usize {
2548        usize::cast_from(self.seqno.0.saturating_sub(self.seqno_since().0))
2549    }
2550
2551    /// Expire all readers and writers up to the given walltime_ms.
2552    pub fn expire_at(&mut self, walltime_ms: EpochMillis) -> ExpiryMetrics {
2553        let mut metrics = ExpiryMetrics::default();
2554        let shard_id = self.shard_id();
2555        self.collections.leased_readers.retain(|id, state| {
2556            let retain = state.last_heartbeat_timestamp_ms + state.lease_duration_ms >= walltime_ms;
2557            if !retain {
2558                info!(
2559                    "Force expiring reader {id} ({}) of shard {shard_id} due to inactivity",
2560                    state.debug.purpose
2561                );
2562                metrics.readers_expired += 1;
2563            }
2564            retain
2565        });
2566        // critical_readers don't need forced expiration. (In fact, that's the point!)
2567        self.collections.writers.retain(|id, state| {
2568            let retain =
2569                (state.last_heartbeat_timestamp_ms + state.lease_duration_ms) >= walltime_ms;
2570            if !retain {
2571                info!(
2572                    "Force expiring writer {id} ({}) of shard {shard_id} due to inactivity",
2573                    state.debug.purpose
2574                );
2575                metrics.writers_expired += 1;
2576            }
2577            retain
2578        });
2579        metrics
2580    }
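
    // Illustrative sketch only (hypothetical helper): the retention rule used
    // above for both readers and writers. A handle survives forced expiry iff
    // its last heartbeat plus its lease duration has not yet fallen behind the
    // given walltime.
    #[allow(dead_code)]
    fn lease_still_valid_sketch(
        last_heartbeat_timestamp_ms: u64,
        lease_duration_ms: u64,
        walltime_ms: u64,
    ) -> bool {
        last_heartbeat_timestamp_ms + lease_duration_ms >= walltime_ms
    }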
2581
2582    /// Returns the batches that contain updates up to (and including) the given `as_of`. The
2583    /// result `Vec` contains blob keys, along with a [`Description`] of what updates in the
2584    /// referenced parts are valid to read.
2585    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
2586        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2587            return Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
2588                self.collections.trace.since().clone(),
2589            )));
2590        }
2591        let upper = self.collections.trace.upper();
2592        if PartialOrder::less_equal(upper, as_of) {
2593            return Err(SnapshotErr::AsOfNotYetAvailable(
2594                self.seqno,
2595                Upper(upper.clone()),
2596            ));
2597        }
2598
2599        let batches = self
2600            .collections
2601            .trace
2602            .batches()
2603            .filter(|b| !PartialOrder::less_than(as_of, b.desc.lower()))
2604            .cloned()
2605            .collect();
2606        Ok(batches)
2607    }
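
    // Illustrative sketch only (hypothetical helper): the two bounds checks
    // above, specialized to a totally ordered time with single-element
    // frontiers. An as_of below since has lost historical distinctions; an
    // as_of at or past upper is not yet available.
    #[allow(dead_code)]
    fn snapshot_as_of_ok_sketch(since: u64, upper: u64, as_of: u64) -> bool {
        since <= as_of && as_of < upper
    }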
2608
2609    // NB: Unlike the other methods here, this one is read-only.
2610    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
2611        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2612            return Err(Since(self.collections.trace.since().clone()));
2613        }
2614        Ok(())
2615    }
2616
2617    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
2618        // TODO: Avoid the O(n^2) here: `next_listen_batch` is called once per
2619        // batch and this iterates through all batches to find the next one.
2620        self.collections
2621            .trace
2622            .batches()
2623            .find(|b| {
2624                PartialOrder::less_equal(b.desc.lower(), frontier)
2625                    && PartialOrder::less_than(frontier, b.desc.upper())
2626            })
2627            .cloned()
2628            .ok_or(self.seqno)
2629    }
2630
2631    pub fn active_rollup(&self) -> Option<ActiveRollup> {
2632        self.collections.active_rollup
2633    }
2634
2635    pub fn need_rollup(
2636        &self,
2637        threshold: usize,
2638        use_active_rollup: bool,
2639        fallback_threshold_ms: u64,
2640        now: u64,
2641    ) -> Option<SeqNo> {
2642        let (latest_rollup_seqno, _) = self.latest_rollup();
2643
2644        // Tombstoned shards require one final rollup. However, because we
2645        // write a rollup as of SeqNo X and then link it in using a state
2646        // transition (in this case from X to X+1), the minimum number of
2647        // live diffs is actually two. Detect when we're in this minimal
2648        // two diff state and stop the (otherwise) infinite iteration.
2649        if self.collections.is_tombstone() && latest_rollup_seqno.next() < self.seqno {
2650            return Some(self.seqno);
2651        }
2652
2653        let seqnos_since_last_rollup = self.seqno.0.saturating_sub(latest_rollup_seqno.0);
2654
2655        if use_active_rollup {
2656            // If seqnos_since_last_rollup > threshold, and there is no existing rollup in progress,
2657            // we should start a new rollup.
2658            // If there is an active rollup, we should check if it has been running too long.
2659            // If it has, we should start a new rollup.
2660            // This is to guard against a worker dying/taking too long/etc.
2661            if seqnos_since_last_rollup > u64::cast_from(threshold) {
2662                match self.active_rollup() {
2663                    Some(active_rollup) => {
2664                        if now.saturating_sub(active_rollup.start_ms) > fallback_threshold_ms {
2665                            return Some(self.seqno);
2666                        }
2667                    }
2668                    None => {
2669                        return Some(self.seqno);
2670                    }
2671                }
2672            }
2673        } else {
2674            // every `threshold` seqnos since the latest rollup, assign rollup maintenance.
2675            // we avoid assigning rollups to every seqno past the threshold to avoid handles
2676            // racing / performing redundant work.
2677            if seqnos_since_last_rollup > 0
2678                && seqnos_since_last_rollup % u64::cast_from(threshold) == 0
2679            {
2680                return Some(self.seqno);
2681            }
2682
2683            // however, since maintenance is best-effort and could fail, do assign rollup
2684            // work to every seqno after a fallback threshold to ensure one is written.
2685            if seqnos_since_last_rollup
2686                > u64::cast_from(
2687                    threshold * PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER,
2688                )
2689            {
2690                return Some(self.seqno);
2691            }
2692        }
2693
2694        None
2695    }
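
    // Illustrative sketch only (hypothetical helper): the legacy
    // (non-active-rollup) cadence above. With a threshold of 128, rollup
    // maintenance is assigned at 128, 256, ... seqnos past the latest rollup,
    // and unconditionally once the fallback multiple is exceeded.
    #[allow(dead_code)]
    fn should_assign_rollup_sketch(
        seqnos_since_last_rollup: u64,
        threshold: u64,
        fallback_multiplier: u64,
    ) -> bool {
        (seqnos_since_last_rollup > 0 && seqnos_since_last_rollup % threshold == 0)
            || seqnos_since_last_rollup > threshold * fallback_multiplier
    }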
2696
2697    pub(crate) fn blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
2698        let batches = self.collections.trace.batches().map(HollowBlobRef::Batch);
2699        let rollups = self.collections.rollups.values().map(HollowBlobRef::Rollup);
2700        batches.chain(rollups)
2701    }
2702}
2703
2704fn serialize_part_bytes<S: Serializer>(val: &[u8], s: S) -> Result<S::Ok, S::Error> {
2705    let val = hex::encode(val);
2706    val.serialize(s)
2707}
2708
2709fn serialize_lazy_proto<S: Serializer, T: prost::Message + Default>(
2710    val: &Option<LazyProto<T>>,
2711    s: S,
2712) -> Result<S::Ok, S::Error> {
2713    val.as_ref()
2714        .map(|lazy| hex::encode(&lazy.into_proto()))
2715        .serialize(s)
2716}
2717
2718fn serialize_part_stats<S: Serializer>(
2719    val: &Option<LazyPartStats>,
2720    s: S,
2721) -> Result<S::Ok, S::Error> {
2722    let val = val.as_ref().map(|x| x.decode().key);
2723    val.serialize(s)
2724}
2725
2726fn serialize_diffs_sum<S: Serializer>(val: &Option<[u8; 8]>, s: S) -> Result<S::Ok, S::Error> {
2727    // This is only used for debugging, so hack to assume that D is i64.
2728    let val = val.map(i64::decode);
2729    val.serialize(s)
2730}
2731
2732// This Serialize impl is used for debugging/testing and exposed via SQL. It's
2733// intentionally gated from users, so not strictly subject to our backward
2734// compatibility guarantees, but still probably best to be thoughtful about
2735// making unnecessary changes. Additionally, it's nice to make the output as
2736// nice to use as possible without tying our hands for the actual code usages.
2737impl<T: Serialize + Timestamp + Lattice> Serialize for State<T> {
2738    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2739        let State {
2740            shard_id,
2741            seqno,
2742            walltime_ms,
2743            hostname,
2744            collections:
2745                StateCollections {
2746                    version: applier_version,
2747                    last_gc_req,
2748                    rollups,
2749                    active_rollup,
2750                    active_gc,
2751                    leased_readers,
2752                    critical_readers,
2753                    writers,
2754                    schemas,
2755                    trace,
2756                },
2757        } = self;
2758        let mut s = s.serialize_struct("State", 13)?;
2759        let () = s.serialize_field("applier_version", &applier_version.to_string())?;
2760        let () = s.serialize_field("shard_id", shard_id)?;
2761        let () = s.serialize_field("seqno", seqno)?;
2762        let () = s.serialize_field("walltime_ms", walltime_ms)?;
2763        let () = s.serialize_field("hostname", hostname)?;
2764        let () = s.serialize_field("last_gc_req", last_gc_req)?;
2765        let () = s.serialize_field("rollups", rollups)?;
2766        let () = s.serialize_field("active_rollup", active_rollup)?;
2767        let () = s.serialize_field("active_gc", active_gc)?;
2768        let () = s.serialize_field("leased_readers", leased_readers)?;
2769        let () = s.serialize_field("critical_readers", critical_readers)?;
2770        let () = s.serialize_field("writers", writers)?;
2771        let () = s.serialize_field("schemas", schemas)?;
2772        let () = s.serialize_field("since", &trace.since().elements())?;
2773        let () = s.serialize_field("upper", &trace.upper().elements())?;
2774        let trace = trace.flatten();
2775        let () = s.serialize_field("batches", &trace.legacy_batches.keys().collect::<Vec<_>>())?;
2776        let () = s.serialize_field("hollow_batches", &trace.hollow_batches)?;
2777        let () = s.serialize_field("spine_batches", &trace.spine_batches)?;
2778        let () = s.serialize_field("merges", &trace.merges)?;
2779        s.end()
2780    }
2781}
2782
2783#[derive(Debug, Default)]
2784pub struct StateSizeMetrics {
2785    pub hollow_batch_count: usize,
2786    pub batch_part_count: usize,
2787    pub rewrite_part_count: usize,
2788    pub num_updates: usize,
2789    pub largest_batch_bytes: usize,
2790    pub state_batches_bytes: usize,
2791    pub state_rollups_bytes: usize,
2792    pub state_rollup_count: usize,
2793    pub inline_part_count: usize,
2794    pub inline_part_bytes: usize,
2795}
2796
2797#[derive(Default)]
2798pub struct ExpiryMetrics {
2799    pub(crate) readers_expired: usize,
2800    pub(crate) writers_expired: usize,
2801}
2802
2803/// Wrapper for Antichain that represents a Since
2804#[derive(Debug, Clone, PartialEq)]
2805pub struct Since<T>(pub Antichain<T>);
2806
2807/// Wrapper for Antichain that represents an Upper
2808#[derive(Debug, PartialEq)]
2809pub struct Upper<T>(pub Antichain<T>);
2810
2811#[cfg(test)]
2812pub(crate) mod tests {
2813    use std::ops::Range;
2814    use std::str::FromStr;
2815
2816    use bytes::Bytes;
2817    use mz_build_info::DUMMY_BUILD_INFO;
2818    use mz_dyncfg::ConfigUpdates;
2819    use mz_ore::now::SYSTEM_TIME;
2820    use mz_ore::{assert_none, assert_ok};
2821    use mz_proto::RustType;
2822    use proptest::prelude::*;
2823    use proptest::strategy::ValueTree;
2824
2825    use crate::InvalidUsage::{InvalidBounds, InvalidEmptyTimeInterval};
2826    use crate::cache::PersistClientCache;
2827    use crate::internal::encoding::any_some_lazy_part_stats;
2828    use crate::internal::paths::RollupId;
2829    use crate::internal::trace::tests::any_trace;
2830    use crate::tests::new_test_client_cache;
2831    use crate::{Diagnostics, PersistLocation};
2832
2833    use super::*;
2834
2835    const LEASE_DURATION_MS: u64 = 900 * 1000;
2836    fn debug_state() -> HandleDebugState {
2837        HandleDebugState {
2838            hostname: "debug".to_owned(),
2839            purpose: "finding the bugs".to_owned(),
2840        }
2841    }
2842
2843    pub fn any_hollow_batch_with_exact_runs<T: Arbitrary + Timestamp>(
2844        num_runs: usize,
2845    ) -> impl Strategy<Value = HollowBatch<T>> {
2846        (
2847            any::<T>(),
2848            any::<T>(),
2849            any::<T>(),
2850            proptest::collection::vec(any_run_part::<T>(), num_runs + 1..20),
2851            any::<usize>(),
2852        )
2853            .prop_map(move |(t0, t1, since, parts, len)| {
2854                let (lower, upper) = if t0 <= t1 {
2855                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
2856                } else {
2857                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
2858                };
2859                let since = Antichain::from_elem(since);
2860
2861                let run_splits = (1..num_runs)
2862                    .map(|i| i * parts.len() / num_runs)
2863                    .collect::<Vec<_>>();
2864
2865                let run_meta = (0..num_runs)
2866                    .map(|_| {
2867                        let mut meta = RunMeta::default();
2868                        meta.id = Some(RunId::new());
2869                        meta
2870                    })
2871                    .collect::<Vec<_>>();
2872
2873                HollowBatch::new(
2874                    Description::new(lower, upper, since),
2875                    parts,
2876                    len % 10,
2877                    run_meta,
2878                    run_splits,
2879                )
2880            })
2881    }
2882
2883    pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
2884        Strategy::prop_map(
2885            (
2886                any::<T>(),
2887                any::<T>(),
2888                any::<T>(),
2889                proptest::collection::vec(any_run_part::<T>(), 0..20),
2890                any::<usize>(),
2891                0..=10usize,
2892                proptest::collection::vec(any::<RunId>(), 10),
2893            ),
2894            |(t0, t1, since, parts, len, num_runs, run_ids)| {
2895                let (lower, upper) = if t0 <= t1 {
2896                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
2897                } else {
2898                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
2899                };
2900                let since = Antichain::from_elem(since);
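                // Only carve the parts into multiple runs when there are enough
                // parts to split across them; otherwise fall back to a single
                // run covering everything.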
2901                if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() {
2902                    let run_splits = (1..num_runs)
2903                        .map(|i| i * parts.len() / num_runs)
2904                        .collect::<Vec<_>>();
2905
2906                    let run_meta = (0..num_runs)
2907                        .enumerate()
2908                        .map(|(i, _)| {
2909                            let mut meta = RunMeta::default();
2910                            meta.id = Some(run_ids[i]);
2911                            meta
2912                        })
2913                        .collect::<Vec<_>>();
2914
2915                    HollowBatch::new(
2916                        Description::new(lower, upper, since),
2917                        parts,
2918                        len % 10,
2919                        run_meta,
2920                        run_splits,
2921                    )
2922                } else {
2923                    HollowBatch::new_run_for_test(
2924                        Description::new(lower, upper, since),
2925                        parts,
2926                        len % 10,
2927                        run_ids[0],
2928                    )
2929                }
2930            },
2931        )
2932    }
2933
2934    pub fn any_batch_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = BatchPart<T>> {
2935        Strategy::prop_map(
2936            (
2937                any::<bool>(),
2938                any_hollow_batch_part(),
2939                any::<Option<T>>(),
2940                any::<Option<SchemaId>>(),
2941                any::<Option<SchemaId>>(),
2942            ),
2943            |(is_hollow, hollow, ts_rewrite, schema_id, deprecated_schema_id)| {
2944                if is_hollow {
2945                    BatchPart::Hollow(hollow)
2946                } else {
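                    // The inline variant is built from empty bytes, so it carries
                    // no real update data; only the surrounding metadata varies.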
2947                    let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap();
2948                    let ts_rewrite = ts_rewrite.map(Antichain::from_elem);
2949                    BatchPart::Inline {
2950                        updates,
2951                        ts_rewrite,
2952                        schema_id,
2953                        deprecated_schema_id,
2954                    }
2955                }
2956            },
2957        )
2958    }
2959
2960    pub fn any_run_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = RunPart<T>> {
2961        Strategy::prop_map(any_batch_part(), |part| RunPart::Single(part))
2962    }
2963
2964    pub fn any_hollow_batch_part<T: Arbitrary + Timestamp>()
2965    -> impl Strategy<Value = HollowBatchPart<T>> {
2966        Strategy::prop_map(
2967            (
2968                any::<PartialBatchKey>(),
2969                any::<usize>(),
2970                any::<Vec<u8>>(),
2971                any_some_lazy_part_stats(),
2972                any::<Option<T>>(),
2973                any::<[u8; 8]>(),
2974                any::<Option<BatchColumnarFormat>>(),
2975                any::<Option<SchemaId>>(),
2976                any::<Option<SchemaId>>(),
2977            ),
2978            |(
2979                key,
2980                encoded_size_bytes,
2981                key_lower,
2982                stats,
2983                ts_rewrite,
2984                diffs_sum,
2985                format,
2986                schema_id,
2987                deprecated_schema_id,
2988            )| {
2989                HollowBatchPart {
2990                    key,
2991                    meta: Default::default(),
2992                    encoded_size_bytes,
2993                    key_lower,
2994                    structured_key_lower: None,
2995                    stats,
2996                    ts_rewrite: ts_rewrite.map(Antichain::from_elem),
2997                    diffs_sum: Some(diffs_sum),
2998                    format,
2999                    schema_id,
3000                    deprecated_schema_id,
3001                }
3002            },
3003        )
3004    }
3005
3006    pub fn any_leased_reader_state<T: Arbitrary>() -> impl Strategy<Value = LeasedReaderState<T>> {
3007        Strategy::prop_map(
3008            (
3009                any::<SeqNo>(),
3010                any::<Option<T>>(),
3011                any::<u64>(),
3012                any::<u64>(),
3013                any::<HandleDebugState>(),
3014            ),
3015            |(seqno, since, last_heartbeat_timestamp_ms, mut lease_duration_ms, debug)| {
3016                // lease_duration_ms of 0 means this state was written by an old
3017                // version of code, which means we'll migrate it in the decode
3018                // path. Avoid.
3019                if lease_duration_ms == 0 {
3020                    lease_duration_ms += 1;
3021                }
3022                LeasedReaderState {
3023                    seqno,
3024                    since: since.map_or_else(Antichain::new, Antichain::from_elem),
3025                    last_heartbeat_timestamp_ms,
3026                    lease_duration_ms,
3027                    debug,
3028                }
3029            },
3030        )
3031    }
3032
3033    pub fn any_critical_reader_state<T>() -> impl Strategy<Value = CriticalReaderState<T>>
3034    where
3035        T: Arbitrary,
3036    {
3037        Strategy::prop_map(
3038            (
3039                any::<Option<T>>(),
3040                any::<OpaqueState>(),
3041                any::<String>(),
3042                any::<HandleDebugState>(),
3043            ),
3044            |(since, opaque, opaque_codec, debug)| CriticalReaderState {
3045                since: since.map_or_else(Antichain::new, Antichain::from_elem),
3046                opaque,
3047                opaque_codec,
3048                debug,
3049            },
3050        )
3051    }
3052
3053    pub fn any_writer_state<T: Arbitrary>() -> impl Strategy<Value = WriterState<T>> {
3054        Strategy::prop_map(
3055            (
3056                any::<u64>(),
3057                any::<u64>(),
3058                any::<IdempotencyToken>(),
3059                any::<Option<T>>(),
3060                any::<HandleDebugState>(),
3061            ),
3062            |(
3063                last_heartbeat_timestamp_ms,
3064                lease_duration_ms,
3065                most_recent_write_token,
3066                most_recent_write_upper,
3067                debug,
3068            )| WriterState {
3069                last_heartbeat_timestamp_ms,
3070                lease_duration_ms,
3071                most_recent_write_token,
3072                most_recent_write_upper: most_recent_write_upper
3073                    .map_or_else(Antichain::new, Antichain::from_elem),
3074                debug,
3075            },
3076        )
3077    }
3078
3079    pub fn any_encoded_schemas() -> impl Strategy<Value = EncodedSchemas> {
3080        Strategy::prop_map(
3081            (
3082                any::<Vec<u8>>(),
3083                any::<Vec<u8>>(),
3084                any::<Vec<u8>>(),
3085                any::<Vec<u8>>(),
3086            ),
3087            |(key, key_data_type, val, val_data_type)| EncodedSchemas {
3088                key: Bytes::from(key),
3089                key_data_type: Bytes::from(key_data_type),
3090                val: Bytes::from(val),
3091                val_data_type: Bytes::from(val_data_type),
3092            },
3093        )
3094    }
3095
3096    pub fn any_state<T: Arbitrary + Timestamp + Lattice>(
3097        num_trace_batches: Range<usize>,
3098    ) -> impl Strategy<Value = State<T>> {
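        // The fields are split across two tuples, presumably to stay within
        // proptest's tuple-arity limits for `Strategy`.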
3099        let part1 = (
3100            any::<ShardId>(),
3101            any::<SeqNo>(),
3102            any::<u64>(),
3103            any::<String>(),
3104            any::<SeqNo>(),
3105            proptest::collection::btree_map(any::<SeqNo>(), any::<HollowRollup>(), 1..3),
3106            proptest::option::of(any::<ActiveRollup>()),
3107        );
3108
3109        let part2 = (
3110            proptest::option::of(any::<ActiveGc>()),
3111            proptest::collection::btree_map(
3112                any::<LeasedReaderId>(),
3113                any_leased_reader_state::<T>(),
3114                1..3,
3115            ),
3116            proptest::collection::btree_map(
3117                any::<CriticalReaderId>(),
3118                any_critical_reader_state::<T>(),
3119                1..3,
3120            ),
3121            proptest::collection::btree_map(any::<WriterId>(), any_writer_state::<T>(), 0..3),
3122            proptest::collection::btree_map(any::<SchemaId>(), any_encoded_schemas(), 0..3),
3123            any_trace::<T>(num_trace_batches),
3124        );
3125
3126        (part1, part2).prop_map(
3127            |(
3128                (shard_id, seqno, walltime_ms, hostname, last_gc_req, rollups, active_rollup),
3129                (active_gc, leased_readers, critical_readers, writers, schemas, trace),
3130            )| State {
3131                shard_id,
3132                seqno,
3133                walltime_ms,
3134                hostname,
3135                collections: StateCollections {
3136                    version: Version::new(1, 2, 3),
3137                    last_gc_req,
3138                    rollups,
3139                    active_rollup,
3140                    active_gc,
3141                    leased_readers,
3142                    critical_readers,
3143                    writers,
3144                    schemas,
3145                    trace,
3146                },
3147            },
3148        )
3149    }
3150
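    /// Test shorthand: builds a single-run [`HollowBatch`] from `lower` to
    /// `upper` whose parts are empty hollow placeholders keyed by `keys`, with
    /// a since of `T::minimum()`.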
3151    pub(crate) fn hollow<T: Timestamp>(
3152        lower: T,
3153        upper: T,
3154        keys: &[&str],
3155        len: usize,
3156    ) -> HollowBatch<T> {
3157        HollowBatch::new_run(
3158            Description::new(
3159                Antichain::from_elem(lower),
3160                Antichain::from_elem(upper),
3161                Antichain::from_elem(T::minimum()),
3162            ),
3163            keys.iter()
3164                .map(|x| {
3165                    RunPart::Single(BatchPart::Hollow(HollowBatchPart {
3166                        key: PartialBatchKey((*x).to_owned()),
3167                        meta: Default::default(),
3168                        encoded_size_bytes: 0,
3169                        key_lower: vec![],
3170                        structured_key_lower: None,
3171                        stats: None,
3172                        ts_rewrite: None,
3173                        diffs_sum: None,
3174                        format: None,
3175                        schema_id: None,
3176                        deprecated_schema_id: None,
3177                    }))
3178                })
3179                .collect(),
3180            len,
3181        )
3182    }
3183
3184    #[mz_ore::test]
3185    fn downgrade_since() {
3186        let mut state = TypedState::<(), (), u64, i64>::new(
3187            DUMMY_BUILD_INFO.semver_version(),
3188            ShardId::new(),
3189            "".to_owned(),
3190            0,
3191        );
3192        let reader = LeasedReaderId::new();
3193        let seqno = SeqNo::minimum();
3194        let now = SYSTEM_TIME.clone();
3195        let _ = state.collections.register_leased_reader(
3196            "",
3197            &reader,
3198            "",
3199            seqno,
3200            Duration::from_secs(10),
3201            now(),
3202            false,
3203        );
3204
3205        // The shard global since == 0 initially.
3206        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3207
3208        // Greater
3209        assert_eq!(
3210            state.collections.downgrade_since(
3211                &reader,
3212                seqno,
3213                seqno,
3214                &Antichain::from_elem(2),
3215                now()
3216            ),
3217            Continue(Since(Antichain::from_elem(2)))
3218        );
3219        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3220        // Equal (no-op)
3221        assert_eq!(
3222            state.collections.downgrade_since(
3223                &reader,
3224                seqno,
3225                seqno,
3226                &Antichain::from_elem(2),
3227                now()
3228            ),
3229            Continue(Since(Antichain::from_elem(2)))
3230        );
3231        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3232        // Less (no-op)
3233        assert_eq!(
3234            state.collections.downgrade_since(
3235                &reader,
3236                seqno,
3237                seqno,
3238                &Antichain::from_elem(1),
3239                now()
3240            ),
3241            Continue(Since(Antichain::from_elem(2)))
3242        );
3243        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3244
3245        // Create a second reader.
3246        let reader2 = LeasedReaderId::new();
3247        let _ = state.collections.register_leased_reader(
3248            "",
3249            &reader2,
3250            "",
3251            seqno,
3252            Duration::from_secs(10),
3253            now(),
3254            false,
3255        );
3256
3257        // Shard since doesn't change until the meet (min) of all reader sinces changes.
3258        assert_eq!(
3259            state.collections.downgrade_since(
3260                &reader2,
3261                seqno,
3262                seqno,
3263                &Antichain::from_elem(3),
3264                now()
3265            ),
3266            Continue(Since(Antichain::from_elem(3)))
3267        );
3268        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3269        // Shard since == 3 when all readers have since >= 3.
3270        assert_eq!(
3271            state.collections.downgrade_since(
3272                &reader,
3273                seqno,
3274                seqno,
3275                &Antichain::from_elem(5),
3276                now()
3277            ),
3278            Continue(Since(Antichain::from_elem(5)))
3279        );
3280        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3281
3282        // Shard since is unaffected by readers with since > shard since expiring.
3283        assert_eq!(
3284            state.collections.expire_leased_reader(&reader),
3285            Continue(true)
3286        );
3287        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3288
3289        // Create a third reader.
3290        let reader3 = LeasedReaderId::new();
3291        let _ = state.collections.register_leased_reader(
3292            "",
3293            &reader3,
3294            "",
3295            seqno,
3296            Duration::from_secs(10),
3297            now(),
3298            false,
3299        );
3300
3301        // Shard since doesn't change until the meet (min) of all reader sinces changes.
3302        assert_eq!(
3303            state.collections.downgrade_since(
3304                &reader3,
3305                seqno,
3306                seqno,
3307                &Antichain::from_elem(10),
3308                now()
3309            ),
3310            Continue(Since(Antichain::from_elem(10)))
3311        );
3312        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3313
3314        // Shard since advances when the reader with the minimal since expires.
3315        assert_eq!(
3316            state.collections.expire_leased_reader(&reader2),
3317            Continue(true)
3318        );
3319        // TODO(database-issues#6885): expiry temporarily doesn't advance since
3320        // Switch this assertion back when we re-enable this.
3321        //
3322        // assert_eq!(state.collections.trace.since(), &Antichain::from_elem(10));
3323        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3324
3325        // Shard since unaffected when all readers are expired.
3326        assert_eq!(
3327            state.collections.expire_leased_reader(&reader3),
3328            Continue(true)
3329        );
3330        // TODO(database-issues#6885): expiry temporarily doesn't advance since
3331        // Switch this assertion back when we re-enable this.
3332        //
3333        // assert_eq!(state.collections.trace.since(), &Antichain::from_elem(10));
3334        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3335    }
3336
3337    #[mz_ore::test]
3338    fn compare_and_downgrade_since() {
3339        let mut state = TypedState::<(), (), u64, i64>::new(
3340            DUMMY_BUILD_INFO.semver_version(),
3341            ShardId::new(),
3342            "".to_owned(),
3343            0,
3344        );
3345        let reader = CriticalReaderId::new();
3346        let _ = state
3347            .collections
3348            .register_critical_reader::<u64>("", &reader, "");
3349
3350        // The shard global since == 0 initially.
3351        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3352        // The initial opaque value should be set.
3353        assert_eq!(
3354            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3355            u64::initial()
3356        );
3357
3358        // Greater
3359        assert_eq!(
3360            state.collections.compare_and_downgrade_since::<u64>(
3361                &reader,
3362                &u64::initial(),
3363                (&1, &Antichain::from_elem(2)),
3364            ),
3365            Continue(Ok(Since(Antichain::from_elem(2))))
3366        );
3367        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3368        assert_eq!(
3369            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3370            1
3371        );
3372        // Equal (no-op)
3373        assert_eq!(
3374            state.collections.compare_and_downgrade_since::<u64>(
3375                &reader,
3376                &1,
3377                (&2, &Antichain::from_elem(2)),
3378            ),
3379            Continue(Ok(Since(Antichain::from_elem(2))))
3380        );
3381        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3382        assert_eq!(
3383            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3384            2
3385        );
3386        // Less (no-op)
3387        assert_eq!(
3388            state.collections.compare_and_downgrade_since::<u64>(
3389                &reader,
3390                &2,
3391                (&3, &Antichain::from_elem(1)),
3392            ),
3393            Continue(Ok(Since(Antichain::from_elem(2))))
3394        );
3395        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3396        assert_eq!(
3397            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3398            3
3399        );
3400    }
3401
3402    #[mz_ore::test]
3403    fn compare_and_append() {
3404        let state = &mut TypedState::<String, String, u64, i64>::new(
3405            DUMMY_BUILD_INFO.semver_version(),
3406            ShardId::new(),
3407            "".to_owned(),
3408            0,
3409        )
3410        .collections;
3411
3412        let writer_id = WriterId::new();
3413        let now = SYSTEM_TIME.clone();
3414
3415        // State is initially empty.
3416        assert_eq!(state.trace.num_spine_batches(), 0);
3417        assert_eq!(state.trace.num_hollow_batches(), 0);
3418        assert_eq!(state.trace.num_updates(), 0);
3419
3420        // Cannot insert a batch with a lower != current shard upper.
3421        assert_eq!(
3422            state.compare_and_append(
3423                &hollow(1, 2, &["key1"], 1),
3424                &writer_id,
3425                now(),
3426                LEASE_DURATION_MS,
3427                &IdempotencyToken::new(),
3428                &debug_state(),
3429                0,
3430                100,
3431                None
3432            ),
3433            Break(CompareAndAppendBreak::Upper {
3434                shard_upper: Antichain::from_elem(0),
3435                writer_upper: Antichain::from_elem(0)
3436            })
3437        );
3438
3439        // Can insert an empty batch with an upper > lower.
3440        assert!(
3441            state
3442                .compare_and_append(
3443                    &hollow(0, 5, &[], 0),
3444                    &writer_id,
3445                    now(),
3446                    LEASE_DURATION_MS,
3447                    &IdempotencyToken::new(),
3448                    &debug_state(),
3449                    0,
3450                    100,
3451                    None
3452                )
3453                .is_continue()
3454        );
3455
3456        // Cannot insert a batch with an upper less than the lower.
3457        assert_eq!(
3458            state.compare_and_append(
3459                &hollow(5, 4, &["key1"], 1),
3460                &writer_id,
3461                now(),
3462                LEASE_DURATION_MS,
3463                &IdempotencyToken::new(),
3464                &debug_state(),
3465                0,
3466                100,
3467                None
3468            ),
3469            Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds {
3470                lower: Antichain::from_elem(5),
3471                upper: Antichain::from_elem(4)
3472            }))
3473        );
3474
3475        // Cannot insert a nonempty batch with an upper equal to lower.
3476        assert_eq!(
3477            state.compare_and_append(
3478                &hollow(5, 5, &["key1"], 1),
3479                &writer_id,
3480                now(),
3481                LEASE_DURATION_MS,
3482                &IdempotencyToken::new(),
3483                &debug_state(),
3484                0,
3485                100,
3486                None
3487            ),
3488            Break(CompareAndAppendBreak::InvalidUsage(
3489                InvalidEmptyTimeInterval {
3490                    lower: Antichain::from_elem(5),
3491                    upper: Antichain::from_elem(5),
3492                    keys: vec!["key1".to_owned()],
3493                }
3494            ))
3495        );
3496
3497        // Can insert an empty batch with an upper equal to lower.
3498        assert!(
3499            state
3500                .compare_and_append(
3501                    &hollow(5, 5, &[], 0),
3502                    &writer_id,
3503                    now(),
3504                    LEASE_DURATION_MS,
3505                    &IdempotencyToken::new(),
3506                    &debug_state(),
3507                    0,
3508                    100,
3509                    None
3510                )
3511                .is_continue()
3512        );
3513    }
3514
3515    #[mz_ore::test]
3516    fn snapshot() {
3517        let now = SYSTEM_TIME.clone();
3518
3519        let mut state = TypedState::<String, String, u64, i64>::new(
3520            DUMMY_BUILD_INFO.semver_version(),
3521            ShardId::new(),
3522            "".to_owned(),
3523            0,
3524        );
3525        // Cannot take a snapshot with as_of == shard upper.
3526        assert_eq!(
3527            state.snapshot(&Antichain::from_elem(0)),
3528            Err(SnapshotErr::AsOfNotYetAvailable(
3529                SeqNo(0),
3530                Upper(Antichain::from_elem(0))
3531            ))
3532        );
3533
3534        // Cannot take a snapshot with as_of > shard upper.
3535        assert_eq!(
3536            state.snapshot(&Antichain::from_elem(5)),
3537            Err(SnapshotErr::AsOfNotYetAvailable(
3538                SeqNo(0),
3539                Upper(Antichain::from_elem(0))
3540            ))
3541        );
3542
3543        let writer_id = WriterId::new();
3544
3545        // Advance upper to 5.
3546        assert!(
3547            state
3548                .collections
3549                .compare_and_append(
3550                    &hollow(0, 5, &["key1"], 1),
3551                    &writer_id,
3552                    now(),
3553                    LEASE_DURATION_MS,
3554                    &IdempotencyToken::new(),
3555                    &debug_state(),
3556                    0,
3557                    100,
3558                    None
3559                )
3560                .is_continue()
3561        );
3562
3563        // Can take a snapshot with as_of < upper.
3564        assert_eq!(
3565            state.snapshot(&Antichain::from_elem(0)),
3566            Ok(vec![hollow(0, 5, &["key1"], 1)])
3567        );
3568
3569        // Can take a snapshot with as_of >= shard since, as long as as_of < shard_upper.
3570        assert_eq!(
3571            state.snapshot(&Antichain::from_elem(4)),
3572            Ok(vec![hollow(0, 5, &["key1"], 1)])
3573        );
3574
3575        // Cannot take a snapshot with as_of >= upper.
3576        assert_eq!(
3577            state.snapshot(&Antichain::from_elem(5)),
3578            Err(SnapshotErr::AsOfNotYetAvailable(
3579                SeqNo(0),
3580                Upper(Antichain::from_elem(5))
3581            ))
3582        );
3583        assert_eq!(
3584            state.snapshot(&Antichain::from_elem(6)),
3585            Err(SnapshotErr::AsOfNotYetAvailable(
3586                SeqNo(0),
3587                Upper(Antichain::from_elem(5))
3588            ))
3589        );
3590
3591        let reader = LeasedReaderId::new();
3592        // Advance the since to 2.
3593        let _ = state.collections.register_leased_reader(
3594            "",
3595            &reader,
3596            "",
3597            SeqNo::minimum(),
3598            Duration::from_secs(10),
3599            now(),
3600            false,
3601        );
3602        assert_eq!(
3603            state.collections.downgrade_since(
3604                &reader,
3605                SeqNo::minimum(),
3606                SeqNo::minimum(),
3607                &Antichain::from_elem(2),
3608                now()
3609            ),
3610            Continue(Since(Antichain::from_elem(2)))
3611        );
3612        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3613        // Cannot take a snapshot with as_of < shard_since.
3614        assert_eq!(
3615            state.snapshot(&Antichain::from_elem(1)),
3616            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
3617                Antichain::from_elem(2)
3618            )))
3619        );
3620
3621        // Advance the upper to 10 via an empty batch.
3622        assert!(
3623            state
3624                .collections
3625                .compare_and_append(
3626                    &hollow(5, 10, &[], 0),
3627                    &writer_id,
3628                    now(),
3629                    LEASE_DURATION_MS,
3630                    &IdempotencyToken::new(),
3631                    &debug_state(),
3632                    0,
3633                    100,
3634                    None
3635                )
3636                .is_continue()
3637        );
3638
3639        // Can still take snapshots at times < upper.
3640        assert_eq!(
3641            state.snapshot(&Antichain::from_elem(7)),
3642            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3643        );
3644
3645        // Cannot take snapshots with as_of >= upper.
3646        assert_eq!(
3647            state.snapshot(&Antichain::from_elem(10)),
3648            Err(SnapshotErr::AsOfNotYetAvailable(
3649                SeqNo(0),
3650                Upper(Antichain::from_elem(10))
3651            ))
3652        );
3653
3654        // Advance upper to 15.
3655        assert!(
3656            state
3657                .collections
3658                .compare_and_append(
3659                    &hollow(10, 15, &["key2"], 1),
3660                    &writer_id,
3661                    now(),
3662                    LEASE_DURATION_MS,
3663                    &IdempotencyToken::new(),
3664                    &debug_state(),
3665                    0,
3666                    100,
3667                    None
3668                )
3669                .is_continue()
3670        );
3671
3672        // Filter out batches whose lowers are greater than the requested as_of (the
3673        // batches that are too far in the future for the requested as_of).
3674        assert_eq!(
3675            state.snapshot(&Antichain::from_elem(9)),
3676            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3677        );
3678
3679        // Don't filter out batches whose lowers are <= the requested as_of.
3680        assert_eq!(
3681            state.snapshot(&Antichain::from_elem(10)),
3682            Ok(vec![
3683                hollow(0, 5, &["key1"], 1),
3684                hollow(5, 10, &[], 0),
3685                hollow(10, 15, &["key2"], 1)
3686            ])
3687        );
3688
3689        assert_eq!(
3690            state.snapshot(&Antichain::from_elem(11)),
3691            Ok(vec![
3692                hollow(0, 5, &["key1"], 1),
3693                hollow(5, 10, &[], 0),
3694                hollow(10, 15, &["key2"], 1)
3695            ])
3696        );
3697    }
3698
3699    #[mz_ore::test]
3700    fn next_listen_batch() {
3701        let mut state = TypedState::<String, String, u64, i64>::new(
3702            DUMMY_BUILD_INFO.semver_version(),
3703            ShardId::new(),
3704            "".to_owned(),
3705            0,
3706        );
3707
3708        // An empty collection never has any batches to listen for, regardless of
3709        // the current frontier.
3710        assert_eq!(
3711            state.next_listen_batch(&Antichain::from_elem(0)),
3712            Err(SeqNo(0))
3713        );
3714        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3715
3716        let writer_id = WriterId::new();
3717        let now = SYSTEM_TIME.clone();
3718
3719        // Add two batches of data, one from [0, 5) and then another from [5, 10).
3720        assert!(
3721            state
3722                .collections
3723                .compare_and_append(
3724                    &hollow(0, 5, &["key1"], 1),
3725                    &writer_id,
3726                    now(),
3727                    LEASE_DURATION_MS,
3728                    &IdempotencyToken::new(),
3729                    &debug_state(),
3730                    0,
3731                    100,
3732                    None
3733                )
3734                .is_continue()
3735        );
3736        assert!(
3737            state
3738                .collections
3739                .compare_and_append(
3740                    &hollow(5, 10, &["key2"], 1),
3741                    &writer_id,
3742                    now(),
3743                    LEASE_DURATION_MS,
3744                    &IdempotencyToken::new(),
3745                    &debug_state(),
3746                    0,
3747                    100,
3748                    None
3749                )
3750                .is_continue()
3751        );
3752
3753        // All frontiers in [0, 5) return the first batch.
3754        for t in 0..=4 {
3755            assert_eq!(
3756                state.next_listen_batch(&Antichain::from_elem(t)),
3757                Ok(hollow(0, 5, &["key1"], 1))
3758            );
3759        }
3760
3761        // All frontiers in [5, 10) return the second batch.
3762        for t in 5..=9 {
3763            assert_eq!(
3764                state.next_listen_batch(&Antichain::from_elem(t)),
3765                Ok(hollow(5, 10, &["key2"], 1))
3766            );
3767        }
3768
3769        // There is no batch currently available for t = 10.
3770        assert_eq!(
3771            state.next_listen_batch(&Antichain::from_elem(10)),
3772            Err(SeqNo(0))
3773        );
3774
3775        // By definition, there is never a batch at the empty antichain, which is
3776        // the frontier after all possible times.
3777        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3778    }
3779
3780    #[mz_ore::test]
3781    fn expire_writer() {
3782        let mut state = TypedState::<String, String, u64, i64>::new(
3783            DUMMY_BUILD_INFO.semver_version(),
3784            ShardId::new(),
3785            "".to_owned(),
3786            0,
3787        );
3788        let now = SYSTEM_TIME.clone();
3789
3790        let writer_id_one = WriterId::new();
3791
3792        let writer_id_two = WriterId::new();
3793
3794        // Writer is eligible to write
3795        assert!(
3796            state
3797                .collections
3798                .compare_and_append(
3799                    &hollow(0, 2, &["key1"], 1),
3800                    &writer_id_one,
3801                    now(),
3802                    LEASE_DURATION_MS,
3803                    &IdempotencyToken::new(),
3804                    &debug_state(),
3805                    0,
3806                    100,
3807                    None
3808                )
3809                .is_continue()
3810        );
3811
3812        assert!(
3813            state
3814                .collections
3815                .expire_writer(&writer_id_one)
3816                .is_continue()
3817        );
3818
3819        // Other writers should still be able to write
3820        assert!(
3821            state
3822                .collections
3823                .compare_and_append(
3824                    &hollow(2, 5, &["key2"], 1),
3825                    &writer_id_two,
3826                    now(),
3827                    LEASE_DURATION_MS,
3828                    &IdempotencyToken::new(),
3829                    &debug_state(),
3830                    0,
3831                    100,
3832                    None
3833                )
3834                .is_continue()
3835        );
3836    }
3837
3838    #[mz_ore::test]
3839    fn maybe_gc_active_gc() {
3840        const GC_CONFIG: GcConfig = GcConfig {
3841            use_active_gc: true,
3842            fallback_threshold_ms: 5000,
3843            min_versions: 99,
3844            max_versions: 500,
3845        };
3846        let now_fn = SYSTEM_TIME.clone();
3847
3848        let mut state = TypedState::<String, String, u64, i64>::new(
3849            DUMMY_BUILD_INFO.semver_version(),
3850            ShardId::new(),
3851            "".to_owned(),
3852            0,
3853        );
3854
3855        let now = now_fn();
3856        // Empty state doesn't need gc, regardless of is_write.
3857        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
3858        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);
3859
3860        // Artificially advance the seqno so the seqno_since advances past our
3861        // internal gc_threshold.
3862        state.seqno = SeqNo(100);
3863        assert_eq!(state.seqno_since(), SeqNo(100));
3864
3865        // When a writer is present, non-writes don't gc.
3866        let writer_id = WriterId::new();
3867        let _ = state.collections.compare_and_append(
3868            &hollow(1, 2, &["key1"], 1),
3869            &writer_id,
3870            now,
3871            LEASE_DURATION_MS,
3872            &IdempotencyToken::new(),
3873            &debug_state(),
3874            0,
3875            100,
3876            None,
3877        );
3878        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);
3879
3880        // A write will gc though.
3881        assert_eq!(
3882            state.maybe_gc(true, now, GC_CONFIG),
3883            Some(GcReq {
3884                shard_id: state.shard_id,
3885                new_seqno_since: SeqNo(100)
3886            })
3887        );
3888
3889        // But if we write down an active gc, we won't gc.
3890        state.collections.active_gc = Some(ActiveGc {
3891            seqno: state.seqno,
3892            start_ms: now,
3893        });
3894
3895        state.seqno = SeqNo(200);
3896        assert_eq!(state.seqno_since(), SeqNo(200));
3897
3898        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
3899
3900        state.seqno = SeqNo(300);
3901        assert_eq!(state.seqno_since(), SeqNo(300));
3902        // But if we advance the time past the threshold, we will gc.
3903        let new_now = now + GC_CONFIG.fallback_threshold_ms + 1;
3904        assert_eq!(
3905            state.maybe_gc(true, new_now, GC_CONFIG),
3906            Some(GcReq {
3907                shard_id: state.shard_id,
3908                new_seqno_since: SeqNo(300)
3909            })
3910        );
3911
3912        // Even if the seqno hasn't advanced past the threshold, we will gc
3913        // once the active gc has expired.
3914
3915        state.seqno = SeqNo(301);
3916        assert_eq!(state.seqno_since(), SeqNo(301));
3917        assert_eq!(
3918            state.maybe_gc(true, new_now, GC_CONFIG),
3919            Some(GcReq {
3920                shard_id: state.shard_id,
3921                new_seqno_since: SeqNo(301)
3922            })
3923        );
3924
3925        state.collections.active_gc = None;
3926
3927        // Artificially advance the seqno (again) so the seqno_since advances
3928        // past our internal gc_threshold (again).
3929        state.seqno = SeqNo(400);
3930        assert_eq!(state.seqno_since(), SeqNo(400));
3931
3932        let now = now_fn();
3933
3934        // If there are no writers, even a non-write will gc.
3935        let _ = state.collections.expire_writer(&writer_id);
3936        assert_eq!(
3937            state.maybe_gc(false, now, GC_CONFIG),
3938            Some(GcReq {
3939                shard_id: state.shard_id,
3940                new_seqno_since: SeqNo(400)
3941            })
3942        );
3943
3944        // Upper-bound the number of seqnos we'll attempt to collect in one go.
3945        let previous_seqno = state.seqno;
3946        state.seqno = SeqNo(10_000);
3947        assert_eq!(state.seqno_since(), SeqNo(10_000));
3948
3949        let now = now_fn();
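        // With the values above, the cap kicks in: previous seqno 400 +
        // max_versions 500 = 900, even though the seqno_since is 10_000.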
3950        assert_eq!(
3951            state.maybe_gc(true, now, GC_CONFIG),
3952            Some(GcReq {
3953                shard_id: state.shard_id,
3954                new_seqno_since: SeqNo(previous_seqno.0 + u64::cast_from(GC_CONFIG.max_versions))
3955            })
3956        );
3957    }
3958
3959    #[mz_ore::test]
3960    fn maybe_gc_classic() {
3961        const GC_CONFIG: GcConfig = GcConfig {
3962            use_active_gc: false,
3963            fallback_threshold_ms: 5000,
3964            min_versions: 16,
3965            max_versions: 128,
3966        };
3967        const NOW_MS: u64 = 0;
3968
3969        let mut state = TypedState::<String, String, u64, i64>::new(
3970            DUMMY_BUILD_INFO.semver_version(),
3971            ShardId::new(),
3972            "".to_owned(),
3973            0,
3974        );
3975
3976        // Empty state doesn't need gc, regardless of is_write.
3977        assert_eq!(state.maybe_gc(true, NOW_MS, GC_CONFIG), None);
3978        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
3979
3980        // Artificially advance the seqno so the seqno_since advances past our
3981        // internal gc_threshold.
3982        state.seqno = SeqNo(100);
3983        assert_eq!(state.seqno_since(), SeqNo(100));
3984
3985        // When a writer is present, non-writes don't gc.
3986        let writer_id = WriterId::new();
3987        let now = SYSTEM_TIME.clone();
3988        let _ = state.collections.compare_and_append(
3989            &hollow(1, 2, &["key1"], 1),
3990            &writer_id,
3991            now(),
3992            LEASE_DURATION_MS,
3993            &IdempotencyToken::new(),
3994            &debug_state(),
3995            0,
3996            100,
3997            None,
3998        );
3999        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
4000
4001        // A write will gc though.
4002        assert_eq!(
4003            state.maybe_gc(true, NOW_MS, GC_CONFIG),
4004            Some(GcReq {
4005                shard_id: state.shard_id,
4006                new_seqno_since: SeqNo(100)
4007            })
4008        );
4009
4010        // Artificially advance the seqno (again) so the seqno_since advances
4011        // past our internal gc_threshold (again).
4012        state.seqno = SeqNo(200);
4013        assert_eq!(state.seqno_since(), SeqNo(200));
4014
4015        // If there are no writers, even a non-write will gc.
4016        let _ = state.collections.expire_writer(&writer_id);
4017        assert_eq!(
4018            state.maybe_gc(false, NOW_MS, GC_CONFIG),
4019            Some(GcReq {
4020                shard_id: state.shard_id,
4021                new_seqno_since: SeqNo(200)
4022            })
4023        );
4024    }
4025
4026    #[mz_ore::test]
4027    fn need_rollup_active_rollup() {
4028        const ROLLUP_THRESHOLD: usize = 3;
4029        const ROLLUP_USE_ACTIVE_ROLLUP: bool = true;
4030        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 5000;
4031        let now = SYSTEM_TIME.clone();
4032
4033        mz_ore::test::init_logging();
4034        let mut state = TypedState::<String, String, u64, i64>::new(
4035            DUMMY_BUILD_INFO.semver_version(),
4036            ShardId::new(),
4037            "".to_owned(),
4038            0,
4039        );
4040
4041        let rollup_seqno = SeqNo(5);
4042        let rollup = HollowRollup {
4043            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4044            encoded_size_bytes: None,
4045        };
4046
4047        assert!(
4048            state
4049                .collections
4050                .add_rollup((rollup_seqno, &rollup))
4051                .is_continue()
4052        );
4053
4054        // shouldn't need a rollup at the seqno of the rollup
4055        state.seqno = SeqNo(5);
4056        assert_none!(state.need_rollup(
4057            ROLLUP_THRESHOLD,
4058            ROLLUP_USE_ACTIVE_ROLLUP,
4059            ROLLUP_FALLBACK_THRESHOLD_MS,
4060            now()
4061        ));
4062
4063        // shouldn't need a rollup at seqnos less than our threshold
4064        state.seqno = SeqNo(6);
4065        assert_none!(state.need_rollup(
4066            ROLLUP_THRESHOLD,
4067            ROLLUP_USE_ACTIVE_ROLLUP,
4068            ROLLUP_FALLBACK_THRESHOLD_MS,
4069            now()
4070        ));
4071        state.seqno = SeqNo(7);
4072        assert_none!(state.need_rollup(
4073            ROLLUP_THRESHOLD,
4074            ROLLUP_USE_ACTIVE_ROLLUP,
4075            ROLLUP_FALLBACK_THRESHOLD_MS,
4076            now()
4077        ));
4078        state.seqno = SeqNo(8);
4079        assert_none!(state.need_rollup(
4080            ROLLUP_THRESHOLD,
4081            ROLLUP_USE_ACTIVE_ROLLUP,
4082            ROLLUP_FALLBACK_THRESHOLD_MS,
4083            now()
4084        ));
4085
4086        let mut current_time = now();
4087        // hit our threshold! we should need a rollup
4088        state.seqno = SeqNo(9);
4089        assert_eq!(
4090            state
4091                .need_rollup(
4092                    ROLLUP_THRESHOLD,
4093                    ROLLUP_USE_ACTIVE_ROLLUP,
4094                    ROLLUP_FALLBACK_THRESHOLD_MS,
4095                    current_time
4096                )
4097                .expect("rollup"),
4098            SeqNo(9)
4099        );
4100
4101        state.collections.active_rollup = Some(ActiveRollup {
4102            seqno: SeqNo(9),
4103            start_ms: current_time,
4104        });
4105
4106        // There is now an active rollup, so we shouldn't need a rollup.
4107        assert_none!(state.need_rollup(
4108            ROLLUP_THRESHOLD,
4109            ROLLUP_USE_ACTIVE_ROLLUP,
4110            ROLLUP_FALLBACK_THRESHOLD_MS,
4111            current_time
4112        ));
4113
4114        state.seqno = SeqNo(10);
4115        // We still don't need a rollup, even though the seqno is greater than
4116        // the rollup threshold.
4117        assert_none!(state.need_rollup(
4118            ROLLUP_THRESHOLD,
4119            ROLLUP_USE_ACTIVE_ROLLUP,
4120            ROLLUP_FALLBACK_THRESHOLD_MS,
4121            current_time
4122        ));
4123
4124        // But if we wait long enough, we should need a rollup again.
4125        current_time += u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS) + 1;
4126        assert_eq!(
4127            state
4128                .need_rollup(
4129                    ROLLUP_THRESHOLD,
4130                    ROLLUP_USE_ACTIVE_ROLLUP,
4131                    ROLLUP_FALLBACK_THRESHOLD_MS,
4132                    current_time
4133                )
4134                .expect("rollup"),
4135            SeqNo(10)
4136        );
4137
4138        state.seqno = SeqNo(9);
4139        // Clear the active rollup, record a new rollup at seqno 9, and re-check the threshold logic.
4140        state.collections.active_rollup = None;
4141        let rollup_seqno = SeqNo(9);
4142        let rollup = HollowRollup {
4143            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4144            encoded_size_bytes: None,
4145        };
4146        assert!(
4147            state
4148                .collections
4149                .add_rollup((rollup_seqno, &rollup))
4150                .is_continue()
4151        );
4152
4153        state.seqno = SeqNo(11);
4154        // We shouldn't need a rollup at seqnos less than our threshold
4155        assert_none!(state.need_rollup(
4156            ROLLUP_THRESHOLD,
4157            ROLLUP_USE_ACTIVE_ROLLUP,
4158            ROLLUP_FALLBACK_THRESHOLD_MS,
4159            current_time
4160        ));
4161        // hit our threshold! we should need a rollup
4162        state.seqno = SeqNo(13);
4163        assert_eq!(
4164            state
4165                .need_rollup(
4166                    ROLLUP_THRESHOLD,
4167                    ROLLUP_USE_ACTIVE_ROLLUP,
4168                    ROLLUP_FALLBACK_THRESHOLD_MS,
4169                    current_time
4170                )
4171                .expect("rollup"),
4172            SeqNo(13)
4173        );
4174    }
4175
4176    #[mz_ore::test]
4177    fn need_rollup_classic() {
4178        const ROLLUP_THRESHOLD: usize = 3;
4179        const ROLLUP_USE_ACTIVE_ROLLUP: bool = false;
4180        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 0;
4181        const NOW: u64 = 0;
4182
4183        mz_ore::test::init_logging();
4184        let mut state = TypedState::<String, String, u64, i64>::new(
4185            DUMMY_BUILD_INFO.semver_version(),
4186            ShardId::new(),
4187            "".to_owned(),
4188            0,
4189        );
4190
4191        let rollup_seqno = SeqNo(5);
4192        let rollup = HollowRollup {
4193            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4194            encoded_size_bytes: None,
4195        };
4196
4197        assert!(
4198            state
4199                .collections
4200                .add_rollup((rollup_seqno, &rollup))
4201                .is_continue()
4202        );
4203
4204        // shouldn't need a rollup at the seqno of the rollup
4205        state.seqno = SeqNo(5);
4206        assert_none!(state.need_rollup(
4207            ROLLUP_THRESHOLD,
4208            ROLLUP_USE_ACTIVE_ROLLUP,
4209            ROLLUP_FALLBACK_THRESHOLD_MS,
4210            NOW
4211        ));
4212
4213        // shouldn't need a rollup at seqnos less than our threshold
4214        state.seqno = SeqNo(6);
4215        assert_none!(state.need_rollup(
4216            ROLLUP_THRESHOLD,
4217            ROLLUP_USE_ACTIVE_ROLLUP,
4218            ROLLUP_FALLBACK_THRESHOLD_MS,
4219            NOW
4220        ));
4221        state.seqno = SeqNo(7);
4222        assert_none!(state.need_rollup(
4223            ROLLUP_THRESHOLD,
4224            ROLLUP_USE_ACTIVE_ROLLUP,
4225            ROLLUP_FALLBACK_THRESHOLD_MS,
4226            NOW
4227        ));
4228
4229        // hit our threshold! we should need a rollup
4230        state.seqno = SeqNo(8);
4231        assert_eq!(
4232            state
4233                .need_rollup(
4234                    ROLLUP_THRESHOLD,
4235                    ROLLUP_USE_ACTIVE_ROLLUP,
4236                    ROLLUP_FALLBACK_THRESHOLD_MS,
4237                    NOW
4238                )
4239                .expect("rollup"),
4240            SeqNo(8)
4241        );
4242
4243        // but we don't need rollups for every seqno > the threshold
4244        state.seqno = SeqNo(9);
4245        assert_none!(state.need_rollup(
4246            ROLLUP_THRESHOLD,
4247            ROLLUP_USE_ACTIVE_ROLLUP,
4248            ROLLUP_FALLBACK_THRESHOLD_MS,
4249            NOW
4250        ));
4251
4252        // we only need a rollup every `ROLLUP_THRESHOLD` seqnos beyond the latest rollup
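        // (The latest rollup here is at seqno 5, so with a threshold of 3 the
        // next rollups are needed at seqnos 8 and 11.)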
4253        state.seqno = SeqNo(11);
4254        assert_eq!(
4255            state
4256                .need_rollup(
4257                    ROLLUP_THRESHOLD,
4258                    ROLLUP_USE_ACTIVE_ROLLUP,
4259                    ROLLUP_FALLBACK_THRESHOLD_MS,
4260                    NOW
4261                )
4262                .expect("rollup"),
4263            SeqNo(11)
4264        );
4265
4266        // add another rollup and ensure we're always picking the latest
4267        let rollup_seqno = SeqNo(6);
4268        let rollup = HollowRollup {
4269            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4270            encoded_size_bytes: None,
4271        };
4272        assert!(
4273            state
4274                .collections
4275                .add_rollup((rollup_seqno, &rollup))
4276                .is_continue()
4277        );
4278
4279        state.seqno = SeqNo(8);
4280        assert_none!(state.need_rollup(
4281            ROLLUP_THRESHOLD,
4282            ROLLUP_USE_ACTIVE_ROLLUP,
4283            ROLLUP_FALLBACK_THRESHOLD_MS,
4284            NOW
4285        ));
4286        state.seqno = SeqNo(9);
4287        assert_eq!(
4288            state
4289                .need_rollup(
4290                    ROLLUP_THRESHOLD,
4291                    ROLLUP_USE_ACTIVE_ROLLUP,
4292                    ROLLUP_FALLBACK_THRESHOLD_MS,
4293                    NOW
4294                )
4295                .expect("rollup"),
4296            SeqNo(9)
4297        );
4298
4299        // and ensure that past the fallback point, every seqno is assigned rollup work
4300        let fallback_seqno = SeqNo(
4301            rollup_seqno.0
4302                * u64::cast_from(PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER),
4303        );
4304        state.seqno = fallback_seqno;
4305        assert_eq!(
4306            state
4307                .need_rollup(
4308                    ROLLUP_THRESHOLD,
4309                    ROLLUP_USE_ACTIVE_ROLLUP,
4310                    ROLLUP_FALLBACK_THRESHOLD_MS,
4311                    NOW
4312                )
4313                .expect("rollup"),
4314            fallback_seqno
4315        );
4316        state.seqno = fallback_seqno.next();
4317        assert_eq!(
4318            state
4319                .need_rollup(
4320                    ROLLUP_THRESHOLD,
4321                    ROLLUP_USE_ACTIVE_ROLLUP,
4322                    ROLLUP_FALLBACK_THRESHOLD_MS,
4323                    NOW
4324                )
4325                .expect("rollup"),
4326            fallback_seqno.next()
4327        );
4328    }
4329
4330    #[mz_ore::test]
4331    fn idempotency_token_sentinel() {
4332        assert_eq!(
4333            IdempotencyToken::SENTINEL.to_string(),
4334            "i11111111-1111-1111-1111-111111111111"
4335        );
4336    }
4337
4338    /// This test generates an "arbitrary" State, but uses a fixed seed for the
4339    /// randomness, so that it's deterministic. This lets us assert the
4340    /// serialization of that State against a golden file that's committed,
4341    /// making it easy to see what the serialization (used in an upcoming
4342    /// INSPECT feature) looks like.
4343    ///
4344    /// This golden will have to be updated each time we change State, but
4345    /// that's a feature, not a bug.
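    /// On a mismatch, the assertion message below prints the new JSON under
    /// "NEW GOLDEN" so the committed golden file can be refreshed.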
4346    #[mz_ore::test]
4347    #[cfg_attr(miri, ignore)] // too slow
4348    fn state_inspect_serde_json() {
4349        const STATE_SERDE_JSON: &str = include_str!("state_serde.json");
4350        let mut runner = proptest::test_runner::TestRunner::deterministic();
4351        let tree = any_state::<u64>(6..8).new_tree(&mut runner).unwrap();
4352        let json = serde_json::to_string_pretty(&tree.current()).unwrap();
4353        assert_eq!(
4354            json.trim(),
4355            STATE_SERDE_JSON.trim(),
4356            "\n\nNEW GOLDEN\n{}\n",
4357            json
4358        );
4359    }
4360
4361    #[mz_persist_proc::test(tokio::test)]
4362    #[cfg_attr(miri, ignore)] // too slow
4363    async fn sneaky_downgrades(dyncfgs: ConfigUpdates) {
4364        let mut clients = new_test_client_cache(&dyncfgs);
4365        let shard_id = ShardId::new();
4366
4367        async fn open_and_write(
4368            clients: &mut PersistClientCache,
4369            version: semver::Version,
4370            shard_id: ShardId,
4371        ) -> Result<(), tokio::task::JoinError> {
4372            clients.cfg.build_version = version.clone();
4373            clients.clear_state_cache();
4374            let client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
4375            // Run in a task so we can catch the panic.
4376            mz_ore::task::spawn(|| version.to_string(), async move {
4377                let () = client
4378                    .upgrade_version::<String, (), u64, i64>(shard_id, Diagnostics::for_tests())
4379                    .await
4380                    .expect("valid usage");
4381                let (mut write, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
4382                let current = *write.upper().as_option().unwrap();
4383                // Do a write so that we tag the state with the version.
4384                write
4385                    .expect_compare_and_append_batch(&mut [], current, current + 1)
4386                    .await;
4387            })
4388            .into_tokio_handle()
4389            .await
4390        }
4391
4392        // Start at v0.10.0.
4393        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4394        assert_ok!(res);
4395
4396        // Upgrade to v0.11.0 is allowed.
4397        let res = open_and_write(&mut clients, Version::new(0, 11, 0), shard_id).await;
4398        assert_ok!(res);
4399
4400        // Downgrade to v0.10.0 is no longer allowed.
4401        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4402        assert!(res.unwrap_err().is_panic());
4403
4404        // Downgrade to v0.9.0 is _NOT_ allowed.
4405        let res = open_and_write(&mut clients, Version::new(0, 9, 0), shard_id).await;
4406        assert!(res.unwrap_err().is_panic());
4407    }
4408
4409    #[mz_ore::test]
4410    fn runid_roundtrip() {
4411        proptest!(|(runid: RunId)| {
4412            let runid_str = runid.to_string();
4413            let parsed = RunId::from_str(&runid_str);
4414            prop_assert_eq!(parsed, Ok(runid));
4415        });
4416    }
4417}