Enum mz_storage::upsert::types::StateValue
pub enum StateValue<T, O> {
Consolidating(Consolidating),
Value(Value<T, O>),
}
UpsertState has 2 modes:
- Normal operation
- Consolidation
This enum and the structs it wraps are helpers that simplify the logic individual UpsertState implementations need in order to manage these 2 modes.
Normal operation is simple: we just store an ordinary UpsertValue and allow the implementer to store it any way they want. During consolidation, the logic is more complex. See the docs on StateValue::merge_update for more information.
Note also that this type is designed to support partial updates. All values are associated with an order key O that can be used to determine whether a value existing in the UpsertStateBackend occurred before or after a value being considered for insertion. O is typically required to be Default, with the default value sorting below all others.
During consolidation, values consolidate correctly (as they are actual differential updates with diffs), so order keys are not required.
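As an illustration of how an order key can decide whether an incoming value supersedes a stored one, here is a minimal sketch. It assumes a totally ordered O whose Default sorts below every real key (for example u64 with real keys starting at 1); Stored and apply_if_newer are hypothetical names, not part of mz_storage.

```rust
/// Hypothetical stored entry: a value tagged with the order key it occurred at.
#[derive(Debug, Clone, PartialEq)]
struct Stored<V, O> {
    value: V,
    order: O,
}

/// Replaces the contents of `slot` with `incoming` only if the incoming order
/// key sorts strictly after the stored one. An empty slot is treated as if it
/// held `O::default()`, which this sketch assumes sorts below every real key.
fn apply_if_newer<V, O: Ord + Default + Clone>(
    slot: &mut Option<Stored<V, O>>,
    incoming: Stored<V, O>,
) -> bool {
    let current = slot.as_ref().map(|s| s.order.clone()).unwrap_or_default();
    if incoming.order > current {
        *slot = Some(incoming);
        true
    } else {
        // Stale or duplicate update: keep what is already stored.
        false
    }
}
```

The strict comparison means a replayed update with the same order key is treated as stale; whether ties should win is a policy choice this sketch does not settle.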
Variants
Consolidating(Consolidating)
Value(Value<T, O>)
Implementations
impl<T, O> StateValue<T, O>
pub fn finalized_value(value: Result<Row, UpsertError>, order: O) -> Self
A finalized (that is, assumed persistent) value occurring at some order key.
pub fn is_tombstone(&self) -> bool
Whether the value is a tombstone.
pub fn order(&self) -> &O
Pull out the order for the given Value, assuming ensure_decoded has been called.
pub fn into_decoded(self) -> Value<T, O>
Pull out the Value for a StateValue, after ensure_decoded has been called.
pub fn memory_size(&self) -> u64
The size of a StateValue, in memory. This is:
- only used in the InMemoryHashMap implementation.
- an estimate (it only looks at value sizes, and not errors).
Other implementations may use more accurate accounting.
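A minimal sketch of an estimate in this spirit, assuming each held value is either row bytes or an error: only successful values contribute their byte length, and errors count as zero. RowBytes and estimate_memory_size are hypothetical stand-ins for Row and the real accounting.

```rust
/// Hypothetical stand-in for an encoded row.
type RowBytes = Vec<u8>;

/// Rough size estimate over the values held for one key (for example a
/// finalized and a provisional value). Errors are deliberately not measured,
/// so the result undercounts whenever errors are present.
fn estimate_memory_size(values: &[Result<RowBytes, String>]) -> u64 {
    values
        .iter()
        .map(|v| match v {
            Ok(row) => row.len() as u64,
            Err(_) => 0,
        })
        .sum()
}
```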
impl<T: Eq, O> StateValue<T, O>
pub fn new_provisional_value(
provisional_value: Result<Row, UpsertError>,
provisional_ts: T,
order: O,
) -> Self
Creates a new provisional value, occurring at some order key, observed at the given timestamp.
pub fn into_provisional_value(
self,
provisional_value: Result<Row, UpsertError>,
provisional_ts: T,
provisional_order: O,
) -> Self
Creates a provisional value that retains the finalized value, along with its order, from this StateValue, if any.
We record the finalized value so that we can present it when needed, or when trying to read a provisional value at a different timestamp.
pub fn new_provisional_tombstone(provisional_ts: T, order: O) -> Self
Creates a new provisional tombstone occurring at some order key, observed at the given timestamp.
pub fn into_provisional_tombstone(
self,
provisional_ts: T,
provisional_order: O,
) -> Self
Creates a provisional tombstone that retains the finalized value, along with its order, from this StateValue, if any.
We record the current finalized value so that we can present it when needed, or when trying to read a provisional value at a different timestamp.
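To make the retention behaviour of the provisional constructors concrete, here is a simplified model. It assumes a single provisional slot per key and uses None to stand for a provisional tombstone; SimpleValue and its methods are hypothetical and do not mirror the real Value type's fields.

```rust
/// Hypothetical, simplified model: a finalized value with its order key, plus
/// at most one provisional entry observed at a timestamp. A provisional value
/// of `None` models a provisional tombstone.
#[derive(Debug, Clone, PartialEq)]
struct SimpleValue<V, T, O> {
    finalized: Option<(V, O)>,
    provisional: Option<(Option<V>, T, O)>,
}

impl<V, T, O> SimpleValue<V, T, O> {
    /// A finalized value occurring at `order`, with nothing provisional yet.
    fn finalized(value: V, order: O) -> Self {
        SimpleValue { finalized: Some((value, order)), provisional: None }
    }

    /// Records a provisional value at `ts`, carrying over any finalized value
    /// (and its order) so readers at other timestamps can still see it.
    fn into_provisional_value(self, value: V, ts: T, order: O) -> Self {
        SimpleValue { finalized: self.finalized, provisional: Some((Some(value), ts, order)) }
    }

    /// Same as `into_provisional_value`, but the provisional entry is a tombstone.
    fn into_provisional_tombstone(self, ts: T, order: O) -> Self {
        SimpleValue { finalized: self.finalized, provisional: Some((None, ts, order)) }
    }
}
```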
pub fn provisional_order(&self, ts: &T) -> Option<&O>
Returns the order of a provisional value at the given timestamp. If that doesn’t exist, returns the order of the finalized value. Returns None if neither exists.
pub fn provisional_value_ref(&self, ts: &T) -> Option<&Result<Row, UpsertError>>
Returns the provisional value, if one is present at the given timestamp. Falls back to the finalized value, or None if there is neither.
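The fallback order shared by provisional_order and provisional_value_ref can be sketched as follows. This assumes a single provisional slot and ignores tombstones; Entry is a hypothetical type, and the real methods work with Result<Row, UpsertError> values rather than a generic V.

```rust
/// Hypothetical simplified entry: an optional finalized (value, order) pair and
/// an optional provisional (value, timestamp, order) triple.
struct Entry<V, T, O> {
    finalized: Option<(V, O)>,
    provisional: Option<(V, T, O)>,
}

impl<V, T: Eq, O> Entry<V, T, O> {
    /// Order of the provisional value at `ts`, else the finalized order, else None.
    fn provisional_order(&self, ts: &T) -> Option<&O> {
        match &self.provisional {
            Some((_, pts, order)) if pts == ts => Some(order),
            _ => self.finalized.as_ref().map(|(_, order)| order),
        }
    }

    /// Provisional value at `ts`, else the finalized value, else None.
    fn provisional_value_ref(&self, ts: &T) -> Option<&V> {
        match &self.provisional {
            Some((value, pts, _)) if pts == ts => Some(value),
            _ => self.finalized.as_ref().map(|(value, _)| value),
        }
    }
}
```

A provisional value recorded at one timestamp is invisible to a reader at another timestamp, which is why the finalized value has to be kept alongside it.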
pub fn into_finalized_value(self) -> Option<(Result<Row, UpsertError>, O)>
Returns the finalized value, if one is present.
impl<T: Eq, O: Default> StateValue<T, O>
pub fn merge_update(
&mut self,
value: Result<Row, UpsertError>,
diff: Diff,
bincode_opts: DefaultOptions,
bincode_buffer: &mut Vec<u8>,
) -> bool
We use an XOR trick to accumulate the values without having to store the full unconsolidated history in memory. For all (value, diff) updates of a key we track:
- diff_sum = SUM(diff)
- checksum_sum = SUM(checksum(bincode(value)) * diff)
- len_sum = SUM(len(bincode(value)) * diff)
- value_xor = XOR(bincode(value))
Return value
Returns a bool indicating whether or not the current merged value is able to be deleted.
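The accumulators listed above can be sketched in Rust as follows. This is a minimal model, not the mz_storage implementation: values are plain byte strings standing in for bincode output, shorter values are assumed to be zero-padded for the XOR, FNV-1a is a stand-in checksum, and treating "all accumulators are zero" as the deletable condition is an assumption.

```rust
use std::num::Wrapping;

/// Hypothetical accumulator holding the four running sums for one key.
#[derive(Debug, Default, Clone, PartialEq)]
struct Accum {
    value_xor: Vec<u8>,
    len_sum: Wrapping<i64>,
    checksum_sum: Wrapping<i64>,
    diff_sum: Wrapping<i64>,
}

/// Stand-in checksum: 64-bit FNV-1a reinterpreted as i64 (an assumption).
fn checksum(bytes: &[u8]) -> i64 {
    let mut h: u64 = 0xcbf2_9ce4_8422_2325;
    for &b in bytes {
        h ^= u64::from(b);
        h = h.wrapping_mul(0x0100_0000_01b3);
    }
    h as i64
}

impl Accum {
    /// Folds one (value, diff) update into the accumulators and returns true
    /// when every accumulator is back to zero.
    fn merge_update(&mut self, value: &[u8], diff: i64) -> bool {
        let d = Wrapping(diff);
        self.diff_sum += d;
        self.len_sum += Wrapping(value.len() as i64) * d;
        self.checksum_sum += Wrapping(checksum(value)) * d;
        // The XOR is not weighted by diff: a (+1, -1) pair XORs the same
        // bytes twice and cancels. The buffer grows with zero padding.
        if self.value_xor.len() < value.len() {
            self.value_xor.resize(value.len(), 0);
        }
        for (acc, &b) in self.value_xor.iter_mut().zip(value.iter()) {
            *acc ^= b;
        }
        self.diff_sum.0 == 0
            && self.len_sum.0 == 0
            && self.checksum_sum.0 == 0
            && self.value_xor.iter().all(|&b| b == 0)
    }
}
```

Feeding ("old", +1), ("old", -1), ("current", +1) leaves diff_sum at 1 and value_xor equal to the bytes of "current", in whatever order the updates arrive.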
Correctness
The method is correct because a well-formed upsert collection at a given timestamp will have, for each key:
- Zero or one updates of the form (cur_value, +1)
- Zero or more pairs of updates of the form (prev_value, +1), (prev_value, -1)
We are interested in extracting the cur_value of each key and discarding all prev_values that might be included in the stream. Since the history of prev_values always comes in pairs, computing their XOR always cancels out their effects. Also, since XOR is commutative and associative, this property holds independent of the order. The same is true for the summations of the length and checksum, since each prev_value contributes once with +1 and once with -1, giving it a net coefficient of zero in each sum.
Therefore the accumulators will end up in precisely one of two states:
- diff == 0, checksum == 0, value is all zero bytes => the key is not present
- diff == 1, checksum == checksum(cur_value), value == cur_value => the key is present
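Written out for a key whose updates are (v, +1) with v = cur_value, plus k cancelling pairs (p_i, +1), (p_i, -1), the four accumulators reduce as follows. Here b(x) stands for bincode(x), \ell(x) for its length and c(x) for its checksum, and shorter byte strings are assumed to be zero-padded before the XOR. Dropping the v terms gives the all-zero "not present" state.

```latex
\begin{aligned}
\text{diff\_sum}     &= 1 + \sum_{i=1}^{k} (1 - 1) = 1 \\
\text{len\_sum}      &= \ell(v) + \sum_{i=1}^{k} \bigl(\ell(p_i) - \ell(p_i)\bigr) = \ell(v) \\
\text{checksum\_sum} &= c(v) + \sum_{i=1}^{k} \bigl(c(p_i) - c(p_i)\bigr) = c(v) \\
\text{value\_xor}    &= b(v) \oplus \bigoplus_{i=1}^{k} \bigl(b(p_i) \oplus b(p_i)\bigr) = b(v)
\end{aligned}
```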
Robustness
In the absence of bugs, accumulating the diff and checksum is not required, since we know that a well-formed collection always satisfies XOR(bincode(values)) == bincode(cur_value). However, bugs may happen, so storing 16 more bytes per key to get a very high guarantee that we’re not decoding garbage is more than worth it. The main key->value used to store previous values.
pub fn merge_update_state(&mut self, other: &Self)
Merge an existing StateValue into this one, using the same method described in merge_update. See the docstring above for more information on correctness and robustness.
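Because every accumulator is combined with a commutative and associative operation, two partial states can be merged field by field. A minimal sketch, assuming the four sums listed for merge_update and zero padding for the XOR; Accum and merge_update_state here are hypothetical stand-ins with simplified signatures.

```rust
use std::num::Wrapping;

/// Hypothetical partial consolidation state for one key.
#[derive(Debug, Default, Clone, PartialEq)]
struct Accum {
    value_xor: Vec<u8>,
    len_sum: Wrapping<i64>,
    checksum_sum: Wrapping<i64>,
    diff_sum: Wrapping<i64>,
}

impl Accum {
    /// Merges `other` into `self`. Sums add with wrapping arithmetic and the
    /// XOR buffers combine byte-wise, so merging partial states gives the same
    /// result as accumulating all of their updates into one state.
    fn merge_update_state(&mut self, other: &Accum) {
        self.diff_sum += other.diff_sum;
        self.len_sum += other.len_sum;
        self.checksum_sum += other.checksum_sum;
        if self.value_xor.len() < other.value_xor.len() {
            self.value_xor.resize(other.value_xor.len(), 0);
        }
        for (acc, &b) in self.value_xor.iter_mut().zip(other.value_xor.iter()) {
            *acc ^= b;
        }
    }
}
```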
pub fn ensure_decoded(&mut self, bincode_opts: DefaultOptions)
During and after consolidation, we assume that values in the UpsertStateBackend implementation can be Self::Consolidating, with a diff_sum of 1 (or 0, if they have been deleted).
Afterwards, if we need to retract one of these values, we need to assert that it is in this correct state, then mutate it to its Value state, so the upsert operator can use it.
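A sketch of the check-then-convert step, assuming the consolidated state holds the four sums listed for merge_update and that len_sum is used to strip zero padding from value_xor. It also shows why the robustness argument pays off: a corrupted buffer or a doubled +1 is caught instead of being decoded as garbage. FNV-1a is a stand-in checksum, and Accum, Decoded and ensure_decoded here are hypothetical.

```rust
use std::num::Wrapping;

/// Hypothetical consolidated state for one key.
struct Accum {
    value_xor: Vec<u8>,
    len_sum: Wrapping<i64>,
    checksum_sum: Wrapping<i64>,
    diff_sum: Wrapping<i64>,
}

/// Stand-in checksum: 64-bit FNV-1a reinterpreted as i64 (an assumption).
fn checksum(bytes: &[u8]) -> i64 {
    let mut h: u64 = 0xcbf2_9ce4_8422_2325;
    for &b in bytes {
        h ^= u64::from(b);
        h = h.wrapping_mul(0x0100_0000_01b3);
    }
    h as i64
}

#[derive(Debug, PartialEq)]
enum Decoded {
    Absent,
    Present(Vec<u8>),
}

/// Converts consolidated state into a plain value, panicking unless it matches
/// one of the two states a well-formed collection can produce.
fn ensure_decoded(acc: Accum) -> Decoded {
    match acc.diff_sum.0 {
        0 => {
            // Deleted key: every accumulator must have cancelled out.
            assert_eq!(acc.len_sum.0, 0, "deleted key has leftover length");
            assert_eq!(acc.checksum_sum.0, 0, "deleted key has leftover checksum");
            assert!(acc.value_xor.iter().all(|&b| b == 0), "deleted key has leftover bytes");
            Decoded::Absent
        }
        1 => {
            // Present key: the first len_sum bytes are the value; the rest
            // must be zero padding left behind by longer, cancelled values.
            let len = acc.len_sum.0;
            assert!(len >= 0 && (len as usize) <= acc.value_xor.len(), "bad length {}", len);
            let (value, padding) = acc.value_xor.split_at(len as usize);
            assert!(padding.iter().all(|&b| b == 0), "non-zero bytes past the value");
            assert_eq!(acc.checksum_sum.0, checksum(value), "checksum mismatch");
            Decoded::Present(value.to_vec())
        }
        d => panic!("invalid diff_sum {}: expected 0 or 1", d),
    }
}
```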
Trait Implementations
impl<T: Clone, O: Clone> Clone for StateValue<T, O>
fn clone(&self) -> StateValue<T, O>
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
impl<T, O> Debug for StateValue<T, O>
impl<T, O> Default for StateValue<T, O>
impl<'de, T, O> Deserialize<'de> for StateValue<T, O>
where
T: Deserialize<'de>,
O: Deserialize<'de>,
fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>
where
__D: Deserializer<'de>,
Auto Trait Implementations
impl<T, O> Freeze for StateValue<T, O>
impl<T, O> RefUnwindSafe for StateValue<T, O>
where
O: RefUnwindSafe,
T: RefUnwindSafe,
impl<T, O> Send for StateValue<T, O>
impl<T, O> Sync for StateValue<T, O>
impl<T, O> Unpin for StateValue<T, O>
impl<T, O> UnwindSafe for StateValue<T, O>
where
O: UnwindSafe,
T: UnwindSafe,
Blanket Implementations
impl<T> BorrowMut<T> for T
where
T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T
where
T: Clone,
default unsafe fn clone_to_uninit(&self, dst: *mut T)
This is a nightly-only experimental API. (clone_to_uninit)
impl<T> FmtForward for T
fn fmt_binary(self) -> FmtBinary<Self>
where
Self: Binary,
Causes self to use its Binary implementation when Debug-formatted.
fn fmt_display(self) -> FmtDisplay<Self>
where
Self: Display,
Causes self to use its Display implementation when Debug-formatted.
fn fmt_lower_exp(self) -> FmtLowerExp<Self>
where
Self: LowerExp,
Causes self to use its LowerExp implementation when Debug-formatted.
fn fmt_lower_hex(self) -> FmtLowerHex<Self>
where
Self: LowerHex,
Causes self to use its LowerHex implementation when Debug-formatted.
fn fmt_octal(self) -> FmtOctal<Self>
where
Self: Octal,
Causes self to use its Octal implementation when Debug-formatted.
fn fmt_pointer(self) -> FmtPointer<Self>
where
Self: Pointer,
Causes self to use its Pointer implementation when Debug-formatted.
fn fmt_upper_exp(self) -> FmtUpperExp<Self>
where
Self: UpperExp,
Causes self to use its UpperExp implementation when Debug-formatted.
fn fmt_upper_hex(self) -> FmtUpperHex<Self>
where
Self: UpperHex,
Causes self to use its UpperHex implementation when Debug-formatted.
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request
impl<T, U> OverrideFrom<Option<&T>> for U
where
U: OverrideFrom<T>,
impl<T> Pipe for T
where
T: ?Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
where
Self: Sized,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R
where
R: 'a,
Borrows self and passes that borrow into the pipe function.
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R
where
R: 'a,
Mutably borrows self and passes that borrow into the pipe function.
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
Borrows self, then passes self.as_ref() into the pipe function.
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
Mutably borrows self, then passes self.as_mut() into the pipe function.
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
Borrows self, then passes self.deref() into the pipe function.
impl<T> Pointable for T
impl<T> ProgressEventTimestamp for T
impl<P, R> ProtoType<R> for P
where
R: RustType<P>,
fn into_rust(self) -> Result<R, TryFromProtoError>
See RustType::from_proto.
fn from_rust(rust: &R) -> P
See RustType::into_proto.
impl<'a, S, T> Semigroup<&'a S> for T
where
T: Semigroup<S>,
fn plus_equals(&mut self, rhs: &&'a S)
The method of std::ops::AddAssign, for types that do not implement AddAssign.
impl<T> Tap for T
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Immutable access to the Borrow<B> of a value.
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
Mutable access to the BorrowMut<B> of a value.
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
Immutable access to the AsRef<R> view of a value.
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
Mutable access to the AsMut<R> view of a value.
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Immutable access to the Deref::Target of a value.
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Mutable access to the Deref::Target of a value.
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release builds.
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
Calls .tap_borrow() only in debug builds, and is erased in release builds.
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
Calls .tap_ref() only in debug builds, and is erased in release builds.
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
Calls .tap_deref() only in debug builds, and is erased in release builds.