mz_persist_client/internal/apply.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Implementation of persist command application.

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::ops::ControlFlow::{self, Break, Continue};
use std::sync::Arc;
use std::time::Instant;

use crate::cache::{LockingTypedState, StateCache};
use crate::error::{CodecMismatch, InvalidUsage};
use crate::internal::gc::GcReq;
use crate::internal::maintenance::RoutineMaintenance;
use crate::internal::metrics::{CmdMetrics, Metrics, ShardMetrics};
use crate::internal::paths::{PartialRollupKey, RollupId};
use crate::internal::state::{
    ActiveGc, ActiveRollup, EncodedSchemas, ExpiryMetrics, GC_FALLBACK_THRESHOLD_MS,
    GC_MAX_VERSIONS, GC_MIN_VERSIONS, GC_USE_ACTIVE_GC, GcConfig, HollowBatch, LeasedReaderState,
    ROLLUP_FALLBACK_THRESHOLD_MS, ROLLUP_THRESHOLD, ROLLUP_USE_ACTIVE_ROLLUP, Since, SnapshotErr,
    StateCollections, TypedState,
};
use crate::internal::state_diff::StateDiff;
use crate::internal::state_versions::{EncodedRollup, StateVersions};
use crate::internal::trace::FueledMergeReq;
use crate::internal::watch::StateWatch;
use crate::read::LeasedReaderId;
use crate::rpc::{PUBSUB_PUSH_DIFF_ENABLED, PubSubSender};
use crate::schema::SchemaCache;
use crate::{Diagnostics, PersistConfig, ShardId, cfg};
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use mz_ore::cast::CastFrom;
use mz_ore::soft_assert_or_log;
use mz_persist::location::{CaSResult, Indeterminate, SeqNo, VersionedData};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64};
use timely::progress::{Antichain, Timestamp};
use tracing::debug;

/// An applier of persist commands.
///
/// This struct exists mainly to allow us to very narrowly bound the surface
/// area that directly interacts with state.
#[derive(Debug)]
pub struct Applier<K, V, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,
    pub(crate) state_versions: Arc<StateVersions>,
    pubsub_sender: Arc<dyn PubSubSender>,
    pub(crate) shard_id: ShardId,

    // Access to the shard's state, shared across all handles created by the same
    // PersistClientCache. The state is wrapped in LockingTypedState, disallowing
    // access across await points. Access should always be kept brief, and it
    // is expected that other handles may advance the state at any time this Applier
    // is not holding the lock.
    //
    // NB: This is very intentionally not pub(crate) so that it's easy to reason
    //     very locally about the duration of locks.
    state: Arc<LockingTypedState<K, V, T, D>>,
}
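
// A minimal usage sketch of the locking discipline described above (hypothetical;
// `applier` is an illustrative `Applier` handle, not a name from this file). All
// access to `state` goes through short, synchronous closures handed to
// `read_lock`/`write_lock`, so no lock is ever held across an await point:
//
//     let (seqno, upper) = applier.upper(|seqno, upper| (seqno, upper.clone()));
//     // By the time these values are used, another handle may already have
//     // advanced the shared state past `seqno`.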

// Impl Clone regardless of the type params.
impl<K, V, T: Clone, D> Clone for Applier<K, V, T, D> {
    fn clone(&self) -> Self {
        Self {
            cfg: self.cfg.clone(),
            metrics: Arc::clone(&self.metrics),
            shard_metrics: Arc::clone(&self.shard_metrics),
            state_versions: Arc::clone(&self.state_versions),
            pubsub_sender: Arc::clone(&self.pubsub_sender),
            shard_id: self.shard_id,
            state: Arc::clone(&self.state),
        }
    }
}

impl<K, V, T, D> Applier<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64,
{
    pub async fn new(
        cfg: PersistConfig,
        shard_id: ShardId,
        metrics: Arc<Metrics>,
        state_versions: Arc<StateVersions>,
        shared_states: Arc<StateCache>,
        pubsub_sender: Arc<dyn PubSubSender>,
        diagnostics: Diagnostics,
    ) -> Result<Self, Box<CodecMismatch>> {
        let shard_metrics = metrics.shards.shard(&shard_id, &diagnostics.shard_name);
        let state = shared_states
            .get::<K, V, T, D, _, _>(
                shard_id,
                || {
                    metrics.cmds.init_state.run_cmd(&shard_metrics, || {
                        state_versions.maybe_init_shard(&shard_metrics)
                    })
                },
                &diagnostics,
            )
            .await?;
        let ret = Applier {
            cfg,
            metrics,
            shard_metrics,
            state_versions,
            pubsub_sender,
            shard_id,
            state,
        };
        Ok(ret)
    }

    /// Returns a new [StateWatch] for changes to this Applier's State.
    pub fn watch(&self) -> StateWatch<K, V, T, D> {
        StateWatch::new(Arc::clone(&self.state), Arc::clone(&self.metrics))
    }

    /// Fetches the latest state from Consensus and passes its `upper` to the provided closure.
    pub async fn fetch_upper<R, F: FnMut(&Antichain<T>) -> R>(&self, mut f: F) -> R {
        self.metrics.cmds.fetch_upper_count.inc();
        self.fetch_and_update_state(None).await;
        self.upper(|_seqno, upper| f(upper))
    }

    /// A point-in-time read/clone of `upper` from the current state.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Successive calls will always return values such that
    /// `PartialOrder::less_equal(call1, call2)` holds true.
    pub fn clone_upper(&self) -> Antichain<T> {
        self.upper(|_seqno, upper| upper.clone())
    }
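
    // Illustrative sketch (hypothetical example, not a doctest): because the shared
    // state only ever advances, successive point-in-time reads are monotone, e.g.
    // for an illustrative `applier` handle:
    //
    //     let a = applier.clone_upper();
    //     let b = applier.clone_upper();
    //     assert!(PartialOrder::less_equal(&a, &b));
    //
    // (`PartialOrder` here is assumed to be `timely::PartialOrder`, which this file
    // does not itself import.)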

    pub(crate) fn upper<R, F: FnMut(SeqNo, &Antichain<T>) -> R>(&self, mut f: F) -> R {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, move |state| {
                f(state.seqno, state.upper())
            })
    }

    pub(crate) fn schemas<R>(
        &self,
        mut f: impl FnMut(SeqNo, &BTreeMap<SchemaId, EncodedSchemas>) -> R,
    ) -> R {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, move |state| {
                f(state.seqno, &state.collections.schemas)
            })
    }

    pub(crate) fn schema_cache(&self) -> SchemaCache<K, V, T, D> {
        SchemaCache::new(self.state.schema_cache(), self.clone())
    }

    /// A point-in-time read of `since` from the current state.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Successive calls will always return values such that
    /// `PartialOrder::less_equal(call1, call2)` holds true.
    #[cfg(test)]
    pub fn since(&self) -> Antichain<T> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.since().clone()
            })
    }

    /// Does a lease for the provided reader exist in the current state?
    ///
    /// This is useful when we encounter a condition that should only be possible when the lease
    /// has expired, so we can distinguish between scary bugs and expected-but-unusual cases.
    /// This returns whatever lease is present in the latest version of state - so to avoid false
    /// positives, this should be checked only after the surprising condition has occurred.
    pub fn reader_lease(&self, id: LeasedReaderId) -> Option<LeasedReaderState<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.state.collections.leased_readers.get(&id).cloned()
            })
    }
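
    // Hypothetical usage sketch (illustrative names only): after hitting a condition
    // that should only be possible once a reader's lease expired, consult the latest
    // state before treating it as a bug:
    //
    //     if applier.reader_lease(reader_id.clone()).is_none() {
    //         // The lease really is gone: expected-but-unusual, not a scary bug.
    //     }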

    /// A point-in-time read of `seqno` from the current state.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Successive calls will always return values such that
    /// `call1 <= call2` holds true.
    pub fn seqno(&self) -> SeqNo {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.seqno
            })
    }

    /// A point-in-time read of `seqno_since` from the current state.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Successive calls will always return values such that
    /// `call1 <= call2` holds true.
    pub fn seqno_since(&self) -> SeqNo {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.seqno_since()
            })
    }

    /// A point-in-time read from the current state. (We declare a shard 'finalized' if it has
    /// both become an unreadable tombstone and the state itself has been emptied out.)
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Once this fn returns true, it will always return true.
    pub fn is_finalized(&self) -> bool {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.collections.is_tombstone() && state.collections.is_single_empty_batch()
            })
    }

    /// See [crate::PersistClient::get_schema].
    pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                let x = state.collections.schemas.get(&schema_id)?;
                Some((K::decode_schema(&x.key), V::decode_schema(&x.val)))
            })
    }

    /// See [crate::PersistClient::latest_schema].
    pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                let (id, x) = state.collections.schemas.last_key_value()?;
                Some((*id, K::decode_schema(&x.key), V::decode_schema(&x.val)))
            })
    }

    /// Returns the ID of the given schema, if known at the current state.
    pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                // The common case is that the requested schema is a recent one, so as a minor
                // optimization, do this search in reverse order.
                let mut schemas = state.collections.schemas.iter().rev();
                schemas
                    .find(|(_, x)| {
                        K::decode_schema(&x.key) == *key_schema
                            && V::decode_schema(&x.val) == *val_schema
                    })
                    .map(|(id, _)| *id)
            })
    }
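
    // Illustrative sketch (hypothetical example): `find_schema` is the reverse lookup
    // of `get_schema`, so for a registered `schema_id` the following should hold,
    // assuming distinct ids encode distinct schemas:
    //
    //     let (key_schema, val_schema) = applier.get_schema(schema_id).unwrap();
    //     assert_eq!(applier.find_schema(&key_schema, &val_schema), Some(schema_id));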

    /// Returns `Ok` if the current state's `since` and `upper` are both empty.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Once this fn returns `Ok`, it will always return `Ok`.
    pub fn check_since_upper_both_empty(&self) -> Result<(), InvalidUsage<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                if state.since().is_empty() && state.upper().is_empty() {
                    Ok(())
                } else {
                    Err(InvalidUsage::FinalizationError {
                        since: state.since().clone(),
                        upper: state.upper().clone(),
                    })
                }
            })
    }

    /// Returns all rollups that are <= the given `seqno`.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state.
    pub fn rollups_lte_seqno(&self, seqno: SeqNo) -> Vec<(SeqNo, PartialRollupKey)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state
                    .collections
                    .rollups
                    .range(..=seqno)
                    .map(|(seqno, rollup)| (*seqno, rollup.key.clone()))
                    .collect::<Vec<(SeqNo, PartialRollupKey)>>()
            })
    }

    pub fn all_fueled_merge_reqs(&self) -> Vec<FueledMergeReq<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state
                    .collections
                    .trace
                    .fueled_merge_reqs_before_ms(u64::MAX, None)
                    .collect()
            })
    }

    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.snapshot(as_of)
            })
    }

    pub fn all_batches(&self) -> Vec<HollowBatch<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.state.collections.trace.batches().cloned().collect()
            })
    }

    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.verify_listen(as_of)
            })
    }

    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.next_listen_batch(frontier)
            })
    }

    pub async fn write_rollup_for_state(&self) -> Option<EncodedRollup> {
        let state = self
            .state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.clone_for_rollup()
            });

        self.state_versions
            .write_rollup_for_state(self.shard_metrics.as_ref(), state, &RollupId::new())
            .await
    }

    pub async fn apply_unbatched_cmd<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        &self,
        cmd: &CmdMetrics,
        mut work_fn: WorkFn,
    ) -> Result<(SeqNo, Result<R, E>, RoutineMaintenance), Indeterminate> {
        loop {
            cmd.started.inc();
            let now = Instant::now();
            let ret = Self::apply_unbatched_cmd_locked(
                &self.state,
                cmd,
                &mut work_fn,
                &self.cfg,
                &self.metrics,
                &self.shard_metrics,
                &self.state_versions,
            )
            .await;
            cmd.seconds.inc_by(now.elapsed().as_secs_f64());

            match ret {
                ApplyCmdResult::Committed((diff, new_state, res, maintenance)) => {
                    cmd.succeeded.inc();
                    self.shard_metrics.cmd_succeeded.inc();
                    self.update_state(new_state);
                    if PUBSUB_PUSH_DIFF_ENABLED.get(&self.cfg) {
                        self.pubsub_sender.push_diff(&self.shard_id, &diff);
                    }
                    return Ok((diff.seqno, Ok(res), maintenance));
                }
                ApplyCmdResult::SkippedStateTransition((seqno, err, maintenance)) => {
                    cmd.succeeded.inc();
                    self.shard_metrics.cmd_succeeded.inc();
                    return Ok((seqno, Err(err), maintenance));
                }
                ApplyCmdResult::Indeterminate(err) => {
                    cmd.failed.inc();
                    return Err(err);
                }
                ApplyCmdResult::ExpectationMismatch(seqno) => {
                    cmd.cas_mismatch.inc();
                    self.fetch_and_update_state(Some(seqno)).await;
                }
            }
        }
    }
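
    // Hedged sketch of the `work_fn` contract (hypothetical example; `cmd_metrics`
    // and `AlreadyTombstoneErr` are illustrative names). The closure sees the current
    // seqno, config, and mutable `StateCollections`; returning `Continue(ret)` asks
    // for the mutation to be committed via compare-and-set (retrying on expectation
    // mismatch), while `Break(err)` skips the state transition entirely:
    //
    //     let (_seqno, result, _maintenance) = applier
    //         .apply_unbatched_cmd(cmd_metrics, |_seqno, _cfg, collections| {
    //             if collections.is_tombstone() {
    //                 return Break(AlreadyTombstoneErr);
    //             }
    //             Continue(())
    //         })
    //         .await?;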

    // work_fn fails to compile without `mut`; this lint is a false positive here.
    #[allow(clippy::needless_pass_by_ref_mut)]
    async fn apply_unbatched_cmd_locked<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        state: &LockingTypedState<K, V, T, D>,
        cmd: &CmdMetrics,
        work_fn: &mut WorkFn,
        cfg: &PersistConfig,
        metrics: &Metrics,
        shard_metrics: &ShardMetrics,
        state_versions: &StateVersions,
    ) -> ApplyCmdResult<K, V, T, D, R, E> {
        // While it's safe for more than one cmd to try to update the state, only one of those
        // concurrent requests could actually succeed. This call will wait until previous requests
        // have completed or timed out, and holding the resulting permit will delay other cmds until
        // this attempt completes or times out.
        let _permit_opt = state.lease_for_update().await;

        let computed_next_state = state
            .read_lock(&metrics.locks.applier_read_noncacheable, |state| {
                Self::compute_next_state_locked(state, work_fn, metrics, cmd, cfg)
            });

        let next_state = match computed_next_state {
            Ok(x) => x,
            Err((seqno, err)) => {
                return ApplyCmdResult::SkippedStateTransition((
                    seqno,
                    err,
                    RoutineMaintenance::default(),
                ));
            }
        };

        let NextState {
            expected,
            diff,
            state,
            expiry_metrics,
            garbage_collection,
            write_rollup,
            work_ret,
        } = next_state;

        {
            let build_version = &cfg.build_version;
            let state_version = &state.state.collections.version;
            soft_assert_or_log!(
                cfg::code_can_write_data(build_version, state_version),
                "current version {build_version} does not support state format {state_version}"
            );
        }

        // SUBTLE! Unlike the other consensus and blob uses, we can't
        // automatically retry indeterminate ExternalErrors here. However,
        // if the state change itself is _idempotent_, then we're free to
        // retry even indeterminate errors. See
        // [Self::apply_unbatched_idempotent_cmd].
        let cas_res = state_versions
            .try_compare_and_set_current(&cmd.name, shard_metrics, Some(expected), &state, &diff)
            .await;

        match cas_res {
            Ok((CaSResult::Committed, diff)) => {
                assert!(
                    expected <= state.seqno,
                    "state seqno regressed: {} vs {}",
                    expected,
                    state.seqno
                );

                metrics
                    .lease
                    .timeout_read
                    .inc_by(u64::cast_from(expiry_metrics.readers_expired));

                metrics
                    .state
                    .writer_removed
                    .inc_by(u64::cast_from(expiry_metrics.writers_expired));

                if let Some(gc) = garbage_collection.as_ref() {
                    debug!("Assigned gc request: {:?}", gc);
                }

                let maintenance = RoutineMaintenance {
                    garbage_collection,
                    write_rollup,
                };

                ApplyCmdResult::Committed((diff, state, work_ret, maintenance))
            }
            Ok((CaSResult::ExpectationMismatch, _diff)) => {
                ApplyCmdResult::ExpectationMismatch(expected)
            }
            Err(err) => ApplyCmdResult::Indeterminate(err),
        }
    }
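
    // Conceptually (a hedged restatement, not a literal transcription of the
    // Consensus implementation), the compare-and-set above behaves like:
    //
    //     if consensus_head_seqno == expected {
    //         // install `state` and `diff`  -> CaSResult::Committed
    //     } else {
    //         // another writer won the race -> CaSResult::ExpectationMismatch
    //     }
    //
    // An Indeterminate error means the write may or may not have landed, which is
    // why only idempotent callers (see apply_unbatched_idempotent_cmd) can retry it.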

    fn compute_next_state_locked<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        state: &TypedState<K, V, T, D>,
        work_fn: &mut WorkFn,
        metrics: &Metrics,
        cmd: &CmdMetrics,
        cfg: &PersistConfig,
    ) -> Result<NextState<K, V, T, D, R>, (SeqNo, E)> {
        let is_write = cmd.name == metrics.cmds.compare_and_append.name;
        let is_rollup = cmd.name == metrics.cmds.add_rollup.name;
        let is_become_tombstone = cmd.name == metrics.cmds.become_tombstone.name;

        let gc_config = GcConfig {
            use_active_gc: GC_USE_ACTIVE_GC.get(cfg),
            fallback_threshold_ms: u64::cast_from(GC_FALLBACK_THRESHOLD_MS.get(cfg)),
            min_versions: GC_MIN_VERSIONS.get(cfg),
            max_versions: GC_MAX_VERSIONS.get(cfg),
        };

        let use_active_rollup = ROLLUP_USE_ACTIVE_ROLLUP.get(cfg);
        let rollup_threshold = ROLLUP_THRESHOLD.get(cfg);
        let rollup_fallback_threshold_ms = u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS.get(cfg));

        let expected = state.seqno;
        let was_tombstone_before = state.collections.is_tombstone();

        let (work_ret, mut new_state) = match state.clone_apply(cfg, work_fn) {
            Continue(x) => x,
            Break(err) => {
                return Err((expected, err));
            }
        };
        let expiry_metrics = new_state.expire_at((cfg.now)());
        new_state.state.collections.trace.roundtrip_structure = true;

        // Sanity check that all state transitions have a special case for
        // being a tombstone. The ones that do will return a Break, causing
        // an early return from this method above. The one exception is adding
        // a rollup, because we want to be able to add a rollup for the
        // tombstone state.
        //
        // TODO: Even better would be to write the rollup in the
        // tombstone transition so it's a single terminal state
        // transition, but it'll be tricky to get right.
        if was_tombstone_before && !(is_rollup || is_become_tombstone) {
            panic!(
                "cmd {} unexpectedly tried to commit a new state on a tombstone: {:?}",
                cmd.name, state
            );
        }

        let now = (cfg.now)();
        let write_rollup = new_state.need_rollup(
            rollup_threshold,
            use_active_rollup,
            rollup_fallback_threshold_ms,
            now,
        );

        if let Some(write_rollup_seqno) = write_rollup {
            if use_active_rollup {
                new_state.collections.active_rollup = Some(ActiveRollup {
                    seqno: write_rollup_seqno,
                    start_ms: now,
                });
            }
        }

        // Find out if this command has been selected to perform gc, so
        // that it will fire off a background request to the
        // GarbageCollector to delete eligible blobs and truncate the
        // state history. This is dependent both on `maybe_gc` returning
        // Some _and_ on this state being successfully compare_and_set.
        let garbage_collection = new_state.maybe_gc(is_write, now, gc_config);

        if let Some(gc) = garbage_collection.as_ref() {
            if gc_config.use_active_gc {
                new_state.collections.active_gc = Some(ActiveGc {
                    seqno: gc.new_seqno_since,
                    start_ms: now,
                });
            }
        }

        // Make sure `new_state` is not modified after this point!
        // The new state and the diff must be consistent with each other for correctness.
        let diff = StateDiff::from_diff(&state.state, &new_state);
        // Sanity check that our diff logic roundtrips and adds back up
        // correctly.
        #[cfg(any(test, debug_assertions))]
        {
            if let Err(err) = StateDiff::validate_roundtrip(metrics, state, &diff, &new_state) {
                panic!("validate_roundtrips failed: {}", err);
            }
        }

        Ok(NextState {
            expected,
            diff,
            state: new_state,
            expiry_metrics,
            garbage_collection,
            write_rollup,
            work_ret,
        })
    }
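
    // Hedged reading of the active rollup/gc markers written above (an interpretation,
    // not a guarantee from this file): recording `ActiveRollup`/`ActiveGc` with a seqno
    // and `start_ms` lets other writers see that the work is already in flight and only
    // re-request it once the marker looks stale, roughly:
    //
    //     let stale = now.saturating_sub(active.start_ms) > fallback_threshold_ms;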

    pub fn update_state(&self, new_state: TypedState<K, V, T, D>) {
        let (seqno_before, seqno_after) =
            self.state
                .write_lock(&self.metrics.locks.applier_write, |state| {
                    let seqno_before = state.seqno;
                    if seqno_before < new_state.seqno {
                        *state = new_state;
                    }
                    let seqno_after = state.seqno;
                    (seqno_before, seqno_after)
                });

        assert!(
            seqno_before <= seqno_after,
            "state seqno regressed: {} vs {}",
            seqno_before,
            seqno_after
        );
    }
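
    // Illustrative sketch (hypothetical `TypedState`s at seqnos 10 and 7): `update_state`
    // only installs the new state if it is newer, so out-of-order updates from racing
    // handles are harmless:
    //
    //     applier.update_state(state_at_seqno_10);
    //     applier.update_state(state_at_seqno_7); // ignored: seqno would regress
    //     assert_eq!(applier.seqno(), SeqNo(10));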

    /// Fetches the latest state and updates to it. Uses an optional hint to early-out if
    /// a more recent version of state has already been observed (e.g. updated by another
    /// handle), without making any calls to Consensus or Blob.
    pub async fn fetch_and_update_state(&self, seqno_hint: Option<SeqNo>) {
        let current_seqno = self.seqno();
        let seqno_before = match seqno_hint {
            None => current_seqno,
            Some(hint) => {
                // state is already more recent than our hint due to
                // advancement by another handle to the same shard.
                if hint < current_seqno {
                    self.metrics.state.update_state_noop_path.inc();
                    return;
                }
                current_seqno
            }
        };

        let diffs_to_current = self
            .state_versions
            .fetch_all_live_diffs_gt_seqno::<K, V, T, D>(&self.shard_id, seqno_before)
            .await;

        // no new diffs past our current seqno, nothing to do
        if diffs_to_current.is_empty() {
            self.metrics.state.update_state_empty_path.inc();
            return;
        }

        let new_seqno = self
            .state
            .write_lock(&self.metrics.locks.applier_write, |state| {
                state.apply_encoded_diffs(&self.cfg, &self.metrics, &diffs_to_current);
                state.seqno
            });

        assert!(
            seqno_before <= new_seqno,
            "state seqno regressed: {} vs {}",
            seqno_before,
            new_seqno
        );

        // whether the seqno advanced from diffs and/or because another handle
        // already updated it, we can assume it is now up-to-date
        if seqno_before < new_seqno {
            self.metrics.state.update_state_fast_path.inc();
            return;
        }

        // our state is so old there aren't any diffs we can use to
        // catch up directly. fall back to fully refetching state.
        // we can reuse the recent diffs we already have as a hint.
        let new_state = self
            .state_versions
            .fetch_current_state(&self.shard_id, diffs_to_current)
            .await
            .check_codecs::<K, V, D>(&self.shard_id)
            .expect("shard codecs should not change");

        let new_seqno = self
            .state
            .write_lock(&self.metrics.locks.applier_write, |state| {
                if state.seqno < new_state.seqno {
                    *state = new_state;
                }
                state.seqno
            });

        self.metrics.state.update_state_slow_path.inc();
        assert!(
            seqno_before <= new_seqno,
            "state seqno regressed: {} vs {}",
            seqno_before,
            new_seqno
        );
    }
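
    // Summary of the update paths above (a descriptive gloss on the code, matching
    // the `update_state_*` metrics each branch increments):
    //   - noop path:  the caller's seqno hint is already behind our cached state.
    //   - empty path: Consensus has no diffs newer than our current seqno.
    //   - fast path:  the fetched diffs (or a racing handle) advanced our state.
    //   - slow path:  diffs alone couldn't catch us up; refetch the full state.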

}

enum ApplyCmdResult<K, V, T, D, R, E> {
    Committed((VersionedData, TypedState<K, V, T, D>, R, RoutineMaintenance)),
    SkippedStateTransition((SeqNo, E, RoutineMaintenance)),
    Indeterminate(Indeterminate),
    ExpectationMismatch(SeqNo),
}

struct NextState<K, V, T, D, R> {
    expected: SeqNo,
    diff: StateDiff<T>,
    state: TypedState<K, V, T, D>,
    expiry_metrics: ExpiryMetrics,
    write_rollup: Option<SeqNo>,
    garbage_collection: Option<GcReq>,
    work_ret: R,
}