Struct mz_storage::upsert::types::UpsertState
source · pub struct UpsertState<'metrics, S, O> {Show 13 fields
inner: S,
snapshot_start: Instant,
snapshot_stats: SnapshotStats,
snapshot_completed: bool,
metrics: Arc<UpsertSharedMetrics>,
worker_metrics: &'metrics UpsertMetrics,
stats: SourceStatistics,
bincode_opts: DefaultOptions,
bincode_buffer: Vec<u8>,
consolidate_scratch: Vec<(UpsertKey, Result<Row, UpsertError>, Diff)>,
consolidate_upsert_scratch: IndexMap<UpsertKey, UpsertValueAndSize<O>>,
multi_get_scratch: Vec<UpsertKey>,
shrink_upsert_unused_buffers_by_ratio: usize,
}
Expand description
An UpsertStateBackend
wrapper that supports
snapshot merging, and reports basic metrics about the usage of the UpsertStateBackend
.
Fields§
§inner: S
§snapshot_start: Instant
§snapshot_stats: SnapshotStats
§snapshot_completed: bool
§metrics: Arc<UpsertSharedMetrics>
§worker_metrics: &'metrics UpsertMetrics
§stats: SourceStatistics
§bincode_opts: DefaultOptions
§bincode_buffer: Vec<u8>
§consolidate_scratch: Vec<(UpsertKey, Result<Row, UpsertError>, Diff)>
§consolidate_upsert_scratch: IndexMap<UpsertKey, UpsertValueAndSize<O>>
§multi_get_scratch: Vec<UpsertKey>
§shrink_upsert_unused_buffers_by_ratio: usize
Implementations§
source§impl<'metrics, S, O> UpsertState<'metrics, S, O>
impl<'metrics, S, O> UpsertState<'metrics, S, O>
pub(crate) fn new( inner: S, metrics: Arc<UpsertSharedMetrics>, worker_metrics: &'metrics UpsertMetrics, stats: SourceStatistics, shrink_upsert_unused_buffers_by_ratio: usize, ) -> Self
source§impl<S, O> UpsertState<'_, S, O>where
S: UpsertStateBackend<O>,
O: Default + Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
impl<S, O> UpsertState<'_, S, O>where
S: UpsertStateBackend<O>,
O: Default + Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
sourcepub async fn consolidate_snapshot_chunk<U>(
&mut self,
updates: U,
completed: bool,
) -> Result<(), Error>
pub async fn consolidate_snapshot_chunk<U>( &mut self, updates: U, completed: bool, ) -> Result<(), Error>
Consolidate the following differential updates into the state, during snapshotting. Updates provided to this method can be assumed to consolidate into a single value per-key, after all chunks have been processed.
Therefore, after an entire snapshot has been consolidated
, all values must be in the correct state
(as determined by StateValue::ensure_decoded
), and consolidate_snapshot_chunk
must NOT
be called again.
The completed
boolean communicates whether or not this is the final chunk of updates
to be consolidated, to assert correct usage.
If the backend supports it, this method will use multi_merge
to consolidate the updates
to avoid having to read the existing value for each key first.
On some backends (like RocksDB), this can be significantly faster than the read-then-write
consolidation strategy.
Also note that we use self.inner.multi_*
, not self.multi_*
. This is to avoid
erroneously changing metric and stats values.
sourceasync fn consolidate_snapshot_merge_inner<U>(
&mut self,
updates: U,
) -> Result<SnapshotStats, Error>
async fn consolidate_snapshot_merge_inner<U>( &mut self, updates: U, ) -> Result<SnapshotStats, Error>
Consolidate the updates into the state during snapshotting. This method requires the
backend has support for the multi_merge
operation, and will panic if
self.inner.supports_merge()
was not checked before calling this method.
multi_merge
will write the updates as ‘merge operands’ to the backend, and then the
backend will consolidate those updates with any existing state using the
snapshot_merge_function
.
This method can have significant performance benefits over the
read-then-write method of consolidate_snapshot_read_write_inner
.
sourceasync fn consolidate_snapshot_read_write_inner<U>(
&mut self,
updates: U,
) -> Result<SnapshotStats, Error>
async fn consolidate_snapshot_read_write_inner<U>( &mut self, updates: U, ) -> Result<SnapshotStats, Error>
Consolidates the updates into the state during snapshotting. This method reads the existing values for each key, consolidates the updates, and writes the new values back to the state.
sourcepub async fn multi_put<P>(&mut self, puts: P) -> Result<(), Error>
pub async fn multi_put<P>(&mut self, puts: P) -> Result<(), Error>
Insert or delete for all puts
keys, prioritizing the last value for
repeated keys.
sourcepub async fn multi_get<'r, G, R>(
&mut self,
gets: G,
results_out: R,
) -> Result<(), Error>where
G: IntoIterator<Item = UpsertKey>,
R: IntoIterator<Item = &'r mut UpsertValueAndSize<O>>,
O: 'r,
pub async fn multi_get<'r, G, R>(
&mut self,
gets: G,
results_out: R,
) -> Result<(), Error>where
G: IntoIterator<Item = UpsertKey>,
R: IntoIterator<Item = &'r mut UpsertValueAndSize<O>>,
O: 'r,
Get the gets
keys, which must be unique, placing the results in results_out
.
Panics if gets
and results_out
are not the same length.
Auto Trait Implementations§
impl<'metrics, S, O> Freeze for UpsertState<'metrics, S, O>where
S: Freeze,
impl<'metrics, S, O> !RefUnwindSafe for UpsertState<'metrics, S, O>
impl<'metrics, S, O> !Send for UpsertState<'metrics, S, O>
impl<'metrics, S, O> !Sync for UpsertState<'metrics, S, O>
impl<'metrics, S, O> Unpin for UpsertState<'metrics, S, O>
impl<'metrics, S, O> !UnwindSafe for UpsertState<'metrics, S, O>
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
source§impl<T> FmtForward for T
impl<T> FmtForward for T
source§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self
to use its Binary
implementation when Debug
-formatted.source§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self
to use its Display
implementation when
Debug
-formatted.source§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self
to use its LowerExp
implementation when
Debug
-formatted.source§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self
to use its LowerHex
implementation when
Debug
-formatted.source§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self
to use its Octal
implementation when Debug
-formatted.source§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self
to use its Pointer
implementation when
Debug
-formatted.source§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self
to use its UpperExp
implementation when
Debug
-formatted.source§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self
to use its UpperHex
implementation when
Debug
-formatted.source§impl<T> FutureExt for T
impl<T> FutureExt for T
source§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
source§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request
source§impl<T, U> OverrideFrom<Option<&T>> for Uwhere
U: OverrideFrom<T>,
impl<T, U> OverrideFrom<Option<&T>> for Uwhere
U: OverrideFrom<T>,
source§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
source§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
source§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read moresource§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read moresource§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
source§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
source§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self
, then passes self.as_ref()
into the pipe function.source§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self
, then passes self.as_mut()
into the pipe
function.source§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self
, then passes self.deref()
into the pipe function.source§impl<T> Pointable for T
impl<T> Pointable for T
source§impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
source§fn into_rust(self) -> Result<R, TryFromProtoError>
fn into_rust(self) -> Result<R, TryFromProtoError>
RustType::from_proto
.source§fn from_rust(rust: &R) -> P
fn from_rust(rust: &R) -> P
RustType::into_proto
.source§impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
source§fn plus_equals(&mut self, rhs: &&'a S)
fn plus_equals(&mut self, rhs: &&'a S)
std::ops::AddAssign
, for types that do not implement AddAssign
.source§impl<T> Tap for T
impl<T> Tap for T
source§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B>
of a value. Read moresource§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B>
of a value. Read moresource§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R>
view of a value. Read moresource§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R>
view of a value. Read moresource§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target
of a value. Read moresource§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target
of a value. Read moresource§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap()
only in debug builds, and is erased in release builds.source§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut()
only in debug builds, and is erased in release
builds.source§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow()
only in debug builds, and is erased in release
builds.source§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut()
only in debug builds, and is erased in release
builds.source§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref()
only in debug builds, and is erased in release
builds.source§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut()
only in debug builds, and is erased in release
builds.source§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref()
only in debug builds, and is erased in release
builds.