pub struct StateVersions {
    pub(crate) cfg: PersistConfig,
    pub(crate) consensus: Arc<dyn Consensus + Send + Sync>,
    pub(crate) blob: Arc<dyn Blob + Send + Sync>,
    metrics: Arc<Metrics>,
}
Expand description

A durable, truncatable log of versions of State.

As persist metadata changes over time, we make its versions (each identified by a SeqNo) durable in two ways:

The following invariants are maintained at all times:

  • A shard is initialized iff there is at least one version of it in Consensus.
  • The first version of state is written to SeqNo(1). Each successive state version is assigned its predecessor’s SeqNo +1.
  • current: The latest version of state. By definition, the largest SeqNo present in Consensus.
  • As state changes over time, we keep a range of consecutive versions available. These are periodically truncated to prune old versions that are no longer necessary.
  • earliest: The first version of state that it is possible to reconstruct.
    • Invariant: earliest <= current.seqno_since() (we don’t garbage collect versions still being used by some reader).
    • Invariant: earliest is always the smallest Seqno present in Consensus.
      • This doesn’t have to be true, but we select to enforce it.
      • Because the data stored at that smallest Seqno is an incremental diff, to make this invariant work, there needs to be a rollup at either earliest-1 or earliest. We choose earliest because it seems to make the code easier to reason about in practice.
      • A consequence of the above is when we garbage collect old versions of state, we’re only free to truncate ones that are < the latest rollup that is <= current.seqno_since.
  • live diffs: The set of SeqNos present in Consensus at any given time.
  • live states: The range of state versions that it is possible to reconstruct: [earliest,current].
    • Because of earliest and current invariants above, the range of live diffs and live states are the same.
  • The set of known rollups are tracked in the shard state itself.
    • For efficiency of common operations, the most recent rollup’s Blob key is always denormalized in each StateDiff written to Consensus. (As described above, there is always a rollup at earliest, so we’re guaranteed that there is always at least one live rollup.)
    • Invariant: The rollups in current exist in Blob.
      • A consequence is that, if a rollup in a state you believe is current doesn’t exist, it’s a guarantee that current has changed (or it’s a bug).
    • Any rollup at a version < earliest-1 is useless (we’ve lost the incremental diffs between it and the live states). GC is tasked with deleting these rollups from Blob before truncating diffs from Consensus. Thus, any rollup at a seqno < earliest can be considered “leaked” and deleted by the leaked blob detector.
    • Note that this means, while current’s rollups exist, it will be common for other live states to reference rollups that no longer exist.

Fields§

§cfg: PersistConfig§consensus: Arc<dyn Consensus + Send + Sync>§blob: Arc<dyn Blob + Send + Sync>§metrics: Arc<Metrics>

Implementations§

source§

impl StateVersions

source

pub fn new( cfg: PersistConfig, consensus: Arc<dyn Consensus + Send + Sync>, blob: Arc<dyn Blob + Send + Sync>, metrics: Arc<Metrics> ) -> Self

source

pub async fn maybe_init_shard<K, V, T, D>( &self, shard_metrics: &ShardMetrics ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>>where K: Debug + Codec, V: Debug + Codec, T: Timestamp + Lattice + Codec64, D: Semigroup + Codec64,

Fetches the current state of the requested shard, or creates it if uninitialized.

source

pub async fn try_compare_and_set_current<K, V, T, D>( &self, cmd_name: &str, shard_metrics: &ShardMetrics, expected: Option<SeqNo>, new_state: &TypedState<K, V, T, D>, diff: &StateDiff<T> ) -> Result<(CaSResult, VersionedData), Indeterminate>where K: Debug + Codec, V: Debug + Codec, T: Timestamp + Lattice + Codec64, D: Semigroup + Codec64,

Updates the state of a shard to a new current iff expected matches current.

May be called on uninitialized shards.

source

pub async fn fetch_current_state<T>( &self, shard_id: &ShardId, live_diffs: Vec<VersionedData> ) -> UntypedState<T>where T: Timestamp + Lattice + Codec64,

Fetches the current state of the requested shard.

Uses the provided hint (live_diffs), which is a possibly outdated copy of all or recent live diffs, to avoid fetches where possible.

Panics if called on an uninitialized shard.

source

pub async fn fetch_all_live_states<T>( &self, shard_id: ShardId ) -> Option<UntypedStateVersionsIter<T>>where T: Timestamp + Lattice + Codec64,

Returns an iterator over all live states for the requested shard.

Returns None if called on an uninitialized shard.

source

pub async fn fetch_all_live_diffs(&self, shard_id: &ShardId) -> AllLiveDiffs

Fetches all live_diffs for a shard. Intended only for when a caller needs to reconstruct all states still referenced by Consensus. Prefer Self::fetch_recent_live_diffs when the caller simply needs to fetch the latest state.

Returns an empty Vec iff called on an uninitialized shard.

source

pub async fn fetch_recent_live_diffs<T>( &self, shard_id: &ShardId ) -> RecentLiveDiffswhere T: Timestamp + Lattice + Codec64,

Fetches recent live_diffs for a shard. Intended for when a caller needs to fetch the latest state in Consensus.

“Recent” is defined as either:

  • All of the diffs known in Consensus
  • All of the diffs in Consensus after the latest rollup
source

pub async fn fetch_all_live_diffs_gt_seqno<K, V, T, D>( &self, shard_id: &ShardId, seqno: SeqNo ) -> Vec<VersionedData>

Fetches all live diffs greater than the given SeqNo.

TODO: Apply a limit to this scan. This could additionally be used as an internal call within fetch_recent_live_diffs.

source

pub async fn truncate_diffs(&self, shard_id: &ShardId, seqno: SeqNo)

Truncates any diffs in consensus less than the given seqno.

source

async fn write_initial_rollup<K, V, T, D>( &self, shard_metrics: &ShardMetrics ) -> (TypedState<K, V, T, D>, StateDiff<T>)where K: Debug + Codec, V: Debug + Codec, T: Timestamp + Lattice + Codec64, D: Semigroup + Codec64,

source

pub async fn write_rollup_for_state<K, V, T, D>( &self, shard_metrics: &ShardMetrics, state: TypedState<K, V, T, D>, rollup_id: &RollupId ) -> Option<EncodedRollup>where K: Debug + Codec, V: Debug + Codec, T: Timestamp + Lattice + Codec64, D: Semigroup + Codec64,

source

pub fn encode_rollup_blob<K, V, T, D>( &self, shard_metrics: &ShardMetrics, state: TypedState<K, V, T, D>, diffs: Vec<VersionedData>, key: PartialRollupKey ) -> EncodedRollupwhere K: Debug + Codec, V: Debug + Codec, T: Timestamp + Lattice + Codec64, D: Semigroup + Codec64,

Encodes the given state and diffs as a rollup to be written to the specified key.

The diffs must span the seqno range (state.last_rollup().seqno, state.seqno].

source

pub async fn write_rollup_blob(&self, rollup: &EncodedRollup)

Writes the given state rollup out to blob.

source

async fn fetch_rollup_at_seqno<T>( &self, shard_id: &ShardId, live_diffs: Vec<VersionedData>, seqno: SeqNo ) -> Option<UntypedState<T>>where T: Timestamp + Lattice + Codec64,

Fetches a rollup for the given SeqNo, if it exists.

Uses the provided hint, which is a possibly outdated copy of all or recent live diffs, to avoid fetches where possible.

Panics if called on an uninitialized shard.

source

async fn fetch_rollup_at_key<T>( &self, shard_id: &ShardId, rollup_key: &PartialRollupKey ) -> Option<UntypedState<T>>where T: Timestamp + Lattice + Codec64,

Fetches the rollup at the given key, if it exists.

source

pub async fn delete_rollup(&self, shard_id: &ShardId, key: &PartialRollupKey)

Deletes the rollup at the given key, if it exists.

Trait Implementations§

source§

impl Debug for StateVersions

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for Twhere T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for Twhere T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for Twhere T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T, U> CastInto<U> for Twhere U: CastFrom<T>,

source§

fn cast_into(self) -> U

Performs the cast.
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> FutureExt for T

source§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
source§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for Twhere U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoRequest<T> for T

source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
source§

impl<Unshared, Shared> IntoShared<Shared> for Unsharedwhere Shared: FromUnshared<Unshared>,

source§

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.
source§

impl<T> Pointable for T

source§

const ALIGN: usize = _

The alignment of pointer.
§

type Init = T

The type for initializers.
source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
source§

impl<P, R> ProtoType<R> for Pwhere R: RustType<P>,

source§

impl<T> Same for T

§

type Output = T

Should always be Self
source§

impl<T, U> TryFrom<U> for Twhere U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for Twhere U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<V, T> VZip<V> for Twhere V: MultiLane<T>,

source§

fn vzip(self) -> V

source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more