Struct mz_txn_wal::txns::TxnsHandle

pub struct TxnsHandle<K: Codec, V: Codec, T, D, O = u64, C: TxnsCodec = TxnsCodecDefault> {
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) txns_cache: TxnsCache<T, C>,
    pub(crate) txns_write: WriteHandle<C::Key, C::Val, T, i64>,
    pub(crate) txns_since: SinceHandle<C::Key, C::Val, T, i64, O>,
    pub(crate) datas: DataHandles<K, V, T, D>,
}

An interface for atomic multi-shard writes.

This handle is acquired through Self::open. Any data shards must be registered with Self::register before use. Transactions are then started with Self::begin.

Implementation Details

The structure of the txns shard is (ShardId, Vec<u8>) updates.

The core mechanism is that a txn commits a set of transmittable persist batch handles as (ShardId, <opaque blob>) pairs at a single timestamp. This contractually both commits the txn and advances the logical upper of every data shard (not just the ones involved in the txn).

Example:

// A txn to only d0 at ts=1
(d0, <opaque blob A>, 1, 1)
// A txn to d0 (two blobs) and d1 (one blob) at ts=4
(d0, <opaque blob B>, 4, 1)
(d0, <opaque blob C>, 4, 1)
(d1, <opaque blob D>, 4, 1)

However, the new commit is not readable until the txn has been applied. The committer is expected to do this promptly, except in the event of a crash. Applying moves the batch handles, in ts order, into the data shards with a compare_and_append_batch (similar to how the multi-worker persist_sink works).

Once apply has run, we “tidy” the txns shard by retracting the update that added the batch. As a result, the contents of the txns shard at any given timestamp are exactly the set of outstanding apply work (plus registrations, see below).

Example (building on the above):

// Tidy for the first txn at ts=3
(d0, <opaque blob A>, 3, -1)
// Tidy for the second txn (the timestamps can be different for each
// retraction in a txn, but don't need to be)
(d0, <opaque blob B>, 5, -1)
(d0, <opaque blob C>, 6, -1)
(d1, <opaque blob D>, 6, -1)
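
The claim that the txns shard holds exactly the outstanding apply work can be checked with a small, self-contained model. This is only a sketch, not the crate's implementation: `TxnUpdate`, `contents_as_of`, and `example_updates` are hypothetical names, and string literals stand in for `ShardId` values and the opaque blobs.

```rust
use std::collections::BTreeMap;

/// One update in a toy txns shard: (data shard, payload, timestamp, diff).
/// Hypothetical stand-in for the real `(ShardId, Vec<u8>)` updates.
type TxnUpdate = (&'static str, &'static str, u64, i64);

/// Sums the diffs of all updates with timestamp <= `as_of` and returns the
/// (shard, payload) pairs whose net diff is non-zero.
fn contents_as_of(updates: &[TxnUpdate], as_of: u64) -> Vec<(&'static str, &'static str)> {
    let mut acc: BTreeMap<(&'static str, &'static str), i64> = BTreeMap::new();
    for &(shard, payload, ts, diff) in updates {
        if ts <= as_of {
            *acc.entry((shard, payload)).or_insert(0) += diff;
        }
    }
    acc.into_iter()
        .filter(|&(_, diff)| diff != 0)
        .map(|(key, _)| key)
        .collect()
}

/// The commit and tidy updates from the two examples above.
fn example_updates() -> Vec<TxnUpdate> {
    vec![
        ("d0", "A", 1, 1),
        ("d0", "B", 4, 1),
        ("d0", "C", 4, 1),
        ("d1", "D", 4, 1),
        ("d0", "A", 3, -1),
        ("d0", "B", 5, -1),
        ("d0", "C", 6, -1),
        ("d1", "D", 6, -1),
    ]
}
```

In this model, reading at ts=4 yields blobs B, C, and D as unapplied work, reading at ts=5 yields only C and D, and reading at ts=6 yields nothing.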

To make it easy to reason about exactly which data shards are registered in the txn set at any given moment, each data shard is added to the set with a (ShardId, <empty>) pair. The data may not be read before the timestamp of the update (which starts at the time it was initialized, but it may later be forwarded).

Example (building on both of the above):

// d0 and d1 were both initialized before they were used above
(d0, <empty>, 0, 1)
(d1, <empty>, 2, 1)
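
A minimal sketch of how the set of registered shards could be read back from such updates. It assumes, for illustration only, that a registration is exactly an entry with an empty value and that every non-empty value is a batch entry. `ShardUpdate`, `registered_as_of`, and `example_registrations` are hypothetical names, not part of the crate.

```rust
use std::collections::BTreeMap;

/// One update in a toy txns shard: (data shard, value bytes, timestamp, diff).
type ShardUpdate = (&'static str, Vec<u8>, u64, i64);

/// Returns the data shards whose registration entry (an empty value in this
/// toy model) has a positive net diff at `as_of`.
fn registered_as_of(updates: &[ShardUpdate], as_of: u64) -> Vec<&'static str> {
    let mut net: BTreeMap<&'static str, i64> = BTreeMap::new();
    for (shard, val, ts, diff) in updates {
        // Non-empty values are batch entries, not registrations.
        if val.is_empty() && *ts <= as_of {
            *net.entry(*shard).or_insert(0) += *diff;
        }
    }
    net.into_iter()
        .filter(|&(_, diff)| diff > 0)
        .map(|(shard, _)| shard)
        .collect()
}

/// The registrations from the example above, plus one batch entry.
fn example_registrations() -> Vec<ShardUpdate> {
    vec![
        ("d0", vec![], 0, 1),
        ("d1", vec![], 2, 1),
        // A batch entry for d0, ignored by `registered_as_of`.
        ("d0", b"blob A".to_vec(), 1, 1),
    ]
}
```

Here d0 counts as registered from ts=0 and d1 only from ts=2, which matches the rule that data may not be read before the timestamp of its registration update.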

Fields

metrics: Arc<Metrics>
txns_cache: TxnsCache<T, C>
txns_write: WriteHandle<C::Key, C::Val, T, i64>
txns_since: SinceHandle<C::Key, C::Val, T, i64, O>
datas: DataHandles<K, V, T, D>

Implementations

impl<K, V, T, D, O, C> TxnsHandle<K, V, T, D, O, C>

pub async fn open(init_ts: T, client: PersistClient, dyncfgs: ConfigSet, metrics: Arc<Metrics>, txns_id: ShardId) -> Self

Returns a TxnsHandle committing to the given txn shard.

txns_id identifies which shard will be used as the txns WAL. MZ will likely have one of these per env, used by all processes and the same across restarts.

This also does any (idempotent) initialization work: i.e. ensures that the txn shard is readable at init_ts by appending an empty batch, if necessary.

pub fn begin(&self) -> Txn<K, V, T, D>

Returns a new, empty transaction that can involve the data shards registered with this handle.

pub async fn register(&mut self, register_ts: T, data_writes: impl IntoIterator<Item = WriteHandle<K, V, T, D>>) -> Result<Tidy, T>

Registers data shards for use with this txn set.

A registration entry is written to the txn shard. If it is not possible to register the data at the requested time, an Err will be returned with the minimum time the data shards could be registered.
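
Because the Err value carries the minimum time at which the operation could succeed, a caller can retry at that time. The sketch below shows that pattern against a toy stand-in, not the real handle: `ToyTxns`, its `upper` field, and `register_with_retry` are hypothetical, and the rule that a registration must not be earlier than the current upper is an assumption of this model.

```rust
/// Toy stand-in for a txns handle that only tracks the txns shard's upper.
struct ToyTxns {
    upper: u64,
}

impl ToyTxns {
    /// Mirrors the shape of `register`: succeeds if `register_ts` is not
    /// before the current upper, otherwise reports the minimum usable time.
    fn register(&mut self, register_ts: u64) -> Result<u64, u64> {
        if register_ts < self.upper {
            return Err(self.upper);
        }
        // A successful write advances the upper past the written timestamp.
        self.upper = register_ts + 1;
        Ok(register_ts)
    }
}

/// Retries at the timestamp returned in `Err` until registration succeeds,
/// and returns the timestamp that was finally used.
fn register_with_retry(txns: &mut ToyTxns, mut ts: u64) -> u64 {
    loop {
        match txns.register(ts) {
            Ok(registered_at) => return registered_at,
            Err(min_ts) => ts = min_ts,
        }
    }
}
```

Whether retrying at the returned time is appropriate depends on the caller: some callers may prefer to surface the error rather than register at a later time than requested.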

This method is idempotent. Data shards currently registered at register_ts will not be registered a second time. Specifically, this method will return success when the most recent register ts R is less_equal to register_ts AND there is no forget ts between R and register_ts.
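
The idempotency condition can be written as a small predicate. This is a sketch under stated assumptions: `ShardTimes` is a hypothetical record of one data shard's register and forget timestamps, and "between R and register_ts" is interpreted here as the half-open range (R, register_ts], which the text above does not pin down.

```rust
/// Hypothetical record of when one data shard was registered and forgotten.
struct ShardTimes {
    register_ts: Vec<u64>,
    forget_ts: Vec<u64>,
}

impl ShardTimes {
    /// True when registering at `register_ts` would be a no-op success:
    /// the most recent register ts R is <= `register_ts` and no forget ts
    /// lies in (R, register_ts].
    fn is_registered_at(&self, register_ts: u64) -> bool {
        let latest = match self.register_ts.iter().copied().max() {
            Some(r) => r,
            // Never registered, so there is nothing to be idempotent about.
            None => return false,
        };
        latest <= register_ts
            && !self.forget_ts.iter().any(|&f| latest < f && f <= register_ts)
    }
}
```

For a shard registered at ts=2 and forgotten at ts=4, the predicate holds at ts=3 but not at ts=5, so a register call at ts=5 would have to write a new registration entry.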

As a side effect all txns <= register_ts are applied, including the registration itself.

WARNING! While a data shard is registered to the txn set, writing to it directly (i.e. using a WriteHandle instead of the TxnsHandle, or registering it with another txn shard) will lead to incorrectness, undefined behavior, and (potentially sticky) panics.

pub async fn forget(&mut self, forget_ts: T, data_ids: impl IntoIterator<Item = ShardId>) -> Result<Tidy, T>

Removes data shards from use with this txn set.

The registration entry written to the txn shard is retracted. If it is not possible to forget the data shard at the requested time, an Err will be returned with the minimum time the data shards could be forgotten.

This method is idempotent. Data shards currently forgotten at forget_ts will not be forgotten a second time. Specifically, this method will return success when the most recent forget ts (if any) F is less_equal to forget_ts AND there is no register ts between F and forget_ts.

As a side effect all txns <= forget_ts are applied, including the forget itself.

WARNING! While a data shard is registered to the txn set, writing to it directly (i.e. using a WriteHandle instead of the TxnsHandle, or registering it with another txn shard) will lead to incorrectness, undefined behavior, and (potentially sticky) panics.

pub async fn forget_all(&mut self, forget_ts: T) -> Result<(Vec<ShardId>, Tidy), T>

Forgets, at the given timestamp, every data shard that is registered. Returns the ids of the forgotten shards. See Self::forget.

pub async fn apply_le(&mut self, ts: &T) -> Tidy

“Applies” all committed txns <= the given timestamp, ensuring that reads at that timestamp will not block.

In the common case, the txn committer will have done this work and this method will be a no-op, but it is not guaranteed. In the event of a crash or race, this does whatever persist writes are necessary (and returns the resulting maintenance work), which could be significant.

If the requested timestamp has not yet been written, this could block for an unbounded amount of time.

This method is idempotent.
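
A toy model of why applying is idempotent: once a committed batch has been moved into its data shard, a second call at the same timestamp finds nothing left to do. `ToyState` and its fields are hypothetical. The real method performs persist writes and returns a Tidy rather than a count.

```rust
use std::collections::BTreeMap;

/// Toy model of committed-but-unapplied work and the data shards it targets.
struct ToyState {
    /// Committed but unapplied batches: (commit ts, data shard, payload).
    pending: Vec<(u64, &'static str, &'static str)>,
    /// Payloads already applied to each data shard, in application order.
    applied: BTreeMap<&'static str, Vec<&'static str>>,
}

impl ToyState {
    /// Applies every pending batch with commit ts <= `ts`, in ts order, and
    /// returns how many batches were moved. Calling it again with the same
    /// `ts` moves nothing.
    fn apply_le(&mut self, ts: u64) -> usize {
        // Stable sort keeps the original order of batches within one ts.
        self.pending.sort_by_key(|&(commit_ts, _, _)| commit_ts);
        let (ready, rest): (Vec<_>, Vec<_>) = self
            .pending
            .drain(..)
            .partition(|&(commit_ts, _, _)| commit_ts <= ts);
        self.pending = rest;
        for &(_, shard, payload) in &ready {
            self.applied.entry(shard).or_default().push(payload);
        }
        ready.len()
    }
}
```

With batches committed at ts=1 and ts=4, a call at ts=3 applies only the first one, and repeating that call leaves the state unchanged.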

pub async fn compact_to(&mut self, since_ts: T)

Allows compaction of the txns shard as well as of internal representations, losing the ability to answer queries about times less_than since_ts.
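
The effect on readers can be pictured with a toy since frontier. This sketch assumes the since only ever moves forward, so a call with an earlier timestamp has no effect. `ToySince` and `can_answer` are hypothetical names used only for illustration.

```rust
/// Toy since frontier for a totally ordered `u64` timeline.
struct ToySince {
    since: u64,
}

impl ToySince {
    /// Advances the since to `since_ts`. Assumed never to regress.
    fn compact_to(&mut self, since_ts: u64) {
        self.since = self.since.max(since_ts);
    }

    /// Queries about times strictly less than the since can no longer be
    /// answered. Times at or beyond it still can.
    fn can_answer(&self, ts: u64) -> bool {
        ts >= self.since
    }
}
```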

In practice, this will likely only be called from the singleton controller process.

pub fn txns_id(&self) -> ShardId

Returns the ShardId of the txns shard.

pub fn read_cache(&self) -> &TxnsCache<T, C>

Returns the TxnsCache used by this handle.

Trait Implementations

impl<K: Debug + Codec, V: Debug + Codec, T: Debug, D: Debug, O: Debug, C: Debug + TxnsCodec> Debug for TxnsHandle<K, V, T, D, O, C>
where C::Key: Debug, C::Val: Debug,

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations

impl<K, V, T, D, O, C> Freeze for TxnsHandle<K, V, T, D, O, C>
where O: Freeze, T: Freeze,

impl<K, V, T, D, O = u64, C = TxnsCodecDefault> !RefUnwindSafe for TxnsHandle<K, V, T, D, O, C>

impl<K, V, T, D, O, C> Send for TxnsHandle<K, V, T, D, O, C>
where O: Send, T: Send + Sync,

impl<K, V, T, D, O, C> Sync for TxnsHandle<K, V, T, D, O, C>
where O: Sync, T: Sync + Send,

impl<K, V, T, D, O, C> Unpin for TxnsHandle<K, V, T, D, O, C>
where O: Unpin, T: Unpin,

impl<K, V, T, D, O = u64, C = TxnsCodecDefault> !UnwindSafe for TxnsHandle<K, V, T, D, O, C>

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T, U> CastInto<U> for T
where U: CastFrom<T>,

fn cast_into(self) -> U

Performs the cast.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> FutureExt for T

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper.

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wraps the input message T in a tonic::Request.

impl<Unshared, Shared> IntoShared<Shared> for Unshared
where Shared: FromUnshared<Unshared>,

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.

impl<T> Pointable for T

const ALIGN: usize = _

The alignment of pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.

impl<P, R> ProtoType<R> for P
where R: RustType<P>,

impl<T> Same for T

type Output = T

Should always be Self

impl<'a, S, T> Semigroup<&'a S> for T
where T: Semigroup<S>,

fn plus_equals(&mut self, rhs: &&'a S)

The method of std::ops::AddAssign, for types that do not implement AddAssign.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.