mz_persist_client/internal/apply.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Implementation of persist command application.

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::ops::ControlFlow::{self, Break, Continue};
use std::sync::Arc;
use std::time::Instant;

use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use mz_ore::cast::CastFrom;
use mz_persist::location::{CaSResult, Indeterminate, SeqNo, VersionedData};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64};
use timely::progress::{Antichain, Timestamp};
use tracing::debug;

use crate::cache::{LockingTypedState, StateCache};
use crate::error::{CodecMismatch, InvalidUsage};
use crate::internal::gc::GcReq;
use crate::internal::maintenance::RoutineMaintenance;
use crate::internal::metrics::{CmdMetrics, Metrics, ShardMetrics};
use crate::internal::paths::{PartialRollupKey, RollupId};
use crate::internal::state::{
    ActiveGc, ActiveRollup, EncodedSchemas, ExpiryMetrics, GC_FALLBACK_THRESHOLD_MS,
    GC_MAX_VERSIONS, GC_MIN_VERSIONS, GC_USE_ACTIVE_GC, GcConfig, HollowBatch, LeasedReaderState,
    ROLLUP_FALLBACK_THRESHOLD_MS, ROLLUP_THRESHOLD, ROLLUP_USE_ACTIVE_ROLLUP, Since, SnapshotErr,
    StateCollections, TypedState,
};
use crate::internal::state_diff::StateDiff;
use crate::internal::state_versions::{EncodedRollup, StateVersions};
use crate::internal::trace::FueledMergeReq;
use crate::internal::watch::StateWatch;
use crate::read::LeasedReaderId;
use crate::rpc::{PUBSUB_PUSH_DIFF_ENABLED, PubSubSender};
use crate::schema::SchemaCache;
use crate::{Diagnostics, PersistConfig, ShardId};

/// An applier of persist commands.
///
/// This struct exists mainly to allow us to very narrowly bound the surface
/// area that directly interacts with state.
#[derive(Debug)]
pub struct Applier<K, V, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,
    pub(crate) state_versions: Arc<StateVersions>,
    shared_states: Arc<StateCache>,
    pubsub_sender: Arc<dyn PubSubSender>,
    pub(crate) shard_id: ShardId,

    // Access to the shard's state, shared across all handles created by the same
    // PersistClientCache. The state is wrapped in LockingTypedState, disallowing
    // access across await points. Access should always be kept brief, and it is
    // expected that other handles may advance the state at any time this Applier
    // is not holding the lock.
    //
    // NB: This is very intentionally not pub(crate) so that it's easy to reason
    //     very locally about the duration of locks.
    state: Arc<LockingTypedState<K, V, T, D>>,
}

// Impl Clone regardless of the type params.
impl<K, V, T: Clone, D> Clone for Applier<K, V, T, D> {
    fn clone(&self) -> Self {
        Self {
            cfg: self.cfg.clone(),
            metrics: Arc::clone(&self.metrics),
            shard_metrics: Arc::clone(&self.shard_metrics),
            state_versions: Arc::clone(&self.state_versions),
            shared_states: Arc::clone(&self.shared_states),
            pubsub_sender: Arc::clone(&self.pubsub_sender),
            shard_id: self.shard_id,
            state: Arc::clone(&self.state),
        }
    }
}

impl<K, V, T, D> Applier<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64,
{
    pub async fn new(
        cfg: PersistConfig,
        shard_id: ShardId,
        metrics: Arc<Metrics>,
        state_versions: Arc<StateVersions>,
        shared_states: Arc<StateCache>,
        pubsub_sender: Arc<dyn PubSubSender>,
        diagnostics: Diagnostics,
    ) -> Result<Self, Box<CodecMismatch>> {
        let shard_metrics = metrics.shards.shard(&shard_id, &diagnostics.shard_name);
        let state = shared_states
            .get::<K, V, T, D, _, _>(
                shard_id,
                || {
                    metrics.cmds.init_state.run_cmd(&shard_metrics, || {
                        state_versions.maybe_init_shard(&shard_metrics)
                    })
                },
                &diagnostics,
            )
            .await?;
        let ret = Applier {
            cfg,
            metrics,
            shard_metrics,
            state_versions,
            shared_states,
            pubsub_sender,
            shard_id,
            state,
        };
        Ok(ret)
    }

    /// Returns a new [StateWatch] for changes to this Applier's State.
    pub fn watch(&self) -> StateWatch<K, V, T, D> {
        StateWatch::new(Arc::clone(&self.state), Arc::clone(&self.metrics))
    }

    /// Fetches the latest state from Consensus and passes its `upper` to the provided closure.
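    ///
    /// A hedged usage sketch (illustrative only, not compiled here; `applier` is
    /// assumed to be an `Applier` handle for some shard):
    ///
    /// ```ignore
    /// // Refresh from Consensus and clone the freshest upper.
    /// let upper = applier.fetch_upper(|upper| upper.clone()).await;
    /// ```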
    pub async fn fetch_upper<R, F: FnMut(&Antichain<T>) -> R>(&self, mut f: F) -> R {
        self.metrics.cmds.fetch_upper_count.inc();
        self.fetch_and_update_state(None).await;
        self.upper(|_seqno, upper| f(upper))
    }

    /// A point-in-time read/clone of `upper` from the current state.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Successive calls will always return values such that
    /// `PartialOrder::less_equal(call1, call2)` holds true.
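    ///
    /// A minimal sketch of that guarantee (illustrative only, not compiled here):
    ///
    /// ```ignore
    /// let call1 = applier.clone_upper();
    /// let call2 = applier.clone_upper();
    /// // Uppers never regress across successive reads.
    /// assert!(PartialOrder::less_equal(&call1, &call2));
    /// ```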
    pub fn clone_upper(&self) -> Antichain<T> {
        self.upper(|_seqno, upper| upper.clone())
    }

    pub(crate) fn upper<R, F: FnMut(SeqNo, &Antichain<T>) -> R>(&self, mut f: F) -> R {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, move |state| {
                f(state.seqno, state.upper())
            })
    }

    pub(crate) fn schemas<R>(
        &self,
        mut f: impl FnMut(SeqNo, &BTreeMap<SchemaId, EncodedSchemas>) -> R,
    ) -> R {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, move |state| {
                f(state.seqno, &state.collections.schemas)
            })
    }

    pub(crate) fn schema_cache(&self) -> SchemaCache<K, V, T, D> {
        SchemaCache::new(self.state.schema_cache(), self.clone())
    }

    /// A point-in-time read of `since` from the current state.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Successive calls will always return values such that
    /// `PartialOrder::less_equal(call1, call2)` holds true.
    #[cfg(test)]
    pub fn since(&self) -> Antichain<T> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.since().clone()
            })
    }

    /// Does a lease for the provided reader exist in the current state?
    ///
    /// This is useful when we encounter a condition that should only be possible when the lease
    /// has expired, so we can distinguish between scary bugs and expected-but-unusual cases.
    /// This returns whatever lease is present in the latest version of state - so to avoid false
    /// positives, this should be checked only after the surprising condition has occurred.
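    ///
    /// A hedged usage sketch (hypothetical caller code, not compiled here):
    ///
    /// ```ignore
    /// // After hitting a surprising error, check whether the reader's lease is
    /// // simply gone, which would explain the condition benignly.
    /// if applier.reader_lease(reader_id).is_none() {
    ///     // Lease expired: expected-but-unusual, not a bug.
    /// }
    /// ```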
    pub fn reader_lease(&self, id: LeasedReaderId) -> Option<LeasedReaderState<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.state.collections.leased_readers.get(&id).cloned()
            })
    }

    /// A point-in-time read of `seqno` from the current state.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Successive calls will always return values such that
    /// `call1 <= call2` holds true.
    pub fn seqno(&self) -> SeqNo {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.seqno
            })
    }

    /// A point-in-time read of `seqno_since` from the current state.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Successive calls will always return values such that
    /// `call1 <= call2` holds true.
    pub fn seqno_since(&self) -> SeqNo {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.seqno_since()
            })
    }

    /// A point-in-time read from the current state. (We declare a shard 'finalized' if it has
    /// both become an unreadable tombstone and the state itself has been emptied out.)
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Once this fn returns true, it will always return true.
    pub fn is_finalized(&self) -> bool {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.collections.is_tombstone() && state.collections.is_single_empty_batch()
            })
    }

    /// See [crate::PersistClient::get_schema].
    pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                let x = state.collections.schemas.get(&schema_id)?;
                Some((K::decode_schema(&x.key), V::decode_schema(&x.val)))
            })
    }

    /// See [crate::PersistClient::latest_schema].
    pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                let (id, x) = state.collections.schemas.last_key_value()?;
                Some((*id, K::decode_schema(&x.key), V::decode_schema(&x.val)))
            })
    }

    /// Checks that the current state's `since` and `upper` are both empty, returning an
    /// error if not.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Once this fn returns `Ok`, it will always return `Ok`.
    pub fn check_since_upper_both_empty(&self) -> Result<(), InvalidUsage<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                if state.since().is_empty() && state.upper().is_empty() {
                    Ok(())
                } else {
                    Err(InvalidUsage::FinalizationError {
                        since: state.since().clone(),
                        upper: state.upper().clone(),
                    })
                }
            })
    }

    /// Returns all rollups that are <= the given `seqno`.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state.
    pub fn rollups_lte_seqno(&self, seqno: SeqNo) -> Vec<(SeqNo, PartialRollupKey)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state
                    .collections
                    .rollups
                    .range(..=seqno)
                    .map(|(seqno, rollup)| (*seqno, rollup.key.clone()))
                    .collect::<Vec<(SeqNo, PartialRollupKey)>>()
            })
    }

    pub fn all_fueled_merge_reqs(&self) -> Vec<FueledMergeReq<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state
                    .collections
                    .trace
                    .fueled_merge_reqs_before_ms(u64::MAX, None)
                    .collect()
            })
    }

    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.snapshot(as_of)
            })
    }

    pub fn all_batches(&self) -> Vec<HollowBatch<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.state.collections.trace.batches().cloned().collect()
            })
    }

    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.verify_listen(as_of)
            })
    }

    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.next_listen_batch(frontier)
            })
    }

    pub async fn write_rollup_for_state(&self) -> Option<EncodedRollup> {
        let state = self
            .state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.clone_for_rollup()
            });

        self.state_versions
            .write_rollup_for_state(self.shard_metrics.as_ref(), state, &RollupId::new())
            .await
    }

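    /// Applies `work_fn` to the current state and compare-and-sets the resulting state
    /// onto Consensus, retrying (after refetching state) on any expectation mismatch.
    ///
    /// A hedged usage sketch (illustrative only, not compiled here; `cmd_metrics` is a
    /// hypothetical [CmdMetrics] chosen by the caller):
    ///
    /// ```ignore
    /// let (seqno, res, maintenance) = applier
    ///     .apply_unbatched_cmd(&cmd_metrics, |_seqno, _cfg, _collections| {
    ///         // Continue(..) commits a new state; Break(..) skips the transition.
    ///         Continue(())
    ///     })
    ///     .await?;
    /// ```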
    pub async fn apply_unbatched_cmd<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        &self,
        cmd: &CmdMetrics,
        mut work_fn: WorkFn,
    ) -> Result<(SeqNo, Result<R, E>, RoutineMaintenance), Indeterminate> {
        loop {
            cmd.started.inc();
            let now = Instant::now();
            let ret = Self::apply_unbatched_cmd_locked(
                &self.state,
                cmd,
                &mut work_fn,
                &self.cfg,
                &self.metrics,
                &self.shard_metrics,
                &self.state_versions,
            )
            .await;
            cmd.seconds.inc_by(now.elapsed().as_secs_f64());

            match ret {
                ApplyCmdResult::Committed((diff, new_state, res, maintenance)) => {
                    cmd.succeeded.inc();
                    self.shard_metrics.cmd_succeeded.inc();
                    self.update_state(new_state);
                    if PUBSUB_PUSH_DIFF_ENABLED.get(&self.cfg) {
                        self.pubsub_sender.push_diff(&self.shard_id, &diff);
                    }
                    return Ok((diff.seqno, Ok(res), maintenance));
                }
                ApplyCmdResult::SkippedStateTransition((seqno, err, maintenance)) => {
                    cmd.succeeded.inc();
                    self.shard_metrics.cmd_succeeded.inc();
                    return Ok((seqno, Err(err), maintenance));
                }
                ApplyCmdResult::Indeterminate(err) => {
                    cmd.failed.inc();
                    return Err(err);
                }
                ApplyCmdResult::ExpectationMismatch(seqno) => {
                    cmd.cas_mismatch.inc();
                    self.fetch_and_update_state(Some(seqno)).await;
                }
            }
        }
    }

    // work_fn fails to compile without mut, false positive
    #[allow(clippy::needless_pass_by_ref_mut)]
    async fn apply_unbatched_cmd_locked<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        state: &LockingTypedState<K, V, T, D>,
        cmd: &CmdMetrics,
        work_fn: &mut WorkFn,
        cfg: &PersistConfig,
        metrics: &Metrics,
        shard_metrics: &ShardMetrics,
        state_versions: &StateVersions,
    ) -> ApplyCmdResult<K, V, T, D, R, E> {
        let computed_next_state = state
            .read_lock(&metrics.locks.applier_read_noncacheable, |state| {
                Self::compute_next_state_locked(state, work_fn, metrics, cmd, cfg)
            });

        let next_state = match computed_next_state {
            Ok(x) => x,
            Err((seqno, err)) => {
                return ApplyCmdResult::SkippedStateTransition((
                    seqno,
                    err,
                    RoutineMaintenance::default(),
                ));
            }
        };

        let NextState {
            expected,
            diff,
            state,
            expiry_metrics,
            garbage_collection,
            write_rollup,
            work_ret,
        } = next_state;

        // SUBTLE! Unlike the other consensus and blob uses, we can't
        // automatically retry indeterminate ExternalErrors here. However,
        // if the state change itself is _idempotent_, then we're free to
        // retry even indeterminate errors. See
        // [Self::apply_unbatched_idempotent_cmd].
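        //
        // For example (illustrative, not exhaustive): a command that downgrades
        // `since` to a fixed frontier produces the same state whether it is
        // applied once or twice, so it can safely be re-issued after an
        // indeterminate error; a compare-and-append with an expected upper
        // cannot be blindly re-issued in the same way.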
        let cas_res = state_versions
            .try_compare_and_set_current(&cmd.name, shard_metrics, Some(expected), &state, &diff)
            .await;

        match cas_res {
            Ok((CaSResult::Committed, diff)) => {
                assert!(
                    expected <= state.seqno,
                    "state seqno regressed: {} vs {}",
                    expected,
                    state.seqno
                );

                metrics
                    .lease
                    .timeout_read
                    .inc_by(u64::cast_from(expiry_metrics.readers_expired));

                metrics
                    .state
                    .writer_removed
                    .inc_by(u64::cast_from(expiry_metrics.writers_expired));

                if let Some(gc) = garbage_collection.as_ref() {
                    debug!("Assigned gc request: {:?}", gc);
                }

                let maintenance = RoutineMaintenance {
                    garbage_collection,
                    write_rollup,
                };

                ApplyCmdResult::Committed((diff, state, work_ret, maintenance))
            }
            Ok((CaSResult::ExpectationMismatch, _diff)) => {
                ApplyCmdResult::ExpectationMismatch(expected)
            }
            Err(err) => ApplyCmdResult::Indeterminate(err),
        }
    }

    fn compute_next_state_locked<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        state: &TypedState<K, V, T, D>,
        work_fn: &mut WorkFn,
        metrics: &Metrics,
        cmd: &CmdMetrics,
        cfg: &PersistConfig,
    ) -> Result<NextState<K, V, T, D, R>, (SeqNo, E)> {
        let is_write = cmd.name == metrics.cmds.compare_and_append.name;
        let is_rollup = cmd.name == metrics.cmds.add_rollup.name;
        let is_become_tombstone = cmd.name == metrics.cmds.become_tombstone.name;

        let gc_config = GcConfig {
            use_active_gc: GC_USE_ACTIVE_GC.get(cfg),
            fallback_threshold_ms: u64::cast_from(GC_FALLBACK_THRESHOLD_MS.get(cfg)),
            min_versions: GC_MIN_VERSIONS.get(cfg),
            max_versions: GC_MAX_VERSIONS.get(cfg),
        };

        let use_active_rollup = ROLLUP_USE_ACTIVE_ROLLUP.get(cfg);
        let rollup_threshold = ROLLUP_THRESHOLD.get(cfg);
        let rollup_fallback_threshold_ms = u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS.get(cfg));

        let expected = state.seqno;
        let was_tombstone_before = state.collections.is_tombstone();

        let (work_ret, mut new_state) = match state.clone_apply(cfg, work_fn) {
            Continue(x) => x,
            Break(err) => {
                return Err((expected, err));
            }
        };
        let expiry_metrics = new_state.expire_at((cfg.now)());
        new_state.state.collections.trace.roundtrip_structure = true;

        // Sanity check that all state transitions have a special case for
        // being a tombstone. The ones that do will return a Break and bail
        // out of this method above. The one exception is adding a rollup,
        // because we want to be able to add a rollup for the tombstone state.
        //
        // TODO: Even better would be to write the rollup in the
        // tombstone transition so it's a single terminal state
        // transition, but it'll be tricky to get right.
        if was_tombstone_before && !(is_rollup || is_become_tombstone) {
            panic!(
                "cmd {} unexpectedly tried to commit a new state on a tombstone: {:?}",
                cmd.name, state
            );
        }

        let now = (cfg.now)();
        let write_rollup = new_state.need_rollup(
            rollup_threshold,
            use_active_rollup,
            rollup_fallback_threshold_ms,
            now,
        );

        if let Some(write_rollup_seqno) = write_rollup {
            if use_active_rollup {
                new_state.collections.active_rollup = Some(ActiveRollup {
                    seqno: write_rollup_seqno,
                    start_ms: now,
                });
            }
        }

        // Find out if this command has been selected to perform gc, so
        // that it will fire off a background request to the
        // GarbageCollector to delete eligible blobs and truncate the
        // state history. This is dependent both on `maybe_gc` returning
        // Some _and_ on this state being successfully compare_and_set.
        let garbage_collection = new_state.maybe_gc(is_write, now, gc_config);

        if let Some(gc) = garbage_collection.as_ref() {
            if gc_config.use_active_gc {
                new_state.collections.active_gc = Some(ActiveGc {
                    seqno: gc.new_seqno_since,
                    start_ms: now,
                });
            }
        }

        // Make sure `new_state` is not modified after this point!
        // The new state and the diff must be consistent with each other for correctness.
        let diff = StateDiff::from_diff(&state.state, &new_state);
        // Sanity check that our diff logic roundtrips and adds back up
        // correctly.
        #[cfg(any(test, debug_assertions))]
        {
            if let Err(err) = StateDiff::validate_roundtrip(metrics, state, &diff, &new_state) {
                panic!("validate_roundtrips failed: {}", err);
            }
        }

        Ok(NextState {
            expected,
            diff,
            state: new_state,
            expiry_metrics,
            garbage_collection,
            write_rollup,
            work_ret,
        })
    }

    pub fn update_state(&self, new_state: TypedState<K, V, T, D>) {
        let (seqno_before, seqno_after) =
            self.state
                .write_lock(&self.metrics.locks.applier_write, |state| {
                    let seqno_before = state.seqno;
                    if seqno_before < new_state.seqno {
                        *state = new_state;
                    }
                    let seqno_after = state.seqno;
                    (seqno_before, seqno_after)
                });

        assert!(
            seqno_before <= seqno_after,
            "state seqno regressed: {} vs {}",
            seqno_before,
            seqno_after
        );
    }

    /// Fetches and updates to the latest state. Uses an optional hint to early-out,
    /// without making any calls to Consensus or Blob, if a version of state more recent
    /// than the hint has already been observed (e.g. because it was updated by another
    /// handle).
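    ///
    /// A hedged usage sketch (illustrative only, not compiled here):
    ///
    /// ```ignore
    /// // After a CaS mismatch at `seqno`, refresh only if our cached state
    /// // isn't already past that point.
    /// applier.fetch_and_update_state(Some(seqno)).await;
    /// ```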
    pub async fn fetch_and_update_state(&self, seqno_hint: Option<SeqNo>) {
        let current_seqno = self.seqno();
        let seqno_before = match seqno_hint {
            None => current_seqno,
            Some(hint) => {
                // state is already more recent than our hint due to
                // advancement by another handle to the same shard.
                if hint < current_seqno {
                    self.metrics.state.update_state_noop_path.inc();
                    return;
                }
                current_seqno
            }
        };

        let diffs_to_current = self
            .state_versions
            .fetch_all_live_diffs_gt_seqno::<K, V, T, D>(&self.shard_id, seqno_before)
            .await;

        // no new diffs past our current seqno, nothing to do
        if diffs_to_current.is_empty() {
            self.metrics.state.update_state_empty_path.inc();
            return;
        }

        let new_seqno = self
            .state
            .write_lock(&self.metrics.locks.applier_write, |state| {
                state.apply_encoded_diffs(&self.cfg, &self.metrics, &diffs_to_current);
                state.seqno
            });

        assert!(
            seqno_before <= new_seqno,
            "state seqno regressed: {} vs {}",
            seqno_before,
            new_seqno
        );

        // whether the seqno advanced from diffs and/or because another handle
        // already updated it, we can assume it is now up-to-date
        if seqno_before < new_seqno {
            self.metrics.state.update_state_fast_path.inc();
            return;
        }

        // our state is so old there aren't any diffs we can use to
        // catch up directly. fall back to fully refetching state.
        // we can reuse the recent diffs we already have as a hint.
        let new_state = self
            .state_versions
            .fetch_current_state(&self.shard_id, diffs_to_current)
            .await
            .check_codecs::<K, V, D>(&self.shard_id)
            .expect("shard codecs should not change");

        let new_seqno = self
            .state
            .write_lock(&self.metrics.locks.applier_write, |state| {
                if state.seqno < new_state.seqno {
                    *state = new_state;
                }
                state.seqno
            });

        self.metrics.state.update_state_slow_path.inc();
        assert!(
            seqno_before <= new_seqno,
            "state seqno regressed: {} vs {}",
            seqno_before,
            new_seqno
        );
    }
}

enum ApplyCmdResult<K, V, T, D, R, E> {
    Committed((VersionedData, TypedState<K, V, T, D>, R, RoutineMaintenance)),
    SkippedStateTransition((SeqNo, E, RoutineMaintenance)),
    Indeterminate(Indeterminate),
    ExpectationMismatch(SeqNo),
}

struct NextState<K, V, T, D, R> {
    expected: SeqNo,
    diff: StateDiff<T>,
    state: TypedState<K, V, T, D>,
    expiry_metrics: ExpiryMetrics,
    write_rollup: Option<SeqNo>,
    garbage_collection: Option<GcReq>,
    work_ret: R,
}