mz_persist_client/internal/state.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use anyhow::ensure;
use async_stream::{stream, try_stream};
use mz_persist::metrics::ColumnarMetrics;
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use std::ops::ControlFlow::{self, Break, Continue};
use std::ops::{Deref, DerefMut};
use std::time::Duration;

use arrow::array::{Array, ArrayData, make_array};
use arrow::datatypes::DataType;
use bytes::Bytes;
use differential_dataflow::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use differential_dataflow::trace::implementations::BatchContainer;
use futures::Stream;
use futures_util::StreamExt;
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::now::EpochMillis;
use mz_ore::soft_panic_or_log;
use mz_ore::vec::PartialOrdVecExt;
use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceUpdates};
use mz_persist::location::{Blob, SeqNo};
use mz_persist_types::arrow::{ArrayBound, ProtoArrayData};
use mz_persist_types::columnar::{ColumnEncoder, Schema};
use mz_persist_types::schema::{SchemaId, backward_compatible};
use mz_persist_types::{Codec, Codec64, Opaque};
use mz_proto::ProtoType;
use mz_proto::RustType;
use proptest_derive::Arbitrary;
use semver::Version;
use serde::ser::SerializeStruct;
use serde::{Serialize, Serializer};
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use timely::{Container, PartialOrder};
use tracing::info;
use uuid::Uuid;

use crate::critical::CriticalReaderId;
use crate::error::InvalidUsage;
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, parse_id};
use crate::internal::gc::GcReq;
use crate::internal::machine::retry_external;
use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
use crate::internal::trace::{
    ActiveCompaction, ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace,
};
use crate::metrics::Metrics;
use crate::read::LeasedReaderId;
use crate::schema::CaESchema;
use crate::write::WriterId;
use crate::{PersistConfig, ShardId};

include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.state.rs"
));

include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.diff.rs"
));

/// Determines how often to write rollups, assigning a maintenance task after
/// `rollup_threshold` seqnos have passed since the last rollup.
///
/// Tuning note: in the absence of a long reader seqno hold, and with
/// incremental GC, this threshold will determine about how many live diffs are
/// held in Consensus. Lowering this value decreases the live diff count at the
/// cost of more maintenance work + blob writes.
pub(crate) const ROLLUP_THRESHOLD: Config<usize> = Config::new(
    "persist_rollup_threshold",
    128,
    "The number of seqnos between rollups.",
);
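
// Illustrative sketch (not part of the original file): how a dyncfg like
// `ROLLUP_THRESHOLD` is typically read, assuming the usual `mz_dyncfg`
// `ConfigSet`/`Config::get` pattern used elsewhere in this crate.
#[cfg(test)]
mod rollup_threshold_example {
    use mz_dyncfg::ConfigSet;

    use super::*;

    #[test]
    fn default_value() {
        // Register the config in a fresh set and read back its default.
        let configs = ConfigSet::default().add(&ROLLUP_THRESHOLD);
        assert_eq!(ROLLUP_THRESHOLD.get(&configs), 128);
    }
}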

/// Determines how long to wait before an active rollup is considered
/// "stuck" and a new rollup is started.
pub(crate) const ROLLUP_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_rollup_fallback_threshold_ms",
    5000,
    "The number of milliseconds before a worker claims an already claimed rollup.",
);

/// Feature flag for the new active rollup tracking mechanism.
/// We mustn't enable this until we are fully deployed on the new version.
pub(crate) const ROLLUP_USE_ACTIVE_ROLLUP: Config<bool> = Config::new(
    "persist_rollup_use_active_rollup",
    false,
    "Whether to use the new active rollup tracking mechanism.",
);

/// Determines how long to wait before an active GC is considered
/// "stuck" and a new GC is started.
pub(crate) const GC_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_gc_fallback_threshold_ms",
    900000,
    "The number of milliseconds before a worker claims an already claimed GC.",
);

/// Feature flag for the new active GC tracking mechanism.
/// We mustn't enable this until we are fully deployed on the new version.
pub(crate) const GC_USE_ACTIVE_GC: Config<bool> = Config::new(
    "persist_gc_use_active_gc",
    false,
    "Whether to use the new active GC tracking mechanism.",
);

/// A token to disambiguate state commands that could not otherwise be
/// idempotent.
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
#[serde(into = "String")]
pub struct IdempotencyToken(pub(crate) [u8; 16]);

impl std::fmt::Display for IdempotencyToken {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "i{}", Uuid::from_bytes(self.0))
    }
}

impl std::fmt::Debug for IdempotencyToken {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "IdempotencyToken({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for IdempotencyToken {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id('i', "IdempotencyToken", s).map(IdempotencyToken)
    }
}

impl From<IdempotencyToken> for String {
    fn from(x: IdempotencyToken) -> Self {
        x.to_string()
    }
}

impl IdempotencyToken {
    pub(crate) fn new() -> Self {
        IdempotencyToken(*Uuid::new_v4().as_bytes())
    }
    pub(crate) const SENTINEL: IdempotencyToken = IdempotencyToken([17u8; 16]);
}
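
// Illustrative example (not part of the original file): tokens render as
// `i<uuid>` via `Display` and parse back through `FromStr`/`parse_id`.
#[cfg(test)]
mod idempotency_token_example {
    use super::*;

    #[test]
    fn display_roundtrip() {
        let token = IdempotencyToken::new();
        let rendered = token.to_string();
        assert!(rendered.starts_with('i'));
        let parsed = rendered.parse::<IdempotencyToken>().expect("valid token");
        assert_eq!(parsed, token);
    }
}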

#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct LeasedReaderState<T> {
    /// The seqno capability of this reader.
    pub seqno: SeqNo,
    /// The since capability of this reader.
    pub since: Antichain<T>,
    /// UNIX_EPOCH timestamp (in millis) of this reader's most recent heartbeat
    pub last_heartbeat_timestamp_ms: u64,
    /// Duration (in millis) allowed after [Self::last_heartbeat_timestamp_ms]
    /// after which this reader may be expired
    pub lease_duration_ms: u64,
    /// For debugging.
    pub debug: HandleDebugState,
}

#[derive(Arbitrary, Clone, Debug, PartialEq, Serialize)]
#[serde(into = "u64")]
pub struct OpaqueState(pub [u8; 8]);

impl From<OpaqueState> for u64 {
    fn from(value: OpaqueState) -> Self {
        u64::from_le_bytes(value.0)
    }
}
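
// Illustrative example (not part of the original file): the serde/u64 view of
// an `OpaqueState` is just the little-endian interpretation of its raw bytes.
#[cfg(test)]
mod opaque_state_example {
    use super::*;

    #[test]
    fn into_u64() {
        let opaque = OpaqueState(42u64.to_le_bytes());
        assert_eq!(u64::from(opaque), 42);
    }
}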

#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct CriticalReaderState<T> {
    /// The since capability of this reader.
    pub since: Antichain<T>,
    /// An opaque token matched on by compare_and_downgrade_since.
    pub opaque: OpaqueState,
    /// The [Codec64] used to encode [Self::opaque].
    pub opaque_codec: String,
    /// For debugging.
    pub debug: HandleDebugState,
}

#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct WriterState<T> {
    /// UNIX_EPOCH timestamp (in millis) of this writer's most recent heartbeat
    pub last_heartbeat_timestamp_ms: u64,
    /// Duration (in millis) allowed after [Self::last_heartbeat_timestamp_ms]
    /// after which this writer may be expired
    pub lease_duration_ms: u64,
    /// The idempotency token of the most recent successful compare_and_append
    /// by this writer.
    pub most_recent_write_token: IdempotencyToken,
    /// The upper of the most recent successful compare_and_append by this
    /// writer.
    pub most_recent_write_upper: Antichain<T>,
    /// For debugging.
    pub debug: HandleDebugState,
}

/// Debugging info for a reader or writer.
#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize)]
pub struct HandleDebugState {
    /// Hostname of the persist user that registered this writer or reader. For
    /// critical readers, this is the _most recent_ registration.
    pub hostname: String,
    /// Plaintext description of this writer or reader's intent.
    pub purpose: String,
}

/// Part of the updates in a Batch.
///
/// Either a pointer to ones stored in Blob or the updates themselves inlined.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(tag = "type")]
pub enum BatchPart<T> {
    Hollow(HollowBatchPart<T>),
    Inline {
        updates: LazyInlineBatchPart,
        ts_rewrite: Option<Antichain<T>>,
        schema_id: Option<SchemaId>,

        /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
        deprecated_schema_id: Option<SchemaId>,
    },
}

fn decode_structured_lower(lower: &LazyProto<ProtoArrayData>) -> Option<ArrayBound> {
    let try_decode = |lower: &LazyProto<ProtoArrayData>| {
        let proto = lower.decode()?;
        let data = ArrayData::from_proto(proto)?;
        ensure!(data.len() == 1);
        Ok(ArrayBound::new(make_array(data), 0))
    };

    let decoded: anyhow::Result<ArrayBound> = try_decode(lower);

    match decoded {
        Ok(bound) => Some(bound),
        Err(e) => {
            soft_panic_or_log!("failed to decode bound: {e:#?}");
            None
        }
    }
}

impl<T> BatchPart<T> {
    pub fn hollow_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(x) => x.encoded_size_bytes,
            BatchPart::Inline { .. } => 0,
        }
    }

    pub fn is_inline(&self) -> bool {
        matches!(self, BatchPart::Inline { .. })
    }

    pub fn inline_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(_) => 0,
            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
        }
    }

    pub fn writer_key(&self) -> Option<WriterKey> {
        match self {
            BatchPart::Hollow(x) => x.key.split().map(|(writer, _part)| writer),
            BatchPart::Inline { .. } => None,
        }
    }

    pub fn encoded_size_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(x) => x.encoded_size_bytes,
            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
        }
    }

    // A user-interpretable identifier or description of the part (for logs and
    // such).
    pub fn printable_name(&self) -> &str {
        match self {
            BatchPart::Hollow(x) => x.key.0.as_str(),
            BatchPart::Inline { .. } => "<inline>",
        }
    }

    pub fn stats(&self) -> Option<&LazyPartStats> {
        match self {
            BatchPart::Hollow(x) => x.stats.as_ref(),
            BatchPart::Inline { .. } => None,
        }
    }

    pub fn key_lower(&self) -> &[u8] {
        match self {
            BatchPart::Hollow(x) => x.key_lower.as_slice(),
            // We don't duplicate the lowest key because this can be
            // considerable overhead for small parts.
            //
            // The empty key might not be a tight lower bound, but it is a valid
            // lower bound. If a caller is interested in a tighter lower bound,
            // the data is inline.
            BatchPart::Inline { .. } => &[],
        }
    }

    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
        let part = match self {
            BatchPart::Hollow(part) => part,
            BatchPart::Inline { .. } => return None,
        };

        decode_structured_lower(part.structured_key_lower.as_ref()?)
    }

    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
        match self {
            BatchPart::Hollow(x) => x.ts_rewrite.as_ref(),
            BatchPart::Inline { ts_rewrite, .. } => ts_rewrite.as_ref(),
        }
    }

    pub fn schema_id(&self) -> Option<SchemaId> {
        match self {
            BatchPart::Hollow(x) => x.schema_id,
            BatchPart::Inline { schema_id, .. } => *schema_id,
        }
    }

    pub fn deprecated_schema_id(&self) -> Option<SchemaId> {
        match self {
            BatchPart::Hollow(x) => x.deprecated_schema_id,
            BatchPart::Inline {
                deprecated_schema_id,
                ..
            } => *deprecated_schema_id,
        }
    }
}

impl<T: Timestamp + Codec64> BatchPart<T> {
    pub fn is_structured_only(&self, metrics: &ColumnarMetrics) -> bool {
        match self {
            BatchPart::Hollow(x) => matches!(x.format, Some(BatchColumnarFormat::Structured)),
            BatchPart::Inline { updates, .. } => {
                let inline_part = updates.decode::<T>(metrics).expect("valid inline part");
                matches!(inline_part.updates, BlobTraceUpdates::Structured { .. })
            }
        }
    }
}

/// An ordered list of parts, generally stored as part of a larger run.
#[derive(Debug, Clone)]
pub struct HollowRun<T> {
    /// Pointers usable to retrieve the updates.
    pub(crate) parts: Vec<RunPart<T>>,
}

/// A reference to a [HollowRun], including the key in the blob store and some denormalized
/// metadata.
#[derive(Debug, Eq, PartialEq, Clone, Serialize)]
pub struct HollowRunRef<T> {
    pub key: PartialBatchKey,

    /// The size of the referenced run object, plus all of the hollow objects it contains.
    pub hollow_bytes: usize,

    /// The size of the largest individual part in the run; useful for sizing compaction.
    pub max_part_bytes: usize,

    /// The lower bound of the data in this part, ordered by the codec ordering.
    pub key_lower: Vec<u8>,

    /// The lower bound of the data in this part, ordered by the structured ordering.
    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,

    pub(crate) _phantom_data: PhantomData<T>,
}

impl<T: Eq> PartialOrd<Self> for HollowRunRef<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Eq> Ord for HollowRunRef<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.key.cmp(&other.key)
    }
}

impl<T> HollowRunRef<T> {
    pub fn writer_key(&self) -> Option<WriterKey> {
        Some(self.key.split()?.0)
    }
}

impl<T: Timestamp + Codec64> HollowRunRef<T> {
    /// Stores the given runs and returns a [HollowRunRef] that points to them.
    pub async fn set(
        shard_id: ShardId,
        blob: &dyn Blob,
        writer: &WriterKey,
        data: HollowRun<T>,
        metrics: &Metrics,
    ) -> Self {
        let hollow_bytes = data.parts.iter().map(|p| p.hollow_bytes()).sum();
        let max_part_bytes = data
            .parts
            .iter()
            .map(|p| p.max_part_bytes())
            .max()
            .unwrap_or(0);
        let key_lower = data
            .parts
            .first()
            .map_or(vec![], |p| p.key_lower().to_vec());
        let structured_key_lower = match data.parts.first() {
            Some(RunPart::Many(r)) => r.structured_key_lower.clone(),
            Some(RunPart::Single(BatchPart::Hollow(p))) => p.structured_key_lower.clone(),
            Some(RunPart::Single(BatchPart::Inline { .. })) | None => None,
        };

        let key = PartialBatchKey::new(writer, &PartId::new());
        let blob_key = key.complete(&shard_id);
        let bytes = Bytes::from(prost::Message::encode_to_vec(&data.into_proto()));
        let () = retry_external(&metrics.retries.external.hollow_run_set, || {
            blob.set(&blob_key, bytes.clone())
        })
        .await;
        Self {
            key,
            hollow_bytes,
            max_part_bytes,
            key_lower,
            structured_key_lower,
            _phantom_data: Default::default(),
        }
    }

    /// Retrieve the [HollowRun] that this reference points to.
    /// The caller is expected to ensure that this ref is the result of calling [HollowRunRef::set]
    /// with the same shard id and backing store.
    pub async fn get(
        &self,
        shard_id: ShardId,
        blob: &dyn Blob,
        metrics: &Metrics,
    ) -> Option<HollowRun<T>> {
        let blob_key = self.key.complete(&shard_id);
        let mut bytes = retry_external(&metrics.retries.external.hollow_run_get, || {
            blob.get(&blob_key)
        })
        .await?;
        let proto_runs: ProtoHollowRun =
            prost::Message::decode(&mut bytes).expect("illegal state: invalid proto bytes");
        let runs = proto_runs
            .into_rust()
            .expect("illegal state: invalid encoded runs proto");
        Some(runs)
    }
}

/// Part of the updates in a run.
///
/// Either a pointer to ones stored in Blob or a single part stored inline.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(untagged)]
pub enum RunPart<T> {
    Single(BatchPart<T>),
    Many(HollowRunRef<T>),
}

impl<T: Ord> PartialOrd<Self> for RunPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for RunPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            (RunPart::Single(a), RunPart::Single(b)) => a.cmp(b),
            (RunPart::Single(_), RunPart::Many(_)) => Ordering::Less,
            (RunPart::Many(_), RunPart::Single(_)) => Ordering::Greater,
            (RunPart::Many(a), RunPart::Many(b)) => a.cmp(b),
        }
    }
}

impl<T> RunPart<T> {
    #[cfg(test)]
    pub fn expect_hollow_part(&self) -> &HollowBatchPart<T> {
        match self {
            RunPart::Single(BatchPart::Hollow(hollow)) => hollow,
            _ => panic!("expected hollow part!"),
        }
    }

    pub fn hollow_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.hollow_bytes(),
            Self::Many(r) => r.hollow_bytes,
        }
    }

    pub fn is_inline(&self) -> bool {
        match self {
            Self::Single(p) => p.is_inline(),
            Self::Many(_) => false,
        }
    }

    pub fn inline_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.inline_bytes(),
            Self::Many(_) => 0,
        }
    }

    pub fn max_part_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.encoded_size_bytes(),
            Self::Many(r) => r.max_part_bytes,
        }
    }

    pub fn writer_key(&self) -> Option<WriterKey> {
        match self {
            Self::Single(p) => p.writer_key(),
            Self::Many(r) => r.writer_key(),
        }
    }

    pub fn encoded_size_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.encoded_size_bytes(),
            Self::Many(r) => r.hollow_bytes,
        }
    }

    pub fn schema_id(&self) -> Option<SchemaId> {
        match self {
            Self::Single(p) => p.schema_id(),
            Self::Many(_) => None,
        }
    }

    // A user-interpretable identifier or description of the part (for logs and
    // such).
    pub fn printable_name(&self) -> &str {
        match self {
            Self::Single(p) => p.printable_name(),
            Self::Many(r) => r.key.0.as_str(),
        }
    }

    pub fn stats(&self) -> Option<&LazyPartStats> {
        match self {
            Self::Single(p) => p.stats(),
            // TODO: if we kept stats we could avoid fetching the metadata here.
            Self::Many(_) => None,
        }
    }

    pub fn key_lower(&self) -> &[u8] {
        match self {
            Self::Single(p) => p.key_lower(),
            Self::Many(r) => r.key_lower.as_slice(),
        }
    }

    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
        match self {
            Self::Single(p) => p.structured_key_lower(),
            Self::Many(_) => None,
        }
    }

    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
        match self {
            Self::Single(p) => p.ts_rewrite(),
            Self::Many(_) => None,
        }
    }
}

/// A blob was missing!
#[derive(Clone, Debug)]
pub struct MissingBlob(BlobKey);

impl std::fmt::Display for MissingBlob {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "unexpectedly missing key: {}", self.0)
    }
}

impl std::error::Error for MissingBlob {}

impl<T: Timestamp + Codec64 + Sync> RunPart<T> {
    pub fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + Send + 'a {
        try_stream! {
            match self {
                RunPart::Single(p) => {
                    yield Cow::Borrowed(p);
                }
                RunPart::Many(r) => {
                    let fetched = r.get(shard_id, blob, metrics).await.ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
                    for run_part in fetched.parts {
                        for await batch_part in run_part.part_stream(shard_id, blob, metrics).boxed() {
                            yield Cow::Owned(batch_part?.into_owned());
                        }
                    }
                }
            }
        }
    }
}

impl<T: Ord> PartialOrd for BatchPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for BatchPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o),
            (
                BatchPart::Inline {
                    updates: s_updates,
                    ts_rewrite: s_ts_rewrite,
                    schema_id: s_schema_id,
                    deprecated_schema_id: s_deprecated_schema_id,
                },
                BatchPart::Inline {
                    updates: o_updates,
                    ts_rewrite: o_ts_rewrite,
                    schema_id: o_schema_id,
                    deprecated_schema_id: o_deprecated_schema_id,
                },
            ) => (
                s_updates,
                s_ts_rewrite.as_ref().map(|x| x.elements()),
                s_schema_id,
                s_deprecated_schema_id,
            )
                .cmp(&(
                    o_updates,
                    o_ts_rewrite.as_ref().map(|x| x.elements()),
                    o_schema_id,
                    o_deprecated_schema_id,
                )),
            (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less,
            (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater,
        }
    }
}

/// What order are the parts in this run in?
#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub(crate) enum RunOrder {
    /// They're in no particular order.
    Unordered,
    /// They're ordered based on the codec-encoded K/V bytes.
    Codec,
    /// They're ordered by the natural ordering of the structured data.
    Structured,
}

/// Metadata shared across a run.
#[derive(Clone, Debug, Default, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub struct RunMeta {
    /// If none, Persist should infer the order based on the proto metadata.
    pub(crate) order: Option<RunOrder>,
    /// All parts in a run should have the same schema.
    pub(crate) schema: Option<SchemaId>,

    /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
    pub(crate) deprecated_schema: Option<SchemaId>,
}

/// A subset of a [HollowBatch] corresponding 1:1 to a blob.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct HollowBatchPart<T> {
    /// Pointer usable to retrieve the updates.
    pub key: PartialBatchKey,
    /// The encoded size of this part.
    pub encoded_size_bytes: usize,
    /// A lower bound on the keys in the part. (By default, this is the minimum
    /// possible key: `vec![]`.)
    #[serde(serialize_with = "serialize_part_bytes")]
    pub key_lower: Vec<u8>,
    /// A lower bound on the keys in the part, stored as structured data.
    #[serde(serialize_with = "serialize_lazy_proto")]
    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
    /// Aggregate statistics about data contained in this part.
    #[serde(serialize_with = "serialize_part_stats")]
    pub stats: Option<LazyPartStats>,
    /// A frontier to which timestamps in this part are advanced on read, if
    /// set.
    ///
    /// A value of `Some([T::minimum()])` is functionally the same as `None`,
    /// but we maintain the distinction between the two for some internal sanity
    /// checking of invariants as well as metrics. If this ever becomes an
    /// issue, everything still works with this as just `Antichain<T>`.
    pub ts_rewrite: Option<Antichain<T>>,
    /// A Codec64 encoded sum of all diffs in this part, if known.
    ///
    /// This is `None` if this part was written before we started storing this
    /// information, or if it was written when the dyncfg was off.
    ///
    /// It could also make sense to model this as part of the pushdown stats, if
    /// we later decide that's of some benefit.
    #[serde(serialize_with = "serialize_diffs_sum")]
    pub diffs_sum: Option<[u8; 8]>,
    /// Columnar format that this batch was written in.
    ///
    /// This is `None` if this part was written before we started writing structured
    /// columnar data.
    pub format: Option<BatchColumnarFormat>,
    /// The schemas used to encode the data in this batch part.
    ///
    /// Or None for historical data written before the schema registry was
    /// added.
    pub schema_id: Option<SchemaId>,

    /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
    pub deprecated_schema_id: Option<SchemaId>,
}

/// A [Batch] but with the updates themselves stored externally.
///
/// [Batch]: differential_dataflow::trace::BatchReader
#[derive(Clone, PartialEq, Eq)]
pub struct HollowBatch<T> {
    /// Describes the times of the updates in the batch.
    pub desc: Description<T>,
    /// The number of updates in the batch.
    pub len: usize,
    /// Pointers usable to retrieve the updates.
    pub(crate) parts: Vec<RunPart<T>>,
    /// Runs of sequential sorted batch parts, stored as indices into `parts`.
    /// ex.
    /// ```text
    ///     parts=[p1, p2, p3], runs=[]     --> run  is  [p1, p2, p3]
    ///     parts=[p1, p2, p3], runs=[1]    --> runs are [p1] and [p2, p3]
    ///     parts=[p1, p2, p3], runs=[1, 2] --> runs are [p1], [p2], [p3]
    /// ```
    pub(crate) run_splits: Vec<usize>,
    /// Run-level metadata: the first entry has metadata for the first run, and so on.
    /// If there's no corresponding entry for a particular run, it's assumed to be [RunMeta::default()].
    pub(crate) run_meta: Vec<RunMeta>,
}

impl<T: Debug> Debug for HollowBatch<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let HollowBatch {
            desc,
            parts,
            len,
            run_splits: runs,
            run_meta,
        } = self;
        f.debug_struct("HollowBatch")
            .field(
                "desc",
                &(
                    desc.lower().elements(),
                    desc.upper().elements(),
                    desc.since().elements(),
                ),
            )
            .field("parts", &parts)
            .field("len", &len)
            .field("runs", &runs)
            .field("run_meta", &run_meta)
            .finish()
    }
}

impl<T: Serialize> serde::Serialize for HollowBatch<T> {
    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        let HollowBatch {
            desc,
            len,
            // Both parts and runs are covered by the self.runs call.
            parts: _,
            run_splits: _,
            run_meta: _,
        } = self;
        let mut s = s.serialize_struct("HollowBatch", 5)?;
        let () = s.serialize_field("lower", &desc.lower().elements())?;
        let () = s.serialize_field("upper", &desc.upper().elements())?;
        let () = s.serialize_field("since", &desc.since().elements())?;
        let () = s.serialize_field("len", len)?;
        let () = s.serialize_field("part_runs", &self.runs().collect::<Vec<_>>())?;
        s.end()
    }
}

impl<T: Ord> PartialOrd for HollowBatch<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for HollowBatch<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        // Deconstruct self and other so we get a compile failure if new fields
        // are added.
        let HollowBatch {
            desc: self_desc,
            parts: self_parts,
            len: self_len,
            run_splits: self_runs,
            run_meta: self_run_meta,
        } = self;
        let HollowBatch {
            desc: other_desc,
            parts: other_parts,
            len: other_len,
            run_splits: other_runs,
            run_meta: other_run_meta,
        } = other;
        (
            self_desc.lower().elements(),
            self_desc.upper().elements(),
            self_desc.since().elements(),
            self_parts,
            self_len,
            self_runs,
            self_run_meta,
        )
            .cmp(&(
                other_desc.lower().elements(),
                other_desc.upper().elements(),
                other_desc.since().elements(),
                other_parts,
                other_len,
                other_runs,
                other_run_meta,
            ))
    }
}

impl<T: Timestamp + Codec64 + Sync> HollowBatch<T> {
    pub fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + 'a {
        stream! {
            for part in &self.parts {
                for await part in part.part_stream(shard_id, blob, metrics) {
                    yield part;
                }
            }
        }
    }
}

impl<T> HollowBatch<T> {
    /// Construct an in-memory hollow batch from the given metadata.
    ///
    /// This method checks that `runs` is a sequence of valid indices into `parts`. The caller
    /// is responsible for ensuring that the defined runs are valid.
    ///
    /// `len` should represent the number of valid updates in the referenced parts.
    pub(crate) fn new(
        desc: Description<T>,
        parts: Vec<RunPart<T>>,
        len: usize,
        run_meta: Vec<RunMeta>,
        run_splits: Vec<usize>,
    ) -> Self {
        debug_assert!(
            run_splits.is_strictly_sorted(),
            "run indices should be strictly increasing"
        );
        debug_assert!(
            run_splits.first().map_or(true, |i| *i > 0),
            "run indices should be positive"
        );
        debug_assert!(
            run_splits.last().map_or(true, |i| *i < parts.len()),
            "run indices should be valid indices into parts"
        );
        debug_assert!(
            parts.is_empty() || run_meta.len() == run_splits.len() + 1,
            "all metadata should correspond to a run"
        );

        Self {
            desc,
            len,
            parts,
            run_splits,
            run_meta,
        }
    }

    /// Construct a batch of a single run with default metadata. Mostly interesting for tests.
    pub(crate) fn new_run(desc: Description<T>, parts: Vec<RunPart<T>>, len: usize) -> Self {
        let run_meta = if parts.is_empty() {
            vec![]
        } else {
            vec![RunMeta::default()]
        };
        Self {
            desc,
            len,
            parts,
            run_splits: vec![],
            run_meta,
        }
    }

    /// An empty hollow batch, representing no updates over the given desc.
    pub(crate) fn empty(desc: Description<T>) -> Self {
        Self {
            desc,
            len: 0,
            parts: vec![],
            run_splits: vec![],
            run_meta: vec![],
        }
    }

    pub(crate) fn runs(&self) -> impl Iterator<Item = (&RunMeta, &[RunPart<T>])> {
        let run_ends = self
            .run_splits
            .iter()
            .copied()
            .chain(std::iter::once(self.parts.len()));
        let run_metas = self.run_meta.iter();
        let run_parts = run_ends
            .scan(0, |start, end| {
                let range = *start..end;
                *start = end;
                Some(range)
            })
            .filter(|range| !range.is_empty())
            .map(|range| &self.parts[range]);
        run_metas.zip(run_parts)
    }

    pub(crate) fn inline_bytes(&self) -> usize {
        self.parts.iter().map(|x| x.inline_bytes()).sum()
    }

    pub fn is_empty(&self) -> bool {
        self.parts.is_empty()
    }

    pub fn part_count(&self) -> usize {
        self.parts.len()
    }

    /// The sum of the encoded sizes of all parts in the batch.
    pub fn encoded_size_bytes(&self) -> usize {
        self.parts.iter().map(|p| p.encoded_size_bytes()).sum()
    }
}
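
// Illustrative example (not part of the original file): an empty batch over a
// given description has no parts and therefore no runs, since `runs()` filters
// out empty ranges.
#[cfg(test)]
mod hollow_batch_runs_example {
    use super::*;

    #[test]
    fn empty_batch_has_no_runs() {
        let desc = Description::new(
            Antichain::from_elem(0u64),
            Antichain::from_elem(10u64),
            Antichain::from_elem(0u64),
        );
        let batch = HollowBatch::empty(desc);
        assert!(batch.is_empty());
        assert_eq!(batch.part_count(), 0);
        assert_eq!(batch.runs().count(), 0);
        assert_eq!(batch.encoded_size_bytes(), 0);
    }
}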

// See the comment on [Batch::rewrite_ts] for why this is TotalOrder.
impl<T: Timestamp + TotalOrder> HollowBatch<T> {
    pub(crate) fn rewrite_ts(
        &mut self,
        frontier: &Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<(), String> {
        if !PartialOrder::less_than(frontier, &new_upper) {
            return Err(format!(
                "rewrite frontier {:?} !< rewrite upper {:?}",
                frontier.elements(),
                new_upper.elements(),
            ));
        }
        if PartialOrder::less_than(&new_upper, self.desc.upper()) {
            return Err(format!(
                "rewrite upper {:?} < batch upper {:?}",
                new_upper.elements(),
                self.desc.upper().elements(),
            ));
        }

        // The following are things that it seems like we could support, but
        // initially we don't because we don't have a use case for them.
        if PartialOrder::less_than(frontier, self.desc.lower()) {
            return Err(format!(
                "rewrite frontier {:?} < batch lower {:?}",
                frontier.elements(),
                self.desc.lower().elements(),
            ));
        }
        if self.desc.since() != &Antichain::from_elem(T::minimum()) {
            return Err(format!(
                "batch since {:?} != minimum antichain {:?}",
                self.desc.since().elements(),
                &[T::minimum()],
            ));
        }
        for part in self.parts.iter() {
            let Some(ts_rewrite) = part.ts_rewrite() else {
                continue;
            };
            if PartialOrder::less_than(frontier, ts_rewrite) {
                return Err(format!(
                    "rewrite frontier {:?} < batch rewrite {:?}",
                    frontier.elements(),
                    ts_rewrite.elements(),
                ));
            }
        }

        self.desc = Description::new(
            self.desc.lower().clone(),
            new_upper,
            self.desc.since().clone(),
        );
        for part in &mut self.parts {
            match part {
                RunPart::Single(BatchPart::Hollow(part)) => {
                    part.ts_rewrite = Some(frontier.clone())
                }
                RunPart::Single(BatchPart::Inline { ts_rewrite, .. }) => {
                    *ts_rewrite = Some(frontier.clone())
                }
                RunPart::Many(runs) => {
                    // Currently unreachable: we only apply rewrites to user batches, and we don't
                    // ever generate runs of >1 part for those.
                    panic!("unexpected rewrite of a hollow runs ref: {runs:?}");
                }
            }
        }
        Ok(())
    }
}
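
// Illustrative example (not part of the original file): a rewrite on an empty
// batch that satisfies all of the checks above (frontier < new upper, new
// upper not before the batch upper, frontier not before the batch lower, and
// a minimum since) replaces the upper and succeeds.
#[cfg(test)]
mod rewrite_ts_example {
    use super::*;

    #[test]
    fn rewrite_empty_batch() {
        let mut batch = HollowBatch::<u64>::empty(Description::new(
            Antichain::from_elem(0),
            Antichain::from_elem(5),
            Antichain::from_elem(0),
        ));
        batch
            .rewrite_ts(&Antichain::from_elem(5), Antichain::from_elem(10))
            .expect("valid rewrite");
        assert_eq!(batch.desc.upper(), &Antichain::from_elem(10));
    }
}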

impl<T: Ord> PartialOrd for HollowBatchPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for HollowBatchPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        // Deconstruct self and other so we get a compile failure if new fields
        // are added.
        let HollowBatchPart {
            key: self_key,
            encoded_size_bytes: self_encoded_size_bytes,
            key_lower: self_key_lower,
            structured_key_lower: self_structured_key_lower,
            stats: self_stats,
            ts_rewrite: self_ts_rewrite,
            diffs_sum: self_diffs_sum,
            format: self_format,
            schema_id: self_schema_id,
            deprecated_schema_id: self_deprecated_schema_id,
        } = self;
        let HollowBatchPart {
            key: other_key,
            encoded_size_bytes: other_encoded_size_bytes,
            key_lower: other_key_lower,
            structured_key_lower: other_structured_key_lower,
            stats: other_stats,
            ts_rewrite: other_ts_rewrite,
            diffs_sum: other_diffs_sum,
            format: other_format,
            schema_id: other_schema_id,
            deprecated_schema_id: other_deprecated_schema_id,
        } = other;
        (
            self_key,
            self_encoded_size_bytes,
            self_key_lower,
            self_structured_key_lower,
            self_stats,
            self_ts_rewrite.as_ref().map(|x| x.elements()),
            self_diffs_sum,
            self_format,
            self_schema_id,
            self_deprecated_schema_id,
        )
            .cmp(&(
                other_key,
                other_encoded_size_bytes,
                other_key_lower,
                other_structured_key_lower,
                other_stats,
                other_ts_rewrite.as_ref().map(|x| x.elements()),
                other_diffs_sum,
                other_format,
                other_schema_id,
                other_deprecated_schema_id,
            ))
    }
}

/// A pointer to a rollup stored externally.
#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub struct HollowRollup {
    /// Pointer usable to retrieve the rollup.
    pub key: PartialRollupKey,
    /// The encoded size of this rollup, if known.
    pub encoded_size_bytes: Option<usize>,
}

/// A pointer to a blob stored externally.
#[derive(Debug)]
pub enum HollowBlobRef<'a, T> {
    Batch(&'a HollowBatch<T>),
    Rollup(&'a HollowRollup),
}

/// A rollup that is currently being computed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
pub struct ActiveRollup {
    pub seqno: SeqNo,
    pub start_ms: u64,
}

/// A garbage collection request that is currently being computed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
pub struct ActiveGc {
    pub seqno: SeqNo,
    pub start_ms: u64,
}

/// A sentinel for a state transition that was a no-op.
///
/// Critically, this also indicates that the no-op state transition was not
/// committed through compare_and_append and thus is _not linearized_.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct NoOpStateTransition<T>(pub T);

// TODO: Document invariants.
#[derive(Debug, Clone)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct StateCollections<T> {
    // - Invariant: `<= all reader.since`
    // - Invariant: Doesn't regress across state versions.
    pub(crate) last_gc_req: SeqNo,

    // - Invariant: There is a rollup with `seqno <= self.seqno_since`.
    pub(crate) rollups: BTreeMap<SeqNo, HollowRollup>,

    /// The rollup that is currently being computed.
    pub(crate) active_rollup: Option<ActiveRollup>,
    /// The gc request that is currently being computed.
    pub(crate) active_gc: Option<ActiveGc>,

    pub(crate) leased_readers: BTreeMap<LeasedReaderId, LeasedReaderState<T>>,
    pub(crate) critical_readers: BTreeMap<CriticalReaderId, CriticalReaderState<T>>,
    pub(crate) writers: BTreeMap<WriterId, WriterState<T>>,
    pub(crate) schemas: BTreeMap<SchemaId, EncodedSchemas>,

    // - Invariant: `trace.since == meet(all reader.since)`
    // - Invariant: `trace.since` doesn't regress across state versions.
    // - Invariant: `trace.upper` doesn't regress across state versions.
    // - Invariant: `trace` upholds its own invariants.
    pub(crate) trace: Trace<T>,
}

/// A key and val [Codec::Schema] encoded via [Codec::encode_schema].
///
/// This strategy of directly serializing the schema objects requires that
/// persist users do the right thing. Specifically, that an encoded schema
/// doesn't in some later version of mz decode to an in-mem object that acts
/// differently. In a sense, the current system (before the introduction of the
/// schema registry) where schemas are passed in unchecked to reader and writer
/// registration calls also has the same defect, so seems fine.
///
/// An alternative is to write down here some persist-specific representation of
/// the schema (e.g. the arrow DataType). This is a lot more work and also has
/// the potential to lead down a similar failure mode to the mz_persist_types
/// `Data` trait, where the boilerplate isn't worth the safety. Given that we
/// can always migrate later by rehydrating these, seems fine to start with the
/// easy thing.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct EncodedSchemas {
    /// A full in-mem `K::Schema` impl encoded via [Codec::encode_schema].
    pub key: Bytes,
    /// The arrow `DataType` produced by this `K::Schema` at the time it was
    /// registered, encoded as a `ProtoDataType`.
    pub key_data_type: Bytes,
    /// A full in-mem `V::Schema` impl encoded via [Codec::encode_schema].
    pub val: Bytes,
    /// The arrow `DataType` produced by this `V::Schema` at the time it was
    /// registered, encoded as a `ProtoDataType`.
    pub val_data_type: Bytes,
}

impl EncodedSchemas {
    pub(crate) fn decode_data_type(buf: &[u8]) -> DataType {
        let proto = prost::Message::decode(buf).expect("valid ProtoDataType");
        DataType::from_proto(proto).expect("valid DataType")
    }
}
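
// Illustrative example (not part of the original file): round-trip an arrow
// `DataType` through its proto encoding, mirroring what `register_schema`
// writes into `key_data_type`/`val_data_type` and what `decode_data_type`
// reads back out.
#[cfg(test)]
mod encoded_schemas_example {
    use super::*;

    #[test]
    fn data_type_roundtrip() {
        let original = DataType::Int64;
        let bytes = prost::Message::encode_to_vec(&original.into_proto());
        assert_eq!(EncodedSchemas::decode_data_type(&bytes), original);
    }
}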
1227
1228#[derive(Debug)]
1229#[cfg_attr(test, derive(PartialEq))]
1230pub enum CompareAndAppendBreak<T> {
1231    AlreadyCommitted,
1232    Upper {
1233        shard_upper: Antichain<T>,
1234        writer_upper: Antichain<T>,
1235    },
1236    InvalidUsage(InvalidUsage<T>),
1237    InlineBackpressure,
1238}
1239
1240#[derive(Debug)]
1241#[cfg_attr(test, derive(PartialEq))]
1242pub enum SnapshotErr<T> {
1243    AsOfNotYetAvailable(SeqNo, Upper<T>),
1244    AsOfHistoricalDistinctionsLost(Since<T>),
1245}
1246
1247impl<T> StateCollections<T>
1248where
1249    T: Timestamp + Lattice + Codec64,
1250{
1251    pub fn add_rollup(
1252        &mut self,
1253        add_rollup: (SeqNo, &HollowRollup),
1254    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1255        let (rollup_seqno, rollup) = add_rollup;
1256        let applied = match self.rollups.get(&rollup_seqno) {
1257            Some(x) => x.key == rollup.key,
1258            None => {
1259                self.active_rollup = None;
1260                self.rollups.insert(rollup_seqno, rollup.to_owned());
1261                true
1262            }
1263        };
1264        // This state transition is a no-op if applied is false but we
1265        // still commit the state change so that this gets linearized
1266        // (maybe we're looking at old state).
1267        Continue(applied)
1268    }
1269
1270    pub fn remove_rollups(
1271        &mut self,
1272        remove_rollups: &[(SeqNo, PartialRollupKey)],
1273    ) -> ControlFlow<NoOpStateTransition<Vec<SeqNo>>, Vec<SeqNo>> {
1274        if remove_rollups.is_empty() || self.is_tombstone() {
1275            return Break(NoOpStateTransition(vec![]));
1276        }
1277
1278        //This state transition is called at the end of the GC process, so we
1279        //need to unset the `active_gc` field.
1280        self.active_gc = None;
1281
1282        let mut removed = vec![];
1283        for (seqno, key) in remove_rollups {
1284            let removed_key = self.rollups.remove(seqno);
1285            debug_assert!(
1286                removed_key.as_ref().map_or(true, |x| &x.key == key),
1287                "{} vs {:?}",
1288                key,
1289                removed_key
1290            );
1291
1292            if removed_key.is_some() {
1293                removed.push(*seqno);
1294            }
1295        }
1296
1297        Continue(removed)
1298    }
1299
1300    pub fn register_leased_reader(
1301        &mut self,
1302        hostname: &str,
1303        reader_id: &LeasedReaderId,
1304        purpose: &str,
1305        seqno: SeqNo,
1306        lease_duration: Duration,
1307        heartbeat_timestamp_ms: u64,
1308        use_critical_since: bool,
1309    ) -> ControlFlow<
1310        NoOpStateTransition<(LeasedReaderState<T>, SeqNo)>,
1311        (LeasedReaderState<T>, SeqNo),
1312    > {
1313        let since = if use_critical_since {
1314            self.critical_since()
1315                .unwrap_or_else(|| self.trace.since().clone())
1316        } else {
1317            self.trace.since().clone()
1318        };
1319        let reader_state = LeasedReaderState {
1320            debug: HandleDebugState {
1321                hostname: hostname.to_owned(),
1322                purpose: purpose.to_owned(),
1323            },
1324            seqno,
1325            since,
1326            last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1327            lease_duration_ms: u64::try_from(lease_duration.as_millis())
1328                .expect("lease duration as millis should fit within u64"),
1329        };
1330
1331        // If the shard-global upper and since are both the empty antichain,
1332        // then no further writes can ever commit and no further reads can be
1333        // served. Optimize this by no-op-ing reader registration so that we can
1334        // settle the shard into a final unchanging tombstone state.
1335        if self.is_tombstone() {
1336            return Break(NoOpStateTransition((reader_state, self.seqno_since(seqno))));
1337        }
1338
1339        // TODO: Handle if the reader or writer already exists.
1340        self.leased_readers
1341            .insert(reader_id.clone(), reader_state.clone());
1342        Continue((reader_state, self.seqno_since(seqno)))
1343    }
1344
1345    pub fn register_critical_reader<O: Opaque + Codec64>(
1346        &mut self,
1347        hostname: &str,
1348        reader_id: &CriticalReaderId,
1349        purpose: &str,
1350    ) -> ControlFlow<NoOpStateTransition<CriticalReaderState<T>>, CriticalReaderState<T>> {
1351        let state = CriticalReaderState {
1352            debug: HandleDebugState {
1353                hostname: hostname.to_owned(),
1354                purpose: purpose.to_owned(),
1355            },
1356            since: self.trace.since().clone(),
1357            opaque: OpaqueState(Codec64::encode(&O::initial())),
1358            opaque_codec: O::codec_name(),
1359        };
1360
1361        // We expire all readers if the upper and since both advance to the
1362        // empty antichain. Gracefully handle this. At the same time,
1363        // short-circuit the cmd application so we don't needlessly create new
1364        // SeqNos.
1365        if self.is_tombstone() {
1366            return Break(NoOpStateTransition(state));
1367        }
1368
1369        let state = match self.critical_readers.get_mut(reader_id) {
1370            Some(existing_state) => {
1371                existing_state.debug = state.debug;
1372                existing_state.clone()
1373            }
1374            None => {
1375                self.critical_readers
1376                    .insert(reader_id.clone(), state.clone());
1377                state
1378            }
1379        };
1380        Continue(state)
1381    }
1382
1383    pub fn register_schema<K: Codec, V: Codec>(
1384        &mut self,
1385        key_schema: &K::Schema,
1386        val_schema: &V::Schema,
1387    ) -> ControlFlow<NoOpStateTransition<Option<SchemaId>>, Option<SchemaId>> {
1388        fn encode_data_type(data_type: &DataType) -> Bytes {
1389            let proto = data_type.into_proto();
1390            prost::Message::encode_to_vec(&proto).into()
1391        }
1392
1393        // Look for an existing registered SchemaId for these schemas.
1394        //
1395        // The common case is that this should be a recent one, so as a minor
1396        // optimization, do this search in reverse order.
1397        //
1398        // TODO: Note that this impl is `O(schemas)`. Combined with the
1399        // possibility of cmd retries, it's possible but unlikely for this to
1400        // get expensive. We could maintain a reverse map to speed this up in
1401        // necessary. This would either need to work on the encoded
1402        // representation (which, we'd have to fall back to the linear scan) or
1403        // we'd need to add a Hash/Ord bound to Schema.
1404        let existing_id = self.schemas.iter().rev().find(|(_, x)| {
1405            K::decode_schema(&x.key) == *key_schema && V::decode_schema(&x.val) == *val_schema
1406        });
1407        match existing_id {
1408            Some((schema_id, _)) => {
1409                // TODO: Validate that the decoded schemas still produce records
1410                // of the recorded DataType, to detect shenanigans. Probably
1411                // best to wait until we've turned on Schema2 in prod and thus
1412                // committed to the current mappings.
1413                Break(NoOpStateTransition(Some(*schema_id)))
1414            }
1415            None if self.is_tombstone() => {
1416                // TODO: Is this right?
1417                Break(NoOpStateTransition(None))
1418            }
1419            None if self.schemas.is_empty() => {
1420                // We'll have to do something more sophisticated here to
1421                // generate the next id if/when we start supporting the removal
1422                // of schemas.
1423                let id = SchemaId(self.schemas.len());
1424                let key_data_type = mz_persist_types::columnar::data_type::<K>(key_schema)
1425                    .expect("valid key schema");
1426                let val_data_type = mz_persist_types::columnar::data_type::<V>(val_schema)
1427                    .expect("valid val schema");
1428                let prev = self.schemas.insert(
1429                    id,
1430                    EncodedSchemas {
1431                        key: K::encode_schema(key_schema),
1432                        key_data_type: encode_data_type(&key_data_type),
1433                        val: V::encode_schema(val_schema),
1434                        val_data_type: encode_data_type(&val_data_type),
1435                    },
1436                );
1437                assert_eq!(prev, None);
1438                Continue(Some(id))
1439            }
1440            None => {
1441                info!(
1442                    "register_schemas got {:?} expected {:?}",
1443                    key_schema,
1444                    self.schemas
1445                        .iter()
1446                        .map(|(id, x)| (id, K::decode_schema(&x.key)))
1447                        .collect::<Vec<_>>()
1448                );
1449                // Until we implement persist schema changes, only allow at most
1450                // one registered schema.
1451                Break(NoOpStateTransition(None))
1452            }
1453        }
1454    }
1455
1456    pub fn compare_and_evolve_schema<K: Codec, V: Codec>(
1457        &mut self,
1458        expected: SchemaId,
1459        key_schema: &K::Schema,
1460        val_schema: &V::Schema,
1461    ) -> ControlFlow<NoOpStateTransition<CaESchema<K, V>>, CaESchema<K, V>> {
1462        fn data_type<T>(schema: &impl Schema<T>) -> DataType {
1463            // To be defensive, create an empty batch and inspect the resulting
1464            // data type (as opposed to something like allowing the `Schema` to
1465            // declare the DataType).
1466            let array = Schema::encoder(schema).expect("valid schema").finish();
1467            Array::data_type(&array).clone()
1468        }
1469
1470        let (current_id, current) = self
1471            .schemas
1472            .last_key_value()
1473            .expect("all shards have a schema");
1474        if *current_id != expected {
1475            return Break(NoOpStateTransition(CaESchema::ExpectedMismatch {
1476                schema_id: *current_id,
1477                key: K::decode_schema(&current.key),
1478                val: V::decode_schema(&current.val),
1479            }));
1480        }
1481
1482        let current_key = K::decode_schema(&current.key);
1483        let current_key_dt = EncodedSchemas::decode_data_type(&current.key_data_type);
1484        let current_val = V::decode_schema(&current.val);
1485        let current_val_dt = EncodedSchemas::decode_data_type(&current.val_data_type);
1486
1487        let key_dt = data_type(key_schema);
1488        let val_dt = data_type(val_schema);
1489
1490        // If the schema is exactly the same as the current one, no-op.
1491        if current_key == *key_schema
1492            && current_key_dt == key_dt
1493            && current_val == *val_schema
1494            && current_val_dt == val_dt
1495        {
1496            return Break(NoOpStateTransition(CaESchema::Ok(*current_id)));
1497        }
1498
1499        let key_fn = backward_compatible(&current_key_dt, &key_dt);
1500        let val_fn = backward_compatible(&current_val_dt, &val_dt);
1501        let (Some(key_fn), Some(val_fn)) = (key_fn, val_fn) else {
1502            return Break(NoOpStateTransition(CaESchema::Incompatible));
1503        };
1504        // Persist initially disallows dropping columns. This would require a
1505        // bunch more work (e.g. not safe to use the latest schema in
1506        // compaction) and isn't initially necessary in mz.
1507        if key_fn.contains_drop() || val_fn.contains_drop() {
1508            return Break(NoOpStateTransition(CaESchema::Incompatible));
1509        }
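        // An illustrative (not exhaustive) sketch of how the two checks above
        // interact, assuming the usual semantics of `backward_compatible`:
        // evolving a struct-like schema by appending a nullable column yields
        // `Some(migration)` with `contains_drop() == false`, so we fall
        // through and register the new schema; removing a column either
        // yields `None` or a migration with `contains_drop() == true`, and
        // both of those cases return `CaESchema::Incompatible` here.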
1510
1511        // We'll have to do something more sophisticated here to
1512        // generate the next id if/when we start supporting the removal
1513        // of schemas.
1514        let id = SchemaId(self.schemas.len());
1515        self.schemas.insert(
1516            id,
1517            EncodedSchemas {
1518                key: K::encode_schema(key_schema),
1519                key_data_type: prost::Message::encode_to_vec(&key_dt.into_proto()).into(),
1520                val: V::encode_schema(val_schema),
1521                val_data_type: prost::Message::encode_to_vec(&val_dt.into_proto()).into(),
1522            },
1523        );
1524        Continue(CaESchema::Ok(id))
1525    }
1526
1527    pub fn compare_and_append(
1528        &mut self,
1529        batch: &HollowBatch<T>,
1530        writer_id: &WriterId,
1531        heartbeat_timestamp_ms: u64,
1532        lease_duration_ms: u64,
1533        idempotency_token: &IdempotencyToken,
1534        debug_info: &HandleDebugState,
1535        inline_writes_total_max_bytes: usize,
1536        claim_compaction_percent: usize,
1537        claim_compaction_min_version: Option<&Version>,
1538    ) -> ControlFlow<CompareAndAppendBreak<T>, Vec<FueledMergeReq<T>>> {
1539        // We expire all writers if the upper and since both advance to the
1540        // empty antichain. Gracefully handle this. At the same time,
1541        // short-circuit the cmd application so we don't needlessly create new
1542        // SeqNos.
1543        if self.is_tombstone() {
1544            assert_eq!(self.trace.upper(), &Antichain::new());
1545            return Break(CompareAndAppendBreak::Upper {
1546                shard_upper: Antichain::new(),
1547                // This writer might have been registered before the shard upper
1548                // was advanced, which would make this pessimistic in the
1549                // Indeterminate handling of compare_and_append at the machine
1550                // level, but that's fine.
1551                writer_upper: Antichain::new(),
1552            });
1553        }
1554
1555        let writer_state = self
1556            .writers
1557            .entry(writer_id.clone())
1558            .or_insert_with(|| WriterState {
1559                last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1560                lease_duration_ms,
1561                most_recent_write_token: IdempotencyToken::SENTINEL,
1562                most_recent_write_upper: Antichain::from_elem(T::minimum()),
1563                debug: debug_info.clone(),
1564            });
1565
1566        if PartialOrder::less_than(batch.desc.upper(), batch.desc.lower()) {
1567            return Break(CompareAndAppendBreak::InvalidUsage(
1568                InvalidUsage::InvalidBounds {
1569                    lower: batch.desc.lower().clone(),
1570                    upper: batch.desc.upper().clone(),
1571                },
1572            ));
1573        }
1574
1575        // If the time interval is empty, the list of updates must also be
1576        // empty.
1577        if batch.desc.upper() == batch.desc.lower() && !batch.is_empty() {
1578            return Break(CompareAndAppendBreak::InvalidUsage(
1579                InvalidUsage::InvalidEmptyTimeInterval {
1580                    lower: batch.desc.lower().clone(),
1581                    upper: batch.desc.upper().clone(),
1582                    keys: batch
1583                        .parts
1584                        .iter()
1585                        .map(|x| x.printable_name().to_owned())
1586                        .collect(),
1587                },
1588            ));
1589        }
1590
1591        if idempotency_token == &writer_state.most_recent_write_token {
1592            // If the last write had the same idempotency_token, then this must
1593            // have already committed. Sanity check that the most recent write
1594            // upper matches and that the shard upper is at least the write
1595            // upper; if it's not, something very suspect is going on.
1596            assert_eq!(batch.desc.upper(), &writer_state.most_recent_write_upper);
1597            assert!(
1598                PartialOrder::less_equal(batch.desc.upper(), self.trace.upper()),
1599                "{:?} vs {:?}",
1600                batch.desc.upper(),
1601                self.trace.upper()
1602            );
1603            return Break(CompareAndAppendBreak::AlreadyCommitted);
1604        }
1605
1606        let shard_upper = self.trace.upper();
1607        if shard_upper != batch.desc.lower() {
1608            return Break(CompareAndAppendBreak::Upper {
1609                shard_upper: shard_upper.clone(),
1610                writer_upper: writer_state.most_recent_write_upper.clone(),
1611            });
1612        }
1613
1614        let new_inline_bytes = batch.inline_bytes();
1615        if new_inline_bytes > 0 {
1616            let mut existing_inline_bytes = 0;
1617            self.trace
1618                .map_batches(|x| existing_inline_bytes += x.inline_bytes());
1619            // TODO: For very small batches, it may actually _increase_ the size
1620            // of state to flush them out. Consider another threshold under
1621            // which an inline part can be appended no matter what.
1622            if existing_inline_bytes + new_inline_bytes >= inline_writes_total_max_bytes {
1623                return Break(CompareAndAppendBreak::InlineBackpressure);
1624            }
1625        }
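        // Worked example of the check above: with a 1 MiB
        // `inline_writes_total_max_bytes`, 900 KiB of inline parts already in
        // the trace, and a batch carrying 200 KiB of new inline data, the sum
        // (1100 KiB) crosses the limit and we return `InlineBackpressure`
        // instead of accepting the write.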
1626
1627        let mut merge_reqs = if batch.desc.upper() != batch.desc.lower() {
1628            self.trace.push_batch(batch.clone())
1629        } else {
1630            Vec::new()
1631        };
1632
1633        // NB: we don't claim unclaimed compactions when the recording flag is off, even if we'd
1634        // otherwise be allowed to, to avoid triggering the same compactions in every writer.
1635        let all_empty_reqs = merge_reqs
1636            .iter()
1637            .all(|req| req.inputs.iter().all(|b| b.batch.is_empty()));
1638        if all_empty_reqs && !batch.is_empty() {
1639            let mut reqs_to_take = claim_compaction_percent / 100;
1640            if (usize::cast_from(idempotency_token.hashed()) % 100)
1641                < (claim_compaction_percent % 100)
1642            {
1643                reqs_to_take += 1;
1644            }
1645            let threshold_ms = heartbeat_timestamp_ms.saturating_sub(lease_duration_ms);
1646            let min_writer = claim_compaction_min_version.map(WriterKey::for_version);
1647            merge_reqs.extend(
1648                // We keep the oldest `reqs_to_take` batches, under the theory that they're least
1649                // likely to be compacted soon for other reasons.
1650                self.trace
1651                    .fueled_merge_reqs_before_ms(threshold_ms, min_writer)
1652                    .take(reqs_to_take),
1653            )
1654        }
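        // Worked example of the claim math above, assuming a hypothetical
        // `claim_compaction_percent` of 150: every such append claims at
        // least one unclaimed compaction (150 / 100 == 1), and roughly half
        // of appends (those whose hashed idempotency token lands below
        // 150 % 100 == 50) claim a second one.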
1655
1656        for req in &merge_reqs {
1657            self.trace.claim_compaction(
1658                req.id,
1659                ActiveCompaction {
1660                    start_ms: heartbeat_timestamp_ms,
1661                },
1662            )
1663        }
1664
1665        debug_assert_eq!(self.trace.upper(), batch.desc.upper());
1666        writer_state.most_recent_write_token = idempotency_token.clone();
1667        // The writer's most recent upper should only go forward.
1668        assert!(
1669            PartialOrder::less_equal(&writer_state.most_recent_write_upper, batch.desc.upper()),
1670            "{:?} vs {:?}",
1671            &writer_state.most_recent_write_upper,
1672            batch.desc.upper()
1673        );
1674        writer_state
1675            .most_recent_write_upper
1676            .clone_from(batch.desc.upper());
1677
1678        // Heartbeat the writer state to keep our idempotency token alive.
1679        writer_state.last_heartbeat_timestamp_ms = std::cmp::max(
1680            heartbeat_timestamp_ms,
1681            writer_state.last_heartbeat_timestamp_ms,
1682        );
1683
1684        Continue(merge_reqs)
1685    }
1686
1687    pub fn apply_merge_res(
1688        &mut self,
1689        res: &FueledMergeRes<T>,
1690    ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
1691        // We expire all writers if the upper and since both advance to the
1692        // empty antichain. Gracefully handle this. At the same time,
1693        // short-circuit the cmd application so we don't needlessly create new
1694        // SeqNos.
1695        if self.is_tombstone() {
1696            return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
1697        }
1698
1699        let apply_merge_result = self.trace.apply_merge_res(res);
1700        Continue(apply_merge_result)
1701    }
1702
1703    pub fn spine_exert(
1704        &mut self,
1705        fuel: usize,
1706    ) -> ControlFlow<NoOpStateTransition<Vec<FueledMergeReq<T>>>, Vec<FueledMergeReq<T>>> {
1707        let (merge_reqs, did_work) = self.trace.exert(fuel);
1708        if did_work {
1709            Continue(merge_reqs)
1710        } else {
1711            assert!(merge_reqs.is_empty());
1712            // Break if we have nothing useful to do to save the seqno (and
1713            // resulting crdb traffic)
1714            Break(NoOpStateTransition(Vec::new()))
1715        }
1716    }
1717
1718    pub fn downgrade_since(
1719        &mut self,
1720        reader_id: &LeasedReaderId,
1721        seqno: SeqNo,
1722        outstanding_seqno: Option<SeqNo>,
1723        new_since: &Antichain<T>,
1724        heartbeat_timestamp_ms: u64,
1725    ) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
1726        // We expire all readers if the upper and since both advance to the
1727        // empty antichain. Gracefully handle this. At the same time,
1728        // short-circuit the cmd application so we don't needlessly create new
1729        // SeqNos.
1730        if self.is_tombstone() {
1731            return Break(NoOpStateTransition(Since(Antichain::new())));
1732        }
1733
1734        // The only way to have a missing reader in state is if it's been expired... and in that
1735        // case, we behave the same as though that reader had been downgraded to the empty antichain.
1736        let Some(reader_state) = self.leased_reader(reader_id) else {
1737            tracing::warn!(
1738                "Leased reader {reader_id} was expired due to inactivity. Did the machine go to sleep?",
1739            );
1740            return Break(NoOpStateTransition(Since(Antichain::new())));
1741        };
1742
1743        // Also use this as an opportunity to heartbeat the reader and downgrade
1744        // the seqno capability.
1745        reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
1746            heartbeat_timestamp_ms,
1747            reader_state.last_heartbeat_timestamp_ms,
1748        );
1749
1750        let seqno = match outstanding_seqno {
1751            Some(outstanding_seqno) => {
1752                assert!(
1753                    outstanding_seqno >= reader_state.seqno,
1754                    "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
1755                    is behind current reader_state ({:?})",
1756                    outstanding_seqno,
1757                    reader_state.seqno,
1758                );
1759                std::cmp::min(outstanding_seqno, seqno)
1760            }
1761            None => seqno,
1762        };
1763
1764        reader_state.seqno = seqno;
1765
1766        let reader_current_since = if PartialOrder::less_than(&reader_state.since, new_since) {
1767            reader_state.since.clone_from(new_since);
1768            self.update_since();
1769            new_since.clone()
1770        } else {
1771            // No-op, but still commit the state change so that this gets
1772            // linearized.
1773            reader_state.since.clone()
1774        };
1775
1776        Continue(Since(reader_current_since))
1777    }
1778
1779    pub fn compare_and_downgrade_since<O: Opaque + Codec64>(
1780        &mut self,
1781        reader_id: &CriticalReaderId,
1782        expected_opaque: &O,
1783        (new_opaque, new_since): (&O, &Antichain<T>),
1784    ) -> ControlFlow<
1785        NoOpStateTransition<Result<Since<T>, (O, Since<T>)>>,
1786        Result<Since<T>, (O, Since<T>)>,
1787    > {
1788        // We expire all readers if the upper and since both advance to the
1789        // empty antichain. Gracefully handle this. At the same time,
1790        // short-circuit the cmd application so we don't needlessly create new
1791        // SeqNos.
1792        if self.is_tombstone() {
1793            // Match the idempotence behavior below of ignoring the token if
1794            // since is already advanced enough (in this case, because it's a
1795            // tombstone, we know it's the empty antichain).
1796            return Break(NoOpStateTransition(Ok(Since(Antichain::new()))));
1797        }
1798
1799        let reader_state = self.critical_reader(reader_id);
1800        assert_eq!(reader_state.opaque_codec, O::codec_name());
1801
1802        if &O::decode(reader_state.opaque.0) != expected_opaque {
1803            // No-op, but still commit the state change so that this gets
1804            // linearized.
1805            return Continue(Err((
1806                Codec64::decode(reader_state.opaque.0),
1807                Since(reader_state.since.clone()),
1808            )));
1809        }
1810
1811        reader_state.opaque = OpaqueState(Codec64::encode(new_opaque));
1812        if PartialOrder::less_equal(&reader_state.since, new_since) {
1813            reader_state.since.clone_from(new_since);
1814            self.update_since();
1815            Continue(Ok(Since(new_since.clone())))
1816        } else {
1817            // no work to be done -- the reader state's `since` is already sufficiently
1818            // advanced. we may someday need to revisit this branch when it's possible
1819            // for two `since` frontiers to be incomparable.
1820            Continue(Ok(Since(reader_state.since.clone())))
1821        }
1822    }
1823
1824    pub fn heartbeat_leased_reader(
1825        &mut self,
1826        reader_id: &LeasedReaderId,
1827        heartbeat_timestamp_ms: u64,
1828    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1829        // We expire all readers if the upper and since both advance to the
1830        // empty antichain. Gracefully handle this. At the same time,
1831        // short-circuit the cmd application so we don't needlessly create new
1832        // SeqNos.
1833        if self.is_tombstone() {
1834            return Break(NoOpStateTransition(false));
1835        }
1836
1837        match self.leased_readers.get_mut(reader_id) {
1838            Some(reader_state) => {
1839                reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
1840                    heartbeat_timestamp_ms,
1841                    reader_state.last_heartbeat_timestamp_ms,
1842                );
1843                Continue(true)
1844            }
1845            // No-op, but we still commit the state change so that this gets
1846            // linearized (maybe we're looking at old state).
1847            None => Continue(false),
1848        }
1849    }
1850
1851    pub fn expire_leased_reader(
1852        &mut self,
1853        reader_id: &LeasedReaderId,
1854    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1855        // We expire all readers if the upper and since both advance to the
1856        // empty antichain. Gracefully handle this. At the same time,
1857        // short-circuit the cmd application so we don't needlessly create new
1858        // SeqNos.
1859        if self.is_tombstone() {
1860            return Break(NoOpStateTransition(false));
1861        }
1862
1863        let existed = self.leased_readers.remove(reader_id).is_some();
1864        if existed {
1865            // TODO(database-issues#6885): Re-enable this
1866            //
1867            // Temporarily disabling this because we think it might be the cause
1868            // of the remap since bug. Specifically, a clusterd process has a
1869            // ReadHandle for maintaining the since and one inside a Listen. If
1870            // we crash and stay down for longer than the read lease duration,
1871            // it's possible that an expiry of them both in quick succession
1872            // jumps the since forward to the Listen one.
1873            //
1874            // Don't forget to update the downgrade_since when this gets
1875            // switched back on.
1876            //
1877            // self.update_since();
1878        }
1879        // No-op if existed is false, but still commit the state change so that
1880        // this gets linearized.
1881        Continue(existed)
1882    }
1883
1884    pub fn expire_critical_reader(
1885        &mut self,
1886        reader_id: &CriticalReaderId,
1887    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1888        // We expire all readers if the upper and since both advance to the
1889        // empty antichain. Gracefully handle this. At the same time,
1890        // short-circuit the cmd application so we don't needlessly create new
1891        // SeqNos.
1892        if self.is_tombstone() {
1893            return Break(NoOpStateTransition(false));
1894        }
1895
1896        let existed = self.critical_readers.remove(reader_id).is_some();
1897        if existed {
1898            // TODO(database-issues#6885): Re-enable this
1899            //
1900            // Temporarily disabling this because we think it might be the cause
1901            // of the remap since bug. Specifically, a clusterd process has a
1902            // ReadHandle for maintaining the since and one inside a Listen. If
1903            // we crash and stay down for longer than the read lease duration,
1904            // it's possible that an expiry of them both in quick succession
1905            // jumps the since forward to the Listen one.
1906            //
1907            // Don't forget to update the downgrade_since when this gets
1908            // switched back on.
1909            //
1910            // self.update_since();
1911        }
1912        // This state transition is a no-op if existed is false, but we still
1913        // commit the state change so that this gets linearized (maybe we're
1914        // looking at old state).
1915        Continue(existed)
1916    }
1917
1918    pub fn expire_writer(
1919        &mut self,
1920        writer_id: &WriterId,
1921    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1922        // We expire all writers if the upper and since both advance to the
1923        // empty antichain. Gracefully handle this. At the same time,
1924        // short-circuit the cmd application so we don't needlessly create new
1925        // SeqNos.
1926        if self.is_tombstone() {
1927            return Break(NoOpStateTransition(false));
1928        }
1929
1930        let existed = self.writers.remove(writer_id).is_some();
1931        // This state transition is a no-op if existed is false, but we still
1932        // commit the state change so that this gets linearized (maybe we're
1933        // looking at old state).
1934        Continue(existed)
1935    }
1936
1937    fn leased_reader(&mut self, id: &LeasedReaderId) -> Option<&mut LeasedReaderState<T>> {
1938        self.leased_readers.get_mut(id)
1939    }
1940
1941    fn critical_reader(&mut self, id: &CriticalReaderId) -> &mut CriticalReaderState<T> {
1942        self.critical_readers
1943            .get_mut(id)
1944            .unwrap_or_else(|| {
1945                panic!(
1946                    "Unknown CriticalReaderId({}). It was either never registered, or has been manually expired.",
1947                    id
1948                )
1949            })
1950    }
1951
1952    fn critical_since(&self) -> Option<Antichain<T>> {
1953        let mut critical_sinces = self.critical_readers.values().map(|r| &r.since);
1954        let mut since = critical_sinces.next().cloned()?;
1955        for s in critical_sinces {
1956            since.meet_assign(s);
1957        }
1958        Some(since)
1959    }
1960
1961    fn update_since(&mut self) {
1962        let mut sinces_iter = self
1963            .leased_readers
1964            .values()
1965            .map(|x| &x.since)
1966            .chain(self.critical_readers.values().map(|x| &x.since));
1967        let mut since = match sinces_iter.next() {
1968            Some(since) => since.clone(),
1969            None => {
1970                // If there are no current readers, leave `since` unchanged so
1971                // it doesn't regress.
1972                return;
1973            }
1974        };
1975        for s in sinces_iter {
1976            since.meet_assign(s);
1977        }
1978        self.trace.downgrade_since(&since);
1979    }
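    // For example (assuming integer timestamps): with one leased reader at
    // since {5} and one critical reader at since {3}, the meet computed above
    // is {3}, so the trace since is downgraded to {3}. With no readers at all
    // we return early and leave the since untouched so it cannot regress.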
1980
1981    fn seqno_since(&self, seqno: SeqNo) -> SeqNo {
1982        let mut seqno_since = seqno;
1983        for cap in self.leased_readers.values() {
1984            seqno_since = std::cmp::min(seqno_since, cap.seqno);
1985        }
1986        // critical_readers don't hold a seqno capability.
1987        seqno_since
1988    }
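    // For example: at state seqno 15, leased readers holding seqno
    // capabilities 10 and 12 give a seqno_since of SeqNo(10), so garbage
    // collection is capped at SeqNo(10) even though the state itself has
    // advanced to SeqNo(15).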
1989
1990    fn tombstone_batch() -> HollowBatch<T> {
1991        HollowBatch::empty(Description::new(
1992            Antichain::from_elem(T::minimum()),
1993            Antichain::new(),
1994            Antichain::new(),
1995        ))
1996    }
1997
1998    pub(crate) fn is_tombstone(&self) -> bool {
1999        self.trace.upper().is_empty()
2000            && self.trace.since().is_empty()
2001            && self.writers.is_empty()
2002            && self.leased_readers.is_empty()
2003            && self.critical_readers.is_empty()
2004    }
2005
2006    pub(crate) fn is_single_empty_batch(&self) -> bool {
2007        let mut batch_count = 0;
2008        let mut is_empty = true;
2009        self.trace.map_batches(|b| {
2010            batch_count += 1;
2011            is_empty &= b.is_empty()
2012        });
2013        batch_count <= 1 && is_empty
2014    }
2015
2016    pub fn become_tombstone_and_shrink(&mut self) -> ControlFlow<NoOpStateTransition<()>, ()> {
2017        assert_eq!(self.trace.upper(), &Antichain::new());
2018        assert_eq!(self.trace.since(), &Antichain::new());
2019
2020        // Remember our current state, so we can decide whether we have to
2021        // record a transition in durable state.
2022        let was_tombstone = self.is_tombstone();
2023
2024        // Enter the "tombstone" state, if we're not in it already.
2025        self.writers.clear();
2026        self.leased_readers.clear();
2027        self.critical_readers.clear();
2028
2029        debug_assert!(self.is_tombstone());
2030
2031        // Now that we're in a "tombstone" state -- i.e. nobody can read data from the shard or write to
2032        // it -- the actual contents of our batches no longer matter.
2033        // This method progressively replaces batches in our state with simpler versions, to allow
2034        // freeing up resources and to reduce the state size. (Since the state is unreadable, this
2035        // is not visible to clients.) We do this a little bit at a time to avoid really large state
2036        // transitions... most operations happen incrementally, and large single writes can overwhelm
2037        // a backing store. See comments for why we believe the relevant diffs are reasonably small.
2038
2039        let mut to_replace = None;
2040        let mut batch_count = 0;
2041        self.trace.map_batches(|b| {
2042            batch_count += 1;
2043            if !b.is_empty() && to_replace.is_none() {
2044                to_replace = Some(b.desc.clone());
2045            }
2046        });
2047        if let Some(desc) = to_replace {
2048            // We have a nonempty batch: replace it with an empty batch and return.
2049            // This should not produce an excessively large diff: if it did, we wouldn't have been
2050            // able to append that batch in the first place.
2051            let fake_merge = FueledMergeRes {
2052                output: HollowBatch::empty(desc),
2053            };
2054            let result = self.trace.apply_merge_res(&fake_merge);
2055            assert!(
2056                result.matched(),
2057                "merge with a matching desc should always match"
2058            );
2059            Continue(())
2060        } else if batch_count > 1 {
2061            // All our batches are empty, but we have more than one of them. Replace the whole set
2062            // with a new single-batch trace.
2063            // This produces a diff with a size proportional to the number of batches, but since
2064            // Spine keeps a logarithmic number of batches this should never be excessively large.
2065            let mut new_trace = Trace::default();
2066            new_trace.downgrade_since(&Antichain::new());
2067            let merge_reqs = new_trace.push_batch(Self::tombstone_batch());
2068            assert_eq!(merge_reqs, Vec::new());
2069            self.trace = new_trace;
2070            Continue(())
2071        } else if !was_tombstone {
2072            // We were not tombstoned before, so have to make sure this state
2073            // transition is recorded.
2074            Continue(())
2075        } else {
2076            // All our batches are empty, and there's only one... there's no shrinking this
2077            // tombstone further.
2078            Break(NoOpStateTransition(()))
2079        }
2080    }
2081}
2082
2083// TODO: Document invariants.
2084#[derive(Debug)]
2085#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
2086pub struct State<T> {
2087    pub(crate) applier_version: semver::Version,
2088    pub(crate) shard_id: ShardId,
2089
2090    pub(crate) seqno: SeqNo,
2091    /// A strictly increasing wall time of when this state was written, in
2092    /// milliseconds since the unix epoch.
2093    pub(crate) walltime_ms: u64,
2094    /// Hostname of the persist user that created this version of state. For
2095    /// debugging.
2096    pub(crate) hostname: String,
2097    pub(crate) collections: StateCollections<T>,
2098}
2099
2100/// A newtype wrapper of State that guarantees the K, V, and D codecs match the
2101/// ones in durable storage.
2102pub struct TypedState<K, V, T, D> {
2103    pub(crate) state: State<T>,
2104
2105    // According to the docs, PhantomData is to "mark things that act like they
2106    // own a T". State doesn't actually own K, V, or D, just the ability to
2107    // produce them. Using the `fn() -> T` pattern gets us the same variance as
2108    // T [1], but also allows State to correctly derive Send+Sync.
2109    //
2110    // [1]:
2111    //     https://doc.rust-lang.org/nomicon/phantom-data.html#table-of-phantomdata-patterns
2112    pub(crate) _phantom: PhantomData<fn() -> (K, V, D)>,
2113}
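// A minimal, standalone illustration (not part of this crate's API) of the
// `fn() -> T` trick used above: unlike `PhantomData<T>`, a
// `PhantomData<fn() -> T>` field is `Send + Sync` regardless of `T`, while
// keeping the same covariance in `T`.
//
//     struct Marker<T>(std::marker::PhantomData<fn() -> T>);
//     fn assert_send_sync<X: Send + Sync>() {}
//     // Compiles even though `Rc<u8>` is neither `Send` nor `Sync`:
//     fn _check() { assert_send_sync::<Marker<std::rc::Rc<u8>>>(); }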
2114
2115impl<K, V, T: Clone, D> TypedState<K, V, T, D> {
2116    #[cfg(any(test, debug_assertions))]
2117    pub(crate) fn clone(&self, applier_version: Version, hostname: String) -> Self {
2118        TypedState {
2119            state: State {
2120                applier_version,
2121                shard_id: self.shard_id.clone(),
2122                seqno: self.seqno.clone(),
2123                walltime_ms: self.walltime_ms,
2124                hostname,
2125                collections: self.collections.clone(),
2126            },
2127            _phantom: PhantomData,
2128        }
2129    }
2130
2131    pub(crate) fn clone_for_rollup(&self) -> Self {
2132        TypedState {
2133            state: State {
2134                applier_version: self.applier_version.clone(),
2135                shard_id: self.shard_id.clone(),
2136                seqno: self.seqno.clone(),
2137                walltime_ms: self.walltime_ms,
2138                hostname: self.hostname.clone(),
2139                collections: self.collections.clone(),
2140            },
2141            _phantom: PhantomData,
2142        }
2143    }
2144}
2145
2146impl<K, V, T: Debug, D> Debug for TypedState<K, V, T, D> {
2147    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2148        // Deconstruct self so we get a compile failure if new fields
2149        // are added.
2150        let TypedState { state, _phantom } = self;
2151        f.debug_struct("TypedState").field("state", state).finish()
2152    }
2153}
2154
2155// Impl PartialEq regardless of the type params.
2156#[cfg(any(test, debug_assertions))]
2157impl<K, V, T: PartialEq, D> PartialEq for TypedState<K, V, T, D> {
2158    fn eq(&self, other: &Self) -> bool {
2159        // Deconstruct self and other so we get a compile failure if new fields
2160        // are added.
2161        let TypedState {
2162            state: self_state,
2163            _phantom,
2164        } = self;
2165        let TypedState {
2166            state: other_state,
2167            _phantom,
2168        } = other;
2169        self_state == other_state
2170    }
2171}
2172
2173impl<K, V, T, D> Deref for TypedState<K, V, T, D> {
2174    type Target = State<T>;
2175
2176    fn deref(&self) -> &Self::Target {
2177        &self.state
2178    }
2179}
2180
2181impl<K, V, T, D> DerefMut for TypedState<K, V, T, D> {
2182    fn deref_mut(&mut self) -> &mut Self::Target {
2183        &mut self.state
2184    }
2185}
2186
2187impl<K, V, T, D> TypedState<K, V, T, D>
2188where
2189    K: Codec,
2190    V: Codec,
2191    T: Timestamp + Lattice + Codec64,
2192    D: Codec64,
2193{
2194    pub fn new(
2195        applier_version: Version,
2196        shard_id: ShardId,
2197        hostname: String,
2198        walltime_ms: u64,
2199    ) -> Self {
2200        let state = State {
2201            applier_version,
2202            shard_id,
2203            seqno: SeqNo::minimum(),
2204            walltime_ms,
2205            hostname,
2206            collections: StateCollections {
2207                last_gc_req: SeqNo::minimum(),
2208                rollups: BTreeMap::new(),
2209                active_rollup: None,
2210                active_gc: None,
2211                leased_readers: BTreeMap::new(),
2212                critical_readers: BTreeMap::new(),
2213                writers: BTreeMap::new(),
2214                schemas: BTreeMap::new(),
2215                trace: Trace::default(),
2216            },
2217        };
2218        TypedState {
2219            state,
2220            _phantom: PhantomData,
2221        }
2222    }
2223
2224    pub fn clone_apply<R, E, WorkFn>(
2225        &self,
2226        cfg: &PersistConfig,
2227        work_fn: &mut WorkFn,
2228    ) -> ControlFlow<E, (R, Self)>
2229    where
2230        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
2231    {
2232        // Now that we support one minor version of forward compatibility, tag
2233        // each version of state with the _max_ version of code that has ever
2234        // contributed to it. Otherwise, we'd erroneously allow rolling back an
2235        // arbitrary number of versions if they were done one-by-one.
2236        let new_applier_version = std::cmp::max(&self.applier_version, &cfg.build_version);
2237        let mut new_state = State {
2238            applier_version: new_applier_version.clone(),
2239            shard_id: self.shard_id,
2240            seqno: self.seqno.next(),
2241            walltime_ms: (cfg.now)(),
2242            hostname: cfg.hostname.clone(),
2243            collections: self.collections.clone(),
2244        };
2245        // Make sure walltime_ms is strictly increasing, in case clocks are
2246        // offset.
2247        if new_state.walltime_ms <= self.walltime_ms {
2248            new_state.walltime_ms = self.walltime_ms + 1;
2249        }
2250
2251        let work_ret = work_fn(new_state.seqno, cfg, &mut new_state.collections)?;
2252        let new_state = TypedState {
2253            state: new_state,
2254            _phantom: PhantomData,
2255        };
2256        Continue((work_ret, new_state))
2257    }
2258}
2259
2260impl<T> State<T>
2261where
2262    T: Timestamp + Lattice + Codec64,
2263{
2264    pub fn shard_id(&self) -> ShardId {
2265        self.shard_id
2266    }
2267
2268    pub fn seqno(&self) -> SeqNo {
2269        self.seqno
2270    }
2271
2272    pub fn since(&self) -> &Antichain<T> {
2273        self.collections.trace.since()
2274    }
2275
2276    pub fn upper(&self) -> &Antichain<T> {
2277        self.collections.trace.upper()
2278    }
2279
2280    pub fn spine_batch_count(&self) -> usize {
2281        self.collections.trace.num_spine_batches()
2282    }
2283
2284    pub fn size_metrics(&self) -> StateSizeMetrics {
2285        let mut ret = StateSizeMetrics::default();
2286        self.blobs().for_each(|x| match x {
2287            HollowBlobRef::Batch(x) => {
2288                ret.hollow_batch_count += 1;
2289                ret.batch_part_count += x.part_count();
2290                ret.num_updates += x.len;
2291
2292                let batch_size = x.encoded_size_bytes();
2293                for x in x.parts.iter() {
2294                    if x.ts_rewrite().is_some() {
2295                        ret.rewrite_part_count += 1;
2296                    }
2297                    if x.is_inline() {
2298                        ret.inline_part_count += 1;
2299                        ret.inline_part_bytes += x.inline_bytes();
2300                    }
2301                }
2302                ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size);
2303                ret.state_batches_bytes += batch_size;
2304            }
2305            HollowBlobRef::Rollup(x) => {
2306                ret.state_rollup_count += 1;
2307                ret.state_rollups_bytes += x.encoded_size_bytes.unwrap_or_default()
2308            }
2309        });
2310        ret
2311    }
2312
2313    pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
2314        // We maintain the invariant that every version of state has at least
2315        // one rollup.
2316        self.collections
2317            .rollups
2318            .iter()
2319            .rev()
2320            .next()
2321            .expect("State should have at least one rollup if seqno > minimum")
2322    }
2323
2324    pub(crate) fn seqno_since(&self) -> SeqNo {
2325        self.collections.seqno_since(self.seqno)
2326    }
2327
2328    // Returns whether the cmd proposing this state has been selected to perform
2329    // background garbage collection work.
2330    //
2331    // If it was selected, this information is recorded in the state itself for
2332    // commit along with the cmd's state transition. This helps us to avoid
2333    // redundant work.
2334    //
2335    // Correctness does not depend on a gc assignment being executed, nor on
2336    // them being executed in the order they are given. But it is expected that
2337    // gc assignments are best-effort respected. In practice, cmds like
2338    // register_foo or expire_foo, where it would be awkward, ignore gc.
2339    pub fn maybe_gc(
2340        &mut self,
2341        is_write: bool,
2342        use_active_gc: bool,
2343        fallback_threshold_ms: u64,
2344        now: u64,
2345    ) -> Option<GcReq> {
2346        // This is an arbitrary-ish threshold that scales with seqno, but never
2347        // gets particularly big. It probably could be much bigger and certainly
2348        // could use a tuning pass at some point.
2349        let gc_threshold = std::cmp::max(
2350            1,
2351            u64::from(self.seqno.0.next_power_of_two().trailing_zeros()),
2352        );
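        // For example, at seqno 1000 the next power of two is 1024 = 2^10, so
        // the threshold is 10; at seqno 100_000 it is 131_072 = 2^17, so the
        // threshold is 17. GC is thus requested roughly every `log2(seqno)`
        // newly-truncatable seqnos.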
2353        let new_seqno_since = self.seqno_since();
2354        let should_gc = new_seqno_since
2355            .0
2356            .saturating_sub(self.collections.last_gc_req.0)
2357            >= gc_threshold;
2358
2359        // If we wouldn't otherwise gc, check if we have an active gc. If we do, and
2360        // it's been a while since it started, we should gc.
2361        let should_gc = if use_active_gc && !should_gc {
2362            match self.collections.active_gc {
2363                Some(active_gc) => now.saturating_sub(active_gc.start_ms) > fallback_threshold_ms,
2364                None => false,
2365            }
2366        } else {
2367            should_gc
2368        };
2369        // Assign GC traffic preferentially to writers, falling back to anyone
2370        // generating new state versions if there are no writers.
2371        let should_gc = should_gc && (is_write || self.collections.writers.is_empty());
2372        // Always assign GC work to a tombstoned shard to have the chance to
2373        // clean up any residual blobs. This is safe (won't cause excess gc)
2374        // as the only allowed command after becoming a tombstone is to write
2375        // the final rollup.
2376        let tombstone_needs_gc = self.collections.is_tombstone();
2377        let should_gc = should_gc || tombstone_needs_gc;
2378        let should_gc = if use_active_gc {
2379            // If we have an active gc, we should only gc if the active gc is
2380            // sufficiently old. This is to avoid doing more gc work than
2381            // necessary.
2382            should_gc
2383                && match self.collections.active_gc {
2384                    Some(active) => now.saturating_sub(active.start_ms) > fallback_threshold_ms,
2385                    None => true,
2386                }
2387        } else {
2388            should_gc
2389        };
2390        if should_gc {
2391            self.collections.last_gc_req = new_seqno_since;
2392            Some(GcReq {
2393                shard_id: self.shard_id,
2394                new_seqno_since,
2395            })
2396        } else {
2397            None
2398        }
2399    }
2400
2401    /// Return the number of gc-ineligible state versions.
2402    pub fn seqnos_held(&self) -> usize {
2403        usize::cast_from(self.seqno.0.saturating_sub(self.seqno_since().0))
2404    }
2405
2406    /// Expire all readers and writers up to the given walltime_ms.
2407    pub fn expire_at(&mut self, walltime_ms: EpochMillis) -> ExpiryMetrics {
2408        let mut metrics = ExpiryMetrics::default();
2409        let shard_id = self.shard_id();
2410        self.collections.leased_readers.retain(|id, state| {
2411            let retain = state.last_heartbeat_timestamp_ms + state.lease_duration_ms >= walltime_ms;
2412            if !retain {
2413                info!(
2414                    "Force expiring reader {id} ({}) of shard {shard_id} due to inactivity",
2415                    state.debug.purpose
2416                );
2417                metrics.readers_expired += 1;
2418            }
2419            retain
2420        });
2421        // critical_readers don't need forced expiration. (In fact, that's the point!)
2422        self.collections.writers.retain(|id, state| {
2423            let retain =
2424                (state.last_heartbeat_timestamp_ms + state.lease_duration_ms) >= walltime_ms;
2425            if !retain {
2426                info!(
2427                    "Force expiring writer {id} ({}) of shard {shard_id} due to inactivity",
2428                    state.debug.purpose
2429                );
2430                metrics.writers_expired += 1;
2431            }
2432            retain
2433        });
2434        metrics
2435    }
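    // For example: a reader whose last heartbeat was at walltime 1_000 with a
    // 900_000 ms lease is retained by `expire_at` for any walltime_ms up to
    // and including 901_000, and force-expired (counted in `readers_expired`)
    // once walltime_ms reaches 901_001 or later.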
2436
2437    /// Returns the batches that contain updates up to (and including) the given `as_of`. The
2438    /// result `Vec` contains [`HollowBatch`]es, which reference blob keys along with a
2439    /// [`Description`] of what updates in the referenced parts are valid to read.
2440    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
2441        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2442            return Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
2443                self.collections.trace.since().clone(),
2444            )));
2445        }
2446        let upper = self.collections.trace.upper();
2447        if PartialOrder::less_equal(upper, as_of) {
2448            return Err(SnapshotErr::AsOfNotYetAvailable(
2449                self.seqno,
2450                Upper(upper.clone()),
2451            ));
2452        }
2453
2454        let batches = self
2455            .collections
2456            .trace
2457            .batches()
2458            .filter(|b| !PartialOrder::less_than(as_of, b.desc.lower()))
2459            .cloned()
2460            .collect();
2461        Ok(batches)
2462    }
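    // For example (integer timestamps): with batches described by [0, 5) and
    // [5, 10) and an upper of {10}, `snapshot` at as_of {7} returns both
    // batches, while as_of {3} returns only the first, since the filter above
    // drops any batch whose lower is strictly beyond the as_of.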
2463
2464    // NB: Unlike the other methods here, this one is read-only.
2465    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<Result<(), Upper<T>>, Since<T>> {
2466        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2467            return Err(Since(self.collections.trace.since().clone()));
2468        }
2469        let upper = self.collections.trace.upper();
2470        if PartialOrder::less_equal(upper, as_of) {
2471            return Ok(Err(Upper(upper.clone())));
2472        }
2473        Ok(Ok(()))
2474    }
2475
2476    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
2477        // TODO: Avoid the O(n^2) here: `next_listen_batch` is called once per
2478        // batch and this iterates through all batches to find the next one.
2479        self.collections
2480            .trace
2481            .batches()
2482            .find(|b| {
2483                PartialOrder::less_equal(b.desc.lower(), frontier)
2484                    && PartialOrder::less_than(frontier, b.desc.upper())
2485            })
2486            .cloned()
2487            .ok_or(self.seqno)
2488    }
2489
2490    pub fn active_rollup(&self) -> Option<ActiveRollup> {
2491        self.collections.active_rollup
2492    }
2493
2494    pub fn need_rollup(
2495        &self,
2496        threshold: usize,
2497        use_active_rollup: bool,
2498        fallback_threshold_ms: u64,
2499        now: u64,
2500    ) -> Option<SeqNo> {
2501        let (latest_rollup_seqno, _) = self.latest_rollup();
2502
2503        // Tombstoned shards require one final rollup. However, because we
2504        // write a rollup as of SeqNo X and then link it in using a state
2505        // transition (in this case from X to X+1), the minimum number of
2506        // live diffs is actually two. Detect when we're in this minimal
2507        // two diff state and stop the (otherwise) infinite iteration.
2508        if self.collections.is_tombstone() && latest_rollup_seqno.next() < self.seqno {
2509            return Some(self.seqno);
2510        }
2511
2512        let seqnos_since_last_rollup = self.seqno.0.saturating_sub(latest_rollup_seqno.0);
2513
2514        if use_active_rollup {
2515            // If seqnos_since_last_rollup > threshold and there is no rollup already in progress,
2516            // we should start a new rollup.
2517            // If there is an active rollup, we should check if it has been running too long.
2518            // If it has, we should start a new rollup.
2519            // This is to guard against a worker dying/taking too long/etc.
2520            if seqnos_since_last_rollup > u64::cast_from(threshold) {
2521                match self.active_rollup() {
2522                    Some(active_rollup) => {
2523                        if now.saturating_sub(active_rollup.start_ms) > fallback_threshold_ms {
2524                            return Some(self.seqno);
2525                        }
2526                    }
2527                    None => {
2528                        return Some(self.seqno);
2529                    }
2530                }
2531            }
2532        } else {
2533            // every `threshold` seqnos since the latest rollup, assign rollup maintenance.
2534            // we avoid assigning rollups to every seqno past the threshold to avoid handles
2535            // racing / performing redundant work.
2536            if seqnos_since_last_rollup > 0
2537                && seqnos_since_last_rollup % u64::cast_from(threshold) == 0
2538            {
2539                return Some(self.seqno);
2540            }
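            // For example, with a threshold of 128 this assigns rollup
            // maintenance at exactly 128, 256, 384, ... seqnos past the
            // latest rollup, rather than at every seqno beyond 128, which
            // keeps concurrent handles from all racing to write the same
            // rollup.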
2541
2542            // however, since maintenance is best-effort and could fail, do assign rollup
2543            // work to every seqno after a fallback threshold to ensure one is written.
2544            if seqnos_since_last_rollup
2545                > u64::cast_from(
2546                    threshold * PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER,
2547                )
2548            {
2549                return Some(self.seqno);
2550            }
2551        }
2552
2553        None
2554    }
2555
2556    pub(crate) fn blobs(&self) -> impl Iterator<Item = HollowBlobRef<T>> {
2557        let batches = self.collections.trace.batches().map(HollowBlobRef::Batch);
2558        let rollups = self.collections.rollups.values().map(HollowBlobRef::Rollup);
2559        batches.chain(rollups)
2560    }
2561}
2562
2563fn serialize_part_bytes<S: Serializer>(val: &[u8], s: S) -> Result<S::Ok, S::Error> {
2564    let val = hex::encode(val);
2565    val.serialize(s)
2566}
2567
2568fn serialize_lazy_proto<S: Serializer, T: prost::Message + Default>(
2569    val: &Option<LazyProto<T>>,
2570    s: S,
2571) -> Result<S::Ok, S::Error> {
2572    val.as_ref()
2573        .map(|lazy| hex::encode(&lazy.into_proto()))
2574        .serialize(s)
2575}
2576
2577fn serialize_part_stats<S: Serializer>(
2578    val: &Option<LazyPartStats>,
2579    s: S,
2580) -> Result<S::Ok, S::Error> {
2581    let val = val.as_ref().map(|x| x.decode().key);
2582    val.serialize(s)
2583}
2584
2585fn serialize_diffs_sum<S: Serializer>(val: &Option<[u8; 8]>, s: S) -> Result<S::Ok, S::Error> {
2586    // This is only used for debugging, so hack to assume that D is i64.
2587    let val = val.map(i64::decode);
2588    val.serialize(s)
2589}
2590
2591// This Serialize impl is used for debugging/testing and exposed via SQL. It's
2592// intentionally gated from users, so not strictly subject to our backward
2593// compatibility guarantees, but still probably best to be thoughtful about
2594// making unnecessary changes. Additionally, it's nice to make the output as
2595// useful as possible without tying our hands for the actual code usages.
2596impl<T: Serialize + Timestamp + Lattice> Serialize for State<T> {
2597    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2598        let State {
2599            applier_version,
2600            shard_id,
2601            seqno,
2602            walltime_ms,
2603            hostname,
2604            collections:
2605                StateCollections {
2606                    last_gc_req,
2607                    rollups,
2608                    active_rollup,
2609                    active_gc,
2610                    leased_readers,
2611                    critical_readers,
2612                    writers,
2613                    schemas,
2614                    trace,
2615                },
2616        } = self;
2617        let mut s = s.serialize_struct("State", 19)?;
2618        let () = s.serialize_field("applier_version", &applier_version.to_string())?;
2619        let () = s.serialize_field("shard_id", shard_id)?;
2620        let () = s.serialize_field("seqno", seqno)?;
2621        let () = s.serialize_field("walltime_ms", walltime_ms)?;
2622        let () = s.serialize_field("hostname", hostname)?;
2623        let () = s.serialize_field("last_gc_req", last_gc_req)?;
2624        let () = s.serialize_field("rollups", rollups)?;
2625        let () = s.serialize_field("active_rollup", active_rollup)?;
2626        let () = s.serialize_field("active_gc", active_gc)?;
2627        let () = s.serialize_field("leased_readers", leased_readers)?;
2628        let () = s.serialize_field("critical_readers", critical_readers)?;
2629        let () = s.serialize_field("writers", writers)?;
2630        let () = s.serialize_field("schemas", schemas)?;
2631        let () = s.serialize_field("since", &trace.since().elements())?;
2632        let () = s.serialize_field("upper", &trace.upper().elements())?;
2633        let trace = trace.flatten();
2634        let () = s.serialize_field("batches", &trace.legacy_batches.keys().collect::<Vec<_>>())?;
2635        let () = s.serialize_field("hollow_batches", &trace.hollow_batches)?;
2636        let () = s.serialize_field("spine_batches", &trace.spine_batches)?;
2637        let () = s.serialize_field("merges", &trace.merges)?;
2638        s.end()
2639    }
2640}
2641
2642#[derive(Debug, Default)]
2643pub struct StateSizeMetrics {
2644    pub hollow_batch_count: usize,
2645    pub batch_part_count: usize,
2646    pub rewrite_part_count: usize,
2647    pub num_updates: usize,
2648    pub largest_batch_bytes: usize,
2649    pub state_batches_bytes: usize,
2650    pub state_rollups_bytes: usize,
2651    pub state_rollup_count: usize,
2652    pub inline_part_count: usize,
2653    pub inline_part_bytes: usize,
2654}
2655
2656#[derive(Default)]
2657pub struct ExpiryMetrics {
2658    pub(crate) readers_expired: usize,
2659    pub(crate) writers_expired: usize,
2660}
2661
2662/// Wrapper for Antichain that represents a Since
2663#[derive(Debug, Clone, PartialEq)]
2664pub struct Since<T>(pub Antichain<T>);
2665
2666/// Wrapper for Antichain that represents an Upper
2667#[derive(Debug, PartialEq)]
2668pub struct Upper<T>(pub Antichain<T>);
2669
2670#[cfg(test)]
2671pub(crate) mod tests {
2672    use std::ops::Range;
2673
2674    use bytes::Bytes;
2675    use mz_build_info::DUMMY_BUILD_INFO;
2676    use mz_dyncfg::ConfigUpdates;
2677    use mz_ore::now::SYSTEM_TIME;
2678    use mz_ore::{assert_none, assert_ok};
2679    use mz_proto::RustType;
2680    use proptest::prelude::*;
2681    use proptest::strategy::ValueTree;
2682
2683    use crate::InvalidUsage::{InvalidBounds, InvalidEmptyTimeInterval};
2684    use crate::PersistLocation;
2685    use crate::cache::PersistClientCache;
2686    use crate::internal::encoding::any_some_lazy_part_stats;
2687    use crate::internal::paths::RollupId;
2688    use crate::internal::trace::tests::any_trace;
2689    use crate::tests::new_test_client_cache;
2690
2691    use super::*;
2692
2693    const LEASE_DURATION_MS: u64 = 900 * 1000;
2694    fn debug_state() -> HandleDebugState {
2695        HandleDebugState {
2696            hostname: "debug".to_owned(),
2697            purpose: "finding the bugs".to_owned(),
2698        }
2699    }
2700
2701    pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
2702        Strategy::prop_map(
2703            (
2704                any::<T>(),
2705                any::<T>(),
2706                any::<T>(),
2707                proptest::collection::vec(any_run_part::<T>(), 0..3),
2708                any::<usize>(),
2709                any::<bool>(),
2710            ),
2711            |(t0, t1, since, parts, len, runs)| {
2712                let (lower, upper) = if t0 <= t1 {
2713                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
2714                } else {
2715                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
2716                };
2717                let since = Antichain::from_elem(since);
2718                if runs && parts.len() > 2 {
2719                    let split_at = parts.len() / 2;
2720                    HollowBatch::new(
2721                        Description::new(lower, upper, since),
2722                        parts,
2723                        len % 10,
2724                        vec![RunMeta::default(), RunMeta::default()],
2725                        vec![split_at],
2726                    )
2727                } else {
2728                    HollowBatch::new_run(Description::new(lower, upper, since), parts, len % 10)
2729                }
2730            },
2731        )
2732    }
2733
2734    pub fn any_batch_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = BatchPart<T>> {
2735        Strategy::prop_map(
2736            (
2737                any::<bool>(),
2738                any_hollow_batch_part(),
2739                any::<Option<T>>(),
2740                any::<Option<SchemaId>>(),
2741                any::<Option<SchemaId>>(),
2742            ),
2743            |(is_hollow, hollow, ts_rewrite, schema_id, deprecated_schema_id)| {
2744                if is_hollow {
2745                    BatchPart::Hollow(hollow)
2746                } else {
2747                    let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap();
2748                    let ts_rewrite = ts_rewrite.map(Antichain::from_elem);
2749                    BatchPart::Inline {
2750                        updates,
2751                        ts_rewrite,
2752                        schema_id,
2753                        deprecated_schema_id,
2754                    }
2755                }
2756            },
2757        )
2758    }
2759
2760    pub fn any_run_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = RunPart<T>> {
2761        Strategy::prop_map(any_batch_part(), |part| RunPart::Single(part))
2762    }
2763
2764    pub fn any_hollow_batch_part<T: Arbitrary + Timestamp>()
2765    -> impl Strategy<Value = HollowBatchPart<T>> {
2766        Strategy::prop_map(
2767            (
2768                any::<PartialBatchKey>(),
2769                any::<usize>(),
2770                any::<Vec<u8>>(),
2771                any_some_lazy_part_stats(),
2772                any::<Option<T>>(),
2773                any::<[u8; 8]>(),
2774                any::<Option<BatchColumnarFormat>>(),
2775                any::<Option<SchemaId>>(),
2776                any::<Option<SchemaId>>(),
2777            ),
2778            |(
2779                key,
2780                encoded_size_bytes,
2781                key_lower,
2782                stats,
2783                ts_rewrite,
2784                diffs_sum,
2785                format,
2786                schema_id,
2787                deprecated_schema_id,
2788            )| {
2789                HollowBatchPart {
2790                    key,
2791                    encoded_size_bytes,
2792                    key_lower,
2793                    structured_key_lower: None,
2794                    stats,
2795                    ts_rewrite: ts_rewrite.map(Antichain::from_elem),
2796                    diffs_sum: Some(diffs_sum),
2797                    format,
2798                    schema_id,
2799                    deprecated_schema_id,
2800                }
2801            },
2802        )
2803    }
2804
2805    pub fn any_leased_reader_state<T: Arbitrary>() -> impl Strategy<Value = LeasedReaderState<T>> {
2806        Strategy::prop_map(
2807            (
2808                any::<SeqNo>(),
2809                any::<Option<T>>(),
2810                any::<u64>(),
2811                any::<u64>(),
2812                any::<HandleDebugState>(),
2813            ),
2814            |(seqno, since, last_heartbeat_timestamp_ms, mut lease_duration_ms, debug)| {
2815                // lease_duration_ms of 0 means this state was written by an old
2816                // version of code, which means we'll migrate it in the decode
2817                // path. Avoid.
2818                if lease_duration_ms == 0 {
2819                    lease_duration_ms += 1;
2820                }
2821                LeasedReaderState {
2822                    seqno,
2823                    since: since.map_or_else(Antichain::new, Antichain::from_elem),
2824                    last_heartbeat_timestamp_ms,
2825                    lease_duration_ms,
2826                    debug,
2827                }
2828            },
2829        )
2830    }
2831
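    /// Generates an arbitrary `CriticalReaderState`; a `None` since maps to
    /// the empty antichain.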
2832    pub fn any_critical_reader_state<T: Arbitrary>() -> impl Strategy<Value = CriticalReaderState<T>>
2833    {
2834        Strategy::prop_map(
2835            (
2836                any::<Option<T>>(),
2837                any::<OpaqueState>(),
2838                any::<String>(),
2839                any::<HandleDebugState>(),
2840            ),
2841            |(since, opaque, opaque_codec, debug)| CriticalReaderState {
2842                since: since.map_or_else(Antichain::new, Antichain::from_elem),
2843                opaque,
2844                opaque_codec,
2845                debug,
2846            },
2847        )
2848    }
2849
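    /// Generates an arbitrary `WriterState`; a `None` most recent write upper
    /// maps to the empty antichain.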
2850    pub fn any_writer_state<T: Arbitrary>() -> impl Strategy<Value = WriterState<T>> {
2851        Strategy::prop_map(
2852            (
2853                any::<u64>(),
2854                any::<u64>(),
2855                any::<IdempotencyToken>(),
2856                any::<Option<T>>(),
2857                any::<HandleDebugState>(),
2858            ),
2859            |(
2860                last_heartbeat_timestamp_ms,
2861                lease_duration_ms,
2862                most_recent_write_token,
2863                most_recent_write_upper,
2864                debug,
2865            )| WriterState {
2866                last_heartbeat_timestamp_ms,
2867                lease_duration_ms,
2868                most_recent_write_token,
2869                most_recent_write_upper: most_recent_write_upper
2870                    .map_or_else(Antichain::new, Antichain::from_elem),
2871                debug,
2872            },
2873        )
2874    }
2875
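    /// Generates an arbitrary `EncodedSchemas` from four arbitrary byte
    /// buffers; the bytes are not required to be valid schema encodings.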
2876    pub fn any_encoded_schemas() -> impl Strategy<Value = EncodedSchemas> {
2877        Strategy::prop_map(
2878            (
2879                any::<Vec<u8>>(),
2880                any::<Vec<u8>>(),
2881                any::<Vec<u8>>(),
2882                any::<Vec<u8>>(),
2883            ),
2884            |(key, key_data_type, val, val_data_type)| EncodedSchemas {
2885                key: Bytes::from(key),
2886                key_data_type: Bytes::from(key_data_type),
2887                val: Bytes::from(val),
2888                val_data_type: Bytes::from(val_data_type),
2889            },
2890        )
2891    }
2892
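    /// Generates an arbitrary `State` whose trace contains a number of batches
    /// in `num_trace_batches`. The applier version is fixed at 1.2.3 so the
    /// generated state does not depend on the current build's version.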
2893    pub fn any_state<T: Arbitrary + Timestamp + Lattice>(
2894        num_trace_batches: Range<usize>,
2895    ) -> impl Strategy<Value = State<T>> {
2896        let part1 = (
2897            any::<ShardId>(),
2898            any::<SeqNo>(),
2899            any::<u64>(),
2900            any::<String>(),
2901            any::<SeqNo>(),
2902            proptest::collection::btree_map(any::<SeqNo>(), any::<HollowRollup>(), 1..3),
2903            proptest::option::of(any::<ActiveRollup>()),
2904        );
2905
2906        let part2 = (
2907            proptest::option::of(any::<ActiveGc>()),
2908            proptest::collection::btree_map(
2909                any::<LeasedReaderId>(),
2910                any_leased_reader_state::<T>(),
2911                1..3,
2912            ),
2913            proptest::collection::btree_map(
2914                any::<CriticalReaderId>(),
2915                any_critical_reader_state::<T>(),
2916                1..3,
2917            ),
2918            proptest::collection::btree_map(any::<WriterId>(), any_writer_state::<T>(), 0..3),
2919            proptest::collection::btree_map(any::<SchemaId>(), any_encoded_schemas(), 0..3),
2920            any_trace::<T>(num_trace_batches),
2921        );
2922
2923        (part1, part2).prop_map(
2924            |(
2925                (shard_id, seqno, walltime_ms, hostname, last_gc_req, rollups, active_rollup),
2926                (active_gc, leased_readers, critical_readers, writers, schemas, trace),
2927            )| State {
2928                applier_version: semver::Version::new(1, 2, 3),
2929                shard_id,
2930                seqno,
2931                walltime_ms,
2932                hostname,
2933                collections: StateCollections {
2934                    last_gc_req,
2935                    rollups,
2936                    active_rollup,
2937                    active_gc,
2938                    leased_readers,
2939                    critical_readers,
2940                    writers,
2941                    schemas,
2942                    trace,
2943                },
2944            },
2945        )
2946    }
2947
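    /// Constructs a `HollowBatch` over `[lower, upper)` with one hollow part
    /// per key and the given `len`; all other part metadata is left unset.
    /// Shorthand used throughout the tests below.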
2948    pub(crate) fn hollow<T: Timestamp>(
2949        lower: T,
2950        upper: T,
2951        keys: &[&str],
2952        len: usize,
2953    ) -> HollowBatch<T> {
2954        HollowBatch::new_run(
2955            Description::new(
2956                Antichain::from_elem(lower),
2957                Antichain::from_elem(upper),
2958                Antichain::from_elem(T::minimum()),
2959            ),
2960            keys.iter()
2961                .map(|x| {
2962                    RunPart::Single(BatchPart::Hollow(HollowBatchPart {
2963                        key: PartialBatchKey((*x).to_owned()),
2964                        encoded_size_bytes: 0,
2965                        key_lower: vec![],
2966                        structured_key_lower: None,
2967                        stats: None,
2968                        ts_rewrite: None,
2969                        diffs_sum: None,
2970                        format: None,
2971                        schema_id: None,
2972                        deprecated_schema_id: None,
2973                    }))
2974                })
2975                .collect(),
2976            len,
2977        )
2978    }
2979
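    /// Exercises leased reader since downgrades: the shard since is the meet
    /// of all reader sinces, and downgrading a reader below its current since
    /// is a no-op.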
2980    #[mz_ore::test]
2981    fn downgrade_since() {
2982        let mut state = TypedState::<(), (), u64, i64>::new(
2983            DUMMY_BUILD_INFO.semver_version(),
2984            ShardId::new(),
2985            "".to_owned(),
2986            0,
2987        );
2988        let reader = LeasedReaderId::new();
2989        let seqno = SeqNo::minimum();
2990        let now = SYSTEM_TIME.clone();
2991        let _ = state.collections.register_leased_reader(
2992            "",
2993            &reader,
2994            "",
2995            seqno,
2996            Duration::from_secs(10),
2997            now(),
2998            false,
2999        );
3000
3001        // The shard global since == 0 initially.
3002        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3003
3004        // Greater
3005        assert_eq!(
3006            state.collections.downgrade_since(
3007                &reader,
3008                seqno,
3009                None,
3010                &Antichain::from_elem(2),
3011                now()
3012            ),
3013            Continue(Since(Antichain::from_elem(2)))
3014        );
3015        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3016        // Equal (no-op)
3017        assert_eq!(
3018            state.collections.downgrade_since(
3019                &reader,
3020                seqno,
3021                None,
3022                &Antichain::from_elem(2),
3023                now()
3024            ),
3025            Continue(Since(Antichain::from_elem(2)))
3026        );
3027        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3028        // Less (no-op)
3029        assert_eq!(
3030            state.collections.downgrade_since(
3031                &reader,
3032                seqno,
3033                None,
3034                &Antichain::from_elem(1),
3035                now()
3036            ),
3037            Continue(Since(Antichain::from_elem(2)))
3038        );
3039        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3040
3041        // Create a second reader.
3042        let reader2 = LeasedReaderId::new();
3043        let _ = state.collections.register_leased_reader(
3044            "",
3045            &reader2,
3046            "",
3047            seqno,
3048            Duration::from_secs(10),
3049            now(),
3050            false,
3051        );
3052
3053        // Shard since doesn't change until the meet (min) of all reader sinces changes.
3054        assert_eq!(
3055            state.collections.downgrade_since(
3056                &reader2,
3057                seqno,
3058                None,
3059                &Antichain::from_elem(3),
3060                now()
3061            ),
3062            Continue(Since(Antichain::from_elem(3)))
3063        );
3064        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3065        // Shard since == 3 when all readers have since >= 3.
3066        assert_eq!(
3067            state.collections.downgrade_since(
3068                &reader,
3069                seqno,
3070                None,
3071                &Antichain::from_elem(5),
3072                now()
3073            ),
3074            Continue(Since(Antichain::from_elem(5)))
3075        );
3076        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3077
3078        // Shard since is unaffected when a reader with since > shard since expires.
3079        assert_eq!(
3080            state.collections.expire_leased_reader(&reader),
3081            Continue(true)
3082        );
3083        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3084
3085        // Create a third reader.
3086        let reader3 = LeasedReaderId::new();
3087        let _ = state.collections.register_leased_reader(
3088            "",
3089            &reader3,
3090            "",
3091            seqno,
3092            Duration::from_secs(10),
3093            now(),
3094            false,
3095        );
3096
3097        // Shard since doesn't change until the meet (min) of all reader sinces changes.
3098        assert_eq!(
3099            state.collections.downgrade_since(
3100                &reader3,
3101                seqno,
3102                None,
3103                &Antichain::from_elem(10),
3104                now()
3105            ),
3106            Continue(Since(Antichain::from_elem(10)))
3107        );
3108        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3109
3110        // Shard since advances when the reader with the minimal since expires.
3111        assert_eq!(
3112            state.collections.expire_leased_reader(&reader2),
3113            Continue(true)
3114        );
3115        // TODO(database-issues#6885): expiry temporarily doesn't advance since
3116        // Switch this assertion back when we re-enable this.
3117        //
3118        // assert_eq!(state.collections.trace.since(), &Antichain::from_elem(10));
3119        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3120
3121        // Shard since unaffected when all readers are expired.
3122        assert_eq!(
3123            state.collections.expire_leased_reader(&reader3),
3124            Continue(true)
3125        );
3126        // TODO(database-issues#6885): expiry temporarily doesn't advance since
3127        // Switch this assertion back when we re-enable this.
3128        //
3129        // assert_eq!(state.collections.trace.since(), &Antichain::from_elem(10));
3130        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3131    }
3132
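    /// Exercises critical reader since downgrades: the opaque value is swapped
    /// on every successful call, even when the requested since is not beyond
    /// the current one.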
3133    #[mz_ore::test]
3134    fn compare_and_downgrade_since() {
3135        let mut state = TypedState::<(), (), u64, i64>::new(
3136            DUMMY_BUILD_INFO.semver_version(),
3137            ShardId::new(),
3138            "".to_owned(),
3139            0,
3140        );
3141        let reader = CriticalReaderId::new();
3142        let _ = state
3143            .collections
3144            .register_critical_reader::<u64>("", &reader, "");
3145
3146        // The shard global since == 0 initially.
3147        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3148        // The initial opaque value should be set.
3149        assert_eq!(
3150            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3151            u64::initial()
3152        );
3153
3154        // Greater
3155        assert_eq!(
3156            state.collections.compare_and_downgrade_since::<u64>(
3157                &reader,
3158                &u64::initial(),
3159                (&1, &Antichain::from_elem(2)),
3160            ),
3161            Continue(Ok(Since(Antichain::from_elem(2))))
3162        );
3163        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3164        assert_eq!(
3165            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3166            1
3167        );
3168        // Equal (no-op)
3169        assert_eq!(
3170            state.collections.compare_and_downgrade_since::<u64>(
3171                &reader,
3172                &1,
3173                (&2, &Antichain::from_elem(2)),
3174            ),
3175            Continue(Ok(Since(Antichain::from_elem(2))))
3176        );
3177        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3178        assert_eq!(
3179            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3180            2
3181        );
3182        // Less (no-op)
3183        assert_eq!(
3184            state.collections.compare_and_downgrade_since::<u64>(
3185                &reader,
3186                &2,
3187                (&3, &Antichain::from_elem(1)),
3188            ),
3189            Continue(Ok(Since(Antichain::from_elem(2))))
3190        );
3191        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3192        assert_eq!(
3193            u64::decode(state.collections.critical_reader(&reader).opaque.0),
3194            3
3195        );
3196    }
3197
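    /// Exercises the compare_and_append validation rules around batch bounds
    /// and the current shard upper.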
3198    #[mz_ore::test]
3199    fn compare_and_append() {
3200        let state = &mut TypedState::<String, String, u64, i64>::new(
3201            DUMMY_BUILD_INFO.semver_version(),
3202            ShardId::new(),
3203            "".to_owned(),
3204            0,
3205        )
3206        .collections;
3207
3208        let writer_id = WriterId::new();
3209        let now = SYSTEM_TIME.clone();
3210
3211        // State is initially empty.
3212        assert_eq!(state.trace.num_spine_batches(), 0);
3213        assert_eq!(state.trace.num_hollow_batches(), 0);
3214        assert_eq!(state.trace.num_updates(), 0);
3215
3216        // Cannot insert a batch with a lower != current shard upper.
3217        assert_eq!(
3218            state.compare_and_append(
3219                &hollow(1, 2, &["key1"], 1),
3220                &writer_id,
3221                now(),
3222                LEASE_DURATION_MS,
3223                &IdempotencyToken::new(),
3224                &debug_state(),
3225                0,
3226                100,
3227                None
3228            ),
3229            Break(CompareAndAppendBreak::Upper {
3230                shard_upper: Antichain::from_elem(0),
3231                writer_upper: Antichain::from_elem(0)
3232            })
3233        );
3234
3235        // Insert an empty batch with an upper > lower.
3236        assert!(
3237            state
3238                .compare_and_append(
3239                    &hollow(0, 5, &[], 0),
3240                    &writer_id,
3241                    now(),
3242                    LEASE_DURATION_MS,
3243                    &IdempotencyToken::new(),
3244                    &debug_state(),
3245                    0,
3246                    100,
3247                    None
3248                )
3249                .is_continue()
3250        );
3251
3252        // Cannot insert a batch with an upper less than the lower.
3253        assert_eq!(
3254            state.compare_and_append(
3255                &hollow(5, 4, &["key1"], 1),
3256                &writer_id,
3257                now(),
3258                LEASE_DURATION_MS,
3259                &IdempotencyToken::new(),
3260                &debug_state(),
3261                0,
3262                100,
3263                None
3264            ),
3265            Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds {
3266                lower: Antichain::from_elem(5),
3267                upper: Antichain::from_elem(4)
3268            }))
3269        );
3270
3271        // Cannot insert a nonempty batch with an upper equal to lower.
3272        assert_eq!(
3273            state.compare_and_append(
3274                &hollow(5, 5, &["key1"], 1),
3275                &writer_id,
3276                now(),
3277                LEASE_DURATION_MS,
3278                &IdempotencyToken::new(),
3279                &debug_state(),
3280                0,
3281                100,
3282                None
3283            ),
3284            Break(CompareAndAppendBreak::InvalidUsage(
3285                InvalidEmptyTimeInterval {
3286                    lower: Antichain::from_elem(5),
3287                    upper: Antichain::from_elem(5),
3288                    keys: vec!["key1".to_owned()],
3289                }
3290            ))
3291        );
3292
3293        // Can insert an empty batch with an upper equal to lower.
3294        assert!(
3295            state
3296                .compare_and_append(
3297                    &hollow(5, 5, &[], 0),
3298                    &writer_id,
3299                    now(),
3300                    LEASE_DURATION_MS,
3301                    &IdempotencyToken::new(),
3302                    &debug_state(),
3303                    0,
3304                    100,
3305                    None
3306                )
3307                .is_continue()
3308        );
3309    }
3310
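    /// Exercises snapshot bounds checking against the shard since and upper,
    /// and which batches a snapshot at a given as_of includes.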
3311    #[mz_ore::test]
3312    fn snapshot() {
3313        let now = SYSTEM_TIME.clone();
3314
3315        let mut state = TypedState::<String, String, u64, i64>::new(
3316            DUMMY_BUILD_INFO.semver_version(),
3317            ShardId::new(),
3318            "".to_owned(),
3319            0,
3320        );
3321        // Cannot take a snapshot with as_of == shard upper.
3322        assert_eq!(
3323            state.snapshot(&Antichain::from_elem(0)),
3324            Err(SnapshotErr::AsOfNotYetAvailable(
3325                SeqNo(0),
3326                Upper(Antichain::from_elem(0))
3327            ))
3328        );
3329
3330        // Cannot take a snapshot with as_of > shard upper.
3331        assert_eq!(
3332            state.snapshot(&Antichain::from_elem(5)),
3333            Err(SnapshotErr::AsOfNotYetAvailable(
3334                SeqNo(0),
3335                Upper(Antichain::from_elem(0))
3336            ))
3337        );
3338
3339        let writer_id = WriterId::new();
3340
3341        // Advance upper to 5.
3342        assert!(
3343            state
3344                .collections
3345                .compare_and_append(
3346                    &hollow(0, 5, &["key1"], 1),
3347                    &writer_id,
3348                    now(),
3349                    LEASE_DURATION_MS,
3350                    &IdempotencyToken::new(),
3351                    &debug_state(),
3352                    0,
3353                    100,
3354                    None
3355                )
3356                .is_continue()
3357        );
3358
3359        // Can take a snapshot with as_of < upper.
3360        assert_eq!(
3361            state.snapshot(&Antichain::from_elem(0)),
3362            Ok(vec![hollow(0, 5, &["key1"], 1)])
3363        );
3364
3365        // Can take a snapshot with as_of >= shard since, as long as as_of < shard upper.
3366        assert_eq!(
3367            state.snapshot(&Antichain::from_elem(4)),
3368            Ok(vec![hollow(0, 5, &["key1"], 1)])
3369        );
3370
3371        // Cannot take a snapshot with as_of >= upper.
3372        assert_eq!(
3373            state.snapshot(&Antichain::from_elem(5)),
3374            Err(SnapshotErr::AsOfNotYetAvailable(
3375                SeqNo(0),
3376                Upper(Antichain::from_elem(5))
3377            ))
3378        );
3379        assert_eq!(
3380            state.snapshot(&Antichain::from_elem(6)),
3381            Err(SnapshotErr::AsOfNotYetAvailable(
3382                SeqNo(0),
3383                Upper(Antichain::from_elem(5))
3384            ))
3385        );
3386
3387        let reader = LeasedReaderId::new();
3388        // Advance the since to 2.
3389        let _ = state.collections.register_leased_reader(
3390            "",
3391            &reader,
3392            "",
3393            SeqNo::minimum(),
3394            Duration::from_secs(10),
3395            now(),
3396            false,
3397        );
3398        assert_eq!(
3399            state.collections.downgrade_since(
3400                &reader,
3401                SeqNo::minimum(),
3402                None,
3403                &Antichain::from_elem(2),
3404                now()
3405            ),
3406            Continue(Since(Antichain::from_elem(2)))
3407        );
3408        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3409        // Cannot take a snapshot with as_of < shard since.
3410        assert_eq!(
3411            state.snapshot(&Antichain::from_elem(1)),
3412            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
3413                Antichain::from_elem(2)
3414            )))
3415        );
3416
3417        // Advance the upper to 10 via an empty batch.
3418        assert!(
3419            state
3420                .collections
3421                .compare_and_append(
3422                    &hollow(5, 10, &[], 0),
3423                    &writer_id,
3424                    now(),
3425                    LEASE_DURATION_MS,
3426                    &IdempotencyToken::new(),
3427                    &debug_state(),
3428                    0,
3429                    100,
3430                    None
3431                )
3432                .is_continue()
3433        );
3434
3435        // Can still take snapshots at times < upper.
3436        assert_eq!(
3437            state.snapshot(&Antichain::from_elem(7)),
3438            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3439        );
3440
3441        // Cannot take snapshots with as_of >= upper.
3442        assert_eq!(
3443            state.snapshot(&Antichain::from_elem(10)),
3444            Err(SnapshotErr::AsOfNotYetAvailable(
3445                SeqNo(0),
3446                Upper(Antichain::from_elem(10))
3447            ))
3448        );
3449
3450        // Advance upper to 15.
3451        assert!(
3452            state
3453                .collections
3454                .compare_and_append(
3455                    &hollow(10, 15, &["key2"], 1),
3456                    &writer_id,
3457                    now(),
3458                    LEASE_DURATION_MS,
3459                    &IdempotencyToken::new(),
3460                    &debug_state(),
3461                    0,
3462                    100,
3463                    None
3464                )
3465                .is_continue()
3466        );
3467
3468        // Filter out batches whose lowers are greater than the requested as_of (the
3469        // batches that are too far in the future for the requested as_of).
3470        assert_eq!(
3471            state.snapshot(&Antichain::from_elem(9)),
3472            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3473        );
3474
3475        // Don't filter out batches whose lowers are <= the requested as_of.
3476        assert_eq!(
3477            state.snapshot(&Antichain::from_elem(10)),
3478            Ok(vec![
3479                hollow(0, 5, &["key1"], 1),
3480                hollow(5, 10, &[], 0),
3481                hollow(10, 15, &["key2"], 1)
3482            ])
3483        );
3484
3485        assert_eq!(
3486            state.snapshot(&Antichain::from_elem(11)),
3487            Ok(vec![
3488                hollow(0, 5, &["key1"], 1),
3489                hollow(5, 10, &[], 0),
3490                hollow(10, 15, &["key2"], 1)
3491            ])
3492        );
3493    }
3494
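    /// Exercises which batch a listener at a given frontier receives next.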
3495    #[mz_ore::test]
3496    fn next_listen_batch() {
3497        let mut state = TypedState::<String, String, u64, i64>::new(
3498            DUMMY_BUILD_INFO.semver_version(),
3499            ShardId::new(),
3500            "".to_owned(),
3501            0,
3502        );
3503
3504        // Empty collection never has any batches to listen for, regardless of the
3505        // current frontier.
3506        assert_eq!(
3507            state.next_listen_batch(&Antichain::from_elem(0)),
3508            Err(SeqNo(0))
3509        );
3510        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3511
3512        let writer_id = WriterId::new();
3513        let now = SYSTEM_TIME.clone();
3514
3515        // Add two batches of data, one from [0, 5) and then another from [5, 10).
3516        assert!(
3517            state
3518                .collections
3519                .compare_and_append(
3520                    &hollow(0, 5, &["key1"], 1),
3521                    &writer_id,
3522                    now(),
3523                    LEASE_DURATION_MS,
3524                    &IdempotencyToken::new(),
3525                    &debug_state(),
3526                    0,
3527                    100,
3528                    None
3529                )
3530                .is_continue()
3531        );
3532        assert!(
3533            state
3534                .collections
3535                .compare_and_append(
3536                    &hollow(5, 10, &["key2"], 1),
3537                    &writer_id,
3538                    now(),
3539                    LEASE_DURATION_MS,
3540                    &IdempotencyToken::new(),
3541                    &debug_state(),
3542                    0,
3543                    100,
3544                    None
3545                )
3546                .is_continue()
3547        );
3548
3549        // All frontiers in [0, 5) return the first batch.
3550        for t in 0..=4 {
3551            assert_eq!(
3552                state.next_listen_batch(&Antichain::from_elem(t)),
3553                Ok(hollow(0, 5, &["key1"], 1))
3554            );
3555        }
3556
3557        // All frontiers in [5, 10) return the second batch.
3558        for t in 5..=9 {
3559            assert_eq!(
3560                state.next_listen_batch(&Antichain::from_elem(t)),
3561                Ok(hollow(5, 10, &["key2"], 1))
3562            );
3563        }
3564
3565        // There is no batch currently available for t = 10.
3566        assert_eq!(
3567            state.next_listen_batch(&Antichain::from_elem(10)),
3568            Err(SeqNo(0))
3569        );
3570
3571        // By definition, there is never a batch to return at the empty antichain,
3572        // which is the time after all possible times.
3573        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3574    }
3575
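    /// Expiring one writer must not prevent other writers from appending.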
3576    #[mz_ore::test]
3577    fn expire_writer() {
3578        let mut state = TypedState::<String, String, u64, i64>::new(
3579            DUMMY_BUILD_INFO.semver_version(),
3580            ShardId::new(),
3581            "".to_owned(),
3582            0,
3583        );
3584        let now = SYSTEM_TIME.clone();
3585
3586        let writer_id_one = WriterId::new();
3587
3588        let writer_id_two = WriterId::new();
3589
3590        // Writer is eligible to write
3591        assert!(
3592            state
3593                .collections
3594                .compare_and_append(
3595                    &hollow(0, 2, &["key1"], 1),
3596                    &writer_id_one,
3597                    now(),
3598                    LEASE_DURATION_MS,
3599                    &IdempotencyToken::new(),
3600                    &debug_state(),
3601                    0,
3602                    100,
3603                    None
3604                )
3605                .is_continue()
3606        );
3607
3608        assert!(
3609            state
3610                .collections
3611                .expire_writer(&writer_id_one)
3612                .is_continue()
3613        );
3614
3615        // Other writers should still be able to write
3616        assert!(
3617            state
3618                .collections
3619                .compare_and_append(
3620                    &hollow(2, 5, &["key2"], 1),
3621                    &writer_id_two,
3622                    now(),
3623                    LEASE_DURATION_MS,
3624                    &IdempotencyToken::new(),
3625                    &debug_state(),
3626                    0,
3627                    100,
3628                    None
3629                )
3630                .is_continue()
3631        );
3632    }
3633
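    /// Exercises maybe_gc with the active-gc protocol enabled: an in-progress
    /// gc suppresses new requests until it passes the fallback threshold.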
3634    #[mz_ore::test]
3635    fn maybe_gc_active_gc() {
3636        const GC_USE_ACTIVE_GC: bool = true;
3637        const GC_FALLBACK_THRESHOLD_MS: u64 = 5000;
3638        let now_fn = SYSTEM_TIME.clone();
3639
3640        let mut state = TypedState::<String, String, u64, i64>::new(
3641            DUMMY_BUILD_INFO.semver_version(),
3642            ShardId::new(),
3643            "".to_owned(),
3644            0,
3645        );
3646
3647        let now = now_fn();
3648        // Empty state doesn't need gc, regardless of is_write.
3649        assert_eq!(
3650            state.maybe_gc(true, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, now),
3651            None
3652        );
3653        assert_eq!(
3654            state.maybe_gc(false, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, now),
3655            None
3656        );
3657
3658        // Artificially advance the seqno so the seqno_since advances past our
3659        // internal gc_threshold.
3660        state.seqno = SeqNo(100);
3661        assert_eq!(state.seqno_since(), SeqNo(100));
3662
3663        // When a writer is present, non-writes don't gc.
3664        let writer_id = WriterId::new();
3665        state.collections.compare_and_append(
3666            &hollow(1, 2, &["key1"], 1),
3667            &writer_id,
3668            now,
3669            LEASE_DURATION_MS,
3670            &IdempotencyToken::new(),
3671            &debug_state(),
3672            0,
3673            100,
3674            None,
3675        );
3676        assert_eq!(
3677            state.maybe_gc(false, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, now),
3678            None
3679        );
3680
3681        // A write will gc though.
3682        assert_eq!(
3683            state.maybe_gc(true, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, now),
3684            Some(GcReq {
3685                shard_id: state.shard_id,
3686                new_seqno_since: SeqNo(100)
3687            })
3688        );
3689
3690        // But if we write down an active gc, we won't gc.
3691        state.collections.active_gc = Some(ActiveGc {
3692            seqno: state.seqno,
3693            start_ms: now,
3694        });
3695
3696        state.seqno = SeqNo(200);
3697        assert_eq!(state.seqno_since(), SeqNo(200));
3698
3699        assert_eq!(
3700            state.maybe_gc(true, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, now),
3701            None
3702        );
3703
3704        state.seqno = SeqNo(300);
3705        assert_eq!(state.seqno_since(), SeqNo(300));
3706        // But if we advance the time past the threshold, we will gc.
3707        let new_now = now + GC_FALLBACK_THRESHOLD_MS + 1;
3708        assert_eq!(
3709            state.maybe_gc(true, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, new_now),
3710            Some(GcReq {
3711                shard_id: state.shard_id,
3712                new_seqno_since: SeqNo(300)
3713            })
3714        );
3715
3716        // Even if the seqno hasn't passed the threshold, we will gc once the
3717        // active gc has expired.
3718
3719        state.seqno = SeqNo(301);
3720        assert_eq!(state.seqno_since(), SeqNo(301));
3721        assert_eq!(
3722            state.maybe_gc(true, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, new_now),
3723            Some(GcReq {
3724                shard_id: state.shard_id,
3725                new_seqno_since: SeqNo(301)
3726            })
3727        );
3728
3729        state.collections.active_gc = None;
3730
3731        // Artificially advance the seqno (again) so the seqno_since advances
3732        // past our internal gc_threshold (again).
3733        state.seqno = SeqNo(400);
3734        assert_eq!(state.seqno_since(), SeqNo(400));
3735
3736        let now = now_fn();
3737
3738        // If there are no writers, even a non-write will gc.
3739        let _ = state.collections.expire_writer(&writer_id);
3740        assert_eq!(
3741            state.maybe_gc(false, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, now),
3742            Some(GcReq {
3743                shard_id: state.shard_id,
3744                new_seqno_since: SeqNo(400)
3745            })
3746        );
3747    }
3748
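    /// Exercises maybe_gc with the active-gc protocol disabled (the classic
    /// behavior).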
3749    #[mz_ore::test]
3750    fn maybe_gc_classic() {
3751        const GC_USE_ACTIVE_GC: bool = false;
3752        const GC_FALLBACK_THRESHOLD_MS: u64 = 5000;
3753        const NOW_MS: u64 = 0;
3754
3755        let mut state = TypedState::<String, String, u64, i64>::new(
3756            DUMMY_BUILD_INFO.semver_version(),
3757            ShardId::new(),
3758            "".to_owned(),
3759            0,
3760        );
3761
3762        // Empty state doesn't need gc, regardless of is_write.
3763        assert_eq!(
3764            state.maybe_gc(true, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, NOW_MS),
3765            None
3766        );
3767        assert_eq!(
3768            state.maybe_gc(false, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, NOW_MS),
3769            None
3770        );
3771
3772        // Artificially advance the seqno so the seqno_since advances past our
3773        // internal gc_threshold.
3774        state.seqno = SeqNo(100);
3775        assert_eq!(state.seqno_since(), SeqNo(100));
3776
3777        // When a writer is present, non-writes don't gc.
3778        let writer_id = WriterId::new();
3779        let now = SYSTEM_TIME.clone();
3780        state.collections.compare_and_append(
3781            &hollow(1, 2, &["key1"], 1),
3782            &writer_id,
3783            now(),
3784            LEASE_DURATION_MS,
3785            &IdempotencyToken::new(),
3786            &debug_state(),
3787            0,
3788            100,
3789            None,
3790        );
3791        assert_eq!(
3792            state.maybe_gc(false, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, NOW_MS),
3793            None
3794        );
3795
3796        // A write will gc though.
3797        assert_eq!(
3798            state.maybe_gc(true, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, NOW_MS),
3799            Some(GcReq {
3800                shard_id: state.shard_id,
3801                new_seqno_since: SeqNo(100)
3802            })
3803        );
3804
3805        // Artificially advance the seqno (again) so the seqno_since advances
3806        // past our internal gc_threshold (again).
3807        state.seqno = SeqNo(200);
3808        assert_eq!(state.seqno_since(), SeqNo(200));
3809
3810        // If there are no writers, even a non-write will gc.
3811        let _ = state.collections.expire_writer(&writer_id);
3812        assert_eq!(
3813            state.maybe_gc(false, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, NOW_MS),
3814            Some(GcReq {
3815                shard_id: state.shard_id,
3816                new_seqno_since: SeqNo(200)
3817            })
3818        );
3819    }
3820
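    /// Exercises need_rollup with the active-rollup protocol enabled: an
    /// in-progress rollup suppresses new requests until it passes the
    /// fallback threshold.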
3821    #[mz_ore::test]
3822    fn need_rollup_active_rollup() {
3823        const ROLLUP_THRESHOLD: usize = 3;
3824        const ROLLUP_USE_ACTIVE_ROLLUP: bool = true;
3825        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 5000;
3826        let now = SYSTEM_TIME.clone();
3827
3828        mz_ore::test::init_logging();
3829        let mut state = TypedState::<String, String, u64, i64>::new(
3830            DUMMY_BUILD_INFO.semver_version(),
3831            ShardId::new(),
3832            "".to_owned(),
3833            0,
3834        );
3835
3836        let rollup_seqno = SeqNo(5);
3837        let rollup = HollowRollup {
3838            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
3839            encoded_size_bytes: None,
3840        };
3841
3842        assert!(
3843            state
3844                .collections
3845                .add_rollup((rollup_seqno, &rollup))
3846                .is_continue()
3847        );
3848
3849        // shouldn't need a rollup at the seqno of the rollup
3850        state.seqno = SeqNo(5);
3851        assert_none!(state.need_rollup(
3852            ROLLUP_THRESHOLD,
3853            ROLLUP_USE_ACTIVE_ROLLUP,
3854            ROLLUP_FALLBACK_THRESHOLD_MS,
3855            now()
3856        ));
3857
3858        // shouldn't need a rollup at seqnos less than our threshold
3859        state.seqno = SeqNo(6);
3860        assert_none!(state.need_rollup(
3861            ROLLUP_THRESHOLD,
3862            ROLLUP_USE_ACTIVE_ROLLUP,
3863            ROLLUP_FALLBACK_THRESHOLD_MS,
3864            now()
3865        ));
3866        state.seqno = SeqNo(7);
3867        assert_none!(state.need_rollup(
3868            ROLLUP_THRESHOLD,
3869            ROLLUP_USE_ACTIVE_ROLLUP,
3870            ROLLUP_FALLBACK_THRESHOLD_MS,
3871            now()
3872        ));
3873        state.seqno = SeqNo(8);
3874        assert_none!(state.need_rollup(
3875            ROLLUP_THRESHOLD,
3876            ROLLUP_USE_ACTIVE_ROLLUP,
3877            ROLLUP_FALLBACK_THRESHOLD_MS,
3878            now()
3879        ));
3880
3881        let mut current_time = now();
3882        // hit our threshold! we should need a rollup
3883        state.seqno = SeqNo(9);
3884        assert_eq!(
3885            state
3886                .need_rollup(
3887                    ROLLUP_THRESHOLD,
3888                    ROLLUP_USE_ACTIVE_ROLLUP,
3889                    ROLLUP_FALLBACK_THRESHOLD_MS,
3890                    current_time
3891                )
3892                .expect("rollup"),
3893            SeqNo(9)
3894        );
3895
3896        state.collections.active_rollup = Some(ActiveRollup {
3897            seqno: SeqNo(9),
3898            start_ms: current_time,
3899        });
3900
3901        // There is now an active rollup, so we shouldn't need a rollup.
3902        assert_none!(state.need_rollup(
3903            ROLLUP_THRESHOLD,
3904            ROLLUP_USE_ACTIVE_ROLLUP,
3905            ROLLUP_FALLBACK_THRESHOLD_MS,
3906            current_time
3907        ));
3908
3909        state.seqno = SeqNo(10);
3910        // We still don't need a rollup, even though the seqno is greater than
3911        // the rollup threshold.
3912        assert_none!(state.need_rollup(
3913            ROLLUP_THRESHOLD,
3914            ROLLUP_USE_ACTIVE_ROLLUP,
3915            ROLLUP_FALLBACK_THRESHOLD_MS,
3916            current_time
3917        ));
3918
3919        // But if we wait long enough, we should need a rollup again.
3920        current_time += ROLLUP_FALLBACK_THRESHOLD_MS + 1;
3921        assert_eq!(
3922            state
3923                .need_rollup(
3924                    ROLLUP_THRESHOLD,
3925                    ROLLUP_USE_ACTIVE_ROLLUP,
3926                    ROLLUP_FALLBACK_THRESHOLD_MS,
3927                    current_time
3928                )
3929                .expect("rollup"),
3930            SeqNo(10)
3931        );
3932
3933        state.seqno = SeqNo(9);
3934        // Clear the active rollup and ensure we need a rollup again.
3935        state.collections.active_rollup = None;
3936        let rollup_seqno = SeqNo(9);
3937        let rollup = HollowRollup {
3938            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
3939            encoded_size_bytes: None,
3940        };
3941        assert!(
3942            state
3943                .collections
3944                .add_rollup((rollup_seqno, &rollup))
3945                .is_continue()
3946        );
3947
3948        state.seqno = SeqNo(11);
3949        // We shouldn't need a rollup at seqnos less than our threshold
3950        assert_none!(state.need_rollup(
3951            ROLLUP_THRESHOLD,
3952            ROLLUP_USE_ACTIVE_ROLLUP,
3953            ROLLUP_FALLBACK_THRESHOLD_MS,
3954            current_time
3955        ));
3956        // hit our threshold! we should need a rollup
3957        state.seqno = SeqNo(13);
3958        assert_eq!(
3959            state
3960                .need_rollup(
3961                    ROLLUP_THRESHOLD,
3962                    ROLLUP_USE_ACTIVE_ROLLUP,
3963                    ROLLUP_FALLBACK_THRESHOLD_MS,
3964                    current_time
3965                )
3966                .expect("rollup"),
3967            SeqNo(13)
3968        );
3969    }
3970
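    /// Exercises need_rollup with the active-rollup protocol disabled (the
    /// classic behavior).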
3971    #[mz_ore::test]
3972    fn need_rollup_classic() {
3973        const ROLLUP_THRESHOLD: usize = 3;
3974        const ROLLUP_USE_ACTIVE_ROLLUP: bool = false;
3975        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 0;
3976        const NOW: u64 = 0;
3977
3978        mz_ore::test::init_logging();
3979        let mut state = TypedState::<String, String, u64, i64>::new(
3980            DUMMY_BUILD_INFO.semver_version(),
3981            ShardId::new(),
3982            "".to_owned(),
3983            0,
3984        );
3985
3986        let rollup_seqno = SeqNo(5);
3987        let rollup = HollowRollup {
3988            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
3989            encoded_size_bytes: None,
3990        };
3991
3992        assert!(
3993            state
3994                .collections
3995                .add_rollup((rollup_seqno, &rollup))
3996                .is_continue()
3997        );
3998
3999        // shouldn't need a rollup at the seqno of the rollup
4000        state.seqno = SeqNo(5);
4001        assert_none!(state.need_rollup(
4002            ROLLUP_THRESHOLD,
4003            ROLLUP_USE_ACTIVE_ROLLUP,
4004            ROLLUP_FALLBACK_THRESHOLD_MS,
4005            NOW
4006        ));
4007
4008        // shouldn't need a rollup at seqnos less than our threshold
4009        state.seqno = SeqNo(6);
4010        assert_none!(state.need_rollup(
4011            ROLLUP_THRESHOLD,
4012            ROLLUP_USE_ACTIVE_ROLLUP,
4013            ROLLUP_FALLBACK_THRESHOLD_MS,
4014            NOW
4015        ));
4016        state.seqno = SeqNo(7);
4017        assert_none!(state.need_rollup(
4018            ROLLUP_THRESHOLD,
4019            ROLLUP_USE_ACTIVE_ROLLUP,
4020            ROLLUP_FALLBACK_THRESHOLD_MS,
4021            NOW
4022        ));
4023
4024        // hit our threshold! we should need a rollup
4025        state.seqno = SeqNo(8);
4026        assert_eq!(
4027            state
4028                .need_rollup(
4029                    ROLLUP_THRESHOLD,
4030                    ROLLUP_USE_ACTIVE_ROLLUP,
4031                    ROLLUP_FALLBACK_THRESHOLD_MS,
4032                    NOW
4033                )
4034                .expect("rollup"),
4035            SeqNo(8)
4036        );
4037
4038        // but we don't need rollups for every seqno > the threshold
4039        state.seqno = SeqNo(9);
4040        assert_none!(state.need_rollup(
4041            ROLLUP_THRESHOLD,
4042            ROLLUP_USE_ACTIVE_ROLLUP,
4043            ROLLUP_FALLBACK_THRESHOLD_MS,
4044            NOW
4045        ));
4046
4047        // we only need a rollup every `ROLLUP_THRESHOLD` seqnos beyond the latest rollup
4048        state.seqno = SeqNo(11);
4049        assert_eq!(
4050            state
4051                .need_rollup(
4052                    ROLLUP_THRESHOLD,
4053                    ROLLUP_USE_ACTIVE_ROLLUP,
4054                    ROLLUP_FALLBACK_THRESHOLD_MS,
4055                    NOW
4056                )
4057                .expect("rollup"),
4058            SeqNo(11)
4059        );
4060
4061        // add another rollup and ensure we're always picking the latest
4062        let rollup_seqno = SeqNo(6);
4063        let rollup = HollowRollup {
4064            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4065            encoded_size_bytes: None,
4066        };
4067        assert!(
4068            state
4069                .collections
4070                .add_rollup((rollup_seqno, &rollup))
4071                .is_continue()
4072        );
4073
4074        state.seqno = SeqNo(8);
4075        assert_none!(state.need_rollup(
4076            ROLLUP_THRESHOLD,
4077            ROLLUP_USE_ACTIVE_ROLLUP,
4078            ROLLUP_FALLBACK_THRESHOLD_MS,
4079            NOW
4080        ));
4081        state.seqno = SeqNo(9);
4082        assert_eq!(
4083            state
4084                .need_rollup(
4085                    ROLLUP_THRESHOLD,
4086                    ROLLUP_USE_ACTIVE_ROLLUP,
4087                    ROLLUP_FALLBACK_THRESHOLD_MS,
4088                    NOW
4089                )
4090                .expect("rollup"),
4091            SeqNo(9)
4092        );
4093
4094        // and ensure that after the fallback point, every seqno needs rollup work
4095        let fallback_seqno = SeqNo(
4096            rollup_seqno.0
4097                * u64::cast_from(PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER),
4098        );
4099        state.seqno = fallback_seqno;
4100        assert_eq!(
4101            state
4102                .need_rollup(
4103                    ROLLUP_THRESHOLD,
4104                    ROLLUP_USE_ACTIVE_ROLLUP,
4105                    ROLLUP_FALLBACK_THRESHOLD_MS,
4106                    NOW
4107                )
4108                .expect("rollup"),
4109            fallback_seqno
4110        );
4111        state.seqno = fallback_seqno.next();
4112        assert_eq!(
4113            state
4114                .need_rollup(
4115                    ROLLUP_THRESHOLD,
4116                    ROLLUP_USE_ACTIVE_ROLLUP,
4117                    ROLLUP_FALLBACK_THRESHOLD_MS,
4118                    NOW
4119                )
4120                .expect("rollup"),
4121            fallback_seqno.next()
4122        );
4123    }
4124
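    /// Pins the string rendering of `IdempotencyToken::SENTINEL`.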
4125    #[mz_ore::test]
4126    fn idempotency_token_sentinel() {
4127        assert_eq!(
4128            IdempotencyToken::SENTINEL.to_string(),
4129            "i11111111-1111-1111-1111-111111111111"
4130        );
4131    }
4132
4133    /// This test generates an "arbitrary" State, but uses a fixed seed for the
4134    /// randomness, so that it's deterministic. This lets us assert the
4135    /// serialization of that State against a golden file that's committed,
4136    /// making it easy to see what the serialization (used in an upcoming
4137    /// INSPECT feature) looks like.
4138    ///
4139    /// This golden will have to be updated each time we change State, but
4140    /// that's a feature, not a bug.
4141    #[mz_ore::test]
4142    fn state_inspect_serde_json() {
4143        const STATE_SERDE_JSON: &str = include_str!("state_serde.json");
4144        let mut runner = proptest::test_runner::TestRunner::deterministic();
4145        let tree = any_state::<u64>(6..8).new_tree(&mut runner).unwrap();
4146        let json = serde_json::to_string_pretty(&tree.current()).unwrap();
4147        assert_eq!(
4148            json.trim(),
4149            STATE_SERDE_JSON.trim(),
4150            "\n\nNEW GOLDEN\n{}\n",
4151            json
4152        );
4153    }
4154
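    /// Exercises the protection against opening a shard with a build version
    /// that is too far behind the version that last wrote its state.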
4155    #[mz_persist_proc::test(tokio::test)]
4156    #[cfg_attr(miri, ignore)] // too slow
4157    async fn sneaky_downgrades(dyncfgs: ConfigUpdates) {
4158        let mut clients = new_test_client_cache(&dyncfgs);
4159        let shard_id = ShardId::new();
4160
4161        async fn open_and_write(
4162            clients: &mut PersistClientCache,
4163            version: semver::Version,
4164            shard_id: ShardId,
4165        ) -> Result<(), tokio::task::JoinError> {
4166            clients.cfg.build_version = version.clone();
4167            clients.clear_state_cache();
4168            let client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
4169            // Run in a task so we can catch the panic.
4170            mz_ore::task::spawn(|| version.to_string(), async move {
4171                let (mut write, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
4172                let current = *write.upper().as_option().unwrap();
4173                // Do a write so that we tag the state with the version.
4174                write
4175                    .expect_compare_and_append_batch(&mut [], current, current + 1)
4176                    .await;
4177            })
4178            .await
4179        }
4180
4181        // Start at v0.10.0.
4182        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4183        assert_ok!(res);
4184
4185        // Upgrade to v0.11.0 is allowed.
4186        let res = open_and_write(&mut clients, Version::new(0, 11, 0), shard_id).await;
4187        assert_ok!(res);
4188
4189        // Downgrade to v0.10.0 is allowed.
4190        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4191        assert_ok!(res);
4192
4193        // Downgrade to v0.9.0 is _NOT_ allowed.
4194        let res = open_and_write(&mut clients, Version::new(0, 9, 0), shard_id).await;
4195        assert!(res.unwrap_err().is_panic());
4196    }
4197}