Struct mz_persist_client::internal::state_versions::StateVersions
pub struct StateVersions {
    pub(crate) cfg: PersistConfig,
    pub(crate) consensus: Arc<dyn Consensus>,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) metrics: Arc<Metrics>,
}
A durable, truncatable log of versions of State.
As persist metadata changes over time, we make its versions (each identified by a SeqNo) durable in two ways:
- rollups: Periodic copies of the entirety of State, written to Blob.
- diffs: Incremental StateDiffs, written to Consensus.
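The relationship between rollups and diffs can be illustrated with a minimal sketch. The types ToyState and ToyDiff and the function reconstruct are hypothetical stand-ins invented for this example; they are not part of mz_persist_client, and the real State and StateDiff carry far more than a counter.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for State: a sequence number plus a counter.
#[derive(Clone, Debug, PartialEq)]
pub struct ToyState {
    pub seqno: u64,
    pub value: i64,
}

/// Hypothetical stand-in for StateDiff: the change from `seqno - 1` to `seqno`.
#[derive(Clone, Debug)]
pub struct ToyDiff {
    pub seqno: u64,
    pub delta: i64,
}

/// Rebuilds the state at `target` by starting from a rollup and applying
/// every diff with a seqno in (rollup.seqno, target], in order.
/// Returns None if `target` precedes the rollup or a needed diff is missing.
pub fn reconstruct(
    rollup: &ToyState,
    diffs: &BTreeMap<u64, ToyDiff>,
    target: u64,
) -> Option<ToyState> {
    if target < rollup.seqno {
        return None;
    }
    let mut state = rollup.clone();
    for seqno in (rollup.seqno + 1)..=target {
        let diff = diffs.get(&seqno)?;
        state.value += diff.delta;
        state.seqno = seqno;
    }
    Some(state)
}
```

In this toy model a rollup alone is enough to serve its own seqno, while any later version needs an unbroken run of diffs back to that rollup.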
The following invariants are maintained at all times:
- A shard is initialized iff there is at least one version of it in Consensus.
- The first version of state is written to SeqNo(1). Each successive state version is assigned its predecessor’s SeqNo +1.
- current: The latest version of state. By definition, the largest SeqNo present in Consensus.
- As state changes over time, we keep a range of consecutive versions available. These are periodically truncated to prune old versions that are no longer necessary.
- earliest: The first version of state that it is possible to reconstruct.
  - Invariant: earliest <= current.seqno_since() (we don’t garbage collect versions still being used by some reader).
  - Invariant: earliest is always the smallest SeqNo present in Consensus.
    - This doesn’t have to be true, but we choose to enforce it.
    - Because the data stored at that smallest SeqNo is an incremental diff, to make this invariant work, there needs to be a rollup at either earliest-1 or earliest. We choose earliest because it seems to make the code easier to reason about in practice.
    - A consequence of the above is that, when we garbage collect old versions of state, we’re only free to truncate ones that are < the latest rollup that is <= current.seqno_since().
- live diffs: The set of SeqNos present in Consensus at any given time.
- live states: The range of state versions that it is possible to reconstruct: [earliest, current].
  - Because of the earliest and current invariants above, the ranges of live diffs and live states are the same.
- The set of known rollups is tracked in the shard state itself.
  - For efficiency of common operations, the most recent rollup’s Blob key is always denormalized in each StateDiff written to Consensus. (As described above, there is always a rollup at earliest, so we’re guaranteed that there is always at least one live rollup.)
  - Invariant: The rollups in current exist in Blob.
    - A consequence is that, if a rollup in a state you believe is current doesn’t exist, it’s a guarantee that current has changed (or it’s a bug).
  - Any rollup at a version < earliest-1 is useless (we’ve lost the incremental diffs between it and the live states). GC is tasked with deleting these rollups from Blob before truncating diffs from Consensus. Thus, any rollup at a seqno < earliest can be considered “leaked” and deleted by the leaked blob detector.
  - Note that this means, while current’s rollups exist, it will be common for other live states to reference rollups that no longer exist.
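The garbage-collection consequence of these invariants can be sketched as two small functions over plain integers. Both truncation_point and leaked_rollups are hypothetical names made up for this illustration, and bare u64 values stand in for SeqNo; the actual GC logic in persist is more involved.

```rust
/// Returns the seqno that would become the new `earliest`: the latest rollup
/// seqno that is <= `seqno_since`. Diffs strictly below it may be truncated.
/// Returns None if no rollup qualifies, in which case nothing is truncated.
pub fn truncation_point(rollup_seqnos: &[u64], seqno_since: u64) -> Option<u64> {
    rollup_seqnos
        .iter()
        .copied()
        .filter(|seqno| *seqno <= seqno_since)
        .max()
}

/// Returns the rollup seqnos strictly below `earliest`, in ascending order.
/// In this toy model these are the rollups that GC deletes from Blob before
/// truncating diffs, or that are considered leaked if they still exist.
pub fn leaked_rollups(rollup_seqnos: &[u64], earliest: u64) -> Vec<u64> {
    let mut leaked: Vec<u64> = rollup_seqnos
        .iter()
        .copied()
        .filter(|seqno| *seqno < earliest)
        .collect();
    leaked.sort_unstable();
    leaked
}
```

Choosing a rollup seqno as the truncation point is what keeps a rollup at earliest, so the first live diff always has a full copy of state to build on.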
Fields
cfg: PersistConfig
consensus: Arc<dyn Consensus>
blob: Arc<dyn Blob>
metrics: Arc<Metrics>
Implementations
impl StateVersions
pub fn new( cfg: PersistConfig, consensus: Arc<dyn Consensus>, blob: Arc<dyn Blob>, metrics: Arc<Metrics>, ) -> Self
pub async fn maybe_init_shard<K, V, T, D>(
    &self,
    shard_metrics: &ShardMetrics,
) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>>
Fetches the current state of the requested shard, or creates it if uninitialized.
pub async fn try_compare_and_set_current<K, V, T, D>(
    &self,
    cmd_name: &str,
    shard_metrics: &ShardMetrics,
    expected: Option<SeqNo>,
    new_state: &TypedState<K, V, T, D>,
    diff: &StateDiff<T>,
) -> Result<(CaSResult, VersionedData), Indeterminate>
Updates the state of a shard to a new current iff expected matches current.
May be called on uninitialized shards.
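The compare-and-set contract can be sketched against an in-memory log. ToyConsensus and ToyCasResult are hypothetical types written for this example and do not mirror the real Consensus trait or CaSResult; in particular, the sketch ignores Indeterminate errors and does no encoding.

```rust
/// Hypothetical outcome of a compare-and-set attempt.
#[derive(Debug, PartialEq)]
pub enum ToyCasResult {
    Committed,
    ExpectationMismatch,
}

/// Hypothetical in-memory log of (seqno, encoded diff) entries, oldest first.
#[derive(Default)]
pub struct ToyConsensus {
    entries: Vec<(u64, Vec<u8>)>,
}

impl ToyConsensus {
    /// The seqno of `current`, or None if the shard is uninitialized.
    pub fn head(&self) -> Option<u64> {
        self.entries.last().map(|(seqno, _)| *seqno)
    }

    /// Appends `(seqno, data)` only if `expected` matches the current head.
    /// Passing `expected = None` asserts that the shard is uninitialized.
    pub fn compare_and_set(
        &mut self,
        expected: Option<u64>,
        seqno: u64,
        data: Vec<u8>,
    ) -> ToyCasResult {
        if self.head() != expected {
            return ToyCasResult::ExpectationMismatch;
        }
        self.entries.push((seqno, data));
        ToyCasResult::Committed
    }
}
```

A caller that receives a mismatch would typically refetch the latest state, recompute its change on top of it, and retry.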
pub async fn fetch_current_state<T>(
    &self,
    shard_id: &ShardId,
    live_diffs: Vec<VersionedData>,
) -> UntypedState<T>
Fetches the current state of the requested shard.
Uses the provided hint (live_diffs), which is a possibly outdated copy of all or recent live diffs, to avoid fetches where possible.
Panics if called on an uninitialized shard.
pub async fn fetch_all_live_states<T>(
    &self,
    shard_id: ShardId,
) -> Option<UntypedStateVersionsIter<T>>
Returns an iterator over all live states for the requested shard.
Returns None if called on an uninitialized shard.
pub async fn fetch_all_live_diffs(&self, shard_id: &ShardId) -> AllLiveDiffs
Fetches all live_diffs for a shard. Intended only for when a caller needs to reconstruct all states still referenced by Consensus. Prefer Self::fetch_recent_live_diffs when the caller simply needs to fetch the latest state.
Returns an empty Vec iff called on an uninitialized shard.
pub async fn fetch_recent_live_diffs<T>(
    &self,
    shard_id: &ShardId,
) -> RecentLiveDiffs
Fetches recent live_diffs for a shard. Intended for when a caller needs to fetch the latest state in Consensus.
“Recent” is defined as either:
- All of the diffs known in Consensus
- All of the diffs in Consensus after the latest rollup
pub async fn fetch_all_live_diffs_gt_seqno<K, V, T, D>(
    &self,
    shard_id: &ShardId,
    seqno: SeqNo,
) -> Vec<VersionedData>
Fetches all live diffs greater than the given SeqNo.
TODO: Apply a limit to this scan. This could additionally be used as an internal call within fetch_recent_live_diffs.
pub async fn truncate_diffs(&self, shard_id: &ShardId, seqno: SeqNo)
Truncates any diffs in consensus less than the given seqno.
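The "less than" boundary can be illustrated with an ordered map. The function truncate_below is a hypothetical helper over a BTreeMap keyed by plain u64 seqnos; it is only a sketch of the semantics and says nothing about how a real Consensus backend performs the truncation.

```rust
use std::collections::BTreeMap;

/// Removes every entry whose seqno is strictly less than `seqno` and returns
/// how many entries were removed. The entry at `seqno` itself is kept.
pub fn truncate_below(diffs: &mut BTreeMap<u64, Vec<u8>>, seqno: u64) -> usize {
    // split_off returns the entries with keys >= seqno and leaves the
    // entries with keys < seqno behind in `diffs`.
    let kept = diffs.split_off(&seqno);
    let removed = diffs.len();
    *diffs = kept;
    removed
}
```

Because the bound is exclusive, truncating at the seqno of a rollup leaves the diff at that seqno in place.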
async fn write_initial_rollup<K, V, T, D>( &self, shard_metrics: &ShardMetrics, ) -> (TypedState<K, V, T, D>, StateDiff<T>)
pub async fn write_rollup_for_state<K, V, T, D>( &self, shard_metrics: &ShardMetrics, state: TypedState<K, V, T, D>, rollup_id: &RollupId, ) -> Option<EncodedRollup>
pub fn encode_rollup_blob<K, V, T, D>(
    &self,
    shard_metrics: &ShardMetrics,
    state: TypedState<K, V, T, D>,
    diffs: Vec<VersionedData>,
    key: PartialRollupKey,
) -> EncodedRollup
Encodes the given state and diffs as a rollup to be written to the specified key.
The diffs must span the seqno range (state.last_rollup().seqno, state.seqno].
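The half-open range requirement on the diffs can be checked with a short helper. The function spans_range is hypothetical and works on plain u64 seqnos; it also assumes that "span" means exactly one diff per seqno in ascending order, which is an interpretation of the sentence above rather than something the documentation states.

```rust
/// Returns true iff `diff_seqnos` is exactly the consecutive sequence
/// last_rollup_seqno + 1, ..., state_seqno, i.e. it covers the range
/// (last_rollup_seqno, state_seqno] with no gaps, duplicates, or extras.
pub fn spans_range(diff_seqnos: &[u64], last_rollup_seqno: u64, state_seqno: u64) -> bool {
    if state_seqno < last_rollup_seqno {
        return false;
    }
    let expected = (last_rollup_seqno + 1)..=state_seqno;
    diff_seqnos.iter().copied().eq(expected)
}
```

The lower bound is exclusive because the previous rollup already contains the state at its own seqno, so only the diffs after it are needed.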
pub async fn write_rollup_blob(&self, rollup: &EncodedRollup)
Writes the given state rollup out to blob.
async fn fetch_rollup_at_seqno<T>(
    &self,
    shard_id: &ShardId,
    live_diffs: Vec<VersionedData>,
    seqno: SeqNo,
) -> Option<UntypedState<T>>
Fetches a rollup for the given SeqNo, if it exists.
Uses the provided hint, which is a possibly outdated copy of all or recent live diffs, to avoid fetches where possible.
Panics if called on an uninitialized shard.
async fn fetch_rollup_at_key<T>(
    &self,
    shard_id: &ShardId,
    rollup_key: &PartialRollupKey,
) -> Option<UntypedState<T>>
Fetches the rollup at the given key, if it exists.
pub async fn delete_rollup(&self, shard_id: &ShardId, key: &PartialRollupKey)
Deletes the rollup at the given key, if it exists.
Trait Implementations
Auto Trait Implementations
impl Freeze for StateVersions
impl !RefUnwindSafe for StateVersions
impl Send for StateVersions
impl Sync for StateVersions
impl Unpin for StateVersions
impl !UnwindSafe for StateVersions
Blanket Implementations
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wraps the input message T in a tonic::Request.
impl<T> Pointable for T
impl<P, R> ProtoType<R> for P where R: RustType<P>
fn into_rust(self) -> Result<R, TryFromProtoError>
See RustType::from_proto.
fn from_rust(rust: &R) -> P
See RustType::into_proto.
impl<'a, S, T> Semigroup<&'a S> for T where T: Semigroup<S>
fn plus_equals(&mut self, rhs: &&'a S)
The method of std::ops::AddAssign, for types that do not implement AddAssign.