Enum mz_storage::upsert::types::StateValue

pub enum StateValue<T, O> {
    Consolidating(Consolidating),
    Value(Value<T, O>),
}

UpsertState has 2 modes:

  • Normal operation
  • Consolidation

This enum and its variant types are helpers that simplify the logic individual UpsertState implementations need in order to manage these two modes.

Normal operation is simple: we store an ordinary UpsertValue and let the implementer store it however they want. During consolidation the logic is more complex; see the docs on StateValue::merge_update for more information.

Note also that this type is designed to support partial updates. All values are associated with an order key O that can be used to determine if a value existing in the UpsertStateBackend occurred before or after a value being considered for insertion.

O is typically required to be Default, with the default value sorting below all others. During consolidation, values consolidate correctly on their own (they are actual differential updates with diffs), so order keys are not required.
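To make the order-key contract concrete, here is a minimal sketch that assumes a hypothetical order key of type Option<u64>. Its Default (None) sorts below every Some, which satisfies the requirement above. The OrderKey alias and the should_replace helper are illustrative only and are not part of mz_storage.

```rust
/// Hypothetical order key. `Option<u64>` derives `Ord` with `None < Some(_)`,
/// and `Option::default()` is `None`, so the default sorts below all others.
type OrderKey = Option<u64>;

/// Hypothetical helper: returns true if `candidate` occurred strictly after
/// the value already stored in the backend, i.e. the candidate should win.
fn should_replace(existing: &OrderKey, candidate: &OrderKey) -> bool {
    candidate > existing
}
```

Because the default key is the minimum, a key that has never been written (stored with the default order) is replaced by any real update.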

Variants

Consolidating(Consolidating)

Value(Value<T, O>)

Implementations

impl<T, O> StateValue<T, O>

pub fn finalized_value(value: Result<Row, UpsertError>, order: O) -> Self

A finalized (that is, assumed persistent) value occurring at some order key.

pub fn tombstone(order: O) -> Self

A tombstoned value occurring at some order key.

pub fn is_tombstone(&self) -> bool

Whether the value is a tombstone.

pub fn order(&self) -> &O

Pull out the order for the given Value, assuming ensure_decoded has been called.
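The constructors and accessors above can be modeled with a simplified, hypothetical enum. Row and UpsertError here are stand-ins (raw bytes and a String), not the real Materialize types, and the real Value<T, O> also carries provisional state that this sketch omits.

```rust
/// Hypothetical stand-ins for the real `Row` and `UpsertError` types.
type Row = Vec<u8>;
type UpsertError = String;

/// Simplified model of a finalized entry: either a row/error or a tombstone,
/// and in both cases tagged with the order key at which it occurred.
#[derive(Debug, Clone, PartialEq)]
enum Finalized<O> {
    Value(Result<Row, UpsertError>, O),
    Tombstone(O),
}

impl<O> Finalized<O> {
    /// Models `StateValue::finalized_value`.
    fn finalized_value(value: Result<Row, UpsertError>, order: O) -> Self {
        Finalized::Value(value, order)
    }

    /// Models `StateValue::tombstone`.
    fn tombstone(order: O) -> Self {
        Finalized::Tombstone(order)
    }

    /// Models `StateValue::is_tombstone`.
    fn is_tombstone(&self) -> bool {
        matches!(self, Finalized::Tombstone(_))
    }

    /// Models `StateValue::order`: every finalized entry carries an order key.
    fn order(&self) -> &O {
        match self {
            Finalized::Value(_, order) | Finalized::Tombstone(order) => order,
        }
    }
}
```

Note that an error is still a value, not a tombstone: only an explicit deletion produces a tombstone.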

pub fn into_decoded(self) -> Value<T, O>

Pull out the Value for a StateValue, after ensure_decoded has been called.

pub fn memory_size(&self) -> u64

An estimate of the in-memory size of a StateValue. Note that this is:

  1. Only used in the InMemoryHashMap implementation.
  2. An estimate: it only looks at value sizes, not errors.

Other implementations may use more accurate accounting.
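A sketch of the estimate described above, assuming hypothetical stand-in types: only successful row payloads contribute, and errors (or an absent value) count as zero bytes. The function name is illustrative, and the real implementation may account for overheads this sketch ignores.

```rust
/// Hypothetical stand-ins for the real `Row` and `UpsertError` types.
type Row = Vec<u8>;
type UpsertError = String;

/// Estimate the in-memory size of an optional value, counting only the bytes
/// of a successfully decoded row. Errors and absent values count as zero,
/// matching the "only looks at value sizes, not errors" caveat.
fn estimated_memory_size(value: Option<&Result<Row, UpsertError>>) -> u64 {
    match value {
        Some(Ok(row)) => u64::try_from(row.len()).expect("row length fits in u64"),
        Some(Err(_)) | None => 0,
    }
}
```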

impl<T: Eq, O> StateValue<T, O>

pub fn new_provisional_value(provisional_value: Result<Row, UpsertError>, provisional_ts: T, order: O) -> Self

Creates a new provisional value, occurring at some order key, observed at the given timestamp.

pub fn into_provisional_value(self, provisional_value: Result<Row, UpsertError>, provisional_ts: T, provisional_order: O) -> Self

Creates a provisional value that retains the finalized value (along with its order) in this StateValue, if any.

We record the finalized value so that we can present it when needed, or when trying to read a provisional value at a different timestamp.

pub fn new_provisional_tombstone(provisional_ts: T, order: O) -> Self

Creates a new provisional tombstone occurring at some order key, observed at the given timestamp.

pub fn into_provisional_tombstone(self, provisional_ts: T, provisional_order: O) -> Self

Creates a provisional tombstone that retains the finalized value (along with its order) in this StateValue, if any.

We record the current finalized value so that we can present it when needed, or when trying to read a provisional value at a different timestamp.
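The provisional constructors can be modeled as follows. This is a hypothetical sketch: the Provisional struct, its field names, and the use of None for a provisional tombstone are assumptions made for illustration. The new_provisional_* variants correspond to passing None as the finalized value.

```rust
/// Hypothetical stand-ins for the real `Row` and `UpsertError` types.
type Row = Vec<u8>;
type UpsertError = String;

/// Hypothetical model of a provisional entry: a pending update observed at
/// `provisional_ts`, plus whatever finalized value (and order) it shadows.
#[derive(Debug, Clone, PartialEq)]
struct Provisional<T, O> {
    finalized: Option<(Result<Row, UpsertError>, O)>,
    provisional_ts: T,
    /// `None` models a provisional tombstone.
    provisional_value: Option<Result<Row, UpsertError>>,
    provisional_order: O,
}

impl<T, O> Provisional<T, O> {
    /// Like `into_provisional_value`: keep the finalized value (if any) so
    /// readers at other timestamps still see it.
    fn into_provisional_value(
        finalized: Option<(Result<Row, UpsertError>, O)>,
        value: Result<Row, UpsertError>,
        provisional_ts: T,
        provisional_order: O,
    ) -> Self {
        Provisional {
            finalized,
            provisional_ts,
            provisional_value: Some(value),
            provisional_order,
        }
    }

    /// Like `into_provisional_tombstone`: same, but the pending update is a deletion.
    fn into_provisional_tombstone(
        finalized: Option<(Result<Row, UpsertError>, O)>,
        provisional_ts: T,
        provisional_order: O,
    ) -> Self {
        Provisional {
            finalized,
            provisional_ts,
            provisional_value: None,
            provisional_order,
        }
    }
}
```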

pub fn provisional_order(&self, ts: &T) -> Option<&O>

Returns the order of the provisional value at the given timestamp or, if there is none, the order of the finalized value.

Returns None if neither exists.

pub fn provisional_value_ref(&self, ts: &T) -> Option<&Result<Row, UpsertError>>

Returns the provisional value, if one is present at the given timestamp. Falls back to the finalized value, or None if there is neither.

pub fn into_finalized_value(self) -> Option<(Result<Row, UpsertError>, O)>

Returns the finalized value, if one is present.
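The read-side fallback rules of provisional_order and provisional_value_ref can be sketched over a similar hypothetical model. One assumption is worth calling out: in this sketch a provisional tombstone at the requested timestamp yields None rather than falling back to the finalized value.

```rust
/// Hypothetical stand-ins for the real `Row` and `UpsertError` types.
type Row = Vec<u8>;
type UpsertError = String;

/// Hypothetical provisional entry; `provisional_value == None` is a tombstone.
struct Provisional<T, O> {
    finalized: Option<(Result<Row, UpsertError>, O)>,
    provisional_ts: T,
    provisional_value: Option<Result<Row, UpsertError>>,
    provisional_order: O,
}

impl<T: Eq, O> Provisional<T, O> {
    /// Order of the provisional update at `ts`, else the finalized order, else None.
    fn provisional_order(&self, ts: &T) -> Option<&O> {
        if &self.provisional_ts == ts {
            Some(&self.provisional_order)
        } else {
            self.finalized.as_ref().map(|(_, order)| order)
        }
    }

    /// Provisional value at `ts` (None for a provisional tombstone), else the
    /// finalized value, else None.
    fn provisional_value_ref(&self, ts: &T) -> Option<&Result<Row, UpsertError>> {
        if &self.provisional_ts == ts {
            self.provisional_value.as_ref()
        } else {
            self.finalized.as_ref().map(|(value, _)| value)
        }
    }

    /// Discard any provisional state and keep only the finalized value.
    fn into_finalized_value(self) -> Option<(Result<Row, UpsertError>, O)> {
        self.finalized
    }
}
```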

impl<T: Eq, O: Default> StateValue<T, O>

pub fn merge_update(&mut self, value: Result<Row, UpsertError>, diff: Diff, bincode_opts: DefaultOptions, bincode_buffer: &mut Vec<u8>) -> bool

We use an XOR trick to accumulate the values without having to store the full unconsolidated history in memory. For all (value, diff) updates of a key, we track:

  • diff_sum = SUM(diff)
  • checksum_sum = SUM(checksum(bincode(value)) * diff)
  • len_sum = SUM(len(bincode(value)) * diff)
  • value_xor = XOR(bincode(value))

Return value

Returns a bool indicating whether the current merged value can be deleted.
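The accumulation can be sketched as below. It is a hypothetical model, not the mz_storage code: values are assumed to be already encoded to bytes (standing in for bincode output), FNV-1a stands in for the real checksum function, sums use wrapping i64 arithmetic, and shorter values are zero-padded before XORing. The deletability rule used here (all three sums are zero) is also an assumption.

```rust
/// Hypothetical accumulator holding the four quantities listed above.
#[derive(Debug, Default, Clone, PartialEq)]
struct Consolidating {
    diff_sum: i64,
    len_sum: i64,
    checksum_sum: i64,
    value_xor: Vec<u8>,
}

/// FNV-1a, used here only as a stand-in for the real checksum function.
fn checksum(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
    for &byte in bytes {
        hash ^= u64::from(byte);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3);
    }
    hash
}

impl Consolidating {
    /// Fold one `(encoded_value, diff)` update into the accumulator.
    /// Returns true when every sum is zero, i.e. the key can be deleted.
    fn merge_update(&mut self, encoded: &[u8], diff: i64) -> bool {
        let len = i64::try_from(encoded.len()).expect("length fits in i64");
        self.diff_sum += diff;
        self.len_sum += len * diff;
        // Reinterpret the u64 checksum as i64; wrapping keeps the sum exact mod 2^64.
        self.checksum_sum = self
            .checksum_sum
            .wrapping_add((checksum(encoded) as i64).wrapping_mul(diff));
        // Zero-pad so values of different lengths can be XORed together.
        if self.value_xor.len() < encoded.len() {
            self.value_xor.resize(encoded.len(), 0);
        }
        for (acc, byte) in self.value_xor.iter_mut().zip(encoded) {
            *acc ^= *byte;
        }
        self.diff_sum == 0 && self.len_sum == 0 && self.checksum_sum == 0
    }
}
```

Only the four fixed-size-or-bounded fields are kept per key, no matter how many updates arrive.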

Correctness

The method is correct because a well-formed upsert collection at a given timestamp will have, for each key:

  • Zero or one updates of the form (cur_value, +1)
  • Zero or more pairs of updates of the form (prev_value, +1), (prev_value, -1)

We are interested in extracting the cur_value of each key and discarding all prev_values that might be included in the stream. Since the history of prev_values always comes in pairs, XORing them together cancels their effects out, and because XOR is commutative and associative this holds regardless of the order in which updates arrive. The same is true for the length and checksum summations: each prev_value contributes once with +1 and once with -1, so its net contribution is zero.

Therefore the accumulators will end up precisely in one of two states:

  1. diff == 0, checksum == 0, value == [0..] => the key is not present
  2. diff == 1, checksum == checksum(cur_value), value == cur_value => the key is present
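The correctness argument can be checked with a small order-independence experiment. This hypothetical sketch drops the checksum for brevity, folds a well-formed stream of pre-encoded byte values, and decodes the result using the two terminal states listed above. Truncating to len_sum removes the zero padding left behind by longer, cancelled values.

```rust
/// Minimal hypothetical accumulator (checksum omitted for brevity).
#[derive(Debug, Default, PartialEq)]
struct Acc {
    diff_sum: i64,
    len_sum: i64,
    value_xor: Vec<u8>,
}

/// Fold a stream of `(encoded_value, diff)` updates into one accumulator.
fn fold(updates: &[(&[u8], i64)]) -> Acc {
    let mut acc = Acc::default();
    for &(bytes, diff) in updates {
        acc.diff_sum += diff;
        acc.len_sum += bytes.len() as i64 * diff;
        if acc.value_xor.len() < bytes.len() {
            acc.value_xor.resize(bytes.len(), 0);
        }
        for (a, b) in acc.value_xor.iter_mut().zip(bytes) {
            *a ^= *b;
        }
    }
    acc
}

/// Map an accumulator to one of the two terminal states: absent or present.
fn decode(acc: &Acc) -> Option<Vec<u8>> {
    match acc.diff_sum {
        0 => {
            assert!(acc.len_sum == 0 && acc.value_xor.iter().all(|&b| b == 0));
            None
        }
        1 => {
            // Truncate the zero padding left by longer, cancelled values.
            let len = usize::try_from(acc.len_sum).expect("non-negative length");
            Some(acc.value_xor[..len].to_vec())
        }
        other => panic!("malformed upsert collection: diff_sum = {other}"),
    }
}
```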

Robustness

In the absence of bugs, accumulating the diff and checksum is not required, since we know that a well-formed collection always satisfies XOR(bincode(values)) == bincode(cur_value). However, bugs may happen, so storing 16 more bytes per key to get a very strong guarantee that we're not decoding garbage is more than worth it. The main key->value used to store previous values.

pub fn merge_update_state(&mut self, other: &Self)

Merges an existing StateValue into this one, using the same method described in merge_update. See the docs on merge_update for more information on correctness and robustness.
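Because every accumulator field is either a sum or an XOR, merging two partial accumulators amounts to combining them field by field. The sketch below assumes a hypothetical Acc struct holding the four tracked quantities; it is not the real Consolidating type.

```rust
/// Hypothetical accumulator holding the four tracked quantities.
#[derive(Debug, Default, Clone, PartialEq)]
struct Acc {
    diff_sum: i64,
    len_sum: i64,
    checksum_sum: i64,
    value_xor: Vec<u8>,
}

impl Acc {
    /// Merge another accumulator into this one: sums add, and XOR buffers
    /// combine byte-wise after zero-padding the shorter one.
    fn merge_update_state(&mut self, other: &Acc) {
        self.diff_sum += other.diff_sum;
        self.len_sum += other.len_sum;
        self.checksum_sum = self.checksum_sum.wrapping_add(other.checksum_sum);
        if self.value_xor.len() < other.value_xor.len() {
            self.value_xor.resize(other.value_xor.len(), 0);
        }
        for (a, b) in self.value_xor.iter_mut().zip(&other.value_xor) {
            *a ^= *b;
        }
    }
}
```

Since addition and XOR are both commutative and associative, partial accumulators can be merged in any order and still reach the same final state.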

pub fn ensure_decoded(&mut self, bincode_opts: DefaultOptions)

During and after consolidation, we assume that values in the UpsertStateBackend implementation can be Self::Consolidating, with a diff_sum of 1 (or 0, if they have been deleted). Afterwards, if we need to retract one of these values, we must assert that it's in this correct state, then mutate it to its Value state so the upsert operator can use it.
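A hypothetical sketch of ensure_decoded, which also puts the robustness check described under merge_update into practice. A consolidated entry must have a diff_sum of 0 or 1, and a present value is accepted only if its length and checksum agree with the accumulated sums. The State enum, the FNV-1a checksum, and the panic-on-corruption behavior are assumptions for illustration.

```rust
/// Hypothetical stand-in for the real `Row` type.
type Row = Vec<u8>;

/// FNV-1a, an assumed stand-in for the real checksum function.
fn checksum(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
    for &byte in bytes {
        hash ^= u64::from(byte);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3);
    }
    hash
}

/// Hypothetical accumulator mirroring the fields tracked by `merge_update`.
#[derive(Debug, Default, Clone, PartialEq)]
struct Consolidating {
    diff_sum: i64,
    len_sum: i64,
    checksum_sum: i64,
    value_xor: Vec<u8>,
}

/// Simplified `StateValue`: either still consolidating or already decoded.
#[derive(Debug, Clone, PartialEq)]
enum State {
    Consolidating(Consolidating),
    /// Decoded value; `None` models a deleted key.
    Value(Option<Row>),
}

impl State {
    /// Validate a consolidated accumulator and replace it with the value it
    /// encodes. Panics if the accumulator is not in a well-formed final state.
    /// Calling it on an already-decoded value is a no-op.
    fn ensure_decoded(&mut self) {
        if let State::Consolidating(c) = self {
            let decoded = match c.diff_sum {
                0 => {
                    assert_eq!(c.len_sum, 0, "len_sum must cancel out");
                    assert_eq!(c.checksum_sum, 0, "checksum_sum must cancel out");
                    None
                }
                1 => {
                    let len = usize::try_from(c.len_sum).expect("non-negative length");
                    let bytes = c.value_xor[..len].to_vec();
                    // Robustness check: refuse to decode garbage.
                    assert_eq!(checksum(&bytes) as i64, c.checksum_sum, "checksum mismatch");
                    Some(bytes)
                }
                other => panic!("invalid diff_sum {other}"),
            };
            *self = State::Value(decoded);
        }
    }
}
```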

Trait Implementations

impl<T: Clone, O: Clone> Clone for StateValue<T, O>

fn clone(&self) -> StateValue<T, O>

Returns a copy of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl<T, O> Debug for StateValue<T, O>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl<T, O> Default for StateValue<T, O>

fn default() -> Self

Returns the “default value” for a type.

impl<'de, T, O> Deserialize<'de> for StateValue<T, O>
where T: Deserialize<'de>, O: Deserialize<'de>,

fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>
where __D: Deserializer<'de>,

Deserialize this value from the given Serde deserializer.

impl<T, O> Serialize for StateValue<T, O>
where T: Serialize, O: Serialize,

fn serialize<__S>(&self, __serializer: __S) -> Result<__S::Ok, __S::Error>
where __S: Serializer,

Serialize this value into the given Serde serializer.

Auto Trait Implementations

impl<T, O> Freeze for StateValue<T, O>
where O: Freeze, T: Freeze,

impl<T, O> RefUnwindSafe for StateValue<T, O>

impl<T, O> Send for StateValue<T, O>
where O: Send, T: Send,

impl<T, O> Sync for StateValue<T, O>
where O: Sync, T: Sync,

impl<T, O> Unpin for StateValue<T, O>
where O: Unpin, T: Unpin,

impl<T, O> UnwindSafe for StateValue<T, O>
where O: UnwindSafe, T: UnwindSafe,

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T, U> CastInto<U> for T
where U: CastFrom<T>,

fn cast_into(self) -> U

Performs the cast.

impl<T> CloneToUninit for T
where T: Clone,

default unsafe fn clone_to_uninit(&self, dst: *mut T)

This is a nightly-only experimental API (clone_to_uninit). Performs copy-assignment from self to dst.

impl<T> Conv for T

fn conv<T>(self) -> T
where Self: Into<T>,

Converts self into T using Into<T>.

impl<T> CopyAs<T> for T

fn copy_as(self) -> T

impl<T> DynClone for T
where T: Clone,

impl<T> FmtForward for T

fn fmt_binary(self) -> FmtBinary<Self>
where Self: Binary,

Causes self to use its Binary implementation when Debug-formatted.

fn fmt_display(self) -> FmtDisplay<Self>
where Self: Display,

Causes self to use its Display implementation when Debug-formatted.

fn fmt_lower_exp(self) -> FmtLowerExp<Self>
where Self: LowerExp,

Causes self to use its LowerExp implementation when Debug-formatted.

fn fmt_lower_hex(self) -> FmtLowerHex<Self>
where Self: LowerHex,

Causes self to use its LowerHex implementation when Debug-formatted.

fn fmt_octal(self) -> FmtOctal<Self>
where Self: Octal,

Causes self to use its Octal implementation when Debug-formatted.

fn fmt_pointer(self) -> FmtPointer<Self>
where Self: Pointer,

Causes self to use its Pointer implementation when Debug-formatted.

fn fmt_upper_exp(self) -> FmtUpperExp<Self>
where Self: UpperExp,

Causes self to use its UpperExp implementation when Debug-formatted.

fn fmt_upper_hex(self) -> FmtUpperHex<Self>
where Self: UpperHex,

Causes self to use its UpperHex implementation when Debug-formatted.

fn fmt_list(self) -> FmtList<Self>
where for<'a> &'a Self: IntoIterator,

Formats each item in a sequence.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> FromRef<T> for T
where T: Clone,

fn from_ref(input: &T) -> T

Converts to this type from a reference to the input type.

impl<T> FutureExt for T

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper.

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request.

impl<Unshared, Shared> IntoShared<Shared> for Unshared
where Shared: FromUnshared<Unshared>,

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.

impl<T, U> OverrideFrom<Option<&T>> for U
where U: OverrideFrom<T>,

fn override_from(self, layer: &Option<&T>) -> U

Override the configuration represented by Self with values from the given layer.

impl<T> Pipe for T
where T: ?Sized,

fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
where Self: Sized,

Pipes by value. This is generally the method you want to use.

fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R
where R: 'a,

Borrows self and passes that borrow into the pipe function.

fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R
where R: 'a,

Mutably borrows self and passes that borrow into the pipe function.

fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
where Self: Borrow<B>, B: 'a + ?Sized, R: 'a,

Borrows self, then passes self.borrow() into the pipe function.

fn pipe_borrow_mut<'a, B, R>(&'a mut self, func: impl FnOnce(&'a mut B) -> R) -> R
where Self: BorrowMut<B>, B: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.borrow_mut() into the pipe function.

fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
where Self: AsRef<U>, U: 'a + ?Sized, R: 'a,

Borrows self, then passes self.as_ref() into the pipe function.

fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
where Self: AsMut<U>, U: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.as_mut() into the pipe function.

fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
where Self: Deref<Target = T>, T: 'a + ?Sized, R: 'a,

Borrows self, then passes self.deref() into the pipe function.

fn pipe_deref_mut<'a, T, R>(&'a mut self, func: impl FnOnce(&'a mut T) -> R) -> R
where Self: DerefMut<Target = T> + Deref, T: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.deref_mut() into the pipe function.

impl<T> Pointable for T

const ALIGN: usize = _

The alignment of pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.

impl<T> ProgressEventTimestamp for T
where T: Data + Debug + Any,

fn as_any(&self) -> &(dyn Any + 'static)

Upcasts this ProgressEventTimestamp to Any.

fn type_name(&self) -> &'static str

Returns the name of the concrete type of this object.

impl<P, R> ProtoType<R> for P
where R: RustType<P>,

impl<T> Same for T

type Output = T

Should always be Self

impl<'a, S, T> Semigroup<&'a S> for T
where T: Semigroup<S>,

fn plus_equals(&mut self, rhs: &&'a S)

The method of std::ops::AddAssign, for types that do not implement AddAssign.

impl<T> Tap for T

fn tap(self, func: impl FnOnce(&Self)) -> Self

Immutable access to a value.

fn tap_mut(self, func: impl FnOnce(&mut Self)) -> Self

Mutable access to a value.

fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
where Self: Borrow<B>, B: ?Sized,

Immutable access to the Borrow<B> of a value.

fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
where Self: BorrowMut<B>, B: ?Sized,

Mutable access to the BorrowMut<B> of a value.

fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
where Self: AsRef<R>, R: ?Sized,

Immutable access to the AsRef<R> view of a value.

fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
where Self: AsMut<R>, R: ?Sized,

Mutable access to the AsMut<R> view of a value.

fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
where Self: Deref<Target = T>, T: ?Sized,

Immutable access to the Deref::Target of a value.

fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
where Self: DerefMut<Target = T> + Deref, T: ?Sized,

Mutable access to the Deref::Target of a value.

fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self

Calls .tap() only in debug builds, and is erased in release builds.

fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self

Calls .tap_mut() only in debug builds, and is erased in release builds.

fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
where Self: Borrow<B>, B: ?Sized,

Calls .tap_borrow() only in debug builds, and is erased in release builds.

fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
where Self: BorrowMut<B>, B: ?Sized,

Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.

fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
where Self: AsRef<R>, R: ?Sized,

Calls .tap_ref() only in debug builds, and is erased in release builds.

fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
where Self: AsMut<R>, R: ?Sized,

Calls .tap_ref_mut() only in debug builds, and is erased in release builds.

fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
where Self: Deref<Target = T>, T: ?Sized,

Calls .tap_deref() only in debug builds, and is erased in release builds.

fn tap_deref_mut_dbg<T>(self, func: impl FnOnce(&mut T)) -> Self
where Self: DerefMut<Target = T> + Deref, T: ?Sized,

Calls .tap_deref_mut() only in debug builds, and is erased in release builds.

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T> TryConv for T

fn try_conv<T>(self) -> Result<T, Self::Error>
where Self: TryInto<T>,

Attempts to convert self into T using TryInto<T>.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> Allocation for T
where T: RefUnwindSafe + Send + Sync,

impl<T> Data for T
where T: Clone + 'static,

impl<T> DeserializeOwned for T
where T: for<'de> Deserialize<'de>,

impl<T> ExchangeData for T
where T: Data + Data,