Struct mz_compute::compute_state::ActiveComputeState
source · pub(crate) struct ActiveComputeState<'a, A: Allocate> {
pub timely_worker: &'a mut Worker<A>,
pub compute_state: &'a mut ComputeState,
pub response_tx: &'a mut ResponseSender,
}
Expand description
A wrapper around ComputeState with a live timely worker and response channel.
Fields§
§timely_worker: &'a mut Worker<A>
The underlying Timely worker.
compute_state: &'a mut ComputeState
The compute state itself.
response_tx: &'a mut ResponseSender
The channel over which frontier information is reported.
Implementations§
source§impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A>
impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A>
sourcepub fn handle_compute_command(&mut self, cmd: ComputeCommand)
pub fn handle_compute_command(&mut self, cmd: ComputeCommand)
Entrypoint for applying a compute command.
fn handle_create_instance(&mut self, config: InstanceConfig)
fn handle_update_configuration(&mut self, params: ComputeParameters)
fn handle_create_dataflow( &mut self, dataflow: DataflowDescription<FlatPlan, CollectionMetadata>, )
fn handle_schedule(&mut self, id: GlobalId)
fn handle_allow_compaction( &mut self, id: GlobalId, frontier: Antichain<Timestamp>, )
fn handle_peek(&mut self, peek: Peek)
fn handle_cancel_peek(&mut self, uuid: Uuid)
sourcefn drop_collection(&mut self, id: GlobalId)
fn drop_collection(&mut self, id: GlobalId)
Arrange for the given collection to be dropped.
Collection dropping occurs in three phases:
- This method removes the collection from the
ComputeState
and drops itsCollectionState
, including its held dataflow tokens. It then adds the dropped collection todropped_collections
. - The next step of the Timely worker lets the source operators observe the token drops and shut themselves down.
report_dropped_collections
removes the entry fromdropped_collections
and emits any outstanding final responses required by the compute protocol.
These steps ensure that we don’t report a collection as dropped to the controller before it has stopped reading from its inputs. Doing so would allow the controller to release its read holds on the inputs, which could lead to panics from the replica trying to read already compacted times.
sourcepub fn initialize_logging(&mut self, config: LoggingConfig)
pub fn initialize_logging(&mut self, config: LoggingConfig)
Initializes timely dataflow logging and publishes as a view.
sourcepub fn report_frontiers(&mut self)
pub fn report_frontiers(&mut self)
Send progress information to the controller.
sourcepub fn report_dropped_collections(&mut self)
pub fn report_dropped_collections(&mut self)
Report dropped collections to the controller.
sourcepub fn report_operator_hydration(&self)
pub fn report_operator_hydration(&self)
Report operator hydration events.
sourcepub(crate) fn report_metrics(&self)
pub(crate) fn report_metrics(&self)
Report per-worker metrics.
sourcefn process_peek(&mut self, upper: &mut Antichain<Timestamp>, peek: PendingPeek)
fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, peek: PendingPeek)
Either complete the peek (and send the response) or put it in the pending set.
sourcepub fn process_peeks(&mut self)
pub fn process_peeks(&mut self)
Scan pending peeks and attempt to retire each.
sourcefn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse)
fn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse)
Sends a response for this peek’s resolution to the coordinator.
Note that this function takes ownership of the PendingPeek
, which is
meant to prevent multiple responses to the same peek.
sourcepub fn process_subscribes(&mut self)
pub fn process_subscribes(&mut self)
Scan the shared subscribe response buffer, and forward results along.
sourcepub fn process_copy_tos(&self)
pub fn process_copy_tos(&self)
Scan the shared copy to response buffer, and forward results along.
sourcefn send_compute_response(&self, response: ComputeResponse)
fn send_compute_response(&self, response: ComputeResponse)
Send a response to the coordinator.
sourcepub(crate) fn check_expiration(&self)
pub(crate) fn check_expiration(&self)
Checks for dataflow expiration. Panics if we’re past the replica expiration time.
sourcepub fn determine_dataflow_expiration(
&self,
time_dependence: &TimeDependence,
until: &Antichain<Timestamp>,
) -> Antichain<Timestamp>
pub fn determine_dataflow_expiration( &self, time_dependence: &TimeDependence, until: &Antichain<Timestamp>, ) -> Antichain<Timestamp>
Returns the dataflow expiration, i.e, the timestamp beyond which diffs can be dropped.
Returns an empty timestamp if replica_expiration
is unset or matches conditions under
which dataflow expiration should be disabled.
Auto Trait Implementations§
impl<'a, A> Freeze for ActiveComputeState<'a, A>
impl<'a, A> !RefUnwindSafe for ActiveComputeState<'a, A>
impl<'a, A> !Send for ActiveComputeState<'a, A>
impl<'a, A> !Sync for ActiveComputeState<'a, A>
impl<'a, A> Unpin for ActiveComputeState<'a, A>
impl<'a, A> !UnwindSafe for ActiveComputeState<'a, A>
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
source§impl<T> FmtForward for T
impl<T> FmtForward for T
source§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self
to use its Binary
implementation when Debug
-formatted.source§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self
to use its Display
implementation when
Debug
-formatted.source§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self
to use its LowerExp
implementation when
Debug
-formatted.source§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self
to use its LowerHex
implementation when
Debug
-formatted.source§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self
to use its Octal
implementation when Debug
-formatted.source§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self
to use its Pointer
implementation when
Debug
-formatted.source§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self
to use its UpperExp
implementation when
Debug
-formatted.source§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self
to use its UpperHex
implementation when
Debug
-formatted.source§impl<T> FutureExt for T
impl<T> FutureExt for T
source§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
source§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request
source§impl<T, U> OverrideFrom<Option<&T>> for Uwhere
U: OverrideFrom<T>,
impl<T, U> OverrideFrom<Option<&T>> for Uwhere
U: OverrideFrom<T>,
source§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
source§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
source§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read moresource§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read moresource§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
source§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
source§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self
, then passes self.as_ref()
into the pipe function.source§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self
, then passes self.as_mut()
into the pipe
function.source§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self
, then passes self.deref()
into the pipe function.source§impl<T> Pointable for T
impl<T> Pointable for T
source§impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
source§fn into_rust(self) -> Result<R, TryFromProtoError>
fn into_rust(self) -> Result<R, TryFromProtoError>
RustType::from_proto
.source§fn from_rust(rust: &R) -> P
fn from_rust(rust: &R) -> P
RustType::into_proto
.source§impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
source§fn plus_equals(&mut self, rhs: &&'a S)
fn plus_equals(&mut self, rhs: &&'a S)
std::ops::AddAssign
, for types that do not implement AddAssign
.source§impl<T> Tap for T
impl<T> Tap for T
source§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B>
of a value. Read moresource§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B>
of a value. Read moresource§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R>
view of a value. Read moresource§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R>
view of a value. Read moresource§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target
of a value. Read moresource§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target
of a value. Read moresource§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap()
only in debug builds, and is erased in release builds.source§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut()
only in debug builds, and is erased in release
builds.source§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow()
only in debug builds, and is erased in release
builds.source§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut()
only in debug builds, and is erased in release
builds.source§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref()
only in debug builds, and is erased in release
builds.source§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut()
only in debug builds, and is erased in release
builds.source§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref()
only in debug builds, and is erased in release
builds.