mz_persist_client/internal/apply.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Implementation of persist command application.

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::ops::ControlFlow::{self, Break, Continue};
use std::sync::Arc;
use std::time::Instant;

use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use mz_ore::cast::CastFrom;
use mz_persist::location::{CaSResult, Indeterminate, SeqNo, VersionedData};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64};
use timely::progress::{Antichain, Timestamp};
use tracing::debug;

use crate::cache::{LockingTypedState, StateCache};
use crate::error::{CodecMismatch, InvalidUsage};
use crate::internal::gc::GcReq;
use crate::internal::maintenance::RoutineMaintenance;
use crate::internal::metrics::{CmdMetrics, Metrics, ShardMetrics};
use crate::internal::paths::{PartialRollupKey, RollupId};
use crate::internal::state::{
    EncodedSchemas, ExpiryMetrics, HollowBatch, ROLLUP_THRESHOLD, Since, SnapshotErr,
    StateCollections, TypedState, Upper,
};
use crate::internal::state_diff::StateDiff;
use crate::internal::state_versions::{EncodedRollup, StateVersions};
use crate::internal::trace::FueledMergeReq;
use crate::internal::watch::StateWatch;
use crate::rpc::{PUBSUB_PUSH_DIFF_ENABLED, PubSubSender};
use crate::schema::SchemaCache;
use crate::{Diagnostics, PersistConfig, ShardId};

use super::state::{
    ActiveGc, ActiveRollup, GC_FALLBACK_THRESHOLD_MS, GC_USE_ACTIVE_GC,
    ROLLUP_FALLBACK_THRESHOLD_MS, ROLLUP_USE_ACTIVE_ROLLUP,
};

/// An applier of persist commands.
///
/// This struct exists mainly to allow us to very narrowly bound the surface
/// area that directly interacts with state.
#[derive(Debug)]
pub struct Applier<K, V, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,
    pub(crate) state_versions: Arc<StateVersions>,
    shared_states: Arc<StateCache>,
    pubsub_sender: Arc<dyn PubSubSender>,
    pub(crate) shard_id: ShardId,

    // Access to the shard's state, shared across all handles created by the same
    // PersistClientCache. The state is wrapped in LockingTypedState, disallowing
    // access across await points. Access should always be kept brief, and it
    // is expected that other handles may advance the state at any time this Applier
    // is not holding the lock.
    //
    // NB: This is very intentionally not pub(crate) so that it's easy to reason
    //     very locally about the duration of locks.
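    //
    // For example, the read helpers below go through short closures like
    // `self.state.read_lock(&self.metrics.locks.applier_read_cacheable, |s| s.seqno)`,
    // copying or cloning what they need out of state instead of holding the
    // guard any longer than that.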
    state: Arc<LockingTypedState<K, V, T, D>>,
}

// Impl Clone regardless of the type params.
impl<K, V, T: Clone, D> Clone for Applier<K, V, T, D> {
    fn clone(&self) -> Self {
        Self {
            cfg: self.cfg.clone(),
            metrics: Arc::clone(&self.metrics),
            shard_metrics: Arc::clone(&self.shard_metrics),
            state_versions: Arc::clone(&self.state_versions),
            shared_states: Arc::clone(&self.shared_states),
            pubsub_sender: Arc::clone(&self.pubsub_sender),
            shard_id: self.shard_id,
            state: Arc::clone(&self.state),
        }
    }
}

impl<K, V, T, D> Applier<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64,
{
    pub async fn new(
        cfg: PersistConfig,
        shard_id: ShardId,
        metrics: Arc<Metrics>,
        state_versions: Arc<StateVersions>,
        shared_states: Arc<StateCache>,
        pubsub_sender: Arc<dyn PubSubSender>,
        diagnostics: Diagnostics,
    ) -> Result<Self, Box<CodecMismatch>> {
        let shard_metrics = metrics.shards.shard(&shard_id, &diagnostics.shard_name);
        let state = shared_states
            .get::<K, V, T, D, _, _>(
                shard_id,
                || {
                    metrics.cmds.init_state.run_cmd(&shard_metrics, || {
                        state_versions.maybe_init_shard(&shard_metrics)
                    })
                },
                &diagnostics,
            )
            .await?;
        let ret = Applier {
            cfg,
            metrics,
            shard_metrics,
            state_versions,
            shared_states,
            pubsub_sender,
            shard_id,
            state,
        };
        Ok(ret)
    }

    /// Returns a new [StateWatch] for changes to this Applier's State.
    pub fn watch(&self) -> StateWatch<K, V, T, D> {
        StateWatch::new(Arc::clone(&self.state), Arc::clone(&self.metrics))
    }

    /// Fetches the latest state from Consensus and passes its `upper` to the provided closure.
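    ///
    /// A minimal usage sketch (illustrative only; `applier` stands in for an
    /// `Applier` handle with `T = u64`):
    ///
    /// ```ignore
    /// let upper: Antichain<u64> = applier.fetch_upper(|upper| upper.clone()).await;
    /// ```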
    pub async fn fetch_upper<R, F: FnMut(&Antichain<T>) -> R>(&self, mut f: F) -> R {
        self.metrics.cmds.fetch_upper_count.inc();
        self.fetch_and_update_state(None).await;
        self.upper(|_seqno, upper| f(upper))
    }

    /// A point-in-time read/clone of `upper` from the current state.
    ///
    /// Due to sharing state with other handles, successive reads through this fn (or any
    /// other) may see a different version of state, even if this Applier has not explicitly
    /// fetched and updated to the latest state. Successive calls will always return values
    /// such that `PartialOrder::less_equal(call1, call2)` holds true.
    pub fn clone_upper(&self) -> Antichain<T> {
        self.upper(|_seqno, upper| upper.clone())
    }

    pub(crate) fn upper<R, F: FnMut(SeqNo, &Antichain<T>) -> R>(&self, mut f: F) -> R {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, move |state| {
                f(state.seqno, state.upper())
            })
    }

    pub(crate) fn schemas<R>(
        &self,
        mut f: impl FnMut(SeqNo, &BTreeMap<SchemaId, EncodedSchemas>) -> R,
    ) -> R {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, move |state| {
                f(state.seqno, &state.collections.schemas)
            })
    }

    pub(crate) fn schema_cache(&self) -> SchemaCache<K, V, T, D> {
        SchemaCache::new(self.state.schema_cache(), self.clone())
    }

    /// A point-in-time read of `since` from the current state.
    ///
    /// Due to sharing state with other handles, successive reads through this fn (or any
    /// other) may see a different version of state, even if this Applier has not explicitly
    /// fetched and updated to the latest state. Successive calls will always return values
    /// such that `PartialOrder::less_equal(call1, call2)` holds true.
    #[cfg(test)]
    pub fn since(&self) -> Antichain<T> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.since().clone()
            })
    }

    /// A point-in-time read of `seqno` from the current state.
    ///
    /// Due to sharing state with other handles, successive reads through this fn (or any
    /// other) may see a different version of state, even if this Applier has not explicitly
    /// fetched and updated to the latest state. Successive calls will always return values
    /// such that `call1 <= call2` holds true.
    pub fn seqno(&self) -> SeqNo {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.seqno
            })
    }

    /// A point-in-time read of `seqno_since` from the current state.
    ///
    /// Due to sharing state with other handles, successive reads through this fn (or any
    /// other) may see a different version of state, even if this Applier has not explicitly
    /// fetched and updated to the latest state. Successive calls will always return values
    /// such that `call1 <= call2` holds true.
    pub fn seqno_since(&self) -> SeqNo {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.seqno_since()
            })
    }

    /// A point-in-time read from the current state. (We declare a shard 'finalized' if it
    /// has both become an unreadable tombstone and the state itself has been emptied out.)
    ///
    /// Due to sharing state with other handles, successive reads through this fn (or any
    /// other) may see a different version of state, even if this Applier has not explicitly
    /// fetched and updated to the latest state. Once this fn returns true, it will always
    /// return true.
    pub fn is_finalized(&self) -> bool {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.collections.is_tombstone() && state.collections.is_single_empty_batch()
            })
    }

    /// See [crate::PersistClient::get_schema].
    pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                let x = state.collections.schemas.get(&schema_id)?;
                Some((K::decode_schema(&x.key), V::decode_schema(&x.val)))
            })
    }

    /// See [crate::PersistClient::latest_schema].
    pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                let (id, x) = state.collections.schemas.last_key_value()?;
                Some((*id, K::decode_schema(&x.key), V::decode_schema(&x.val)))
            })
    }

    /// Checks that the current state's `since` and `upper` are both empty.
    ///
    /// Due to sharing state with other handles, successive reads through this fn (or any
    /// other) may see a different version of state, even if this Applier has not explicitly
    /// fetched and updated to the latest state. Once this fn returns `Ok(())`, it will
    /// always return `Ok(())`.
    pub fn check_since_upper_both_empty(&self) -> Result<(), InvalidUsage<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                if state.since().is_empty() && state.upper().is_empty() {
                    Ok(())
                } else {
                    Err(InvalidUsage::FinalizationError {
                        since: state.since().clone(),
                        upper: state.upper().clone(),
                    })
                }
            })
    }

    /// Returns all rollups that are <= the given `seqno`.
    ///
    /// Due to sharing state with other handles, successive reads through this fn (or any
    /// other) may see a different version of state, even if this Applier has not explicitly
    /// fetched and updated to the latest state.
    pub fn rollups_lte_seqno(&self, seqno: SeqNo) -> Vec<(SeqNo, PartialRollupKey)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state
                    .collections
                    .rollups
                    .range(..=seqno)
                    .map(|(seqno, rollup)| (*seqno, rollup.key.clone()))
                    .collect::<Vec<(SeqNo, PartialRollupKey)>>()
            })
    }

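    /// Returns all of the trace's outstanding fueled merge requests.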
    pub fn all_fueled_merge_reqs(&self) -> Vec<FueledMergeReq<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state
                    .collections
                    .trace
                    .fueled_merge_reqs_before_ms(u64::MAX, None)
                    .collect()
            })
    }

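    /// Returns the batches that make up a snapshot of the shard as of `as_of`,
    /// or a [SnapshotErr] if the current state cannot serve that read.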
    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.snapshot(as_of)
            })
    }

    pub fn all_batches(&self) -> Vec<HollowBatch<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.state.collections.trace.batches().cloned().collect()
            })
    }

    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<Result<(), Upper<T>>, Since<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.verify_listen(as_of)
            })
    }

    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.next_listen_batch(frontier)
            })
    }

    pub async fn write_rollup_for_state(&self) -> Option<EncodedRollup> {
        let state = self
            .state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.clone_for_rollup()
            });

        self.state_versions
            .write_rollup_for_state(self.shard_metrics.as_ref(), state, &RollupId::new())
            .await
    }

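    /// Applies `work_fn` to a clone of the current state and attempts to commit
    /// the result to Consensus via compare-and-set, refetching state and
    /// retrying on CaS expectation mismatches.
    ///
    /// Returns the resulting seqno, the work function's result (or the error
    /// that caused the state transition to be skipped), and any routine
    /// maintenance assigned to this command.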
    pub async fn apply_unbatched_cmd<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        &self,
        cmd: &CmdMetrics,
        mut work_fn: WorkFn,
    ) -> Result<(SeqNo, Result<R, E>, RoutineMaintenance), Indeterminate> {
        loop {
            cmd.started.inc();
            let now = Instant::now();
            let ret = Self::apply_unbatched_cmd_locked(
                &self.state,
                cmd,
                &mut work_fn,
                &self.cfg,
                &self.metrics,
                &self.shard_metrics,
                &self.state_versions,
            )
            .await;
            cmd.seconds.inc_by(now.elapsed().as_secs_f64());

            match ret {
                ApplyCmdResult::Committed((diff, new_state, res, maintenance)) => {
                    cmd.succeeded.inc();
                    self.shard_metrics.cmd_succeeded.inc();
                    self.update_state(new_state);
                    if PUBSUB_PUSH_DIFF_ENABLED.get(&self.cfg) {
                        self.pubsub_sender.push_diff(&self.shard_id, &diff);
                    }
                    return Ok((diff.seqno, Ok(res), maintenance));
                }
                ApplyCmdResult::SkippedStateTransition((seqno, err, maintenance)) => {
                    cmd.succeeded.inc();
                    self.shard_metrics.cmd_succeeded.inc();
                    return Ok((seqno, Err(err), maintenance));
                }
                ApplyCmdResult::Indeterminate(err) => {
                    cmd.failed.inc();
                    return Err(err);
                }
                ApplyCmdResult::ExpectationMismatch(seqno) => {
                    cmd.cas_mismatch.inc();
                    self.fetch_and_update_state(Some(seqno)).await;
                }
            }
        }
    }

    // work_fn fails to compile without mut, false positive
    #[allow(clippy::needless_pass_by_ref_mut)]
    async fn apply_unbatched_cmd_locked<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        state: &LockingTypedState<K, V, T, D>,
        cmd: &CmdMetrics,
        work_fn: &mut WorkFn,
        cfg: &PersistConfig,
        metrics: &Metrics,
        shard_metrics: &ShardMetrics,
        state_versions: &StateVersions,
    ) -> ApplyCmdResult<K, V, T, D, R, E> {
        let computed_next_state = state
            .read_lock(&metrics.locks.applier_read_noncacheable, |state| {
                Self::compute_next_state_locked(state, work_fn, metrics, cmd, cfg)
            });

        let next_state = match computed_next_state {
            Ok(x) => x,
            Err((seqno, err)) => {
                return ApplyCmdResult::SkippedStateTransition((
                    seqno,
                    err,
                    RoutineMaintenance::default(),
                ));
            }
        };

        let NextState {
            expected,
            diff,
            state,
            expiry_metrics,
            garbage_collection,
            write_rollup,
            work_ret,
        } = next_state;

        // SUBTLE! Unlike the other consensus and blob uses, we can't
        // automatically retry indeterminate ExternalErrors here. However,
        // if the state change itself is _idempotent_, then we're free to
        // retry even indeterminate errors. See
        // [Self::apply_unbatched_idempotent_cmd].
        let cas_res = state_versions
            .try_compare_and_set_current(&cmd.name, shard_metrics, Some(expected), &state, &diff)
            .await;

        match cas_res {
            Ok((CaSResult::Committed, diff)) => {
                assert!(
                    expected <= state.seqno,
                    "state seqno regressed: {} vs {}",
                    expected,
                    state.seqno
                );

                metrics
                    .lease
                    .timeout_read
                    .inc_by(u64::cast_from(expiry_metrics.readers_expired));

                metrics
                    .state
                    .writer_removed
                    .inc_by(u64::cast_from(expiry_metrics.writers_expired));

                if let Some(gc) = garbage_collection.as_ref() {
                    debug!("Assigned gc request: {:?}", gc);
                }

                let maintenance = RoutineMaintenance {
                    garbage_collection,
                    write_rollup,
                };

                ApplyCmdResult::Committed((diff, state, work_ret, maintenance))
            }
            Ok((CaSResult::ExpectationMismatch, _diff)) => {
                ApplyCmdResult::ExpectationMismatch(expected)
            }
            Err(err) => ApplyCmdResult::Indeterminate(err),
        }
    }

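    /// Computes the successor state produced by `work_fn`, along with the
    /// corresponding state diff and any rollup/gc bookkeeping, without writing
    /// anything to Consensus; the caller is responsible for the
    /// compare-and-set.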
    fn compute_next_state_locked<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        state: &TypedState<K, V, T, D>,
        work_fn: &mut WorkFn,
        metrics: &Metrics,
        cmd: &CmdMetrics,
        cfg: &PersistConfig,
    ) -> Result<NextState<K, V, T, D, R>, (SeqNo, E)> {
        let is_write = cmd.name == metrics.cmds.compare_and_append.name;
        let is_rollup = cmd.name == metrics.cmds.add_rollup.name;
        let is_become_tombstone = cmd.name == metrics.cmds.become_tombstone.name;

        let expected = state.seqno;
        let was_tombstone_before = state.collections.is_tombstone();

        let (work_ret, mut new_state) = match state.clone_apply(cfg, work_fn) {
            Continue(x) => x,
            Break(err) => {
                return Err((expected, err));
            }
        };
        let expiry_metrics = new_state.expire_at((cfg.now)());
        new_state.state.collections.trace.roundtrip_structure = true;

        // Sanity check that all state transitions have a special case for
        // being a tombstone. The ones that do return a Break and bail out of
        // this method above. The one exception is adding a rollup, because we
        // want to be able to add a rollup for the tombstone state.
        //
        // TODO: Even better would be to write the rollup in the tombstone
        // transition so it's a single terminal state transition, but it'll be
        // tricky to get right.
        if was_tombstone_before && !(is_rollup || is_become_tombstone) {
            panic!(
                "cmd {} unexpectedly tried to commit a new state on a tombstone: {:?}",
                cmd.name, state
            );
        }

        let now = (cfg.now)();
        let write_rollup = new_state.need_rollup(
            ROLLUP_THRESHOLD.get(cfg),
            ROLLUP_USE_ACTIVE_ROLLUP.get(cfg),
            u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS.get(cfg)),
            now,
        );

        if let Some(write_rollup_seqno) = write_rollup {
            if ROLLUP_USE_ACTIVE_ROLLUP.get(cfg) {
                new_state.collections.active_rollup = Some(ActiveRollup {
                    seqno: write_rollup_seqno,
                    start_ms: now,
                });
            }
        }

        // Find out if this command has been selected to perform gc, so
        // that it will fire off a background request to the
        // GarbageCollector to delete eligible blobs and truncate the
        // state history. This is dependent both on `maybe_gc` returning
        // Some _and_ on this state being successfully compare_and_set.
        //
        // NB: Make sure this overwrites `garbage_collection` on every
        // run through the loop (i.e. no `if let Some` here). When we
        // lose a CaS race, we might discover that the winner got
        // assigned the gc.
        let garbage_collection = new_state.maybe_gc(
            is_write,
            GC_USE_ACTIVE_GC.get(cfg),
            u64::cast_from(GC_FALLBACK_THRESHOLD_MS.get(cfg)),
            now,
        );

        if let Some(gc) = garbage_collection.as_ref() {
            if GC_USE_ACTIVE_GC.get(cfg) {
                new_state.collections.active_gc = Some(ActiveGc {
                    seqno: gc.new_seqno_since,
                    start_ms: now,
                });
            }
        }

        // NB: Make sure this is the very last thing before the
        // `try_compare_and_set_current` call. (In particular, it needs
        // to come after anything that might modify new_state, such as
        // `maybe_gc`.)
        let diff = StateDiff::from_diff(&state.state, &new_state);
        // Sanity check that our diff logic roundtrips and adds back up
        // correctly.
        #[cfg(any(test, debug_assertions))]
        {
            if let Err(err) = StateDiff::validate_roundtrip(metrics, state, &diff, &new_state) {
                panic!("validate_roundtrips failed: {}", err);
            }
        }

        Ok(NextState {
            expected,
            diff,
            state: new_state,
            expiry_metrics,
            garbage_collection,
            write_rollup,
            work_ret,
        })
    }

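    /// Installs `new_state` as the cached state if it is more recent (by seqno)
    /// than what is currently cached; otherwise leaves the cache unchanged.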
    pub fn update_state(&self, new_state: TypedState<K, V, T, D>) {
        let (seqno_before, seqno_after) =
            self.state
                .write_lock(&self.metrics.locks.applier_write, |state| {
                    let seqno_before = state.seqno;
                    if seqno_before < new_state.seqno {
                        *state = new_state;
                    }
                    let seqno_after = state.seqno;
                    (seqno_before, seqno_after)
                });

        assert!(
            seqno_before <= seqno_after,
            "state seqno regressed: {} vs {}",
            seqno_before,
            seqno_after
        );
    }

    /// Fetches and updates to the latest state. Uses an optional hint to early-out,
    /// without making any calls to Consensus or Blob, if a version of state more recent
    /// than the hint has already been observed (e.g. updated by another handle).
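    ///
    /// A hedged usage sketch (illustrative only; `applier` and `seqno` are
    /// placeholders):
    ///
    /// ```ignore
    /// // Bring the cached state up to date, skipping the Consensus round-trip
    /// // when another handle has already advanced past `seqno`.
    /// applier.fetch_and_update_state(Some(seqno)).await;
    /// ```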
    pub async fn fetch_and_update_state(&self, seqno_hint: Option<SeqNo>) {
        let current_seqno = self.seqno();
        let seqno_before = match seqno_hint {
            None => current_seqno,
            Some(hint) => {
                // state is already more recent than our hint due to
                // advancement by another handle to the same shard.
                if hint < current_seqno {
                    self.metrics.state.update_state_noop_path.inc();
                    return;
                }
                current_seqno
            }
        };

        let diffs_to_current = self
            .state_versions
            .fetch_all_live_diffs_gt_seqno::<K, V, T, D>(&self.shard_id, seqno_before)
            .await;

        // no new diffs past our current seqno, nothing to do
        if diffs_to_current.is_empty() {
            self.metrics.state.update_state_empty_path.inc();
            return;
        }

        let new_seqno = self
            .state
            .write_lock(&self.metrics.locks.applier_write, |state| {
                state.apply_encoded_diffs(&self.cfg, &self.metrics, &diffs_to_current);
                state.seqno
            });

        assert!(
            seqno_before <= new_seqno,
            "state seqno regressed: {} vs {}",
            seqno_before,
            new_seqno
        );

        // whether the seqno advanced from diffs and/or because another handle
        // already updated it, we can assume it is now up-to-date
        if seqno_before < new_seqno {
            self.metrics.state.update_state_fast_path.inc();
            return;
        }

        // our state is so old there aren't any diffs we can use to
        // catch up directly. fall back to fully refetching state.
        // we can reuse the recent diffs we already have as a hint.
        let new_state = self
            .state_versions
            .fetch_current_state(&self.shard_id, diffs_to_current)
            .await
            .check_codecs::<K, V, D>(&self.shard_id)
            .expect("shard codecs should not change");

        let new_seqno = self
            .state
            .write_lock(&self.metrics.locks.applier_write, |state| {
                if state.seqno < new_state.seqno {
                    *state = new_state;
                }
                state.seqno
            });

        self.metrics.state.update_state_slow_path.inc();
        assert!(
            seqno_before <= new_seqno,
            "state seqno regressed: {} vs {}",
            seqno_before,
            new_seqno
        );
    }
}

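/// The outcome of a single attempt to apply a command: the transition was
/// committed, skipped before any write, failed indeterminately, or lost a
/// compare-and-set race.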
enum ApplyCmdResult<K, V, T, D, R, E> {
    Committed((VersionedData, TypedState<K, V, T, D>, R, RoutineMaintenance)),
    SkippedStateTransition((SeqNo, E, RoutineMaintenance)),
    Indeterminate(Indeterminate),
    ExpectationMismatch(SeqNo),
}

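/// A computed-but-not-yet-committed successor state, along with everything
/// needed to compare-and-set it and to hand back routine maintenance
/// afterwards.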
struct NextState<K, V, T, D, R> {
    expected: SeqNo,
    diff: StateDiff<T>,
    state: TypedState<K, V, T, D>,
    expiry_metrics: ExpiryMetrics,
    write_rollup: Option<SeqNo>,
    garbage_collection: Option<GcReq>,
    work_ret: R,
}