mz_persist_client/internal/apply.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Implementation of persist command application.

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::ops::ControlFlow::{self, Break, Continue};
use std::sync::Arc;
use std::time::Instant;

use crate::cache::{LockingTypedState, StateCache};
use crate::error::{CodecMismatch, InvalidUsage};
use crate::internal::gc::GcReq;
use crate::internal::maintenance::RoutineMaintenance;
use crate::internal::metrics::{CmdMetrics, Metrics, ShardMetrics};
use crate::internal::paths::{PartialRollupKey, RollupId};
use crate::internal::state::{
    ActiveGc, ActiveRollup, EncodedSchemas, ExpiryMetrics, GC_FALLBACK_THRESHOLD_MS,
    GC_MAX_VERSIONS, GC_MIN_VERSIONS, GC_USE_ACTIVE_GC, GcConfig, HollowBatch, LeasedReaderState,
    ROLLUP_FALLBACK_THRESHOLD_MS, ROLLUP_THRESHOLD, ROLLUP_USE_ACTIVE_ROLLUP, Since, SnapshotErr,
    StateCollections, TypedState,
};
use crate::internal::state_diff::StateDiff;
use crate::internal::state_versions::{EncodedRollup, StateVersions};
use crate::internal::trace::FueledMergeReq;
use crate::internal::watch::StateWatch;
use crate::read::LeasedReaderId;
use crate::rpc::{PUBSUB_PUSH_DIFF_ENABLED, PubSubSender};
use crate::schema::SchemaCache;
use crate::{Diagnostics, PersistConfig, ShardId, cfg};
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use mz_ore::cast::CastFrom;
use mz_ore::soft_assert_or_log;
use mz_persist::location::{CaSResult, Indeterminate, SeqNo, VersionedData};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64};
use timely::progress::{Antichain, Timestamp};
use tracing::debug;

/// An applier of persist commands.
///
/// This struct exists mainly to allow us to very narrowly bound the surface
/// area that directly interacts with state.
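///
/// A rough sketch of the intended call pattern (hypothetical caller and
/// `cmd_metrics`; not compiled as a doctest):
///
/// ```ignore
/// // Reads go through narrow, lock-scoped accessors...
/// let upper = applier.clone_upper();
/// // ...while writes go through `apply_unbatched_cmd`, which computes a new
/// // state, compare-and-sets it into Consensus, and returns any routine
/// // maintenance work for the caller to perform.
/// let (_seqno, _result, _maintenance) = applier
///     .apply_unbatched_cmd(&cmd_metrics, |_seqno, _cfg, collections| {
///         // Mutate `collections` and return Continue(..), or Break(..) to
///         // skip the state transition.
///         Continue(())
///     })
///     .await?;
/// ```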
#[derive(Debug)]
pub struct Applier<K, V, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,
    pub(crate) state_versions: Arc<StateVersions>,
    shared_states: Arc<StateCache>,
    pubsub_sender: Arc<dyn PubSubSender>,
    pub(crate) shard_id: ShardId,

    // Access to the shard's state, shared across all handles created by the same
    // PersistClientCache. The state is wrapped in LockingTypedState, disallowing
    // access across await points. Access should always be kept brief, and it
    // is expected that other handles may advance the state at any time this Applier
    // is not holding the lock.
    //
    // NB: This is very intentionally not pub(crate) so that it's easy to reason
    //     very locally about the duration of locks.
    state: Arc<LockingTypedState<K, V, T, D>>,
}

// Impl Clone regardless of the type params.
impl<K, V, T: Clone, D> Clone for Applier<K, V, T, D> {
    fn clone(&self) -> Self {
        Self {
            cfg: self.cfg.clone(),
            metrics: Arc::clone(&self.metrics),
            shard_metrics: Arc::clone(&self.shard_metrics),
            state_versions: Arc::clone(&self.state_versions),
            shared_states: Arc::clone(&self.shared_states),
            pubsub_sender: Arc::clone(&self.pubsub_sender),
            shard_id: self.shard_id,
            state: Arc::clone(&self.state),
        }
    }
}

impl<K, V, T, D> Applier<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64,
{
    pub async fn new(
        cfg: PersistConfig,
        shard_id: ShardId,
        metrics: Arc<Metrics>,
        state_versions: Arc<StateVersions>,
        shared_states: Arc<StateCache>,
        pubsub_sender: Arc<dyn PubSubSender>,
        diagnostics: Diagnostics,
    ) -> Result<Self, Box<CodecMismatch>> {
        let shard_metrics = metrics.shards.shard(&shard_id, &diagnostics.shard_name);
        let state = shared_states
            .get::<K, V, T, D, _, _>(
                shard_id,
                || {
                    metrics.cmds.init_state.run_cmd(&shard_metrics, || {
                        state_versions.maybe_init_shard(&shard_metrics)
                    })
                },
                &diagnostics,
            )
            .await?;
        let ret = Applier {
            cfg,
            metrics,
            shard_metrics,
            state_versions,
            shared_states,
            pubsub_sender,
            shard_id,
            state,
        };
        Ok(ret)
    }

    /// Returns a new [StateWatch] for changes to this Applier's State.
    pub fn watch(&self) -> StateWatch<K, V, T, D> {
        StateWatch::new(Arc::clone(&self.state), Arc::clone(&self.metrics))
    }

    /// Fetches the latest state from Consensus and passes its `upper` to the provided closure.
    pub async fn fetch_upper<R, F: FnMut(&Antichain<T>) -> R>(&self, mut f: F) -> R {
        self.metrics.cmds.fetch_upper_count.inc();
        self.fetch_and_update_state(None).await;
        self.upper(|_seqno, upper| f(upper))
    }

    /// A point-in-time read/clone of `upper` from the current state.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Successive calls will always return values such that
    /// `PartialOrder::less_equal(call1, call2)` holds true.
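    ///
    /// A minimal sketch of that guarantee (hypothetical `applier` handle; not
    /// compiled as a doctest):
    ///
    /// ```ignore
    /// let call1 = applier.clone_upper();
    /// let call2 = applier.clone_upper();
    /// assert!(PartialOrder::less_equal(&call1, &call2));
    /// ```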
    pub fn clone_upper(&self) -> Antichain<T> {
        self.upper(|_seqno, upper| upper.clone())
    }

    pub(crate) fn upper<R, F: FnMut(SeqNo, &Antichain<T>) -> R>(&self, mut f: F) -> R {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, move |state| {
                f(state.seqno, state.upper())
            })
    }

    pub(crate) fn schemas<R>(
        &self,
        mut f: impl FnMut(SeqNo, &BTreeMap<SchemaId, EncodedSchemas>) -> R,
    ) -> R {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, move |state| {
                f(state.seqno, &state.collections.schemas)
            })
    }

    pub(crate) fn schema_cache(&self) -> SchemaCache<K, V, T, D> {
        SchemaCache::new(self.state.schema_cache(), self.clone())
    }

    /// A point-in-time read of `since` from the current state.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Successive calls will always return values such that
    /// `PartialOrder::less_equal(call1, call2)` holds true.
    #[cfg(test)]
    pub fn since(&self) -> Antichain<T> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.since().clone()
            })
    }

    /// Does a lease for the provided reader exist in the current state?
    ///
    /// This is useful when we encounter a condition that should only be possible when the lease
    /// has expired, so we can distinguish between scary bugs and expected-but-unusual cases.
    /// This returns whatever lease is present in the latest version of state - so to avoid false
    /// positives, this should be checked only after the surprising condition has occurred.
    pub fn reader_lease(&self, id: LeasedReaderId) -> Option<LeasedReaderState<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.state.collections.leased_readers.get(&id).cloned()
            })
    }

    /// A point-in-time read of `seqno` from the current state.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Successive calls will always return values such that
    /// `call1 <= call2` holds true.
    pub fn seqno(&self) -> SeqNo {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.seqno
            })
    }

    /// A point-in-time read of `seqno_since` from the current state.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Successive calls will always return values such that
    /// `call1 <= call2` holds true.
    pub fn seqno_since(&self) -> SeqNo {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.seqno_since()
            })
    }

    /// A point-in-time read from the current state. (We declare a shard 'finalized' if it has
    /// both become an unreadable tombstone and the state itself has been emptied out.)
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Once this fn returns true, it will always return true.
    pub fn is_finalized(&self) -> bool {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.collections.is_tombstone() && state.collections.is_single_empty_batch()
            })
    }

    /// See [crate::PersistClient::get_schema].
    pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                let x = state.collections.schemas.get(&schema_id)?;
                Some((K::decode_schema(&x.key), V::decode_schema(&x.val)))
            })
    }

    /// See [crate::PersistClient::latest_schema].
    pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                let (id, x) = state.collections.schemas.last_key_value()?;
                Some((*id, K::decode_schema(&x.key), V::decode_schema(&x.val)))
            })
    }

    /// Returns the ID of the given schema, if known at the current state.
    pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                // The common case is that the requested schema is a recent one, so as a minor
                // optimization, do this search in reverse order.
                let mut schemas = state.collections.schemas.iter().rev();
                schemas
                    .find(|(_, x)| {
                        K::decode_schema(&x.key) == *key_schema
                            && V::decode_schema(&x.val) == *val_schema
                    })
                    .map(|(id, _)| *id)
            })
    }

    /// Returns whether the current state's `since` and `upper` are both empty.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state. Once this fn returns true, it will always return true.
    pub fn check_since_upper_both_empty(&self) -> Result<(), InvalidUsage<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                if state.since().is_empty() && state.upper().is_empty() {
                    Ok(())
                } else {
                    Err(InvalidUsage::FinalizationError {
                        since: state.since().clone(),
                        upper: state.upper().clone(),
                    })
                }
            })
    }

    /// Returns all rollups that are <= the given `seqno`.
    ///
    /// Due to sharing state with other handles, successive reads to this fn or any other may
    /// see a different version of state, even if this Applier has not explicitly fetched and
    /// updated to the latest state.
    pub fn rollups_lte_seqno(&self, seqno: SeqNo) -> Vec<(SeqNo, PartialRollupKey)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state
                    .collections
                    .rollups
                    .range(..=seqno)
                    .map(|(seqno, rollup)| (*seqno, rollup.key.clone()))
                    .collect::<Vec<(SeqNo, PartialRollupKey)>>()
            })
    }

    pub fn all_fueled_merge_reqs(&self) -> Vec<FueledMergeReq<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state
                    .collections
                    .trace
                    .fueled_merge_reqs_before_ms(u64::MAX, None)
                    .collect()
            })
    }

    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.snapshot(as_of)
            })
    }

    pub fn all_batches(&self) -> Vec<HollowBatch<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.state.collections.trace.batches().cloned().collect()
            })
    }

    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.verify_listen(as_of)
            })
    }

    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.next_listen_batch(frontier)
            })
    }

    pub async fn write_rollup_for_state(&self) -> Option<EncodedRollup> {
        let state = self
            .state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.clone_for_rollup()
            });

        self.state_versions
            .write_rollup_for_state(self.shard_metrics.as_ref(), state, &RollupId::new())
            .await
    }

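    /// Applies a single command to the shard's state via compare-and-set
    /// against Consensus.
    ///
    /// `work_fn` is run against a copy of the current state; on a CaS
    /// expectation mismatch the latest state is fetched and the closure is run
    /// again, so it may be invoked multiple times and should not assume it
    /// runs exactly once. A sketch of a caller-provided closure (hypothetical
    /// error type; not compiled as a doctest):
    ///
    /// ```ignore
    /// |_seqno, _cfg, collections: &mut StateCollections<T>| {
    ///     if collections.is_tombstone() {
    ///         // Returning Break skips the state transition entirely.
    ///         return Break(SomeCallerError);
    ///     }
    ///     // Otherwise mutate `collections` and commit the transition.
    ///     Continue(())
    /// }
    /// ```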
    pub async fn apply_unbatched_cmd<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        &self,
        cmd: &CmdMetrics,
        mut work_fn: WorkFn,
    ) -> Result<(SeqNo, Result<R, E>, RoutineMaintenance), Indeterminate> {
        loop {
            cmd.started.inc();
            let now = Instant::now();
            let ret = Self::apply_unbatched_cmd_locked(
                &self.state,
                cmd,
                &mut work_fn,
                &self.cfg,
                &self.metrics,
                &self.shard_metrics,
                &self.state_versions,
            )
            .await;
            cmd.seconds.inc_by(now.elapsed().as_secs_f64());

            match ret {
                ApplyCmdResult::Committed((diff, new_state, res, maintenance)) => {
                    cmd.succeeded.inc();
                    self.shard_metrics.cmd_succeeded.inc();
                    self.update_state(new_state);
                    if PUBSUB_PUSH_DIFF_ENABLED.get(&self.cfg) {
                        self.pubsub_sender.push_diff(&self.shard_id, &diff);
                    }
                    return Ok((diff.seqno, Ok(res), maintenance));
                }
                ApplyCmdResult::SkippedStateTransition((seqno, err, maintenance)) => {
                    cmd.succeeded.inc();
                    self.shard_metrics.cmd_succeeded.inc();
                    return Ok((seqno, Err(err), maintenance));
                }
                ApplyCmdResult::Indeterminate(err) => {
                    cmd.failed.inc();
                    return Err(err);
                }
                ApplyCmdResult::ExpectationMismatch(seqno) => {
                    cmd.cas_mismatch.inc();
                    self.fetch_and_update_state(Some(seqno)).await;
                }
            }
        }
    }

    // work_fn fails to compile without mut, false positive
    #[allow(clippy::needless_pass_by_ref_mut)]
    async fn apply_unbatched_cmd_locked<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        state: &LockingTypedState<K, V, T, D>,
        cmd: &CmdMetrics,
        work_fn: &mut WorkFn,
        cfg: &PersistConfig,
        metrics: &Metrics,
        shard_metrics: &ShardMetrics,
        state_versions: &StateVersions,
    ) -> ApplyCmdResult<K, V, T, D, R, E> {
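        // First compute the candidate next state (and its diff) under the
        // read lock, with no I/O; the compare-and-set against Consensus
        // happens below, after the lock is released.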
        let computed_next_state = state
            .read_lock(&metrics.locks.applier_read_noncacheable, |state| {
                Self::compute_next_state_locked(state, work_fn, metrics, cmd, cfg)
            });

        let next_state = match computed_next_state {
            Ok(x) => x,
            Err((seqno, err)) => {
                return ApplyCmdResult::SkippedStateTransition((
                    seqno,
                    err,
                    RoutineMaintenance::default(),
                ));
            }
        };

        let NextState {
            expected,
            diff,
            state,
            expiry_metrics,
            garbage_collection,
            write_rollup,
            work_ret,
        } = next_state;

        {
            let build_version = &cfg.build_version;
            let state_version = &state.state.collections.version;
            soft_assert_or_log!(
                cfg::code_can_write_data(build_version, state_version),
                "current version {build_version} does not support state format {state_version}"
            );
        }

        // SUBTLE! Unlike the other consensus and blob uses, we can't
        // automatically retry indeterminate ExternalErrors here. However,
        // if the state change itself is _idempotent_, then we're free to
        // retry even indeterminate errors. See
        // [Self::apply_unbatched_idempotent_cmd].
        let cas_res = state_versions
            .try_compare_and_set_current(&cmd.name, shard_metrics, Some(expected), &state, &diff)
            .await;

        match cas_res {
            Ok((CaSResult::Committed, diff)) => {
                assert!(
                    expected <= state.seqno,
                    "state seqno regressed: {} vs {}",
                    expected,
                    state.seqno
                );

                metrics
                    .lease
                    .timeout_read
                    .inc_by(u64::cast_from(expiry_metrics.readers_expired));

                metrics
                    .state
                    .writer_removed
                    .inc_by(u64::cast_from(expiry_metrics.writers_expired));

                if let Some(gc) = garbage_collection.as_ref() {
                    debug!("Assigned gc request: {:?}", gc);
                }

                let maintenance = RoutineMaintenance {
                    garbage_collection,
                    write_rollup,
                };

                ApplyCmdResult::Committed((diff, state, work_ret, maintenance))
            }
            Ok((CaSResult::ExpectationMismatch, _diff)) => {
                ApplyCmdResult::ExpectationMismatch(expected)
            }
            Err(err) => ApplyCmdResult::Indeterminate(err),
        }
    }

    fn compute_next_state_locked<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        state: &TypedState<K, V, T, D>,
        work_fn: &mut WorkFn,
        metrics: &Metrics,
        cmd: &CmdMetrics,
        cfg: &PersistConfig,
    ) -> Result<NextState<K, V, T, D, R>, (SeqNo, E)> {
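        // These flags identify the command by its metrics name so that a few
        // of the transitions below can be special-cased (tombstone handling,
        // gc eligibility).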
        let is_write = cmd.name == metrics.cmds.compare_and_append.name;
        let is_rollup = cmd.name == metrics.cmds.add_rollup.name;
        let is_become_tombstone = cmd.name == metrics.cmds.become_tombstone.name;

        let gc_config = GcConfig {
            use_active_gc: GC_USE_ACTIVE_GC.get(cfg),
            fallback_threshold_ms: u64::cast_from(GC_FALLBACK_THRESHOLD_MS.get(cfg)),
            min_versions: GC_MIN_VERSIONS.get(cfg),
            max_versions: GC_MAX_VERSIONS.get(cfg),
        };

        let use_active_rollup = ROLLUP_USE_ACTIVE_ROLLUP.get(cfg);
        let rollup_threshold = ROLLUP_THRESHOLD.get(cfg);
        let rollup_fallback_threshold_ms = u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS.get(cfg));

        let expected = state.seqno;
        let was_tombstone_before = state.collections.is_tombstone();

        let (work_ret, mut new_state) = match state.clone_apply(cfg, work_fn) {
            Continue(x) => x,
            Break(err) => {
                return Err((expected, err));
            }
        };
        let expiry_metrics = new_state.expire_at((cfg.now)());
        new_state.state.collections.trace.roundtrip_structure = true;

        // Sanity check that all state transitions have a special case for
        // being a tombstone. The ones that do will return a Break and bail
        // out of this method above. The one exception is adding
        // a rollup, because we want to be able to add a rollup for the
        // tombstone state.
        //
        // TODO: Even better would be to write the rollup in the
        // tombstone transition so it's a single terminal state
        // transition, but it'll be tricky to get right.
        if was_tombstone_before && !(is_rollup || is_become_tombstone) {
            panic!(
                "cmd {} unexpectedly tried to commit a new state on a tombstone: {:?}",
                cmd.name, state
            );
        }

        let now = (cfg.now)();
        let write_rollup = new_state.need_rollup(
            rollup_threshold,
            use_active_rollup,
            rollup_fallback_threshold_ms,
            now,
        );

        if let Some(write_rollup_seqno) = write_rollup {
            if use_active_rollup {
                new_state.collections.active_rollup = Some(ActiveRollup {
                    seqno: write_rollup_seqno,
                    start_ms: now,
                });
            }
        }

        // Find out if this command has been selected to perform gc, so
        // that it will fire off a background request to the
        // GarbageCollector to delete eligible blobs and truncate the
        // state history. This is dependent both on `maybe_gc` returning
        // Some _and_ on this state being successfully compare_and_set.
        let garbage_collection = new_state.maybe_gc(is_write, now, gc_config);

        if let Some(gc) = garbage_collection.as_ref() {
            if gc_config.use_active_gc {
                new_state.collections.active_gc = Some(ActiveGc {
                    seqno: gc.new_seqno_since,
                    start_ms: now,
                });
            }
        }

        // Make sure `new_state` is not modified after this point!
        // The new state and the diff must be consistent with each other for correctness.
        let diff = StateDiff::from_diff(&state.state, &new_state);
        // Sanity check that our diff logic roundtrips and adds back up
        // correctly.
        #[cfg(any(test, debug_assertions))]
        {
            if let Err(err) = StateDiff::validate_roundtrip(metrics, state, &diff, &new_state) {
                panic!("validate_roundtrips failed: {}", err);
            }
        }

        Ok(NextState {
            expected,
            diff,
            state: new_state,
            expiry_metrics,
            garbage_collection,
            write_rollup,
            work_ret,
        })
    }

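    /// Installs `new_state` as this process's cached copy of the shard state,
    /// but only if it is newer than what is already cached.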
    pub fn update_state(&self, new_state: TypedState<K, V, T, D>) {
        let (seqno_before, seqno_after) =
            self.state
                .write_lock(&self.metrics.locks.applier_write, |state| {
                    let seqno_before = state.seqno;
                    if seqno_before < new_state.seqno {
                        *state = new_state;
                    }
                    let seqno_after = state.seqno;
                    (seqno_before, seqno_after)
                });

        assert!(
            seqno_before <= seqno_after,
            "state seqno regressed: {} vs {}",
            seqno_before,
            seqno_after
        );
    }

    /// Fetches and updates to the latest state. Uses an optional hint to early-out if
    /// any more recent version of state is observed (e.g. updated by another handle),
    /// without making any calls to Consensus or Blob.
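    ///
    /// Internally this takes one of a few paths, mirrored by the
    /// `update_state_*_path` metrics: early-out on the seqno hint, early-out
    /// when there are no new live diffs, apply the fetched diffs on top of the
    /// cached state, or, if the cached state is too old for those diffs, fall
    /// back to refetching the full current state.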
    pub async fn fetch_and_update_state(&self, seqno_hint: Option<SeqNo>) {
        let current_seqno = self.seqno();
        let seqno_before = match seqno_hint {
            None => current_seqno,
            Some(hint) => {
                // state is already more recent than our hint due to
                // advancement by another handle to the same shard.
                if hint < current_seqno {
                    self.metrics.state.update_state_noop_path.inc();
                    return;
                }
                current_seqno
            }
        };

        let diffs_to_current = self
            .state_versions
            .fetch_all_live_diffs_gt_seqno::<K, V, T, D>(&self.shard_id, seqno_before)
            .await;

        // no new diffs past our current seqno, nothing to do
        if diffs_to_current.is_empty() {
            self.metrics.state.update_state_empty_path.inc();
            return;
        }

        let new_seqno = self
            .state
            .write_lock(&self.metrics.locks.applier_write, |state| {
                state.apply_encoded_diffs(&self.cfg, &self.metrics, &diffs_to_current);
                state.seqno
            });

        assert!(
            seqno_before <= new_seqno,
            "state seqno regressed: {} vs {}",
            seqno_before,
            new_seqno
        );

        // whether the seqno advanced from diffs and/or because another handle
        // already updated it, we can assume it is now up-to-date
        if seqno_before < new_seqno {
            self.metrics.state.update_state_fast_path.inc();
            return;
        }

        // our state is so old there aren't any diffs we can use to
        // catch up directly. fall back to fully refetching state.
        // we can reuse the recent diffs we already have as a hint.
        let new_state = self
            .state_versions
            .fetch_current_state(&self.shard_id, diffs_to_current)
            .await
            .check_codecs::<K, V, D>(&self.shard_id)
            .expect("shard codecs should not change");

        let new_seqno = self
            .state
            .write_lock(&self.metrics.locks.applier_write, |state| {
                if state.seqno < new_state.seqno {
                    *state = new_state;
                }
                state.seqno
            });

        self.metrics.state.update_state_slow_path.inc();
        assert!(
            seqno_before <= new_seqno,
            "state seqno regressed: {} vs {}",
            seqno_before,
            new_seqno
        );
    }
}

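/// The outcome of one attempt to apply a command in
/// `Applier::apply_unbatched_cmd_locked`.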
enum ApplyCmdResult<K, V, T, D, R, E> {
    Committed((VersionedData, TypedState<K, V, T, D>, R, RoutineMaintenance)),
    SkippedStateTransition((SeqNo, E, RoutineMaintenance)),
    Indeterminate(Indeterminate),
    ExpectationMismatch(SeqNo),
}

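/// A candidate state transition computed by
/// `Applier::compute_next_state_locked`, ready to be compare-and-set into
/// Consensus along with its diff.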
struct NextState<K, V, T, D, R> {
    expected: SeqNo,
    diff: StateDiff<T>,
    state: TypedState<K, V, T, D>,
    expiry_metrics: ExpiryMetrics,
    write_rollup: Option<SeqNo>,
    garbage_collection: Option<GcReq>,
    work_ret: R,
}