// File: mz_persist_client/internal/apply.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementation of persist command application.
11
12use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::ops::ControlFlow::{self, Break, Continue};
15use std::sync::Arc;
16use std::time::Instant;
17
18use crate::cache::{LockingTypedState, StateCache};
19use crate::error::{CodecMismatch, InvalidUsage};
20use crate::internal::gc::GcReq;
21use crate::internal::maintenance::RoutineMaintenance;
22use crate::internal::metrics::{CmdMetrics, Metrics, ShardMetrics};
23use crate::internal::paths::{PartialRollupKey, RollupId};
24use crate::internal::state::{
25    ActiveGc, ActiveRollup, EncodedSchemas, ExpiryMetrics, GC_FALLBACK_THRESHOLD_MS,
26    GC_MAX_VERSIONS, GC_MIN_VERSIONS, GC_USE_ACTIVE_GC, GcConfig, HollowBatch, LeasedReaderState,
27    ROLLUP_FALLBACK_THRESHOLD_MS, ROLLUP_THRESHOLD, ROLLUP_USE_ACTIVE_ROLLUP, Since, SnapshotErr,
28    StateCollections, TypedState,
29};
30use crate::internal::state_diff::StateDiff;
31use crate::internal::state_versions::{EncodedRollup, StateVersions};
32use crate::internal::trace::FueledMergeReq;
33use crate::internal::watch::StateWatch;
34use crate::read::LeasedReaderId;
35use crate::rpc::{PUBSUB_PUSH_DIFF_ENABLED, PubSubSender};
36use crate::schema::SchemaCache;
37use crate::{Diagnostics, PersistConfig, ShardId, cfg};
38use differential_dataflow::difference::Monoid;
39use differential_dataflow::lattice::Lattice;
40use mz_ore::cast::CastFrom;
41use mz_ore::soft_assert_or_log;
42use mz_persist::location::{CaSResult, Indeterminate, SeqNo, VersionedData};
43use mz_persist_types::schema::SchemaId;
44use mz_persist_types::{Codec, Codec64};
45use timely::progress::{Antichain, Timestamp};
46use tracing::debug;
47
48/// An applier of persist commands.
49///
50/// This struct exists mainly to allow us to very narrowly bound the surface
51/// area that directly interacts with state.
/// An applier of persist commands.
///
/// This struct exists mainly to allow us to very narrowly bound the surface
/// area that directly interacts with state.
#[derive(Debug)]
pub struct Applier<K, V, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,
    pub(crate) state_versions: Arc<StateVersions>,
    // Used to broadcast committed state diffs to other processes after a
    // successful compare-and-set (see `apply_unbatched_cmd`).
    pubsub_sender: Arc<dyn PubSubSender>,
    pub(crate) shard_id: ShardId,

    // Access to the shard's state, shared across all handles created by the same
    // PersistClientCache. The state is wrapped in LockingTypedState, disallowing
    // access across await points. Access should always be kept brief, and it
    // is expected that other handles may advance the state at any time this Applier
    // is not holding the lock.
    //
    // NB: This is very intentionally not pub(crate) so that it's easy to reason
    //     very locally about the duration of locks.
    state: Arc<LockingTypedState<K, V, T, D>>,
}
71
72// Impl Clone regardless of the type params.
73impl<K, V, T: Clone, D> Clone for Applier<K, V, T, D> {
74    fn clone(&self) -> Self {
75        Self {
76            cfg: self.cfg.clone(),
77            metrics: Arc::clone(&self.metrics),
78            shard_metrics: Arc::clone(&self.shard_metrics),
79            state_versions: Arc::clone(&self.state_versions),
80            pubsub_sender: Arc::clone(&self.pubsub_sender),
81            shard_id: self.shard_id,
82            state: Arc::clone(&self.state),
83        }
84    }
85}
86
87impl<K, V, T, D> Applier<K, V, T, D>
88where
89    K: Debug + Codec,
90    V: Debug + Codec,
91    T: Timestamp + Lattice + Codec64 + Sync,
92    D: Monoid + Codec64,
93{
94    pub async fn new(
95        cfg: PersistConfig,
96        shard_id: ShardId,
97        metrics: Arc<Metrics>,
98        state_versions: Arc<StateVersions>,
99        shared_states: Arc<StateCache>,
100        pubsub_sender: Arc<dyn PubSubSender>,
101        diagnostics: Diagnostics,
102    ) -> Result<Self, Box<CodecMismatch>> {
103        let shard_metrics = metrics.shards.shard(&shard_id, &diagnostics.shard_name);
104        let state = shared_states
105            .get::<K, V, T, D, _, _>(
106                shard_id,
107                || {
108                    metrics.cmds.init_state.run_cmd(&shard_metrics, || {
109                        state_versions.maybe_init_shard(&shard_metrics)
110                    })
111                },
112                &diagnostics,
113            )
114            .await?;
115        let ret = Applier {
116            cfg,
117            metrics,
118            shard_metrics,
119            state_versions,
120            pubsub_sender,
121            shard_id,
122            state,
123        };
124        Ok(ret)
125    }
126
127    /// Returns a new [StateWatch] for changes to this Applier's State.
128    pub fn watch(&self) -> StateWatch<K, V, T, D> {
129        StateWatch::new(Arc::clone(&self.state), Arc::clone(&self.metrics))
130    }
131
    /// Fetches the latest state from Consensus and passes its `upper` to the provided closure.
    pub async fn fetch_upper<R, F: FnMut(&Antichain<T>) -> R>(&self, mut f: F) -> R {
        self.metrics.cmds.fetch_upper_count.inc();
        // Refresh the shared state first (no seqno hint), so the closure sees
        // an upper at least as recent as the moment of this call.
        self.fetch_and_update_state(None).await;
        self.upper(|_seqno, upper| f(upper))
    }
138
139    /// A point-in-time read/clone of `upper` from the current state.
140    ///
141    /// Due to sharing state with other handles, successive reads to this fn or any other may
142    /// see a different version of state, even if this Applier has not explicitly fetched and
143    /// updated to the latest state. Successive calls will always return values such that
144    /// `PartialOrder::less_equal(call1, call2)` hold true.
145    pub fn clone_upper(&self) -> Antichain<T> {
146        self.upper(|_seqno, upper| upper.clone())
147    }
148
149    pub(crate) fn upper<R, F: FnMut(SeqNo, &Antichain<T>) -> R>(&self, mut f: F) -> R {
150        self.state
151            .read_lock(&self.metrics.locks.applier_read_cacheable, move |state| {
152                f(state.seqno, state.upper())
153            })
154    }
155
156    pub(crate) fn schemas<R>(
157        &self,
158        mut f: impl FnMut(SeqNo, &BTreeMap<SchemaId, EncodedSchemas>) -> R,
159    ) -> R {
160        self.state
161            .read_lock(&self.metrics.locks.applier_read_cacheable, move |state| {
162                f(state.seqno, &state.collections.schemas)
163            })
164    }
165
166    pub(crate) fn schema_cache(&self) -> SchemaCache<K, V, T, D> {
167        SchemaCache::new(self.state.schema_cache(), self.clone())
168    }
169
170    /// A point-in-time read of `since` from the current state.
171    ///
172    /// Due to sharing state with other handles, successive reads to this fn or any other may
173    /// see a different version of state, even if this Applier has not explicitly fetched and
174    /// updated to the latest state. Successive calls will always return values such that
175    /// `PartialOrder::less_equal(call1, call2)` hold true.
176    #[cfg(test)]
177    pub fn since(&self) -> Antichain<T> {
178        self.state
179            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
180                state.since().clone()
181            })
182    }
183
184    /// Does a lease for the provided reader exist in the current state?
185    ///
186    /// This is useful when we encounter a condition that should only be possible when the lease
187    /// has expired, so we can distinguish between scary bugs and expected-but-unusual cases.
188    /// This returns whatever lease is present in the latest version of state - so to avoid false
189    /// positives, this should be checked only after the surprising condition has occurred.
190    pub fn reader_lease(&self, id: LeasedReaderId) -> Option<LeasedReaderState<T>> {
191        self.state
192            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
193                state.state.collections.leased_readers.get(&id).cloned()
194            })
195    }
196
197    /// A point-in-time read of `seqno` from the current state.
198    ///
199    /// Due to sharing state with other handles, successive reads to this fn or any other may
200    /// see a different version of state, even if this Applier has not explicitly fetched and
201    /// updated to the latest state. Successive calls will always return values such that
202    /// `call1 <= call2` hold true.
203    pub fn seqno(&self) -> SeqNo {
204        self.state
205            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
206                state.seqno
207            })
208    }
209
210    /// A point-in-time read of `seqno_since` from the current state.
211    ///
212    /// Due to sharing state with other handles, successive reads to this fn or any other may
213    /// see a different version of state, even if this Applier has not explicitly fetched and
214    /// updated to the latest state. Successive calls will always return values such that
215    /// `call1 <= call2` hold true.
216    pub fn seqno_since(&self) -> SeqNo {
217        self.state
218            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
219                state.seqno_since()
220            })
221    }
222
223    /// A point-in-time read from the current state. (We declare a shard 'finalized' if it's
224    /// both become an unreadable tombstone and the state itself is has been emptied out.)
225    ///
226    /// Due to sharing state with other handles, successive reads to this fn or any other may
227    /// see a different version of state, even if this Applier has not explicitly fetched and
228    /// updated to the latest state. Once this fn returns true, it will always return true.
229    pub fn is_finalized(&self) -> bool {
230        self.state
231            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
232                state.collections.is_tombstone() && state.collections.is_single_empty_batch()
233            })
234    }
235
236    /// See [crate::PersistClient::get_schema].
237    pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
238        self.state
239            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
240                let x = state.collections.schemas.get(&schema_id)?;
241                Some((K::decode_schema(&x.key), V::decode_schema(&x.val)))
242            })
243    }
244
245    /// See [crate::PersistClient::latest_schema].
246    pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
247        self.state
248            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
249                let (id, x) = state.collections.schemas.last_key_value()?;
250                Some((*id, K::decode_schema(&x.key), V::decode_schema(&x.val)))
251            })
252    }
253
254    /// Returns the ID of the given schema, if known at the current state.
255    pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
256        self.state
257            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
258                // The common case is that the requested schema is a recent one, so as a minor
259                // optimization, do this search in reverse order.
260                let mut schemas = state.collections.schemas.iter().rev();
261                schemas
262                    .find(|(_, x)| {
263                        K::decode_schema(&x.key) == *key_schema
264                            && V::decode_schema(&x.val) == *val_schema
265                    })
266                    .map(|(id, _)| *id)
267            })
268    }
269
270    /// Returns whether the current's state `since` and `upper` are both empty.
271    ///
272    /// Due to sharing state with other handles, successive reads to this fn or any other may
273    /// see a different version of state, even if this Applier has not explicitly fetched and
274    /// updated to the latest state. Once this fn returns true, it will always return true.
275    pub fn check_since_upper_both_empty(&self) -> Result<(), InvalidUsage<T>> {
276        self.state
277            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
278                if state.since().is_empty() && state.upper().is_empty() {
279                    Ok(())
280                } else {
281                    Err(InvalidUsage::FinalizationError {
282                        since: state.since().clone(),
283                        upper: state.upper().clone(),
284                    })
285                }
286            })
287    }
288
289    /// Returns all rollups that are <= the given `seqno`.
290    ///
291    /// Due to sharing state with other handles, successive reads to this fn or any other may
292    /// see a different version of state, even if this Applier has not explicitly fetched and
293    /// updated to the latest state.
294    pub fn rollups_lte_seqno(&self, seqno: SeqNo) -> Vec<(SeqNo, PartialRollupKey)> {
295        self.state
296            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
297                state
298                    .collections
299                    .rollups
300                    .range(..=seqno)
301                    .map(|(seqno, rollup)| (*seqno, rollup.key.clone()))
302                    .collect::<Vec<(SeqNo, PartialRollupKey)>>()
303            })
304    }
305
306    pub fn all_fueled_merge_reqs(&self) -> Vec<FueledMergeReq<T>> {
307        self.state
308            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
309                state
310                    .collections
311                    .trace
312                    .fueled_merge_reqs_before_ms(u64::MAX, None)
313                    .collect()
314            })
315    }
316
317    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
318        self.state
319            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
320                state.snapshot(as_of)
321            })
322    }
323
324    pub fn all_batches(&self) -> Vec<HollowBatch<T>> {
325        self.state
326            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
327                state.state.collections.trace.batches().cloned().collect()
328            })
329    }
330
331    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
332        self.state
333            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
334                state.verify_listen(as_of)
335            })
336    }
337
338    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
339        self.state
340            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
341                state.next_listen_batch(frontier)
342            })
343    }
344
345    pub async fn write_rollup_for_state(&self) -> Option<EncodedRollup> {
346        let state = self
347            .state
348            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
349                state.clone_for_rollup()
350            });
351
352        self.state_versions
353            .write_rollup_for_state(self.shard_metrics.as_ref(), state, &RollupId::new())
354            .await
355    }
356
    /// Applies `work_fn` to the current state and attempts to commit the
    /// resulting transition via compare-and-set, retrying (after catching up
    /// to the latest state) on CaS mismatch until it commits, is skipped, or
    /// fails indeterminately.
    ///
    /// Returns the resulting seqno, either the `work_fn` result (`Ok`) or the
    /// `Break` value when `work_fn` declined the transition (`Err`), plus any
    /// routine maintenance work the caller is responsible for kicking off.
    ///
    /// Returns `Err(Indeterminate)` when Consensus could not determine whether
    /// the write landed; retrying such errors is only safe for idempotent
    /// commands (see the note in `apply_unbatched_cmd_locked`).
    pub async fn apply_unbatched_cmd<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        &self,
        cmd: &CmdMetrics,
        mut work_fn: WorkFn,
    ) -> Result<(SeqNo, Result<R, E>, RoutineMaintenance), Indeterminate> {
        loop {
            cmd.started.inc();
            let now = Instant::now();
            let ret = Self::apply_unbatched_cmd_locked(
                &self.state,
                cmd,
                &mut work_fn,
                &self.cfg,
                &self.metrics,
                &self.shard_metrics,
                &self.state_versions,
            )
            .await;
            cmd.seconds.inc_by(now.elapsed().as_secs_f64());

            match ret {
                ApplyCmdResult::Committed((diff, new_state, res, maintenance)) => {
                    cmd.succeeded.inc();
                    self.shard_metrics.cmd_succeeded.inc();
                    // Make the committed state visible to all handles sharing
                    // this cached state.
                    self.update_state(new_state);
                    if PUBSUB_PUSH_DIFF_ENABLED.get(&self.cfg) {
                        // Broadcast the diff so other processes can catch up
                        // without going to Consensus.
                        self.pubsub_sender.push_diff(&self.shard_id, &diff);
                    }
                    return Ok((diff.seqno, Ok(res), maintenance));
                }
                ApplyCmdResult::SkippedStateTransition((seqno, err, maintenance)) => {
                    // work_fn returned Break: nothing was written, but the
                    // command itself still completed successfully.
                    cmd.succeeded.inc();
                    self.shard_metrics.cmd_succeeded.inc();
                    return Ok((seqno, Err(err), maintenance));
                }
                ApplyCmdResult::Indeterminate(err) => {
                    cmd.failed.inc();
                    return Err(err);
                }
                ApplyCmdResult::ExpectationMismatch(seqno) => {
                    // Another handle won the CaS race: refresh our state (the
                    // seqno hint lets us skip the fetch if we already caught
                    // up some other way) and try again.
                    cmd.cas_mismatch.inc();
                    self.fetch_and_update_state(Some(seqno)).await;
                }
            }
        }
    }
407
    // work_fn fails to compile without mut, false positive
    #[allow(clippy::needless_pass_by_ref_mut)]
    /// One attempt at a state transition: compute the next state under the
    /// read lock, then try to compare-and-set it into Consensus.
    ///
    /// Associated fn rather than a method so that the individual borrows of
    /// the Applier's fields are explicit at the call site.
    async fn apply_unbatched_cmd_locked<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        state: &LockingTypedState<K, V, T, D>,
        cmd: &CmdMetrics,
        work_fn: &mut WorkFn,
        cfg: &PersistConfig,
        metrics: &Metrics,
        shard_metrics: &ShardMetrics,
        state_versions: &StateVersions,
    ) -> ApplyCmdResult<K, V, T, D, R, E> {
        // While it's safe for more than one cmd to try and update the state, only one of those
        // concurrent requests could actually succeed. This call will wait until previous requests
        // have completed or timed out, and holding the resulting permit will delay other cmds until
        // this attempt completes or times out.
        let _permit_opt = state.lease_for_update().await;

        // Run work_fn against (a clone of) the current state, producing the
        // candidate next state and its diff. The read lock is held only for
        // this computation, never across an await.
        let computed_next_state = state
            .read_lock(&metrics.locks.applier_read_noncacheable, |state| {
                Self::compute_next_state_locked(state, work_fn, metrics, cmd, cfg)
            });

        let next_state = match computed_next_state {
            Ok(x) => x,
            Err((seqno, err)) => {
                // work_fn returned Break: surface the error without writing.
                return ApplyCmdResult::SkippedStateTransition((
                    seqno,
                    err,
                    RoutineMaintenance::default(),
                ));
            }
        };

        let NextState {
            diff,
            state,
            expiry_metrics,
            garbage_collection,
            write_rollup,
            work_ret,
        } = next_state;

        {
            // Refuse to write a state format this build doesn't understand.
            let build_version = &cfg.build_version;
            let state_version = &state.state.collections.version;
            soft_assert_or_log!(
                cfg::code_can_write_data(build_version, state_version),
                "current version {build_version} does not support state format {state_version}"
            );
        }

        // SUBTLE! Unlike the other consensus and blob uses, we can't
        // automatically retry indeterminate ExternalErrors here. However,
        // if the state change itself is _idempotent_, then we're free to
        // retry even indeterminate errors. See
        // [Self::apply_unbatched_idempotent_cmd].
        let cas_res = state_versions
            .try_compare_and_set_current(&cmd.name, shard_metrics, &state, &diff)
            .await;

        match cas_res {
            Ok((CaSResult::Committed, diff)) => {
                // Account for any reader/writer leases expired by this
                // transition.
                metrics
                    .lease
                    .timeout_read
                    .inc_by(u64::cast_from(expiry_metrics.readers_expired));

                metrics
                    .state
                    .writer_removed
                    .inc_by(u64::cast_from(expiry_metrics.writers_expired));

                if let Some(gc) = garbage_collection.as_ref() {
                    debug!("Assigned gc request: {:?}", gc);
                }

                let maintenance = RoutineMaintenance {
                    garbage_collection,
                    write_rollup,
                };

                ApplyCmdResult::Committed((diff, state, work_ret, maintenance))
            }
            Ok((CaSResult::ExpectationMismatch, _diff)) => {
                ApplyCmdResult::ExpectationMismatch(state.seqno())
            }
            Err(err) => ApplyCmdResult::Indeterminate(err),
        }
    }
501
    /// Applies `work_fn` to a clone of `state` and packages the result — the
    /// candidate next state, the diff to it, and any rollup/gc bookkeeping
    /// this transition should trigger — without writing anything.
    ///
    /// Returns `Err((seqno, e))` when `work_fn` breaks out of the transition.
    fn compute_next_state_locked<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        state: &TypedState<K, V, T, D>,
        work_fn: &mut WorkFn,
        metrics: &Metrics,
        cmd: &CmdMetrics,
        cfg: &PersistConfig,
    ) -> Result<NextState<K, V, T, D, R>, (SeqNo, E)> {
        // The command is identified by its metric name; a few checks below are
        // special-cased on which command is being applied.
        let is_write = cmd.name == metrics.cmds.compare_and_append.name;
        let is_rollup = cmd.name == metrics.cmds.add_rollup.name;
        let is_become_tombstone = cmd.name == metrics.cmds.become_tombstone.name;

        // Read the dynamic gc/rollup tuning knobs once up front.
        let gc_config = GcConfig {
            use_active_gc: GC_USE_ACTIVE_GC.get(cfg),
            fallback_threshold_ms: u64::cast_from(GC_FALLBACK_THRESHOLD_MS.get(cfg)),
            min_versions: GC_MIN_VERSIONS.get(cfg),
            max_versions: GC_MAX_VERSIONS.get(cfg),
        };

        let use_active_rollup = ROLLUP_USE_ACTIVE_ROLLUP.get(cfg);
        let rollup_threshold = ROLLUP_THRESHOLD.get(cfg);
        let rollup_fallback_threshold_ms = u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS.get(cfg));

        let expected = state.seqno;
        let was_tombstone_before = state.collections.is_tombstone();

        let (work_ret, mut new_state) = match state.clone_apply(cfg, work_fn) {
            Continue(x) => x,
            Break(err) => {
                return Err((expected, err));
            }
        };
        // Expire any reader/writer leases whose deadline has passed.
        let expiry_metrics = new_state.expire_at((cfg.now)());
        new_state.state.collections.trace.roundtrip_structure = true;

        // Sanity check that all state transitions have special case for
        // being a tombstone. The ones that do will return a Break and
        // return out of this method above. The one exception is adding
        // a rollup, because we want to be able to add a rollup for the
        // tombstone state.
        //
        // TODO: Even better would be to write the rollup in the
        // tombstone transition so it's a single terminal state
        // transition, but it'll be tricky to get right.
        if was_tombstone_before && !(is_rollup || is_become_tombstone) {
            panic!(
                "cmd {} unexpectedly tried to commit a new state on a tombstone: {:?}",
                cmd.name, state
            );
        }

        let now = (cfg.now)();
        let write_rollup = new_state.need_rollup(
            rollup_threshold,
            use_active_rollup,
            rollup_fallback_threshold_ms,
            now,
        );

        if let Some(write_rollup_seqno) = write_rollup {
            if use_active_rollup {
                // Claim the rollup work in state so other handles don't also
                // try to write one for the same seqno.
                new_state.collections.active_rollup = Some(ActiveRollup {
                    seqno: write_rollup_seqno,
                    start_ms: now,
                });
            }
        }

        // Find out if this command has been selected to perform gc, so
        // that it will fire off a background request to the
        // GarbageCollector to delete eligible blobs and truncate the
        // state history. This is dependant both on `maybe_gc` returning
        // Some _and_ on this state being successfully compare_and_set.
        let garbage_collection = new_state.maybe_gc(is_write, now, gc_config);

        if let Some(gc) = garbage_collection.as_ref() {
            if gc_config.use_active_gc {
                // Likewise, claim the gc work in state.
                new_state.collections.active_gc = Some(ActiveGc {
                    seqno: gc.new_seqno_since,
                    start_ms: now,
                });
            }
        }

        // Make sure `new_state` is not modified after this point!
        // The new state and the diff must be consistent with each other for correctness.
        let diff = StateDiff::from_diff(&state.state, &new_state);
        // Sanity check that our diff logic roundtrips and adds back up
        // correctly.
        #[cfg(any(test, debug_assertions))]
        {
            if let Err(err) = StateDiff::validate_roundtrip(metrics, state, &diff, &new_state) {
                panic!("validate_roundtrips failed: {}", err);
            }
        }

        assert_eq!(
            expected.next(),
            new_state.seqno(),
            "successive states should have successive seqnos"
        );
        Ok(NextState {
            diff,
            state: new_state,
            expiry_metrics,
            garbage_collection,
            write_rollup,
            work_ret,
        })
    }
615
616    pub fn update_state(&self, new_state: TypedState<K, V, T, D>) {
617        let (seqno_before, seqno_after) =
618            self.state
619                .write_lock(&self.metrics.locks.applier_write, |state| {
620                    let seqno_before = state.seqno;
621                    if seqno_before < new_state.seqno {
622                        *state = new_state;
623                    }
624                    let seqno_after = state.seqno;
625                    (seqno_before, seqno_after)
626                });
627
628        assert!(
629            seqno_before <= seqno_after,
630            "state seqno regressed: {} vs {}",
631            seqno_before,
632            seqno_after
633        );
634    }
635
    /// Fetches and updates to the latest state. Uses an optional hint to early-out if
    /// any more recent version of state is observed (e.g. updated by another handle),
    /// without making any calls to Consensus or Blob.
    pub async fn fetch_and_update_state(&self, seqno_hint: Option<SeqNo>) {
        let current_seqno = self.seqno();
        let seqno_before = match seqno_hint {
            None => current_seqno,
            Some(hint) => {
                // state is already more recent than our hint due to
                // advancement by another handle to the same shard.
                if hint < current_seqno {
                    self.metrics.state.update_state_noop_path.inc();
                    return;
                }
                current_seqno
            }
        };

        // Fast path: fetch only the diffs past our current seqno and apply
        // them to the cached state, rather than refetching full state.
        let diffs_to_current = self
            .state_versions
            .fetch_all_live_diffs_gt_seqno::<K, V, T, D>(&self.shard_id, seqno_before)
            .await;

        // no new diffs past our current seqno, nothing to do
        if diffs_to_current.is_empty() {
            self.metrics.state.update_state_empty_path.inc();
            return;
        }

        let new_seqno = self
            .state
            .write_lock(&self.metrics.locks.applier_write, |state| {
                state.apply_encoded_diffs(&self.cfg, &self.metrics, &diffs_to_current);
                state.seqno
            });

        assert!(
            seqno_before <= new_seqno,
            "state seqno regressed: {} vs {}",
            seqno_before,
            new_seqno
        );

        // whether the seqno advanced from diffs and/or because another handle
        // already updated it, we can assume it is now up-to-date
        if seqno_before < new_seqno {
            self.metrics.state.update_state_fast_path.inc();
            return;
        }

        // our state is so old there aren't any diffs we can use to
        // catch up directly. fall back to fully refetching state.
        // we can reuse the recent diffs we already have as a hint.
        let new_state = self
            .state_versions
            .fetch_current_state(&self.shard_id, diffs_to_current)
            .await
            .check_codecs::<K, V, D>(&self.shard_id)
            .expect("shard codecs should not change");

        let new_seqno = self
            .state
            .write_lock(&self.metrics.locks.applier_write, |state| {
                // Only overwrite if the refetched state is actually newer;
                // another handle may have raced us here.
                if state.seqno < new_state.seqno {
                    *state = new_state;
                }
                state.seqno
            });

        self.metrics.state.update_state_slow_path.inc();
        assert!(
            seqno_before <= new_seqno,
            "state seqno regressed: {} vs {}",
            seqno_before,
            new_seqno
        );
    }
713}
714
/// The outcome of a single attempt to apply a command to state
/// (see `Applier::apply_unbatched_cmd_locked`).
enum ApplyCmdResult<K, V, T, D, R, E> {
    /// The new state was written to Consensus: (encoded diff, new state,
    /// work_fn result, maintenance work for the caller).
    Committed((VersionedData, TypedState<K, V, T, D>, R, RoutineMaintenance)),
    /// work_fn declined the transition (returned Break); nothing was written.
    SkippedStateTransition((SeqNo, E, RoutineMaintenance)),
    /// Consensus couldn't say whether the write landed; only safely retryable
    /// if the command is idempotent.
    Indeterminate(Indeterminate),
    /// The CaS expectation failed: someone else wrote first. The caller should
    /// catch up to at least this seqno and retry.
    ExpectationMismatch(SeqNo),
}
721
/// Everything produced by `Applier::compute_next_state_locked`: the candidate
/// next state, the diff to reach it, and bookkeeping for the follow-up work
/// that a successful compare-and-set should trigger.
struct NextState<K, V, T, D, R> {
    // Diff from the previous state to `state`; the two must stay consistent.
    diff: StateDiff<T>,
    // The candidate next state to compare-and-set into Consensus.
    state: TypedState<K, V, T, D>,
    // Counts of reader/writer leases expired by this transition.
    expiry_metrics: ExpiryMetrics,
    // If set, the seqno at which a rollup should be written.
    write_rollup: Option<SeqNo>,
    // If set, a gc request to hand to the GarbageCollector after commit.
    garbage_collection: Option<GcReq>,
    // The value returned by work_fn.
    work_ret: R,
}