Struct mz_persist_client::critical::SinceHandle
pub struct SinceHandle<K: Codec, V: Codec, T, D, O> {
pub(crate) machine: Machine<K, V, T, D>,
pub(crate) gc: GarbageCollector<K, V, T, D>,
pub(crate) reader_id: CriticalReaderId,
since: Antichain<T>,
opaque: O,
last_downgrade_since: EpochMillis,
}
A “capability” granting the ability to hold back the since frontier of a shard.
In contrast to crate::read::ReadHandle, which is time-leased, this handle and its associated capability are not leased. A SinceHandle does not release its capability when dropped. This is less ergonomic, but useful for “critical” since holds which must survive even lease timeouts.
IMPORTANT: The above means that if a SinceHandle is registered and then lost, the shard’s since will be permanently “stuck”, forever preventing logical compaction. Users are advised to durably record (preferably in code) the intended CriticalReaderId before registering a SinceHandle (in case the process crashes at the wrong time).
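The difference between a leased hold and a critical hold can be sketched with a small in-memory model. This is only an illustration under stated assumptions: `LeasedHold`, `CriticalHold`, and `max_allowed_since` are hypothetical names, timestamps are plain `u64` values rather than `Antichain<T>` frontiers, and the shard's since is assumed to be bounded by the minimum over all holds still in effect.

```rust
/// Hypothetical model of a time-leased read hold (not persist's real type).
struct LeasedHold {
    since: u64,
    lease_expires_at_ms: u64,
}

/// Hypothetical model of a critical hold: no lease, so it never expires on its own.
struct CriticalHold {
    since: u64,
}

/// Returns the furthest the shard's since could advance at `now_ms`: the
/// minimum since over every hold still in effect, or `None` if nothing
/// holds it back.
fn max_allowed_since(
    now_ms: u64,
    leased: &[LeasedHold],
    critical: &[CriticalHold],
) -> Option<u64> {
    // Leased holds stop counting once their lease has timed out.
    let live_leased = leased
        .iter()
        .filter(|h| h.lease_expires_at_ms > now_ms)
        .map(|h| h.since);
    // Critical holds always count, even if the process that owned them is gone.
    let all_critical = critical.iter().map(|h| h.since);
    live_leased.chain(all_critical).min()
}
```

In this model a forgotten `CriticalHold` keeps bounding the result at every later `now_ms`, which is the “stuck” since described above.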
All async methods on SinceHandle retry for as long as they are able, but the returned std::future::Futures implement “cancel on drop” semantics. This means that callers can add a timeout using tokio::time::timeout or tokio::time::timeout_at.
Fields§
§machine: Machine<K, V, T, D>
§gc: GarbageCollector<K, V, T, D>
§reader_id: CriticalReaderId
§since: Antichain<T>
§opaque: O
§last_downgrade_since: EpochMillis
Implementations§
impl<K, V, T, D, O> SinceHandle<K, V, T, D, O>
pub(crate) fn new( machine: Machine<K, V, T, D>, gc: GarbageCollector<K, V, T, D>, reader_id: CriticalReaderId, since: Antichain<T>, opaque: O, ) -> Self
pub fn since(&self) -> &Antichain<T>
This handle’s since capability.
This will always be greater than or equal to the shard-global since.
pub async fn maybe_compare_and_downgrade_since(
    &mut self,
    expected: &O,
    new: (&O, &Antichain<T>),
) -> Option<Result<Antichain<T>, O>>
Attempts to forward the since capability of this handle to new_since (the frontier supplied in new) iff the opaque value of this handle’s CriticalReaderId is expected, and Self::maybe_compare_and_downgrade_since chooses to perform the downgrade.
Users are expected to call this function frequently, but should not expect since to be downgraded with each call: this function is free to no-op requests in order to rate-limit load on downstream services. A None is returned for no-op requests, and Some is returned when downgrading since.
When returning Some(Ok(since)), since will be set to the most recent value known for this critical reader ID, and is guaranteed to be !less_than(new_since).
Because SinceHandles are expected to live beyond process lifetimes, it’s possible for the same CriticalReaderId to be used concurrently from multiple processes (either intentionally or something like a zombie process). To discover this, Self::maybe_compare_and_downgrade_since has “compare and set” semantics over an opaque value. If the expected opaque value does not match state, an Err is returned and the caller must decide how to handle it (likely a retry or a halt!).
If desired, users may use the opaque value to fence out concurrent access of other SinceHandles for a given CriticalReaderId. e.g.:
use timely::progress::Antichain;
use mz_persist_client::critical::SinceHandle;
use mz_persist_types::Codec64;

let fencing_token: u64 = unimplemented!();
let mut since: SinceHandle<String, String, u64, i64, u64> = unimplemented!();
let new_since: Antichain<u64> = unimplemented!();

let res = since
    .maybe_compare_and_downgrade_since(
        &since.opaque().clone(),
        (&fencing_token, &new_since),
    )
    .await;

match res {
    Some(Ok(_)) => {
        // we downgraded since!
    }
    Some(Err(actual_fencing_token)) => {
        // compare `fencing_token` and `actual_fencing_token`, etc
    }
    None => {
        // no problem, we'll try again later
    }
}
If fencing is not required and it’s acceptable to have concurrent SinceHandles for a given CriticalReaderId, the opaque value can be given a default value and ignored:
use timely::progress::Antichain;
use mz_persist_client::critical::SinceHandle;
use mz_persist_types::Codec64;

let mut since: SinceHandle<String, String, u64, i64, u64> = unimplemented!();
let new_since: Antichain<u64> = unimplemented!();

let res = since
    .maybe_compare_and_downgrade_since(
        &since.opaque().clone(),
        (&since.opaque().clone(), &new_since),
    )
    .await;

match res {
    Some(Ok(_)) => {
        // woohoo!
    }
    Some(Err(_actual_opaque)) => {
        panic!("the opaque value should never change from the default");
    }
    None => {
        // no problem, we'll try again later
    }
};
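The None case can be pictured with a toy rate limiter. The sketch below is not how persist decides when to no-op. `ModelHandle`, `min_interval_ms`, and the explicit `now_ms` argument are assumptions made for illustration, timestamps are plain `u64` values, and the opaque check is left out so that only the rate-limiting behaviour is shown.

```rust
/// Hypothetical stand-in for a handle; every name here is illustrative.
struct ModelHandle {
    since: u64,
    last_downgrade_ms: u64,
    min_interval_ms: u64,
}

impl ModelHandle {
    /// Returns `None` when the request is skipped by the rate limiter and
    /// `Some(since)` when a downgrade is attempted.
    fn maybe_downgrade_since(&mut self, now_ms: u64, new_since: u64) -> Option<u64> {
        // Too soon after the previous downgrade: no-op the request.
        if now_ms.saturating_sub(self.last_downgrade_ms) < self.min_interval_ms {
            return None;
        }
        self.last_downgrade_ms = now_ms;
        // A since only moves forward, so a stale request leaves it unchanged.
        self.since = self.since.max(new_since);
        Some(self.since)
    }
}
```

A caller of such a function would treat `None` as "try again on the next tick" rather than as an error, which matches the match arms in the examples above.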
pub async fn compare_and_downgrade_since(
    &mut self,
    expected: &O,
    new: (&O, &Antichain<T>),
) -> Result<Antichain<T>, O>
Forwards the since capability of this handle to new_since iff the opaque value of this handle’s CriticalReaderId is expected, and new_since is beyond the current since.
Users are expected to call this function only when a guaranteed downgrade is necessary. All other downgrades should preferably go through Self::maybe_compare_and_downgrade_since which will automatically rate limit the operations.
When returning Ok(since), since will be set to the most recent value known for this critical reader ID, and is guaranteed to be !less_than(new_since).
Because SinceHandles are expected to live beyond process lifetimes, it’s possible for the same CriticalReaderId to be used concurrently from multiple processes (either intentionally or something like a zombie process). To discover this, Self::compare_and_downgrade_since has “compare and set” semantics over an opaque value. If the expected opaque value does not match state, an Err is returned and the caller must decide how to handle it (likely a retry or a halt!).
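The “compare and set” behaviour over the opaque value can be sketched with an in-memory model. This is an assumption-laden illustration rather than persist's implementation: `CriticalReaderState` is a hypothetical type, and both the opaque value and the since are plain `u64` values.

```rust
/// Hypothetical in-memory model of the state kept for one critical reader ID.
struct CriticalReaderState {
    opaque: u64,
    since: u64,
}

impl CriticalReaderState {
    /// Models compare-and-set over the opaque value: on a match, store the new
    /// opaque and advance since; on a mismatch, change nothing and return the
    /// opaque that is actually stored.
    fn compare_and_downgrade_since(
        &mut self,
        expected: u64,
        new: (u64, u64),
    ) -> Result<u64, u64> {
        let (new_opaque, new_since) = new;
        if self.opaque != expected {
            return Err(self.opaque);
        }
        self.opaque = new_opaque;
        // The since never regresses, so the returned value is never
        // less than `new_since`.
        self.since = self.since.max(new_since);
        Ok(self.since)
    }
}
```

In this model, a process that installs a fresh opaque value fences out any zombie still presenting the old one: the zombie's call returns `Err` carrying the current opaque and leaves the since untouched.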
pub fn snapshot_stats(
    &self,
    as_of: Option<Antichain<T>>,
) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static
Returns aggregate statistics about the contents of the shard TVC at the given frontier.
The statistics are returned as of as_of once the contents of the shard at that frontier are known. This may “block” (in an async-friendly way) if as_of is greater than or equal to the current upper of the shard. If None is given for as_of, then the latest stats known by this process are used.
The Since error indicates that the requested as_of cannot be served (the caller has out of date information) and includes the smallest as_of that would have been accepted.
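How a caller might react to the Since error can be sketched as follows. The names `SinceError`, `check_as_of`, and `clamp_as_of` are hypothetical, timestamps are plain `u64` values instead of `Antichain<T>` frontiers, and the validation rule (an as_of below the shard's since is rejected) is an assumption drawn from the description above.

```rust
/// Hypothetical stand-in for the `Since` error: it carries the smallest
/// `as_of` that would have been accepted.
#[derive(Debug, PartialEq)]
struct SinceError(u64);

/// Accepts `as_of` only if it is not behind the shard's since.
fn check_as_of(shard_since: u64, as_of: u64) -> Result<u64, SinceError> {
    if as_of < shard_since {
        Err(SinceError(shard_since))
    } else {
        Ok(as_of)
    }
}

/// One possible caller policy: fall back to the smallest acceptable `as_of`
/// reported by the error instead of failing.
fn clamp_as_of(shard_since: u64, as_of: u64) -> u64 {
    match check_as_of(shard_since, as_of) {
        Ok(ts) => ts,
        Err(SinceError(smallest_ok)) => smallest_ok,
    }
}
```

Whether retrying at a later as_of is acceptable depends on the caller; some callers may prefer to surface the error instead.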
Trait Implementations§
Auto Trait Implementations§
impl<K, V, T, D, O> Freeze for SinceHandle<K, V, T, D, O>
impl<K, V, T, D, O> !RefUnwindSafe for SinceHandle<K, V, T, D, O>
impl<K, V, T, D, O> Send for SinceHandle<K, V, T, D, O>
impl<K, V, T, D, O> Sync for SinceHandle<K, V, T, D, O>
impl<K, V, T, D, O> Unpin for SinceHandle<K, V, T, D, O>
impl<K, V, T, D, O> !UnwindSafe for SinceHandle<K, V, T, D, O>
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request
impl<T> Pointable for T
impl<P, R> ProtoType<R> for P
where
    R: RustType<P>,
fn into_rust(self) -> Result<R, TryFromProtoError>
See RustType::from_proto.
fn from_rust(rust: &R) -> P
See RustType::into_proto.
impl<'a, S, T> Semigroup<&'a S> for T
where
    T: Semigroup<S>,
fn plus_equals(&mut self, rhs: &&'a S)
The method of std::ops::AddAssign, for types that do not implement AddAssign.