struct TxnsSubscribeTask<T, C: TxnsCodec = TxnsCodecDefault> {
txns_subscribe: Subscribe<C::Key, C::Val, T, i64>,
buf: Vec<(TxnsEntry, T, i64)>,
tx: UnboundedSender<TxnsReadCmd<T>>,
only_data_id: Option<ShardId>,
}
Expand description
Reads txn updates from a Subscribe and forwards them to a TxnsReadTask when receiving a progress update.
Fields§
§txns_subscribe: Subscribe<C::Key, C::Val, T, i64>
§buf: Vec<(TxnsEntry, T, i64)>
Staged updates that we will consume and forward to the TxnsReadTask when we receive a progress update.
tx: UnboundedSender<TxnsReadCmd<T>>
For sending updates to the main TxnsReadTask.
only_data_id: Option<ShardId>
If Some, this cache only tracks the indicated data shard as a performance optimization. When used, only some methods (in particular, the ones necessary for the txns_progress operator) are supported.
TODO: It’d be nice to make this a compile time thing. I have some ideas, but they’re decently invasive, so leave it for a followup.
Implementations§
Source§impl<T, C> TxnsSubscribeTask<T, C>
impl<T, C> TxnsSubscribeTask<T, C>
Sourcepub async fn open(
client: &PersistClient,
txns_id: ShardId,
only_data_id: Option<ShardId>,
tx: UnboundedSender<TxnsReadCmd<T>>,
) -> (Self, TxnsCacheState<T>)
pub async fn open( client: &PersistClient, txns_id: ShardId, only_data_id: Option<ShardId>, tx: UnboundedSender<TxnsReadCmd<T>>, ) -> (Self, TxnsCacheState<T>)
Creates a TxnsSubscribeTask reading from the given txn shard that forwards updates (entries and progress) to the given `tx`.
This returns both the created task and a TxnsCacheState that can be used to interact with the txn system and into which the updates should be funneled.
NOTE: We create both the TxnsSubscribeTask and the TxnsCacheState at the same time because the cache is initialized with a `since_ts`, which we get from the same ReadHandle that we use to initialize the Subscribe.
async fn run(&mut self)
Trait Implementations§
Auto Trait Implementations§
impl<T, C> Freeze for TxnsSubscribeTask<T, C> where
T: Freeze,
impl<T, C = TxnsCodecDefault> !RefUnwindSafe for TxnsSubscribeTask<T, C>
impl<T, C> Send for TxnsSubscribeTask<T, C>
impl<T, C> Sync for TxnsSubscribeTask<T, C>
impl<T, C> Unpin for TxnsSubscribeTask<T, C> where
T: Unpin,
impl<T, C = TxnsCodecDefault> !UnwindSafe for TxnsSubscribeTask<T, C>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> FutureExt for T
impl<T> FutureExt for T
Source§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
Source§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message `T` in a `tonic::Request`.
Source§impl<T> Pointable for T
impl<T> Pointable for T
Source§impl<P, R> ProtoType<R> for P where
R: RustType<P>,
impl<P, R> ProtoType<R> for P where
R: RustType<P>,
Source§fn into_rust(self) -> Result<R, TryFromProtoError>
fn into_rust(self) -> Result<R, TryFromProtoError>
See `RustType::from_proto`.
Source§fn from_rust(rust: &R) -> P
fn from_rust(rust: &R) -> P
See `RustType::into_proto`.
Source§impl<'a, S, T> Semigroup<&'a S> for T where
T: Semigroup<S>,
impl<'a, S, T> Semigroup<&'a S> for T where
T: Semigroup<S>,
Source§fn plus_equals(&mut self, rhs: &&'a S)
fn plus_equals(&mut self, rhs: &&'a S)
The method of `std::ops::AddAssign`, for types that do not implement `AddAssign`.