Struct mz_persist_client::critical::SinceHandle

source ·
pub struct SinceHandle<K: Codec, V: Codec, T, D, O> {
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    pub(crate) reader_id: CriticalReaderId,
    since: Antichain<T>,
    opaque: O,
    last_downgrade_since: EpochMillis,
}
Expand description

A “capability” granting the ability to hold back the since frontier of a shard.

In contrast to crate::read::ReadHandle, which is time-leased, this handle and its associated capability are not leased. A SinceHandle does not release its capability when dropped. This is less ergonomic, but useful for “critical” since holds which must survive even lease timeouts.

IMPORTANT: The above means that if a SinceHandle is registered and then lost, the shard’s since will be permanently “stuck”, forever preventing logical compaction. Users are advised to durably record (preferably in code) the intended CriticalReaderId before registering a SinceHandle (in case the process crashes at the wrong time).

All async methods on SinceHandle retry for as long as they are able, but the returned std::future::Futures implement “cancel on drop” semantics. This means that callers can add a timeout using tokio::time::timeout or tokio::time::timeout_at.

Fields§

§machine: Machine<K, V, T, D>§gc: GarbageCollector<K, V, T, D>§reader_id: CriticalReaderId§since: Antichain<T>§opaque: O§last_downgrade_since: EpochMillis

Implementations§

source§

impl<K, V, T, D, O> SinceHandle<K, V, T, D, O>
where K: Debug + Codec, V: Debug + Codec, T: Timestamp + Lattice + Codec64 + Sync, D: Semigroup + Codec64 + Send + Sync, O: Opaque + Codec64,

source

pub(crate) fn new( machine: Machine<K, V, T, D>, gc: GarbageCollector<K, V, T, D>, reader_id: CriticalReaderId, since: Antichain<T>, opaque: O, ) -> Self

source

pub fn shard_id(&self) -> ShardId

This handle’s shard id.

source

pub fn since(&self) -> &Antichain<T>

This handle’s since capability.

This will always be greater or equal to the shard-global since.

source

pub fn opaque(&self) -> &O

This handle’s opaque.

source

pub async fn maybe_compare_and_downgrade_since( &mut self, expected: &O, new: (&O, &Antichain<T>), ) -> Option<Result<Antichain<T>, O>>

Attempts to forward the since capability of this handle to new_since iff the opaque value of this handle’s CriticalReaderId is expected, and Self::maybe_compare_and_downgrade_since chooses to perform the downgrade.

Users are expected to call this function frequently, but should not expect since to be downgraded with each call – this function is free to no-op requests to perform rate-limiting for downstream services. A None is returned for no-op requests, and Some is returned when downgrading since.

When returning Some(since), since will be set to the most recent value known for this critical reader ID, and is guaranteed to be !less_than(new_since).

Because SinceHandles are expected to live beyond process lifetimes, it’s possible for the same CriticalReaderId to be used concurrently from multiple processes (either intentionally or something like a zombie process). To discover this, Self::maybe_compare_and_downgrade_since has “compare and set” semantics over an opaque value. If the expected opaque value does not match state, an Err is returned and the caller must decide how to handle it (likely a retry or a halt!).

If desired, users may use the opaque value to fence out concurrent access of other SinceHandles for a given CriticalReaderId. e.g.:

use timely::progress::Antichain;
use mz_persist_client::critical::SinceHandle;
use mz_persist_types::Codec64;

let fencing_token: u64 = unimplemented!();
let mut since: SinceHandle<String, String, u64, i64, u64> = unimplemented!();

let new_since: Antichain<u64> = unimplemented!();
let res = since
    .maybe_compare_and_downgrade_since(
        &since.opaque().clone(),
        (&fencing_token, &new_since),
    )
    .await;

match res {
    Some(Ok(_)) => {
        // we downgraded since!
    }
    Some(Err(actual_fencing_token)) => {
        // compare `fencing_token` and `actual_fencing_token`, etc
    }
    None => {
        // no problem, we'll try again later
    }
}

If fencing is not required and it’s acceptable to have concurrent SinceHandle for a given CriticalReaderId, the opaque value can be given a default value and ignored:

use timely::progress::Antichain;
use mz_persist_client::critical::SinceHandle;
use mz_persist_types::Codec64;

let mut since: SinceHandle<String, String, u64, i64, u64> = unimplemented!();
let new_since: Antichain<u64> = unimplemented!();
let res = since
    .maybe_compare_and_downgrade_since(
        &since.opaque().clone(),
        (&since.opaque().clone(), &new_since),
    )
    .await;

match res {
    Some(Ok(_)) => {
        // woohoo!
    }
    Some(Err(_actual_opaque)) => {
        panic!("the opaque value should never change from the default");
    }
    None => {
        // no problem, we'll try again later
    }
};
source

pub async fn compare_and_downgrade_since( &mut self, expected: &O, new: (&O, &Antichain<T>), ) -> Result<Antichain<T>, O>

Forwards the since capability of this handle to new_since iff the opaque value of this handle’s CriticalReaderId is expected, and new_since is beyond the current since.

Users are expected to call this function only when a guaranteed downgrade is necessary. All other downgrades should preferably go through Self::maybe_compare_and_downgrade_since which will automatically rate limit the operations.

When returning Ok(since), since will be set to the most recent value known for this critical reader ID, and is guaranteed to be !less_than(new_since).

Because SinceHandles are expected to live beyond process lifetimes, it’s possible for the same CriticalReaderId to be used concurrently from multiple processes (either intentionally or something like a zombie process). To discover this, Self::compare_and_downgrade_since has “compare and set” semantics over an opaque value. If the expected opaque value does not match state, an Err is returned and the caller must decide how to handle it (likely a retry or a halt!).

source

pub fn snapshot_stats( &self, as_of: Option<Antichain<T>>, ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static

Returns aggregate statistics about the contents of the shard TVC at the given frontier.

This command returns the contents of this shard as of as_of once they are known. This may “block” (in an async-friendly way) if as_of is greater or equal to the current upper of the shard. If None is given for as_of, then the latest stats known by this process are used.

The Since error indicates that the requested as_of cannot be served (the caller has out of date information) and includes the smallest as_of that would have been accepted.

Trait Implementations§

source§

impl<K: Debug + Codec, V: Debug + Codec, T: Debug, D: Debug, O: Debug> Debug for SinceHandle<K, V, T, D, O>

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

§

impl<K, V, T, D, O> Freeze for SinceHandle<K, V, T, D, O>
where O: Freeze, T: Freeze,

§

impl<K, V, T, D, O> !RefUnwindSafe for SinceHandle<K, V, T, D, O>

§

impl<K, V, T, D, O> Send for SinceHandle<K, V, T, D, O>
where O: Send, T: Send + Sync,

§

impl<K, V, T, D, O> Sync for SinceHandle<K, V, T, D, O>
where O: Sync, T: Sync + Send,

§

impl<K, V, T, D, O> Unpin for SinceHandle<K, V, T, D, O>
where O: Unpin, T: Unpin,

§

impl<K, V, T, D, O> !UnwindSafe for SinceHandle<K, V, T, D, O>

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T, U> CastInto<U> for T
where U: CastFrom<T>,

source§

fn cast_into(self) -> U

Performs the cast.
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> FutureExt for T

source§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
source§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoRequest<T> for T

source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
source§

impl<Unshared, Shared> IntoShared<Shared> for Unshared
where Shared: FromUnshared<Unshared>,

source§

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.
source§

impl<T> Pointable for T

source§

const ALIGN: usize = _

The alignment of pointer.
§

type Init = T

The type for initializers.
source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
source§

impl<P, R> ProtoType<R> for P
where R: RustType<P>,

source§

impl<T> Same for T

§

type Output = T

Should always be Self
source§

impl<'a, S, T> Semigroup<&'a S> for T
where T: Semigroup<S>,

source§

fn plus_equals(&mut self, rhs: &&'a S)

The method of std::ops::AddAssign, for types that do not implement AddAssign.
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

source§

fn vzip(self) -> V

source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more