// mz_persist_client/internal/state.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use anyhow::ensure;
11use async_stream::{stream, try_stream};
12use differential_dataflow::difference::Monoid;
13use mz_persist::metrics::ColumnarMetrics;
14use proptest::prelude::{Arbitrary, Strategy};
15use std::borrow::Cow;
16use std::cmp::Ordering;
17use std::collections::BTreeMap;
18use std::fmt::{Debug, Formatter};
19use std::marker::PhantomData;
20use std::ops::ControlFlow::{self, Break, Continue};
21use std::ops::{Deref, DerefMut};
22use std::time::Duration;
23
24use arrow::array::{Array, ArrayData, make_array};
25use arrow::datatypes::DataType;
26use bytes::Bytes;
27use differential_dataflow::Hashable;
28use differential_dataflow::lattice::Lattice;
29use differential_dataflow::trace::Description;
30use differential_dataflow::trace::implementations::BatchContainer;
31use futures::Stream;
32use futures_util::StreamExt;
33use itertools::Itertools;
34use mz_dyncfg::Config;
35use mz_ore::cast::CastFrom;
36use mz_ore::now::EpochMillis;
37use mz_ore::soft_panic_or_log;
38use mz_ore::vec::PartialOrdVecExt;
39use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceUpdates};
40use mz_persist::location::{Blob, SeqNo};
41use mz_persist_types::arrow::{ArrayBound, ProtoArrayData};
42use mz_persist_types::columnar::{ColumnEncoder, Schema};
43use mz_persist_types::schema::{SchemaId, backward_compatible};
44use mz_persist_types::{Codec, Codec64};
45use mz_proto::ProtoType;
46use mz_proto::RustType;
47use proptest_derive::Arbitrary;
48use semver::Version;
49use serde::ser::SerializeStruct;
50use serde::{Serialize, Serializer};
51use timely::PartialOrder;
52use timely::order::TotalOrder;
53use timely::progress::{Antichain, Timestamp};
54use tracing::info;
55use uuid::Uuid;
56
57use crate::critical::{CriticalReaderId, Opaque};
58use crate::error::InvalidUsage;
59use crate::internal::encoding::{
60    LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, parse_id,
61};
62use crate::internal::gc::GcReq;
63use crate::internal::machine::retry_external;
64use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
65use crate::internal::trace::{
66    ActiveCompaction, ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace,
67};
68use crate::metrics::Metrics;
69use crate::read::LeasedReaderId;
70use crate::schema::CaESchema;
71use crate::write::WriterId;
72use crate::{PersistConfig, ShardId};
73
// Pull in the build-script-generated Rust types (e.g. `ProtoHollowRun`,
// used below) for the persist state and diff definitions.
// NOTE(review): presumably prost/protobuf codegen emitted into `OUT_DIR` —
// confirm against the crate's build script.
include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.state.rs"
));

include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.diff.rs"
));
83
/// Determines how often to write rollups, assigning a maintenance task after
/// `rollup_threshold` seqnos have passed since the last rollup.
///
/// Tuning note: in the absence of a long reader seqno hold, and with
/// incremental GC, this threshold will determine about how many live diffs are
/// held in Consensus. Lowering this value decreases the live diff count at the
/// cost of more maintenance work + blob writes.
pub(crate) const ROLLUP_THRESHOLD: Config<usize> = Config::new(
    "persist_rollup_threshold",
    128,
    "The number of seqnos between rollups.",
);

/// Determines how long to wait before an active rollup is considered
/// "stuck" and a new rollup is started.
pub(crate) const ROLLUP_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_rollup_fallback_threshold_ms",
    5000,
    "The number of milliseconds before a worker claims an already claimed rollup.",
);

/// Feature flag for the new active rollup tracking mechanism.
/// We mustn't enable this until we are fully deployed on the new version.
pub(crate) const ROLLUP_USE_ACTIVE_ROLLUP: Config<bool> = Config::new(
    "persist_rollup_use_active_rollup",
    true,
    "Whether to use the new active rollup tracking mechanism.",
);

/// Determines how long to wait before an active GC is considered
/// "stuck" and a new GC is started.
pub(crate) const GC_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_gc_fallback_threshold_ms",
    900000,
    "The number of milliseconds before a worker claims an already claimed GC.",
);

/// See the config description string.
pub(crate) const GC_MIN_VERSIONS: Config<usize> = Config::new(
    "persist_gc_min_versions",
    32,
    "The number of un-GCd versions that may exist in state before we'll trigger a GC.",
);

/// See the config description string.
pub(crate) const GC_MAX_VERSIONS: Config<usize> = Config::new(
    "persist_gc_max_versions",
    128_000,
    "The maximum number of versions to GC in a single GC run.",
);

/// Feature flag for the new active GC tracking mechanism.
/// We mustn't enable this until we are fully deployed on the new version.
pub(crate) const GC_USE_ACTIVE_GC: Config<bool> = Config::new(
    "persist_gc_use_active_gc",
    false,
    "Whether to use the new active GC tracking mechanism.",
);

/// See the config description string.
pub(crate) const ENABLE_INCREMENTAL_COMPACTION: Config<bool> = Config::new(
    "persist_enable_incremental_compaction",
    false,
    "Whether to enable incremental compaction.",
);
148
149/// A token to disambiguate state commands that could not otherwise be
150/// idempotent.
151#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
152#[serde(into = "String")]
153pub struct IdempotencyToken(pub(crate) [u8; 16]);
154
155impl std::fmt::Display for IdempotencyToken {
156    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157        write!(f, "i{}", Uuid::from_bytes(self.0))
158    }
159}
160
161impl std::fmt::Debug for IdempotencyToken {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        write!(f, "IdempotencyToken({})", Uuid::from_bytes(self.0))
164    }
165}
166
167impl std::str::FromStr for IdempotencyToken {
168    type Err = String;
169
170    fn from_str(s: &str) -> Result<Self, Self::Err> {
171        parse_id("i", "IdempotencyToken", s).map(IdempotencyToken)
172    }
173}
174
175impl From<IdempotencyToken> for String {
176    fn from(x: IdempotencyToken) -> Self {
177        x.to_string()
178    }
179}
180
181impl IdempotencyToken {
182    pub(crate) fn new() -> Self {
183        IdempotencyToken(*Uuid::new_v4().as_bytes())
184    }
185    pub(crate) const SENTINEL: IdempotencyToken = IdempotencyToken([17u8; 16]);
186}
187
/// State tracked for a leased reader of a shard.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct LeasedReaderState<T> {
    /// The seqno capability of this reader.
    pub seqno: SeqNo,
    /// The since capability of this reader.
    pub since: Antichain<T>,
    /// UNIX_EPOCH timestamp (in millis) of this reader's most recent heartbeat
    pub last_heartbeat_timestamp_ms: u64,
    /// Duration (in millis) allowed after [Self::last_heartbeat_timestamp_ms]
    /// after which this reader may be expired
    pub lease_duration_ms: u64,
    /// For debugging.
    pub debug: HandleDebugState,
}
202
/// State tracked for a critical (non-leased) reader of a shard.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct CriticalReaderState<T> {
    /// The since capability of this reader.
    pub since: Antichain<T>,
    /// An opaque token matched on by compare_and_downgrade_since.
    pub opaque: Opaque,
    /// For debugging.
    pub debug: HandleDebugState,
}
212
/// State tracked for a writer of a shard.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct WriterState<T> {
    /// UNIX_EPOCH timestamp (in millis) of this writer's most recent heartbeat
    pub last_heartbeat_timestamp_ms: u64,
    /// Duration (in millis) allowed after [Self::last_heartbeat_timestamp_ms]
    /// after which this writer may be expired
    pub lease_duration_ms: u64,
    /// The idempotency token of the most recent successful compare_and_append
    /// by this writer.
    pub most_recent_write_token: IdempotencyToken,
    /// The upper of the most recent successful compare_and_append by this
    /// writer.
    pub most_recent_write_upper: Antichain<T>,
    /// For debugging.
    pub debug: HandleDebugState,
}
229
230/// Debugging info for a reader or writer.
231#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize)]
232pub struct HandleDebugState {
233    /// Hostname of the persist user that registered this writer or reader. For
234    /// critical readers, this is the _most recent_ registration.
235    pub hostname: String,
236    /// Plaintext description of this writer or reader's intent.
237    pub purpose: String,
238}
239
/// Part of the updates in a Batch.
///
/// Either a pointer to ones stored in Blob or the updates themselves inlined.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(tag = "type")]
pub enum BatchPart<T> {
    /// A part whose data lives in the blob store; see [HollowBatchPart].
    Hollow(HollowBatchPart<T>),
    /// A small part whose updates are stored inline rather than in blob.
    Inline {
        /// The encoded updates themselves.
        updates: LazyInlineBatchPart,
        /// A frontier to which timestamps in this part are advanced on read,
        /// if set. (See the matching field on [HollowBatchPart].)
        ts_rewrite: Option<Antichain<T>>,
        /// The schema used to encode this part's data, if known.
        schema_id: Option<SchemaId>,

        /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
        deprecated_schema_id: Option<SchemaId>,
    },
}
256
257fn decode_structured_lower(lower: &LazyProto<ProtoArrayData>) -> Option<ArrayBound> {
258    let try_decode = |lower: &LazyProto<ProtoArrayData>| {
259        let proto = lower.decode()?;
260        let data = ArrayData::from_proto(proto)?;
261        ensure!(data.len() == 1);
262        Ok(ArrayBound::new(make_array(data), 0))
263    };
264
265    let decoded: anyhow::Result<ArrayBound> = try_decode(lower);
266
267    match decoded {
268        Ok(bound) => Some(bound),
269        Err(e) => {
270            soft_panic_or_log!("failed to decode bound: {e:#?}");
271            None
272        }
273    }
274}
275
impl<T> BatchPart<T> {
    /// The number of bytes this part occupies in blob storage (0 for inline
    /// parts, whose data lives in state).
    pub fn hollow_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(x) => x.encoded_size_bytes,
            BatchPart::Inline { .. } => 0,
        }
    }

    /// Whether this part's updates are stored inline in state.
    pub fn is_inline(&self) -> bool {
        matches!(self, BatchPart::Inline { .. })
    }

    /// The number of bytes of inline data (0 for hollow parts).
    pub fn inline_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(_) => 0,
            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
        }
    }

    /// The writer (or version) that produced this part, if the blob key
    /// parses; `None` for inline parts.
    pub fn writer_key(&self) -> Option<WriterKey> {
        match self {
            BatchPart::Hollow(x) => x.key.split().map(|(writer, _part)| writer),
            BatchPart::Inline { .. } => None,
        }
    }

    /// The encoded size of this part's data, wherever it is stored.
    pub fn encoded_size_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(x) => x.encoded_size_bytes,
            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
        }
    }

    /// A user-interpretable identifier or description of the part (for logs and
    /// such).
    pub fn printable_name(&self) -> &str {
        match self {
            BatchPart::Hollow(x) => x.key.0.as_str(),
            BatchPart::Inline { .. } => "<inline>",
        }
    }

    /// Aggregate statistics about the data in this part, if stored.
    pub fn stats(&self) -> Option<&LazyPartStats> {
        match self {
            BatchPart::Hollow(x) => x.stats.as_ref(),
            BatchPart::Inline { .. } => None,
        }
    }

    /// A lower bound on the (codec-ordered) keys in this part.
    pub fn key_lower(&self) -> &[u8] {
        match self {
            BatchPart::Hollow(x) => x.key_lower.as_slice(),
            // We don't duplicate the lowest key because this can be
            // considerable overhead for small parts.
            //
            // The empty key might not be a tight lower bound, but it is a valid
            // lower bound. If a caller is interested in a tighter lower bound,
            // the data is inline.
            BatchPart::Inline { .. } => &[],
        }
    }

    /// A lower bound on the keys in the structured ordering, if stored and
    /// decodable; always `None` for inline parts.
    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
        let part = match self {
            BatchPart::Hollow(part) => part,
            BatchPart::Inline { .. } => return None,
        };

        decode_structured_lower(part.structured_key_lower.as_ref()?)
    }

    /// The frontier to which this part's timestamps are advanced on read, if
    /// set.
    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
        match self {
            BatchPart::Hollow(x) => x.ts_rewrite.as_ref(),
            BatchPart::Inline { ts_rewrite, .. } => ts_rewrite.as_ref(),
        }
    }

    /// The ID of the schema used to encode this part, if known.
    pub fn schema_id(&self) -> Option<SchemaId> {
        match self {
            BatchPart::Hollow(x) => x.schema_id,
            BatchPart::Inline { schema_id, .. } => *schema_id,
        }
    }

    /// ID of a schema that has since been deprecated and exists only to
    /// cleanly roundtrip.
    pub fn deprecated_schema_id(&self) -> Option<SchemaId> {
        match self {
            BatchPart::Hollow(x) => x.deprecated_schema_id,
            BatchPart::Inline {
                deprecated_schema_id,
                ..
            } => *deprecated_schema_id,
        }
    }
}
371
impl<T: Timestamp + Codec64> BatchPart<T> {
    /// Whether this part's data is stored only in the structured columnar
    /// format.
    ///
    /// Panics (via `expect`) if an inline part's updates fail to decode.
    pub fn is_structured_only(&self, metrics: &ColumnarMetrics) -> bool {
        match self {
            BatchPart::Hollow(x) => matches!(x.format, Some(BatchColumnarFormat::Structured)),
            BatchPart::Inline { updates, .. } => {
                let inline_part = updates.decode::<T>(metrics).expect("valid inline part");
                matches!(inline_part.updates, BlobTraceUpdates::Structured { .. })
            }
        }
    }

    /// The sum of all diffs in this part, if known.
    ///
    /// Hollow parts return `None` when the sum wasn't recorded at write time;
    /// inline parts compute it on the fly (panicking via `expect` if the part
    /// fails to decode).
    pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
        match self {
            BatchPart::Hollow(x) => x.diffs_sum.map(D::decode),
            BatchPart::Inline { updates, .. } => Some(
                updates
                    .decode::<T>(metrics)
                    .expect("valid inline part")
                    .updates
                    .diffs_sum(),
            ),
        }
    }
}
396
/// An ordered list of parts, generally stored as part of a larger run.
///
/// Stored in blob and pointed at by a [HollowRunRef]; see [HollowRunRef::set]
/// and [HollowRunRef::get].
#[derive(Debug, Clone)]
pub struct HollowRun<T> {
    /// Pointers usable to retrieve the updates.
    pub(crate) parts: Vec<RunPart<T>>,
}
403
/// A reference to a [HollowRun], including the key in the blob store and some denormalized
/// metadata.
#[derive(Debug, Eq, PartialEq, Clone, Serialize)]
pub struct HollowRunRef<T> {
    /// Pointer usable to retrieve the [HollowRun] from blob storage.
    pub key: PartialBatchKey,

    /// The size of the referenced run object, plus all of the hollow objects it contains.
    pub hollow_bytes: usize,

    /// The size of the largest individual part in the run; useful for sizing compaction.
    pub max_part_bytes: usize,

    /// The lower bound of the data in this part, ordered by the codec ordering.
    pub key_lower: Vec<u8>,

    /// The lower bound of the data in this part, ordered by the structured ordering.
    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,

    /// A Codec64-encoded sum of all diffs in the referenced run, if known.
    pub diffs_sum: Option<[u8; 8]>,

    /// Marker tying this ref to the timestamp type of the run it points at.
    pub(crate) _phantom_data: PhantomData<T>,
}
impl<T: Eq> PartialOrd<Self> for HollowRunRef<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Eq> Ord for HollowRunRef<T> {
    // Ordering considers only the blob key, while `PartialEq` is derived over
    // all fields. NOTE(review): two refs with equal keys but different
    // denormalized metadata compare as `Ordering::Equal` yet `!=`; confirm no
    // caller relies on `Ord`/`Eq` consistency (e.g. sorted-container dedup).
    fn cmp(&self, other: &Self) -> Ordering {
        self.key.cmp(&other.key)
    }
}
437
438impl<T> HollowRunRef<T> {
439    pub fn writer_key(&self) -> Option<WriterKey> {
440        Some(self.key.split()?.0)
441    }
442}
443
impl<T: Timestamp + Codec64> HollowRunRef<T> {
    /// Stores the given runs and returns a [HollowRunRef] that points to them.
    ///
    /// NOTE(review): panics (via `expect`) if `data.parts` is empty or if any
    /// part is missing its diffs sum — `reduce` over an empty iterator yields
    /// `None`. Callers are assumed to guarantee both; confirm.
    pub async fn set<D: Codec64 + Monoid>(
        shard_id: ShardId,
        blob: &dyn Blob,
        writer: &WriterKey,
        data: HollowRun<T>,
        metrics: &Metrics,
    ) -> Self {
        // Denormalize aggregate metadata from the parts so readers of this
        // ref can make decisions (e.g. compaction sizing) without fetching
        // the run object itself.
        let hollow_bytes = data.parts.iter().map(|p| p.hollow_bytes()).sum();
        let max_part_bytes = data
            .parts
            .iter()
            .map(|p| p.max_part_bytes())
            .max()
            .unwrap_or(0);
        // Parts are an ordered list, so the first part's lower bound bounds
        // the whole run.
        let key_lower = data
            .parts
            .first()
            .map_or(vec![], |p| p.key_lower().to_vec());
        let structured_key_lower = match data.parts.first() {
            Some(RunPart::Many(r)) => r.structured_key_lower.clone(),
            Some(RunPart::Single(BatchPart::Hollow(p))) => p.structured_key_lower.clone(),
            Some(RunPart::Single(BatchPart::Inline { .. })) | None => None,
        };
        // Sum the per-part diff sums into a single run-level sum.
        let diffs_sum = data
            .parts
            .iter()
            .map(|p| {
                p.diffs_sum::<D>(&metrics.columnar)
                    .expect("valid diffs sum")
            })
            .reduce(|mut a, b| {
                a.plus_equals(&b);
                a
            })
            .expect("valid diffs sum")
            .encode();

        // Write the proto-encoded run under a freshly-generated part key,
        // retrying the blob write via `retry_external`.
        let key = PartialBatchKey::new(writer, &PartId::new());
        let blob_key = key.complete(&shard_id);
        let bytes = Bytes::from(prost::Message::encode_to_vec(&data.into_proto()));
        let () = retry_external(&metrics.retries.external.hollow_run_set, || {
            blob.set(&blob_key, bytes.clone())
        })
        .await;
        Self {
            key,
            hollow_bytes,
            max_part_bytes,
            key_lower,
            structured_key_lower,
            diffs_sum: Some(diffs_sum),
            _phantom_data: Default::default(),
        }
    }

    /// Retrieve the [HollowRun] that this reference points to.
    /// The caller is expected to ensure that this ref is the result of calling [HollowRunRef::set]
    /// with the same shard id and backing store.
    ///
    /// Returns `None` if the blob is missing; panics if the fetched bytes are
    /// not a valid encoded run.
    pub async fn get(
        &self,
        shard_id: ShardId,
        blob: &dyn Blob,
        metrics: &Metrics,
    ) -> Option<HollowRun<T>> {
        let blob_key = self.key.complete(&shard_id);
        let mut bytes = retry_external(&metrics.retries.external.hollow_run_get, || {
            blob.get(&blob_key)
        })
        .await?;
        let proto_runs: ProtoHollowRun =
            prost::Message::decode(&mut bytes).expect("illegal state: invalid proto bytes");
        let runs = proto_runs
            .into_rust()
            .expect("illegal state: invalid encoded runs proto");
        Some(runs)
    }
}
523
/// Part of the updates in a run.
///
/// Either a pointer to ones stored in Blob or a single part stored inline.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(untagged)]
pub enum RunPart<T> {
    /// A single part, stored directly.
    Single(BatchPart<T>),
    /// A reference to a [HollowRun] of many parts, stored in blob.
    Many(HollowRunRef<T>),
}
533
534impl<T: Ord> PartialOrd<Self> for RunPart<T> {
535    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
536        Some(self.cmp(other))
537    }
538}
539
540impl<T: Ord> Ord for RunPart<T> {
541    fn cmp(&self, other: &Self) -> Ordering {
542        match (self, other) {
543            (RunPart::Single(a), RunPart::Single(b)) => a.cmp(b),
544            (RunPart::Single(_), RunPart::Many(_)) => Ordering::Less,
545            (RunPart::Many(_), RunPart::Single(_)) => Ordering::Greater,
546            (RunPart::Many(a), RunPart::Many(b)) => a.cmp(b),
547        }
548    }
549}
550
impl<T> RunPart<T> {
    /// Asserts this is a single hollow part and returns it (test helper).
    #[cfg(test)]
    pub fn expect_hollow_part(&self) -> &HollowBatchPart<T> {
        match self {
            RunPart::Single(BatchPart::Hollow(hollow)) => hollow,
            _ => panic!("expected hollow part!"),
        }
    }

    /// Total bytes stored in blob for this part (for run refs, the
    /// denormalized total of everything they contain).
    pub fn hollow_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.hollow_bytes(),
            Self::Many(r) => r.hollow_bytes,
        }
    }

    /// Whether this is a single inline part; run references are never inline.
    pub fn is_inline(&self) -> bool {
        match self {
            Self::Single(p) => p.is_inline(),
            Self::Many(_) => false,
        }
    }

    /// The number of bytes of inline data (0 for run references).
    pub fn inline_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.inline_bytes(),
            Self::Many(_) => 0,
        }
    }

    /// The size of the largest individual part; useful for sizing compaction.
    pub fn max_part_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.encoded_size_bytes(),
            Self::Many(r) => r.max_part_bytes,
        }
    }

    /// The writer (or version) that produced this part, if known.
    pub fn writer_key(&self) -> Option<WriterKey> {
        match self {
            Self::Single(p) => p.writer_key(),
            Self::Many(r) => r.writer_key(),
        }
    }

    /// The encoded size of this part's data (for run refs, the denormalized
    /// total).
    pub fn encoded_size_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.encoded_size_bytes(),
            Self::Many(r) => r.hollow_bytes,
        }
    }

    /// The schema id of a single part; run references don't store one.
    pub fn schema_id(&self) -> Option<SchemaId> {
        match self {
            Self::Single(p) => p.schema_id(),
            Self::Many(_) => None,
        }
    }

    /// A user-interpretable identifier or description of the part (for logs and
    /// such).
    pub fn printable_name(&self) -> &str {
        match self {
            Self::Single(p) => p.printable_name(),
            Self::Many(r) => r.key.0.as_str(),
        }
    }

    /// Aggregate statistics for a single part, if stored.
    pub fn stats(&self) -> Option<&LazyPartStats> {
        match self {
            Self::Single(p) => p.stats(),
            // TODO: if we kept stats we could avoid fetching the metadata here.
            Self::Many(_) => None,
        }
    }

    /// A lower bound on the (codec-ordered) keys in this part.
    pub fn key_lower(&self) -> &[u8] {
        match self {
            Self::Single(p) => p.key_lower(),
            Self::Many(r) => r.key_lower.as_slice(),
        }
    }

    /// A lower bound on the keys in the structured ordering, if stored.
    ///
    /// NOTE(review): `Many` carries a `structured_key_lower` field but it is
    /// not decoded here — confirm whether that's intentional.
    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
        match self {
            Self::Single(p) => p.structured_key_lower(),
            Self::Many(_) => None,
        }
    }

    /// The frontier to which a single part's timestamps are advanced on read,
    /// if set; run references never carry a rewrite.
    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
        match self {
            Self::Single(p) => p.ts_rewrite(),
            Self::Many(_) => None,
        }
    }
}
647
648impl<T> RunPart<T>
649where
650    T: Timestamp + Codec64,
651{
652    pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
653        match self {
654            Self::Single(p) => p.diffs_sum(metrics),
655            Self::Many(hollow_run) => hollow_run.diffs_sum.map(D::decode),
656        }
657    }
658}
659
660/// A blob was missing!
661#[derive(Clone, Debug)]
662pub struct MissingBlob(BlobKey);
663
664impl std::fmt::Display for MissingBlob {
665    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
666        write!(f, "unexpectedly missing key: {}", self.0)
667    }
668}
669
670impl std::error::Error for MissingBlob {}
671
impl<T: Timestamp + Codec64 + Sync> RunPart<T> {
    /// Streams the individual [BatchPart]s this part resolves to, fetching
    /// nested [HollowRun]s from blob storage as needed.
    ///
    /// Yields a [MissingBlob] error if a referenced run object is absent.
    pub fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + Send + 'a {
        try_stream! {
            match self {
                RunPart::Single(p) => {
                    // Leaf case: yield the part itself, borrowed.
                    yield Cow::Borrowed(p);
                }
                RunPart::Many(r) => {
                    let fetched = r.get(shard_id, blob, metrics).await
                        .ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
                    for run_part in fetched.parts {
                        // Recurse into nested runs. NOTE(review): `boxed()`
                        // presumably erases the recursive stream type so the
                        // future stays sized — confirm.
                        for await batch_part in
                            run_part.part_stream(shard_id, blob, metrics).boxed()
                        {
                            yield Cow::Owned(batch_part?.into_owned());
                        }
                    }
                }
            }
        }
    }
}
699
impl<T: Ord> PartialOrd for BatchPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for BatchPart<T> {
    // Hollow parts compare via `HollowBatchPart`'s ordering; inline parts
    // compare field-by-field (lexicographically, with antichains compared by
    // their raw elements); any hollow part sorts before any inline part.
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o),
            (
                BatchPart::Inline {
                    updates: s_updates,
                    ts_rewrite: s_ts_rewrite,
                    schema_id: s_schema_id,
                    deprecated_schema_id: s_deprecated_schema_id,
                },
                BatchPart::Inline {
                    updates: o_updates,
                    ts_rewrite: o_ts_rewrite,
                    schema_id: o_schema_id,
                    deprecated_schema_id: o_deprecated_schema_id,
                },
            ) => (
                s_updates,
                s_ts_rewrite.as_ref().map(|x| x.elements()),
                s_schema_id,
                s_deprecated_schema_id,
            )
                .cmp(&(
                    o_updates,
                    o_ts_rewrite.as_ref().map(|x| x.elements()),
                    o_schema_id,
                    o_deprecated_schema_id,
                )),
            (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less,
            (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater,
        }
    }
}
740
/// What order are the parts in this run in?
///
/// Stored (optionally) per run in [RunMeta::order].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub(crate) enum RunOrder {
    /// They're in no particular order.
    Unordered,
    /// They're ordered based on the codec-encoded K/V bytes.
    Codec,
    /// They're ordered by the natural ordering of the structured data.
    Structured,
}
751
752#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Serialize, Copy, Hash)]
753pub struct RunId(pub(crate) [u8; 16]);
754
755impl std::fmt::Display for RunId {
756    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
757        write!(f, "ri{}", Uuid::from_bytes(self.0))
758    }
759}
760
761impl std::fmt::Debug for RunId {
762    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
763        write!(f, "RunId({})", Uuid::from_bytes(self.0))
764    }
765}
766
767impl std::str::FromStr for RunId {
768    type Err = String;
769
770    fn from_str(s: &str) -> Result<Self, Self::Err> {
771        parse_id("ri", "RunId", s).map(RunId)
772    }
773}
774
775impl From<RunId> for String {
776    fn from(x: RunId) -> Self {
777        x.to_string()
778    }
779}
780
781impl RunId {
782    pub(crate) fn new() -> Self {
783        RunId(*Uuid::new_v4().as_bytes())
784    }
785}
786
787impl Arbitrary for RunId {
788    type Parameters = ();
789    type Strategy = proptest::strategy::BoxedStrategy<Self>;
790    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
791        Strategy::prop_map(proptest::prelude::any::<u128>(), |n| {
792            RunId(*Uuid::from_u128(n).as_bytes())
793        })
794        .boxed()
795    }
796}
797
/// Metadata shared across a run.
#[derive(Clone, Debug, Default, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub struct RunMeta {
    /// If none, Persist should infer the order based on the proto metadata.
    pub(crate) order: Option<RunOrder>,
    /// All parts in a run should have the same schema.
    pub(crate) schema: Option<SchemaId>,

    /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
    pub(crate) deprecated_schema: Option<SchemaId>,

    /// If set, a UUID that uniquely identifies this run.
    pub(crate) id: Option<RunId>,

    /// The number of updates in this run, or `None` if the number is unknown.
    pub(crate) len: Option<usize>,

    /// Additional unstructured metadata. Skipped during serialization when
    /// empty.
    #[serde(skip_serializing_if = "MetadataMap::is_empty")]
    pub(crate) meta: MetadataMap,
}
819
/// A subset of a [HollowBatch] corresponding 1:1 to a blob.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct HollowBatchPart<T> {
    /// Pointer usable to retrieve the updates.
    pub key: PartialBatchKey,
    /// Miscellaneous metadata. Skipped during serialization when empty.
    #[serde(skip_serializing_if = "MetadataMap::is_empty")]
    pub meta: MetadataMap,
    /// The encoded size of this part.
    pub encoded_size_bytes: usize,
    /// A lower bound on the keys in the part. (By default, this is the minimum
    /// possible key: `vec![]`.)
    #[serde(serialize_with = "serialize_part_bytes")]
    pub key_lower: Vec<u8>,
    /// A lower bound on the keys in the part, stored as structured data.
    #[serde(serialize_with = "serialize_lazy_proto")]
    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
    /// Aggregate statistics about data contained in this part.
    #[serde(serialize_with = "serialize_part_stats")]
    pub stats: Option<LazyPartStats>,
    /// A frontier to which timestamps in this part are advanced on read, if
    /// set.
    ///
    /// A value of `Some([T::minimum()])` is functionally the same as `None`,
    /// but we maintain the distinction between the two for some internal sanity
    /// checking of invariants as well as metrics. If this ever becomes an
    /// issue, everything still works with this as just `Antichain<T>`.
    pub ts_rewrite: Option<Antichain<T>>,
    /// A Codec64 encoded sum of all diffs in this part, if known.
    ///
    /// This is `None` if this part was written before we started storing this
    /// information, or if it was written when the dyncfg was off.
    ///
    /// It could also make sense to model this as part of the pushdown stats, if
    /// we later decide that's of some benefit.
    #[serde(serialize_with = "serialize_diffs_sum")]
    pub diffs_sum: Option<[u8; 8]>,
    /// Columnar format that this batch was written in.
    ///
    /// This is `None` if this part was written before we started writing structured
    /// columnar data.
    pub format: Option<BatchColumnarFormat>,
    /// The schemas used to encode the data in this batch part.
    ///
    /// Or None for historical data written before the schema registry was
    /// added.
    pub schema_id: Option<SchemaId>,

    /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
    pub deprecated_schema_id: Option<SchemaId>,
}
871
/// A [Batch] but with the updates themselves stored externally.
///
/// [Batch]: differential_dataflow::trace::BatchReader
#[derive(Clone, PartialEq, Eq)]
pub struct HollowBatch<T> {
    /// Describes the times of the updates in the batch.
    pub desc: Description<T>,
    /// The number of updates in the batch.
    pub len: usize,
    /// Pointers usable to retrieve the updates.
    pub(crate) parts: Vec<RunPart<T>>,
    /// Runs of sequential sorted batch parts, stored as indices into `parts`.
    /// ex.
    /// ```text
    ///     parts=[p1, p2, p3], runs=[]     --> run  is  [p1, p2, p3]
    ///     parts=[p1, p2, p3], runs=[1]    --> runs are [p1] and [p2, p3]
    ///     parts=[p1, p2, p3], runs=[1, 2] --> runs are [p1], [p2], [p3]
    /// ```
    pub(crate) run_splits: Vec<usize>,
    /// Run-level metadata: the first entry has metadata for the first run, and so on.
    /// If there's no corresponding entry for a particular run, it's assumed to be [RunMeta::default()].
    pub(crate) run_meta: Vec<RunMeta>,
}
895
impl<T: Debug> Debug for HollowBatch<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Deconstruct self so we get a compile failure if new fields are
        // added without being considered here.
        let HollowBatch {
            desc,
            parts,
            len,
            run_splits: runs,
            run_meta,
        } = self;
        // Render the description as its (lower, upper, since) frontier
        // elements, which is more compact than Description's own Debug.
        f.debug_struct("HollowBatch")
            .field(
                "desc",
                &(
                    desc.lower().elements(),
                    desc.upper().elements(),
                    desc.since().elements(),
                ),
            )
            .field("parts", &parts)
            .field("len", &len)
            .field("runs", &runs)
            .field("run_meta", &run_meta)
            .finish()
    }
}
921
impl<T: Serialize> serde::Serialize for HollowBatch<T> {
    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        let HollowBatch {
            desc,
            len,
            // Both parts and runs are covered by the self.runs call.
            parts: _,
            run_splits: _,
            run_meta: _,
        } = self;
        // 5 fields: lower, upper, since, len, part_runs. Keep the count in
        // sync with the serialize_field calls below.
        let mut s = s.serialize_struct("HollowBatch", 5)?;
        let () = s.serialize_field("lower", &desc.lower().elements())?;
        let () = s.serialize_field("upper", &desc.upper().elements())?;
        let () = s.serialize_field("since", &desc.since().elements())?;
        let () = s.serialize_field("len", len)?;
        let () = s.serialize_field("part_runs", &self.runs().collect::<Vec<_>>())?;
        s.end()
    }
}
941
942impl<T: Ord> PartialOrd for HollowBatch<T> {
943    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
944        Some(self.cmp(other))
945    }
946}
947
impl<T: Ord> Ord for HollowBatch<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        // Deconstruct self and other so we get a compile failure if new fields
        // are added.
        let HollowBatch {
            desc: self_desc,
            parts: self_parts,
            len: self_len,
            run_splits: self_runs,
            run_meta: self_run_meta,
        } = self;
        let HollowBatch {
            desc: other_desc,
            parts: other_parts,
            len: other_len,
            run_splits: other_runs,
            run_meta: other_run_meta,
        } = other;
        // Compare descriptions by their frontier elements (Description itself
        // isn't Ord), then lexicographically by the remaining fields.
        (
            self_desc.lower().elements(),
            self_desc.upper().elements(),
            self_desc.since().elements(),
            self_parts,
            self_len,
            self_runs,
            self_run_meta,
        )
            .cmp(&(
                other_desc.lower().elements(),
                other_desc.upper().elements(),
                other_desc.since().elements(),
                other_parts,
                other_len,
                other_runs,
                other_run_meta,
            ))
    }
}
986
impl<T: Timestamp + Codec64 + Sync> HollowBatch<T> {
    /// Streams every [BatchPart] referenced by this batch, in order.
    ///
    /// Each top-level [RunPart] may expand to multiple parts; this delegates
    /// to each part's own `part_stream` (which may fetch indirect run data
    /// from `blob`) and flattens the results. Yields `Err(MissingBlob)` if a
    /// referenced blob cannot be found.
    pub(crate) fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + 'a {
        stream! {
            for part in &self.parts {
                for await part in part.part_stream(shard_id, blob, metrics) {
                    yield part;
                }
            }
        }
    }
}
1003impl<T> HollowBatch<T> {
1004    /// Construct an in-memory hollow batch from the given metadata.
1005    ///
1006    /// This method checks that `runs` is a sequence of valid indices into `parts`. The caller
1007    /// is responsible for ensuring that the defined runs are valid.
1008    ///
1009    /// `len` should represent the number of valid updates in the referenced parts.
1010    pub(crate) fn new(
1011        desc: Description<T>,
1012        parts: Vec<RunPart<T>>,
1013        len: usize,
1014        run_meta: Vec<RunMeta>,
1015        run_splits: Vec<usize>,
1016    ) -> Self {
1017        debug_assert!(
1018            run_splits.is_strictly_sorted(),
1019            "run indices should be strictly increasing"
1020        );
1021        debug_assert!(
1022            run_splits.first().map_or(true, |i| *i > 0),
1023            "run indices should be positive"
1024        );
1025        debug_assert!(
1026            run_splits.last().map_or(true, |i| *i < parts.len()),
1027            "run indices should be valid indices into parts"
1028        );
1029        debug_assert!(
1030            parts.is_empty() || run_meta.len() == run_splits.len() + 1,
1031            "all metadata should correspond to a run"
1032        );
1033
1034        Self {
1035            desc,
1036            len,
1037            parts,
1038            run_splits,
1039            run_meta,
1040        }
1041    }
1042
1043    /// Construct a batch of a single run with default metadata. Mostly interesting for tests.
1044    pub(crate) fn new_run(desc: Description<T>, parts: Vec<RunPart<T>>, len: usize) -> Self {
1045        let run_meta = if parts.is_empty() {
1046            vec![]
1047        } else {
1048            vec![RunMeta::default()]
1049        };
1050        Self {
1051            desc,
1052            len,
1053            parts,
1054            run_splits: vec![],
1055            run_meta,
1056        }
1057    }
1058
1059    #[cfg(test)]
1060    pub(crate) fn new_run_for_test(
1061        desc: Description<T>,
1062        parts: Vec<RunPart<T>>,
1063        len: usize,
1064        run_id: RunId,
1065    ) -> Self {
1066        let run_meta = if parts.is_empty() {
1067            vec![]
1068        } else {
1069            let mut meta = RunMeta::default();
1070            meta.id = Some(run_id);
1071            vec![meta]
1072        };
1073        Self {
1074            desc,
1075            len,
1076            parts,
1077            run_splits: vec![],
1078            run_meta,
1079        }
1080    }
1081
1082    /// An empty hollow batch, representing no updates over the given desc.
1083    pub(crate) fn empty(desc: Description<T>) -> Self {
1084        Self {
1085            desc,
1086            len: 0,
1087            parts: vec![],
1088            run_splits: vec![],
1089            run_meta: vec![],
1090        }
1091    }
1092
1093    pub(crate) fn runs(&self) -> impl Iterator<Item = (&RunMeta, &[RunPart<T>])> {
1094        let run_ends = self
1095            .run_splits
1096            .iter()
1097            .copied()
1098            .chain(std::iter::once(self.parts.len()));
1099        let run_metas = self.run_meta.iter();
1100        let run_parts = run_ends
1101            .scan(0, |start, end| {
1102                let range = *start..end;
1103                *start = end;
1104                Some(range)
1105            })
1106            .filter(|range| !range.is_empty())
1107            .map(|range| &self.parts[range]);
1108        run_metas.zip_eq(run_parts)
1109    }
1110
1111    pub(crate) fn inline_bytes(&self) -> usize {
1112        self.parts.iter().map(|x| x.inline_bytes()).sum()
1113    }
1114
1115    pub(crate) fn is_empty(&self) -> bool {
1116        self.parts.is_empty()
1117    }
1118
1119    pub(crate) fn part_count(&self) -> usize {
1120        self.parts.len()
1121    }
1122
1123    /// The sum of the encoded sizes of all parts in the batch.
1124    pub fn encoded_size_bytes(&self) -> usize {
1125        self.parts.iter().map(|p| p.encoded_size_bytes()).sum()
1126    }
1127}
1128
// See the comment on [Batch::rewrite_ts] for why this is TotalOrder.
impl<T: Timestamp + TotalOrder> HollowBatch<T> {
    /// Logically rewrites the timestamps of every update in this batch to
    /// `frontier`, and replaces the batch's upper with `new_upper`.
    ///
    /// The rewrite is recorded on each part via its `ts_rewrite` field rather
    /// than eagerly rewriting the stored data.
    ///
    /// Returns an error (leaving `self` unchanged, since all validation
    /// happens before any mutation) if:
    /// - `frontier` is not strictly less than `new_upper`
    /// - `new_upper` is less than the batch's current upper
    /// - `frontier` is less than the batch's lower
    /// - the batch's since is not the minimum antichain
    /// - `frontier` is less than any part's existing rewrite frontier
    pub(crate) fn rewrite_ts(
        &mut self,
        frontier: &Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<(), String> {
        if !PartialOrder::less_than(frontier, &new_upper) {
            return Err(format!(
                "rewrite frontier {:?} !< rewrite upper {:?}",
                frontier.elements(),
                new_upper.elements(),
            ));
        }
        if PartialOrder::less_than(&new_upper, self.desc.upper()) {
            return Err(format!(
                "rewrite upper {:?} < batch upper {:?}",
                new_upper.elements(),
                self.desc.upper().elements(),
            ));
        }

        // The following are things that it seems like we could support, but
        // initially we don't because we don't have a use case for them.
        if PartialOrder::less_than(frontier, self.desc.lower()) {
            return Err(format!(
                "rewrite frontier {:?} < batch lower {:?}",
                frontier.elements(),
                self.desc.lower().elements(),
            ));
        }
        if self.desc.since() != &Antichain::from_elem(T::minimum()) {
            return Err(format!(
                "batch since {:?} != minimum antichain {:?}",
                self.desc.since().elements(),
                &[T::minimum()],
            ));
        }
        // Rewrites of rewrites are allowed, but only "forward": the new
        // frontier must not be less than any rewrite already recorded.
        for part in self.parts.iter() {
            let Some(ts_rewrite) = part.ts_rewrite() else {
                continue;
            };
            if PartialOrder::less_than(frontier, ts_rewrite) {
                return Err(format!(
                    "rewrite frontier {:?} < batch rewrite {:?}",
                    frontier.elements(),
                    ts_rewrite.elements(),
                ));
            }
        }

        // All validation passed: apply the new upper and record the rewrite
        // frontier on every part.
        self.desc = Description::new(
            self.desc.lower().clone(),
            new_upper,
            self.desc.since().clone(),
        );
        for part in &mut self.parts {
            match part {
                RunPart::Single(BatchPart::Hollow(part)) => {
                    part.ts_rewrite = Some(frontier.clone())
                }
                RunPart::Single(BatchPart::Inline { ts_rewrite, .. }) => {
                    *ts_rewrite = Some(frontier.clone())
                }
                RunPart::Many(runs) => {
                    // Currently unreachable: we only apply rewrites to user batches, and we don't
                    // ever generate runs of >1 part for those.
                    panic!("unexpected rewrite of a hollow runs ref: {runs:?}");
                }
            }
        }
        Ok(())
    }
}
1203
1204impl<T: Ord> PartialOrd for HollowBatchPart<T> {
1205    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1206        Some(self.cmp(other))
1207    }
1208}
1209
impl<T: Ord> Ord for HollowBatchPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        // Deconstruct self and other so we get a compile failure if new fields
        // are added.
        let HollowBatchPart {
            key: self_key,
            meta: self_meta,
            encoded_size_bytes: self_encoded_size_bytes,
            key_lower: self_key_lower,
            structured_key_lower: self_structured_key_lower,
            stats: self_stats,
            ts_rewrite: self_ts_rewrite,
            diffs_sum: self_diffs_sum,
            format: self_format,
            schema_id: self_schema_id,
            deprecated_schema_id: self_deprecated_schema_id,
        } = self;
        let HollowBatchPart {
            key: other_key,
            meta: other_meta,
            encoded_size_bytes: other_encoded_size_bytes,
            key_lower: other_key_lower,
            structured_key_lower: other_structured_key_lower,
            stats: other_stats,
            ts_rewrite: other_ts_rewrite,
            diffs_sum: other_diffs_sum,
            format: other_format,
            schema_id: other_schema_id,
            deprecated_schema_id: other_deprecated_schema_id,
        } = other;
        // ts_rewrite is compared via its frontier elements because Antichain
        // itself doesn't implement Ord.
        (
            self_key,
            self_meta,
            self_encoded_size_bytes,
            self_key_lower,
            self_structured_key_lower,
            self_stats,
            self_ts_rewrite.as_ref().map(|x| x.elements()),
            self_diffs_sum,
            self_format,
            self_schema_id,
            self_deprecated_schema_id,
        )
            .cmp(&(
                other_key,
                other_meta,
                other_encoded_size_bytes,
                other_key_lower,
                other_structured_key_lower,
                other_stats,
                other_ts_rewrite.as_ref().map(|x| x.elements()),
                other_diffs_sum,
                other_format,
                other_schema_id,
                other_deprecated_schema_id,
            ))
    }
}
1268
/// A pointer to a rollup stored externally.
#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub struct HollowRollup {
    /// Pointer usable to retrieve the rollup.
    pub key: PartialRollupKey,
    /// The encoded size of this rollup, if known.
    ///
    /// NOTE(review): presumably `None` for rollups written before this was
    /// recorded — confirm against the write path.
    pub encoded_size_bytes: Option<usize>,
}
1277
/// A pointer to a blob stored externally.
#[derive(Debug)]
pub enum HollowBlobRef<'a, T> {
    /// A reference to a batch of updates stored in blob.
    Batch(&'a HollowBatch<T>),
    /// A reference to a state rollup stored in blob.
    Rollup(&'a HollowRollup),
}
1284
/// A rollup that is currently being computed.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize
)]
pub struct ActiveRollup {
    /// The seqno this in-progress rollup is associated with.
    pub seqno: SeqNo,
    /// When the rollup was started, in milliseconds since the epoch —
    /// presumably used to detect stalled rollups; confirm against callers.
    pub start_ms: u64,
}
1293
/// A garbage collection request that is currently being computed.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize
)]
pub struct ActiveGc {
    /// The seqno this in-progress GC is associated with.
    pub seqno: SeqNo,
    /// When the GC was started, in milliseconds since the epoch — presumably
    /// used to detect stalled GCs; confirm against callers.
    pub start_ms: u64,
}
1302
/// A sentinel for a state transition that was a no-op.
///
/// Critically, this also indicates that the no-op state transition was not
/// committed through compare_and_append and thus is _not linearized_.
///
/// The wrapped value is the result the command would otherwise have returned.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct NoOpStateTransition<T>(pub T);
1310
// TODO: Document invariants.
#[derive(Debug, Clone)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct StateCollections<T> {
    /// The version of this state. This is typically identical to the version of the code
    /// that wrote it, but may diverge during 0dt upgrades and similar operations when a
    /// new version of code is intentionally interoperating with an older state format.
    pub(crate) version: Version,

    // The most recently requested GC seqno.
    //
    // - Invariant: `<= all reader.since`
    // - Invariant: Doesn't regress across state versions.
    pub(crate) last_gc_req: SeqNo,

    // Pointers to state rollups stored in blob, keyed by the seqno they
    // capture.
    //
    // - Invariant: There is a rollup with `seqno <= self.seqno_since`.
    pub(crate) rollups: BTreeMap<SeqNo, HollowRollup>,

    /// The rollup that is currently being computed.
    pub(crate) active_rollup: Option<ActiveRollup>,
    /// The gc request that is currently being computed.
    pub(crate) active_gc: Option<ActiveGc>,

    // Per-handle state for registered leased readers, keyed by reader id.
    pub(crate) leased_readers: BTreeMap<LeasedReaderId, LeasedReaderState<T>>,
    // Per-handle state for registered critical readers, keyed by reader id.
    pub(crate) critical_readers: BTreeMap<CriticalReaderId, CriticalReaderState<T>>,
    // Per-handle state for registered writers, keyed by writer id.
    pub(crate) writers: BTreeMap<WriterId, WriterState<T>>,
    // Schemas registered with this shard, keyed by their id.
    pub(crate) schemas: BTreeMap<SchemaId, EncodedSchemas>,

    // - Invariant: `trace.since == meet(all reader.since)`
    // - Invariant: `trace.since` doesn't regress across state versions.
    // - Invariant: `trace.upper` doesn't regress across state versions.
    // - Invariant: `trace` upholds its own invariants.
    pub(crate) trace: Trace<T>,
}
1343
/// A key and val [Codec::Schema] encoded via [Codec::encode_schema].
///
/// This strategy of directly serializing the schema objects requires that
/// persist users do the right thing. Specifically, that an encoded schema
/// doesn't in some later version of mz decode to an in-mem object that acts
/// differently. In a sense, the current system (before the introduction of the
/// schema registry) where schemas are passed in unchecked to reader and writer
/// registration calls also has the same defect, so seems fine.
///
/// An alternative is to write down here some persist-specific representation of
/// the schema (e.g. the arrow DataType). This is a lot more work and also has
/// the potential to lead down a similar failure mode to the mz_persist_types
/// `Data` trait, where the boilerplate isn't worth the safety. Given that we
/// can always migrate later by rehydrating these, seems fine to start with the
/// easy thing.
///
/// The recorded arrow DataTypes allow backward-compatibility checks (see
/// `compare_and_evolve_schema`) without decoding the full schema impls.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct EncodedSchemas {
    /// A full in-mem `K::Schema` impl encoded via [Codec::encode_schema].
    pub key: Bytes,
    /// The arrow `DataType` produced by this `K::Schema` at the time it was
    /// registered, encoded as a `ProtoDataType`.
    pub key_data_type: Bytes,
    /// A full in-mem `V::Schema` impl encoded via [Codec::encode_schema].
    pub val: Bytes,
    /// The arrow `DataType` produced by this `V::Schema` at the time it was
    /// registered, encoded as a `ProtoDataType`.
    pub val_data_type: Bytes,
}
1372
1373impl EncodedSchemas {
1374    pub(crate) fn decode_data_type(buf: &[u8]) -> DataType {
1375        let proto = prost::Message::decode(buf).expect("valid ProtoDataType");
1376        DataType::from_proto(proto).expect("valid DataType")
1377    }
1378}
1379
/// Reasons a compare_and_append command might not commit a new state
/// transition.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum CompareAndAppendBreak<T> {
    /// This exact write has already been applied (presumably detected via the
    /// idempotency token — confirm against `compare_and_append`).
    AlreadyCommitted,
    /// The expected frontier didn't match the shard's actual upper.
    Upper {
        /// The shard-global upper.
        shard_upper: Antichain<T>,
        /// The upper most recently written by this writer.
        writer_upper: Antichain<T>,
    },
    /// The request itself was invalid (e.g. ill-formed bounds).
    InvalidUsage(InvalidUsage<T>),
    /// Too much data is stored inline in state; the caller should write it
    /// out to blob storage and retry.
    InlineBackpressure,
}
1391
/// Reasons a snapshot at a given as_of cannot be served.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum SnapshotErr<T> {
    /// The shard's upper has not yet advanced past the requested as_of, so a
    /// read at that time would be incomplete.
    AsOfNotYetAvailable(SeqNo, Upper<T>),
    /// The shard's since has advanced past the requested as_of, so the
    /// historical distinctions needed to read at that time are gone.
    AsOfHistoricalDistinctionsLost(Since<T>),
}
1398
1399impl<T> StateCollections<T>
1400where
1401    T: Timestamp + Lattice + Codec64,
1402{
1403    pub fn add_rollup(
1404        &mut self,
1405        add_rollup: (SeqNo, &HollowRollup),
1406    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1407        let (rollup_seqno, rollup) = add_rollup;
1408        let applied = match self.rollups.get(&rollup_seqno) {
1409            Some(x) => x.key == rollup.key,
1410            None => {
1411                self.active_rollup = None;
1412                self.rollups.insert(rollup_seqno, rollup.to_owned());
1413                true
1414            }
1415        };
1416        // This state transition is a no-op if applied is false but we
1417        // still commit the state change so that this gets linearized
1418        // (maybe we're looking at old state).
1419        Continue(applied)
1420    }
1421
    /// Removes the given rollups from state, returning the seqnos that were
    /// actually present and removed.
    pub fn remove_rollups(
        &mut self,
        remove_rollups: &[(SeqNo, PartialRollupKey)],
    ) -> ControlFlow<NoOpStateTransition<Vec<SeqNo>>, Vec<SeqNo>> {
        // Nothing to remove (or the shard is a settled tombstone): no-op
        // without committing a new state transition.
        if remove_rollups.is_empty() || self.is_tombstone() {
            return Break(NoOpStateTransition(vec![]));
        }

        // This state transition is called at the end of the GC process, so we
        // need to unset the `active_gc` field.
        self.active_gc = None;

        let mut removed = vec![];
        for (seqno, key) in remove_rollups {
            let removed_key = self.rollups.remove(seqno);
            // If a rollup was recorded at this seqno, it should point at the
            // same blob key the caller asked to remove.
            debug_assert!(
                removed_key.as_ref().map_or(true, |x| &x.key == key),
                "{} vs {:?}",
                key,
                removed_key
            );

            if removed_key.is_some() {
                removed.push(*seqno);
            }
        }

        Continue(removed)
    }
1451
    /// Registers a new leased reader with the given id, returning the
    /// reader's initial state and the shard's current seqno_since.
    ///
    /// The reader's since starts at the critical since (when requested and
    /// available) or else the shard's current since.
    pub fn register_leased_reader(
        &mut self,
        hostname: &str,
        reader_id: &LeasedReaderId,
        purpose: &str,
        seqno: SeqNo,
        lease_duration: Duration,
        heartbeat_timestamp_ms: u64,
        use_critical_since: bool,
    ) -> ControlFlow<
        NoOpStateTransition<(LeasedReaderState<T>, SeqNo)>,
        (LeasedReaderState<T>, SeqNo),
    > {
        let since = if use_critical_since {
            self.critical_since()
                .unwrap_or_else(|| self.trace.since().clone())
        } else {
            self.trace.since().clone()
        };
        let reader_state = LeasedReaderState {
            debug: HandleDebugState {
                hostname: hostname.to_owned(),
                purpose: purpose.to_owned(),
            },
            seqno,
            since,
            last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
            lease_duration_ms: u64::try_from(lease_duration.as_millis())
                .expect("lease duration as millis should fit within u64"),
        };

        // If the shard-global upper and since are both the empty antichain,
        // then no further writes can ever commit and no further reads can be
        // served. Optimize this by no-op-ing reader registration so that we can
        // settle the shard into a final unchanging tombstone state.
        if self.is_tombstone() {
            return Break(NoOpStateTransition((reader_state, self.seqno_since(seqno))));
        }

        // TODO: Handle if the reader or writer already exists.
        self.leased_readers
            .insert(reader_id.clone(), reader_state.clone());
        Continue((reader_state, self.seqno_since(seqno)))
    }
1496
    /// Registers (or re-registers) a critical reader with the given id,
    /// returning its state.
    ///
    /// If the id is already registered, only its debug info is replaced; the
    /// existing since and opaque are preserved and returned.
    pub fn register_critical_reader(
        &mut self,
        hostname: &str,
        reader_id: &CriticalReaderId,
        opaque: Opaque,
        purpose: &str,
    ) -> ControlFlow<NoOpStateTransition<CriticalReaderState<T>>, CriticalReaderState<T>> {
        let state = CriticalReaderState {
            debug: HandleDebugState {
                hostname: hostname.to_owned(),
                purpose: purpose.to_owned(),
            },
            since: self.trace.since().clone(),
            opaque,
        };

        // We expire all readers if the upper and since both advance to the
        // empty antichain. Gracefully handle this. At the same time,
        // short-circuit the cmd application so we don't needlessly create new
        // SeqNos.
        if self.is_tombstone() {
            return Break(NoOpStateTransition(state));
        }

        let state = match self.critical_readers.get_mut(reader_id) {
            Some(existing_state) => {
                existing_state.debug = state.debug;
                existing_state.clone()
            }
            None => {
                self.critical_readers
                    .insert(reader_id.clone(), state.clone());
                state
            }
        };
        Continue(state)
    }
1534
    /// Registers the given key/val schemas with this shard, returning the
    /// [SchemaId] under which they are (or already were) registered, or None
    /// if registration was not possible.
    pub fn register_schema<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> ControlFlow<NoOpStateTransition<Option<SchemaId>>, Option<SchemaId>> {
        // Encodes an arrow DataType as ProtoDataType bytes for storage.
        fn encode_data_type(data_type: &DataType) -> Bytes {
            let proto = data_type.into_proto();
            prost::Message::encode_to_vec(&proto).into()
        }

        // Look for an existing registered SchemaId for these schemas.
        //
        // The common case is that this should be a recent one, so as a minor
        // optimization, do this search in reverse order.
        //
        // TODO: Note that this impl is `O(schemas)`. Combined with the
        // possibility of cmd retries, it's possible but unlikely for this to
        // get expensive. We could maintain a reverse map to speed this up in
        // necessary. This would either need to work on the encoded
        // representation (which, we'd have to fall back to the linear scan) or
        // we'd need to add a Hash/Ord bound to Schema.
        let existing_id = self.schemas.iter().rev().find(|(_, x)| {
            K::decode_schema(&x.key) == *key_schema && V::decode_schema(&x.val) == *val_schema
        });
        match existing_id {
            Some((schema_id, _)) => {
                // TODO: Validate that the decoded schemas still produce records
                // of the recorded DataType, to detect shenanigans. Probably
                // best to wait until we've turned on Schema2 in prod and thus
                // committed to the current mappings.
                Break(NoOpStateTransition(Some(*schema_id)))
            }
            None if self.is_tombstone() => {
                // TODO: Is this right?
                Break(NoOpStateTransition(None))
            }
            None if self.schemas.is_empty() => {
                // We'll have to do something more sophisticated here to
                // generate the next id if/when we start supporting the removal
                // of schemas.
                let id = SchemaId(self.schemas.len());
                let key_data_type = mz_persist_types::columnar::data_type::<K>(key_schema)
                    .expect("valid key schema");
                let val_data_type = mz_persist_types::columnar::data_type::<V>(val_schema)
                    .expect("valid val schema");
                let prev = self.schemas.insert(
                    id,
                    EncodedSchemas {
                        key: K::encode_schema(key_schema),
                        key_data_type: encode_data_type(&key_data_type),
                        val: V::encode_schema(val_schema),
                        val_data_type: encode_data_type(&val_data_type),
                    },
                );
                assert_eq!(prev, None);
                Continue(Some(id))
            }
            None => {
                info!(
                    "register_schemas got {:?} expected {:?}",
                    key_schema,
                    self.schemas
                        .iter()
                        .map(|(id, x)| (id, K::decode_schema(&x.key)))
                        .collect::<Vec<_>>()
                );
                // Until we implement persist schema changes, only allow at most
                // one registered schema.
                Break(NoOpStateTransition(None))
            }
        }
    }
1607
    /// Evolves the shard's schema to the given key/val schemas, if they are a
    /// backward compatible evolution of the current (latest registered) one.
    ///
    /// Returns `ExpectedMismatch` if `expected` is not the latest schema id,
    /// `Ok` with the existing id if the schemas are unchanged, `Incompatible`
    /// if the evolution is not backward compatible (including any evolution
    /// that drops columns, which is initially disallowed), and otherwise
    /// registers the new schemas under a fresh id and returns `Ok` with it.
    pub fn compare_and_evolve_schema<K: Codec, V: Codec>(
        &mut self,
        expected: SchemaId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> ControlFlow<NoOpStateTransition<CaESchema<K, V>>, CaESchema<K, V>> {
        fn data_type<T>(schema: &impl Schema<T>) -> DataType {
            // To be defensive, create an empty batch and inspect the resulting
            // data type (as opposed to something like allowing the `Schema` to
            // declare the DataType).
            let array = Schema::encoder(schema).expect("valid schema").finish();
            Array::data_type(&array).clone()
        }

        let (current_id, current) = self
            .schemas
            .last_key_value()
            .expect("all shards have a schema");
        if *current_id != expected {
            return Break(NoOpStateTransition(CaESchema::ExpectedMismatch {
                schema_id: *current_id,
                key: K::decode_schema(&current.key),
                val: V::decode_schema(&current.val),
            }));
        }

        let current_key = K::decode_schema(&current.key);
        let current_key_dt = EncodedSchemas::decode_data_type(&current.key_data_type);
        let current_val = V::decode_schema(&current.val);
        let current_val_dt = EncodedSchemas::decode_data_type(&current.val_data_type);

        let key_dt = data_type(key_schema);
        let val_dt = data_type(val_schema);

        // If the schema is exactly the same as the current one, no-op.
        if current_key == *key_schema
            && current_key_dt == key_dt
            && current_val == *val_schema
            && current_val_dt == val_dt
        {
            return Break(NoOpStateTransition(CaESchema::Ok(*current_id)));
        }

        let key_fn = backward_compatible(&current_key_dt, &key_dt);
        let val_fn = backward_compatible(&current_val_dt, &val_dt);
        let (Some(key_fn), Some(val_fn)) = (key_fn, val_fn) else {
            return Break(NoOpStateTransition(CaESchema::Incompatible));
        };
        // Persist initially disallows dropping columns. This would require a
        // bunch more work (e.g. not safe to use the latest schema in
        // compaction) and isn't initially necessary in mz.
        if key_fn.contains_drop() || val_fn.contains_drop() {
            return Break(NoOpStateTransition(CaESchema::Incompatible));
        }

        // We'll have to do something more sophisticated here to
        // generate the next id if/when we start supporting the removal
        // of schemas.
        let id = SchemaId(self.schemas.len());
        self.schemas.insert(
            id,
            EncodedSchemas {
                key: K::encode_schema(key_schema),
                key_data_type: prost::Message::encode_to_vec(&key_dt.into_proto()).into(),
                val: V::encode_schema(val_schema),
                val_data_type: prost::Message::encode_to_vec(&val_dt.into_proto()).into(),
            },
        );
        Continue(CaESchema::Ok(id))
    }
1678
    /// Applies a writer's `compare_and_append` request to this state.
    ///
    /// Appends `batch` to the trace iff the batch's lower exactly matches the
    /// current shard upper. On success, returns `Continue` with any fueled
    /// compaction requests produced (or claimed) by the append; otherwise
    /// returns `Break` with the reason the append could not be applied:
    /// invalid usage, upper mismatch, an idempotent retry that already
    /// committed, or inline-write backpressure.
    pub fn compare_and_append(
        &mut self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        heartbeat_timestamp_ms: u64,
        lease_duration_ms: u64,
        idempotency_token: &IdempotencyToken,
        debug_info: &HandleDebugState,
        inline_writes_total_max_bytes: usize,
        claim_compaction_percent: usize,
        claim_compaction_min_version: Option<&Version>,
    ) -> ControlFlow<CompareAndAppendBreak<T>, Vec<FueledMergeReq<T>>> {
        // We expire all writers if the upper and since both advance to the
        // empty antichain. Gracefully handle this. At the same time,
        // short-circuit the cmd application so we don't needlessly create new
        // SeqNos.
        if self.is_tombstone() {
            assert_eq!(self.trace.upper(), &Antichain::new());
            return Break(CompareAndAppendBreak::Upper {
                shard_upper: Antichain::new(),
                // This writer might have been registered before the shard upper
                // was advanced, which would make this pessimistic in the
                // Indeterminate handling of compare_and_append at the machine
                // level, but that's fine.
                writer_upper: Antichain::new(),
            });
        }

        // Writers are registered implicitly: create state for this writer on
        // its first write.
        let writer_state = self
            .writers
            .entry(writer_id.clone())
            .or_insert_with(|| WriterState {
                last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
                lease_duration_ms,
                most_recent_write_token: IdempotencyToken::SENTINEL,
                most_recent_write_upper: Antichain::from_elem(T::minimum()),
                debug: debug_info.clone(),
            });

        // Malformed description: upper is strictly below lower.
        if PartialOrder::less_than(batch.desc.upper(), batch.desc.lower()) {
            return Break(CompareAndAppendBreak::InvalidUsage(
                InvalidUsage::InvalidBounds {
                    lower: batch.desc.lower().clone(),
                    upper: batch.desc.upper().clone(),
                },
            ));
        }

        // If the time interval is empty, the list of updates must also be
        // empty.
        if batch.desc.upper() == batch.desc.lower() && !batch.is_empty() {
            return Break(CompareAndAppendBreak::InvalidUsage(
                InvalidUsage::InvalidEmptyTimeInterval {
                    lower: batch.desc.lower().clone(),
                    upper: batch.desc.upper().clone(),
                    keys: batch
                        .parts
                        .iter()
                        .map(|x| x.printable_name().to_owned())
                        .collect(),
                },
            ));
        }

        if idempotency_token == &writer_state.most_recent_write_token {
            // If the last write had the same idempotency_token, then this must
            // have already committed. Sanity check that the most recent write
            // upper matches and that the shard upper is at least the write
            // upper, if it's not something very suspect is going on.
            assert_eq!(batch.desc.upper(), &writer_state.most_recent_write_upper);
            assert!(
                PartialOrder::less_equal(batch.desc.upper(), self.trace.upper()),
                "{:?} vs {:?}",
                batch.desc.upper(),
                self.trace.upper()
            );
            return Break(CompareAndAppendBreak::AlreadyCommitted);
        }

        // The "compare" half of compare_and_append: the batch must abut the
        // current shard upper exactly.
        let shard_upper = self.trace.upper();
        if shard_upper != batch.desc.lower() {
            return Break(CompareAndAppendBreak::Upper {
                shard_upper: shard_upper.clone(),
                writer_upper: writer_state.most_recent_write_upper.clone(),
            });
        }

        // Backpressure inline writes: if this batch's inline data would push
        // the total inline bytes held in state over the cap, make the caller
        // flush the parts out to blob storage instead.
        let new_inline_bytes = batch.inline_bytes();
        if new_inline_bytes > 0 {
            let mut existing_inline_bytes = 0;
            self.trace
                .map_batches(|x| existing_inline_bytes += x.inline_bytes());
            // TODO: For very small batches, it may actually _increase_ the size
            // of state to flush them out. Consider another threshold under
            // which an inline part can be appended no matter what.
            if existing_inline_bytes + new_inline_bytes >= inline_writes_total_max_bytes {
                return Break(CompareAndAppendBreak::InlineBackpressure);
            }
        }

        let mut merge_reqs = if batch.desc.upper() != batch.desc.lower() {
            self.trace.push_batch(batch.clone())
        } else {
            Vec::new()
        };

        // NB: we don't claim unclaimed compactions when the recording flag is off, even if we'd
        // otherwise be allowed to, to avoid triggering the same compactions in every writer.
        let all_empty_reqs = merge_reqs
            .iter()
            .all(|req| req.inputs.iter().all(|b| b.batch.is_empty()));
        if all_empty_reqs && !batch.is_empty() {
            // `claim_compaction_percent` may exceed 100: the whole part is the
            // guaranteed count, and the remainder is taken probabilistically
            // using the idempotency token as a stable source of randomness.
            let mut reqs_to_take = claim_compaction_percent / 100;
            if (usize::cast_from(idempotency_token.hashed()) % 100)
                < (claim_compaction_percent % 100)
            {
                reqs_to_take += 1;
            }
            let threshold_ms = heartbeat_timestamp_ms.saturating_sub(lease_duration_ms);
            let min_writer = claim_compaction_min_version.map(WriterKey::for_version);
            merge_reqs.extend(
                // We keep the oldest `reqs_to_take` batches, under the theory that they're least
                // likely to be compacted soon for other reasons.
                self.trace
                    .fueled_merge_reqs_before_ms(threshold_ms, min_writer)
                    .take(reqs_to_take),
            )
        }

        // Record that these compactions are being worked on, so other writers
        // don't claim the same work concurrently.
        for req in &merge_reqs {
            self.trace.claim_compaction(
                req.id,
                ActiveCompaction {
                    start_ms: heartbeat_timestamp_ms,
                },
            )
        }

        debug_assert_eq!(self.trace.upper(), batch.desc.upper());
        writer_state.most_recent_write_token = idempotency_token.clone();
        // The writer's most recent upper should only go forward.
        assert!(
            PartialOrder::less_equal(&writer_state.most_recent_write_upper, batch.desc.upper()),
            "{:?} vs {:?}",
            &writer_state.most_recent_write_upper,
            batch.desc.upper()
        );
        writer_state
            .most_recent_write_upper
            .clone_from(batch.desc.upper());

        // Heartbeat the writer state to keep our idempotency token alive.
        writer_state.last_heartbeat_timestamp_ms = std::cmp::max(
            heartbeat_timestamp_ms,
            writer_state.last_heartbeat_timestamp_ms,
        );

        Continue(merge_reqs)
    }
1838
1839    pub fn apply_merge_res<D: Codec64 + Monoid + PartialEq>(
1840        &mut self,
1841        res: &FueledMergeRes<T>,
1842        metrics: &ColumnarMetrics,
1843    ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
1844        // We expire all writers if the upper and since both advance to the
1845        // empty antichain. Gracefully handle this. At the same time,
1846        // short-circuit the cmd application so we don't needlessly create new
1847        // SeqNos.
1848        if self.is_tombstone() {
1849            return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
1850        }
1851
1852        let apply_merge_result = self.trace.apply_merge_res_checked::<D>(res, metrics);
1853        Continue(apply_merge_result)
1854    }
1855
1856    pub fn spine_exert(
1857        &mut self,
1858        fuel: usize,
1859    ) -> ControlFlow<NoOpStateTransition<Vec<FueledMergeReq<T>>>, Vec<FueledMergeReq<T>>> {
1860        let (merge_reqs, did_work) = self.trace.exert(fuel);
1861        if did_work {
1862            Continue(merge_reqs)
1863        } else {
1864            assert!(merge_reqs.is_empty());
1865            // Break if we have nothing useful to do to save the seqno (and
1866            // resulting crdb traffic)
1867            Break(NoOpStateTransition(Vec::new()))
1868        }
1869    }
1870
    /// Downgrades a leased reader's since capability to `new_since`, also
    /// heartbeating the reader's lease and downgrading its seqno capability.
    ///
    /// Returns the reader's resulting since, which may be ahead of
    /// `new_since` if the reader had already advanced past it. Breaks with
    /// the empty antichain if the shard is a tombstone or the reader has
    /// already been expired.
    pub fn downgrade_since(
        &mut self,
        reader_id: &LeasedReaderId,
        seqno: SeqNo,
        outstanding_seqno: SeqNo,
        new_since: &Antichain<T>,
        heartbeat_timestamp_ms: u64,
    ) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
        // We expire all readers if the upper and since both advance to the
        // empty antichain. Gracefully handle this. At the same time,
        // short-circuit the cmd application so we don't needlessly create new
        // SeqNos.
        if self.is_tombstone() {
            return Break(NoOpStateTransition(Since(Antichain::new())));
        }

        // The only way to have a missing reader in state is if it's been expired... and in that
        // case, we behave the same as though that reader had been downgraded to the empty antichain.
        let Some(reader_state) = self.leased_reader(reader_id) else {
            tracing::warn!(
                "Leased reader {reader_id} was expired due to inactivity. Did the machine go to sleep?",
            );
            return Break(NoOpStateTransition(Since(Antichain::new())));
        };

        // Also use this as an opportunity to heartbeat the reader and downgrade
        // the seqno capability.
        reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
            heartbeat_timestamp_ms,
            reader_state.last_heartbeat_timestamp_ms,
        );

        // The reader's new seqno capability is the older of the seqno it
        // requested and the oldest seqno it still has leased.
        let seqno = {
            assert!(
                outstanding_seqno >= reader_state.seqno,
                "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
                    is behind current reader_state ({:?})",
                outstanding_seqno,
                reader_state.seqno,
            );
            std::cmp::min(outstanding_seqno, seqno)
        };

        reader_state.seqno = seqno;

        let reader_current_since = if PartialOrder::less_than(&reader_state.since, new_since) {
            reader_state.since.clone_from(new_since);
            self.update_since();
            new_since.clone()
        } else {
            // No-op, but still commit the state change so that this gets
            // linearized.
            reader_state.since.clone()
        };

        Continue(Since(reader_current_since))
    }
1928
1929    pub fn compare_and_downgrade_since(
1930        &mut self,
1931        reader_id: &CriticalReaderId,
1932        expected_opaque: &Opaque,
1933        (new_opaque, new_since): (&Opaque, &Antichain<T>),
1934    ) -> ControlFlow<
1935        NoOpStateTransition<Result<Since<T>, (Opaque, Since<T>)>>,
1936        Result<Since<T>, (Opaque, Since<T>)>,
1937    > {
1938        // We expire all readers if the upper and since both advance to the
1939        // empty antichain. Gracefully handle this. At the same time,
1940        // short-circuit the cmd application so we don't needlessly create new
1941        // SeqNos.
1942        if self.is_tombstone() {
1943            // Match the idempotence behavior below of ignoring the token if
1944            // since is already advanced enough (in this case, because it's a
1945            // tombstone, we know it's the empty antichain).
1946            return Break(NoOpStateTransition(Ok(Since(Antichain::new()))));
1947        }
1948
1949        let reader_state = self.critical_reader(reader_id);
1950
1951        if reader_state.opaque != *expected_opaque {
1952            // No-op, but still commit the state change so that this gets
1953            // linearized.
1954            return Continue(Err((
1955                reader_state.opaque.clone(),
1956                Since(reader_state.since.clone()),
1957            )));
1958        }
1959
1960        reader_state.opaque = new_opaque.clone();
1961        if PartialOrder::less_equal(&reader_state.since, new_since) {
1962            reader_state.since.clone_from(new_since);
1963            self.update_since();
1964            Continue(Ok(Since(new_since.clone())))
1965        } else {
1966            // no work to be done -- the reader state's `since` is already sufficiently
1967            // advanced. we may someday need to revisit this branch when it's possible
1968            // for two `since` frontiers to be incomparable.
1969            Continue(Ok(Since(reader_state.since.clone())))
1970        }
1971    }
1972
1973    pub fn expire_leased_reader(
1974        &mut self,
1975        reader_id: &LeasedReaderId,
1976    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1977        // We expire all readers if the upper and since both advance to the
1978        // empty antichain. Gracefully handle this. At the same time,
1979        // short-circuit the cmd application so we don't needlessly create new
1980        // SeqNos.
1981        if self.is_tombstone() {
1982            return Break(NoOpStateTransition(false));
1983        }
1984
1985        let existed = self.leased_readers.remove(reader_id).is_some();
1986        if existed {
1987            // TODO(database-issues#6885): Re-enable this
1988            //
1989            // Temporarily disabling this because we think it might be the cause
1990            // of the remap since bug. Specifically, a clusterd process has a
1991            // ReadHandle for maintaining the once and one inside a Listen. If
1992            // we crash and stay down for longer than the read lease duration,
1993            // it's possible that an expiry of them both in quick succession
1994            // jumps the since forward to the Listen one.
1995            //
1996            // Don't forget to update the downgrade_since when this gets
1997            // switched back on.
1998            //
1999            // self.update_since();
2000        }
2001        // No-op if existed is false, but still commit the state change so that
2002        // this gets linearized.
2003        Continue(existed)
2004    }
2005
2006    pub fn expire_critical_reader(
2007        &mut self,
2008        reader_id: &CriticalReaderId,
2009    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2010        // We expire all readers if the upper and since both advance to the
2011        // empty antichain. Gracefully handle this. At the same time,
2012        // short-circuit the cmd application so we don't needlessly create new
2013        // SeqNos.
2014        if self.is_tombstone() {
2015            return Break(NoOpStateTransition(false));
2016        }
2017
2018        let existed = self.critical_readers.remove(reader_id).is_some();
2019        if existed {
2020            // TODO(database-issues#6885): Re-enable this
2021            //
2022            // Temporarily disabling this because we think it might be the cause
2023            // of the remap since bug. Specifically, a clusterd process has a
2024            // ReadHandle for maintaining the once and one inside a Listen. If
2025            // we crash and stay down for longer than the read lease duration,
2026            // it's possible that an expiry of them both in quick succession
2027            // jumps the since forward to the Listen one.
2028            //
2029            // Don't forget to update the downgrade_since when this gets
2030            // switched back on.
2031            //
2032            // self.update_since();
2033        }
2034        // This state transition is a no-op if existed is false, but we still
2035        // commit the state change so that this gets linearized (maybe we're
2036        // looking at old state).
2037        Continue(existed)
2038    }
2039
2040    pub fn expire_writer(
2041        &mut self,
2042        writer_id: &WriterId,
2043    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2044        // We expire all writers if the upper and since both advance to the
2045        // empty antichain. Gracefully handle this. At the same time,
2046        // short-circuit the cmd application so we don't needlessly create new
2047        // SeqNos.
2048        if self.is_tombstone() {
2049            return Break(NoOpStateTransition(false));
2050        }
2051
2052        let existed = self.writers.remove(writer_id).is_some();
2053        // This state transition is a no-op if existed is false, but we still
2054        // commit the state change so that this gets linearized (maybe we're
2055        // looking at old state).
2056        Continue(existed)
2057    }
2058
    /// Looks up the mutable state for a leased reader, if it's still
    /// registered (readers are removed from this map when expired).
    fn leased_reader(&mut self, id: &LeasedReaderId) -> Option<&mut LeasedReaderState<T>> {
        self.leased_readers.get_mut(id)
    }
2062
2063    fn critical_reader(&mut self, id: &CriticalReaderId) -> &mut CriticalReaderState<T> {
2064        self.critical_readers
2065            .get_mut(id)
2066            .unwrap_or_else(|| {
2067                panic!(
2068                    "Unknown CriticalReaderId({}). It was either never registered, or has been manually expired.",
2069                    id
2070                )
2071            })
2072    }
2073
2074    fn critical_since(&self) -> Option<Antichain<T>> {
2075        let mut critical_sinces = self.critical_readers.values().map(|r| &r.since);
2076        let mut since = critical_sinces.next().cloned()?;
2077        for s in critical_sinces {
2078            since.meet_assign(s);
2079        }
2080        Some(since)
2081    }
2082
2083    fn update_since(&mut self) {
2084        let mut sinces_iter = self
2085            .leased_readers
2086            .values()
2087            .map(|x| &x.since)
2088            .chain(self.critical_readers.values().map(|x| &x.since));
2089        let mut since = match sinces_iter.next() {
2090            Some(since) => since.clone(),
2091            None => {
2092                // If there are no current readers, leave `since` unchanged so
2093                // it doesn't regress.
2094                return;
2095            }
2096        };
2097        while let Some(s) = sinces_iter.next() {
2098            since.meet_assign(s);
2099        }
2100        self.trace.downgrade_since(&since);
2101    }
2102
2103    fn seqno_since(&self, seqno: SeqNo) -> SeqNo {
2104        let mut seqno_since = seqno;
2105        for cap in self.leased_readers.values() {
2106            seqno_since = std::cmp::min(seqno_since, cap.seqno);
2107        }
2108        // critical_readers don't hold a seqno capability.
2109        seqno_since
2110    }
2111
2112    fn tombstone_batch() -> HollowBatch<T> {
2113        HollowBatch::empty(Description::new(
2114            Antichain::from_elem(T::minimum()),
2115            Antichain::new(),
2116            Antichain::new(),
2117        ))
2118    }
2119
2120    pub(crate) fn is_tombstone(&self) -> bool {
2121        self.trace.upper().is_empty()
2122            && self.trace.since().is_empty()
2123            && self.writers.is_empty()
2124            && self.leased_readers.is_empty()
2125            && self.critical_readers.is_empty()
2126    }
2127
2128    pub(crate) fn is_single_empty_batch(&self) -> bool {
2129        let mut batch_count = 0;
2130        let mut is_empty = true;
2131        self.trace.map_batches(|b| {
2132            batch_count += 1;
2133            is_empty &= b.is_empty()
2134        });
2135        batch_count <= 1 && is_empty
2136    }
2137
    /// Puts the shard into the tombstone state (no readers, no writers) and
    /// incrementally shrinks its recorded batches.
    ///
    /// Requires that the upper and since have already both been advanced to
    /// the empty antichain. Returns `Continue` whenever a durable state
    /// change must be recorded (some shrinking happened, or we newly became
    /// a tombstone), and `Break` once the state is already a fully-shrunk
    /// tombstone.
    pub fn become_tombstone_and_shrink(&mut self) -> ControlFlow<NoOpStateTransition<()>, ()> {
        assert_eq!(self.trace.upper(), &Antichain::new());
        assert_eq!(self.trace.since(), &Antichain::new());

        // Remember our current state, so we can decide whether we have to
        // record a transition in durable state.
        let was_tombstone = self.is_tombstone();

        // Enter the "tombstone" state, if we're not in it already.
        self.writers.clear();
        self.leased_readers.clear();
        self.critical_readers.clear();

        debug_assert!(self.is_tombstone());

        // Now that we're in a "tombstone" state -- ie. nobody can read the data from a shard or write to
        // it -- the actual contents of our batches no longer matter.
        // This method progressively replaces batches in our state with simpler versions, to allow
        // freeing up resources and to reduce the state size. (Since the state is unreadable, this
        // is not visible to clients.) We do this a little bit at a time to avoid really large state
        // transitions... most operations happen incrementally, and large single writes can overwhelm
        // a backing store. See comments for why we believe the relevant diffs are reasonably small.

        // Find the first nonempty batch (if any), remembering the total batch
        // count along the way.
        let mut to_replace = None;
        let mut batch_count = 0;
        self.trace.map_batches(|b| {
            batch_count += 1;
            if !b.is_empty() && to_replace.is_none() {
                to_replace = Some(b.desc.clone());
            }
        });
        if let Some(desc) = to_replace {
            // We have a nonempty batch: replace it with an empty batch and return.
            // This should not produce an excessively large diff: if it did, we wouldn't have been
            // able to append that batch in the first place.
            let result = self.trace.apply_tombstone_merge(&desc);
            assert!(
                result.matched(),
                "merge with a matching desc should always match"
            );
            Continue(())
        } else if batch_count > 1 {
            // All our batches are empty, but we have more than one of them. Replace the whole set
            // with a new single-batch trace.
            // This produces a diff with a size proportional to the number of batches, but since
            // Spine keeps a logarithmic number of batches this should never be excessively large.
            let mut new_trace = Trace::default();
            new_trace.downgrade_since(&Antichain::new());
            let merge_reqs = new_trace.push_batch(Self::tombstone_batch());
            assert_eq!(merge_reqs, Vec::new());
            self.trace = new_trace;
            Continue(())
        } else if !was_tombstone {
            // We were not tombstoned before, so have to make sure this state
            // transition is recorded.
            Continue(())
        } else {
            // All our batches are empty, and there's only one... there's no shrinking this
            // tombstone further.
            Break(NoOpStateTransition(()))
        }
    }
2200}
2201
// TODO: Document invariants.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct State<T> {
    /// The shard this state belongs to.
    pub(crate) shard_id: ShardId,

    /// The sequence number of this version of state.
    pub(crate) seqno: SeqNo,
    /// A strictly increasing wall time of when this state was written, in
    /// milliseconds since the unix epoch.
    pub(crate) walltime_ms: u64,
    /// Hostname of the persist user that created this version of state. For
    /// debugging.
    pub(crate) hostname: String,
    /// The actual state contents: rollups, readers, writers, schemas, and
    /// the trace of batches.
    pub(crate) collections: StateCollections<T>,
}
2217
/// A newtype wrapper of State that guarantees the K, V, and D codecs match the
/// ones in durable storage.
pub struct TypedState<K, V, T, D> {
    /// The underlying untyped state; also reachable via `Deref`/`DerefMut`.
    pub(crate) state: State<T>,

    // According to the docs, PhantomData is to "mark things that act like they
    // own a T". State doesn't actually own K, V, or D, just the ability to
    // produce them. Using the `fn() -> T` pattern gets us the same variance as
    // T [1], but also allows State to correctly derive Send+Sync.
    //
    // [1]:
    //     https://doc.rust-lang.org/nomicon/phantom-data.html#table-of-phantomdata-patterns
    pub(crate) _phantom: PhantomData<fn() -> (K, V, D)>,
}
2232
2233impl<K, V, T: Clone, D> TypedState<K, V, T, D> {
2234    #[cfg(any(test, debug_assertions))]
2235    pub(crate) fn clone(&self, hostname: String) -> Self {
2236        TypedState {
2237            state: State {
2238                shard_id: self.shard_id.clone(),
2239                seqno: self.seqno.clone(),
2240                walltime_ms: self.walltime_ms,
2241                hostname,
2242                collections: self.collections.clone(),
2243            },
2244            _phantom: PhantomData,
2245        }
2246    }
2247
2248    pub(crate) fn clone_for_rollup(&self) -> Self {
2249        TypedState {
2250            state: State {
2251                shard_id: self.shard_id.clone(),
2252                seqno: self.seqno.clone(),
2253                walltime_ms: self.walltime_ms,
2254                hostname: self.hostname.clone(),
2255                collections: self.collections.clone(),
2256            },
2257            _phantom: PhantomData,
2258        }
2259    }
2260}
2261
2262impl<K, V, T: Debug, D> Debug for TypedState<K, V, T, D> {
2263    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2264        // Deconstruct self so we get a compile failure if new fields
2265        // are added.
2266        let TypedState { state, _phantom } = self;
2267        f.debug_struct("TypedState").field("state", state).finish()
2268    }
2269}
2270
2271// Impl PartialEq regardless of the type params.
2272#[cfg(any(test, debug_assertions))]
2273impl<K, V, T: PartialEq, D> PartialEq for TypedState<K, V, T, D> {
2274    fn eq(&self, other: &Self) -> bool {
2275        // Deconstruct self and other so we get a compile failure if new fields
2276        // are added.
2277        let TypedState {
2278            state: self_state,
2279            _phantom,
2280        } = self;
2281        let TypedState {
2282            state: other_state,
2283            _phantom,
2284        } = other;
2285        self_state == other_state
2286    }
2287}
2288
2289impl<K, V, T, D> Deref for TypedState<K, V, T, D> {
2290    type Target = State<T>;
2291
2292    fn deref(&self) -> &Self::Target {
2293        &self.state
2294    }
2295}
2296
2297impl<K, V, T, D> DerefMut for TypedState<K, V, T, D> {
2298    fn deref_mut(&mut self) -> &mut Self::Target {
2299        &mut self.state
2300    }
2301}
2302
impl<K, V, T, D> TypedState<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
    D: Codec64,
{
    /// Returns the initial state for a brand new shard: minimum seqno, empty
    /// collections, and the given applier version recorded.
    pub fn new(
        applier_version: Version,
        shard_id: ShardId,
        hostname: String,
        walltime_ms: u64,
    ) -> Self {
        let state = State {
            shard_id,
            seqno: SeqNo::minimum(),
            walltime_ms,
            hostname,
            collections: StateCollections {
                version: applier_version,
                last_gc_req: SeqNo::minimum(),
                rollups: BTreeMap::new(),
                active_rollup: None,
                active_gc: None,
                leased_readers: BTreeMap::new(),
                critical_readers: BTreeMap::new(),
                writers: BTreeMap::new(),
                schemas: BTreeMap::new(),
                trace: Trace::default(),
            },
        };
        TypedState {
            state,
            _phantom: PhantomData,
        }
    }

    /// Runs `work_fn` against a clone of this state at the next seqno,
    /// returning the work result together with the new state iff the work
    /// function chose to `Continue`.
    ///
    /// The clone's `walltime_ms` comes from `cfg.now`, bumped if necessary
    /// so it is strictly greater than the current state's.
    pub fn clone_apply<R, E, WorkFn>(
        &self,
        cfg: &PersistConfig,
        work_fn: &mut WorkFn,
    ) -> ControlFlow<E, (R, Self)>
    where
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    {
        // We do not increment the version by default, though work_fn can if it chooses to.
        let mut new_state = State {
            shard_id: self.shard_id,
            seqno: self.seqno.next(),
            walltime_ms: (cfg.now)(),
            hostname: cfg.hostname.clone(),
            collections: self.collections.clone(),
        };

        // Make sure walltime_ms is strictly increasing, in case clocks are
        // offset.
        if new_state.walltime_ms <= self.walltime_ms {
            new_state.walltime_ms = self.walltime_ms + 1;
        }

        // `?` propagates a `Break` from work_fn without producing new state.
        let work_ret = work_fn(new_state.seqno, cfg, &mut new_state.collections)?;
        let new_state = TypedState {
            state: new_state,
            _phantom: PhantomData,
        };
        Continue((work_ret, new_state))
    }
}
2371
/// Tuning knobs for garbage-collection scheduling (see `State::maybe_gc`).
#[derive(Copy, Clone, Debug)]
pub struct GcConfig {
    /// Whether to use the active-gc protocol, which records an in-progress
    /// gc in state.
    pub use_active_gc: bool,
    /// With active gc: if the recorded active gc started more than this many
    /// ms ago, allow another gc to be scheduled anyway.
    pub fallback_threshold_ms: u64,
    /// With active gc: minimum number of state versions to accumulate before
    /// scheduling a gc.
    pub min_versions: usize,
    /// Maximum number of state versions to collect in a single gc request.
    pub max_versions: usize,
}
2379
2380impl<T> State<T>
2381where
2382    T: Timestamp + Lattice + Codec64,
2383{
    /// The id of the shard this state is for.
    pub fn shard_id(&self) -> ShardId {
        self.shard_id
    }
2387
    /// The sequence number of this version of state.
    pub fn seqno(&self) -> SeqNo {
        self.seqno
    }
2391
    /// The shard's since frontier (data before this may be compacted away).
    pub fn since(&self) -> &Antichain<T> {
        self.collections.trace.since()
    }
2395
    /// The shard's write upper frontier.
    pub fn upper(&self) -> &Antichain<T> {
        self.collections.trace.upper()
    }
2399
    /// The number of batches currently tracked by the spine.
    pub fn spine_batch_count(&self) -> usize {
        self.collections.trace.num_spine_batches()
    }
2403
2404    pub fn size_metrics(&self) -> StateSizeMetrics {
2405        let mut ret = StateSizeMetrics::default();
2406        self.blobs().for_each(|x| match x {
2407            HollowBlobRef::Batch(x) => {
2408                ret.hollow_batch_count += 1;
2409                ret.batch_part_count += x.part_count();
2410                ret.num_updates += x.len;
2411
2412                let batch_size = x.encoded_size_bytes();
2413                for x in x.parts.iter() {
2414                    if x.ts_rewrite().is_some() {
2415                        ret.rewrite_part_count += 1;
2416                    }
2417                    if x.is_inline() {
2418                        ret.inline_part_count += 1;
2419                        ret.inline_part_bytes += x.inline_bytes();
2420                    }
2421                }
2422                ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size);
2423                ret.state_batches_bytes += batch_size;
2424            }
2425            HollowBlobRef::Rollup(x) => {
2426                ret.state_rollup_count += 1;
2427                ret.state_rollups_bytes += x.encoded_size_bytes.unwrap_or_default()
2428            }
2429        });
2430        ret
2431    }
2432
2433    pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
2434        // We maintain the invariant that every version of state has at least
2435        // one rollup.
2436        self.collections
2437            .rollups
2438            .iter()
2439            .rev()
2440            .next()
2441            .expect("State should have at least one rollup if seqno > minimum")
2442    }
2443
    /// The oldest seqno capability held by any leased reader, bounded above
    /// by this state's current seqno.
    pub(crate) fn seqno_since(&self) -> SeqNo {
        self.collections.seqno_since(self.seqno)
    }
2447
    /// Returns whether the cmd proposing this state has been selected to perform
    /// background garbage collection work.
    ///
    /// If it was selected, this information is recorded in the state itself for
    /// commit along with the cmd's state transition. This helps us to avoid
    /// redundant work.
    ///
    /// Correctness does not depend on a gc assignment being executed, nor on
    /// them being executed in the order they are given. But it is expected that
    /// gc assignments are best-effort respected. In practice, cmds like
    /// register_foo or expire_foo, where it would be awkward, ignore gc.
    pub fn maybe_gc(&mut self, is_write: bool, now: u64, cfg: GcConfig) -> Option<GcReq> {
        let GcConfig {
            use_active_gc,
            fallback_threshold_ms,
            min_versions,
            max_versions,
        } = cfg;
        // This is an arbitrary-ish threshold that scales with seqno, but never
        // gets particularly big. It probably could be much bigger and certainly
        // could use a tuning pass at some point.
        let gc_threshold = if use_active_gc {
            u64::cast_from(min_versions)
        } else {
            // Roughly log2 of the current seqno: grows slowly, never below 1.
            std::cmp::max(
                1,
                u64::cast_from(self.seqno.0.next_power_of_two().trailing_zeros()),
            )
        };
        let new_seqno_since = self.seqno_since();
        // Collect until the new seqno since... or the old since plus the max number of versions,
        // whatever is less.
        let gc_until_seqno = new_seqno_since.min(SeqNo(
            self.collections
                .last_gc_req
                .0
                .saturating_add(u64::cast_from(max_versions)),
        ));
        // Base condition: enough versions accumulated since the last gc request.
        let should_gc = new_seqno_since
            .0
            .saturating_sub(self.collections.last_gc_req.0)
            >= gc_threshold;

        // If we wouldn't otherwise gc, check if we have an active gc. If we do, and
        // it's been a while since it started, we should gc.
        let should_gc = if use_active_gc && !should_gc {
            match self.collections.active_gc {
                Some(active_gc) => now.saturating_sub(active_gc.start_ms) > fallback_threshold_ms,
                None => false,
            }
        } else {
            should_gc
        };
        // Assign GC traffic preferentially to writers, falling back to anyone
        // generating new state versions if there are no writers.
        let should_gc = should_gc && (is_write || self.collections.writers.is_empty());
        // Always assign GC work to a tombstoned shard to have the chance to
        // clean up any residual blobs. This is safe (won't cause excess gc)
        // as the only allowed command after becoming a tombstone is to write
        // the final rollup.
        let tombstone_needs_gc = self.collections.is_tombstone();
        let should_gc = should_gc || tombstone_needs_gc;
        let should_gc = if use_active_gc {
            // If we have an active gc, we should only gc if the active gc is
            // sufficiently old. This is to avoid doing more gc work than
            // necessary.
            should_gc
                && match self.collections.active_gc {
                    Some(active) => now.saturating_sub(active.start_ms) > fallback_threshold_ms,
                    None => true,
                }
        } else {
            should_gc
        };
        if should_gc {
            // Record the assignment so later cmds don't re-issue the same work.
            self.collections.last_gc_req = gc_until_seqno;
            Some(GcReq {
                shard_id: self.shard_id,
                new_seqno_since: gc_until_seqno,
            })
        } else {
            None
        }
    }
2532
    /// Return the number of gc-ineligible state versions.
    ///
    /// This is the distance between the current seqno and the seqno since;
    /// saturating, so a since ahead of seqno reports 0 rather than wrapping.
    pub fn seqnos_held(&self) -> usize {
        usize::cast_from(self.seqno.0.saturating_sub(self.seqno_since().0))
    }
2537
2538    /// Expire all readers and writers up to the given walltime_ms.
2539    pub fn expire_at(&mut self, walltime_ms: EpochMillis) -> ExpiryMetrics {
2540        let mut metrics = ExpiryMetrics::default();
2541        let shard_id = self.shard_id();
2542        self.collections.leased_readers.retain(|id, state| {
2543            let retain = state.last_heartbeat_timestamp_ms + state.lease_duration_ms >= walltime_ms;
2544            if !retain {
2545                info!(
2546                    "Force expiring reader {id} ({}) of shard {shard_id} due to inactivity",
2547                    state.debug.purpose
2548                );
2549                metrics.readers_expired += 1;
2550            }
2551            retain
2552        });
2553        // critical_readers don't need forced expiration. (In fact, that's the point!)
2554        self.collections.writers.retain(|id, state| {
2555            let retain =
2556                (state.last_heartbeat_timestamp_ms + state.lease_duration_ms) >= walltime_ms;
2557            if !retain {
2558                info!(
2559                    "Force expiring writer {id} ({}) of shard {shard_id} due to inactivity",
2560                    state.debug.purpose
2561                );
2562                metrics.writers_expired += 1;
2563            }
2564            retain
2565        });
2566        metrics
2567    }
2568
2569    /// Returns the batches that contain updates up to (and including) the given `as_of`. The
2570    /// result `Vec` contains blob keys, along with a [`Description`] of what updates in the
2571    /// referenced parts are valid to read.
2572    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
2573        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2574            return Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
2575                self.collections.trace.since().clone(),
2576            )));
2577        }
2578        let upper = self.collections.trace.upper();
2579        if PartialOrder::less_equal(upper, as_of) {
2580            return Err(SnapshotErr::AsOfNotYetAvailable(
2581                self.seqno,
2582                Upper(upper.clone()),
2583            ));
2584        }
2585
2586        let batches = self
2587            .collections
2588            .trace
2589            .batches()
2590            .filter(|b| !PartialOrder::less_than(as_of, b.desc.lower()))
2591            .cloned()
2592            .collect();
2593        Ok(batches)
2594    }
2595
2596    // NB: Unlike the other methods here, this one is read-only.
2597    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
2598        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2599            return Err(Since(self.collections.trace.since().clone()));
2600        }
2601        Ok(())
2602    }
2603
2604    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
2605        // TODO: Avoid the O(n^2) here: `next_listen_batch` is called once per
2606        // batch and this iterates through all batches to find the next one.
2607        self.collections
2608            .trace
2609            .batches()
2610            .find(|b| {
2611                PartialOrder::less_equal(b.desc.lower(), frontier)
2612                    && PartialOrder::less_than(frontier, b.desc.upper())
2613            })
2614            .cloned()
2615            .ok_or(self.seqno)
2616    }
2617
    /// Returns the in-progress rollup recorded in state, if any.
    pub fn active_rollup(&self) -> Option<ActiveRollup> {
        self.collections.active_rollup
    }
2621
    /// If a new rollup should be written for this state version, returns the
    /// seqno to take it at (always `self.seqno`); otherwise returns None.
    pub fn need_rollup(
        &self,
        threshold: usize,
        use_active_rollup: bool,
        fallback_threshold_ms: u64,
        now: u64,
    ) -> Option<SeqNo> {
        let (latest_rollup_seqno, _) = self.latest_rollup();

        // Tombstoned shards require one final rollup. However, because we
        // write a rollup as of SeqNo X and then link it in using a state
        // transition (in this case from X to X+1), the minimum number of
        // live diffs is actually two. Detect when we're in this minimal
        // two diff state and stop the (otherwise) infinite iteration.
        if self.collections.is_tombstone() && latest_rollup_seqno.next() < self.seqno {
            return Some(self.seqno);
        }

        let seqnos_since_last_rollup = self.seqno.0.saturating_sub(latest_rollup_seqno.0);

        if use_active_rollup {
            // If seqnos_since_last_rollup > threshold, and there is no existing rollup
            // in progress, we should start a new rollup.
            // If there is an active rollup, we should check if it has been running too long.
            // If it has, we should start a new rollup.
            // This is to guard against a worker dying/taking too long/etc.
            if seqnos_since_last_rollup > u64::cast_from(threshold) {
                match self.active_rollup() {
                    Some(active_rollup) => {
                        if now.saturating_sub(active_rollup.start_ms) > fallback_threshold_ms {
                            return Some(self.seqno);
                        }
                    }
                    None => {
                        return Some(self.seqno);
                    }
                }
            }
        } else {
            // every `threshold` seqnos since the latest rollup, assign rollup maintenance.
            // we avoid assigning rollups to every seqno past the threshold to avoid handles
            // racing / performing redundant work.
            if seqnos_since_last_rollup > 0
                && seqnos_since_last_rollup % u64::cast_from(threshold) == 0
            {
                return Some(self.seqno);
            }

            // however, since maintenance is best-effort and could fail, do assign rollup
            // work to every seqno after a fallback threshold to ensure one is written.
            if seqnos_since_last_rollup
                > u64::cast_from(
                    threshold * PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER,
                )
            {
                return Some(self.seqno);
            }
        }

        None
    }
2683
2684    pub(crate) fn blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
2685        let batches = self.collections.trace.batches().map(HollowBlobRef::Batch);
2686        let rollups = self.collections.rollups.values().map(HollowBlobRef::Rollup);
2687        batches.chain(rollups)
2688    }
2689}
2690
2691fn serialize_part_bytes<S: Serializer>(val: &[u8], s: S) -> Result<S::Ok, S::Error> {
2692    let val = hex::encode(val);
2693    val.serialize(s)
2694}
2695
2696fn serialize_lazy_proto<S: Serializer, T: prost::Message + Default>(
2697    val: &Option<LazyProto<T>>,
2698    s: S,
2699) -> Result<S::Ok, S::Error> {
2700    val.as_ref()
2701        .map(|lazy| hex::encode(&lazy.into_proto()))
2702        .serialize(s)
2703}
2704
2705fn serialize_part_stats<S: Serializer>(
2706    val: &Option<LazyPartStats>,
2707    s: S,
2708) -> Result<S::Ok, S::Error> {
2709    let val = val.as_ref().map(|x| x.decode().key);
2710    val.serialize(s)
2711}
2712
2713fn serialize_diffs_sum<S: Serializer>(val: &Option<[u8; 8]>, s: S) -> Result<S::Ok, S::Error> {
2714    // This is only used for debugging, so hack to assume that D is i64.
2715    let val = val.map(i64::decode);
2716    val.serialize(s)
2717}
2718
2719// This Serialize impl is used for debugging/testing and exposed via SQL. It's
2720// intentionally gated from users, so not strictly subject to our backward
2721// compatibility guarantees, but still probably best to be thoughtful about
2722// making unnecessary changes. Additionally, it's nice to make the output as
2723// nice to use as possible without tying our hands for the actual code usages.
2724impl<T: Serialize + Timestamp + Lattice> Serialize for State<T> {
2725    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2726        let State {
2727            shard_id,
2728            seqno,
2729            walltime_ms,
2730            hostname,
2731            collections:
2732                StateCollections {
2733                    version: applier_version,
2734                    last_gc_req,
2735                    rollups,
2736                    active_rollup,
2737                    active_gc,
2738                    leased_readers,
2739                    critical_readers,
2740                    writers,
2741                    schemas,
2742                    trace,
2743                },
2744        } = self;
2745        let mut s = s.serialize_struct("State", 13)?;
2746        let () = s.serialize_field("applier_version", &applier_version.to_string())?;
2747        let () = s.serialize_field("shard_id", shard_id)?;
2748        let () = s.serialize_field("seqno", seqno)?;
2749        let () = s.serialize_field("walltime_ms", walltime_ms)?;
2750        let () = s.serialize_field("hostname", hostname)?;
2751        let () = s.serialize_field("last_gc_req", last_gc_req)?;
2752        let () = s.serialize_field("rollups", rollups)?;
2753        let () = s.serialize_field("active_rollup", active_rollup)?;
2754        let () = s.serialize_field("active_gc", active_gc)?;
2755        let () = s.serialize_field("leased_readers", leased_readers)?;
2756        let () = s.serialize_field("critical_readers", critical_readers)?;
2757        let () = s.serialize_field("writers", writers)?;
2758        let () = s.serialize_field("schemas", schemas)?;
2759        let () = s.serialize_field("since", &trace.since().elements())?;
2760        let () = s.serialize_field("upper", &trace.upper().elements())?;
2761        let trace = trace.flatten();
2762        let () = s.serialize_field("batches", &trace.legacy_batches.keys().collect::<Vec<_>>())?;
2763        let () = s.serialize_field("hollow_batches", &trace.hollow_batches)?;
2764        let () = s.serialize_field("spine_batches", &trace.spine_batches)?;
2765        let () = s.serialize_field("merges", &trace.merges)?;
2766        s.end()
2767    }
2768}
2769
/// Size and count statistics about the blobs referenced by a single `State`,
/// accumulated by walking all of its batches and rollups.
#[derive(Debug, Default)]
pub struct StateSizeMetrics {
    /// Number of hollow batches in the trace.
    pub hollow_batch_count: usize,
    /// Total number of parts across all batches.
    pub batch_part_count: usize,
    /// Number of parts carrying a ts_rewrite.
    pub rewrite_part_count: usize,
    /// Total number of updates across all batches.
    pub num_updates: usize,
    /// Encoded size of the largest single batch.
    pub largest_batch_bytes: usize,
    /// Total encoded size of all batches referenced by state.
    pub state_batches_bytes: usize,
    /// Total encoded size of all rollups referenced by state.
    pub state_rollups_bytes: usize,
    /// Number of rollups referenced by state.
    pub state_rollup_count: usize,
    /// Number of parts stored inline in state rather than in blob storage.
    pub inline_part_count: usize,
    /// Total size of the inline parts.
    pub inline_part_bytes: usize,
}
2783
/// Counts of handles force-expired due to inactivity by `State::expire_at`.
#[derive(Default)]
pub struct ExpiryMetrics {
    /// Number of leased readers that were force-expired.
    pub(crate) readers_expired: usize,
    /// Number of writers that were force-expired.
    pub(crate) writers_expired: usize,
}
2789
/// Wrapper for Antichain that represents a Since
///
/// A newtype so that since frontiers can't be confused with other antichains
/// (e.g. uppers) in signatures and error types.
#[derive(Debug, Clone, PartialEq)]
pub struct Since<T>(pub Antichain<T>);
2793
/// Wrapper for Antichain that represents an Upper
///
/// A newtype so that upper frontiers can't be confused with other antichains
/// (e.g. sinces) in signatures and error types.
#[derive(Debug, PartialEq)]
pub struct Upper<T>(pub Antichain<T>);
2797
2798#[cfg(test)]
2799pub(crate) mod tests {
2800    use std::ops::Range;
2801    use std::str::FromStr;
2802
2803    use bytes::Bytes;
2804    use mz_build_info::DUMMY_BUILD_INFO;
2805    use mz_dyncfg::ConfigUpdates;
2806    use mz_ore::now::SYSTEM_TIME;
2807    use mz_ore::{assert_none, assert_ok};
2808    use mz_proto::RustType;
2809    use proptest::prelude::*;
2810    use proptest::strategy::ValueTree;
2811
2812    use crate::InvalidUsage::{InvalidBounds, InvalidEmptyTimeInterval};
2813    use crate::cache::PersistClientCache;
2814    use crate::internal::encoding::any_some_lazy_part_stats;
2815    use crate::internal::paths::RollupId;
2816    use crate::internal::trace::tests::any_trace;
2817    use crate::tests::new_test_client_cache;
2818    use crate::{Diagnostics, PersistLocation};
2819
2820    use super::*;
2821
    // 15 minutes, in millis; used when registering test reader/writer handles.
    const LEASE_DURATION_MS: u64 = 900 * 1000;
    // Placeholder debug info attached to handles created in tests.
    fn debug_state() -> HandleDebugState {
        HandleDebugState {
            hostname: "debug".to_owned(),
            purpose: "finding the bugs".to_owned(),
        }
    }
2829
    /// Strategy for a [HollowBatch] with exactly `num_runs` runs.
    ///
    /// NOTE(review): parts are generated with length in `num_runs + 1..20`, so
    /// callers presumably must keep `num_runs + 1 < 20` for the strategy to be
    /// satisfiable — confirm against call sites.
    pub fn any_hollow_batch_with_exact_runs<T: Arbitrary + Timestamp>(
        num_runs: usize,
    ) -> impl Strategy<Value = HollowBatch<T>> {
        (
            any::<T>(),
            any::<T>(),
            any::<T>(),
            // Always more parts than runs, so each run is non-empty.
            proptest::collection::vec(any_run_part::<T>(), num_runs + 1..20),
            any::<usize>(),
        )
            .prop_map(move |(t0, t1, since, parts, len)| {
                // Order the two arbitrary timestamps to form a valid interval.
                let (lower, upper) = if t0 <= t1 {
                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
                } else {
                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
                };
                let since = Antichain::from_elem(since);

                // Split the parts into num_runs roughly-equal contiguous runs.
                let run_splits = (1..num_runs)
                    .map(|i| i * parts.len() / num_runs)
                    .collect::<Vec<_>>();

                // One RunMeta (with a fresh RunId) per run.
                let run_meta = (0..num_runs)
                    .map(|_| {
                        let mut meta = RunMeta::default();
                        meta.id = Some(RunId::new());
                        meta
                    })
                    .collect::<Vec<_>>();

                HollowBatch::new(
                    Description::new(lower, upper, since),
                    parts,
                    len % 10,
                    run_meta,
                    run_splits,
                )
            })
    }
2869
    /// Strategy for an arbitrary [HollowBatch]: multi-run when the generated
    /// run count fits the generated parts, otherwise a single run.
    pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
        Strategy::prop_map(
            (
                any::<T>(),
                any::<T>(),
                any::<T>(),
                proptest::collection::vec(any_run_part::<T>(), 0..20),
                any::<usize>(),
                0..=10usize,
                // Always 10 ids, enough for any num_runs drawn above.
                proptest::collection::vec(any::<RunId>(), 10),
            ),
            |(t0, t1, since, parts, len, num_runs, run_ids)| {
                // Order the two arbitrary timestamps to form a valid interval.
                let (lower, upper) = if t0 <= t1 {
                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
                } else {
                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
                };
                let since = Antichain::from_elem(since);
                // Multi-run shape only when there are enough parts to split.
                if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() {
                    let run_splits = (1..num_runs)
                        .map(|i| i * parts.len() / num_runs)
                        .collect::<Vec<_>>();

                    let run_meta = (0..num_runs)
                        .enumerate()
                        .map(|(i, _)| {
                            let mut meta = RunMeta::default();
                            meta.id = Some(run_ids[i]);
                            meta
                        })
                        .collect::<Vec<_>>();

                    HollowBatch::new(
                        Description::new(lower, upper, since),
                        parts,
                        len % 10,
                        run_meta,
                        run_splits,
                    )
                } else {
                    // Degenerate case: everything in one run.
                    HollowBatch::new_run_for_test(
                        Description::new(lower, upper, since),
                        parts,
                        len % 10,
                        run_ids[0],
                    )
                }
            },
        )
    }
2920
2921    pub fn any_batch_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = BatchPart<T>> {
2922        Strategy::prop_map(
2923            (
2924                any::<bool>(),
2925                any_hollow_batch_part(),
2926                any::<Option<T>>(),
2927                any::<Option<SchemaId>>(),
2928                any::<Option<SchemaId>>(),
2929            ),
2930            |(is_hollow, hollow, ts_rewrite, schema_id, deprecated_schema_id)| {
2931                if is_hollow {
2932                    BatchPart::Hollow(hollow)
2933                } else {
2934                    let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap();
2935                    let ts_rewrite = ts_rewrite.map(Antichain::from_elem);
2936                    BatchPart::Inline {
2937                        updates,
2938                        ts_rewrite,
2939                        schema_id,
2940                        deprecated_schema_id,
2941                    }
2942                }
2943            },
2944        )
2945    }
2946
2947    pub fn any_run_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = RunPart<T>> {
2948        Strategy::prop_map(any_batch_part(), |part| RunPart::Single(part))
2949    }
2950
2951    pub fn any_hollow_batch_part<T: Arbitrary + Timestamp>()
2952    -> impl Strategy<Value = HollowBatchPart<T>> {
2953        Strategy::prop_map(
2954            (
2955                any::<PartialBatchKey>(),
2956                any::<usize>(),
2957                any::<Vec<u8>>(),
2958                any_some_lazy_part_stats(),
2959                any::<Option<T>>(),
2960                any::<[u8; 8]>(),
2961                any::<Option<BatchColumnarFormat>>(),
2962                any::<Option<SchemaId>>(),
2963                any::<Option<SchemaId>>(),
2964            ),
2965            |(
2966                key,
2967                encoded_size_bytes,
2968                key_lower,
2969                stats,
2970                ts_rewrite,
2971                diffs_sum,
2972                format,
2973                schema_id,
2974                deprecated_schema_id,
2975            )| {
2976                HollowBatchPart {
2977                    key,
2978                    meta: Default::default(),
2979                    encoded_size_bytes,
2980                    key_lower,
2981                    structured_key_lower: None,
2982                    stats,
2983                    ts_rewrite: ts_rewrite.map(Antichain::from_elem),
2984                    diffs_sum: Some(diffs_sum),
2985                    format,
2986                    schema_id,
2987                    deprecated_schema_id,
2988                }
2989            },
2990        )
2991    }
2992
2993    pub fn any_leased_reader_state<T: Arbitrary>() -> impl Strategy<Value = LeasedReaderState<T>> {
2994        Strategy::prop_map(
2995            (
2996                any::<SeqNo>(),
2997                any::<Option<T>>(),
2998                any::<u64>(),
2999                any::<u64>(),
3000                any::<HandleDebugState>(),
3001            ),
3002            |(seqno, since, last_heartbeat_timestamp_ms, mut lease_duration_ms, debug)| {
3003                // lease_duration_ms of 0 means this state was written by an old
3004                // version of code, which means we'll migrate it in the decode
3005                // path. Avoid.
3006                if lease_duration_ms == 0 {
3007                    lease_duration_ms += 1;
3008                }
3009                LeasedReaderState {
3010                    seqno,
3011                    since: since.map_or_else(Antichain::new, Antichain::from_elem),
3012                    last_heartbeat_timestamp_ms,
3013                    lease_duration_ms,
3014                    debug,
3015                }
3016            },
3017        )
3018    }
3019
3020    pub fn any_critical_reader_state<T>() -> impl Strategy<Value = CriticalReaderState<T>>
3021    where
3022        T: Arbitrary,
3023    {
3024        Strategy::prop_map(
3025            (
3026                any::<Option<T>>(),
3027                any::<Opaque>(),
3028                any::<HandleDebugState>(),
3029            ),
3030            |(since, opaque, debug)| CriticalReaderState {
3031                since: since.map_or_else(Antichain::new, Antichain::from_elem),
3032                opaque,
3033                debug,
3034            },
3035        )
3036    }
3037
3038    pub fn any_writer_state<T: Arbitrary>() -> impl Strategy<Value = WriterState<T>> {
3039        Strategy::prop_map(
3040            (
3041                any::<u64>(),
3042                any::<u64>(),
3043                any::<IdempotencyToken>(),
3044                any::<Option<T>>(),
3045                any::<HandleDebugState>(),
3046            ),
3047            |(
3048                last_heartbeat_timestamp_ms,
3049                lease_duration_ms,
3050                most_recent_write_token,
3051                most_recent_write_upper,
3052                debug,
3053            )| WriterState {
3054                last_heartbeat_timestamp_ms,
3055                lease_duration_ms,
3056                most_recent_write_token,
3057                most_recent_write_upper: most_recent_write_upper
3058                    .map_or_else(Antichain::new, Antichain::from_elem),
3059                debug,
3060            },
3061        )
3062    }
3063
3064    pub fn any_encoded_schemas() -> impl Strategy<Value = EncodedSchemas> {
3065        Strategy::prop_map(
3066            (
3067                any::<Vec<u8>>(),
3068                any::<Vec<u8>>(),
3069                any::<Vec<u8>>(),
3070                any::<Vec<u8>>(),
3071            ),
3072            |(key, key_data_type, val, val_data_type)| EncodedSchemas {
3073                key: Bytes::from(key),
3074                key_data_type: Bytes::from(key_data_type),
3075                val: Bytes::from(val),
3076                val_data_type: Bytes::from(val_data_type),
3077            },
3078        )
3079    }
3080
    /// Strategy for an arbitrary [State] whose trace holds a number of batches
    /// in `num_trace_batches`.
    ///
    /// NOTE(review): the inputs are split into two tuples, presumably to stay
    /// within proptest's tuple-arity limits — confirm if refactoring.
    pub fn any_state<T: Arbitrary + Timestamp + Lattice>(
        num_trace_batches: Range<usize>,
    ) -> impl Strategy<Value = State<T>> {
        let part1 = (
            any::<ShardId>(),
            any::<SeqNo>(),
            any::<u64>(),
            any::<String>(),
            any::<SeqNo>(),
            // At least one rollup, matching the invariant that every state
            // version has one (see `latest_rollup`).
            proptest::collection::btree_map(any::<SeqNo>(), any::<HollowRollup>(), 1..3),
            proptest::option::of(any::<ActiveRollup>()),
        );

        let part2 = (
            proptest::option::of(any::<ActiveGc>()),
            proptest::collection::btree_map(
                any::<LeasedReaderId>(),
                any_leased_reader_state::<T>(),
                1..3,
            ),
            proptest::collection::btree_map(
                any::<CriticalReaderId>(),
                any_critical_reader_state::<T>(),
                1..3,
            ),
            proptest::collection::btree_map(any::<WriterId>(), any_writer_state::<T>(), 0..3),
            proptest::collection::btree_map(any::<SchemaId>(), any_encoded_schemas(), 0..3),
            any_trace::<T>(num_trace_batches),
        );

        (part1, part2).prop_map(
            |(
                (shard_id, seqno, walltime_ms, hostname, last_gc_req, rollups, active_rollup),
                (active_gc, leased_readers, critical_readers, writers, schemas, trace),
            )| State {
                shard_id,
                seqno,
                walltime_ms,
                hostname,
                collections: StateCollections {
                    // Fixed version: these states never hit the real
                    // version-compatibility checks.
                    version: Version::new(1, 2, 3),
                    last_gc_req,
                    rollups,
                    active_rollup,
                    active_gc,
                    leased_readers,
                    critical_readers,
                    writers,
                    schemas,
                    trace,
                },
            },
        )
    }
3135
    /// Test helper: builds a single-run [HollowBatch] over `[lower, upper)`
    /// with since at the minimum timestamp, one zero-sized hollow part per
    /// blob key, and the given logical length.
    pub(crate) fn hollow<T: Timestamp>(
        lower: T,
        upper: T,
        keys: &[&str],
        len: usize,
    ) -> HollowBatch<T> {
        HollowBatch::new_run(
            Description::new(
                Antichain::from_elem(lower),
                Antichain::from_elem(upper),
                Antichain::from_elem(T::minimum()),
            ),
            keys.iter()
                .map(|x| {
                    // Everything besides the key is defaulted/empty; tests only
                    // care about batch structure, not part contents.
                    RunPart::Single(BatchPart::Hollow(HollowBatchPart {
                        key: PartialBatchKey((*x).to_owned()),
                        meta: Default::default(),
                        encoded_size_bytes: 0,
                        key_lower: vec![],
                        structured_key_lower: None,
                        stats: None,
                        ts_rewrite: None,
                        diffs_sum: None,
                        format: None,
                        schema_id: None,
                        deprecated_schema_id: None,
                    }))
                })
                .collect(),
            len,
        )
    }
3168
3169    #[mz_ore::test]
3170    fn downgrade_since() {
3171        let mut state = TypedState::<(), (), u64, i64>::new(
3172            DUMMY_BUILD_INFO.semver_version(),
3173            ShardId::new(),
3174            "".to_owned(),
3175            0,
3176        );
3177        let reader = LeasedReaderId::new();
3178        let seqno = SeqNo::minimum();
3179        let now = SYSTEM_TIME.clone();
3180        let _ = state.collections.register_leased_reader(
3181            "",
3182            &reader,
3183            "",
3184            seqno,
3185            Duration::from_secs(10),
3186            now(),
3187            false,
3188        );
3189
3190        // The shard global since == 0 initially.
3191        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3192
3193        // Greater
3194        assert_eq!(
3195            state.collections.downgrade_since(
3196                &reader,
3197                seqno,
3198                seqno,
3199                &Antichain::from_elem(2),
3200                now()
3201            ),
3202            Continue(Since(Antichain::from_elem(2)))
3203        );
3204        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3205        // Equal (no-op)
3206        assert_eq!(
3207            state.collections.downgrade_since(
3208                &reader,
3209                seqno,
3210                seqno,
3211                &Antichain::from_elem(2),
3212                now()
3213            ),
3214            Continue(Since(Antichain::from_elem(2)))
3215        );
3216        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3217        // Less (no-op)
3218        assert_eq!(
3219            state.collections.downgrade_since(
3220                &reader,
3221                seqno,
3222                seqno,
3223                &Antichain::from_elem(1),
3224                now()
3225            ),
3226            Continue(Since(Antichain::from_elem(2)))
3227        );
3228        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3229
3230        // Create a second reader.
3231        let reader2 = LeasedReaderId::new();
3232        let _ = state.collections.register_leased_reader(
3233            "",
3234            &reader2,
3235            "",
3236            seqno,
3237            Duration::from_secs(10),
3238            now(),
3239            false,
3240        );
3241
3242        // Shard since doesn't change until the meet (min) of all reader sinces changes.
3243        assert_eq!(
3244            state.collections.downgrade_since(
3245                &reader2,
3246                seqno,
3247                seqno,
3248                &Antichain::from_elem(3),
3249                now()
3250            ),
3251            Continue(Since(Antichain::from_elem(3)))
3252        );
3253        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3254        // Shard since == 3 when all readers have since >= 3.
3255        assert_eq!(
3256            state.collections.downgrade_since(
3257                &reader,
3258                seqno,
3259                seqno,
3260                &Antichain::from_elem(5),
3261                now()
3262            ),
3263            Continue(Since(Antichain::from_elem(5)))
3264        );
3265        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3266
3267        // Shard since unaffected readers with since > shard since expiring.
3268        assert_eq!(
3269            state.collections.expire_leased_reader(&reader),
3270            Continue(true)
3271        );
3272        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3273
3274        // Create a third reader.
3275        let reader3 = LeasedReaderId::new();
3276        let _ = state.collections.register_leased_reader(
3277            "",
3278            &reader3,
3279            "",
3280            seqno,
3281            Duration::from_secs(10),
3282            now(),
3283            false,
3284        );
3285
3286        // Shard since doesn't change until the meet (min) of all reader sinces changes.
3287        assert_eq!(
3288            state.collections.downgrade_since(
3289                &reader3,
3290                seqno,
3291                seqno,
3292                &Antichain::from_elem(10),
3293                now()
3294            ),
3295            Continue(Since(Antichain::from_elem(10)))
3296        );
3297        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3298
3299        // Shard since advances when reader with the minimal since expires.
3300        assert_eq!(
3301            state.collections.expire_leased_reader(&reader2),
3302            Continue(true)
3303        );
3304        // TODO(database-issues#6885): expiry temporarily doesn't advance since
3305        // Switch this assertion back when we re-enable this.
3306        //
3307        // assert_eq!(state.collections.trace.since(), &Antichain::from_elem(10));
3308        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3309
3310        // Shard since unaffected when all readers are expired.
3311        assert_eq!(
3312            state.collections.expire_leased_reader(&reader3),
3313            Continue(true)
3314        );
3315        // TODO(database-issues#6885): expiry temporarily doesn't advance since
3316        // Switch this assertion back when we re-enable this.
3317        //
3318        // assert_eq!(state.collections.trace.since(), &Antichain::from_elem(10));
3319        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3320    }
3321
3322    #[mz_ore::test]
3323    fn compare_and_downgrade_since() {
3324        let mut state = TypedState::<(), (), u64, i64>::new(
3325            DUMMY_BUILD_INFO.semver_version(),
3326            ShardId::new(),
3327            "".to_owned(),
3328            0,
3329        );
3330        let reader = CriticalReaderId::new();
3331        let _ = state
3332            .collections
3333            .register_critical_reader("", &reader, Opaque::encode(&0u64), "");
3334
3335        // The shard global since == 0 initially.
3336        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3337        // The initial opaque value should be set.
3338        assert_eq!(
3339            state
3340                .collections
3341                .critical_reader(&reader)
3342                .opaque
3343                .decode::<u64>(),
3344            u64::MIN
3345        );
3346
3347        // Greater
3348        assert_eq!(
3349            state.collections.compare_and_downgrade_since(
3350                &reader,
3351                &Opaque::encode(&0u64),
3352                (&Opaque::encode(&1u64), &Antichain::from_elem(2)),
3353            ),
3354            Continue(Ok(Since(Antichain::from_elem(2))))
3355        );
3356        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3357        assert_eq!(
3358            state
3359                .collections
3360                .critical_reader(&reader)
3361                .opaque
3362                .decode::<u64>(),
3363            1
3364        );
3365        // Equal (no-op)
3366        assert_eq!(
3367            state.collections.compare_and_downgrade_since(
3368                &reader,
3369                &Opaque::encode(&1u64),
3370                (&Opaque::encode(&2u64), &Antichain::from_elem(2)),
3371            ),
3372            Continue(Ok(Since(Antichain::from_elem(2))))
3373        );
3374        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3375        assert_eq!(
3376            state
3377                .collections
3378                .critical_reader(&reader)
3379                .opaque
3380                .decode::<u64>(),
3381            2
3382        );
3383        // Less (no-op)
3384        assert_eq!(
3385            state.collections.compare_and_downgrade_since(
3386                &reader,
3387                &Opaque::encode(&2u64),
3388                (&Opaque::encode(&3u64), &Antichain::from_elem(1)),
3389            ),
3390            Continue(Ok(Since(Antichain::from_elem(2))))
3391        );
3392        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3393        assert_eq!(
3394            state
3395                .collections
3396                .critical_reader(&reader)
3397                .opaque
3398                .decode::<u64>(),
3399            3
3400        );
3401    }
3402
3403    #[mz_ore::test]
3404    fn compare_and_append() {
3405        let state = &mut TypedState::<String, String, u64, i64>::new(
3406            DUMMY_BUILD_INFO.semver_version(),
3407            ShardId::new(),
3408            "".to_owned(),
3409            0,
3410        )
3411        .collections;
3412
3413        let writer_id = WriterId::new();
3414        let now = SYSTEM_TIME.clone();
3415
3416        // State is initially empty.
3417        assert_eq!(state.trace.num_spine_batches(), 0);
3418        assert_eq!(state.trace.num_hollow_batches(), 0);
3419        assert_eq!(state.trace.num_updates(), 0);
3420
3421        // Cannot insert a batch with a lower != current shard upper.
3422        assert_eq!(
3423            state.compare_and_append(
3424                &hollow(1, 2, &["key1"], 1),
3425                &writer_id,
3426                now(),
3427                LEASE_DURATION_MS,
3428                &IdempotencyToken::new(),
3429                &debug_state(),
3430                0,
3431                100,
3432                None
3433            ),
3434            Break(CompareAndAppendBreak::Upper {
3435                shard_upper: Antichain::from_elem(0),
3436                writer_upper: Antichain::from_elem(0)
3437            })
3438        );
3439
3440        // Insert an empty batch with an upper > lower..
3441        assert!(
3442            state
3443                .compare_and_append(
3444                    &hollow(0, 5, &[], 0),
3445                    &writer_id,
3446                    now(),
3447                    LEASE_DURATION_MS,
3448                    &IdempotencyToken::new(),
3449                    &debug_state(),
3450                    0,
3451                    100,
3452                    None
3453                )
3454                .is_continue()
3455        );
3456
3457        // Cannot insert a batch with a upper less than the lower.
3458        assert_eq!(
3459            state.compare_and_append(
3460                &hollow(5, 4, &["key1"], 1),
3461                &writer_id,
3462                now(),
3463                LEASE_DURATION_MS,
3464                &IdempotencyToken::new(),
3465                &debug_state(),
3466                0,
3467                100,
3468                None
3469            ),
3470            Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds {
3471                lower: Antichain::from_elem(5),
3472                upper: Antichain::from_elem(4)
3473            }))
3474        );
3475
3476        // Cannot insert a nonempty batch with an upper equal to lower.
3477        assert_eq!(
3478            state.compare_and_append(
3479                &hollow(5, 5, &["key1"], 1),
3480                &writer_id,
3481                now(),
3482                LEASE_DURATION_MS,
3483                &IdempotencyToken::new(),
3484                &debug_state(),
3485                0,
3486                100,
3487                None
3488            ),
3489            Break(CompareAndAppendBreak::InvalidUsage(
3490                InvalidEmptyTimeInterval {
3491                    lower: Antichain::from_elem(5),
3492                    upper: Antichain::from_elem(5),
3493                    keys: vec!["key1".to_owned()],
3494                }
3495            ))
3496        );
3497
3498        // Can insert an empty batch with an upper equal to lower.
3499        assert!(
3500            state
3501                .compare_and_append(
3502                    &hollow(5, 5, &[], 0),
3503                    &writer_id,
3504                    now(),
3505                    LEASE_DURATION_MS,
3506                    &IdempotencyToken::new(),
3507                    &debug_state(),
3508                    0,
3509                    100,
3510                    None
3511                )
3512                .is_continue()
3513        );
3514    }
3515
    // Exercises `snapshot`'s as_of bounds checks against the shard's since
    // and upper as both are advanced.
    #[mz_ore::test]
    fn snapshot() {
        let now = SYSTEM_TIME.clone();

        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        // Cannot take a snapshot with as_of == shard upper.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(0)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(0))
            ))
        );

        // Cannot take a snapshot with as_of > shard upper.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(5)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(0))
            ))
        );

        let writer_id = WriterId::new();

        // Advance upper to 5 by appending a batch over [0, 5).
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(0, 5, &["key1"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // Can take a snapshot with as_of < upper.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(0)),
            Ok(vec![hollow(0, 5, &["key1"], 1)])
        );

        // Can take a snapshot with as_of >= shard since, as long as as_of < shard_upper.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(4)),
            Ok(vec![hollow(0, 5, &["key1"], 1)])
        );

        // Cannot take a snapshot with as_of >= upper.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(5)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(5))
            ))
        );
        assert_eq!(
            state.snapshot(&Antichain::from_elem(6)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(5))
            ))
        );

        let reader = LeasedReaderId::new();
        // Advance the since to 2 by registering a leased reader and
        // downgrading its since.
        let _ = state.collections.register_leased_reader(
            "",
            &reader,
            "",
            SeqNo::minimum(),
            Duration::from_secs(10),
            now(),
            false,
        );
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                SeqNo::minimum(),
                SeqNo::minimum(),
                &Antichain::from_elem(2),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        // Cannot take a snapshot with as_of < shard_since: distinctions
        // before the since may already have been compacted away.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(1)),
            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
                Antichain::from_elem(2)
            )))
        );

        // Advance the upper to 10 via an empty batch.
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(5, 10, &[], 0),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // Can still take snapshots at times < upper.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(7)),
            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
        );

        // Cannot take snapshots with as_of >= upper.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(10)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(10))
            ))
        );

        // Advance upper to 15.
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(10, 15, &["key2"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // Filter out batches whose lowers are greater than the requested
        // as_of (the batches that are entirely in the future of the
        // requested as_of): here [10, 15) is excluded at as_of = 9.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(9)),
            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
        );

        // Don't filter out batches whose lowers are <= the requested as_of.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(10)),
            Ok(vec![
                hollow(0, 5, &["key1"], 1),
                hollow(5, 10, &[], 0),
                hollow(10, 15, &["key2"], 1)
            ])
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(11)),
            Ok(vec![
                hollow(0, 5, &["key1"], 1),
                hollow(5, 10, &[], 0),
                hollow(10, 15, &["key2"], 1)
            ])
        );
    }
3699
3700    #[mz_ore::test]
3701    fn next_listen_batch() {
3702        let mut state = TypedState::<String, String, u64, i64>::new(
3703            DUMMY_BUILD_INFO.semver_version(),
3704            ShardId::new(),
3705            "".to_owned(),
3706            0,
3707        );
3708
3709        // Empty collection never has any batches to listen for, regardless of the
3710        // current frontier.
3711        assert_eq!(
3712            state.next_listen_batch(&Antichain::from_elem(0)),
3713            Err(SeqNo(0))
3714        );
3715        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3716
3717        let writer_id = WriterId::new();
3718        let now = SYSTEM_TIME.clone();
3719
3720        // Add two batches of data, one from [0, 5) and then another from [5, 10).
3721        assert!(
3722            state
3723                .collections
3724                .compare_and_append(
3725                    &hollow(0, 5, &["key1"], 1),
3726                    &writer_id,
3727                    now(),
3728                    LEASE_DURATION_MS,
3729                    &IdempotencyToken::new(),
3730                    &debug_state(),
3731                    0,
3732                    100,
3733                    None
3734                )
3735                .is_continue()
3736        );
3737        assert!(
3738            state
3739                .collections
3740                .compare_and_append(
3741                    &hollow(5, 10, &["key2"], 1),
3742                    &writer_id,
3743                    now(),
3744                    LEASE_DURATION_MS,
3745                    &IdempotencyToken::new(),
3746                    &debug_state(),
3747                    0,
3748                    100,
3749                    None
3750                )
3751                .is_continue()
3752        );
3753
3754        // All frontiers in [0, 5) return the first batch.
3755        for t in 0..=4 {
3756            assert_eq!(
3757                state.next_listen_batch(&Antichain::from_elem(t)),
3758                Ok(hollow(0, 5, &["key1"], 1))
3759            );
3760        }
3761
3762        // All frontiers in [5, 10) return the second batch.
3763        for t in 5..=9 {
3764            assert_eq!(
3765                state.next_listen_batch(&Antichain::from_elem(t)),
3766                Ok(hollow(5, 10, &["key2"], 1))
3767            );
3768        }
3769
3770        // There is no batch currently available for t = 10.
3771        assert_eq!(
3772            state.next_listen_batch(&Antichain::from_elem(10)),
3773            Err(SeqNo(0))
3774        );
3775
3776        // By definition, there is no frontier ever at the empty antichain which
3777        // is the time after all possible times.
3778        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3779    }
3780
3781    #[mz_ore::test]
3782    fn expire_writer() {
3783        let mut state = TypedState::<String, String, u64, i64>::new(
3784            DUMMY_BUILD_INFO.semver_version(),
3785            ShardId::new(),
3786            "".to_owned(),
3787            0,
3788        );
3789        let now = SYSTEM_TIME.clone();
3790
3791        let writer_id_one = WriterId::new();
3792
3793        let writer_id_two = WriterId::new();
3794
3795        // Writer is eligible to write
3796        assert!(
3797            state
3798                .collections
3799                .compare_and_append(
3800                    &hollow(0, 2, &["key1"], 1),
3801                    &writer_id_one,
3802                    now(),
3803                    LEASE_DURATION_MS,
3804                    &IdempotencyToken::new(),
3805                    &debug_state(),
3806                    0,
3807                    100,
3808                    None
3809                )
3810                .is_continue()
3811        );
3812
3813        assert!(
3814            state
3815                .collections
3816                .expire_writer(&writer_id_one)
3817                .is_continue()
3818        );
3819
3820        // Other writers should still be able to write
3821        assert!(
3822            state
3823                .collections
3824                .compare_and_append(
3825                    &hollow(2, 5, &["key2"], 1),
3826                    &writer_id_two,
3827                    now(),
3828                    LEASE_DURATION_MS,
3829                    &IdempotencyToken::new(),
3830                    &debug_state(),
3831                    0,
3832                    100,
3833                    None
3834                )
3835                .is_continue()
3836        );
3837    }
3838
    // Exercises `maybe_gc` with the "active gc" mechanism enabled: a
    // written-down in-progress gc suppresses further gc requests until the
    // fallback threshold elapses.
    #[mz_ore::test]
    fn maybe_gc_active_gc() {
        const GC_CONFIG: GcConfig = GcConfig {
            use_active_gc: true,
            fallback_threshold_ms: 5000,
            min_versions: 99,
            max_versions: 500,
        };
        let now_fn = SYSTEM_TIME.clone();

        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        let now = now_fn();
        // Empty state doesn't need gc, regardless of is_write.
        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);

        // Artificially advance the seqno so the seqno_since advances past our
        // internal gc_threshold.
        state.seqno = SeqNo(100);
        assert_eq!(state.seqno_since(), SeqNo(100));

        // When a writer is present, non-writes don't gc.
        let writer_id = WriterId::new();
        let _ = state.collections.compare_and_append(
            &hollow(1, 2, &["key1"], 1),
            &writer_id,
            now,
            LEASE_DURATION_MS,
            &IdempotencyToken::new(),
            &debug_state(),
            0,
            100,
            None,
        );
        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);

        // A write will gc though.
        assert_eq!(
            state.maybe_gc(true, now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(100)
            })
        );

        // But if we write down an active gc, we won't gc.
        state.collections.active_gc = Some(ActiveGc {
            seqno: state.seqno,
            start_ms: now,
        });

        state.seqno = SeqNo(200);
        assert_eq!(state.seqno_since(), SeqNo(200));

        // The active gc suppresses the request even though the seqno has
        // advanced well past the threshold again.
        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);

        state.seqno = SeqNo(300);
        assert_eq!(state.seqno_since(), SeqNo(300));
        // But if we advance the time past the threshold, we will gc: the
        // active gc written at `now` is considered expired at `new_now`.
        let new_now = now + GC_CONFIG.fallback_threshold_ms + 1;
        assert_eq!(
            state.maybe_gc(true, new_now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(300)
            })
        );

        // Even if the sequence number doesn't pass the threshold, if the
        // active gc is expired, we will gc.

        state.seqno = SeqNo(301);
        assert_eq!(state.seqno_since(), SeqNo(301));
        assert_eq!(
            state.maybe_gc(true, new_now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(301)
            })
        );

        // Clear the expired active gc marker before the remaining cases.
        state.collections.active_gc = None;

        // Artificially advance the seqno (again) so the seqno_since advances
        // past our internal gc_threshold (again).
        state.seqno = SeqNo(400);
        assert_eq!(state.seqno_since(), SeqNo(400));

        let now = now_fn();

        // If there are no writers, even a non-write will gc.
        let _ = state.collections.expire_writer(&writer_id);
        assert_eq!(
            state.maybe_gc(false, now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(400)
            })
        );

        // Upper-bound the number of seqnos we'll attempt to collect in one go:
        // the requested new_seqno_since is capped at max_versions past the
        // previous seqno rather than jumping all the way to 10_000.
        let previous_seqno = state.seqno;
        state.seqno = SeqNo(10_000);
        assert_eq!(state.seqno_since(), SeqNo(10_000));

        let now = now_fn();
        assert_eq!(
            state.maybe_gc(true, now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(previous_seqno.0 + u64::cast_from(GC_CONFIG.max_versions))
            })
        );
    }
3959
3960    #[mz_ore::test]
3961    fn maybe_gc_classic() {
3962        const GC_CONFIG: GcConfig = GcConfig {
3963            use_active_gc: false,
3964            fallback_threshold_ms: 5000,
3965            min_versions: 16,
3966            max_versions: 128,
3967        };
3968        const NOW_MS: u64 = 0;
3969
3970        let mut state = TypedState::<String, String, u64, i64>::new(
3971            DUMMY_BUILD_INFO.semver_version(),
3972            ShardId::new(),
3973            "".to_owned(),
3974            0,
3975        );
3976
3977        // Empty state doesn't need gc, regardless of is_write.
3978        assert_eq!(state.maybe_gc(true, NOW_MS, GC_CONFIG), None);
3979        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
3980
3981        // Artificially advance the seqno so the seqno_since advances past our
3982        // internal gc_threshold.
3983        state.seqno = SeqNo(100);
3984        assert_eq!(state.seqno_since(), SeqNo(100));
3985
3986        // When a writer is present, non-writes don't gc.
3987        let writer_id = WriterId::new();
3988        let now = SYSTEM_TIME.clone();
3989        let _ = state.collections.compare_and_append(
3990            &hollow(1, 2, &["key1"], 1),
3991            &writer_id,
3992            now(),
3993            LEASE_DURATION_MS,
3994            &IdempotencyToken::new(),
3995            &debug_state(),
3996            0,
3997            100,
3998            None,
3999        );
4000        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
4001
4002        // A write will gc though.
4003        assert_eq!(
4004            state.maybe_gc(true, NOW_MS, GC_CONFIG),
4005            Some(GcReq {
4006                shard_id: state.shard_id,
4007                new_seqno_since: SeqNo(100)
4008            })
4009        );
4010
4011        // Artificially advance the seqno (again) so the seqno_since advances
4012        // past our internal gc_threshold (again).
4013        state.seqno = SeqNo(200);
4014        assert_eq!(state.seqno_since(), SeqNo(200));
4015
4016        // If there are no writers, even a non-write will gc.
4017        let _ = state.collections.expire_writer(&writer_id);
4018        assert_eq!(
4019            state.maybe_gc(false, NOW_MS, GC_CONFIG),
4020            Some(GcReq {
4021                shard_id: state.shard_id,
4022                new_seqno_since: SeqNo(200)
4023            })
4024        );
4025    }
4026
4027    #[mz_ore::test]
4028    fn need_rollup_active_rollup() {
4029        const ROLLUP_THRESHOLD: usize = 3;
4030        const ROLLUP_USE_ACTIVE_ROLLUP: bool = true;
4031        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 5000;
4032        let now = SYSTEM_TIME.clone();
4033
4034        mz_ore::test::init_logging();
4035        let mut state = TypedState::<String, String, u64, i64>::new(
4036            DUMMY_BUILD_INFO.semver_version(),
4037            ShardId::new(),
4038            "".to_owned(),
4039            0,
4040        );
4041
4042        let rollup_seqno = SeqNo(5);
4043        let rollup = HollowRollup {
4044            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4045            encoded_size_bytes: None,
4046        };
4047
4048        assert!(
4049            state
4050                .collections
4051                .add_rollup((rollup_seqno, &rollup))
4052                .is_continue()
4053        );
4054
4055        // shouldn't need a rollup at the seqno of the rollup
4056        state.seqno = SeqNo(5);
4057        assert_none!(state.need_rollup(
4058            ROLLUP_THRESHOLD,
4059            ROLLUP_USE_ACTIVE_ROLLUP,
4060            ROLLUP_FALLBACK_THRESHOLD_MS,
4061            now()
4062        ));
4063
4064        // shouldn't need a rollup at seqnos less than our threshold
4065        state.seqno = SeqNo(6);
4066        assert_none!(state.need_rollup(
4067            ROLLUP_THRESHOLD,
4068            ROLLUP_USE_ACTIVE_ROLLUP,
4069            ROLLUP_FALLBACK_THRESHOLD_MS,
4070            now()
4071        ));
4072        state.seqno = SeqNo(7);
4073        assert_none!(state.need_rollup(
4074            ROLLUP_THRESHOLD,
4075            ROLLUP_USE_ACTIVE_ROLLUP,
4076            ROLLUP_FALLBACK_THRESHOLD_MS,
4077            now()
4078        ));
4079        state.seqno = SeqNo(8);
4080        assert_none!(state.need_rollup(
4081            ROLLUP_THRESHOLD,
4082            ROLLUP_USE_ACTIVE_ROLLUP,
4083            ROLLUP_FALLBACK_THRESHOLD_MS,
4084            now()
4085        ));
4086
4087        let mut current_time = now();
4088        // hit our threshold! we should need a rollup
4089        state.seqno = SeqNo(9);
4090        assert_eq!(
4091            state
4092                .need_rollup(
4093                    ROLLUP_THRESHOLD,
4094                    ROLLUP_USE_ACTIVE_ROLLUP,
4095                    ROLLUP_FALLBACK_THRESHOLD_MS,
4096                    current_time
4097                )
4098                .expect("rollup"),
4099            SeqNo(9)
4100        );
4101
4102        state.collections.active_rollup = Some(ActiveRollup {
4103            seqno: SeqNo(9),
4104            start_ms: current_time,
4105        });
4106
4107        // There is now an active rollup, so we shouldn't need a rollup.
4108        assert_none!(state.need_rollup(
4109            ROLLUP_THRESHOLD,
4110            ROLLUP_USE_ACTIVE_ROLLUP,
4111            ROLLUP_FALLBACK_THRESHOLD_MS,
4112            current_time
4113        ));
4114
4115        state.seqno = SeqNo(10);
4116        // We still don't need a rollup, even though the seqno is greater than
4117        // the rollup threshold.
4118        assert_none!(state.need_rollup(
4119            ROLLUP_THRESHOLD,
4120            ROLLUP_USE_ACTIVE_ROLLUP,
4121            ROLLUP_FALLBACK_THRESHOLD_MS,
4122            current_time
4123        ));
4124
4125        // But if we wait long enough, we should need a rollup again.
4126        current_time += u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS) + 1;
4127        assert_eq!(
4128            state
4129                .need_rollup(
4130                    ROLLUP_THRESHOLD,
4131                    ROLLUP_USE_ACTIVE_ROLLUP,
4132                    ROLLUP_FALLBACK_THRESHOLD_MS,
4133                    current_time
4134                )
4135                .expect("rollup"),
4136            SeqNo(10)
4137        );
4138
4139        state.seqno = SeqNo(9);
4140        // Clear the active rollup and ensure we need a rollup again.
4141        state.collections.active_rollup = None;
4142        let rollup_seqno = SeqNo(9);
4143        let rollup = HollowRollup {
4144            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4145            encoded_size_bytes: None,
4146        };
4147        assert!(
4148            state
4149                .collections
4150                .add_rollup((rollup_seqno, &rollup))
4151                .is_continue()
4152        );
4153
4154        state.seqno = SeqNo(11);
4155        // We shouldn't need a rollup at seqnos less than our threshold
4156        assert_none!(state.need_rollup(
4157            ROLLUP_THRESHOLD,
4158            ROLLUP_USE_ACTIVE_ROLLUP,
4159            ROLLUP_FALLBACK_THRESHOLD_MS,
4160            current_time
4161        ));
4162        // hit our threshold! we should need a rollup
4163        state.seqno = SeqNo(13);
4164        assert_eq!(
4165            state
4166                .need_rollup(
4167                    ROLLUP_THRESHOLD,
4168                    ROLLUP_USE_ACTIVE_ROLLUP,
4169                    ROLLUP_FALLBACK_THRESHOLD_MS,
4170                    current_time
4171                )
4172                .expect("rollup"),
4173            SeqNo(13)
4174        );
4175    }
4176
4177    #[mz_ore::test]
4178    fn need_rollup_classic() {
4179        const ROLLUP_THRESHOLD: usize = 3;
4180        const ROLLUP_USE_ACTIVE_ROLLUP: bool = false;
4181        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 0;
4182        const NOW: u64 = 0;
4183
4184        mz_ore::test::init_logging();
4185        let mut state = TypedState::<String, String, u64, i64>::new(
4186            DUMMY_BUILD_INFO.semver_version(),
4187            ShardId::new(),
4188            "".to_owned(),
4189            0,
4190        );
4191
4192        let rollup_seqno = SeqNo(5);
4193        let rollup = HollowRollup {
4194            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4195            encoded_size_bytes: None,
4196        };
4197
4198        assert!(
4199            state
4200                .collections
4201                .add_rollup((rollup_seqno, &rollup))
4202                .is_continue()
4203        );
4204
4205        // shouldn't need a rollup at the seqno of the rollup
4206        state.seqno = SeqNo(5);
4207        assert_none!(state.need_rollup(
4208            ROLLUP_THRESHOLD,
4209            ROLLUP_USE_ACTIVE_ROLLUP,
4210            ROLLUP_FALLBACK_THRESHOLD_MS,
4211            NOW
4212        ));
4213
4214        // shouldn't need a rollup at seqnos less than our threshold
4215        state.seqno = SeqNo(6);
4216        assert_none!(state.need_rollup(
4217            ROLLUP_THRESHOLD,
4218            ROLLUP_USE_ACTIVE_ROLLUP,
4219            ROLLUP_FALLBACK_THRESHOLD_MS,
4220            NOW
4221        ));
4222        state.seqno = SeqNo(7);
4223        assert_none!(state.need_rollup(
4224            ROLLUP_THRESHOLD,
4225            ROLLUP_USE_ACTIVE_ROLLUP,
4226            ROLLUP_FALLBACK_THRESHOLD_MS,
4227            NOW
4228        ));
4229
4230        // hit our threshold! we should need a rollup
4231        state.seqno = SeqNo(8);
4232        assert_eq!(
4233            state
4234                .need_rollup(
4235                    ROLLUP_THRESHOLD,
4236                    ROLLUP_USE_ACTIVE_ROLLUP,
4237                    ROLLUP_FALLBACK_THRESHOLD_MS,
4238                    NOW
4239                )
4240                .expect("rollup"),
4241            SeqNo(8)
4242        );
4243
4244        // but we don't need rollups for every seqno > the threshold
4245        state.seqno = SeqNo(9);
4246        assert_none!(state.need_rollup(
4247            ROLLUP_THRESHOLD,
4248            ROLLUP_USE_ACTIVE_ROLLUP,
4249            ROLLUP_FALLBACK_THRESHOLD_MS,
4250            NOW
4251        ));
4252
4253        // we only need a rollup each `ROLLUP_THRESHOLD` beyond our current seqno
4254        state.seqno = SeqNo(11);
4255        assert_eq!(
4256            state
4257                .need_rollup(
4258                    ROLLUP_THRESHOLD,
4259                    ROLLUP_USE_ACTIVE_ROLLUP,
4260                    ROLLUP_FALLBACK_THRESHOLD_MS,
4261                    NOW
4262                )
4263                .expect("rollup"),
4264            SeqNo(11)
4265        );
4266
4267        // add another rollup and ensure we're always picking the latest
4268        let rollup_seqno = SeqNo(6);
4269        let rollup = HollowRollup {
4270            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4271            encoded_size_bytes: None,
4272        };
4273        assert!(
4274            state
4275                .collections
4276                .add_rollup((rollup_seqno, &rollup))
4277                .is_continue()
4278        );
4279
4280        state.seqno = SeqNo(8);
4281        assert_none!(state.need_rollup(
4282            ROLLUP_THRESHOLD,
4283            ROLLUP_USE_ACTIVE_ROLLUP,
4284            ROLLUP_FALLBACK_THRESHOLD_MS,
4285            NOW
4286        ));
4287        state.seqno = SeqNo(9);
4288        assert_eq!(
4289            state
4290                .need_rollup(
4291                    ROLLUP_THRESHOLD,
4292                    ROLLUP_USE_ACTIVE_ROLLUP,
4293                    ROLLUP_FALLBACK_THRESHOLD_MS,
4294                    NOW
4295                )
4296                .expect("rollup"),
4297            SeqNo(9)
4298        );
4299
4300        // and ensure that after a fallback point, we assign every seqno work
4301        let fallback_seqno = SeqNo(
4302            rollup_seqno.0
4303                * u64::cast_from(PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER),
4304        );
4305        state.seqno = fallback_seqno;
4306        assert_eq!(
4307            state
4308                .need_rollup(
4309                    ROLLUP_THRESHOLD,
4310                    ROLLUP_USE_ACTIVE_ROLLUP,
4311                    ROLLUP_FALLBACK_THRESHOLD_MS,
4312                    NOW
4313                )
4314                .expect("rollup"),
4315            fallback_seqno
4316        );
4317        state.seqno = fallback_seqno.next();
4318        assert_eq!(
4319            state
4320                .need_rollup(
4321                    ROLLUP_THRESHOLD,
4322                    ROLLUP_USE_ACTIVE_ROLLUP,
4323                    ROLLUP_FALLBACK_THRESHOLD_MS,
4324                    NOW
4325                )
4326                .expect("rollup"),
4327            fallback_seqno.next()
4328        );
4329    }
4330
4331    #[mz_ore::test]
4332    fn idempotency_token_sentinel() {
4333        assert_eq!(
4334            IdempotencyToken::SENTINEL.to_string(),
4335            "i11111111-1111-1111-1111-111111111111"
4336        );
4337    }
4338
4339    /// This test generates an "arbitrary" State, but uses a fixed seed for the
4340    /// randomness, so that it's deterministic. This lets us assert the
4341    /// serialization of that State against a golden file that's committed,
4342    /// making it easy to see what the serialization (used in an upcoming
4343    /// INSPECT feature) looks like.
4344    ///
4345    /// This golden will have to be updated each time we change State, but
4346    /// that's a feature, not a bug.
4347    #[mz_ore::test]
4348    #[cfg_attr(miri, ignore)] // too slow
4349    fn state_inspect_serde_json() {
4350        const STATE_SERDE_JSON: &str = include_str!("state_serde.json");
4351        let mut runner = proptest::test_runner::TestRunner::deterministic();
4352        let tree = any_state::<u64>(6..8).new_tree(&mut runner).unwrap();
4353        let json = serde_json::to_string_pretty(&tree.current()).unwrap();
4354        assert_eq!(
4355            json.trim(),
4356            STATE_SERDE_JSON.trim(),
4357            "\n\nNEW GOLDEN\n{}\n",
4358            json
4359        );
4360    }
4361
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // too slow
    async fn sneaky_downgrades(dyncfgs: ConfigUpdates) {
        let mut clients = new_test_client_cache(&dyncfgs);
        let shard_id = ShardId::new();

        // Simulates a process running at build `version`: resets the cached
        // state, opens the shard, and performs one write so the shard's state
        // is tagged with that version. Returns the spawned task's result so
        // the caller can distinguish success from a panic (a disallowed
        // version transition panics inside the task).
        async fn open_and_write(
            clients: &mut PersistClientCache,
            version: semver::Version,
            shard_id: ShardId,
        ) -> Result<(), tokio::task::JoinError> {
            clients.cfg.build_version = version.clone();
            // Drop any cached state so the open below re-reads (and
            // re-validates) the persisted state under the new build version.
            clients.clear_state_cache();
            let client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
            // Run in a task so we can catch the panic.
            mz_ore::task::spawn(|| version.to_string(), async move {
                let () = client
                    .upgrade_version::<String, (), u64, i64>(shard_id, Diagnostics::for_tests())
                    .await
                    .expect("valid usage");
                let (mut write, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
                let current = *write.upper().as_option().unwrap();
                // Do a write so that we tag the state with the version.
                write
                    .expect_compare_and_append_batch(&mut [], current, current + 1)
                    .await;
            })
            .into_tokio_handle()
            .await
        }

        // Start at v0.10.0.
        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
        assert_ok!(res);

        // Upgrade to v0.11.0 is allowed.
        let res = open_and_write(&mut clients, Version::new(0, 11, 0), shard_id).await;
        assert_ok!(res);

        // Downgrade back to v0.10.0 is no longer allowed once state has been
        // written at v0.11.0: the spawned task panics.
        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
        assert!(res.unwrap_err().is_panic());

        // Downgrade to v0.9.0 is _NOT_ allowed either.
        let res = open_and_write(&mut clients, Version::new(0, 9, 0), shard_id).await;
        assert!(res.unwrap_err().is_panic());
    }
4409
4410    #[mz_ore::test]
4411    fn runid_roundtrip() {
4412        proptest!(|(runid: RunId)| {
4413            let runid_str = runid.to_string();
4414            let parsed = RunId::from_str(&runid_str);
4415            prop_assert_eq!(parsed, Ok(runid));
4416        });
4417    }
4418}