pub struct TxnsCache<T, C: TxnsCodec = TxnsCodecDefault> {
pub(crate) txns_subscribe: Subscribe<C::Key, C::Val, T, i64>,
pub(crate) buf: Vec<(TxnsEntry, T, i64)>,
state: TxnsCacheState<T>,
}
Expand description
A self-updating TxnsCacheState.
Fields§
§txns_subscribe: Subscribe<C::Key, C::Val, T, i64>
A subscribe over the txn shard.
buf: Vec<(TxnsEntry, T, i64)>
Pending updates for timestamps that haven’t closed.
state: TxnsCacheState<T>
Implementations§
Source§impl<T, C> TxnsCache<T, C>
impl<T, C> TxnsCache<T, C>
Sourcepub(crate) async fn init(
init_ts: T,
txns_read: ReadHandle<C::Key, C::Val, T, i64>,
txns_write: &mut WriteHandle<C::Key, C::Val, T, i64>,
) -> Self
pub(crate) async fn init( init_ts: T, txns_read: ReadHandle<C::Key, C::Val, T, i64>, txns_write: &mut WriteHandle<C::Key, C::Val, T, i64>, ) -> Self
Initialize the txn shard at init_ts
and returns a TxnsCache reading
from that shard.
Sourcepub async fn open(
client: &PersistClient,
txns_id: ShardId,
only_data_id: Option<ShardId>,
) -> Self
pub async fn open( client: &PersistClient, txns_id: ShardId, only_data_id: Option<ShardId>, ) -> Self
Returns a TxnsCache reading from the given txn shard.
txns_id
identifies which shard will be used as the txns WAL. MZ will
likely have one of these per env, used by all processes and the same
across restarts.
async fn from_read( txns_read: ReadHandle<C::Key, C::Val, T, i64>, only_data_id: Option<ShardId>, ) -> Self
Sourcepub async fn update_gt(&mut self, ts: &T) -> &T
pub async fn update_gt(&mut self, ts: &T) -> &T
Invariant: afterward, self.progress_exclusive will be > ts
Returns the progress_exclusive
of the cache after updating.
Sourcepub async fn update_ge(&mut self, ts: &T) -> &T
pub async fn update_ge(&mut self, ts: &T) -> &T
Invariant: afterward, self.progress_exclusive will be >= ts
Returns the progress_exclusive
of the cache after updating.
Sourceasync fn update<F: Fn(&T) -> bool>(
state: &mut TxnsCacheState<T>,
txns_subscribe: &mut Subscribe<C::Key, C::Val, T, i64>,
buf: &mut Vec<(TxnsEntry, T, i64)>,
only_data_id: Option<ShardId>,
done: F,
)
async fn update<F: Fn(&T) -> bool>( state: &mut TxnsCacheState<T>, txns_subscribe: &mut Subscribe<C::Key, C::Val, T, i64>, buf: &mut Vec<(TxnsEntry, T, i64)>, only_data_id: Option<ShardId>, done: F, )
Listen to the txns shard for events until done
returns true.
pub(crate) async fn fetch_parts( only_data_id: Option<ShardId>, txns_subscribe: &mut Subscribe<C::Key, C::Val, T, i64>, parts: Vec<LeasedBatchPart<T>>, updates: &mut Vec<(TxnsEntry, T, i64)>, )
fn should_fetch_part( only_data_id: Option<&ShardId>, part: &LeasedBatchPart<T>, ) -> bool
Methods from Deref<Target = TxnsCacheState<T>>§
Sourcepub fn registered_at_progress(&self, data_id: &ShardId, ts: &T) -> bool
pub fn registered_at_progress(&self, data_id: &ShardId, ts: &T) -> bool
Returns whether the data shard was registered to the txns set as of the current progress.
Specifically, a data shard is registered if the most recent register timestamp is set but the most recent forget timestamp is not set.
This function accepts a timestamp as input, but that timestamp must be equal to the progress exclusive, or else the function panics. It mainly acts as a way for the caller to think about the logical time at which this function executes. Times in the past may have been compacted away, and we can’t always return an accurate answer. If this function isn’t sufficient, you can usually find what you’re looking for by inspecting the times in the most recent registration.
Sourcepub(crate) fn all_registered_at_progress(&self, ts: &T) -> Vec<ShardId>
pub(crate) fn all_registered_at_progress(&self, ts: &T) -> Vec<ShardId>
Returns the set of all data shards registered to the txns set as of the current progress. See Self::registered_at_progress.
Sourcepub fn data_snapshot(&self, data_id: ShardId, as_of: T) -> DataSnapshot<T>
pub fn data_snapshot(&self, data_id: ShardId, as_of: T) -> DataSnapshot<T>
Returns a token exchangeable for a snapshot of a data shard.
A data shard might be definite at times past the physical upper because
of invariants maintained by this txn system. As a result, this method
discovers the latest potentially unapplied write before the as_of
.
Callers must first wait for TxnsCache::update_gt
with the same or
later timestamp to return. Panics otherwise.
Sourcepub fn data_listen_next(&self, data_id: &ShardId, ts: &T) -> DataListenNext<T>
pub fn data_listen_next(&self, data_id: &ShardId, ts: &T) -> DataListenNext<T>
Returns the next action to take when iterating a Listen on a data shard.
A data shard Listen is executed by repeatedly calling this method with an exclusive progress frontier. The returned value indicates an action to take. Some of these actions advance the progress frontier, which results in calling this method again with a higher timestamp, and thus a new action. See DataListenNext for specifications of the actions.
Note that this is a state machine on self.progress_exclusive
and the
listen progress. DataListenNext indicates which state transitions to
take.
Sourcepub(crate) fn data_subscribe(
&self,
data_id: ShardId,
as_of: T,
) -> DataSubscribe<T>
pub(crate) fn data_subscribe( &self, data_id: ShardId, as_of: T, ) -> DataSubscribe<T>
Returns a token exchangeable for a subscribe of a data shard.
Callers must first wait for TxnsCache::update_gt
with the same or
later timestamp to return. Panics otherwise.
Sourcepub fn min_unapplied_ts(&self) -> &T
pub fn min_unapplied_ts(&self) -> &T
Returns the minimum timestamp not known to be applied by this cache.
fn min_unapplied_ts_inner(&self) -> &T
Sourcepub(crate) fn unapplied(
&self,
) -> impl Iterator<Item = (&ShardId, Unapplied<'_>, &T)>
pub(crate) fn unapplied( &self, ) -> impl Iterator<Item = (&ShardId, Unapplied<'_>, &T)>
Returns the operations needing application as of the current progress.
Sourcepub(crate) fn filter_retractions<'a>(
&'a self,
expected_txns_upper: &T,
retractions: impl Iterator<Item = (&'a Vec<u8>, &'a ([u8; 8], ShardId))>,
) -> impl Iterator<Item = (&'a Vec<u8>, &'a ([u8; 8], ShardId))>
pub(crate) fn filter_retractions<'a>( &'a self, expected_txns_upper: &T, retractions: impl Iterator<Item = (&'a Vec<u8>, &'a ([u8; 8], ShardId))>, ) -> impl Iterator<Item = (&'a Vec<u8>, &'a ([u8; 8], ShardId))>
Filters out retractions known to have made it into the txns shard.
This is called with a set of things that are known to have been applied
and in preparation for retracting them. The caller will attempt to
retract everything not filtered out by this method in a CaA with an
expected upper of expected_txns_upper
. So, we catch up to that point,
and keep everything that is still outstanding. If the CaA fails with an
expected upper mismatch, then it must call this method again on the next
attempt with the new expected upper (new retractions may have made it
into the txns shard in the meantime).
Callers must first wait for TxnsCache::update_ge
with the same or
later timestamp to return. Panics otherwise.
Sourcepub(crate) fn push_entries(
&mut self,
entries: Vec<(TxnsEntry, T, i64)>,
progress: T,
)
pub(crate) fn push_entries( &mut self, entries: Vec<(TxnsEntry, T, i64)>, progress: T, )
Update contents with entries
and mark this cache as progressed up to progress
.
fn push_register(&mut self, data_id: ShardId, ts: T, diff: i64, compacted_ts: T)
fn push_append(&mut self, data_id: ShardId, batch: Vec<u8>, ts: T, diff: i64)
Sourcepub(crate) fn mark_register_applied(&mut self, ts: &T)
pub(crate) fn mark_register_applied(&mut self, ts: &T)
Informs the cache that all registers and forgets less than ts have been applied.
Sourcefn compact_data_times(&mut self, data_id: &ShardId)
fn compact_data_times(&mut self, data_id: &ShardId)
Compact the internal representation for data_id
by removing all data
that is not needed to maintain the following invariants:
- The latest write and registration for each shard are kept in
self.datas
. - All unapplied writes and registrations are kept in
self.datas
. - All writes in
self.datas
are contained by some registration inself.datas
.
pub(crate) fn update_gauges(&self, metrics: &Metrics)
fn assert_only_data_id(&self, data_id: &ShardId)
pub(crate) fn validate(&self) -> Result<(), String>
Trait Implementations§
Auto Trait Implementations§
impl<T, C> Freeze for TxnsCache<T, C>where
T: Freeze,
impl<T, C = TxnsCodecDefault> !RefUnwindSafe for TxnsCache<T, C>
impl<T, C> Send for TxnsCache<T, C>
impl<T, C> Sync for TxnsCache<T, C>
impl<T, C> Unpin for TxnsCache<T, C>where
T: Unpin,
impl<T, C = TxnsCodecDefault> !UnwindSafe for TxnsCache<T, C>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> FutureExt for T
impl<T> FutureExt for T
Source§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
Source§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request
Source§impl<T> Pointable for T
impl<T> Pointable for T
Source§impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
Source§fn into_rust(self) -> Result<R, TryFromProtoError>
fn into_rust(self) -> Result<R, TryFromProtoError>
RustType::from_proto
.Source§fn from_rust(rust: &R) -> P
fn from_rust(rust: &R) -> P
RustType::into_proto
.Source§impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
Source§fn plus_equals(&mut self, rhs: &&'a S)
fn plus_equals(&mut self, rhs: &&'a S)
std::ops::AddAssign
, for types that do not implement AddAssign
.