Struct mz_persist_client::read::ReadHandle

pub struct ReadHandle<K: Codec, V: Codec, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) reader_id: LeasedReaderId,
    pub(crate) read_schemas: Schemas<K, V>,
    pub(crate) schema_cache: SchemaCache<K, V, T, D>,
    since: Antichain<T>,
    pub(crate) last_heartbeat: EpochMillis,
    pub(crate) leased_seqnos: BTreeMap<SeqNo, Lease>,
    pub(crate) unexpired_state: Option<UnexpiredReadHandleState>,
}

A “capability” granting the ability to read the state of some shard at times greater than or equal to self.since().

Production users should call Self::expire before dropping a ReadHandle so that it can expire its leases. If/when Rust gets AsyncDrop, this will be done automatically.

All async methods on ReadHandle retry for as long as they are able, but the returned std::future::Futures implement “cancel on drop” semantics. This means that callers can add a timeout using tokio::time::timeout or tokio::time::timeout_at.

tokio::time::timeout(timeout, read.downgrade_since(&new_since)).await

Fields

cfg: PersistConfig
metrics: Arc<Metrics>
machine: Machine<K, V, T, D>
gc: GarbageCollector<K, V, T, D>
blob: Arc<dyn Blob>
reader_id: LeasedReaderId
read_schemas: Schemas<K, V>
schema_cache: SchemaCache<K, V, T, D>
since: Antichain<T>
last_heartbeat: EpochMillis
leased_seqnos: BTreeMap<SeqNo, Lease>
unexpired_state: Option<UnexpiredReadHandleState>

Implementations

impl<K, V, T, D> ReadHandle<K, V, T, D>
where K: Debug + Codec, V: Debug + Codec, T: Timestamp + Lattice + Codec64 + Sync, D: Semigroup + Codec64 + Send + Sync,

pub(crate) async fn new( cfg: PersistConfig, metrics: Arc<Metrics>, machine: Machine<K, V, T, D>, gc: GarbageCollector<K, V, T, D>, blob: Arc<dyn Blob>, reader_id: LeasedReaderId, read_schemas: Schemas<K, V>, since: Antichain<T>, last_heartbeat: EpochMillis, ) -> Self

pub fn shard_id(&self) -> ShardId

This handle’s shard id.

pub fn since(&self) -> &Antichain<T>

This handle’s since frontier.

This will always be greater than or equal to the shard-global since.

fn outstanding_seqno(&mut self) -> Option<SeqNo>

pub async fn downgrade_since(&mut self, new_since: &Antichain<T>)

Forwards the since frontier of this handle, giving up the ability to read at times not greater than or equal to new_since.

This may trigger (asynchronous) compaction and consolidation in the system. A new_since of the empty antichain “finishes” this shard, promising that no more data will ever be read by this handle.

This also acts as a heartbeat for the reader lease (including if called with new_since equal to something like self.since() or the minimum timestamp, making the call a no-op).
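The forward-only behavior described above can be illustrated with a toy model. This is a minimal sketch, assuming a totally ordered u64 timestamp so that a frontier is either a single time or empty; the real handle uses Antichain<T>. ToySince and its methods are hypothetical names for illustration, not part of mz_persist_client.

```rust
/// Toy stand-in for a since frontier over totally ordered `u64` times.
/// `None` models the empty antichain (no further reads are possible).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ToySince(pub Option<u64>);

impl ToySince {
    /// Advance to `new_since` if it is later than the current frontier.
    /// Requests that would move the frontier backwards change nothing.
    pub fn downgrade(&mut self, new_since: Option<u64>) {
        self.0 = match (self.0, new_since) {
            // Either side is empty: the frontier is (or becomes) empty.
            (None, _) | (_, None) => None,
            (Some(cur), Some(new)) => Some(cur.max(new)),
        };
    }

    /// Whether reads at `time` are still permitted by this frontier.
    pub fn can_read_at(&self, time: u64) -> bool {
        matches!(self.0, Some(since) if since <= time)
    }
}
```

In this model, downgrading from 5 to 3 leaves the frontier at 5, downgrading to 8 gives up reads at times 5 through 7, and downgrading to the empty frontier gives up all reads.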

pub async fn listen( self, as_of: Antichain<T>, ) -> Result<Listen<K, V, T, D>, Since<T>>

Returns an ongoing subscription of updates to a shard.

The stream includes all data at times greater than as_of. Combined with Self::snapshot it will produce exactly correct results: the snapshot is the TVC’s contents at as_of and all subsequent updates occur at exactly their indicated time. The recipient should only downgrade their read capability when they are certain they have all data through the frontier they would downgrade to.

This takes ownership of the ReadHandle so the Listen can use it to Self::downgrade_since as it progresses. If you need to keep this handle, then Self::clone it before calling listen.

The Since error indicates that the requested as_of cannot be served (the caller has out of date information) and includes the smallest as_of that would have been accepted.
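The shape of the Since error can be sketched as follows. This is a toy model, assuming u64 timestamps in place of Antichain<T>; ToySinceError, check_as_of, and as_of_or_fallback are hypothetical names, and the real check happens inside the persist state machine.

```rust
/// Toy version of the `Since` error: carries the smallest acceptable `as_of`.
#[derive(Debug, PartialEq, Eq)]
pub struct ToySinceError(pub u64);

/// Accept `as_of` only if it is not behind the handle's since.
pub fn check_as_of(since: u64, as_of: u64) -> Result<u64, ToySinceError> {
    if as_of >= since {
        Ok(as_of)
    } else {
        Err(ToySinceError(since))
    }
}

/// Fall back to the frontier reported by the error when the request is stale.
pub fn as_of_or_fallback(since: u64, requested: u64) -> u64 {
    match check_as_of(since, requested) {
        Ok(as_of) => as_of,
        Err(ToySinceError(smallest_ok)) => smallest_ok,
    }
}
```

A caller holding out-of-date information can use the value carried by the error to pick an as_of that will be accepted, if reading at a later time is acceptable for its use case.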

pub async fn snapshot( &mut self, as_of: Antichain<T>, ) -> Result<Vec<LeasedBatchPart<T>>, Since<T>>

Returns all of the contents of the shard TVC at as_of broken up into LeasedBatchParts. These parts can be “turned in” via crate::fetch::fetch_batch_part to receive the data they contain.

This command returns the contents of this shard as of as_of once they are known. This may “block” (in an async-friendly way) if as_of is greater than or equal to the current upper of the shard. The recipient should only downgrade their read capability when they are certain they have all data through the frontier they would downgrade to.

The Since error indicates that the requested as_of cannot be served (the caller has out of date information) and includes the smallest as_of that would have been accepted.

pub async fn subscribe( self, as_of: Antichain<T>, ) -> Result<Subscribe<K, V, T, D>, Since<T>>

Returns a snapshot of all of a shard’s data using as_of, followed by listening to any future updates.

For more details on this operation’s semantics, see Self::snapshot and Self::listen.
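How a snapshot and a listen divide a shard's updates between them can be sketched with plain vectors. This is an illustrative model only, assuming u64 timestamps and in-memory (data, time, diff) triples; Update and split_at_as_of are hypothetical names and no persist API is involved.

```rust
/// One update: (data, time, diff).
pub type Update = (&'static str, u64, i64);

/// Split updates into those that make up the snapshot at `as_of` and those
/// a listener started at `as_of` would observe afterwards.
pub fn split_at_as_of(updates: &[Update], as_of: u64) -> (Vec<Update>, Vec<Update>) {
    let mut snapshot = Vec::new();
    let mut listen = Vec::new();
    for &(data, time, diff) in updates {
        if time <= as_of {
            // Not beyond `as_of`: part of the snapshot.
            snapshot.push((data, time, diff));
        } else {
            // Strictly greater than `as_of`: delivered by the listen.
            listen.push((data, time, diff));
        }
    }
    (snapshot, listen)
}
```

Every update lands in exactly one of the two halves, which is why combining the two yields each update once.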

fn lease_batch_part( &mut self, desc: Description<T>, part: BatchPart<T>, filter: FetchBatchFilter<T>, ) -> LeasedBatchPart<T>

fn lease_batch_parts( &mut self, batch: HollowBatch<T>, filter: FetchBatchFilter<T>, ) -> impl Stream<Item = LeasedBatchPart<T>> + '_

fn lease_seqno(&mut self) -> Lease

Tracks that the ReadHandle’s machine’s current SeqNo is being “leased out” to a LeasedBatchPart, and cannot be garbage collected until its lease has been returned.
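One way to picture SeqNo leasing is a map from SeqNo to a reference-counted token. The sketch below is an assumption about the general technique, not a description of the actual Lease type: ToyLease and ToyLeases are hypothetical, and using Arc::strong_count to detect returned leases is a choice made for this illustration.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

/// Toy lease: cloning shares ownership; dropping a clone returns the lease.
#[derive(Clone, Debug)]
pub struct ToyLease(Arc<u64>);

#[derive(Default)]
pub struct ToyLeases {
    /// SeqNo -> the handle's own copy of the lease.
    leased: BTreeMap<u64, ToyLease>,
}

impl ToyLeases {
    /// Lease out `seqno`, reusing an existing lease for the same SeqNo.
    pub fn lease_seqno(&mut self, seqno: u64) -> ToyLease {
        self.leased
            .entry(seqno)
            .or_insert_with(|| ToyLease(Arc::new(seqno)))
            .clone()
    }

    /// Forget leases nobody else holds, then report the oldest SeqNo that is
    /// still leased out. Garbage collection must not advance past it.
    pub fn outstanding_seqno(&mut self) -> Option<u64> {
        self.leased.retain(|_, lease| Arc::strong_count(&lease.0) > 1);
        self.leased.keys().next().copied()
    }
}
```

Because the map is ordered by SeqNo, the first remaining key is the oldest state that some outstanding part still depends on.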

pub async fn clone(&self, purpose: &str) -> Self

Returns an independent ReadHandle with a new LeasedReaderId but the same since.

pub async fn maybe_downgrade_since(&mut self, new_since: &Antichain<T>)

A rate-limited version of Self::downgrade_since.

This is an internally rate-limited helper, designed to allow users to call it as frequently as they like. Call this, Self::downgrade_since, or Self::maybe_heartbeat_reader on some interval that is “frequent” compared to PersistConfig::FAKE_READ_LEASE_DURATION.

This is communicating actual progress information, so is given preferential treatment compared to Self::maybe_heartbeat_reader.
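The rate-limiting idea can be sketched as a check against the time of the last heartbeat. This is a minimal model under stated assumptions: u64 timestamps, an explicit now_ms argument to keep it deterministic, and a hypothetical min_interval_ms field. The interval the real handle uses is not specified here.

```rust
/// Toy rate limiter for since downgrades. Times are milliseconds since some
/// epoch and are passed in explicitly to keep the sketch deterministic.
pub struct ToyRateLimitedSince {
    pub since: u64,
    pub last_heartbeat_ms: u64,
    /// Hypothetical minimum gap between downgrades that are actually performed.
    pub min_interval_ms: u64,
}

impl ToyRateLimitedSince {
    /// Returns true if the downgrade was actually performed.
    pub fn maybe_downgrade_since(&mut self, new_since: u64, now_ms: u64) -> bool {
        let elapsed = now_ms.saturating_sub(self.last_heartbeat_ms);
        if elapsed < self.min_interval_ms {
            // Called too soon: skip the work; the caller can simply call again later.
            return false;
        }
        // The frontier only moves forward.
        self.since = self.since.max(new_since);
        // A performed downgrade doubles as a heartbeat.
        self.last_heartbeat_ms = now_ms;
        true
    }
}
```

Callers can invoke this as often as they like; only calls spaced at least min_interval_ms apart do any work.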

pub(crate) async fn maybe_heartbeat_reader(&mut self)

Heartbeats the read lease if necessary.

This is an internally rate-limited helper, designed to allow users to call it as frequently as they like. Call this, Self::downgrade_since, or Self::maybe_downgrade_since on some interval that is “frequent” compared to PersistConfig::FAKE_READ_LEASE_DURATION.

pub async fn expire(self)

Politely expires this reader, releasing its lease.

There is a best-effort impl in Drop to expire a reader that wasn’t explicitly expired with this method. When possible, explicit expiry is still preferred because the Drop one is best effort and is dependent on a tokio Handle being available in the thread-local context (TLC) at the time of drop (which is a bit subtle). Also, explicit expiry allows for control over when it happens.
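The relationship between explicit expiry and the Drop fallback can be sketched with an Option that is taken exactly once. This is a synchronous toy, assuming a shared log in place of the real lease release; ToyHandle and Log are hypothetical names, and the real fallback additionally needs a tokio Handle to spawn its work.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Shared log so the sketch can observe which path released the lease.
pub type Log = Rc<RefCell<Vec<&'static str>>>;

pub struct ToyHandle {
    /// `Some` until the lease has been released.
    unexpired: Option<Log>,
}

impl ToyHandle {
    pub fn new(log: Log) -> Self {
        ToyHandle { unexpired: Some(log) }
    }

    /// Explicit expiry: consumes the handle and releases the lease now.
    pub fn expire(mut self) {
        if let Some(log) = self.unexpired.take() {
            log.borrow_mut().push("explicit expire");
        }
        // `self` is dropped here; `Drop` sees `None` and does nothing.
    }
}

impl Drop for ToyHandle {
    fn drop(&mut self) {
        // Fallback path for handles that were never explicitly expired.
        if let Some(log) = self.unexpired.take() {
            log.borrow_mut().push("best-effort expire in drop");
        }
    }
}
```

Taking the state out of the Option guarantees the release runs at most once, whichever path gets there first.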

fn expire_fn( machine: Machine<K, V, T, D>, gc: GarbageCollector<K, V, T, D>, reader_id: LeasedReaderId, ) -> ExpireFn

impl<K, V, T, D> ReadHandle<K, V, T, D>
where K: Debug + Codec + Ord, V: Debug + Codec + Ord, T: Timestamp + Lattice + Codec64 + Sync, D: Semigroup + Ord + Codec64 + Send + Sync,

pub async fn snapshot_and_fetch( &mut self, as_of: Antichain<T>, ) -> Result<Vec<((Result<K, String>, Result<V, String>), T, D)>, Since<T>>

Generates a Self::snapshot, and fetches all of the batches it contains.

The output is consolidated. Furthermore, to keep memory usage down when reading a snapshot that consolidates well, this consolidates as it goes.

Potential future improvements (if necessary):

  • Accept something like a F: Fn(K,V) -> (K,V) argument, which looks like an MFP you might be pushing down. Reason being that if you are projecting or transforming in a way that allows further consolidation, amazing.
  • Reuse any code we write to streaming-merge consolidate in persist_source here.
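For readers unfamiliar with the term, consolidation of (data, time, diff) updates can be sketched as below. This is a generic illustration that processes an in-memory vector in one pass, assuming String data and u64 times; it does not reproduce the incremental, memory-bounded approach described above, and consolidate is a hypothetical free function.

```rust
use std::collections::BTreeMap;

/// Consolidate `(data, time, diff)` updates: sum the diffs of identical
/// `(data, time)` pairs and drop any whose diffs cancel out to zero.
pub fn consolidate(updates: Vec<(String, u64, i64)>) -> Vec<(String, u64, i64)> {
    let mut sums: BTreeMap<(String, u64), i64> = BTreeMap::new();
    for (data, time, diff) in updates {
        *sums.entry((data, time)).or_insert(0) += diff;
    }
    sums.into_iter()
        .filter(|(_, diff)| *diff != 0)
        .map(|((data, time), diff)| (data, time, diff))
        .collect()
}
```

An insertion and a retraction of the same data at the same time cancel out, so a snapshot that consolidates well can be much smaller than the raw updates.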

pub async fn snapshot_cursor( &mut self, as_of: Antichain<T>, should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool, ) -> Result<Cursor<K, V, T, D>, Since<T>>

Generates a Self::snapshot, and fetches all of the batches it contains.

To keep memory usage down when reading a snapshot that consolidates well, this consolidates as it goes. However, note that only the serialized data is consolidated: the deserialized data will only be consolidated if your K/V codecs are one-to-one.
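The caveat about one-to-one codecs can be made concrete with a deliberately lossy decoder. This is a contrived sketch: decode and consolidate_encoded_then_decode are hypothetical, and the case-insensitive decoding exists only to produce two distinct encodings of the same value.

```rust
use std::collections::BTreeMap;

/// Hypothetical decoder that is not one-to-one: it ignores case, so
/// b"Key" and b"key" decode to the same value.
pub fn decode(bytes: &[u8]) -> String {
    String::from_utf8_lossy(bytes).to_lowercase()
}

/// Consolidate on the serialized bytes, then decode whatever is left.
pub fn consolidate_encoded_then_decode(updates: &[(&[u8], i64)]) -> Vec<(String, i64)> {
    let mut sums: BTreeMap<&[u8], i64> = BTreeMap::new();
    for &(bytes, diff) in updates {
        *sums.entry(bytes).or_insert(0) += diff;
    }
    sums.into_iter()
        .filter(|(_, diff)| *diff != 0)
        .map(|(bytes, diff)| (decode(bytes), diff))
        .collect()
}
```

With the inputs (b"Key", +1) and (b"key", -1), the byte strings differ, so nothing cancels at the serialized level, yet both decode to the same value and the decoded output still holds a +1 and a -1 for it.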

pub fn snapshot_stats( &self, as_of: Option<Antichain<T>>, ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static

Returns aggregate statistics about the contents of the shard TVC at the given frontier.

This command returns statistics about the contents of this shard as of as_of once they are known. This may “block” (in an async-friendly way) if as_of is greater than or equal to the current upper of the shard. If None is given for as_of, then the latest stats known by this process are used.

The Since error indicates that the requested as_of cannot be served (the caller has out of date information) and includes the smallest as_of that would have been accepted.

pub async fn snapshot_parts_stats( &self, as_of: Antichain<T>, ) -> Result<SnapshotPartsStats, Since<T>>

Returns aggregate statistics about the contents of the shard TVC at the given frontier.

This command returns statistics about the contents of this shard as of as_of once they are known. This may “block” (in an async-friendly way) if as_of is greater than or equal to the current upper of the shard.

The Since error indicates that the requested as_of cannot be served (the caller has out of date information) and includes the smallest as_of that would have been accepted.

impl<K, V, T, D> ReadHandle<K, V, T, D>
where K: Debug + Codec + Ord, V: Debug + Codec + Ord, T: Timestamp + Lattice + Codec64 + Sync, D: Semigroup + Codec64 + Send + Sync,

pub async fn snapshot_and_stream( &mut self, as_of: Antichain<T>, ) -> Result<impl Stream<Item = ((Result<K, String>, Result<V, String>), T, D)>, Since<T>>

Generates a Self::snapshot, and streams out all of the updates it contains in bounded memory.

The output is not consolidated.

Trait Implementations

impl<K: Debug + Codec, V: Debug + Codec, T: Debug, D: Debug> Debug for ReadHandle<K, V, T, D>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl<K: Codec, V: Codec, T, D> Drop for ReadHandle<K, V, T, D>

fn drop(&mut self)

Executes the destructor for this type.

Auto Trait Implementations

impl<K, V, T, D> Freeze for ReadHandle<K, V, T, D>
where T: Freeze,

impl<K, V, T, D> !RefUnwindSafe for ReadHandle<K, V, T, D>

impl<K, V, T, D> Send for ReadHandle<K, V, T, D>
where T: Send + Sync,

impl<K, V, T, D> Sync for ReadHandle<K, V, T, D>
where T: Sync + Send,

impl<K, V, T, D> Unpin for ReadHandle<K, V, T, D>
where T: Unpin,

impl<K, V, T, D> !UnwindSafe for ReadHandle<K, V, T, D>

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T, U> CastInto<U> for T
where U: CastFrom<T>,

fn cast_into(self) -> U

Performs the cast.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> FutureExt for T

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper.

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request

impl<Unshared, Shared> IntoShared<Shared> for Unshared
where Shared: FromUnshared<Unshared>,

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.

impl<T> Pointable for T

const ALIGN: usize = _

The alignment of pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.

impl<P, R> ProtoType<R> for P
where R: RustType<P>,

impl<T> Same for T

type Output = T

Should always be Self

impl<'a, S, T> Semigroup<&'a S> for T
where T: Semigroup<S>,

fn plus_equals(&mut self, rhs: &&'a S)

The method of std::ops::AddAssign, for types that do not implement AddAssign.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.