// mz_persist_client/internal/machine.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Implementation of the persist state machine.

12use std::fmt::Debug;
13use std::ops::ControlFlow::{self, Break, Continue};
14use std::sync::Arc;
15use std::time::{Duration, Instant, SystemTime};
16
17use differential_dataflow::difference::Monoid;
18use differential_dataflow::lattice::Lattice;
19use futures::FutureExt;
20use futures::future::{self, BoxFuture};
21use mz_dyncfg::{Config, ConfigSet};
22use mz_ore::cast::CastFrom;
23use mz_ore::error::ErrorExt;
24#[allow(unused_imports)] // False positive.
25use mz_ore::fmt::FormatBuffer;
26use mz_ore::{assert_none, soft_assert_no_log};
27use mz_persist::location::{ExternalError, Indeterminate, SeqNo};
28use mz_persist::retry::Retry;
29use mz_persist_types::schema::SchemaId;
30use mz_persist_types::{Codec, Codec64};
31use semver::Version;
32use timely::PartialOrder;
33use timely::progress::{Antichain, Timestamp};
34use tracing::{Instrument, debug, info, trace_span};
35
36use crate::async_runtime::IsolatedRuntime;
37use crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES;
38use crate::cache::StateCache;
39use crate::cfg::RetryParameters;
40use crate::critical::{CriticalReaderId, Opaque};
41use crate::error::{CodecMismatch, InvalidUsage};
42use crate::internal::apply::Applier;
43use crate::internal::compact::CompactReq;
44use crate::internal::maintenance::{RoutineMaintenance, WriterMaintenance};
45use crate::internal::metrics::{CmdMetrics, Metrics, MetricsRetryStream, RetryMetrics};
46use crate::internal::paths::PartialRollupKey;
47use crate::internal::state::{
48    CompareAndAppendBreak, CriticalReaderState, HandleDebugState, HollowBatch, HollowRollup,
49    IdempotencyToken, LeasedReaderState, NoOpStateTransition, Since, SnapshotErr, StateCollections,
50};
51use crate::internal::state_versions::StateVersions;
52use crate::internal::trace::{ApplyMergeResult, FueledMergeRes};
53use crate::internal::watch::StateWatch;
54use crate::read::LeasedReaderId;
55use crate::rpc::PubSubSender;
56use crate::schema::CaESchema;
57use crate::write::WriterId;
58use crate::{Diagnostics, PersistConfig, ShardId};
59
/// A handle to the persist state machine for a single shard.
///
/// State transitions are applied via the wrapped [Applier]; the
/// `isolated_runtime` is carried along so callers can run expensive work off
/// the main runtime.
#[derive(Debug)]
pub struct Machine<K, V, T, D> {
    pub(crate) applier: Applier<K, V, T, D>,
    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
}
65
66// Impl Clone regardless of the type params.
67impl<K, V, T: Clone, D> Clone for Machine<K, V, T, D> {
68    fn clone(&self) -> Self {
69        Self {
70            applier: self.applier.clone(),
71            isolated_runtime: Arc::clone(&self.isolated_runtime),
72        }
73    }
74}
75
/// Dyncfg: whether an append that produced no compaction request should claim
/// some existing uncompacted batch from state instead. Defaults to off.
pub(crate) const CLAIM_UNCLAIMED_COMPACTIONS: Config<bool> = Config::new(
    "persist_claim_unclaimed_compactions",
    false,
    "If an append doesn't result in a compaction request, but there is some uncompacted batch \
    in state, compact that instead.",
);

/// Dyncfg: the percent chance of claiming a compaction when
/// [CLAIM_UNCLAIMED_COMPACTIONS] is enabled; values over 100 guarantee
/// multiple claims (see the description).
pub(crate) const CLAIM_COMPACTION_PERCENT: Config<usize> = Config::new(
    "persist_claim_compaction_percent",
    100,
    "Claim a compaction with the given percent chance, if claiming compactions is enabled. \
    (If over 100, we'll always claim at least one; for example, if set to 365, we'll claim at least \
    three and have a 65% chance of claiming a fourth.)",
);

/// Dyncfg: minimum version for claimed compactions; the empty-string default
/// parses as no valid version, i.e. the knob is disabled.
pub(crate) const CLAIM_COMPACTION_MIN_VERSION: Config<String> = Config::new(
    "persist_claim_compaction_min_version",
    String::new(),
    "If set to a valid version string, compact away any earlier versions if possible.",
);
96
97impl<K, V, T, D> Machine<K, V, T, D>
98where
99    K: Debug + Codec,
100    V: Debug + Codec,
101    T: Timestamp + Lattice + Codec64 + Sync,
102    D: Monoid + Codec64,
103{
    /// Creates a `Machine` for the given shard.
    ///
    /// Most arguments are forwarded to [Applier::new]; `isolated_runtime` is
    /// retained on the returned `Machine`. Returns `Err` on a
    /// [CodecMismatch] (the shard was written with different codecs).
    pub async fn new(
        cfg: PersistConfig,
        shard_id: ShardId,
        metrics: Arc<Metrics>,
        state_versions: Arc<StateVersions>,
        shared_states: Arc<StateCache>,
        pubsub_sender: Arc<dyn PubSubSender>,
        isolated_runtime: Arc<IsolatedRuntime>,
        diagnostics: Diagnostics,
    ) -> Result<Self, Box<CodecMismatch>> {
        let applier = Applier::new(
            cfg,
            shard_id,
            metrics,
            state_versions,
            shared_states,
            pubsub_sender,
            diagnostics,
        )
        .await?;
        Ok(Machine {
            applier,
            isolated_runtime,
        })
    }
129
    /// Returns the [ShardId] this machine operates on.
    pub fn shard_id(&self) -> ShardId {
        self.applier.shard_id
    }
133
    /// Returns the current [SeqNo], as reported by the [Applier].
    pub fn seqno(&self) -> SeqNo {
        self.applier.seqno()
    }
137
    /// Writes a rollup for the current state and links it into state,
    /// returning any routine maintenance.
    ///
    /// If the applier declines to write a rollup (returns `None`), this is a
    /// no-op. If the rollup loses the race to be added at its seqno, the
    /// orphaned blob is deleted before returning.
    pub async fn add_rollup_for_current_seqno(&self) -> RoutineMaintenance {
        let rollup = self.applier.write_rollup_for_state().await;
        let Some(rollup) = rollup else {
            return RoutineMaintenance::default();
        };

        let (applied, maintenance) = self.add_rollup((rollup.seqno, &rollup.to_hollow())).await;
        if !applied {
            // Someone else already wrote a rollup at this seqno, so ours didn't
            // get added. Delete it.
            self.applier
                .state_versions
                .delete_rollup(&rollup.shard_id, &rollup.key)
                .await;
        }
        maintenance
    }
155
    /// Adds the given rollup to state, returning whether it was applied (by
    /// any attempt of the retried command) plus routine maintenance.
    pub async fn add_rollup(
        &self,
        add_rollup: (SeqNo, &HollowRollup),
    ) -> (bool, RoutineMaintenance) {
        // See the big SUBTLE comment in [Self::merge_res] for what's going on
        // here.
        //
        // The idempotent command may run the closure multiple times; we latch
        // `applied_ever_true` so a success on any attempt counts as applied.
        let mut applied_ever_true = false;
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, _applied, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.add_rollup, |_, _, state| {
                let ret = state.add_rollup(add_rollup);
                if let Continue(applied) = ret {
                    applied_ever_true = applied_ever_true || applied;
                }
                ret
            })
            .await;
        (applied_ever_true, maintenance)
    }
175
176    pub async fn remove_rollups(
177        &self,
178        remove_rollups: &[(SeqNo, PartialRollupKey)],
179    ) -> (Vec<SeqNo>, RoutineMaintenance) {
180        let metrics = Arc::clone(&self.applier.metrics);
181        let (_seqno, removed_rollup_seqnos, maintenance) = self
182            .apply_unbatched_idempotent_cmd(&metrics.cmds.remove_rollups, |_, _, state| {
183                state.remove_rollups(remove_rollups)
184            })
185            .await;
186        (removed_rollup_seqnos, maintenance)
187    }
188
    /// Attempt to upgrade the state to the latest version. If that's not possible, return the
    /// actual data version of the shard.
    pub async fn upgrade_version(&self) -> Result<RoutineMaintenance, Version> {
        let metrics = Arc::clone(&self.applier.metrics);
        // NOTE(review): this reuses the `remove_rollups` command metric;
        // presumably there is no dedicated upgrade metric — confirm whether
        // one should be added to CmdMetrics.
        let (_seqno, upgrade_result, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.remove_rollups, |_, cfg, state| {
                if state.version <= cfg.build_version {
                    // This would be the place to remove any deprecated items from state, now
                    // that we're dropping compatibility with any previous versions.
                    state.version = cfg.build_version.clone();
                    Continue(Ok(()))
                } else {
                    // State is from a newer binary; refuse the downgrade and
                    // report the shard's actual version without committing.
                    Break(NoOpStateTransition(Err(state.version.clone())))
                }
            })
            .await;

        match upgrade_result {
            Ok(()) => Ok(maintenance),
            Err(version) => {
                soft_assert_no_log!(
                    maintenance.is_empty(),
                    "should not generate maintenance on failed upgrade"
                );
                Err(version)
            }
        }
    }
217
    /// Registers `reader_id` as a leased reader of this shard, returning the
    /// reader's state (including its initial seqno hold) plus routine
    /// maintenance.
    pub async fn register_leased_reader(
        &self,
        reader_id: &LeasedReaderId,
        purpose: &str,
        lease_duration: Duration,
        heartbeat_timestamp_ms: u64,
        use_critical_since: bool,
    ) -> (LeasedReaderState<T>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, (reader_state, seqno_since), maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |seqno, cfg, state| {
                state.register_leased_reader(
                    &cfg.hostname,
                    reader_id,
                    purpose,
                    seqno,
                    lease_duration,
                    heartbeat_timestamp_ms,
                    use_critical_since,
                )
            })
            .await;
        // Usually, the reader gets an initial seqno hold of the seqno at which
        // it was registered. However, on a tombstone shard the seqno hold
        // happens to get computed as the tombstone seqno + 1
        // (State::clone_apply provided seqno.next(), the non-no-op commit
        // seqno, to the work fn and this is what register_reader uses for the
        // seqno hold). The real invariant we want to protect here is that the
        // hold is >= the seqno_since, so validate that instead of anything more
        // specific.
        debug_assert!(
            reader_state.seqno >= seqno_since,
            "{} vs {}",
            reader_state.seqno,
            seqno_since,
        );
        (reader_state, maintenance)
    }
256
257    pub async fn register_critical_reader(
258        &self,
259        reader_id: &CriticalReaderId,
260        default_opaque: Opaque,
261        purpose: &str,
262    ) -> (CriticalReaderState<T>, RoutineMaintenance) {
263        let metrics = Arc::clone(&self.applier.metrics);
264        let (_seqno, state, maintenance) = self
265            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, cfg, state| {
266                state.register_critical_reader(
267                    &cfg.hostname,
268                    reader_id,
269                    default_opaque.clone(),
270                    purpose,
271                )
272            })
273            .await;
274        (state, maintenance)
275    }
276
277    pub async fn register_schema(
278        &self,
279        key_schema: &K::Schema,
280        val_schema: &V::Schema,
281    ) -> (Option<SchemaId>, RoutineMaintenance) {
282        let metrics = Arc::clone(&self.applier.metrics);
283        let (_seqno, state, maintenance) = self
284            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, _cfg, state| {
285                state.register_schema::<K, V>(key_schema, val_schema)
286            })
287            .await;
288        (state, maintenance)
289    }
290
291    pub async fn spine_exert(&self, fuel: usize) -> (Vec<CompactReq<T>>, RoutineMaintenance) {
292        // Performance special case for no-ops, to avoid the State clones.
293        if fuel == 0 || self.applier.all_batches().len() < 2 {
294            return (Vec::new(), RoutineMaintenance::default());
295        }
296
297        let metrics = Arc::clone(&self.applier.metrics);
298        let (_seqno, reqs, maintenance) = self
299            .apply_unbatched_idempotent_cmd(&metrics.cmds.spine_exert, |_seqno, _cfg, state| {
300                state.spine_exert(fuel)
301            })
302            .await;
303        let reqs = reqs
304            .into_iter()
305            .map(|req| CompactReq {
306                shard_id: self.shard_id(),
307                desc: req.desc,
308                inputs: req.inputs,
309            })
310            .collect();
311        (reqs, maintenance)
312    }
313
    /// Appends `batch` to the shard if the shard's upper matches the batch's
    /// lower, retrying with refreshed state if our locally-cached upper turns
    /// out to be stale. Retry-on-Indeterminate handling lives in
    /// [Self::compare_and_append_idempotent].
    pub async fn compare_and_append(
        &self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        debug_info: &HandleDebugState,
        heartbeat_timestamp_ms: u64,
    ) -> CompareAndAppendRes<T> {
        // One token for the whole loop: every retry is the "same" logical
        // append.
        let idempotency_token = IdempotencyToken::new();
        loop {
            let res = self
                .compare_and_append_idempotent(
                    batch,
                    writer_id,
                    heartbeat_timestamp_ms,
                    &idempotency_token,
                    debug_info,
                    None,
                )
                .await;
            match res {
                CompareAndAppendRes::Success(seqno, maintenance) => {
                    return CompareAndAppendRes::Success(seqno, maintenance);
                }
                CompareAndAppendRes::InvalidUsage(x) => {
                    return CompareAndAppendRes::InvalidUsage(x);
                }
                CompareAndAppendRes::InlineBackpressure => {
                    return CompareAndAppendRes::InlineBackpressure;
                }
                CompareAndAppendRes::UpperMismatch(seqno, _current_upper) => {
                    // If the state machine thinks that the shard upper is not
                    // far enough along, it could be because the caller of this
                    // method has found out that it advanced via some
                    // side-channel that didn't update our local cache of the
                    // machine state. So, fetch the latest state and try again
                    // if we indeed get something different.
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                    let (current_seqno, current_upper) =
                        self.applier.upper(|seqno, upper| (seqno, upper.clone()));

                    // We tried to do a compare_and_append with the wrong
                    // expected upper, that won't work.
                    if &current_upper != batch.desc.lower() {
                        return CompareAndAppendRes::UpperMismatch(current_seqno, current_upper);
                    } else {
                        // The upper stored in state was outdated. Retry after
                        // updating.
                    }
                }
            }
        }
    }
366
    /// The guts of [Self::compare_and_append]: runs the command in a retry
    /// loop, retrying on [Indeterminate] errors (where we don't know whether
    /// the write committed). See the big SUBTLE comment below for why the
    /// resulting protocol is safe.
    async fn compare_and_append_idempotent(
        &self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        heartbeat_timestamp_ms: u64,
        idempotency_token: &IdempotencyToken,
        debug_info: &HandleDebugState,
        // Only exposed for testing. In prod, this always starts as None, but
        // making it a parameter allows us to simulate hitting an indeterminate
        // error on the first attempt in tests.
        mut indeterminate: Option<Indeterminate>,
    ) -> CompareAndAppendRes<T> {
        let metrics = Arc::clone(&self.applier.metrics);
        let lease_duration_ms = self
            .applier
            .cfg
            .writer_lease_duration
            .as_millis()
            .try_into()
            .expect("reasonable duration");
        // SUBTLE: Retries of compare_and_append with Indeterminate errors are
        // tricky (more discussion of this in database-issues#3680):
        //
        // - (1) We compare_and_append and get an Indeterminate error back from
        //   CRDB/Consensus. This means we don't know if it committed or not.
        // - (2) We retry it.
        // - (3) We get back an upper mismatch. The tricky bit is deciding if we
        //   conflicted with some other writer OR if the write in (1) actually
        //   went through and we're "conflicting" with ourself.
        //
        // A number of scenarios can be distinguished with per-writer
        // idempotency tokens, so I'll jump straight to the hardest one:
        //
        // - (1) A compare_and_append is issued for e.g. `[5,7)`, the consensus
        //   call makes it onto the network before the operation is cancelled
        //   (by dropping the future).
        // - (2) A compare_and_append is issued from the same WriteHandle for
        //   `[3,5)`, it uses a different conn from the consensus pool and gets
        //   an Indeterminate error.
        // - (3) The call in (1) is received by consensus and commits.
        // - (4) The retry of (2) receives an upper mismatch with an upper of 7.
        //
        // At this point, how do we determine whether (2) committed or not and
        // thus whether we should return success or upper mismatch? Getting this
        // right is very important for correctness (imagine this is a table
        // write and we either return success or failure to the client).
        //
        // - If we use per-writer IdempotencyTokens but only store the latest
        //   one in state, then the `[5,7)` one will have clobbered whatever our
        //   `[3,5)` one was.
        // - We could store every IdempotencyToken that ever committed, but that
        //   would require unbounded storage in state (non-starter).
        // - We could require that IdempotencyTokens are comparable and that
        //   each call issued by a WriteHandle uses one that is strictly greater
        //   than every call before it. A previous version of this PR tried this
        //   and it's remarkably subtle. As a result, I (Dan) have developed
        //   strong feels that our correctness protocol _should not depend on
        //   WriteHandle, only Machine_.
        // - We could require a new WriterId if a request is ever cancelled by
        //   making `compare_and_append` take ownership of `self` and then
        //   handing it back for any call polled to completion. The ergonomics
        //   of this are quite awkward and, like the previous idea, it depends
        //   on the WriteHandle impl for correctness.
        // - Any ideas that involve reading back the data are foiled by a step
        //   `(0) set the since to 100` (plus the latency and memory usage would
        //   be too unpredictable).
        //
        // The technique used here derives from the following observations:
        //
        // - In practice, we don't use compare_and_append with the sort of
        //   "regressing frontiers" described above.
        // - In practice, Indeterminate errors are rare-ish. They happen enough
        //   that we don't want to always panic on them, but this is still a
        //   useful property to build on.
        //
        // At a high level, we do just enough to be able to distinguish the
        // cases that we think will happen in practice and then leave the rest
        // for a panic! that we think we'll never see. Concretely:
        //
        // - Run compare_and_append in a loop, retrying on Indeterminate errors
        //   but noting if we've ever done that.
        // - If we haven't seen an Indeterminate error (i.e. this is the first
        //   time through the loop) then the result we got is guaranteed to be
        //   correct, so pass it up.
        // - Otherwise, any result other than an expected upper mismatch is
        //   guaranteed to be correct, so just pass it up.
        // - Otherwise examine the writer's most recent upper and break it into
        //   two cases:
        // - Case 1 `expected_upper.less_than(writer_most_recent_upper)`: it's
        //   impossible that we committed on a previous iteration because the
        //   overall upper of the shard is less_than what this call would have
        //   advanced it to. Pass up the expectation mismatch.
        // - Case 2 `!Case1`: First note that this means our IdempotencyToken
        //   didn't match, otherwise we would have gotten `AlreadyCommitted`. It
        //   also means some previous write from _this writer_ has committed an
        //   upper that is beyond the one in this call, which is a weird usage
        //   (NB can't be a future write because that would mean someone is
        //   still polling us, but `&mut self` prevents that).
        //
        // TODO: If this technique works in practice (leads to zero panics),
        // then commit to it and remove the Indeterminate from
        // [WriteHandle::compare_and_append_batch].
        let mut retry = self
            .applier
            .metrics
            .retries
            .compare_and_append_idempotent
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        let mut writer_was_present = false;
        loop {
            let cmd_res = self
                .applier
                .apply_unbatched_cmd(&metrics.cmds.compare_and_append, |_, cfg, state| {
                    writer_was_present = state.writers.contains_key(writer_id);
                    state.compare_and_append(
                        batch,
                        writer_id,
                        heartbeat_timestamp_ms,
                        lease_duration_ms,
                        idempotency_token,
                        debug_info,
                        INLINE_WRITES_TOTAL_MAX_BYTES.get(cfg),
                        if CLAIM_UNCLAIMED_COMPACTIONS.get(cfg) {
                            CLAIM_COMPACTION_PERCENT.get(cfg)
                        } else {
                            0
                        },
                        Version::parse(&CLAIM_COMPACTION_MIN_VERSION.get(cfg))
                            .ok()
                            .as_ref(),
                    )
                })
                .await;
            let (seqno, res, routine) = match cmd_res {
                Ok(x) => x,
                Err(err) => {
                    // These are rare and interesting enough that we always log
                    // them at info!.
                    info!(
                        "compare_and_append received an indeterminate error, retrying in {:?}: {}",
                        retry.next_sleep(),
                        err
                    );
                    if indeterminate.is_none() {
                        indeterminate = Some(err);
                    }
                    retry = retry.sleep().await;
                    continue;
                }
            };
            match res {
                Ok(merge_reqs) => {
                    // We got explicit confirmation that we succeeded, so
                    // anything that happened in a previous retry is irrelevant.
                    let mut compact_reqs = Vec::with_capacity(merge_reqs.len());
                    for req in merge_reqs {
                        let req = CompactReq {
                            shard_id: self.shard_id(),
                            desc: req.desc,
                            inputs: req.inputs,
                        };
                        compact_reqs.push(req);
                    }
                    let writer_maintenance = WriterMaintenance {
                        routine,
                        compaction: compact_reqs,
                    };

                    if !writer_was_present {
                        metrics.state.writer_added.inc();
                    }
                    for part in &batch.parts {
                        if part.is_inline() {
                            let bytes = u64::cast_from(part.inline_bytes());
                            metrics.inline.part_commit_bytes.inc_by(bytes);
                            metrics.inline.part_commit_count.inc();
                        }
                    }
                    return CompareAndAppendRes::Success(seqno, writer_maintenance);
                }
                Err(CompareAndAppendBreak::AlreadyCommitted) => {
                    // A previous iteration through this loop got an
                    // Indeterminate error but was successful. Sanity check this
                    // and pass along the good news.
                    assert!(indeterminate.is_some());
                    self.applier.metrics.cmds.compare_and_append_noop.inc();
                    if !writer_was_present {
                        metrics.state.writer_added.inc();
                    }
                    return CompareAndAppendRes::Success(seqno, WriterMaintenance::default());
                }
                Err(CompareAndAppendBreak::InvalidUsage(err)) => {
                    // InvalidUsage is (or should be) a deterministic function
                    // of the inputs and independent of anything in persist
                    // state. It's handed back via a Break, so we never even try
                    // to commit it. No network, no Indeterminate.
                    assert_none!(indeterminate);
                    return CompareAndAppendRes::InvalidUsage(err);
                }
                Err(CompareAndAppendBreak::InlineBackpressure) => {
                    // We tried to write an inline part, but there was already
                    // too much in state. Flush it out to s3 and try again.
                    return CompareAndAppendRes::InlineBackpressure;
                }
                Err(CompareAndAppendBreak::Upper {
                    shard_upper,
                    writer_upper,
                }) => {
                    // NB the below intentionally compares to writer_upper
                    // (because it gives a tighter bound on the bad case), but
                    // returns shard_upper (what the persist caller cares
                    // about).
                    assert!(
                        PartialOrder::less_equal(&writer_upper, &shard_upper),
                        "{:?} vs {:?}",
                        &writer_upper,
                        &shard_upper
                    );
                    if PartialOrder::less_than(&writer_upper, batch.desc.upper()) {
                        // No way this could have committed in some previous
                        // attempt of this loop: the upper of the writer is
                        // strictly less than the proposed new upper.
                        return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
                    }
                    if indeterminate.is_none() {
                        // No way this could have committed in some previous
                        // attempt of this loop: we never saw an indeterminate
                        // error (thus there was no previous iteration of the
                        // loop).
                        return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
                    }
                    // This is the bad case. We can't distinguish if some
                    // previous attempt that got an Indeterminate error
                    // succeeded or not. This should be sufficiently rare in
                    // practice (hopefully ~never) that we give up and let
                    // process restart fix things. See the big comment above for
                    // more context.
                    //
                    // NB: This is intentionally not a halt! because it's quite
                    // unexpected.
                    panic!(
                        concat!(
                            "cannot distinguish compare_and_append success or failure ",
                            "caa_lower={:?} caa_upper={:?} writer_upper={:?} shard_upper={:?} err={:?}"
                        ),
                        batch.desc.lower().elements(),
                        batch.desc.upper().elements(),
                        writer_upper.elements(),
                        shard_upper.elements(),
                        indeterminate,
                    );
                }
            };
        }
    }
622
623    pub async fn downgrade_since(
624        &self,
625        reader_id: &LeasedReaderId,
626        outstanding_seqno: SeqNo,
627        new_since: &Antichain<T>,
628        heartbeat_timestamp_ms: u64,
629    ) -> (SeqNo, Since<T>, RoutineMaintenance) {
630        let metrics = Arc::clone(&self.applier.metrics);
631        self.apply_unbatched_idempotent_cmd(&metrics.cmds.downgrade_since, |seqno, _cfg, state| {
632            state.downgrade_since(
633                reader_id,
634                seqno,
635                outstanding_seqno,
636                new_since,
637                heartbeat_timestamp_ms,
638            )
639        })
640        .await
641    }
642
643    pub async fn compare_and_downgrade_since(
644        &self,
645        reader_id: &CriticalReaderId,
646        expected_opaque: &Opaque,
647        (new_opaque, new_since): (&Opaque, &Antichain<T>),
648    ) -> (Result<Since<T>, (Opaque, Since<T>)>, RoutineMaintenance) {
649        let metrics = Arc::clone(&self.applier.metrics);
650        let (_seqno, res, maintenance) = self
651            .apply_unbatched_idempotent_cmd(
652                &metrics.cmds.compare_and_downgrade_since,
653                |_seqno, _cfg, state| {
654                    state.compare_and_downgrade_since(
655                        reader_id,
656                        expected_opaque,
657                        (new_opaque, new_since),
658                    )
659                },
660            )
661            .await;
662
663        match res {
664            Ok(since) => (Ok(since), maintenance),
665            Err((opaque, since)) => (Err((opaque, since)), maintenance),
666        }
667    }
668
669    pub async fn expire_leased_reader(
670        &self,
671        reader_id: &LeasedReaderId,
672    ) -> (SeqNo, RoutineMaintenance) {
673        let metrics = Arc::clone(&self.applier.metrics);
674        let (seqno, _existed, maintenance) = self
675            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
676                state.expire_leased_reader(reader_id)
677            })
678            .await;
679        (seqno, maintenance)
680    }
681
682    #[allow(dead_code)] // TODO(bkirwi): remove this when since behaviour on expiry has settled
683    pub async fn expire_critical_reader(
684        &self,
685        reader_id: &CriticalReaderId,
686    ) -> (SeqNo, RoutineMaintenance) {
687        let metrics = Arc::clone(&self.applier.metrics);
688        let (seqno, _existed, maintenance) = self
689            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
690                state.expire_critical_reader(reader_id)
691            })
692            .await;
693        (seqno, maintenance)
694    }
695
696    pub async fn expire_writer(&self, writer_id: &WriterId) -> (SeqNo, RoutineMaintenance) {
697        let metrics = Arc::clone(&self.applier.metrics);
698        let (seqno, _existed, maintenance) = self
699            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_writer, |_, _, state| {
700                state.expire_writer(writer_id)
701            })
702            .await;
703        metrics.state.writer_removed.inc();
704        (seqno, maintenance)
705    }
706
    /// Returns whether this shard's state has been finalized, delegating to
    /// the applier's view of state.
    pub fn is_finalized(&self) -> bool {
        self.applier.is_finalized()
    }
710
    /// See [crate::PersistClient::get_schema].
    ///
    /// Read-only delegation to the applier.
    pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
        self.applier.get_schema(schema_id)
    }
715
    /// See [crate::PersistClient::latest_schema].
    ///
    /// Read-only delegation to the applier.
    pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
        self.applier.latest_schema()
    }
720
    /// Returns the ID of the given schema, if known at the current state.
    ///
    /// Read-only delegation to the applier.
    pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
        self.applier.find_schema(key_schema, val_schema)
    }
725
726    /// See [crate::PersistClient::compare_and_evolve_schema].
727    ///
728    /// TODO: Unify this with [Self::register_schema]?
729    pub async fn compare_and_evolve_schema(
730        &self,
731        expected: SchemaId,
732        key_schema: &K::Schema,
733        val_schema: &V::Schema,
734    ) -> (CaESchema<K, V>, RoutineMaintenance) {
735        let metrics = Arc::clone(&self.applier.metrics);
736        let (_seqno, state, maintenance) = self
737            .apply_unbatched_idempotent_cmd(
738                &metrics.cmds.compare_and_evolve_schema,
739                |_seqno, _cfg, state| {
740                    state.compare_and_evolve_schema::<K, V>(expected, key_schema, val_schema)
741                },
742            )
743            .await;
744        (state, maintenance)
745    }
746
747    async fn tombstone_step(&self) -> Result<(bool, RoutineMaintenance), InvalidUsage<T>> {
748        let metrics = Arc::clone(&self.applier.metrics);
749        let mut retry = self
750            .applier
751            .metrics
752            .retries
753            .idempotent_cmd
754            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
755        loop {
756            let res = self
757                .applier
758                .apply_unbatched_cmd(&metrics.cmds.become_tombstone, |_, _, state| {
759                    state.become_tombstone_and_shrink()
760                })
761                .await;
762            let err = match res {
763                Ok((_seqno, Ok(()), maintenance)) => return Ok((true, maintenance)),
764                Ok((_seqno, Err(NoOpStateTransition(())), maintenance)) => {
765                    return Ok((false, maintenance));
766                }
767                Err(err) => err,
768            };
769            if retry.attempt() >= INFO_MIN_ATTEMPTS {
770                info!(
771                    "become_tombstone received an indeterminate error, retrying in {:?}: {}",
772                    retry.next_sleep(),
773                    err
774                );
775            } else {
776                debug!(
777                    "become_tombstone received an indeterminate error, retrying in {:?}: {}",
778                    retry.next_sleep(),
779                    err
780                );
781            }
782            retry = retry.sleep().await;
783        }
784    }
785
786    pub async fn become_tombstone(&self) -> Result<RoutineMaintenance, InvalidUsage<T>> {
787        self.applier.check_since_upper_both_empty()?;
788
789        let mut maintenance = RoutineMaintenance::default();
790
791        loop {
792            let (made_progress, more_maintenance) = self.tombstone_step().await?;
793            maintenance.merge(more_maintenance);
794            if !made_progress {
795                break;
796            }
797        }
798
799        Ok(maintenance)
800    }
801
802    /// Fetch a snapshot at the frontier without taking a lease on it. This may be useful for stats
803    /// or testing, but most callers will wish to wait for the frontier to advance and obtain the
804    /// snapshot separately.
805    pub async fn unleased_snapshot(
806        &self,
807        as_of: &Antichain<T>,
808    ) -> Result<Vec<HollowBatch<T>>, Since<T>> {
809        if let Ok(data) = self.applier.snapshot(as_of) {
810            return Ok(data);
811        }
812        let mut watch = self.applier.watch();
813        self.wait_for_upper_past(
814            as_of,
815            &mut watch,
816            None,
817            &self.applier.metrics.retries.snapshot,
818            RetryParameters::persist_defaults(),
819        )
820        .await;
821        match self.applier.snapshot(as_of) {
822            Ok(data) => Ok(data),
823            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(since)) => Err(since),
824            Err(SnapshotErr::AsOfNotYetAvailable(seqno, upper)) => {
825                panic!(
826                    "waited for upper past {as_of:?}, but at latest seqno {seqno:?} the frontier was only {upper:?}",
827                    as_of = as_of.elements(),
828                    upper = upper.0.elements(),
829                )
830            }
831        }
832    }
833
    /// Validates that a listen as of the given frontier would be legal,
    /// returning the blocking `Since` on failure. Delegates to the applier.
    ///
    /// NB: Unlike the other methods here, this one is read-only.
    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
        self.applier.verify_listen(as_of)
    }
838
    /// Blocks until this shard's upper is strictly past `frontier`, using a
    /// state watch and a metered retry/sleep loop as the two wakeup sources.
    ///
    /// `reader_id` is only used to enrich log output.
    pub async fn wait_for_upper_past(
        &self,
        frontier: &Antichain<T>,
        watch: &mut StateWatch<K, V, T, D>,
        reader_id: Option<&LeasedReaderId>,
        metrics: &RetryMetrics,
        retry: RetryParameters,
    ) {
        let start = Instant::now();
        // None once the upper is strictly past `frontier`; otherwise the seqno
        // and upper we are still waiting on.
        let wait_for_seqno_past = self.applier.upper(|seqno, upper| {
            if PartialOrder::less_than(frontier, upper) {
                None
            } else {
                Some((seqno, upper.clone()))
            }
        });
        let Some((mut seqno, mut upper)) = wait_for_seqno_past else {
            // The current state's upper is already past the given frontier.
            return;
        };

        // The latest state still doesn't have a new frontier for us:
        // watch+sleep in a loop until it does.
        let sleeps = metrics.stream(retry.into_retry(SystemTime::now()).into_retry_stream());

        // Which of the two pending futures woke us up.
        enum Wake<'a, K, V, T, D> {
            Watch(&'a mut StateWatch<K, V, T, D>),
            Sleep(MetricsRetryStream),
        }
        let mut watch_fut = std::pin::pin!(
            watch
                .wait_for_seqno_ge(seqno.next())
                .map(Wake::Watch)
                .instrument(trace_span!("snapshot::watch"))
        );
        let mut sleep_fut = std::pin::pin!(
            sleeps
                .sleep()
                .map(Wake::Sleep)
                .instrument(trace_span!("snapshot::sleep"))
        );

        // To reduce log spam, we log "not yet available" only once at info if
        // it passes a certain threshold. Then, if it did one info log, we log
        // again at info when it resolves.
        // NOTE(review): the resolution path below returns without another info
        // log; confirm whether the second half of the above is still intended.
        let mut logged_at_info = false;
        loop {
            // Use a duration based threshold here instead of the usual
            // INFO_MIN_ATTEMPTS because here we're waiting on an
            // external thing to arrive.
            if !logged_at_info
                && start.elapsed() >= Duration::from_millis(1024)
                && metrics.name.as_str() == "snapshot"
            {
                logged_at_info = true;
                info!(
                    shard_id =? self.shard_id(),
                    shard_name =? self.applier.shard_metrics.name,
                    reader_id =? reader_id,
                    wait_frontier =? frontier.elements(),
                    current_upper =? upper.elements(),
                    current_seqno =? seqno,
                    wait_for = &metrics.name,
                    "desired upper not yet available",
                );
            } else {
                debug!(
                    shard_id =? self.shard_id(),
                    shard_name =? self.applier.shard_metrics.name,
                    reader_id =? reader_id,
                    wait_frontier =? frontier.elements(),
                    current_upper =? upper.elements(),
                    current_seqno =? seqno,
                    wait_for = &metrics.name,
                    "desired upper not yet available",
                );
            }

            let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
                future::Either::Left((wake, _)) => wake,
                future::Either::Right((wake, _)) => wake,
            };
            // Note that we don't need to fetch in the Watch case, because the
            // Watch wakeup is a signal that the shared state has already been
            // updated.
            match &wake {
                Wake::Watch(_) => self.applier.metrics.watch.wait_woken_via_watch.inc(),
                Wake::Sleep(_) => {
                    self.applier.metrics.watch.wait_woken_via_sleep.inc();
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                }
            }

            // Re-check the (possibly updated) state against the frontier.
            let wait_for_seqno_past = self.applier.upper(|seqno, upper| {
                if PartialOrder::less_than(frontier, upper) {
                    None
                } else {
                    Some((seqno, upper.clone()))
                }
            });
            match wait_for_seqno_past {
                None => {
                    match &wake {
                        Wake::Watch(_) => self.applier.metrics.watch.wait_resolved_via_watch.inc(),
                        Wake::Sleep(_) => self.applier.metrics.watch.wait_resolved_via_sleep.inc(),
                    }
                    return;
                }
                Some((s, u)) => {
                    seqno = s;
                    upper = u;
                }
            };

            // Wait a bit and try again. Intentionally don't ever log
            // this at info level.
            match wake {
                Wake::Watch(watch) => {
                    watch_fut.set(
                        watch
                            .wait_for_seqno_ge(seqno.next())
                            .map(Wake::Watch)
                            .instrument(trace_span!("snapshot::watch")),
                    );
                }
                Wake::Sleep(sleeps) => {
                    debug!(
                        shard_id =? self.shard_id(),
                        shard_name =? self.applier.shard_metrics.name,
                        reader_id =? reader_id,
                        wait_frontier =? frontier.elements(),
                        current_upper =? upper.elements(),
                        current_seqno =? seqno,
                        wait_for = &metrics.name,
                        "didn't find new data, retrying in {:?}",
                        sleeps.next_sleep(),
                    );
                    sleep_fut.set(
                        sleeps
                            .sleep()
                            .map(Wake::Sleep)
                            .instrument(trace_span!("snapshot::sleep")),
                    );
                }
            }
        }
    }
986
987    async fn apply_unbatched_idempotent_cmd<
988        R,
989        WorkFn: FnMut(
990            SeqNo,
991            &PersistConfig,
992            &mut StateCollections<T>,
993        ) -> ControlFlow<NoOpStateTransition<R>, R>,
994    >(
995        &self,
996        cmd: &CmdMetrics,
997        mut work_fn: WorkFn,
998    ) -> (SeqNo, R, RoutineMaintenance) {
999        let mut retry = self
1000            .applier
1001            .metrics
1002            .retries
1003            .idempotent_cmd
1004            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1005        loop {
1006            match self.applier.apply_unbatched_cmd(cmd, &mut work_fn).await {
1007                Ok((seqno, x, maintenance)) => match x {
1008                    Ok(x) => {
1009                        return (seqno, x, maintenance);
1010                    }
1011                    Err(NoOpStateTransition(x)) => {
1012                        return (seqno, x, maintenance);
1013                    }
1014                },
1015                Err(err) => {
1016                    if retry.attempt() >= INFO_MIN_ATTEMPTS {
1017                        info!(
1018                            "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
1019                            cmd.name,
1020                            retry.next_sleep(),
1021                            err
1022                        );
1023                    } else {
1024                        debug!(
1025                            "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
1026                            cmd.name,
1027                            retry.next_sleep(),
1028                            err
1029                        );
1030                    }
1031                    retry = retry.sleep().await;
1032                    continue;
1033                }
1034            }
1035        }
1036    }
1037}
1038
impl<K, V, T, D> Machine<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + PartialEq,
{
    /// Applies the output of a fueled merge (compaction) to the shard's state,
    /// returning whether/why it was applied plus any routine maintenance.
    pub async fn merge_res(
        &self,
        res: &FueledMergeRes<T>,
    ) -> (ApplyMergeResult, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);

        // SUBTLE! If Machine::merge_res returns false, the blobs referenced in
        // compaction output are deleted so we don't leak them. Naively passing
        // back the value returned by State::apply_merge_res might give a false
        // negative in the presence of retries and Indeterminate errors.
        // Specifically, something like the following:
        //
        // - We try to apply_merge_res, it matches.
        // - When apply_unbatched_cmd goes to commit the new state, the
        //   Consensus::compare_and_set returns an Indeterminate error (but
        //   actually succeeds). The committed State now contains references to
        //   the compaction output blobs.
        // - Machine::apply_unbatched_idempotent_cmd retries the Indeterminate
        //   error. For whatever reason, this time though it doesn't match
        //   (maybe the batches simply get grouped difference when deserialized
        //   from state, or more unavoidably perhaps another compaction
        //   happens).
        // - This now bubbles up applied=false to the caller, which uses it as a
        //   signal that the blobs in the compaction output should be deleted so
        //   that we don't leak them.
        // - We now contain references in committed State to blobs that don't
        //   exist.
        //
        // The fix is to keep track of whether applied ever was true, even for a
        // compare_and_set that returned an Indeterminate error. This has the
        // chance of false positive (leaking a blob) but that's better than a
        // false negative (a blob we can never recover referenced by state). We
        // anyway need a mechanism to clean up leaked blobs because of process
        // crashes.
        //
        // The closure below may run multiple times (once per retry), so it
        // accumulates the "stickiest" result across attempts in this capture.
        let mut merge_result_ever_applied = ApplyMergeResult::NotAppliedNoMatch;
        let (_seqno, _apply_merge_result, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.merge_res, |_, _, state| {
                let ret = state.apply_merge_res::<D>(res, &Arc::clone(&metrics).columnar);
                if let Continue(result) = ret {
                    // record if we've ever applied the merge
                    if result.applied() {
                        merge_result_ever_applied = result;
                    }
                    // otherwise record the most granular reason for _not_
                    // applying the merge when there was a matching batch
                    if result.matched() && !result.applied() && !merge_result_ever_applied.applied()
                    {
                        merge_result_ever_applied = result;
                    }
                }
                ret
            })
            .await;
        (merge_result_ever_applied, maintenance)
    }
}
1102
/// A boxed, type-erased callback producing the future that expires a handle.
pub(crate) struct ExpireFn(
    /// This is stored on WriteHandle and ReadHandle, which we require to be
    /// Send + Sync, but the Future is only Send and not Sync. Instead store a
    /// FnOnce that returns the Future. This could also be made an `IntoFuture`,
    /// once producing one of those is made easier.
    pub(crate) Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + Sync + 'static>,
);
1110
impl Debug for ExpireFn {
    /// Renders as `ExpireFn { .. }`; the boxed closure itself is opaque.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExpireFn").finish_non_exhaustive()
    }
}
1116
/// Outcome of a compare-and-append attempt against shard state.
#[derive(Debug)]
pub(crate) enum CompareAndAppendRes<T> {
    /// The append applied; carries the resulting `SeqNo` and the writer
    /// maintenance to perform.
    Success(SeqNo, WriterMaintenance<T>),
    /// The request itself was invalid.
    InvalidUsage(InvalidUsage<T>),
    /// The expected upper didn't match; carries the current seqno and upper.
    UpperMismatch(SeqNo, Antichain<T>),
    /// Rejected due to inline-write backpressure; presumably the caller
    /// retries with parts written out to blob storage — confirm at call sites.
    InlineBackpressure,
}
1124
#[cfg(test)]
impl<T: Debug> CompareAndAppendRes<T> {
    /// Test helper: returns the [CompareAndAppendRes::Success] payload,
    /// panicking with the debug rendering of any other variant.
    #[track_caller]
    fn unwrap(self) -> (SeqNo, WriterMaintenance<T>) {
        match self {
            CompareAndAppendRes::Success(seqno, maintenance) => (seqno, maintenance),
            other => panic!("{:?}", other),
        }
    }
}
1135
/// Dyncfg for the fixed sleep between polls for new batches in a Listen or
/// Subscribe; consumed by [next_listen_batch_retry_params].
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_fixed_sleep",
    Duration::from_millis(1200), // pubsub is on by default!
    "\
    The fixed sleep when polling for new batches from a Listen or Subscribe. Skipped if zero.",
);
1142
/// Dyncfg for the initial backoff of the listen-batch poll retryer; consumed
/// by [next_listen_batch_retry_params].
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_initial_backoff",
    Duration::from_millis(100), // pubsub is on by default!
    "The initial backoff when polling for new batches from a Listen or Subscribe.",
);
1148
/// Dyncfg for the backoff multiplier of the listen-batch poll retryer;
/// consumed by [next_listen_batch_retry_params].
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER: Config<u32> = Config::new(
    "persist_next_listen_batch_retryer_multiplier",
    2,
    "The backoff multiplier when polling for new batches from a Listen or Subscribe.",
);
1154
/// Dyncfg for the maximum (clamped) backoff of the listen-batch poll retryer;
/// consumed by [next_listen_batch_retry_params].
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_CLAMP: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_clamp",
    Duration::from_secs(16), // pubsub is on by default!
    "The backoff clamp duration when polling for new batches from a Listen or Subscribe.",
);
1160
1161pub(crate) fn next_listen_batch_retry_params(cfg: &ConfigSet) -> RetryParameters {
1162    RetryParameters {
1163        fixed_sleep: NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP.get(cfg),
1164        initial_backoff: NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF.get(cfg),
1165        multiplier: NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER.get(cfg),
1166        clamp: NEXT_LISTEN_BATCH_RETRYER_CLAMP.get(cfg),
1167    }
1168}
1169
1170pub const INFO_MIN_ATTEMPTS: usize = 3;
1171
1172pub async fn retry_external<R, F, WorkFn>(metrics: &RetryMetrics, mut work_fn: WorkFn) -> R
1173where
1174    F: std::future::Future<Output = Result<R, ExternalError>>,
1175    WorkFn: FnMut() -> F,
1176{
1177    let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1178    loop {
1179        match work_fn().await {
1180            Ok(x) => {
1181                if retry.attempt() > 0 {
1182                    debug!(
1183                        "external operation {} succeeded after failing at least once",
1184                        metrics.name,
1185                    );
1186                }
1187                return x;
1188            }
1189            Err(err) => {
1190                if retry.attempt() >= INFO_MIN_ATTEMPTS {
1191                    info!(
1192                        "external operation {} failed, retrying in {:?}: {}",
1193                        metrics.name,
1194                        retry.next_sleep(),
1195                        err.display_with_causes()
1196                    );
1197                } else {
1198                    debug!(
1199                        "external operation {} failed, retrying in {:?}: {}",
1200                        metrics.name,
1201                        retry.next_sleep(),
1202                        err.display_with_causes()
1203                    );
1204                }
1205                retry = retry.sleep().await;
1206            }
1207        }
1208    }
1209}
1210
1211pub async fn retry_determinate<R, F, WorkFn>(
1212    metrics: &RetryMetrics,
1213    mut work_fn: WorkFn,
1214) -> Result<R, Indeterminate>
1215where
1216    F: std::future::Future<Output = Result<R, ExternalError>>,
1217    WorkFn: FnMut() -> F,
1218{
1219    let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1220    loop {
1221        match work_fn().await {
1222            Ok(x) => {
1223                if retry.attempt() > 0 {
1224                    debug!(
1225                        "external operation {} succeeded after failing at least once",
1226                        metrics.name,
1227                    );
1228                }
1229                return Ok(x);
1230            }
1231            Err(ExternalError::Determinate(err)) => {
1232                // The determinate "could not serialize access" errors
1233                // happen often enough in dev (which uses Postgres) that
1234                // it's impeding people's work. At the same time, it's been
1235                // a source of confusion for eng. The situation is much
1236                // better on CRDB and we have metrics coverage in prod, so
1237                // this is redundant enough that it's more hurtful than
1238                // helpful. As a result, this intentionally ignores
1239                // INFO_MIN_ATTEMPTS and always logs at debug.
1240                debug!(
1241                    "external operation {} failed, retrying in {:?}: {}",
1242                    metrics.name,
1243                    retry.next_sleep(),
1244                    err.display_with_causes()
1245                );
1246                retry = retry.sleep().await;
1247                continue;
1248            }
1249            Err(ExternalError::Indeterminate(x)) => return Err(x),
1250        }
1251    }
1252}
1253
1254#[cfg(test)]
1255pub mod datadriven {
1256    use std::collections::{BTreeMap, BTreeSet};
1257    use std::pin::pin;
1258    use std::sync::{Arc, LazyLock};
1259
1260    use anyhow::anyhow;
1261    use differential_dataflow::consolidation::consolidate_updates;
1262    use differential_dataflow::trace::Description;
1263    use futures::StreamExt;
1264    use mz_dyncfg::{ConfigUpdates, ConfigVal};
1265    use mz_persist::indexed::encoding::BlobTraceBatchPart;
1266    use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
1267
1268    use crate::batch::{
1269        BLOB_TARGET_SIZE, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
1270        BatchParts, validate_truncate_batch,
1271    };
1272    use crate::cfg::COMPACTION_MEMORY_BOUND_BYTES;
1273    use crate::fetch::{EncodedPart, FetchConfig};
1274    use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
1275    use crate::internal::datadriven::DirectiveArgs;
1276    use crate::internal::encoding::Schemas;
1277    use crate::internal::gc::GcReq;
1278    use crate::internal::paths::{BlobKey, BlobKeyPrefix, PartialBlobKey};
1279    use crate::internal::state::{BatchPart, RunOrder, RunPart, Upper};
1280    use crate::internal::state_versions::EncodedRollup;
1281    use crate::internal::trace::{CompactionInput, IdHollowBatch, SpineId};
1282    use crate::read::{Listen, ListenEvent, READER_LEASE_DURATION};
1283    use crate::rpc::NoopPubSubSender;
1284    use crate::tests::new_test_client;
1285    use crate::write::COMBINE_INLINE_WRITES;
1286    use crate::{GarbageCollector, PersistClient};
1287
1288    use super::*;
1289
    /// Fixed `String`/`()` schemas (with constant schema id 0) shared by all
    /// datadriven tests in this module.
    static SCHEMAS: LazyLock<Schemas<String, ()>> = LazyLock::new(|| Schemas {
        id: Some(SchemaId(0)),
        key: Arc::new(StringSchema),
        val: Arc::new(UnitSchema),
    });
1295
    /// Shared state for a single [crate::internal::machine] [datadriven::TestFile].
    #[derive(Debug)]
    pub struct MachineState {
        /// Client the test shard was created through.
        pub client: PersistClient,
        /// Id of the shard under test.
        pub shard_id: ShardId,
        /// Handle for fetching raw state versions from consensus.
        pub state_versions: Arc<StateVersions>,
        /// The machine under test.
        pub machine: Machine<String, (), u64, i64>,
        /// Garbage collector wired to `machine`.
        pub gc: GarbageCollector<String, (), u64, i64>,
        /// Batches created by directives, keyed by the names they were given.
        pub batches: BTreeMap<String, IdHollowBatch<u64>>,
        // Next unused id for directives that need unique ids (starts at 0).
        pub next_id: usize,
        /// Rollups created by directives, keyed by name.
        pub rollups: BTreeMap<String, EncodedRollup>,
        /// Active listens, keyed by name.
        pub listens: BTreeMap<String, Listen<String, (), u64, i64>>,
        /// Routine maintenance accumulated across directives.
        pub routine: Vec<RoutineMaintenance>,
        /// Compaction requests registered by directives, keyed by name.
        pub compactions: BTreeMap<String, CompactReq<u64>>,
    }
1311
    impl MachineState {
        /// Creates a fresh test shard (new [ShardId]) plus the machine, gc,
        /// and state-versions plumbing that the datadriven directives need.
        pub async fn new(dyncfgs: &ConfigUpdates) -> Self {
            let shard_id = ShardId::new();
            let client = new_test_client(dyncfgs).await;
            // Reset blob_target_size. Individual batch writes and compactions
            // can override it with an arg.
            client
                .cfg
                .set_config(&BLOB_TARGET_SIZE, *BLOB_TARGET_SIZE.default());
            // Our structured compaction code uses slightly different estimates
            // for array size than the old path, which can affect the results of
            // some compaction tests.
            client.cfg.set_config(&COMBINE_INLINE_WRITES, false);
            let state_versions = Arc::new(StateVersions::new(
                client.cfg.clone(),
                Arc::clone(&client.consensus),
                Arc::clone(&client.blob),
                Arc::clone(&client.metrics),
            ));
            let machine = Machine::new(
                client.cfg.clone(),
                shard_id,
                Arc::clone(&client.metrics),
                Arc::clone(&state_versions),
                Arc::clone(&client.shared_states),
                Arc::new(NoopPubSubSender),
                Arc::clone(&client.isolated_runtime),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codecs should match");
            let gc = GarbageCollector::new(machine.clone(), Arc::clone(&client.isolated_runtime));
            MachineState {
                shard_id,
                client,
                state_versions,
                machine,
                gc,
                batches: BTreeMap::default(),
                rollups: BTreeMap::default(),
                listens: BTreeMap::default(),
                routine: Vec::new(),
                compactions: BTreeMap::default(),
                next_id: 0,
            }
        }

        /// Rehydrates a [HollowBatch] into a [Batch] bound to this test's
        /// client, blob, shard metrics, and the module's fixed schemas.
        fn to_batch(&self, hollow: HollowBatch<u64>) -> Batch<String, (), u64, i64> {
            Batch::new(
                true,
                Arc::clone(&self.client.metrics),
                Arc::clone(&self.client.blob),
                self.client.metrics.shards.shard(&self.shard_id, "test"),
                self.client.cfg.build_version.clone(),
                (
                    <String>::encode_schema(&*SCHEMAS.key),
                    <()>::encode_schema(&*SCHEMAS.val),
                ),
                hollow,
            )
        }
    }
1374
    /// Scans consensus and returns all states with their SeqNos
    /// and which batches they reference
    pub async fn consensus_scan(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let from = args.expect("from_seqno");

        let mut states = datadriven
            .state_versions
            .fetch_all_live_states::<u64>(datadriven.shard_id)
            .await
            .expect("should only be called on an initialized shard")
            .check_ts_codec()
            .expect("shard codecs should not change");
        let mut s = String::new();
        while let Some(x) = states.next(|_| {}) {
            // Only render states at or past the requested seqno.
            if x.seqno < from {
                continue;
            }
            let rollups: Vec<_> = x
                .collections
                .rollups
                .keys()
                .map(|seqno| seqno.to_string())
                .collect();
            // Map each non-empty batch in the trace back to the name it was
            // registered under in this test, matching on its parts.
            let batches: Vec<_> = x
                .collections
                .trace
                .batches()
                .filter(|b| !b.is_empty())
                .filter_map(|b| {
                    datadriven
                        .batches
                        .iter()
                        .find(|(_, original_batch)| original_batch.batch.parts == b.parts)
                        .map(|(batch_name, _)| batch_name.to_owned())
                })
                .collect();
            write!(
                s,
                "seqno={} batches={} rollups={}\n",
                x.seqno,
                batches.join(","),
                rollups.join(","),
            );
        }
        Ok(s)
    }
1424
1425    pub async fn consensus_truncate(
1426        datadriven: &MachineState,
1427        args: DirectiveArgs<'_>,
1428    ) -> Result<String, anyhow::Error> {
1429        let to = args.expect("to_seqno");
1430        let removed = datadriven
1431            .client
1432            .consensus
1433            .truncate(&datadriven.shard_id.to_string(), to)
1434            .await
1435            .expect("valid truncation");
1436        Ok(format!("{:?}\n", removed))
1437    }
1438
    /// Datadriven directive: lists every batch blob stored for this shard,
    /// one `key: <size>b` line per blob.
    pub async fn blob_scan_batches(
        datadriven: &MachineState,
        _args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let key_prefix = BlobKeyPrefix::Shard(&datadriven.shard_id).to_string();

        let mut s = String::new();
        let () = datadriven
            .state_versions
            .blob
            .list_keys_and_metadata(&key_prefix, &mut |x| {
                let (_, key) = BlobKey::parse_ids(x.key).expect("key should be valid");
                // Only batch blobs are rendered; rollups etc. are skipped.
                if let PartialBlobKey::Batch(_, _) = key {
                    write!(s, "{}: {}b\n", x.key, x.size_in_bytes);
                }
            })
            .await?;
        Ok(s)
    }
1458
1459    #[allow(clippy::unused_async)]
1460    pub async fn shard_desc(
1461        datadriven: &MachineState,
1462        _args: DirectiveArgs<'_>,
1463    ) -> Result<String, anyhow::Error> {
1464        Ok(format!(
1465            "since={:?} upper={:?}\n",
1466            datadriven.machine.applier.since().elements(),
1467            datadriven.machine.applier.clone_upper().elements()
1468        ))
1469    }
1470
1471    pub async fn downgrade_since(
1472        datadriven: &mut MachineState,
1473        args: DirectiveArgs<'_>,
1474    ) -> Result<String, anyhow::Error> {
1475        let since = args.expect_antichain("since");
1476        let seqno = args
1477            .optional("seqno")
1478            .unwrap_or_else(|| datadriven.machine.seqno());
1479        let reader_id = args.expect("reader_id");
1480        let (_, since, routine) = datadriven
1481            .machine
1482            .downgrade_since(
1483                &reader_id,
1484                seqno,
1485                &since,
1486                (datadriven.machine.applier.cfg.now)(),
1487            )
1488            .await;
1489        datadriven.routine.push(routine);
1490        Ok(format!(
1491            "{} {:?}\n",
1492            datadriven.machine.seqno(),
1493            since.0.elements()
1494        ))
1495    }
1496
1497    #[allow(clippy::unused_async)]
1498    pub async fn dyncfg(
1499        datadriven: &MachineState,
1500        args: DirectiveArgs<'_>,
1501    ) -> Result<String, anyhow::Error> {
1502        let mut updates = ConfigUpdates::default();
1503        for x in args.input.trim().split('\n') {
1504            match x.split(' ').collect::<Vec<_>>().as_slice() {
1505                &[name, val] => {
1506                    let config = datadriven
1507                        .client
1508                        .cfg
1509                        .entries()
1510                        .find(|x| x.name() == name)
1511                        .ok_or_else(|| anyhow!("unknown dyncfg: {}", name))?;
1512                    match config.val() {
1513                        ConfigVal::Usize(_) => {
1514                            let val = val.parse().map_err(anyhow::Error::new)?;
1515                            updates.add_dynamic(name, ConfigVal::Usize(val));
1516                        }
1517                        ConfigVal::Bool(_) => {
1518                            let val = val.parse().map_err(anyhow::Error::new)?;
1519                            updates.add_dynamic(name, ConfigVal::Bool(val));
1520                        }
1521                        x => unimplemented!("dyncfg type: {:?}", x),
1522                    }
1523                }
1524                x => return Err(anyhow!("expected `name val` got: {:?}", x)),
1525            }
1526        }
1527        updates.apply(&datadriven.client.cfg);
1528
1529        Ok("ok\n".to_string())
1530    }
1531
1532    pub async fn compare_and_downgrade_since(
1533        datadriven: &mut MachineState,
1534        args: DirectiveArgs<'_>,
1535    ) -> Result<String, anyhow::Error> {
1536        let expected_opaque: u64 = args.expect("expect_opaque");
1537        let new_opaque: u64 = args.expect("opaque");
1538        let new_since = args.expect_antichain("since");
1539        let reader_id = args.expect("reader_id");
1540        let (res, routine) = datadriven
1541            .machine
1542            .compare_and_downgrade_since(
1543                &reader_id,
1544                &Opaque::encode(&expected_opaque),
1545                (&Opaque::encode(&new_opaque), &new_since),
1546            )
1547            .await;
1548        datadriven.routine.push(routine);
1549        let since = res.map_err(|(opaque, since)| {
1550            anyhow!(
1551                "mismatch: opaque={} since={:?}",
1552                opaque.decode::<u64>(),
1553                since.0.elements()
1554            )
1555        })?;
1556        Ok(format!(
1557            "{} {} {:?}\n",
1558            datadriven.machine.seqno(),
1559            new_opaque,
1560            since.0.elements()
1561        ))
1562    }
1563
1564    pub async fn write_rollup(
1565        datadriven: &mut MachineState,
1566        args: DirectiveArgs<'_>,
1567    ) -> Result<String, anyhow::Error> {
1568        let output = args.expect_str("output");
1569
1570        let rollup = datadriven
1571            .machine
1572            .applier
1573            .write_rollup_for_state()
1574            .await
1575            .expect("rollup");
1576
1577        datadriven
1578            .rollups
1579            .insert(output.to_string(), rollup.clone());
1580
1581        Ok(format!(
1582            "state={} diffs=[{}, {})\n",
1583            rollup.seqno,
1584            rollup._desc.lower().first().expect("seqno"),
1585            rollup._desc.upper().first().expect("seqno"),
1586        ))
1587    }
1588
1589    pub async fn add_rollup(
1590        datadriven: &mut MachineState,
1591        args: DirectiveArgs<'_>,
1592    ) -> Result<String, anyhow::Error> {
1593        let input = args.expect_str("input");
1594        let rollup = datadriven
1595            .rollups
1596            .get(input)
1597            .expect("unknown batch")
1598            .clone();
1599
1600        let (applied, maintenance) = datadriven
1601            .machine
1602            .add_rollup((rollup.seqno, &rollup.to_hollow()))
1603            .await;
1604
1605        if !applied {
1606            return Err(anyhow!("failed to apply rollup for: {}", rollup.seqno));
1607        }
1608
1609        datadriven.routine.push(maintenance);
1610        Ok(format!("{}\n", datadriven.machine.seqno()))
1611    }
1612
    /// Handles the `write-batch` directive: builds a batch from the textual
    /// updates in the directive input, writes it out, and stashes it in
    /// `datadriven.batches` under the `output` name.
    ///
    /// Recognized args: `output`, `lower`, `upper` (lower must be strictly
    /// less than upper), and optional `since`, `target_size`,
    /// `parts_size_override`, and `consolidate` (defaults to true).
    /// Returns a `parts=N len=M` summary line.
    pub async fn write_batch(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let output = args.expect_str("output");
        let lower = args.expect_antichain("lower");
        let upper = args.expect_antichain("upper");
        assert!(PartialOrder::less_than(&lower, &upper));
        let since = args
            .optional_antichain("since")
            .unwrap_or_else(|| Antichain::from_elem(0));
        let target_size = args.optional("target_size");
        let parts_size_override = args.optional("parts_size_override");
        let consolidate = args.optional("consolidate").unwrap_or(true);
        let mut updates: Vec<_> = args
            .input
            .split('\n')
            .flat_map(DirectiveArgs::parse_update)
            .collect();

        let mut cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
        // An explicit target_size overrides the configured blob target size.
        if let Some(target_size) = target_size {
            cfg.blob_target_size = target_size;
        };
        if consolidate {
            consolidate_updates(&mut updates);
        }
        // Unconsolidated input can't be written in the preferred (sorted)
        // run order, so fall back to unordered runs.
        let run_order = if consolidate {
            cfg.preferred_order
        } else {
            RunOrder::Unordered
        };
        let parts = BatchParts::new_ordered::<i64>(
            cfg.clone(),
            run_order,
            Arc::clone(&datadriven.client.metrics),
            Arc::clone(&datadriven.machine.applier.shard_metrics),
            datadriven.shard_id,
            Arc::clone(&datadriven.client.blob),
            Arc::clone(&datadriven.client.isolated_runtime),
            &datadriven.client.metrics.user,
        );
        let builder = BatchBuilderInternal::new(
            cfg.clone(),
            parts,
            Arc::clone(&datadriven.client.metrics),
            SCHEMAS.clone(),
            Arc::clone(&datadriven.client.blob),
            datadriven.shard_id.clone(),
            datadriven.client.cfg.build_version.clone(),
        );
        let mut builder = BatchBuilder::new(builder, Description::new(lower, upper.clone(), since));
        for ((k, ()), t, d) in updates {
            builder.add(&k, &(), &t, &d).await.expect("invalid batch");
        }
        let mut batch = builder.finish(upper).await?;
        // We can only reasonably use parts_size_override with hollow batches,
        // so if it's set, flush any inline batches out.
        if parts_size_override.is_some() {
            batch
                .flush_to_blob(
                    &cfg,
                    &datadriven.client.metrics.user,
                    &datadriven.client.isolated_runtime,
                    &SCHEMAS,
                )
                .await;
        }
        let batch = batch.into_hollow_batch();
        // Tag the hollow batch with a fresh SpineId spanning
        // [next_id, next_id + 1).
        let batch = IdHollowBatch {
            batch: Arc::new(batch),
            id: SpineId(datadriven.next_id, datadriven.next_id + 1),
        };
        datadriven.next_id += 1;

        if let Some(size) = parts_size_override {
            // Pretend every part has the overridden encoded size; inline
            // parts were flushed to blob above, so they can't appear here.
            let mut batch = batch.clone();
            let mut hollow_batch = (*batch.batch).clone();
            for part in hollow_batch.parts.iter_mut() {
                match part {
                    RunPart::Many(run) => run.max_part_bytes = size,
                    RunPart::Single(BatchPart::Hollow(part)) => part.encoded_size_bytes = size,
                    RunPart::Single(BatchPart::Inline { .. }) => unreachable!("flushed out above"),
                }
            }
            batch.batch = Arc::new(hollow_batch);
            datadriven.batches.insert(output.to_owned(), batch);
        } else {
            datadriven.batches.insert(output.to_owned(), batch.clone());
        }
        Ok(format!(
            "parts={} len={}\n",
            batch.batch.part_count(),
            batch.batch.len
        ))
    }
1709
    /// Handles the `fetch-batch` directive: fetches a stashed batch and prints
    /// its contents part by part, followed by the batch's run structure.
    ///
    /// With the optional arg `stats=lower`, also prints each part's structured
    /// key lower bound (when one is available).
    pub async fn fetch_batch(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let stats = args.optional_str("stats");
        let batch = datadriven.batches.get(input).expect("unknown batch");

        let mut s = String::new();
        let mut stream = pin!(
            batch
                .batch
                .part_stream(
                    datadriven.shard_id,
                    &*datadriven.state_versions.blob,
                    &*datadriven.state_versions.metrics
                )
                .enumerate()
        );
        while let Some((idx, part)) = stream.next().await {
            let part = &*part?;
            write!(s, "<part {idx}>\n");

            // Inline parts carry their data directly in state and must be
            // decoded to compute stats; hollow parts expose them directly.
            let lower = match part {
                BatchPart::Inline { updates, .. } => {
                    let updates: BlobTraceBatchPart<u64> =
                        updates.decode(&datadriven.client.metrics.columnar)?;
                    updates.structured_key_lower()
                }
                other @ BatchPart::Hollow(_) => other.structured_key_lower(),
            };

            if let Some(lower) = lower {
                if stats == Some("lower") {
                    writeln!(s, "<key lower={}>", lower.get())
                }
            }

            match part {
                BatchPart::Hollow(part) => {
                    // Probe the blob store first so a missing blob prints
                    // "<empty>" instead of panicking in the fetch below.
                    let blob_batch = datadriven
                        .client
                        .blob
                        .get(&part.key.complete(&datadriven.shard_id))
                        .await;
                    match blob_batch {
                        Ok(Some(_)) | Err(_) => {}
                        // don't try to fetch/print the keys of the batch part
                        // if the blob store no longer has it
                        Ok(None) => {
                            s.push_str("<empty>\n");
                            continue;
                        }
                    };
                }
                BatchPart::Inline { .. } => {}
            };
            let part = EncodedPart::fetch(
                &FetchConfig::from_persist_config(&datadriven.client.cfg),
                &datadriven.shard_id,
                datadriven.client.blob.as_ref(),
                datadriven.client.metrics.as_ref(),
                datadriven.machine.applier.shard_metrics.as_ref(),
                &datadriven.client.metrics.read.batch_fetcher,
                &batch.batch.desc,
                part,
            )
            .await
            .expect("invalid batch part");
            let part = part
                .normalize(&datadriven.client.metrics.columnar)
                .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);

            // Print each decoded update as "key time diff".
            for ((k, _v), t, d) in part
                .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
                .expect("valid schemas")
            {
                writeln!(s, "{k} {t} {d}");
            }
        }
        // Append the run structure, mapping each run's parts back to their
        // positions in the batch's flat parts list. Skipped when nothing was
        // printed above (i.e. the batch had no parts).
        if !s.is_empty() {
            for (idx, (_meta, run)) in batch.batch.runs().enumerate() {
                write!(s, "<run {idx}>\n");
                for part in run {
                    let part_idx = batch
                        .batch
                        .parts
                        .iter()
                        .position(|p| p == part)
                        .expect("part should exist");
                    write!(s, "part {part_idx}\n");
                }
            }
        }
        Ok(s)
    }
1806
1807    #[allow(clippy::unused_async)]
1808    pub async fn truncate_batch_desc(
1809        datadriven: &mut MachineState,
1810        args: DirectiveArgs<'_>,
1811    ) -> Result<String, anyhow::Error> {
1812        let input = args.expect_str("input");
1813        let output = args.expect_str("output");
1814        let lower = args.expect_antichain("lower");
1815        let upper = args.expect_antichain("upper");
1816
1817        let batch = datadriven
1818            .batches
1819            .get(input)
1820            .expect("unknown batch")
1821            .clone();
1822        let truncated_desc = Description::new(lower, upper, batch.batch.desc.since().clone());
1823        let () = validate_truncate_batch(&batch.batch, &truncated_desc, false, true)?;
1824        let mut new_hollow_batch = (*batch.batch).clone();
1825        new_hollow_batch.desc = truncated_desc;
1826        let new_batch = IdHollowBatch {
1827            batch: Arc::new(new_hollow_batch),
1828            id: batch.id,
1829        };
1830        datadriven
1831            .batches
1832            .insert(output.to_owned(), new_batch.clone());
1833        Ok(format!(
1834            "parts={} len={}\n",
1835            batch.batch.part_count(),
1836            batch.batch.len
1837        ))
1838    }
1839
1840    #[allow(clippy::unused_async)]
1841    pub async fn set_batch_parts_size(
1842        datadriven: &mut MachineState,
1843        args: DirectiveArgs<'_>,
1844    ) -> Result<String, anyhow::Error> {
1845        let input = args.expect_str("input");
1846        let size = args.expect("size");
1847        let batch = datadriven.batches.get_mut(input).expect("unknown batch");
1848        let mut hollow_batch = (*batch.batch).clone();
1849        for part in hollow_batch.parts.iter_mut() {
1850            match part {
1851                RunPart::Single(BatchPart::Hollow(x)) => x.encoded_size_bytes = size,
1852                _ => {
1853                    panic!("set_batch_parts_size only supports hollow parts")
1854                }
1855            }
1856        }
1857        batch.batch = Arc::new(hollow_batch);
1858        Ok("ok\n".to_string())
1859    }
1860
    /// Handles the `compact` directive: compacts the named input batches into
    /// a single output batch with the given description, stashing both the
    /// compaction request and its result under `output`.
    ///
    /// Optional args `target_size` and `memory_bound` override the
    /// corresponding dyncfgs for this compaction.
    pub async fn compact(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let output = args.expect_str("output");
        let lower = args.expect_antichain("lower");
        let upper = args.expect_antichain("upper");
        let since = args.expect_antichain("since");
        let target_size = args.optional("target_size");
        let memory_bound = args.optional("memory_bound");

        let mut inputs = Vec::new();
        for input in args.args.get("inputs").expect("missing inputs") {
            inputs.push(
                datadriven
                    .batches
                    .get(input)
                    .expect("unknown batch")
                    .clone(),
            );
        }

        // `set_config` works through a shared reference, so no `mut` binding
        // is needed even though the config values are being changed.
        let cfg = datadriven.client.cfg.clone();
        if let Some(target_size) = target_size {
            cfg.set_config(&BLOB_TARGET_SIZE, target_size);
        };
        if let Some(memory_bound) = memory_bound {
            cfg.set_config(&COMPACTION_MEMORY_BOUND_BYTES, memory_bound);
        }
        let req = CompactReq {
            shard_id: datadriven.shard_id,
            desc: Description::new(lower, upper, since),
            inputs: inputs.clone(),
        };
        datadriven
            .compactions
            .insert(output.to_owned(), req.clone());
        // The output spine id spans from the first input's id lower bound to
        // the last input's id upper bound. With no inputs, mint a fresh id by
        // bumping `next_id` (the bump happens inside the upper's closure, so
        // lower reads the pre-bump value).
        let spine_lower = inputs
            .first()
            .map_or_else(|| datadriven.next_id, |x| x.id.0);
        let spine_upper = inputs.last().map_or_else(
            || {
                datadriven.next_id += 1;
                datadriven.next_id
            },
            |x| x.id.1,
        );
        let new_spine_id = SpineId(spine_lower, spine_upper);
        let res = Compactor::<String, (), u64, i64>::compact(
            CompactConfig::new(&cfg, datadriven.shard_id),
            Arc::clone(&datadriven.client.blob),
            Arc::clone(&datadriven.client.metrics),
            Arc::clone(&datadriven.machine.applier.shard_metrics),
            Arc::clone(&datadriven.client.isolated_runtime),
            req,
            SCHEMAS.clone(),
        )
        .await?;

        let batch = IdHollowBatch {
            batch: Arc::new(res.output.clone()),
            id: new_spine_id,
        };

        datadriven.batches.insert(output.to_owned(), batch.clone());
        Ok(format!(
            "parts={} len={}\n",
            res.output.part_count(),
            res.output.len
        ))
    }
1932
1933    pub async fn clear_blob(
1934        datadriven: &MachineState,
1935        _args: DirectiveArgs<'_>,
1936    ) -> Result<String, anyhow::Error> {
1937        let mut to_delete = vec![];
1938        datadriven
1939            .client
1940            .blob
1941            .list_keys_and_metadata("", &mut |meta| {
1942                to_delete.push(meta.key.to_owned());
1943            })
1944            .await?;
1945        for blob in &to_delete {
1946            datadriven.client.blob.delete(blob).await?;
1947        }
1948        Ok(format!("deleted={}\n", to_delete.len()))
1949    }
1950
1951    pub async fn restore_blob(
1952        datadriven: &MachineState,
1953        _args: DirectiveArgs<'_>,
1954    ) -> Result<String, anyhow::Error> {
1955        let not_restored = crate::internal::restore::restore_blob(
1956            &datadriven.state_versions,
1957            datadriven.client.blob.as_ref(),
1958            &datadriven.client.cfg.build_version,
1959            datadriven.shard_id,
1960            &*datadriven.state_versions.metrics,
1961        )
1962        .await?;
1963        let mut out = String::new();
1964        for key in not_restored {
1965            writeln!(&mut out, "{key}");
1966        }
1967        Ok(out)
1968    }
1969
1970    #[allow(clippy::unused_async)]
1971    pub async fn rewrite_ts(
1972        datadriven: &mut MachineState,
1973        args: DirectiveArgs<'_>,
1974    ) -> Result<String, anyhow::Error> {
1975        let input = args.expect_str("input");
1976        let ts_rewrite = args.expect_antichain("frontier");
1977        let upper = args.expect_antichain("upper");
1978
1979        let batch = datadriven.batches.get_mut(input).expect("unknown batch");
1980        let mut hollow_batch = (*batch.batch).clone();
1981        let () = hollow_batch
1982            .rewrite_ts(&ts_rewrite, upper)
1983            .map_err(|err| anyhow!("invalid rewrite: {}", err))?;
1984        batch.batch = Arc::new(hollow_batch);
1985        Ok("ok\n".into())
1986    }
1987
1988    pub async fn gc(
1989        datadriven: &mut MachineState,
1990        args: DirectiveArgs<'_>,
1991    ) -> Result<String, anyhow::Error> {
1992        let new_seqno_since = args.expect("to_seqno");
1993
1994        let req = GcReq {
1995            shard_id: datadriven.shard_id,
1996            new_seqno_since,
1997        };
1998        let (maintenance, stats) =
1999            GarbageCollector::gc_and_truncate(&datadriven.machine, req).await;
2000        datadriven.routine.push(maintenance);
2001
2002        Ok(format!(
2003            "{} batch_parts={} rollups={} truncated={} state_rollups={}\n",
2004            datadriven.machine.seqno(),
2005            stats.batch_parts_deleted_from_blob,
2006            stats.rollups_deleted_from_blob,
2007            stats
2008                .truncated_consensus_to
2009                .iter()
2010                .map(|x| x.to_string())
2011                .collect::<Vec<_>>()
2012                .join(","),
2013            stats
2014                .rollups_removed_from_state
2015                .iter()
2016                .map(|x| x.to_string())
2017                .collect::<Vec<_>>()
2018                .join(","),
2019        ))
2020    }
2021
    /// Handles the `snapshot` directive: reads a snapshot of the shard at
    /// `as_of` and prints the consolidated contents, organized by batch, run,
    /// and part.
    pub async fn snapshot(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let as_of = args.expect_antichain("as_of");
        let snapshot = datadriven
            .machine
            .unleased_snapshot(&as_of)
            .await
            .map_err(|err| anyhow!("{:?}", err))?;

        let mut result = String::new();

        for batch in snapshot {
            writeln!(
                result,
                "<batch {:?}-{:?}>",
                batch.desc.lower().elements(),
                batch.desc.upper().elements()
            );
            for (run, (_meta, parts)) in batch.runs().enumerate() {
                writeln!(result, "<run {run}>");
                // Stream each run's parts in order, resolving nested parts.
                let mut stream = pin!(
                    futures::stream::iter(parts)
                        .flat_map(|part| part.part_stream(
                            datadriven.shard_id,
                            &*datadriven.state_versions.blob,
                            &*datadriven.state_versions.metrics
                        ))
                        .enumerate()
                );

                while let Some((idx, part)) = stream.next().await {
                    let part = &*part?;
                    writeln!(result, "<part {idx}>");

                    let part = EncodedPart::fetch(
                        &FetchConfig::from_persist_config(&datadriven.client.cfg),
                        &datadriven.shard_id,
                        datadriven.client.blob.as_ref(),
                        datadriven.client.metrics.as_ref(),
                        datadriven.machine.applier.shard_metrics.as_ref(),
                        &datadriven.client.metrics.read.batch_fetcher,
                        &batch.desc,
                        part,
                    )
                    .await
                    .expect("invalid batch part");
                    let part = part
                        .normalize(&datadriven.client.metrics.columnar)
                        .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);

                    let mut updates = Vec::new();

                    // Advance each update's timestamp to the as_of so that
                    // updates at earlier times consolidate together below.
                    for ((k, _v), mut t, d) in part
                        .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
                        .expect("valid schemas")
                    {
                        t.advance_by(as_of.borrow());
                        updates.push((k, t, d));
                    }

                    consolidate_updates(&mut updates);

                    // Print the consolidated updates as "key time diff".
                    for (k, t, d) in updates {
                        writeln!(result, "{k} {t} {d}");
                    }
                }
            }
        }

        Ok(result)
    }
2095
2096    pub async fn register_listen(
2097        datadriven: &mut MachineState,
2098        args: DirectiveArgs<'_>,
2099    ) -> Result<String, anyhow::Error> {
2100        let output = args.expect_str("output");
2101        let as_of = args.expect_antichain("as_of");
2102        let read = datadriven
2103            .client
2104            .open_leased_reader::<String, (), u64, i64>(
2105                datadriven.shard_id,
2106                Arc::new(StringSchema),
2107                Arc::new(UnitSchema),
2108                Diagnostics::for_tests(),
2109                true,
2110            )
2111            .await
2112            .expect("invalid shard types");
2113        let listen = read
2114            .listen(as_of)
2115            .await
2116            .map_err(|err| anyhow!("{:?}", err))?;
2117        datadriven.listens.insert(output.to_owned(), listen);
2118        Ok("ok\n".into())
2119    }
2120
2121    pub async fn listen_through(
2122        datadriven: &mut MachineState,
2123        args: DirectiveArgs<'_>,
2124    ) -> Result<String, anyhow::Error> {
2125        let input = args.expect_str("input");
2126        // It's not possible to listen _through_ the empty antichain, so this is
2127        // intentionally `expect` instead of `expect_antichain`.
2128        let frontier = args.expect("frontier");
2129        let listen = datadriven.listens.get_mut(input).expect("unknown listener");
2130        let mut s = String::new();
2131        loop {
2132            for event in listen.fetch_next().await {
2133                match event {
2134                    ListenEvent::Updates(x) => {
2135                        for ((k, _v), t, d) in x.iter() {
2136                            write!(s, "{} {} {}\n", k, t, d);
2137                        }
2138                    }
2139                    ListenEvent::Progress(x) => {
2140                        if !x.less_than(&frontier) {
2141                            return Ok(s);
2142                        }
2143                    }
2144                }
2145            }
2146        }
2147    }
2148
2149    pub async fn register_critical_reader(
2150        datadriven: &mut MachineState,
2151        args: DirectiveArgs<'_>,
2152    ) -> Result<String, anyhow::Error> {
2153        let reader_id = args.expect("reader_id");
2154        let (state, maintenance) = datadriven
2155            .machine
2156            .register_critical_reader(&reader_id, Opaque::encode(&0u64), "tests")
2157            .await;
2158        datadriven.routine.push(maintenance);
2159        Ok(format!(
2160            "{} {:?}\n",
2161            datadriven.machine.seqno(),
2162            state.since.elements(),
2163        ))
2164    }
2165
2166    pub async fn register_leased_reader(
2167        datadriven: &mut MachineState,
2168        args: DirectiveArgs<'_>,
2169    ) -> Result<String, anyhow::Error> {
2170        let reader_id = args.expect("reader_id");
2171        let (reader_state, maintenance) = datadriven
2172            .machine
2173            .register_leased_reader(
2174                &reader_id,
2175                "tests",
2176                READER_LEASE_DURATION.get(&datadriven.client.cfg),
2177                (datadriven.client.cfg.now)(),
2178                false,
2179            )
2180            .await;
2181        datadriven.routine.push(maintenance);
2182        Ok(format!(
2183            "{} {:?}\n",
2184            datadriven.machine.seqno(),
2185            reader_state.since.elements(),
2186        ))
2187    }
2188
2189    pub async fn expire_critical_reader(
2190        datadriven: &mut MachineState,
2191        args: DirectiveArgs<'_>,
2192    ) -> Result<String, anyhow::Error> {
2193        let reader_id = args.expect("reader_id");
2194        let (_, maintenance) = datadriven.machine.expire_critical_reader(&reader_id).await;
2195        datadriven.routine.push(maintenance);
2196        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2197    }
2198
2199    pub async fn expire_leased_reader(
2200        datadriven: &mut MachineState,
2201        args: DirectiveArgs<'_>,
2202    ) -> Result<String, anyhow::Error> {
2203        let reader_id = args.expect("reader_id");
2204        let (_, maintenance) = datadriven.machine.expire_leased_reader(&reader_id).await;
2205        datadriven.routine.push(maintenance);
2206        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2207    }
2208
2209    pub async fn compare_and_append_batches(
2210        datadriven: &MachineState,
2211        args: DirectiveArgs<'_>,
2212    ) -> Result<String, anyhow::Error> {
2213        let expected_upper = args.expect_antichain("expected_upper");
2214        let new_upper = args.expect_antichain("new_upper");
2215
2216        let mut batches: Vec<Batch<String, (), u64, i64>> = args
2217            .args
2218            .get("batches")
2219            .expect("missing batches")
2220            .into_iter()
2221            .map(|batch| {
2222                let hollow = (*datadriven
2223                    .batches
2224                    .get(batch)
2225                    .expect("unknown batch")
2226                    .clone()
2227                    .batch)
2228                    .clone();
2229                datadriven.to_batch(hollow)
2230            })
2231            .collect();
2232
2233        let mut writer = datadriven
2234            .client
2235            .open_writer(
2236                datadriven.shard_id,
2237                Arc::new(StringSchema),
2238                Arc::new(UnitSchema),
2239                Diagnostics::for_tests(),
2240            )
2241            .await?;
2242
2243        let mut batch_refs: Vec<_> = batches.iter_mut().collect();
2244
2245        let () = writer
2246            .compare_and_append_batch(batch_refs.as_mut_slice(), expected_upper, new_upper, true)
2247            .await?
2248            .map_err(|err| anyhow!("upper mismatch: {:?}", err))?;
2249
2250        writer.expire().await;
2251
2252        Ok("ok\n".into())
2253    }
2254
2255    pub async fn expire_writer(
2256        datadriven: &mut MachineState,
2257        args: DirectiveArgs<'_>,
2258    ) -> Result<String, anyhow::Error> {
2259        let writer_id = args.expect("writer_id");
2260        let (_, maintenance) = datadriven.machine.expire_writer(&writer_id).await;
2261        datadriven.routine.push(maintenance);
2262        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2263    }
2264
2265    pub(crate) async fn finalize(
2266        datadriven: &mut MachineState,
2267        _args: DirectiveArgs<'_>,
2268    ) -> anyhow::Result<String> {
2269        let maintenance = datadriven.machine.become_tombstone().await?;
2270        datadriven.routine.push(maintenance);
2271        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2272    }
2273
2274    pub(crate) fn is_finalized(
2275        datadriven: &MachineState,
2276        _args: DirectiveArgs<'_>,
2277    ) -> anyhow::Result<String> {
2278        let seqno = datadriven.machine.seqno();
2279        let tombstone = datadriven.machine.is_finalized();
2280        Ok(format!("{seqno} {tombstone}\n"))
2281    }
2282
    /// Handles the `compare-and-append` directive: registers the test schemas,
    /// then compare-and-appends the named batch, retrying through inline-write
    /// backpressure by flushing the batch's inline parts out to blob storage.
    ///
    /// Output is `"<seqno> <upper>\n"` on success; an upper mismatch is
    /// surfaced as an error so the datadriven test can assert on it.
    pub async fn compare_and_append(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let writer_id = args.expect("writer_id");
        let mut batch = datadriven
            .batches
            .get(input)
            .expect("unknown batch")
            .clone();
        // The directive may pin an idempotency token so tests can exercise the
        // idempotent-retry codepaths deterministically; otherwise mint a fresh
        // one.
        let token = args.optional("token").unwrap_or_else(IdempotencyToken::new);
        let now = (datadriven.client.cfg.now)();

        let (id, maintenance) = datadriven
            .machine
            .register_schema(&*SCHEMAS.key, &*SCHEMAS.val)
            .await;
        assert_eq!(id, SCHEMAS.id);
        datadriven.routine.push(maintenance);
        let maintenance = loop {
            // Optionally injected "previous indeterminate" error, used to
            // exercise compare_and_append_idempotent's recovery path.
            let indeterminate = args
                .optional::<String>("prev_indeterminate")
                .map(|x| Indeterminate::new(anyhow::Error::msg(x)));
            let res = datadriven
                .machine
                .compare_and_append_idempotent(
                    &batch.batch,
                    &writer_id,
                    now,
                    &token,
                    &HandleDebugState::default(),
                    indeterminate,
                )
                .await;
            match res {
                CompareAndAppendRes::Success(_, x) => break x,
                CompareAndAppendRes::UpperMismatch(_seqno, upper) => {
                    return Err(anyhow!("{:?}", Upper(upper)));
                }
                CompareAndAppendRes::InlineBackpressure => {
                    // State refused the batch because it carried too many
                    // inline writes: flush the parts to blob, swap in the
                    // rewritten hollow batch, and retry the append.
                    let hollow_batch = (*batch.batch).clone();
                    let mut b = datadriven.to_batch(hollow_batch);
                    let cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
                    b.flush_to_blob(
                        &cfg,
                        &datadriven.client.metrics.user,
                        &datadriven.client.isolated_runtime,
                        &*SCHEMAS,
                    )
                    .await;
                    batch.batch = Arc::new(b.into_hollow_batch());
                    continue;
                }
                CompareAndAppendRes::InvalidUsage(_) => panic!("{:?}", res),
            };
        };
        // TODO: Don't throw away writer maintenance. It's slightly tricky
        // because we need a WriterId for Compactor.
        datadriven.routine.push(maintenance.routine);
        Ok(format!(
            "{} {:?}\n",
            datadriven.machine.seqno(),
            datadriven.machine.applier.clone_upper().elements(),
        ))
    }
2349
2350    pub async fn apply_merge_res(
2351        datadriven: &mut MachineState,
2352        args: DirectiveArgs<'_>,
2353    ) -> Result<String, anyhow::Error> {
2354        let input = args.expect_str("input");
2355        let batch = datadriven
2356            .batches
2357            .get(input)
2358            .expect("unknown batch")
2359            .clone();
2360        let compact_req = datadriven
2361            .compactions
2362            .get(input)
2363            .expect("unknown compact req")
2364            .clone();
2365        let input_batches = compact_req
2366            .inputs
2367            .iter()
2368            .map(|x| x.id)
2369            .collect::<BTreeSet<_>>();
2370        let lower_spine_bound = input_batches
2371            .first()
2372            .map(|id| id.0)
2373            .expect("at least one batch must be present");
2374        let upper_spine_bound = input_batches
2375            .last()
2376            .map(|id| id.1)
2377            .expect("at least one batch must be present");
2378        let id = SpineId(lower_spine_bound, upper_spine_bound);
2379        let hollow_batch = (*batch.batch).clone();
2380
2381        let (merge_res, maintenance) = datadriven
2382            .machine
2383            .merge_res(&FueledMergeRes {
2384                output: hollow_batch,
2385                input: CompactionInput::IdRange(id),
2386                new_active_compaction: None,
2387            })
2388            .await;
2389        datadriven.routine.push(maintenance);
2390        Ok(format!(
2391            "{} {}\n",
2392            datadriven.machine.seqno(),
2393            merge_res.applied()
2394        ))
2395    }
2396
2397    pub async fn perform_maintenance(
2398        datadriven: &mut MachineState,
2399        _args: DirectiveArgs<'_>,
2400    ) -> Result<String, anyhow::Error> {
2401        let mut s = String::new();
2402        for maintenance in datadriven.routine.drain(..) {
2403            let () = maintenance
2404                .perform(&datadriven.machine, &datadriven.gc)
2405                .await;
2406            let () = datadriven
2407                .machine
2408                .applier
2409                .fetch_and_update_state(None)
2410                .await;
2411            write!(s, "{} ok\n", datadriven.machine.seqno());
2412        }
2413        Ok(s)
2414    }
2415}
2416
#[cfg(test)]
pub mod tests {
    use std::sync::Arc;

    use mz_dyncfg::ConfigUpdates;
    use mz_ore::cast::CastFrom;
    use mz_ore::task::spawn;
    use mz_persist::intercept::{InterceptBlob, InterceptHandle};
    use mz_persist::location::SeqNo;
    use mz_persist_types::PersistLocation;
    use semver::Version;
    use timely::progress::Antichain;

    use crate::batch::BatchBuilderConfig;
    use crate::cache::StateCache;
    use crate::internal::gc::{GarbageCollector, GcReq};
    use crate::internal::state::{HandleDebugState, ROLLUP_THRESHOLD};
    use crate::tests::{new_test_client, new_test_client_cache};
    use crate::{Diagnostics, PersistClient, ShardId};

    /// Writes many batches to a shard and asserts that the number of live
    /// diffs in consensus stays bounded, i.e. that rollups/GC truncate old
    /// state rather than letting it grow linearly with the number of writes.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
    async fn apply_unbatched_cmd_truncate(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging();

        let client = new_test_client(&dyncfgs).await;
        // set a low rollup threshold so GC/truncation is more aggressive
        client.cfg.set_config(&ROLLUP_THRESHOLD, 5);
        let (mut write, read) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
            .await;

        // Ensure the reader is not holding back the since.
        read.expire().await;

        // Write a bunch of batches. This should result in a bounded number of
        // live entries in consensus.
        const NUM_BATCHES: u64 = 100;
        for idx in 0..NUM_BATCHES {
            let mut batch = write
                .expect_batch(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1)
                .await;
            // Flush this batch out so the CaA doesn't get inline writes
            // backpressure.
            let cfg = BatchBuilderConfig::new(&client.cfg, write.shard_id());
            batch
                .flush_to_blob(
                    &cfg,
                    &client.metrics.user,
                    &client.isolated_runtime,
                    &write.write_schemas,
                )
                .await;
            let (_, writer_maintenance) = write
                .machine
                .compare_and_append(
                    &batch.into_hollow_batch(),
                    &write.writer_id,
                    &HandleDebugState::default(),
                    (write.cfg.now)(),
                )
                .await
                .unwrap();
            // Perform the maintenance eagerly so rollups/GC actually happen
            // within the loop rather than piling up.
            writer_maintenance
                .perform(&write.machine, &write.gc, write.compact.as_ref())
                .await;
        }
        let live_diffs = write
            .machine
            .applier
            .state_versions
            .fetch_all_live_diffs(&write.machine.shard_id())
            .await;
        // Make sure we constructed the key correctly.
        assert!(live_diffs.len() > 0);
        // Make sure the number of entries is bounded. (I think we could work
        // out a tighter bound than this, but the point is only that it's
        // bounded).
        let max_live_diffs = 2 * usize::cast_from(NUM_BATCHES.next_power_of_two().trailing_zeros());
        assert!(
            live_diffs.len() <= max_live_diffs,
            "{} vs {}",
            live_diffs.len(),
            max_live_diffs
        );
    }

    // A regression test for database-issues#4206, where a bug in gc led to an incremental
    // state invariant being violated which resulted in gc being permanently
    // wedged for the shard.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
    async fn regression_gc_skipped_req_and_interrupted(dyncfgs: ConfigUpdates) {
        let mut client = new_test_client(&dyncfgs).await;
        // Wrap the blob in an interceptor so we can make blob deletes panic
        // partway through a GC run, simulating a crash mid-truncation.
        let intercept = InterceptHandle::default();
        client.blob = Arc::new(InterceptBlob::new(
            Arc::clone(&client.blob),
            intercept.clone(),
        ));
        let (_, mut read) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        // Create a new SeqNo
        read.downgrade_since(&Antichain::from_elem(1)).await;
        let new_seqno_since = read.machine.applier.seqno_since();

        // Start a GC in the background for some SeqNo range that is not
        // contiguous compared to the last gc req (in this case, n/a) and then
        // crash when it gets to the blob deletes. In the regression case, this
        // renders the shard permanently un-gc-able.
        assert!(new_seqno_since > SeqNo::minimum());
        intercept.set_post_delete(Some(Arc::new(|_, _| panic!("boom"))));
        let machine = read.machine.clone();
        // Run this in a spawn so we can catch the boom panic
        let gc = spawn(|| "", async move {
            let req = GcReq {
                shard_id: machine.shard_id(),
                new_seqno_since,
            };
            GarbageCollector::gc_and_truncate(&machine, req).await
        });
        // Wait for gc to either panic (regression case) or finish (good case)
        // because it happens to not call blob delete.
        let _ = gc.await;

        // Allow blob deletes to go through and try GC again. In the regression
        // case, this hangs.
        intercept.set_post_delete(None);
        let req = GcReq {
            shard_id: read.machine.shard_id(),
            new_seqno_since,
        };
        let _ = GarbageCollector::gc_and_truncate(&read.machine, req.clone()).await;
    }

    // A regression test for materialize#20776, where a bug meant that compare_and_append
    // would not fetch the latest state after an upper mismatch. This meant that
    // a write that could succeed if retried on the latest state would instead
    // return an UpperMismatch.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
    async fn regression_update_state_after_upper_mismatch(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let mut client2 = client.clone();

        // The bug can only happen if the two WriteHandles have separate copies
        // of state, so make sure that each is given its own StateCache.
        let new_state_cache = Arc::new(StateCache::new_no_metrics());
        client2.shared_states = new_state_cache;

        let shard_id = ShardId::new();
        let (mut write1, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        let (mut write2, _) = client2.expect_open::<String, (), u64, i64>(shard_id).await;

        let data = [
            (("1".to_owned(), ()), 1, 1),
            (("2".to_owned(), ()), 2, 1),
            (("3".to_owned(), ()), 3, 1),
            (("4".to_owned(), ()), 4, 1),
            (("5".to_owned(), ()), 5, 1),
        ];

        // Advance the shard's upper via the first handle only.
        write1.expect_compare_and_append(&data[..1], 0, 2).await;

        // this handle's upper now lags behind. if compare_and_append fails to update
        // state after an upper mismatch then this call would (incorrectly) fail
        write2.expect_compare_and_append(&data[1..2], 2, 3).await;
    }

    /// Checks that a shard's recorded applier version is only bumped by an
    /// explicit `upgrade_version` call, not merely by opening/operating on the
    /// shard with a newer build version.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn version_upgrade(dyncfgs: ConfigUpdates) {
        let mut cache = new_test_client_cache(&dyncfgs);
        cache.cfg.build_version = Version::new(26, 1, 0);
        let shard_id = ShardId::new();

        // Reads the `applier_version` recorded in the shard's serialized
        // state, or None if the shard can't be inspected.
        async fn fetch_catalog_upgrade_shard_version(
            persist_client: &PersistClient,
            upgrade_shard_id: ShardId,
        ) -> Option<semver::Version> {
            let shard_state = persist_client
                .inspect_shard::<u64>(&upgrade_shard_id)
                .await
                .ok()?;
            let json_state = serde_json::to_value(shard_state).expect("state serialization error");
            let upgrade_version = json_state
                .get("applier_version")
                .cloned()
                .expect("missing applier_version");
            let upgrade_version =
                serde_json::from_value(upgrade_version).expect("version deserialization error");
            Some(upgrade_version)
        }

        // NOTE(review): this re-assigns the same build_version set a few lines
        // above; it looks redundant — confirm whether one of the two can go.
        cache.cfg.build_version = Version::new(26, 1, 0);
        let client = cache.open(PersistLocation::new_in_mem()).await.unwrap();
        let (_, mut reader) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        reader.downgrade_since(&Antichain::from_elem(1)).await;
        assert_eq!(
            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
            Some(Version::new(26, 1, 0)),
        );

        // Merely opening and operating on the shard at a new version doesn't bump version...
        cache.cfg.build_version = Version::new(27, 1, 0);
        let client = cache.open(PersistLocation::new_in_mem()).await.unwrap();
        let (_, mut reader) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        reader.downgrade_since(&Antichain::from_elem(2)).await;
        assert_eq!(
            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
            Some(Version::new(26, 1, 0)),
        );

        // ...but an explicit call will.
        client
            .upgrade_version::<String, (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await
            .unwrap();
        assert_eq!(
            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
            Some(Version::new(27, 1, 0)),
        );
    }
}