struct DifferentialWriteTask<T, R>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
    R: FnMut() -> Pin<Box<dyn Future<Output = ReadHandle<SourceData, (), T, Diff>> + Send>> + Send + 'static,
{
    id: GlobalId,
    write_handle: WriteHandle<SourceData, (), T, Diff>,
    read_handle_fn: R,
    read_only: bool,
    now: NowFn,
    upper_tick_interval: Interval,
    cmd_rx: UnboundedReceiver<(StorageWriteOp, Sender<Result<(), StorageError<T>>>)>,
    shutdown_rx: Receiver<()>,
    desired: Vec<(Row, i64)>,
    to_write: Vec<(Row, i64)>,
    current_upper: T,
}
A task that keeps the state in persist in sync with the desired state and continuously bumps the upper of the specified collection.
NOTE: This implementation is a bit clunky and could be optimized by not keeping all of desired in memory (see comment below). It is meant to showcase the general approach.
Fields
id: GlobalId
The collection that we are writing to.
write_handle: WriteHandle<SourceData, (), T, Diff>
read_handle_fn: R
For getting a ReadHandle to sync our state to persist contents.
read_only: bool
now: NowFn
upper_tick_interval: Interval
In the absence of updates, we regularly bump the upper to “now”, on this interval. This makes it so the collection remains readable at recent timestamps.
cmd_rx: UnboundedReceiver<(StorageWriteOp, Sender<Result<(), StorageError<T>>>)>
Receiver for write commands. These change our desired state.
shutdown_rx: Receiver<()>
We have to shut down when receiving from this.
desired: Vec<(Row, i64)>
The contents of the collection as it should be according to whoever is driving us around.
to_write: Vec<(Row, i64)>
Updates that we have to write when next writing to persist. This is determined by looking at what is desired and what is in persist.
current_upper: T
Current upper of the persist shard. We keep track of this so that we realize when someone else writes to the shard, in which case we have to update our state of the world, that is update our to_write based on desired and the contents of the persist shard.
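The interplay between upper_tick_interval, now and current_upper can be illustrated with a minimal sketch. It assumes u64 milliseconds in place of the generic timestamp T, and the names UpperTracker, tick_target and advance are hypothetical helpers invented for illustration; they are not part of this type.

```rust
/// Hypothetical stand-in for the task's view of the shard upper.
/// `u64` milliseconds replace the generic timestamp `T`.
struct UpperTracker {
    current_upper: u64,
}

impl UpperTracker {
    /// Decides what to do on an idle tick: returns the upper to advance to,
    /// or `None` when the wall clock has not moved past the known upper.
    /// (Assumption: an idle tick never moves the upper backwards.)
    fn tick_target(&self, now_millis: u64) -> Option<u64> {
        if now_millis > self.current_upper {
            Some(now_millis)
        } else {
            None
        }
    }

    /// Records the new upper after a successful (possibly empty) append.
    fn advance(&mut self, new_upper: u64) {
        debug_assert!(new_upper >= self.current_upper);
        self.current_upper = new_upper;
    }
}
```

Tracking the upper locally is what lets a writer notice a concurrent write: an append that expects a stale upper fails, and the task can then re-derive its pending updates.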
Implementations
impl<T, R> DifferentialWriteTask<T, R>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
    R: FnMut() -> Pin<Box<dyn Future<Output = ReadHandle<SourceData, (), T, Diff>> + Send>> + Send + Sync + 'static,
fn spawn(
    id: GlobalId,
    write_handle: WriteHandle<SourceData, (), T, Diff>,
    read_handle_fn: R,
    read_only: bool,
    now: NowFn,
    introspection_config: DifferentialIntrospectionConfig<T>,
) -> (UnboundedSender<(StorageWriteOp, Sender<Result<(), StorageError<T>>>)>, AbortOnDropHandle<()>, Sender<()>)
Spawns a DifferentialWriteTask in an mz_ore::task and returns handles for interacting with it.
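The shape of the returned handles (a command sender whose messages carry a responder, a task handle, and a shutdown sender) can be sketched with standard-library threads and channels. This is an illustration only: the real task is asynchronous and runs in an mz_ore::task, while WriteOp, Responder and spawn_worker below are hypothetical names, and polling with a timeout stands in for a proper select over both channels.

```rust
use std::sync::mpsc::{channel, RecvTimeoutError, Sender, TryRecvError};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for `StorageWriteOp`; rows are plain strings.
enum WriteOp {
    Append(Vec<(String, i64)>),
}

/// Each command carries a channel on which the worker reports the outcome.
type Responder = Sender<Result<(), String>>;

/// Spawns a worker and returns (command sender, join handle, shutdown sender).
/// The join handle yields the number of updates the worker accumulated.
fn spawn_worker() -> (Sender<(WriteOp, Responder)>, JoinHandle<usize>, Sender<()>) {
    let (cmd_tx, cmd_rx) = channel::<(WriteOp, Responder)>();
    let (shutdown_tx, shutdown_rx) = channel::<()>();

    let handle = thread::spawn(move || {
        let mut desired: Vec<(String, i64)> = Vec::new();
        loop {
            // Stop on an explicit signal, or when the shutdown handle is dropped.
            match shutdown_rx.try_recv() {
                Ok(()) | Err(TryRecvError::Disconnected) => break,
                Err(TryRecvError::Empty) => {}
            }
            // Wait briefly for a command so the shutdown check runs regularly.
            match cmd_rx.recv_timeout(Duration::from_millis(10)) {
                Ok((WriteOp::Append(updates), responder)) => {
                    desired.extend(updates);
                    // The caller may have stopped listening; that is not an error.
                    let _ = responder.send(Ok(()));
                }
                Err(RecvTimeoutError::Timeout) => {}
                Err(RecvTimeoutError::Disconnected) => break,
            }
        }
        desired.len()
    });

    (cmd_tx, handle, shutdown_tx)
}
```

A caller sends a (WriteOp, Responder) pair, waits on the matching receiver for the result, and signals the shutdown sender when the worker should stop.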
async fn prepare(
    &self,
    introspection_config: DifferentialIntrospectionConfig<T>,
)
Does any work that is required before this background task starts writing to the given introspection collection.
This might include consolidation, deleting older entries or seeding in-memory state of, say, scrapers, with current collection contents.
async fn run(self) -> ControlFlow<String>
async fn tick_upper(&mut self) -> ControlFlow<String>
fn handle_shutdown(&mut self)
async fn handle_updates( &mut self, batch: &mut Vec<(StorageWriteOp, Sender<Result<(), StorageError<T>>>)>, ) -> ControlFlow<String>
fn apply_write_op(&mut self, op: StorageWriteOp)
Apply the given write operation to the desired/to_write state.
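One way to picture how a write operation updates both vectors is the sketch below. It assumes two kinds of operation, an append of updates and a delete by predicate, modelled by the hypothetical WriteOp enum; the real StorageWriteOp may differ. Rows are plain strings and State is an illustrative stand-in for the task.

```rust
/// Hypothetical stand-in for `StorageWriteOp`; rows are plain strings.
enum WriteOp {
    /// Add the given updates to the collection.
    Append(Vec<(String, i64)>),
    /// Remove every row for which the predicate returns `true`.
    Delete(Box<dyn Fn(&str) -> bool>),
}

/// Illustrative stand-in for the task's `desired` and `to_write` fields.
struct State {
    desired: Vec<(String, i64)>,
    to_write: Vec<(String, i64)>,
}

impl State {
    fn apply_write_op(&mut self, op: WriteOp) {
        match op {
            WriteOp::Append(updates) => {
                // Appended updates are desired and still have to be written.
                self.desired.extend(updates.iter().cloned());
                self.to_write.extend(updates);
            }
            WriteOp::Delete(filter) => {
                // Split the desired state into rows to remove and rows to keep.
                let (removed, kept): (Vec<_>, Vec<_>) = std::mem::take(&mut self.desired)
                    .into_iter()
                    .partition(|(row, _)| filter(row.as_str()));
                self.desired = kept;
                // Queue a retraction (negated diff) for every removed row.
                self.to_write
                    .extend(removed.into_iter().map(|(row, diff)| (row, -diff)));
            }
        }
    }
}
```

In this sketch an append followed by a delete of the same row leaves a +1 and a -1 in to_write, which cancel once the pending updates are consolidated.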
async fn write_to_persist(
    &mut self,
    responders: Vec<Sender<Result<(), StorageError<T>>>>,
) -> ControlFlow<String>
Attempt to write what is currently in Self::to_write to persist, retrying and re-syncing to persist when necessary, that is when the upper was not what we expected.
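The retry-on-upper-mismatch behaviour can be sketched against an in-memory stand-in for a shard. MockShard, Writer and this simplified compare_and_append signature are assumptions made for illustration and do not reproduce the persist client API; timestamps are u64 and rows are strings.

```rust
/// In-memory stand-in for a persist shard (hypothetical).
struct MockShard {
    upper: u64,
    contents: Vec<(String, i64)>,
}

impl MockShard {
    /// Appends `updates` and advances the upper to `new_upper`, but only if
    /// the caller's `expected_upper` matches. Otherwise returns the actual upper.
    fn compare_and_append(
        &mut self,
        updates: &[(String, i64)],
        expected_upper: u64,
        new_upper: u64,
    ) -> Result<(), u64> {
        if self.upper != expected_upper {
            return Err(self.upper);
        }
        self.contents.extend_from_slice(updates);
        self.upper = new_upper;
        Ok(())
    }
}

/// Illustrative stand-in for the task's write-side state.
struct Writer {
    to_write: Vec<(String, i64)>,
    current_upper: u64,
}

impl Writer {
    /// Writes `to_write`, retrying until the append succeeds.
    /// Returns the number of attempts that were needed.
    fn write(&mut self, shard: &mut MockShard, now: u64) -> usize {
        let mut attempts = 0;
        loop {
            attempts += 1;
            // The new upper must lie strictly beyond the upper we expect.
            let new_upper = std::cmp::max(now, self.current_upper + 1);
            match shard.compare_and_append(&self.to_write, self.current_upper, new_upper) {
                Ok(()) => {
                    self.to_write.clear();
                    self.current_upper = new_upper;
                    return attempts;
                }
                Err(actual_upper) => {
                    // Someone else wrote to the shard. Adopt the actual upper;
                    // a full implementation would also re-derive `to_write`
                    // from the shard contents before retrying.
                    self.current_upper = actual_upper;
                }
            }
        }
    }
}
```

The comparison against the expected upper is what makes concurrent writers safe: a stale writer cannot append until it has caught up with the shard.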
async fn sync_to_persist(&mut self)
Re-derives Self::to_write by looking at Self::desired and the current state in persist. We want to insert everything in desired and retract everything in persist. But ideally most of that cancels out in consolidation.
To be called when a compare_and_append failed because the upper didn’t match what we expected.
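The "insert everything in desired, retract everything in persist, then consolidate" idea can be sketched as follows. The functions consolidate and derive_to_write are hypothetical helpers written for this illustration, with string rows and i64 diffs; they are not the consolidation routine the real task uses.

```rust
use std::collections::BTreeMap;

/// Sums the diffs per row and drops rows whose diffs cancel to zero.
fn consolidate(updates: Vec<(String, i64)>) -> Vec<(String, i64)> {
    let mut acc: BTreeMap<String, i64> = BTreeMap::new();
    for (row, diff) in updates {
        *acc.entry(row).or_insert(0) += diff;
    }
    acc.into_iter().filter(|(_, diff)| *diff != 0).collect()
}

/// Computes the updates that turn the `persist` contents into `desired`.
fn derive_to_write(
    desired: &[(String, i64)],
    persist: &[(String, i64)],
) -> Vec<(String, i64)> {
    // Insert everything that is desired ...
    let mut updates: Vec<(String, i64)> = desired.to_vec();
    // ... and retract everything that is currently in persist.
    updates.extend(persist.iter().map(|(row, diff)| (row.clone(), -diff)));
    // Rows present on both sides cancel out here.
    consolidate(updates)
}
```

When desired and persist already agree, every row cancels and the result is empty, so nothing needs to be written.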
Auto Trait Implementations
impl<T, R> Freeze for DifferentialWriteTask<T, R>
impl<T, R> !RefUnwindSafe for DifferentialWriteTask<T, R>
impl<T, R> Send for DifferentialWriteTask<T, R>
impl<T, R> Sync for DifferentialWriteTask<T, R>
where
    R: Sync,
impl<T, R> Unpin for DifferentialWriteTask<T, R>
impl<T, R> !UnwindSafe for DifferentialWriteTask<T, R>
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> FmtForward for T
fn fmt_binary(self) -> FmtBinary<Self>
where
    Self: Binary,
Causes self to use its Binary implementation when Debug-formatted.
fn fmt_display(self) -> FmtDisplay<Self>
where
    Self: Display,
Causes self to use its Display implementation when Debug-formatted.
fn fmt_lower_exp(self) -> FmtLowerExp<Self>
where
    Self: LowerExp,
Causes self to use its LowerExp implementation when Debug-formatted.
fn fmt_lower_hex(self) -> FmtLowerHex<Self>
where
    Self: LowerHex,
Causes self to use its LowerHex implementation when Debug-formatted.
fn fmt_octal(self) -> FmtOctal<Self>
where
    Self: Octal,
Causes self to use its Octal implementation when Debug-formatted.
fn fmt_pointer(self) -> FmtPointer<Self>
where
    Self: Pointer,
Causes self to use its Pointer implementation when Debug-formatted.
fn fmt_upper_exp(self) -> FmtUpperExp<Self>
where
    Self: UpperExp,
Causes self to use its UpperExp implementation when Debug-formatted.
fn fmt_upper_hex(self) -> FmtUpperHex<Self>
where
    Self: UpperHex,
Causes self to use its UpperHex implementation when Debug-formatted.
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request
impl<T, U> OverrideFrom<Option<&T>> for U
where
    U: OverrideFrom<T>,
impl<T> Pipe for T
where
    T: ?Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
where
    Self: Sized,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R
where
    R: 'a,
Borrows self and passes that borrow into the pipe function.
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R
where
    R: 'a,
Mutably borrows self and passes that borrow into the pipe function.
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow_mut<'a, B, R>(
    &'a mut self,
    func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
Borrows self, then passes self.as_ref() into the pipe function.
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
Mutably borrows self, then passes self.as_mut() into the pipe function.
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
Borrows self, then passes self.deref() into the pipe function.
impl<T> Pointable for T
impl<P, R> ProtoType<R> for P
where
    R: RustType<P>,
fn into_rust(self) -> Result<R, TryFromProtoError>
See RustType::from_proto.
fn from_rust(rust: &R) -> P
See RustType::into_proto.
impl<'a, S, T> Semigroup<&'a S> for T
where
    T: Semigroup<S>,
fn plus_equals(&mut self, rhs: &&'a S)
The method of std::ops::AddAssign, for types that do not implement AddAssign.
impl<T> Tap for T
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Immutable access to the Borrow<B> of a value.
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
Mutable access to the BorrowMut<B> of a value.
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
Immutable access to the AsRef<R> view of a value.
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
Mutable access to the AsMut<R> view of a value.
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Immutable access to the Deref::Target of a value.
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Mutable access to the Deref::Target of a value.
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release builds.
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
Calls .tap_borrow() only in debug builds, and is erased in release builds.
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
Calls .tap_ref() only in debug builds, and is erased in release builds.
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
Calls .tap_deref() only in debug builds, and is erased in release builds.