Struct mz_storage::upsert::types::UpsertState
source · pub struct UpsertState<'metrics, S, T, O> {Show 13 fields
inner: S,
pub snapshot_start: Instant,
snapshot_stats: SnapshotStats,
snapshot_completed: bool,
metrics: Arc<UpsertSharedMetrics>,
worker_metrics: &'metrics UpsertMetrics,
stats: SourceStatistics,
bincode_opts: DefaultOptions,
bincode_buffer: Vec<u8>,
consolidate_scratch: Vec<(UpsertKey, Result<Row, UpsertError>, Diff)>,
consolidate_upsert_scratch: IndexMap<UpsertKey, UpsertValueAndSize<T, O>>,
multi_get_scratch: Vec<UpsertKey>,
shrink_upsert_unused_buffers_by_ratio: usize,
}
Expand description
An UpsertStateBackend
wrapper that supports consolidating merging, and
reports basic metrics about the usage of the UpsertStateBackend
.
Fields§
§inner: S
§snapshot_start: Instant
§snapshot_stats: SnapshotStats
§snapshot_completed: bool
§metrics: Arc<UpsertSharedMetrics>
§worker_metrics: &'metrics UpsertMetrics
§stats: SourceStatistics
§bincode_opts: DefaultOptions
§bincode_buffer: Vec<u8>
§consolidate_scratch: Vec<(UpsertKey, Result<Row, UpsertError>, Diff)>
§consolidate_upsert_scratch: IndexMap<UpsertKey, UpsertValueAndSize<T, O>>
§multi_get_scratch: Vec<UpsertKey>
§shrink_upsert_unused_buffers_by_ratio: usize
Implementations§
source§impl<'metrics, S, T, O> UpsertState<'metrics, S, T, O>
impl<'metrics, S, T, O> UpsertState<'metrics, S, T, O>
pub(crate) fn new( inner: S, metrics: Arc<UpsertSharedMetrics>, worker_metrics: &'metrics UpsertMetrics, stats: SourceStatistics, shrink_upsert_unused_buffers_by_ratio: usize, ) -> Self
source§impl<S, T, O> UpsertState<'_, S, T, O>
impl<S, T, O> UpsertState<'_, S, T, O>
sourcepub async fn consolidate_chunk<U>(
&mut self,
updates: U,
completed: bool,
) -> Result<(), Error>
pub async fn consolidate_chunk<U>( &mut self, updates: U, completed: bool, ) -> Result<(), Error>
Consolidate the following differential updates into the state. Updates provided to this method can be assumed to consolidate into a single value per-key, after all chunks of updates for a given timestamp have been processed,
Therefore, after all updates of a given timestamp have been
consolidated
, all values must be in the correct state (as determined
by StateValue::ensure_decoded
).
The completed
boolean communicates whether or not this is the final
chunk of updates for the initial “snapshot” from persist.
If the backend supports it, this method will use multi_merge
to
consolidate the updates to avoid having to read the existing value for
each key first. On some backends (like RocksDB), this can be
significantly faster than the read-then-write consolidation strategy.
Also note that we use self.inner.multi_*
, not self.multi_*
. This is
to avoid erroneously changing metric and stats values.
sourceasync fn consolidate_merge_inner<U>(
&mut self,
updates: U,
) -> Result<SnapshotStats, Error>
async fn consolidate_merge_inner<U>( &mut self, updates: U, ) -> Result<SnapshotStats, Error>
Consolidate the updates into the state. This method requires the backend
has support for the multi_merge
operation, and will panic if
self.inner.supports_merge()
was not checked before calling this
method. multi_merge
will write the updates as ‘merge operands’ to the
backend, and then the backend will consolidate those updates with any
existing state using the consolidating_merge_function
.
This method can have significant performance benefits over the
read-then-write method of consolidate_read_write_inner
.
sourceasync fn consolidate_read_write_inner<U>(
&mut self,
updates: U,
) -> Result<SnapshotStats, Error>
async fn consolidate_read_write_inner<U>( &mut self, updates: U, ) -> Result<SnapshotStats, Error>
Consolidates the updates into the state. This method reads the existing values for each key, consolidates the updates, and writes the new values back to the state.
sourcepub async fn multi_put<P>(
&mut self,
update_per_record_stats: bool,
puts: P,
) -> Result<(), Error>
pub async fn multi_put<P>( &mut self, update_per_record_stats: bool, puts: P, ) -> Result<(), Error>
Insert or delete for all puts
keys, prioritizing the last value for
repeated keys.
sourcepub async fn multi_get<'r, G, R>(
&mut self,
gets: G,
results_out: R,
) -> Result<(), Error>where
G: IntoIterator<Item = UpsertKey>,
R: IntoIterator<Item = &'r mut UpsertValueAndSize<T, O>>,
O: 'r,
pub async fn multi_get<'r, G, R>(
&mut self,
gets: G,
results_out: R,
) -> Result<(), Error>where
G: IntoIterator<Item = UpsertKey>,
R: IntoIterator<Item = &'r mut UpsertValueAndSize<T, O>>,
O: 'r,
Get the gets
keys, which must be unique, placing the results in results_out
.
Panics if gets
and results_out
are not the same length.
Auto Trait Implementations§
impl<'metrics, S, T, O> Freeze for UpsertState<'metrics, S, T, O>where
S: Freeze,
impl<'metrics, S, T, O> !RefUnwindSafe for UpsertState<'metrics, S, T, O>
impl<'metrics, S, T, O> !Send for UpsertState<'metrics, S, T, O>
impl<'metrics, S, T, O> !Sync for UpsertState<'metrics, S, T, O>
impl<'metrics, S, T, O> Unpin for UpsertState<'metrics, S, T, O>
impl<'metrics, S, T, O> !UnwindSafe for UpsertState<'metrics, S, T, O>
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
source§impl<T> FmtForward for T
impl<T> FmtForward for T
source§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self
to use its Binary
implementation when Debug
-formatted.source§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self
to use its Display
implementation when
Debug
-formatted.source§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self
to use its LowerExp
implementation when
Debug
-formatted.source§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self
to use its LowerHex
implementation when
Debug
-formatted.source§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self
to use its Octal
implementation when Debug
-formatted.source§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self
to use its Pointer
implementation when
Debug
-formatted.source§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self
to use its UpperExp
implementation when
Debug
-formatted.source§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self
to use its UpperHex
implementation when
Debug
-formatted.source§impl<T> FutureExt for T
impl<T> FutureExt for T
source§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
source§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request
source§impl<T, U> OverrideFrom<Option<&T>> for Uwhere
U: OverrideFrom<T>,
impl<T, U> OverrideFrom<Option<&T>> for Uwhere
U: OverrideFrom<T>,
source§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
source§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
source§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read moresource§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read moresource§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
source§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
source§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self
, then passes self.as_ref()
into the pipe function.source§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self
, then passes self.as_mut()
into the pipe
function.source§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self
, then passes self.deref()
into the pipe function.source§impl<T> Pointable for T
impl<T> Pointable for T
source§impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
source§fn into_rust(self) -> Result<R, TryFromProtoError>
fn into_rust(self) -> Result<R, TryFromProtoError>
RustType::from_proto
.source§fn from_rust(rust: &R) -> P
fn from_rust(rust: &R) -> P
RustType::into_proto
.source§impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
source§fn plus_equals(&mut self, rhs: &&'a S)
fn plus_equals(&mut self, rhs: &&'a S)
std::ops::AddAssign
, for types that do not implement AddAssign
.source§impl<T> Tap for T
impl<T> Tap for T
source§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B>
of a value. Read moresource§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B>
of a value. Read moresource§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R>
view of a value. Read moresource§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R>
view of a value. Read moresource§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target
of a value. Read moresource§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target
of a value. Read moresource§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap()
only in debug builds, and is erased in release builds.source§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut()
only in debug builds, and is erased in release
builds.source§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow()
only in debug builds, and is erased in release
builds.source§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut()
only in debug builds, and is erased in release
builds.source§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref()
only in debug builds, and is erased in release
builds.source§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut()
only in debug builds, and is erased in release
builds.source§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref()
only in debug builds, and is erased in release
builds.