Struct mz_persist_txn::txns::TxnsHandle
pub struct TxnsHandle<K, V, T, D, O = u64, C = TxnsCodecDefault>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + TotalOrder + Codec64,
    D: Semigroup + Codec64 + Send + Sync,
    O: Opaque + Debug + Codec64,
    C: TxnsCodec,
{ /* private fields */ }
An interface for atomic multi-shard writes.
This handle is acquired through Self::open. Any data shards must be registered with Self::register before use. Transactions are then started with Self::begin.
Implementation Details
The structure of the txns shard is (ShardId, Vec<u8>) updates.
The core mechanism is that a txn commits a set of transmittable persist batch handles as (ShardId, <opaque blob>) pairs at a single timestamp. This contractually both commits the txn and advances the logical upper of every data shard (not just the ones involved in the txn).
Example:
// A txn to only d0 at ts=1
(d0, <opaque blob A>, 1, 1)
// A txn to d0 (two blobs) and d1 (one blob) at ts=4
(d0, <opaque blob B>, 4, 1)
(d0, <opaque blob C>, 4, 1)
(d1, <opaque blob D>, 4, 1)
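To make the "one timestamp, every data shard" contract concrete, here is a minimal sketch of the commit step as a toy in-memory model. `ToyTxnsShard`, `Update`, `commit`, and `logical_upper` are hypothetical names invented for illustration; this is not the mz_persist_txn API. Shards and blobs are plain strings, and the model assumes the txns shard's upper doubles as the logical upper of every registered data shard.

```rust
use std::collections::BTreeSet;

/// One update in the toy txns shard: (data shard, opaque blob, ts, diff).
/// The real shard stores (ShardId, Vec<u8>) updates; strings stand in here.
#[derive(Clone, Debug, PartialEq)]
struct Update {
    shard: &'static str,
    blob: &'static str,
    ts: u64,
    diff: i64,
}

/// Hypothetical, simplified model of the txns shard (not the real API).
struct ToyTxnsShard {
    registered: BTreeSet<&'static str>,
    updates: Vec<Update>,
    /// Upper of the txns shard. In this model it is also the logical upper
    /// of every registered data shard.
    upper: u64,
}

impl ToyTxnsShard {
    /// Commits all (shard, blob) pairs of one txn at a single timestamp, or
    /// returns Err with the smallest timestamp that could still be written.
    fn commit(&mut self, commit_ts: u64, blobs: &[(&'static str, &'static str)]) -> Result<(), u64> {
        if commit_ts < self.upper {
            return Err(self.upper);
        }
        // Validate everything before writing anything.
        for &(shard, _) in blobs {
            assert!(self.registered.contains(shard), "{} is not registered", shard);
        }
        for &(shard, blob) in blobs {
            self.updates.push(Update { shard, blob, ts: commit_ts, diff: 1 });
        }
        // One append closes commit_ts for *every* registered data shard,
        // including shards this txn did not touch.
        self.upper = commit_ts + 1;
        Ok(())
    }

    fn logical_upper(&self, shard: &str) -> Option<u64> {
        if self.registered.contains(shard) { Some(self.upper) } else { None }
    }
}

fn main() {
    let mut txns = ToyTxnsShard {
        registered: BTreeSet::from(["d0", "d1"]),
        updates: Vec::new(),
        upper: 1,
    };
    txns.commit(1, &[("d0", "A")]).unwrap();
    txns.commit(4, &[("d0", "B"), ("d0", "C"), ("d1", "D")]).unwrap();
    println!("{:?}", txns.updates);
    println!("d1 logical upper: {:?}", txns.logical_upper("d1"));
    // Committing at an already-closed time is rejected.
    println!("{:?}", txns.commit(3, &[("d1", "E")]));
}
```

Note that d1's logical upper moves to 5 after the second commit even though only the ts=4 txn wrote to it; in the toy model this falls out of keeping a single upper for the whole txn set.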
However, the new commit is not readable until the txn apply has run, which the committer is expected to do promptly, except in the event of a crash. Apply moves the batch handles into the data shards, in ts order, with a compare_and_append_batch (similar to how the multi-worker persist_sink works).
Once apply has run, we “tidy” the txns shard by retracting the update that added the batch. As a result, the contents of the txns shard at any given timestamp are exactly the set of outstanding apply work (plus registrations, see below).
Example (building on the above):
// Tidy for the first txn at ts=3
(d0, <opaque blob A>, 3, -1)
// Tidy for the second txn (the timestamps can be different for each
// retraction in a txn, but don't need to be)
(d0, <opaque blob B>, 5, -1)
(d0, <opaque blob C>, 6, -1)
(d1, <opaque blob D>, 6, -1)
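The claim that the shard's contents at a timestamp are exactly the outstanding apply work can be checked by consolidating the example updates. The sketch below is a toy model under stated assumptions: `Update`, `outstanding_at`, and `example_updates` are hypothetical helpers, strings stand in for ShardId and blobs, and registrations are left out.

```rust
use std::collections::BTreeMap;

/// (data shard, opaque blob, ts, diff), mirroring the examples above.
type Update = (&'static str, &'static str, u64, i64);

/// Consolidates every update at a time <= `as_of` and returns the
/// (shard, blob) pairs whose diffs do not cancel out. In the toy model these
/// are exactly the batches that still need to be applied.
fn outstanding_at(updates: &[Update], as_of: u64) -> Vec<(&'static str, &'static str)> {
    let mut acc: BTreeMap<(&'static str, &'static str), i64> = BTreeMap::new();
    for &(shard, blob, ts, diff) in updates {
        if ts <= as_of {
            *acc.entry((shard, blob)).or_insert(0) += diff;
        }
    }
    acc.into_iter()
        .filter(|&(_, diff)| diff != 0)
        .map(|(key, _)| key)
        .collect()
}

fn example_updates() -> Vec<Update> {
    vec![
        // Commits.
        ("d0", "A", 1, 1), ("d0", "B", 4, 1), ("d0", "C", 4, 1), ("d1", "D", 4, 1),
        // Tidy retractions.
        ("d0", "A", 3, -1), ("d0", "B", 5, -1), ("d0", "C", 6, -1), ("d1", "D", 6, -1),
    ]
}

fn main() {
    let updates = example_updates();
    for &ts in &[2u64, 4, 5, 6] {
        println!("ts={}: {:?}", ts, outstanding_at(&updates, ts));
    }
}
```

At ts=2 only blob A is outstanding; at ts=4 A has been tidied while B, C, and D are pending; at ts=5 only C and D remain; and at ts=6 nothing is left to apply.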
To make it easy to reason about exactly which data shards are registered in the txn set at any given moment, the data shard is added to the set with a (ShardId, <empty>) pair. The data may not be read before the timestamp of the update (which starts at the time it was initialized, but it may later be forwarded).
Example (building on both of the above):
// d0 and d1 were both initialized before they were used above
(d0, <empty>, 0, 1)
(d1, <empty>, 2, 1)
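Because registrations are ordinary updates with an empty payload, the set of registered shards at a timestamp is again just a consolidation. This is a minimal sketch assuming a toy encoding in which `None` plays the role of `<empty>`; `registered_at` and `can_read` are hypothetical helpers, not library functions.

```rust
use std::collections::BTreeMap;

/// (data shard, blob, ts, diff). `None` plays the role of the <empty>
/// registration blob; `Some` is a committed batch.
type Update = (&'static str, Option<&'static str>, u64, i64);

/// Data shards registered as of `as_of`: those whose (shard, <empty>)
/// entries consolidate to a positive count. Batch entries are ignored.
fn registered_at(updates: &[Update], as_of: u64) -> Vec<&'static str> {
    let mut acc: BTreeMap<&'static str, i64> = BTreeMap::new();
    for &(shard, blob, ts, diff) in updates {
        if blob.is_none() && ts <= as_of {
            *acc.entry(shard).or_insert(0) += diff;
        }
    }
    acc.into_iter().filter(|&(_, n)| n > 0).map(|(shard, _)| shard).collect()
}

/// A data shard may only be read at times at or after its registration.
fn can_read(updates: &[Update], shard: &str, as_of: u64) -> bool {
    registered_at(updates, as_of).iter().any(|&s| s == shard)
}

fn main() {
    let updates: Vec<Update> = vec![
        ("d0", None, 0, 1),      // d0 registered at ts=0
        ("d1", None, 2, 1),      // d1 registered at ts=2
        ("d0", Some("A"), 1, 1), // a batch, not a registration
    ];
    println!("{:?}", registered_at(&updates, 1)); // ["d0"]
    println!("{}", can_read(&updates, "d1", 1)); // false
    println!("{}", can_read(&updates, "d1", 2)); // true
}
```

In this encoding a retraction of the registration entry (diff -1) drops the shard from the set again, which is the same shape the text describes for forgetting.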
Implementations
impl<K, V, T, D, O, C> TxnsHandle<K, V, T, D, O, C>
pub async fn open(
    init_ts: T,
    client: PersistClient,
    metrics: Arc<Metrics>,
    txns_id: ShardId,
    key_schema: Arc<K::Schema>,
    val_schema: Arc<V::Schema>,
) -> Self
Returns a TxnsHandle committing to the given txn shard.
txns_id identifies which shard will be used as the txns WAL. MZ will likely have one of these per env, used by all processes and the same across restarts.
This also does any (idempotent) initialization work: i.e. ensures that the txn shard is readable at init_ts by appending an empty batch, if necessary.
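The idempotent initialization can be pictured with a toy write frontier. This sketch assumes the usual persist convention that times strictly below `upper` are readable; `ToyShard` and `ensure_readable_at` are hypothetical names, not part of the persist client API.

```rust
/// Toy stand-in for a persist shard's write frontier (not the real persist API).
struct ToyShard {
    /// Every time strictly less than `upper` is closed and therefore readable.
    upper: u64,
    /// Number of batches appended so far, to make idempotency observable.
    appends: usize,
}

impl ToyShard {
    /// Sketch of the idempotent init step: if `init_ts` is not yet readable,
    /// append one empty batch that advances the upper to `init_ts + 1`.
    /// Returns whether anything was written.
    fn ensure_readable_at(&mut self, init_ts: u64) -> bool {
        if self.upper > init_ts {
            return false; // already readable: a no-op
        }
        // Append an empty batch covering [self.upper, init_ts + 1).
        self.upper = init_ts + 1;
        self.appends += 1;
        true
    }
}

fn main() {
    let mut txns = ToyShard { upper: 0, appends: 0 };
    println!("{}", txns.ensure_readable_at(5)); // true: first call writes
    println!("{}", txns.ensure_readable_at(5)); // false: repeat is a no-op
    println!("upper={} appends={}", txns.upper, txns.appends); // upper=6 appends=1
}
```

Making the check "is it already readable?" rather than "have I run before?" is what lets every process call this on every restart without coordination.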
pub fn begin(&self) -> Txn<K, V, T, D>
Returns a new, empty transaction that can involve the data shards registered with this handle.
pub async fn register(
    &mut self,
    register_ts: T,
    data_writes: impl IntoIterator<Item = WriteHandle<K, V, T, D>>,
) -> Result<Tidy, T>
Registers data shards for use with this txn set.
A registration entry is written to the txn shard. If it is not possible to register the data at the requested time, an Err will be returned with the minimum time the data shards could be registered.
This method is idempotent. Data shards currently registered at register_ts will not be registered a second time. Specifically, this method will return success when the most recent register ts R is less_equal to register_ts AND there is no forget ts between R and register_ts.
As a side effect, all txns <= register_ts are applied, including the registration itself.
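The idempotency rule and the Err(minimum time) behaviour can be sketched for a single data shard as follows. This is a toy model, not the real implementation: `Op`, `ToyRegistry`, `already_registered`, and `register` are hypothetical, and "between R and register_ts" is interpreted here as the half-open interval (R, register_ts].

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum Op {
    Register,
    Forget,
}

/// Hypothetical registration history of one data shard in a toy txns shard:
/// (ts, op) entries, plus the toy shard's upper.
struct ToyRegistry {
    history: Vec<(u64, Op)>,
    upper: u64,
}

impl ToyRegistry {
    /// The idempotency rule: the most recent register ts R is <= register_ts
    /// and no forget ts F satisfies R < F <= register_ts.
    fn already_registered(&self, register_ts: u64) -> bool {
        let latest_register = self.history.iter()
            .filter(|&&(_, op)| op == Op::Register)
            .map(|&(ts, _)| ts)
            .max();
        match latest_register {
            Some(r) if r <= register_ts => !self.history.iter()
                .any(|&(f, op)| op == Op::Forget && r < f && f <= register_ts),
            _ => false,
        }
    }

    /// Sketch of register: a no-op when already registered; otherwise writes a
    /// registration entry, or returns Err with the minimum usable timestamp.
    fn register(&mut self, register_ts: u64) -> Result<(), u64> {
        if self.already_registered(register_ts) {
            return Ok(());
        }
        if register_ts < self.upper {
            return Err(self.upper);
        }
        self.history.push((register_ts, Op::Register));
        self.upper = register_ts + 1;
        Ok(())
    }
}

fn main() {
    let mut d0 = ToyRegistry { history: Vec::new(), upper: 0 };
    println!("{:?}", d0.register(2)); // Ok(()): writes the registration
    println!("{:?}", d0.register(2)); // Ok(()): idempotent, nothing written
    println!("{:?}", d0.register(1)); // Err(3): ts 1 is already closed
    println!("entries: {}", d0.history.len()); // entries: 1
}
```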
WARNING! While a data shard is registered to the txn set, writing to it directly (i.e. using a WriteHandle instead of the TxnsHandle, or registering it with another txn shard) will lead to incorrectness, undefined behavior, and (potentially sticky) panics.
pub async fn forget(
    &mut self,
    forget_ts: T,
    data_ids: impl IntoIterator<Item = ShardId>,
) -> Result<Tidy, T>
Removes data shards from use with this txn set.
The registration entry written to the txn shard is retracted. If it is not possible to forget the data shard at the requested time, an Err will be returned with the minimum time the data shards could be forgotten.
This method is idempotent. Data shards currently forgotten at forget_ts will not be forgotten a second time. Specifically, this method will return success when the most recent forget ts (if any) F is less_equal to forget_ts AND there is no register ts between F and forget_ts.
As a side effect, all txns <= forget_ts are applied, including the forget itself.
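The forget rule mirrors the register rule, with the extra "(if any)" case. A minimal sketch over one shard's hypothetical (ts, op) history; `Op` and `already_forgotten` are illustrative names, and "between F and forget_ts" is again interpreted as (F, forget_ts], with a missing F treated as "before every timestamp".

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum Op {
    Register,
    Forget,
}

/// Returns true when forgetting at `forget_ts` would be a no-op: the most
/// recent forget ts F (if any) is <= forget_ts and no register ts R
/// satisfies F < R <= forget_ts.
fn already_forgotten(history: &[(u64, Op)], forget_ts: u64) -> bool {
    let latest_forget = history.iter()
        .filter(|&&(_, op)| op == Op::Forget)
        .map(|&(ts, _)| ts)
        .max();
    if let Some(f) = latest_forget {
        if f > forget_ts {
            return false;
        }
    }
    !history.iter().any(|&(r, op)| {
        op == Op::Register && latest_forget.map_or(true, |f| f < r) && r <= forget_ts
    })
}

fn main() {
    let mut history = vec![(0, Op::Register)];
    println!("{}", already_forgotten(&history, 3)); // false: forget must write
    history.push((3, Op::Forget));
    println!("{}", already_forgotten(&history, 3)); // true: a repeat is a no-op
    println!("{}", already_forgotten(&[], 3)); // true: never registered
}
```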
WARNING! While a data shard is registered to the txn set, writing to it directly (i.e. using a WriteHandle instead of the TxnsHandle, or registering it with another txn shard) will lead to incorrectness, undefined behavior, and (potentially sticky) panics.
pub async fn forget_all(
    &mut self,
    forget_ts: T,
) -> Result<(Vec<ShardId>, Tidy), T>
Forgets, at the given timestamp, every data shard that is registered. Returns the ids of the forgotten shards. See Self::forget.
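As a rough picture of forget_all's contract, the toy below forgets everything registered at one timestamp and reports which ids it forgot. `ToyTxnSet` and its `forget_all` are hypothetical; the sketch omits the Tidy half of the real return value and assumes an already-closed timestamp is rejected with the current upper.

```rust
use std::collections::BTreeSet;

/// Hypothetical toy txn set; not the mz_persist_txn API.
struct ToyTxnSet {
    registered: BTreeSet<&'static str>,
    /// Smallest timestamp at which a new entry can still be written.
    upper: u64,
}

impl ToyTxnSet {
    /// Sketch of forget_all: forgets every currently registered shard at
    /// `forget_ts` and returns their ids (the real method also returns a Tidy).
    fn forget_all(&mut self, forget_ts: u64) -> Result<Vec<&'static str>, u64> {
        if forget_ts < self.upper {
            return Err(self.upper);
        }
        let forgotten: Vec<&'static str> =
            std::mem::take(&mut self.registered).into_iter().collect();
        self.upper = forget_ts + 1;
        Ok(forgotten)
    }
}

fn main() {
    let mut set = ToyTxnSet { registered: BTreeSet::from(["d0", "d1"]), upper: 5 };
    println!("{:?}", set.forget_all(5)); // Ok(["d0", "d1"])
    println!("{:?}", set.forget_all(2)); // Err(6)
    println!("{:?}", set.forget_all(8)); // Ok([])
}
```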
pub async fn apply_le(&mut self, ts: &T) -> Tidy
“Applies” all committed txns <= the given timestamp, ensuring that reads at that timestamp will not block.
In the common case, the txn committer will have done this work and this method will be a no-op, but it is not guaranteed. In the event of a crash or race, this does whatever persist writes are necessary (and returns the resulting maintenance work), which could be significant.
If the requested timestamp has not yet been written, this could block for an unbounded amount of time.
This method is idempotent.
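The ordering and idempotency of apply can be illustrated with a toy model in which committed-but-unapplied batches sit in a map keyed by commit timestamp. `ToyTxns` and its `apply_le` are hypothetical; the returned pairs stand in for the tidy work the caller owes, and unlike the real method this sketch never blocks on an unwritten timestamp.

```rust
use std::collections::BTreeMap;

/// Hypothetical toy model of apply; not the mz_persist_txn API.
struct ToyTxns {
    /// Committed but unapplied batches, keyed by commit ts.
    pending: BTreeMap<u64, Vec<(&'static str, &'static str)>>,
    /// Batches already moved into each data shard, in apply order.
    data: BTreeMap<&'static str, Vec<&'static str>>,
}

impl ToyTxns {
    /// Sketch of apply_le: moves every batch committed at or before `ts` into
    /// its data shard, in ts order, and returns the (shard, blob) pairs moved.
    /// Applied batches leave `pending`, so repeating the call is a no-op.
    fn apply_le(&mut self, ts: u64) -> Vec<(&'static str, &'static str)> {
        let ready: Vec<u64> = self.pending.range(..=ts).map(|(&commit_ts, _)| commit_ts).collect();
        let mut tidy = Vec::new();
        // BTreeMap keys are sorted, so txns are applied in ts order.
        for commit_ts in ready {
            for (shard, blob) in self.pending.remove(&commit_ts).unwrap_or_default() {
                self.data.entry(shard).or_default().push(blob);
                tidy.push((shard, blob));
            }
        }
        tidy
    }
}

fn main() {
    let mut txns = ToyTxns { pending: BTreeMap::new(), data: BTreeMap::new() };
    txns.pending.insert(1, vec![("d0", "A")]);
    txns.pending.insert(4, vec![("d0", "B"), ("d0", "C"), ("d1", "D")]);
    println!("{:?}", txns.apply_le(3)); // [("d0", "A")]
    println!("{:?}", txns.apply_le(3)); // []
    println!("{:?}", txns.apply_le(4)); // [("d0", "B"), ("d0", "C"), ("d1", "D")]
    println!("{:?}", txns.data); // {"d0": ["A", "B", "C"], "d1": ["D"]}
}
```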
pub async fn apply_eager_le(&mut self, ts: &T) -> Tidy
Like Self::apply_le, but also advances the physical upper of every data shard registered at the timestamp past that timestamp.
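The extra step is easiest to see on shards that received no data. In this toy sketch, `ToyUppers` and `advance_past` are hypothetical names, the map is assumed to hold only shards registered at the timestamp, and an upper that is already past `ts` is left alone so it never regresses.

```rust
use std::collections::BTreeMap;

/// Hypothetical physical uppers of the data shards registered at some ts.
struct ToyUppers {
    physical: BTreeMap<&'static str, u64>,
}

impl ToyUppers {
    /// The step apply_eager_le adds on top of apply_le, sketched: advance the
    /// physical upper of every registered data shard past `ts`, including
    /// shards that received no data. Uppers never move backwards.
    fn advance_past(&mut self, ts: u64) {
        for upper in self.physical.values_mut() {
            if *upper <= ts {
                *upper = ts + 1;
            }
        }
    }
}

fn main() {
    let mut uppers = ToyUppers { physical: BTreeMap::from([("d0", 5), ("d1", 2)]) };
    uppers.advance_past(4);
    println!("{:?}", uppers.physical); // {"d0": 5, "d1": 5}
}
```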
pub async fn compact_to(&mut self, since_ts: T)
Allows compaction of the txns shard as well as internal representations, losing the ability to answer queries about times less_than since_ts.
In practice, this will likely only be called from the singleton controller process.
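Why compaction loses history before since_ts can be shown with the differential-dataflow style of logical compaction that persist shards follow: times below `since` are forwarded to `since` and the result is consolidated. This is a sketch of that general idea applied to the example updates from the implementation details, not the actual implementation of this method; `Update` and `compact_updates_to` are hypothetical.

```rust
use std::collections::BTreeMap;

/// (data shard, blob, ts, diff), as in the txns shard examples.
type Update = (&'static str, &'static str, u64, i64);

/// Forward every time below `since` to `since`, consolidate, and drop
/// updates whose diffs cancel. Accumulations at times >= since are unchanged.
fn compact_updates_to(updates: &[Update], since: u64) -> Vec<Update> {
    let mut acc: BTreeMap<(&'static str, &'static str, u64), i64> = BTreeMap::new();
    for &(shard, blob, ts, diff) in updates {
        *acc.entry((shard, blob, ts.max(since))).or_insert(0) += diff;
    }
    acc.into_iter()
        .filter(|&(_, diff)| diff != 0)
        .map(|((shard, blob, ts), diff)| (shard, blob, ts, diff))
        .collect()
}

fn main() {
    let updates: Vec<Update> = vec![
        ("d0", "A", 1, 1), ("d0", "B", 4, 1), ("d0", "C", 4, 1), ("d1", "D", 4, 1),
        ("d0", "A", 3, -1), ("d0", "B", 5, -1), ("d0", "C", 6, -1), ("d1", "D", 6, -1),
    ];
    for u in compact_updates_to(&updates, 5) {
        println!("{:?}", u);
    }
}
```

After compacting to 5, blobs A and B disappear entirely and C and D appear to have been committed at 5, so a query at ts=4 can no longer be answered correctly.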
pub fn read_cache(&self) -> &TxnsCache<T, C>
Returns the TxnsCache used by this handle.
Trait Implementations
impl<K, V, T, D, O, C> Debug for TxnsHandle<K, V, T, D, O, C>
Auto Trait Implementations
impl<K, V, T, D, O, C> Freeze for TxnsHandle<K, V, T, D, O, C>
impl<K, V, T, D, O = u64, C = TxnsCodecDefault> !RefUnwindSafe for TxnsHandle<K, V, T, D, O, C>
impl<K, V, T, D, O, C> Send for TxnsHandle<K, V, T, D, O, C>
where
    O: Send,
impl<K, V, T, D, O, C> Sync for TxnsHandle<K, V, T, D, O, C>
where
    O: Sync,
impl<K, V, T, D, O, C> Unpin for TxnsHandle<K, V, T, D, O, C>
impl<K, V, T, D, O = u64, C = TxnsCodecDefault> !UnwindSafe for TxnsHandle<K, V, T, D, O, C>
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<R, O, T> CopyOnto<ConsecutiveOffsetPairs<R, O>> for T
fn copy_onto( self, target: &mut ConsecutiveOffsetPairs<R, O> ) -> <ConsecutiveOffsetPairs<R, O> as Region>::Index
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request
impl<T> Pointable for T
impl<P, R> ProtoType<R> for P
where
    R: RustType<P>,
fn into_rust(self) -> Result<R, TryFromProtoError>
See RustType::from_proto.
fn from_rust(rust: &R) -> P
See RustType::into_proto.