mz_persist_client/internal/machine.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Implementation of the persist state machine.

use std::fmt::Debug;
use std::ops::ControlFlow::{self, Continue};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use futures::FutureExt;
use futures::future::{self, BoxFuture};
use mz_dyncfg::{Config, ConfigSet};
use mz_ore::assert_none;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
#[allow(unused_imports)] // False positive.
use mz_ore::fmt::FormatBuffer;
use mz_ore::task::JoinHandle;
use mz_persist::location::{ExternalError, Indeterminate, SeqNo};
use mz_persist::retry::Retry;
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64, Opaque};
use semver::Version;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::{Instrument, debug, info, trace_span, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES;
use crate::cache::StateCache;
use crate::cfg::RetryParameters;
use crate::critical::CriticalReaderId;
use crate::error::{CodecMismatch, InvalidUsage};
use crate::internal::apply::Applier;
use crate::internal::compact::CompactReq;
use crate::internal::gc::GarbageCollector;
use crate::internal::maintenance::{RoutineMaintenance, WriterMaintenance};
use crate::internal::metrics::{CmdMetrics, Metrics, MetricsRetryStream, RetryMetrics};
use crate::internal::paths::PartialRollupKey;
use crate::internal::state::{
    CompareAndAppendBreak, CriticalReaderState, HandleDebugState, HollowBatch, HollowRollup,
    IdempotencyToken, LeasedReaderState, NoOpStateTransition, Since, SnapshotErr, StateCollections,
    Upper,
};
use crate::internal::state_versions::StateVersions;
use crate::internal::trace::{ApplyMergeResult, FueledMergeRes};
use crate::internal::watch::StateWatch;
use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
use crate::rpc::PubSubSender;
use crate::schema::CaESchema;
use crate::write::WriterId;
use crate::{Diagnostics, PersistConfig, ShardId};

#[derive(Debug)]
pub struct Machine<K, V, T, D> {
    pub(crate) applier: Applier<K, V, T, D>,
    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
}

// Impl Clone regardless of the type params.
impl<K, V, T: Clone, D> Clone for Machine<K, V, T, D> {
    fn clone(&self) -> Self {
        Self {
            applier: self.applier.clone(),
            isolated_runtime: Arc::clone(&self.isolated_runtime),
        }
    }
}

pub(crate) const CLAIM_UNCLAIMED_COMPACTIONS: Config<bool> = Config::new(
    "persist_claim_unclaimed_compactions",
    false,
    "If an append doesn't result in a compaction request, but there is some uncompacted batch \
    in state, compact that instead.",
);

pub(crate) const CLAIM_COMPACTION_PERCENT: Config<usize> = Config::new(
    "persist_claim_compaction_percent",
    100,
    "Claim a compaction with the given percent chance, if claiming compactions is enabled. \
    (If over 100, we'll always claim at least one; for example, if set to 365, we'll claim at least \
    three and have a 65% chance of claiming a fourth.)",
);
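
// An illustrative sketch of the over-100 semantics described above. The value
// is handed to `state.compare_and_append` below, which (per the description)
// would treat e.g. 365 as: claim `365 / 100 == 3` compactions outright, plus
// a 4th with probability `365 % 100 == 65` percent.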

pub(crate) const CLAIM_COMPACTION_MIN_VERSION: Config<String> = Config::new(
    "persist_claim_compaction_min_version",
    String::new(),
    "If set to a valid version string, compact away any earlier versions if possible.",
);

impl<K, V, T, D> Machine<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64,
{
    pub async fn new(
        cfg: PersistConfig,
        shard_id: ShardId,
        metrics: Arc<Metrics>,
        state_versions: Arc<StateVersions>,
        shared_states: Arc<StateCache>,
        pubsub_sender: Arc<dyn PubSubSender>,
        isolated_runtime: Arc<IsolatedRuntime>,
        diagnostics: Diagnostics,
    ) -> Result<Self, Box<CodecMismatch>> {
        let applier = Applier::new(
            cfg,
            shard_id,
            metrics,
            state_versions,
            shared_states,
            pubsub_sender,
            diagnostics,
        )
        .await?;
        Ok(Machine {
            applier,
            isolated_runtime,
        })
    }

    pub fn shard_id(&self) -> ShardId {
        self.applier.shard_id
    }

    pub fn seqno(&self) -> SeqNo {
        self.applier.seqno()
    }

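    /// Writes a rollup of the current state and records it in state, deleting
    /// the rollup blob again if another rollup was concurrently added at the
    /// same seqno.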
    pub async fn add_rollup_for_current_seqno(&self) -> RoutineMaintenance {
        let rollup = self.applier.write_rollup_for_state().await;
        let Some(rollup) = rollup else {
            return RoutineMaintenance::default();
        };

        let (applied, maintenance) = self.add_rollup((rollup.seqno, &rollup.to_hollow())).await;
        if !applied {
            // Someone else already wrote a rollup at this seqno, so ours didn't
            // get added. Delete it.
            self.applier
                .state_versions
                .delete_rollup(&rollup.shard_id, &rollup.key)
                .await;
        }
        maintenance
    }

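    /// Adds the given rollup to state, returning whether it was applied along
    /// with any requested maintenance. Reports "applied" if the rollup was
    /// added on any attempt, even one whose commit returned an indeterminate
    /// error; see the SUBTLE comment in [Self::merge_res] for why.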
    pub async fn add_rollup(
        &self,
        add_rollup: (SeqNo, &HollowRollup),
    ) -> (bool, RoutineMaintenance) {
        // See the big SUBTLE comment in [Self::merge_res] for what's going on
        // here.
        let mut applied_ever_true = false;
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, _applied, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.add_rollup, |_, _, state| {
                let ret = state.add_rollup(add_rollup);
                if let Continue(applied) = ret {
                    applied_ever_true = applied_ever_true || applied;
                }
                ret
            })
            .await;
        (applied_ever_true, maintenance)
    }

    pub async fn remove_rollups(
        &self,
        remove_rollups: &[(SeqNo, PartialRollupKey)],
    ) -> (Vec<SeqNo>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, removed_rollup_seqnos, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.remove_rollups, |_, _, state| {
                state.remove_rollups(remove_rollups)
            })
            .await;
        (removed_rollup_seqnos, maintenance)
    }

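    /// Registers a new leased reader with the given lease duration, returning
    /// its state (whose seqno hold is validated to be at least the shard's
    /// seqno_since).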
    pub async fn register_leased_reader(
        &self,
        reader_id: &LeasedReaderId,
        purpose: &str,
        lease_duration: Duration,
        heartbeat_timestamp_ms: u64,
        use_critical_since: bool,
    ) -> (LeasedReaderState<T>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, (reader_state, seqno_since), maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |seqno, cfg, state| {
                state.register_leased_reader(
                    &cfg.hostname,
                    reader_id,
                    purpose,
                    seqno,
                    lease_duration,
                    heartbeat_timestamp_ms,
                    use_critical_since,
                )
            })
            .await;
        // Usually, the reader gets an initial seqno hold of the seqno at which
        // it was registered. However, on a tombstone shard the seqno hold
        // happens to get computed as the tombstone seqno + 1
        // (State::clone_apply provided seqno.next(), the non-no-op commit
        // seqno, to the work fn and this is what register_reader uses for the
        // seqno hold). The real invariant we want to protect here is that the
        // hold is >= the seqno_since, so validate that instead of anything more
        // specific.
        debug_assert!(
            reader_state.seqno >= seqno_since,
            "{} vs {}",
            reader_state.seqno,
            seqno_since,
        );
        (reader_state, maintenance)
    }

    pub async fn register_critical_reader<O: Opaque + Codec64>(
        &self,
        reader_id: &CriticalReaderId,
        purpose: &str,
    ) -> (CriticalReaderState<T>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, state, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, cfg, state| {
                state.register_critical_reader::<O>(&cfg.hostname, reader_id, purpose)
            })
            .await;
        (state, maintenance)
    }

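    /// Registers the given key and val schemas with the shard, returning the
    /// assigned [SchemaId] if registration succeeds.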
    pub async fn register_schema(
        &self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> (Option<SchemaId>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, state, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, _cfg, state| {
                state.register_schema::<K, V>(key_schema, val_schema)
            })
            .await;
        (state, maintenance)
    }

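    /// Spends `fuel` on Spine merge work, returning any compaction requests
    /// the merging produces.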
    pub async fn spine_exert(&self, fuel: usize) -> (Vec<CompactReq<T>>, RoutineMaintenance) {
        // Performance special case for no-ops, to avoid the State clones.
        if fuel == 0 || self.applier.all_batches().len() < 2 {
            return (Vec::new(), RoutineMaintenance::default());
        }

        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, reqs, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.spine_exert, |_seqno, _cfg, state| {
                state.spine_exert(fuel)
            })
            .await;
        let reqs = reqs
            .into_iter()
            .map(|req| CompactReq {
                shard_id: self.shard_id(),
                desc: req.desc,
                inputs: req
                    .inputs
                    .into_iter()
                    .map(|b| Arc::unwrap_or_clone(b.batch))
                    .collect(),
            })
            .collect();
        (reqs, maintenance)
    }

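    /// Appends `batch` to the shard if the shard's upper matches
    /// `batch.desc.lower()`, retrying internally on indeterminate errors and
    /// on a stale cached upper.
    ///
    /// A minimal caller sketch; the `machine`, `batch`, `writer_id`, `debug`,
    /// and `now_ms` bindings here are hypothetical:
    ///
    /// ```ignore
    /// match machine.compare_and_append(&batch, &writer_id, &debug, now_ms).await {
    ///     CompareAndAppendRes::Success(_seqno, maintenance) => {
    ///         // The write is durable; hand maintenance off to be performed.
    ///     }
    ///     CompareAndAppendRes::UpperMismatch(_seqno, current_upper) => {
    ///         // Another writer advanced the shard; rebuild the batch starting
    ///         // at current_upper and try again.
    ///     }
    ///     CompareAndAppendRes::InlineBackpressure => {
    ///         // Too many inline writes in state: flush the batch's parts to
    ///         // blob storage and retry.
    ///     }
    ///     CompareAndAppendRes::InvalidUsage(err) => panic!("{:?}", err),
    /// }
    /// ```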
    pub async fn compare_and_append(
        &self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        debug_info: &HandleDebugState,
        heartbeat_timestamp_ms: u64,
    ) -> CompareAndAppendRes<T> {
        let idempotency_token = IdempotencyToken::new();
        loop {
            let res = self
                .compare_and_append_idempotent(
                    batch,
                    writer_id,
                    heartbeat_timestamp_ms,
                    &idempotency_token,
                    debug_info,
                    None,
                )
                .await;
            match res {
                CompareAndAppendRes::Success(seqno, maintenance) => {
                    return CompareAndAppendRes::Success(seqno, maintenance);
                }
                CompareAndAppendRes::InvalidUsage(x) => {
                    return CompareAndAppendRes::InvalidUsage(x);
                }
                CompareAndAppendRes::InlineBackpressure => {
                    return CompareAndAppendRes::InlineBackpressure;
                }
                CompareAndAppendRes::UpperMismatch(seqno, _current_upper) => {
                    // If the state machine thinks that the shard upper is not
                    // far enough along, it could be because the caller of this
                    // method has found out that it advanced via some
                    // side-channel that didn't update our local cache of the
                    // machine state. So, fetch the latest state and try again
                    // if we indeed get something different.
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                    let (current_seqno, current_upper) =
                        self.applier.upper(|seqno, upper| (seqno, upper.clone()));

                    // We tried to do a compare_and_append with the wrong
                    // expected upper; that won't work.
                    if &current_upper != batch.desc.lower() {
                        return CompareAndAppendRes::UpperMismatch(current_seqno, current_upper);
                    } else {
                        // The upper stored in state was outdated. Retry after
                        // updating.
                    }
                }
            }
        }
    }

    async fn compare_and_append_idempotent(
        &self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        heartbeat_timestamp_ms: u64,
        idempotency_token: &IdempotencyToken,
        debug_info: &HandleDebugState,
        // Only exposed for testing. In prod, this always starts as None, but
        // making it a parameter allows us to simulate hitting an indeterminate
        // error on the first attempt in tests.
        mut indeterminate: Option<Indeterminate>,
    ) -> CompareAndAppendRes<T> {
        let metrics = Arc::clone(&self.applier.metrics);
        let lease_duration_ms = self
            .applier
            .cfg
            .writer_lease_duration
            .as_millis()
            .try_into()
            .expect("reasonable duration");
        // SUBTLE: Retries of compare_and_append with Indeterminate errors are
        // tricky (more discussion of this in database-issues#3680):
        //
        // - (1) We compare_and_append and get an Indeterminate error back from
        //   CRDB/Consensus. This means we don't know if it committed or not.
        // - (2) We retry it.
        // - (3) We get back an upper mismatch. The tricky bit is deciding if we
        //   conflicted with some other writer OR if the write in (1) actually
        //   went through and we're "conflicting" with ourself.
        //
        // A number of scenarios can be distinguished with per-writer
        // idempotency tokens, so I'll jump straight to the hardest one:
        //
        // - (1) A compare_and_append is issued for e.g. `[5,7)`, the consensus
        //   call makes it onto the network before the operation is cancelled
        //   (by dropping the future).
        // - (2) A compare_and_append is issued from the same WriteHandle for
        //   `[3,5)`, it uses a different conn from the consensus pool and gets
        //   an Indeterminate error.
        // - (3) The call in (1) is received by consensus and commits.
        // - (4) The retry of (2) receives an upper mismatch with an upper of 7.
        //
        // At this point, how do we determine whether (2) committed or not and
        // thus whether we should return success or upper mismatch? Getting this
        // right is very important for correctness (imagine this is a table
        // write and we either return success or failure to the client).
        //
        // - If we use per-writer IdempotencyTokens but only store the latest
        //   one in state, then the `[5,7)` one will have clobbered whatever our
        //   `[3,5)` one was.
        // - We could store every IdempotencyToken that ever committed, but that
        //   would require unbounded storage in state (non-starter).
        // - We could require that IdempotencyTokens are comparable and that
        //   each call issued by a WriteHandle uses one that is strictly greater
        //   than every call before it. A previous version of this PR tried this
        //   and it's remarkably subtle. As a result, I (Dan) have developed
        //   strong feels that our correctness protocol _should not depend on
        //   WriteHandle, only Machine_.
        // - We could require a new WriterId if a request is ever cancelled by
        //   making `compare_and_append` take ownership of `self` and then
        //   handing it back for any call polled to completion. The ergonomics
        //   of this are quite awkward and, like the previous idea, it depends
        //   on the WriteHandle impl for correctness.
        // - Any ideas that involve reading back the data are foiled by a step
        //   `(0) set the since to 100` (plus the latency and memory usage would
        //   be too unpredictable).
        //
        // The technique used here derives from the following observations:
        //
        // - In practice, we don't use compare_and_append with the sort of
        //   "regressing frontiers" described above.
        // - In practice, Indeterminate errors are rare-ish. They happen enough
        //   that we don't want to always panic on them, but this is still a
        //   useful property to build on.
        //
        // At a high level, we do just enough to be able to distinguish the
        // cases that we think will happen in practice and then leave the rest
        // for a panic! that we think we'll never see. Concretely:
        //
        // - Run compare_and_append in a loop, retrying on Indeterminate errors
        //   but noting if we've ever done that.
        // - If we haven't seen an Indeterminate error (i.e. this is the first
        //   time through the loop) then the result we got is guaranteed to be
        //   correct, so pass it up.
        // - Otherwise, any result other than an expected upper mismatch is
        //   guaranteed to be correct, so just pass it up.
        // - Otherwise examine the writer's most recent upper and break it into
        //   two cases:
        // - Case 1 `expected_upper.less_than(writer_most_recent_upper)`: it's
        //   impossible that we committed on a previous iteration because the
        //   overall upper of the shard is less_than what this call would have
        //   advanced it to. Pass up the expectation mismatch.
        // - Case 2 `!Case1`: First note that this means our IdempotencyToken
        //   didn't match, otherwise we would have gotten `AlreadyCommitted`. It
        //   also means some previous write from _this writer_ has committed an
        //   upper that is beyond the one in this call, which is a weird usage
        //   (NB can't be a future write because that would mean someone is
        //   still polling us, but `&mut self` prevents that).
        //
        // TODO: If this technique works in practice (leads to zero panics),
        // then commit to it and remove the Indeterminate from
        // [WriteHandle::compare_and_append_batch].
        let mut retry = self
            .applier
            .metrics
            .retries
            .compare_and_append_idempotent
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        let mut writer_was_present = false;
        loop {
            let cmd_res = self
                .applier
                .apply_unbatched_cmd(&metrics.cmds.compare_and_append, |_, cfg, state| {
                    writer_was_present = state.writers.contains_key(writer_id);
                    state.compare_and_append(
                        batch,
                        writer_id,
                        heartbeat_timestamp_ms,
                        lease_duration_ms,
                        idempotency_token,
                        debug_info,
                        INLINE_WRITES_TOTAL_MAX_BYTES.get(cfg),
                        if CLAIM_UNCLAIMED_COMPACTIONS.get(cfg) {
                            CLAIM_COMPACTION_PERCENT.get(cfg)
                        } else {
                            0
                        },
                        Version::parse(&CLAIM_COMPACTION_MIN_VERSION.get(cfg))
                            .ok()
                            .as_ref(),
                    )
                })
                .await;
            let (seqno, res, routine) = match cmd_res {
                Ok(x) => x,
                Err(err) => {
                    // These are rare and interesting enough that we always log
                    // them at info!.
                    info!(
                        "compare_and_append received an indeterminate error, retrying in {:?}: {}",
                        retry.next_sleep(),
                        err
                    );
                    if indeterminate.is_none() {
                        indeterminate = Some(err);
                    }
                    retry = retry.sleep().await;
                    continue;
                }
            };
            match res {
                Ok(merge_reqs) => {
                    // We got explicit confirmation that we succeeded, so
                    // anything that happened in a previous retry is irrelevant.
                    let mut compact_reqs = Vec::with_capacity(merge_reqs.len());
                    for req in merge_reqs {
                        let req = CompactReq {
                            shard_id: self.shard_id(),
                            desc: req.desc,
                            inputs: req
                                .inputs
                                .into_iter()
                                .map(|b| Arc::unwrap_or_clone(b.batch))
                                .collect(),
                        };
                        compact_reqs.push(req);
                    }
                    let writer_maintenance = WriterMaintenance {
                        routine,
                        compaction: compact_reqs,
                    };

                    if !writer_was_present {
                        metrics.state.writer_added.inc();
                    }
                    for part in &batch.parts {
                        if part.is_inline() {
                            let bytes = u64::cast_from(part.inline_bytes());
                            metrics.inline.part_commit_bytes.inc_by(bytes);
                            metrics.inline.part_commit_count.inc();
                        }
                    }
                    return CompareAndAppendRes::Success(seqno, writer_maintenance);
                }
                Err(CompareAndAppendBreak::AlreadyCommitted) => {
                    // A previous iteration through this loop got an
                    // Indeterminate error but was successful. Sanity check this
                    // and pass along the good news.
                    assert!(indeterminate.is_some());
                    self.applier.metrics.cmds.compare_and_append_noop.inc();
                    if !writer_was_present {
                        metrics.state.writer_added.inc();
                    }
                    return CompareAndAppendRes::Success(seqno, WriterMaintenance::default());
                }
                Err(CompareAndAppendBreak::InvalidUsage(err)) => {
                    // InvalidUsage is (or should be) a deterministic function
                    // of the inputs and independent of anything in persist
                    // state. It's handed back via a Break, so we never even try
                    // to commit it. No network, no Indeterminate.
                    assert_none!(indeterminate);
                    return CompareAndAppendRes::InvalidUsage(err);
                }
                Err(CompareAndAppendBreak::InlineBackpressure) => {
                    // We tried to write an inline part, but there was already
                    // too much in state. Flush it out to s3 and try again.
                    return CompareAndAppendRes::InlineBackpressure;
                }
                Err(CompareAndAppendBreak::Upper {
                    shard_upper,
                    writer_upper,
                }) => {
                    // NB the below intentionally compares to writer_upper
                    // (because it gives a tighter bound on the bad case), but
                    // returns shard_upper (what the persist caller cares
                    // about).
                    assert!(
                        PartialOrder::less_equal(&writer_upper, &shard_upper),
                        "{:?} vs {:?}",
                        &writer_upper,
                        &shard_upper
                    );
                    if PartialOrder::less_than(&writer_upper, batch.desc.upper()) {
                        // No way this could have committed in some previous
                        // attempt of this loop: the upper of the writer is
                        // strictly less than the proposed new upper.
                        return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
                    }
                    if indeterminate.is_none() {
                        // No way this could have committed in some previous
                        // attempt of this loop: we never saw an indeterminate
                        // error (thus there was no previous iteration of the
                        // loop).
                        return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
                    }
                    // This is the bad case. We can't distinguish if some
                    // previous attempt that got an Indeterminate error
                    // succeeded or not. This should be sufficiently rare in
                    // practice (hopefully ~never) that we give up and let
                    // process restart fix things. See the big comment above for
                    // more context.
                    //
                    // NB: This is intentionally not a halt! because it's quite
                    // unexpected.
                    panic!(
                        concat!(
                            "cannot distinguish compare_and_append success or failure ",
                            "caa_lower={:?} caa_upper={:?} writer_upper={:?} shard_upper={:?} err={:?}"
                        ),
                        batch.desc.lower().elements(),
                        batch.desc.upper().elements(),
                        writer_upper.elements(),
                        shard_upper.elements(),
                        indeterminate,
                    );
                }
            };
        }
    }

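    /// Applies the result of a fueled compaction to state, returning whether
    /// (and how) it was applied. Deliberately biased toward reporting applied
    /// so callers never delete compaction output blobs that a committed state
    /// might still reference; see the SUBTLE comment in the body.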
    pub async fn merge_res(
        &self,
        res: &FueledMergeRes<T>,
    ) -> (ApplyMergeResult, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);

        // SUBTLE! If Machine::merge_res returns false, the blobs referenced in
        // compaction output are deleted so we don't leak them. Naively passing
        // back the value returned by State::apply_merge_res might give a false
        // negative in the presence of retries and Indeterminate errors.
        // Specifically, something like the following:
        //
        // - We try to apply_merge_res, it matches.
        // - When apply_unbatched_cmd goes to commit the new state, the
        //   Consensus::compare_and_set returns an Indeterminate error (but
        //   actually succeeds). The committed State now contains references to
        //   the compaction output blobs.
        // - Machine::apply_unbatched_idempotent_cmd retries the Indeterminate
        //   error. For whatever reason, this time though it doesn't match
        //   (maybe the batches simply get grouped differently when deserialized
        //   from state, or more unavoidably perhaps another compaction
        //   happens).
        // - This now bubbles up applied=false to the caller, which uses it as a
        //   signal that the blobs in the compaction output should be deleted so
        //   that we don't leak them.
        // - We now contain references in committed State to blobs that don't
        //   exist.
        //
        // The fix is to keep track of whether applied ever was true, even for a
        // compare_and_set that returned an Indeterminate error. This has the
        // chance of false positive (leaking a blob) but that's better than a
        // false negative (a blob we can never recover referenced by state). We
        // anyway need a mechanism to clean up leaked blobs because of process
        // crashes.
        let mut merge_result_ever_applied = ApplyMergeResult::NotAppliedNoMatch;
        let (_seqno, _apply_merge_result, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.merge_res, |_, _, state| {
                let ret = state.apply_merge_res(res);
                if let Continue(result) = ret {
                    // record if we've ever applied the merge
                    if result.applied() {
                        merge_result_ever_applied = result;
                    }
                    // otherwise record the most granular reason for _not_
                    // applying the merge when there was a matching batch
                    if result.matched() && !result.applied() && !merge_result_ever_applied.applied()
                    {
                        merge_result_ever_applied = result;
                    }
                }
                ret
            })
            .await;
        (merge_result_ever_applied, maintenance)
    }

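    /// Downgrades the since hold of the given leased reader, also recording
    /// the given heartbeat timestamp for its lease.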
    pub async fn downgrade_since(
        &self,
        reader_id: &LeasedReaderId,
        outstanding_seqno: Option<SeqNo>,
        new_since: &Antichain<T>,
        heartbeat_timestamp_ms: u64,
    ) -> (SeqNo, Since<T>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        self.apply_unbatched_idempotent_cmd(&metrics.cmds.downgrade_since, |seqno, _cfg, state| {
            state.downgrade_since(
                reader_id,
                seqno,
                outstanding_seqno,
                new_since,
                heartbeat_timestamp_ms,
            )
        })
        .await
    }

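    /// Compares the reader's opaque token and, on a match, atomically swaps in
    /// the new token and downgrades the since. On a mismatch, returns the
    /// current token and since so the caller can reconcile.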
    pub async fn compare_and_downgrade_since<O: Opaque + Codec64>(
        &self,
        reader_id: &CriticalReaderId,
        expected_opaque: &O,
        (new_opaque, new_since): (&O, &Antichain<T>),
    ) -> (Result<Since<T>, (O, Since<T>)>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, res, maintenance) = self
            .apply_unbatched_idempotent_cmd(
                &metrics.cmds.compare_and_downgrade_since,
                |_seqno, _cfg, state| {
                    state.compare_and_downgrade_since::<O>(
                        reader_id,
                        expected_opaque,
                        (new_opaque, new_since),
                    )
                },
            )
            .await;

        match res {
            Ok(since) => (Ok(since), maintenance),
            Err((opaque, since)) => (Err((opaque, since)), maintenance),
        }
    }

    pub async fn heartbeat_leased_reader(
        &self,
        reader_id: &LeasedReaderId,
        heartbeat_timestamp_ms: u64,
    ) -> (SeqNo, bool, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (seqno, existed, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.heartbeat_reader, |_, _, state| {
                state.heartbeat_leased_reader(reader_id, heartbeat_timestamp_ms)
            })
            .await;
        (seqno, existed, maintenance)
    }

    pub async fn expire_leased_reader(
        &self,
        reader_id: &LeasedReaderId,
    ) -> (SeqNo, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (seqno, _existed, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
                state.expire_leased_reader(reader_id)
            })
            .await;
        (seqno, maintenance)
    }

    #[allow(dead_code)] // TODO(bkirwi): remove this when since behaviour on expiry has settled
    pub async fn expire_critical_reader(
        &self,
        reader_id: &CriticalReaderId,
    ) -> (SeqNo, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (seqno, _existed, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
                state.expire_critical_reader(reader_id)
            })
            .await;
        (seqno, maintenance)
    }

    pub async fn expire_writer(&self, writer_id: &WriterId) -> (SeqNo, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (seqno, _existed, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_writer, |_, _, state| {
                state.expire_writer(writer_id)
            })
            .await;
        metrics.state.writer_removed.inc();
        (seqno, maintenance)
    }

    pub fn is_finalized(&self) -> bool {
        self.applier.is_finalized()
    }

    /// See [crate::PersistClient::get_schema].
    pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
        self.applier.get_schema(schema_id)
    }

    /// See [crate::PersistClient::latest_schema].
    pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
        self.applier.latest_schema()
    }

    /// See [crate::PersistClient::compare_and_evolve_schema].
    ///
    /// TODO: Unify this with [Self::register_schema]?
    pub async fn compare_and_evolve_schema(
        &self,
        expected: SchemaId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> (CaESchema<K, V>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, state, maintenance) = self
            .apply_unbatched_idempotent_cmd(
                &metrics.cmds.compare_and_evolve_schema,
                |_seqno, _cfg, state| {
                    state.compare_and_evolve_schema::<K, V>(expected, key_schema, val_schema)
                },
            )
            .await;
        (state, maintenance)
    }

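    /// Attempts one step of the shard's transition to a tombstone, retrying
    /// indeterminate errors internally. Returns whether the step made
    /// progress, so that [Self::become_tombstone] can loop until it reaches a
    /// fixpoint.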
    async fn tombstone_step(&self) -> Result<(bool, RoutineMaintenance), InvalidUsage<T>> {
        let metrics = Arc::clone(&self.applier.metrics);
        let mut retry = self
            .applier
            .metrics
            .retries
            .idempotent_cmd
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        loop {
            let res = self
                .applier
                .apply_unbatched_cmd(&metrics.cmds.become_tombstone, |_, _, state| {
                    state.become_tombstone_and_shrink()
                })
                .await;
            let err = match res {
                Ok((_seqno, Ok(()), maintenance)) => return Ok((true, maintenance)),
                Ok((_seqno, Err(NoOpStateTransition(())), maintenance)) => {
                    return Ok((false, maintenance));
                }
                Err(err) => err,
            };
            if retry.attempt() >= INFO_MIN_ATTEMPTS {
                info!(
                    "become_tombstone received an indeterminate error, retrying in {:?}: {}",
                    retry.next_sleep(),
                    err
                );
            } else {
                debug!(
                    "become_tombstone received an indeterminate error, retrying in {:?}: {}",
                    retry.next_sleep(),
                    err
                );
            }
            retry = retry.sleep().await;
        }
    }

    pub async fn become_tombstone(&self) -> Result<RoutineMaintenance, InvalidUsage<T>> {
        self.applier.check_since_upper_both_empty()?;

        let mut maintenance = RoutineMaintenance::default();

        loop {
            let (made_progress, more_maintenance) = self.tombstone_step().await?;
            maintenance.merge(more_maintenance);
            if !made_progress {
                break;
            }
        }

        Ok(maintenance)
    }

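    /// Returns the batches making up a snapshot of the shard at `as_of`,
    /// watching state (with a polling fallback) until the as_of becomes
    /// available if it isn't already.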
    pub async fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, Since<T>> {
        let start = Instant::now();
        let (mut seqno, mut upper) = match self.applier.snapshot(as_of) {
            Ok(x) => return Ok(x),
            Err(SnapshotErr::AsOfNotYetAvailable(seqno, Upper(upper))) => (seqno, upper),
            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(since))) => {
                return Err(Since(since));
            }
        };

        // The latest state still couldn't serve this as_of: watch+sleep in a
        // loop until it's ready.
        let mut watch = self.applier.watch();
        let watch = &mut watch;
        let sleeps = self
            .applier
            .metrics
            .retries
            .snapshot
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());

        enum Wake<'a, K, V, T, D> {
            Watch(&'a mut StateWatch<K, V, T, D>),
            Sleep(MetricsRetryStream),
        }
        let mut watch_fut = std::pin::pin!(
            watch
                .wait_for_seqno_ge(seqno.next())
                .map(Wake::Watch)
                .instrument(trace_span!("snapshot::watch")),
        );
        let mut sleep_fut = std::pin::pin!(
            sleeps
                .sleep()
                .map(Wake::Sleep)
                .instrument(trace_span!("snapshot::sleep")),
        );

        // To reduce log spam, we log "not yet available" at info only once,
        // and only after a threshold duration has elapsed. If we did log at
        // info, we also log at info when the snapshot resolves.
        let mut logged_at_info = false;
        loop {
            // Use a duration based threshold here instead of the usual
            // INFO_MIN_ATTEMPTS because here we're waiting on an
            // external thing to arrive.
            if !logged_at_info && start.elapsed() >= Duration::from_millis(1024) {
                logged_at_info = true;
                info!(
                    "snapshot {} {} as of {:?} not yet available for {} upper {:?}",
                    self.applier.shard_metrics.name,
                    self.shard_id(),
                    as_of.elements(),
                    seqno,
                    upper.elements(),
                );
            } else {
                debug!(
                    "snapshot {} {} as of {:?} not yet available for {} upper {:?}",
                    self.applier.shard_metrics.name,
                    self.shard_id(),
                    as_of.elements(),
                    seqno,
                    upper.elements(),
                );
            }

            let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
                future::Either::Left((wake, _)) => wake,
                future::Either::Right((wake, _)) => wake,
            };
            // Note that we don't need to fetch in the Watch case, because the
            // Watch wakeup is a signal that the shared state has already been
            // updated.
            match &wake {
                Wake::Watch(_) => self.applier.metrics.watch.snapshot_woken_via_watch.inc(),
                Wake::Sleep(_) => {
                    self.applier.metrics.watch.snapshot_woken_via_sleep.inc();
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                }
            }

            (seqno, upper) = match self.applier.snapshot(as_of) {
                Ok(x) => {
                    if logged_at_info {
                        info!(
                            "snapshot {} {} as of {:?} now available",
                            self.applier.shard_metrics.name,
                            self.shard_id(),
                            as_of.elements(),
                        );
                    }
                    return Ok(x);
                }
                Err(SnapshotErr::AsOfNotYetAvailable(seqno, Upper(upper))) => {
                    // The upper isn't ready yet, fall through and try again.
                    (seqno, upper)
                }
                Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(since))) => {
                    return Err(Since(since));
                }
            };

            match wake {
                Wake::Watch(watch) => {
                    watch_fut.set(
                        watch
                            .wait_for_seqno_ge(seqno.next())
                            .map(Wake::Watch)
                            .instrument(trace_span!("snapshot::watch")),
                    );
                }
                Wake::Sleep(sleeps) => {
                    debug!(
                        "snapshot {} {} sleeping for {:?}",
                        self.applier.shard_metrics.name,
                        self.shard_id(),
                        sleeps.next_sleep()
                    );
                    sleep_fut.set(
                        sleeps
                            .sleep()
                            .map(Wake::Sleep)
                            .instrument(trace_span!("snapshot::sleep")),
                    );
                }
            }
        }
    }

    // NB: Unlike the other methods here, this one is read-only.
    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
        match self.applier.verify_listen(as_of) {
            Ok(Ok(())) => Ok(()),
            Ok(Err(Upper(_))) => {
                // The upper may not be ready yet (maybe it would be ready if we
                // re-fetched state), but that's okay! One way to think of
                // Listen is as an async stream where creating the stream at any
                // legal as_of does not block but then updates trickle in once
                // they are available.
                Ok(())
            }
            Err(Since(since)) => Err(Since(since)),
        }
    }

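    /// Returns the next batch for a listener whose progress is `frontier`,
    /// watching state (with a polling fallback) until one is available.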
    pub async fn next_listen_batch(
        &self,
        frontier: &Antichain<T>,
        watch: &mut StateWatch<K, V, T, D>,
        reader_id: Option<&LeasedReaderId>,
        // If Some, an override for the default listen sleep retry parameters.
        retry: Option<RetryParameters>,
    ) -> HollowBatch<T> {
        let mut seqno = match self.applier.next_listen_batch(frontier) {
            Ok(b) => return b,
            Err(seqno) => seqno,
        };

        // The latest state still doesn't have a new frontier for us:
        // watch+sleep in a loop until it does.
        let retry = retry.unwrap_or_else(|| next_listen_batch_retry_params(&self.applier.cfg));
        let sleeps = self
            .applier
            .metrics
            .retries
            .next_listen_batch
            .stream(retry.into_retry(SystemTime::now()).into_retry_stream());

        enum Wake<'a, K, V, T, D> {
            Watch(&'a mut StateWatch<K, V, T, D>),
            Sleep(MetricsRetryStream),
        }
        let mut watch_fut = std::pin::pin!(
            watch
                .wait_for_seqno_ge(seqno.next())
                .map(Wake::Watch)
                .instrument(trace_span!("snapshot::watch"))
        );
        let mut sleep_fut = std::pin::pin!(
            sleeps
                .sleep()
                .map(Wake::Sleep)
                .instrument(trace_span!("snapshot::sleep"))
        );

        loop {
            let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
                future::Either::Left((wake, _)) => wake,
                future::Either::Right((wake, _)) => wake,
            };
            // Note that we don't need to fetch in the Watch case, because the
            // Watch wakeup is a signal that the shared state has already been
            // updated.
            match &wake {
                Wake::Watch(_) => self.applier.metrics.watch.listen_woken_via_watch.inc(),
                Wake::Sleep(_) => {
                    self.applier.metrics.watch.listen_woken_via_sleep.inc();
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                }
            }

            seqno = match self.applier.next_listen_batch(frontier) {
                Ok(b) => {
                    match &wake {
                        Wake::Watch(_) => {
                            self.applier.metrics.watch.listen_resolved_via_watch.inc()
                        }
                        Wake::Sleep(_) => {
                            self.applier.metrics.watch.listen_resolved_via_sleep.inc()
                        }
                    }
                    return b;
                }
                Err(seqno) => seqno,
            };

            // Wait a bit and try again. Intentionally don't ever log
            // this at info level.
            match wake {
                Wake::Watch(watch) => {
                    watch_fut.set(
                        watch
                            .wait_for_seqno_ge(seqno.next())
                            .map(Wake::Watch)
                            .instrument(trace_span!("snapshot::watch")),
                    );
                }
                Wake::Sleep(sleeps) => {
                    debug!(
                        "{:?}: {} {} next_listen_batch didn't find new data, retrying in {:?}",
                        reader_id,
                        self.applier.shard_metrics.name,
                        self.shard_id(),
                        sleeps.next_sleep()
                    );
                    sleep_fut.set(
                        sleeps
                            .sleep()
                            .map(Wake::Sleep)
                            .instrument(trace_span!("snapshot::sleep")),
                    );
                }
            }
        }
    }

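    /// Applies an idempotent command to state, retrying indeterminate commit
    /// errors until it succeeds. This is safe only because the work fn is
    /// idempotent: re-applying it after a commit that actually went through
    /// must result in a no-op state transition.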
    async fn apply_unbatched_idempotent_cmd<
        R,
        WorkFn: FnMut(
            SeqNo,
            &PersistConfig,
            &mut StateCollections<T>,
        ) -> ControlFlow<NoOpStateTransition<R>, R>,
    >(
        &self,
        cmd: &CmdMetrics,
        mut work_fn: WorkFn,
    ) -> (SeqNo, R, RoutineMaintenance) {
        let mut retry = self
            .applier
            .metrics
            .retries
            .idempotent_cmd
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        loop {
            match self.applier.apply_unbatched_cmd(cmd, &mut work_fn).await {
                Ok((seqno, x, maintenance)) => match x {
                    Ok(x) => {
                        return (seqno, x, maintenance);
                    }
                    Err(NoOpStateTransition(x)) => {
                        return (seqno, x, maintenance);
                    }
                },
                Err(err) => {
                    if retry.attempt() >= INFO_MIN_ATTEMPTS {
                        info!(
                            "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
                            cmd.name,
                            retry.next_sleep(),
                            err
                        );
                    } else {
                        debug!(
                            "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
                            cmd.name,
                            retry.next_sleep(),
                            err
                        );
                    }
                    retry = retry.sleep().await;
                    continue;
                }
            }
        }
    }
}

pub(crate) struct ExpireFn(
    /// This is stored on WriteHandle and ReadHandle, which we require to be
    /// Send + Sync, but the Future is only Send and not Sync. Instead store a
    /// FnOnce that returns the Future. This could also be made an `IntoFuture`,
    /// once producing one of those is made easier.
    pub(crate) Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + Sync + 'static>,
);

impl Debug for ExpireFn {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExpireFn").finish_non_exhaustive()
    }
}

#[derive(Debug)]
pub(crate) enum CompareAndAppendRes<T> {
    Success(SeqNo, WriterMaintenance<T>),
    InvalidUsage(InvalidUsage<T>),
    UpperMismatch(SeqNo, Antichain<T>),
    InlineBackpressure,
}

#[cfg(test)]
impl<T: Debug> CompareAndAppendRes<T> {
    #[track_caller]
    fn unwrap(self) -> (SeqNo, WriterMaintenance<T>) {
        match self {
            CompareAndAppendRes::Success(seqno, maintenance) => (seqno, maintenance),
            x => panic!("{:?}", x),
        }
    }
}

impl<K, V, T, D> Machine<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    #[allow(clippy::unused_async)]
    pub async fn start_reader_heartbeat_tasks(
        self,
        reader_id: LeasedReaderId,
        gc: GarbageCollector<K, V, T, D>,
    ) -> Vec<JoinHandle<()>> {
        let mut ret = Vec::new();
        let metrics = Arc::clone(&self.applier.metrics);

        // TODO: In response to a production incident, this runs the heartbeat
        // task on both the in-context tokio runtime and persist's isolated
        // runtime. We think we were seeing tasks (including this one) get stuck
        // indefinitely in tokio while waiting for a runtime worker. This could
        // happen if some other task in that runtime never yields. It's possible
        // that one of the two runtimes is healthy while the other isn't (this
        // was inconclusive in the incident debugging), and the heartbeat task
        // is fairly lightweight, so run a copy in each in case that helps.
        //
        // The real fix here is to find the misbehaving task and fix it. Remove
        // this duplication when that happens.
        let name = format!("persist::heartbeat_read({},{})", self.shard_id(), reader_id);
        ret.push(mz_ore::task::spawn(|| name, {
            let machine = self.clone();
            let reader_id = reader_id.clone();
            let gc = gc.clone();
            metrics
                .tasks
                .heartbeat_read
                .instrument_task(Self::reader_heartbeat_task(machine, reader_id, gc))
        }));

        let isolated_runtime = Arc::clone(&self.isolated_runtime);
        let name = format!(
            "persist::heartbeat_read_isolated({},{})",
            self.shard_id(),
            reader_id
        );
        ret.push(
            isolated_runtime.spawn_named(
                || name,
                metrics
                    .tasks
                    .heartbeat_read
                    .instrument_task(Self::reader_heartbeat_task(self, reader_id, gc)),
            ),
        );

        ret
    }

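    /// Periodically heartbeats the given leased reader, sleeping for half the
    /// lease duration between heartbeats so that a single missed attempt
    /// doesn't cost the lease. Exits, warning loudly, if the lease has already
    /// expired.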
    async fn reader_heartbeat_task(
        machine: Self,
        reader_id: LeasedReaderId,
        gc: GarbageCollector<K, V, T, D>,
    ) {
        let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 2;
        loop {
            let before_sleep = Instant::now();
            tokio::time::sleep(sleep_duration).await;

            let elapsed_since_before_sleeping = before_sleep.elapsed();
            if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) {
                warn!(
                    "reader ({}) of shard ({}) went {}s between heartbeats",
                    reader_id,
                    machine.shard_id(),
                    elapsed_since_before_sleeping.as_secs_f64()
                );
            }

            let before_heartbeat = Instant::now();
            let (_seqno, existed, maintenance) = machine
                .heartbeat_leased_reader(&reader_id, (machine.applier.cfg.now)())
                .await;
            maintenance.start_performing(&machine, &gc);

            let elapsed_since_heartbeat = before_heartbeat.elapsed();
            if elapsed_since_heartbeat > Duration::from_secs(60) {
                warn!(
                    "reader ({}) of shard ({}) heartbeat call took {}s",
                    reader_id,
                    machine.shard_id(),
                    elapsed_since_heartbeat.as_secs_f64(),
                );
            }

            if !existed {
                // If the read handle was intentionally expired, this task
                // *should* be aborted before it observes the expiration. So if
                // we get here, this task somehow failed to keep the read lease
                // alive. Warn loudly, because there's now a live read handle to
                // an expired shard that will panic if used, but don't panic,
                // just in case there is some edge case that results in this
                // task observing the intentional expiration of a read handle.
                warn!(
                    "heartbeat task for reader ({}) of shard ({}) exiting due to expired lease \
                     while read handle is live",
                    reader_id,
                    machine.shard_id(),
                );
                return;
            }
        }
    }
}

1288pub(crate) const NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP: Config<Duration> = Config::new(
1289    "persist_next_listen_batch_retryer_fixed_sleep",
1290    Duration::from_millis(1200), // pubsub is on by default!
1291    "\
1292    The fixed sleep when polling for new batches from a Listen or Subscribe. Skipped if zero.",
1293);
1294
1295pub(crate) const NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF: Config<Duration> = Config::new(
1296    "persist_next_listen_batch_retryer_initial_backoff",
1297    Duration::from_millis(100), // pubsub is on by default!
1298    "The initial backoff when polling for new batches from a Listen or Subscribe.",
1299);
1300
1301pub(crate) const NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER: Config<u32> = Config::new(
1302    "persist_next_listen_batch_retryer_multiplier",
1303    2,
1304    "The backoff multiplier when polling for new batches from a Listen or Subscribe.",
1305);
1306
1307pub(crate) const NEXT_LISTEN_BATCH_RETRYER_CLAMP: Config<Duration> = Config::new(
1308    "persist_next_listen_batch_retryer_clamp",
1309    Duration::from_secs(16), // pubsub is on by default!
1310    "The backoff clamp duration when polling for new batches from a Listen or Subscribe.",
1311);
1312
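/// Bundles the dyncfgs above into the [RetryParameters] used when polling
/// for new batches from a Listen or Subscribe.
///
/// With the defaults above, every poll sleeps a flat 1200ms (the fixed
/// sleep). If the fixed sleep were set to zero it would be skipped, and the
/// exponential backoff would apply instead: 100ms, 200ms, 400ms, ...,
/// clamped at 16s.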
1313fn next_listen_batch_retry_params(cfg: &ConfigSet) -> RetryParameters {
1314    RetryParameters {
1315        fixed_sleep: NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP.get(cfg),
1316        initial_backoff: NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF.get(cfg),
1317        multiplier: NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER.get(cfg),
1318        clamp: NEXT_LISTEN_BATCH_RETRYER_CLAMP.get(cfg),
1319    }
1320}
1321
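/// The number of attempts after which [retry_external] logs retried
/// failures at info (rather than debug).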
1322pub const INFO_MIN_ATTEMPTS: usize = 3;
1323
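/// Retries `work_fn` until it succeeds, sleeping between attempts with the
/// default persist backoff. Determinate and indeterminate external errors
/// are both retried; callers that must observe indeterminate errors should
/// use [retry_determinate] instead.
///
/// A usage sketch; the metric path and the `blob`/`key` bindings are
/// illustrative, not a real call site:
///
/// ```ignore
/// let value = retry_external(&metrics.retries.external.my_op, || async {
///     blob.get(&key).await
/// })
/// .await;
/// ```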
1324pub async fn retry_external<R, F, WorkFn>(metrics: &RetryMetrics, mut work_fn: WorkFn) -> R
1325where
1326    F: std::future::Future<Output = Result<R, ExternalError>>,
1327    WorkFn: FnMut() -> F,
1328{
1329    let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1330    loop {
1331        match work_fn().await {
1332            Ok(x) => {
1333                if retry.attempt() > 0 {
1334                    debug!(
1335                        "external operation {} succeeded after failing at least once",
1336                        metrics.name,
1337                    );
1338                }
1339                return x;
1340            }
1341            Err(err) => {
1342                if retry.attempt() >= INFO_MIN_ATTEMPTS {
1343                    info!(
1344                        "external operation {} failed, retrying in {:?}: {}",
1345                        metrics.name,
1346                        retry.next_sleep(),
1347                        err.display_with_causes()
1348                    );
1349                } else {
1350                    debug!(
1351                        "external operation {} failed, retrying in {:?}: {}",
1352                        metrics.name,
1353                        retry.next_sleep(),
1354                        err.display_with_causes()
1355                    );
1356                }
1357                retry = retry.sleep().await;
1358            }
1359        }
1360    }
1361}
1362
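/// Retries `work_fn` as long as it fails with a determinate error: one
/// where the operation definitely did not take effect, so retrying is
/// always safe. An indeterminate error (the operation may or may not have
/// taken effect) is returned to the caller, who has the context to decide
/// whether retrying is safe.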
1363pub async fn retry_determinate<R, F, WorkFn>(
1364    metrics: &RetryMetrics,
1365    mut work_fn: WorkFn,
1366) -> Result<R, Indeterminate>
1367where
1368    F: std::future::Future<Output = Result<R, ExternalError>>,
1369    WorkFn: FnMut() -> F,
1370{
1371    let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1372    loop {
1373        match work_fn().await {
1374            Ok(x) => {
1375                if retry.attempt() > 0 {
1376                    debug!(
1377                        "external operation {} succeeded after failing at least once",
1378                        metrics.name,
1379                    );
1380                }
1381                return Ok(x);
1382            }
1383            Err(ExternalError::Determinate(err)) => {
1384                // The determinate "could not serialize access" errors
1385                // happen often enough in dev (which uses Postgres) that
1386                // it's impeding people's work. At the same time, it's been
1387                // a source of confusion for eng. The situation is much
1388                // better on CRDB and we have metrics coverage in prod, so
1389                // this is redundant enough that it's more hurtful than
1390                // helpful. As a result, this intentionally ignores
1391                // INFO_MIN_ATTEMPTS and always logs at debug.
1392                debug!(
1393                    "external operation {} failed, retrying in {:?}: {}",
1394                    metrics.name,
1395                    retry.next_sleep(),
1396                    err.display_with_causes()
1397                );
1398                retry = retry.sleep().await;
1399                continue;
1400            }
1401            Err(ExternalError::Indeterminate(x)) => return Err(x),
1402        }
1403    }
1404}
1405
1406#[cfg(test)]
1407pub mod datadriven {
1408    use std::collections::BTreeMap;
1409    use std::pin::pin;
1410    use std::sync::{Arc, LazyLock};
1411
1412    use anyhow::anyhow;
1413    use differential_dataflow::consolidation::consolidate_updates;
1414    use differential_dataflow::trace::Description;
1415    use futures::StreamExt;
1416    use mz_dyncfg::{ConfigUpdates, ConfigVal};
1417    use mz_persist::indexed::encoding::BlobTraceBatchPart;
1418    use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
1419
1420    use crate::batch::{
1421        BLOB_TARGET_SIZE, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
1422        BatchParts, validate_truncate_batch,
1423    };
1424    use crate::cfg::COMPACTION_MEMORY_BOUND_BYTES;
1425    use crate::fetch::EncodedPart;
1426    use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
1427    use crate::internal::datadriven::DirectiveArgs;
1428    use crate::internal::encoding::Schemas;
1429    use crate::internal::gc::GcReq;
1430    use crate::internal::paths::{BlobKey, BlobKeyPrefix, PartialBlobKey};
1431    use crate::internal::state::{BatchPart, RunOrder, RunPart};
1432    use crate::internal::state_versions::EncodedRollup;
1433    use crate::read::{Listen, ListenEvent, READER_LEASE_DURATION};
1434    use crate::rpc::NoopPubSubSender;
1435    use crate::tests::new_test_client;
1436    use crate::write::COMBINE_INLINE_WRITES;
1437    use crate::{GarbageCollector, PersistClient};
1438
1439    use super::*;
1440
1441    static SCHEMAS: LazyLock<Schemas<String, ()>> = LazyLock::new(|| Schemas {
1442        id: Some(SchemaId(0)),
1443        key: Arc::new(StringSchema),
1444        val: Arc::new(UnitSchema),
1445    });
1446
1447    /// Shared state for a single [crate::internal::machine] [datadriven::TestFile].
1448    #[derive(Debug)]
1449    pub struct MachineState {
1450        pub client: PersistClient,
1451        pub shard_id: ShardId,
1452        pub state_versions: Arc<StateVersions>,
1453        pub machine: Machine<String, (), u64, i64>,
1454        pub gc: GarbageCollector<String, (), u64, i64>,
1455        pub batches: BTreeMap<String, HollowBatch<u64>>,
1456        pub rollups: BTreeMap<String, EncodedRollup>,
1457        pub listens: BTreeMap<String, Listen<String, (), u64, i64>>,
1458        pub routine: Vec<RoutineMaintenance>,
1459    }
1460
1461    impl MachineState {
1462        pub async fn new(dyncfgs: &ConfigUpdates) -> Self {
1463            let shard_id = ShardId::new();
1464            let client = new_test_client(dyncfgs).await;
1465            // Reset blob_target_size. Individual batch writes and compactions
1466            // can override it with an arg.
1467            client
1468                .cfg
1469                .set_config(&BLOB_TARGET_SIZE, *BLOB_TARGET_SIZE.default());
1470            // Our structured compaction code uses slightly different estimates
1471            // for array size than the old path, which can affect the results of
1472            // some compaction tests.
1473            client.cfg.set_config(&COMBINE_INLINE_WRITES, false);
1474            let state_versions = Arc::new(StateVersions::new(
1475                client.cfg.clone(),
1476                Arc::clone(&client.consensus),
1477                Arc::clone(&client.blob),
1478                Arc::clone(&client.metrics),
1479            ));
1480            let machine = Machine::new(
1481                client.cfg.clone(),
1482                shard_id,
1483                Arc::clone(&client.metrics),
1484                Arc::clone(&state_versions),
1485                Arc::clone(&client.shared_states),
1486                Arc::new(NoopPubSubSender),
1487                Arc::clone(&client.isolated_runtime),
1488                Diagnostics::for_tests(),
1489            )
1490            .await
1491            .expect("codecs should match");
1492            let gc = GarbageCollector::new(machine.clone(), Arc::clone(&client.isolated_runtime));
1493            MachineState {
1494                shard_id,
1495                client,
1496                state_versions,
1497                machine,
1498                gc,
1499                batches: BTreeMap::default(),
1500                rollups: BTreeMap::default(),
1501                listens: BTreeMap::default(),
1502                routine: Vec::new(),
1503            }
1504        }
1505
1506        fn to_batch(&self, hollow: HollowBatch<u64>) -> Batch<String, (), u64, i64> {
1507            Batch::new(
1508                true,
1509                Arc::clone(&self.client.metrics),
1510                Arc::clone(&self.client.blob),
1511                self.client.metrics.shards.shard(&self.shard_id, "test"),
1512                self.client.cfg.build_version.clone(),
1513                hollow,
1514            )
1515        }
1516    }
1517
1518    /// Scans consensus and returns all states with their SeqNos
1519    /// and which batches they reference
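    ///
    /// An illustrative directive and output; the seqnos and batch names are
    /// made up:
    ///
    /// ```text
    /// consensus-scan from_seqno=v5
    /// ----
    /// seqno=v5 batches=b0 rollups=v1
    /// ```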
1520    pub async fn consensus_scan(
1521        datadriven: &MachineState,
1522        args: DirectiveArgs<'_>,
1523    ) -> Result<String, anyhow::Error> {
1524        let from = args.expect("from_seqno");
1525
1526        let mut states = datadriven
1527            .state_versions
1528            .fetch_all_live_states::<u64>(datadriven.shard_id)
1529            .await
1530            .expect("should only be called on an initialized shard")
1531            .check_ts_codec()
1532            .expect("shard codecs should not change");
1533        let mut s = String::new();
1534        while let Some(x) = states.next(|_| {}) {
1535            if x.seqno < from {
1536                continue;
1537            }
1538            let rollups: Vec<_> = x
1539                .collections
1540                .rollups
1541                .keys()
1542                .map(|seqno| seqno.to_string())
1543                .collect();
1544            let batches: Vec<_> = x
1545                .collections
1546                .trace
1547                .batches()
1548                .filter(|b| !b.is_empty())
1549                .filter_map(|b| {
1550                    datadriven
1551                        .batches
1552                        .iter()
1553                        .find(|(_, original_batch)| original_batch.parts == b.parts)
1554                        .map(|(batch_name, _)| batch_name.to_owned())
1555                })
1556                .collect();
1557            write!(
1558                s,
1559                "seqno={} batches={} rollups={}\n",
1560                x.seqno,
1561                batches.join(","),
1562                rollups.join(","),
1563            );
1564        }
1565        Ok(s)
1566    }
1567
1568    pub async fn consensus_truncate(
1569        datadriven: &MachineState,
1570        args: DirectiveArgs<'_>,
1571    ) -> Result<String, anyhow::Error> {
1572        let to = args.expect("to_seqno");
1573        let removed = datadriven
1574            .client
1575            .consensus
1576            .truncate(&datadriven.shard_id.to_string(), to)
1577            .await
1578            .expect("valid truncation");
1579        Ok(format!("{}\n", removed))
1580    }
1581
1582    pub async fn blob_scan_batches(
1583        datadriven: &MachineState,
1584        _args: DirectiveArgs<'_>,
1585    ) -> Result<String, anyhow::Error> {
1586        let key_prefix = BlobKeyPrefix::Shard(&datadriven.shard_id).to_string();
1587
1588        let mut s = String::new();
1589        let () = datadriven
1590            .state_versions
1591            .blob
1592            .list_keys_and_metadata(&key_prefix, &mut |x| {
1593                let (_, key) = BlobKey::parse_ids(x.key).expect("key should be valid");
1594                if let PartialBlobKey::Batch(_, _) = key {
1595                    write!(s, "{}: {}b\n", x.key, x.size_in_bytes);
1596                }
1597            })
1598            .await?;
1599        Ok(s)
1600    }
1601
1602    #[allow(clippy::unused_async)]
1603    pub async fn shard_desc(
1604        datadriven: &MachineState,
1605        _args: DirectiveArgs<'_>,
1606    ) -> Result<String, anyhow::Error> {
1607        Ok(format!(
1608            "since={:?} upper={:?}\n",
1609            datadriven.machine.applier.since().elements(),
1610            datadriven.machine.applier.clone_upper().elements()
1611        ))
1612    }
1613
1614    pub async fn downgrade_since(
1615        datadriven: &mut MachineState,
1616        args: DirectiveArgs<'_>,
1617    ) -> Result<String, anyhow::Error> {
1618        let since = args.expect_antichain("since");
1619        let seqno = args.optional("seqno");
1620        let reader_id = args.expect("reader_id");
1621        let (_, since, routine) = datadriven
1622            .machine
1623            .downgrade_since(
1624                &reader_id,
1625                seqno,
1626                &since,
1627                (datadriven.machine.applier.cfg.now)(),
1628            )
1629            .await;
1630        datadriven.routine.push(routine);
1631        Ok(format!(
1632            "{} {:?}\n",
1633            datadriven.machine.seqno(),
1634            since.0.elements()
1635        ))
1636    }
1637
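    /// Applies dynamic config updates from the directive input, one
    /// space-separated `name val` pair per line. Only usize and bool configs
    /// are supported. An illustrative directive:
    ///
    /// ```text
    /// dyncfg
    /// persist_blob_target_size 100
    /// ----
    /// ok
    /// ```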
1638    #[allow(clippy::unused_async)]
1639    pub async fn dyncfg(
1640        datadriven: &MachineState,
1641        args: DirectiveArgs<'_>,
1642    ) -> Result<String, anyhow::Error> {
1643        let mut updates = ConfigUpdates::default();
1644        for x in args.input.trim().split('\n') {
1645            match x.split(' ').collect::<Vec<_>>().as_slice() {
1646                &[name, val] => {
1647                    let config = datadriven
1648                        .client
1649                        .cfg
1650                        .entries()
1651                        .find(|x| x.name() == name)
1652                        .ok_or_else(|| anyhow!("unknown dyncfg: {}", name))?;
1653                    match config.val() {
1654                        ConfigVal::Usize(_) => {
1655                            let val = val.parse().map_err(anyhow::Error::new)?;
1656                            updates.add_dynamic(name, ConfigVal::Usize(val));
1657                        }
1658                        ConfigVal::Bool(_) => {
1659                            let val = val.parse().map_err(anyhow::Error::new)?;
1660                            updates.add_dynamic(name, ConfigVal::Bool(val));
1661                        }
1662                        x => unimplemented!("dyncfg type: {:?}", x),
1663                    }
1664                }
1665                x => return Err(anyhow!("expected `name val` got: {:?}", x)),
1666            }
1667        }
1668        updates.apply(&datadriven.client.cfg);
1669
1670        Ok("ok\n".to_string())
1671    }
1672
1673    pub async fn compare_and_downgrade_since(
1674        datadriven: &mut MachineState,
1675        args: DirectiveArgs<'_>,
1676    ) -> Result<String, anyhow::Error> {
1677        let expected_opaque: u64 = args.expect("expect_opaque");
1678        let new_opaque: u64 = args.expect("opaque");
1679        let new_since = args.expect_antichain("since");
1680        let reader_id = args.expect("reader_id");
1681        let (res, routine) = datadriven
1682            .machine
1683            .compare_and_downgrade_since(&reader_id, &expected_opaque, (&new_opaque, &new_since))
1684            .await;
1685        datadriven.routine.push(routine);
1686        let since = res.map_err(|(opaque, since)| {
1687            anyhow!("mismatch: opaque={} since={:?}", opaque, since.0.elements())
1688        })?;
1689        Ok(format!(
1690            "{} {} {:?}\n",
1691            datadriven.machine.seqno(),
1692            new_opaque,
1693            since.0.elements()
1694        ))
1695    }
1696
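    /// Writes a rollup of the current state to blob and stashes it under
    /// the given output name; [add_rollup] separately links it into state.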
1697    pub async fn write_rollup(
1698        datadriven: &mut MachineState,
1699        args: DirectiveArgs<'_>,
1700    ) -> Result<String, anyhow::Error> {
1701        let output = args.expect_str("output");
1702
1703        let rollup = datadriven
1704            .machine
1705            .applier
1706            .write_rollup_for_state()
1707            .await
1708            .expect("rollup");
1709
1710        datadriven
1711            .rollups
1712            .insert(output.to_string(), rollup.clone());
1713
1714        Ok(format!(
1715            "state={} diffs=[{}, {})\n",
1716            rollup.seqno,
1717            rollup._desc.lower().first().expect("seqno"),
1718            rollup._desc.upper().first().expect("seqno"),
1719        ))
1720    }
1721
1722    pub async fn add_rollup(
1723        datadriven: &mut MachineState,
1724        args: DirectiveArgs<'_>,
1725    ) -> Result<String, anyhow::Error> {
1726        let input = args.expect_str("input");
1727        let rollup = datadriven
1728            .rollups
1729            .get(input)
1730            .expect("unknown batch")
1731            .clone();
1732
1733        let (applied, maintenance) = datadriven
1734            .machine
1735            .add_rollup((rollup.seqno, &rollup.to_hollow()))
1736            .await;
1737
1738        if !applied {
1739            return Err(anyhow!("failed to apply rollup for: {}", rollup.seqno));
1740        }
1741
1742        datadriven.routine.push(maintenance);
1743        Ok(format!("{}\n", datadriven.machine.seqno()))
1744    }
1745
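    /// Builds a batch from the directive's input updates, optionally
    /// consolidating them and overriding part sizes, and stashes the
    /// resulting hollow batch under the given output name.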
1746    pub async fn write_batch(
1747        datadriven: &mut MachineState,
1748        args: DirectiveArgs<'_>,
1749    ) -> Result<String, anyhow::Error> {
1750        let output = args.expect_str("output");
1751        let lower = args.expect_antichain("lower");
1752        let upper = args.expect_antichain("upper");
1753        assert!(PartialOrder::less_than(&lower, &upper));
1754        let since = args
1755            .optional_antichain("since")
1756            .unwrap_or_else(|| Antichain::from_elem(0));
1757        let target_size = args.optional("target_size");
1758        let parts_size_override = args.optional("parts_size_override");
1759        let consolidate = args.optional("consolidate").unwrap_or(true);
1760        let mut updates: Vec<_> = args
1761            .input
1762            .split('\n')
1763            .flat_map(DirectiveArgs::parse_update)
1764            .collect();
1765
1766        let mut cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
1767        if let Some(target_size) = target_size {
1768            cfg.blob_target_size = target_size;
1769        };
1770        if consolidate {
1771            consolidate_updates(&mut updates);
1772        }
1773        let run_order = if consolidate {
1774            cfg.preferred_order
1775        } else {
1776            RunOrder::Unordered
1777        };
1778        let parts = BatchParts::new_ordered(
1779            cfg.clone(),
1780            run_order,
1781            Arc::clone(&datadriven.client.metrics),
1782            Arc::clone(&datadriven.machine.applier.shard_metrics),
1783            datadriven.shard_id,
1784            Arc::clone(&datadriven.client.blob),
1785            Arc::clone(&datadriven.client.isolated_runtime),
1786            &datadriven.client.metrics.user,
1787        );
1788        let builder = BatchBuilderInternal::new(
1789            cfg.clone(),
1790            parts,
1791            Arc::clone(&datadriven.client.metrics),
1792            SCHEMAS.clone(),
1793            Arc::clone(&datadriven.client.blob),
1794            datadriven.shard_id.clone(),
1795            datadriven.client.cfg.build_version.clone(),
1796        );
1797        let mut builder = BatchBuilder::new(builder, Description::new(lower, upper.clone(), since));
1798        for ((k, ()), t, d) in updates {
1799            builder.add(&k, &(), &t, &d).await.expect("invalid batch");
1800        }
1801        let mut batch = builder.finish(upper).await?;
1802        // We can only reasonably use parts_size_override with hollow batches,
1803        // so if it's set, flush any inline batches out.
1804        if parts_size_override.is_some() {
1805            batch
1806                .flush_to_blob(
1807                    &cfg,
1808                    &datadriven.client.metrics.user,
1809                    &datadriven.client.isolated_runtime,
1810                    &SCHEMAS,
1811                )
1812                .await;
1813        }
1814        let batch = batch.into_hollow_batch();
1815
1816        if let Some(size) = parts_size_override {
1817            let mut batch = batch.clone();
1818            for part in batch.parts.iter_mut() {
1819                match part {
1820                    RunPart::Many(run) => run.max_part_bytes = size,
1821                    RunPart::Single(BatchPart::Hollow(part)) => part.encoded_size_bytes = size,
1822                    RunPart::Single(BatchPart::Inline { .. }) => unreachable!("flushed out above"),
1823                }
1824            }
1825            datadriven.batches.insert(output.to_owned(), batch);
1826        } else {
1827            datadriven.batches.insert(output.to_owned(), batch.clone());
1828        }
1829        Ok(format!("parts={} len={}\n", batch.part_count(), batch.len))
1830    }
1831
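    /// Fetches a stashed batch and prints its contents part by part,
    /// optionally including the structured key-lower stat, followed by the
    /// batch's run structure.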
1832    pub async fn fetch_batch(
1833        datadriven: &MachineState,
1834        args: DirectiveArgs<'_>,
1835    ) -> Result<String, anyhow::Error> {
1836        let input = args.expect_str("input");
1837        let stats = args.optional_str("stats");
1838        let batch = datadriven.batches.get(input).expect("unknown batch");
1839
1840        let mut s = String::new();
1841        let mut stream = pin!(
1842            batch
1843                .part_stream(
1844                    datadriven.shard_id,
1845                    &*datadriven.state_versions.blob,
1846                    &*datadriven.state_versions.metrics
1847                )
1848                .enumerate()
1849        );
1850        while let Some((idx, part)) = stream.next().await {
1851            let part = &*part?;
1852            write!(s, "<part {idx}>\n");
1853
1854            let lower = match part {
1855                BatchPart::Inline { updates, .. } => {
1856                    let updates: BlobTraceBatchPart<u64> =
1857                        updates.decode(&datadriven.client.metrics.columnar)?;
1858                    updates.structured_key_lower()
1859                }
1860                other => other.structured_key_lower(),
1861            };
1862
1863            if let Some(lower) = lower {
1864                if stats == Some("lower") {
1865                    writeln!(s, "<key lower={}>", lower.get())
1866                }
1867            }
1868
1869            match part {
1870                BatchPart::Hollow(part) => {
1871                    let blob_batch = datadriven
1872                        .client
1873                        .blob
1874                        .get(&part.key.complete(&datadriven.shard_id))
1875                        .await;
1876                    match blob_batch {
1877                        Ok(Some(_)) | Err(_) => {}
1878                        // Don't try to fetch/print the contents of the
1879                        // batch part if the blob store no longer has it.
1880                        Ok(None) => {
1881                            s.push_str("<empty>\n");
1882                            continue;
1883                        }
1884                    };
1885                }
1886                BatchPart::Inline { .. } => {}
1887            };
1888            let part = EncodedPart::fetch(
1889                &datadriven.shard_id,
1890                datadriven.client.blob.as_ref(),
1891                datadriven.client.metrics.as_ref(),
1892                datadriven.machine.applier.shard_metrics.as_ref(),
1893                &datadriven.client.metrics.read.batch_fetcher,
1894                &batch.desc,
1895                part,
1896            )
1897            .await
1898            .expect("invalid batch part");
1899            let part = part
1900                .normalize(&datadriven.client.metrics.columnar)
1901                .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);
1902
1903            for ((k, _v), t, d) in part
1904                .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
1905                .expect("valid schemas")
1906            {
1907                writeln!(s, "{k} {t} {d}");
1908            }
1909        }
1910        if !s.is_empty() {
1911            for (idx, (_meta, run)) in batch.runs().enumerate() {
1912                write!(s, "<run {idx}>\n");
1913                for part in run {
1914                    let part_idx = batch
1915                        .parts
1916                        .iter()
1917                        .position(|p| p == part)
1918                        .expect("part should exist");
1919                    write!(s, "part {part_idx}\n");
1920                }
1921            }
1922        }
1923        Ok(s)
1924    }
1925
1926    #[allow(clippy::unused_async)]
1927    pub async fn truncate_batch_desc(
1928        datadriven: &mut MachineState,
1929        args: DirectiveArgs<'_>,
1930    ) -> Result<String, anyhow::Error> {
1931        let input = args.expect_str("input");
1932        let output = args.expect_str("output");
1933        let lower = args.expect_antichain("lower");
1934        let upper = args.expect_antichain("upper");
1935
1936        let mut batch = datadriven
1937            .batches
1938            .get(input)
1939            .expect("unknown batch")
1940            .clone();
1941        let truncated_desc = Description::new(lower, upper, batch.desc.since().clone());
1942        let () = validate_truncate_batch(&batch, &truncated_desc, false)?;
1943        batch.desc = truncated_desc;
1944        datadriven.batches.insert(output.to_owned(), batch.clone());
1945        Ok(format!("parts={} len={}\n", batch.part_count(), batch.len))
1946    }
1947
1948    #[allow(clippy::unused_async)]
1949    pub async fn set_batch_parts_size(
1950        datadriven: &mut MachineState,
1951        args: DirectiveArgs<'_>,
1952    ) -> Result<String, anyhow::Error> {
1953        let input = args.expect_str("input");
1954        let size = args.expect("size");
1955        let batch = datadriven.batches.get_mut(input).expect("unknown batch");
1956        for part in batch.parts.iter_mut() {
1957            match part {
1958                RunPart::Single(BatchPart::Hollow(x)) => x.encoded_size_bytes = size,
1959                _ => {
1960                    panic!("set_batch_parts_size only supports hollow parts")
1961                }
1962            }
1963        }
1964        Ok("ok\n".to_string())
1965    }
1966
1967    pub async fn compact(
1968        datadriven: &mut MachineState,
1969        args: DirectiveArgs<'_>,
1970    ) -> Result<String, anyhow::Error> {
1971        let output = args.expect_str("output");
1972        let lower = args.expect_antichain("lower");
1973        let upper = args.expect_antichain("upper");
1974        let since = args.expect_antichain("since");
1975        let target_size = args.optional("target_size");
1976        let memory_bound = args.optional("memory_bound");
1977
1978        let mut inputs = Vec::new();
1979        for input in args.args.get("inputs").expect("missing inputs") {
1980            inputs.push(
1981                datadriven
1982                    .batches
1983                    .get(input)
1984                    .expect("unknown batch")
1985                    .clone(),
1986            );
1987        }
1988
1989        let cfg = datadriven.client.cfg.clone();
1990        if let Some(target_size) = target_size {
1991            cfg.set_config(&BLOB_TARGET_SIZE, target_size);
1992        };
1993        if let Some(memory_bound) = memory_bound {
1994            cfg.set_config(&COMPACTION_MEMORY_BOUND_BYTES, memory_bound);
1995        }
1996        let req = CompactReq {
1997            shard_id: datadriven.shard_id,
1998            desc: Description::new(lower, upper, since),
1999            inputs,
2000        };
2001        let res = Compactor::<String, (), u64, i64>::compact(
2002            CompactConfig::new(&cfg, datadriven.shard_id),
2003            Arc::clone(&datadriven.client.blob),
2004            Arc::clone(&datadriven.client.metrics),
2005            Arc::clone(&datadriven.machine.applier.shard_metrics),
2006            Arc::clone(&datadriven.client.isolated_runtime),
2007            req,
2008            SCHEMAS.clone(),
2009        )
2010        .await?;
2011
2012        datadriven
2013            .batches
2014            .insert(output.to_owned(), res.output.clone());
2015        Ok(format!(
2016            "parts={} len={}\n",
2017            res.output.part_count(),
2018            res.output.len
2019        ))
2020    }
2021
2022    pub async fn clear_blob(
2023        datadriven: &MachineState,
2024        _args: DirectiveArgs<'_>,
2025    ) -> Result<String, anyhow::Error> {
2026        let mut to_delete = vec![];
2027        datadriven
2028            .client
2029            .blob
2030            .list_keys_and_metadata("", &mut |meta| {
2031                to_delete.push(meta.key.to_owned());
2032            })
2033            .await?;
2034        for blob in &to_delete {
2035            datadriven.client.blob.delete(blob).await?;
2036        }
2037        Ok(format!("deleted={}\n", to_delete.len()))
2038    }
2039
2040    pub async fn restore_blob(
2041        datadriven: &MachineState,
2042        _args: DirectiveArgs<'_>,
2043    ) -> Result<String, anyhow::Error> {
2044        let not_restored = crate::internal::restore::restore_blob(
2045            &datadriven.state_versions,
2046            datadriven.client.blob.as_ref(),
2047            &datadriven.client.cfg.build_version,
2048            datadriven.shard_id,
2049            &*datadriven.state_versions.metrics,
2050        )
2051        .await?;
2052        let mut out = String::new();
2053        for key in not_restored {
2054            writeln!(&mut out, "{key}");
2055        }
2056        Ok(out)
2057    }
2058
2059    #[allow(clippy::unused_async)]
2060    pub async fn rewrite_ts(
2061        datadriven: &mut MachineState,
2062        args: DirectiveArgs<'_>,
2063    ) -> Result<String, anyhow::Error> {
2064        let input = args.expect_str("input");
2065        let ts_rewrite = args.expect_antichain("frontier");
2066        let upper = args.expect_antichain("upper");
2067
2068        let batch = datadriven.batches.get_mut(input).expect("unknown batch");
2069        let () = batch
2070            .rewrite_ts(&ts_rewrite, upper)
2071            .map_err(|err| anyhow!("invalid rewrite: {}", err))?;
2072        Ok("ok\n".into())
2073    }
2074
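    /// Runs garbage collection up to the given seqno and reports what was
    /// deleted from blob, truncated from consensus, and removed from state.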
2075    pub async fn gc(
2076        datadriven: &mut MachineState,
2077        args: DirectiveArgs<'_>,
2078    ) -> Result<String, anyhow::Error> {
2079        let new_seqno_since = args.expect("to_seqno");
2080
2081        let req = GcReq {
2082            shard_id: datadriven.shard_id,
2083            new_seqno_since,
2084        };
2085        let (maintenance, stats) =
2086            GarbageCollector::gc_and_truncate(&datadriven.machine, req).await;
2087        datadriven.routine.push(maintenance);
2088
2089        Ok(format!(
2090            "{} batch_parts={} rollups={} truncated={} state_rollups={}\n",
2091            datadriven.machine.seqno(),
2092            stats.batch_parts_deleted_from_blob,
2093            stats.rollups_deleted_from_blob,
2094            stats
2095                .truncated_consensus_to
2096                .iter()
2097                .map(|x| x.to_string())
2098                .collect::<Vec<_>>()
2099                .join(","),
2100            stats
2101                .rollups_removed_from_state
2102                .iter()
2103                .map(|x| x.to_string())
2104                .collect::<Vec<_>>()
2105                .join(","),
2106        ))
2107    }
2108
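    /// Reads a snapshot of the shard as of the given frontier, advancing
    /// update timestamps to the as-of and consolidating before printing.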
2109    pub async fn snapshot(
2110        datadriven: &MachineState,
2111        args: DirectiveArgs<'_>,
2112    ) -> Result<String, anyhow::Error> {
2113        let as_of = args.expect_antichain("as_of");
2114        let snapshot = datadriven
2115            .machine
2116            .snapshot(&as_of)
2117            .await
2118            .map_err(|err| anyhow!("{:?}", err))?;
2119
2120        let mut result = String::new();
2121
2122        for batch in snapshot {
2123            writeln!(
2124                result,
2125                "<batch {:?}-{:?}>",
2126                batch.desc.lower().elements(),
2127                batch.desc.upper().elements()
2128            );
2129            for (run, (_meta, parts)) in batch.runs().enumerate() {
2130                writeln!(result, "<run {run}>");
2131                let mut stream = pin!(
2132                    futures::stream::iter(parts)
2133                        .flat_map(|part| part.part_stream(
2134                            datadriven.shard_id,
2135                            &*datadriven.state_versions.blob,
2136                            &*datadriven.state_versions.metrics
2137                        ))
2138                        .enumerate()
2139                );
2140
2141                while let Some((idx, part)) = stream.next().await {
2142                    let part = &*part?;
2143                    writeln!(result, "<part {idx}>");
2144
2145                    let part = EncodedPart::fetch(
2146                        &datadriven.shard_id,
2147                        datadriven.client.blob.as_ref(),
2148                        datadriven.client.metrics.as_ref(),
2149                        datadriven.machine.applier.shard_metrics.as_ref(),
2150                        &datadriven.client.metrics.read.batch_fetcher,
2151                        &batch.desc,
2152                        part,
2153                    )
2154                    .await
2155                    .expect("invalid batch part");
2156                    let part = part
2157                        .normalize(&datadriven.client.metrics.columnar)
2158                        .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);
2159
2160                    let mut updates = Vec::new();
2161
2162                    for ((k, _v), mut t, d) in part
2163                        .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
2164                        .expect("valid schemas")
2165                    {
2166                        t.advance_by(as_of.borrow());
2167                        updates.push((k, t, d));
2168                    }
2169
2170                    consolidate_updates(&mut updates);
2171
2172                    for (k, t, d) in updates {
2173                        writeln!(result, "{k} {t} {d}");
2174                    }
2175                }
2176            }
2177        }
2178
2179        Ok(result)
2180    }
2181
2182    pub async fn register_listen(
2183        datadriven: &mut MachineState,
2184        args: DirectiveArgs<'_>,
2185    ) -> Result<String, anyhow::Error> {
2186        let output = args.expect_str("output");
2187        let as_of = args.expect_antichain("as_of");
2188        let read = datadriven
2189            .client
2190            .open_leased_reader::<String, (), u64, i64>(
2191                datadriven.shard_id,
2192                Arc::new(StringSchema),
2193                Arc::new(UnitSchema),
2194                Diagnostics::for_tests(),
2195                true,
2196            )
2197            .await
2198            .expect("invalid shard types");
2199        let listen = read
2200            .listen(as_of)
2201            .await
2202            .map_err(|err| anyhow!("{:?}", err))?;
2203        datadriven.listens.insert(output.to_owned(), listen);
2204        Ok("ok\n".into())
2205    }
2206
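    /// Drains events from a registered listener, printing updates, until
    /// the listener's progress is no longer strictly less than the given
    /// frontier.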
2207    pub async fn listen_through(
2208        datadriven: &mut MachineState,
2209        args: DirectiveArgs<'_>,
2210    ) -> Result<String, anyhow::Error> {
2211        let input = args.expect_str("input");
2212        // It's not possible to listen _through_ the empty antichain, so this is
2213        // intentionally `expect` instead of `expect_antichain`.
2214        let frontier = args.expect("frontier");
2215        let listen = datadriven.listens.get_mut(input).expect("unknown listener");
2216        let mut s = String::new();
2217        loop {
2218            for event in listen.fetch_next().await {
2219                match event {
2220                    ListenEvent::Updates(x) => {
2221                        for ((k, _v), t, d) in x.iter() {
2222                            write!(s, "{} {} {}\n", k.as_ref().unwrap(), t, d);
2223                        }
2224                    }
2225                    ListenEvent::Progress(x) => {
2226                        if !x.less_than(&frontier) {
2227                            return Ok(s);
2228                        }
2229                    }
2230                }
2231            }
2232        }
2233    }
2234
2235    pub async fn register_critical_reader(
2236        datadriven: &mut MachineState,
2237        args: DirectiveArgs<'_>,
2238    ) -> Result<String, anyhow::Error> {
2239        let reader_id = args.expect("reader_id");
2240        let (state, maintenance) = datadriven
2241            .machine
2242            .register_critical_reader::<u64>(&reader_id, "tests")
2243            .await;
2244        datadriven.routine.push(maintenance);
2245        Ok(format!(
2246            "{} {:?}\n",
2247            datadriven.machine.seqno(),
2248            state.since.elements(),
2249        ))
2250    }
2251
2252    pub async fn register_leased_reader(
2253        datadriven: &mut MachineState,
2254        args: DirectiveArgs<'_>,
2255    ) -> Result<String, anyhow::Error> {
2256        let reader_id = args.expect("reader_id");
2257        let (reader_state, maintenance) = datadriven
2258            .machine
2259            .register_leased_reader(
2260                &reader_id,
2261                "tests",
2262                READER_LEASE_DURATION.get(&datadriven.client.cfg),
2263                (datadriven.client.cfg.now)(),
2264                false,
2265            )
2266            .await;
2267        datadriven.routine.push(maintenance);
2268        Ok(format!(
2269            "{} {:?}\n",
2270            datadriven.machine.seqno(),
2271            reader_state.since.elements(),
2272        ))
2273    }
2274
2275    pub async fn heartbeat_leased_reader(
2276        datadriven: &MachineState,
2277        args: DirectiveArgs<'_>,
2278    ) -> Result<String, anyhow::Error> {
2279        let reader_id = args.expect("reader_id");
2280        let _ = datadriven
2281            .machine
2282            .heartbeat_leased_reader(&reader_id, (datadriven.client.cfg.now)())
2283            .await;
2284        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2285    }
2286
2287    pub async fn expire_critical_reader(
2288        datadriven: &mut MachineState,
2289        args: DirectiveArgs<'_>,
2290    ) -> Result<String, anyhow::Error> {
2291        let reader_id = args.expect("reader_id");
2292        let (_, maintenance) = datadriven.machine.expire_critical_reader(&reader_id).await;
2293        datadriven.routine.push(maintenance);
2294        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2295    }
2296
2297    pub async fn expire_leased_reader(
2298        datadriven: &mut MachineState,
2299        args: DirectiveArgs<'_>,
2300    ) -> Result<String, anyhow::Error> {
2301        let reader_id = args.expect("reader_id");
2302        let (_, maintenance) = datadriven.machine.expire_leased_reader(&reader_id).await;
2303        datadriven.routine.push(maintenance);
2304        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2305    }
2306
2307    pub async fn compare_and_append_batches(
2308        datadriven: &MachineState,
2309        args: DirectiveArgs<'_>,
2310    ) -> Result<String, anyhow::Error> {
2311        let expected_upper = args.expect_antichain("expected_upper");
2312        let new_upper = args.expect_antichain("new_upper");
2313
2314        let mut batches: Vec<Batch<String, (), u64, i64>> = args
2315            .args
2316            .get("batches")
2317            .expect("missing batches")
2318            .into_iter()
2319            .map(|batch| {
2320                let hollow = datadriven
2321                    .batches
2322                    .get(batch)
2323                    .expect("unknown batch")
2324                    .clone();
2325                datadriven.to_batch(hollow)
2326            })
2327            .collect();
2328
2329        let mut writer = datadriven
2330            .client
2331            .open_writer(
2332                datadriven.shard_id,
2333                Arc::new(StringSchema),
2334                Arc::new(UnitSchema),
2335                Diagnostics::for_tests(),
2336            )
2337            .await?;
2338
2339        let mut batch_refs: Vec<_> = batches.iter_mut().collect();
2340
2341        let () = writer
2342            .compare_and_append_batch(batch_refs.as_mut_slice(), expected_upper, new_upper)
2343            .await?
2344            .map_err(|err| anyhow!("upper mismatch: {:?}", err))?;
2345
2346        writer.expire().await;
2347
2348        Ok("ok\n".into())
2349    }
2350
2351    pub async fn expire_writer(
2352        datadriven: &mut MachineState,
2353        args: DirectiveArgs<'_>,
2354    ) -> Result<String, anyhow::Error> {
2355        let writer_id = args.expect("writer_id");
2356        let (_, maintenance) = datadriven.machine.expire_writer(&writer_id).await;
2357        datadriven.routine.push(maintenance);
2358        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2359    }
2360
2361    pub(crate) async fn finalize(
2362        datadriven: &mut MachineState,
2363        _args: DirectiveArgs<'_>,
2364    ) -> anyhow::Result<String> {
2365        let maintenance = datadriven.machine.become_tombstone().await?;
2366        datadriven.routine.push(maintenance);
2367        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2368    }
2369
2370    pub(crate) fn is_finalized(
2371        datadriven: &MachineState,
2372        _args: DirectiveArgs<'_>,
2373    ) -> anyhow::Result<String> {
2374        let seqno = datadriven.machine.seqno();
2375        let tombstone = datadriven.machine.is_finalized();
2376        Ok(format!("{seqno} {tombstone}\n"))
2377    }
2378
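    /// Appends a stashed batch via the idempotent compare-and-append path,
    /// retrying on inline-write backpressure and surfacing an upper
    /// mismatch as an error.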
2379    pub async fn compare_and_append(
2380        datadriven: &mut MachineState,
2381        args: DirectiveArgs<'_>,
2382    ) -> Result<String, anyhow::Error> {
2383        let input = args.expect_str("input");
2384        let writer_id = args.expect("writer_id");
2385        let mut batch = datadriven
2386            .batches
2387            .get(input)
2388            .expect("unknown batch")
2389            .clone();
2390        let token = args.optional("token").unwrap_or_else(IdempotencyToken::new);
2391        let now = (datadriven.client.cfg.now)();
2392
2393        let (id, maintenance) = datadriven
2394            .machine
2395            .register_schema(&*SCHEMAS.key, &*SCHEMAS.val)
2396            .await;
2397        assert_eq!(id, SCHEMAS.id);
2398        datadriven.routine.push(maintenance);
2399        let maintenance = loop {
2400            let indeterminate = args
2401                .optional::<String>("prev_indeterminate")
2402                .map(|x| Indeterminate::new(anyhow::Error::msg(x)));
2403            let res = datadriven
2404                .machine
2405                .compare_and_append_idempotent(
2406                    &batch,
2407                    &writer_id,
2408                    now,
2409                    &token,
2410                    &HandleDebugState::default(),
2411                    indeterminate,
2412                )
2413                .await;
2414            match res {
2415                CompareAndAppendRes::Success(_, x) => break x,
2416                CompareAndAppendRes::UpperMismatch(_seqno, upper) => {
2417                    return Err(anyhow!("{:?}", Upper(upper)));
2418                }
2419                CompareAndAppendRes::InlineBackpressure => {
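                    // State can't accept any more inline writes (see
                    // INLINE_WRITES_TOTAL_MAX_BYTES), so flush this batch's
                    // parts out to blob and retry with hollow parts.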
2420                    let mut b = datadriven.to_batch(batch.clone());
2421                    let cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
2422                    b.flush_to_blob(
2423                        &cfg,
2424                        &datadriven.client.metrics.user,
2425                        &datadriven.client.isolated_runtime,
2426                        &*SCHEMAS,
2427                    )
2428                    .await;
2429                    batch = b.into_hollow_batch();
2430                    continue;
2431                }
2432                _ => panic!("{:?}", res),
2433            };
2434        };
2435        // TODO: Don't throw away writer maintenance. It's slightly tricky
2436        // because we need a WriterId for Compactor.
2437        datadriven.routine.push(maintenance.routine);
2438        Ok(format!(
2439            "{} {:?}\n",
2440            datadriven.machine.seqno(),
2441            datadriven.machine.applier.clone_upper().elements(),
2442        ))
2443    }
2444
2445    pub async fn apply_merge_res(
2446        datadriven: &mut MachineState,
2447        args: DirectiveArgs<'_>,
2448    ) -> Result<String, anyhow::Error> {
2449        let input = args.expect_str("input");
2450        let batch = datadriven
2451            .batches
2452            .get(input)
2453            .expect("unknown batch")
2454            .clone();
2455        let (merge_res, maintenance) = datadriven
2456            .machine
2457            .merge_res(&FueledMergeRes { output: batch })
2458            .await;
2459        datadriven.routine.push(maintenance);
2460        Ok(format!(
2461            "{} {}\n",
2462            datadriven.machine.seqno(),
2463            merge_res.applied()
2464        ))
2465    }
2466
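    /// Performs all queued routine maintenance, refreshing the cached state
    /// and printing the seqno after each unit completes.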
2467    pub async fn perform_maintenance(
2468        datadriven: &mut MachineState,
2469        _args: DirectiveArgs<'_>,
2470    ) -> Result<String, anyhow::Error> {
2471        let mut s = String::new();
2472        for maintenance in datadriven.routine.drain(..) {
2473            let () = maintenance
2474                .perform(&datadriven.machine, &datadriven.gc)
2475                .await;
2476            let () = datadriven
2477                .machine
2478                .applier
2479                .fetch_and_update_state(None)
2480                .await;
2481            write!(s, "{} ok\n", datadriven.machine.seqno());
2482        }
2483        Ok(s)
2484    }
2485}
2486
2487#[cfg(test)]
2488pub mod tests {
2489    use std::sync::Arc;
2490
2491    use mz_dyncfg::ConfigUpdates;
2492    use mz_ore::cast::CastFrom;
2493    use mz_ore::task::spawn;
2494    use mz_persist::intercept::{InterceptBlob, InterceptHandle};
2495    use mz_persist::location::SeqNo;
2496    use timely::progress::Antichain;
2497
2498    use crate::ShardId;
2499    use crate::batch::BatchBuilderConfig;
2500    use crate::cache::StateCache;
2501    use crate::internal::gc::{GarbageCollector, GcReq};
2502    use crate::internal::state::{HandleDebugState, ROLLUP_THRESHOLD};
2503    use crate::tests::new_test_client;
2504
2505    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
2506    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
2507    async fn apply_unbatched_cmd_truncate(dyncfgs: ConfigUpdates) {
2508        mz_ore::test::init_logging();
2509
2510        let client = new_test_client(&dyncfgs).await;
2511        // Set a low rollup threshold so GC/truncation is more aggressive.
2512        client.cfg.set_config(&ROLLUP_THRESHOLD, 5);
2513        let (mut write, _) = client
2514            .expect_open::<String, (), u64, i64>(ShardId::new())
2515            .await;
2516
2517        // Write a bunch of batches. This should result in a bounded number of
2518        // live entries in consensus.
2519        const NUM_BATCHES: u64 = 100;
2520        for idx in 0..NUM_BATCHES {
2521            let mut batch = write
2522                .expect_batch(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1)
2523                .await;
2524            // Flush this batch out so the CaA doesn't get inline writes
2525            // backpressure.
2526            let cfg = BatchBuilderConfig::new(&client.cfg, write.shard_id());
2527            batch
2528                .flush_to_blob(
2529                    &cfg,
2530                    &client.metrics.user,
2531                    &client.isolated_runtime,
2532                    &write.write_schemas,
2533                )
2534                .await;
2535            let (_, writer_maintenance) = write
2536                .machine
2537                .compare_and_append(
2538                    &batch.into_hollow_batch(),
2539                    &write.writer_id,
2540                    &HandleDebugState::default(),
2541                    (write.cfg.now)(),
2542                )
2543                .await
2544                .unwrap();
2545            writer_maintenance
2546                .perform(&write.machine, &write.gc, write.compact.as_ref())
2547                .await;
2548        }
2549        let live_diffs = write
2550            .machine
2551            .applier
2552            .state_versions
2553            .fetch_all_live_diffs(&write.machine.shard_id())
2554            .await;
2555        // Make sure we constructed the key correctly.
2556        assert!(live_diffs.0.len() > 0);
2557        // Make sure the number of entries is bounded. (I think we could work
2558        // out a tighter bound than this, but the point is only that it's
2559        // bounded).
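        // For NUM_BATCHES = 100: next_power_of_two() = 128, which has 7
        // trailing zeros, so this works out to 2 * 7 = 14.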
2560        let max_live_diffs = 2 * usize::cast_from(NUM_BATCHES.next_power_of_two().trailing_zeros());
2561        assert!(
2562            live_diffs.0.len() <= max_live_diffs,
2563            "{} vs {}",
2564            live_diffs.0.len(),
2565            max_live_diffs
2566        );
2567    }
2568
2569    // A regression test for database-issues#4206, where a bug in gc led to an
2570    // incremental state invariant being violated, which left gc permanently
2571    // wedged for the shard.
2572    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
2573    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
2574    async fn regression_gc_skipped_req_and_interrupted(dyncfgs: ConfigUpdates) {
2575        let mut client = new_test_client(&dyncfgs).await;
2576        let intercept = InterceptHandle::default();
2577        client.blob = Arc::new(InterceptBlob::new(
2578            Arc::clone(&client.blob),
2579            intercept.clone(),
2580        ));
2581        let (_, mut read) = client
2582            .expect_open::<String, String, u64, i64>(ShardId::new())
2583            .await;
2584
2585        // Create a new SeqNo
2586        read.downgrade_since(&Antichain::from_elem(1)).await;
2587        let new_seqno_since = read.machine.applier.seqno_since();
2588
2589        // Start a GC in the background for some SeqNo range that is not
2590        // contiguous compared to the last gc req (in this case, n/a) and then
2591        // crash when it gets to the blob deletes. In the regression case, this
2592        // renders the shard permanently un-gc-able.
2593        assert!(new_seqno_since > SeqNo::minimum());
2594        intercept.set_post_delete(Some(Arc::new(|_, _| panic!("boom"))));
2595        let machine = read.machine.clone();
2596        // Run this in a spawn so we can catch the boom panic
2597        let gc = spawn(|| "", async move {
2598            let req = GcReq {
2599                shard_id: machine.shard_id(),
2600                new_seqno_since,
2601            };
2602            GarbageCollector::gc_and_truncate(&machine, req).await
2603        });
2604        // Wait for gc to either panic (regression case) or finish (good case)
2605        // because it happens to not call blob delete.
2606        let _ = gc.await;
2607
2608        // Allow blob deletes to go through and try GC again. In the regression
2609        // case, this hangs.
2610        intercept.set_post_delete(None);
2611        let req = GcReq {
2612            shard_id: read.machine.shard_id(),
2613            new_seqno_since,
2614        };
2615        let _ = GarbageCollector::gc_and_truncate(&read.machine, req.clone()).await;
2616    }
2617
2618    // A regression test for materialize#20776, where a bug meant that compare_and_append
2619    // would not fetch the latest state after an upper mismatch. This meant that
2620    // a write that could succeed if retried on the latest state would instead
2621    // return an UpperMismatch.
2622    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
2623    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
2624    async fn regression_update_state_after_upper_mismatch(dyncfgs: ConfigUpdates) {
2625        let client = new_test_client(&dyncfgs).await;
2626        let mut client2 = client.clone();
2627
2628        // The bug can only happen if the two WriteHandles have separate copies
2629        // of state, so make sure that each is given its own StateCache.
2630        let new_state_cache = Arc::new(StateCache::new_no_metrics());
2631        client2.shared_states = new_state_cache;
2632
2633        let shard_id = ShardId::new();
2634        let (mut write1, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
2635        let (mut write2, _) = client2.expect_open::<String, (), u64, i64>(shard_id).await;
2636
2637        let data = [
2638            (("1".to_owned(), ()), 1, 1),
2639            (("2".to_owned(), ()), 2, 1),
2640            (("3".to_owned(), ()), 3, 1),
2641            (("4".to_owned(), ()), 4, 1),
2642            (("5".to_owned(), ()), 5, 1),
2643        ];
2644
2645        write1.expect_compare_and_append(&data[..1], 0, 2).await;
2646
2647        // This handle's upper now lags behind. If compare_and_append fails to
2648        // update state after an upper mismatch, then this call would (incorrectly) fail.
2649        write2.expect_compare_and_append(&data[1..2], 2, 3).await;
2650    }
2651}