Skip to main content

mz_persist_client/internal/
machine.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementation of the persist state machine.
11
12use std::fmt::Debug;
13use std::ops::ControlFlow::{self, Break, Continue};
14use std::sync::Arc;
15use std::time::{Duration, Instant, SystemTime};
16
17use differential_dataflow::difference::Monoid;
18use differential_dataflow::lattice::Lattice;
19use futures::FutureExt;
20use futures::future::{self, BoxFuture};
21use mz_dyncfg::{Config, ConfigSet};
22use mz_ore::cast::CastFrom;
23use mz_ore::error::ErrorExt;
24#[allow(unused_imports)] // False positive.
25use mz_ore::fmt::FormatBuffer;
26use mz_ore::{assert_none, soft_assert_no_log};
27use mz_persist::location::{ExternalError, Indeterminate, SeqNo};
28use mz_persist::retry::Retry;
29use mz_persist_types::schema::SchemaId;
30use mz_persist_types::{Codec, Codec64};
31use semver::Version;
32use timely::PartialOrder;
33use timely::progress::{Antichain, Timestamp};
34use tracing::{Instrument, debug, info, trace_span};
35
36use crate::async_runtime::IsolatedRuntime;
37use crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES;
38use crate::cache::StateCache;
39use crate::cfg::RetryParameters;
40use crate::critical::{CriticalReaderId, Opaque};
41use crate::error::{CodecMismatch, InvalidUsage};
42use crate::internal::apply::Applier;
43use crate::internal::compact::CompactReq;
44use crate::internal::maintenance::{RoutineMaintenance, WriterMaintenance};
45use crate::internal::metrics::{CmdMetrics, Metrics, MetricsRetryStream, RetryMetrics};
46use crate::internal::paths::PartialRollupKey;
47use crate::internal::state::{
48    CompareAndAppendBreak, CriticalReaderState, HandleDebugState, HollowBatch, HollowRollup,
49    IdempotencyToken, LeasedReaderState, NoOpStateTransition, Since, SnapshotErr, StateCollections,
50    Upper,
51};
52use crate::internal::state_versions::StateVersions;
53use crate::internal::trace::{ApplyMergeResult, FueledMergeRes};
54use crate::internal::watch::StateWatch;
55use crate::read::LeasedReaderId;
56use crate::rpc::PubSubSender;
57use crate::schema::CaESchema;
58use crate::write::WriterId;
59use crate::{Diagnostics, PersistConfig, ShardId};
60
/// The state machine for a single persist shard.
///
/// All commands against a shard funnel through a `Machine`, which applies
/// them to the shard's durable state via the wrapped [Applier].
#[derive(Debug)]
pub struct Machine<K, V, T, D> {
    /// Applies commands to (and caches a copy of) the shard's durable state.
    pub(crate) applier: Applier<K, V, T, D>,
    // Runtime used to isolate CPU-heavy work from the caller's runtime.
    // NOTE(review): exact usage isn't visible in this chunk — confirm.
    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
}
66
67// Impl Clone regardless of the type params.
68impl<K, V, T: Clone, D> Clone for Machine<K, V, T, D> {
69    fn clone(&self) -> Self {
70        Self {
71            applier: self.applier.clone(),
72            isolated_runtime: Arc::clone(&self.isolated_runtime),
73        }
74    }
75}
76
/// Dyncfg: when enabled, an append that produced no compaction request may
/// instead claim some already-uncompacted batch from state.
pub(crate) const CLAIM_UNCLAIMED_COMPACTIONS: Config<bool> = Config::new(
    "persist_claim_unclaimed_compactions",
    false,
    "If an append doesn't result in a compaction request, but there is some uncompacted batch \
    in state, compact that instead.",
);
83
/// Dyncfg: probability (in percent) of claiming a compaction when
/// [CLAIM_UNCLAIMED_COMPACTIONS] is enabled; values over 100 guarantee
/// multiple claims (see the description below for the exact semantics).
pub(crate) const CLAIM_COMPACTION_PERCENT: Config<usize> = Config::new(
    "persist_claim_compaction_percent",
    100,
    "Claim a compaction with the given percent chance, if claiming compactions is enabled. \
    (If over 100, we'll always claim at least one; for example, if set to 365, we'll claim at least \
    three and have a 65% chance of claiming a fourth.)",
);
91
/// Dyncfg: minimum version cutoff for claimed compactions. Empty (the
/// default) disables the cutoff; invalid version strings are ignored at the
/// use site (parsed with `.ok()`).
pub(crate) const CLAIM_COMPACTION_MIN_VERSION: Config<String> = Config::new(
    "persist_claim_compaction_min_version",
    String::new(),
    "If set to a valid version string, compact away any earlier versions if possible.",
);
97
98impl<K, V, T, D> Machine<K, V, T, D>
99where
100    K: Debug + Codec,
101    V: Debug + Codec,
102    T: Timestamp + Lattice + Codec64 + Sync,
103    D: Monoid + Codec64,
104{
105    pub async fn new(
106        cfg: PersistConfig,
107        shard_id: ShardId,
108        metrics: Arc<Metrics>,
109        state_versions: Arc<StateVersions>,
110        shared_states: Arc<StateCache>,
111        pubsub_sender: Arc<dyn PubSubSender>,
112        isolated_runtime: Arc<IsolatedRuntime>,
113        diagnostics: Diagnostics,
114    ) -> Result<Self, Box<CodecMismatch>> {
115        let applier = Applier::new(
116            cfg,
117            shard_id,
118            metrics,
119            state_versions,
120            shared_states,
121            pubsub_sender,
122            diagnostics,
123        )
124        .await?;
125        Ok(Machine {
126            applier,
127            isolated_runtime,
128        })
129    }
130
    /// Returns the ID of the shard this machine operates on.
    pub fn shard_id(&self) -> ShardId {
        self.applier.shard_id
    }
134
    /// Returns the seqno of the most recent state known to this machine's
    /// (possibly stale) local cache.
    pub fn seqno(&self) -> SeqNo {
        self.applier.seqno()
    }
138
139    pub async fn add_rollup_for_current_seqno(&self) -> RoutineMaintenance {
140        let rollup = self.applier.write_rollup_for_state().await;
141        let Some(rollup) = rollup else {
142            return RoutineMaintenance::default();
143        };
144
145        let (applied, maintenance) = self.add_rollup((rollup.seqno, &rollup.to_hollow())).await;
146        if !applied {
147            // Someone else already wrote a rollup at this seqno, so ours didn't
148            // get added. Delete it.
149            self.applier
150                .state_versions
151                .delete_rollup(&rollup.shard_id, &rollup.key)
152                .await;
153        }
154        maintenance
155    }
156
157    pub async fn add_rollup(
158        &self,
159        add_rollup: (SeqNo, &HollowRollup),
160    ) -> (bool, RoutineMaintenance) {
161        // See the big SUBTLE comment in [Self::merge_res] for what's going on
162        // here.
163        let mut applied_ever_true = false;
164        let metrics = Arc::clone(&self.applier.metrics);
165        let (_seqno, _applied, maintenance) = self
166            .apply_unbatched_idempotent_cmd(&metrics.cmds.add_rollup, |_, _, state| {
167                let ret = state.add_rollup(add_rollup);
168                if let Continue(applied) = ret {
169                    applied_ever_true = applied_ever_true || applied;
170                }
171                ret
172            })
173            .await;
174        (applied_ever_true, maintenance)
175    }
176
177    pub async fn remove_rollups(
178        &self,
179        remove_rollups: &[(SeqNo, PartialRollupKey)],
180    ) -> (Vec<SeqNo>, RoutineMaintenance) {
181        let metrics = Arc::clone(&self.applier.metrics);
182        let (_seqno, removed_rollup_seqnos, maintenance) = self
183            .apply_unbatched_idempotent_cmd(&metrics.cmds.remove_rollups, |_, _, state| {
184                state.remove_rollups(remove_rollups)
185            })
186            .await;
187        (removed_rollup_seqnos, maintenance)
188    }
189
    /// Attempt to upgrade the state to the latest version. If that's not possible, return the
    /// actual data version of the shard.
    pub async fn upgrade_version(&self) -> Result<RoutineMaintenance, Version> {
        let metrics = Arc::clone(&self.applier.metrics);
        // NOTE(review): this reuses the `remove_rollups` command metric for
        // the upgrade command — looks like a copy-paste; confirm whether a
        // dedicated metric should exist.
        let (_seqno, upgrade_result, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.remove_rollups, |_, cfg, state| {
                // Only upgrade if our build is at least as new as the state;
                // downgrading state to an older build is never allowed.
                if state.version <= cfg.build_version {
                    // This would be the place to remove any deprecated items from state, now
                    // that we're dropping compatibility with any previous versions.
                    state.version = cfg.build_version.clone();
                    Continue(Ok(()))
                } else {
                    // State was written by a newer build: report its actual
                    // version without committing anything.
                    Break(NoOpStateTransition(Err(state.version.clone())))
                }
            })
            .await;

        match upgrade_result {
            Ok(()) => Ok(maintenance),
            Err(version) => {
                // A failed upgrade was a no-op transition, so it should not
                // have produced any maintenance work.
                soft_assert_no_log!(
                    maintenance.is_empty(),
                    "should not generate maintenance on failed upgrade"
                );
                Err(version)
            }
        }
    }
218
    /// Registers a new leased reader with the shard, returning its state
    /// (including the seqno hold it was granted) plus routine maintenance.
    pub async fn register_leased_reader(
        &self,
        reader_id: &LeasedReaderId,
        purpose: &str,
        lease_duration: Duration,
        heartbeat_timestamp_ms: u64,
        use_critical_since: bool,
    ) -> (LeasedReaderState<T>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, (reader_state, seqno_since), maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |seqno, cfg, state| {
                state.register_leased_reader(
                    &cfg.hostname,
                    reader_id,
                    purpose,
                    seqno,
                    lease_duration,
                    heartbeat_timestamp_ms,
                    use_critical_since,
                )
            })
            .await;
        // Usually, the reader gets an initial seqno hold of the seqno at which
        // it was registered. However, on a tombstone shard the seqno hold
        // happens to get computed as the tombstone seqno + 1
        // (State::clone_apply provided seqno.next(), the non-no-op commit
        // seqno, to the work fn and this is what register_reader uses for the
        // seqno hold). The real invariant we want to protect here is that the
        // hold is >= the seqno_since, so validate that instead of anything more
        // specific.
        debug_assert!(
            reader_state.seqno >= seqno_since,
            "{} vs {}",
            reader_state.seqno,
            seqno_since,
        );
        (reader_state, maintenance)
    }
257
258    pub async fn register_critical_reader(
259        &self,
260        reader_id: &CriticalReaderId,
261        default_opaque: Opaque,
262        purpose: &str,
263    ) -> (CriticalReaderState<T>, RoutineMaintenance) {
264        let metrics = Arc::clone(&self.applier.metrics);
265        let (_seqno, state, maintenance) = self
266            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, cfg, state| {
267                state.register_critical_reader(
268                    &cfg.hostname,
269                    reader_id,
270                    default_opaque.clone(),
271                    purpose,
272                )
273            })
274            .await;
275        (state, maintenance)
276    }
277
278    pub async fn register_schema(
279        &self,
280        key_schema: &K::Schema,
281        val_schema: &V::Schema,
282    ) -> (Option<SchemaId>, RoutineMaintenance) {
283        let metrics = Arc::clone(&self.applier.metrics);
284        let (_seqno, state, maintenance) = self
285            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, _cfg, state| {
286                state.register_schema::<K, V>(key_schema, val_schema)
287            })
288            .await;
289        (state, maintenance)
290    }
291
292    pub async fn spine_exert(&self, fuel: usize) -> (Vec<CompactReq<T>>, RoutineMaintenance) {
293        // Performance special case for no-ops, to avoid the State clones.
294        if fuel == 0 || self.applier.all_batches().len() < 2 {
295            return (Vec::new(), RoutineMaintenance::default());
296        }
297
298        let metrics = Arc::clone(&self.applier.metrics);
299        let (_seqno, reqs, maintenance) = self
300            .apply_unbatched_idempotent_cmd(&metrics.cmds.spine_exert, |_seqno, _cfg, state| {
301                state.spine_exert(fuel)
302            })
303            .await;
304        let reqs = reqs
305            .into_iter()
306            .map(|req| CompactReq {
307                shard_id: self.shard_id(),
308                desc: req.desc,
309                inputs: req.inputs,
310            })
311            .collect();
312        (reqs, maintenance)
313    }
314
    /// Appends `batch` to the shard iff the shard's upper matches the batch's
    /// lower, retrying internally when our cached upper turns out to be stale.
    pub async fn compare_and_append(
        &self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        debug_info: &HandleDebugState,
        heartbeat_timestamp_ms: u64,
    ) -> CompareAndAppendRes<T> {
        // One token for the whole (possibly retried) logical append, so
        // duplicate commits can be detected across retries.
        let idempotency_token = IdempotencyToken::new();
        loop {
            let res = self
                .compare_and_append_idempotent(
                    batch,
                    writer_id,
                    heartbeat_timestamp_ms,
                    &idempotency_token,
                    debug_info,
                    None,
                )
                .await;
            match res {
                CompareAndAppendRes::Success(seqno, maintenance) => {
                    return CompareAndAppendRes::Success(seqno, maintenance);
                }
                CompareAndAppendRes::InvalidUsage(x) => {
                    return CompareAndAppendRes::InvalidUsage(x);
                }
                CompareAndAppendRes::InlineBackpressure => {
                    return CompareAndAppendRes::InlineBackpressure;
                }
                CompareAndAppendRes::UpperMismatch(seqno, _current_upper) => {
                    // If the state machine thinks that the shard upper is not
                    // far enough along, it could be because the caller of this
                    // method has found out that it advanced via some
                    // side-channel that didn't update our local cache of the
                    // machine state. So, fetch the latest state and try again
                    // if we indeed get something different.
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                    let (current_seqno, current_upper) =
                        self.applier.upper(|seqno, upper| (seqno, upper.clone()));

                    // We tried to do a compare_and_append with the wrong
                    // expected upper, that won't work.
                    if &current_upper != batch.desc.lower() {
                        return CompareAndAppendRes::UpperMismatch(current_seqno, current_upper);
                    } else {
                        // The upper stored in state was outdated. Retry after
                        // updating.
                    }
                }
            }
        }
    }
367
    /// Single logical attempt of a compare_and_append, retrying internally on
    /// [Indeterminate] errors and using the idempotency token plus the
    /// writer's most recent upper to decide whether an ambiguous outcome
    /// actually committed. See the big SUBTLE comment below for the full
    /// correctness argument.
    async fn compare_and_append_idempotent(
        &self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        heartbeat_timestamp_ms: u64,
        idempotency_token: &IdempotencyToken,
        debug_info: &HandleDebugState,
        // Only exposed for testing. In prod, this always starts as None, but
        // making it a parameter allows us to simulate hitting an indeterminate
        // error on the first attempt in tests.
        mut indeterminate: Option<Indeterminate>,
    ) -> CompareAndAppendRes<T> {
        let metrics = Arc::clone(&self.applier.metrics);
        let lease_duration_ms = self
            .applier
            .cfg
            .writer_lease_duration
            .as_millis()
            .try_into()
            .expect("reasonable duration");
        // SUBTLE: Retries of compare_and_append with Indeterminate errors are
        // tricky (more discussion of this in database-issues#3680):
        //
        // - (1) We compare_and_append and get an Indeterminate error back from
        //   CRDB/Consensus. This means we don't know if it committed or not.
        // - (2) We retry it.
        // - (3) We get back an upper mismatch. The tricky bit is deciding if we
        //   conflicted with some other writer OR if the write in (1) actually
        //   went through and we're "conflicting" with ourself.
        //
        // A number of scenarios can be distinguished with per-writer
        // idempotency tokens, so I'll jump straight to the hardest one:
        //
        // - (1) A compare_and_append is issued for e.g. `[5,7)`, the consensus
        //   call makes it onto the network before the operation is cancelled
        //   (by dropping the future).
        // - (2) A compare_and_append is issued from the same WriteHandle for
        //   `[3,5)`, it uses a different conn from the consensus pool and gets
        //   an Indeterminate error.
        // - (3) The call in (1) is received by consensus and commits.
        // - (4) The retry of (2) receives an upper mismatch with an upper of 7.
        //
        // At this point, how do we determine whether (2) committed or not and
        // thus whether we should return success or upper mismatch? Getting this
        // right is very important for correctness (imagine this is a table
        // write and we either return success or failure to the client).
        //
        // - If we use per-writer IdempotencyTokens but only store the latest
        //   one in state, then the `[5,7)` one will have clobbered whatever our
        //   `[3,5)` one was.
        // - We could store every IdempotencyToken that ever committed, but that
        //   would require unbounded storage in state (non-starter).
        // - We could require that IdempotencyTokens are comparable and that
        //   each call issued by a WriteHandle uses one that is strictly greater
        //   than every call before it. A previous version of this PR tried this
        //   and it's remarkably subtle. As a result, I (Dan) have developed
        //   strong feels that our correctness protocol _should not depend on
        //   WriteHandle, only Machine_.
        // - We could require a new WriterId if a request is ever cancelled by
        //   making `compare_and_append` take ownership of `self` and then
        //   handing it back for any call polled to completion. The ergonomics
        //   of this are quite awkward and, like the previous idea, it depends
        //   on the WriteHandle impl for correctness.
        // - Any ideas that involve reading back the data are foiled by a step
        //   `(0) set the since to 100` (plus the latency and memory usage would
        //   be too unpredictable).
        //
        // The technique used here derives from the following observations:
        //
        // - In practice, we don't use compare_and_append with the sort of
        //   "regressing frontiers" described above.
        // - In practice, Indeterminate errors are rare-ish. They happen enough
        //   that we don't want to always panic on them, but this is still a
        //   useful property to build on.
        //
        // At a high level, we do just enough to be able to distinguish the
        // cases that we think will happen in practice and then leave the rest
        // for a panic! that we think we'll never see. Concretely:
        //
        // - Run compare_and_append in a loop, retrying on Indeterminate errors
        //   but noting if we've ever done that.
        // - If we haven't seen an Indeterminate error (i.e. this is the first
        //   time through the loop) then the result we got is guaranteed to be
        //   correct, so pass it up.
        // - Otherwise, any result other than an expected upper mismatch is
        //   guaranteed to be correct, so just pass it up.
        // - Otherwise examine the writer's most recent upper and break it into
        //   two cases:
        // - Case 1 `expected_upper.less_than(writer_most_recent_upper)`: it's
        //   impossible that we committed on a previous iteration because the
        //   overall upper of the shard is less_than what this call would have
        //   advanced it to. Pass up the expectation mismatch.
        // - Case 2 `!Case1`: First note that this means our IdempotencyToken
        //   didn't match, otherwise we would have gotten `AlreadyCommitted`. It
        //   also means some previous write from _this writer_ has committed an
        //   upper that is beyond the one in this call, which is a weird usage
        //   (NB can't be a future write because that would mean someone is
        //   still polling us, but `&mut self` prevents that).
        //
        // TODO: If this technique works in practice (leads to zero panics),
        // then commit to it and remove the Indeterminate from
        // [WriteHandle::compare_and_append_batch].
        let mut retry = self
            .applier
            .metrics
            .retries
            .compare_and_append_idempotent
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        let mut writer_was_present = false;
        loop {
            let cmd_res = self
                .applier
                .apply_unbatched_cmd(&metrics.cmds.compare_and_append, |_, cfg, state| {
                    writer_was_present = state.writers.contains_key(writer_id);
                    state.compare_and_append(
                        batch,
                        writer_id,
                        heartbeat_timestamp_ms,
                        lease_duration_ms,
                        idempotency_token,
                        debug_info,
                        INLINE_WRITES_TOTAL_MAX_BYTES.get(cfg),
                        if CLAIM_UNCLAIMED_COMPACTIONS.get(cfg) {
                            CLAIM_COMPACTION_PERCENT.get(cfg)
                        } else {
                            0
                        },
                        Version::parse(&CLAIM_COMPACTION_MIN_VERSION.get(cfg))
                            .ok()
                            .as_ref(),
                    )
                })
                .await;
            let (seqno, res, routine) = match cmd_res {
                Ok(x) => x,
                Err(err) => {
                    // These are rare and interesting enough that we always log
                    // them at info!.
                    info!(
                        "compare_and_append received an indeterminate error, retrying in {:?}: {}",
                        retry.next_sleep(),
                        err
                    );
                    if indeterminate.is_none() {
                        indeterminate = Some(err);
                    }
                    retry = retry.sleep().await;
                    continue;
                }
            };
            match res {
                Ok(merge_reqs) => {
                    // We got explicit confirmation that we succeeded, so
                    // anything that happened in a previous retry is irrelevant.
                    let mut compact_reqs = Vec::with_capacity(merge_reqs.len());
                    for req in merge_reqs {
                        let req = CompactReq {
                            shard_id: self.shard_id(),
                            desc: req.desc,
                            inputs: req.inputs,
                        };
                        compact_reqs.push(req);
                    }
                    let writer_maintenance = WriterMaintenance {
                        routine,
                        compaction: compact_reqs,
                    };

                    if !writer_was_present {
                        metrics.state.writer_added.inc();
                    }
                    for part in &batch.parts {
                        if part.is_inline() {
                            let bytes = u64::cast_from(part.inline_bytes());
                            metrics.inline.part_commit_bytes.inc_by(bytes);
                            metrics.inline.part_commit_count.inc();
                        }
                    }
                    return CompareAndAppendRes::Success(seqno, writer_maintenance);
                }
                Err(CompareAndAppendBreak::AlreadyCommitted) => {
                    // A previous iteration through this loop got an
                    // Indeterminate error but was successful. Sanity check this
                    // and pass along the good news.
                    assert!(indeterminate.is_some());
                    self.applier.metrics.cmds.compare_and_append_noop.inc();
                    if !writer_was_present {
                        metrics.state.writer_added.inc();
                    }
                    return CompareAndAppendRes::Success(seqno, WriterMaintenance::default());
                }
                Err(CompareAndAppendBreak::InvalidUsage(err)) => {
                    // InvalidUsage is (or should be) a deterministic function
                    // of the inputs and independent of anything in persist
                    // state. It's handed back via a Break, so we never even try
                    // to commit it. No network, no Indeterminate.
                    assert_none!(indeterminate);
                    return CompareAndAppendRes::InvalidUsage(err);
                }
                Err(CompareAndAppendBreak::InlineBackpressure) => {
                    // We tried to write an inline part, but there was already
                    // too much in state. Flush it out to s3 and try again.
                    return CompareAndAppendRes::InlineBackpressure;
                }
                Err(CompareAndAppendBreak::Upper {
                    shard_upper,
                    writer_upper,
                }) => {
                    // NB the below intentionally compares to writer_upper
                    // (because it gives a tighter bound on the bad case), but
                    // returns shard_upper (what the persist caller cares
                    // about).
                    assert!(
                        PartialOrder::less_equal(&writer_upper, &shard_upper),
                        "{:?} vs {:?}",
                        &writer_upper,
                        &shard_upper
                    );
                    if PartialOrder::less_than(&writer_upper, batch.desc.upper()) {
                        // No way this could have committed in some previous
                        // attempt of this loop: the upper of the writer is
                        // strictly less than the proposed new upper.
                        return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
                    }
                    if indeterminate.is_none() {
                        // No way this could have committed in some previous
                        // attempt of this loop: we never saw an indeterminate
                        // error (thus there was no previous iteration of the
                        // loop).
                        return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
                    }
                    // This is the bad case. We can't distinguish if some
                    // previous attempt that got an Indeterminate error
                    // succeeded or not. This should be sufficiently rare in
                    // practice (hopefully ~never) that we give up and let
                    // process restart fix things. See the big comment above for
                    // more context.
                    //
                    // NB: This is intentionally not a halt! because it's quite
                    // unexpected.
                    panic!(
                        concat!(
                            "cannot distinguish compare_and_append success or failure ",
                            "caa_lower={:?} caa_upper={:?} writer_upper={:?} shard_upper={:?} err={:?}"
                        ),
                        batch.desc.lower().elements(),
                        batch.desc.upper().elements(),
                        writer_upper.elements(),
                        shard_upper.elements(),
                        indeterminate,
                    );
                }
            };
        }
    }
623
624    pub async fn downgrade_since(
625        &self,
626        reader_id: &LeasedReaderId,
627        outstanding_seqno: SeqNo,
628        new_since: &Antichain<T>,
629        heartbeat_timestamp_ms: u64,
630    ) -> (SeqNo, Since<T>, RoutineMaintenance) {
631        let metrics = Arc::clone(&self.applier.metrics);
632        self.apply_unbatched_idempotent_cmd(&metrics.cmds.downgrade_since, |seqno, _cfg, state| {
633            state.downgrade_since(
634                reader_id,
635                seqno,
636                outstanding_seqno,
637                new_since,
638                heartbeat_timestamp_ms,
639            )
640        })
641        .await
642    }
643
644    pub async fn compare_and_downgrade_since(
645        &self,
646        reader_id: &CriticalReaderId,
647        expected_opaque: &Opaque,
648        (new_opaque, new_since): (&Opaque, &Antichain<T>),
649    ) -> (Result<Since<T>, (Opaque, Since<T>)>, RoutineMaintenance) {
650        let metrics = Arc::clone(&self.applier.metrics);
651        let (_seqno, res, maintenance) = self
652            .apply_unbatched_idempotent_cmd(
653                &metrics.cmds.compare_and_downgrade_since,
654                |_seqno, _cfg, state| {
655                    state.compare_and_downgrade_since(
656                        reader_id,
657                        expected_opaque,
658                        (new_opaque, new_since),
659                    )
660                },
661            )
662            .await;
663
664        match res {
665            Ok(since) => (Ok(since), maintenance),
666            Err((opaque, since)) => (Err((opaque, since)), maintenance),
667        }
668    }
669
670    pub async fn expire_leased_reader(
671        &self,
672        reader_id: &LeasedReaderId,
673    ) -> (SeqNo, RoutineMaintenance) {
674        let metrics = Arc::clone(&self.applier.metrics);
675        let (seqno, _existed, maintenance) = self
676            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
677                state.expire_leased_reader(reader_id)
678            })
679            .await;
680        (seqno, maintenance)
681    }
682
683    #[allow(dead_code)] // TODO(bkirwi): remove this when since behaviour on expiry has settled
684    pub async fn expire_critical_reader(
685        &self,
686        reader_id: &CriticalReaderId,
687    ) -> (SeqNo, RoutineMaintenance) {
688        let metrics = Arc::clone(&self.applier.metrics);
689        let (seqno, _existed, maintenance) = self
690            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
691                state.expire_critical_reader(reader_id)
692            })
693            .await;
694        (seqno, maintenance)
695    }
696
697    pub async fn expire_writer(&self, writer_id: &WriterId) -> (SeqNo, RoutineMaintenance) {
698        let metrics = Arc::clone(&self.applier.metrics);
699        let (seqno, _existed, maintenance) = self
700            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_writer, |_, _, state| {
701                state.expire_writer(writer_id)
702            })
703            .await;
704        metrics.state.writer_removed.inc();
705        (seqno, maintenance)
706    }
707
    /// Returns whether this shard's cached state says it has been finalized
    /// (tombstoned). Read-only; delegates to the applier.
    pub fn is_finalized(&self) -> bool {
        self.applier.is_finalized()
    }
711
    /// See [crate::PersistClient::get_schema].
    ///
    /// Returns `None` when `schema_id` is not present in the current state.
    pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
        self.applier.get_schema(schema_id)
    }
716
    /// See [crate::PersistClient::latest_schema].
    ///
    /// Returns `None` when no schema has been registered yet.
    pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
        self.applier.latest_schema()
    }
721
    /// Returns the ID of the given schema, if known at the current state.
    ///
    /// Read-only; does not register the schema if it is absent.
    pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
        self.applier.find_schema(key_schema, val_schema)
    }
726
727    /// See [crate::PersistClient::compare_and_evolve_schema].
728    ///
729    /// TODO: Unify this with [Self::register_schema]?
730    pub async fn compare_and_evolve_schema(
731        &self,
732        expected: SchemaId,
733        key_schema: &K::Schema,
734        val_schema: &V::Schema,
735    ) -> (CaESchema<K, V>, RoutineMaintenance) {
736        let metrics = Arc::clone(&self.applier.metrics);
737        let (_seqno, state, maintenance) = self
738            .apply_unbatched_idempotent_cmd(
739                &metrics.cmds.compare_and_evolve_schema,
740                |_seqno, _cfg, state| {
741                    state.compare_and_evolve_schema::<K, V>(expected, key_schema, val_schema)
742                },
743            )
744            .await;
745        (state, maintenance)
746    }
747
    /// Runs a single "become tombstone" command against state, retrying
    /// indeterminate (possibly-committed) errors internally.
    ///
    /// Returns `Ok(true)` if the command changed state (made progress) and
    /// `Ok(false)` if it was a no-op, in both cases with any routine
    /// maintenance produced. The `InvalidUsage` error type matches
    /// [Self::become_tombstone]'s signature; this step itself only loops on
    /// indeterminate errors.
    async fn tombstone_step(&self) -> Result<(bool, RoutineMaintenance), InvalidUsage<T>> {
        let metrics = Arc::clone(&self.applier.metrics);
        let mut retry = self
            .applier
            .metrics
            .retries
            .idempotent_cmd
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        loop {
            let res = self
                .applier
                .apply_unbatched_cmd(&metrics.cmds.become_tombstone, |_, _, state| {
                    state.become_tombstone_and_shrink()
                })
                .await;
            // Both command outcomes are terminal; only an (indeterminate)
            // apply error falls through to the retry below.
            let err = match res {
                Ok((_seqno, Ok(()), maintenance)) => return Ok((true, maintenance)),
                Ok((_seqno, Err(NoOpStateTransition(())), maintenance)) => {
                    return Ok((false, maintenance));
                }
                Err(err) => err,
            };
            // Escalate from debug to info logging once we've retried a while.
            if retry.attempt() >= INFO_MIN_ATTEMPTS {
                info!(
                    "become_tombstone received an indeterminate error, retrying in {:?}: {}",
                    retry.next_sleep(),
                    err
                );
            } else {
                debug!(
                    "become_tombstone received an indeterminate error, retrying in {:?}: {}",
                    retry.next_sleep(),
                    err
                );
            }
            retry = retry.sleep().await;
        }
    }
786
787    pub async fn become_tombstone(&self) -> Result<RoutineMaintenance, InvalidUsage<T>> {
788        self.applier.check_since_upper_both_empty()?;
789
790        let mut maintenance = RoutineMaintenance::default();
791
792        loop {
793            let (made_progress, more_maintenance) = self.tombstone_step().await?;
794            maintenance.merge(more_maintenance);
795            if !made_progress {
796                break;
797            }
798        }
799
800        Ok(maintenance)
801    }
802
    /// Returns the batches making up this shard's contents at `as_of`,
    /// blocking until the shard's upper has advanced enough to serve it.
    ///
    /// Wakes up on whichever fires first: a state watch notification or a
    /// metered retry sleep (which triggers an explicit state fetch). Returns
    /// `Err(Since)` when the `as_of` can no longer be served because
    /// historical distinctions have been lost.
    pub async fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, Since<T>> {
        let start = Instant::now();
        // Fast path: the currently cached state can already serve the as_of.
        let (mut seqno, mut upper) = match self.applier.snapshot(as_of) {
            Ok(x) => return Ok(x),
            Err(SnapshotErr::AsOfNotYetAvailable(seqno, Upper(upper))) => (seqno, upper),
            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(since))) => {
                return Err(Since(since));
            }
        };

        // The latest state still couldn't serve this as_of: watch+sleep in a
        // loop until it's ready.
        let mut watch = self.applier.watch();
        let watch = &mut watch;
        let sleeps = self
            .applier
            .metrics
            .retries
            .snapshot
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());

        // Tags which of the two racing futures woke us, carrying the watch or
        // retry stream back out so it can be re-armed for the next iteration.
        enum Wake<'a, K, V, T, D> {
            Watch(&'a mut StateWatch<K, V, T, D>),
            Sleep(MetricsRetryStream),
        }
        let mut watch_fut = std::pin::pin!(
            watch
                .wait_for_seqno_ge(seqno.next())
                .map(Wake::Watch)
                .instrument(trace_span!("snapshot::watch")),
        );
        let mut sleep_fut = std::pin::pin!(
            sleeps
                .sleep()
                .map(Wake::Sleep)
                .instrument(trace_span!("snapshot::sleep")),
        );

        // To reduce log spam, we log "not yet available" only once at info if
        // it passes a certain threshold. Then, if it did one info log, we log
        // again at info when it resolves.
        let mut logged_at_info = false;
        loop {
            // Use a duration based threshold here instead of the usual
            // INFO_MIN_ATTEMPTS because here we're waiting on an
            // external thing to arrive.
            if !logged_at_info && start.elapsed() >= Duration::from_millis(1024) {
                logged_at_info = true;
                info!(
                    "snapshot {} {} as of {:?} not yet available for {} upper {:?}",
                    self.applier.shard_metrics.name,
                    self.shard_id(),
                    as_of.elements(),
                    seqno,
                    upper.elements(),
                );
            } else {
                debug!(
                    "snapshot {} {} as of {:?} not yet available for {} upper {:?}",
                    self.applier.shard_metrics.name,
                    self.shard_id(),
                    as_of.elements(),
                    seqno,
                    upper.elements(),
                );
            }

            // Race the watch against the sleep; whichever completes first
            // decides how we refresh our view of state below.
            let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
                future::Either::Left((wake, _)) => wake,
                future::Either::Right((wake, _)) => wake,
            };
            // Note that we don't need to fetch in the Watch case, because the
            // Watch wakeup is a signal that the shared state has already been
            // updated.
            match &wake {
                Wake::Watch(_) => self.applier.metrics.watch.snapshot_woken_via_watch.inc(),
                Wake::Sleep(_) => {
                    self.applier.metrics.watch.snapshot_woken_via_sleep.inc();
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                }
            }

            (seqno, upper) = match self.applier.snapshot(as_of) {
                Ok(x) => {
                    // Pair the earlier info-level "not yet available" with an
                    // info-level resolution, per the comment above the loop.
                    if logged_at_info {
                        info!(
                            "snapshot {} {} as of {:?} now available",
                            self.applier.shard_metrics.name,
                            self.shard_id(),
                            as_of.elements(),
                        );
                    }
                    return Ok(x);
                }
                Err(SnapshotErr::AsOfNotYetAvailable(seqno, Upper(upper))) => {
                    // The upper isn't ready yet, fall through and try again.
                    (seqno, upper)
                }
                Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(since))) => {
                    return Err(Since(since));
                }
            };

            // Re-arm whichever future fired; the other remains pending.
            match wake {
                Wake::Watch(watch) => {
                    watch_fut.set(
                        watch
                            .wait_for_seqno_ge(seqno.next())
                            .map(Wake::Watch)
                            .instrument(trace_span!("snapshot::watch")),
                    );
                }
                Wake::Sleep(sleeps) => {
                    debug!(
                        "snapshot {} {} sleeping for {:?}",
                        self.applier.shard_metrics.name,
                        self.shard_id(),
                        sleeps.next_sleep()
                    );
                    sleep_fut.set(
                        sleeps
                            .sleep()
                            .map(Wake::Sleep)
                            .instrument(trace_span!("snapshot::sleep")),
                    );
                }
            }
        }
    }
932
    // NB: Unlike the other methods here, this one is read-only.
    /// Checks whether a listen starting at `as_of` would be valid, returning
    /// the blocking `Since` otherwise. Delegates to the applier.
    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
        self.applier.verify_listen(as_of)
    }
937
    /// Returns the next batch of updates at or beyond `frontier`, blocking
    /// until one exists.
    ///
    /// Like [Self::snapshot], wakes up on whichever fires first: the given
    /// state watch or a metered retry sleep (which triggers an explicit state
    /// fetch). `reader_id` is used only for log context; `retry`, if `Some`,
    /// overrides the dyncfg-derived sleep parameters.
    pub async fn next_listen_batch(
        &self,
        frontier: &Antichain<T>,
        watch: &mut StateWatch<K, V, T, D>,
        reader_id: Option<&LeasedReaderId>,
        // If Some, an override for the default listen sleep retry parameters.
        retry: Option<RetryParameters>,
    ) -> HollowBatch<T> {
        // Fast path: the currently cached state already has a batch past the
        // frontier.
        let mut seqno = match self.applier.next_listen_batch(frontier) {
            Ok(b) => return b,
            Err(seqno) => seqno,
        };

        // The latest state still doesn't have a new frontier for us:
        // watch+sleep in a loop until it does.
        let retry = retry.unwrap_or_else(|| next_listen_batch_retry_params(&self.applier.cfg));
        let sleeps = self
            .applier
            .metrics
            .retries
            .next_listen_batch
            .stream(retry.into_retry(SystemTime::now()).into_retry_stream());

        // Tags which of the two racing futures woke us, carrying the watch or
        // retry stream back out so it can be re-armed for the next iteration.
        enum Wake<'a, K, V, T, D> {
            Watch(&'a mut StateWatch<K, V, T, D>),
            Sleep(MetricsRetryStream),
        }
        // NOTE(review): these trace spans are named `snapshot::*` even though
        // this is next_listen_batch — presumably a copy-paste from
        // Machine::snapshot; confirm intent before renaming.
        let mut watch_fut = std::pin::pin!(
            watch
                .wait_for_seqno_ge(seqno.next())
                .map(Wake::Watch)
                .instrument(trace_span!("snapshot::watch"))
        );
        let mut sleep_fut = std::pin::pin!(
            sleeps
                .sleep()
                .map(Wake::Sleep)
                .instrument(trace_span!("snapshot::sleep"))
        );

        loop {
            // Race the watch against the sleep.
            let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
                future::Either::Left((wake, _)) => wake,
                future::Either::Right((wake, _)) => wake,
            };
            // Note that we don't need to fetch in the Watch case, because the
            // Watch wakeup is a signal that the shared state has already been
            // updated.
            match &wake {
                Wake::Watch(_) => self.applier.metrics.watch.listen_woken_via_watch.inc(),
                Wake::Sleep(_) => {
                    self.applier.metrics.watch.listen_woken_via_sleep.inc();
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                }
            }

            seqno = match self.applier.next_listen_batch(frontier) {
                Ok(b) => {
                    // Record which wakeup mechanism ultimately resolved us.
                    match &wake {
                        Wake::Watch(_) => {
                            self.applier.metrics.watch.listen_resolved_via_watch.inc()
                        }
                        Wake::Sleep(_) => {
                            self.applier.metrics.watch.listen_resolved_via_sleep.inc()
                        }
                    }
                    return b;
                }
                Err(seqno) => seqno,
            };

            // Wait a bit and try again. Intentionally don't ever log
            // this at info level.
            match wake {
                Wake::Watch(watch) => {
                    watch_fut.set(
                        watch
                            .wait_for_seqno_ge(seqno.next())
                            .map(Wake::Watch)
                            .instrument(trace_span!("snapshot::watch")),
                    );
                }
                Wake::Sleep(sleeps) => {
                    debug!(
                        "{:?}: {} {} next_listen_batch didn't find new data, retrying in {:?}",
                        reader_id,
                        self.applier.shard_metrics.name,
                        self.shard_id(),
                        sleeps.next_sleep()
                    );
                    sleep_fut.set(
                        sleeps
                            .sleep()
                            .map(Wake::Sleep)
                            .instrument(trace_span!("snapshot::sleep")),
                    );
                }
            }
        }
    }
1038
1039    async fn apply_unbatched_idempotent_cmd<
1040        R,
1041        WorkFn: FnMut(
1042            SeqNo,
1043            &PersistConfig,
1044            &mut StateCollections<T>,
1045        ) -> ControlFlow<NoOpStateTransition<R>, R>,
1046    >(
1047        &self,
1048        cmd: &CmdMetrics,
1049        mut work_fn: WorkFn,
1050    ) -> (SeqNo, R, RoutineMaintenance) {
1051        let mut retry = self
1052            .applier
1053            .metrics
1054            .retries
1055            .idempotent_cmd
1056            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1057        loop {
1058            match self.applier.apply_unbatched_cmd(cmd, &mut work_fn).await {
1059                Ok((seqno, x, maintenance)) => match x {
1060                    Ok(x) => {
1061                        return (seqno, x, maintenance);
1062                    }
1063                    Err(NoOpStateTransition(x)) => {
1064                        return (seqno, x, maintenance);
1065                    }
1066                },
1067                Err(err) => {
1068                    if retry.attempt() >= INFO_MIN_ATTEMPTS {
1069                        info!(
1070                            "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
1071                            cmd.name,
1072                            retry.next_sleep(),
1073                            err
1074                        );
1075                    } else {
1076                        debug!(
1077                            "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
1078                            cmd.name,
1079                            retry.next_sleep(),
1080                            err
1081                        );
1082                    }
1083                    retry = retry.sleep().await;
1084                    continue;
1085                }
1086            }
1087        }
1088    }
1089}
1090
1091impl<K, V, T, D> Machine<K, V, T, D>
1092where
1093    K: Debug + Codec,
1094    V: Debug + Codec,
1095    T: Timestamp + Lattice + Codec64 + Sync,
1096    D: Monoid + Codec64 + PartialEq,
1097{
1098    pub async fn merge_res(
1099        &self,
1100        res: &FueledMergeRes<T>,
1101    ) -> (ApplyMergeResult, RoutineMaintenance) {
1102        let metrics = Arc::clone(&self.applier.metrics);
1103
1104        // SUBTLE! If Machine::merge_res returns false, the blobs referenced in
1105        // compaction output are deleted so we don't leak them. Naively passing
1106        // back the value returned by State::apply_merge_res might give a false
1107        // negative in the presence of retries and Indeterminate errors.
1108        // Specifically, something like the following:
1109        //
1110        // - We try to apply_merge_res, it matches.
1111        // - When apply_unbatched_cmd goes to commit the new state, the
1112        //   Consensus::compare_and_set returns an Indeterminate error (but
1113        //   actually succeeds). The committed State now contains references to
1114        //   the compaction output blobs.
1115        // - Machine::apply_unbatched_idempotent_cmd retries the Indeterminate
1116        //   error. For whatever reason, this time though it doesn't match
1117        //   (maybe the batches simply get grouped difference when deserialized
1118        //   from state, or more unavoidably perhaps another compaction
1119        //   happens).
1120        // - This now bubbles up applied=false to the caller, which uses it as a
1121        //   signal that the blobs in the compaction output should be deleted so
1122        //   that we don't leak them.
1123        // - We now contain references in committed State to blobs that don't
1124        //   exist.
1125        //
1126        // The fix is to keep track of whether applied ever was true, even for a
1127        // compare_and_set that returned an Indeterminate error. This has the
1128        // chance of false positive (leaking a blob) but that's better than a
1129        // false negative (a blob we can never recover referenced by state). We
1130        // anyway need a mechanism to clean up leaked blobs because of process
1131        // crashes.
1132        let mut merge_result_ever_applied = ApplyMergeResult::NotAppliedNoMatch;
1133        let (_seqno, _apply_merge_result, maintenance) = self
1134            .apply_unbatched_idempotent_cmd(&metrics.cmds.merge_res, |_, _, state| {
1135                let ret = state.apply_merge_res::<D>(res, &Arc::clone(&metrics).columnar);
1136                if let Continue(result) = ret {
1137                    // record if we've ever applied the merge
1138                    if result.applied() {
1139                        merge_result_ever_applied = result;
1140                    }
1141                    // otherwise record the most granular reason for _not_
1142                    // applying the merge when there was a matching batch
1143                    if result.matched() && !result.applied() && !merge_result_ever_applied.applied()
1144                    {
1145                        merge_result_ever_applied = result;
1146                    }
1147                }
1148                ret
1149            })
1150            .await;
1151        (merge_result_ever_applied, maintenance)
1152    }
1153}
1154
/// A type-erased, one-shot callback producing the future that expires a
/// persist handle.
pub(crate) struct ExpireFn(
    /// This is stored on WriteHandle and ReadHandle, which we require to be
    /// Send + Sync, but the Future is only Send and not Sync. Instead store a
    /// FnOnce that returns the Future. This could also be made an `IntoFuture`,
    /// once producing one of those is made easier.
    pub(crate) Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + Sync + 'static>,
);
1162
1163impl Debug for ExpireFn {
1164    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1165        f.debug_struct("ExpireFn").finish_non_exhaustive()
1166    }
1167}
1168
/// The outcome of a compare-and-append attempt.
#[derive(Debug)]
pub(crate) enum CompareAndAppendRes<T> {
    /// The append committed at this seqno; the caller owes the returned
    /// writer maintenance.
    Success(SeqNo, WriterMaintenance<T>),
    /// The request itself was invalid (see [InvalidUsage]).
    InvalidUsage(InvalidUsage<T>),
    /// The expected upper didn't match; the shard's current upper is
    /// returned so the caller can reconcile.
    UpperMismatch(SeqNo, Antichain<T>),
    /// Too much inline write data; presumably the caller should write the
    /// data out to blob storage and retry — confirm against the
    /// compare_and_append caller.
    InlineBackpressure,
}
1176
#[cfg(test)]
impl<T: Debug> CompareAndAppendRes<T> {
    /// Test helper: extracts the `Success` payload, panicking (with caller
    /// location) on any other variant.
    #[track_caller]
    fn unwrap(self) -> (SeqNo, WriterMaintenance<T>) {
        if let CompareAndAppendRes::Success(seqno, maintenance) = self {
            (seqno, maintenance)
        } else {
            panic!("{:?}", self)
        }
    }
}
1187
/// Fixed sleep between Listen/Subscribe polls; the pubsub watch is the
/// primary wakeup path, so this acts as a fallback.
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_fixed_sleep",
    Duration::from_millis(1200), // pubsub is on by default!
    "\
    The fixed sleep when polling for new batches from a Listen or Subscribe. Skipped if zero.",
);

/// Initial backoff for Listen/Subscribe polling (used when the fixed sleep
/// is zero; see [RetryParameters]).
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_initial_backoff",
    Duration::from_millis(100), // pubsub is on by default!
    "The initial backoff when polling for new batches from a Listen or Subscribe.",
);

/// Multiplier applied to the backoff after each Listen/Subscribe poll.
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER: Config<u32> = Config::new(
    "persist_next_listen_batch_retryer_multiplier",
    2,
    "The backoff multiplier when polling for new batches from a Listen or Subscribe.",
);

/// Upper bound on the Listen/Subscribe polling backoff.
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_CLAMP: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_clamp",
    Duration::from_secs(16), // pubsub is on by default!
    "The backoff clamp duration when polling for new batches from a Listen or Subscribe.",
);
1212
1213fn next_listen_batch_retry_params(cfg: &ConfigSet) -> RetryParameters {
1214    RetryParameters {
1215        fixed_sleep: NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP.get(cfg),
1216        initial_backoff: NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF.get(cfg),
1217        multiplier: NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER.get(cfg),
1218        clamp: NEXT_LISTEN_BATCH_RETRYER_CLAMP.get(cfg),
1219    }
1220}
1221
/// Retry-attempt count at which transient-failure logging in this module
/// escalates from debug to info.
pub const INFO_MIN_ATTEMPTS: usize = 3;
1223
1224pub async fn retry_external<R, F, WorkFn>(metrics: &RetryMetrics, mut work_fn: WorkFn) -> R
1225where
1226    F: std::future::Future<Output = Result<R, ExternalError>>,
1227    WorkFn: FnMut() -> F,
1228{
1229    let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1230    loop {
1231        match work_fn().await {
1232            Ok(x) => {
1233                if retry.attempt() > 0 {
1234                    debug!(
1235                        "external operation {} succeeded after failing at least once",
1236                        metrics.name,
1237                    );
1238                }
1239                return x;
1240            }
1241            Err(err) => {
1242                if retry.attempt() >= INFO_MIN_ATTEMPTS {
1243                    info!(
1244                        "external operation {} failed, retrying in {:?}: {}",
1245                        metrics.name,
1246                        retry.next_sleep(),
1247                        err.display_with_causes()
1248                    );
1249                } else {
1250                    debug!(
1251                        "external operation {} failed, retrying in {:?}: {}",
1252                        metrics.name,
1253                        retry.next_sleep(),
1254                        err.display_with_causes()
1255                    );
1256                }
1257                retry = retry.sleep().await;
1258            }
1259        }
1260    }
1261}
1262
1263pub async fn retry_determinate<R, F, WorkFn>(
1264    metrics: &RetryMetrics,
1265    mut work_fn: WorkFn,
1266) -> Result<R, Indeterminate>
1267where
1268    F: std::future::Future<Output = Result<R, ExternalError>>,
1269    WorkFn: FnMut() -> F,
1270{
1271    let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1272    loop {
1273        match work_fn().await {
1274            Ok(x) => {
1275                if retry.attempt() > 0 {
1276                    debug!(
1277                        "external operation {} succeeded after failing at least once",
1278                        metrics.name,
1279                    );
1280                }
1281                return Ok(x);
1282            }
1283            Err(ExternalError::Determinate(err)) => {
1284                // The determinate "could not serialize access" errors
1285                // happen often enough in dev (which uses Postgres) that
1286                // it's impeding people's work. At the same time, it's been
1287                // a source of confusion for eng. The situation is much
1288                // better on CRDB and we have metrics coverage in prod, so
1289                // this is redundant enough that it's more hurtful than
1290                // helpful. As a result, this intentionally ignores
1291                // INFO_MIN_ATTEMPTS and always logs at debug.
1292                debug!(
1293                    "external operation {} failed, retrying in {:?}: {}",
1294                    metrics.name,
1295                    retry.next_sleep(),
1296                    err.display_with_causes()
1297                );
1298                retry = retry.sleep().await;
1299                continue;
1300            }
1301            Err(ExternalError::Indeterminate(x)) => return Err(x),
1302        }
1303    }
1304}
1305
1306#[cfg(test)]
1307pub mod datadriven {
1308    use std::collections::{BTreeMap, BTreeSet};
1309    use std::pin::pin;
1310    use std::sync::{Arc, LazyLock};
1311
1312    use anyhow::anyhow;
1313    use differential_dataflow::consolidation::consolidate_updates;
1314    use differential_dataflow::trace::Description;
1315    use futures::StreamExt;
1316    use mz_dyncfg::{ConfigUpdates, ConfigVal};
1317    use mz_persist::indexed::encoding::BlobTraceBatchPart;
1318    use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
1319
1320    use crate::batch::{
1321        BLOB_TARGET_SIZE, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
1322        BatchParts, validate_truncate_batch,
1323    };
1324    use crate::cfg::COMPACTION_MEMORY_BOUND_BYTES;
1325    use crate::fetch::{EncodedPart, FetchConfig};
1326    use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
1327    use crate::internal::datadriven::DirectiveArgs;
1328    use crate::internal::encoding::Schemas;
1329    use crate::internal::gc::GcReq;
1330    use crate::internal::paths::{BlobKey, BlobKeyPrefix, PartialBlobKey};
1331    use crate::internal::state::{BatchPart, RunOrder, RunPart};
1332    use crate::internal::state_versions::EncodedRollup;
1333    use crate::internal::trace::{CompactionInput, IdHollowBatch, SpineId};
1334    use crate::read::{Listen, ListenEvent, READER_LEASE_DURATION};
1335    use crate::rpc::NoopPubSubSender;
1336    use crate::tests::new_test_client;
1337    use crate::write::COMBINE_INLINE_WRITES;
1338    use crate::{GarbageCollector, PersistClient};
1339
1340    use super::*;
1341
    /// The String-key/unit-value schema pair shared by all datadriven machine
    /// tests, pinned at schema id 0.
    static SCHEMAS: LazyLock<Schemas<String, ()>> = LazyLock::new(|| Schemas {
        id: Some(SchemaId(0)),
        key: Arc::new(StringSchema),
        val: Arc::new(UnitSchema),
    });
1347
    /// Shared state for a single [crate::internal::machine] [datadriven::TestFile].
    #[derive(Debug)]
    pub struct MachineState {
        /// Client connected to the test persist instance.
        pub client: PersistClient,
        /// The one shard every directive in the test file operates on.
        pub shard_id: ShardId,
        /// Handle for scanning/fetching raw state versions from consensus.
        pub state_versions: Arc<StateVersions>,
        /// The state machine under test.
        pub machine: Machine<String, (), u64, i64>,
        /// Garbage collector wired to `machine`.
        pub gc: GarbageCollector<String, (), u64, i64>,
        /// Named batches produced by earlier directives, for later reference.
        pub batches: BTreeMap<String, IdHollowBatch<u64>>,
        /// Monotonic counter used to mint ids in directives.
        pub next_id: usize,
        /// Named rollups produced by earlier directives.
        pub rollups: BTreeMap<String, EncodedRollup>,
        /// Named listens kept open across directives.
        pub listens: BTreeMap<String, Listen<String, (), u64, i64>>,
        /// Routine maintenance accumulated by directives, drained on demand.
        pub routine: Vec<RoutineMaintenance>,
        /// Named compaction requests awaiting execution.
        pub compactions: BTreeMap<String, CompactReq<u64>>,
    }
1363
    impl MachineState {
        /// Builds a fresh test client, shard, machine, and garbage collector
        /// for one datadriven test file.
        pub async fn new(dyncfgs: &ConfigUpdates) -> Self {
            let shard_id = ShardId::new();
            let client = new_test_client(dyncfgs).await;
            // Reset blob_target_size. Individual batch writes and compactions
            // can override it with an arg.
            client
                .cfg
                .set_config(&BLOB_TARGET_SIZE, *BLOB_TARGET_SIZE.default());
            // Our structured compaction code uses slightly different estimates
            // for array size than the old path, which can affect the results of
            // some compaction tests.
            client.cfg.set_config(&COMBINE_INLINE_WRITES, false);
            let state_versions = Arc::new(StateVersions::new(
                client.cfg.clone(),
                Arc::clone(&client.consensus),
                Arc::clone(&client.blob),
                Arc::clone(&client.metrics),
            ));
            let machine = Machine::new(
                client.cfg.clone(),
                shard_id,
                Arc::clone(&client.metrics),
                Arc::clone(&state_versions),
                Arc::clone(&client.shared_states),
                Arc::new(NoopPubSubSender),
                Arc::clone(&client.isolated_runtime),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codecs should match");
            let gc = GarbageCollector::new(machine.clone(), Arc::clone(&client.isolated_runtime));
            MachineState {
                shard_id,
                client,
                state_versions,
                machine,
                gc,
                batches: BTreeMap::default(),
                rollups: BTreeMap::default(),
                listens: BTreeMap::default(),
                routine: Vec::new(),
                compactions: BTreeMap::default(),
                next_id: 0,
            }
        }

        /// Wraps a [HollowBatch] in a [Batch] handle bound to this test
        /// client's blob/metrics and the shared test schemas.
        fn to_batch(&self, hollow: HollowBatch<u64>) -> Batch<String, (), u64, i64> {
            Batch::new(
                // NOTE(review): bool flag forwarded verbatim to Batch::new —
                // its meaning isn't visible here; confirm against Batch::new.
                true,
                Arc::clone(&self.client.metrics),
                Arc::clone(&self.client.blob),
                self.client.metrics.shards.shard(&self.shard_id, "test"),
                self.client.cfg.build_version.clone(),
                (
                    <String>::encode_schema(&*SCHEMAS.key),
                    <()>::encode_schema(&*SCHEMAS.val),
                ),
                hollow,
            )
        }
    }
1426
    /// Scans consensus and returns all states with their SeqNos
    /// and which batches they reference
    ///
    /// Only states with seqno >= the `from_seqno` arg are emitted. Batches are
    /// reported by the name they were given in earlier directives (matched by
    /// part identity); unnamed or empty batches are omitted.
    pub async fn consensus_scan(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let from = args.expect("from_seqno");

        let mut states = datadriven
            .state_versions
            .fetch_all_live_states::<u64>(datadriven.shard_id)
            .await
            .expect("should only be called on an initialized shard")
            .check_ts_codec()
            .expect("shard codecs should not change");
        let mut s = String::new();
        while let Some(x) = states.next(|_| {}) {
            if x.seqno < from {
                continue;
            }
            let rollups: Vec<_> = x
                .collections
                .rollups
                .keys()
                .map(|seqno| seqno.to_string())
                .collect();
            // Map each non-empty batch in the trace back to the directive
            // name it was registered under, comparing by parts.
            let batches: Vec<_> = x
                .collections
                .trace
                .batches()
                .filter(|b| !b.is_empty())
                .filter_map(|b| {
                    datadriven
                        .batches
                        .iter()
                        .find(|(_, original_batch)| original_batch.batch.parts == b.parts)
                        .map(|(batch_name, _)| batch_name.to_owned())
                })
                .collect();
            // write! into the String is infallible here — presumably via the
            // mz_ore::fmt::FormatBuffer import at the top of the file.
            write!(
                s,
                "seqno={} batches={} rollups={}\n",
                x.seqno,
                batches.join(","),
                rollups.join(","),
            );
        }
        Ok(s)
    }
1476
1477    pub async fn consensus_truncate(
1478        datadriven: &MachineState,
1479        args: DirectiveArgs<'_>,
1480    ) -> Result<String, anyhow::Error> {
1481        let to = args.expect("to_seqno");
1482        let removed = datadriven
1483            .client
1484            .consensus
1485            .truncate(&datadriven.shard_id.to_string(), to)
1486            .await
1487            .expect("valid truncation");
1488        Ok(format!("{:?}\n", removed))
1489    }
1490
1491    pub async fn blob_scan_batches(
1492        datadriven: &MachineState,
1493        _args: DirectiveArgs<'_>,
1494    ) -> Result<String, anyhow::Error> {
1495        let key_prefix = BlobKeyPrefix::Shard(&datadriven.shard_id).to_string();
1496
1497        let mut s = String::new();
1498        let () = datadriven
1499            .state_versions
1500            .blob
1501            .list_keys_and_metadata(&key_prefix, &mut |x| {
1502                let (_, key) = BlobKey::parse_ids(x.key).expect("key should be valid");
1503                if let PartialBlobKey::Batch(_, _) = key {
1504                    write!(s, "{}: {}b\n", x.key, x.size_in_bytes);
1505                }
1506            })
1507            .await?;
1508        Ok(s)
1509    }
1510
1511    #[allow(clippy::unused_async)]
1512    pub async fn shard_desc(
1513        datadriven: &MachineState,
1514        _args: DirectiveArgs<'_>,
1515    ) -> Result<String, anyhow::Error> {
1516        Ok(format!(
1517            "since={:?} upper={:?}\n",
1518            datadriven.machine.applier.since().elements(),
1519            datadriven.machine.applier.clone_upper().elements()
1520        ))
1521    }
1522
    /// Downgrades the since of the given leased reader, recording the
    /// resulting routine maintenance for later execution.
    pub async fn downgrade_since(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let since = args.expect_antichain("since");
        // Default to the machine's current seqno when the directive doesn't
        // provide one explicitly.
        let seqno = args
            .optional("seqno")
            .unwrap_or_else(|| datadriven.machine.seqno());
        let reader_id = args.expect("reader_id");
        let (_, since, routine) = datadriven
            .machine
            .downgrade_since(
                &reader_id,
                seqno,
                &since,
                (datadriven.machine.applier.cfg.now)(),
            )
            .await;
        datadriven.routine.push(routine);
        // Report the post-downgrade seqno and the since the machine settled
        // on (which may differ from the requested one).
        Ok(format!(
            "{} {:?}\n",
            datadriven.machine.seqno(),
            since.0.elements()
        ))
    }
1548
1549    #[allow(clippy::unused_async)]
1550    pub async fn dyncfg(
1551        datadriven: &MachineState,
1552        args: DirectiveArgs<'_>,
1553    ) -> Result<String, anyhow::Error> {
1554        let mut updates = ConfigUpdates::default();
1555        for x in args.input.trim().split('\n') {
1556            match x.split(' ').collect::<Vec<_>>().as_slice() {
1557                &[name, val] => {
1558                    let config = datadriven
1559                        .client
1560                        .cfg
1561                        .entries()
1562                        .find(|x| x.name() == name)
1563                        .ok_or_else(|| anyhow!("unknown dyncfg: {}", name))?;
1564                    match config.val() {
1565                        ConfigVal::Usize(_) => {
1566                            let val = val.parse().map_err(anyhow::Error::new)?;
1567                            updates.add_dynamic(name, ConfigVal::Usize(val));
1568                        }
1569                        ConfigVal::Bool(_) => {
1570                            let val = val.parse().map_err(anyhow::Error::new)?;
1571                            updates.add_dynamic(name, ConfigVal::Bool(val));
1572                        }
1573                        x => unimplemented!("dyncfg type: {:?}", x),
1574                    }
1575                }
1576                x => return Err(anyhow!("expected `name val` got: {:?}", x)),
1577            }
1578        }
1579        updates.apply(&datadriven.client.cfg);
1580
1581        Ok("ok\n".to_string())
1582    }
1583
    /// Compare-and-downgrades the since of a critical reader, gated on the
    /// expected opaque token; errors with the actual token/since on mismatch.
    pub async fn compare_and_downgrade_since(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let expected_opaque: u64 = args.expect("expect_opaque");
        let new_opaque: u64 = args.expect("opaque");
        let new_since = args.expect_antichain("since");
        let reader_id = args.expect("reader_id");
        let (res, routine) = datadriven
            .machine
            .compare_and_downgrade_since(
                &reader_id,
                &Opaque::encode(&expected_opaque),
                (&Opaque::encode(&new_opaque), &new_since),
            )
            .await;
        datadriven.routine.push(routine);
        // On mismatch, surface the decoded actual opaque and actual since so
        // the datadriven test can assert on them.
        let since = res.map_err(|(opaque, since)| {
            anyhow!(
                "mismatch: opaque={} since={:?}",
                opaque.decode::<u64>(),
                since.0.elements()
            )
        })?;
        Ok(format!(
            "{} {} {:?}\n",
            datadriven.machine.seqno(),
            new_opaque,
            since.0.elements()
        ))
    }
1615
1616    pub async fn write_rollup(
1617        datadriven: &mut MachineState,
1618        args: DirectiveArgs<'_>,
1619    ) -> Result<String, anyhow::Error> {
1620        let output = args.expect_str("output");
1621
1622        let rollup = datadriven
1623            .machine
1624            .applier
1625            .write_rollup_for_state()
1626            .await
1627            .expect("rollup");
1628
1629        datadriven
1630            .rollups
1631            .insert(output.to_string(), rollup.clone());
1632
1633        Ok(format!(
1634            "state={} diffs=[{}, {})\n",
1635            rollup.seqno,
1636            rollup._desc.lower().first().expect("seqno"),
1637            rollup._desc.upper().first().expect("seqno"),
1638        ))
1639    }
1640
1641    pub async fn add_rollup(
1642        datadriven: &mut MachineState,
1643        args: DirectiveArgs<'_>,
1644    ) -> Result<String, anyhow::Error> {
1645        let input = args.expect_str("input");
1646        let rollup = datadriven
1647            .rollups
1648            .get(input)
1649            .expect("unknown batch")
1650            .clone();
1651
1652        let (applied, maintenance) = datadriven
1653            .machine
1654            .add_rollup((rollup.seqno, &rollup.to_hollow()))
1655            .await;
1656
1657        if !applied {
1658            return Err(anyhow!("failed to apply rollup for: {}", rollup.seqno));
1659        }
1660
1661        datadriven.routine.push(maintenance);
1662        Ok(format!("{}\n", datadriven.machine.seqno()))
1663    }
1664
    /// Builds a batch from the directive's input updates and stores it under
    /// `output`, optionally consolidating, overriding target/part sizes, and
    /// assigning it a fresh SpineId.
    pub async fn write_batch(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let output = args.expect_str("output");
        let lower = args.expect_antichain("lower");
        let upper = args.expect_antichain("upper");
        assert!(PartialOrder::less_than(&lower, &upper));
        let since = args
            .optional_antichain("since")
            .unwrap_or_else(|| Antichain::from_elem(0));
        let target_size = args.optional("target_size");
        let parts_size_override = args.optional("parts_size_override");
        let consolidate = args.optional("consolidate").unwrap_or(true);
        let mut updates: Vec<_> = args
            .input
            .split('\n')
            .flat_map(DirectiveArgs::parse_update)
            .collect();

        let mut cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
        if let Some(target_size) = target_size {
            cfg.blob_target_size = target_size;
        };
        if consolidate {
            consolidate_updates(&mut updates);
        }
        // Consolidated input may be written in the config's preferred order;
        // unconsolidated input must be left unordered.
        let run_order = if consolidate {
            cfg.preferred_order
        } else {
            RunOrder::Unordered
        };
        let parts = BatchParts::new_ordered::<i64>(
            cfg.clone(),
            run_order,
            Arc::clone(&datadriven.client.metrics),
            Arc::clone(&datadriven.machine.applier.shard_metrics),
            datadriven.shard_id,
            Arc::clone(&datadriven.client.blob),
            Arc::clone(&datadriven.client.isolated_runtime),
            &datadriven.client.metrics.user,
        );
        let builder = BatchBuilderInternal::new(
            cfg.clone(),
            parts,
            Arc::clone(&datadriven.client.metrics),
            SCHEMAS.clone(),
            Arc::clone(&datadriven.client.blob),
            datadriven.shard_id.clone(),
            datadriven.client.cfg.build_version.clone(),
        );
        let mut builder = BatchBuilder::new(builder, Description::new(lower, upper.clone(), since));
        for ((k, ()), t, d) in updates {
            builder.add(&k, &(), &t, &d).await.expect("invalid batch");
        }
        let mut batch = builder.finish(upper).await?;
        // We can only reasonably use parts_size_override with hollow batches,
        // so if it's set, flush any inline batches out.
        if parts_size_override.is_some() {
            batch
                .flush_to_blob(
                    &cfg,
                    &datadriven.client.metrics.user,
                    &datadriven.client.isolated_runtime,
                    &SCHEMAS,
                )
                .await;
        }
        let batch = batch.into_hollow_batch();
        // Assign a fresh, contiguous SpineId for this test-created batch.
        let batch = IdHollowBatch {
            batch: Arc::new(batch),
            id: SpineId(datadriven.next_id, datadriven.next_id + 1),
        };
        datadriven.next_id += 1;

        if let Some(size) = parts_size_override {
            // Rewrite every part's reported size; inline parts were flushed
            // to blob above, so only hollow parts (or part runs) remain.
            let mut batch = batch.clone();
            let mut hollow_batch = (*batch.batch).clone();
            for part in hollow_batch.parts.iter_mut() {
                match part {
                    RunPart::Many(run) => run.max_part_bytes = size,
                    RunPart::Single(BatchPart::Hollow(part)) => part.encoded_size_bytes = size,
                    RunPart::Single(BatchPart::Inline { .. }) => unreachable!("flushed out above"),
                }
            }
            batch.batch = Arc::new(hollow_batch);
            datadriven.batches.insert(output.to_owned(), batch);
        } else {
            datadriven.batches.insert(output.to_owned(), batch.clone());
        }
        Ok(format!(
            "parts={} len={}\n",
            batch.batch.part_count(),
            batch.batch.len
        ))
    }
1761
    /// Fetches and prints the contents of the named batch, part by part,
    /// optionally including structured key lower bounds, followed by the
    /// run structure of the batch.
    pub async fn fetch_batch(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let stats = args.optional_str("stats");
        let batch = datadriven.batches.get(input).expect("unknown batch");

        let mut s = String::new();
        let mut stream = pin!(
            batch
                .batch
                .part_stream(
                    datadriven.shard_id,
                    &*datadriven.state_versions.blob,
                    &*datadriven.state_versions.metrics
                )
                .enumerate()
        );
        while let Some((idx, part)) = stream.next().await {
            let part = &*part?;
            write!(s, "<part {idx}>\n");

            // Inline parts must be decoded before we can ask for the key
            // lower bound; hollow parts carry it directly.
            let lower = match part {
                BatchPart::Inline { updates, .. } => {
                    let updates: BlobTraceBatchPart<u64> =
                        updates.decode(&datadriven.client.metrics.columnar)?;
                    updates.structured_key_lower()
                }
                other => other.structured_key_lower(),
            };

            if let Some(lower) = lower {
                if stats == Some("lower") {
                    writeln!(s, "<key lower={}>", lower.get())
                }
            }

            match part {
                BatchPart::Hollow(part) => {
                    let blob_batch = datadriven
                        .client
                        .blob
                        .get(&part.key.complete(&datadriven.shard_id))
                        .await;
                    match blob_batch {
                        Ok(Some(_)) | Err(_) => {}
                        // don't try to fetch/print the keys of the batch part
                        // if the blob store no longer has it
                        Ok(None) => {
                            s.push_str("<empty>\n");
                            continue;
                        }
                    };
                }
                BatchPart::Inline { .. } => {}
            };
            let part = EncodedPart::fetch(
                &FetchConfig::from_persist_config(&datadriven.client.cfg),
                &datadriven.shard_id,
                datadriven.client.blob.as_ref(),
                datadriven.client.metrics.as_ref(),
                datadriven.machine.applier.shard_metrics.as_ref(),
                &datadriven.client.metrics.read.batch_fetcher,
                &batch.batch.desc,
                part,
            )
            .await
            .expect("invalid batch part");
            let part = part
                .normalize(&datadriven.client.metrics.columnar)
                .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);

            for ((k, _v), t, d) in part
                .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
                .expect("valid schemas")
            {
                writeln!(s, "{k} {t} {d}");
            }
        }
        // Only print run structure when we actually printed some parts above.
        if !s.is_empty() {
            for (idx, (_meta, run)) in batch.batch.runs().enumerate() {
                write!(s, "<run {idx}>\n");
                for part in run {
                    // Report each run member by its index in the flat parts
                    // list, so tests can cross-reference the <part N> output.
                    let part_idx = batch
                        .batch
                        .parts
                        .iter()
                        .position(|p| p == part)
                        .expect("part should exist");
                    write!(s, "part {part_idx}\n");
                }
            }
        }
        Ok(s)
    }
1858
1859    #[allow(clippy::unused_async)]
1860    pub async fn truncate_batch_desc(
1861        datadriven: &mut MachineState,
1862        args: DirectiveArgs<'_>,
1863    ) -> Result<String, anyhow::Error> {
1864        let input = args.expect_str("input");
1865        let output = args.expect_str("output");
1866        let lower = args.expect_antichain("lower");
1867        let upper = args.expect_antichain("upper");
1868
1869        let batch = datadriven
1870            .batches
1871            .get(input)
1872            .expect("unknown batch")
1873            .clone();
1874        let truncated_desc = Description::new(lower, upper, batch.batch.desc.since().clone());
1875        let () = validate_truncate_batch(&batch.batch, &truncated_desc, false, true)?;
1876        let mut new_hollow_batch = (*batch.batch).clone();
1877        new_hollow_batch.desc = truncated_desc;
1878        let new_batch = IdHollowBatch {
1879            batch: Arc::new(new_hollow_batch),
1880            id: batch.id,
1881        };
1882        datadriven
1883            .batches
1884            .insert(output.to_owned(), new_batch.clone());
1885        Ok(format!(
1886            "parts={} len={}\n",
1887            batch.batch.part_count(),
1888            batch.batch.len
1889        ))
1890    }
1891
1892    #[allow(clippy::unused_async)]
1893    pub async fn set_batch_parts_size(
1894        datadriven: &mut MachineState,
1895        args: DirectiveArgs<'_>,
1896    ) -> Result<String, anyhow::Error> {
1897        let input = args.expect_str("input");
1898        let size = args.expect("size");
1899        let batch = datadriven.batches.get_mut(input).expect("unknown batch");
1900        let mut hollow_batch = (*batch.batch).clone();
1901        for part in hollow_batch.parts.iter_mut() {
1902            match part {
1903                RunPart::Single(BatchPart::Hollow(x)) => x.encoded_size_bytes = size,
1904                _ => {
1905                    panic!("set_batch_parts_size only supports hollow parts")
1906                }
1907            }
1908        }
1909        batch.batch = Arc::new(hollow_batch);
1910        Ok("ok\n".to_string())
1911    }
1912
    /// Compacts the named input batches into a single batch stored under
    /// `output`, recording the request so maintenance can be applied later.
    pub async fn compact(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let output = args.expect_str("output");
        let lower = args.expect_antichain("lower");
        let upper = args.expect_antichain("upper");
        let since = args.expect_antichain("since");
        let target_size = args.optional("target_size");
        let memory_bound = args.optional("memory_bound");

        let mut inputs = Vec::new();
        for input in args.args.get("inputs").expect("missing inputs") {
            inputs.push(
                datadriven
                    .batches
                    .get(input)
                    .expect("unknown batch")
                    .clone(),
            );
        }

        // NOTE(review): `set_config` is called on a non-mut clone, so it
        // presumably uses interior mutability — confirm against PersistConfig.
        let cfg = datadriven.client.cfg.clone();
        if let Some(target_size) = target_size {
            cfg.set_config(&BLOB_TARGET_SIZE, target_size);
        };
        if let Some(memory_bound) = memory_bound {
            cfg.set_config(&COMPACTION_MEMORY_BOUND_BYTES, memory_bound);
        }
        let req = CompactReq {
            shard_id: datadriven.shard_id,
            desc: Description::new(lower, upper, since),
            inputs: inputs.clone(),
        };
        datadriven
            .compactions
            .insert(output.to_owned(), req.clone());
        // The output's SpineId spans from the first input's lower id to the
        // last input's upper id; with no inputs, mint a fresh one-wide id.
        let spine_lower = inputs
            .first()
            .map_or_else(|| datadriven.next_id, |x| x.id.0);
        let spine_upper = inputs.last().map_or_else(
            || {
                datadriven.next_id += 1;
                datadriven.next_id
            },
            |x| x.id.1,
        );
        let new_spine_id = SpineId(spine_lower, spine_upper);
        let res = Compactor::<String, (), u64, i64>::compact(
            CompactConfig::new(&cfg, datadriven.shard_id),
            Arc::clone(&datadriven.client.blob),
            Arc::clone(&datadriven.client.metrics),
            Arc::clone(&datadriven.machine.applier.shard_metrics),
            Arc::clone(&datadriven.client.isolated_runtime),
            req,
            SCHEMAS.clone(),
        )
        .await?;

        let batch = IdHollowBatch {
            batch: Arc::new(res.output.clone()),
            id: new_spine_id,
        };

        datadriven.batches.insert(output.to_owned(), batch.clone());
        Ok(format!(
            "parts={} len={}\n",
            res.output.part_count(),
            res.output.len
        ))
    }
1984
1985    pub async fn clear_blob(
1986        datadriven: &MachineState,
1987        _args: DirectiveArgs<'_>,
1988    ) -> Result<String, anyhow::Error> {
1989        let mut to_delete = vec![];
1990        datadriven
1991            .client
1992            .blob
1993            .list_keys_and_metadata("", &mut |meta| {
1994                to_delete.push(meta.key.to_owned());
1995            })
1996            .await?;
1997        for blob in &to_delete {
1998            datadriven.client.blob.delete(blob).await?;
1999        }
2000        Ok(format!("deleted={}\n", to_delete.len()))
2001    }
2002
2003    pub async fn restore_blob(
2004        datadriven: &MachineState,
2005        _args: DirectiveArgs<'_>,
2006    ) -> Result<String, anyhow::Error> {
2007        let not_restored = crate::internal::restore::restore_blob(
2008            &datadriven.state_versions,
2009            datadriven.client.blob.as_ref(),
2010            &datadriven.client.cfg.build_version,
2011            datadriven.shard_id,
2012            &*datadriven.state_versions.metrics,
2013        )
2014        .await?;
2015        let mut out = String::new();
2016        for key in not_restored {
2017            writeln!(&mut out, "{key}");
2018        }
2019        Ok(out)
2020    }
2021
2022    #[allow(clippy::unused_async)]
2023    pub async fn rewrite_ts(
2024        datadriven: &mut MachineState,
2025        args: DirectiveArgs<'_>,
2026    ) -> Result<String, anyhow::Error> {
2027        let input = args.expect_str("input");
2028        let ts_rewrite = args.expect_antichain("frontier");
2029        let upper = args.expect_antichain("upper");
2030
2031        let batch = datadriven.batches.get_mut(input).expect("unknown batch");
2032        let mut hollow_batch = (*batch.batch).clone();
2033        let () = hollow_batch
2034            .rewrite_ts(&ts_rewrite, upper)
2035            .map_err(|err| anyhow!("invalid rewrite: {}", err))?;
2036        batch.batch = Arc::new(hollow_batch);
2037        Ok("ok\n".into())
2038    }
2039
2040    pub async fn gc(
2041        datadriven: &mut MachineState,
2042        args: DirectiveArgs<'_>,
2043    ) -> Result<String, anyhow::Error> {
2044        let new_seqno_since = args.expect("to_seqno");
2045
2046        let req = GcReq {
2047            shard_id: datadriven.shard_id,
2048            new_seqno_since,
2049        };
2050        let (maintenance, stats) =
2051            GarbageCollector::gc_and_truncate(&datadriven.machine, req).await;
2052        datadriven.routine.push(maintenance);
2053
2054        Ok(format!(
2055            "{} batch_parts={} rollups={} truncated={} state_rollups={}\n",
2056            datadriven.machine.seqno(),
2057            stats.batch_parts_deleted_from_blob,
2058            stats.rollups_deleted_from_blob,
2059            stats
2060                .truncated_consensus_to
2061                .iter()
2062                .map(|x| x.to_string())
2063                .collect::<Vec<_>>()
2064                .join(","),
2065            stats
2066                .rollups_removed_from_state
2067                .iter()
2068                .map(|x| x.to_string())
2069                .collect::<Vec<_>>()
2070                .join(","),
2071        ))
2072    }
2073
    /// Takes a snapshot at `as_of` and prints its contents, batch by batch,
    /// run by run, part by part, with timestamps advanced to `as_of` and
    /// updates consolidated per part.
    pub async fn snapshot(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let as_of = args.expect_antichain("as_of");
        let snapshot = datadriven
            .machine
            .snapshot(&as_of)
            .await
            .map_err(|err| anyhow!("{:?}", err))?;

        let mut result = String::new();

        for batch in snapshot {
            writeln!(
                result,
                "<batch {:?}-{:?}>",
                batch.desc.lower().elements(),
                batch.desc.upper().elements()
            );
            for (run, (_meta, parts)) in batch.runs().enumerate() {
                writeln!(result, "<run {run}>");
                // Flatten each run's parts into a single stream of fetchable
                // parts, numbering them within the run.
                let mut stream = pin!(
                    futures::stream::iter(parts)
                        .flat_map(|part| part.part_stream(
                            datadriven.shard_id,
                            &*datadriven.state_versions.blob,
                            &*datadriven.state_versions.metrics
                        ))
                        .enumerate()
                );

                while let Some((idx, part)) = stream.next().await {
                    let part = &*part?;
                    writeln!(result, "<part {idx}>");

                    let part = EncodedPart::fetch(
                        &FetchConfig::from_persist_config(&datadriven.client.cfg),
                        &datadriven.shard_id,
                        datadriven.client.blob.as_ref(),
                        datadriven.client.metrics.as_ref(),
                        datadriven.machine.applier.shard_metrics.as_ref(),
                        &datadriven.client.metrics.read.batch_fetcher,
                        &batch.desc,
                        part,
                    )
                    .await
                    .expect("invalid batch part");
                    let part = part
                        .normalize(&datadriven.client.metrics.columnar)
                        .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);

                    let mut updates = Vec::new();

                    // Advance every timestamp to the as_of before
                    // consolidating, so updates at different original times
                    // can collapse together.
                    for ((k, _v), mut t, d) in part
                        .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
                        .expect("valid schemas")
                    {
                        t.advance_by(as_of.borrow());
                        updates.push((k, t, d));
                    }

                    consolidate_updates(&mut updates);

                    for (k, t, d) in updates {
                        writeln!(result, "{k} {t} {d}");
                    }
                }
            }
        }

        Ok(result)
    }
2147
2148    pub async fn register_listen(
2149        datadriven: &mut MachineState,
2150        args: DirectiveArgs<'_>,
2151    ) -> Result<String, anyhow::Error> {
2152        let output = args.expect_str("output");
2153        let as_of = args.expect_antichain("as_of");
2154        let read = datadriven
2155            .client
2156            .open_leased_reader::<String, (), u64, i64>(
2157                datadriven.shard_id,
2158                Arc::new(StringSchema),
2159                Arc::new(UnitSchema),
2160                Diagnostics::for_tests(),
2161                true,
2162            )
2163            .await
2164            .expect("invalid shard types");
2165        let listen = read
2166            .listen(as_of)
2167            .await
2168            .map_err(|err| anyhow!("{:?}", err))?;
2169        datadriven.listens.insert(output.to_owned(), listen);
2170        Ok("ok\n".into())
2171    }
2172
2173    pub async fn listen_through(
2174        datadriven: &mut MachineState,
2175        args: DirectiveArgs<'_>,
2176    ) -> Result<String, anyhow::Error> {
2177        let input = args.expect_str("input");
2178        // It's not possible to listen _through_ the empty antichain, so this is
2179        // intentionally `expect` instead of `expect_antichain`.
2180        let frontier = args.expect("frontier");
2181        let listen = datadriven.listens.get_mut(input).expect("unknown listener");
2182        let mut s = String::new();
2183        loop {
2184            for event in listen.fetch_next().await {
2185                match event {
2186                    ListenEvent::Updates(x) => {
2187                        for ((k, _v), t, d) in x.iter() {
2188                            write!(s, "{} {} {}\n", k, t, d);
2189                        }
2190                    }
2191                    ListenEvent::Progress(x) => {
2192                        if !x.less_than(&frontier) {
2193                            return Ok(s);
2194                        }
2195                    }
2196                }
2197            }
2198        }
2199    }
2200
2201    pub async fn register_critical_reader(
2202        datadriven: &mut MachineState,
2203        args: DirectiveArgs<'_>,
2204    ) -> Result<String, anyhow::Error> {
2205        let reader_id = args.expect("reader_id");
2206        let (state, maintenance) = datadriven
2207            .machine
2208            .register_critical_reader(&reader_id, Opaque::encode(&0u64), "tests")
2209            .await;
2210        datadriven.routine.push(maintenance);
2211        Ok(format!(
2212            "{} {:?}\n",
2213            datadriven.machine.seqno(),
2214            state.since.elements(),
2215        ))
2216    }
2217
2218    pub async fn register_leased_reader(
2219        datadriven: &mut MachineState,
2220        args: DirectiveArgs<'_>,
2221    ) -> Result<String, anyhow::Error> {
2222        let reader_id = args.expect("reader_id");
2223        let (reader_state, maintenance) = datadriven
2224            .machine
2225            .register_leased_reader(
2226                &reader_id,
2227                "tests",
2228                READER_LEASE_DURATION.get(&datadriven.client.cfg),
2229                (datadriven.client.cfg.now)(),
2230                false,
2231            )
2232            .await;
2233        datadriven.routine.push(maintenance);
2234        Ok(format!(
2235            "{} {:?}\n",
2236            datadriven.machine.seqno(),
2237            reader_state.since.elements(),
2238        ))
2239    }
2240
2241    pub async fn expire_critical_reader(
2242        datadriven: &mut MachineState,
2243        args: DirectiveArgs<'_>,
2244    ) -> Result<String, anyhow::Error> {
2245        let reader_id = args.expect("reader_id");
2246        let (_, maintenance) = datadriven.machine.expire_critical_reader(&reader_id).await;
2247        datadriven.routine.push(maintenance);
2248        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2249    }
2250
2251    pub async fn expire_leased_reader(
2252        datadriven: &mut MachineState,
2253        args: DirectiveArgs<'_>,
2254    ) -> Result<String, anyhow::Error> {
2255        let reader_id = args.expect("reader_id");
2256        let (_, maintenance) = datadriven.machine.expire_leased_reader(&reader_id).await;
2257        datadriven.routine.push(maintenance);
2258        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2259    }
2260
2261    pub async fn compare_and_append_batches(
2262        datadriven: &MachineState,
2263        args: DirectiveArgs<'_>,
2264    ) -> Result<String, anyhow::Error> {
2265        let expected_upper = args.expect_antichain("expected_upper");
2266        let new_upper = args.expect_antichain("new_upper");
2267
2268        let mut batches: Vec<Batch<String, (), u64, i64>> = args
2269            .args
2270            .get("batches")
2271            .expect("missing batches")
2272            .into_iter()
2273            .map(|batch| {
2274                let hollow = (*datadriven
2275                    .batches
2276                    .get(batch)
2277                    .expect("unknown batch")
2278                    .clone()
2279                    .batch)
2280                    .clone();
2281                datadriven.to_batch(hollow)
2282            })
2283            .collect();
2284
2285        let mut writer = datadriven
2286            .client
2287            .open_writer(
2288                datadriven.shard_id,
2289                Arc::new(StringSchema),
2290                Arc::new(UnitSchema),
2291                Diagnostics::for_tests(),
2292            )
2293            .await?;
2294
2295        let mut batch_refs: Vec<_> = batches.iter_mut().collect();
2296
2297        let () = writer
2298            .compare_and_append_batch(batch_refs.as_mut_slice(), expected_upper, new_upper, true)
2299            .await?
2300            .map_err(|err| anyhow!("upper mismatch: {:?}", err))?;
2301
2302        writer.expire().await;
2303
2304        Ok("ok\n".into())
2305    }
2306
2307    pub async fn expire_writer(
2308        datadriven: &mut MachineState,
2309        args: DirectiveArgs<'_>,
2310    ) -> Result<String, anyhow::Error> {
2311        let writer_id = args.expect("writer_id");
2312        let (_, maintenance) = datadriven.machine.expire_writer(&writer_id).await;
2313        datadriven.routine.push(maintenance);
2314        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2315    }
2316
2317    pub(crate) async fn finalize(
2318        datadriven: &mut MachineState,
2319        _args: DirectiveArgs<'_>,
2320    ) -> anyhow::Result<String> {
2321        let maintenance = datadriven.machine.become_tombstone().await?;
2322        datadriven.routine.push(maintenance);
2323        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2324    }
2325
2326    pub(crate) fn is_finalized(
2327        datadriven: &MachineState,
2328        _args: DirectiveArgs<'_>,
2329    ) -> anyhow::Result<String> {
2330        let seqno = datadriven.machine.seqno();
2331        let tombstone = datadriven.machine.is_finalized();
2332        Ok(format!("{seqno} {tombstone}\n"))
2333    }
2334
    /// Handles the `compare-and-append` directive: registers the test schemas,
    /// then compare-and-appends the named batch, retrying through inline-write
    /// backpressure until the append succeeds or hits an upper mismatch.
    pub async fn compare_and_append(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let writer_id = args.expect("writer_id");
        let mut batch = datadriven
            .batches
            .get(input)
            .expect("unknown batch")
            .clone();
        // Reuse a caller-supplied idempotency token if given (so tests can
        // exercise idempotent retries); otherwise mint a fresh one.
        let token = args.optional("token").unwrap_or_else(IdempotencyToken::new);
        let now = (datadriven.client.cfg.now)();

        let (id, maintenance) = datadriven
            .machine
            .register_schema(&*SCHEMAS.key, &*SCHEMAS.val)
            .await;
        assert_eq!(id, SCHEMAS.id);
        datadriven.routine.push(maintenance);
        let maintenance = loop {
            // Optionally simulate a previous attempt that ended in an
            // indeterminate error, to exercise the idempotence path.
            let indeterminate = args
                .optional::<String>("prev_indeterminate")
                .map(|x| Indeterminate::new(anyhow::Error::msg(x)));
            let res = datadriven
                .machine
                .compare_and_append_idempotent(
                    &batch.batch,
                    &writer_id,
                    now,
                    &token,
                    &HandleDebugState::default(),
                    indeterminate,
                )
                .await;
            match res {
                CompareAndAppendRes::Success(_, x) => break x,
                CompareAndAppendRes::UpperMismatch(_seqno, upper) => {
                    return Err(anyhow!("{:?}", Upper(upper)));
                }
                CompareAndAppendRes::InlineBackpressure => {
                    // The state machine refused the inline write; flush the
                    // batch's parts out to blob storage and retry the append
                    // with the rewritten (non-inline) hollow batch.
                    let hollow_batch = (*batch.batch).clone();
                    let mut b = datadriven.to_batch(hollow_batch);
                    let cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
                    b.flush_to_blob(
                        &cfg,
                        &datadriven.client.metrics.user,
                        &datadriven.client.isolated_runtime,
                        &*SCHEMAS,
                    )
                    .await;
                    batch.batch = Arc::new(b.into_hollow_batch());
                    continue;
                }
                _ => panic!("{:?}", res),
            };
        };
        // TODO: Don't throw away writer maintenance. It's slightly tricky
        // because we need a WriterId for Compactor.
        datadriven.routine.push(maintenance.routine);
        Ok(format!(
            "{} {:?}\n",
            datadriven.machine.seqno(),
            datadriven.machine.applier.clone_upper().elements(),
        ))
    }
2401
2402    pub async fn apply_merge_res(
2403        datadriven: &mut MachineState,
2404        args: DirectiveArgs<'_>,
2405    ) -> Result<String, anyhow::Error> {
2406        let input = args.expect_str("input");
2407        let batch = datadriven
2408            .batches
2409            .get(input)
2410            .expect("unknown batch")
2411            .clone();
2412        let compact_req = datadriven
2413            .compactions
2414            .get(input)
2415            .expect("unknown compact req")
2416            .clone();
2417        let input_batches = compact_req
2418            .inputs
2419            .iter()
2420            .map(|x| x.id)
2421            .collect::<BTreeSet<_>>();
2422        let lower_spine_bound = input_batches
2423            .first()
2424            .map(|id| id.0)
2425            .expect("at least one batch must be present");
2426        let upper_spine_bound = input_batches
2427            .last()
2428            .map(|id| id.1)
2429            .expect("at least one batch must be present");
2430        let id = SpineId(lower_spine_bound, upper_spine_bound);
2431        let hollow_batch = (*batch.batch).clone();
2432
2433        let (merge_res, maintenance) = datadriven
2434            .machine
2435            .merge_res(&FueledMergeRes {
2436                output: hollow_batch,
2437                input: CompactionInput::IdRange(id),
2438                new_active_compaction: None,
2439            })
2440            .await;
2441        datadriven.routine.push(maintenance);
2442        Ok(format!(
2443            "{} {}\n",
2444            datadriven.machine.seqno(),
2445            merge_res.applied()
2446        ))
2447    }
2448
2449    pub async fn perform_maintenance(
2450        datadriven: &mut MachineState,
2451        _args: DirectiveArgs<'_>,
2452    ) -> Result<String, anyhow::Error> {
2453        let mut s = String::new();
2454        for maintenance in datadriven.routine.drain(..) {
2455            let () = maintenance
2456                .perform(&datadriven.machine, &datadriven.gc)
2457                .await;
2458            let () = datadriven
2459                .machine
2460                .applier
2461                .fetch_and_update_state(None)
2462                .await;
2463            write!(s, "{} ok\n", datadriven.machine.seqno());
2464        }
2465        Ok(s)
2466    }
2467}
2468
2469#[cfg(test)]
2470pub mod tests {
2471    use std::sync::Arc;
2472
2473    use mz_dyncfg::ConfigUpdates;
2474    use mz_ore::cast::CastFrom;
2475    use mz_ore::task::spawn;
2476    use mz_persist::intercept::{InterceptBlob, InterceptHandle};
2477    use mz_persist::location::SeqNo;
2478    use mz_persist_types::PersistLocation;
2479    use semver::Version;
2480    use timely::progress::Antichain;
2481
2482    use crate::batch::BatchBuilderConfig;
2483    use crate::cache::StateCache;
2484    use crate::internal::gc::{GarbageCollector, GcReq};
2485    use crate::internal::state::{HandleDebugState, ROLLUP_THRESHOLD};
2486    use crate::tests::{new_test_client, new_test_client_cache};
2487    use crate::{Diagnostics, PersistClient, ShardId};
2488
    // Writes many batches to a shard while performing writer maintenance after
    // each append, and checks that the number of live diffs in consensus stays
    // bounded — i.e. that GC/truncation actually runs.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
    async fn apply_unbatched_cmd_truncate(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging();

        let client = new_test_client(&dyncfgs).await;
        // set a low rollup threshold so GC/truncation is more aggressive
        client.cfg.set_config(&ROLLUP_THRESHOLD, 5);
        let (mut write, read) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
            .await;

        // Ensure the reader is not holding back the since.
        read.expire().await;

        // Write a bunch of batches. This should result in a bounded number of
        // live entries in consensus.
        const NUM_BATCHES: u64 = 100;
        for idx in 0..NUM_BATCHES {
            let mut batch = write
                .expect_batch(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1)
                .await;
            // Flush this batch out so the CaA doesn't get inline writes
            // backpressure.
            let cfg = BatchBuilderConfig::new(&client.cfg, write.shard_id());
            batch
                .flush_to_blob(
                    &cfg,
                    &client.metrics.user,
                    &client.isolated_runtime,
                    &write.write_schemas,
                )
                .await;
            let (_, writer_maintenance) = write
                .machine
                .compare_and_append(
                    &batch.into_hollow_batch(),
                    &write.writer_id,
                    &HandleDebugState::default(),
                    (write.cfg.now)(),
                )
                .await
                .unwrap();
            // Performing the maintenance is what actually runs GC and
            // truncates old state versions.
            writer_maintenance
                .perform(&write.machine, &write.gc, write.compact.as_ref())
                .await;
        }
        let live_diffs = write
            .machine
            .applier
            .state_versions
            .fetch_all_live_diffs(&write.machine.shard_id())
            .await;
        // Make sure we constructed the key correctly.
        assert!(live_diffs.len() > 0);
        // Make sure the number of entries is bounded. (I think we could work
        // out a tighter bound than this, but the point is only that it's
        // bounded).
        let max_live_diffs = 2 * usize::cast_from(NUM_BATCHES.next_power_of_two().trailing_zeros());
        assert!(
            live_diffs.len() <= max_live_diffs,
            "{} vs {}",
            live_diffs.len(),
            max_live_diffs
        );
    }
2555
    // A regression test for database-issues#4206, where a bug in gc led to an incremental
    // state invariant being violated which resulted in gc being permanently
    // wedged for the shard.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
    async fn regression_gc_skipped_req_and_interrupted(dyncfgs: ConfigUpdates) {
        let mut client = new_test_client(&dyncfgs).await;
        // Wrap blob in an interceptor so the test can inject a crash into
        // blob deletes below.
        let intercept = InterceptHandle::default();
        client.blob = Arc::new(InterceptBlob::new(
            Arc::clone(&client.blob),
            intercept.clone(),
        ));
        let (_, mut read) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        // Create a new SeqNo
        read.downgrade_since(&Antichain::from_elem(1)).await;
        let new_seqno_since = read.machine.applier.seqno_since();

        // Start a GC in the background for some SeqNo range that is not
        // contiguous compared to the last gc req (in this case, n/a) and then
        // crash when it gets to the blob deletes. In the regression case, this
        // renders the shard permanently un-gc-able.
        assert!(new_seqno_since > SeqNo::minimum());
        intercept.set_post_delete(Some(Arc::new(|_, _| panic!("boom"))));
        let machine = read.machine.clone();
        // Run this in a spawn so we can catch the boom panic
        let gc = spawn(|| "", async move {
            let req = GcReq {
                shard_id: machine.shard_id(),
                new_seqno_since,
            };
            GarbageCollector::gc_and_truncate(&machine, req).await
        });
        // Wait for gc to either panic (regression case) or finish (good case)
        // because it happens to not call blob delete.
        let _ = gc.await;

        // Allow blob deletes to go through and try GC again. In the regression
        // case, this hangs.
        intercept.set_post_delete(None);
        let req = GcReq {
            shard_id: read.machine.shard_id(),
            new_seqno_since,
        };
        let _ = GarbageCollector::gc_and_truncate(&read.machine, req.clone()).await;
    }
2604
2605    // A regression test for materialize#20776, where a bug meant that compare_and_append
2606    // would not fetch the latest state after an upper mismatch. This meant that
2607    // a write that could succeed if retried on the latest state would instead
2608    // return an UpperMismatch.
2609    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
2610    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
2611    async fn regression_update_state_after_upper_mismatch(dyncfgs: ConfigUpdates) {
2612        let client = new_test_client(&dyncfgs).await;
2613        let mut client2 = client.clone();
2614
2615        // The bug can only happen if the two WriteHandles have separate copies
2616        // of state, so make sure that each is given its own StateCache.
2617        let new_state_cache = Arc::new(StateCache::new_no_metrics());
2618        client2.shared_states = new_state_cache;
2619
2620        let shard_id = ShardId::new();
2621        let (mut write1, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
2622        let (mut write2, _) = client2.expect_open::<String, (), u64, i64>(shard_id).await;
2623
2624        let data = [
2625            (("1".to_owned(), ()), 1, 1),
2626            (("2".to_owned(), ()), 2, 1),
2627            (("3".to_owned(), ()), 3, 1),
2628            (("4".to_owned(), ()), 4, 1),
2629            (("5".to_owned(), ()), 5, 1),
2630        ];
2631
2632        write1.expect_compare_and_append(&data[..1], 0, 2).await;
2633
2634        // this handle's upper now lags behind. if compare_and_append fails to update
2635        // state after an upper mismatch then this call would (incorrectly) fail
2636        write2.expect_compare_and_append(&data[1..2], 2, 3).await;
2637    }
2638
2639    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
2640    #[cfg_attr(miri, ignore)]
2641    async fn version_upgrade(dyncfgs: ConfigUpdates) {
2642        let mut cache = new_test_client_cache(&dyncfgs);
2643        cache.cfg.build_version = Version::new(26, 1, 0);
2644        let shard_id = ShardId::new();
2645
2646        async fn fetch_catalog_upgrade_shard_version(
2647            persist_client: &PersistClient,
2648            upgrade_shard_id: ShardId,
2649        ) -> Option<semver::Version> {
2650            let shard_state = persist_client
2651                .inspect_shard::<u64>(&upgrade_shard_id)
2652                .await
2653                .ok()?;
2654            let json_state = serde_json::to_value(shard_state).expect("state serialization error");
2655            let upgrade_version = json_state
2656                .get("applier_version")
2657                .cloned()
2658                .expect("missing applier_version");
2659            let upgrade_version =
2660                serde_json::from_value(upgrade_version).expect("version deserialization error");
2661            Some(upgrade_version)
2662        }
2663
2664        cache.cfg.build_version = Version::new(26, 1, 0);
2665        let client = cache.open(PersistLocation::new_in_mem()).await.unwrap();
2666        let (_, mut reader) = client.expect_open::<String, (), u64, i64>(shard_id).await;
2667        reader.downgrade_since(&Antichain::from_elem(1)).await;
2668        assert_eq!(
2669            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
2670            Some(Version::new(26, 1, 0)),
2671        );
2672
2673        // Merely opening and operating on the shard at a new version doesn't bump version...
2674        cache.cfg.build_version = Version::new(27, 1, 0);
2675        let client = cache.open(PersistLocation::new_in_mem()).await.unwrap();
2676        let (_, mut reader) = client.expect_open::<String, (), u64, i64>(shard_id).await;
2677        reader.downgrade_since(&Antichain::from_elem(2)).await;
2678        assert_eq!(
2679            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
2680            Some(Version::new(26, 1, 0)),
2681        );
2682
2683        // ...but an explicit call will.
2684        client
2685            .upgrade_version::<String, (), u64, i64>(shard_id, Diagnostics::for_tests())
2686            .await
2687            .unwrap();
2688        assert_eq!(
2689            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
2690            Some(Version::new(27, 1, 0)),
2691        );
2692    }
2693}