mz_persist_client/internal/machine.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Implementation of the persist state machine.

use std::fmt::Debug;
use std::ops::ControlFlow::{self, Break, Continue};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use futures::FutureExt;
use futures::future::{self, BoxFuture};
use mz_dyncfg::{Config, ConfigSet};
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
#[allow(unused_imports)] // False positive.
use mz_ore::fmt::FormatBuffer;
use mz_ore::task::JoinHandle;
use mz_ore::{assert_none, soft_assert_no_log};
use mz_persist::location::{ExternalError, Indeterminate, SeqNo};
use mz_persist::retry::Retry;
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64, Opaque};
use semver::Version;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::{Instrument, debug, info, trace_span, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES;
use crate::cache::StateCache;
use crate::cfg::RetryParameters;
use crate::critical::CriticalReaderId;
use crate::error::{CodecMismatch, InvalidUsage};
use crate::internal::apply::Applier;
use crate::internal::compact::CompactReq;
use crate::internal::gc::GarbageCollector;
use crate::internal::maintenance::{RoutineMaintenance, WriterMaintenance};
use crate::internal::metrics::{CmdMetrics, Metrics, MetricsRetryStream, RetryMetrics};
use crate::internal::paths::PartialRollupKey;
use crate::internal::state::{
    CompareAndAppendBreak, CriticalReaderState, HandleDebugState, HollowBatch, HollowRollup,
    IdempotencyToken, LeasedReaderState, NoOpStateTransition, Since, SnapshotErr, StateCollections,
    Upper,
};
use crate::internal::state_versions::StateVersions;
use crate::internal::trace::{ApplyMergeResult, FueledMergeRes};
use crate::internal::watch::StateWatch;
use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
use crate::rpc::PubSubSender;
use crate::schema::CaESchema;
use crate::write::WriterId;
use crate::{Diagnostics, PersistConfig, ShardId};

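/// A handle to the persist state machine for a single shard.
///
/// All commands are applied through the wrapped [Applier]. Clones of a
/// `Machine` share the same cached view of the shard's state.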
#[derive(Debug)]
pub struct Machine<K, V, T, D> {
    pub(crate) applier: Applier<K, V, T, D>,
    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
}

// Impl Clone regardless of the type params.
impl<K, V, T: Clone, D> Clone for Machine<K, V, T, D> {
    fn clone(&self) -> Self {
        Self {
            applier: self.applier.clone(),
            isolated_runtime: Arc::clone(&self.isolated_runtime),
        }
    }
}

pub(crate) const CLAIM_UNCLAIMED_COMPACTIONS: Config<bool> = Config::new(
    "persist_claim_unclaimed_compactions",
    false,
    "If an append doesn't result in a compaction request, but there is some uncompacted batch \
    in state, compact that instead.",
);

pub(crate) const CLAIM_COMPACTION_PERCENT: Config<usize> = Config::new(
    "persist_claim_compaction_percent",
    100,
    "Claim a compaction with the given percent chance, if claiming compactions is enabled. \
    (If over 100, we'll always claim at least one; for example, if set to 365, we'll claim at least \
    three and have a 65% chance of claiming a fourth.)",
);
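
// A minimal sketch of how a percent above 100 decomposes, per the doc string
// above (hypothetical names; the actual claiming logic lives in the state
// handling code):
//
//   let guaranteed = percent / 100; // e.g. 365 -> 3 claims guaranteed
//   let extra = if rand_percent < percent % 100 { 1 } else { 0 }; // 65% chance
//   let claimed = guaranteed + extra;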

pub(crate) const CLAIM_COMPACTION_MIN_VERSION: Config<String> = Config::new(
    "persist_claim_compaction_min_version",
    String::new(),
    "If set to a valid version string, compact away any earlier versions if possible.",
);

impl<K, V, T, D> Machine<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64,
{
    pub async fn new(
        cfg: PersistConfig,
        shard_id: ShardId,
        metrics: Arc<Metrics>,
        state_versions: Arc<StateVersions>,
        shared_states: Arc<StateCache>,
        pubsub_sender: Arc<dyn PubSubSender>,
        isolated_runtime: Arc<IsolatedRuntime>,
        diagnostics: Diagnostics,
    ) -> Result<Self, Box<CodecMismatch>> {
        let applier = Applier::new(
            cfg,
            shard_id,
            metrics,
            state_versions,
            shared_states,
            pubsub_sender,
            diagnostics,
        )
        .await?;
        Ok(Machine {
            applier,
            isolated_runtime,
        })
    }

    pub fn shard_id(&self) -> ShardId {
        self.applier.shard_id
    }

    pub fn seqno(&self) -> SeqNo {
        self.applier.seqno()
    }

    pub async fn add_rollup_for_current_seqno(&self) -> RoutineMaintenance {
        let rollup = self.applier.write_rollup_for_state().await;
        let Some(rollup) = rollup else {
            return RoutineMaintenance::default();
        };

        let (applied, maintenance) = self.add_rollup((rollup.seqno, &rollup.to_hollow())).await;
        if !applied {
            // Someone else already wrote a rollup at this seqno, so ours didn't
            // get added. Delete it.
            self.applier
                .state_versions
                .delete_rollup(&rollup.shard_id, &rollup.key)
                .await;
        }
        maintenance
    }

    pub async fn add_rollup(
        &self,
        add_rollup: (SeqNo, &HollowRollup),
    ) -> (bool, RoutineMaintenance) {
        // See the big SUBTLE comment in [Self::merge_res] for what's going on
        // here.
        let mut applied_ever_true = false;
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, _applied, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.add_rollup, |_, _, state| {
                let ret = state.add_rollup(add_rollup);
                if let Continue(applied) = ret {
                    applied_ever_true = applied_ever_true || applied;
                }
                ret
            })
            .await;
        (applied_ever_true, maintenance)
    }

    pub async fn remove_rollups(
        &self,
        remove_rollups: &[(SeqNo, PartialRollupKey)],
    ) -> (Vec<SeqNo>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, removed_rollup_seqnos, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.remove_rollups, |_, _, state| {
                state.remove_rollups(remove_rollups)
            })
            .await;
        (removed_rollup_seqnos, maintenance)
    }

    /// Attempt to upgrade the state to the latest version. If that's not possible, return the
    /// actual data version of the shard.
    pub async fn upgrade_version(&self) -> Result<RoutineMaintenance, Version> {
        let metrics = Arc::clone(&self.applier.metrics);
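        // NB: this reuses the remove_rollups command metric rather than a
        // dedicated upgrade_version one.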
        let (_seqno, upgrade_result, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.remove_rollups, |_, cfg, state| {
                if state.version <= cfg.build_version {
                    // This would be the place to remove any deprecated items from state, now
                    // that we're dropping compatibility with any previous versions.
                    state.version = cfg.build_version.clone();
                    Continue(Ok(()))
                } else {
                    Break(NoOpStateTransition(Err(state.version.clone())))
                }
            })
            .await;

        match upgrade_result {
            Ok(()) => Ok(maintenance),
            Err(version) => {
                soft_assert_no_log!(
                    maintenance.is_empty(),
                    "should not generate maintenance on failed upgrade"
                );
                Err(version)
            }
        }
    }

    pub async fn register_leased_reader(
        &self,
        reader_id: &LeasedReaderId,
        purpose: &str,
        lease_duration: Duration,
        heartbeat_timestamp_ms: u64,
        use_critical_since: bool,
    ) -> (LeasedReaderState<T>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, (reader_state, seqno_since), maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |seqno, cfg, state| {
                state.register_leased_reader(
                    &cfg.hostname,
                    reader_id,
                    purpose,
                    seqno,
                    lease_duration,
                    heartbeat_timestamp_ms,
                    use_critical_since,
                )
            })
            .await;
        // Usually, the reader gets an initial seqno hold of the seqno at which
        // it was registered. However, on a tombstone shard the seqno hold
        // happens to get computed as the tombstone seqno + 1
        // (State::clone_apply provided seqno.next(), the non-no-op commit
        // seqno, to the work fn and this is what register_reader uses for the
        // seqno hold). The real invariant we want to protect here is that the
        // hold is >= the seqno_since, so validate that instead of anything more
        // specific.
        debug_assert!(
            reader_state.seqno >= seqno_since,
            "{} vs {}",
            reader_state.seqno,
            seqno_since,
        );
        (reader_state, maintenance)
    }

    pub async fn register_critical_reader<O: Opaque + Codec64>(
        &self,
        reader_id: &CriticalReaderId,
        purpose: &str,
    ) -> (CriticalReaderState<T>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, state, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, cfg, state| {
                state.register_critical_reader::<O>(&cfg.hostname, reader_id, purpose)
            })
            .await;
        (state, maintenance)
    }

    pub async fn register_schema(
        &self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> (Option<SchemaId>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, state, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, _cfg, state| {
                state.register_schema::<K, V>(key_schema, val_schema)
            })
            .await;
        (state, maintenance)
    }

    pub async fn spine_exert(&self, fuel: usize) -> (Vec<CompactReq<T>>, RoutineMaintenance) {
        // Performance special case for no-ops, to avoid the State clones.
        if fuel == 0 || self.applier.all_batches().len() < 2 {
            return (Vec::new(), RoutineMaintenance::default());
        }

        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, reqs, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.spine_exert, |_seqno, _cfg, state| {
                state.spine_exert(fuel)
            })
            .await;
        let reqs = reqs
            .into_iter()
            .map(|req| CompactReq {
                shard_id: self.shard_id(),
                desc: req.desc,
                inputs: req.inputs,
            })
            .collect();
        (reqs, maintenance)
    }

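    /// Appends `batch` to the shard, if the shard's upper matches
    /// `batch.desc.lower()`.
    ///
    /// Internally retries on indeterminate errors and on upper mismatches
    /// that turn out to be a stale local cache of the shard upper.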
    pub async fn compare_and_append(
        &self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        debug_info: &HandleDebugState,
        heartbeat_timestamp_ms: u64,
    ) -> CompareAndAppendRes<T> {
        let idempotency_token = IdempotencyToken::new();
        loop {
            let res = self
                .compare_and_append_idempotent(
                    batch,
                    writer_id,
                    heartbeat_timestamp_ms,
                    &idempotency_token,
                    debug_info,
                    None,
                )
                .await;
            match res {
                CompareAndAppendRes::Success(seqno, maintenance) => {
                    return CompareAndAppendRes::Success(seqno, maintenance);
                }
                CompareAndAppendRes::InvalidUsage(x) => {
                    return CompareAndAppendRes::InvalidUsage(x);
                }
                CompareAndAppendRes::InlineBackpressure => {
                    return CompareAndAppendRes::InlineBackpressure;
                }
                CompareAndAppendRes::UpperMismatch(seqno, _current_upper) => {
                    // If the state machine thinks that the shard upper is not
                    // far enough along, it could be because the caller of this
                    // method has found out that it advanced via some
                    // side-channel that didn't update our local cache of the
                    // machine state. So, fetch the latest state and try again
                    // if we indeed get something different.
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                    let (current_seqno, current_upper) =
                        self.applier.upper(|seqno, upper| (seqno, upper.clone()));

                    // We tried to do a compare_and_append with the wrong
                    // expected upper; that won't work.
                    if &current_upper != batch.desc.lower() {
                        return CompareAndAppendRes::UpperMismatch(current_seqno, current_upper);
                    } else {
                        // The upper stored in state was outdated. Retry after
                        // updating.
                    }
                }
            }
        }
    }

    async fn compare_and_append_idempotent(
        &self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        heartbeat_timestamp_ms: u64,
        idempotency_token: &IdempotencyToken,
        debug_info: &HandleDebugState,
        // Only exposed for testing. In prod, this always starts as None, but
        // making it a parameter allows us to simulate hitting an indeterminate
        // error on the first attempt in tests.
        mut indeterminate: Option<Indeterminate>,
    ) -> CompareAndAppendRes<T> {
        let metrics = Arc::clone(&self.applier.metrics);
        let lease_duration_ms = self
            .applier
            .cfg
            .writer_lease_duration
            .as_millis()
            .try_into()
            .expect("reasonable duration");
        // SUBTLE: Retries of compare_and_append with Indeterminate errors are
        // tricky (more discussion of this in database-issues#3680):
        //
        // - (1) We compare_and_append and get an Indeterminate error back from
        //   CRDB/Consensus. This means we don't know if it committed or not.
        // - (2) We retry it.
        // - (3) We get back an upper mismatch. The tricky bit is deciding if we
        //   conflicted with some other writer OR if the write in (1) actually
        //   went through and we're "conflicting" with ourself.
        //
        // A number of scenarios can be distinguished with per-writer
        // idempotency tokens, so I'll jump straight to the hardest one:
        //
        // - (1) A compare_and_append is issued for e.g. `[5,7)`, the consensus
        //   call makes it onto the network before the operation is cancelled
        //   (by dropping the future).
        // - (2) A compare_and_append is issued from the same WriteHandle for
        //   `[3,5)`, it uses a different conn from the consensus pool and gets
        //   an Indeterminate error.
        // - (3) The call in (1) is received by consensus and commits.
        // - (4) The retry of (2) receives an upper mismatch with an upper of 7.
        //
        // At this point, how do we determine whether (2) committed or not and
        // thus whether we should return success or upper mismatch? Getting this
        // right is very important for correctness (imagine this is a table
        // write and we either return success or failure to the client).
        //
        // - If we use per-writer IdempotencyTokens but only store the latest
        //   one in state, then the `[5,7)` one will have clobbered whatever our
        //   `[3,5)` one was.
        // - We could store every IdempotencyToken that ever committed, but that
        //   would require unbounded storage in state (non-starter).
        // - We could require that IdempotencyTokens are comparable and that
        //   each call issued by a WriteHandle uses one that is strictly greater
        //   than every call before it. A previous version of this PR tried this
        //   and it's remarkably subtle. As a result, I (Dan) have developed
        //   strong feels that our correctness protocol _should not depend on
        //   WriteHandle, only Machine_.
        // - We could require a new WriterId if a request is ever cancelled by
        //   making `compare_and_append` take ownership of `self` and then
        //   handing it back for any call polled to completion. The ergonomics
        //   of this are quite awkward and, like the previous idea, it depends
        //   on the WriteHandle impl for correctness.
        // - Any ideas that involve reading back the data are foiled by a step
        //   `(0) set the since to 100` (plus the latency and memory usage would
        //   be too unpredictable).
        //
        // The technique used here derives from the following observations:
        //
        // - In practice, we don't use compare_and_append with the sort of
        //   "regressing frontiers" described above.
        // - In practice, Indeterminate errors are rare-ish. They happen enough
        //   that we don't want to always panic on them, but this is still a
        //   useful property to build on.
        //
        // At a high level, we do just enough to be able to distinguish the
        // cases that we think will happen in practice and then leave the rest
        // for a panic! that we think we'll never see. Concretely:
        //
        // - Run compare_and_append in a loop, retrying on Indeterminate errors
        //   but noting if we've ever done that.
        // - If we haven't seen an Indeterminate error (i.e. this is the first
        //   time through the loop) then the result we got is guaranteed to be
        //   correct, so pass it up.
        // - Otherwise, any result other than an expected upper mismatch is
        //   guaranteed to be correct, so just pass it up.
        // - Otherwise examine the writer's most recent upper and break it into
        //   two cases:
        // - Case 1 `expected_upper.less_than(writer_most_recent_upper)`: it's
        //   impossible that we committed on a previous iteration because the
        //   overall upper of the shard is less_than what this call would have
        //   advanced it to. Pass up the expectation mismatch.
        // - Case 2 `!Case1`: First note that this means our IdempotencyToken
        //   didn't match, otherwise we would have gotten `AlreadyCommitted`. It
        //   also means some previous write from _this writer_ has committed an
        //   upper that is beyond the one in this call, which is a weird usage
        //   (NB can't be a future write because that would mean someone is
        //   still polling us, but `&mut self` prevents that).
        //
        // TODO: If this technique works in practice (leads to zero panics),
        // then commit to it and remove the Indeterminate from
        // [WriteHandle::compare_and_append_batch].
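        //
        // A condensed sketch of that decision for an upper mismatch (the real
        // logic is in the match arms below):
        //
        //   if writer_upper < batch.upper { return UpperMismatch } // can't have committed
        //   if !saw_indeterminate         { return UpperMismatch } // first try, authoritative
        //   panic!()                                               // ambiguous: give up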
        let mut retry = self
            .applier
            .metrics
            .retries
            .compare_and_append_idempotent
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        let mut writer_was_present = false;
        loop {
            let cmd_res = self
                .applier
                .apply_unbatched_cmd(&metrics.cmds.compare_and_append, |_, cfg, state| {
                    writer_was_present = state.writers.contains_key(writer_id);
                    state.compare_and_append(
                        batch,
                        writer_id,
                        heartbeat_timestamp_ms,
                        lease_duration_ms,
                        idempotency_token,
                        debug_info,
                        INLINE_WRITES_TOTAL_MAX_BYTES.get(cfg),
                        if CLAIM_UNCLAIMED_COMPACTIONS.get(cfg) {
                            CLAIM_COMPACTION_PERCENT.get(cfg)
                        } else {
                            0
                        },
                        Version::parse(&CLAIM_COMPACTION_MIN_VERSION.get(cfg))
                            .ok()
                            .as_ref(),
                    )
                })
                .await;
            let (seqno, res, routine) = match cmd_res {
                Ok(x) => x,
                Err(err) => {
                    // These are rare and interesting enough that we always log
                    // them at info!.
                    info!(
                        "compare_and_append received an indeterminate error, retrying in {:?}: {}",
                        retry.next_sleep(),
                        err
                    );
                    if indeterminate.is_none() {
                        indeterminate = Some(err);
                    }
                    retry = retry.sleep().await;
                    continue;
                }
            };
            match res {
                Ok(merge_reqs) => {
                    // We got explicit confirmation that we succeeded, so
                    // anything that happened in a previous retry is irrelevant.
                    let mut compact_reqs = Vec::with_capacity(merge_reqs.len());
                    for req in merge_reqs {
                        let req = CompactReq {
                            shard_id: self.shard_id(),
                            desc: req.desc,
                            inputs: req.inputs,
                        };
                        compact_reqs.push(req);
                    }
                    let writer_maintenance = WriterMaintenance {
                        routine,
                        compaction: compact_reqs,
                    };

                    if !writer_was_present {
                        metrics.state.writer_added.inc();
                    }
                    for part in &batch.parts {
                        if part.is_inline() {
                            let bytes = u64::cast_from(part.inline_bytes());
                            metrics.inline.part_commit_bytes.inc_by(bytes);
                            metrics.inline.part_commit_count.inc();
                        }
                    }
                    return CompareAndAppendRes::Success(seqno, writer_maintenance);
                }
                Err(CompareAndAppendBreak::AlreadyCommitted) => {
                    // A previous iteration through this loop got an
                    // Indeterminate error but was successful. Sanity check this
                    // and pass along the good news.
                    assert!(indeterminate.is_some());
                    self.applier.metrics.cmds.compare_and_append_noop.inc();
                    if !writer_was_present {
                        metrics.state.writer_added.inc();
                    }
                    return CompareAndAppendRes::Success(seqno, WriterMaintenance::default());
                }
                Err(CompareAndAppendBreak::InvalidUsage(err)) => {
                    // InvalidUsage is (or should be) a deterministic function
                    // of the inputs and independent of anything in persist
                    // state. It's handed back via a Break, so we never even try
                    // to commit it. No network, no Indeterminate.
                    assert_none!(indeterminate);
                    return CompareAndAppendRes::InvalidUsage(err);
                }
                Err(CompareAndAppendBreak::InlineBackpressure) => {
                    // We tried to write an inline part, but there was already
                    // too much in state. Flush it out to s3 and try again.
                    return CompareAndAppendRes::InlineBackpressure;
                }
                Err(CompareAndAppendBreak::Upper {
                    shard_upper,
                    writer_upper,
                }) => {
                    // NB the below intentionally compares to writer_upper
                    // (because it gives a tighter bound on the bad case), but
                    // returns shard_upper (what the persist caller cares
                    // about).
                    assert!(
                        PartialOrder::less_equal(&writer_upper, &shard_upper),
                        "{:?} vs {:?}",
                        &writer_upper,
                        &shard_upper
                    );
                    if PartialOrder::less_than(&writer_upper, batch.desc.upper()) {
                        // No way this could have committed in some previous
                        // attempt of this loop: the upper of the writer is
                        // strictly less than the proposed new upper.
                        return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
                    }
                    if indeterminate.is_none() {
                        // No way this could have committed in some previous
                        // attempt of this loop: we never saw an indeterminate
                        // error (thus there was no previous iteration of the
                        // loop).
                        return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
                    }
                    // This is the bad case. We can't distinguish if some
                    // previous attempt that got an Indeterminate error
                    // succeeded or not. This should be sufficiently rare in
                    // practice (hopefully ~never) that we give up and let
                    // process restart fix things. See the big comment above for
                    // more context.
                    //
                    // NB: This is intentionally not a halt! because it's quite
                    // unexpected.
                    panic!(
                        concat!(
                            "cannot distinguish compare_and_append success or failure ",
                            "caa_lower={:?} caa_upper={:?} writer_upper={:?} shard_upper={:?} err={:?}"
                        ),
                        batch.desc.lower().elements(),
                        batch.desc.upper().elements(),
                        writer_upper.elements(),
                        shard_upper.elements(),
                        indeterminate,
                    );
                }
            };
        }
    }

    pub async fn downgrade_since(
        &self,
        reader_id: &LeasedReaderId,
        outstanding_seqno: Option<SeqNo>,
        new_since: &Antichain<T>,
        heartbeat_timestamp_ms: u64,
    ) -> (SeqNo, Since<T>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        self.apply_unbatched_idempotent_cmd(&metrics.cmds.downgrade_since, |seqno, _cfg, state| {
            state.downgrade_since(
                reader_id,
                seqno,
                outstanding_seqno,
                new_since,
                heartbeat_timestamp_ms,
            )
        })
        .await
    }

    pub async fn compare_and_downgrade_since<O: Opaque + Codec64>(
        &self,
        reader_id: &CriticalReaderId,
        expected_opaque: &O,
        (new_opaque, new_since): (&O, &Antichain<T>),
    ) -> (Result<Since<T>, (O, Since<T>)>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, res, maintenance) = self
            .apply_unbatched_idempotent_cmd(
                &metrics.cmds.compare_and_downgrade_since,
                |_seqno, _cfg, state| {
                    state.compare_and_downgrade_since::<O>(
                        reader_id,
                        expected_opaque,
                        (new_opaque, new_since),
                    )
                },
            )
            .await;

        match res {
            Ok(since) => (Ok(since), maintenance),
            Err((opaque, since)) => (Err((opaque, since)), maintenance),
        }
    }

    pub async fn heartbeat_leased_reader(
        &self,
        reader_id: &LeasedReaderId,
        heartbeat_timestamp_ms: u64,
    ) -> (SeqNo, bool, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (seqno, existed, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.heartbeat_reader, |_, _, state| {
                state.heartbeat_leased_reader(reader_id, heartbeat_timestamp_ms)
            })
            .await;
        (seqno, existed, maintenance)
    }

    pub async fn expire_leased_reader(
        &self,
        reader_id: &LeasedReaderId,
    ) -> (SeqNo, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (seqno, _existed, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
                state.expire_leased_reader(reader_id)
            })
            .await;
        (seqno, maintenance)
    }

    #[allow(dead_code)] // TODO(bkirwi): remove this when since behaviour on expiry has settled
    pub async fn expire_critical_reader(
        &self,
        reader_id: &CriticalReaderId,
    ) -> (SeqNo, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (seqno, _existed, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
                state.expire_critical_reader(reader_id)
            })
            .await;
        (seqno, maintenance)
    }

    pub async fn expire_writer(&self, writer_id: &WriterId) -> (SeqNo, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (seqno, _existed, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_writer, |_, _, state| {
                state.expire_writer(writer_id)
            })
            .await;
        metrics.state.writer_removed.inc();
        (seqno, maintenance)
    }

    pub fn is_finalized(&self) -> bool {
        self.applier.is_finalized()
    }

    /// See [crate::PersistClient::get_schema].
    pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
        self.applier.get_schema(schema_id)
    }

    /// See [crate::PersistClient::latest_schema].
    pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
        self.applier.latest_schema()
    }

    /// Returns the ID of the given schema, if known at the current state.
    pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
        self.applier.find_schema(key_schema, val_schema)
    }

    /// See [crate::PersistClient::compare_and_evolve_schema].
    ///
    /// TODO: Unify this with [Self::register_schema]?
    pub async fn compare_and_evolve_schema(
        &self,
        expected: SchemaId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> (CaESchema<K, V>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, state, maintenance) = self
            .apply_unbatched_idempotent_cmd(
                &metrics.cmds.compare_and_evolve_schema,
                |_seqno, _cfg, state| {
                    state.compare_and_evolve_schema::<K, V>(expected, key_schema, val_schema)
                },
            )
            .await;
        (state, maintenance)
    }

    async fn tombstone_step(&self) -> Result<(bool, RoutineMaintenance), InvalidUsage<T>> {
        let metrics = Arc::clone(&self.applier.metrics);
        let mut retry = self
            .applier
            .metrics
            .retries
            .idempotent_cmd
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        loop {
            let res = self
                .applier
                .apply_unbatched_cmd(&metrics.cmds.become_tombstone, |_, _, state| {
                    state.become_tombstone_and_shrink()
                })
                .await;
            let err = match res {
                Ok((_seqno, Ok(()), maintenance)) => return Ok((true, maintenance)),
                Ok((_seqno, Err(NoOpStateTransition(())), maintenance)) => {
                    return Ok((false, maintenance));
                }
                Err(err) => err,
            };
            if retry.attempt() >= INFO_MIN_ATTEMPTS {
                info!(
                    "become_tombstone received an indeterminate error, retrying in {:?}: {}",
                    retry.next_sleep(),
                    err
                );
            } else {
                debug!(
                    "become_tombstone received an indeterminate error, retrying in {:?}: {}",
                    retry.next_sleep(),
                    err
                );
            }
            retry = retry.sleep().await;
        }
    }

    pub async fn become_tombstone(&self) -> Result<RoutineMaintenance, InvalidUsage<T>> {
        self.applier.check_since_upper_both_empty()?;

        let mut maintenance = RoutineMaintenance::default();

        loop {
            let (made_progress, more_maintenance) = self.tombstone_step().await?;
            maintenance.merge(more_maintenance);
            if !made_progress {
                break;
            }
        }

        Ok(maintenance)
    }

    pub async fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, Since<T>> {
        let start = Instant::now();
        let (mut seqno, mut upper) = match self.applier.snapshot(as_of) {
            Ok(x) => return Ok(x),
            Err(SnapshotErr::AsOfNotYetAvailable(seqno, Upper(upper))) => (seqno, upper),
            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(since))) => {
                return Err(Since(since));
            }
        };

        // The latest state still couldn't serve this as_of: watch+sleep in a
        // loop until it's ready.
        let mut watch = self.applier.watch();
        let watch = &mut watch;
        let sleeps = self
            .applier
            .metrics
            .retries
            .snapshot
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());

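        // Race a pubsub-driven state watch against a retry-stream sleep (a
        // polling fallback), re-checking the snapshot whenever either fires.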
        enum Wake<'a, K, V, T, D> {
            Watch(&'a mut StateWatch<K, V, T, D>),
            Sleep(MetricsRetryStream),
        }
        let mut watch_fut = std::pin::pin!(
            watch
                .wait_for_seqno_ge(seqno.next())
                .map(Wake::Watch)
                .instrument(trace_span!("snapshot::watch")),
        );
        let mut sleep_fut = std::pin::pin!(
            sleeps
                .sleep()
                .map(Wake::Sleep)
                .instrument(trace_span!("snapshot::sleep")),
        );

        // To reduce log spam, we log "not yet available" only once at info if
        // it passes a certain threshold. Then, if it did one info log, we log
        // again at info when it resolves.
        let mut logged_at_info = false;
        loop {
            // Use a duration based threshold here instead of the usual
            // INFO_MIN_ATTEMPTS because here we're waiting on an
            // external thing to arrive.
            if !logged_at_info && start.elapsed() >= Duration::from_millis(1024) {
                logged_at_info = true;
                info!(
                    "snapshot {} {} as of {:?} not yet available for {} upper {:?}",
                    self.applier.shard_metrics.name,
                    self.shard_id(),
                    as_of.elements(),
                    seqno,
                    upper.elements(),
                );
            } else {
                debug!(
                    "snapshot {} {} as of {:?} not yet available for {} upper {:?}",
                    self.applier.shard_metrics.name,
                    self.shard_id(),
                    as_of.elements(),
                    seqno,
                    upper.elements(),
                );
            }

            let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
                future::Either::Left((wake, _)) => wake,
                future::Either::Right((wake, _)) => wake,
            };
            // Note that we don't need to fetch in the Watch case, because the
            // Watch wakeup is a signal that the shared state has already been
            // updated.
            match &wake {
                Wake::Watch(_) => self.applier.metrics.watch.snapshot_woken_via_watch.inc(),
                Wake::Sleep(_) => {
                    self.applier.metrics.watch.snapshot_woken_via_sleep.inc();
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                }
            }

            (seqno, upper) = match self.applier.snapshot(as_of) {
                Ok(x) => {
                    if logged_at_info {
                        info!(
                            "snapshot {} {} as of {:?} now available",
                            self.applier.shard_metrics.name,
                            self.shard_id(),
                            as_of.elements(),
                        );
                    }
                    return Ok(x);
                }
                Err(SnapshotErr::AsOfNotYetAvailable(seqno, Upper(upper))) => {
                    // The upper isn't ready yet, fall through and try again.
                    (seqno, upper)
                }
                Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(since))) => {
                    return Err(Since(since));
                }
            };

            match wake {
                Wake::Watch(watch) => {
                    watch_fut.set(
                        watch
                            .wait_for_seqno_ge(seqno.next())
                            .map(Wake::Watch)
                            .instrument(trace_span!("snapshot::watch")),
                    );
                }
                Wake::Sleep(sleeps) => {
                    debug!(
                        "snapshot {} {} sleeping for {:?}",
                        self.applier.shard_metrics.name,
                        self.shard_id(),
                        sleeps.next_sleep()
                    );
                    sleep_fut.set(
                        sleeps
                            .sleep()
                            .map(Wake::Sleep)
                            .instrument(trace_span!("snapshot::sleep")),
                    );
                }
            }
        }
    }

    // NB: Unlike the other methods here, this one is read-only.
    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
        self.applier.verify_listen(as_of)
    }

    pub async fn next_listen_batch(
        &self,
        frontier: &Antichain<T>,
        watch: &mut StateWatch<K, V, T, D>,
        reader_id: Option<&LeasedReaderId>,
        // If Some, an override for the default listen sleep retry parameters.
        retry: Option<RetryParameters>,
    ) -> HollowBatch<T> {
        let mut seqno = match self.applier.next_listen_batch(frontier) {
            Ok(b) => return b,
            Err(seqno) => seqno,
        };

        // The latest state still doesn't have a new frontier for us:
        // watch+sleep in a loop until it does.
        let retry = retry.unwrap_or_else(|| next_listen_batch_retry_params(&self.applier.cfg));
        let sleeps = self
            .applier
            .metrics
            .retries
            .next_listen_batch
            .stream(retry.into_retry(SystemTime::now()).into_retry_stream());

        enum Wake<'a, K, V, T, D> {
            Watch(&'a mut StateWatch<K, V, T, D>),
            Sleep(MetricsRetryStream),
        }
        let mut watch_fut = std::pin::pin!(
            watch
                .wait_for_seqno_ge(seqno.next())
                .map(Wake::Watch)
                .instrument(trace_span!("next_listen_batch::watch"))
        );
        let mut sleep_fut = std::pin::pin!(
            sleeps
                .sleep()
                .map(Wake::Sleep)
                .instrument(trace_span!("next_listen_batch::sleep"))
        );

        loop {
            let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
                future::Either::Left((wake, _)) => wake,
                future::Either::Right((wake, _)) => wake,
            };
            // Note that we don't need to fetch in the Watch case, because the
            // Watch wakeup is a signal that the shared state has already been
            // updated.
            match &wake {
                Wake::Watch(_) => self.applier.metrics.watch.listen_woken_via_watch.inc(),
                Wake::Sleep(_) => {
                    self.applier.metrics.watch.listen_woken_via_sleep.inc();
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                }
            }

            seqno = match self.applier.next_listen_batch(frontier) {
                Ok(b) => {
                    match &wake {
                        Wake::Watch(_) => {
                            self.applier.metrics.watch.listen_resolved_via_watch.inc()
                        }
                        Wake::Sleep(_) => {
                            self.applier.metrics.watch.listen_resolved_via_sleep.inc()
                        }
                    }
                    return b;
                }
                Err(seqno) => seqno,
            };

            // Wait a bit and try again. Intentionally don't ever log
            // this at info level.
            match wake {
                Wake::Watch(watch) => {
                    watch_fut.set(
                        watch
                            .wait_for_seqno_ge(seqno.next())
                            .map(Wake::Watch)
                            .instrument(trace_span!("next_listen_batch::watch")),
                    );
                }
                Wake::Sleep(sleeps) => {
                    debug!(
                        "{:?}: {} {} next_listen_batch didn't find new data, retrying in {:?}",
                        reader_id,
                        self.applier.shard_metrics.name,
                        self.shard_id(),
                        sleeps.next_sleep()
                    );
                    sleep_fut.set(
                        sleeps
                            .sleep()
                            .map(Wake::Sleep)
                            .instrument(trace_span!("next_listen_batch::sleep")),
                    );
                }
            }
        }
    }

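    /// Applies `work_fn` to the current state, retrying (with backoff) on
    /// [Indeterminate] errors.
    ///
    /// Because a retried command may have already committed, `work_fn` must
    /// be idempotent: re-applied to the resulting state, it must signal a
    /// no-op via [NoOpStateTransition] rather than commit a second time.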
    async fn apply_unbatched_idempotent_cmd<
        R,
        WorkFn: FnMut(
            SeqNo,
            &PersistConfig,
            &mut StateCollections<T>,
        ) -> ControlFlow<NoOpStateTransition<R>, R>,
    >(
        &self,
        cmd: &CmdMetrics,
        mut work_fn: WorkFn,
    ) -> (SeqNo, R, RoutineMaintenance) {
        let mut retry = self
            .applier
            .metrics
            .retries
            .idempotent_cmd
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        loop {
            match self.applier.apply_unbatched_cmd(cmd, &mut work_fn).await {
                Ok((seqno, x, maintenance)) => match x {
                    Ok(x) => {
                        return (seqno, x, maintenance);
                    }
                    Err(NoOpStateTransition(x)) => {
                        return (seqno, x, maintenance);
                    }
                },
                Err(err) => {
                    if retry.attempt() >= INFO_MIN_ATTEMPTS {
                        info!(
                            "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
                            cmd.name,
                            retry.next_sleep(),
                            err
                        );
                    } else {
                        debug!(
                            "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
                            cmd.name,
                            retry.next_sleep(),
                            err
                        );
                    }
                    retry = retry.sleep().await;
                    continue;
                }
            }
        }
    }
}

impl<K, V, T, D> Machine<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + PartialEq,
{
    pub async fn merge_res(
        &self,
        res: &FueledMergeRes<T>,
    ) -> (ApplyMergeResult, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);

        // SUBTLE! If Machine::merge_res returns false, the blobs referenced in
        // compaction output are deleted so we don't leak them. Naively passing
        // back the value returned by State::apply_merge_res might give a false
        // negative in the presence of retries and Indeterminate errors.
        // Specifically, something like the following:
        //
        // - We try to apply_merge_res, it matches.
        // - When apply_unbatched_cmd goes to commit the new state, the
        //   Consensus::compare_and_set returns an Indeterminate error (but
        //   actually succeeds). The committed State now contains references to
        //   the compaction output blobs.
        // - Machine::apply_unbatched_idempotent_cmd retries the Indeterminate
        //   error. For whatever reason, this time though it doesn't match
        //   (maybe the batches simply get grouped differently when deserialized
        //   from state, or more unavoidably perhaps another compaction
        //   happens).
        // - This now bubbles up applied=false to the caller, which uses it as a
        //   signal that the blobs in the compaction output should be deleted so
        //   that we don't leak them.
        // - We now contain references in committed State to blobs that don't
        //   exist.
        //
        // The fix is to keep track of whether applied ever was true, even for a
        // compare_and_set that returned an Indeterminate error. This has the
        // chance of false positive (leaking a blob) but that's better than a
        // false negative (a blob we can never recover referenced by state). We
        // anyway need a mechanism to clean up leaked blobs because of process
        // crashes.
        let mut merge_result_ever_applied = ApplyMergeResult::NotAppliedNoMatch;
        let (_seqno, _apply_merge_result, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.merge_res, |_, _, state| {
                let ret = state.apply_merge_res::<D>(res, &Arc::clone(&metrics).columnar);
                if let Continue(result) = ret {
                    // record if we've ever applied the merge
                    if result.applied() {
                        merge_result_ever_applied = result;
                    }
                    // otherwise record the most granular reason for _not_
                    // applying the merge when there was a matching batch
                    if result.matched() && !result.applied() && !merge_result_ever_applied.applied()
                    {
                        merge_result_ever_applied = result;
                    }
                }
                ret
            })
            .await;
        (merge_result_ever_applied, maintenance)
    }
}

pub(crate) struct ExpireFn(
    /// This is stored on WriteHandle and ReadHandle, which we require to be
    /// Send + Sync, but the Future is only Send and not Sync. Instead store a
    /// FnOnce that returns the Future. This could also be made an `IntoFuture`,
    /// once producing one of those is made easier.
    pub(crate) Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + Sync + 'static>,
);
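
// A minimal sketch of constructing an ExpireFn (hypothetical names; assumes
// `machine` and `reader_id` can be moved into the closure):
//
//   let expire = ExpireFn(Box::new(move || {
//       async move {
//           let _ = machine.expire_leased_reader(&reader_id).await;
//       }
//       .boxed()
//   }));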

impl Debug for ExpireFn {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExpireFn").finish_non_exhaustive()
    }
}

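/// The outcome of a [Machine::compare_and_append] call; `InlineBackpressure`
/// asks the caller to flush inline parts out to blob storage and retry.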
#[derive(Debug)]
pub(crate) enum CompareAndAppendRes<T> {
    Success(SeqNo, WriterMaintenance<T>),
    InvalidUsage(InvalidUsage<T>),
    UpperMismatch(SeqNo, Antichain<T>),
    InlineBackpressure,
}

#[cfg(test)]
impl<T: Debug> CompareAndAppendRes<T> {
    #[track_caller]
    fn unwrap(self) -> (SeqNo, WriterMaintenance<T>) {
        match self {
            CompareAndAppendRes::Success(seqno, maintenance) => (seqno, maintenance),
            x => panic!("{:?}", x),
        }
    }
}

impl<K, V, T, D> Machine<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    #[allow(clippy::unused_async)]
    pub async fn start_reader_heartbeat_tasks(
        self,
        reader_id: LeasedReaderId,
        gc: GarbageCollector<K, V, T, D>,
    ) -> Vec<JoinHandle<()>> {
        let mut ret = Vec::new();
        let metrics = Arc::clone(&self.applier.metrics);

        // TODO: In response to a production incident, this runs the heartbeat
        // task on both the in-context tokio runtime and persist's isolated
        // runtime. We think we were seeing tasks (including this one) get stuck
        // indefinitely in tokio while waiting for a runtime worker. This could
        // happen if some other task in that runtime never yields. It's possible
        // that one of the two runtimes is healthy while the other isn't (this
        // was inconclusive in the incident debugging), and the heartbeat task
        // is fairly lightweight, so run a copy in each in case that helps.
        //
        // The real fix here is to find the misbehaving task and fix it. Remove
        // this duplication when that happens.
        let name = format!("persist::heartbeat_read({},{})", self.shard_id(), reader_id);
        ret.push(mz_ore::task::spawn(|| name, {
            let machine = self.clone();
            let reader_id = reader_id.clone();
            let gc = gc.clone();
            metrics
                .tasks
                .heartbeat_read
                .instrument_task(Self::reader_heartbeat_task(machine, reader_id, gc))
        }));

        let isolated_runtime = Arc::clone(&self.isolated_runtime);
        let name = format!(
            "persist::heartbeat_read_isolated({},{})",
            self.shard_id(),
            reader_id
        );
        ret.push(
            isolated_runtime.spawn_named(
                || name,
                metrics
                    .tasks
                    .heartbeat_read
                    .instrument_task(Self::reader_heartbeat_task(self, reader_id, gc)),
            ),
        );

        ret
    }

    async fn reader_heartbeat_task(
        machine: Self,
        reader_id: LeasedReaderId,
        gc: GarbageCollector<K, V, T, D>,
    ) {
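        // Heartbeat at half the lease duration, so a single missed heartbeat
        // still leaves half a lease of slack before the lease can expire.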
1260        let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 2;
1261        loop {
1262            let before_sleep = Instant::now();
1263            tokio::time::sleep(sleep_duration).await;
1264
1265            let elapsed_since_before_sleeping = before_sleep.elapsed();
1266            if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) {
1267                warn!(
1268                    "reader ({}) of shard ({}) went {}s between heartbeats",
1269                    reader_id,
1270                    machine.shard_id(),
1271                    elapsed_since_before_sleeping.as_secs_f64()
1272                );
1273            }
1274
1275            let before_heartbeat = Instant::now();
1276            let (_seqno, existed, maintenance) = machine
1277                .heartbeat_leased_reader(&reader_id, (machine.applier.cfg.now)())
1278                .await;
1279            maintenance.start_performing(&machine, &gc);
1280
1281            let elapsed_since_heartbeat = before_heartbeat.elapsed();
1282            if elapsed_since_heartbeat > Duration::from_secs(60) {
1283                warn!(
1284                    "reader ({}) of shard ({}) heartbeat call took {}s",
1285                    reader_id,
1286                    machine.shard_id(),
1287                    elapsed_since_heartbeat.as_secs_f64(),
1288                );
1289            }
1290
1291            if !existed {
1292                // If the read handle was intentionally expired, this task
1293                // *should* be aborted before it observes the expiration. So if
1294                // we get here, this task somehow failed to keep the read lease
1295        // alive. Warn loudly, because there's now a live read handle whose
1296        // lease has expired and which will panic if used, but don't panic,
1297                // just in case there is some edge case that results in this
1298                // task observing the intentional expiration of a read handle.
1299                warn!(
1300                    "heartbeat task for reader ({}) of shard ({}) exiting due to expired lease \
1301                     while read handle is live",
1302                    reader_id,
1303                    machine.shard_id(),
1304                );
1305                return;
1306            }
1307        }
1308    }
1309}
1310
1311pub(crate) const NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP: Config<Duration> = Config::new(
1312    "persist_next_listen_batch_retryer_fixed_sleep",
1313    Duration::from_millis(1200), // pubsub is on by default!
1314    "\
1315    The fixed sleep when polling for new batches from a Listen or Subscribe. Skipped if zero.",
1316);
1317
1318pub(crate) const NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF: Config<Duration> = Config::new(
1319    "persist_next_listen_batch_retryer_initial_backoff",
1320    Duration::from_millis(100), // pubsub is on by default!
1321    "The initial backoff when polling for new batches from a Listen or Subscribe.",
1322);
1323
1324pub(crate) const NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER: Config<u32> = Config::new(
1325    "persist_next_listen_batch_retryer_multiplier",
1326    2,
1327    "The backoff multiplier when polling for new batches from a Listen or Subscribe.",
1328);
1329
1330pub(crate) const NEXT_LISTEN_BATCH_RETRYER_CLAMP: Config<Duration> = Config::new(
1331    "persist_next_listen_batch_retryer_clamp",
1332    Duration::from_secs(16), // pubsub is on by default!
1333    "The backoff clamp duration when polling for new batches from a Listen or Subscribe.",
1334);
1335
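/// Assembles the retry parameters above into a [RetryParameters]. Per the
/// config descriptions, a nonzero fixed sleep (the default, since pubsub is
/// expected to deliver most updates promptly) means polling sleeps a flat
/// 1200ms between attempts; setting it to zero instead yields exponential
/// backoff starting at 100ms, doubling per attempt and clamped at 16s.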
1336fn next_listen_batch_retry_params(cfg: &ConfigSet) -> RetryParameters {
1337    RetryParameters {
1338        fixed_sleep: NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP.get(cfg),
1339        initial_backoff: NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF.get(cfg),
1340        multiplier: NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER.get(cfg),
1341        clamp: NEXT_LISTEN_BATCH_RETRYER_CLAMP.get(cfg),
1342    }
1343}
1344
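/// The attempt count at which the retry loops below escalate their
/// "failed, retrying" logging from debug to info.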
1345pub const INFO_MIN_ATTEMPTS: usize = 3;
1346
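/// Retries `work_fn` forever until it returns `Ok`, swallowing every
/// [ExternalError]. Failures are logged at debug for the first
/// [INFO_MIN_ATTEMPTS] attempts and at info thereafter.
///
/// A minimal usage sketch; the metrics field path and the `blob`/`key`
/// bindings are illustrative, not a real call site:
///
/// ```ignore
/// let value = retry_external(&metrics.retries.external.batch_get, || async {
///     blob.get(&key).await
/// })
/// .await;
/// ```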
1347pub async fn retry_external<R, F, WorkFn>(metrics: &RetryMetrics, mut work_fn: WorkFn) -> R
1348where
1349    F: std::future::Future<Output = Result<R, ExternalError>>,
1350    WorkFn: FnMut() -> F,
1351{
1352    let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1353    loop {
1354        match work_fn().await {
1355            Ok(x) => {
1356                if retry.attempt() > 0 {
1357                    debug!(
1358                        "external operation {} succeeded after failing at least once",
1359                        metrics.name,
1360                    );
1361                }
1362                return x;
1363            }
1364            Err(err) => {
1365                if retry.attempt() >= INFO_MIN_ATTEMPTS {
1366                    info!(
1367                        "external operation {} failed, retrying in {:?}: {}",
1368                        metrics.name,
1369                        retry.next_sleep(),
1370                        err.display_with_causes()
1371                    );
1372                } else {
1373                    debug!(
1374                        "external operation {} failed, retrying in {:?}: {}",
1375                        metrics.name,
1376                        retry.next_sleep(),
1377                        err.display_with_causes()
1378                    );
1379                }
1380                retry = retry.sleep().await;
1381            }
1382        }
1383    }
1384}
1385
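/// Like [retry_external], but retries only determinate errors: an
/// [Indeterminate] error (e.g. a timeout, where the operation may or may not
/// have actually been applied) is returned to the caller, who must resolve
/// the ambiguity, typically by retrying idempotently at a higher level.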
1386pub async fn retry_determinate<R, F, WorkFn>(
1387    metrics: &RetryMetrics,
1388    mut work_fn: WorkFn,
1389) -> Result<R, Indeterminate>
1390where
1391    F: std::future::Future<Output = Result<R, ExternalError>>,
1392    WorkFn: FnMut() -> F,
1393{
1394    let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1395    loop {
1396        match work_fn().await {
1397            Ok(x) => {
1398                if retry.attempt() > 0 {
1399                    debug!(
1400                        "external operation {} succeeded after failing at least once",
1401                        metrics.name,
1402                    );
1403                }
1404                return Ok(x);
1405            }
1406            Err(ExternalError::Determinate(err)) => {
1407                // The determinate "could not serialize access" errors
1408                // happen often enough in dev (which uses Postgres) that
1409                // they impede people's work. At the same time, they've been
1410                // a source of confusion for engineers. The situation is much
1411                // better on CRDB and we have metrics coverage in prod, so
1412                // this logging is redundant enough that it's more hurtful
1413                // than helpful. As a result, this intentionally ignores
1414                // INFO_MIN_ATTEMPTS and always logs at debug.
1415                debug!(
1416                    "external operation {} failed, retrying in {:?}: {}",
1417                    metrics.name,
1418                    retry.next_sleep(),
1419                    err.display_with_causes()
1420                );
1421                retry = retry.sleep().await;
1422                continue;
1423            }
1424            Err(ExternalError::Indeterminate(x)) => return Err(x),
1425        }
1426    }
1427}
1428
1429#[cfg(test)]
1430pub mod datadriven {
1431    use std::collections::{BTreeMap, BTreeSet};
1432    use std::pin::pin;
1433    use std::sync::{Arc, LazyLock};
1434
1435    use anyhow::anyhow;
1436    use differential_dataflow::consolidation::consolidate_updates;
1437    use differential_dataflow::trace::Description;
1438    use futures::StreamExt;
1439    use mz_dyncfg::{ConfigUpdates, ConfigVal};
1440    use mz_persist::indexed::encoding::BlobTraceBatchPart;
1441    use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
1442
1443    use crate::batch::{
1444        BLOB_TARGET_SIZE, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
1445        BatchParts, validate_truncate_batch,
1446    };
1447    use crate::cfg::COMPACTION_MEMORY_BOUND_BYTES;
1448    use crate::fetch::{EncodedPart, FetchConfig};
1449    use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
1450    use crate::internal::datadriven::DirectiveArgs;
1451    use crate::internal::encoding::Schemas;
1452    use crate::internal::gc::GcReq;
1453    use crate::internal::paths::{BlobKey, BlobKeyPrefix, PartialBlobKey};
1454    use crate::internal::state::{BatchPart, RunOrder, RunPart};
1455    use crate::internal::state_versions::EncodedRollup;
1456    use crate::internal::trace::{CompactionInput, IdHollowBatch, SpineId};
1457    use crate::read::{Listen, ListenEvent, READER_LEASE_DURATION};
1458    use crate::rpc::NoopPubSubSender;
1459    use crate::tests::new_test_client;
1460    use crate::write::COMBINE_INLINE_WRITES;
1461    use crate::{GarbageCollector, PersistClient};
1462
1463    use super::*;
1464
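    /// The `String`/`()` schemas shared by all the datadriven directives in
    /// this module.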
1465    static SCHEMAS: LazyLock<Schemas<String, ()>> = LazyLock::new(|| Schemas {
1466        id: Some(SchemaId(0)),
1467        key: Arc::new(StringSchema),
1468        val: Arc::new(UnitSchema),
1469    });
1470
1471    /// Shared state for a single [crate::internal::machine] [datadriven::TestFile].
1472    #[derive(Debug)]
1473    pub struct MachineState {
1474        pub client: PersistClient,
1475        pub shard_id: ShardId,
1476        pub state_versions: Arc<StateVersions>,
1477        pub machine: Machine<String, (), u64, i64>,
1478        pub gc: GarbageCollector<String, (), u64, i64>,
1479        pub batches: BTreeMap<String, IdHollowBatch<u64>>,
1480        pub next_id: usize,
1481        pub rollups: BTreeMap<String, EncodedRollup>,
1482        pub listens: BTreeMap<String, Listen<String, (), u64, i64>>,
1483        pub routine: Vec<RoutineMaintenance>,
1484        pub compactions: BTreeMap<String, CompactReq<u64>>,
1485    }
1486
1487    impl MachineState {
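        /// Creates a fresh test shard and Machine, pinning a couple of
        /// configs (blob target size, inline-write combining) so the
        /// datadriven tests behave deterministically.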
1488        pub async fn new(dyncfgs: &ConfigUpdates) -> Self {
1489            let shard_id = ShardId::new();
1490            let client = new_test_client(dyncfgs).await;
1491            // Reset blob_target_size. Individual batch writes and compactions
1492            // can override it with an arg.
1493            client
1494                .cfg
1495                .set_config(&BLOB_TARGET_SIZE, *BLOB_TARGET_SIZE.default());
1496            // Our structured compaction code uses slightly different estimates
1497            // for array size than the old path, which can affect the results of
1498            // some compaction tests.
1499            client.cfg.set_config(&COMBINE_INLINE_WRITES, false);
1500            let state_versions = Arc::new(StateVersions::new(
1501                client.cfg.clone(),
1502                Arc::clone(&client.consensus),
1503                Arc::clone(&client.blob),
1504                Arc::clone(&client.metrics),
1505            ));
1506            let machine = Machine::new(
1507                client.cfg.clone(),
1508                shard_id,
1509                Arc::clone(&client.metrics),
1510                Arc::clone(&state_versions),
1511                Arc::clone(&client.shared_states),
1512                Arc::new(NoopPubSubSender),
1513                Arc::clone(&client.isolated_runtime),
1514                Diagnostics::for_tests(),
1515            )
1516            .await
1517            .expect("codecs should match");
1518            let gc = GarbageCollector::new(machine.clone(), Arc::clone(&client.isolated_runtime));
1519            MachineState {
1520                shard_id,
1521                client,
1522                state_versions,
1523                machine,
1524                gc,
1525                batches: BTreeMap::default(),
1526                rollups: BTreeMap::default(),
1527                listens: BTreeMap::default(),
1528                routine: Vec::new(),
1529                compactions: BTreeMap::default(),
1530                next_id: 0,
1531            }
1532        }
1533
1534        fn to_batch(&self, hollow: HollowBatch<u64>) -> Batch<String, (), u64, i64> {
1535            Batch::new(
1536                true,
1537                Arc::clone(&self.client.metrics),
1538                Arc::clone(&self.client.blob),
1539                self.client.metrics.shards.shard(&self.shard_id, "test"),
1540                self.client.cfg.build_version.clone(),
1541                (
1542                    <String>::encode_schema(&*SCHEMAS.key),
1543                    <()>::encode_schema(&*SCHEMAS.val),
1544                ),
1545                hollow,
1546            )
1547        }
1548    }
1549
1550    /// Scans consensus and returns all live states (from the given SeqNo
1551    /// onward) along with the rollups and batches they reference.
1552    pub async fn consensus_scan(
1553        datadriven: &MachineState,
1554        args: DirectiveArgs<'_>,
1555    ) -> Result<String, anyhow::Error> {
1556        let from = args.expect("from_seqno");
1557
1558        let mut states = datadriven
1559            .state_versions
1560            .fetch_all_live_states::<u64>(datadriven.shard_id)
1561            .await
1562            .expect("should only be called on an initialized shard")
1563            .check_ts_codec()
1564            .expect("shard codecs should not change");
1565        let mut s = String::new();
1566        while let Some(x) = states.next(|_| {}) {
1567            if x.seqno < from {
1568                continue;
1569            }
1570            let rollups: Vec<_> = x
1571                .collections
1572                .rollups
1573                .keys()
1574                .map(|seqno| seqno.to_string())
1575                .collect();
1576            let batches: Vec<_> = x
1577                .collections
1578                .trace
1579                .batches()
1580                .filter(|b| !b.is_empty())
1581                .filter_map(|b| {
1582                    datadriven
1583                        .batches
1584                        .iter()
1585                        .find(|(_, original_batch)| original_batch.batch.parts == b.parts)
1586                        .map(|(batch_name, _)| batch_name.to_owned())
1587                })
1588                .collect();
1589            writeln!(
1590                s,
1591                "seqno={} batches={} rollups={}",
1592                x.seqno,
1593                batches.join(","),
1594                rollups.join(","),
1595            );
1596        }
1597        Ok(s)
1598    }
1599
1600    pub async fn consensus_truncate(
1601        datadriven: &MachineState,
1602        args: DirectiveArgs<'_>,
1603    ) -> Result<String, anyhow::Error> {
1604        let to = args.expect("to_seqno");
1605        let removed = datadriven
1606            .client
1607            .consensus
1608            .truncate(&datadriven.shard_id.to_string(), to)
1609            .await
1610            .expect("valid truncation");
1611        Ok(format!("{}\n", removed))
1612    }
1613
1614    pub async fn blob_scan_batches(
1615        datadriven: &MachineState,
1616        _args: DirectiveArgs<'_>,
1617    ) -> Result<String, anyhow::Error> {
1618        let key_prefix = BlobKeyPrefix::Shard(&datadriven.shard_id).to_string();
1619
1620        let mut s = String::new();
1621        let () = datadriven
1622            .state_versions
1623            .blob
1624            .list_keys_and_metadata(&key_prefix, &mut |x| {
1625                let (_, key) = BlobKey::parse_ids(x.key).expect("key should be valid");
1626                if let PartialBlobKey::Batch(_, _) = key {
1627                    writeln!(s, "{}: {}b", x.key, x.size_in_bytes);
1628                }
1629            })
1630            .await?;
1631        Ok(s)
1632    }
1633
1634    #[allow(clippy::unused_async)]
1635    pub async fn shard_desc(
1636        datadriven: &MachineState,
1637        _args: DirectiveArgs<'_>,
1638    ) -> Result<String, anyhow::Error> {
1639        Ok(format!(
1640            "since={:?} upper={:?}\n",
1641            datadriven.machine.applier.since().elements(),
1642            datadriven.machine.applier.clone_upper().elements()
1643        ))
1644    }
1645
1646    pub async fn downgrade_since(
1647        datadriven: &mut MachineState,
1648        args: DirectiveArgs<'_>,
1649    ) -> Result<String, anyhow::Error> {
1650        let since = args.expect_antichain("since");
1651        let seqno = args.optional("seqno");
1652        let reader_id = args.expect("reader_id");
1653        let (_, since, routine) = datadriven
1654            .machine
1655            .downgrade_since(
1656                &reader_id,
1657                seqno,
1658                &since,
1659                (datadriven.machine.applier.cfg.now)(),
1660            )
1661            .await;
1662        datadriven.routine.push(routine);
1663        Ok(format!(
1664            "{} {:?}\n",
1665            datadriven.machine.seqno(),
1666            since.0.elements()
1667        ))
1668    }
1669
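    /// Applies dynamic config updates given as `name val` lines in the
    /// directive input; only usize and bool configs are currently supported.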
1670    #[allow(clippy::unused_async)]
1671    pub async fn dyncfg(
1672        datadriven: &MachineState,
1673        args: DirectiveArgs<'_>,
1674    ) -> Result<String, anyhow::Error> {
1675        let mut updates = ConfigUpdates::default();
1676        for x in args.input.trim().split('\n') {
1677            match x.split(' ').collect::<Vec<_>>().as_slice() {
1678                &[name, val] => {
1679                    let config = datadriven
1680                        .client
1681                        .cfg
1682                        .entries()
1683                        .find(|x| x.name() == name)
1684                        .ok_or_else(|| anyhow!("unknown dyncfg: {}", name))?;
1685                    match config.val() {
1686                        ConfigVal::Usize(_) => {
1687                            let val = val.parse().map_err(anyhow::Error::new)?;
1688                            updates.add_dynamic(name, ConfigVal::Usize(val));
1689                        }
1690                        ConfigVal::Bool(_) => {
1691                            let val = val.parse().map_err(anyhow::Error::new)?;
1692                            updates.add_dynamic(name, ConfigVal::Bool(val));
1693                        }
1694                        x => unimplemented!("dyncfg type: {:?}", x),
1695                    }
1696                }
1697                x => return Err(anyhow!("expected `name val` got: {:?}", x)),
1698            }
1699        }
1700        updates.apply(&datadriven.client.cfg);
1701
1702        Ok("ok\n".to_string())
1703    }
1704
1705    pub async fn compare_and_downgrade_since(
1706        datadriven: &mut MachineState,
1707        args: DirectiveArgs<'_>,
1708    ) -> Result<String, anyhow::Error> {
1709        let expected_opaque: u64 = args.expect("expect_opaque");
1710        let new_opaque: u64 = args.expect("opaque");
1711        let new_since = args.expect_antichain("since");
1712        let reader_id = args.expect("reader_id");
1713        let (res, routine) = datadriven
1714            .machine
1715            .compare_and_downgrade_since(&reader_id, &expected_opaque, (&new_opaque, &new_since))
1716            .await;
1717        datadriven.routine.push(routine);
1718        let since = res.map_err(|(opaque, since)| {
1719            anyhow!("mismatch: opaque={} since={:?}", opaque, since.0.elements())
1720        })?;
1721        Ok(format!(
1722            "{} {} {:?}\n",
1723            datadriven.machine.seqno(),
1724            new_opaque,
1725            since.0.elements()
1726        ))
1727    }
1728
1729    pub async fn write_rollup(
1730        datadriven: &mut MachineState,
1731        args: DirectiveArgs<'_>,
1732    ) -> Result<String, anyhow::Error> {
1733        let output = args.expect_str("output");
1734
1735        let rollup = datadriven
1736            .machine
1737            .applier
1738            .write_rollup_for_state()
1739            .await
1740            .expect("rollup");
1741
1742        datadriven
1743            .rollups
1744            .insert(output.to_string(), rollup.clone());
1745
1746        Ok(format!(
1747            "state={} diffs=[{}, {})\n",
1748            rollup.seqno,
1749            rollup._desc.lower().first().expect("seqno"),
1750            rollup._desc.upper().first().expect("seqno"),
1751        ))
1752    }
1753
1754    pub async fn add_rollup(
1755        datadriven: &mut MachineState,
1756        args: DirectiveArgs<'_>,
1757    ) -> Result<String, anyhow::Error> {
1758        let input = args.expect_str("input");
1759        let rollup = datadriven
1760            .rollups
1761            .get(input)
1762            .expect("unknown batch")
1763            .clone();
1764
1765        let (applied, maintenance) = datadriven
1766            .machine
1767            .add_rollup((rollup.seqno, &rollup.to_hollow()))
1768            .await;
1769
1770        if !applied {
1771            return Err(anyhow!("failed to apply rollup for: {}", rollup.seqno));
1772        }
1773
1774        datadriven.routine.push(maintenance);
1775        Ok(format!("{}\n", datadriven.machine.seqno()))
1776    }
1777
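    /// Builds a batch from the directive's input updates and registers it
    /// under `output`, optionally overriding the blob target size, part
    /// sizes, and consolidation behavior.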
1778    pub async fn write_batch(
1779        datadriven: &mut MachineState,
1780        args: DirectiveArgs<'_>,
1781    ) -> Result<String, anyhow::Error> {
1782        let output = args.expect_str("output");
1783        let lower = args.expect_antichain("lower");
1784        let upper = args.expect_antichain("upper");
1785        assert!(PartialOrder::less_than(&lower, &upper));
1786        let since = args
1787            .optional_antichain("since")
1788            .unwrap_or_else(|| Antichain::from_elem(0));
1789        let target_size = args.optional("target_size");
1790        let parts_size_override = args.optional("parts_size_override");
1791        let consolidate = args.optional("consolidate").unwrap_or(true);
1792        let mut updates: Vec<_> = args
1793            .input
1794            .split('\n')
1795            .flat_map(DirectiveArgs::parse_update)
1796            .collect();
1797
1798        let mut cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
1799        if let Some(target_size) = target_size {
1800            cfg.blob_target_size = target_size;
1801        }
1802        if consolidate {
1803            consolidate_updates(&mut updates);
1804        }
1805        let run_order = if consolidate {
1806            cfg.preferred_order
1807        } else {
1808            RunOrder::Unordered
1809        };
1810        let parts = BatchParts::new_ordered::<i64>(
1811            cfg.clone(),
1812            run_order,
1813            Arc::clone(&datadriven.client.metrics),
1814            Arc::clone(&datadriven.machine.applier.shard_metrics),
1815            datadriven.shard_id,
1816            Arc::clone(&datadriven.client.blob),
1817            Arc::clone(&datadriven.client.isolated_runtime),
1818            &datadriven.client.metrics.user,
1819        );
1820        let builder = BatchBuilderInternal::new(
1821            cfg.clone(),
1822            parts,
1823            Arc::clone(&datadriven.client.metrics),
1824            SCHEMAS.clone(),
1825            Arc::clone(&datadriven.client.blob),
1826            datadriven.shard_id.clone(),
1827            datadriven.client.cfg.build_version.clone(),
1828        );
1829        let mut builder = BatchBuilder::new(builder, Description::new(lower, upper.clone(), since));
1830        for ((k, ()), t, d) in updates {
1831            builder.add(&k, &(), &t, &d).await.expect("invalid batch");
1832        }
1833        let mut batch = builder.finish(upper).await?;
1834        // We can only reasonably use parts_size_override with hollow batches,
1835        // so if it's set, flush any inline parts out to blob.
1836        if parts_size_override.is_some() {
1837            batch
1838                .flush_to_blob(
1839                    &cfg,
1840                    &datadriven.client.metrics.user,
1841                    &datadriven.client.isolated_runtime,
1842                    &SCHEMAS,
1843                )
1844                .await;
1845        }
1846        let batch = batch.into_hollow_batch();
1847        let batch = IdHollowBatch {
1848            batch: Arc::new(batch),
1849            id: SpineId(datadriven.next_id, datadriven.next_id + 1),
1850        };
1851        datadriven.next_id += 1;
1852
1853        if let Some(size) = parts_size_override {
1854            let mut batch = batch.clone();
1855            let mut hollow_batch = (*batch.batch).clone();
1856            for part in hollow_batch.parts.iter_mut() {
1857                match part {
1858                    RunPart::Many(run) => run.max_part_bytes = size,
1859                    RunPart::Single(BatchPart::Hollow(part)) => part.encoded_size_bytes = size,
1860                    RunPart::Single(BatchPart::Inline { .. }) => unreachable!("flushed out above"),
1861                }
1862            }
1863            batch.batch = Arc::new(hollow_batch);
1864            datadriven.batches.insert(output.to_owned(), batch);
1865        } else {
1866            datadriven.batches.insert(output.to_owned(), batch.clone());
1867        }
1868        Ok(format!(
1869            "parts={} len={}\n",
1870            batch.batch.part_count(),
1871            batch.batch.len
1872        ))
1873    }
1874
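    /// Fetches the named batch and prints its contents part by part
    /// (optionally with structured key-lower stats), followed by the batch's
    /// run structure.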
1875    pub async fn fetch_batch(
1876        datadriven: &MachineState,
1877        args: DirectiveArgs<'_>,
1878    ) -> Result<String, anyhow::Error> {
1879        let input = args.expect_str("input");
1880        let stats = args.optional_str("stats");
1881        let batch = datadriven.batches.get(input).expect("unknown batch");
1882
1883        let mut s = String::new();
1884        let mut stream = pin!(
1885            batch
1886                .batch
1887                .part_stream(
1888                    datadriven.shard_id,
1889                    &*datadriven.state_versions.blob,
1890                    &*datadriven.state_versions.metrics
1891                )
1892                .enumerate()
1893        );
1894        while let Some((idx, part)) = stream.next().await {
1895            let part = &*part?;
1896            writeln!(s, "<part {idx}>");
1897
1898            let lower = match part {
1899                BatchPart::Inline { updates, .. } => {
1900                    let updates: BlobTraceBatchPart<u64> =
1901                        updates.decode(&datadriven.client.metrics.columnar)?;
1902                    updates.structured_key_lower()
1903                }
1904                other => other.structured_key_lower(),
1905            };
1906
1907            if let Some(lower) = lower {
1908                if stats == Some("lower") {
1909                    writeln!(s, "<key lower={}>", lower.get())
1910                }
1911            }
1912
1913            match part {
1914                BatchPart::Hollow(part) => {
1915                    let blob_batch = datadriven
1916                        .client
1917                        .blob
1918                        .get(&part.key.complete(&datadriven.shard_id))
1919                        .await;
1920                    match blob_batch {
1921                        Ok(Some(_)) | Err(_) => {}
1922                        // Don't try to fetch/print the keys of the batch part
1923                        // if the blob store no longer has it.
1924                        Ok(None) => {
1925                            s.push_str("<empty>\n");
1926                            continue;
1927                        }
1928                    };
1929                }
1930                BatchPart::Inline { .. } => {}
1931            };
1932            let part = EncodedPart::fetch(
1933                &FetchConfig::from_persist_config(&datadriven.client.cfg),
1934                &datadriven.shard_id,
1935                datadriven.client.blob.as_ref(),
1936                datadriven.client.metrics.as_ref(),
1937                datadriven.machine.applier.shard_metrics.as_ref(),
1938                &datadriven.client.metrics.read.batch_fetcher,
1939                &batch.batch.desc,
1940                part,
1941            )
1942            .await
1943            .expect("invalid batch part");
1944            let part = part
1945                .normalize(&datadriven.client.metrics.columnar)
1946                .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);
1947
1948            for ((k, _v), t, d) in part
1949                .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
1950                .expect("valid schemas")
1951            {
1952                writeln!(s, "{k} {t} {d}");
1953            }
1954        }
1955        if !s.is_empty() {
1956            for (idx, (_meta, run)) in batch.batch.runs().enumerate() {
1957                writeln!(s, "<run {idx}>");
1958                for part in run {
1959                    let part_idx = batch
1960                        .batch
1961                        .parts
1962                        .iter()
1963                        .position(|p| p == part)
1964                        .expect("part should exist");
1965                    writeln!(s, "part {part_idx}");
1966                }
1967            }
1968        }
1969        Ok(s)
1970    }
1971
1972    #[allow(clippy::unused_async)]
1973    pub async fn truncate_batch_desc(
1974        datadriven: &mut MachineState,
1975        args: DirectiveArgs<'_>,
1976    ) -> Result<String, anyhow::Error> {
1977        let input = args.expect_str("input");
1978        let output = args.expect_str("output");
1979        let lower = args.expect_antichain("lower");
1980        let upper = args.expect_antichain("upper");
1981
1982        let batch = datadriven
1983            .batches
1984            .get(input)
1985            .expect("unknown batch")
1986            .clone();
1987        let truncated_desc = Description::new(lower, upper, batch.batch.desc.since().clone());
1988        let () = validate_truncate_batch(&batch.batch, &truncated_desc, false, true)?;
1989        let mut new_hollow_batch = (*batch.batch).clone();
1990        new_hollow_batch.desc = truncated_desc;
1991        let new_batch = IdHollowBatch {
1992            batch: Arc::new(new_hollow_batch),
1993            id: batch.id,
1994        };
1995        datadriven
1996            .batches
1997            .insert(output.to_owned(), new_batch.clone());
1998        Ok(format!(
1999            "parts={} len={}\n",
2000            batch.batch.part_count(),
2001            batch.batch.len
2002        ))
2003    }
2004
2005    #[allow(clippy::unused_async)]
2006    pub async fn set_batch_parts_size(
2007        datadriven: &mut MachineState,
2008        args: DirectiveArgs<'_>,
2009    ) -> Result<String, anyhow::Error> {
2010        let input = args.expect_str("input");
2011        let size = args.expect("size");
2012        let batch = datadriven.batches.get_mut(input).expect("unknown batch");
2013        let mut hollow_batch = (*batch.batch).clone();
2014        for part in hollow_batch.parts.iter_mut() {
2015            match part {
2016                RunPart::Single(BatchPart::Hollow(x)) => x.encoded_size_bytes = size,
2017                _ => {
2018                    panic!("set_batch_parts_size only supports hollow parts")
2019                }
2020            }
2021        }
2022        batch.batch = Arc::new(hollow_batch);
2023        Ok("ok\n".to_string())
2024    }
2025
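    /// Compacts the named input batches into a single output batch,
    /// registering both the result and the originating [CompactReq] under
    /// `output`.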
2026    pub async fn compact(
2027        datadriven: &mut MachineState,
2028        args: DirectiveArgs<'_>,
2029    ) -> Result<String, anyhow::Error> {
2030        let output = args.expect_str("output");
2031        let lower = args.expect_antichain("lower");
2032        let upper = args.expect_antichain("upper");
2033        let since = args.expect_antichain("since");
2034        let target_size = args.optional("target_size");
2035        let memory_bound = args.optional("memory_bound");
2036
2037        let mut inputs = Vec::new();
2038        for input in args.args.get("inputs").expect("missing inputs") {
2039            inputs.push(
2040                datadriven
2041                    .batches
2042                    .get(input)
2043                    .expect("unknown batch")
2044                    .clone(),
2045            );
2046        }
2047
2048        let cfg = datadriven.client.cfg.clone();
2049        if let Some(target_size) = target_size {
2050            cfg.set_config(&BLOB_TARGET_SIZE, target_size);
2051        }
2052        if let Some(memory_bound) = memory_bound {
2053            cfg.set_config(&COMPACTION_MEMORY_BOUND_BYTES, memory_bound);
2054        }
2055        let req = CompactReq {
2056            shard_id: datadriven.shard_id,
2057            desc: Description::new(lower, upper, since),
2058            inputs: inputs.clone(),
2059        };
2060        datadriven
2061            .compactions
2062            .insert(output.to_owned(), req.clone());
2063        let spine_lower = inputs
2064            .first()
2065            .map_or_else(|| datadriven.next_id, |x| x.id.0);
2066        let spine_upper = inputs.last().map_or_else(
2067            || {
2068                datadriven.next_id += 1;
2069                datadriven.next_id
2070            },
2071            |x| x.id.1,
2072        );
2073        let new_spine_id = SpineId(spine_lower, spine_upper);
2074        let res = Compactor::<String, (), u64, i64>::compact(
2075            CompactConfig::new(&cfg, datadriven.shard_id),
2076            Arc::clone(&datadriven.client.blob),
2077            Arc::clone(&datadriven.client.metrics),
2078            Arc::clone(&datadriven.machine.applier.shard_metrics),
2079            Arc::clone(&datadriven.client.isolated_runtime),
2080            req,
2081            SCHEMAS.clone(),
2082        )
2083        .await?;
2084
2085        let batch = IdHollowBatch {
2086            batch: Arc::new(res.output.clone()),
2087            id: new_spine_id,
2088        };
2089
2090        datadriven.batches.insert(output.to_owned(), batch.clone());
2091        Ok(format!(
2092            "parts={} len={}\n",
2093            res.output.part_count(),
2094            res.output.len
2095        ))
2096    }
2097
2098    pub async fn clear_blob(
2099        datadriven: &MachineState,
2100        _args: DirectiveArgs<'_>,
2101    ) -> Result<String, anyhow::Error> {
2102        let mut to_delete = vec![];
2103        datadriven
2104            .client
2105            .blob
2106            .list_keys_and_metadata("", &mut |meta| {
2107                to_delete.push(meta.key.to_owned());
2108            })
2109            .await?;
2110        for blob in &to_delete {
2111            datadriven.client.blob.delete(blob).await?;
2112        }
2113        Ok(format!("deleted={}\n", to_delete.len()))
2114    }
2115
2116    pub async fn restore_blob(
2117        datadriven: &MachineState,
2118        _args: DirectiveArgs<'_>,
2119    ) -> Result<String, anyhow::Error> {
2120        let not_restored = crate::internal::restore::restore_blob(
2121            &datadriven.state_versions,
2122            datadriven.client.blob.as_ref(),
2123            &datadriven.client.cfg.build_version,
2124            datadriven.shard_id,
2125            &*datadriven.state_versions.metrics,
2126        )
2127        .await?;
2128        let mut out = String::new();
2129        for key in not_restored {
2130            writeln!(&mut out, "{key}");
2131        }
2132        Ok(out)
2133    }
2134
2135    #[allow(clippy::unused_async)]
2136    pub async fn rewrite_ts(
2137        datadriven: &mut MachineState,
2138        args: DirectiveArgs<'_>,
2139    ) -> Result<String, anyhow::Error> {
2140        let input = args.expect_str("input");
2141        let ts_rewrite = args.expect_antichain("frontier");
2142        let upper = args.expect_antichain("upper");
2143
2144        let batch = datadriven.batches.get_mut(input).expect("unknown batch");
2145        let mut hollow_batch = (*batch.batch).clone();
2146        let () = hollow_batch
2147            .rewrite_ts(&ts_rewrite, upper)
2148            .map_err(|err| anyhow!("invalid rewrite: {}", err))?;
2149        batch.batch = Arc::new(hollow_batch);
2150        Ok("ok\n".into())
2151    }
2152
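    /// Runs garbage collection through the given SeqNo and reports which
    /// blobs were deleted and which consensus and state entries were
    /// truncated or removed.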
2153    pub async fn gc(
2154        datadriven: &mut MachineState,
2155        args: DirectiveArgs<'_>,
2156    ) -> Result<String, anyhow::Error> {
2157        let new_seqno_since = args.expect("to_seqno");
2158
2159        let req = GcReq {
2160            shard_id: datadriven.shard_id,
2161            new_seqno_since,
2162        };
2163        let (maintenance, stats) =
2164            GarbageCollector::gc_and_truncate(&datadriven.machine, req).await;
2165        datadriven.routine.push(maintenance);
2166
2167        Ok(format!(
2168            "{} batch_parts={} rollups={} truncated={} state_rollups={}\n",
2169            datadriven.machine.seqno(),
2170            stats.batch_parts_deleted_from_blob,
2171            stats.rollups_deleted_from_blob,
2172            stats
2173                .truncated_consensus_to
2174                .iter()
2175                .map(|x| x.to_string())
2176                .collect::<Vec<_>>()
2177                .join(","),
2178            stats
2179                .rollups_removed_from_state
2180                .iter()
2181                .map(|x| x.to_string())
2182                .collect::<Vec<_>>()
2183                .join(","),
2184        ))
2185    }
2186
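    /// Prints a consolidated snapshot of the shard as of the given frontier,
    /// advancing each update's timestamp to the as_of before consolidating.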
2187    pub async fn snapshot(
2188        datadriven: &MachineState,
2189        args: DirectiveArgs<'_>,
2190    ) -> Result<String, anyhow::Error> {
2191        let as_of = args.expect_antichain("as_of");
2192        let snapshot = datadriven
2193            .machine
2194            .snapshot(&as_of)
2195            .await
2196            .map_err(|err| anyhow!("{:?}", err))?;
2197
2198        let mut result = String::new();
2199
2200        for batch in snapshot {
2201            writeln!(
2202                result,
2203                "<batch {:?}-{:?}>",
2204                batch.desc.lower().elements(),
2205                batch.desc.upper().elements()
2206            );
2207            for (run, (_meta, parts)) in batch.runs().enumerate() {
2208                writeln!(result, "<run {run}>");
2209                let mut stream = pin!(
2210                    futures::stream::iter(parts)
2211                        .flat_map(|part| part.part_stream(
2212                            datadriven.shard_id,
2213                            &*datadriven.state_versions.blob,
2214                            &*datadriven.state_versions.metrics
2215                        ))
2216                        .enumerate()
2217                );
2218
2219                while let Some((idx, part)) = stream.next().await {
2220                    let part = &*part?;
2221                    writeln!(result, "<part {idx}>");
2222
2223                    let part = EncodedPart::fetch(
2224                        &FetchConfig::from_persist_config(&datadriven.client.cfg),
2225                        &datadriven.shard_id,
2226                        datadriven.client.blob.as_ref(),
2227                        datadriven.client.metrics.as_ref(),
2228                        datadriven.machine.applier.shard_metrics.as_ref(),
2229                        &datadriven.client.metrics.read.batch_fetcher,
2230                        &batch.desc,
2231                        part,
2232                    )
2233                    .await
2234                    .expect("invalid batch part");
2235                    let part = part
2236                        .normalize(&datadriven.client.metrics.columnar)
2237                        .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);
2238
2239                    let mut updates = Vec::new();
2240
2241                    for ((k, _v), mut t, d) in part
2242                        .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
2243                        .expect("valid schemas")
2244                    {
2245                        t.advance_by(as_of.borrow());
2246                        updates.push((k, t, d));
2247                    }
2248
2249                    consolidate_updates(&mut updates);
2250
2251                    for (k, t, d) in updates {
2252                        writeln!(result, "{k} {t} {d}");
2253                    }
2254                }
2255            }
2256        }
2257
2258        Ok(result)
2259    }
2260
2261    pub async fn register_listen(
2262        datadriven: &mut MachineState,
2263        args: DirectiveArgs<'_>,
2264    ) -> Result<String, anyhow::Error> {
2265        let output = args.expect_str("output");
2266        let as_of = args.expect_antichain("as_of");
2267        let read = datadriven
2268            .client
2269            .open_leased_reader::<String, (), u64, i64>(
2270                datadriven.shard_id,
2271                Arc::new(StringSchema),
2272                Arc::new(UnitSchema),
2273                Diagnostics::for_tests(),
2274                true,
2275            )
2276            .await
2277            .expect("invalid shard types");
2278        let listen = read
2279            .listen(as_of)
2280            .await
2281            .map_err(|err| anyhow!("{:?}", err))?;
2282        datadriven.listens.insert(output.to_owned(), listen);
2283        Ok("ok\n".into())
2284    }
2285
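    /// Drains events from the named listener, printing updates, until the
    /// listener's progress frontier is no longer less than `frontier`.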
2286    pub async fn listen_through(
2287        datadriven: &mut MachineState,
2288        args: DirectiveArgs<'_>,
2289    ) -> Result<String, anyhow::Error> {
2290        let input = args.expect_str("input");
2291        // It's not possible to listen _through_ the empty antichain, so this is
2292        // intentionally `expect` instead of `expect_antichain`.
2293        let frontier = args.expect("frontier");
2294        let listen = datadriven.listens.get_mut(input).expect("unknown listener");
2295        let mut s = String::new();
2296        loop {
2297            for event in listen.fetch_next().await {
2298                match event {
2299                    ListenEvent::Updates(x) => {
2300                        for ((k, _v), t, d) in x.iter() {
2301                            writeln!(s, "{} {} {}", k.as_ref().unwrap(), t, d);
2302                        }
2303                    }
2304                    ListenEvent::Progress(x) => {
2305                        if !x.less_than(&frontier) {
2306                            return Ok(s);
2307                        }
2308                    }
2309                }
2310            }
2311        }
2312    }
2313
2314    pub async fn register_critical_reader(
2315        datadriven: &mut MachineState,
2316        args: DirectiveArgs<'_>,
2317    ) -> Result<String, anyhow::Error> {
2318        let reader_id = args.expect("reader_id");
2319        let (state, maintenance) = datadriven
2320            .machine
2321            .register_critical_reader::<u64>(&reader_id, "tests")
2322            .await;
2323        datadriven.routine.push(maintenance);
2324        Ok(format!(
2325            "{} {:?}\n",
2326            datadriven.machine.seqno(),
2327            state.since.elements(),
2328        ))
2329    }
2330
2331    pub async fn register_leased_reader(
2332        datadriven: &mut MachineState,
2333        args: DirectiveArgs<'_>,
2334    ) -> Result<String, anyhow::Error> {
2335        let reader_id = args.expect("reader_id");
2336        let (reader_state, maintenance) = datadriven
2337            .machine
2338            .register_leased_reader(
2339                &reader_id,
2340                "tests",
2341                READER_LEASE_DURATION.get(&datadriven.client.cfg),
2342                (datadriven.client.cfg.now)(),
2343                false,
2344            )
2345            .await;
2346        datadriven.routine.push(maintenance);
2347        Ok(format!(
2348            "{} {:?}\n",
2349            datadriven.machine.seqno(),
2350            reader_state.since.elements(),
2351        ))
2352    }
2353
2354    pub async fn heartbeat_leased_reader(
2355        datadriven: &MachineState,
2356        args: DirectiveArgs<'_>,
2357    ) -> Result<String, anyhow::Error> {
2358        let reader_id = args.expect("reader_id");
2359        let _ = datadriven
2360            .machine
2361            .heartbeat_leased_reader(&reader_id, (datadriven.client.cfg.now)())
2362            .await;
2363        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2364    }
2365
2366    pub async fn expire_critical_reader(
2367        datadriven: &mut MachineState,
2368        args: DirectiveArgs<'_>,
2369    ) -> Result<String, anyhow::Error> {
2370        let reader_id = args.expect("reader_id");
2371        let (_, maintenance) = datadriven.machine.expire_critical_reader(&reader_id).await;
2372        datadriven.routine.push(maintenance);
2373        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2374    }
2375
2376    pub async fn expire_leased_reader(
2377        datadriven: &mut MachineState,
2378        args: DirectiveArgs<'_>,
2379    ) -> Result<String, anyhow::Error> {
2380        let reader_id = args.expect("reader_id");
2381        let (_, maintenance) = datadriven.machine.expire_leased_reader(&reader_id).await;
2382        datadriven.routine.push(maintenance);
2383        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2384    }
2385
2386    pub async fn compare_and_append_batches(
2387        datadriven: &MachineState,
2388        args: DirectiveArgs<'_>,
2389    ) -> Result<String, anyhow::Error> {
2390        let expected_upper = args.expect_antichain("expected_upper");
2391        let new_upper = args.expect_antichain("new_upper");
2392
2393        let mut batches: Vec<Batch<String, (), u64, i64>> = args
2394            .args
2395            .get("batches")
2396            .expect("missing batches")
2397            .into_iter()
2398            .map(|batch| {
2399                let hollow = (*datadriven
2400                    .batches
2401                    .get(batch)
2402                    .expect("unknown batch")
2403                    .clone()
2404                    .batch)
2405                    .clone();
2406                datadriven.to_batch(hollow)
2407            })
2408            .collect();
2409
2410        let mut writer = datadriven
2411            .client
2412            .open_writer(
2413                datadriven.shard_id,
2414                Arc::new(StringSchema),
2415                Arc::new(UnitSchema),
2416                Diagnostics::for_tests(),
2417            )
2418            .await?;
2419
2420        let mut batch_refs: Vec<_> = batches.iter_mut().collect();
2421
2422        let () = writer
2423            .compare_and_append_batch(batch_refs.as_mut_slice(), expected_upper, new_upper, true)
2424            .await?
2425            .map_err(|err| anyhow!("upper mismatch: {:?}", err))?;
2426
2427        writer.expire().await;
2428
2429        Ok("ok\n".into())
2430    }
2431
2432    pub async fn expire_writer(
2433        datadriven: &mut MachineState,
2434        args: DirectiveArgs<'_>,
2435    ) -> Result<String, anyhow::Error> {
2436        let writer_id = args.expect("writer_id");
2437        let (_, maintenance) = datadriven.machine.expire_writer(&writer_id).await;
2438        datadriven.routine.push(maintenance);
2439        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2440    }
2441
2442    pub(crate) async fn finalize(
2443        datadriven: &mut MachineState,
2444        _args: DirectiveArgs<'_>,
2445    ) -> anyhow::Result<String> {
2446        let maintenance = datadriven.machine.become_tombstone().await?;
2447        datadriven.routine.push(maintenance);
2448        Ok(format!("{} ok\n", datadriven.machine.seqno()))
2449    }
2450
2451    pub(crate) fn is_finalized(
2452        datadriven: &MachineState,
2453        _args: DirectiveArgs<'_>,
2454    ) -> anyhow::Result<String> {
2455        let seqno = datadriven.machine.seqno();
2456        let tombstone = datadriven.machine.is_finalized();
2457        Ok(format!("{seqno} {tombstone}\n"))
2458    }
2459
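    /// Appends the named batch via the idempotent compare-and-append path,
    /// first registering the test schemas and, on inline-write backpressure,
    /// flushing the batch's inline parts out to blob and retrying.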
2460    pub async fn compare_and_append(
2461        datadriven: &mut MachineState,
2462        args: DirectiveArgs<'_>,
2463    ) -> Result<String, anyhow::Error> {
2464        let input = args.expect_str("input");
2465        let writer_id = args.expect("writer_id");
2466        let mut batch = datadriven
2467            .batches
2468            .get(input)
2469            .expect("unknown batch")
2470            .clone();
2471        let token = args.optional("token").unwrap_or_else(IdempotencyToken::new);
2472        let now = (datadriven.client.cfg.now)();
2473
2474        let (id, maintenance) = datadriven
2475            .machine
2476            .register_schema(&*SCHEMAS.key, &*SCHEMAS.val)
2477            .await;
2478        assert_eq!(id, SCHEMAS.id);
2479        datadriven.routine.push(maintenance);
2480        let maintenance = loop {
2481            let indeterminate = args
2482                .optional::<String>("prev_indeterminate")
2483                .map(|x| Indeterminate::new(anyhow::Error::msg(x)));
2484            let res = datadriven
2485                .machine
2486                .compare_and_append_idempotent(
2487                    &batch.batch,
2488                    &writer_id,
2489                    now,
2490                    &token,
2491                    &HandleDebugState::default(),
2492                    indeterminate,
2493                )
2494                .await;
2495            match res {
2496                CompareAndAppendRes::Success(_, x) => break x,
2497                CompareAndAppendRes::UpperMismatch(_seqno, upper) => {
2498                    return Err(anyhow!("{:?}", Upper(upper)));
2499                }
2500                CompareAndAppendRes::InlineBackpressure => {
2501                    let hollow_batch = (*batch.batch).clone();
2502                    let mut b = datadriven.to_batch(hollow_batch);
2503                    let cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
2504                    b.flush_to_blob(
2505                        &cfg,
2506                        &datadriven.client.metrics.user,
2507                        &datadriven.client.isolated_runtime,
2508                        &*SCHEMAS,
2509                    )
2510                    .await;
2511                    batch.batch = Arc::new(b.into_hollow_batch());
2512                    continue;
2513                }
2514                _ => panic!("{:?}", res),
2515            };
2516        };
2517        // TODO: Don't throw away writer maintenance. It's slightly tricky
2518        // because we need a WriterId for Compactor.
2519        datadriven.routine.push(maintenance.routine);
2520        Ok(format!(
2521            "{} {:?}\n",
2522            datadriven.machine.seqno(),
2523            datadriven.machine.applier.clone_upper().elements(),
2524        ))
2525    }
2526
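    /// Applies the named batch as a compaction result, deriving the spine id
    /// range from the inputs of the originating compaction request.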
2527    pub async fn apply_merge_res(
2528        datadriven: &mut MachineState,
2529        args: DirectiveArgs<'_>,
2530    ) -> Result<String, anyhow::Error> {
2531        let input = args.expect_str("input");
2532        let batch = datadriven
2533            .batches
2534            .get(input)
2535            .expect("unknown batch")
2536            .clone();
2537        let compact_req = datadriven
2538            .compactions
2539            .get(input)
2540            .expect("unknown compact req")
2541            .clone();
2542        let input_batches = compact_req
2543            .inputs
2544            .iter()
2545            .map(|x| x.id)
2546            .collect::<BTreeSet<_>>();
2547        let lower_spine_bound = input_batches
2548            .first()
2549            .map(|id| id.0)
2550            .expect("at least one batch must be present");
2551        let upper_spine_bound = input_batches
2552            .last()
2553            .map(|id| id.1)
2554            .expect("at least one batch must be present");
2555        let id = SpineId(lower_spine_bound, upper_spine_bound);
2556        let hollow_batch = (*batch.batch).clone();
2557
2558        let (merge_res, maintenance) = datadriven
2559            .machine
2560            .merge_res(&FueledMergeRes {
2561                output: hollow_batch,
2562                input: CompactionInput::IdRange(id),
2563                new_active_compaction: None,
2564            })
2565            .await;
2566        datadriven.routine.push(maintenance);
2567        Ok(format!(
2568            "{} {}\n",
2569            datadriven.machine.seqno(),
2570            merge_res.applied()
2571        ))
2572    }
2573
2574    pub async fn perform_maintenance(
2575        datadriven: &mut MachineState,
2576        _args: DirectiveArgs<'_>,
2577    ) -> Result<String, anyhow::Error> {
2578        let mut s = String::new();
2579        for maintenance in datadriven.routine.drain(..) {
2580            let () = maintenance
2581                .perform(&datadriven.machine, &datadriven.gc)
2582                .await;
2583            let () = datadriven
2584                .machine
2585                .applier
2586                .fetch_and_update_state(None)
2587                .await;
2588            writeln!(s, "{} ok", datadriven.machine.seqno());
2589        }
2590        Ok(s)
2591    }
2592}
2593
2594#[cfg(test)]
2595pub mod tests {
2596    use std::sync::Arc;
2597
2598    use mz_dyncfg::ConfigUpdates;
2599    use mz_ore::cast::CastFrom;
2600    use mz_ore::task::spawn;
2601    use mz_persist::intercept::{InterceptBlob, InterceptHandle};
2602    use mz_persist::location::SeqNo;
2603    use mz_persist_types::PersistLocation;
2604    use semver::Version;
2605    use timely::progress::Antichain;
2606
2607    use crate::batch::BatchBuilderConfig;
2608    use crate::cache::StateCache;
2609    use crate::internal::gc::{GarbageCollector, GcReq};
2610    use crate::internal::state::{HandleDebugState, ROLLUP_THRESHOLD};
2611    use crate::tests::{new_test_client, new_test_client_cache};
2612    use crate::{Diagnostics, PersistClient, ShardId};
2613
2614    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
2615    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
2616    async fn apply_unbatched_cmd_truncate(dyncfgs: ConfigUpdates) {
2617        mz_ore::test::init_logging();
2618
2619        let client = new_test_client(&dyncfgs).await;
2620        // Set a low rollup threshold so GC/truncation is more aggressive.
2621        client.cfg.set_config(&ROLLUP_THRESHOLD, 5);
2622        let (mut write, _) = client
2623            .expect_open::<String, (), u64, i64>(ShardId::new())
2624            .await;
2625
2626        // Write a bunch of batches. This should result in a bounded number of
2627        // live entries in consensus.
2628        const NUM_BATCHES: u64 = 100;
2629        for idx in 0..NUM_BATCHES {
2630            let mut batch = write
2631                .expect_batch(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1)
2632                .await;
2633            // Flush this batch out so the CaA doesn't get inline writes
2634            // backpressure.
2635            let cfg = BatchBuilderConfig::new(&client.cfg, write.shard_id());
2636            batch
2637                .flush_to_blob(
2638                    &cfg,
2639                    &client.metrics.user,
2640                    &client.isolated_runtime,
2641                    &write.write_schemas,
2642                )
2643                .await;
2644            let (_, writer_maintenance) = write
2645                .machine
2646                .compare_and_append(
2647                    &batch.into_hollow_batch(),
2648                    &write.writer_id,
2649                    &HandleDebugState::default(),
2650                    (write.cfg.now)(),
2651                )
2652                .await
2653                .unwrap();
2654            writer_maintenance
2655                .perform(&write.machine, &write.gc, write.compact.as_ref())
2656                .await;
2657        }
2658        let live_diffs = write
2659            .machine
2660            .applier
2661            .state_versions
2662            .fetch_all_live_diffs(&write.machine.shard_id())
2663            .await;
2664        // Make sure we constructed the key correctly.
2665        assert!(live_diffs.0.len() > 0);
2666        // Make sure the number of entries is bounded. (I think we could work
2667        // out a tighter bound than this, but the point is only that it's
2668        // bounded).
2669        let max_live_diffs = 2 * usize::cast_from(NUM_BATCHES.next_power_of_two().trailing_zeros());
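        // (With NUM_BATCHES = 100: next_power_of_two() = 128, which has 7
        // trailing zeros, so max_live_diffs = 14.)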
2670        assert!(
2671            live_diffs.0.len() <= max_live_diffs,
2672            "{} vs {}",
2673            live_diffs.0.len(),
2674            max_live_diffs
2675        );
2676    }
2677
2678    // A regression test for database-issues#4206, where a bug in gc led to an incremental
2679    // state invariant being violated, which resulted in gc being permanently
2680    // wedged for the shard.
2681    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
2682    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
2683    async fn regression_gc_skipped_req_and_interrupted(dyncfgs: ConfigUpdates) {
2684        let mut client = new_test_client(&dyncfgs).await;
2685        let intercept = InterceptHandle::default();
2686        client.blob = Arc::new(InterceptBlob::new(
2687            Arc::clone(&client.blob),
2688            intercept.clone(),
2689        ));
2690        let (_, mut read) = client
2691            .expect_open::<String, String, u64, i64>(ShardId::new())
2692            .await;
2693
2694        // Create a new SeqNo
2695        read.downgrade_since(&Antichain::from_elem(1)).await;
2696        let new_seqno_since = read.machine.applier.seqno_since();
2697
2698        // Start a GC in the background for some SeqNo range that is not
2699        // contiguous compared to the last gc req (in this case, n/a) and then
2700        // crash when it gets to the blob deletes. In the regression case, this
2701        // renders the shard permanently un-gc-able.
2702        assert!(new_seqno_since > SeqNo::minimum());
2703        intercept.set_post_delete(Some(Arc::new(|_, _| panic!("boom"))));
2704        let machine = read.machine.clone();
2705        // Run this in a spawn so we can catch the boom panic
2706        let gc = spawn(|| "", async move {
2707            let req = GcReq {
2708                shard_id: machine.shard_id(),
2709                new_seqno_since,
2710            };
2711            GarbageCollector::gc_and_truncate(&machine, req).await
2712        });
2713        // Wait for gc to either panic (regression case) or finish (good case)
2714        // because it happens to not call blob delete.
2715        let _ = gc.await;
2716
2717        // Allow blob deletes to go through and try GC again. In the regression
2718        // case, this hangs.
2719        intercept.set_post_delete(None);
2720        let req = GcReq {
2721            shard_id: read.machine.shard_id(),
2722            new_seqno_since,
2723        };
2724        let _ = GarbageCollector::gc_and_truncate(&read.machine, req.clone()).await;
2725    }

    // A regression test for materialize#20776, where a bug meant that compare_and_append
    // would not fetch the latest state after an upper mismatch. This meant that
    // a write that could succeed if retried on the latest state would instead
    // return an UpperMismatch.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
    async fn regression_update_state_after_upper_mismatch(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let mut client2 = client.clone();

        // The bug can only happen if the two WriteHandles have separate copies
        // of state, so make sure that each is given its own StateCache.
        let new_state_cache = Arc::new(StateCache::new_no_metrics());
        client2.shared_states = new_state_cache;

        let shard_id = ShardId::new();
        let (mut write1, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        let (mut write2, _) = client2.expect_open::<String, (), u64, i64>(shard_id).await;

        let data = [
            (("1".to_owned(), ()), 1, 1),
            (("2".to_owned(), ()), 2, 1),
            (("3".to_owned(), ()), 3, 1),
            (("4".to_owned(), ()), 4, 1),
            (("5".to_owned(), ()), 5, 1),
        ];

        write1.expect_compare_and_append(&data[..1], 0, 2).await;
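        // write1 advanced the shard's upper to 2, but write2's locally cached
        // state still has the upper at 0.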

        // This handle's upper now lags behind. If compare_and_append fails to
        // update state after an upper mismatch, then this call would
        // (incorrectly) fail.
        write2.expect_compare_and_append(&data[1..2], 2, 3).await;
    }

    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn version_upgrade(dyncfgs: ConfigUpdates) {
        let mut cache = new_test_client_cache(&dyncfgs);
        let shard_id = ShardId::new();

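        /// Fetches the given shard's `applier_version` by serializing the
        /// inspected shard state to JSON and reading the field back out.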
        async fn fetch_catalog_upgrade_shard_version(
            persist_client: &PersistClient,
            upgrade_shard_id: ShardId,
        ) -> Option<semver::Version> {
            let shard_state = persist_client
                .inspect_shard::<u64>(&upgrade_shard_id)
                .await
                .ok()?;
            let json_state = serde_json::to_value(shard_state).expect("state serialization error");
            let upgrade_version = json_state
                .get("applier_version")
                .cloned()
                .expect("missing applier_version");
            let upgrade_version =
                serde_json::from_value(upgrade_version).expect("version deserialization error");
            Some(upgrade_version)
        }

        cache.cfg.build_version = Version::new(26, 1, 0);
        let client = cache.open(PersistLocation::new_in_mem()).await.unwrap();
        let (_, mut reader) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        reader.downgrade_since(&Antichain::from_elem(1)).await;
        assert_eq!(
            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
            Some(Version::new(26, 1, 0)),
        );

        // Merely opening and operating on the shard at a new version doesn't
        // bump the version...
        cache.cfg.build_version = Version::new(27, 1, 0);
        let client = cache.open(PersistLocation::new_in_mem()).await.unwrap();
        let (_, mut reader) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        reader.downgrade_since(&Antichain::from_elem(2)).await;
        assert_eq!(
            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
            Some(Version::new(26, 1, 0)),
        );

        // ...but an explicit call will.
        client
            .upgrade_version::<String, (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await
            .unwrap();
        assert_eq!(
            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
            Some(Version::new(27, 1, 0)),
        );
    }
}