Struct mz_storage::source::reclock::ReclockFollower
source · pub struct ReclockFollower<FromTime: Timestamp, IntoTime: Timestamp + Lattice + Display> {
since: Antichain<IntoTime>,
pub inner: Rc<RefCell<ReclockFollowerInner<FromTime, IntoTime>>>,
}
Expand description
A “follower” for the ReclockOperator, that maintains a trace based on the results of reclocking
and data from the source. It provides the reclock
method, which produces messages with their
associated timestamps.
Shareable with .share()
Fields§
§since: Antichain<IntoTime>
The since
maintained by the local handle. This may be beyond the shared since
inner: Rc<RefCell<ReclockFollowerInner<FromTime, IntoTime>>>
Implementations§
source§impl<FromTime, IntoTime> ReclockFollower<FromTime, IntoTime>where
FromTime: Timestamp,
IntoTime: Timestamp + Lattice + Display,
impl<FromTime, IntoTime> ReclockFollower<FromTime, IntoTime>where FromTime: Timestamp, IntoTime: Timestamp + Lattice + Display,
sourcepub fn new(as_of: Antichain<IntoTime>) -> Self
pub fn new(as_of: Antichain<IntoTime>) -> Self
Constructs a new ReclockFollower
pub fn source_upper(&self) -> Antichain<FromTime>
pub fn initialized(&self) -> bool
sourcepub fn push_trace_batch(&mut self, batch: ReclockBatch<FromTime, IntoTime>)
pub fn push_trace_batch(&mut self, batch: ReclockBatch<FromTime, IntoTime>)
Pushes a new trace batch into this ReclockFollower
.
sourcepub fn reclock<'a, M: 'a>(
&'a self,
batch: impl IntoIterator<Item = (M, FromTime)> + 'a
) -> impl Iterator<Item = (M, Result<IntoTime, ReclockError<FromTime>>)> + 'awhere
IntoTime: TotalOrder,
pub fn reclock<'a, M: 'a>( &'a self, batch: impl IntoIterator<Item = (M, FromTime)> + 'a ) -> impl Iterator<Item = (M, Result<IntoTime, ReclockError<FromTime>>)> + 'awhere IntoTime: TotalOrder,
Reclocks a batch of messages timestamped with FromTime
and returns an iterator of
messages timestamped with IntoTime
.
Each item of the resulting iterator will be associated with either the time it should be reclocked to or an error indicating that a reclocking decision could not be taken with the data that we have at hand.
This method is most efficient when the to be reclocked iterator presents data in contiguous
runs with the same FromTime
.
sourcepub fn reclock_time(
&self,
src_ts: &FromTime
) -> Result<Antichain<IntoTime>, ReclockError<FromTime>>
pub fn reclock_time( &self, src_ts: &FromTime ) -> Result<Antichain<IntoTime>, ReclockError<FromTime>>
Reclocks a single FromTime
timestamp into the IntoTime
time domain.
sourcepub fn reclock_time_total(
&self,
src_ts: &FromTime
) -> Result<IntoTime, ReclockError<FromTime>>where
IntoTime: TotalOrder,
pub fn reclock_time_total( &self, src_ts: &FromTime ) -> Result<IntoTime, ReclockError<FromTime>>where IntoTime: TotalOrder,
Reclocks a single FromTime
timestamp into a totally ordered IntoTime
time domain.
sourcepub fn reclock_frontier(
&self,
source_frontier: AntichainRef<'_, FromTime>
) -> Result<Antichain<IntoTime>, ReclockError<FromTime>>
pub fn reclock_frontier( &self, source_frontier: AntichainRef<'_, FromTime> ) -> Result<Antichain<IntoTime>, ReclockError<FromTime>>
Reclocks a FromTime
frontier into a IntoTime
frontier.
The conversion has the property that all messages that are beyond the provided FromTime
frontier will be relocked at times that will be beyond the returned IntoTime
frontier.
This can be used to drive a IntoTime
capability forward when the caller knows that a
FromTime
frontier has advanced.
The method returns an error if the FromTime
frontier is not beyond the since frontier.
The error will contain the offending FromTime
.
sourcepub fn source_upper_at_frontier<'a>(
&self,
frontier: AntichainRef<'a, IntoTime>
) -> Result<Antichain<FromTime>, ReclockError<AntichainRef<'a, IntoTime>>>
pub fn source_upper_at_frontier<'a>( &self, frontier: AntichainRef<'a, IntoTime> ) -> Result<Antichain<FromTime>, ReclockError<AntichainRef<'a, IntoTime>>>
Reclocks an IntoTime
frontier into a FromTime
frontier.
The conversion has the property that all messages that would be reclocked to times beyond
the provided IntoTime
frontier will be beyond the returned FromTime
frontier. This can
be used to compute a safe starting point to resume producing an IntoTime
collection at a
particular frontier.
sourcepub fn compact(&mut self, new_since: Antichain<IntoTime>)
pub fn compact(&mut self, new_since: Antichain<IntoTime>)
Compacts the trace held by this reclock follower to the specified frontier.
Reclocking has the property that it commutes with compaction. What this means is that reclocking a collection and then compacting the result to some frontier F will produce exactly the same result with first compacting the remap trace to frontier F and then reclocking the collection.
pub fn since(&self) -> AntichainRef<'_, IntoTime>
Trait Implementations§
Auto Trait Implementations§
impl<FromTime, IntoTime> !RefUnwindSafe for ReclockFollower<FromTime, IntoTime>
impl<FromTime, IntoTime> !Send for ReclockFollower<FromTime, IntoTime>
impl<FromTime, IntoTime> !Sync for ReclockFollower<FromTime, IntoTime>
impl<FromTime, IntoTime> Unpin for ReclockFollower<FromTime, IntoTime>where IntoTime: Unpin,
impl<FromTime, IntoTime> !UnwindSafe for ReclockFollower<FromTime, IntoTime>
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
source§impl<T> FmtForward for T
impl<T> FmtForward for T
source§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where Self: Binary,
self
to use its Binary
implementation when Debug
-formatted.source§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where Self: Display,
self
to use its Display
implementation when
Debug
-formatted.source§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where Self: LowerExp,
self
to use its LowerExp
implementation when
Debug
-formatted.source§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where Self: LowerHex,
self
to use its LowerHex
implementation when
Debug
-formatted.source§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where Self: Octal,
self
to use its Octal
implementation when Debug
-formatted.source§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where Self: Pointer,
self
to use its Pointer
implementation when
Debug
-formatted.source§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where Self: UpperExp,
self
to use its UpperExp
implementation when
Debug
-formatted.source§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where Self: UpperHex,
self
to use its UpperHex
implementation when
Debug
-formatted.source§impl<T> FutureExt for T
impl<T> FutureExt for T
source§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
source§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request
source§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere T: ?Sized,
source§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere Self: Sized,
source§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere R: 'a,
self
and passes that borrow into the pipe function. Read moresource§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere R: 'a,
self
and passes that borrow into the pipe function. Read moresource§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> Rwhere
Self: Borrow<B>,
B: 'a + ?Sized,
R: 'a,
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> Rwhere Self: Borrow<B>, B: 'a + ?Sized, R: 'a,
source§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R
) -> Rwhere
Self: BorrowMut<B>,
B: 'a + ?Sized,
R: 'a,
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R ) -> Rwhere Self: BorrowMut<B>, B: 'a + ?Sized, R: 'a,
source§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> Rwhere
Self: AsRef<U>,
U: 'a + ?Sized,
R: 'a,
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> Rwhere Self: AsRef<U>, U: 'a + ?Sized, R: 'a,
self
, then passes self.as_ref()
into the pipe function.source§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> Rwhere
Self: AsMut<U>,
U: 'a + ?Sized,
R: 'a,
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> Rwhere Self: AsMut<U>, U: 'a + ?Sized, R: 'a,
self
, then passes self.as_mut()
into the pipe
function.source§impl<T> Pointable for T
impl<T> Pointable for T
source§impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
impl<P, R> ProtoType<R> for Pwhere R: RustType<P>,
source§fn into_rust(self) -> Result<R, TryFromProtoError>
fn into_rust(self) -> Result<R, TryFromProtoError>
RustType::from_proto
.source§fn from_rust(rust: &R) -> P
fn from_rust(rust: &R) -> P
RustType::into_proto
.source§impl<T> Tap for T
impl<T> Tap for T
source§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Selfwhere
Self: Borrow<B>,
B: ?Sized,
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Selfwhere Self: Borrow<B>, B: ?Sized,
Borrow<B>
of a value. Read moresource§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Selfwhere
Self: BorrowMut<B>,
B: ?Sized,
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Selfwhere Self: BorrowMut<B>, B: ?Sized,
BorrowMut<B>
of a value. Read moresource§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Selfwhere
Self: AsRef<R>,
R: ?Sized,
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Selfwhere Self: AsRef<R>, R: ?Sized,
AsRef<R>
view of a value. Read moresource§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Selfwhere
Self: AsMut<R>,
R: ?Sized,
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Selfwhere Self: AsMut<R>, R: ?Sized,
AsMut<R>
view of a value. Read moresource§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Selfwhere
Self: Deref<Target = T>,
T: ?Sized,
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Selfwhere Self: Deref<Target = T>, T: ?Sized,
Deref::Target
of a value. Read moresource§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Selfwhere
Self: DerefMut<Target = T> + Deref,
T: ?Sized,
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Selfwhere Self: DerefMut<Target = T> + Deref, T: ?Sized,
Deref::Target
of a value. Read moresource§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap()
only in debug builds, and is erased in release builds.source§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut()
only in debug builds, and is erased in release
builds.source§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Selfwhere
Self: Borrow<B>,
B: ?Sized,
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Selfwhere Self: Borrow<B>, B: ?Sized,
.tap_borrow()
only in debug builds, and is erased in release
builds.source§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Selfwhere
Self: BorrowMut<B>,
B: ?Sized,
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Selfwhere Self: BorrowMut<B>, B: ?Sized,
.tap_borrow_mut()
only in debug builds, and is erased in release
builds.source§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Selfwhere
Self: AsRef<R>,
R: ?Sized,
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Selfwhere Self: AsRef<R>, R: ?Sized,
.tap_ref()
only in debug builds, and is erased in release
builds.source§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Selfwhere
Self: AsMut<R>,
R: ?Sized,
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Selfwhere Self: AsMut<R>, R: ?Sized,
.tap_ref_mut()
only in debug builds, and is erased in release
builds.