// mz_persist_client/internal/machine.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementation of the persist state machine.
11
12use std::fmt::Debug;
13use std::ops::ControlFlow::{self, Break, Continue};
14use std::sync::Arc;
15use std::time::{Duration, Instant, SystemTime};
16
17use differential_dataflow::difference::Monoid;
18use differential_dataflow::lattice::Lattice;
19use futures::FutureExt;
20use futures::future::{self, BoxFuture};
21use mz_dyncfg::{Config, ConfigSet};
22use mz_ore::cast::CastFrom;
23use mz_ore::error::ErrorExt;
24#[allow(unused_imports)] // False positive.
25use mz_ore::fmt::FormatBuffer;
26use mz_ore::{assert_none, soft_assert_no_log};
27use mz_persist::location::{ExternalError, Indeterminate, SeqNo};
28use mz_persist::retry::Retry;
29use mz_persist_types::schema::SchemaId;
30use mz_persist_types::{Codec, Codec64, Opaque};
31use semver::Version;
32use timely::PartialOrder;
33use timely::progress::{Antichain, Timestamp};
34use tracing::{Instrument, debug, info, trace_span};
35
36use crate::async_runtime::IsolatedRuntime;
37use crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES;
38use crate::cache::StateCache;
39use crate::cfg::RetryParameters;
40use crate::critical::CriticalReaderId;
41use crate::error::{CodecMismatch, InvalidUsage};
42use crate::internal::apply::Applier;
43use crate::internal::compact::CompactReq;
44use crate::internal::maintenance::{RoutineMaintenance, WriterMaintenance};
45use crate::internal::metrics::{CmdMetrics, Metrics, MetricsRetryStream, RetryMetrics};
46use crate::internal::paths::PartialRollupKey;
47use crate::internal::state::{
48    CompareAndAppendBreak, CriticalReaderState, HandleDebugState, HollowBatch, HollowRollup,
49    IdempotencyToken, LeasedReaderState, NoOpStateTransition, Since, SnapshotErr, StateCollections,
50    Upper,
51};
52use crate::internal::state_versions::StateVersions;
53use crate::internal::trace::{ApplyMergeResult, FueledMergeRes};
54use crate::internal::watch::StateWatch;
55use crate::read::LeasedReaderId;
56use crate::rpc::PubSubSender;
57use crate::schema::CaESchema;
58use crate::write::WriterId;
59use crate::{Diagnostics, PersistConfig, ShardId};
60
/// The state machine for a single persist shard.
///
/// Pairs an [Applier] (which holds the cached state for the shard and applies
/// commands to it) with a shared handle to an isolated async runtime. Cheaply
/// cloneable; see the manual `Clone` impl below.
#[derive(Debug)]
pub struct Machine<K, V, T, D> {
    // Applies commands against the shard's (cached) state.
    pub(crate) applier: Applier<K, V, T, D>,
    // Shared isolated runtime handle; cloned by `Arc::clone`.
    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
}
66
67// Impl Clone regardless of the type params.
68impl<K, V, T: Clone, D> Clone for Machine<K, V, T, D> {
69    fn clone(&self) -> Self {
70        Self {
71            applier: self.applier.clone(),
72            isolated_runtime: Arc::clone(&self.isolated_runtime),
73        }
74    }
75}
76
/// Dyncfg: when enabled, an append that produces no compaction request of its
/// own may instead "claim" some already-uncompacted batch in state. Off by
/// default.
pub(crate) const CLAIM_UNCLAIMED_COMPACTIONS: Config<bool> = Config::new(
    "persist_claim_unclaimed_compactions",
    false,
    "If an append doesn't result in a compaction request, but there is some uncompacted batch \
    in state, compact that instead.",
);
83
/// Dyncfg: the percent chance of claiming a compaction when claiming is
/// enabled. Values over 100 guarantee `value / 100` claims plus a
/// `value % 100` percent chance of one more (see the description below).
pub(crate) const CLAIM_COMPACTION_PERCENT: Config<usize> = Config::new(
    "persist_claim_compaction_percent",
    100,
    "Claim a compaction with the given percent chance, if claiming compactions is enabled. \
    (If over 100, we'll always claim at least one; for example, if set to 365, we'll claim at least \
    three and have a 65% chance of claiming a fourth.)",
);
91
/// Dyncfg: an optional semver string; when it parses as a valid version,
/// batches written by earlier versions become eligible for claimed compaction.
/// Empty (i.e. disabled) by default.
pub(crate) const CLAIM_COMPACTION_MIN_VERSION: Config<String> = Config::new(
    "persist_claim_compaction_min_version",
    String::new(),
    "If set to a valid version string, compact away any earlier versions if possible.",
);
97
98impl<K, V, T, D> Machine<K, V, T, D>
99where
100    K: Debug + Codec,
101    V: Debug + Codec,
102    T: Timestamp + Lattice + Codec64 + Sync,
103    D: Monoid + Codec64,
104{
105    pub async fn new(
106        cfg: PersistConfig,
107        shard_id: ShardId,
108        metrics: Arc<Metrics>,
109        state_versions: Arc<StateVersions>,
110        shared_states: Arc<StateCache>,
111        pubsub_sender: Arc<dyn PubSubSender>,
112        isolated_runtime: Arc<IsolatedRuntime>,
113        diagnostics: Diagnostics,
114    ) -> Result<Self, Box<CodecMismatch>> {
115        let applier = Applier::new(
116            cfg,
117            shard_id,
118            metrics,
119            state_versions,
120            shared_states,
121            pubsub_sender,
122            diagnostics,
123        )
124        .await?;
125        Ok(Machine {
126            applier,
127            isolated_runtime,
128        })
129    }
130
    /// Returns the id of the shard this machine operates on.
    pub fn shard_id(&self) -> ShardId {
        self.applier.shard_id
    }
134
    /// Returns the seqno of the locally cached state.
    pub fn seqno(&self) -> SeqNo {
        self.applier.seqno()
    }
138
139    pub async fn add_rollup_for_current_seqno(&self) -> RoutineMaintenance {
140        let rollup = self.applier.write_rollup_for_state().await;
141        let Some(rollup) = rollup else {
142            return RoutineMaintenance::default();
143        };
144
145        let (applied, maintenance) = self.add_rollup((rollup.seqno, &rollup.to_hollow())).await;
146        if !applied {
147            // Someone else already wrote a rollup at this seqno, so ours didn't
148            // get added. Delete it.
149            self.applier
150                .state_versions
151                .delete_rollup(&rollup.shard_id, &rollup.key)
152                .await;
153        }
154        maintenance
155    }
156
157    pub async fn add_rollup(
158        &self,
159        add_rollup: (SeqNo, &HollowRollup),
160    ) -> (bool, RoutineMaintenance) {
161        // See the big SUBTLE comment in [Self::merge_res] for what's going on
162        // here.
163        let mut applied_ever_true = false;
164        let metrics = Arc::clone(&self.applier.metrics);
165        let (_seqno, _applied, maintenance) = self
166            .apply_unbatched_idempotent_cmd(&metrics.cmds.add_rollup, |_, _, state| {
167                let ret = state.add_rollup(add_rollup);
168                if let Continue(applied) = ret {
169                    applied_ever_true = applied_ever_true || applied;
170                }
171                ret
172            })
173            .await;
174        (applied_ever_true, maintenance)
175    }
176
177    pub async fn remove_rollups(
178        &self,
179        remove_rollups: &[(SeqNo, PartialRollupKey)],
180    ) -> (Vec<SeqNo>, RoutineMaintenance) {
181        let metrics = Arc::clone(&self.applier.metrics);
182        let (_seqno, removed_rollup_seqnos, maintenance) = self
183            .apply_unbatched_idempotent_cmd(&metrics.cmds.remove_rollups, |_, _, state| {
184                state.remove_rollups(remove_rollups)
185            })
186            .await;
187        (removed_rollup_seqnos, maintenance)
188    }
189
    /// Attempt to upgrade the state to the latest version. If that's not possible, return the
    /// actual data version of the shard.
    pub async fn upgrade_version(&self) -> Result<RoutineMaintenance, Version> {
        let metrics = Arc::clone(&self.applier.metrics);
        // NOTE(review): this reuses the `remove_rollups` command metric for the
        // upgrade command (likely copy-paste); if a dedicated upgrade metric
        // exists in CmdMetrics, it should probably be used here instead.
        let (_seqno, upgrade_result, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.remove_rollups, |_, cfg, state| {
                // Only upgrade forwards: if the state's version is newer than
                // this process's build version, bail without a state change.
                if state.version <= cfg.build_version {
                    // This would be the place to remove any deprecated items from state, now
                    // that we're dropping compatibility with any previous versions.
                    state.version = cfg.build_version.clone();
                    Continue(Ok(()))
                } else {
                    Break(NoOpStateTransition(Err(state.version.clone())))
                }
            })
            .await;

        match upgrade_result {
            Ok(()) => Ok(maintenance),
            Err(version) => {
                // The failed path is a no-op transition, so it should never
                // have produced maintenance work.
                soft_assert_no_log!(
                    maintenance.is_empty(),
                    "should not generate maintenance on failed upgrade"
                );
                Err(version)
            }
        }
    }
218
219    pub async fn register_leased_reader(
220        &self,
221        reader_id: &LeasedReaderId,
222        purpose: &str,
223        lease_duration: Duration,
224        heartbeat_timestamp_ms: u64,
225        use_critical_since: bool,
226    ) -> (LeasedReaderState<T>, RoutineMaintenance) {
227        let metrics = Arc::clone(&self.applier.metrics);
228        let (_seqno, (reader_state, seqno_since), maintenance) = self
229            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |seqno, cfg, state| {
230                state.register_leased_reader(
231                    &cfg.hostname,
232                    reader_id,
233                    purpose,
234                    seqno,
235                    lease_duration,
236                    heartbeat_timestamp_ms,
237                    use_critical_since,
238                )
239            })
240            .await;
241        // Usually, the reader gets an initial seqno hold of the seqno at which
242        // it was registered. However, on a tombstone shard the seqno hold
243        // happens to get computed as the tombstone seqno + 1
244        // (State::clone_apply provided seqno.next(), the non-no-op commit
245        // seqno, to the work fn and this is what register_reader uses for the
246        // seqno hold). The real invariant we want to protect here is that the
247        // hold is >= the seqno_since, so validate that instead of anything more
248        // specific.
249        debug_assert!(
250            reader_state.seqno >= seqno_since,
251            "{} vs {}",
252            reader_state.seqno,
253            seqno_since,
254        );
255        (reader_state, maintenance)
256    }
257
258    pub async fn register_critical_reader<O: Opaque + Codec64>(
259        &self,
260        reader_id: &CriticalReaderId,
261        purpose: &str,
262    ) -> (CriticalReaderState<T>, RoutineMaintenance) {
263        let metrics = Arc::clone(&self.applier.metrics);
264        let (_seqno, state, maintenance) = self
265            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, cfg, state| {
266                state.register_critical_reader::<O>(&cfg.hostname, reader_id, purpose)
267            })
268            .await;
269        (state, maintenance)
270    }
271
272    pub async fn register_schema(
273        &self,
274        key_schema: &K::Schema,
275        val_schema: &V::Schema,
276    ) -> (Option<SchemaId>, RoutineMaintenance) {
277        let metrics = Arc::clone(&self.applier.metrics);
278        let (_seqno, state, maintenance) = self
279            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, _cfg, state| {
280                state.register_schema::<K, V>(key_schema, val_schema)
281            })
282            .await;
283        (state, maintenance)
284    }
285
286    pub async fn spine_exert(&self, fuel: usize) -> (Vec<CompactReq<T>>, RoutineMaintenance) {
287        // Performance special case for no-ops, to avoid the State clones.
288        if fuel == 0 || self.applier.all_batches().len() < 2 {
289            return (Vec::new(), RoutineMaintenance::default());
290        }
291
292        let metrics = Arc::clone(&self.applier.metrics);
293        let (_seqno, reqs, maintenance) = self
294            .apply_unbatched_idempotent_cmd(&metrics.cmds.spine_exert, |_seqno, _cfg, state| {
295                state.spine_exert(fuel)
296            })
297            .await;
298        let reqs = reqs
299            .into_iter()
300            .map(|req| CompactReq {
301                shard_id: self.shard_id(),
302                desc: req.desc,
303                inputs: req.inputs,
304            })
305            .collect();
306        (reqs, maintenance)
307    }
308
    /// Appends `batch` to the shard, expecting the shard's upper to equal
    /// `batch.desc.lower()`.
    ///
    /// Internally retries when an upper mismatch turns out to be caused only
    /// by a stale local cache of state; an `UpperMismatch` returned from here
    /// reflects freshly fetched state.
    pub async fn compare_and_append(
        &self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        debug_info: &HandleDebugState,
        heartbeat_timestamp_ms: u64,
    ) -> CompareAndAppendRes<T> {
        // One token for the whole logical append, shared by all retries.
        let idempotency_token = IdempotencyToken::new();
        loop {
            let res = self
                .compare_and_append_idempotent(
                    batch,
                    writer_id,
                    heartbeat_timestamp_ms,
                    &idempotency_token,
                    debug_info,
                    None,
                )
                .await;
            match res {
                CompareAndAppendRes::Success(seqno, maintenance) => {
                    return CompareAndAppendRes::Success(seqno, maintenance);
                }
                CompareAndAppendRes::InvalidUsage(x) => {
                    return CompareAndAppendRes::InvalidUsage(x);
                }
                CompareAndAppendRes::InlineBackpressure => {
                    return CompareAndAppendRes::InlineBackpressure;
                }
                CompareAndAppendRes::UpperMismatch(seqno, _current_upper) => {
                    // If the state machine thinks that the shard upper is not
                    // far enough along, it could be because the caller of this
                    // method has found out that it advanced via some
                    // side-channel that didn't update our local cache of the
                    // machine state. So, fetch the latest state and try again
                    // if we indeed get something different.
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                    let (current_seqno, current_upper) =
                        self.applier.upper(|seqno, upper| (seqno, upper.clone()));

                    // We tried to do a compare_and_append with the wrong
                    // expected upper, that won't work.
                    if &current_upper != batch.desc.lower() {
                        return CompareAndAppendRes::UpperMismatch(current_seqno, current_upper);
                    } else {
                        // The upper stored in state was outdated. Retry after
                        // updating.
                    }
                }
            }
        }
    }
361
    /// Inner loop of [Self::compare_and_append]: applies the command once,
    /// retrying Indeterminate errors from consensus and disambiguating whether
    /// an earlier, indeterminately-erroring attempt actually committed. See
    /// the SUBTLE comment below for the full correctness argument.
    async fn compare_and_append_idempotent(
        &self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        heartbeat_timestamp_ms: u64,
        idempotency_token: &IdempotencyToken,
        debug_info: &HandleDebugState,
        // Only exposed for testing. In prod, this always starts as None, but
        // making it a parameter allows us to simulate hitting an indeterminate
        // error on the first attempt in tests.
        mut indeterminate: Option<Indeterminate>,
    ) -> CompareAndAppendRes<T> {
        let metrics = Arc::clone(&self.applier.metrics);
        let lease_duration_ms = self
            .applier
            .cfg
            .writer_lease_duration
            .as_millis()
            .try_into()
            .expect("reasonable duration");
        // SUBTLE: Retries of compare_and_append with Indeterminate errors are
        // tricky (more discussion of this in database-issues#3680):
        //
        // - (1) We compare_and_append and get an Indeterminate error back from
        //   CRDB/Consensus. This means we don't know if it committed or not.
        // - (2) We retry it.
        // - (3) We get back an upper mismatch. The tricky bit is deciding if we
        //   conflicted with some other writer OR if the write in (1) actually
        //   went through and we're "conflicting" with ourself.
        //
        // A number of scenarios can be distinguished with per-writer
        // idempotency tokens, so I'll jump straight to the hardest one:
        //
        // - (1) A compare_and_append is issued for e.g. `[5,7)`, the consensus
        //   call makes it onto the network before the operation is cancelled
        //   (by dropping the future).
        // - (2) A compare_and_append is issued from the same WriteHandle for
        //   `[3,5)`, it uses a different conn from the consensus pool and gets
        //   an Indeterminate error.
        // - (3) The call in (1) is received by consensus and commits.
        // - (4) The retry of (2) receives an upper mismatch with an upper of 7.
        //
        // At this point, how do we determine whether (2) committed or not and
        // thus whether we should return success or upper mismatch? Getting this
        // right is very important for correctness (imagine this is a table
        // write and we either return success or failure to the client).
        //
        // - If we use per-writer IdempotencyTokens but only store the latest
        //   one in state, then the `[5,7)` one will have clobbered whatever our
        //   `[3,5)` one was.
        // - We could store every IdempotencyToken that ever committed, but that
        //   would require unbounded storage in state (non-starter).
        // - We could require that IdempotencyTokens are comparable and that
        //   each call issued by a WriteHandle uses one that is strictly greater
        //   than every call before it. A previous version of this PR tried this
        //   and it's remarkably subtle. As a result, I (Dan) have developed
        //   strong feels that our correctness protocol _should not depend on
        //   WriteHandle, only Machine_.
        // - We could require a new WriterId if a request is ever cancelled by
        //   making `compare_and_append` take ownership of `self` and then
        //   handing it back for any call polled to completion. The ergonomics
        //   of this are quite awkward and, like the previous idea, it depends
        //   on the WriteHandle impl for correctness.
        // - Any ideas that involve reading back the data are foiled by a step
        //   `(0) set the since to 100` (plus the latency and memory usage would
        //   be too unpredictable).
        //
        // The technique used here derives from the following observations:
        //
        // - In practice, we don't use compare_and_append with the sort of
        //   "regressing frontiers" described above.
        // - In practice, Indeterminate errors are rare-ish. They happen enough
        //   that we don't want to always panic on them, but this is still a
        //   useful property to build on.
        //
        // At a high level, we do just enough to be able to distinguish the
        // cases that we think will happen in practice and then leave the rest
        // for a panic! that we think we'll never see. Concretely:
        //
        // - Run compare_and_append in a loop, retrying on Indeterminate errors
        //   but noting if we've ever done that.
        // - If we haven't seen an Indeterminate error (i.e. this is the first
        //   time through the loop) then the result we got is guaranteed to be
        //   correct, so pass it up.
        // - Otherwise, any result other than an expected upper mismatch is
        //   guaranteed to be correct, so just pass it up.
        // - Otherwise examine the writer's most recent upper and break it into
        //   two cases:
        // - Case 1 `expected_upper.less_than(writer_most_recent_upper)`: it's
        //   impossible that we committed on a previous iteration because the
        //   overall upper of the shard is less_than what this call would have
        //   advanced it to. Pass up the expectation mismatch.
        // - Case 2 `!Case1`: First note that this means our IdempotencyToken
        //   didn't match, otherwise we would have gotten `AlreadyCommitted`. It
        //   also means some previous write from _this writer_ has committed an
        //   upper that is beyond the one in this call, which is a weird usage
        //   (NB can't be a future write because that would mean someone is
        //   still polling us, but `&mut self` prevents that).
        //
        // TODO: If this technique works in practice (leads to zero panics),
        // then commit to it and remove the Indeterminate from
        // [WriteHandle::compare_and_append_batch].
        let mut retry = self
            .applier
            .metrics
            .retries
            .compare_and_append_idempotent
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        let mut writer_was_present = false;
        loop {
            let cmd_res = self
                .applier
                .apply_unbatched_cmd(&metrics.cmds.compare_and_append, |_, cfg, state| {
                    writer_was_present = state.writers.contains_key(writer_id);
                    state.compare_and_append(
                        batch,
                        writer_id,
                        heartbeat_timestamp_ms,
                        lease_duration_ms,
                        idempotency_token,
                        debug_info,
                        INLINE_WRITES_TOTAL_MAX_BYTES.get(cfg),
                        if CLAIM_UNCLAIMED_COMPACTIONS.get(cfg) {
                            CLAIM_COMPACTION_PERCENT.get(cfg)
                        } else {
                            0
                        },
                        Version::parse(&CLAIM_COMPACTION_MIN_VERSION.get(cfg))
                            .ok()
                            .as_ref(),
                    )
                })
                .await;
            let (seqno, res, routine) = match cmd_res {
                Ok(x) => x,
                Err(err) => {
                    // These are rare and interesting enough that we always log
                    // them at info!.
                    info!(
                        "compare_and_append received an indeterminate error, retrying in {:?}: {}",
                        retry.next_sleep(),
                        err
                    );
                    if indeterminate.is_none() {
                        indeterminate = Some(err);
                    }
                    retry = retry.sleep().await;
                    continue;
                }
            };
            match res {
                Ok(merge_reqs) => {
                    // We got explicit confirmation that we succeeded, so
                    // anything that happened in a previous retry is irrelevant.
                    let mut compact_reqs = Vec::with_capacity(merge_reqs.len());
                    for req in merge_reqs {
                        let req = CompactReq {
                            shard_id: self.shard_id(),
                            desc: req.desc,
                            inputs: req.inputs,
                        };
                        compact_reqs.push(req);
                    }
                    let writer_maintenance = WriterMaintenance {
                        routine,
                        compaction: compact_reqs,
                    };

                    if !writer_was_present {
                        metrics.state.writer_added.inc();
                    }
                    for part in &batch.parts {
                        if part.is_inline() {
                            let bytes = u64::cast_from(part.inline_bytes());
                            metrics.inline.part_commit_bytes.inc_by(bytes);
                            metrics.inline.part_commit_count.inc();
                        }
                    }
                    return CompareAndAppendRes::Success(seqno, writer_maintenance);
                }
                Err(CompareAndAppendBreak::AlreadyCommitted) => {
                    // A previous iteration through this loop got an
                    // Indeterminate error but was successful. Sanity check this
                    // and pass along the good news.
                    assert!(indeterminate.is_some());
                    self.applier.metrics.cmds.compare_and_append_noop.inc();
                    if !writer_was_present {
                        metrics.state.writer_added.inc();
                    }
                    return CompareAndAppendRes::Success(seqno, WriterMaintenance::default());
                }
                Err(CompareAndAppendBreak::InvalidUsage(err)) => {
                    // InvalidUsage is (or should be) a deterministic function
                    // of the inputs and independent of anything in persist
                    // state. It's handed back via a Break, so we never even try
                    // to commit it. No network, no Indeterminate.
                    assert_none!(indeterminate);
                    return CompareAndAppendRes::InvalidUsage(err);
                }
                Err(CompareAndAppendBreak::InlineBackpressure) => {
                    // We tried to write an inline part, but there was already
                    // too much in state. Flush it out to s3 and try again.
                    return CompareAndAppendRes::InlineBackpressure;
                }
                Err(CompareAndAppendBreak::Upper {
                    shard_upper,
                    writer_upper,
                }) => {
                    // NB the below intentionally compares to writer_upper
                    // (because it gives a tighter bound on the bad case), but
                    // returns shard_upper (what the persist caller cares
                    // about).
                    assert!(
                        PartialOrder::less_equal(&writer_upper, &shard_upper),
                        "{:?} vs {:?}",
                        &writer_upper,
                        &shard_upper
                    );
                    if PartialOrder::less_than(&writer_upper, batch.desc.upper()) {
                        // No way this could have committed in some previous
                        // attempt of this loop: the upper of the writer is
                        // strictly less than the proposed new upper.
                        return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
                    }
                    if indeterminate.is_none() {
                        // No way this could have committed in some previous
                        // attempt of this loop: we never saw an indeterminate
                        // error (thus there was no previous iteration of the
                        // loop).
                        return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
                    }
                    // This is the bad case. We can't distinguish if some
                    // previous attempt that got an Indeterminate error
                    // succeeded or not. This should be sufficiently rare in
                    // practice (hopefully ~never) that we give up and let
                    // process restart fix things. See the big comment above for
                    // more context.
                    //
                    // NB: This is intentionally not a halt! because it's quite
                    // unexpected.
                    panic!(
                        concat!(
                            "cannot distinguish compare_and_append success or failure ",
                            "caa_lower={:?} caa_upper={:?} writer_upper={:?} shard_upper={:?} err={:?}"
                        ),
                        batch.desc.lower().elements(),
                        batch.desc.upper().elements(),
                        writer_upper.elements(),
                        shard_upper.elements(),
                        indeterminate,
                    );
                }
            };
        }
    }
617
618    pub async fn downgrade_since(
619        &self,
620        reader_id: &LeasedReaderId,
621        outstanding_seqno: SeqNo,
622        new_since: &Antichain<T>,
623        heartbeat_timestamp_ms: u64,
624    ) -> (SeqNo, Since<T>, RoutineMaintenance) {
625        let metrics = Arc::clone(&self.applier.metrics);
626        self.apply_unbatched_idempotent_cmd(&metrics.cmds.downgrade_since, |seqno, _cfg, state| {
627            state.downgrade_since(
628                reader_id,
629                seqno,
630                outstanding_seqno,
631                new_since,
632                heartbeat_timestamp_ms,
633            )
634        })
635        .await
636    }
637
638    pub async fn compare_and_downgrade_since<O: Opaque + Codec64>(
639        &self,
640        reader_id: &CriticalReaderId,
641        expected_opaque: &O,
642        (new_opaque, new_since): (&O, &Antichain<T>),
643    ) -> (Result<Since<T>, (O, Since<T>)>, RoutineMaintenance) {
644        let metrics = Arc::clone(&self.applier.metrics);
645        let (_seqno, res, maintenance) = self
646            .apply_unbatched_idempotent_cmd(
647                &metrics.cmds.compare_and_downgrade_since,
648                |_seqno, _cfg, state| {
649                    state.compare_and_downgrade_since::<O>(
650                        reader_id,
651                        expected_opaque,
652                        (new_opaque, new_since),
653                    )
654                },
655            )
656            .await;
657
658        match res {
659            Ok(since) => (Ok(since), maintenance),
660            Err((opaque, since)) => (Err((opaque, since)), maintenance),
661        }
662    }
663
664    pub async fn expire_leased_reader(
665        &self,
666        reader_id: &LeasedReaderId,
667    ) -> (SeqNo, RoutineMaintenance) {
668        let metrics = Arc::clone(&self.applier.metrics);
669        let (seqno, _existed, maintenance) = self
670            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
671                state.expire_leased_reader(reader_id)
672            })
673            .await;
674        (seqno, maintenance)
675    }
676
677    #[allow(dead_code)] // TODO(bkirwi): remove this when since behaviour on expiry has settled
678    pub async fn expire_critical_reader(
679        &self,
680        reader_id: &CriticalReaderId,
681    ) -> (SeqNo, RoutineMaintenance) {
682        let metrics = Arc::clone(&self.applier.metrics);
683        let (seqno, _existed, maintenance) = self
684            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
685                state.expire_critical_reader(reader_id)
686            })
687            .await;
688        (seqno, maintenance)
689    }
690
691    pub async fn expire_writer(&self, writer_id: &WriterId) -> (SeqNo, RoutineMaintenance) {
692        let metrics = Arc::clone(&self.applier.metrics);
693        let (seqno, _existed, maintenance) = self
694            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_writer, |_, _, state| {
695                state.expire_writer(writer_id)
696            })
697            .await;
698        metrics.state.writer_removed.inc();
699        (seqno, maintenance)
700    }
701
    /// Reports whether the applier considers this shard's state finalized
    /// (see [Self::become_tombstone]).
    pub fn is_finalized(&self) -> bool {
        self.applier.is_finalized()
    }
705
    /// See [crate::PersistClient::get_schema].
    ///
    /// Read-only and synchronous: answered by the applier without any await.
    pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
        self.applier.get_schema(schema_id)
    }
710
    /// See [crate::PersistClient::latest_schema].
    ///
    /// Read-only and synchronous: answered by the applier without any await.
    pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
        self.applier.latest_schema()
    }
715
    /// Returns the ID of the given schema, if known at the current state.
    ///
    /// Read-only and synchronous: answered by the applier without any await.
    pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
        self.applier.find_schema(key_schema, val_schema)
    }
720
721    /// See [crate::PersistClient::compare_and_evolve_schema].
722    ///
723    /// TODO: Unify this with [Self::register_schema]?
724    pub async fn compare_and_evolve_schema(
725        &self,
726        expected: SchemaId,
727        key_schema: &K::Schema,
728        val_schema: &V::Schema,
729    ) -> (CaESchema<K, V>, RoutineMaintenance) {
730        let metrics = Arc::clone(&self.applier.metrics);
731        let (_seqno, state, maintenance) = self
732            .apply_unbatched_idempotent_cmd(
733                &metrics.cmds.compare_and_evolve_schema,
734                |_seqno, _cfg, state| {
735                    state.compare_and_evolve_schema::<K, V>(expected, key_schema, val_schema)
736                },
737            )
738            .await;
739        (state, maintenance)
740    }
741
    /// Attempts a single step of converting this shard into a tombstone,
    /// retrying indeterminate errors (with backoff) forever.
    ///
    /// Returns `Ok(true)` if the applied command made progress and `Ok(false)`
    /// if it was a no-op state transition (nothing left to do).
    async fn tombstone_step(&self) -> Result<(bool, RoutineMaintenance), InvalidUsage<T>> {
        let metrics = Arc::clone(&self.applier.metrics);
        let mut retry = self
            .applier
            .metrics
            .retries
            .idempotent_cmd
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        loop {
            let res = self
                .applier
                .apply_unbatched_cmd(&metrics.cmds.become_tombstone, |_, _, state| {
                    state.become_tombstone_and_shrink()
                })
                .await;
            let err = match res {
                Ok((_seqno, Ok(()), maintenance)) => return Ok((true, maintenance)),
                Ok((_seqno, Err(NoOpStateTransition(())), maintenance)) => {
                    return Ok((false, maintenance));
                }
                Err(err) => err,
            };
            // Escalate from debug to info logging after a few attempts; see
            // INFO_MIN_ATTEMPTS.
            if retry.attempt() >= INFO_MIN_ATTEMPTS {
                info!(
                    "become_tombstone received an indeterminate error, retrying in {:?}: {}",
                    retry.next_sleep(),
                    err
                );
            } else {
                debug!(
                    "become_tombstone received an indeterminate error, retrying in {:?}: {}",
                    retry.next_sleep(),
                    err
                );
            }
            retry = retry.sleep().await;
        }
    }
780
781    pub async fn become_tombstone(&self) -> Result<RoutineMaintenance, InvalidUsage<T>> {
782        self.applier.check_since_upper_both_empty()?;
783
784        let mut maintenance = RoutineMaintenance::default();
785
786        loop {
787            let (made_progress, more_maintenance) = self.tombstone_step().await?;
788            maintenance.merge(more_maintenance);
789            if !made_progress {
790                break;
791            }
792        }
793
794        Ok(maintenance)
795    }
796
    /// Returns the batches that constitute this shard's contents at `as_of`,
    /// blocking (via a state watch raced against a backoff sleep) until the
    /// state can serve that as_of.
    ///
    /// Returns `Err(Since)` when the as_of can no longer be served
    /// (`SnapshotErr::AsOfHistoricalDistinctionsLost`).
    pub async fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, Since<T>> {
        let start = Instant::now();
        // Fast path: the currently cached state can already serve the as_of.
        let (mut seqno, mut upper) = match self.applier.snapshot(as_of) {
            Ok(x) => return Ok(x),
            Err(SnapshotErr::AsOfNotYetAvailable(seqno, Upper(upper))) => (seqno, upper),
            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(since))) => {
                return Err(Since(since));
            }
        };

        // The latest state still couldn't serve this as_of: watch+sleep in a
        // loop until it's ready.
        let mut watch = self.applier.watch();
        let watch = &mut watch;
        let sleeps = self
            .applier
            .metrics
            .retries
            .snapshot
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());

        // Which of the two racing futures woke us up, carrying that future's
        // state so it can be re-armed for the next iteration.
        enum Wake<'a, K, V, T, D> {
            Watch(&'a mut StateWatch<K, V, T, D>),
            Sleep(MetricsRetryStream),
        }
        let mut watch_fut = std::pin::pin!(
            watch
                .wait_for_seqno_ge(seqno.next())
                .map(Wake::Watch)
                .instrument(trace_span!("snapshot::watch")),
        );
        let mut sleep_fut = std::pin::pin!(
            sleeps
                .sleep()
                .map(Wake::Sleep)
                .instrument(trace_span!("snapshot::sleep")),
        );

        // To reduce log spam, we log "not yet available" only once at info if
        // it passes a certain threshold. Then, if it did one info log, we log
        // again at info when it resolves.
        let mut logged_at_info = false;
        loop {
            // Use a duration based threshold here instead of the usual
            // INFO_MIN_ATTEMPTS because here we're waiting on an
            // external thing to arrive.
            if !logged_at_info && start.elapsed() >= Duration::from_millis(1024) {
                logged_at_info = true;
                info!(
                    "snapshot {} {} as of {:?} not yet available for {} upper {:?}",
                    self.applier.shard_metrics.name,
                    self.shard_id(),
                    as_of.elements(),
                    seqno,
                    upper.elements(),
                );
            } else {
                debug!(
                    "snapshot {} {} as of {:?} not yet available for {} upper {:?}",
                    self.applier.shard_metrics.name,
                    self.shard_id(),
                    as_of.elements(),
                    seqno,
                    upper.elements(),
                );
            }

            let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
                future::Either::Left((wake, _)) => wake,
                future::Either::Right((wake, _)) => wake,
            };
            // Note that we don't need to fetch in the Watch case, because the
            // Watch wakeup is a signal that the shared state has already been
            // updated.
            match &wake {
                Wake::Watch(_) => self.applier.metrics.watch.snapshot_woken_via_watch.inc(),
                Wake::Sleep(_) => {
                    self.applier.metrics.watch.snapshot_woken_via_sleep.inc();
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                }
            }

            (seqno, upper) = match self.applier.snapshot(as_of) {
                Ok(x) => {
                    if logged_at_info {
                        info!(
                            "snapshot {} {} as of {:?} now available",
                            self.applier.shard_metrics.name,
                            self.shard_id(),
                            as_of.elements(),
                        );
                    }
                    return Ok(x);
                }
                Err(SnapshotErr::AsOfNotYetAvailable(seqno, Upper(upper))) => {
                    // The upper isn't ready yet, fall through and try again.
                    (seqno, upper)
                }
                Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(since))) => {
                    return Err(Since(since));
                }
            };

            // Re-arm whichever future fired; the other keeps waiting.
            match wake {
                Wake::Watch(watch) => {
                    watch_fut.set(
                        watch
                            .wait_for_seqno_ge(seqno.next())
                            .map(Wake::Watch)
                            .instrument(trace_span!("snapshot::watch")),
                    );
                }
                Wake::Sleep(sleeps) => {
                    debug!(
                        "snapshot {} {} sleeping for {:?}",
                        self.applier.shard_metrics.name,
                        self.shard_id(),
                        sleeps.next_sleep()
                    );
                    sleep_fut.set(
                        sleeps
                            .sleep()
                            .map(Wake::Sleep)
                            .instrument(trace_span!("snapshot::sleep")),
                    );
                }
            }
        }
    }
926
    // NB: Unlike the other methods here, this one is read-only.
    /// Checks whether a listen starting at `as_of` would be valid, returning a
    /// `Since` error if it isn't.
    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
        self.applier.verify_listen(as_of)
    }
931
    /// Returns the next batch for a listen at `frontier`, blocking (via the
    /// given state watch raced against a configurable backoff sleep) until one
    /// is available.
    pub async fn next_listen_batch(
        &self,
        frontier: &Antichain<T>,
        watch: &mut StateWatch<K, V, T, D>,
        reader_id: Option<&LeasedReaderId>,
        // If Some, an override for the default listen sleep retry parameters.
        retry: Option<RetryParameters>,
    ) -> HollowBatch<T> {
        // Fast path: the currently cached state already has a batch for us.
        let mut seqno = match self.applier.next_listen_batch(frontier) {
            Ok(b) => return b,
            Err(seqno) => seqno,
        };

        // The latest state still doesn't have a new frontier for us:
        // watch+sleep in a loop until it does.
        let retry = retry.unwrap_or_else(|| next_listen_batch_retry_params(&self.applier.cfg));
        let sleeps = self
            .applier
            .metrics
            .retries
            .next_listen_batch
            .stream(retry.into_retry(SystemTime::now()).into_retry_stream());

        // Which of the two racing futures woke us up, carrying that future's
        // state so it can be re-armed for the next iteration.
        enum Wake<'a, K, V, T, D> {
            Watch(&'a mut StateWatch<K, V, T, D>),
            Sleep(MetricsRetryStream),
        }
        // NOTE(review): these trace spans reuse the "snapshot::" names from
        // Machine::snapshot above — presumably copied; confirm whether any
        // trace tooling relies on the existing names before renaming.
        let mut watch_fut = std::pin::pin!(
            watch
                .wait_for_seqno_ge(seqno.next())
                .map(Wake::Watch)
                .instrument(trace_span!("snapshot::watch"))
        );
        let mut sleep_fut = std::pin::pin!(
            sleeps
                .sleep()
                .map(Wake::Sleep)
                .instrument(trace_span!("snapshot::sleep"))
        );

        loop {
            let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
                future::Either::Left((wake, _)) => wake,
                future::Either::Right((wake, _)) => wake,
            };
            // Note that we don't need to fetch in the Watch case, because the
            // Watch wakeup is a signal that the shared state has already been
            // updated.
            match &wake {
                Wake::Watch(_) => self.applier.metrics.watch.listen_woken_via_watch.inc(),
                Wake::Sleep(_) => {
                    self.applier.metrics.watch.listen_woken_via_sleep.inc();
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                }
            }

            seqno = match self.applier.next_listen_batch(frontier) {
                Ok(b) => {
                    match &wake {
                        Wake::Watch(_) => {
                            self.applier.metrics.watch.listen_resolved_via_watch.inc()
                        }
                        Wake::Sleep(_) => {
                            self.applier.metrics.watch.listen_resolved_via_sleep.inc()
                        }
                    }
                    return b;
                }
                Err(seqno) => seqno,
            };

            // Wait a bit and try again. Intentionally don't ever log
            // this at info level.
            match wake {
                Wake::Watch(watch) => {
                    watch_fut.set(
                        watch
                            .wait_for_seqno_ge(seqno.next())
                            .map(Wake::Watch)
                            .instrument(trace_span!("snapshot::watch")),
                    );
                }
                Wake::Sleep(sleeps) => {
                    debug!(
                        "{:?}: {} {} next_listen_batch didn't find new data, retrying in {:?}",
                        reader_id,
                        self.applier.shard_metrics.name,
                        self.shard_id(),
                        sleeps.next_sleep()
                    );
                    sleep_fut.set(
                        sleeps
                            .sleep()
                            .map(Wake::Sleep)
                            .instrument(trace_span!("snapshot::sleep")),
                    );
                }
            }
        }
    }
1032
    /// Applies `work_fn` to the state, retrying indeterminate errors from the
    /// underlying apply (with backoff) forever.
    ///
    /// `work_fn` must be idempotent: it may be invoked multiple times, and a
    /// retried command may already have been durably applied by an attempt
    /// whose acknowledgement was lost (see the SUBTLE comment in `merge_res`).
    async fn apply_unbatched_idempotent_cmd<
        R,
        WorkFn: FnMut(
            SeqNo,
            &PersistConfig,
            &mut StateCollections<T>,
        ) -> ControlFlow<NoOpStateTransition<R>, R>,
    >(
        &self,
        cmd: &CmdMetrics,
        mut work_fn: WorkFn,
    ) -> (SeqNo, R, RoutineMaintenance) {
        let mut retry = self
            .applier
            .metrics
            .retries
            .idempotent_cmd
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        loop {
            match self.applier.apply_unbatched_cmd(cmd, &mut work_fn).await {
                // Both real transitions and no-ops terminate the retry loop;
                // only the error arm below loops.
                Ok((seqno, x, maintenance)) => match x {
                    Ok(x) => {
                        return (seqno, x, maintenance);
                    }
                    Err(NoOpStateTransition(x)) => {
                        return (seqno, x, maintenance);
                    }
                },
                Err(err) => {
                    // Escalate from debug to info logging after a few
                    // attempts; see INFO_MIN_ATTEMPTS.
                    if retry.attempt() >= INFO_MIN_ATTEMPTS {
                        info!(
                            "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
                            cmd.name,
                            retry.next_sleep(),
                            err
                        );
                    } else {
                        debug!(
                            "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
                            cmd.name,
                            retry.next_sleep(),
                            err
                        );
                    }
                    retry = retry.sleep().await;
                    continue;
                }
            }
        }
    }
1083}
1084
1085impl<K, V, T, D> Machine<K, V, T, D>
1086where
1087    K: Debug + Codec,
1088    V: Debug + Codec,
1089    T: Timestamp + Lattice + Codec64 + Sync,
1090    D: Monoid + Codec64 + PartialEq,
1091{
1092    pub async fn merge_res(
1093        &self,
1094        res: &FueledMergeRes<T>,
1095    ) -> (ApplyMergeResult, RoutineMaintenance) {
1096        let metrics = Arc::clone(&self.applier.metrics);
1097
1098        // SUBTLE! If Machine::merge_res returns false, the blobs referenced in
1099        // compaction output are deleted so we don't leak them. Naively passing
1100        // back the value returned by State::apply_merge_res might give a false
1101        // negative in the presence of retries and Indeterminate errors.
1102        // Specifically, something like the following:
1103        //
1104        // - We try to apply_merge_res, it matches.
1105        // - When apply_unbatched_cmd goes to commit the new state, the
1106        //   Consensus::compare_and_set returns an Indeterminate error (but
1107        //   actually succeeds). The committed State now contains references to
1108        //   the compaction output blobs.
1109        // - Machine::apply_unbatched_idempotent_cmd retries the Indeterminate
1110        //   error. For whatever reason, this time though it doesn't match
1111        //   (maybe the batches simply get grouped difference when deserialized
1112        //   from state, or more unavoidably perhaps another compaction
1113        //   happens).
1114        // - This now bubbles up applied=false to the caller, which uses it as a
1115        //   signal that the blobs in the compaction output should be deleted so
1116        //   that we don't leak them.
1117        // - We now contain references in committed State to blobs that don't
1118        //   exist.
1119        //
1120        // The fix is to keep track of whether applied ever was true, even for a
1121        // compare_and_set that returned an Indeterminate error. This has the
1122        // chance of false positive (leaking a blob) but that's better than a
1123        // false negative (a blob we can never recover referenced by state). We
1124        // anyway need a mechanism to clean up leaked blobs because of process
1125        // crashes.
1126        let mut merge_result_ever_applied = ApplyMergeResult::NotAppliedNoMatch;
1127        let (_seqno, _apply_merge_result, maintenance) = self
1128            .apply_unbatched_idempotent_cmd(&metrics.cmds.merge_res, |_, _, state| {
1129                let ret = state.apply_merge_res::<D>(res, &Arc::clone(&metrics).columnar);
1130                if let Continue(result) = ret {
1131                    // record if we've ever applied the merge
1132                    if result.applied() {
1133                        merge_result_ever_applied = result;
1134                    }
1135                    // otherwise record the most granular reason for _not_
1136                    // applying the merge when there was a matching batch
1137                    if result.matched() && !result.applied() && !merge_result_ever_applied.applied()
1138                    {
1139                        merge_result_ever_applied = result;
1140                    }
1141                }
1142                ret
1143            })
1144            .await;
1145        (merge_result_ever_applied, maintenance)
1146    }
1147}
1148
/// A boxed, one-shot factory producing the future that performs a handle's
/// expiration.
pub(crate) struct ExpireFn(
    /// This is stored on WriteHandle and ReadHandle, which we require to be
    /// Send + Sync, but the Future is only Send and not Sync. Instead store a
    /// FnOnce that returns the Future. This could also be made an `IntoFuture`,
    /// once producing one of those is made easier.
    pub(crate) Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + Sync + 'static>,
);
1156
1157impl Debug for ExpireFn {
1158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1159        f.debug_struct("ExpireFn").finish_non_exhaustive()
1160    }
1161}
1162
/// The outcome of a compare-and-append attempt.
#[derive(Debug)]
pub(crate) enum CompareAndAppendRes<T> {
    /// The append succeeded at this seqno, with writer maintenance to perform.
    Success(SeqNo, WriterMaintenance<T>),
    /// The request itself was invalid.
    InvalidUsage(InvalidUsage<T>),
    /// The caller's expected upper didn't match; carries the seqno and the
    /// mismatched upper.
    UpperMismatch(SeqNo, Antichain<T>),
    /// The write was rejected, presumably (per the name) to apply backpressure
    /// on inline writes — callers are expected to retry differently.
    InlineBackpressure,
}
1170
#[cfg(test)]
impl<T: Debug> CompareAndAppendRes<T> {
    /// Returns the `Success` payload, panicking (at the caller's location) on
    /// any other variant.
    #[track_caller]
    fn unwrap(self) -> (SeqNo, WriterMaintenance<T>) {
        match self {
            CompareAndAppendRes::Success(seqno, maintenance) => (seqno, maintenance),
            other => panic!("{:?}", other),
        }
    }
}
1181
// Dyncfg knobs controlling the sleep/backoff cadence used when polling for new
// batches in Machine::next_listen_batch (assembled by
// next_listen_batch_retry_params).
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_fixed_sleep",
    Duration::from_millis(1200), // pubsub is on by default!
    "\
    The fixed sleep when polling for new batches from a Listen or Subscribe. Skipped if zero.",
);

pub(crate) const NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_initial_backoff",
    Duration::from_millis(100), // pubsub is on by default!
    "The initial backoff when polling for new batches from a Listen or Subscribe.",
);

pub(crate) const NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER: Config<u32> = Config::new(
    "persist_next_listen_batch_retryer_multiplier",
    2,
    "The backoff multiplier when polling for new batches from a Listen or Subscribe.",
);

pub(crate) const NEXT_LISTEN_BATCH_RETRYER_CLAMP: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_clamp",
    Duration::from_secs(16), // pubsub is on by default!
    "The backoff clamp duration when polling for new batches from a Listen or Subscribe.",
);
1206
1207fn next_listen_batch_retry_params(cfg: &ConfigSet) -> RetryParameters {
1208    RetryParameters {
1209        fixed_sleep: NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP.get(cfg),
1210        initial_backoff: NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF.get(cfg),
1211        multiplier: NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER.get(cfg),
1212        clamp: NEXT_LISTEN_BATCH_RETRYER_CLAMP.get(cfg),
1213    }
1214}
1215
/// The retry attempt count at which the retry loops in this module escalate
/// their logging from `debug!` to `info!`.
pub const INFO_MIN_ATTEMPTS: usize = 3;
1217
1218pub async fn retry_external<R, F, WorkFn>(metrics: &RetryMetrics, mut work_fn: WorkFn) -> R
1219where
1220    F: std::future::Future<Output = Result<R, ExternalError>>,
1221    WorkFn: FnMut() -> F,
1222{
1223    let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1224    loop {
1225        match work_fn().await {
1226            Ok(x) => {
1227                if retry.attempt() > 0 {
1228                    debug!(
1229                        "external operation {} succeeded after failing at least once",
1230                        metrics.name,
1231                    );
1232                }
1233                return x;
1234            }
1235            Err(err) => {
1236                if retry.attempt() >= INFO_MIN_ATTEMPTS {
1237                    info!(
1238                        "external operation {} failed, retrying in {:?}: {}",
1239                        metrics.name,
1240                        retry.next_sleep(),
1241                        err.display_with_causes()
1242                    );
1243                } else {
1244                    debug!(
1245                        "external operation {} failed, retrying in {:?}: {}",
1246                        metrics.name,
1247                        retry.next_sleep(),
1248                        err.display_with_causes()
1249                    );
1250                }
1251                retry = retry.sleep().await;
1252            }
1253        }
1254    }
1255}
1256
/// Runs `work_fn` in a retry-with-backoff loop, retrying determinate errors
/// forever and returning the first indeterminate error to the caller.
pub async fn retry_determinate<R, F, WorkFn>(
    metrics: &RetryMetrics,
    mut work_fn: WorkFn,
) -> Result<R, Indeterminate>
where
    F: std::future::Future<Output = Result<R, ExternalError>>,
    WorkFn: FnMut() -> F,
{
    let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
    loop {
        match work_fn().await {
            Ok(x) => {
                if retry.attempt() > 0 {
                    debug!(
                        "external operation {} succeeded after failing at least once",
                        metrics.name,
                    );
                }
                return Ok(x);
            }
            Err(ExternalError::Determinate(err)) => {
                // The determinate "could not serialize access" errors
                // happen often enough in dev (which uses Postgres) that
                // it's impeding people's work. At the same time, it's been
                // a source of confusion for eng. The situation is much
                // better on CRDB and we have metrics coverage in prod, so
                // this is redundant enough that it's more hurtful than
                // helpful. As a result, this intentionally ignores
                // INFO_MIN_ATTEMPTS and always logs at debug.
                debug!(
                    "external operation {} failed, retrying in {:?}: {}",
                    metrics.name,
                    retry.next_sleep(),
                    err.display_with_causes()
                );
                retry = retry.sleep().await;
                continue;
            }
            // Indeterminate errors are the caller's problem: the operation may
            // or may not have taken effect.
            Err(ExternalError::Indeterminate(x)) => return Err(x),
        }
    }
}
1299
1300#[cfg(test)]
1301pub mod datadriven {
1302    use std::collections::{BTreeMap, BTreeSet};
1303    use std::pin::pin;
1304    use std::sync::{Arc, LazyLock};
1305
1306    use anyhow::anyhow;
1307    use differential_dataflow::consolidation::consolidate_updates;
1308    use differential_dataflow::trace::Description;
1309    use futures::StreamExt;
1310    use mz_dyncfg::{ConfigUpdates, ConfigVal};
1311    use mz_persist::indexed::encoding::BlobTraceBatchPart;
1312    use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
1313
1314    use crate::batch::{
1315        BLOB_TARGET_SIZE, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
1316        BatchParts, validate_truncate_batch,
1317    };
1318    use crate::cfg::COMPACTION_MEMORY_BOUND_BYTES;
1319    use crate::fetch::{EncodedPart, FetchConfig};
1320    use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
1321    use crate::internal::datadriven::DirectiveArgs;
1322    use crate::internal::encoding::Schemas;
1323    use crate::internal::gc::GcReq;
1324    use crate::internal::paths::{BlobKey, BlobKeyPrefix, PartialBlobKey};
1325    use crate::internal::state::{BatchPart, RunOrder, RunPart};
1326    use crate::internal::state_versions::EncodedRollup;
1327    use crate::internal::trace::{CompactionInput, IdHollowBatch, SpineId};
1328    use crate::read::{Listen, ListenEvent, READER_LEASE_DURATION};
1329    use crate::rpc::NoopPubSubSender;
1330    use crate::tests::new_test_client;
1331    use crate::write::COMBINE_INLINE_WRITES;
1332    use crate::{GarbageCollector, PersistClient};
1333
1334    use super::*;
1335
    /// The fixed schemas (String keys, unit values, schema id 0) shared by all
    /// machine datadriven tests.
    static SCHEMAS: LazyLock<Schemas<String, ()>> = LazyLock::new(|| Schemas {
        id: Some(SchemaId(0)),
        key: Arc::new(StringSchema),
        val: Arc::new(UnitSchema),
    });
1341
    /// Shared state for a single [crate::internal::machine] [datadriven::TestFile].
    #[derive(Debug)]
    pub struct MachineState {
        pub client: PersistClient,
        pub shard_id: ShardId,
        pub state_versions: Arc<StateVersions>,
        pub machine: Machine<String, (), u64, i64>,
        pub gc: GarbageCollector<String, (), u64, i64>,
        // Batches created by directives, keyed by the name they were
        // registered under (see e.g. consensus_scan).
        pub batches: BTreeMap<String, IdHollowBatch<u64>>,
        // NOTE(review): looks like a counter for minting fresh ids — confirm
        // with the directives that bump it.
        pub next_id: usize,
        pub rollups: BTreeMap<String, EncodedRollup>,
        pub listens: BTreeMap<String, Listen<String, (), u64, i64>>,
        // Routine maintenance results collected from directives.
        pub routine: Vec<RoutineMaintenance>,
        pub compactions: BTreeMap<String, CompactReq<u64>>,
    }
1357
    impl MachineState {
        /// Builds a fresh test client, shard, machine, and garbage collector
        /// for a datadriven test, applying the given dyncfg overrides.
        pub async fn new(dyncfgs: &ConfigUpdates) -> Self {
            let shard_id = ShardId::new();
            let client = new_test_client(dyncfgs).await;
            // Reset blob_target_size. Individual batch writes and compactions
            // can override it with an arg.
            client
                .cfg
                .set_config(&BLOB_TARGET_SIZE, *BLOB_TARGET_SIZE.default());
            // Our structured compaction code uses slightly different estimates
            // for array size than the old path, which can affect the results of
            // some compaction tests.
            client.cfg.set_config(&COMBINE_INLINE_WRITES, false);
            let state_versions = Arc::new(StateVersions::new(
                client.cfg.clone(),
                Arc::clone(&client.consensus),
                Arc::clone(&client.blob),
                Arc::clone(&client.metrics),
            ));
            let machine = Machine::new(
                client.cfg.clone(),
                shard_id,
                Arc::clone(&client.metrics),
                Arc::clone(&state_versions),
                Arc::clone(&client.shared_states),
                Arc::new(NoopPubSubSender),
                Arc::clone(&client.isolated_runtime),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codecs should match");
            let gc = GarbageCollector::new(machine.clone(), Arc::clone(&client.isolated_runtime));
            MachineState {
                shard_id,
                client,
                state_versions,
                machine,
                gc,
                batches: BTreeMap::default(),
                rollups: BTreeMap::default(),
                listens: BTreeMap::default(),
                routine: Vec::new(),
                compactions: BTreeMap::default(),
                next_id: 0,
            }
        }

        /// Rehydrates a [HollowBatch] into a [Batch] bound to this test's
        /// client, blob store, and the shared test SCHEMAS.
        fn to_batch(&self, hollow: HollowBatch<u64>) -> Batch<String, (), u64, i64> {
            Batch::new(
                true,
                Arc::clone(&self.client.metrics),
                Arc::clone(&self.client.blob),
                self.client.metrics.shards.shard(&self.shard_id, "test"),
                self.client.cfg.build_version.clone(),
                (
                    <String>::encode_schema(&*SCHEMAS.key),
                    <()>::encode_schema(&*SCHEMAS.val),
                ),
                hollow,
            )
        }
    }
1420
    /// Scans consensus and returns all states with their SeqNos
    /// and which batches they reference, starting from the `from_seqno` arg.
    pub async fn consensus_scan(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let from = args.expect("from_seqno");

        let mut states = datadriven
            .state_versions
            .fetch_all_live_states::<u64>(datadriven.shard_id)
            .await
            .expect("should only be called on an initialized shard")
            .check_ts_codec()
            .expect("shard codecs should not change");
        let mut s = String::new();
        while let Some(x) = states.next(|_| {}) {
            // Skip states before the requested starting seqno.
            if x.seqno < from {
                continue;
            }
            let rollups: Vec<_> = x
                .collections
                .rollups
                .keys()
                .map(|seqno| seqno.to_string())
                .collect();
            // Map each non-empty batch in the trace back to the name it was
            // registered under in this test, matching on identical parts.
            let batches: Vec<_> = x
                .collections
                .trace
                .batches()
                .filter(|b| !b.is_empty())
                .filter_map(|b| {
                    datadriven
                        .batches
                        .iter()
                        .find(|(_, original_batch)| original_batch.batch.parts == b.parts)
                        .map(|(batch_name, _)| batch_name.to_owned())
                })
                .collect();
            // Presumably mz_ore's infallible FormatBuffer::write (imported at
            // the top of this file), hence the unchecked result.
            write!(
                s,
                "seqno={} batches={} rollups={}\n",
                x.seqno,
                batches.join(","),
                rollups.join(","),
            );
        }
        Ok(s)
    }
1470
1471    pub async fn consensus_truncate(
1472        datadriven: &MachineState,
1473        args: DirectiveArgs<'_>,
1474    ) -> Result<String, anyhow::Error> {
1475        let to = args.expect("to_seqno");
1476        let removed = datadriven
1477            .client
1478            .consensus
1479            .truncate(&datadriven.shard_id.to_string(), to)
1480            .await
1481            .expect("valid truncation");
1482        Ok(format!("{:?}\n", removed))
1483    }
1484
1485    pub async fn blob_scan_batches(
1486        datadriven: &MachineState,
1487        _args: DirectiveArgs<'_>,
1488    ) -> Result<String, anyhow::Error> {
1489        let key_prefix = BlobKeyPrefix::Shard(&datadriven.shard_id).to_string();
1490
1491        let mut s = String::new();
1492        let () = datadriven
1493            .state_versions
1494            .blob
1495            .list_keys_and_metadata(&key_prefix, &mut |x| {
1496                let (_, key) = BlobKey::parse_ids(x.key).expect("key should be valid");
1497                if let PartialBlobKey::Batch(_, _) = key {
1498                    write!(s, "{}: {}b\n", x.key, x.size_in_bytes);
1499                }
1500            })
1501            .await?;
1502        Ok(s)
1503    }
1504
1505    #[allow(clippy::unused_async)]
1506    pub async fn shard_desc(
1507        datadriven: &MachineState,
1508        _args: DirectiveArgs<'_>,
1509    ) -> Result<String, anyhow::Error> {
1510        Ok(format!(
1511            "since={:?} upper={:?}\n",
1512            datadriven.machine.applier.since().elements(),
1513            datadriven.machine.applier.clone_upper().elements()
1514        ))
1515    }
1516
1517    pub async fn downgrade_since(
1518        datadriven: &mut MachineState,
1519        args: DirectiveArgs<'_>,
1520    ) -> Result<String, anyhow::Error> {
1521        let since = args.expect_antichain("since");
1522        let seqno = args
1523            .optional("seqno")
1524            .unwrap_or_else(|| datadriven.machine.seqno());
1525        let reader_id = args.expect("reader_id");
1526        let (_, since, routine) = datadriven
1527            .machine
1528            .downgrade_since(
1529                &reader_id,
1530                seqno,
1531                &since,
1532                (datadriven.machine.applier.cfg.now)(),
1533            )
1534            .await;
1535        datadriven.routine.push(routine);
1536        Ok(format!(
1537            "{} {:?}\n",
1538            datadriven.machine.seqno(),
1539            since.0.elements()
1540        ))
1541    }
1542
1543    #[allow(clippy::unused_async)]
1544    pub async fn dyncfg(
1545        datadriven: &MachineState,
1546        args: DirectiveArgs<'_>,
1547    ) -> Result<String, anyhow::Error> {
1548        let mut updates = ConfigUpdates::default();
1549        for x in args.input.trim().split('\n') {
1550            match x.split(' ').collect::<Vec<_>>().as_slice() {
1551                &[name, val] => {
1552                    let config = datadriven
1553                        .client
1554                        .cfg
1555                        .entries()
1556                        .find(|x| x.name() == name)
1557                        .ok_or_else(|| anyhow!("unknown dyncfg: {}", name))?;
1558                    match config.val() {
1559                        ConfigVal::Usize(_) => {
1560                            let val = val.parse().map_err(anyhow::Error::new)?;
1561                            updates.add_dynamic(name, ConfigVal::Usize(val));
1562                        }
1563                        ConfigVal::Bool(_) => {
1564                            let val = val.parse().map_err(anyhow::Error::new)?;
1565                            updates.add_dynamic(name, ConfigVal::Bool(val));
1566                        }
1567                        x => unimplemented!("dyncfg type: {:?}", x),
1568                    }
1569                }
1570                x => return Err(anyhow!("expected `name val` got: {:?}", x)),
1571            }
1572        }
1573        updates.apply(&datadriven.client.cfg);
1574
1575        Ok("ok\n".to_string())
1576    }
1577
1578    pub async fn compare_and_downgrade_since(
1579        datadriven: &mut MachineState,
1580        args: DirectiveArgs<'_>,
1581    ) -> Result<String, anyhow::Error> {
1582        let expected_opaque: u64 = args.expect("expect_opaque");
1583        let new_opaque: u64 = args.expect("opaque");
1584        let new_since = args.expect_antichain("since");
1585        let reader_id = args.expect("reader_id");
1586        let (res, routine) = datadriven
1587            .machine
1588            .compare_and_downgrade_since(&reader_id, &expected_opaque, (&new_opaque, &new_since))
1589            .await;
1590        datadriven.routine.push(routine);
1591        let since = res.map_err(|(opaque, since)| {
1592            anyhow!("mismatch: opaque={} since={:?}", opaque, since.0.elements())
1593        })?;
1594        Ok(format!(
1595            "{} {} {:?}\n",
1596            datadriven.machine.seqno(),
1597            new_opaque,
1598            since.0.elements()
1599        ))
1600    }
1601
1602    pub async fn write_rollup(
1603        datadriven: &mut MachineState,
1604        args: DirectiveArgs<'_>,
1605    ) -> Result<String, anyhow::Error> {
1606        let output = args.expect_str("output");
1607
1608        let rollup = datadriven
1609            .machine
1610            .applier
1611            .write_rollup_for_state()
1612            .await
1613            .expect("rollup");
1614
1615        datadriven
1616            .rollups
1617            .insert(output.to_string(), rollup.clone());
1618
1619        Ok(format!(
1620            "state={} diffs=[{}, {})\n",
1621            rollup.seqno,
1622            rollup._desc.lower().first().expect("seqno"),
1623            rollup._desc.upper().first().expect("seqno"),
1624        ))
1625    }
1626
1627    pub async fn add_rollup(
1628        datadriven: &mut MachineState,
1629        args: DirectiveArgs<'_>,
1630    ) -> Result<String, anyhow::Error> {
1631        let input = args.expect_str("input");
1632        let rollup = datadriven
1633            .rollups
1634            .get(input)
1635            .expect("unknown batch")
1636            .clone();
1637
1638        let (applied, maintenance) = datadriven
1639            .machine
1640            .add_rollup((rollup.seqno, &rollup.to_hollow()))
1641            .await;
1642
1643        if !applied {
1644            return Err(anyhow!("failed to apply rollup for: {}", rollup.seqno));
1645        }
1646
1647        datadriven.routine.push(maintenance);
1648        Ok(format!("{}\n", datadriven.machine.seqno()))
1649    }
1650
    /// Builds a batch from the directive's input updates and registers it
    /// under `output` for later directives to reference.
    ///
    /// Optional args:
    /// - `since`: batch since frontier (defaults to `{0}`).
    /// - `target_size`: overrides the builder's blob target size.
    /// - `parts_size_override`: after building, forces every part's reported
    ///   size to this value (inline parts are flushed to blob first).
    /// - `consolidate` (default true): consolidate updates before writing.
    pub async fn write_batch(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let output = args.expect_str("output");
        let lower = args.expect_antichain("lower");
        let upper = args.expect_antichain("upper");
        assert!(PartialOrder::less_than(&lower, &upper));
        let since = args
            .optional_antichain("since")
            .unwrap_or_else(|| Antichain::from_elem(0));
        let target_size = args.optional("target_size");
        let parts_size_override = args.optional("parts_size_override");
        let consolidate = args.optional("consolidate").unwrap_or(true);
        // One update per input line, parsed as `key time diff`.
        let mut updates: Vec<_> = args
            .input
            .split('\n')
            .flat_map(DirectiveArgs::parse_update)
            .collect();

        let mut cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
        if let Some(target_size) = target_size {
            cfg.blob_target_size = target_size;
        };
        if consolidate {
            consolidate_updates(&mut updates);
        }
        // Unconsolidated updates can't be written in the preferred (sorted)
        // run order, so fall back to unordered runs in that case.
        let run_order = if consolidate {
            cfg.preferred_order
        } else {
            RunOrder::Unordered
        };
        let parts = BatchParts::new_ordered::<i64>(
            cfg.clone(),
            run_order,
            Arc::clone(&datadriven.client.metrics),
            Arc::clone(&datadriven.machine.applier.shard_metrics),
            datadriven.shard_id,
            Arc::clone(&datadriven.client.blob),
            Arc::clone(&datadriven.client.isolated_runtime),
            &datadriven.client.metrics.user,
        );
        let builder = BatchBuilderInternal::new(
            cfg.clone(),
            parts,
            Arc::clone(&datadriven.client.metrics),
            SCHEMAS.clone(),
            Arc::clone(&datadriven.client.blob),
            datadriven.shard_id.clone(),
            datadriven.client.cfg.build_version.clone(),
        );
        let mut builder = BatchBuilder::new(builder, Description::new(lower, upper.clone(), since));
        for ((k, ()), t, d) in updates {
            builder.add(&k, &(), &t, &d).await.expect("invalid batch");
        }
        let mut batch = builder.finish(upper).await?;
        // We can only reasonably use parts_size_override with hollow batches,
        // so if it's set, flush any inline batches out.
        if parts_size_override.is_some() {
            batch
                .flush_to_blob(
                    &cfg,
                    &datadriven.client.metrics.user,
                    &datadriven.client.isolated_runtime,
                    &SCHEMAS,
                )
                .await;
        }
        let batch = batch.into_hollow_batch();
        // Assign the batch a fresh single-width spine id range.
        let batch = IdHollowBatch {
            batch: Arc::new(batch),
            id: SpineId(datadriven.next_id, datadriven.next_id + 1),
        };
        datadriven.next_id += 1;

        if let Some(size) = parts_size_override {
            // Rewrite every part's reported size to the override before
            // registering the batch under `output`.
            let mut batch = batch.clone();
            let mut hollow_batch = (*batch.batch).clone();
            for part in hollow_batch.parts.iter_mut() {
                match part {
                    RunPart::Many(run) => run.max_part_bytes = size,
                    RunPart::Single(BatchPart::Hollow(part)) => part.encoded_size_bytes = size,
                    RunPart::Single(BatchPart::Inline { .. }) => unreachable!("flushed out above"),
                }
            }
            batch.batch = Arc::new(hollow_batch);
            datadriven.batches.insert(output.to_owned(), batch);
        } else {
            datadriven.batches.insert(output.to_owned(), batch.clone());
        }
        Ok(format!(
            "parts={} len={}\n",
            batch.batch.part_count(),
            batch.batch.len
        ))
    }
1747
    /// Fetches and prints the contents of a previously-registered batch, one
    /// `<part N>` section per part, followed by a `<run N>` summary mapping
    /// parts to runs.
    ///
    /// With `stats=lower`, also prints each part's structured key lower bound
    /// when it has one.
    pub async fn fetch_batch(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let stats = args.optional_str("stats");
        let batch = datadriven.batches.get(input).expect("unknown batch");

        let mut s = String::new();
        let mut stream = pin!(
            batch
                .batch
                .part_stream(
                    datadriven.shard_id,
                    &*datadriven.state_versions.blob,
                    &*datadriven.state_versions.metrics
                )
                .enumerate()
        );
        while let Some((idx, part)) = stream.next().await {
            let part = &*part?;
            write!(s, "<part {idx}>\n");

            // Inline parts must be decoded before their structured key lower
            // is available; other parts carry it directly.
            let lower = match part {
                BatchPart::Inline { updates, .. } => {
                    let updates: BlobTraceBatchPart<u64> =
                        updates.decode(&datadriven.client.metrics.columnar)?;
                    updates.structured_key_lower()
                }
                other => other.structured_key_lower(),
            };

            if let Some(lower) = lower {
                if stats == Some("lower") {
                    writeln!(s, "<key lower={}>", lower.get())
                }
            }

            match part {
                BatchPart::Hollow(part) => {
                    // Probe the blob store before fetching the part's data.
                    let blob_batch = datadriven
                        .client
                        .blob
                        .get(&part.key.complete(&datadriven.shard_id))
                        .await;
                    match blob_batch {
                        Ok(Some(_)) | Err(_) => {}
                        // don't try to fetch/print the keys of the batch part
                        // if the blob store no longer has it
                        Ok(None) => {
                            s.push_str("<empty>\n");
                            continue;
                        }
                    };
                }
                BatchPart::Inline { .. } => {}
            };
            let part = EncodedPart::fetch(
                &FetchConfig::from_persist_config(&datadriven.client.cfg),
                &datadriven.shard_id,
                datadriven.client.blob.as_ref(),
                datadriven.client.metrics.as_ref(),
                datadriven.machine.applier.shard_metrics.as_ref(),
                &datadriven.client.metrics.read.batch_fetcher,
                &batch.batch.desc,
                part,
            )
            .await
            .expect("invalid batch part");
            let part = part
                .normalize(&datadriven.client.metrics.columnar)
                .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);

            // Print each decoded update as `key time diff`.
            for ((k, _v), t, d) in part
                .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
                .expect("valid schemas")
            {
                writeln!(s, "{k} {t} {d}");
            }
        }
        // Only emit the run summary when something was printed above.
        if !s.is_empty() {
            for (idx, (_meta, run)) in batch.batch.runs().enumerate() {
                write!(s, "<run {idx}>\n");
                for part in run {
                    let part_idx = batch
                        .batch
                        .parts
                        .iter()
                        .position(|p| p == part)
                        .expect("part should exist");
                    write!(s, "part {part_idx}\n");
                }
            }
        }
        Ok(s)
    }
1844
1845    #[allow(clippy::unused_async)]
1846    pub async fn truncate_batch_desc(
1847        datadriven: &mut MachineState,
1848        args: DirectiveArgs<'_>,
1849    ) -> Result<String, anyhow::Error> {
1850        let input = args.expect_str("input");
1851        let output = args.expect_str("output");
1852        let lower = args.expect_antichain("lower");
1853        let upper = args.expect_antichain("upper");
1854
1855        let batch = datadriven
1856            .batches
1857            .get(input)
1858            .expect("unknown batch")
1859            .clone();
1860        let truncated_desc = Description::new(lower, upper, batch.batch.desc.since().clone());
1861        let () = validate_truncate_batch(&batch.batch, &truncated_desc, false, true)?;
1862        let mut new_hollow_batch = (*batch.batch).clone();
1863        new_hollow_batch.desc = truncated_desc;
1864        let new_batch = IdHollowBatch {
1865            batch: Arc::new(new_hollow_batch),
1866            id: batch.id,
1867        };
1868        datadriven
1869            .batches
1870            .insert(output.to_owned(), new_batch.clone());
1871        Ok(format!(
1872            "parts={} len={}\n",
1873            batch.batch.part_count(),
1874            batch.batch.len
1875        ))
1876    }
1877
1878    #[allow(clippy::unused_async)]
1879    pub async fn set_batch_parts_size(
1880        datadriven: &mut MachineState,
1881        args: DirectiveArgs<'_>,
1882    ) -> Result<String, anyhow::Error> {
1883        let input = args.expect_str("input");
1884        let size = args.expect("size");
1885        let batch = datadriven.batches.get_mut(input).expect("unknown batch");
1886        let mut hollow_batch = (*batch.batch).clone();
1887        for part in hollow_batch.parts.iter_mut() {
1888            match part {
1889                RunPart::Single(BatchPart::Hollow(x)) => x.encoded_size_bytes = size,
1890                _ => {
1891                    panic!("set_batch_parts_size only supports hollow parts")
1892                }
1893            }
1894        }
1895        batch.batch = Arc::new(hollow_batch);
1896        Ok("ok\n".to_string())
1897    }
1898
    /// Compacts a set of previously-registered input batches into a single
    /// batch registered under `output`.
    ///
    /// Optional args: `target_size` and `memory_bound` override the
    /// corresponding dyncfgs for this compaction.
    pub async fn compact(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let output = args.expect_str("output");
        let lower = args.expect_antichain("lower");
        let upper = args.expect_antichain("upper");
        let since = args.expect_antichain("since");
        let target_size = args.optional("target_size");
        let memory_bound = args.optional("memory_bound");

        let mut inputs = Vec::new();
        for input in args.args.get("inputs").expect("missing inputs") {
            inputs.push(
                datadriven
                    .batches
                    .get(input)
                    .expect("unknown batch")
                    .clone(),
            );
        }

        // NOTE(review): `set_config` goes through the config's interior
        // dyncfg state even though `cfg` is not `mut`; presumably the clone
        // scopes these overrides to this compaction — confirm against
        // `PersistConfig`'s clone semantics.
        let cfg = datadriven.client.cfg.clone();
        if let Some(target_size) = target_size {
            cfg.set_config(&BLOB_TARGET_SIZE, target_size);
        };
        if let Some(memory_bound) = memory_bound {
            cfg.set_config(&COMPACTION_MEMORY_BOUND_BYTES, memory_bound);
        }
        let req = CompactReq {
            shard_id: datadriven.shard_id,
            desc: Description::new(lower, upper, since),
            inputs: inputs.clone(),
        };
        datadriven
            .compactions
            .insert(output.to_owned(), req.clone());
        // The output's spine id spans from the first input's lower id to the
        // last input's upper id; with no inputs, mint a fresh one-wide range
        // from `next_id` (the increment happens in the empty-case closure).
        let spine_lower = inputs
            .first()
            .map_or_else(|| datadriven.next_id, |x| x.id.0);
        let spine_upper = inputs.last().map_or_else(
            || {
                datadriven.next_id += 1;
                datadriven.next_id
            },
            |x| x.id.1,
        );
        let new_spine_id = SpineId(spine_lower, spine_upper);
        let res = Compactor::<String, (), u64, i64>::compact(
            CompactConfig::new(&cfg, datadriven.shard_id),
            Arc::clone(&datadriven.client.blob),
            Arc::clone(&datadriven.client.metrics),
            Arc::clone(&datadriven.machine.applier.shard_metrics),
            Arc::clone(&datadriven.client.isolated_runtime),
            req,
            SCHEMAS.clone(),
        )
        .await?;

        let batch = IdHollowBatch {
            batch: Arc::new(res.output.clone()),
            id: new_spine_id,
        };

        datadriven.batches.insert(output.to_owned(), batch.clone());
        Ok(format!(
            "parts={} len={}\n",
            res.output.part_count(),
            res.output.len
        ))
    }
1970
1971    pub async fn clear_blob(
1972        datadriven: &MachineState,
1973        _args: DirectiveArgs<'_>,
1974    ) -> Result<String, anyhow::Error> {
1975        let mut to_delete = vec![];
1976        datadriven
1977            .client
1978            .blob
1979            .list_keys_and_metadata("", &mut |meta| {
1980                to_delete.push(meta.key.to_owned());
1981            })
1982            .await?;
1983        for blob in &to_delete {
1984            datadriven.client.blob.delete(blob).await?;
1985        }
1986        Ok(format!("deleted={}\n", to_delete.len()))
1987    }
1988
1989    pub async fn restore_blob(
1990        datadriven: &MachineState,
1991        _args: DirectiveArgs<'_>,
1992    ) -> Result<String, anyhow::Error> {
1993        let not_restored = crate::internal::restore::restore_blob(
1994            &datadriven.state_versions,
1995            datadriven.client.blob.as_ref(),
1996            &datadriven.client.cfg.build_version,
1997            datadriven.shard_id,
1998            &*datadriven.state_versions.metrics,
1999        )
2000        .await?;
2001        let mut out = String::new();
2002        for key in not_restored {
2003            writeln!(&mut out, "{key}");
2004        }
2005        Ok(out)
2006    }
2007
2008    #[allow(clippy::unused_async)]
2009    pub async fn rewrite_ts(
2010        datadriven: &mut MachineState,
2011        args: DirectiveArgs<'_>,
2012    ) -> Result<String, anyhow::Error> {
2013        let input = args.expect_str("input");
2014        let ts_rewrite = args.expect_antichain("frontier");
2015        let upper = args.expect_antichain("upper");
2016
2017        let batch = datadriven.batches.get_mut(input).expect("unknown batch");
2018        let mut hollow_batch = (*batch.batch).clone();
2019        let () = hollow_batch
2020            .rewrite_ts(&ts_rewrite, upper)
2021            .map_err(|err| anyhow!("invalid rewrite: {}", err))?;
2022        batch.batch = Arc::new(hollow_batch);
2023        Ok("ok\n".into())
2024    }
2025
2026    pub async fn gc(
2027        datadriven: &mut MachineState,
2028        args: DirectiveArgs<'_>,
2029    ) -> Result<String, anyhow::Error> {
2030        let new_seqno_since = args.expect("to_seqno");
2031
2032        let req = GcReq {
2033            shard_id: datadriven.shard_id,
2034            new_seqno_since,
2035        };
2036        let (maintenance, stats) =
2037            GarbageCollector::gc_and_truncate(&datadriven.machine, req).await;
2038        datadriven.routine.push(maintenance);
2039
2040        Ok(format!(
2041            "{} batch_parts={} rollups={} truncated={} state_rollups={}\n",
2042            datadriven.machine.seqno(),
2043            stats.batch_parts_deleted_from_blob,
2044            stats.rollups_deleted_from_blob,
2045            stats
2046                .truncated_consensus_to
2047                .iter()
2048                .map(|x| x.to_string())
2049                .collect::<Vec<_>>()
2050                .join(","),
2051            stats
2052                .rollups_removed_from_state
2053                .iter()
2054                .map(|x| x.to_string())
2055                .collect::<Vec<_>>()
2056                .join(","),
2057        ))
2058    }
2059
    /// Takes a snapshot of the shard at `as_of` and prints every batch, run,
    /// and part, with each part's updates advanced to `as_of` and then
    /// consolidated.
    pub async fn snapshot(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let as_of = args.expect_antichain("as_of");
        let snapshot = datadriven
            .machine
            .snapshot(&as_of)
            .await
            .map_err(|err| anyhow!("{:?}", err))?;

        let mut result = String::new();

        for batch in snapshot {
            writeln!(
                result,
                "<batch {:?}-{:?}>",
                batch.desc.lower().elements(),
                batch.desc.upper().elements()
            );
            for (run, (_meta, parts)) in batch.runs().enumerate() {
                writeln!(result, "<run {run}>");
                // Stream the (possibly nested) parts of this run in order.
                let mut stream = pin!(
                    futures::stream::iter(parts)
                        .flat_map(|part| part.part_stream(
                            datadriven.shard_id,
                            &*datadriven.state_versions.blob,
                            &*datadriven.state_versions.metrics
                        ))
                        .enumerate()
                );

                while let Some((idx, part)) = stream.next().await {
                    let part = &*part?;
                    writeln!(result, "<part {idx}>");

                    let part = EncodedPart::fetch(
                        &FetchConfig::from_persist_config(&datadriven.client.cfg),
                        &datadriven.shard_id,
                        datadriven.client.blob.as_ref(),
                        datadriven.client.metrics.as_ref(),
                        datadriven.machine.applier.shard_metrics.as_ref(),
                        &datadriven.client.metrics.read.batch_fetcher,
                        &batch.desc,
                        part,
                    )
                    .await
                    .expect("invalid batch part");
                    let part = part
                        .normalize(&datadriven.client.metrics.columnar)
                        .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);

                    let mut updates = Vec::new();

                    // Advance every update's timestamp to the snapshot's
                    // as_of before consolidating, so updates at earlier times
                    // collapse together.
                    for ((k, _v), mut t, d) in part
                        .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
                        .expect("valid schemas")
                    {
                        t.advance_by(as_of.borrow());
                        updates.push((k, t, d));
                    }

                    consolidate_updates(&mut updates);

                    for (k, t, d) in updates {
                        writeln!(result, "{k} {t} {d}");
                    }
                }
            }
        }

        Ok(result)
    }
2133
2134    pub async fn register_listen(
2135        datadriven: &mut MachineState,
2136        args: DirectiveArgs<'_>,
2137    ) -> Result<String, anyhow::Error> {
2138        let output = args.expect_str("output");
2139        let as_of = args.expect_antichain("as_of");
2140        let read = datadriven
2141            .client
2142            .open_leased_reader::<String, (), u64, i64>(
2143                datadriven.shard_id,
2144                Arc::new(StringSchema),
2145                Arc::new(UnitSchema),
2146                Diagnostics::for_tests(),
2147                true,
2148            )
2149            .await
2150            .expect("invalid shard types");
2151        let listen = read
2152            .listen(as_of)
2153            .await
2154            .map_err(|err| anyhow!("{:?}", err))?;
2155        datadriven.listens.insert(output.to_owned(), listen);
2156        Ok("ok\n".into())
2157    }
2158
2159    pub async fn listen_through(
2160        datadriven: &mut MachineState,
2161        args: DirectiveArgs<'_>,
2162    ) -> Result<String, anyhow::Error> {
2163        let input = args.expect_str("input");
2164        // It's not possible to listen _through_ the empty antichain, so this is
2165        // intentionally `expect` instead of `expect_antichain`.
2166        let frontier = args.expect("frontier");
2167        let listen = datadriven.listens.get_mut(input).expect("unknown listener");
2168        let mut s = String::new();
2169        loop {
2170            for event in listen.fetch_next().await {
2171                match event {
2172                    ListenEvent::Updates(x) => {
2173                        for ((k, _v), t, d) in x.iter() {
2174                            write!(s, "{} {} {}\n", k, t, d);
2175                        }
2176                    }
2177                    ListenEvent::Progress(x) => {
2178                        if !x.less_than(&frontier) {
2179                            return Ok(s);
2180                        }
2181                    }
2182                }
2183            }
2184        }
2185    }
2186
2187    pub async fn register_critical_reader(
2188        datadriven: &mut MachineState,
2189        args: DirectiveArgs<'_>,
2190    ) -> Result<String, anyhow::Error> {
2191        let reader_id = args.expect("reader_id");
2192        let (state, maintenance) = datadriven
2193            .machine
2194            .register_critical_reader::<u64>(&reader_id, "tests")
2195            .await;
2196        datadriven.routine.push(maintenance);
2197        Ok(format!(
2198            "{} {:?}\n",
2199            datadriven.machine.seqno(),
2200            state.since.elements(),
2201        ))
2202    }
2203
2204    pub async fn register_leased_reader(
2205        datadriven: &mut MachineState,
2206        args: DirectiveArgs<'_>,
2207    ) -> Result<String, anyhow::Error> {
2208        let reader_id = args.expect("reader_id");
2209        let (reader_state, maintenance) = datadriven
2210            .machine
2211            .register_leased_reader(
2212                &reader_id,
2213                "tests",
2214                READER_LEASE_DURATION.get(&datadriven.client.cfg),
2215                (datadriven.client.cfg.now)(),
2216                false,
2217            )
2218            .await;
2219        datadriven.routine.push(maintenance);
2220        Ok(format!(
2221            "{} {:?}\n",
2222            datadriven.machine.seqno(),
2223            reader_state.since.elements(),
2224        ))
2225    }
2226
2227    pub async fn expire_critical_reader(
2228        datadriven: &mut MachineState,
2229        args: DirectiveArgs<'_>,
2230    ) -> Result<String, anyhow::Error> {
2231        let reader_id = args.expect("reader_id");
2232        let (_, maintenance) = datadriven.machine.expire_critical_reader(&reader_id).await;
2233        datadriven.routine.push(maintenance);
2234        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2235    }
2236
2237    pub async fn expire_leased_reader(
2238        datadriven: &mut MachineState,
2239        args: DirectiveArgs<'_>,
2240    ) -> Result<String, anyhow::Error> {
2241        let reader_id = args.expect("reader_id");
2242        let (_, maintenance) = datadriven.machine.expire_leased_reader(&reader_id).await;
2243        datadriven.routine.push(maintenance);
2244        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2245    }
2246
2247    pub async fn compare_and_append_batches(
2248        datadriven: &MachineState,
2249        args: DirectiveArgs<'_>,
2250    ) -> Result<String, anyhow::Error> {
2251        let expected_upper = args.expect_antichain("expected_upper");
2252        let new_upper = args.expect_antichain("new_upper");
2253
2254        let mut batches: Vec<Batch<String, (), u64, i64>> = args
2255            .args
2256            .get("batches")
2257            .expect("missing batches")
2258            .into_iter()
2259            .map(|batch| {
2260                let hollow = (*datadriven
2261                    .batches
2262                    .get(batch)
2263                    .expect("unknown batch")
2264                    .clone()
2265                    .batch)
2266                    .clone();
2267                datadriven.to_batch(hollow)
2268            })
2269            .collect();
2270
2271        let mut writer = datadriven
2272            .client
2273            .open_writer(
2274                datadriven.shard_id,
2275                Arc::new(StringSchema),
2276                Arc::new(UnitSchema),
2277                Diagnostics::for_tests(),
2278            )
2279            .await?;
2280
2281        let mut batch_refs: Vec<_> = batches.iter_mut().collect();
2282
2283        let () = writer
2284            .compare_and_append_batch(batch_refs.as_mut_slice(), expected_upper, new_upper, true)
2285            .await?
2286            .map_err(|err| anyhow!("upper mismatch: {:?}", err))?;
2287
2288        writer.expire().await;
2289
2290        Ok("ok\n".into())
2291    }
2292
2293    pub async fn expire_writer(
2294        datadriven: &mut MachineState,
2295        args: DirectiveArgs<'_>,
2296    ) -> Result<String, anyhow::Error> {
2297        let writer_id = args.expect("writer_id");
2298        let (_, maintenance) = datadriven.machine.expire_writer(&writer_id).await;
2299        datadriven.routine.push(maintenance);
2300        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2301    }
2302
2303    pub(crate) async fn finalize(
2304        datadriven: &mut MachineState,
2305        _args: DirectiveArgs<'_>,
2306    ) -> anyhow::Result<String> {
2307        let maintenance = datadriven.machine.become_tombstone().await?;
2308        datadriven.routine.push(maintenance);
2309        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2310    }
2311
2312    pub(crate) fn is_finalized(
2313        datadriven: &MachineState,
2314        _args: DirectiveArgs<'_>,
2315    ) -> anyhow::Result<String> {
2316        let seqno = datadriven.machine.seqno();
2317        let tombstone = datadriven.machine.is_finalized();
2318        Ok(format!("{seqno} {tombstone}\n"))
2319    }
2320
    /// Datadriven directive: compare-and-appends the batch named `input` via
    /// the low-level idempotent machine API, retrying through inline-write
    /// backpressure until the append succeeds or hits an upper mismatch.
    ///
    /// Optional args: `token` (reuse an idempotency token to exercise the
    /// retry path) and `prev_indeterminate` (simulate a prior indeterminate
    /// result). Output is `<seqno> <new upper>`.
    pub async fn compare_and_append(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let writer_id = args.expect("writer_id");
        let mut batch = datadriven
            .batches
            .get(input)
            .expect("unknown batch")
            .clone();
        // An explicitly provided token lets a test re-run the same logical
        // write; otherwise mint a fresh one.
        let token = args.optional("token").unwrap_or_else(IdempotencyToken::new);
        let now = (datadriven.client.cfg.now)();

        // Register the test schemas up front and sanity-check the assigned id.
        let (id, maintenance) = datadriven
            .machine
            .register_schema(&*SCHEMAS.key, &*SCHEMAS.val)
            .await;
        assert_eq!(id, SCHEMAS.id);
        datadriven.routine.push(maintenance);
        let maintenance = loop {
            // Optionally inject a fake "previous attempt returned
            // indeterminate" error to exercise the idempotency recovery path.
            let indeterminate = args
                .optional::<String>("prev_indeterminate")
                .map(|x| Indeterminate::new(anyhow::Error::msg(x)));
            let res = datadriven
                .machine
                .compare_and_append_idempotent(
                    &batch.batch,
                    &writer_id,
                    now,
                    &token,
                    &HandleDebugState::default(),
                    indeterminate,
                )
                .await;
            match res {
                CompareAndAppendRes::Success(_, x) => break x,
                CompareAndAppendRes::UpperMismatch(_seqno, upper) => {
                    return Err(anyhow!("{:?}", Upper(upper)));
                }
                CompareAndAppendRes::InlineBackpressure => {
                    // The state machine refused the inline parts: flush them
                    // out to blob storage and retry with the rewritten batch.
                    let hollow_batch = (*batch.batch).clone();
                    let mut b = datadriven.to_batch(hollow_batch);
                    let cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
                    b.flush_to_blob(
                        &cfg,
                        &datadriven.client.metrics.user,
                        &datadriven.client.isolated_runtime,
                        &*SCHEMAS,
                    )
                    .await;
                    batch.batch = Arc::new(b.into_hollow_batch());
                    continue;
                }
                _ => panic!("{:?}", res),
            };
        };
        // TODO: Don't throw away writer maintenance. It's slightly tricky
        // because we need a WriterId for Compactor.
        datadriven.routine.push(maintenance.routine);
        Ok(format!(
            "{} {:?}\n",
            datadriven.machine.seqno(),
            datadriven.machine.applier.clone_upper().elements(),
        ))
    }
2387
2388    pub async fn apply_merge_res(
2389        datadriven: &mut MachineState,
2390        args: DirectiveArgs<'_>,
2391    ) -> Result<String, anyhow::Error> {
2392        let input = args.expect_str("input");
2393        let batch = datadriven
2394            .batches
2395            .get(input)
2396            .expect("unknown batch")
2397            .clone();
2398        let compact_req = datadriven
2399            .compactions
2400            .get(input)
2401            .expect("unknown compact req")
2402            .clone();
2403        let input_batches = compact_req
2404            .inputs
2405            .iter()
2406            .map(|x| x.id)
2407            .collect::<BTreeSet<_>>();
2408        let lower_spine_bound = input_batches
2409            .first()
2410            .map(|id| id.0)
2411            .expect("at least one batch must be present");
2412        let upper_spine_bound = input_batches
2413            .last()
2414            .map(|id| id.1)
2415            .expect("at least one batch must be present");
2416        let id = SpineId(lower_spine_bound, upper_spine_bound);
2417        let hollow_batch = (*batch.batch).clone();
2418
2419        let (merge_res, maintenance) = datadriven
2420            .machine
2421            .merge_res(&FueledMergeRes {
2422                output: hollow_batch,
2423                input: CompactionInput::IdRange(id),
2424                new_active_compaction: None,
2425            })
2426            .await;
2427        datadriven.routine.push(maintenance);
2428        Ok(format!(
2429            "{} {}\n",
2430            datadriven.machine.seqno(),
2431            merge_res.applied()
2432        ))
2433    }
2434
2435    pub async fn perform_maintenance(
2436        datadriven: &mut MachineState,
2437        _args: DirectiveArgs<'_>,
2438    ) -> Result<String, anyhow::Error> {
2439        let mut s = String::new();
2440        for maintenance in datadriven.routine.drain(..) {
2441            let () = maintenance
2442                .perform(&datadriven.machine, &datadriven.gc)
2443                .await;
2444            let () = datadriven
2445                .machine
2446                .applier
2447                .fetch_and_update_state(None)
2448                .await;
2449            write!(s, "{} ok\n", datadriven.machine.seqno());
2450        }
2451        Ok(s)
2452    }
2453}
2454
// Integration-style tests that drive a real `Machine` against in-mem
// blob/consensus implementations.
#[cfg(test)]
pub mod tests {
    use std::sync::Arc;

    use mz_dyncfg::ConfigUpdates;
    use mz_ore::cast::CastFrom;
    use mz_ore::task::spawn;
    use mz_persist::intercept::{InterceptBlob, InterceptHandle};
    use mz_persist::location::SeqNo;
    use mz_persist_types::PersistLocation;
    use semver::Version;
    use timely::progress::Antichain;

    use crate::batch::BatchBuilderConfig;
    use crate::cache::StateCache;
    use crate::internal::gc::{GarbageCollector, GcReq};
    use crate::internal::state::{HandleDebugState, ROLLUP_THRESHOLD};
    use crate::tests::{new_test_client, new_test_client_cache};
    use crate::{Diagnostics, PersistClient, ShardId};

    // Verifies that repeatedly appending batches (with maintenance performed
    // after each) keeps the number of live state diffs in consensus bounded,
    // i.e. that rollups + truncation actually happen.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
    async fn apply_unbatched_cmd_truncate(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging();

        let client = new_test_client(&dyncfgs).await;
        // set a low rollup threshold so GC/truncation is more aggressive
        client.cfg.set_config(&ROLLUP_THRESHOLD, 5);
        let (mut write, read) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
            .await;

        // Ensure the reader is not holding back the since.
        read.expire().await;

        // Write a bunch of batches. This should result in a bounded number of
        // live entries in consensus.
        const NUM_BATCHES: u64 = 100;
        for idx in 0..NUM_BATCHES {
            let mut batch = write
                .expect_batch(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1)
                .await;
            // Flush this batch out so the CaA doesn't get inline writes
            // backpressure.
            let cfg = BatchBuilderConfig::new(&client.cfg, write.shard_id());
            batch
                .flush_to_blob(
                    &cfg,
                    &client.metrics.user,
                    &client.isolated_runtime,
                    &write.write_schemas,
                )
                .await;
            let (_, writer_maintenance) = write
                .machine
                .compare_and_append(
                    &batch.into_hollow_batch(),
                    &write.writer_id,
                    &HandleDebugState::default(),
                    (write.cfg.now)(),
                )
                .await
                .unwrap();
            // Perform maintenance eagerly so rollups/GC keep up with writes.
            writer_maintenance
                .perform(&write.machine, &write.gc, write.compact.as_ref())
                .await;
        }
        let live_diffs = write
            .machine
            .applier
            .state_versions
            .fetch_all_live_diffs(&write.machine.shard_id())
            .await;
        // Make sure we constructed the key correctly.
        assert!(live_diffs.len() > 0);
        // Make sure the number of entries is bounded. (I think we could work
        // out a tighter bound than this, but the point is only that it's
        // bounded).
        let max_live_diffs = 2 * usize::cast_from(NUM_BATCHES.next_power_of_two().trailing_zeros());
        assert!(
            live_diffs.len() <= max_live_diffs,
            "{} vs {}",
            live_diffs.len(),
            max_live_diffs
        );
    }

    // A regression test for database-issues#4206, where a bug in gc led to an incremental
    // state invariant being violated which resulted in gc being permanently
    // wedged for the shard.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
    async fn regression_gc_skipped_req_and_interrupted(dyncfgs: ConfigUpdates) {
        let mut client = new_test_client(&dyncfgs).await;
        // Wrap the blob so we can inject a panic into blob deletes below.
        let intercept = InterceptHandle::default();
        client.blob = Arc::new(InterceptBlob::new(
            Arc::clone(&client.blob),
            intercept.clone(),
        ));
        let (_, mut read) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        // Create a new SeqNo
        read.downgrade_since(&Antichain::from_elem(1)).await;
        let new_seqno_since = read.machine.applier.seqno_since();

        // Start a GC in the background for some SeqNo range that is not
        // contiguous compared to the last gc req (in this case, n/a) and then
        // crash when it gets to the blob deletes. In the regression case, this
        // renders the shard permanently un-gc-able.
        assert!(new_seqno_since > SeqNo::minimum());
        intercept.set_post_delete(Some(Arc::new(|_, _| panic!("boom"))));
        let machine = read.machine.clone();
        // Run this in a spawn so we can catch the boom panic
        let gc = spawn(|| "", async move {
            let req = GcReq {
                shard_id: machine.shard_id(),
                new_seqno_since,
            };
            GarbageCollector::gc_and_truncate(&machine, req).await
        });
        // Wait for gc to either panic (regression case) or finish (good case)
        // because it happens to not call blob delete.
        let _ = gc.await;

        // Allow blob deletes to go through and try GC again. In the regression
        // case, this hangs.
        intercept.set_post_delete(None);
        let req = GcReq {
            shard_id: read.machine.shard_id(),
            new_seqno_since,
        };
        let _ = GarbageCollector::gc_and_truncate(&read.machine, req.clone()).await;
    }

    // A regression test for materialize#20776, where a bug meant that compare_and_append
    // would not fetch the latest state after an upper mismatch. This meant that
    // a write that could succeed if retried on the latest state would instead
    // return an UpperMismatch.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
    async fn regression_update_state_after_upper_mismatch(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let mut client2 = client.clone();

        // The bug can only happen if the two WriteHandles have separate copies
        // of state, so make sure that each is given its own StateCache.
        let new_state_cache = Arc::new(StateCache::new_no_metrics());
        client2.shared_states = new_state_cache;

        let shard_id = ShardId::new();
        let (mut write1, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        let (mut write2, _) = client2.expect_open::<String, (), u64, i64>(shard_id).await;

        let data = [
            (("1".to_owned(), ()), 1, 1),
            (("2".to_owned(), ()), 2, 1),
            (("3".to_owned(), ()), 3, 1),
            (("4".to_owned(), ()), 4, 1),
            (("5".to_owned(), ()), 5, 1),
        ];

        // Advance the upper via write1; write2's cached state is now stale.
        write1.expect_compare_and_append(&data[..1], 0, 2).await;

        // this handle's upper now lags behind. if compare_and_append fails to update
        // state after an upper mismatch then this call would (incorrectly) fail
        write2.expect_compare_and_append(&data[1..2], 2, 3).await;
    }

    // Verifies that merely operating on a shard at a newer build version does
    // not bump the recorded applier version, but an explicit upgrade call does.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn version_upgrade(dyncfgs: ConfigUpdates) {
        let mut cache = new_test_client_cache(&dyncfgs);
        // NOTE(review): this assignment is repeated below before the client is
        // opened; one of the two looks redundant — confirm which is intended.
        cache.cfg.build_version = Version::new(26, 1, 0);
        let shard_id = ShardId::new();

        /// Reads back the `applier_version` recorded in the shard's state via
        /// the inspect API, returning `None` if the shard can't be inspected.
        async fn fetch_catalog_upgrade_shard_version(
            persist_client: &PersistClient,
            upgrade_shard_id: ShardId,
        ) -> Option<semver::Version> {
            let shard_state = persist_client
                .inspect_shard::<u64>(&upgrade_shard_id)
                .await
                .ok()?;
            let json_state = serde_json::to_value(shard_state).expect("state serialization error");
            let upgrade_version = json_state
                .get("applier_version")
                .cloned()
                .expect("missing applier_version");
            let upgrade_version =
                serde_json::from_value(upgrade_version).expect("version deserialization error");
            Some(upgrade_version)
        }

        cache.cfg.build_version = Version::new(26, 1, 0);
        let client = cache.open(PersistLocation::new_in_mem()).await.unwrap();
        let (_, mut reader) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        reader.downgrade_since(&Antichain::from_elem(1)).await;
        assert_eq!(
            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
            Some(Version::new(26, 1, 0)),
        );

        // Merely opening and operating on the shard at a new version doesn't bump version...
        cache.cfg.build_version = Version::new(27, 1, 0);
        let client = cache.open(PersistLocation::new_in_mem()).await.unwrap();
        let (_, mut reader) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        reader.downgrade_since(&Antichain::from_elem(2)).await;
        assert_eq!(
            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
            Some(Version::new(26, 1, 0)),
        );

        // ...but an explicit call will.
        client
            .upgrade_version::<String, (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await
            .unwrap();
        assert_eq!(
            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
            Some(Version::new(27, 1, 0)),
        );
    }
}