Skip to main content

mz_persist_client/internal/
state.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use anyhow::ensure;
11use async_stream::{stream, try_stream};
12use differential_dataflow::difference::Monoid;
13use mz_persist::metrics::ColumnarMetrics;
14use proptest::prelude::{Arbitrary, Strategy};
15use std::borrow::Cow;
16use std::cmp::Ordering;
17use std::collections::BTreeMap;
18use std::fmt::{Debug, Formatter};
19use std::marker::PhantomData;
20use std::ops::ControlFlow::{self, Break, Continue};
21use std::ops::{Deref, DerefMut};
22use std::time::Duration;
23
24use arrow::array::{Array, ArrayData, make_array};
25use arrow::datatypes::DataType;
26use bytes::Bytes;
27use differential_dataflow::Hashable;
28use differential_dataflow::lattice::Lattice;
29use differential_dataflow::trace::Description;
30use differential_dataflow::trace::implementations::BatchContainer;
31use futures::Stream;
32use futures_util::StreamExt;
33use itertools::Itertools;
34use mz_dyncfg::Config;
35use mz_ore::cast::CastFrom;
36use mz_ore::now::EpochMillis;
37use mz_ore::soft_panic_or_log;
38use mz_ore::vec::PartialOrdVecExt;
39use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceUpdates};
40use mz_persist::location::{Blob, SeqNo};
41use mz_persist_types::arrow::{ArrayBound, ProtoArrayData};
42use mz_persist_types::columnar::{ColumnEncoder, Schema};
43use mz_persist_types::schema::{SchemaId, backward_compatible};
44use mz_persist_types::{Codec, Codec64};
45use mz_proto::ProtoType;
46use mz_proto::RustType;
47use proptest_derive::Arbitrary;
48use semver::Version;
49use serde::ser::SerializeStruct;
50use serde::{Serialize, Serializer};
51use timely::PartialOrder;
52use timely::order::TotalOrder;
53use timely::progress::{Antichain, Timestamp};
54use tracing::info;
55use uuid::Uuid;
56
57use crate::critical::{CriticalReaderId, Opaque};
58use crate::error::InvalidUsage;
59use crate::internal::encoding::{
60    LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, parse_id,
61};
62use crate::internal::gc::GcReq;
63use crate::internal::machine::retry_external;
64use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
65use crate::internal::trace::{
66    ActiveCompaction, ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace,
67};
68use crate::metrics::Metrics;
69use crate::read::LeasedReaderId;
70use crate::schema::CaESchema;
71use crate::write::WriterId;
72use crate::{PersistConfig, ShardId};
73
74include!(concat!(
75    env!("OUT_DIR"),
76    "/mz_persist_client.internal.state.rs"
77));
78
79include!(concat!(
80    env!("OUT_DIR"),
81    "/mz_persist_client.internal.diff.rs"
82));
83
84/// Determines how often to write rollups, assigning a maintenance task after
85/// `rollup_threshold` seqnos have passed since the last rollup.
86///
87/// Tuning note: in the absence of a long reader seqno hold, and with
88/// incremental GC, this threshold will determine about how many live diffs are
89/// held in Consensus. Lowering this value decreases the live diff count at the
90/// cost of more maintenance work + blob writes.
91pub(crate) const ROLLUP_THRESHOLD: Config<usize> = Config::new(
92    "persist_rollup_threshold",
93    128,
94    "The number of seqnos between rollups.",
95);
96
97/// Determines how long to wait before an active rollup is considered
98/// "stuck" and a new rollup is started.
99pub(crate) const ROLLUP_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
100    "persist_rollup_fallback_threshold_ms",
101    5000,
102    "The number of milliseconds before a worker claims an already claimed rollup.",
103);
104
105/// Feature flag the new active rollup tracking mechanism.
106/// We musn't enable this until we are fully deployed on the new version.
107pub(crate) const ROLLUP_USE_ACTIVE_ROLLUP: Config<bool> = Config::new(
108    "persist_rollup_use_active_rollup",
109    true,
110    "Whether to use the new active rollup tracking mechanism.",
111);
112
113/// Determines how long to wait before an active GC is considered
114/// "stuck" and a new GC is started.
115pub(crate) const GC_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
116    "persist_gc_fallback_threshold_ms",
117    900000,
118    "The number of milliseconds before a worker claims an already claimed GC.",
119);
120
121/// See the config description string.
122pub(crate) const GC_MIN_VERSIONS: Config<usize> = Config::new(
123    "persist_gc_min_versions",
124    32,
125    "The number of un-GCd versions that may exist in state before we'll trigger a GC.",
126);
127
128/// See the config description string.
129pub(crate) const GC_MAX_VERSIONS: Config<usize> = Config::new(
130    "persist_gc_max_versions",
131    128_000,
132    "The maximum number of versions to GC in a single GC run.",
133);
134
135/// Feature flag the new active GC tracking mechanism.
136/// We musn't enable this until we are fully deployed on the new version.
137pub(crate) const GC_USE_ACTIVE_GC: Config<bool> = Config::new(
138    "persist_gc_use_active_gc",
139    false,
140    "Whether to use the new active GC tracking mechanism.",
141);
142
143pub(crate) const ENABLE_INCREMENTAL_COMPACTION: Config<bool> = Config::new(
144    "persist_enable_incremental_compaction",
145    false,
146    "Whether to enable incremental compaction.",
147);
148
149/// A token to disambiguate state commands that could not otherwise be
150/// idempotent.
151#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
152#[serde(into = "String")]
153pub struct IdempotencyToken(pub(crate) [u8; 16]);
154
155impl std::fmt::Display for IdempotencyToken {
156    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157        write!(f, "i{}", Uuid::from_bytes(self.0))
158    }
159}
160
161impl std::fmt::Debug for IdempotencyToken {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        write!(f, "IdempotencyToken({})", Uuid::from_bytes(self.0))
164    }
165}
166
167impl std::str::FromStr for IdempotencyToken {
168    type Err = String;
169
170    fn from_str(s: &str) -> Result<Self, Self::Err> {
171        parse_id("i", "IdempotencyToken", s).map(IdempotencyToken)
172    }
173}
174
175impl From<IdempotencyToken> for String {
176    fn from(x: IdempotencyToken) -> Self {
177        x.to_string()
178    }
179}
180
181impl IdempotencyToken {
182    pub(crate) fn new() -> Self {
183        IdempotencyToken(*Uuid::new_v4().as_bytes())
184    }
185    pub(crate) const SENTINEL: IdempotencyToken = IdempotencyToken([17u8; 16]);
186}
187
188#[derive(Clone, Debug, PartialEq, Serialize)]
189pub struct LeasedReaderState<T> {
190    /// The seqno capability of this reader.
191    pub seqno: SeqNo,
192    /// The since capability of this reader.
193    pub since: Antichain<T>,
194    /// UNIX_EPOCH timestamp (in millis) of this reader's most recent heartbeat
195    pub last_heartbeat_timestamp_ms: u64,
196    /// Duration (in millis) allowed after [Self::last_heartbeat_timestamp_ms]
197    /// after which this reader may be expired
198    pub lease_duration_ms: u64,
199    /// For debugging.
200    pub debug: HandleDebugState,
201}
202
203#[derive(Clone, Debug, PartialEq, Serialize)]
204pub struct CriticalReaderState<T> {
205    /// The since capability of this reader.
206    pub since: Antichain<T>,
207    /// An opaque token matched on by compare_and_downgrade_since.
208    pub opaque: Opaque,
209    /// For debugging.
210    pub debug: HandleDebugState,
211}
212
213#[derive(Clone, Debug, PartialEq, Serialize)]
214pub struct WriterState<T> {
215    /// UNIX_EPOCH timestamp (in millis) of this writer's most recent heartbeat
216    pub last_heartbeat_timestamp_ms: u64,
217    /// Duration (in millis) allowed after [Self::last_heartbeat_timestamp_ms]
218    /// after which this writer may be expired
219    pub lease_duration_ms: u64,
220    /// The idempotency token of the most recent successful compare_and_append
221    /// by this writer.
222    pub most_recent_write_token: IdempotencyToken,
223    /// The upper of the most recent successful compare_and_append by this
224    /// writer.
225    pub most_recent_write_upper: Antichain<T>,
226    /// For debugging.
227    pub debug: HandleDebugState,
228}
229
230/// Debugging info for a reader or writer.
231#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize)]
232pub struct HandleDebugState {
233    /// Hostname of the persist user that registered this writer or reader. For
234    /// critical readers, this is the _most recent_ registration.
235    pub hostname: String,
236    /// Plaintext description of this writer or reader's intent.
237    pub purpose: String,
238}
239
240/// Part of the updates in a Batch.
241///
242/// Either a pointer to ones stored in Blob or the updates themselves inlined.
243#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
244#[serde(tag = "type")]
245pub enum BatchPart<T> {
246    Hollow(HollowBatchPart<T>),
247    Inline {
248        updates: LazyInlineBatchPart,
249        ts_rewrite: Option<Antichain<T>>,
250        schema_id: Option<SchemaId>,
251
252        /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
253        deprecated_schema_id: Option<SchemaId>,
254    },
255}
256
257fn decode_structured_lower(lower: &LazyProto<ProtoArrayData>) -> Option<ArrayBound> {
258    let try_decode = |lower: &LazyProto<ProtoArrayData>| {
259        let proto = lower.decode()?;
260        let data = ArrayData::from_proto(proto)?;
261        ensure!(data.len() == 1);
262        Ok(ArrayBound::new(make_array(data), 0))
263    };
264
265    let decoded: anyhow::Result<ArrayBound> = try_decode(lower);
266
267    match decoded {
268        Ok(bound) => Some(bound),
269        Err(e) => {
270            soft_panic_or_log!("failed to decode bound: {e:#?}");
271            None
272        }
273    }
274}
275
276impl<T> BatchPart<T> {
277    pub fn hollow_bytes(&self) -> usize {
278        match self {
279            BatchPart::Hollow(x) => x.encoded_size_bytes,
280            BatchPart::Inline { .. } => 0,
281        }
282    }
283
284    pub fn is_inline(&self) -> bool {
285        matches!(self, BatchPart::Inline { .. })
286    }
287
288    pub fn inline_bytes(&self) -> usize {
289        match self {
290            BatchPart::Hollow(_) => 0,
291            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
292        }
293    }
294
295    pub fn writer_key(&self) -> Option<WriterKey> {
296        match self {
297            BatchPart::Hollow(x) => x.key.split().map(|(writer, _part)| writer),
298            BatchPart::Inline { .. } => None,
299        }
300    }
301
302    pub fn encoded_size_bytes(&self) -> usize {
303        match self {
304            BatchPart::Hollow(x) => x.encoded_size_bytes,
305            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
306        }
307    }
308
309    // A user-interpretable identifier or description of the part (for logs and
310    // such).
311    pub fn printable_name(&self) -> &str {
312        match self {
313            BatchPart::Hollow(x) => x.key.0.as_str(),
314            BatchPart::Inline { .. } => "<inline>",
315        }
316    }
317
318    pub fn stats(&self) -> Option<&LazyPartStats> {
319        match self {
320            BatchPart::Hollow(x) => x.stats.as_ref(),
321            BatchPart::Inline { .. } => None,
322        }
323    }
324
325    pub fn key_lower(&self) -> &[u8] {
326        match self {
327            BatchPart::Hollow(x) => x.key_lower.as_slice(),
328            // We don't duplicate the lowest key because this can be
329            // considerable overhead for small parts.
330            //
331            // The empty key might not be a tight lower bound, but it is a valid
332            // lower bound. If a caller is interested in a tighter lower bound,
333            // the data is inline.
334            BatchPart::Inline { .. } => &[],
335        }
336    }
337
338    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
339        let part = match self {
340            BatchPart::Hollow(part) => part,
341            BatchPart::Inline { .. } => return None,
342        };
343
344        decode_structured_lower(part.structured_key_lower.as_ref()?)
345    }
346
347    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
348        match self {
349            BatchPart::Hollow(x) => x.ts_rewrite.as_ref(),
350            BatchPart::Inline { ts_rewrite, .. } => ts_rewrite.as_ref(),
351        }
352    }
353
354    pub fn schema_id(&self) -> Option<SchemaId> {
355        match self {
356            BatchPart::Hollow(x) => x.schema_id,
357            BatchPart::Inline { schema_id, .. } => *schema_id,
358        }
359    }
360
361    pub fn deprecated_schema_id(&self) -> Option<SchemaId> {
362        match self {
363            BatchPart::Hollow(x) => x.deprecated_schema_id,
364            BatchPart::Inline {
365                deprecated_schema_id,
366                ..
367            } => *deprecated_schema_id,
368        }
369    }
370}
371
372impl<T: Timestamp + Codec64> BatchPart<T> {
373    pub fn is_structured_only(&self, metrics: &ColumnarMetrics) -> bool {
374        match self {
375            BatchPart::Hollow(x) => matches!(x.format, Some(BatchColumnarFormat::Structured)),
376            BatchPart::Inline { updates, .. } => {
377                let inline_part = updates.decode::<T>(metrics).expect("valid inline part");
378                matches!(inline_part.updates, BlobTraceUpdates::Structured { .. })
379            }
380        }
381    }
382
383    pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
384        match self {
385            BatchPart::Hollow(x) => x.diffs_sum.map(D::decode),
386            BatchPart::Inline { updates, .. } => Some(
387                updates
388                    .decode::<T>(metrics)
389                    .expect("valid inline part")
390                    .updates
391                    .diffs_sum(),
392            ),
393        }
394    }
395}
396
397/// An ordered list of parts, generally stored as part of a larger run.
398#[derive(Debug, Clone)]
399pub struct HollowRun<T> {
400    /// Pointers usable to retrieve the updates.
401    pub(crate) parts: Vec<RunPart<T>>,
402}
403
404/// A reference to a [HollowRun], including the key in the blob store and some denormalized
405/// metadata.
406#[derive(Debug, Eq, PartialEq, Clone, Serialize)]
407pub struct HollowRunRef<T> {
408    pub key: PartialBatchKey,
409
410    /// The size of the referenced run object, plus all of the hollow objects it contains.
411    pub hollow_bytes: usize,
412
413    /// The size of the largest individual part in the run; useful for sizing compaction.
414    pub max_part_bytes: usize,
415
416    /// The lower bound of the data in this part, ordered by the codec ordering.
417    pub key_lower: Vec<u8>,
418
419    /// The lower bound of the data in this part, ordered by the structured ordering.
420    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
421
422    pub diffs_sum: Option<[u8; 8]>,
423
424    pub(crate) _phantom_data: PhantomData<T>,
425}
426impl<T: Eq> PartialOrd<Self> for HollowRunRef<T> {
427    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
428        Some(self.cmp(other))
429    }
430}
431
432impl<T: Eq> Ord for HollowRunRef<T> {
433    fn cmp(&self, other: &Self) -> Ordering {
434        self.key.cmp(&other.key)
435    }
436}
437
438impl<T> HollowRunRef<T> {
439    pub fn writer_key(&self) -> Option<WriterKey> {
440        Some(self.key.split()?.0)
441    }
442}
443
444impl<T: Timestamp + Codec64> HollowRunRef<T> {
445    /// Stores the given runs and returns a [HollowRunRef] that points to them.
446    pub async fn set<D: Codec64 + Monoid>(
447        shard_id: ShardId,
448        blob: &dyn Blob,
449        writer: &WriterKey,
450        data: HollowRun<T>,
451        metrics: &Metrics,
452    ) -> Self {
453        let hollow_bytes = data.parts.iter().map(|p| p.hollow_bytes()).sum();
454        let max_part_bytes = data
455            .parts
456            .iter()
457            .map(|p| p.max_part_bytes())
458            .max()
459            .unwrap_or(0);
460        let key_lower = data
461            .parts
462            .first()
463            .map_or(vec![], |p| p.key_lower().to_vec());
464        let structured_key_lower = match data.parts.first() {
465            Some(RunPart::Many(r)) => r.structured_key_lower.clone(),
466            Some(RunPart::Single(BatchPart::Hollow(p))) => p.structured_key_lower.clone(),
467            Some(RunPart::Single(BatchPart::Inline { .. })) | None => None,
468        };
469        let diffs_sum = data
470            .parts
471            .iter()
472            .map(|p| {
473                p.diffs_sum::<D>(&metrics.columnar)
474                    .expect("valid diffs sum")
475            })
476            .reduce(|mut a, b| {
477                a.plus_equals(&b);
478                a
479            })
480            .expect("valid diffs sum")
481            .encode();
482
483        let key = PartialBatchKey::new(writer, &PartId::new());
484        let blob_key = key.complete(&shard_id);
485        let bytes = Bytes::from(prost::Message::encode_to_vec(&data.into_proto()));
486        let () = retry_external(&metrics.retries.external.hollow_run_set, || {
487            blob.set(&blob_key, bytes.clone())
488        })
489        .await;
490        Self {
491            key,
492            hollow_bytes,
493            max_part_bytes,
494            key_lower,
495            structured_key_lower,
496            diffs_sum: Some(diffs_sum),
497            _phantom_data: Default::default(),
498        }
499    }
500
501    /// Retrieve the [HollowRun] that this reference points to.
502    /// The caller is expected to ensure that this ref is the result of calling [HollowRunRef::set]
503    /// with the same shard id and backing store.
504    pub async fn get(
505        &self,
506        shard_id: ShardId,
507        blob: &dyn Blob,
508        metrics: &Metrics,
509    ) -> Option<HollowRun<T>> {
510        let blob_key = self.key.complete(&shard_id);
511        let mut bytes = retry_external(&metrics.retries.external.hollow_run_get, || {
512            blob.get(&blob_key)
513        })
514        .await?;
515        let proto_runs: ProtoHollowRun =
516            prost::Message::decode(&mut bytes).expect("illegal state: invalid proto bytes");
517        let runs = proto_runs
518            .into_rust()
519            .expect("illegal state: invalid encoded runs proto");
520        Some(runs)
521    }
522}
523
524/// Part of the updates in a run.
525///
526/// Either a pointer to ones stored in Blob or a single part stored inline.
527#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
528#[serde(untagged)]
529pub enum RunPart<T> {
530    Single(BatchPart<T>),
531    Many(HollowRunRef<T>),
532}
533
534impl<T: Ord> PartialOrd<Self> for RunPart<T> {
535    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
536        Some(self.cmp(other))
537    }
538}
539
540impl<T: Ord> Ord for RunPart<T> {
541    fn cmp(&self, other: &Self) -> Ordering {
542        match (self, other) {
543            (RunPart::Single(a), RunPart::Single(b)) => a.cmp(b),
544            (RunPart::Single(_), RunPart::Many(_)) => Ordering::Less,
545            (RunPart::Many(_), RunPart::Single(_)) => Ordering::Greater,
546            (RunPart::Many(a), RunPart::Many(b)) => a.cmp(b),
547        }
548    }
549}
550
551impl<T> RunPart<T> {
552    #[cfg(test)]
553    pub fn expect_hollow_part(&self) -> &HollowBatchPart<T> {
554        match self {
555            RunPart::Single(BatchPart::Hollow(hollow)) => hollow,
556            _ => panic!("expected hollow part!"),
557        }
558    }
559
560    pub fn hollow_bytes(&self) -> usize {
561        match self {
562            Self::Single(p) => p.hollow_bytes(),
563            Self::Many(r) => r.hollow_bytes,
564        }
565    }
566
567    pub fn is_inline(&self) -> bool {
568        match self {
569            Self::Single(p) => p.is_inline(),
570            Self::Many(_) => false,
571        }
572    }
573
574    pub fn inline_bytes(&self) -> usize {
575        match self {
576            Self::Single(p) => p.inline_bytes(),
577            Self::Many(_) => 0,
578        }
579    }
580
581    pub fn max_part_bytes(&self) -> usize {
582        match self {
583            Self::Single(p) => p.encoded_size_bytes(),
584            Self::Many(r) => r.max_part_bytes,
585        }
586    }
587
588    pub fn writer_key(&self) -> Option<WriterKey> {
589        match self {
590            Self::Single(p) => p.writer_key(),
591            Self::Many(r) => r.writer_key(),
592        }
593    }
594
595    pub fn encoded_size_bytes(&self) -> usize {
596        match self {
597            Self::Single(p) => p.encoded_size_bytes(),
598            Self::Many(r) => r.hollow_bytes,
599        }
600    }
601
602    pub fn schema_id(&self) -> Option<SchemaId> {
603        match self {
604            Self::Single(p) => p.schema_id(),
605            Self::Many(_) => None,
606        }
607    }
608
609    // A user-interpretable identifier or description of the part (for logs and
610    // such).
611    pub fn printable_name(&self) -> &str {
612        match self {
613            Self::Single(p) => p.printable_name(),
614            Self::Many(r) => r.key.0.as_str(),
615        }
616    }
617
618    pub fn stats(&self) -> Option<&LazyPartStats> {
619        match self {
620            Self::Single(p) => p.stats(),
621            // TODO: if we kept stats we could avoid fetching the metadata here.
622            Self::Many(_) => None,
623        }
624    }
625
626    pub fn key_lower(&self) -> &[u8] {
627        match self {
628            Self::Single(p) => p.key_lower(),
629            Self::Many(r) => r.key_lower.as_slice(),
630        }
631    }
632
633    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
634        match self {
635            Self::Single(p) => p.structured_key_lower(),
636            Self::Many(_) => None,
637        }
638    }
639
640    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
641        match self {
642            Self::Single(p) => p.ts_rewrite(),
643            Self::Many(_) => None,
644        }
645    }
646}
647
648impl<T> RunPart<T>
649where
650    T: Timestamp + Codec64,
651{
652    pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
653        match self {
654            Self::Single(p) => p.diffs_sum(metrics),
655            Self::Many(hollow_run) => hollow_run.diffs_sum.map(D::decode),
656        }
657    }
658}
659
660/// A blob was missing!
661#[derive(Clone, Debug)]
662pub struct MissingBlob(BlobKey);
663
664impl std::fmt::Display for MissingBlob {
665    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
666        write!(f, "unexpectedly missing key: {}", self.0)
667    }
668}
669
670impl std::error::Error for MissingBlob {}
671
672impl<T: Timestamp + Codec64 + Sync> RunPart<T> {
673    pub fn part_stream<'a>(
674        &'a self,
675        shard_id: ShardId,
676        blob: &'a dyn Blob,
677        metrics: &'a Metrics,
678    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + Send + 'a {
679        try_stream! {
680            match self {
681                RunPart::Single(p) => {
682                    yield Cow::Borrowed(p);
683                }
684                RunPart::Many(r) => {
685                    let fetched = r.get(shard_id, blob, metrics).await
686                        .ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
687                    for run_part in fetched.parts {
688                        for await batch_part in
689                            run_part.part_stream(shard_id, blob, metrics).boxed()
690                        {
691                            yield Cow::Owned(batch_part?.into_owned());
692                        }
693                    }
694                }
695            }
696        }
697    }
698}
699
700impl<T: Ord> PartialOrd for BatchPart<T> {
701    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
702        Some(self.cmp(other))
703    }
704}
705
706impl<T: Ord> Ord for BatchPart<T> {
707    fn cmp(&self, other: &Self) -> Ordering {
708        match (self, other) {
709            (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o),
710            (
711                BatchPart::Inline {
712                    updates: s_updates,
713                    ts_rewrite: s_ts_rewrite,
714                    schema_id: s_schema_id,
715                    deprecated_schema_id: s_deprecated_schema_id,
716                },
717                BatchPart::Inline {
718                    updates: o_updates,
719                    ts_rewrite: o_ts_rewrite,
720                    schema_id: o_schema_id,
721                    deprecated_schema_id: o_deprecated_schema_id,
722                },
723            ) => (
724                s_updates,
725                s_ts_rewrite.as_ref().map(|x| x.elements()),
726                s_schema_id,
727                s_deprecated_schema_id,
728            )
729                .cmp(&(
730                    o_updates,
731                    o_ts_rewrite.as_ref().map(|x| x.elements()),
732                    o_schema_id,
733                    o_deprecated_schema_id,
734                )),
735            (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less,
736            (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater,
737        }
738    }
739}
740
741/// What order are the parts in this run in?
742#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize)]
743pub(crate) enum RunOrder {
744    /// They're in no particular order.
745    Unordered,
746    /// They're ordered based on the codec-encoded K/V bytes.
747    Codec,
748    /// They're ordered by the natural ordering of the structured data.
749    Structured,
750}
751
752#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Serialize, Copy, Hash)]
753pub struct RunId(pub(crate) [u8; 16]);
754
755impl std::fmt::Display for RunId {
756    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
757        write!(f, "ri{}", Uuid::from_bytes(self.0))
758    }
759}
760
761impl std::fmt::Debug for RunId {
762    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
763        write!(f, "RunId({})", Uuid::from_bytes(self.0))
764    }
765}
766
767impl std::str::FromStr for RunId {
768    type Err = String;
769
770    fn from_str(s: &str) -> Result<Self, Self::Err> {
771        parse_id("ri", "RunId", s).map(RunId)
772    }
773}
774
775impl From<RunId> for String {
776    fn from(x: RunId) -> Self {
777        x.to_string()
778    }
779}
780
781impl RunId {
782    pub(crate) fn new() -> Self {
783        RunId(*Uuid::new_v4().as_bytes())
784    }
785}
786
787impl Arbitrary for RunId {
788    type Parameters = ();
789    type Strategy = proptest::strategy::BoxedStrategy<Self>;
790    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
791        Strategy::prop_map(proptest::prelude::any::<u128>(), |n| {
792            RunId(*Uuid::from_u128(n).as_bytes())
793        })
794        .boxed()
795    }
796}
797
798/// Metadata shared across a run.
799#[derive(Clone, Debug, Default, PartialEq, Eq, Ord, PartialOrd, Serialize)]
800pub struct RunMeta {
801    /// If none, Persist should infer the order based on the proto metadata.
802    pub(crate) order: Option<RunOrder>,
803    /// All parts in a run should have the same schema.
804    pub(crate) schema: Option<SchemaId>,
805
806    /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
807    pub(crate) deprecated_schema: Option<SchemaId>,
808
809    /// If set, a UUID that uniquely identifies this run.
810    pub(crate) id: Option<RunId>,
811
812    /// The number of updates in this run, or `None` if the number is unknown.
813    pub(crate) len: Option<usize>,
814
815    /// Additional unstructured metadata.
816    #[serde(skip_serializing_if = "MetadataMap::is_empty")]
817    pub(crate) meta: MetadataMap,
818}
819
820/// A subset of a [HollowBatch] corresponding 1:1 to a blob.
821#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
822pub struct HollowBatchPart<T> {
823    /// Pointer usable to retrieve the updates.
824    pub key: PartialBatchKey,
825    /// Miscellaneous metadata.
826    #[serde(skip_serializing_if = "MetadataMap::is_empty")]
827    pub meta: MetadataMap,
828    /// The encoded size of this part.
829    pub encoded_size_bytes: usize,
830    /// A lower bound on the keys in the part. (By default, this the minimum
831    /// possible key: `vec![]`.)
832    #[serde(serialize_with = "serialize_part_bytes")]
833    pub key_lower: Vec<u8>,
834    /// A lower bound on the keys in the part, stored as structured data.
835    #[serde(serialize_with = "serialize_lazy_proto")]
836    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
837    /// Aggregate statistics about data contained in this part.
838    #[serde(serialize_with = "serialize_part_stats")]
839    pub stats: Option<LazyPartStats>,
840    /// A frontier to which timestamps in this part are advanced on read, if
841    /// set.
842    ///
843    /// A value of `Some([T::minimum()])` is functionally the same as `None`,
844    /// but we maintain the distinction between the two for some internal sanity
845    /// checking of invariants as well as metrics. If this ever becomes an
846    /// issue, everything still works with this as just `Antichain<T>`.
847    pub ts_rewrite: Option<Antichain<T>>,
848    /// A Codec64 encoded sum of all diffs in this part, if known.
849    ///
850    /// This is `None` if this part was written before we started storing this
851    /// information, or if it was written when the dyncfg was off.
852    ///
853    /// It could also make sense to model this as part of the pushdown stats, if
854    /// we later decide that's of some benefit.
855    #[serde(serialize_with = "serialize_diffs_sum")]
856    pub diffs_sum: Option<[u8; 8]>,
857    /// Columnar format that this batch was written in.
858    ///
859    /// This is `None` if this part was written before we started writing structured
860    /// columnar data.
861    pub format: Option<BatchColumnarFormat>,
862    /// The schemas used to encode the data in this batch part.
863    ///
864    /// Or None for historical data written before the schema registry was
865    /// added.
866    pub schema_id: Option<SchemaId>,
867
868    /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
869    pub deprecated_schema_id: Option<SchemaId>,
870}
871
872/// A [Batch] but with the updates themselves stored externally.
873///
874/// [Batch]: differential_dataflow::trace::BatchReader
875#[derive(Clone, PartialEq, Eq)]
876pub struct HollowBatch<T> {
877    /// Describes the times of the updates in the batch.
878    pub desc: Description<T>,
879    /// The number of updates in the batch.
880    pub len: usize,
881    /// Pointers usable to retrieve the updates.
882    pub(crate) parts: Vec<RunPart<T>>,
883    /// Runs of sequential sorted batch parts, stored as indices into `parts`.
884    /// ex.
885    /// ```text
886    ///     parts=[p1, p2, p3], runs=[]     --> run  is  [p1, p2, p2]
887    ///     parts=[p1, p2, p3], runs=[1]    --> runs are [p1] and [p2, p3]
888    ///     parts=[p1, p2, p3], runs=[1, 2] --> runs are [p1], [p2], [p3]
889    /// ```
890    pub(crate) run_splits: Vec<usize>,
891    /// Run-level metadata: the first entry has metadata for the first run, and so on.
892    /// If there's no corresponding entry for a particular run, it's assumed to be [RunMeta::default()].
893    pub(crate) run_meta: Vec<RunMeta>,
894}
895
896impl<T: Debug> Debug for HollowBatch<T> {
897    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
898        let HollowBatch {
899            desc,
900            parts,
901            len,
902            run_splits: runs,
903            run_meta,
904        } = self;
905        f.debug_struct("HollowBatch")
906            .field(
907                "desc",
908                &(
909                    desc.lower().elements(),
910                    desc.upper().elements(),
911                    desc.since().elements(),
912                ),
913            )
914            .field("parts", &parts)
915            .field("len", &len)
916            .field("runs", &runs)
917            .field("run_meta", &run_meta)
918            .finish()
919    }
920}
921
922impl<T: Serialize> serde::Serialize for HollowBatch<T> {
923    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
924        let HollowBatch {
925            desc,
926            len,
927            // Both parts and runs are covered by the self.runs call.
928            parts: _,
929            run_splits: _,
930            run_meta: _,
931        } = self;
932        let mut s = s.serialize_struct("HollowBatch", 5)?;
933        let () = s.serialize_field("lower", &desc.lower().elements())?;
934        let () = s.serialize_field("upper", &desc.upper().elements())?;
935        let () = s.serialize_field("since", &desc.since().elements())?;
936        let () = s.serialize_field("len", len)?;
937        let () = s.serialize_field("part_runs", &self.runs().collect::<Vec<_>>())?;
938        s.end()
939    }
940}
941
942impl<T: Ord> PartialOrd for HollowBatch<T> {
943    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
944        Some(self.cmp(other))
945    }
946}
947
948impl<T: Ord> Ord for HollowBatch<T> {
949    fn cmp(&self, other: &Self) -> Ordering {
950        // Deconstruct self and other so we get a compile failure if new fields
951        // are added.
952        let HollowBatch {
953            desc: self_desc,
954            parts: self_parts,
955            len: self_len,
956            run_splits: self_runs,
957            run_meta: self_run_meta,
958        } = self;
959        let HollowBatch {
960            desc: other_desc,
961            parts: other_parts,
962            len: other_len,
963            run_splits: other_runs,
964            run_meta: other_run_meta,
965        } = other;
966        (
967            self_desc.lower().elements(),
968            self_desc.upper().elements(),
969            self_desc.since().elements(),
970            self_parts,
971            self_len,
972            self_runs,
973            self_run_meta,
974        )
975            .cmp(&(
976                other_desc.lower().elements(),
977                other_desc.upper().elements(),
978                other_desc.since().elements(),
979                other_parts,
980                other_len,
981                other_runs,
982                other_run_meta,
983            ))
984    }
985}
986
987impl<T: Timestamp + Codec64 + Sync> HollowBatch<T> {
988    pub(crate) fn part_stream<'a>(
989        &'a self,
990        shard_id: ShardId,
991        blob: &'a dyn Blob,
992        metrics: &'a Metrics,
993    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + 'a {
994        stream! {
995            for part in &self.parts {
996                for await part in part.part_stream(shard_id, blob, metrics) {
997                    yield part;
998                }
999            }
1000        }
1001    }
1002}
1003impl<T> HollowBatch<T> {
1004    /// Construct an in-memory hollow batch from the given metadata.
1005    ///
1006    /// This method checks that `runs` is a sequence of valid indices into `parts`. The caller
1007    /// is responsible for ensuring that the defined runs are valid.
1008    ///
1009    /// `len` should represent the number of valid updates in the referenced parts.
1010    pub(crate) fn new(
1011        desc: Description<T>,
1012        parts: Vec<RunPart<T>>,
1013        len: usize,
1014        run_meta: Vec<RunMeta>,
1015        run_splits: Vec<usize>,
1016    ) -> Self {
1017        debug_assert!(
1018            run_splits.is_strictly_sorted(),
1019            "run indices should be strictly increasing"
1020        );
1021        debug_assert!(
1022            run_splits.first().map_or(true, |i| *i > 0),
1023            "run indices should be positive"
1024        );
1025        debug_assert!(
1026            run_splits.last().map_or(true, |i| *i < parts.len()),
1027            "run indices should be valid indices into parts"
1028        );
1029        debug_assert!(
1030            parts.is_empty() || run_meta.len() == run_splits.len() + 1,
1031            "all metadata should correspond to a run"
1032        );
1033
1034        Self {
1035            desc,
1036            len,
1037            parts,
1038            run_splits,
1039            run_meta,
1040        }
1041    }
1042
1043    /// Construct a batch of a single run with default metadata. Mostly interesting for tests.
1044    pub(crate) fn new_run(desc: Description<T>, parts: Vec<RunPart<T>>, len: usize) -> Self {
1045        let run_meta = if parts.is_empty() {
1046            vec![]
1047        } else {
1048            vec![RunMeta::default()]
1049        };
1050        Self {
1051            desc,
1052            len,
1053            parts,
1054            run_splits: vec![],
1055            run_meta,
1056        }
1057    }
1058
1059    #[cfg(test)]
1060    pub(crate) fn new_run_for_test(
1061        desc: Description<T>,
1062        parts: Vec<RunPart<T>>,
1063        len: usize,
1064        run_id: RunId,
1065    ) -> Self {
1066        let run_meta = if parts.is_empty() {
1067            vec![]
1068        } else {
1069            let mut meta = RunMeta::default();
1070            meta.id = Some(run_id);
1071            vec![meta]
1072        };
1073        Self {
1074            desc,
1075            len,
1076            parts,
1077            run_splits: vec![],
1078            run_meta,
1079        }
1080    }
1081
1082    /// An empty hollow batch, representing no updates over the given desc.
1083    pub(crate) fn empty(desc: Description<T>) -> Self {
1084        Self {
1085            desc,
1086            len: 0,
1087            parts: vec![],
1088            run_splits: vec![],
1089            run_meta: vec![],
1090        }
1091    }
1092
1093    pub(crate) fn runs(&self) -> impl Iterator<Item = (&RunMeta, &[RunPart<T>])> {
1094        let run_ends = self
1095            .run_splits
1096            .iter()
1097            .copied()
1098            .chain(std::iter::once(self.parts.len()));
1099        let run_metas = self.run_meta.iter();
1100        let run_parts = run_ends
1101            .scan(0, |start, end| {
1102                let range = *start..end;
1103                *start = end;
1104                Some(range)
1105            })
1106            .filter(|range| !range.is_empty())
1107            .map(|range| &self.parts[range]);
1108        run_metas.zip_eq(run_parts)
1109    }
1110
1111    pub(crate) fn inline_bytes(&self) -> usize {
1112        self.parts.iter().map(|x| x.inline_bytes()).sum()
1113    }
1114
1115    pub(crate) fn is_empty(&self) -> bool {
1116        self.parts.is_empty()
1117    }
1118
1119    pub(crate) fn part_count(&self) -> usize {
1120        self.parts.len()
1121    }
1122
1123    /// The sum of the encoded sizes of all parts in the batch.
1124    pub fn encoded_size_bytes(&self) -> usize {
1125        self.parts.iter().map(|p| p.encoded_size_bytes()).sum()
1126    }
1127}
1128
// See the comment on [Batch::rewrite_ts] for why this is TotalOrder.
impl<T: Timestamp + TotalOrder> HollowBatch<T> {
    /// Rewrites the timestamps of this batch. Every part's `ts_rewrite` is set
    /// to `frontier`, and the batch upper is replaced with `new_upper` (lower
    /// and since are kept).
    ///
    /// All validation happens before any mutation, so on `Err` the batch is
    /// left unchanged. The checks run in this order, and the first failure is
    /// reported:
    /// - `frontier` must be strictly less than `new_upper`;
    /// - `new_upper` must not be strictly less than the current batch upper;
    /// - `frontier` must not be strictly less than the batch lower;
    /// - the batch since must be the minimum antichain;
    /// - `frontier` must not be strictly less than any part's existing
    ///   `ts_rewrite` (parts without one are skipped).
    ///
    /// # Errors
    ///
    /// Returns a human-readable description of the first violated check.
    ///
    /// # Panics
    ///
    /// Panics if the batch contains a [RunPart::Many]. Note that this happens
    /// after `desc` has already been updated.
    pub(crate) fn rewrite_ts(
        &mut self,
        frontier: &Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<(), String> {
        if !PartialOrder::less_than(frontier, &new_upper) {
            return Err(format!(
                "rewrite frontier {:?} !< rewrite upper {:?}",
                frontier.elements(),
                new_upper.elements(),
            ));
        }
        // The rewrite may extend the upper but never shrink it.
        if PartialOrder::less_than(&new_upper, self.desc.upper()) {
            return Err(format!(
                "rewrite upper {:?} < batch upper {:?}",
                new_upper.elements(),
                self.desc.upper().elements(),
            ));
        }

        // The following are things that it seems like we could support, but
        // initially we don't because we don't have a use case for them.
        if PartialOrder::less_than(frontier, self.desc.lower()) {
            return Err(format!(
                "rewrite frontier {:?} < batch lower {:?}",
                frontier.elements(),
                self.desc.lower().elements(),
            ));
        }
        if self.desc.since() != &Antichain::from_elem(T::minimum()) {
            return Err(format!(
                "batch since {:?} != minimum antichain {:?}",
                self.desc.since().elements(),
                &[T::minimum()],
            ));
        }
        // A new rewrite frontier may not move backwards relative to an earlier
        // rewrite already recorded on a part.
        for part in self.parts.iter() {
            let Some(ts_rewrite) = part.ts_rewrite() else {
                continue;
            };
            if PartialOrder::less_than(frontier, ts_rewrite) {
                return Err(format!(
                    "rewrite frontier {:?} < batch rewrite {:?}",
                    frontier.elements(),
                    ts_rewrite.elements(),
                ));
            }
        }

        // Validation passed: apply the new upper, then stamp every part.
        self.desc = Description::new(
            self.desc.lower().clone(),
            new_upper,
            self.desc.since().clone(),
        );
        for part in &mut self.parts {
            match part {
                RunPart::Single(BatchPart::Hollow(part)) => {
                    part.ts_rewrite = Some(frontier.clone())
                }
                RunPart::Single(BatchPart::Inline { ts_rewrite, .. }) => {
                    *ts_rewrite = Some(frontier.clone())
                }
                RunPart::Many(runs) => {
                    // Currently unreachable: we only apply rewrites to user batches, and we don't
                    // ever generate runs of >1 part for those.
                    panic!("unexpected rewrite of a hollow runs ref: {runs:?}");
                }
            }
        }
        Ok(())
    }
}
1203
1204impl<T: Ord> PartialOrd for HollowBatchPart<T> {
1205    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1206        Some(self.cmp(other))
1207    }
1208}
1209
1210impl<T: Ord> Ord for HollowBatchPart<T> {
1211    fn cmp(&self, other: &Self) -> Ordering {
1212        // Deconstruct self and other so we get a compile failure if new fields
1213        // are added.
1214        let HollowBatchPart {
1215            key: self_key,
1216            meta: self_meta,
1217            encoded_size_bytes: self_encoded_size_bytes,
1218            key_lower: self_key_lower,
1219            structured_key_lower: self_structured_key_lower,
1220            stats: self_stats,
1221            ts_rewrite: self_ts_rewrite,
1222            diffs_sum: self_diffs_sum,
1223            format: self_format,
1224            schema_id: self_schema_id,
1225            deprecated_schema_id: self_deprecated_schema_id,
1226        } = self;
1227        let HollowBatchPart {
1228            key: other_key,
1229            meta: other_meta,
1230            encoded_size_bytes: other_encoded_size_bytes,
1231            key_lower: other_key_lower,
1232            structured_key_lower: other_structured_key_lower,
1233            stats: other_stats,
1234            ts_rewrite: other_ts_rewrite,
1235            diffs_sum: other_diffs_sum,
1236            format: other_format,
1237            schema_id: other_schema_id,
1238            deprecated_schema_id: other_deprecated_schema_id,
1239        } = other;
1240        (
1241            self_key,
1242            self_meta,
1243            self_encoded_size_bytes,
1244            self_key_lower,
1245            self_structured_key_lower,
1246            self_stats,
1247            self_ts_rewrite.as_ref().map(|x| x.elements()),
1248            self_diffs_sum,
1249            self_format,
1250            self_schema_id,
1251            self_deprecated_schema_id,
1252        )
1253            .cmp(&(
1254                other_key,
1255                other_meta,
1256                other_encoded_size_bytes,
1257                other_key_lower,
1258                other_structured_key_lower,
1259                other_stats,
1260                other_ts_rewrite.as_ref().map(|x| x.elements()),
1261                other_diffs_sum,
1262                other_format,
1263                other_schema_id,
1264                other_deprecated_schema_id,
1265            ))
1266    }
1267}
1268
1269/// A pointer to a rollup stored externally.
1270#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
1271pub struct HollowRollup {
1272    /// Pointer usable to retrieve the rollup.
1273    pub key: PartialRollupKey,
1274    /// The encoded size of this rollup, if known.
1275    pub encoded_size_bytes: Option<usize>,
1276}
1277
1278/// A pointer to a blob stored externally.
1279#[derive(Debug)]
1280pub enum HollowBlobRef<'a, T> {
1281    Batch(&'a HollowBatch<T>),
1282    Rollup(&'a HollowRollup),
1283}
1284
1285/// A rollup that is currently being computed.
1286#[derive(
1287    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize
1288)]
1289pub struct ActiveRollup {
1290    pub seqno: SeqNo,
1291    pub start_ms: u64,
1292}
1293
1294/// A garbage collection request that is currently being computed.
1295#[derive(
1296    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize
1297)]
1298pub struct ActiveGc {
1299    pub seqno: SeqNo,
1300    pub start_ms: u64,
1301}
1302
/// Wraps the result of a state transition that turned out to be a no-op.
///
/// Importantly, such a transition was *not* committed through
/// compare_and_append, and so it is _not linearized_.
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
#[derive(Debug)]
pub struct NoOpStateTransition<T>(pub T);
1310
1311// TODO: Document invariants.
1312#[derive(Debug, Clone)]
1313#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
1314pub struct StateCollections<T> {
1315    /// The version of this state. This is typically identical to the version of the code
1316    /// that wrote it, but may diverge during 0dt upgrades and similar operations when a
1317    /// new version of code is intentionally interoperating with an older state format.
1318    pub(crate) version: Version,
1319
1320    // - Invariant: `<= all reader.since`
1321    // - Invariant: Doesn't regress across state versions.
1322    pub(crate) last_gc_req: SeqNo,
1323
1324    // - Invariant: There is a rollup with `seqno <= self.seqno_since`.
1325    pub(crate) rollups: BTreeMap<SeqNo, HollowRollup>,
1326
1327    /// The rollup that is currently being computed.
1328    pub(crate) active_rollup: Option<ActiveRollup>,
1329    /// The gc request that is currently being computed.
1330    pub(crate) active_gc: Option<ActiveGc>,
1331
1332    pub(crate) leased_readers: BTreeMap<LeasedReaderId, LeasedReaderState<T>>,
1333    pub(crate) critical_readers: BTreeMap<CriticalReaderId, CriticalReaderState<T>>,
1334    pub(crate) writers: BTreeMap<WriterId, WriterState<T>>,
1335    pub(crate) schemas: BTreeMap<SchemaId, EncodedSchemas>,
1336
1337    // - Invariant: `trace.since == meet(all reader.since)`
1338    // - Invariant: `trace.since` doesn't regress across state versions.
1339    // - Invariant: `trace.upper` doesn't regress across state versions.
1340    // - Invariant: `trace` upholds its own invariants.
1341    pub(crate) trace: Trace<T>,
1342}
1343
1344/// A key and val [Codec::Schema] encoded via [Codec::encode_schema].
1345///
1346/// This strategy of directly serializing the schema objects requires that
1347/// persist users do the right thing. Specifically, that an encoded schema
1348/// doesn't in some later version of mz decode to an in-mem object that acts
1349/// differently. In a sense, the current system (before the introduction of the
1350/// schema registry) where schemas are passed in unchecked to reader and writer
1351/// registration calls also has the same defect, so seems fine.
1352///
1353/// An alternative is to write down here some persist-specific representation of
1354/// the schema (e.g. the arrow DataType). This is a lot more work and also has
1355/// the potential to lead down a similar failure mode to the mz_persist_types
1356/// `Data` trait, where the boilerplate isn't worth the safety. Given that we
1357/// can always migrate later by rehydrating these, seems fine to start with the
1358/// easy thing.
1359#[derive(Debug, Clone, Serialize, PartialEq)]
1360pub struct EncodedSchemas {
1361    /// A full in-mem `K::Schema` impl encoded via [Codec::encode_schema].
1362    pub key: Bytes,
1363    /// The arrow `DataType` produced by this `K::Schema` at the time it was
1364    /// registered, encoded as a `ProtoDataType`.
1365    pub key_data_type: Bytes,
1366    /// A full in-mem `V::Schema` impl encoded via [Codec::encode_schema].
1367    pub val: Bytes,
1368    /// The arrow `DataType` produced by this `V::Schema` at the time it was
1369    /// registered, encoded as a `ProtoDataType`.
1370    pub val_data_type: Bytes,
1371}
1372
1373impl EncodedSchemas {
1374    pub(crate) fn decode_data_type(buf: &[u8]) -> DataType {
1375        let proto = prost::Message::decode(buf).expect("valid ProtoDataType");
1376        DataType::from_proto(proto).expect("valid DataType")
1377    }
1378}
1379
1380#[derive(Debug)]
1381#[cfg_attr(test, derive(PartialEq))]
1382pub enum CompareAndAppendBreak<T> {
1383    AlreadyCommitted,
1384    Upper {
1385        shard_upper: Antichain<T>,
1386        writer_upper: Antichain<T>,
1387    },
1388    InvalidUsage(InvalidUsage<T>),
1389    InlineBackpressure,
1390}
1391
1392#[derive(Debug)]
1393#[cfg_attr(test, derive(PartialEq))]
1394pub enum SnapshotErr<T> {
1395    AsOfNotYetAvailable(SeqNo, Upper<T>),
1396    AsOfHistoricalDistinctionsLost(Since<T>),
1397}
1398
1399impl<T> StateCollections<T>
1400where
1401    T: Timestamp + Lattice + Codec64,
1402{
1403    pub fn add_rollup(
1404        &mut self,
1405        add_rollup: (SeqNo, &HollowRollup),
1406    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1407        let (rollup_seqno, rollup) = add_rollup;
1408        let applied = match self.rollups.get(&rollup_seqno) {
1409            Some(x) => x.key == rollup.key,
1410            None => {
1411                // PER-16: refuse to insert a rollup at a seqno that GC has
1412                // already physically removed. `apply_unbatched_idempotent_cmd`
1413                // replays the captured `(rollup_seqno, rollup)` tuple on
1414                // every retry. If the original CAS committed indeterminately
1415                // and a concurrent GC then removed the rollup, the retry
1416                // would otherwise observe an empty map entry and re-insert
1417                // the same `PartialRollupKey` at the same seqno, producing
1418                // a second Insert (and, later, a second Delete) for that
1419                // key in the diff stream and tripping the `assert!` in
1420                // `find_removable_blobs`.
1421                //
1422                // We use the smallest seqno currently in `self.rollups` as
1423                // the GC watermark. GC's `remove_rollups` always keeps the
1424                // latest rollup `<= seqno_since` and removes the older
1425                // ones, so the surviving minimum is strictly above every
1426                // seqno that has been removed. (Unlike `last_gc_req`, this
1427                // is only advanced by an actual physical removal, not by
1428                // `maybe_gc` deciding to *request* one — `last_gc_req` runs
1429                // ahead of removals and would falsely reject legitimate
1430                // late `add_rollup` calls for older seqnos.)
1431                if let Some(min_kept) = self.rollups.keys().next() {
1432                    if rollup_seqno < *min_kept {
1433                        return Continue(false);
1434                    }
1435                }
1436                self.active_rollup = None;
1437                self.rollups.insert(rollup_seqno, rollup.to_owned());
1438                true
1439            }
1440        };
1441        // This state transition is a no-op if applied is false but we
1442        // still commit the state change so that this gets linearized
1443        // (maybe we're looking at old state).
1444        Continue(applied)
1445    }
1446
1447    pub fn remove_rollups(
1448        &mut self,
1449        remove_rollups: &[(SeqNo, PartialRollupKey)],
1450    ) -> ControlFlow<NoOpStateTransition<Vec<SeqNo>>, Vec<SeqNo>> {
1451        if self.is_tombstone() {
1452            return Break(NoOpStateTransition(vec![]));
1453        }
1454
1455        // This state transition is called at the end of the GC process, so we
1456        // need to unset the `active_gc` field.
1457        let active_gc_was_set = self.active_gc.take().is_some();
1458
1459        if remove_rollups.is_empty() {
1460            return if active_gc_was_set {
1461                Continue(vec![])
1462            } else {
1463                Break(NoOpStateTransition(vec![]))
1464            };
1465        }
1466
1467        let mut removed = vec![];
1468        for (seqno, key) in remove_rollups {
1469            let removed_key = self.rollups.remove(seqno);
1470            debug_assert!(
1471                removed_key.as_ref().map_or(true, |x| &x.key == key),
1472                "{} vs {:?}",
1473                key,
1474                removed_key
1475            );
1476
1477            if removed_key.is_some() {
1478                removed.push(*seqno);
1479            }
1480        }
1481
1482        Continue(removed)
1483    }
1484
1485    pub fn register_leased_reader(
1486        &mut self,
1487        hostname: &str,
1488        reader_id: &LeasedReaderId,
1489        purpose: &str,
1490        seqno: SeqNo,
1491        lease_duration: Duration,
1492        heartbeat_timestamp_ms: u64,
1493        use_critical_since: bool,
1494    ) -> ControlFlow<
1495        NoOpStateTransition<(LeasedReaderState<T>, SeqNo)>,
1496        (LeasedReaderState<T>, SeqNo),
1497    > {
1498        let since = if use_critical_since {
1499            self.critical_since()
1500                .unwrap_or_else(|| self.trace.since().clone())
1501        } else {
1502            self.trace.since().clone()
1503        };
1504        let reader_state = LeasedReaderState {
1505            debug: HandleDebugState {
1506                hostname: hostname.to_owned(),
1507                purpose: purpose.to_owned(),
1508            },
1509            seqno,
1510            since,
1511            last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1512            lease_duration_ms: u64::try_from(lease_duration.as_millis())
1513                .expect("lease duration as millis should fit within u64"),
1514        };
1515
1516        // If the shard-global upper and since are both the empty antichain,
1517        // then no further writes can ever commit and no further reads can be
1518        // served. Optimize this by no-op-ing reader registration so that we can
1519        // settle the shard into a final unchanging tombstone state.
1520        if self.is_tombstone() {
1521            return Break(NoOpStateTransition((reader_state, self.seqno_since(seqno))));
1522        }
1523
1524        // TODO: Handle if the reader or writer already exists.
1525        self.leased_readers
1526            .insert(reader_id.clone(), reader_state.clone());
1527        Continue((reader_state, self.seqno_since(seqno)))
1528    }
1529
1530    pub fn register_critical_reader(
1531        &mut self,
1532        hostname: &str,
1533        reader_id: &CriticalReaderId,
1534        opaque: Opaque,
1535        purpose: &str,
1536    ) -> ControlFlow<NoOpStateTransition<CriticalReaderState<T>>, CriticalReaderState<T>> {
1537        let state = CriticalReaderState {
1538            debug: HandleDebugState {
1539                hostname: hostname.to_owned(),
1540                purpose: purpose.to_owned(),
1541            },
1542            since: self.trace.since().clone(),
1543            opaque,
1544        };
1545
1546        // We expire all readers if the upper and since both advance to the
1547        // empty antichain. Gracefully handle this. At the same time,
1548        // short-circuit the cmd application so we don't needlessly create new
1549        // SeqNos.
1550        if self.is_tombstone() {
1551            return Break(NoOpStateTransition(state));
1552        }
1553
1554        let state = match self.critical_readers.get_mut(reader_id) {
1555            Some(existing_state) => {
1556                existing_state.debug = state.debug;
1557                existing_state.clone()
1558            }
1559            None => {
1560                self.critical_readers
1561                    .insert(reader_id.clone(), state.clone());
1562                state
1563            }
1564        };
1565        Continue(state)
1566    }
1567
1568    pub fn register_schema<K: Codec, V: Codec>(
1569        &mut self,
1570        key_schema: &K::Schema,
1571        val_schema: &V::Schema,
1572    ) -> ControlFlow<NoOpStateTransition<Option<SchemaId>>, Option<SchemaId>> {
1573        fn encode_data_type(data_type: &DataType) -> Bytes {
1574            let proto = data_type.into_proto();
1575            prost::Message::encode_to_vec(&proto).into()
1576        }
1577
1578        // Look for an existing registered SchemaId for these schemas.
1579        //
1580        // The common case is that this should be a recent one, so as a minor
1581        // optimization, do this search in reverse order.
1582        //
1583        // TODO: Note that this impl is `O(schemas)`. Combined with the
1584        // possibility of cmd retries, it's possible but unlikely for this to
1585        // get expensive. We could maintain a reverse map to speed this up in
1586        // necessary. This would either need to work on the encoded
1587        // representation (which, we'd have to fall back to the linear scan) or
1588        // we'd need to add a Hash/Ord bound to Schema.
1589        let existing_id = self.schemas.iter().rev().find(|(_, x)| {
1590            K::decode_schema(&x.key) == *key_schema && V::decode_schema(&x.val) == *val_schema
1591        });
1592        match existing_id {
1593            Some((schema_id, _)) => {
1594                // TODO: Validate that the decoded schemas still produce records
1595                // of the recorded DataType, to detect shenanigans. Probably
1596                // best to wait until we've turned on Schema2 in prod and thus
1597                // committed to the current mappings.
1598                Break(NoOpStateTransition(Some(*schema_id)))
1599            }
1600            None if self.is_tombstone() => {
1601                // TODO: Is this right?
1602                Break(NoOpStateTransition(None))
1603            }
1604            None if self.schemas.is_empty() => {
1605                // We'll have to do something more sophisticated here to
1606                // generate the next id if/when we start supporting the removal
1607                // of schemas.
1608                let id = SchemaId(self.schemas.len());
1609                let key_data_type = mz_persist_types::columnar::data_type::<K>(key_schema)
1610                    .expect("valid key schema");
1611                let val_data_type = mz_persist_types::columnar::data_type::<V>(val_schema)
1612                    .expect("valid val schema");
1613                let prev = self.schemas.insert(
1614                    id,
1615                    EncodedSchemas {
1616                        key: K::encode_schema(key_schema),
1617                        key_data_type: encode_data_type(&key_data_type),
1618                        val: V::encode_schema(val_schema),
1619                        val_data_type: encode_data_type(&val_data_type),
1620                    },
1621                );
1622                assert_eq!(prev, None);
1623                Continue(Some(id))
1624            }
1625            None => {
1626                info!(
1627                    "register_schemas got {:?} expected {:?}",
1628                    key_schema,
1629                    self.schemas
1630                        .iter()
1631                        .map(|(id, x)| (id, K::decode_schema(&x.key)))
1632                        .collect::<Vec<_>>()
1633                );
1634                // Until we implement persist schema changes, only allow at most
1635                // one registered schema.
1636                Break(NoOpStateTransition(None))
1637            }
1638        }
1639    }
1640
    /// Registers `key_schema`/`val_schema` as a new schema for this shard. This
    /// succeeds only if `expected` is still the latest registered [SchemaId] and
    /// the new schemas are a backward-compatible evolution of it.
    ///
    /// Outcomes, checked in this order:
    /// - `expected` is not the latest id: no-op `ExpectedMismatch`, carrying
    ///   the latest id and its decoded key/val schemas.
    /// - The schemas and their arrow data types are identical to the latest:
    ///   no-op `Ok`, carrying the existing id.
    /// - The data types are not backward compatible, or the migration would
    ///   drop a column: no-op `Incompatible`.
    /// - Otherwise: the schemas are inserted under `SchemaId(self.schemas.len())`,
    ///   and `Ok` of that id is returned.
    ///
    /// # Panics
    ///
    /// Panics if no schema has been registered yet ("all shards have a
    /// schema"). Also panics if a stored data type fails to decode, or if
    /// building an encoder for a new schema fails.
    pub fn compare_and_evolve_schema<K: Codec, V: Codec>(
        &mut self,
        expected: SchemaId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> ControlFlow<NoOpStateTransition<CaESchema<K, V>>, CaESchema<K, V>> {
        fn data_type<T>(schema: &impl Schema<T>) -> DataType {
            // To be defensive, create an empty batch and inspect the resulting
            // data type (as opposed to something like allowing the `Schema` to
            // declare the DataType).
            let array = Schema::encoder(schema).expect("valid schema").finish();
            Array::data_type(&array).clone()
        }

        // Only the most recently registered schema can be evolved.
        let (current_id, current) = self
            .schemas
            .last_key_value()
            .expect("all shards have a schema");
        if *current_id != expected {
            return Break(NoOpStateTransition(CaESchema::ExpectedMismatch {
                schema_id: *current_id,
                key: K::decode_schema(&current.key),
                val: V::decode_schema(&current.val),
            }));
        }

        let current_key = K::decode_schema(&current.key);
        let current_key_dt = EncodedSchemas::decode_data_type(&current.key_data_type);
        let current_val = V::decode_schema(&current.val);
        let current_val_dt = EncodedSchemas::decode_data_type(&current.val_data_type);

        let key_dt = data_type(key_schema);
        let val_dt = data_type(val_schema);

        // If the schema is exactly the same as the current one, no-op.
        if current_key == *key_schema
            && current_key_dt == key_dt
            && current_val == *val_schema
            && current_val_dt == val_dt
        {
            return Break(NoOpStateTransition(CaESchema::Ok(*current_id)));
        }

        // Both key and val must have a migration from the current data type.
        let key_fn = backward_compatible(&current_key_dt, &key_dt);
        let val_fn = backward_compatible(&current_val_dt, &val_dt);
        let (Some(key_fn), Some(val_fn)) = (key_fn, val_fn) else {
            return Break(NoOpStateTransition(CaESchema::Incompatible));
        };
        // Persist initially disallows dropping columns. This would require a
        // bunch more work (e.g. not safe to use the latest schema in
        // compaction) and isn't initially necessary in mz.
        if key_fn.contains_drop() || val_fn.contains_drop() {
            return Break(NoOpStateTransition(CaESchema::Incompatible));
        }

        // We'll have to do something more sophisticated here to
        // generate the next id if/when we start supporting the removal
        // of schemas.
        let id = SchemaId(self.schemas.len());
        self.schemas.insert(
            id,
            EncodedSchemas {
                key: K::encode_schema(key_schema),
                key_data_type: prost::Message::encode_to_vec(&key_dt.into_proto()).into(),
                val: V::encode_schema(val_schema),
                val_data_type: prost::Message::encode_to_vec(&val_dt.into_proto()).into(),
            },
        );
        Continue(CaESchema::Ok(id))
    }
1711
    /// Attempts to append `batch` to the shard on behalf of `writer_id`.
    ///
    /// If `writer_id` is not yet known it is registered with the given
    /// heartbeat, lease duration, and debug info. The append only succeeds if
    /// `batch.desc.lower()` equals the current shard upper. On success:
    /// - the shard upper becomes `batch.desc.upper()`;
    /// - the writer's most recent idempotency token and write upper are
    ///   recorded;
    /// - the writer's heartbeat is advanced to the max of its stored value and
    ///   `heartbeat_timestamp_ms`.
    ///
    /// Returns `Break` with:
    /// - `CompareAndAppendBreak::Upper` if the shard is a tombstone, or if the
    ///   batch's lower does not equal the shard upper;
    /// - `CompareAndAppendBreak::InvalidUsage` if the batch's upper is less
    ///   than its lower, or if its time interval is empty but the batch holds
    ///   updates;
    /// - `CompareAndAppendBreak::AlreadyCommitted` if `idempotency_token`
    ///   equals the writer's most recent write token;
    /// - `CompareAndAppendBreak::InlineBackpressure` if the batch has inline
    ///   bytes and the trace's existing inline bytes plus the batch's would
    ///   reach or exceed `inline_writes_total_max_bytes`.
    ///
    /// On success, returns the merge requests produced by pushing the batch.
    /// If none of those requests has a non-empty input and the batch itself
    /// is non-empty, additional requests from `fueled_merge_reqs_before_ms`
    /// may be appended, in a number derived from `claim_compaction_percent`.
    /// Every returned request is claimed with
    /// `start_ms = heartbeat_timestamp_ms`.
    ///
    /// # Panics
    ///
    /// - On an idempotency-token match, if the batch upper differs from the
    ///   writer's recorded most recent upper, or is not `<=` the shard upper.
    /// - If the writer's most recent write upper would move backward.
    pub fn compare_and_append(
        &mut self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        heartbeat_timestamp_ms: u64,
        lease_duration_ms: u64,
        idempotency_token: &IdempotencyToken,
        debug_info: &HandleDebugState,
        inline_writes_total_max_bytes: usize,
        claim_compaction_percent: usize,
        claim_compaction_min_version: Option<&Version>,
    ) -> ControlFlow<CompareAndAppendBreak<T>, Vec<FueledMergeReq<T>>> {
        // We expire all writers if the upper and since both advance to the
        // empty antichain. Gracefully handle this. At the same time,
        // short-circuit the cmd application so we don't needlessly create new
        // SeqNos.
        if self.is_tombstone() {
            assert_eq!(self.trace.upper(), &Antichain::new());
            return Break(CompareAndAppendBreak::Upper {
                shard_upper: Antichain::new(),
                // This writer might have been registered before the shard upper
                // was advanced, which would make this pessimistic in the
                // Indeterminate handling of compare_and_append at the machine
                // level, but that's fine.
                writer_upper: Antichain::new(),
            });
        }

        // Look up the writer, registering it on first use. A freshly
        // registered writer starts with the SENTINEL token and a write upper of
        // [T::minimum()], so neither check below can spuriously match it.
        let writer_state = self
            .writers
            .entry(writer_id.clone())
            .or_insert_with(|| WriterState {
                last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
                lease_duration_ms,
                most_recent_write_token: IdempotencyToken::SENTINEL,
                most_recent_write_upper: Antichain::from_elem(T::minimum()),
                debug: debug_info.clone(),
            });

        // Reject batches whose bounds are inverted.
        if PartialOrder::less_than(batch.desc.upper(), batch.desc.lower()) {
            return Break(CompareAndAppendBreak::InvalidUsage(
                InvalidUsage::InvalidBounds {
                    lower: batch.desc.lower().clone(),
                    upper: batch.desc.upper().clone(),
                },
            ));
        }

        // If the time interval is empty, the list of updates must also be
        // empty.
        if batch.desc.upper() == batch.desc.lower() && !batch.is_empty() {
            return Break(CompareAndAppendBreak::InvalidUsage(
                InvalidUsage::InvalidEmptyTimeInterval {
                    lower: batch.desc.lower().clone(),
                    upper: batch.desc.upper().clone(),
                    keys: batch
                        .parts
                        .iter()
                        .map(|x| x.printable_name().to_owned())
                        .collect(),
                },
            ));
        }

        if idempotency_token == &writer_state.most_recent_write_token {
            // If the last write had the same idempotency_token, then this must
            // have already committed. Sanity check that the most recent write
            // upper matches and that the shard upper is at least the write
            // upper, if it's not something very suspect is going on.
            assert_eq!(batch.desc.upper(), &writer_state.most_recent_write_upper);
            assert!(
                PartialOrder::less_equal(batch.desc.upper(), self.trace.upper()),
                "{:?} vs {:?}",
                batch.desc.upper(),
                self.trace.upper()
            );
            return Break(CompareAndAppendBreak::AlreadyCommitted);
        }

        // The "compare" half of compare-and-append: the batch must start
        // exactly where the shard currently ends.
        let shard_upper = self.trace.upper();
        if shard_upper != batch.desc.lower() {
            return Break(CompareAndAppendBreak::Upper {
                shard_upper: shard_upper.clone(),
                writer_upper: writer_state.most_recent_write_upper.clone(),
            });
        }

        // Bound the total number of inline bytes held in state; only batches
        // that themselves carry inline data are subject to this check.
        let new_inline_bytes = batch.inline_bytes();
        if new_inline_bytes > 0 {
            let mut existing_inline_bytes = 0;
            self.trace
                .map_batches(|x| existing_inline_bytes += x.inline_bytes());
            // TODO: For very small batches, it may actually _increase_ the size
            // of state to flush them out. Consider another threshold under
            // which an inline part can be appended no matter what.
            if existing_inline_bytes + new_inline_bytes >= inline_writes_total_max_bytes {
                return Break(CompareAndAppendBreak::InlineBackpressure);
            }
        }

        // A batch with an empty time interval (upper == lower) has already been
        // checked to hold no updates, so there is nothing to push.
        let mut merge_reqs = if batch.desc.upper() != batch.desc.lower() {
            self.trace.push_batch(batch.clone())
        } else {
            Vec::new()
        };

        // NB: we don't claim unclaimed compactions when the recording flag is off, even if we'd
        // otherwise be allowed to, to avoid triggering the same compactions in every writer.
        let all_empty_reqs = merge_reqs
            .iter()
            .all(|req| req.inputs.iter().all(|b| b.batch.is_empty()));
        if all_empty_reqs && !batch.is_empty() {
            // Every full 100% of `claim_compaction_percent` always claims one
            // request; the remaining percentage claims one more with that
            // probability, decided deterministically from the token's hash.
            let mut reqs_to_take = claim_compaction_percent / 100;
            if (usize::cast_from(idempotency_token.hashed()) % 100)
                < (claim_compaction_percent % 100)
            {
                reqs_to_take += 1;
            }
            let threshold_ms = heartbeat_timestamp_ms.saturating_sub(lease_duration_ms);
            let min_writer = claim_compaction_min_version.map(WriterKey::for_version);
            merge_reqs.extend(
                // We keep the oldest `reqs_to_take` batches, under the theory that they're least
                // likely to be compacted soon for other reasons.
                self.trace
                    .fueled_merge_reqs_before_ms(threshold_ms, min_writer)
                    .take(reqs_to_take),
            )
        }

        // Record that this writer has claimed each request it is returning.
        for req in &merge_reqs {
            self.trace.claim_compaction(
                req.id,
                ActiveCompaction {
                    start_ms: heartbeat_timestamp_ms,
                },
            )
        }

        debug_assert_eq!(self.trace.upper(), batch.desc.upper());
        writer_state.most_recent_write_token = idempotency_token.clone();
        // The writer's most recent upper should only go forward.
        assert!(
            PartialOrder::less_equal(&writer_state.most_recent_write_upper, batch.desc.upper()),
            "{:?} vs {:?}",
            &writer_state.most_recent_write_upper,
            batch.desc.upper()
        );
        writer_state
            .most_recent_write_upper
            .clone_from(batch.desc.upper());

        // Heartbeat the writer state to keep our idempotency token alive.
        writer_state.last_heartbeat_timestamp_ms = std::cmp::max(
            heartbeat_timestamp_ms,
            writer_state.last_heartbeat_timestamp_ms,
        );

        Continue(merge_reqs)
    }
1871
1872    pub fn apply_merge_res<D: Codec64 + Monoid + PartialEq>(
1873        &mut self,
1874        res: &FueledMergeRes<T>,
1875        metrics: &ColumnarMetrics,
1876    ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
1877        // We expire all writers if the upper and since both advance to the
1878        // empty antichain. Gracefully handle this. At the same time,
1879        // short-circuit the cmd application so we don't needlessly create new
1880        // SeqNos.
1881        if self.is_tombstone() {
1882            return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
1883        }
1884
1885        let apply_merge_result = self.trace.apply_merge_res_checked::<D>(res, metrics);
1886        Continue(apply_merge_result)
1887    }
1888
1889    pub fn spine_exert(
1890        &mut self,
1891        fuel: usize,
1892    ) -> ControlFlow<NoOpStateTransition<Vec<FueledMergeReq<T>>>, Vec<FueledMergeReq<T>>> {
1893        let (merge_reqs, did_work) = self.trace.exert(fuel);
1894        if did_work {
1895            Continue(merge_reqs)
1896        } else {
1897            assert!(merge_reqs.is_empty());
1898            // Break if we have nothing useful to do to save the seqno (and
1899            // resulting crdb traffic)
1900            Break(NoOpStateTransition(Vec::new()))
1901        }
1902    }
1903
1904    pub fn downgrade_since(
1905        &mut self,
1906        reader_id: &LeasedReaderId,
1907        seqno: SeqNo,
1908        outstanding_seqno: SeqNo,
1909        new_since: &Antichain<T>,
1910        heartbeat_timestamp_ms: u64,
1911    ) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
1912        // We expire all readers if the upper and since both advance to the
1913        // empty antichain. Gracefully handle this. At the same time,
1914        // short-circuit the cmd application so we don't needlessly create new
1915        // SeqNos.
1916        if self.is_tombstone() {
1917            return Break(NoOpStateTransition(Since(Antichain::new())));
1918        }
1919
1920        // The only way to have a missing reader in state is if it's been expired... and in that
1921        // case, we behave the same as though that reader had been downgraded to the empty antichain.
1922        let Some(reader_state) = self.leased_reader(reader_id) else {
1923            tracing::warn!(
1924                "Leased reader {reader_id} was expired due to inactivity. Did the machine go to sleep?",
1925            );
1926            return Break(NoOpStateTransition(Since(Antichain::new())));
1927        };
1928
1929        // Also use this as an opportunity to heartbeat the reader and downgrade
1930        // the seqno capability.
1931        reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
1932            heartbeat_timestamp_ms,
1933            reader_state.last_heartbeat_timestamp_ms,
1934        );
1935
1936        let seqno = {
1937            assert!(
1938                outstanding_seqno >= reader_state.seqno,
1939                "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
1940                    is behind current reader_state ({:?})",
1941                outstanding_seqno,
1942                reader_state.seqno,
1943            );
1944            std::cmp::min(outstanding_seqno, seqno)
1945        };
1946
1947        reader_state.seqno = seqno;
1948
1949        let reader_current_since = if PartialOrder::less_than(&reader_state.since, new_since) {
1950            reader_state.since.clone_from(new_since);
1951            self.update_since();
1952            new_since.clone()
1953        } else {
1954            // No-op, but still commit the state change so that this gets
1955            // linearized.
1956            reader_state.since.clone()
1957        };
1958
1959        Continue(Since(reader_current_since))
1960    }
1961
1962    pub fn compare_and_downgrade_since(
1963        &mut self,
1964        reader_id: &CriticalReaderId,
1965        expected_opaque: &Opaque,
1966        (new_opaque, new_since): (&Opaque, &Antichain<T>),
1967    ) -> ControlFlow<
1968        NoOpStateTransition<Result<Since<T>, (Opaque, Since<T>)>>,
1969        Result<Since<T>, (Opaque, Since<T>)>,
1970    > {
1971        // We expire all readers if the upper and since both advance to the
1972        // empty antichain. Gracefully handle this. At the same time,
1973        // short-circuit the cmd application so we don't needlessly create new
1974        // SeqNos.
1975        if self.is_tombstone() {
1976            // Match the idempotence behavior below of ignoring the token if
1977            // since is already advanced enough (in this case, because it's a
1978            // tombstone, we know it's the empty antichain).
1979            return Break(NoOpStateTransition(Ok(Since(Antichain::new()))));
1980        }
1981
1982        let reader_state = self.critical_reader(reader_id);
1983
1984        if reader_state.opaque != *expected_opaque {
1985            // No-op, but still commit the state change so that this gets
1986            // linearized.
1987            return Continue(Err((
1988                reader_state.opaque.clone(),
1989                Since(reader_state.since.clone()),
1990            )));
1991        }
1992
1993        reader_state.opaque = new_opaque.clone();
1994        if PartialOrder::less_equal(&reader_state.since, new_since) {
1995            reader_state.since.clone_from(new_since);
1996            self.update_since();
1997            Continue(Ok(Since(new_since.clone())))
1998        } else {
1999            // no work to be done -- the reader state's `since` is already sufficiently
2000            // advanced. we may someday need to revisit this branch when it's possible
2001            // for two `since` frontiers to be incomparable.
2002            Continue(Ok(Since(reader_state.since.clone())))
2003        }
2004    }
2005
2006    pub fn expire_leased_reader(
2007        &mut self,
2008        reader_id: &LeasedReaderId,
2009    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2010        // We expire all readers if the upper and since both advance to the
2011        // empty antichain. Gracefully handle this. At the same time,
2012        // short-circuit the cmd application so we don't needlessly create new
2013        // SeqNos.
2014        if self.is_tombstone() {
2015            return Break(NoOpStateTransition(false));
2016        }
2017
2018        let existed = self.leased_readers.remove(reader_id).is_some();
2019        if existed {
2020            // TODO(database-issues#6885): Re-enable this
2021            //
2022            // Temporarily disabling this because we think it might be the cause
2023            // of the remap since bug. Specifically, a clusterd process has a
2024            // ReadHandle for maintaining the once and one inside a Listen. If
2025            // we crash and stay down for longer than the read lease duration,
2026            // it's possible that an expiry of them both in quick succession
2027            // jumps the since forward to the Listen one.
2028            //
2029            // Don't forget to update the downgrade_since when this gets
2030            // switched back on.
2031            //
2032            // self.update_since();
2033        }
2034        // No-op if existed is false, but still commit the state change so that
2035        // this gets linearized.
2036        Continue(existed)
2037    }
2038
2039    pub fn expire_critical_reader(
2040        &mut self,
2041        reader_id: &CriticalReaderId,
2042    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2043        // We expire all readers if the upper and since both advance to the
2044        // empty antichain. Gracefully handle this. At the same time,
2045        // short-circuit the cmd application so we don't needlessly create new
2046        // SeqNos.
2047        if self.is_tombstone() {
2048            return Break(NoOpStateTransition(false));
2049        }
2050
2051        let existed = self.critical_readers.remove(reader_id).is_some();
2052        if existed {
2053            // TODO(database-issues#6885): Re-enable this
2054            //
2055            // Temporarily disabling this because we think it might be the cause
2056            // of the remap since bug. Specifically, a clusterd process has a
2057            // ReadHandle for maintaining the once and one inside a Listen. If
2058            // we crash and stay down for longer than the read lease duration,
2059            // it's possible that an expiry of them both in quick succession
2060            // jumps the since forward to the Listen one.
2061            //
2062            // Don't forget to update the downgrade_since when this gets
2063            // switched back on.
2064            //
2065            // self.update_since();
2066        }
2067        // This state transition is a no-op if existed is false, but we still
2068        // commit the state change so that this gets linearized (maybe we're
2069        // looking at old state).
2070        Continue(existed)
2071    }
2072
2073    pub fn expire_writer(
2074        &mut self,
2075        writer_id: &WriterId,
2076    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2077        // We expire all writers if the upper and since both advance to the
2078        // empty antichain. Gracefully handle this. At the same time,
2079        // short-circuit the cmd application so we don't needlessly create new
2080        // SeqNos.
2081        if self.is_tombstone() {
2082            return Break(NoOpStateTransition(false));
2083        }
2084
2085        let existed = self.writers.remove(writer_id).is_some();
2086        // This state transition is a no-op if existed is false, but we still
2087        // commit the state change so that this gets linearized (maybe we're
2088        // looking at old state).
2089        Continue(existed)
2090    }
2091
2092    fn leased_reader(&mut self, id: &LeasedReaderId) -> Option<&mut LeasedReaderState<T>> {
2093        self.leased_readers.get_mut(id)
2094    }
2095
2096    fn critical_reader(&mut self, id: &CriticalReaderId) -> &mut CriticalReaderState<T> {
2097        self.critical_readers
2098            .get_mut(id)
2099            .unwrap_or_else(|| {
2100                panic!(
2101                    "Unknown CriticalReaderId({}). It was either never registered, or has been manually expired.",
2102                    id
2103                )
2104            })
2105    }
2106
2107    fn critical_since(&self) -> Option<Antichain<T>> {
2108        let mut critical_sinces = self.critical_readers.values().map(|r| &r.since);
2109        let mut since = critical_sinces.next().cloned()?;
2110        for s in critical_sinces {
2111            since.meet_assign(s);
2112        }
2113        Some(since)
2114    }
2115
2116    fn update_since(&mut self) {
2117        let mut sinces_iter = self
2118            .leased_readers
2119            .values()
2120            .map(|x| &x.since)
2121            .chain(self.critical_readers.values().map(|x| &x.since));
2122        let mut since = match sinces_iter.next() {
2123            Some(since) => since.clone(),
2124            None => {
2125                // If there are no current readers, leave `since` unchanged so
2126                // it doesn't regress.
2127                return;
2128            }
2129        };
2130        while let Some(s) = sinces_iter.next() {
2131            since.meet_assign(s);
2132        }
2133        self.trace.downgrade_since(&since);
2134    }
2135
2136    fn seqno_since(&self, seqno: SeqNo) -> SeqNo {
2137        let mut seqno_since = seqno;
2138        for cap in self.leased_readers.values() {
2139            seqno_since = std::cmp::min(seqno_since, cap.seqno);
2140        }
2141        // critical_readers don't hold a seqno capability.
2142        seqno_since
2143    }
2144
2145    fn tombstone_batch() -> HollowBatch<T> {
2146        HollowBatch::empty(Description::new(
2147            Antichain::from_elem(T::minimum()),
2148            Antichain::new(),
2149            Antichain::new(),
2150        ))
2151    }
2152
2153    pub(crate) fn is_tombstone(&self) -> bool {
2154        self.trace.upper().is_empty()
2155            && self.trace.since().is_empty()
2156            && self.writers.is_empty()
2157            && self.leased_readers.is_empty()
2158            && self.critical_readers.is_empty()
2159    }
2160
2161    pub(crate) fn is_single_empty_batch(&self) -> bool {
2162        let mut batch_count = 0;
2163        let mut is_empty = true;
2164        self.trace.map_batches(|b| {
2165            batch_count += 1;
2166            is_empty &= b.is_empty()
2167        });
2168        batch_count <= 1 && is_empty
2169    }
2170
    /// Moves the shard into the tombstone state, then performs at most one
    /// incremental step of shrinking its trace.
    ///
    /// All writers, leased readers, and critical readers are cleared. Then:
    /// - if any batch is non-empty, the first such batch (in `map_batches`
    ///   order) is replaced by an empty one via `apply_tombstone_merge`, and
    ///   `Continue` is returned;
    /// - else, if there is more than one (empty) batch, the trace is replaced
    ///   by a fresh trace holding only the tombstone batch, and `Continue` is
    ///   returned;
    /// - else, if the shard was not already a tombstone, `Continue` is
    ///   returned so that becoming a tombstone gets recorded;
    /// - else `Break` is returned: there is nothing left to shrink.
    ///
    /// Because each call performs at most one step, callers presumably repeat
    /// it until it `Break`s (NOTE(review): confirm against callers).
    ///
    /// # Panics
    ///
    /// If the trace's upper or since is not the empty antichain, or if the
    /// replacement merge unexpectedly fails to match.
    pub fn become_tombstone_and_shrink(&mut self) -> ControlFlow<NoOpStateTransition<()>, ()> {
        assert_eq!(self.trace.upper(), &Antichain::new());
        assert_eq!(self.trace.since(), &Antichain::new());

        // Remember our current state, so we can decide whether we have to
        // record a transition in durable state.
        let was_tombstone = self.is_tombstone();

        // Enter the "tombstone" state, if we're not in it already.
        self.writers.clear();
        self.leased_readers.clear();
        self.critical_readers.clear();

        debug_assert!(self.is_tombstone());

        // Now that we're in a "tombstone" state -- ie. nobody can read the data from a shard or write to
        // it -- the actual contents of our batches no longer matter.
        // This method progressively replaces batches in our state with simpler versions, to allow
        // freeing up resources and to reduce the state size. (Since the state is unreadable, this
        // is not visible to clients.) We do this a little bit at a time to avoid really large state
        // transitions... most operations happen incrementally, and large single writes can overwhelm
        // a backing store. See comments for why we believe the relevant diffs are reasonably small.

        // Count all batches, and remember the description of the first
        // non-empty one (if any) as the candidate for this step.
        let mut to_replace = None;
        let mut batch_count = 0;
        self.trace.map_batches(|b| {
            batch_count += 1;
            if !b.is_empty() && to_replace.is_none() {
                to_replace = Some(b.desc.clone());
            }
        });
        if let Some(desc) = to_replace {
            // We have a nonempty batch: replace it with an empty batch and return.
            // This should not produce an excessively large diff: if it did, we wouldn't have been
            // able to append that batch in the first place.
            let result = self.trace.apply_tombstone_merge(&desc);
            assert!(
                result.matched(),
                "merge with a matching desc should always match"
            );
            Continue(())
        } else if batch_count > 1 {
            // All our batches are empty, but we have more than one of them. Replace the whole set
            // with a new single-batch trace.
            // This produces a diff with a size proportional to the number of batches, but since
            // Spine keeps a logarithmic number of batches this should never be excessively large.
            let mut new_trace = Trace::default();
            new_trace.downgrade_since(&Antichain::new());
            let merge_reqs = new_trace.push_batch(Self::tombstone_batch());
            assert_eq!(merge_reqs, Vec::new());
            self.trace = new_trace;
            Continue(())
        } else if !was_tombstone {
            // We were not tombstoned before, so have to make sure this state
            // transition is recorded.
            Continue(())
        } else {
            // All our batches are empty, and there's only one... there's no shrinking this
            // tombstone further.
            Break(NoOpStateTransition(()))
        }
    }
2233}
2234
// TODO: Document invariants.
/// One version of a shard's state.
///
/// `TypedState::clone_apply` derives the next version from this one. It sets
/// `seqno` to `self.seqno.next()` and forces `walltime_ms` to be strictly
/// greater than the previous version's.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct State<T> {
    /// The shard this state belongs to.
    pub(crate) shard_id: ShardId,

    /// The sequence number of this version of state.
    pub(crate) seqno: SeqNo,
    /// A strictly increasing wall time of when this state was written, in
    /// milliseconds since the unix epoch.
    pub(crate) walltime_ms: u64,
    /// Hostname of the persist user that created this version of state. For
    /// debugging.
    pub(crate) hostname: String,
    /// The readers, writers, schemas, rollups, and trace of the shard. This is
    /// what `TypedState::clone_apply` hands to its work function to mutate.
    pub(crate) collections: StateCollections<T>,
}
2250
/// A newtype wrapper of State that guarantees the K, V, and D codecs match the
/// ones in durable storage.
///
/// Dereferences (mutably and immutably) to the wrapped [`State`].
pub struct TypedState<K, V, T, D> {
    /// The untyped state being wrapped.
    pub(crate) state: State<T>,

    // According to the docs, PhantomData is to "mark things that act like they
    // own a T". State doesn't actually own K, V, or D, just the ability to
    // produce them. Using the `fn() -> T` pattern gets us the same variance as
    // T [1], but also allows State to correctly derive Send+Sync.
    //
    // [1]:
    //     https://doc.rust-lang.org/nomicon/phantom-data.html#table-of-phantomdata-patterns
    pub(crate) _phantom: PhantomData<fn() -> (K, V, D)>,
}
2265
2266impl<K, V, T: Clone, D> TypedState<K, V, T, D> {
2267    #[cfg(any(test, debug_assertions))]
2268    pub(crate) fn clone(&self, hostname: String) -> Self {
2269        TypedState {
2270            state: State {
2271                shard_id: self.shard_id.clone(),
2272                seqno: self.seqno.clone(),
2273                walltime_ms: self.walltime_ms,
2274                hostname,
2275                collections: self.collections.clone(),
2276            },
2277            _phantom: PhantomData,
2278        }
2279    }
2280
2281    pub(crate) fn clone_for_rollup(&self) -> Self {
2282        TypedState {
2283            state: State {
2284                shard_id: self.shard_id.clone(),
2285                seqno: self.seqno.clone(),
2286                walltime_ms: self.walltime_ms,
2287                hostname: self.hostname.clone(),
2288                collections: self.collections.clone(),
2289            },
2290            _phantom: PhantomData,
2291        }
2292    }
2293}
2294
2295impl<K, V, T: Debug, D> Debug for TypedState<K, V, T, D> {
2296    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2297        // Deconstruct self so we get a compile failure if new fields
2298        // are added.
2299        let TypedState { state, _phantom } = self;
2300        f.debug_struct("TypedState").field("state", state).finish()
2301    }
2302}
2303
2304// Impl PartialEq regardless of the type params.
2305#[cfg(any(test, debug_assertions))]
2306impl<K, V, T: PartialEq, D> PartialEq for TypedState<K, V, T, D> {
2307    fn eq(&self, other: &Self) -> bool {
2308        // Deconstruct self and other so we get a compile failure if new fields
2309        // are added.
2310        let TypedState {
2311            state: self_state,
2312            _phantom,
2313        } = self;
2314        let TypedState {
2315            state: other_state,
2316            _phantom,
2317        } = other;
2318        self_state == other_state
2319    }
2320}
2321
2322impl<K, V, T, D> Deref for TypedState<K, V, T, D> {
2323    type Target = State<T>;
2324
2325    fn deref(&self) -> &Self::Target {
2326        &self.state
2327    }
2328}
2329
2330impl<K, V, T, D> DerefMut for TypedState<K, V, T, D> {
2331    fn deref_mut(&mut self) -> &mut Self::Target {
2332        &mut self.state
2333    }
2334}
2335
2336impl<K, V, T, D> TypedState<K, V, T, D>
2337where
2338    K: Codec,
2339    V: Codec,
2340    T: Timestamp + Lattice + Codec64,
2341    D: Codec64,
2342{
2343    pub fn new(
2344        applier_version: Version,
2345        shard_id: ShardId,
2346        hostname: String,
2347        walltime_ms: u64,
2348    ) -> Self {
2349        let state = State {
2350            shard_id,
2351            seqno: SeqNo::minimum(),
2352            walltime_ms,
2353            hostname,
2354            collections: StateCollections {
2355                version: applier_version,
2356                last_gc_req: SeqNo::minimum(),
2357                rollups: BTreeMap::new(),
2358                active_rollup: None,
2359                active_gc: None,
2360                leased_readers: BTreeMap::new(),
2361                critical_readers: BTreeMap::new(),
2362                writers: BTreeMap::new(),
2363                schemas: BTreeMap::new(),
2364                trace: Trace::default(),
2365            },
2366        };
2367        TypedState {
2368            state,
2369            _phantom: PhantomData,
2370        }
2371    }
2372
2373    pub fn clone_apply<R, E, WorkFn>(
2374        &self,
2375        cfg: &PersistConfig,
2376        work_fn: &mut WorkFn,
2377    ) -> ControlFlow<E, (R, Self)>
2378    where
2379        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
2380    {
2381        // We do not increment the version by default, though work_fn can if it chooses to.
2382        let mut new_state = State {
2383            shard_id: self.shard_id,
2384            seqno: self.seqno.next(),
2385            walltime_ms: (cfg.now)(),
2386            hostname: cfg.hostname.clone(),
2387            collections: self.collections.clone(),
2388        };
2389
2390        // Make sure walltime_ms is strictly increasing, in case clocks are
2391        // offset.
2392        if new_state.walltime_ms <= self.walltime_ms {
2393            new_state.walltime_ms = self.walltime_ms + 1;
2394        }
2395
2396        let work_ret = work_fn(new_state.seqno, cfg, &mut new_state.collections)?;
2397        let new_state = TypedState {
2398            state: new_state,
2399            _phantom: PhantomData,
2400        };
2401        Continue((work_ret, new_state))
2402    }
2403}
2404
/// Garbage-collection settings passed to `State::maybe_gc`.
///
/// NOTE(review): the precise meaning of each field is defined by `maybe_gc`,
/// which is not fully visible here. The field notes below are hedged.
#[derive(Copy, Clone, Debug)]
pub struct GcConfig {
    /// Presumably toggles an "active" GC mode. TODO confirm in `maybe_gc`.
    pub use_active_gc: bool,
    /// A threshold in milliseconds (per the `_ms` suffix). Presumably the
    /// fallback trigger for GC. TODO confirm in `maybe_gc`.
    pub fallback_threshold_ms: u64,
    /// Presumably a lower bound on a number of state versions. TODO confirm.
    pub min_versions: usize,
    /// Presumably an upper bound on a number of state versions. TODO confirm.
    pub max_versions: usize,
}
2412
2413impl<T> State<T>
2414where
2415    T: Timestamp + Lattice + Codec64,
2416{
2417    pub fn shard_id(&self) -> ShardId {
2418        self.shard_id
2419    }
2420
2421    pub fn seqno(&self) -> SeqNo {
2422        self.seqno
2423    }
2424
2425    pub fn since(&self) -> &Antichain<T> {
2426        self.collections.trace.since()
2427    }
2428
2429    pub fn upper(&self) -> &Antichain<T> {
2430        self.collections.trace.upper()
2431    }
2432
2433    pub fn spine_batch_count(&self) -> usize {
2434        self.collections.trace.num_spine_batches()
2435    }
2436
2437    pub fn size_metrics(&self) -> StateSizeMetrics {
2438        let mut ret = StateSizeMetrics::default();
2439        self.blobs().for_each(|x| match x {
2440            HollowBlobRef::Batch(x) => {
2441                ret.hollow_batch_count += 1;
2442                ret.batch_part_count += x.part_count();
2443                ret.num_updates += x.len;
2444
2445                let batch_size = x.encoded_size_bytes();
2446                for x in x.parts.iter() {
2447                    if x.ts_rewrite().is_some() {
2448                        ret.rewrite_part_count += 1;
2449                    }
2450                    if x.is_inline() {
2451                        ret.inline_part_count += 1;
2452                        ret.inline_part_bytes += x.inline_bytes();
2453                    }
2454                }
2455                ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size);
2456                ret.state_batches_bytes += batch_size;
2457            }
2458            HollowBlobRef::Rollup(x) => {
2459                ret.state_rollup_count += 1;
2460                ret.state_rollups_bytes += x.encoded_size_bytes.unwrap_or_default()
2461            }
2462        });
2463        ret
2464    }
2465
2466    pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
2467        // We maintain the invariant that every version of state has at least
2468        // one rollup.
2469        self.collections
2470            .rollups
2471            .iter()
2472            .rev()
2473            .next()
2474            .expect("State should have at least one rollup if seqno > minimum")
2475    }
2476
2477    pub(crate) fn seqno_since(&self) -> SeqNo {
2478        self.collections.seqno_since(self.seqno)
2479    }
2480
2481    // Returns whether the cmd proposing this state has been selected to perform
2482    // background garbage collection work.
2483    //
2484    // If it was selected, this information is recorded in the state itself for
2485    // commit along with the cmd's state transition. This helps us to avoid
2486    // redundant work.
2487    //
2488    // Correctness does not depend on a gc assignment being executed, nor on
2489    // them being executed in the order they are given. But it is expected that
2490    // gc assignments are best-effort respected. In practice, cmds like
2491    // register_foo or expire_foo, where it would be awkward, ignore gc.
2492    pub fn maybe_gc(&mut self, is_write: bool, now: u64, cfg: GcConfig) -> Option<GcReq> {
2493        let GcConfig {
2494            use_active_gc,
2495            fallback_threshold_ms,
2496            min_versions,
2497            max_versions,
2498        } = cfg;
2499        // This is an arbitrary-ish threshold that scales with seqno, but never
2500        // gets particularly big. It probably could be much bigger and certainly
2501        // could use a tuning pass at some point.
2502        let gc_threshold = if use_active_gc {
2503            u64::cast_from(min_versions)
2504        } else {
2505            std::cmp::max(
2506                1,
2507                u64::cast_from(self.seqno.0.next_power_of_two().trailing_zeros()),
2508            )
2509        };
2510        let new_seqno_since = self.seqno_since();
2511        // Collect until the new seqno since... or the old since plus the max number of versions,
2512        // whatever is less.
2513        let gc_until_seqno = new_seqno_since.min(SeqNo(
2514            self.collections
2515                .last_gc_req
2516                .0
2517                .saturating_add(u64::cast_from(max_versions)),
2518        ));
2519        let should_gc = new_seqno_since
2520            .0
2521            .saturating_sub(self.collections.last_gc_req.0)
2522            >= gc_threshold;
2523
2524        // If we wouldn't otherwise gc, check if we have an active gc. If we do, and
2525        // it's been a while since it started, we should gc.
2526        let should_gc = if use_active_gc && !should_gc {
2527            match self.collections.active_gc {
2528                Some(active_gc) => now.saturating_sub(active_gc.start_ms) > fallback_threshold_ms,
2529                None => false,
2530            }
2531        } else {
2532            should_gc
2533        };
2534        // Assign GC traffic preferentially to writers, falling back to anyone
2535        // generating new state versions if there are no writers.
2536        let should_gc = should_gc && (is_write || self.collections.writers.is_empty());
2537        // Always assign GC work to a tombstoned shard to have the chance to
2538        // clean up any residual blobs. This is safe (won't cause excess gc)
2539        // as the only allowed command after becoming a tombstone is to write
2540        // the final rollup.
2541        let tombstone_needs_gc = self.collections.is_tombstone();
2542        let should_gc = should_gc || tombstone_needs_gc;
2543        let should_gc = if use_active_gc {
2544            // If we have an active gc, we should only gc if the active gc is
2545            // sufficiently old. This is to avoid doing more gc work than
2546            // necessary.
2547            should_gc
2548                && match self.collections.active_gc {
2549                    Some(active) => now.saturating_sub(active.start_ms) > fallback_threshold_ms,
2550                    None => true,
2551                }
2552        } else {
2553            should_gc
2554        };
2555        if should_gc {
2556            self.collections.last_gc_req = gc_until_seqno;
2557            Some(GcReq {
2558                shard_id: self.shard_id,
2559                new_seqno_since: gc_until_seqno,
2560            })
2561        } else {
2562            None
2563        }
2564    }
2565
2566    /// Return the number of gc-ineligible state versions.
2567    pub fn seqnos_held(&self) -> usize {
2568        usize::cast_from(self.seqno.0.saturating_sub(self.seqno_since().0))
2569    }
2570
2571    /// Expire all readers and writers up to the given walltime_ms.
2572    pub fn expire_at(&mut self, walltime_ms: EpochMillis) -> ExpiryMetrics {
2573        let mut metrics = ExpiryMetrics::default();
2574        let shard_id = self.shard_id();
2575        self.collections.leased_readers.retain(|id, state| {
2576            let retain = state.last_heartbeat_timestamp_ms + state.lease_duration_ms >= walltime_ms;
2577            if !retain {
2578                info!(
2579                    "Force expiring reader {id} ({}) of shard {shard_id} due to inactivity",
2580                    state.debug.purpose
2581                );
2582                metrics.readers_expired += 1;
2583            }
2584            retain
2585        });
2586        // critical_readers don't need forced expiration. (In fact, that's the point!)
2587        self.collections.writers.retain(|id, state| {
2588            let retain =
2589                (state.last_heartbeat_timestamp_ms + state.lease_duration_ms) >= walltime_ms;
2590            if !retain {
2591                info!(
2592                    "Force expiring writer {id} ({}) of shard {shard_id} due to inactivity",
2593                    state.debug.purpose
2594                );
2595                metrics.writers_expired += 1;
2596            }
2597            retain
2598        });
2599        metrics
2600    }
2601
2602    /// Returns the batches that contain updates up to (and including) the given `as_of`. The
2603    /// result `Vec` contains blob keys, along with a [`Description`] of what updates in the
2604    /// referenced parts are valid to read.
2605    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
2606        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2607            return Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
2608                self.collections.trace.since().clone(),
2609            )));
2610        }
2611        let upper = self.collections.trace.upper();
2612        if PartialOrder::less_equal(upper, as_of) {
2613            return Err(SnapshotErr::AsOfNotYetAvailable(
2614                self.seqno,
2615                Upper(upper.clone()),
2616            ));
2617        }
2618
2619        let batches = self
2620            .collections
2621            .trace
2622            .batches()
2623            .filter(|b| !PartialOrder::less_than(as_of, b.desc.lower()))
2624            .cloned()
2625            .collect();
2626        Ok(batches)
2627    }
2628
2629    // NB: Unlike the other methods here, this one is read-only.
2630    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
2631        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2632            return Err(Since(self.collections.trace.since().clone()));
2633        }
2634        Ok(())
2635    }
2636
2637    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
2638        // TODO: Avoid the O(n^2) here: `next_listen_batch` is called once per
2639        // batch and this iterates through all batches to find the next one.
2640        self.collections
2641            .trace
2642            .batches()
2643            .find(|b| {
2644                PartialOrder::less_equal(b.desc.lower(), frontier)
2645                    && PartialOrder::less_than(frontier, b.desc.upper())
2646            })
2647            .cloned()
2648            .ok_or(self.seqno)
2649    }
2650
2651    pub fn active_rollup(&self) -> Option<ActiveRollup> {
2652        self.collections.active_rollup
2653    }
2654
2655    pub fn need_rollup(
2656        &self,
2657        threshold: usize,
2658        use_active_rollup: bool,
2659        fallback_threshold_ms: u64,
2660        now: u64,
2661    ) -> Option<SeqNo> {
2662        let (latest_rollup_seqno, _) = self.latest_rollup();
2663
2664        // Tombstoned shards require one final rollup. However, because we
2665        // write a rollup as of SeqNo X and then link it in using a state
2666        // transition (in this case from X to X+1), the minimum number of
2667        // live diffs is actually two. Detect when we're in this minimal
2668        // two diff state and stop the (otherwise) infinite iteration.
2669        if self.collections.is_tombstone() && latest_rollup_seqno.next() < self.seqno {
2670            return Some(self.seqno);
2671        }
2672
2673        let seqnos_since_last_rollup = self.seqno.0.saturating_sub(latest_rollup_seqno.0);
2674
2675        if use_active_rollup {
2676            // If sequnos_since_last_rollup>threshold, and there is no existing rollup in progress,
2677            // we should start a new rollup.
2678            // If there is an active rollup, we should check if it has been running too long.
2679            // If it has, we should start a new rollup.
2680            // This is to guard against a worker dying/taking too long/etc.
2681            if seqnos_since_last_rollup > u64::cast_from(threshold) {
2682                match self.active_rollup() {
2683                    Some(active_rollup) => {
2684                        if now.saturating_sub(active_rollup.start_ms) > fallback_threshold_ms {
2685                            return Some(self.seqno);
2686                        }
2687                    }
2688                    None => {
2689                        return Some(self.seqno);
2690                    }
2691                }
2692            }
2693        } else {
2694            // every `threshold` seqnos since the latest rollup, assign rollup maintenance.
2695            // we avoid assigning rollups to every seqno past the threshold to avoid handles
2696            // racing / performing redundant work.
2697            if seqnos_since_last_rollup > 0
2698                && seqnos_since_last_rollup % u64::cast_from(threshold) == 0
2699            {
2700                return Some(self.seqno);
2701            }
2702
2703            // however, since maintenance is best-effort and could fail, do assign rollup
2704            // work to every seqno after a fallback threshold to ensure one is written.
2705            if seqnos_since_last_rollup
2706                > u64::cast_from(
2707                    threshold * PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER,
2708                )
2709            {
2710                return Some(self.seqno);
2711            }
2712        }
2713
2714        None
2715    }
2716
2717    pub(crate) fn blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
2718        let batches = self.collections.trace.batches().map(HollowBlobRef::Batch);
2719        let rollups = self.collections.rollups.values().map(HollowBlobRef::Rollup);
2720        batches.chain(rollups)
2721    }
2722}
2723
2724fn serialize_part_bytes<S: Serializer>(val: &[u8], s: S) -> Result<S::Ok, S::Error> {
2725    let val = hex::encode(val);
2726    val.serialize(s)
2727}
2728
2729fn serialize_lazy_proto<S: Serializer, T: prost::Message + Default>(
2730    val: &Option<LazyProto<T>>,
2731    s: S,
2732) -> Result<S::Ok, S::Error> {
2733    val.as_ref()
2734        .map(|lazy| hex::encode(&lazy.into_proto()))
2735        .serialize(s)
2736}
2737
2738fn serialize_part_stats<S: Serializer>(
2739    val: &Option<LazyPartStats>,
2740    s: S,
2741) -> Result<S::Ok, S::Error> {
2742    let val = val.as_ref().map(|x| x.decode().key);
2743    val.serialize(s)
2744}
2745
2746fn serialize_diffs_sum<S: Serializer>(val: &Option<[u8; 8]>, s: S) -> Result<S::Ok, S::Error> {
2747    // This is only used for debugging, so hack to assume that D is i64.
2748    let val = val.map(i64::decode);
2749    val.serialize(s)
2750}
2751
2752// This Serialize impl is used for debugging/testing and exposed via SQL. It's
2753// intentionally gated from users, so not strictly subject to our backward
2754// compatibility guarantees, but still probably best to be thoughtful about
2755// making unnecessary changes. Additionally, it's nice to make the output as
2756// nice to use as possible without tying our hands for the actual code usages.
2757impl<T: Serialize + Timestamp + Lattice> Serialize for State<T> {
2758    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2759        let State {
2760            shard_id,
2761            seqno,
2762            walltime_ms,
2763            hostname,
2764            collections:
2765                StateCollections {
2766                    version: applier_version,
2767                    last_gc_req,
2768                    rollups,
2769                    active_rollup,
2770                    active_gc,
2771                    leased_readers,
2772                    critical_readers,
2773                    writers,
2774                    schemas,
2775                    trace,
2776                },
2777        } = self;
2778        let mut s = s.serialize_struct("State", 13)?;
2779        let () = s.serialize_field("applier_version", &applier_version.to_string())?;
2780        let () = s.serialize_field("shard_id", shard_id)?;
2781        let () = s.serialize_field("seqno", seqno)?;
2782        let () = s.serialize_field("walltime_ms", walltime_ms)?;
2783        let () = s.serialize_field("hostname", hostname)?;
2784        let () = s.serialize_field("last_gc_req", last_gc_req)?;
2785        let () = s.serialize_field("rollups", rollups)?;
2786        let () = s.serialize_field("active_rollup", active_rollup)?;
2787        let () = s.serialize_field("active_gc", active_gc)?;
2788        let () = s.serialize_field("leased_readers", leased_readers)?;
2789        let () = s.serialize_field("critical_readers", critical_readers)?;
2790        let () = s.serialize_field("writers", writers)?;
2791        let () = s.serialize_field("schemas", schemas)?;
2792        let () = s.serialize_field("since", &trace.since().elements())?;
2793        let () = s.serialize_field("upper", &trace.upper().elements())?;
2794        let trace = trace.flatten();
2795        let () = s.serialize_field("batches", &trace.legacy_batches.keys().collect::<Vec<_>>())?;
2796        let () = s.serialize_field("hollow_batches", &trace.hollow_batches)?;
2797        let () = s.serialize_field("spine_batches", &trace.spine_batches)?;
2798        let () = s.serialize_field("merges", &trace.merges)?;
2799        s.end()
2800    }
2801}
2802
/// Size statistics over the blobs referenced by a state, as computed by
/// [`State::size_metrics`].
#[derive(Debug, Default)]
pub struct StateSizeMetrics {
    /// Number of hollow batches in the trace.
    pub hollow_batch_count: usize,
    /// Sum of `part_count()` over all hollow batches.
    pub batch_part_count: usize,
    /// Number of batch parts that carry a timestamp rewrite.
    pub rewrite_part_count: usize,
    /// Sum of the `len` of all hollow batches.
    pub num_updates: usize,
    /// Encoded size in bytes of the largest hollow batch.
    pub largest_batch_bytes: usize,
    /// Total encoded size in bytes of all hollow batches.
    pub state_batches_bytes: usize,
    /// Total encoded size in bytes of all rollups (unknown sizes count as 0).
    pub state_rollups_bytes: usize,
    /// Number of rollups referenced by state.
    pub state_rollup_count: usize,
    /// Number of batch parts stored inline in state.
    pub inline_part_count: usize,
    /// Total bytes of parts stored inline in state.
    pub inline_part_bytes: usize,
}
2816
/// Counts of handles force-expired by [`State::expire_at`].
#[derive(Default)]
pub struct ExpiryMetrics {
    /// Number of leased readers removed because their lease lapsed.
    pub(crate) readers_expired: usize,
    /// Number of writers removed because their lease lapsed.
    pub(crate) writers_expired: usize,
}
2822
/// Newtype wrapper around an [`Antichain`] that holds a shard's since
/// frontier; e.g. returned by [`State::verify_listen`] when the requested
/// as_of is strictly less than the since.
#[derive(Debug, Clone, PartialEq)]
pub struct Since<T>(pub Antichain<T>);
2826
/// Newtype wrapper around an [`Antichain`] that holds a shard's upper
/// frontier; e.g. carried by `SnapshotErr::AsOfNotYetAvailable` in
/// [`State::snapshot`].
#[derive(Debug, PartialEq)]
pub struct Upper<T>(pub Antichain<T>);
2830
2831#[cfg(test)]
2832pub(crate) mod tests {
2833    use std::ops::Range;
2834    use std::str::FromStr;
2835
2836    use bytes::Bytes;
2837    use mz_build_info::DUMMY_BUILD_INFO;
2838    use mz_dyncfg::ConfigUpdates;
2839    use mz_ore::now::SYSTEM_TIME;
2840    use mz_ore::{assert_none, assert_ok};
2841    use mz_proto::RustType;
2842    use proptest::prelude::*;
2843    use proptest::strategy::ValueTree;
2844
2845    use crate::InvalidUsage::{InvalidBounds, InvalidEmptyTimeInterval};
2846    use crate::cache::PersistClientCache;
2847    use crate::internal::encoding::any_some_lazy_part_stats;
2848    use crate::internal::paths::RollupId;
2849    use crate::internal::trace::tests::any_trace;
2850    use crate::tests::new_test_client_cache;
2851    use crate::{Diagnostics, PersistLocation};
2852
2853    use super::*;
2854
2855    const LEASE_DURATION_MS: u64 = 900 * 1000;
2856    fn debug_state() -> HandleDebugState {
2857        HandleDebugState {
2858            hostname: "debug".to_owned(),
2859            purpose: "finding the bugs".to_owned(),
2860        }
2861    }
2862
2863    pub fn any_hollow_batch_with_exact_runs<T: Arbitrary + Timestamp>(
2864        num_runs: usize,
2865    ) -> impl Strategy<Value = HollowBatch<T>> {
2866        (
2867            any::<T>(),
2868            any::<T>(),
2869            any::<T>(),
2870            proptest::collection::vec(any_run_part::<T>(), num_runs + 1..20),
2871            any::<usize>(),
2872        )
2873            .prop_map(move |(t0, t1, since, parts, len)| {
2874                let (lower, upper) = if t0 <= t1 {
2875                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
2876                } else {
2877                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
2878                };
2879                let since = Antichain::from_elem(since);
2880
2881                let run_splits = (1..num_runs)
2882                    .map(|i| i * parts.len() / num_runs)
2883                    .collect::<Vec<_>>();
2884
2885                let run_meta = (0..num_runs)
2886                    .map(|_| {
2887                        let mut meta = RunMeta::default();
2888                        meta.id = Some(RunId::new());
2889                        meta
2890                    })
2891                    .collect::<Vec<_>>();
2892
2893                HollowBatch::new(
2894                    Description::new(lower, upper, since),
2895                    parts,
2896                    len % 10,
2897                    run_meta,
2898                    run_splits,
2899                )
2900            })
2901    }
2902
2903    pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
2904        Strategy::prop_map(
2905            (
2906                any::<T>(),
2907                any::<T>(),
2908                any::<T>(),
2909                proptest::collection::vec(any_run_part::<T>(), 0..20),
2910                any::<usize>(),
2911                0..=10usize,
2912                proptest::collection::vec(any::<RunId>(), 10),
2913            ),
2914            |(t0, t1, since, parts, len, num_runs, run_ids)| {
2915                let (lower, upper) = if t0 <= t1 {
2916                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
2917                } else {
2918                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
2919                };
2920                let since = Antichain::from_elem(since);
2921                if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() {
2922                    let run_splits = (1..num_runs)
2923                        .map(|i| i * parts.len() / num_runs)
2924                        .collect::<Vec<_>>();
2925
2926                    let run_meta = (0..num_runs)
2927                        .enumerate()
2928                        .map(|(i, _)| {
2929                            let mut meta = RunMeta::default();
2930                            meta.id = Some(run_ids[i]);
2931                            meta
2932                        })
2933                        .collect::<Vec<_>>();
2934
2935                    HollowBatch::new(
2936                        Description::new(lower, upper, since),
2937                        parts,
2938                        len % 10,
2939                        run_meta,
2940                        run_splits,
2941                    )
2942                } else {
2943                    HollowBatch::new_run_for_test(
2944                        Description::new(lower, upper, since),
2945                        parts,
2946                        len % 10,
2947                        run_ids[0],
2948                    )
2949                }
2950            },
2951        )
2952    }
2953
2954    pub fn any_batch_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = BatchPart<T>> {
2955        Strategy::prop_map(
2956            (
2957                any::<bool>(),
2958                any_hollow_batch_part(),
2959                any::<Option<T>>(),
2960                any::<Option<SchemaId>>(),
2961                any::<Option<SchemaId>>(),
2962            ),
2963            |(is_hollow, hollow, ts_rewrite, schema_id, deprecated_schema_id)| {
2964                if is_hollow {
2965                    BatchPart::Hollow(hollow)
2966                } else {
2967                    let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap();
2968                    let ts_rewrite = ts_rewrite.map(Antichain::from_elem);
2969                    BatchPart::Inline {
2970                        updates,
2971                        ts_rewrite,
2972                        schema_id,
2973                        deprecated_schema_id,
2974                    }
2975                }
2976            },
2977        )
2978    }
2979
2980    pub fn any_run_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = RunPart<T>> {
2981        Strategy::prop_map(any_batch_part(), |part| RunPart::Single(part))
2982    }
2983
2984    pub fn any_hollow_batch_part<T: Arbitrary + Timestamp>()
2985    -> impl Strategy<Value = HollowBatchPart<T>> {
2986        Strategy::prop_map(
2987            (
2988                any::<PartialBatchKey>(),
2989                any::<usize>(),
2990                any::<Vec<u8>>(),
2991                any_some_lazy_part_stats(),
2992                any::<Option<T>>(),
2993                any::<[u8; 8]>(),
2994                any::<Option<BatchColumnarFormat>>(),
2995                any::<Option<SchemaId>>(),
2996                any::<Option<SchemaId>>(),
2997            ),
2998            |(
2999                key,
3000                encoded_size_bytes,
3001                key_lower,
3002                stats,
3003                ts_rewrite,
3004                diffs_sum,
3005                format,
3006                schema_id,
3007                deprecated_schema_id,
3008            )| {
3009                HollowBatchPart {
3010                    key,
3011                    meta: Default::default(),
3012                    encoded_size_bytes,
3013                    key_lower,
3014                    structured_key_lower: None,
3015                    stats,
3016                    ts_rewrite: ts_rewrite.map(Antichain::from_elem),
3017                    diffs_sum: Some(diffs_sum),
3018                    format,
3019                    schema_id,
3020                    deprecated_schema_id,
3021                }
3022            },
3023        )
3024    }
3025
3026    pub fn any_leased_reader_state<T: Arbitrary>() -> impl Strategy<Value = LeasedReaderState<T>> {
3027        Strategy::prop_map(
3028            (
3029                any::<SeqNo>(),
3030                any::<Option<T>>(),
3031                any::<u64>(),
3032                any::<u64>(),
3033                any::<HandleDebugState>(),
3034            ),
3035            |(seqno, since, last_heartbeat_timestamp_ms, mut lease_duration_ms, debug)| {
3036                // lease_duration_ms of 0 means this state was written by an old
3037                // version of code, which means we'll migrate it in the decode
3038                // path. Avoid.
3039                if lease_duration_ms == 0 {
3040                    lease_duration_ms += 1;
3041                }
3042                LeasedReaderState {
3043                    seqno,
3044                    since: since.map_or_else(Antichain::new, Antichain::from_elem),
3045                    last_heartbeat_timestamp_ms,
3046                    lease_duration_ms,
3047                    debug,
3048                }
3049            },
3050        )
3051    }
3052
3053    pub fn any_critical_reader_state<T>() -> impl Strategy<Value = CriticalReaderState<T>>
3054    where
3055        T: Arbitrary,
3056    {
3057        Strategy::prop_map(
3058            (
3059                any::<Option<T>>(),
3060                any::<Opaque>(),
3061                any::<HandleDebugState>(),
3062            ),
3063            |(since, opaque, debug)| CriticalReaderState {
3064                since: since.map_or_else(Antichain::new, Antichain::from_elem),
3065                opaque,
3066                debug,
3067            },
3068        )
3069    }
3070
3071    pub fn any_writer_state<T: Arbitrary>() -> impl Strategy<Value = WriterState<T>> {
3072        Strategy::prop_map(
3073            (
3074                any::<u64>(),
3075                any::<u64>(),
3076                any::<IdempotencyToken>(),
3077                any::<Option<T>>(),
3078                any::<HandleDebugState>(),
3079            ),
3080            |(
3081                last_heartbeat_timestamp_ms,
3082                lease_duration_ms,
3083                most_recent_write_token,
3084                most_recent_write_upper,
3085                debug,
3086            )| WriterState {
3087                last_heartbeat_timestamp_ms,
3088                lease_duration_ms,
3089                most_recent_write_token,
3090                most_recent_write_upper: most_recent_write_upper
3091                    .map_or_else(Antichain::new, Antichain::from_elem),
3092                debug,
3093            },
3094        )
3095    }
3096
3097    pub fn any_encoded_schemas() -> impl Strategy<Value = EncodedSchemas> {
3098        Strategy::prop_map(
3099            (
3100                any::<Vec<u8>>(),
3101                any::<Vec<u8>>(),
3102                any::<Vec<u8>>(),
3103                any::<Vec<u8>>(),
3104            ),
3105            |(key, key_data_type, val, val_data_type)| EncodedSchemas {
3106                key: Bytes::from(key),
3107                key_data_type: Bytes::from(key_data_type),
3108                val: Bytes::from(val),
3109                val_data_type: Bytes::from(val_data_type),
3110            },
3111        )
3112    }
3113
3114    pub fn any_state<T: Arbitrary + Timestamp + Lattice>(
3115        num_trace_batches: Range<usize>,
3116    ) -> impl Strategy<Value = State<T>> {
3117        let part1 = (
3118            any::<ShardId>(),
3119            any::<SeqNo>(),
3120            any::<u64>(),
3121            any::<String>(),
3122            any::<SeqNo>(),
3123            proptest::collection::btree_map(any::<SeqNo>(), any::<HollowRollup>(), 1..3),
3124            proptest::option::of(any::<ActiveRollup>()),
3125        );
3126
3127        let part2 = (
3128            proptest::option::of(any::<ActiveGc>()),
3129            proptest::collection::btree_map(
3130                any::<LeasedReaderId>(),
3131                any_leased_reader_state::<T>(),
3132                1..3,
3133            ),
3134            proptest::collection::btree_map(
3135                any::<CriticalReaderId>(),
3136                any_critical_reader_state::<T>(),
3137                1..3,
3138            ),
3139            proptest::collection::btree_map(any::<WriterId>(), any_writer_state::<T>(), 0..3),
3140            proptest::collection::btree_map(any::<SchemaId>(), any_encoded_schemas(), 0..3),
3141            any_trace::<T>(num_trace_batches),
3142        );
3143
3144        (part1, part2).prop_map(
3145            |(
3146                (shard_id, seqno, walltime_ms, hostname, last_gc_req, rollups, active_rollup),
3147                (active_gc, leased_readers, critical_readers, writers, schemas, trace),
3148            )| State {
3149                shard_id,
3150                seqno,
3151                walltime_ms,
3152                hostname,
3153                collections: StateCollections {
3154                    version: Version::new(1, 2, 3),
3155                    last_gc_req,
3156                    rollups,
3157                    active_rollup,
3158                    active_gc,
3159                    leased_readers,
3160                    critical_readers,
3161                    writers,
3162                    schemas,
3163                    trace,
3164                },
3165            },
3166        )
3167    }
3168
3169    pub(crate) fn hollow<T: Timestamp>(
3170        lower: T,
3171        upper: T,
3172        keys: &[&str],
3173        len: usize,
3174    ) -> HollowBatch<T> {
3175        HollowBatch::new_run(
3176            Description::new(
3177                Antichain::from_elem(lower),
3178                Antichain::from_elem(upper),
3179                Antichain::from_elem(T::minimum()),
3180            ),
3181            keys.iter()
3182                .map(|x| {
3183                    RunPart::Single(BatchPart::Hollow(HollowBatchPart {
3184                        key: PartialBatchKey((*x).to_owned()),
3185                        meta: Default::default(),
3186                        encoded_size_bytes: 0,
3187                        key_lower: vec![],
3188                        structured_key_lower: None,
3189                        stats: None,
3190                        ts_rewrite: None,
3191                        diffs_sum: None,
3192                        format: None,
3193                        schema_id: None,
3194                        deprecated_schema_id: None,
3195                    }))
3196                })
3197                .collect(),
3198            len,
3199        )
3200    }
3201
3202    #[mz_ore::test]
3203    fn downgrade_since() {
3204        let mut state = TypedState::<(), (), u64, i64>::new(
3205            DUMMY_BUILD_INFO.semver_version(),
3206            ShardId::new(),
3207            "".to_owned(),
3208            0,
3209        );
3210        let reader = LeasedReaderId::new();
3211        let seqno = SeqNo::minimum();
3212        let now = SYSTEM_TIME.clone();
3213        let _ = state.collections.register_leased_reader(
3214            "",
3215            &reader,
3216            "",
3217            seqno,
3218            Duration::from_secs(10),
3219            now(),
3220            false,
3221        );
3222
3223        // The shard global since == 0 initially.
3224        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3225
3226        // Greater
3227        assert_eq!(
3228            state.collections.downgrade_since(
3229                &reader,
3230                seqno,
3231                seqno,
3232                &Antichain::from_elem(2),
3233                now()
3234            ),
3235            Continue(Since(Antichain::from_elem(2)))
3236        );
3237        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3238        // Equal (no-op)
3239        assert_eq!(
3240            state.collections.downgrade_since(
3241                &reader,
3242                seqno,
3243                seqno,
3244                &Antichain::from_elem(2),
3245                now()
3246            ),
3247            Continue(Since(Antichain::from_elem(2)))
3248        );
3249        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3250        // Less (no-op)
3251        assert_eq!(
3252            state.collections.downgrade_since(
3253                &reader,
3254                seqno,
3255                seqno,
3256                &Antichain::from_elem(1),
3257                now()
3258            ),
3259            Continue(Since(Antichain::from_elem(2)))
3260        );
3261        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3262
3263        // Create a second reader.
3264        let reader2 = LeasedReaderId::new();
3265        let _ = state.collections.register_leased_reader(
3266            "",
3267            &reader2,
3268            "",
3269            seqno,
3270            Duration::from_secs(10),
3271            now(),
3272            false,
3273        );
3274
3275        // Shard since doesn't change until the meet (min) of all reader sinces changes.
3276        assert_eq!(
3277            state.collections.downgrade_since(
3278                &reader2,
3279                seqno,
3280                seqno,
3281                &Antichain::from_elem(3),
3282                now()
3283            ),
3284            Continue(Since(Antichain::from_elem(3)))
3285        );
3286        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3287        // Shard since == 3 when all readers have since >= 3.
3288        assert_eq!(
3289            state.collections.downgrade_since(
3290                &reader,
3291                seqno,
3292                seqno,
3293                &Antichain::from_elem(5),
3294                now()
3295            ),
3296            Continue(Since(Antichain::from_elem(5)))
3297        );
3298        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3299
3300        // Shard since unaffected readers with since > shard since expiring.
3301        assert_eq!(
3302            state.collections.expire_leased_reader(&reader),
3303            Continue(true)
3304        );
3305        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3306
3307        // Create a third reader.
3308        let reader3 = LeasedReaderId::new();
3309        let _ = state.collections.register_leased_reader(
3310            "",
3311            &reader3,
3312            "",
3313            seqno,
3314            Duration::from_secs(10),
3315            now(),
3316            false,
3317        );
3318
3319        // Shard since doesn't change until the meet (min) of all reader sinces changes.
3320        assert_eq!(
3321            state.collections.downgrade_since(
3322                &reader3,
3323                seqno,
3324                seqno,
3325                &Antichain::from_elem(10),
3326                now()
3327            ),
3328            Continue(Since(Antichain::from_elem(10)))
3329        );
3330        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3331
3332        // Shard since advances when reader with the minimal since expires.
3333        assert_eq!(
3334            state.collections.expire_leased_reader(&reader2),
3335            Continue(true)
3336        );
3337        // TODO(database-issues#6885): expiry temporarily doesn't advance since
3338        // Switch this assertion back when we re-enable this.
3339        //
3340        // assert_eq!(state.collections.trace.since(), &Antichain::from_elem(10));
3341        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3342
3343        // Shard since unaffected when all readers are expired.
3344        assert_eq!(
3345            state.collections.expire_leased_reader(&reader3),
3346            Continue(true)
3347        );
3348        // TODO(database-issues#6885): expiry temporarily doesn't advance since
3349        // Switch this assertion back when we re-enable this.
3350        //
3351        // assert_eq!(state.collections.trace.since(), &Antichain::from_elem(10));
3352        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3353    }
3354
3355    #[mz_ore::test]
3356    fn compare_and_downgrade_since() {
3357        let mut state = TypedState::<(), (), u64, i64>::new(
3358            DUMMY_BUILD_INFO.semver_version(),
3359            ShardId::new(),
3360            "".to_owned(),
3361            0,
3362        );
3363        let reader = CriticalReaderId::new();
3364        let _ = state
3365            .collections
3366            .register_critical_reader("", &reader, Opaque::encode(&0u64), "");
3367
3368        // The shard global since == 0 initially.
3369        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3370        // The initial opaque value should be set.
3371        assert_eq!(
3372            state
3373                .collections
3374                .critical_reader(&reader)
3375                .opaque
3376                .decode::<u64>(),
3377            u64::MIN
3378        );
3379
3380        // Greater
3381        assert_eq!(
3382            state.collections.compare_and_downgrade_since(
3383                &reader,
3384                &Opaque::encode(&0u64),
3385                (&Opaque::encode(&1u64), &Antichain::from_elem(2)),
3386            ),
3387            Continue(Ok(Since(Antichain::from_elem(2))))
3388        );
3389        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3390        assert_eq!(
3391            state
3392                .collections
3393                .critical_reader(&reader)
3394                .opaque
3395                .decode::<u64>(),
3396            1
3397        );
3398        // Equal (no-op)
3399        assert_eq!(
3400            state.collections.compare_and_downgrade_since(
3401                &reader,
3402                &Opaque::encode(&1u64),
3403                (&Opaque::encode(&2u64), &Antichain::from_elem(2)),
3404            ),
3405            Continue(Ok(Since(Antichain::from_elem(2))))
3406        );
3407        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3408        assert_eq!(
3409            state
3410                .collections
3411                .critical_reader(&reader)
3412                .opaque
3413                .decode::<u64>(),
3414            2
3415        );
3416        // Less (no-op)
3417        assert_eq!(
3418            state.collections.compare_and_downgrade_since(
3419                &reader,
3420                &Opaque::encode(&2u64),
3421                (&Opaque::encode(&3u64), &Antichain::from_elem(1)),
3422            ),
3423            Continue(Ok(Since(Antichain::from_elem(2))))
3424        );
3425        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3426        assert_eq!(
3427            state
3428                .collections
3429                .critical_reader(&reader)
3430                .opaque
3431                .decode::<u64>(),
3432            3
3433        );
3434    }
3435
3436    #[mz_ore::test]
3437    fn compare_and_append() {
3438        let state = &mut TypedState::<String, String, u64, i64>::new(
3439            DUMMY_BUILD_INFO.semver_version(),
3440            ShardId::new(),
3441            "".to_owned(),
3442            0,
3443        )
3444        .collections;
3445
3446        let writer_id = WriterId::new();
3447        let now = SYSTEM_TIME.clone();
3448
3449        // State is initially empty.
3450        assert_eq!(state.trace.num_spine_batches(), 0);
3451        assert_eq!(state.trace.num_hollow_batches(), 0);
3452        assert_eq!(state.trace.num_updates(), 0);
3453
3454        // Cannot insert a batch with a lower != current shard upper.
3455        assert_eq!(
3456            state.compare_and_append(
3457                &hollow(1, 2, &["key1"], 1),
3458                &writer_id,
3459                now(),
3460                LEASE_DURATION_MS,
3461                &IdempotencyToken::new(),
3462                &debug_state(),
3463                0,
3464                100,
3465                None
3466            ),
3467            Break(CompareAndAppendBreak::Upper {
3468                shard_upper: Antichain::from_elem(0),
3469                writer_upper: Antichain::from_elem(0)
3470            })
3471        );
3472
3473        // Insert an empty batch with an upper > lower..
3474        assert!(
3475            state
3476                .compare_and_append(
3477                    &hollow(0, 5, &[], 0),
3478                    &writer_id,
3479                    now(),
3480                    LEASE_DURATION_MS,
3481                    &IdempotencyToken::new(),
3482                    &debug_state(),
3483                    0,
3484                    100,
3485                    None
3486                )
3487                .is_continue()
3488        );
3489
3490        // Cannot insert a batch with a upper less than the lower.
3491        assert_eq!(
3492            state.compare_and_append(
3493                &hollow(5, 4, &["key1"], 1),
3494                &writer_id,
3495                now(),
3496                LEASE_DURATION_MS,
3497                &IdempotencyToken::new(),
3498                &debug_state(),
3499                0,
3500                100,
3501                None
3502            ),
3503            Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds {
3504                lower: Antichain::from_elem(5),
3505                upper: Antichain::from_elem(4)
3506            }))
3507        );
3508
3509        // Cannot insert a nonempty batch with an upper equal to lower.
3510        assert_eq!(
3511            state.compare_and_append(
3512                &hollow(5, 5, &["key1"], 1),
3513                &writer_id,
3514                now(),
3515                LEASE_DURATION_MS,
3516                &IdempotencyToken::new(),
3517                &debug_state(),
3518                0,
3519                100,
3520                None
3521            ),
3522            Break(CompareAndAppendBreak::InvalidUsage(
3523                InvalidEmptyTimeInterval {
3524                    lower: Antichain::from_elem(5),
3525                    upper: Antichain::from_elem(5),
3526                    keys: vec!["key1".to_owned()],
3527                }
3528            ))
3529        );
3530
3531        // Can insert an empty batch with an upper equal to lower.
3532        assert!(
3533            state
3534                .compare_and_append(
3535                    &hollow(5, 5, &[], 0),
3536                    &writer_id,
3537                    now(),
3538                    LEASE_DURATION_MS,
3539                    &IdempotencyToken::new(),
3540                    &debug_state(),
3541                    0,
3542                    100,
3543                    None
3544                )
3545                .is_continue()
3546        );
3547    }
3548
3549    #[mz_ore::test]
3550    fn snapshot() {
3551        let now = SYSTEM_TIME.clone();
3552
3553        let mut state = TypedState::<String, String, u64, i64>::new(
3554            DUMMY_BUILD_INFO.semver_version(),
3555            ShardId::new(),
3556            "".to_owned(),
3557            0,
3558        );
3559        // Cannot take a snapshot with as_of == shard upper.
3560        assert_eq!(
3561            state.snapshot(&Antichain::from_elem(0)),
3562            Err(SnapshotErr::AsOfNotYetAvailable(
3563                SeqNo(0),
3564                Upper(Antichain::from_elem(0))
3565            ))
3566        );
3567
3568        // Cannot take a snapshot with as_of > shard upper.
3569        assert_eq!(
3570            state.snapshot(&Antichain::from_elem(5)),
3571            Err(SnapshotErr::AsOfNotYetAvailable(
3572                SeqNo(0),
3573                Upper(Antichain::from_elem(0))
3574            ))
3575        );
3576
3577        let writer_id = WriterId::new();
3578
3579        // Advance upper to 5.
3580        assert!(
3581            state
3582                .collections
3583                .compare_and_append(
3584                    &hollow(0, 5, &["key1"], 1),
3585                    &writer_id,
3586                    now(),
3587                    LEASE_DURATION_MS,
3588                    &IdempotencyToken::new(),
3589                    &debug_state(),
3590                    0,
3591                    100,
3592                    None
3593                )
3594                .is_continue()
3595        );
3596
3597        // Can take a snapshot with as_of < upper.
3598        assert_eq!(
3599            state.snapshot(&Antichain::from_elem(0)),
3600            Ok(vec![hollow(0, 5, &["key1"], 1)])
3601        );
3602
3603        // Can take a snapshot with as_of >= shard since, as long as as_of < shard_upper.
3604        assert_eq!(
3605            state.snapshot(&Antichain::from_elem(4)),
3606            Ok(vec![hollow(0, 5, &["key1"], 1)])
3607        );
3608
3609        // Cannot take a snapshot with as_of >= upper.
3610        assert_eq!(
3611            state.snapshot(&Antichain::from_elem(5)),
3612            Err(SnapshotErr::AsOfNotYetAvailable(
3613                SeqNo(0),
3614                Upper(Antichain::from_elem(5))
3615            ))
3616        );
3617        assert_eq!(
3618            state.snapshot(&Antichain::from_elem(6)),
3619            Err(SnapshotErr::AsOfNotYetAvailable(
3620                SeqNo(0),
3621                Upper(Antichain::from_elem(5))
3622            ))
3623        );
3624
3625        let reader = LeasedReaderId::new();
3626        // Advance the since to 2.
3627        let _ = state.collections.register_leased_reader(
3628            "",
3629            &reader,
3630            "",
3631            SeqNo::minimum(),
3632            Duration::from_secs(10),
3633            now(),
3634            false,
3635        );
3636        assert_eq!(
3637            state.collections.downgrade_since(
3638                &reader,
3639                SeqNo::minimum(),
3640                SeqNo::minimum(),
3641                &Antichain::from_elem(2),
3642                now()
3643            ),
3644            Continue(Since(Antichain::from_elem(2)))
3645        );
3646        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3647        // Cannot take a snapshot with as_of < shard_since.
3648        assert_eq!(
3649            state.snapshot(&Antichain::from_elem(1)),
3650            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
3651                Antichain::from_elem(2)
3652            )))
3653        );
3654
3655        // Advance the upper to 10 via an empty batch.
3656        assert!(
3657            state
3658                .collections
3659                .compare_and_append(
3660                    &hollow(5, 10, &[], 0),
3661                    &writer_id,
3662                    now(),
3663                    LEASE_DURATION_MS,
3664                    &IdempotencyToken::new(),
3665                    &debug_state(),
3666                    0,
3667                    100,
3668                    None
3669                )
3670                .is_continue()
3671        );
3672
3673        // Can still take snapshots at times < upper.
3674        assert_eq!(
3675            state.snapshot(&Antichain::from_elem(7)),
3676            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3677        );
3678
3679        // Cannot take snapshots with as_of >= upper.
3680        assert_eq!(
3681            state.snapshot(&Antichain::from_elem(10)),
3682            Err(SnapshotErr::AsOfNotYetAvailable(
3683                SeqNo(0),
3684                Upper(Antichain::from_elem(10))
3685            ))
3686        );
3687
3688        // Advance upper to 15.
3689        assert!(
3690            state
3691                .collections
3692                .compare_and_append(
3693                    &hollow(10, 15, &["key2"], 1),
3694                    &writer_id,
3695                    now(),
3696                    LEASE_DURATION_MS,
3697                    &IdempotencyToken::new(),
3698                    &debug_state(),
3699                    0,
3700                    100,
3701                    None
3702                )
3703                .is_continue()
3704        );
3705
3706        // Filter out batches whose lowers are less than the requested as of (the
3707        // batches that are too far in the future for the requested as_of).
3708        assert_eq!(
3709            state.snapshot(&Antichain::from_elem(9)),
3710            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3711        );
3712
3713        // Don't filter out batches whose lowers are <= the requested as_of.
3714        assert_eq!(
3715            state.snapshot(&Antichain::from_elem(10)),
3716            Ok(vec![
3717                hollow(0, 5, &["key1"], 1),
3718                hollow(5, 10, &[], 0),
3719                hollow(10, 15, &["key2"], 1)
3720            ])
3721        );
3722
3723        assert_eq!(
3724            state.snapshot(&Antichain::from_elem(11)),
3725            Ok(vec![
3726                hollow(0, 5, &["key1"], 1),
3727                hollow(5, 10, &[], 0),
3728                hollow(10, 15, &["key2"], 1)
3729            ])
3730        );
3731    }
3732
3733    #[mz_ore::test]
3734    fn next_listen_batch() {
3735        let mut state = TypedState::<String, String, u64, i64>::new(
3736            DUMMY_BUILD_INFO.semver_version(),
3737            ShardId::new(),
3738            "".to_owned(),
3739            0,
3740        );
3741
3742        // Empty collection never has any batches to listen for, regardless of the
3743        // current frontier.
3744        assert_eq!(
3745            state.next_listen_batch(&Antichain::from_elem(0)),
3746            Err(SeqNo(0))
3747        );
3748        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3749
3750        let writer_id = WriterId::new();
3751        let now = SYSTEM_TIME.clone();
3752
3753        // Add two batches of data, one from [0, 5) and then another from [5, 10).
3754        assert!(
3755            state
3756                .collections
3757                .compare_and_append(
3758                    &hollow(0, 5, &["key1"], 1),
3759                    &writer_id,
3760                    now(),
3761                    LEASE_DURATION_MS,
3762                    &IdempotencyToken::new(),
3763                    &debug_state(),
3764                    0,
3765                    100,
3766                    None
3767                )
3768                .is_continue()
3769        );
3770        assert!(
3771            state
3772                .collections
3773                .compare_and_append(
3774                    &hollow(5, 10, &["key2"], 1),
3775                    &writer_id,
3776                    now(),
3777                    LEASE_DURATION_MS,
3778                    &IdempotencyToken::new(),
3779                    &debug_state(),
3780                    0,
3781                    100,
3782                    None
3783                )
3784                .is_continue()
3785        );
3786
3787        // All frontiers in [0, 5) return the first batch.
3788        for t in 0..=4 {
3789            assert_eq!(
3790                state.next_listen_batch(&Antichain::from_elem(t)),
3791                Ok(hollow(0, 5, &["key1"], 1))
3792            );
3793        }
3794
3795        // All frontiers in [5, 10) return the second batch.
3796        for t in 5..=9 {
3797            assert_eq!(
3798                state.next_listen_batch(&Antichain::from_elem(t)),
3799                Ok(hollow(5, 10, &["key2"], 1))
3800            );
3801        }
3802
3803        // There is no batch currently available for t = 10.
3804        assert_eq!(
3805            state.next_listen_batch(&Antichain::from_elem(10)),
3806            Err(SeqNo(0))
3807        );
3808
3809        // By definition, there is no frontier ever at the empty antichain which
3810        // is the time after all possible times.
3811        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3812    }
3813
3814    #[mz_ore::test]
3815    fn expire_writer() {
3816        let mut state = TypedState::<String, String, u64, i64>::new(
3817            DUMMY_BUILD_INFO.semver_version(),
3818            ShardId::new(),
3819            "".to_owned(),
3820            0,
3821        );
3822        let now = SYSTEM_TIME.clone();
3823
3824        let writer_id_one = WriterId::new();
3825
3826        let writer_id_two = WriterId::new();
3827
3828        // Writer is eligible to write
3829        assert!(
3830            state
3831                .collections
3832                .compare_and_append(
3833                    &hollow(0, 2, &["key1"], 1),
3834                    &writer_id_one,
3835                    now(),
3836                    LEASE_DURATION_MS,
3837                    &IdempotencyToken::new(),
3838                    &debug_state(),
3839                    0,
3840                    100,
3841                    None
3842                )
3843                .is_continue()
3844        );
3845
3846        assert!(
3847            state
3848                .collections
3849                .expire_writer(&writer_id_one)
3850                .is_continue()
3851        );
3852
3853        // Other writers should still be able to write
3854        assert!(
3855            state
3856                .collections
3857                .compare_and_append(
3858                    &hollow(2, 5, &["key2"], 1),
3859                    &writer_id_two,
3860                    now(),
3861                    LEASE_DURATION_MS,
3862                    &IdempotencyToken::new(),
3863                    &debug_state(),
3864                    0,
3865                    100,
3866                    None
3867                )
3868                .is_continue()
3869        );
3870    }
3871
3872    #[mz_ore::test]
3873    fn maybe_gc_active_gc() {
3874        const GC_CONFIG: GcConfig = GcConfig {
3875            use_active_gc: true,
3876            fallback_threshold_ms: 5000,
3877            min_versions: 99,
3878            max_versions: 500,
3879        };
3880        let now_fn = SYSTEM_TIME.clone();
3881
3882        let mut state = TypedState::<String, String, u64, i64>::new(
3883            DUMMY_BUILD_INFO.semver_version(),
3884            ShardId::new(),
3885            "".to_owned(),
3886            0,
3887        );
3888
3889        let now = now_fn();
3890        // Empty state doesn't need gc, regardless of is_write.
3891        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
3892        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);
3893
3894        // Artificially advance the seqno so the seqno_since advances past our
3895        // internal gc_threshold.
3896        state.seqno = SeqNo(100);
3897        assert_eq!(state.seqno_since(), SeqNo(100));
3898
3899        // When a writer is present, non-writes don't gc.
3900        let writer_id = WriterId::new();
3901        let _ = state.collections.compare_and_append(
3902            &hollow(1, 2, &["key1"], 1),
3903            &writer_id,
3904            now,
3905            LEASE_DURATION_MS,
3906            &IdempotencyToken::new(),
3907            &debug_state(),
3908            0,
3909            100,
3910            None,
3911        );
3912        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);
3913
3914        // A write will gc though.
3915        assert_eq!(
3916            state.maybe_gc(true, now, GC_CONFIG),
3917            Some(GcReq {
3918                shard_id: state.shard_id,
3919                new_seqno_since: SeqNo(100)
3920            })
3921        );
3922
3923        // But if we write down an active gc, we won't gc.
3924        state.collections.active_gc = Some(ActiveGc {
3925            seqno: state.seqno,
3926            start_ms: now,
3927        });
3928
3929        state.seqno = SeqNo(200);
3930        assert_eq!(state.seqno_since(), SeqNo(200));
3931
3932        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
3933
3934        state.seqno = SeqNo(300);
3935        assert_eq!(state.seqno_since(), SeqNo(300));
3936        // But if we advance the time past the threshold, we will gc.
3937        let new_now = now + GC_CONFIG.fallback_threshold_ms + 1;
3938        assert_eq!(
3939            state.maybe_gc(true, new_now, GC_CONFIG),
3940            Some(GcReq {
3941                shard_id: state.shard_id,
3942                new_seqno_since: SeqNo(300)
3943            })
3944        );
3945
3946        // Even if the sequence number doesn't pass the threshold, if the
3947        // active gc is expired, we will gc.
3948
3949        state.seqno = SeqNo(301);
3950        assert_eq!(state.seqno_since(), SeqNo(301));
3951        assert_eq!(
3952            state.maybe_gc(true, new_now, GC_CONFIG),
3953            Some(GcReq {
3954                shard_id: state.shard_id,
3955                new_seqno_since: SeqNo(301)
3956            })
3957        );
3958
3959        state.collections.active_gc = None;
3960
3961        // Artificially advance the seqno (again) so the seqno_since advances
3962        // past our internal gc_threshold (again).
3963        state.seqno = SeqNo(400);
3964        assert_eq!(state.seqno_since(), SeqNo(400));
3965
3966        let now = now_fn();
3967
3968        // If there are no writers, even a non-write will gc.
3969        let _ = state.collections.expire_writer(&writer_id);
3970        assert_eq!(
3971            state.maybe_gc(false, now, GC_CONFIG),
3972            Some(GcReq {
3973                shard_id: state.shard_id,
3974                new_seqno_since: SeqNo(400)
3975            })
3976        );
3977
3978        // Upper-bound the number of seqnos we'll attempt to collect in one go.
3979        let previous_seqno = state.seqno;
3980        state.seqno = SeqNo(10_000);
3981        assert_eq!(state.seqno_since(), SeqNo(10_000));
3982
3983        let now = now_fn();
3984        assert_eq!(
3985            state.maybe_gc(true, now, GC_CONFIG),
3986            Some(GcReq {
3987                shard_id: state.shard_id,
3988                new_seqno_since: SeqNo(previous_seqno.0 + u64::cast_from(GC_CONFIG.max_versions))
3989            })
3990        );
3991    }
3992
3993    #[mz_ore::test]
3994    fn maybe_gc_classic() {
3995        const GC_CONFIG: GcConfig = GcConfig {
3996            use_active_gc: false,
3997            fallback_threshold_ms: 5000,
3998            min_versions: 16,
3999            max_versions: 128,
4000        };
4001        const NOW_MS: u64 = 0;
4002
4003        let mut state = TypedState::<String, String, u64, i64>::new(
4004            DUMMY_BUILD_INFO.semver_version(),
4005            ShardId::new(),
4006            "".to_owned(),
4007            0,
4008        );
4009
4010        // Empty state doesn't need gc, regardless of is_write.
4011        assert_eq!(state.maybe_gc(true, NOW_MS, GC_CONFIG), None);
4012        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
4013
4014        // Artificially advance the seqno so the seqno_since advances past our
4015        // internal gc_threshold.
4016        state.seqno = SeqNo(100);
4017        assert_eq!(state.seqno_since(), SeqNo(100));
4018
4019        // When a writer is present, non-writes don't gc.
4020        let writer_id = WriterId::new();
4021        let now = SYSTEM_TIME.clone();
4022        let _ = state.collections.compare_and_append(
4023            &hollow(1, 2, &["key1"], 1),
4024            &writer_id,
4025            now(),
4026            LEASE_DURATION_MS,
4027            &IdempotencyToken::new(),
4028            &debug_state(),
4029            0,
4030            100,
4031            None,
4032        );
4033        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
4034
4035        // A write will gc though.
4036        assert_eq!(
4037            state.maybe_gc(true, NOW_MS, GC_CONFIG),
4038            Some(GcReq {
4039                shard_id: state.shard_id,
4040                new_seqno_since: SeqNo(100)
4041            })
4042        );
4043
4044        // Artificially advance the seqno (again) so the seqno_since advances
4045        // past our internal gc_threshold (again).
4046        state.seqno = SeqNo(200);
4047        assert_eq!(state.seqno_since(), SeqNo(200));
4048
4049        // If there are no writers, even a non-write will gc.
4050        let _ = state.collections.expire_writer(&writer_id);
4051        assert_eq!(
4052            state.maybe_gc(false, NOW_MS, GC_CONFIG),
4053            Some(GcReq {
4054                shard_id: state.shard_id,
4055                new_seqno_since: SeqNo(200)
4056            })
4057        );
4058    }
4059
4060    #[mz_ore::test]
4061    fn need_rollup_active_rollup() {
4062        const ROLLUP_THRESHOLD: usize = 3;
4063        const ROLLUP_USE_ACTIVE_ROLLUP: bool = true;
4064        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 5000;
4065        let now = SYSTEM_TIME.clone();
4066
4067        mz_ore::test::init_logging();
4068        let mut state = TypedState::<String, String, u64, i64>::new(
4069            DUMMY_BUILD_INFO.semver_version(),
4070            ShardId::new(),
4071            "".to_owned(),
4072            0,
4073        );
4074
4075        let rollup_seqno = SeqNo(5);
4076        let rollup = HollowRollup {
4077            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4078            encoded_size_bytes: None,
4079        };
4080
4081        assert!(
4082            state
4083                .collections
4084                .add_rollup((rollup_seqno, &rollup))
4085                .is_continue()
4086        );
4087
4088        // shouldn't need a rollup at the seqno of the rollup
4089        state.seqno = SeqNo(5);
4090        assert_none!(state.need_rollup(
4091            ROLLUP_THRESHOLD,
4092            ROLLUP_USE_ACTIVE_ROLLUP,
4093            ROLLUP_FALLBACK_THRESHOLD_MS,
4094            now()
4095        ));
4096
4097        // shouldn't need a rollup at seqnos less than our threshold
4098        state.seqno = SeqNo(6);
4099        assert_none!(state.need_rollup(
4100            ROLLUP_THRESHOLD,
4101            ROLLUP_USE_ACTIVE_ROLLUP,
4102            ROLLUP_FALLBACK_THRESHOLD_MS,
4103            now()
4104        ));
4105        state.seqno = SeqNo(7);
4106        assert_none!(state.need_rollup(
4107            ROLLUP_THRESHOLD,
4108            ROLLUP_USE_ACTIVE_ROLLUP,
4109            ROLLUP_FALLBACK_THRESHOLD_MS,
4110            now()
4111        ));
4112        state.seqno = SeqNo(8);
4113        assert_none!(state.need_rollup(
4114            ROLLUP_THRESHOLD,
4115            ROLLUP_USE_ACTIVE_ROLLUP,
4116            ROLLUP_FALLBACK_THRESHOLD_MS,
4117            now()
4118        ));
4119
4120        let mut current_time = now();
4121        // hit our threshold! we should need a rollup
4122        state.seqno = SeqNo(9);
4123        assert_eq!(
4124            state
4125                .need_rollup(
4126                    ROLLUP_THRESHOLD,
4127                    ROLLUP_USE_ACTIVE_ROLLUP,
4128                    ROLLUP_FALLBACK_THRESHOLD_MS,
4129                    current_time
4130                )
4131                .expect("rollup"),
4132            SeqNo(9)
4133        );
4134
4135        state.collections.active_rollup = Some(ActiveRollup {
4136            seqno: SeqNo(9),
4137            start_ms: current_time,
4138        });
4139
4140        // There is now an active rollup, so we shouldn't need a rollup.
4141        assert_none!(state.need_rollup(
4142            ROLLUP_THRESHOLD,
4143            ROLLUP_USE_ACTIVE_ROLLUP,
4144            ROLLUP_FALLBACK_THRESHOLD_MS,
4145            current_time
4146        ));
4147
4148        state.seqno = SeqNo(10);
4149        // We still don't need a rollup, even though the seqno is greater than
4150        // the rollup threshold.
4151        assert_none!(state.need_rollup(
4152            ROLLUP_THRESHOLD,
4153            ROLLUP_USE_ACTIVE_ROLLUP,
4154            ROLLUP_FALLBACK_THRESHOLD_MS,
4155            current_time
4156        ));
4157
4158        // But if we wait long enough, we should need a rollup again.
4159        current_time += u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS) + 1;
4160        assert_eq!(
4161            state
4162                .need_rollup(
4163                    ROLLUP_THRESHOLD,
4164                    ROLLUP_USE_ACTIVE_ROLLUP,
4165                    ROLLUP_FALLBACK_THRESHOLD_MS,
4166                    current_time
4167                )
4168                .expect("rollup"),
4169            SeqNo(10)
4170        );
4171
4172        state.seqno = SeqNo(9);
4173        // Clear the active rollup and ensure we need a rollup again.
4174        state.collections.active_rollup = None;
4175        let rollup_seqno = SeqNo(9);
4176        let rollup = HollowRollup {
4177            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4178            encoded_size_bytes: None,
4179        };
4180        assert!(
4181            state
4182                .collections
4183                .add_rollup((rollup_seqno, &rollup))
4184                .is_continue()
4185        );
4186
4187        state.seqno = SeqNo(11);
4188        // We shouldn't need a rollup at seqnos less than our threshold
4189        assert_none!(state.need_rollup(
4190            ROLLUP_THRESHOLD,
4191            ROLLUP_USE_ACTIVE_ROLLUP,
4192            ROLLUP_FALLBACK_THRESHOLD_MS,
4193            current_time
4194        ));
4195        // hit our threshold! we should need a rollup
4196        state.seqno = SeqNo(13);
4197        assert_eq!(
4198            state
4199                .need_rollup(
4200                    ROLLUP_THRESHOLD,
4201                    ROLLUP_USE_ACTIVE_ROLLUP,
4202                    ROLLUP_FALLBACK_THRESHOLD_MS,
4203                    current_time
4204                )
4205                .expect("rollup"),
4206            SeqNo(13)
4207        );
4208    }
4209
4210    #[mz_ore::test]
4211    fn need_rollup_classic() {
4212        const ROLLUP_THRESHOLD: usize = 3;
4213        const ROLLUP_USE_ACTIVE_ROLLUP: bool = false;
4214        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 0;
4215        const NOW: u64 = 0;
4216
4217        mz_ore::test::init_logging();
4218        let mut state = TypedState::<String, String, u64, i64>::new(
4219            DUMMY_BUILD_INFO.semver_version(),
4220            ShardId::new(),
4221            "".to_owned(),
4222            0,
4223        );
4224
4225        let rollup_seqno = SeqNo(5);
4226        let rollup = HollowRollup {
4227            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4228            encoded_size_bytes: None,
4229        };
4230
4231        assert!(
4232            state
4233                .collections
4234                .add_rollup((rollup_seqno, &rollup))
4235                .is_continue()
4236        );
4237
4238        // shouldn't need a rollup at the seqno of the rollup
4239        state.seqno = SeqNo(5);
4240        assert_none!(state.need_rollup(
4241            ROLLUP_THRESHOLD,
4242            ROLLUP_USE_ACTIVE_ROLLUP,
4243            ROLLUP_FALLBACK_THRESHOLD_MS,
4244            NOW
4245        ));
4246
4247        // shouldn't need a rollup at seqnos less than our threshold
4248        state.seqno = SeqNo(6);
4249        assert_none!(state.need_rollup(
4250            ROLLUP_THRESHOLD,
4251            ROLLUP_USE_ACTIVE_ROLLUP,
4252            ROLLUP_FALLBACK_THRESHOLD_MS,
4253            NOW
4254        ));
4255        state.seqno = SeqNo(7);
4256        assert_none!(state.need_rollup(
4257            ROLLUP_THRESHOLD,
4258            ROLLUP_USE_ACTIVE_ROLLUP,
4259            ROLLUP_FALLBACK_THRESHOLD_MS,
4260            NOW
4261        ));
4262
4263        // hit our threshold! we should need a rollup
4264        state.seqno = SeqNo(8);
4265        assert_eq!(
4266            state
4267                .need_rollup(
4268                    ROLLUP_THRESHOLD,
4269                    ROLLUP_USE_ACTIVE_ROLLUP,
4270                    ROLLUP_FALLBACK_THRESHOLD_MS,
4271                    NOW
4272                )
4273                .expect("rollup"),
4274            SeqNo(8)
4275        );
4276
4277        // but we don't need rollups for every seqno > the threshold
4278        state.seqno = SeqNo(9);
4279        assert_none!(state.need_rollup(
4280            ROLLUP_THRESHOLD,
4281            ROLLUP_USE_ACTIVE_ROLLUP,
4282            ROLLUP_FALLBACK_THRESHOLD_MS,
4283            NOW
4284        ));
4285
4286        // we only need a rollup each `ROLLUP_THRESHOLD` beyond our current seqno
4287        state.seqno = SeqNo(11);
4288        assert_eq!(
4289            state
4290                .need_rollup(
4291                    ROLLUP_THRESHOLD,
4292                    ROLLUP_USE_ACTIVE_ROLLUP,
4293                    ROLLUP_FALLBACK_THRESHOLD_MS,
4294                    NOW
4295                )
4296                .expect("rollup"),
4297            SeqNo(11)
4298        );
4299
4300        // add another rollup and ensure we're always picking the latest
4301        let rollup_seqno = SeqNo(6);
4302        let rollup = HollowRollup {
4303            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4304            encoded_size_bytes: None,
4305        };
4306        assert!(
4307            state
4308                .collections
4309                .add_rollup((rollup_seqno, &rollup))
4310                .is_continue()
4311        );
4312
4313        state.seqno = SeqNo(8);
4314        assert_none!(state.need_rollup(
4315            ROLLUP_THRESHOLD,
4316            ROLLUP_USE_ACTIVE_ROLLUP,
4317            ROLLUP_FALLBACK_THRESHOLD_MS,
4318            NOW
4319        ));
4320        state.seqno = SeqNo(9);
4321        assert_eq!(
4322            state
4323                .need_rollup(
4324                    ROLLUP_THRESHOLD,
4325                    ROLLUP_USE_ACTIVE_ROLLUP,
4326                    ROLLUP_FALLBACK_THRESHOLD_MS,
4327                    NOW
4328                )
4329                .expect("rollup"),
4330            SeqNo(9)
4331        );
4332
4333        // and ensure that after a fallback point, we assign every seqno work
4334        let fallback_seqno = SeqNo(
4335            rollup_seqno.0
4336                * u64::cast_from(PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER),
4337        );
4338        state.seqno = fallback_seqno;
4339        assert_eq!(
4340            state
4341                .need_rollup(
4342                    ROLLUP_THRESHOLD,
4343                    ROLLUP_USE_ACTIVE_ROLLUP,
4344                    ROLLUP_FALLBACK_THRESHOLD_MS,
4345                    NOW
4346                )
4347                .expect("rollup"),
4348            fallback_seqno
4349        );
4350        state.seqno = fallback_seqno.next();
4351        assert_eq!(
4352            state
4353                .need_rollup(
4354                    ROLLUP_THRESHOLD,
4355                    ROLLUP_USE_ACTIVE_ROLLUP,
4356                    ROLLUP_FALLBACK_THRESHOLD_MS,
4357                    NOW
4358                )
4359                .expect("rollup"),
4360            fallback_seqno.next()
4361        );
4362    }
4363
4364    #[mz_ore::test]
4365    fn idempotency_token_sentinel() {
4366        assert_eq!(
4367            IdempotencyToken::SENTINEL.to_string(),
4368            "i11111111-1111-1111-1111-111111111111"
4369        );
4370    }
4371
4372    /// This test generates an "arbitrary" State, but uses a fixed seed for the
4373    /// randomness, so that it's deterministic. This lets us assert the
4374    /// serialization of that State against a golden file that's committed,
4375    /// making it easy to see what the serialization (used in an upcoming
4376    /// INSPECT feature) looks like.
4377    ///
4378    /// This golden will have to be updated each time we change State, but
4379    /// that's a feature, not a bug.
4380    #[mz_ore::test]
4381    #[cfg_attr(miri, ignore)] // too slow
4382    fn state_inspect_serde_json() {
4383        const STATE_SERDE_JSON: &str = include_str!("state_serde.json");
4384        let mut runner = proptest::test_runner::TestRunner::deterministic();
4385        let tree = any_state::<u64>(6..8).new_tree(&mut runner).unwrap();
4386        let json = serde_json::to_string_pretty(&tree.current()).unwrap();
4387        assert_eq!(
4388            json.trim(),
4389            STATE_SERDE_JSON.trim(),
4390            "\n\nNEW GOLDEN\n{}\n",
4391            json
4392        );
4393    }
4394
4395    #[mz_persist_proc::test(tokio::test)]
4396    #[cfg_attr(miri, ignore)] // too slow
4397    async fn sneaky_downgrades(dyncfgs: ConfigUpdates) {
4398        let mut clients = new_test_client_cache(&dyncfgs);
4399        let shard_id = ShardId::new();
4400
4401        async fn open_and_write(
4402            clients: &mut PersistClientCache,
4403            version: semver::Version,
4404            shard_id: ShardId,
4405        ) -> Result<(), tokio::task::JoinError> {
4406            clients.cfg.build_version = version.clone();
4407            clients.clear_state_cache();
4408            let client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
4409            // Run in a task so we can catch the panic.
4410            mz_ore::task::spawn(|| version.to_string(), async move {
4411                let () = client
4412                    .upgrade_version::<String, (), u64, i64>(shard_id, Diagnostics::for_tests())
4413                    .await
4414                    .expect("valid usage");
4415                let (mut write, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
4416                let current = *write.upper().as_option().unwrap();
4417                // Do a write so that we tag the state with the version.
4418                write
4419                    .expect_compare_and_append_batch(&mut [], current, current + 1)
4420                    .await;
4421            })
4422            .into_tokio_handle()
4423            .await
4424        }
4425
4426        // Start at v0.10.0.
4427        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4428        assert_ok!(res);
4429
4430        // Upgrade to v0.11.0 is allowed.
4431        let res = open_and_write(&mut clients, Version::new(0, 11, 0), shard_id).await;
4432        assert_ok!(res);
4433
4434        // Downgrade to v0.10.0 is no longer allowed.
4435        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4436        assert!(res.unwrap_err().is_panic());
4437
4438        // Downgrade to v0.9.0 is _NOT_ allowed.
4439        let res = open_and_write(&mut clients, Version::new(0, 9, 0), shard_id).await;
4440        assert!(res.unwrap_err().is_panic());
4441    }
4442
4443    #[mz_ore::test]
4444    fn runid_roundtrip() {
4445        proptest!(|(runid: RunId)| {
4446            let runid_str = runid.to_string();
4447            let parsed = RunId::from_str(&runid_str);
4448            prop_assert_eq!(parsed, Ok(runid));
4449        });
4450    }
4451
4452    /// Regression for PER-16. `add_rollup` must refuse to (re-)insert a
4453    /// rollup at a seqno that GC has already physically removed.
4454    /// Without this guard, an `apply_unbatched_idempotent_cmd` retry of
4455    /// `add_rollup` that runs after a concurrent GC removed the rollup's
4456    /// original map entry will re-insert it under the same key, producing
4457    /// duplicate Insert events in the diff stream and (later) duplicate
4458    /// Delete events that trip the `assert!` in `gc.rs`'s
4459    /// `find_removable_blobs`.
4460    ///
4461    /// This test reaches the bug state via the public
4462    /// `add_rollup` → `add_rollup` (newer) → `remove_rollups` (the older
4463    /// one) → `add_rollup` (retry of the older one) sequence, mirroring
4464    /// how a delayed retry of an indeterminate add_rollup commit can race
4465    /// a concurrent GC pass that has since added a newer rollup and
4466    /// removed the older one.
4467    #[mz_ore::test]
4468    fn add_rollup_idempotent_across_gc_removal() {
4469        let mut state = TypedState::<String, String, u64, i64>::new(
4470            DUMMY_BUILD_INFO.semver_version(),
4471            ShardId::new(),
4472            "".to_owned(),
4473            0,
4474        );
4475
4476        let older_seqno = SeqNo(10);
4477        let older = HollowRollup {
4478            key: PartialRollupKey::new(older_seqno, &RollupId::new()),
4479            encoded_size_bytes: None,
4480        };
4481        let newer_seqno = SeqNo(20);
4482        let newer = HollowRollup {
4483            key: PartialRollupKey::new(newer_seqno, &RollupId::new()),
4484            encoded_size_bytes: None,
4485        };
4486        let add_older = |state: &mut StateCollections<u64>| state.add_rollup((older_seqno, &older));
4487
4488        // First attempt commits the older rollup.
4489        assert_eq!(add_older(&mut state.collections), Continue(true));
4490        // A repeat at the same seqno with the same key is the pre-existing
4491        // idempotent no-op and should report applied=true.
4492        assert_eq!(add_older(&mut state.collections), Continue(true));
4493        assert_eq!(state.collections.rollups.len(), 1);
4494
4495        // The shard keeps making progress and a later command commits a
4496        // newer rollup. This is what gives GC something to keep when it
4497        // later removes the older entry.
4498        assert_eq!(
4499            state.collections.add_rollup((newer_seqno, &newer)),
4500            Continue(true),
4501        );
4502
4503        // The GC worker, having found a kept rollup at `newer_seqno`,
4504        // commits the removal of the older one. State.rollups now only
4505        // contains the kept rollup; the older entry is gone.
4506        let _ = state
4507            .collections
4508            .remove_rollups(&[(older_seqno, older.key.clone())]);
4509        assert!(!state.collections.rollups.contains_key(&older_seqno));
4510        assert!(state.collections.rollups.contains_key(&newer_seqno));
4511
4512        // The retry replays the same captured `(older_seqno, older)`
4513        // tuple. Even though no entry references the key anymore,
4514        // re-inserting it must be refused: `older_seqno` falls below the
4515        // smallest live rollup seqno, which is the GC watermark.
4516        assert_eq!(add_older(&mut state.collections), Continue(false));
4517        assert!(!state.collections.rollups.contains_key(&older_seqno));
4518        assert_eq!(state.collections.rollups.len(), 1);
4519    }
4520}