mz_persist_client/internal/apply.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Implementation of persist command application.

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::ops::ControlFlow::{self, Break, Continue};
use std::sync::Arc;
use std::time::Instant;

use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use mz_ore::cast::CastFrom;
use mz_persist::location::{CaSResult, Indeterminate, SeqNo, VersionedData};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64};
use timely::progress::{Antichain, Timestamp};
use tracing::debug;

use crate::cache::{LockingTypedState, StateCache};
use crate::error::{CodecMismatch, InvalidUsage};
use crate::internal::gc::GcReq;
use crate::internal::maintenance::RoutineMaintenance;
use crate::internal::metrics::{CmdMetrics, Metrics, ShardMetrics};
use crate::internal::paths::{PartialRollupKey, RollupId};
use crate::internal::state::{
    ActiveGc, ActiveRollup, EncodedSchemas, ExpiryMetrics, GC_FALLBACK_THRESHOLD_MS,
    GC_MAX_VERSIONS, GC_MIN_VERSIONS, GC_USE_ACTIVE_GC, GcConfig, HollowBatch, LeasedReaderState,
    ROLLUP_FALLBACK_THRESHOLD_MS, ROLLUP_THRESHOLD, ROLLUP_USE_ACTIVE_ROLLUP, Since, SnapshotErr,
    StateCollections, TypedState,
};
use crate::internal::state_diff::StateDiff;
use crate::internal::state_versions::{EncodedRollup, StateVersions};
use crate::internal::trace::FueledMergeReq;
use crate::internal::watch::StateWatch;
use crate::read::LeasedReaderId;
use crate::rpc::{PUBSUB_PUSH_DIFF_ENABLED, PubSubSender};
use crate::schema::SchemaCache;
use crate::{Diagnostics, PersistConfig, ShardId};

/// An applier of persist commands.
///
/// This struct exists mainly to allow us to very narrowly bound the surface
/// area that directly interacts with state.
#[derive(Debug)]
pub struct Applier<K, V, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,
    pub(crate) state_versions: Arc<StateVersions>,
    shared_states: Arc<StateCache>,
    pubsub_sender: Arc<dyn PubSubSender>,
    pub(crate) shard_id: ShardId,

    // Access to the shard's state, shared across all handles created by the same
    // PersistClientCache. The state is wrapped in LockingTypedState, disallowing
    // access across await points. Access should always be kept brief, and it
    // is expected that other handles may advance the state at any time this Applier
    // is not holding the lock.
    //
    // NB: This is very intentionally not pub(crate) so that it's easy to reason
    //     very locally about the duration of locks.
    state: Arc<LockingTypedState<K, V, T, D>>,
}

// Impl Clone regardless of the type params.
impl<K, V, T: Clone, D> Clone for Applier<K, V, T, D> {
    fn clone(&self) -> Self {
        Self {
            cfg: self.cfg.clone(),
            metrics: Arc::clone(&self.metrics),
            shard_metrics: Arc::clone(&self.shard_metrics),
            state_versions: Arc::clone(&self.state_versions),
            shared_states: Arc::clone(&self.shared_states),
            pubsub_sender: Arc::clone(&self.pubsub_sender),
            shard_id: self.shard_id,
            state: Arc::clone(&self.state),
        }
    }
}

impl<K, V, T, D> Applier<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64,
{
    pub async fn new(
        cfg: PersistConfig,
        shard_id: ShardId,
        metrics: Arc<Metrics>,
        state_versions: Arc<StateVersions>,
        shared_states: Arc<StateCache>,
        pubsub_sender: Arc<dyn PubSubSender>,
        diagnostics: Diagnostics,
    ) -> Result<Self, Box<CodecMismatch>> {
        let shard_metrics = metrics.shards.shard(&shard_id, &diagnostics.shard_name);
        let state = shared_states
            .get::<K, V, T, D, _, _>(
                shard_id,
                || {
                    metrics.cmds.init_state.run_cmd(&shard_metrics, || {
                        state_versions.maybe_init_shard(&shard_metrics)
                    })
                },
                &diagnostics,
            )
            .await?;
        let ret = Applier {
            cfg,
            metrics,
            shard_metrics,
            state_versions,
            shared_states,
            pubsub_sender,
            shard_id,
            state,
        };
        Ok(ret)
    }

    /// Returns a new [StateWatch] for changes to this Applier's State.
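    ///
    /// A minimal usage sketch (the `applier` handle below is hypothetical and
    /// the snippet is not compiled):
    ///
    /// ```ignore
    /// // Create a watch that can later be used to await newer versions of
    /// // this shard's state.
    /// let watch = applier.watch();
    /// ```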
    pub fn watch(&self) -> StateWatch<K, V, T, D> {
        StateWatch::new(Arc::clone(&self.state), Arc::clone(&self.metrics))
    }

    /// Fetches the latest state from Consensus and passes its `upper` to the provided closure.
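    ///
    /// A minimal usage sketch (the `applier` handle below is hypothetical and
    /// the snippet is not compiled):
    ///
    /// ```ignore
    /// // Refresh from Consensus, then read the now-current upper.
    /// let upper = applier.fetch_upper(|upper| upper.clone()).await;
    /// ```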
    pub async fn fetch_upper<R, F: FnMut(&Antichain<T>) -> R>(&self, mut f: F) -> R {
        self.metrics.cmds.fetch_upper_count.inc();
        self.fetch_and_update_state(None).await;
        self.upper(|_seqno, upper| f(upper))
    }

    /// A point-in-time read/clone of `upper` from the current state.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Successive calls will always return values such that
    /// `PartialOrder::less_equal(call1, call2)` holds true.
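    ///
    /// A minimal usage sketch (the `applier` handle below is hypothetical and
    /// the snippet is not compiled):
    ///
    /// ```ignore
    /// // Read the locally cached upper without going to Consensus; it may be stale.
    /// let upper = applier.clone_upper();
    /// ```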
    pub fn clone_upper(&self) -> Antichain<T> {
        self.upper(|_seqno, upper| upper.clone())
    }

    pub(crate) fn upper<R, F: FnMut(SeqNo, &Antichain<T>) -> R>(&self, mut f: F) -> R {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, move |state| {
                f(state.seqno, state.upper())
            })
    }

    pub(crate) fn schemas<R>(
        &self,
        mut f: impl FnMut(SeqNo, &BTreeMap<SchemaId, EncodedSchemas>) -> R,
    ) -> R {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, move |state| {
                f(state.seqno, &state.collections.schemas)
            })
    }

    pub(crate) fn schema_cache(&self) -> SchemaCache<K, V, T, D> {
        SchemaCache::new(self.state.schema_cache(), self.clone())
    }

    /// A point-in-time read of `since` from the current state.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Successive calls will always return values such that
    /// `PartialOrder::less_equal(call1, call2)` holds true.
    #[cfg(test)]
    pub fn since(&self) -> Antichain<T> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.since().clone()
            })
    }

    /// Does a lease for the provided reader exist in the current state?
    ///
    /// This is useful when we encounter a condition that should only be possible when the lease
    /// has expired, so we can distinguish between scary bugs and expected-but-unusual cases.
    /// This returns whatever lease is present in the latest version of state - so to avoid false
    /// positives, this should be checked only after the surprising condition has occurred.
    pub fn reader_lease(&self, id: LeasedReaderId) -> Option<LeasedReaderState<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.state.collections.leased_readers.get(&id).cloned()
            })
    }

    /// A point-in-time read of `seqno` from the current state.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Successive calls will always return values such that
    /// `call1 <= call2` holds true.
    pub fn seqno(&self) -> SeqNo {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.seqno
            })
    }

    /// A point-in-time read of `seqno_since` from the current state.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Successive calls will always return values such that
    /// `call1 <= call2` holds true.
    pub fn seqno_since(&self) -> SeqNo {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.seqno_since()
            })
    }

    /// A point-in-time read from the current state. (We declare a shard 'finalized' if it has
    /// both become an unreadable tombstone and the state itself has been emptied out.)
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Once this fn returns true, it will always return true.
    pub fn is_finalized(&self) -> bool {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.collections.is_tombstone() && state.collections.is_single_empty_batch()
            })
    }

    /// See [crate::PersistClient::get_schema].
    pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                let x = state.collections.schemas.get(&schema_id)?;
                Some((K::decode_schema(&x.key), V::decode_schema(&x.val)))
            })
    }

    /// See [crate::PersistClient::latest_schema].
    pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                let (id, x) = state.collections.schemas.last_key_value()?;
                Some((*id, K::decode_schema(&x.key), V::decode_schema(&x.val)))
            })
    }

    /// Returns the ID of the given schema, if it is known in the current state.
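    ///
    /// A minimal usage sketch (the `applier` handle and the schemas below are
    /// hypothetical and the snippet is not compiled):
    ///
    /// ```ignore
    /// // Look up a previously-registered schema pair by value.
    /// let maybe_id: Option<SchemaId> = applier.find_schema(&key_schema, &val_schema);
    /// ```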
    pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                // The common case is that the requested schema is a recent one, so as a minor
                // optimization, do this search in reverse order.
                let mut schemas = state.collections.schemas.iter().rev();
                schemas
                    .find(|(_, x)| {
                        K::decode_schema(&x.key) == *key_schema
                            && V::decode_schema(&x.val) == *val_schema
                    })
                    .map(|(id, _)| *id)
            })
    }

    /// Returns `Ok` if the current state's `since` and `upper` are both empty, and a
    /// finalization error otherwise.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Once this fn returns `Ok`, it will always return `Ok`.
    pub fn check_since_upper_both_empty(&self) -> Result<(), InvalidUsage<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                if state.since().is_empty() && state.upper().is_empty() {
                    Ok(())
                } else {
                    Err(InvalidUsage::FinalizationError {
                        since: state.since().clone(),
                        upper: state.upper().clone(),
                    })
                }
            })
    }

    /// Returns all rollups that are <= the given `seqno`.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state.
    pub fn rollups_lte_seqno(&self, seqno: SeqNo) -> Vec<(SeqNo, PartialRollupKey)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state
                    .collections
                    .rollups
                    .range(..=seqno)
                    .map(|(seqno, rollup)| (*seqno, rollup.key.clone()))
                    .collect::<Vec<(SeqNo, PartialRollupKey)>>()
            })
    }

    pub fn all_fueled_merge_reqs(&self) -> Vec<FueledMergeReq<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state
                    .collections
                    .trace
                    .fueled_merge_reqs_before_ms(u64::MAX, None)
                    .collect()
            })
    }

    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.snapshot(as_of)
            })
    }

    pub fn all_batches(&self) -> Vec<HollowBatch<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.state.collections.trace.batches().cloned().collect()
            })
    }

    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.verify_listen(as_of)
            })
    }

    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.next_listen_batch(frontier)
            })
    }

    pub async fn write_rollup_for_state(&self) -> Option<EncodedRollup> {
        let state = self
            .state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.clone_for_rollup()
            });

        self.state_versions
            .write_rollup_for_state(self.shard_metrics.as_ref(), state, &RollupId::new())
            .await
    }

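    /// Applies `work_fn` to a clone of the current state and attempts to
    /// compare-and-set the result into Consensus, retrying the whole command
    /// after a state refresh whenever the CaS hits an expectation mismatch.
    /// Returning `Break` from `work_fn` skips the state transition entirely.
    ///
    /// A minimal usage sketch (the `applier` and `cmd_metrics` values below are
    /// hypothetical and the snippet is not compiled):
    ///
    /// ```ignore
    /// use std::convert::Infallible;
    /// use std::ops::ControlFlow;
    ///
    /// // A trivial command: it reads the seqno and makes no changes to the
    /// // collections, so the committed transition only bumps the seqno.
    /// let (seqno, res, maintenance) = applier
    ///     .apply_unbatched_cmd(&cmd_metrics, |seqno, _cfg, _collections| {
    ///         ControlFlow::<Infallible, _>::Continue(seqno)
    ///     })
    ///     .await?;
    /// ```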
    pub async fn apply_unbatched_cmd<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        &self,
        cmd: &CmdMetrics,
        mut work_fn: WorkFn,
    ) -> Result<(SeqNo, Result<R, E>, RoutineMaintenance), Indeterminate> {
        loop {
            cmd.started.inc();
            let now = Instant::now();
            let ret = Self::apply_unbatched_cmd_locked(
                &self.state,
                cmd,
                &mut work_fn,
                &self.cfg,
                &self.metrics,
                &self.shard_metrics,
                &self.state_versions,
            )
            .await;
            cmd.seconds.inc_by(now.elapsed().as_secs_f64());

            match ret {
                ApplyCmdResult::Committed((diff, new_state, res, maintenance)) => {
                    cmd.succeeded.inc();
                    self.shard_metrics.cmd_succeeded.inc();
                    self.update_state(new_state);
                    if PUBSUB_PUSH_DIFF_ENABLED.get(&self.cfg) {
                        self.pubsub_sender.push_diff(&self.shard_id, &diff);
                    }
                    return Ok((diff.seqno, Ok(res), maintenance));
                }
                ApplyCmdResult::SkippedStateTransition((seqno, err, maintenance)) => {
                    cmd.succeeded.inc();
                    self.shard_metrics.cmd_succeeded.inc();
                    return Ok((seqno, Err(err), maintenance));
                }
                ApplyCmdResult::Indeterminate(err) => {
                    cmd.failed.inc();
                    return Err(err);
                }
                ApplyCmdResult::ExpectationMismatch(seqno) => {
                    cmd.cas_mismatch.inc();
                    self.fetch_and_update_state(Some(seqno)).await;
                }
            }
        }
    }

    // work_fn fails to compile without mut, false positive
    #[allow(clippy::needless_pass_by_ref_mut)]
    async fn apply_unbatched_cmd_locked<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        state: &LockingTypedState<K, V, T, D>,
        cmd: &CmdMetrics,
        work_fn: &mut WorkFn,
        cfg: &PersistConfig,
        metrics: &Metrics,
        shard_metrics: &ShardMetrics,
        state_versions: &StateVersions,
    ) -> ApplyCmdResult<K, V, T, D, R, E> {
        let computed_next_state = state
            .read_lock(&metrics.locks.applier_read_noncacheable, |state| {
                Self::compute_next_state_locked(state, work_fn, metrics, cmd, cfg)
            });

        let next_state = match computed_next_state {
            Ok(x) => x,
            Err((seqno, err)) => {
                return ApplyCmdResult::SkippedStateTransition((
                    seqno,
                    err,
                    RoutineMaintenance::default(),
                ));
            }
        };

        let NextState {
            expected,
            diff,
            state,
            expiry_metrics,
            garbage_collection,
            write_rollup,
            work_ret,
        } = next_state;

        // SUBTLE! Unlike the other consensus and blob uses, we can't
        // automatically retry indeterminate ExternalErrors here. However,
        // if the state change itself is _idempotent_, then we're free to
        // retry even indeterminate errors. See
        // [Self::apply_unbatched_idempotent_cmd].
        let cas_res = state_versions
            .try_compare_and_set_current(&cmd.name, shard_metrics, Some(expected), &state, &diff)
            .await;

        match cas_res {
            Ok((CaSResult::Committed, diff)) => {
                assert!(
                    expected <= state.seqno,
                    "state seqno regressed: {} vs {}",
                    expected,
                    state.seqno
                );

                metrics
                    .lease
                    .timeout_read
                    .inc_by(u64::cast_from(expiry_metrics.readers_expired));

                metrics
                    .state
                    .writer_removed
                    .inc_by(u64::cast_from(expiry_metrics.writers_expired));

                if let Some(gc) = garbage_collection.as_ref() {
                    debug!("Assigned gc request: {:?}", gc);
                }

                let maintenance = RoutineMaintenance {
                    garbage_collection,
                    write_rollup,
                };

                ApplyCmdResult::Committed((diff, state, work_ret, maintenance))
            }
            Ok((CaSResult::ExpectationMismatch, _diff)) => {
                ApplyCmdResult::ExpectationMismatch(expected)
            }
            Err(err) => ApplyCmdResult::Indeterminate(err),
        }
    }

    fn compute_next_state_locked<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        state: &TypedState<K, V, T, D>,
        work_fn: &mut WorkFn,
        metrics: &Metrics,
        cmd: &CmdMetrics,
        cfg: &PersistConfig,
    ) -> Result<NextState<K, V, T, D, R>, (SeqNo, E)> {
        let is_write = cmd.name == metrics.cmds.compare_and_append.name;
        let is_rollup = cmd.name == metrics.cmds.add_rollup.name;
        let is_become_tombstone = cmd.name == metrics.cmds.become_tombstone.name;

        let gc_config = GcConfig {
            use_active_gc: GC_USE_ACTIVE_GC.get(cfg),
            fallback_threshold_ms: u64::cast_from(GC_FALLBACK_THRESHOLD_MS.get(cfg)),
            min_versions: GC_MIN_VERSIONS.get(cfg),
            max_versions: GC_MAX_VERSIONS.get(cfg),
        };

        let use_active_rollup = ROLLUP_USE_ACTIVE_ROLLUP.get(cfg);
        let rollup_threshold = ROLLUP_THRESHOLD.get(cfg);
        let rollup_fallback_threshold_ms = u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS.get(cfg));

        let expected = state.seqno;
        let was_tombstone_before = state.collections.is_tombstone();

        let (work_ret, mut new_state) = match state.clone_apply(cfg, work_fn) {
            Continue(x) => x,
            Break(err) => {
                return Err((expected, err));
            }
        };
        let expiry_metrics = new_state.expire_at((cfg.now)());
        new_state.state.collections.trace.roundtrip_structure = true;

        // Sanity check that all state transitions have a special case for
        // being a tombstone. The ones that do will return a Break and exit
        // this method above. The one exception is adding a rollup, because
        // we want to be able to add a rollup for the tombstone state.
        //
        // TODO: Even better would be to write the rollup in the
        // tombstone transition so it's a single terminal state
        // transition, but it'll be tricky to get right.
        if was_tombstone_before && !(is_rollup || is_become_tombstone) {
            panic!(
                "cmd {} unexpectedly tried to commit a new state on a tombstone: {:?}",
                cmd.name, state
            );
        }

        let now = (cfg.now)();
        let write_rollup = new_state.need_rollup(
            rollup_threshold,
            use_active_rollup,
            rollup_fallback_threshold_ms,
            now,
        );

        if let Some(write_rollup_seqno) = write_rollup {
            if use_active_rollup {
                new_state.collections.active_rollup = Some(ActiveRollup {
                    seqno: write_rollup_seqno,
                    start_ms: now,
                });
            }
        }

        // Find out if this command has been selected to perform gc, so
        // that it will fire off a background request to the
        // GarbageCollector to delete eligible blobs and truncate the
        // state history. This is dependent both on `maybe_gc` returning
        // Some _and_ on this state being successfully compare_and_set.
        let garbage_collection = new_state.maybe_gc(is_write, now, gc_config);

        if let Some(gc) = garbage_collection.as_ref() {
            if gc_config.use_active_gc {
                new_state.collections.active_gc = Some(ActiveGc {
                    seqno: gc.new_seqno_since,
                    start_ms: now,
                });
            }
        }

        // Make sure `new_state` is not modified after this point!
        // The new state and the diff must be consistent with each other for correctness.
        let diff = StateDiff::from_diff(&state.state, &new_state);
        // Sanity check that our diff logic roundtrips and adds back up
        // correctly.
        #[cfg(any(test, debug_assertions))]
        {
            if let Err(err) = StateDiff::validate_roundtrip(metrics, state, &diff, &new_state) {
                panic!("validate_roundtrips failed: {}", err);
            }
        }

        Ok(NextState {
            expected,
            diff,
            state: new_state,
            expiry_metrics,
            garbage_collection,
            write_rollup,
            work_ret,
        })
    }

    pub fn update_state(&self, new_state: TypedState<K, V, T, D>) {
        let (seqno_before, seqno_after) =
            self.state
                .write_lock(&self.metrics.locks.applier_write, |state| {
                    let seqno_before = state.seqno;
                    if seqno_before < new_state.seqno {
                        *state = new_state;
                    }
                    let seqno_after = state.seqno;
                    (seqno_before, seqno_after)
                });

        assert!(
            seqno_before <= seqno_after,
            "state seqno regressed: {} vs {}",
            seqno_before,
            seqno_after
        );
    }

    /// Fetches the latest state and updates our local copy to it. Uses an optional hint
    /// to early-out, without making any calls to Consensus or Blob, if a version of state
    /// more recent than the hint has already been observed (e.g. updated by another handle).
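    ///
    /// A minimal usage sketch (the `applier` handle below is hypothetical and
    /// the snippet is not compiled):
    ///
    /// ```ignore
    /// // Unconditionally refresh the locally cached state from Consensus.
    /// applier.fetch_and_update_state(None).await;
    /// ```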
    pub async fn fetch_and_update_state(&self, seqno_hint: Option<SeqNo>) {
        let current_seqno = self.seqno();
        let seqno_before = match seqno_hint {
            None => current_seqno,
            Some(hint) => {
                // state is already more recent than our hint due to
                // advancement by another handle to the same shard.
                if hint < current_seqno {
                    self.metrics.state.update_state_noop_path.inc();
                    return;
                }
                current_seqno
            }
        };

        let diffs_to_current = self
            .state_versions
            .fetch_all_live_diffs_gt_seqno::<K, V, T, D>(&self.shard_id, seqno_before)
            .await;

        // no new diffs past our current seqno, nothing to do
        if diffs_to_current.is_empty() {
            self.metrics.state.update_state_empty_path.inc();
            return;
        }

        let new_seqno = self
            .state
            .write_lock(&self.metrics.locks.applier_write, |state| {
                state.apply_encoded_diffs(&self.cfg, &self.metrics, &diffs_to_current);
                state.seqno
            });

        assert!(
            seqno_before <= new_seqno,
            "state seqno regressed: {} vs {}",
            seqno_before,
            new_seqno
        );

        // whether the seqno advanced from diffs and/or because another handle
        // already updated it, we can assume it is now up-to-date
        if seqno_before < new_seqno {
            self.metrics.state.update_state_fast_path.inc();
            return;
        }

        // our state is so old there aren't any diffs we can use to
        // catch up directly. fall back to fully refetching state.
        // we can reuse the recent diffs we already have as a hint.
        let new_state = self
            .state_versions
            .fetch_current_state(&self.shard_id, diffs_to_current)
            .await
            .check_codecs::<K, V, D>(&self.shard_id)
            .expect("shard codecs should not change");

        let new_seqno = self
            .state
            .write_lock(&self.metrics.locks.applier_write, |state| {
                if state.seqno < new_state.seqno {
                    *state = new_state;
                }
                state.seqno
            });

        self.metrics.state.update_state_slow_path.inc();
        assert!(
            seqno_before <= new_seqno,
            "state seqno regressed: {} vs {}",
            seqno_before,
            new_seqno
        );
    }
}

enum ApplyCmdResult<K, V, T, D, R, E> {
    Committed((VersionedData, TypedState<K, V, T, D>, R, RoutineMaintenance)),
    SkippedStateTransition((SeqNo, E, RoutineMaintenance)),
    Indeterminate(Indeterminate),
    ExpectationMismatch(SeqNo),
}

struct NextState<K, V, T, D, R> {
    expected: SeqNo,
    diff: StateDiff<T>,
    state: TypedState<K, V, T, D>,
    expiry_metrics: ExpiryMetrics,
    write_rollup: Option<SeqNo>,
    garbage_collection: Option<GcReq>,
    work_ret: R,
}