mz_persist_client/internal/machine.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Implementation of the persist state machine.

use std::fmt::Debug;
use std::ops::ControlFlow::{self, Continue};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use futures::FutureExt;
use futures::future::{self, BoxFuture};
use mz_dyncfg::{Config, ConfigSet};
use mz_ore::assert_none;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
#[allow(unused_imports)] // False positive.
use mz_ore::fmt::FormatBuffer;
use mz_ore::task::JoinHandle;
use mz_persist::location::{ExternalError, Indeterminate, SeqNo};
use mz_persist::retry::Retry;
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64, Opaque};
use semver::Version;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::{Instrument, debug, info, trace_span, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES;
use crate::cache::StateCache;
use crate::cfg::RetryParameters;
use crate::critical::CriticalReaderId;
use crate::error::{CodecMismatch, InvalidUsage};
use crate::internal::apply::Applier;
use crate::internal::compact::CompactReq;
use crate::internal::gc::GarbageCollector;
use crate::internal::maintenance::{RoutineMaintenance, WriterMaintenance};
use crate::internal::metrics::{CmdMetrics, Metrics, MetricsRetryStream, RetryMetrics};
use crate::internal::paths::PartialRollupKey;
use crate::internal::state::{
    CompareAndAppendBreak, CriticalReaderState, ENABLE_INCREMENTAL_COMPACTION, HandleDebugState,
    HollowBatch, HollowRollup, IdempotencyToken, LeasedReaderState, NoOpStateTransition, Since,
    SnapshotErr, StateCollections, Upper,
};
use crate::internal::state_versions::StateVersions;
use crate::internal::trace::{ApplyMergeResult, FueledMergeRes};
use crate::internal::watch::StateWatch;
use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
use crate::rpc::PubSubSender;
use crate::schema::CaESchema;
use crate::write::WriterId;
use crate::{Diagnostics, PersistConfig, ShardId};

#[derive(Debug)]
pub struct Machine<K, V, T, D> {
    pub(crate) applier: Applier<K, V, T, D>,
    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
}

// Impl Clone regardless of the type params.
impl<K, V, T: Clone, D> Clone for Machine<K, V, T, D> {
    fn clone(&self) -> Self {
        Self {
            applier: self.applier.clone(),
            isolated_runtime: Arc::clone(&self.isolated_runtime),
        }
    }
}

pub(crate) const CLAIM_UNCLAIMED_COMPACTIONS: Config<bool> = Config::new(
    "persist_claim_unclaimed_compactions",
    false,
    "If an append doesn't result in a compaction request, but there is some uncompacted batch \
    in state, compact that instead.",
);

pub(crate) const CLAIM_COMPACTION_PERCENT: Config<usize> = Config::new(
    "persist_claim_compaction_percent",
    100,
    "Claim a compaction with the given percent chance, if claiming compactions is enabled. \
    (If over 100, we'll always claim at least one; for example, if set to 365, we'll claim at least \
    three and have a 65% chance of claiming a fourth.)",
);
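
// A sketch of the over-100 semantics described above; the `claims_from_percent`
// helper and the `coin_flip` closure are hypothetical, for illustration only,
// not the actual claiming logic.
//
//     fn claims_from_percent(percent: usize, coin_flip: impl Fn(usize) -> bool) -> usize {
//         // e.g. percent = 365: three guaranteed claims...
//         let guaranteed = percent / 100;
//         // ...plus a 65% chance of a fourth.
//         let extra = if coin_flip(percent % 100) { 1 } else { 0 };
//         guaranteed + extra
//     }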

pub(crate) const CLAIM_COMPACTION_MIN_VERSION: Config<String> = Config::new(
    "persist_claim_compaction_min_version",
    String::new(),
    "If set to a valid version string, compact away any earlier versions if possible.",
);

impl<K, V, T, D> Machine<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64,
{
    pub async fn new(
        cfg: PersistConfig,
        shard_id: ShardId,
        metrics: Arc<Metrics>,
        state_versions: Arc<StateVersions>,
        shared_states: Arc<StateCache>,
        pubsub_sender: Arc<dyn PubSubSender>,
        isolated_runtime: Arc<IsolatedRuntime>,
        diagnostics: Diagnostics,
    ) -> Result<Self, Box<CodecMismatch>> {
        let applier = Applier::new(
            cfg,
            shard_id,
            metrics,
            state_versions,
            shared_states,
            pubsub_sender,
            diagnostics,
        )
        .await?;
        Ok(Machine {
            applier,
            isolated_runtime,
        })
    }

    pub fn shard_id(&self) -> ShardId {
        self.applier.shard_id
    }

    pub fn seqno(&self) -> SeqNo {
        self.applier.seqno()
    }

    pub async fn add_rollup_for_current_seqno(&self) -> RoutineMaintenance {
        let rollup = self.applier.write_rollup_for_state().await;
        let Some(rollup) = rollup else {
            return RoutineMaintenance::default();
        };

        let (applied, maintenance) = self.add_rollup((rollup.seqno, &rollup.to_hollow())).await;
        if !applied {
            // Someone else already wrote a rollup at this seqno, so ours didn't
            // get added. Delete it.
            self.applier
                .state_versions
                .delete_rollup(&rollup.shard_id, &rollup.key)
                .await;
        }
        maintenance
    }

    pub async fn add_rollup(
        &self,
        add_rollup: (SeqNo, &HollowRollup),
    ) -> (bool, RoutineMaintenance) {
        // See the big SUBTLE comment in [Self::merge_res] for what's going on
        // here.
        let mut applied_ever_true = false;
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, _applied, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.add_rollup, |_, _, state| {
                let ret = state.add_rollup(add_rollup);
                if let Continue(applied) = ret {
                    applied_ever_true = applied_ever_true || applied;
                }
                ret
            })
            .await;
        (applied_ever_true, maintenance)
    }

    pub async fn remove_rollups(
        &self,
        remove_rollups: &[(SeqNo, PartialRollupKey)],
    ) -> (Vec<SeqNo>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, removed_rollup_seqnos, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.remove_rollups, |_, _, state| {
                state.remove_rollups(remove_rollups)
            })
            .await;
        (removed_rollup_seqnos, maintenance)
    }

    pub async fn register_leased_reader(
        &self,
        reader_id: &LeasedReaderId,
        purpose: &str,
        lease_duration: Duration,
        heartbeat_timestamp_ms: u64,
        use_critical_since: bool,
    ) -> (LeasedReaderState<T>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, (reader_state, seqno_since), maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |seqno, cfg, state| {
                state.register_leased_reader(
                    &cfg.hostname,
                    reader_id,
                    purpose,
                    seqno,
                    lease_duration,
                    heartbeat_timestamp_ms,
                    use_critical_since,
                )
            })
            .await;
        // Usually, the reader gets an initial seqno hold of the seqno at which
        // it was registered. However, on a tombstone shard the seqno hold
        // happens to get computed as the tombstone seqno + 1
        // (State::clone_apply provided seqno.next(), the non-no-op commit
        // seqno, to the work fn and this is what register_reader uses for the
        // seqno hold). The real invariant we want to protect here is that the
        // hold is >= the seqno_since, so validate that instead of anything more
        // specific.
        debug_assert!(
            reader_state.seqno >= seqno_since,
            "{} vs {}",
            reader_state.seqno,
            seqno_since,
        );
        (reader_state, maintenance)
    }

    pub async fn register_critical_reader<O: Opaque + Codec64>(
        &self,
        reader_id: &CriticalReaderId,
        purpose: &str,
    ) -> (CriticalReaderState<T>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, state, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, cfg, state| {
                state.register_critical_reader::<O>(&cfg.hostname, reader_id, purpose)
            })
            .await;
        (state, maintenance)
    }

    pub async fn register_schema(
        &self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> (Option<SchemaId>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, state, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, _cfg, state| {
                state.register_schema::<K, V>(key_schema, val_schema)
            })
            .await;
        (state, maintenance)
    }

    pub async fn spine_exert(&self, fuel: usize) -> (Vec<CompactReq<T>>, RoutineMaintenance) {
        // Performance special case for no-ops, to avoid the State clones.
        if fuel == 0 || self.applier.all_batches().len() < 2 {
            return (Vec::new(), RoutineMaintenance::default());
        }

        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, reqs, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.spine_exert, |_seqno, _cfg, state| {
                state.spine_exert(fuel)
            })
            .await;
        let reqs = reqs
            .into_iter()
            .map(|req| CompactReq {
                shard_id: self.shard_id(),
                desc: req.desc,
                inputs: req.inputs,
            })
            .collect();
        (reqs, maintenance)
    }

    pub async fn compare_and_append(
        &self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        debug_info: &HandleDebugState,
        heartbeat_timestamp_ms: u64,
    ) -> CompareAndAppendRes<T> {
        let idempotency_token = IdempotencyToken::new();
        loop {
            let res = self
                .compare_and_append_idempotent(
                    batch,
                    writer_id,
                    heartbeat_timestamp_ms,
                    &idempotency_token,
                    debug_info,
                    None,
                )
                .await;
            match res {
                CompareAndAppendRes::Success(seqno, maintenance) => {
                    return CompareAndAppendRes::Success(seqno, maintenance);
                }
                CompareAndAppendRes::InvalidUsage(x) => {
                    return CompareAndAppendRes::InvalidUsage(x);
                }
                CompareAndAppendRes::InlineBackpressure => {
                    return CompareAndAppendRes::InlineBackpressure;
                }
                CompareAndAppendRes::UpperMismatch(seqno, _current_upper) => {
                    // If the state machine thinks that the shard upper is not
                    // far enough along, it could be because the caller of this
                    // method has found out that it advanced via some
                    // side-channel that didn't update our local cache of the
                    // machine state. So, fetch the latest state and try again
                    // if we indeed get something different.
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                    let (current_seqno, current_upper) =
                        self.applier.upper(|seqno, upper| (seqno, upper.clone()));

                    // We tried to do a compare_and_append with the wrong
                    // expected upper; that won't work.
                    if &current_upper != batch.desc.lower() {
                        return CompareAndAppendRes::UpperMismatch(current_seqno, current_upper);
                    } else {
                        // The upper stored in state was outdated. Retry after
                        // updating.
                    }
                }
            }
        }
    }
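
    // A sketch of how a caller might drive the result above; `now_ms` and the
    // flush step are hypothetical, for illustration only.
    //
    //     match machine
    //         .compare_and_append(&batch, &writer_id, &debug_info, now_ms)
    //         .await
    //     {
    //         CompareAndAppendRes::Success(_seqno, maintenance) => { /* perform maintenance */ }
    //         // Inline parts were rejected: write them out to blob and retry.
    //         CompareAndAppendRes::InlineBackpressure => { /* flush inline parts, retry */ }
    //         // Someone else advanced the upper; surface it to the caller.
    //         CompareAndAppendRes::UpperMismatch(_seqno, current_upper) => { /* ... */ }
    //         CompareAndAppendRes::InvalidUsage(err) => { /* deterministic caller bug */ }
    //     }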

    async fn compare_and_append_idempotent(
        &self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        heartbeat_timestamp_ms: u64,
        idempotency_token: &IdempotencyToken,
        debug_info: &HandleDebugState,
        // Only exposed for testing. In prod, this always starts as None, but
        // making it a parameter allows us to simulate hitting an indeterminate
        // error on the first attempt in tests.
        mut indeterminate: Option<Indeterminate>,
    ) -> CompareAndAppendRes<T> {
        let metrics = Arc::clone(&self.applier.metrics);
        let lease_duration_ms = self
            .applier
            .cfg
            .writer_lease_duration
            .as_millis()
            .try_into()
            .expect("reasonable duration");
        // SUBTLE: Retries of compare_and_append with Indeterminate errors are
        // tricky (more discussion of this in database-issues#3680):
        //
        // - (1) We compare_and_append and get an Indeterminate error back from
        //   CRDB/Consensus. This means we don't know if it committed or not.
        // - (2) We retry it.
        // - (3) We get back an upper mismatch. The tricky bit is deciding if we
        //   conflicted with some other writer OR if the write in (1) actually
        //   went through and we're "conflicting" with ourself.
        //
        // A number of scenarios can be distinguished with per-writer
        // idempotency tokens, so I'll jump straight to the hardest one:
        //
        // - (1) A compare_and_append is issued for e.g. `[5,7)`, the consensus
        //   call makes it onto the network before the operation is cancelled
        //   (by dropping the future).
        // - (2) A compare_and_append is issued from the same WriteHandle for
        //   `[3,5)`, it uses a different conn from the consensus pool and gets
        //   an Indeterminate error.
        // - (3) The call in (1) is received by consensus and commits.
        // - (4) The retry of (2) receives an upper mismatch with an upper of 7.
        //
        // At this point, how do we determine whether (2) committed or not and
        // thus whether we should return success or upper mismatch? Getting this
        // right is very important for correctness (imagine this is a table
        // write and we either return success or failure to the client).
        //
        // - If we use per-writer IdempotencyTokens but only store the latest
        //   one in state, then the `[5,7)` one will have clobbered whatever our
        //   `[3,5)` one was.
        // - We could store every IdempotencyToken that ever committed, but that
        //   would require unbounded storage in state (non-starter).
        // - We could require that IdempotencyTokens are comparable and that
        //   each call issued by a WriteHandle uses one that is strictly greater
        //   than every call before it. A previous version of this PR tried this
        //   and it's remarkably subtle. As a result, I (Dan) have developed
        //   strong feels that our correctness protocol _should not depend on
        //   WriteHandle, only Machine_.
        // - We could require a new WriterId if a request is ever cancelled by
        //   making `compare_and_append` take ownership of `self` and then
        //   handing it back for any call polled to completion. The ergonomics
        //   of this are quite awkward and, like the previous idea, it depends
        //   on the WriteHandle impl for correctness.
        // - Any ideas that involve reading back the data are foiled by a step
        //   `(0) set the since to 100` (plus the latency and memory usage would
        //   be too unpredictable).
        //
        // The technique used here derives from the following observations:
        //
        // - In practice, we don't use compare_and_append with the sort of
        //   "regressing frontiers" described above.
        // - In practice, Indeterminate errors are rare-ish. They happen enough
        //   that we don't want to always panic on them, but this is still a
        //   useful property to build on.
        //
        // At a high level, we do just enough to be able to distinguish the
        // cases that we think will happen in practice and then leave the rest
        // for a panic! that we think we'll never see. Concretely:
        //
        // - Run compare_and_append in a loop, retrying on Indeterminate errors
        //   but noting if we've ever done that.
        // - If we haven't seen an Indeterminate error (i.e. this is the first
        //   time through the loop) then the result we got is guaranteed to be
        //   correct, so pass it up.
        // - Otherwise, any result other than an expected upper mismatch is
        //   guaranteed to be correct, so just pass it up.
        // - Otherwise examine the writer's most recent upper and break it into
        //   two cases:
        // - Case 1 `expected_upper.less_than(writer_most_recent_upper)`: it's
        //   impossible that we committed on a previous iteration because the
        //   overall upper of the shard is less_than what this call would have
        //   advanced it to. Pass up the expectation mismatch.
        // - Case 2 `!Case1`: First note that this means our IdempotencyToken
        //   didn't match, otherwise we would have gotten `AlreadyCommitted`. It
        //   also means some previous write from _this writer_ has committed an
        //   upper that is beyond the one in this call, which is a weird usage
        //   (NB can't be a future write because that would mean someone is
        //   still polling us, but `&mut self` prevents that).
        //
        // TODO: If this technique works in practice (leads to zero panics),
        // then commit to it and remove the Indeterminate from
        // [WriteHandle::compare_and_append_batch].
        let mut retry = self
            .applier
            .metrics
            .retries
            .compare_and_append_idempotent
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        let mut writer_was_present = false;
        loop {
            let cmd_res = self
                .applier
                .apply_unbatched_cmd(&metrics.cmds.compare_and_append, |_, cfg, state| {
                    writer_was_present = state.writers.contains_key(writer_id);
                    state.compare_and_append(
                        batch,
                        writer_id,
                        heartbeat_timestamp_ms,
                        lease_duration_ms,
                        idempotency_token,
                        debug_info,
                        INLINE_WRITES_TOTAL_MAX_BYTES.get(cfg),
                        if CLAIM_UNCLAIMED_COMPACTIONS.get(cfg) {
                            CLAIM_COMPACTION_PERCENT.get(cfg)
                        } else {
                            0
                        },
                        Version::parse(&CLAIM_COMPACTION_MIN_VERSION.get(cfg))
                            .ok()
                            .as_ref(),
                    )
                })
                .await;
            let (seqno, res, routine) = match cmd_res {
                Ok(x) => x,
                Err(err) => {
                    // These are rare and interesting enough that we always log
                    // them at info!.
                    info!(
                        "compare_and_append received an indeterminate error, retrying in {:?}: {}",
                        retry.next_sleep(),
                        err
                    );
                    if indeterminate.is_none() {
                        indeterminate = Some(err);
                    }
                    retry = retry.sleep().await;
                    continue;
                }
            };
            match res {
                Ok(merge_reqs) => {
                    // We got explicit confirmation that we succeeded, so
                    // anything that happened in a previous retry is irrelevant.
                    let mut compact_reqs = Vec::with_capacity(merge_reqs.len());
                    for req in merge_reqs {
                        let req = CompactReq {
                            shard_id: self.shard_id(),
                            desc: req.desc,
                            inputs: req.inputs,
                        };
                        compact_reqs.push(req);
                    }
                    let writer_maintenance = WriterMaintenance {
                        routine,
                        compaction: compact_reqs,
                    };

                    if !writer_was_present {
                        metrics.state.writer_added.inc();
                    }
                    for part in &batch.parts {
                        if part.is_inline() {
                            let bytes = u64::cast_from(part.inline_bytes());
                            metrics.inline.part_commit_bytes.inc_by(bytes);
                            metrics.inline.part_commit_count.inc();
                        }
                    }
                    return CompareAndAppendRes::Success(seqno, writer_maintenance);
                }
                Err(CompareAndAppendBreak::AlreadyCommitted) => {
                    // A previous iteration through this loop got an
                    // Indeterminate error but was successful. Sanity check this
                    // and pass along the good news.
                    assert!(indeterminate.is_some());
                    self.applier.metrics.cmds.compare_and_append_noop.inc();
                    if !writer_was_present {
                        metrics.state.writer_added.inc();
                    }
                    return CompareAndAppendRes::Success(seqno, WriterMaintenance::default());
                }
                Err(CompareAndAppendBreak::InvalidUsage(err)) => {
                    // InvalidUsage is (or should be) a deterministic function
                    // of the inputs and independent of anything in persist
                    // state. It's handed back via a Break, so we never even try
                    // to commit it. No network, no Indeterminate.
                    assert_none!(indeterminate);
                    return CompareAndAppendRes::InvalidUsage(err);
                }
                Err(CompareAndAppendBreak::InlineBackpressure) => {
                    // We tried to write an inline part, but there was already
                    // too much in state. Flush it out to s3 and try again.
                    return CompareAndAppendRes::InlineBackpressure;
                }
                Err(CompareAndAppendBreak::Upper {
                    shard_upper,
                    writer_upper,
                }) => {
                    // NB the below intentionally compares to writer_upper
                    // (because it gives a tighter bound on the bad case), but
                    // returns shard_upper (what the persist caller cares
                    // about).
                    assert!(
                        PartialOrder::less_equal(&writer_upper, &shard_upper),
                        "{:?} vs {:?}",
                        &writer_upper,
                        &shard_upper
                    );
                    if PartialOrder::less_than(&writer_upper, batch.desc.upper()) {
                        // No way this could have committed in some previous
                        // attempt of this loop: the upper of the writer is
                        // strictly less than the proposed new upper.
                        return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
                    }
                    if indeterminate.is_none() {
                        // No way this could have committed in some previous
                        // attempt of this loop: we never saw an indeterminate
                        // error (thus there was no previous iteration of the
                        // loop).
                        return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
                    }
                    // This is the bad case. We can't distinguish if some
                    // previous attempt that got an Indeterminate error
                    // succeeded or not. This should be sufficiently rare in
                    // practice (hopefully ~never) that we give up and let
                    // process restart fix things. See the big comment above for
                    // more context.
                    //
                    // NB: This is intentionally not a halt! because it's quite
                    // unexpected.
                    panic!(
                        concat!(
                            "cannot distinguish compare_and_append success or failure ",
                            "caa_lower={:?} caa_upper={:?} writer_upper={:?} shard_upper={:?} err={:?}"
                        ),
                        batch.desc.lower().elements(),
                        batch.desc.upper().elements(),
                        writer_upper.elements(),
                        shard_upper.elements(),
                        indeterminate,
                    );
                }
            };
        }
    }

    pub async fn downgrade_since(
        &self,
        reader_id: &LeasedReaderId,
        outstanding_seqno: Option<SeqNo>,
        new_since: &Antichain<T>,
        heartbeat_timestamp_ms: u64,
    ) -> (SeqNo, Since<T>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        self.apply_unbatched_idempotent_cmd(&metrics.cmds.downgrade_since, |seqno, _cfg, state| {
            state.downgrade_since(
                reader_id,
                seqno,
                outstanding_seqno,
                new_since,
                heartbeat_timestamp_ms,
            )
        })
        .await
    }

    pub async fn compare_and_downgrade_since<O: Opaque + Codec64>(
        &self,
        reader_id: &CriticalReaderId,
        expected_opaque: &O,
        (new_opaque, new_since): (&O, &Antichain<T>),
    ) -> (Result<Since<T>, (O, Since<T>)>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, res, maintenance) = self
            .apply_unbatched_idempotent_cmd(
                &metrics.cmds.compare_and_downgrade_since,
                |_seqno, _cfg, state| {
                    state.compare_and_downgrade_since::<O>(
                        reader_id,
                        expected_opaque,
                        (new_opaque, new_since),
                    )
                },
            )
            .await;

        match res {
            Ok(since) => (Ok(since), maintenance),
            Err((opaque, since)) => (Err((opaque, since)), maintenance),
        }
    }
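
    // A sketch of the fencing pattern this enables, with a `u64` opaque and
    // hypothetical `expected`/`next` token bookkeeping, for illustration only.
    //
    //     // The downgrade applies only if the stored opaque still equals
    //     // `expected`; on mismatch we learn the current opaque (e.g. a newer
    //     // owner's fencing token) and the current since.
    //     let (res, _maintenance) = machine
    //         .compare_and_downgrade_since::<u64>(&reader_id, &expected, (&next, &new_since))
    //         .await;
    //     match res {
    //         Ok(_since) => expected = next,
    //         Err((current_opaque, _since)) => expected = current_opaque,
    //     }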

    pub async fn heartbeat_leased_reader(
        &self,
        reader_id: &LeasedReaderId,
        heartbeat_timestamp_ms: u64,
    ) -> (SeqNo, bool, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (seqno, existed, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.heartbeat_reader, |_, _, state| {
                state.heartbeat_leased_reader(reader_id, heartbeat_timestamp_ms)
            })
            .await;
        (seqno, existed, maintenance)
    }

    pub async fn expire_leased_reader(
        &self,
        reader_id: &LeasedReaderId,
    ) -> (SeqNo, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (seqno, _existed, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
                state.expire_leased_reader(reader_id)
            })
            .await;
        (seqno, maintenance)
    }

    #[allow(dead_code)] // TODO(bkirwi): remove this when since behaviour on expiry has settled
    pub async fn expire_critical_reader(
        &self,
        reader_id: &CriticalReaderId,
    ) -> (SeqNo, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (seqno, _existed, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
                state.expire_critical_reader(reader_id)
            })
            .await;
        (seqno, maintenance)
    }

    pub async fn expire_writer(&self, writer_id: &WriterId) -> (SeqNo, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (seqno, _existed, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_writer, |_, _, state| {
                state.expire_writer(writer_id)
            })
            .await;
        metrics.state.writer_removed.inc();
        (seqno, maintenance)
    }

    pub fn is_finalized(&self) -> bool {
        self.applier.is_finalized()
    }

    /// See [crate::PersistClient::get_schema].
    pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
        self.applier.get_schema(schema_id)
    }

    /// See [crate::PersistClient::latest_schema].
    pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
        self.applier.latest_schema()
    }

    /// See [crate::PersistClient::compare_and_evolve_schema].
    ///
    /// TODO: Unify this with [Self::register_schema]?
    pub async fn compare_and_evolve_schema(
        &self,
        expected: SchemaId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> (CaESchema<K, V>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, state, maintenance) = self
            .apply_unbatched_idempotent_cmd(
                &metrics.cmds.compare_and_evolve_schema,
                |_seqno, _cfg, state| {
                    state.compare_and_evolve_schema::<K, V>(expected, key_schema, val_schema)
                },
            )
            .await;
        (state, maintenance)
    }

    async fn tombstone_step(&self) -> Result<(bool, RoutineMaintenance), InvalidUsage<T>> {
        let metrics = Arc::clone(&self.applier.metrics);
        let mut retry = self
            .applier
            .metrics
            .retries
            .idempotent_cmd
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        loop {
            let res = self
                .applier
                .apply_unbatched_cmd(&metrics.cmds.become_tombstone, |_, _, state| {
                    state.become_tombstone_and_shrink()
                })
                .await;
            let err = match res {
                Ok((_seqno, Ok(()), maintenance)) => return Ok((true, maintenance)),
                Ok((_seqno, Err(NoOpStateTransition(())), maintenance)) => {
                    return Ok((false, maintenance));
                }
                Err(err) => err,
            };
            if retry.attempt() >= INFO_MIN_ATTEMPTS {
                info!(
                    "become_tombstone received an indeterminate error, retrying in {:?}: {}",
                    retry.next_sleep(),
                    err
                );
            } else {
                debug!(
                    "become_tombstone received an indeterminate error, retrying in {:?}: {}",
                    retry.next_sleep(),
                    err
                );
            }
            retry = retry.sleep().await;
        }
    }

    pub async fn become_tombstone(&self) -> Result<RoutineMaintenance, InvalidUsage<T>> {
        self.applier.check_since_upper_both_empty()?;

        let mut maintenance = RoutineMaintenance::default();

        loop {
            let (made_progress, more_maintenance) = self.tombstone_step().await?;
            maintenance.merge(more_maintenance);
            if !made_progress {
                break;
            }
        }

        Ok(maintenance)
    }

    pub async fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, Since<T>> {
        let start = Instant::now();
        let (mut seqno, mut upper) = match self.applier.snapshot(as_of) {
            Ok(x) => return Ok(x),
            Err(SnapshotErr::AsOfNotYetAvailable(seqno, Upper(upper))) => (seqno, upper),
            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(since))) => {
                return Err(Since(since));
            }
        };

        // The latest state still couldn't serve this as_of: watch+sleep in a
        // loop until it's ready.
        let mut watch = self.applier.watch();
        let watch = &mut watch;
        let sleeps = self
            .applier
            .metrics
            .retries
            .snapshot
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());

        enum Wake<'a, K, V, T, D> {
            Watch(&'a mut StateWatch<K, V, T, D>),
            Sleep(MetricsRetryStream),
        }
        let mut watch_fut = std::pin::pin!(
            watch
                .wait_for_seqno_ge(seqno.next())
                .map(Wake::Watch)
                .instrument(trace_span!("snapshot::watch")),
        );
        let mut sleep_fut = std::pin::pin!(
            sleeps
                .sleep()
                .map(Wake::Sleep)
                .instrument(trace_span!("snapshot::sleep")),
        );

        // To reduce log spam, we log "not yet available" at info only once,
        // and only after a time threshold has passed. If we did log at info,
        // we log at info again when the snapshot resolves.
        let mut logged_at_info = false;
        loop {
            // Use a duration-based threshold here instead of the usual
            // INFO_MIN_ATTEMPTS because here we're waiting on an
            // external thing to arrive.
            if !logged_at_info && start.elapsed() >= Duration::from_millis(1024) {
                logged_at_info = true;
                info!(
                    "snapshot {} {} as of {:?} not yet available for {} upper {:?}",
                    self.applier.shard_metrics.name,
                    self.shard_id(),
                    as_of.elements(),
                    seqno,
                    upper.elements(),
                );
            } else {
                debug!(
                    "snapshot {} {} as of {:?} not yet available for {} upper {:?}",
                    self.applier.shard_metrics.name,
                    self.shard_id(),
                    as_of.elements(),
                    seqno,
                    upper.elements(),
                );
            }

            let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
                future::Either::Left((wake, _)) => wake,
                future::Either::Right((wake, _)) => wake,
            };
            // Note that we don't need to fetch in the Watch case, because the
            // Watch wakeup is a signal that the shared state has already been
            // updated.
            match &wake {
                Wake::Watch(_) => self.applier.metrics.watch.snapshot_woken_via_watch.inc(),
                Wake::Sleep(_) => {
                    self.applier.metrics.watch.snapshot_woken_via_sleep.inc();
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                }
            }

            (seqno, upper) = match self.applier.snapshot(as_of) {
                Ok(x) => {
                    if logged_at_info {
                        info!(
                            "snapshot {} {} as of {:?} now available",
                            self.applier.shard_metrics.name,
                            self.shard_id(),
                            as_of.elements(),
                        );
                    }
                    return Ok(x);
                }
                Err(SnapshotErr::AsOfNotYetAvailable(seqno, Upper(upper))) => {
                    // The upper isn't ready yet, fall through and try again.
                    (seqno, upper)
                }
                Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(since))) => {
                    return Err(Since(since));
                }
            };

            match wake {
                Wake::Watch(watch) => {
                    watch_fut.set(
                        watch
                            .wait_for_seqno_ge(seqno.next())
                            .map(Wake::Watch)
                            .instrument(trace_span!("snapshot::watch")),
                    );
                }
                Wake::Sleep(sleeps) => {
                    debug!(
                        "snapshot {} {} sleeping for {:?}",
                        self.applier.shard_metrics.name,
                        self.shard_id(),
                        sleeps.next_sleep()
                    );
                    sleep_fut.set(
                        sleeps
                            .sleep()
                            .map(Wake::Sleep)
                            .instrument(trace_span!("snapshot::sleep")),
                    );
                }
            }
        }
    }
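
    // The window snapshot enforces, with hypothetical values for illustration:
    // an as_of must land in `[since, upper)` to be served immediately.
    //
    //     // Suppose since = {5} and upper = {10}.
    //     machine.snapshot(&Antichain::from_elem(7)).await;  // Ok: can serve reads at 7.
    //     machine.snapshot(&Antichain::from_elem(12)).await; // Waits until the upper passes 12.
    //     machine.snapshot(&Antichain::from_elem(3)).await;  // Err(Since({5})): history compacted away.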

    // NB: Unlike the other methods here, this one is read-only.
    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
        self.applier.verify_listen(as_of)
    }

    pub async fn next_listen_batch(
        &self,
        frontier: &Antichain<T>,
        watch: &mut StateWatch<K, V, T, D>,
        reader_id: Option<&LeasedReaderId>,
        // If Some, an override for the default listen sleep retry parameters.
        retry: Option<RetryParameters>,
    ) -> HollowBatch<T> {
        let mut seqno = match self.applier.next_listen_batch(frontier) {
            Ok(b) => return b,
            Err(seqno) => seqno,
        };

        // The latest state still doesn't have a new frontier for us:
        // watch+sleep in a loop until it does.
        let retry = retry.unwrap_or_else(|| next_listen_batch_retry_params(&self.applier.cfg));
        let sleeps = self
            .applier
            .metrics
            .retries
            .next_listen_batch
            .stream(retry.into_retry(SystemTime::now()).into_retry_stream());

        enum Wake<'a, K, V, T, D> {
            Watch(&'a mut StateWatch<K, V, T, D>),
            Sleep(MetricsRetryStream),
        }
        let mut watch_fut = std::pin::pin!(
            watch
                .wait_for_seqno_ge(seqno.next())
                .map(Wake::Watch)
                .instrument(trace_span!("snapshot::watch"))
        );
        let mut sleep_fut = std::pin::pin!(
            sleeps
                .sleep()
                .map(Wake::Sleep)
                .instrument(trace_span!("snapshot::sleep"))
        );

        loop {
            let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
                future::Either::Left((wake, _)) => wake,
                future::Either::Right((wake, _)) => wake,
            };
            // Note that we don't need to fetch in the Watch case, because the
            // Watch wakeup is a signal that the shared state has already been
            // updated.
            match &wake {
                Wake::Watch(_) => self.applier.metrics.watch.listen_woken_via_watch.inc(),
                Wake::Sleep(_) => {
                    self.applier.metrics.watch.listen_woken_via_sleep.inc();
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                }
            }

            seqno = match self.applier.next_listen_batch(frontier) {
                Ok(b) => {
                    match &wake {
                        Wake::Watch(_) => {
                            self.applier.metrics.watch.listen_resolved_via_watch.inc()
                        }
                        Wake::Sleep(_) => {
                            self.applier.metrics.watch.listen_resolved_via_sleep.inc()
                        }
                    }
                    return b;
                }
                Err(seqno) => seqno,
            };

            // Wait a bit and try again. Intentionally don't ever log
            // this at info level.
            match wake {
                Wake::Watch(watch) => {
                    watch_fut.set(
                        watch
                            .wait_for_seqno_ge(seqno.next())
                            .map(Wake::Watch)
                            .instrument(trace_span!("snapshot::watch")),
                    );
                }
                Wake::Sleep(sleeps) => {
                    debug!(
                        "{:?}: {} {} next_listen_batch didn't find new data, retrying in {:?}",
                        reader_id,
                        self.applier.shard_metrics.name,
                        self.shard_id(),
                        sleeps.next_sleep()
                    );
                    sleep_fut.set(
                        sleeps
                            .sleep()
                            .map(Wake::Sleep)
                            .instrument(trace_span!("snapshot::sleep")),
                    );
                }
            }
        }
    }

    async fn apply_unbatched_idempotent_cmd<
        R,
        WorkFn: FnMut(
            SeqNo,
            &PersistConfig,
            &mut StateCollections<T>,
        ) -> ControlFlow<NoOpStateTransition<R>, R>,
    >(
        &self,
        cmd: &CmdMetrics,
        mut work_fn: WorkFn,
    ) -> (SeqNo, R, RoutineMaintenance) {
        let mut retry = self
            .applier
            .metrics
            .retries
            .idempotent_cmd
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        loop {
            match self.applier.apply_unbatched_cmd(cmd, &mut work_fn).await {
                Ok((seqno, x, maintenance)) => match x {
                    Ok(x) => {
                        return (seqno, x, maintenance);
                    }
                    Err(NoOpStateTransition(x)) => {
                        return (seqno, x, maintenance);
                    }
                },
                Err(err) => {
                    if retry.attempt() >= INFO_MIN_ATTEMPTS {
                        info!(
                            "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
                            cmd.name,
                            retry.next_sleep(),
                            err
                        );
                    } else {
                        debug!(
                            "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
                            cmd.name,
                            retry.next_sleep(),
                            err
                        );
                    }
                    retry = retry.sleep().await;
                    continue;
                }
            }
        }
    }
}
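
// The contract the idempotent work fns above rely on: every command either
// commits a state change (`Continue`) or reports that there is nothing left to
// do (`Break(NoOpStateTransition)`), so replaying it after an indeterminate
// error is harmless. A sketch of a conforming work fn, with a hypothetical
// `already_applied`/`apply` pair and `token`/`prev_result`, for illustration:
//
//     |_seqno, _cfg, state| {
//         if state.already_applied(&token) {
//             // Nothing to change; surface the previous result as a no-op.
//             return ControlFlow::Break(NoOpStateTransition(prev_result.clone()));
//         }
//         ControlFlow::Continue(state.apply(&token))
//     }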

impl<K, V, T, D> Machine<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + PartialEq,
{
    pub async fn merge_res(
        &self,
        res: &FueledMergeRes<T>,
    ) -> (ApplyMergeResult, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let use_incremental_compaction = ENABLE_INCREMENTAL_COMPACTION.get(&self.applier.cfg);

        // SUBTLE! If Machine::merge_res returns false, the blobs referenced in
        // compaction output are deleted so we don't leak them. Naively passing
        // back the value returned by State::apply_merge_res might give a false
        // negative in the presence of retries and Indeterminate errors.
        // Specifically, something like the following:
        //
        // - We try to apply_merge_res, it matches.
        // - When apply_unbatched_cmd goes to commit the new state, the
        //   Consensus::compare_and_set returns an Indeterminate error (but
        //   actually succeeds). The committed State now contains references to
        //   the compaction output blobs.
        // - Machine::apply_unbatched_idempotent_cmd retries the Indeterminate
        //   error. For whatever reason, this time though it doesn't match
        //   (maybe the batches simply get grouped differently when deserialized
        //   from state, or more unavoidably perhaps another compaction
        //   happens).
        // - This now bubbles up applied=false to the caller, which uses it as a
        //   signal that the blobs in the compaction output should be deleted so
        //   that we don't leak them.
        // - We now contain references in committed State to blobs that don't
        //   exist.
        //
        // The fix is to keep track of whether applied was ever true, even for a
        // compare_and_set that returned an Indeterminate error. This has a
        // chance of a false positive (leaking a blob), but that's better than a
        // false negative (state referencing a blob we can never recover). We
        // need a mechanism to clean up leaked blobs anyway, because of process
        // crashes.
        let mut merge_result_ever_applied = ApplyMergeResult::NotAppliedNoMatch;
        let (_seqno, _apply_merge_result, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.merge_res, |_, _, state| {
                let ret = if use_incremental_compaction {
                    state.apply_merge_res::<D>(res, &Arc::clone(&metrics).columnar)
                } else {
                    state.apply_merge_res_classic::<D>(res, &Arc::clone(&metrics).columnar)
                };
                if let Continue(result) = ret {
                    // record if we've ever applied the merge
                    if result.applied() {
                        merge_result_ever_applied = result;
                    }
                    // otherwise record the most granular reason for _not_
                    // applying the merge when there was a matching batch
                    if result.matched() && !result.applied() && !merge_result_ever_applied.applied()
                    {
                        merge_result_ever_applied = result;
                    }
                }
                ret
            })
            .await;
        (merge_result_ever_applied, maintenance)
    }
}

pub(crate) struct ExpireFn(
    /// This is stored on WriteHandle and ReadHandle, which we require to be
    /// Send + Sync, but the Future is only Send and not Sync. Instead store a
    /// FnOnce that returns the Future. This could also be made an `IntoFuture`,
    /// once producing one of those is made easier.
    pub(crate) Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + Sync + 'static>,
);
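
// A hypothetical construction, for illustration: box an async block into the
// stored `FnOnce` (`FutureExt::boxed` is already imported above).
//
//     let expire = ExpireFn(Box::new(move || {
//         async move {
//             // e.g. machine.expire_writer(&writer_id).await;
//         }
//         .boxed()
//     }));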

impl Debug for ExpireFn {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExpireFn").finish_non_exhaustive()
    }
}

#[derive(Debug)]
pub(crate) enum CompareAndAppendRes<T> {
    Success(SeqNo, WriterMaintenance<T>),
    InvalidUsage(InvalidUsage<T>),
    UpperMismatch(SeqNo, Antichain<T>),
    InlineBackpressure,
}

#[cfg(test)]
impl<T: Debug> CompareAndAppendRes<T> {
    #[track_caller]
    fn unwrap(self) -> (SeqNo, WriterMaintenance<T>) {
        match self {
            CompareAndAppendRes::Success(seqno, maintenance) => (seqno, maintenance),
            x => panic!("{:?}", x),
        }
    }
}

impl<K, V, T, D> Machine<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    #[allow(clippy::unused_async)]
    pub async fn start_reader_heartbeat_tasks(
        self,
        reader_id: LeasedReaderId,
        gc: GarbageCollector<K, V, T, D>,
    ) -> Vec<JoinHandle<()>> {
        let mut ret = Vec::new();
        let metrics = Arc::clone(&self.applier.metrics);

        // TODO: In response to a production incident, this runs the heartbeat
        // task on both the in-context tokio runtime and persist's isolated
        // runtime. We think we were seeing tasks (including this one) get stuck
        // indefinitely in tokio while waiting for a runtime worker. This could
        // happen if some other task in that runtime never yields. It's possible
        // that one of the two runtimes is healthy while the other isn't (this
        // was inconclusive in the incident debugging), and the heartbeat task
        // is fairly lightweight, so run a copy in each in case that helps.
        //
        // The real fix here is to find the misbehaving task and fix it. Remove
        // this duplication when that happens.
        let name = format!("persist::heartbeat_read({},{})", self.shard_id(), reader_id);
        ret.push(mz_ore::task::spawn(|| name, {
            let machine = self.clone();
            let reader_id = reader_id.clone();
            let gc = gc.clone();
            metrics
                .tasks
                .heartbeat_read
                .instrument_task(Self::reader_heartbeat_task(machine, reader_id, gc))
        }));

        let isolated_runtime = Arc::clone(&self.isolated_runtime);
        let name = format!(
            "persist::heartbeat_read_isolated({},{})",
            self.shard_id(),
            reader_id
        );
        ret.push(
            isolated_runtime.spawn_named(
                || name,
                metrics
                    .tasks
                    .heartbeat_read
                    .instrument_task(Self::reader_heartbeat_task(self, reader_id, gc)),
            ),
        );

        ret
    }

    async fn reader_heartbeat_task(
        machine: Self,
        reader_id: LeasedReaderId,
        gc: GarbageCollector<K, V, T, D>,
    ) {
        let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 2;
        loop {
            let before_sleep = Instant::now();
            tokio::time::sleep(sleep_duration).await;

            let elapsed_since_before_sleeping = before_sleep.elapsed();
            if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) {
                warn!(
                    "reader ({}) of shard ({}) went {}s between heartbeats",
                    reader_id,
                    machine.shard_id(),
                    elapsed_since_before_sleeping.as_secs_f64()
                );
            }

            let before_heartbeat = Instant::now();
            let (_seqno, existed, maintenance) = machine
                .heartbeat_leased_reader(&reader_id, (machine.applier.cfg.now)())
                .await;
            maintenance.start_performing(&machine, &gc);

            let elapsed_since_heartbeat = before_heartbeat.elapsed();
            if elapsed_since_heartbeat > Duration::from_secs(60) {
                warn!(
                    "reader ({}) of shard ({}) heartbeat call took {}s",
                    reader_id,
                    machine.shard_id(),
                    elapsed_since_heartbeat.as_secs_f64(),
                );
            }

            if !existed {
                // If the read handle was intentionally expired, this task
                // *should* be aborted before it observes the expiration. So if
                // we get here, this task somehow failed to keep the read lease
                // alive. Warn loudly, because there's now a live read handle to
                // an expired shard that will panic if used, but don't panic,
                // just in case there is some edge case that results in this
                // task observing the intentional expiration of a read handle.
                warn!(
                    "heartbeat task for reader ({}) of shard ({}) exiting due to expired lease \
                     while read handle is live",
                    reader_id,
                    machine.shard_id(),
                );
                return;
            }
        }
    }
}
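
// The cadence math in `reader_heartbeat_task`, in brief: heartbeating at half
// the lease duration means a single missed beat still leaves half a lease of
// slack before expiry. An illustrative timeline with a hypothetical 120s lease:
//
//     // sleep_duration = 60s; beats at t = 60s, 120s, 180s, ...
//     // A beat arriving more than sleep_duration + 60s after the previous one
//     // trips the "went Ns between heartbeats" warning above.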

pub(crate) const NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_fixed_sleep",
    Duration::from_millis(1200), // pubsub is on by default!
    "\
    The fixed sleep when polling for new batches from a Listen or Subscribe. Skipped if zero.",
);

pub(crate) const NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_initial_backoff",
    Duration::from_millis(100), // pubsub is on by default!
    "The initial backoff when polling for new batches from a Listen or Subscribe.",
);

pub(crate) const NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER: Config<u32> = Config::new(
    "persist_next_listen_batch_retryer_multiplier",
    2,
    "The backoff multiplier when polling for new batches from a Listen or Subscribe.",
);

pub(crate) const NEXT_LISTEN_BATCH_RETRYER_CLAMP: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_clamp",
    Duration::from_secs(16), // pubsub is on by default!
    "The backoff clamp duration when polling for new batches from a Listen or Subscribe.",
);

fn next_listen_batch_retry_params(cfg: &ConfigSet) -> RetryParameters {
    RetryParameters {
        fixed_sleep: NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP.get(cfg),
        initial_backoff: NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF.get(cfg),
        multiplier: NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER.get(cfg),
        clamp: NEXT_LISTEN_BATCH_RETRYER_CLAMP.get(cfg),
    }
}
1315
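/// The attempt count at which [`retry_external`] escalates its retry logging
/// from debug to info.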
1316pub const INFO_MIN_ATTEMPTS: usize = 3;
1317
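/// Retries `work_fn` until it returns `Ok`, sleeping between attempts with
/// persist's default backoff and recording the attempts in `metrics`.
///
/// Every error (determinate or indeterminate) is retried, so `work_fn` must
/// be idempotent. A minimal usage sketch (illustrative only; `retry_metrics`,
/// `blob`, and `key` are stand-ins, not a specific API):
///
/// ```ignore
/// let value = retry_external(&retry_metrics, || async { blob.get(&key).await }).await;
/// ```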
1318pub async fn retry_external<R, F, WorkFn>(metrics: &RetryMetrics, mut work_fn: WorkFn) -> R
1319where
1320    F: std::future::Future<Output = Result<R, ExternalError>>,
1321    WorkFn: FnMut() -> F,
1322{
1323    let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1324    loop {
1325        match work_fn().await {
1326            Ok(x) => {
1327                if retry.attempt() > 0 {
1328                    debug!(
1329                        "external operation {} succeeded after failing at least once",
1330                        metrics.name,
1331                    );
1332                }
1333                return x;
1334            }
1335            Err(err) => {
1336                if retry.attempt() >= INFO_MIN_ATTEMPTS {
1337                    info!(
1338                        "external operation {} failed, retrying in {:?}: {}",
1339                        metrics.name,
1340                        retry.next_sleep(),
1341                        err.display_with_causes()
1342                    );
1343                } else {
1344                    debug!(
1345                        "external operation {} failed, retrying in {:?}: {}",
1346                        metrics.name,
1347                        retry.next_sleep(),
1348                        err.display_with_causes()
1349                    );
1350                }
1351                retry = retry.sleep().await;
1352            }
1353        }
1354    }
1355}
1356
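/// Like [`retry_external`], but retries only [`ExternalError::Determinate`]
/// errors; an [`Indeterminate`] error is returned to the caller, who must
/// assume the operation may have actually succeeded.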
1357pub async fn retry_determinate<R, F, WorkFn>(
1358    metrics: &RetryMetrics,
1359    mut work_fn: WorkFn,
1360) -> Result<R, Indeterminate>
1361where
1362    F: std::future::Future<Output = Result<R, ExternalError>>,
1363    WorkFn: FnMut() -> F,
1364{
1365    let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1366    loop {
1367        match work_fn().await {
1368            Ok(x) => {
1369                if retry.attempt() > 0 {
1370                    debug!(
1371                        "external operation {} succeeded after failing at least once",
1372                        metrics.name,
1373                    );
1374                }
1375                return Ok(x);
1376            }
1377            Err(ExternalError::Determinate(err)) => {
1378                // The determinate "could not serialize access" errors
1379                // happen often enough in dev (which uses Postgres) that
1380                // it's impeding people's work. At the same time, it's been
1381                // a source of confusion for eng. The situation is much
1382                // better on CRDB and we have metrics coverage in prod, so
1383                // this is redundant enough that it's more hurtful than
1384                // helpful. As a result, this intentionally ignores
1385                // INFO_MIN_ATTEMPTS and always logs at debug.
1386                debug!(
1387                    "external operation {} failed, retrying in {:?}: {}",
1388                    metrics.name,
1389                    retry.next_sleep(),
1390                    err.display_with_causes()
1391                );
1392                retry = retry.sleep().await;
1393                continue;
1394            }
1395            Err(ExternalError::Indeterminate(x)) => return Err(x),
1396        }
1397    }
1398}
1399
1400#[cfg(test)]
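/// Datadriven-style test harness for the persist state machine: each `pub`
/// fn in this module implements one test directive.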
1401pub mod datadriven {
1402    use std::collections::{BTreeMap, BTreeSet};
1403    use std::pin::pin;
1404    use std::sync::{Arc, LazyLock};
1405
1406    use anyhow::anyhow;
1407    use differential_dataflow::consolidation::consolidate_updates;
1408    use differential_dataflow::trace::Description;
1409    use futures::StreamExt;
1410    use mz_dyncfg::{ConfigUpdates, ConfigVal};
1411    use mz_persist::indexed::encoding::BlobTraceBatchPart;
1412    use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
1413
1414    use crate::batch::{
1415        BLOB_TARGET_SIZE, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
1416        BatchParts, validate_truncate_batch,
1417    };
1418    use crate::cfg::COMPACTION_MEMORY_BOUND_BYTES;
1419    use crate::fetch::{EncodedPart, FetchConfig};
1420    use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
1421    use crate::internal::datadriven::DirectiveArgs;
1422    use crate::internal::encoding::Schemas;
1423    use crate::internal::gc::GcReq;
1424    use crate::internal::paths::{BlobKey, BlobKeyPrefix, PartialBlobKey};
1425    use crate::internal::state::{BatchPart, RunOrder, RunPart};
1426    use crate::internal::state_versions::EncodedRollup;
1427    use crate::internal::trace::{CompactionInput, IdHollowBatch, SpineId};
1428    use crate::read::{Listen, ListenEvent, READER_LEASE_DURATION};
1429    use crate::rpc::NoopPubSubSender;
1430    use crate::tests::new_test_client;
1431    use crate::write::COMBINE_INLINE_WRITES;
1432    use crate::{GarbageCollector, PersistClient};
1433
1434    use super::*;
1435
1436    static SCHEMAS: LazyLock<Schemas<String, ()>> = LazyLock::new(|| Schemas {
1437        id: Some(SchemaId(0)),
1438        key: Arc::new(StringSchema),
1439        val: Arc::new(UnitSchema),
1440    });
1441
1442    /// Shared state for a single [crate::internal::machine] [datadriven::TestFile].
1443    #[derive(Debug)]
1444    pub struct MachineState {
1445        pub client: PersistClient,
1446        pub shard_id: ShardId,
1447        pub state_versions: Arc<StateVersions>,
1448        pub machine: Machine<String, (), u64, i64>,
1449        pub gc: GarbageCollector<String, (), u64, i64>,
1450        pub batches: BTreeMap<String, IdHollowBatch<u64>>,
1451        pub next_id: usize,
1452        pub rollups: BTreeMap<String, EncodedRollup>,
1453        pub listens: BTreeMap<String, Listen<String, (), u64, i64>>,
1454        pub routine: Vec<RoutineMaintenance>,
1455        pub compactions: BTreeMap<String, CompactReq<u64>>,
1456    }
1457
1458    impl MachineState {
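        /// Creates fresh test state against a brand new shard, with configs
        /// pinned to values that keep datadriven output stable.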
1459        pub async fn new(dyncfgs: &ConfigUpdates) -> Self {
1460            let shard_id = ShardId::new();
1461            let client = new_test_client(dyncfgs).await;
1462            // Reset blob_target_size. Individual batch writes and compactions
1463            // can override it with an arg.
1464            client
1465                .cfg
1466                .set_config(&BLOB_TARGET_SIZE, *BLOB_TARGET_SIZE.default());
1467            // Our structured compaction code uses slightly different estimates
1468            // for array size than the old path, which can affect the results of
1469            // some compaction tests.
1470            client.cfg.set_config(&COMBINE_INLINE_WRITES, false);
1471            let state_versions = Arc::new(StateVersions::new(
1472                client.cfg.clone(),
1473                Arc::clone(&client.consensus),
1474                Arc::clone(&client.blob),
1475                Arc::clone(&client.metrics),
1476            ));
1477            let machine = Machine::new(
1478                client.cfg.clone(),
1479                shard_id,
1480                Arc::clone(&client.metrics),
1481                Arc::clone(&state_versions),
1482                Arc::clone(&client.shared_states),
1483                Arc::new(NoopPubSubSender),
1484                Arc::clone(&client.isolated_runtime),
1485                Diagnostics::for_tests(),
1486            )
1487            .await
1488            .expect("codecs should match");
1489            let gc = GarbageCollector::new(machine.clone(), Arc::clone(&client.isolated_runtime));
1490            MachineState {
1491                shard_id,
1492                client,
1493                state_versions,
1494                machine,
1495                gc,
1496                batches: BTreeMap::default(),
1497                rollups: BTreeMap::default(),
1498                listens: BTreeMap::default(),
1499                routine: Vec::new(),
1500                compactions: BTreeMap::default(),
1501                next_id: 0,
1502            }
1503        }
1504
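        /// Rehydrates a [`HollowBatch`] into a full [`Batch`] backed by the
        /// test client's blob, metrics, and build version.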
1505        fn to_batch(&self, hollow: HollowBatch<u64>) -> Batch<String, (), u64, i64> {
1506            Batch::new(
1507                true,
1508                Arc::clone(&self.client.metrics),
1509                Arc::clone(&self.client.blob),
1510                self.client.metrics.shards.shard(&self.shard_id, "test"),
1511                self.client.cfg.build_version.clone(),
1512                hollow,
1513            )
1514        }
1515    }
1516
    /// Scans consensus and renders all live states at or after the given
    /// SeqNo, along with the named batches and rollups each one references.
1519    pub async fn consensus_scan(
1520        datadriven: &MachineState,
1521        args: DirectiveArgs<'_>,
1522    ) -> Result<String, anyhow::Error> {
1523        let from = args.expect("from_seqno");
1524
1525        let mut states = datadriven
1526            .state_versions
1527            .fetch_all_live_states::<u64>(datadriven.shard_id)
1528            .await
1529            .expect("should only be called on an initialized shard")
1530            .check_ts_codec()
1531            .expect("shard codecs should not change");
1532        let mut s = String::new();
1533        while let Some(x) = states.next(|_| {}) {
1534            if x.seqno < from {
1535                continue;
1536            }
1537            let rollups: Vec<_> = x
1538                .collections
1539                .rollups
1540                .keys()
1541                .map(|seqno| seqno.to_string())
1542                .collect();
1543            let batches: Vec<_> = x
1544                .collections
1545                .trace
1546                .batches()
1547                .filter(|b| !b.is_empty())
1548                .filter_map(|b| {
1549                    datadriven
1550                        .batches
1551                        .iter()
1552                        .find(|(_, original_batch)| original_batch.batch.parts == b.parts)
1553                        .map(|(batch_name, _)| batch_name.to_owned())
1554                })
1555                .collect();
            writeln!(
                s,
                "seqno={} batches={} rollups={}",
1559                x.seqno,
1560                batches.join(","),
1561                rollups.join(","),
1562            );
1563        }
1564        Ok(s)
1565    }
1566
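    /// Truncates the shard's consensus history to the given SeqNo, returning
    /// the number of entries removed.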
1567    pub async fn consensus_truncate(
1568        datadriven: &MachineState,
1569        args: DirectiveArgs<'_>,
1570    ) -> Result<String, anyhow::Error> {
1571        let to = args.expect("to_seqno");
1572        let removed = datadriven
1573            .client
1574            .consensus
1575            .truncate(&datadriven.shard_id.to_string(), to)
1576            .await
1577            .expect("valid truncation");
1578        Ok(format!("{}\n", removed))
1579    }
1580
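    /// Lists every batch part blob that exists for the shard, along with its
    /// size in bytes.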
1581    pub async fn blob_scan_batches(
1582        datadriven: &MachineState,
1583        _args: DirectiveArgs<'_>,
1584    ) -> Result<String, anyhow::Error> {
1585        let key_prefix = BlobKeyPrefix::Shard(&datadriven.shard_id).to_string();
1586
1587        let mut s = String::new();
1588        let () = datadriven
1589            .state_versions
1590            .blob
1591            .list_keys_and_metadata(&key_prefix, &mut |x| {
1592                let (_, key) = BlobKey::parse_ids(x.key).expect("key should be valid");
1593                if let PartialBlobKey::Batch(_, _) = key {
                    writeln!(s, "{}: {}b", x.key, x.size_in_bytes);
1595                }
1596            })
1597            .await?;
1598        Ok(s)
1599    }
1600
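    /// Renders the shard's current since and upper frontiers.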
1601    #[allow(clippy::unused_async)]
1602    pub async fn shard_desc(
1603        datadriven: &MachineState,
1604        _args: DirectiveArgs<'_>,
1605    ) -> Result<String, anyhow::Error> {
1606        Ok(format!(
1607            "since={:?} upper={:?}\n",
1608            datadriven.machine.applier.since().elements(),
1609            datadriven.machine.applier.clone_upper().elements()
1610        ))
1611    }
1612
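    /// Downgrades a leased reader's since, returning the current SeqNo and
    /// the reader's resulting since.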
1613    pub async fn downgrade_since(
1614        datadriven: &mut MachineState,
1615        args: DirectiveArgs<'_>,
1616    ) -> Result<String, anyhow::Error> {
1617        let since = args.expect_antichain("since");
1618        let seqno = args.optional("seqno");
1619        let reader_id = args.expect("reader_id");
1620        let (_, since, routine) = datadriven
1621            .machine
1622            .downgrade_since(
1623                &reader_id,
1624                seqno,
1625                &since,
1626                (datadriven.machine.applier.cfg.now)(),
1627            )
1628            .await;
1629        datadriven.routine.push(routine);
1630        Ok(format!(
1631            "{} {:?}\n",
1632            datadriven.machine.seqno(),
1633            since.0.elements()
1634        ))
1635    }
1636
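    /// Applies dynamic config updates, one `name val` pair per line of the
    /// directive input, e.g. `persist_claim_unclaimed_compactions true`.
    /// Only `usize`- and `bool`-valued configs are supported.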
1637    #[allow(clippy::unused_async)]
1638    pub async fn dyncfg(
1639        datadriven: &MachineState,
1640        args: DirectiveArgs<'_>,
1641    ) -> Result<String, anyhow::Error> {
1642        let mut updates = ConfigUpdates::default();
1643        for x in args.input.trim().split('\n') {
1644            match x.split(' ').collect::<Vec<_>>().as_slice() {
1645                &[name, val] => {
1646                    let config = datadriven
1647                        .client
1648                        .cfg
1649                        .entries()
1650                        .find(|x| x.name() == name)
1651                        .ok_or_else(|| anyhow!("unknown dyncfg: {}", name))?;
1652                    match config.val() {
1653                        ConfigVal::Usize(_) => {
1654                            let val = val.parse().map_err(anyhow::Error::new)?;
1655                            updates.add_dynamic(name, ConfigVal::Usize(val));
1656                        }
1657                        ConfigVal::Bool(_) => {
1658                            let val = val.parse().map_err(anyhow::Error::new)?;
1659                            updates.add_dynamic(name, ConfigVal::Bool(val));
1660                        }
1661                        x => unimplemented!("dyncfg type: {:?}", x),
1662                    }
1663                }
1664                x => return Err(anyhow!("expected `name val` got: {:?}", x)),
1665            }
1666        }
1667        updates.apply(&datadriven.client.cfg);
1668
1669        Ok("ok\n".to_string())
1670    }
1671
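    /// Compare-and-downgrades a critical reader's since and opaque,
    /// returning the current SeqNo, the new opaque, and the resulting since.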
1672    pub async fn compare_and_downgrade_since(
1673        datadriven: &mut MachineState,
1674        args: DirectiveArgs<'_>,
1675    ) -> Result<String, anyhow::Error> {
1676        let expected_opaque: u64 = args.expect("expect_opaque");
1677        let new_opaque: u64 = args.expect("opaque");
1678        let new_since = args.expect_antichain("since");
1679        let reader_id = args.expect("reader_id");
1680        let (res, routine) = datadriven
1681            .machine
1682            .compare_and_downgrade_since(&reader_id, &expected_opaque, (&new_opaque, &new_since))
1683            .await;
1684        datadriven.routine.push(routine);
1685        let since = res.map_err(|(opaque, since)| {
1686            anyhow!("mismatch: opaque={} since={:?}", opaque, since.0.elements())
1687        })?;
1688        Ok(format!(
1689            "{} {} {:?}\n",
1690            datadriven.machine.seqno(),
1691            new_opaque,
1692            since.0.elements()
1693        ))
1694    }
1695
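    /// Writes a rollup of the current state and saves it under `output` for
    /// a later `add_rollup`.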
1696    pub async fn write_rollup(
1697        datadriven: &mut MachineState,
1698        args: DirectiveArgs<'_>,
1699    ) -> Result<String, anyhow::Error> {
1700        let output = args.expect_str("output");
1701
1702        let rollup = datadriven
1703            .machine
1704            .applier
1705            .write_rollup_for_state()
1706            .await
1707            .expect("rollup");
1708
1709        datadriven
1710            .rollups
1711            .insert(output.to_string(), rollup.clone());
1712
1713        Ok(format!(
1714            "state={} diffs=[{}, {})\n",
1715            rollup.seqno,
1716            rollup._desc.lower().first().expect("seqno"),
1717            rollup._desc.upper().first().expect("seqno"),
1718        ))
1719    }
1720
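    /// Adds a previously written rollup to state, erroring if it cannot be
    /// applied.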
1721    pub async fn add_rollup(
1722        datadriven: &mut MachineState,
1723        args: DirectiveArgs<'_>,
1724    ) -> Result<String, anyhow::Error> {
1725        let input = args.expect_str("input");
1726        let rollup = datadriven
1727            .rollups
1728            .get(input)
            .expect("unknown rollup")
1730            .clone();
1731
1732        let (applied, maintenance) = datadriven
1733            .machine
1734            .add_rollup((rollup.seqno, &rollup.to_hollow()))
1735            .await;
1736
1737        if !applied {
1738            return Err(anyhow!("failed to apply rollup for: {}", rollup.seqno));
1739        }
1740
1741        datadriven.routine.push(maintenance);
1742        Ok(format!("{}\n", datadriven.machine.seqno()))
1743    }
1744
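    /// Builds a batch from the updates in the directive input and saves it
    /// under `output`. Optional args override the target part size, the
    /// recorded part sizes, and whether the updates are consolidated.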
1745    pub async fn write_batch(
1746        datadriven: &mut MachineState,
1747        args: DirectiveArgs<'_>,
1748    ) -> Result<String, anyhow::Error> {
1749        let output = args.expect_str("output");
1750        let lower = args.expect_antichain("lower");
1751        let upper = args.expect_antichain("upper");
1752        assert!(PartialOrder::less_than(&lower, &upper));
1753        let since = args
1754            .optional_antichain("since")
1755            .unwrap_or_else(|| Antichain::from_elem(0));
1756        let target_size = args.optional("target_size");
1757        let parts_size_override = args.optional("parts_size_override");
1758        let consolidate = args.optional("consolidate").unwrap_or(true);
1759        let mut updates: Vec<_> = args
1760            .input
1761            .split('\n')
1762            .flat_map(DirectiveArgs::parse_update)
1763            .collect();
1764
1765        let mut cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
1766        if let Some(target_size) = target_size {
1767            cfg.blob_target_size = target_size;
        }
1769        if consolidate {
1770            consolidate_updates(&mut updates);
1771        }
1772        let run_order = if consolidate {
1773            cfg.preferred_order
1774        } else {
1775            RunOrder::Unordered
1776        };
1777        let parts = BatchParts::new_ordered::<i64>(
1778            cfg.clone(),
1779            run_order,
1780            Arc::clone(&datadriven.client.metrics),
1781            Arc::clone(&datadriven.machine.applier.shard_metrics),
1782            datadriven.shard_id,
1783            Arc::clone(&datadriven.client.blob),
1784            Arc::clone(&datadriven.client.isolated_runtime),
1785            &datadriven.client.metrics.user,
1786        );
1787        let builder = BatchBuilderInternal::new(
1788            cfg.clone(),
1789            parts,
1790            Arc::clone(&datadriven.client.metrics),
1791            SCHEMAS.clone(),
1792            Arc::clone(&datadriven.client.blob),
            datadriven.shard_id,
1794            datadriven.client.cfg.build_version.clone(),
1795        );
1796        let mut builder = BatchBuilder::new(builder, Description::new(lower, upper.clone(), since));
1797        for ((k, ()), t, d) in updates {
1798            builder.add(&k, &(), &t, &d).await.expect("invalid batch");
1799        }
1800        let mut batch = builder.finish(upper).await?;
1801        // We can only reasonably use parts_size_override with hollow batches,
1802        // so if it's set, flush any inline batches out.
1803        if parts_size_override.is_some() {
1804            batch
1805                .flush_to_blob(
1806                    &cfg,
1807                    &datadriven.client.metrics.user,
1808                    &datadriven.client.isolated_runtime,
1809                    &SCHEMAS,
1810                )
1811                .await;
1812        }
1813        let batch = batch.into_hollow_batch();
1814        let batch = IdHollowBatch {
1815            batch: Arc::new(batch),
1816            id: SpineId(datadriven.next_id, datadriven.next_id + 1),
1817        };
1818        datadriven.next_id += 1;
1819
1820        if let Some(size) = parts_size_override {
1821            let mut batch = batch.clone();
1822            let mut hollow_batch = (*batch.batch).clone();
1823            for part in hollow_batch.parts.iter_mut() {
1824                match part {
1825                    RunPart::Many(run) => run.max_part_bytes = size,
1826                    RunPart::Single(BatchPart::Hollow(part)) => part.encoded_size_bytes = size,
1827                    RunPart::Single(BatchPart::Inline { .. }) => unreachable!("flushed out above"),
1828                }
1829            }
1830            batch.batch = Arc::new(hollow_batch);
1831            datadriven.batches.insert(output.to_owned(), batch);
1832        } else {
1833            datadriven.batches.insert(output.to_owned(), batch.clone());
1834        }
1835        Ok(format!(
1836            "parts={} len={}\n",
1837            batch.batch.part_count(),
1838            batch.batch.len
1839        ))
1840    }
1841
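    /// Fetches a named batch and renders its contents part by part, followed
    /// by the batch's run structure.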
1842    pub async fn fetch_batch(
1843        datadriven: &MachineState,
1844        args: DirectiveArgs<'_>,
1845    ) -> Result<String, anyhow::Error> {
1846        let input = args.expect_str("input");
1847        let stats = args.optional_str("stats");
1848        let batch = datadriven.batches.get(input).expect("unknown batch");
1849
1850        let mut s = String::new();
1851        let mut stream = pin!(
1852            batch
1853                .batch
1854                .part_stream(
1855                    datadriven.shard_id,
1856                    &*datadriven.state_versions.blob,
1857                    &*datadriven.state_versions.metrics
1858                )
1859                .enumerate()
1860        );
1861        while let Some((idx, part)) = stream.next().await {
1862            let part = &*part?;
            writeln!(s, "<part {idx}>");
1864
1865            let lower = match part {
1866                BatchPart::Inline { updates, .. } => {
1867                    let updates: BlobTraceBatchPart<u64> =
1868                        updates.decode(&datadriven.client.metrics.columnar)?;
1869                    updates.structured_key_lower()
1870                }
1871                other => other.structured_key_lower(),
1872            };
1873
1874            if let Some(lower) = lower {
1875                if stats == Some("lower") {
1876                    writeln!(s, "<key lower={}>", lower.get())
1877                }
1878            }
1879
1880            match part {
1881                BatchPart::Hollow(part) => {
1882                    let blob_batch = datadriven
1883                        .client
1884                        .blob
1885                        .get(&part.key.complete(&datadriven.shard_id))
1886                        .await;
1887                    match blob_batch {
1888                        Ok(Some(_)) | Err(_) => {}
1889                        // don't try to fetch/print the keys of the batch part
1890                        // if the blob store no longer has it
1891                        Ok(None) => {
1892                            s.push_str("<empty>\n");
1893                            continue;
1894                        }
1895                    };
1896                }
1897                BatchPart::Inline { .. } => {}
1898            };
1899            let part = EncodedPart::fetch(
1900                &FetchConfig::from_persist_config(&datadriven.client.cfg),
1901                &datadriven.shard_id,
1902                datadriven.client.blob.as_ref(),
1903                datadriven.client.metrics.as_ref(),
1904                datadriven.machine.applier.shard_metrics.as_ref(),
1905                &datadriven.client.metrics.read.batch_fetcher,
1906                &batch.batch.desc,
1907                part,
1908            )
1909            .await
1910            .expect("invalid batch part");
1911            let part = part
1912                .normalize(&datadriven.client.metrics.columnar)
1913                .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);
1914
1915            for ((k, _v), t, d) in part
1916                .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
1917                .expect("valid schemas")
1918            {
1919                writeln!(s, "{k} {t} {d}");
1920            }
1921        }
1922        if !s.is_empty() {
1923            for (idx, (_meta, run)) in batch.batch.runs().enumerate() {
                writeln!(s, "<run {idx}>");
1925                for part in run {
1926                    let part_idx = batch
1927                        .batch
1928                        .parts
1929                        .iter()
1930                        .position(|p| p == part)
1931                        .expect("part should exist");
                    writeln!(s, "part {part_idx}");
1933                }
1934            }
1935        }
1936        Ok(s)
1937    }
1938
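    /// Narrows a named batch's description to the given bounds, validating
    /// the truncation, and saves the result under `output`.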
1939    #[allow(clippy::unused_async)]
1940    pub async fn truncate_batch_desc(
1941        datadriven: &mut MachineState,
1942        args: DirectiveArgs<'_>,
1943    ) -> Result<String, anyhow::Error> {
1944        let input = args.expect_str("input");
1945        let output = args.expect_str("output");
1946        let lower = args.expect_antichain("lower");
1947        let upper = args.expect_antichain("upper");
1948
1949        let batch = datadriven
1950            .batches
1951            .get(input)
1952            .expect("unknown batch")
1953            .clone();
1954        let truncated_desc = Description::new(lower, upper, batch.batch.desc.since().clone());
1955        let () = validate_truncate_batch(&batch.batch, &truncated_desc, false, true)?;
1956        let mut new_hollow_batch = (*batch.batch).clone();
1957        new_hollow_batch.desc = truncated_desc;
1958        let new_batch = IdHollowBatch {
1959            batch: Arc::new(new_hollow_batch),
1960            id: batch.id,
1961        };
1962        datadriven
1963            .batches
1964            .insert(output.to_owned(), new_batch.clone());
1965        Ok(format!(
1966            "parts={} len={}\n",
1967            batch.batch.part_count(),
1968            batch.batch.len
1969        ))
1970    }
1971
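    /// Overrides the recorded encoded size of every hollow part in a named
    /// batch.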
1972    #[allow(clippy::unused_async)]
1973    pub async fn set_batch_parts_size(
1974        datadriven: &mut MachineState,
1975        args: DirectiveArgs<'_>,
1976    ) -> Result<String, anyhow::Error> {
1977        let input = args.expect_str("input");
1978        let size = args.expect("size");
1979        let batch = datadriven.batches.get_mut(input).expect("unknown batch");
1980        let mut hollow_batch = (*batch.batch).clone();
1981        for part in hollow_batch.parts.iter_mut() {
1982            match part {
1983                RunPart::Single(BatchPart::Hollow(x)) => x.encoded_size_bytes = size,
1984                _ => {
1985                    panic!("set_batch_parts_size only supports hollow parts")
1986                }
1987            }
1988        }
1989        batch.batch = Arc::new(hollow_batch);
1990        Ok("ok\n".to_string())
1991    }
1992
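    /// Compacts the named input batches into a single output batch with the
    /// given description, saving both the request and the result.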
1993    pub async fn compact(
1994        datadriven: &mut MachineState,
1995        args: DirectiveArgs<'_>,
1996    ) -> Result<String, anyhow::Error> {
1997        let output = args.expect_str("output");
1998        let lower = args.expect_antichain("lower");
1999        let upper = args.expect_antichain("upper");
2000        let since = args.expect_antichain("since");
2001        let target_size = args.optional("target_size");
2002        let memory_bound = args.optional("memory_bound");
2003
2004        let mut inputs = Vec::new();
2005        for input in args.args.get("inputs").expect("missing inputs") {
2006            inputs.push(
2007                datadriven
2008                    .batches
2009                    .get(input)
2010                    .expect("unknown batch")
2011                    .clone(),
2012            );
2013        }
2014
2015        let cfg = datadriven.client.cfg.clone();
2016        if let Some(target_size) = target_size {
2017            cfg.set_config(&BLOB_TARGET_SIZE, target_size);
        }
2019        if let Some(memory_bound) = memory_bound {
2020            cfg.set_config(&COMPACTION_MEMORY_BOUND_BYTES, memory_bound);
2021        }
2022        let req = CompactReq {
2023            shard_id: datadriven.shard_id,
2024            desc: Description::new(lower, upper, since),
2025            inputs: inputs.clone(),
2026        };
2027        datadriven
2028            .compactions
2029            .insert(output.to_owned(), req.clone());
2030        let spine_lower = inputs
2031            .first()
2032            .map_or_else(|| datadriven.next_id, |x| x.id.0);
2033        let spine_upper = inputs.last().map_or_else(
2034            || {
2035                datadriven.next_id += 1;
2036                datadriven.next_id
2037            },
2038            |x| x.id.1,
2039        );
2040        let new_spine_id = SpineId(spine_lower, spine_upper);
2041        let res = Compactor::<String, (), u64, i64>::compact(
2042            CompactConfig::new(&cfg, datadriven.shard_id),
2043            Arc::clone(&datadriven.client.blob),
2044            Arc::clone(&datadriven.client.metrics),
2045            Arc::clone(&datadriven.machine.applier.shard_metrics),
2046            Arc::clone(&datadriven.client.isolated_runtime),
2047            req,
2048            SCHEMAS.clone(),
2049        )
2050        .await?;
2051
2052        let batch = IdHollowBatch {
2053            batch: Arc::new(res.output.clone()),
2054            id: new_spine_id,
2055        };
2056
2057        datadriven.batches.insert(output.to_owned(), batch.clone());
2058        Ok(format!(
2059            "parts={} len={}\n",
2060            res.output.part_count(),
2061            res.output.len
2062        ))
2063    }
2064
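    /// Deletes every key in the blob store, returning how many were deleted.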
2065    pub async fn clear_blob(
2066        datadriven: &MachineState,
2067        _args: DirectiveArgs<'_>,
2068    ) -> Result<String, anyhow::Error> {
2069        let mut to_delete = vec![];
2070        datadriven
2071            .client
2072            .blob
2073            .list_keys_and_metadata("", &mut |meta| {
2074                to_delete.push(meta.key.to_owned());
2075            })
2076            .await?;
2077        for blob in &to_delete {
2078            datadriven.client.blob.delete(blob).await?;
2079        }
2080        Ok(format!("deleted={}\n", to_delete.len()))
2081    }
2082
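    /// Restores any deleted blobs that are still referenced by live states,
    /// returning the keys that could not be restored.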
2083    pub async fn restore_blob(
2084        datadriven: &MachineState,
2085        _args: DirectiveArgs<'_>,
2086    ) -> Result<String, anyhow::Error> {
2087        let not_restored = crate::internal::restore::restore_blob(
2088            &datadriven.state_versions,
2089            datadriven.client.blob.as_ref(),
2090            &datadriven.client.cfg.build_version,
2091            datadriven.shard_id,
2092            &*datadriven.state_versions.metrics,
2093        )
2094        .await?;
2095        let mut out = String::new();
2096        for key in not_restored {
2097            writeln!(&mut out, "{key}");
2098        }
2099        Ok(out)
2100    }
2101
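    /// Rewrites a named batch's timestamps to the given frontier, validating
    /// the rewrite against the given upper.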
2102    #[allow(clippy::unused_async)]
2103    pub async fn rewrite_ts(
2104        datadriven: &mut MachineState,
2105        args: DirectiveArgs<'_>,
2106    ) -> Result<String, anyhow::Error> {
2107        let input = args.expect_str("input");
2108        let ts_rewrite = args.expect_antichain("frontier");
2109        let upper = args.expect_antichain("upper");
2110
2111        let batch = datadriven.batches.get_mut(input).expect("unknown batch");
2112        let mut hollow_batch = (*batch.batch).clone();
2113        let () = hollow_batch
2114            .rewrite_ts(&ts_rewrite, upper)
2115            .map_err(|err| anyhow!("invalid rewrite: {}", err))?;
2116        batch.batch = Arc::new(hollow_batch);
2117        Ok("ok\n".into())
2118    }
2119
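    /// Runs garbage collection up to the given seqno_since, reporting what
    /// was deleted from blob and truncated from consensus.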
2120    pub async fn gc(
2121        datadriven: &mut MachineState,
2122        args: DirectiveArgs<'_>,
2123    ) -> Result<String, anyhow::Error> {
2124        let new_seqno_since = args.expect("to_seqno");
2125
2126        let req = GcReq {
2127            shard_id: datadriven.shard_id,
2128            new_seqno_since,
2129        };
2130        let (maintenance, stats) =
2131            GarbageCollector::gc_and_truncate(&datadriven.machine, req).await;
2132        datadriven.routine.push(maintenance);
2133
2134        Ok(format!(
2135            "{} batch_parts={} rollups={} truncated={} state_rollups={}\n",
2136            datadriven.machine.seqno(),
2137            stats.batch_parts_deleted_from_blob,
2138            stats.rollups_deleted_from_blob,
2139            stats
2140                .truncated_consensus_to
2141                .iter()
2142                .map(|x| x.to_string())
2143                .collect::<Vec<_>>()
2144                .join(","),
2145            stats
2146                .rollups_removed_from_state
2147                .iter()
2148                .map(|x| x.to_string())
2149                .collect::<Vec<_>>()
2150                .join(","),
2151        ))
2152    }
2153
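    /// Reads a snapshot of the shard as of the given frontier and renders
    /// the consolidated updates, batch by batch.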
2154    pub async fn snapshot(
2155        datadriven: &MachineState,
2156        args: DirectiveArgs<'_>,
2157    ) -> Result<String, anyhow::Error> {
2158        let as_of = args.expect_antichain("as_of");
2159        let snapshot = datadriven
2160            .machine
2161            .snapshot(&as_of)
2162            .await
2163            .map_err(|err| anyhow!("{:?}", err))?;
2164
2165        let mut result = String::new();
2166
2167        for batch in snapshot {
2168            writeln!(
2169                result,
2170                "<batch {:?}-{:?}>",
2171                batch.desc.lower().elements(),
2172                batch.desc.upper().elements()
2173            );
2174            for (run, (_meta, parts)) in batch.runs().enumerate() {
2175                writeln!(result, "<run {run}>");
2176                let mut stream = pin!(
2177                    futures::stream::iter(parts)
2178                        .flat_map(|part| part.part_stream(
2179                            datadriven.shard_id,
2180                            &*datadriven.state_versions.blob,
2181                            &*datadriven.state_versions.metrics
2182                        ))
2183                        .enumerate()
2184                );
2185
2186                while let Some((idx, part)) = stream.next().await {
2187                    let part = &*part?;
2188                    writeln!(result, "<part {idx}>");
2189
2190                    let part = EncodedPart::fetch(
2191                        &FetchConfig::from_persist_config(&datadriven.client.cfg),
2192                        &datadriven.shard_id,
2193                        datadriven.client.blob.as_ref(),
2194                        datadriven.client.metrics.as_ref(),
2195                        datadriven.machine.applier.shard_metrics.as_ref(),
2196                        &datadriven.client.metrics.read.batch_fetcher,
2197                        &batch.desc,
2198                        part,
2199                    )
2200                    .await
2201                    .expect("invalid batch part");
2202                    let part = part
2203                        .normalize(&datadriven.client.metrics.columnar)
2204                        .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);
2205
2206                    let mut updates = Vec::new();
2207
2208                    for ((k, _v), mut t, d) in part
2209                        .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
2210                        .expect("valid schemas")
2211                    {
2212                        t.advance_by(as_of.borrow());
2213                        updates.push((k, t, d));
2214                    }
2215
2216                    consolidate_updates(&mut updates);
2217
2218                    for (k, t, d) in updates {
2219                        writeln!(result, "{k} {t} {d}");
2220                    }
2221                }
2222            }
2223        }
2224
2225        Ok(result)
2226    }
2227
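    /// Opens a leased reader, starts a `Listen` at the given as-of, and
    /// saves it under `output`.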
2228    pub async fn register_listen(
2229        datadriven: &mut MachineState,
2230        args: DirectiveArgs<'_>,
2231    ) -> Result<String, anyhow::Error> {
2232        let output = args.expect_str("output");
2233        let as_of = args.expect_antichain("as_of");
2234        let read = datadriven
2235            .client
2236            .open_leased_reader::<String, (), u64, i64>(
2237                datadriven.shard_id,
2238                Arc::new(StringSchema),
2239                Arc::new(UnitSchema),
2240                Diagnostics::for_tests(),
2241                true,
2242            )
2243            .await
2244            .expect("invalid shard types");
2245        let listen = read
2246            .listen(as_of)
2247            .await
2248            .map_err(|err| anyhow!("{:?}", err))?;
2249        datadriven.listens.insert(output.to_owned(), listen);
2250        Ok("ok\n".into())
2251    }
2252
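    /// Drains events from a named listener, rendering updates until the
    /// listener's progress passes the given frontier.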
2253    pub async fn listen_through(
2254        datadriven: &mut MachineState,
2255        args: DirectiveArgs<'_>,
2256    ) -> Result<String, anyhow::Error> {
2257        let input = args.expect_str("input");
2258        // It's not possible to listen _through_ the empty antichain, so this is
2259        // intentionally `expect` instead of `expect_antichain`.
2260        let frontier = args.expect("frontier");
2261        let listen = datadriven.listens.get_mut(input).expect("unknown listener");
2262        let mut s = String::new();
2263        loop {
2264            for event in listen.fetch_next().await {
2265                match event {
2266                    ListenEvent::Updates(x) => {
2267                        for ((k, _v), t, d) in x.iter() {
                            writeln!(s, "{} {} {}", k.as_ref().unwrap(), t, d);
2269                        }
2270                    }
2271                    ListenEvent::Progress(x) => {
2272                        if !x.less_than(&frontier) {
2273                            return Ok(s);
2274                        }
2275                    }
2276                }
2277            }
2278        }
2279    }
2280
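    /// Registers a critical reader, returning the current SeqNo and the
    /// reader's since.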
2281    pub async fn register_critical_reader(
2282        datadriven: &mut MachineState,
2283        args: DirectiveArgs<'_>,
2284    ) -> Result<String, anyhow::Error> {
2285        let reader_id = args.expect("reader_id");
2286        let (state, maintenance) = datadriven
2287            .machine
2288            .register_critical_reader::<u64>(&reader_id, "tests")
2289            .await;
2290        datadriven.routine.push(maintenance);
2291        Ok(format!(
2292            "{} {:?}\n",
2293            datadriven.machine.seqno(),
2294            state.since.elements(),
2295        ))
2296    }
2297
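    /// Registers a leased reader, returning the current SeqNo and the
    /// reader's since.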
2298    pub async fn register_leased_reader(
2299        datadriven: &mut MachineState,
2300        args: DirectiveArgs<'_>,
2301    ) -> Result<String, anyhow::Error> {
2302        let reader_id = args.expect("reader_id");
2303        let (reader_state, maintenance) = datadriven
2304            .machine
2305            .register_leased_reader(
2306                &reader_id,
2307                "tests",
2308                READER_LEASE_DURATION.get(&datadriven.client.cfg),
2309                (datadriven.client.cfg.now)(),
2310                false,
2311            )
2312            .await;
2313        datadriven.routine.push(maintenance);
2314        Ok(format!(
2315            "{} {:?}\n",
2316            datadriven.machine.seqno(),
2317            reader_state.since.elements(),
2318        ))
2319    }
2320
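    /// Heartbeats a leased reader to extend its lease.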
2321    pub async fn heartbeat_leased_reader(
2322        datadriven: &MachineState,
2323        args: DirectiveArgs<'_>,
2324    ) -> Result<String, anyhow::Error> {
2325        let reader_id = args.expect("reader_id");
2326        let _ = datadriven
2327            .machine
2328            .heartbeat_leased_reader(&reader_id, (datadriven.client.cfg.now)())
2329            .await;
2330        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2331    }
2332
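    /// Expires a critical reader, removing it from state.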
2333    pub async fn expire_critical_reader(
2334        datadriven: &mut MachineState,
2335        args: DirectiveArgs<'_>,
2336    ) -> Result<String, anyhow::Error> {
2337        let reader_id = args.expect("reader_id");
2338        let (_, maintenance) = datadriven.machine.expire_critical_reader(&reader_id).await;
2339        datadriven.routine.push(maintenance);
2340        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2341    }
2342
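    /// Expires a leased reader, removing it from state.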
2343    pub async fn expire_leased_reader(
2344        datadriven: &mut MachineState,
2345        args: DirectiveArgs<'_>,
2346    ) -> Result<String, anyhow::Error> {
2347        let reader_id = args.expect("reader_id");
2348        let (_, maintenance) = datadriven.machine.expire_leased_reader(&reader_id).await;
2349        datadriven.routine.push(maintenance);
2350        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2351    }
2352
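    /// Appends the named batches through a fresh write handle, from
    /// `expected_upper` to `new_upper`.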
2353    pub async fn compare_and_append_batches(
2354        datadriven: &MachineState,
2355        args: DirectiveArgs<'_>,
2356    ) -> Result<String, anyhow::Error> {
2357        let expected_upper = args.expect_antichain("expected_upper");
2358        let new_upper = args.expect_antichain("new_upper");
2359
2360        let mut batches: Vec<Batch<String, (), u64, i64>> = args
2361            .args
2362            .get("batches")
2363            .expect("missing batches")
2364            .into_iter()
2365            .map(|batch| {
                let hollow = (*datadriven
                    .batches
                    .get(batch)
                    .expect("unknown batch")
                    .batch)
                    .clone();
2373                datadriven.to_batch(hollow)
2374            })
2375            .collect();
2376
2377        let mut writer = datadriven
2378            .client
2379            .open_writer(
2380                datadriven.shard_id,
2381                Arc::new(StringSchema),
2382                Arc::new(UnitSchema),
2383                Diagnostics::for_tests(),
2384            )
2385            .await?;
2386
2387        let mut batch_refs: Vec<_> = batches.iter_mut().collect();
2388
2389        let () = writer
2390            .compare_and_append_batch(batch_refs.as_mut_slice(), expected_upper, new_upper, true)
2391            .await?
2392            .map_err(|err| anyhow!("upper mismatch: {:?}", err))?;
2393
2394        writer.expire().await;
2395
2396        Ok("ok\n".into())
2397    }
2398
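    /// Expires a writer, removing it from state.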
2399    pub async fn expire_writer(
2400        datadriven: &mut MachineState,
2401        args: DirectiveArgs<'_>,
2402    ) -> Result<String, anyhow::Error> {
2403        let writer_id = args.expect("writer_id");
2404        let (_, maintenance) = datadriven.machine.expire_writer(&writer_id).await;
2405        datadriven.routine.push(maintenance);
2406        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2407    }
2408
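    /// Finalizes the shard, leaving its state a tombstone.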
2409    pub(crate) async fn finalize(
2410        datadriven: &mut MachineState,
2411        _args: DirectiveArgs<'_>,
2412    ) -> anyhow::Result<String> {
2413        let maintenance = datadriven.machine.become_tombstone().await?;
2414        datadriven.routine.push(maintenance);
2415        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2416    }
2417
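    /// Reports whether the shard's state is a tombstone.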
2418    pub(crate) fn is_finalized(
2419        datadriven: &MachineState,
2420        _args: DirectiveArgs<'_>,
2421    ) -> anyhow::Result<String> {
2422        let seqno = datadriven.machine.seqno();
2423        let tombstone = datadriven.machine.is_finalized();
2424        Ok(format!("{seqno} {tombstone}\n"))
2425    }
2426
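    /// Compare-and-appends a named batch via the idempotent machine API,
    /// responding to inline-write backpressure by flushing the batch to
    /// blob and retrying.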
2427    pub async fn compare_and_append(
2428        datadriven: &mut MachineState,
2429        args: DirectiveArgs<'_>,
2430    ) -> Result<String, anyhow::Error> {
2431        let input = args.expect_str("input");
2432        let writer_id = args.expect("writer_id");
2433        let mut batch = datadriven
2434            .batches
2435            .get(input)
2436            .expect("unknown batch")
2437            .clone();
2438        let token = args.optional("token").unwrap_or_else(IdempotencyToken::new);
2439        let now = (datadriven.client.cfg.now)();
2440
2441        let (id, maintenance) = datadriven
2442            .machine
2443            .register_schema(&*SCHEMAS.key, &*SCHEMAS.val)
2444            .await;
2445        assert_eq!(id, SCHEMAS.id);
2446        datadriven.routine.push(maintenance);
2447        let maintenance = loop {
2448            let indeterminate = args
2449                .optional::<String>("prev_indeterminate")
2450                .map(|x| Indeterminate::new(anyhow::Error::msg(x)));
2451            let res = datadriven
2452                .machine
2453                .compare_and_append_idempotent(
2454                    &batch.batch,
2455                    &writer_id,
2456                    now,
2457                    &token,
2458                    &HandleDebugState::default(),
2459                    indeterminate,
2460                )
2461                .await;
2462            match res {
2463                CompareAndAppendRes::Success(_, x) => break x,
2464                CompareAndAppendRes::UpperMismatch(_seqno, upper) => {
2465                    return Err(anyhow!("{:?}", Upper(upper)));
2466                }
2467                CompareAndAppendRes::InlineBackpressure => {
2468                    let hollow_batch = (*batch.batch).clone();
2469                    let mut b = datadriven.to_batch(hollow_batch);
2470                    let cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
2471                    b.flush_to_blob(
2472                        &cfg,
2473                        &datadriven.client.metrics.user,
2474                        &datadriven.client.isolated_runtime,
2475                        &*SCHEMAS,
2476                    )
2477                    .await;
2478                    batch.batch = Arc::new(b.into_hollow_batch());
2479                    continue;
2480                }
2481                _ => panic!("{:?}", res),
2482            };
2483        };
2484        // TODO: Don't throw away writer maintenance. It's slightly tricky
2485        // because we need a WriterId for Compactor.
2486        datadriven.routine.push(maintenance.routine);
2487        Ok(format!(
2488            "{} {:?}\n",
2489            datadriven.machine.seqno(),
2490            datadriven.machine.applier.clone_upper().elements(),
2491        ))
2492    }
2493
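    /// Applies a named compaction result to the shard's trace, reporting
    /// whether it was applied.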
2494    pub async fn apply_merge_res(
2495        datadriven: &mut MachineState,
2496        args: DirectiveArgs<'_>,
2497    ) -> Result<String, anyhow::Error> {
2498        let input = args.expect_str("input");
2499        let batch = datadriven
2500            .batches
2501            .get(input)
2502            .expect("unknown batch")
2503            .clone();
2504        let compact_req = datadriven
2505            .compactions
2506            .get(input)
2507            .expect("unknown compact req")
2508            .clone();
2509        let input_batches = compact_req
2510            .inputs
2511            .iter()
2512            .map(|x| x.id)
2513            .collect::<BTreeSet<_>>();
2514        let lower_spine_bound = input_batches
2515            .first()
2516            .map(|id| id.0)
2517            .expect("at least one batch must be present");
2518        let upper_spine_bound = input_batches
2519            .last()
2520            .map(|id| id.1)
2521            .expect("at least one batch must be present");
2522        let id = SpineId(lower_spine_bound, upper_spine_bound);
2523        let hollow_batch = (*batch.batch).clone();
2524
2525        let (merge_res, maintenance) = datadriven
2526            .machine
2527            .merge_res(&FueledMergeRes {
2528                output: hollow_batch,
2529                input: CompactionInput::IdRange(id),
2530                new_active_compaction: None,
2531            })
2532            .await;
2533        datadriven.routine.push(maintenance);
2534        Ok(format!(
2535            "{} {}\n",
2536            datadriven.machine.seqno(),
2537            merge_res.applied()
2538        ))
2539    }
2540
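    /// Performs all queued routine maintenance, refreshing state after each
    /// piece.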
2541    pub async fn perform_maintenance(
2542        datadriven: &mut MachineState,
2543        _args: DirectiveArgs<'_>,
2544    ) -> Result<String, anyhow::Error> {
2545        let mut s = String::new();
2546        for maintenance in datadriven.routine.drain(..) {
2547            let () = maintenance
2548                .perform(&datadriven.machine, &datadriven.gc)
2549                .await;
2550            let () = datadriven
2551                .machine
2552                .applier
2553                .fetch_and_update_state(None)
2554                .await;
            writeln!(s, "{} ok", datadriven.machine.seqno());
2556        }
2557        Ok(s)
2558    }
2559}
2560
2561#[cfg(test)]
2562pub mod tests {
2563    use std::sync::Arc;
2564
2565    use mz_dyncfg::ConfigUpdates;
2566    use mz_ore::cast::CastFrom;
2567    use mz_ore::task::spawn;
2568    use mz_persist::intercept::{InterceptBlob, InterceptHandle};
2569    use mz_persist::location::SeqNo;
2570    use timely::progress::Antichain;
2571
2572    use crate::ShardId;
2573    use crate::batch::BatchBuilderConfig;
2574    use crate::cache::StateCache;
2575    use crate::internal::gc::{GarbageCollector, GcReq};
2576    use crate::internal::state::{HandleDebugState, ROLLUP_THRESHOLD};
2577    use crate::tests::new_test_client;
2578
2579    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
2580    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
2581    async fn apply_unbatched_cmd_truncate(dyncfgs: ConfigUpdates) {
2582        mz_ore::test::init_logging();
2583
2584        let client = new_test_client(&dyncfgs).await;
        // Set a low rollup threshold so GC/truncation is more aggressive.
2586        client.cfg.set_config(&ROLLUP_THRESHOLD, 5);
2587        let (mut write, _) = client
2588            .expect_open::<String, (), u64, i64>(ShardId::new())
2589            .await;
2590
2591        // Write a bunch of batches. This should result in a bounded number of
2592        // live entries in consensus.
2593        const NUM_BATCHES: u64 = 100;
2594        for idx in 0..NUM_BATCHES {
2595            let mut batch = write
2596                .expect_batch(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1)
2597                .await;
2598            // Flush this batch out so the CaA doesn't get inline writes
2599            // backpressure.
2600            let cfg = BatchBuilderConfig::new(&client.cfg, write.shard_id());
2601            batch
2602                .flush_to_blob(
2603                    &cfg,
2604                    &client.metrics.user,
2605                    &client.isolated_runtime,
2606                    &write.write_schemas,
2607                )
2608                .await;
2609            let (_, writer_maintenance) = write
2610                .machine
2611                .compare_and_append(
2612                    &batch.into_hollow_batch(),
2613                    &write.writer_id,
2614                    &HandleDebugState::default(),
2615                    (write.cfg.now)(),
2616                )
2617                .await
2618                .unwrap();
2619            writer_maintenance
2620                .perform(&write.machine, &write.gc, write.compact.as_ref())
2621                .await;
2622        }
2623        let live_diffs = write
2624            .machine
2625            .applier
2626            .state_versions
2627            .fetch_all_live_diffs(&write.machine.shard_id())
2628            .await;
2629        // Make sure we constructed the key correctly.
2630        assert!(live_diffs.0.len() > 0);
2631        // Make sure the number of entries is bounded. (I think we could work
2632        // out a tighter bound than this, but the point is only that it's
2633        // bounded).
2634        let max_live_diffs = 2 * usize::cast_from(NUM_BATCHES.next_power_of_two().trailing_zeros());
2635        assert!(
2636            live_diffs.0.len() <= max_live_diffs,
2637            "{} vs {}",
2638            live_diffs.0.len(),
2639            max_live_diffs
2640        );
2641    }
2642
    // A regression test for database-issues#4206, where a bug in gc led to an
    // incremental state invariant being violated, which resulted in gc being
    // permanently wedged for the shard.
2646    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
2647    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
2648    async fn regression_gc_skipped_req_and_interrupted(dyncfgs: ConfigUpdates) {
2649        let mut client = new_test_client(&dyncfgs).await;
2650        let intercept = InterceptHandle::default();
2651        client.blob = Arc::new(InterceptBlob::new(
2652            Arc::clone(&client.blob),
2653            intercept.clone(),
2654        ));
2655        let (_, mut read) = client
2656            .expect_open::<String, String, u64, i64>(ShardId::new())
2657            .await;
2658
2659        // Create a new SeqNo
2660        read.downgrade_since(&Antichain::from_elem(1)).await;
2661        let new_seqno_since = read.machine.applier.seqno_since();
2662
2663        // Start a GC in the background for some SeqNo range that is not
2664        // contiguous compared to the last gc req (in this case, n/a) and then
2665        // crash when it gets to the blob deletes. In the regression case, this
2666        // renders the shard permanently un-gc-able.
2667        assert!(new_seqno_since > SeqNo::minimum());
2668        intercept.set_post_delete(Some(Arc::new(|_, _| panic!("boom"))));
2669        let machine = read.machine.clone();
2670        // Run this in a spawn so we can catch the boom panic
2671        let gc = spawn(|| "", async move {
2672            let req = GcReq {
2673                shard_id: machine.shard_id(),
2674                new_seqno_since,
2675            };
2676            GarbageCollector::gc_and_truncate(&machine, req).await
2677        });
2678        // Wait for gc to either panic (regression case) or finish (good case)
2679        // because it happens to not call blob delete.
2680        let _ = gc.await;
2681
2682        // Allow blob deletes to go through and try GC again. In the regression
2683        // case, this hangs.
2684        intercept.set_post_delete(None);
2685        let req = GcReq {
2686            shard_id: read.machine.shard_id(),
2687            new_seqno_since,
2688        };
2689        let _ = GarbageCollector::gc_and_truncate(&read.machine, req.clone()).await;
2690    }
2691
2692    // A regression test for materialize#20776, where a bug meant that compare_and_append
2693    // would not fetch the latest state after an upper mismatch. This meant that
2694    // a write that could succeed if retried on the latest state would instead
2695    // return an UpperMismatch.
2696    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
2697    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
2698    async fn regression_update_state_after_upper_mismatch(dyncfgs: ConfigUpdates) {
2699        let client = new_test_client(&dyncfgs).await;
2700        let mut client2 = client.clone();
2701
2702        // The bug can only happen if the two WriteHandles have separate copies
2703        // of state, so make sure that each is given its own StateCache.
2704        let new_state_cache = Arc::new(StateCache::new_no_metrics());
2705        client2.shared_states = new_state_cache;
2706
2707        let shard_id = ShardId::new();
2708        let (mut write1, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
2709        let (mut write2, _) = client2.expect_open::<String, (), u64, i64>(shard_id).await;
2710
2711        let data = [
2712            (("1".to_owned(), ()), 1, 1),
2713            (("2".to_owned(), ()), 2, 1),
2714            (("3".to_owned(), ()), 3, 1),
2715            (("4".to_owned(), ()), 4, 1),
2716            (("5".to_owned(), ()), 5, 1),
2717        ];
2718
2719        write1.expect_compare_and_append(&data[..1], 0, 2).await;
2720
        // This handle's upper now lags behind. If compare_and_append fails to
        // update state after an upper mismatch, then this call would
        // (incorrectly) fail.
2723        write2.expect_compare_and_append(&data[1..2], 2, 3).await;
2724    }
2725}