Struct mz_storage::source::reclock::ReclockOperator
pub struct ReclockOperator<FromTime: Timestamp, IntoTime: Timestamp + Lattice, Handle: RemapHandle<FromTime = FromTime, IntoTime = IntoTime>, Clock> {
upper: Antichain<IntoTime>,
source_upper: MutableAntichain<FromTime>,
remap_handle: Handle,
clock_stream: Clock,
}
The ReclockOperator is responsible for observing progress in the FromTime domain and consuming messages from a ticker of progress in the IntoTime domain. When the source frontier advances and the ticker ticks, the ReclockOperator generates the data that describes this correspondence and writes it out to its provided remap handle. The output generated by the reclock operator can be thought of as a Collection<G, FromTime> where G::Timestamp is IntoTime.
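As a rough illustration of what "the data that describes this correspondence" could look like, the sketch below models one binding as a retraction of the old source frontier plus an insertion of the new one, both stamped with the tick time. The aliases and the binding_updates helper are hypothetical stand-ins (plain u64 timestamps, frontiers as slices), not the operator's actual types or code.

```rust
// Hypothetical stand-ins for the operator's generic parameters.
type FromTime = u64; // e.g. an offset in the source
type IntoTime = u64; // e.g. a timestamp produced by the ticker
type Diff = i64;

/// Produces the updates that move the remap collection from `old_upper`
/// to `new_upper` at tick time `ts`: elements that left the frontier are
/// retracted (-1) and elements that entered it are inserted (+1).
fn binding_updates(
    old_upper: &[FromTime],
    new_upper: &[FromTime],
    ts: IntoTime,
) -> Vec<(FromTime, IntoTime, Diff)> {
    let mut updates = Vec::new();
    for &f in old_upper {
        if !new_upper.contains(&f) {
            updates.push((f, ts, -1));
        }
    }
    for &f in new_upper {
        if !old_upper.contains(&f) {
            updates.push((f, ts, 1));
        }
    }
    updates
}
```

Under these assumptions, moving the frontier from offset 0 to offset 5 at tick 1000 yields one retraction and one insertion, while an unchanged frontier yields no updates.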
The ReclockOperator always maintains the invariant that, for any time IntoTime, the remap collection accumulates into an Antichain where each FromTime timestamp has frequency 1. In other words, the remap collection describes a well-formed Antichain<FromTime> as it marches forwards.
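The invariant can be checked on a toy model. The sketch below assumes plain u64 timestamps and a Vec of (FromTime, IntoTime, Diff) triples in place of the real remap collection; accumulate_at, is_well_formed and example_remap_updates are hypothetical helpers written for this illustration only.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins for the operator's generic parameters.
type FromTime = u64;
type IntoTime = u64;
type Diff = i64;

/// Sums the diffs of all updates at times `<= as_of` and returns the
/// surviving `FromTime` elements with their frequencies.
fn accumulate_at(
    updates: &[(FromTime, IntoTime, Diff)],
    as_of: IntoTime,
) -> BTreeMap<FromTime, Diff> {
    let mut acc = BTreeMap::new();
    for &(from, into, diff) in updates {
        if into <= as_of {
            *acc.entry(from).or_insert(0) += diff;
        }
    }
    // Elements whose diffs cancel out are no longer part of the frontier.
    acc.retain(|_, d| *d != 0);
    acc
}

/// Checks the frequency half of the invariant: every surviving element
/// must appear exactly once.
fn is_well_formed(acc: &BTreeMap<FromTime, Diff>) -> bool {
    acc.values().all(|&d| d == 1)
}

/// A made-up remap collection: the frontier starts at 0, moves to 5 at
/// IntoTime 1000, and moves to 9 at IntoTime 2000.
fn example_remap_updates() -> Vec<(FromTime, IntoTime, Diff)> {
    vec![
        (0, 0, 1),
        (0, 1000, -1),
        (5, 1000, 1),
        (5, 2000, -1),
        (9, 2000, 1),
    ]
}
```

With this made-up data, accumulating at any IntoTime leaves a single FromTime element with frequency 1. For a partially ordered FromTime a full check would also verify that the surviving elements are mutually incomparable, which this sketch does not attempt.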
Fields
upper: Antichain<IntoTime>
Upper frontier of the partial remap trace.
source_upper: MutableAntichain<FromTime>
The upper frontier in terms of FromTime. Any attempt to reclock messages beyond this frontier will lead to minting new bindings.
remap_handle: Handle
A handle allowing this operator to publish updates to and read back from the remap collection.
clock_stream: Clock
A stream of IntoTime values and upper frontiers, used to drive minting bindings. In the future this will be a timely input to the reclock operator.
Implementations
impl<FromTime, IntoTime, Handle, Clock> ReclockOperator<FromTime, IntoTime, Handle, Clock> where FromTime: Timestamp, IntoTime: Timestamp + Lattice, Handle: RemapHandle<FromTime = FromTime, IntoTime = IntoTime>, Clock: Stream<Item = (IntoTime, Antichain<IntoTime>)> + Unpin
pub async fn new(remap_handle: Handle, clock_stream: Clock) -> (Self, ReclockBatch<FromTime, IntoTime>)
Constructs a new ReclockOperator from the given remap handle and clock stream.
pub async fn advance(&mut self) -> ReclockBatch<FromTime, IntoTime>
Advances the upper of the reclock operator if appropriate.
async fn sync(&mut self, target_upper: AntichainRef<'_, IntoTime>) -> ReclockBatch<FromTime, IntoTime>
Syncs the state of this operator to match that of the persist shard, up to the provided frontier.
pub async fn mint( &mut self, new_source_upper: AntichainRef<'_, FromTime> ) -> ReclockBatch<FromTime, IntoTime>
async fn append_batch(&mut self, updates: Vec<(FromTime, IntoTime, Diff)>, new_upper: Antichain<IntoTime>) -> Result<ReclockBatch<FromTime, IntoTime>, UpperMismatch<IntoTime>>
Appends the provided updates to the remap collection at the next available minting IntoTime and updates this operator’s in-memory state accordingly.
If an attempt to mint bindings fails because another process raced and appended bindings concurrently, the current global upper is returned as an error. This is the frontier to which this operator must be synced for a future append attempt to have any chance of success.
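The error contract described above resembles a compare-and-append loop. The following is a minimal synchronous sketch of that pattern, assuming u64 timestamps and an in-memory shard; Shard, Mismatch, Operator and append_with_retry are hypothetical names for illustration and do not mirror the real async implementation or the persist API.

```rust
/// Hypothetical in-memory stand-in for the shared remap collection.
struct Shard {
    upper: u64,
    updates: Vec<(u64, u64, i64)>,
}

/// Error carrying the shard's current upper, playing the role that
/// `UpperMismatch` plays in the signature above.
#[derive(Debug, PartialEq)]
struct Mismatch {
    current_upper: u64,
}

impl Shard {
    /// Appends `updates` only if the caller's view of the upper is current.
    fn compare_and_append(
        &mut self,
        expected_upper: u64,
        new_upper: u64,
        updates: Vec<(u64, u64, i64)>,
    ) -> Result<(), Mismatch> {
        if self.upper != expected_upper {
            return Err(Mismatch { current_upper: self.upper });
        }
        self.updates.extend(updates);
        self.upper = new_upper;
        Ok(())
    }
}

/// Hypothetical local state of one operator instance.
struct Operator {
    upper: u64,
}

impl Operator {
    /// Tries to append a binding for `from` at the next available time.
    /// On a mismatch it adopts the reported upper (a stand-in for syncing)
    /// and retries. Returns the number of attempts made.
    fn append_with_retry(&mut self, shard: &mut Shard, from: u64) -> u32 {
        let mut attempts = 0;
        loop {
            attempts += 1;
            let into = self.upper; // next available minting time
            let new_upper = into + 1;
            match shard.compare_and_append(self.upper, new_upper, vec![(from, into, 1)]) {
                Ok(()) => {
                    self.upper = new_upper;
                    return attempts;
                }
                Err(Mismatch { current_upper }) => {
                    // Another writer got there first: catch up, then retry.
                    self.upper = current_upper;
                }
            }
        }
    }
}
```

In this model an operator whose local upper lags behind the shard fails once, adopts the returned upper, and succeeds on the second attempt. Returning the current upper in the error is what lets the caller catch up without a separate lookup.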
Trait Implementations
Auto Trait Implementations
impl<FromTime, IntoTime, Handle, Clock> RefUnwindSafe for ReclockOperator<FromTime, IntoTime, Handle, Clock> where Clock: RefUnwindSafe, FromTime: RefUnwindSafe, Handle: RefUnwindSafe, IntoTime: RefUnwindSafe
impl<FromTime, IntoTime, Handle, Clock> Send for ReclockOperator<FromTime, IntoTime, Handle, Clock> where Clock: Send, Handle: Send
impl<FromTime, IntoTime, Handle, Clock> Sync for ReclockOperator<FromTime, IntoTime, Handle, Clock> where Clock: Sync, Handle: Sync
impl<FromTime, IntoTime, Handle, Clock> Unpin for ReclockOperator<FromTime, IntoTime, Handle, Clock> where Clock: Unpin, FromTime: Unpin, Handle: Unpin, IntoTime: Unpin
impl<FromTime, IntoTime, Handle, Clock> UnwindSafe for ReclockOperator<FromTime, IntoTime, Handle, Clock> where Clock: UnwindSafe, FromTime: UnwindSafe, Handle: UnwindSafe, IntoTime: UnwindSafe
Blanket Implementations
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> FmtForward for T
fn fmt_binary(self) -> FmtBinary<Self> where Self: Binary
Causes self to use its Binary implementation when Debug-formatted.
fn fmt_display(self) -> FmtDisplay<Self> where Self: Display
Causes self to use its Display implementation when Debug-formatted.
fn fmt_lower_exp(self) -> FmtLowerExp<Self> where Self: LowerExp
Causes self to use its LowerExp implementation when Debug-formatted.
fn fmt_lower_hex(self) -> FmtLowerHex<Self> where Self: LowerHex
Causes self to use its LowerHex implementation when Debug-formatted.
fn fmt_octal(self) -> FmtOctal<Self> where Self: Octal
Causes self to use its Octal implementation when Debug-formatted.
fn fmt_pointer(self) -> FmtPointer<Self> where Self: Pointer
Causes self to use its Pointer implementation when Debug-formatted.
fn fmt_upper_exp(self) -> FmtUpperExp<Self> where Self: UpperExp
Causes self to use its UpperExp implementation when Debug-formatted.
fn fmt_upper_hex(self) -> FmtUpperHex<Self> where Self: UpperHex
Causes self to use its UpperHex implementation when Debug-formatted.
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request
impl<T> Pipe for T where T: ?Sized
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R where Self: Sized
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R where R: 'a
Borrows self and passes that borrow into the pipe function.
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R where R: 'a
Mutably borrows self and passes that borrow into the pipe function.
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R where Self: Borrow<B>, B: 'a + ?Sized, R: 'a
fn pipe_borrow_mut<'a, B, R>(&'a mut self, func: impl FnOnce(&'a mut B) -> R) -> R where Self: BorrowMut<B>, B: 'a + ?Sized, R: 'a
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R where Self: AsRef<U>, U: 'a + ?Sized, R: 'a
Borrows self, then passes self.as_ref() into the pipe function.
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R where Self: AsMut<U>, U: 'a + ?Sized, R: 'a
Mutably borrows self, then passes self.as_mut() into the pipe function.
impl<T> Pointable for T
impl<P, R> ProtoType<R> for P where R: RustType<P>
fn into_rust(self) -> Result<R, TryFromProtoError>
See RustType::from_proto.
fn from_rust(rust: &R) -> P
See RustType::into_proto.
impl<T> Tap for T
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self where Self: Borrow<B>, B: ?Sized
Immutable access to the Borrow<B> of a value.
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self where Self: BorrowMut<B>, B: ?Sized
Mutable access to the BorrowMut<B> of a value.
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self where Self: AsRef<R>, R: ?Sized
Immutable access to the AsRef<R> view of a value.
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self where Self: AsMut<R>, R: ?Sized
Mutable access to the AsMut<R> view of a value.
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self where Self: Deref<Target = T>, T: ?Sized
Immutable access to the Deref::Target of a value.
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self where Self: DerefMut<Target = T> + Deref, T: ?Sized
Mutable access to the Deref::Target of a value.
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release builds.
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self where Self: Borrow<B>, B: ?Sized
Calls .tap_borrow() only in debug builds, and is erased in release builds.
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self where Self: BorrowMut<B>, B: ?Sized
Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self where Self: AsRef<R>, R: ?Sized
Calls .tap_ref() only in debug builds, and is erased in release builds.
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self where Self: AsMut<R>, R: ?Sized
Calls .tap_ref_mut() only in debug builds, and is erased in release builds.