Struct mz_persist_client::read::ReadHandle
pub struct ReadHandle<K: Codec, V: Codec, T, D> {
pub(crate) cfg: PersistConfig,
pub(crate) metrics: Arc<Metrics>,
pub(crate) machine: Machine<K, V, T, D>,
pub(crate) gc: GarbageCollector<K, V, T, D>,
pub(crate) blob: Arc<dyn Blob>,
pub(crate) reader_id: LeasedReaderId,
pub(crate) read_schemas: Schemas<K, V>,
pub(crate) schema_cache: SchemaCache<K, V, T, D>,
since: Antichain<T>,
pub(crate) last_heartbeat: EpochMillis,
pub(crate) leased_seqnos: BTreeMap<SeqNo, Lease>,
pub(crate) unexpired_state: Option<UnexpiredReadHandleState>,
}
A “capability” granting the ability to read the state of some shard at times greater than or equal to self.since().
Production users should call Self::expire before dropping a ReadHandle so that it can expire its leases. If/when Rust gets AsyncDrop, this will be done automatically.
All async methods on ReadHandle retry for as long as they are able, but the returned std::future::Futures implement “cancel on drop” semantics. This means that callers can add a timeout using tokio::time::timeout or tokio::time::timeout_at.
tokio::time::timeout(timeout, read.downgrade_since(&new_since)).await
Fields§
§cfg: PersistConfig
§metrics: Arc<Metrics>
§machine: Machine<K, V, T, D>
§gc: GarbageCollector<K, V, T, D>
§blob: Arc<dyn Blob>
§reader_id: LeasedReaderId
§read_schemas: Schemas<K, V>
§schema_cache: SchemaCache<K, V, T, D>
§since: Antichain<T>
§last_heartbeat: EpochMillis
§leased_seqnos: BTreeMap<SeqNo, Lease>
§unexpired_state: Option<UnexpiredReadHandleState>
Implementations§
impl<K, V, T, D> ReadHandle<K, V, T, D>
pub(crate) async fn new( cfg: PersistConfig, metrics: Arc<Metrics>, machine: Machine<K, V, T, D>, gc: GarbageCollector<K, V, T, D>, blob: Arc<dyn Blob>, reader_id: LeasedReaderId, read_schemas: Schemas<K, V>, since: Antichain<T>, last_heartbeat: EpochMillis, ) -> Self
pub fn since(&self) -> &Antichain<T>
This handle’s since frontier.
This will always be greater than or equal to the shard-global since.
fn outstanding_seqno(&mut self) -> Option<SeqNo>
pub async fn downgrade_since(&mut self, new_since: &Antichain<T>)
Forwards the since frontier of this handle, giving up the ability to read at times not greater than or equal to new_since.
This may trigger (asynchronous) compaction and consolidation in the system. A new_since of the empty antichain “finishes” this shard, promising that no more data will ever be read by this handle.
This also acts as a heartbeat for the reader lease (including if called with new_since equal to something like self.since() or the minimum timestamp, making the call a no-op).
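The frontier behaviour described above can be sketched with a toy model. This is not the persist implementation: it assumes totally ordered u64 times, uses Option<u64> in place of Antichain<T> (None standing in for the empty antichain), and ToyReadCapability is a hypothetical name introduced only for illustration. Leases, heartbeats and compaction are left out.

```rust
/// Toy stand-in for a read capability over totally ordered `u64` times.
/// `None` models the empty antichain (no time remains readable).
/// Hypothetical type for illustration; not part of mz_persist_client.
#[derive(Debug, Clone, PartialEq)]
pub struct ToyReadCapability {
    since: Option<u64>,
}

impl ToyReadCapability {
    pub fn new(since: u64) -> Self {
        ToyReadCapability { since: Some(since) }
    }

    pub fn since(&self) -> Option<u64> {
        self.since
    }

    /// Advance the since frontier. A request that would move the frontier
    /// backwards leaves it unchanged, so it behaves as a no-op.
    pub fn downgrade_since(&mut self, new_since: Option<u64>) {
        self.since = match (self.since, new_since) {
            // Already finished: nothing reopens the capability.
            (None, _) => None,
            // Downgrading to the empty antichain finishes the capability.
            (Some(_), None) => None,
            // Otherwise keep whichever frontier is further along.
            (Some(current), Some(new)) => Some(current.max(new)),
        };
    }

    /// A time is readable only if it is greater than or equal to `since`.
    pub fn can_read_at(&self, time: u64) -> bool {
        match self.since {
            Some(since) => time >= since,
            None => false,
        }
    }
}
```

In this model, downgrading from 5 to 8 gives up times 5 through 7, a later request for 0 changes nothing, and downgrading to None gives up every time.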
pub async fn listen( self, as_of: Antichain<T>, ) -> Result<Listen<K, V, T, D>, Since<T>>
Returns an ongoing subscription of updates to a shard.
The stream includes all data at times greater than as_of. Combined with Self::snapshot it will produce exactly correct results: the snapshot is the TVC’s contents at as_of and all subsequent updates occur at exactly their indicated time. The recipient should only downgrade their read capability when they are certain they have all data through the frontier they would downgrade to.
This takes ownership of the ReadHandle so the Listen can use it to Self::downgrade_since as it progresses. If you need to keep this handle, then Self::clone it before calling listen.
The Since error indicates that the requested as_of cannot be served (the caller has out of date information) and includes the smallest as_of that would have been accepted.
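Why a snapshot and a listen at the same as_of fit together can be sketched with plain vectors. This is a toy model under stated assumptions, not how persist stores or serves data: times are u64, updates are (data, time, diff) triples, and toy_snapshot, toy_listen and net_diff are hypothetical helpers. The snapshot side takes updates at times less than or equal to as_of, the listen side takes updates at times strictly greater, so every update lands on exactly one side.

```rust
/// An update triple: (data, time, diff). Times are plain `u64`s in this sketch.
pub type Update = (&'static str, u64, i64);

/// Toy snapshot: every update at a time less than or equal to `as_of`.
pub fn toy_snapshot(updates: &[Update], as_of: u64) -> Vec<Update> {
    updates.iter().filter(|u| u.1 <= as_of).copied().collect()
}

/// Toy listen: every update at a time strictly greater than `as_of`,
/// kept at its original time.
pub fn toy_listen(updates: &[Update], as_of: u64) -> Vec<Update> {
    updates.iter().filter(|u| u.1 > as_of).copied().collect()
}

/// Net multiplicity of one datum across a set of updates.
pub fn net_diff(updates: &[Update], data: &str) -> i64 {
    updates.iter().filter(|u| u.0 == data).map(|u| u.2).sum()
}
```

Because the two sides partition the updates, summing the diffs of a datum over the snapshot and over the listen gives the same result as summing over the whole collection, with nothing counted twice and nothing dropped.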
pub async fn snapshot( &mut self, as_of: Antichain<T>, ) -> Result<Vec<LeasedBatchPart<T>>, Since<T>>
Returns all of the contents of the shard TVC at as_of broken up into LeasedBatchParts. These parts can be “turned in” via crate::fetch::fetch_batch_part to receive the data they contain.
This command returns the contents of this shard as of as_of once they are known. This may “block” (in an async-friendly way) if as_of is greater than or equal to the current upper of the shard. The recipient should only downgrade their read capability when they are certain they have all data through the frontier they would downgrade to.
The Since error indicates that the requested as_of cannot be served (the caller has out of date information) and includes the smallest as_of that would have been accepted.
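The Since error contract can be sketched as a simple comparison. This is a toy model, not the library's code: it assumes totally ordered u64 times, and ToySince, check_as_of and as_of_or_earliest are hypothetical names. In the model, the smallest acceptable as_of is the current since itself.

```rust
/// Hypothetical stand-in for the `Since` error: it carries the smallest
/// `as_of` that would have been accepted.
#[derive(Debug, PartialEq)]
pub struct ToySince(pub u64);

/// Accept `as_of` only if it is not behind the current `since`.
pub fn check_as_of(since: u64, as_of: u64) -> Result<u64, ToySince> {
    if as_of >= since {
        Ok(as_of)
    } else {
        Err(ToySince(since))
    }
}

/// One possible caller policy: fall back to the smallest acceptable
/// `as_of` reported by the error.
pub fn as_of_or_earliest(since: u64, requested: u64) -> u64 {
    match check_as_of(since, requested) {
        Ok(as_of) => as_of,
        Err(ToySince(smallest)) => smallest,
    }
}
```

Whether falling back to the reported as_of is appropriate depends on the caller; a reader that needs results at exactly the requested time would surface the error instead.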
pub async fn subscribe( self, as_of: Antichain<T>, ) -> Result<Subscribe<K, V, T, D>, Since<T>>
Returns a snapshot of all of a shard’s data using as_of, followed by listening to any future updates.
For more details on this operation’s semantics, see Self::snapshot and Self::listen.
fn lease_batch_part( &mut self, desc: Description<T>, part: BatchPart<T>, filter: FetchBatchFilter<T>, ) -> LeasedBatchPart<T>
fn lease_batch_parts( &mut self, batch: HollowBatch<T>, filter: FetchBatchFilter<T>, ) -> impl Stream<Item = LeasedBatchPart<T>> + '_
fn lease_seqno(&mut self) -> Lease
Tracks that the ReadHandle’s machine’s current SeqNo is being “leased out” to a LeasedBatchPart, and cannot be garbage collected until its lease has been returned.
pub async fn clone(&self, purpose: &str) -> Self
Returns an independent ReadHandle with a new LeasedReaderId but the same since.
pub async fn maybe_downgrade_since(&mut self, new_since: &Antichain<T>)
A rate-limited version of Self::downgrade_since.
This is an internally rate-limited helper, designed to allow users to call it as frequently as they like. Call this, Self::downgrade_since, or Self::maybe_heartbeat_reader on some interval that is “frequent” compared to PersistConfig::FAKE_READ_LEASE_DURATION.
This is communicating actual progress information, so is given preferential treatment compared to Self::maybe_heartbeat_reader.
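The idea of an internally rate-limited helper can be sketched as follows. This is an assumption-laden illustration rather than the logic ReadHandle actually uses: ToyRateLimiter, its min_interval field and the should_run method are hypothetical, and the clock is passed in explicitly so the behaviour is easy to follow.

```rust
use std::time::{Duration, Instant};

/// Hypothetical rate limiter: lets a call through at most once per
/// `min_interval`, so callers may invoke it as often as they like.
pub struct ToyRateLimiter {
    min_interval: Duration,
    last_run: Option<Instant>,
}

impl ToyRateLimiter {
    pub fn new(min_interval: Duration) -> Self {
        ToyRateLimiter { min_interval, last_run: None }
    }

    /// Returns true, and records `now`, if the wrapped work should run.
    /// Returns false if the previous accepted call was too recent.
    pub fn should_run(&mut self, now: Instant) -> bool {
        let due = match self.last_run {
            None => true,
            Some(last) => now.duration_since(last) >= self.min_interval,
        };
        if due {
            self.last_run = Some(now);
        }
        due
    }
}
```

A caller would wrap the expensive operation in `if limiter.should_run(Instant::now()) { ... }` and invoke it on every loop iteration; most invocations then return immediately.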
pub(crate) async fn maybe_heartbeat_reader(&mut self)
Heartbeats the read lease if necessary.
This is an internally rate-limited helper, designed to allow users to call it as frequently as they like. Call this, Self::downgrade_since, or Self::maybe_downgrade_since on some interval that is “frequent” compared to PersistConfig::FAKE_READ_LEASE_DURATION.
pub async fn expire(self)
Politely expires this reader, releasing its lease.
There is a best-effort impl in Drop to expire a reader that wasn’t explicitly expired with this method. When possible, explicit expiry is still preferred because the Drop one is best effort and is dependent on a tokio Handle being available in the TLC at the time of drop (which is a bit subtle). Also, explicit expiry allows for control over when it happens.
fn expire_fn( machine: Machine<K, V, T, D>, gc: GarbageCollector<K, V, T, D>, reader_id: LeasedReaderId, ) -> ExpireFn
impl<K, V, T, D> ReadHandle<K, V, T, D>
pub async fn snapshot_and_fetch( &mut self, as_of: Antichain<T>, ) -> Result<Vec<((Result<K, String>, Result<V, String>), T, D)>, Since<T>>
Generates a Self::snapshot, and fetches all of the batches it contains.
The output is consolidated. Furthermore, to keep memory usage down when reading a snapshot that consolidates well, this consolidates as it goes.
Potential future improvements (if necessary):
- Accept something like a F: Fn(K,V) -> (K,V) argument, which looks like an MFP you might be pushing down. Reason being that if you are projecting or transforming in a way that allows further consolidation, amazing.
- Reuse any code we write to streaming-merge consolidate in persist_source here.
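What “consolidated” means for the returned updates can be sketched as below. This is a minimal batch-style illustration, not the streaming consolidation the method performs internally: it assumes string keys and values, u64 times and i64 diffs, and the ToyUpdate alias and consolidate function are hypothetical. Updates with identical data and time are merged by summing their diffs, and entries whose diffs cancel to zero are dropped.

```rust
/// ((key, value), time, diff), with concrete types chosen for the sketch.
pub type ToyUpdate = ((&'static str, &'static str), u64, i64);

/// Merge updates that share the same (key, value) and time by summing
/// their diffs, then drop anything whose net diff is zero.
pub fn consolidate(updates: &mut Vec<ToyUpdate>) {
    // Sorting brings identical (data, time) pairs next to each other.
    updates.sort_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));

    let mut out: Vec<ToyUpdate> = Vec::new();
    for (data, time, diff) in updates.drain(..) {
        // Fold into the previous entry when data and time both match.
        let merged = match out.last_mut() {
            Some(last) if last.0 == data && last.1 == time => {
                last.2 += diff;
                true
            }
            _ => false,
        };
        if !merged {
            out.push((data, time, diff));
        }
    }

    // Updates that cancelled out entirely carry no information.
    out.retain(|u| u.2 != 0);
    *updates = out;
}
```

An insertion followed by a retraction of the same row at the same time disappears from the output, which is why a snapshot that consolidates well can need far less memory than its raw parts.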
pub async fn snapshot_cursor( &mut self, as_of: Antichain<T>, should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool, ) -> Result<Cursor<K, V, T, D>, Since<T>>
Generates a Self::snapshot, and fetches all of the batches it contains.
To keep memory usage down when reading a snapshot that consolidates well, this consolidates as it goes. However, note that only the serialized data is consolidated: the deserialized data will only be consolidated if your K/V codecs are one-to-one.
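The caveat about one-to-one codecs can be illustrated with a deliberately sloppy decoder. Everything here is hypothetical and unrelated to the real Codec trait: toy_decode parses ASCII decimal bytes, so the encodings "7" and "07" both decode to 7, and consolidate_by is a small helper that sums diffs per key.

```rust
use std::collections::BTreeMap;

/// Hypothetical decoder that is NOT one-to-one: "7" and "07" both give 7.
pub fn toy_decode(bytes: &[u8]) -> Option<u32> {
    std::str::from_utf8(bytes).ok()?.parse().ok()
}

/// Sum diffs per key and drop keys whose net diff is zero.
pub fn consolidate_by<K: Ord>(updates: Vec<(K, i64)>) -> Vec<(K, i64)> {
    let mut acc: BTreeMap<K, i64> = BTreeMap::new();
    for (key, diff) in updates {
        *acc.entry(key).or_insert(0) += diff;
    }
    acc.into_iter().filter(|(_, diff)| *diff != 0).collect()
}

/// Decode every key first, then consolidate on the decoded values.
pub fn decode_then_consolidate(encoded: Vec<(Vec<u8>, i64)>) -> Vec<(u32, i64)> {
    let decoded: Vec<(u32, i64)> = encoded
        .into_iter()
        .filter_map(|(bytes, diff)| toy_decode(&bytes).map(|key| (key, diff)))
        .collect();
    consolidate_by(decoded)
}
```

Consolidating the serialized pair ("7", +1) and ("07", -1) leaves both entries because the bytes differ, while consolidating after decoding cancels them. With a one-to-one codec the two orders agree.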
pub fn snapshot_stats( &self, as_of: Option<Antichain<T>>, ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static
Returns aggregate statistics about the contents of the shard TVC at the given frontier.
This command returns statistics about the contents of this shard as of as_of once they are known. This may “block” (in an async-friendly way) if as_of is greater than or equal to the current upper of the shard. If None is given for as_of, then the latest stats known by this process are used.
The Since error indicates that the requested as_of cannot be served (the caller has out of date information) and includes the smallest as_of that would have been accepted.
pub async fn snapshot_parts_stats( &self, as_of: Antichain<T>, ) -> Result<SnapshotPartsStats, Since<T>>
Returns aggregate statistics about the contents of the shard TVC at the given frontier.
This command returns statistics about the contents of this shard as of as_of once they are known. This may “block” (in an async-friendly way) if as_of is greater than or equal to the current upper of the shard.
The Since error indicates that the requested as_of cannot be served (the caller has out of date information) and includes the smallest as_of that would have been accepted.
impl<K, V, T, D> ReadHandle<K, V, T, D>
Trait Implementations§
Auto Trait Implementations§
impl<K, V, T, D> Freeze for ReadHandle<K, V, T, D> where
T: Freeze,
impl<K, V, T, D> !RefUnwindSafe for ReadHandle<K, V, T, D>
impl<K, V, T, D> Send for ReadHandle<K, V, T, D>
impl<K, V, T, D> Sync for ReadHandle<K, V, T, D>
impl<K, V, T, D> Unpin for ReadHandle<K, V, T, D> where
T: Unpin,
impl<K, V, T, D> !UnwindSafe for ReadHandle<K, V, T, D>
Blanket Implementations§
impl<T> BorrowMut<T> for T where
T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request
impl<T> Pointable for T
impl<P, R> ProtoType<R> for P where
R: RustType<P>,
fn into_rust(self) -> Result<R, TryFromProtoError>
See RustType::from_proto.
fn from_rust(rust: &R) -> P
See RustType::into_proto.
impl<'a, S, T> Semigroup<&'a S> for T where
T: Semigroup<S>,
fn plus_equals(&mut self, rhs: &&'a S)
The method of std::ops::AddAssign, for types that do not implement AddAssign.