Enum mz_compute_client::protocol::command::ComputeCommand
pub enum ComputeCommand<T = Timestamp> {
    CreateTimely {
        config: TimelyConfig,
        epoch: ClusterStartupEpoch,
    },
    CreateInstance(InstanceConfig),
    InitializationComplete,
    UpdateConfiguration(ComputeParameters),
    CreateDataflow(DataflowDescription<FlatPlan<T>, CollectionMetadata, T>),
    Schedule(GlobalId),
    AllowCompaction {
        id: GlobalId,
        frontier: Antichain<T>,
    },
    Peek(Peek<T>),
    CancelPeek {
        uuid: Uuid,
    },
}
Compute protocol commands, sent by the compute controller to replicas.
Command sequences sent by the compute controller must be valid according to the Protocol Stages.
Variants
CreateTimely
Fields
config: TimelyConfig
TODO(#25239): Add documentation.
epoch: ClusterStartupEpoch
TODO(#25239): Add documentation.
CreateTimely is the first command sent to a replica after a connection was established. It instructs the replica to initialize the timely dataflow runtime using the given config.
This command is special in that it is broadcast to all workers of a multi-worker replica. All subsequent commands, except UpdateConfiguration, are only sent to the first worker, which then distributes them to the other workers using a dataflow. This method of command distribution requires the timely dataflow runtime to be initialized, which is why the CreateTimely command exists.
The epoch value imposes an ordering on iterations of the compute protocol. When the compute controller connects to a replica, it must send an epoch that is greater than all epochs it sent to the same replica on previous connections. Multi-process replicas should use the epoch to ensure that their individual processes agree on which protocol iteration they are in.
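The epoch ordering rule can be illustrated with a minimal, self-contained sketch. The Epoch and EpochTracker types below are hypothetical stand-ins invented for illustration; they do not reflect the actual layout of ClusterStartupEpoch or how a real replica tracks epochs.

```rust
/// Hypothetical stand-in for `ClusterStartupEpoch`; the real type is not modeled here.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Epoch(pub u64);

/// Hypothetical helper that remembers the greatest epoch accepted so far.
#[derive(Debug, Default)]
pub struct EpochTracker {
    last: Option<Epoch>,
}

impl EpochTracker {
    /// Accepts `epoch` only if it is strictly greater than every epoch
    /// accepted on earlier connections; otherwise reports it as stale.
    pub fn accept(&mut self, epoch: Epoch) -> Result<(), String> {
        match self.last {
            Some(prev) if epoch <= prev => Err(format!(
                "stale epoch {:?}: already accepted {:?}",
                epoch, prev
            )),
            _ => {
                self.last = Some(epoch);
                Ok(())
            }
        }
    }
}
```

In this sketch a repeated or smaller epoch is rejected, which mirrors the requirement that each new connection carries an epoch greater than all earlier ones.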
CreateInstance(InstanceConfig)
CreateInstance must be sent after CreateTimely to complete the Creation Stage of the compute protocol. Unlike CreateTimely, it is only sent to the first worker of the replica, and then distributed through the timely runtime. CreateInstance instructs the replica to initialize its state to a point where it is ready to start maintaining dataflows.
Upon receiving a CreateInstance command, the replica must further initialize logging dataflows according to the given LoggingConfig.
InitializationComplete
InitializationComplete informs the replica about the end of the Initialization Stage.
Upon receiving this command, the replica should perform a reconciliation process, to ensure its dataflow state matches the state requested by the computation commands it received previously. The replica must now start sending responses to commands received previously, if it opted to defer them during the Initialization Stage.
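The ordering of CreateTimely, CreateInstance, and InitializationComplete can be pictured as a small state machine. The sketch below is illustrative only: the Cmd and Stage enums and the check_sequence function are hypothetical, all remaining commands are collapsed into Cmd::Other, the name Computation for the final stage is an assumption, and so is the rule that no other command may precede CreateInstance.

```rust
/// Hypothetical, simplified command kinds; payloads are omitted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Cmd {
    CreateTimely,
    CreateInstance,
    InitializationComplete,
    /// Any other command (dataflows, peeks, compaction, ...).
    Other,
}

/// Hypothetical stages of one protocol iteration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Stage {
    AwaitingTimely,
    AwaitingInstance,
    Initialization,
    Computation,
}

/// Walks a command sequence and returns the stage reached,
/// or an error naming the first command that is out of order.
pub fn check_sequence(cmds: &[Cmd]) -> Result<Stage, String> {
    let mut stage = Stage::AwaitingTimely;
    for (i, cmd) in cmds.iter().enumerate() {
        stage = match (stage, *cmd) {
            (Stage::AwaitingTimely, Cmd::CreateTimely) => Stage::AwaitingInstance,
            (Stage::AwaitingInstance, Cmd::CreateInstance) => Stage::Initialization,
            (Stage::Initialization, Cmd::InitializationComplete) => Stage::Computation,
            (Stage::Initialization, Cmd::Other) => Stage::Initialization,
            (Stage::Computation, Cmd::Other) => Stage::Computation,
            (s, c) => {
                return Err(format!(
                    "command #{} ({:?}) is not valid in stage {:?}",
                    i, c, s
                ))
            }
        };
    }
    Ok(stage)
}
```

A checker of this shape could sit in a test harness to reject sequences that, for example, send InitializationComplete twice or skip CreateTimely.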
UpdateConfiguration(ComputeParameters)
UpdateConfiguration instructs the replica to update its configuration, according to the given ComputeParameters.
This command is special in that, like CreateTimely, it is broadcast to all workers of the replica. However, unlike CreateTimely, it is ignored by all workers except the first one, which distributes the command to the other workers through the timely runtime. UpdateConfiguration commands are broadcast only to allow the intermediary parts of the networking fabric to observe them and learn of configuration updates.
Parameter updates transmitted through this command must be applied by the replica as soon as it receives the command, and they must be applied globally to all replica state, even dataflows and pending peeks that were created before the parameter update. This property allows the replica to hoist UpdateConfiguration commands during reconciliation.
Configuration parameters that should not be applied globally, but only to specific dataflows or peeks, should be added to the DataflowDescription or Peek types, rather than as ComputeParameters.
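Because parameter updates apply globally, their position relative to other commands does not matter, which is what makes hoisting possible. The following sketch shows one possible reading of hoisting: all updates in a command history are merged into a single update placed at the front. The Cmd enum, the key-value parameter representation, and hoist_config_updates are hypothetical and are not the actual reconciliation code.

```rust
/// Hypothetical, simplified commands; parameters are modeled as key-value pairs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Cmd {
    UpdateConfiguration(Vec<(String, String)>),
    /// Any other command, identified only by a label.
    Other(&'static str),
}

/// Merges every `UpdateConfiguration` in `history` into one command and moves
/// it to the front, keeping all other commands in their original order.
pub fn hoist_config_updates(history: Vec<Cmd>) -> Vec<Cmd> {
    let mut merged: Vec<(String, String)> = Vec::new();
    let mut rest = Vec::new();
    for cmd in history {
        match cmd {
            Cmd::UpdateConfiguration(params) => {
                for (key, value) in params {
                    // A later update overrides an earlier one for the same key.
                    match merged.iter_mut().find(|entry| entry.0 == key) {
                        Some(entry) => entry.1 = value,
                        None => merged.push((key, value)),
                    }
                }
            }
            other => rest.push(other),
        }
    }
    let mut out = Vec::with_capacity(rest.len() + 1);
    if !merged.is_empty() {
        out.push(Cmd::UpdateConfiguration(merged));
    }
    out.extend(rest);
    out
}
```

The rewrite is only sound under the property stated above; a parameter scoped to a single dataflow or peek could not be moved this way.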
CreateDataflow(DataflowDescription<FlatPlan<T>, CollectionMetadata, T>)
CreateDataflow instructs the replica to create a dataflow according to the given DataflowDescription.
The DataflowDescription must have the following properties:
- Dataflow imports are valid:
  - Imported storage collections specified in source_imports exist and are readable by the compute replica.
  - Imported indexes specified in index_imports have been created on the replica previously, by previous CreateDataflow commands.
- Dataflow imports are readable at the specified as_of. In other words: The sinces of imported collections are not beyond the dataflow as_of.
- Dataflow exports have unique IDs, i.e., the IDs of exports from dataflows a replica is instructed to create do not repeat (within a single protocol iteration).
- The dataflow objects defined in objects_to_build are topologically ordered according to the dependency relation.
A dataflow description that violates any of the above properties can cause the replica to exhibit undefined behavior, such as panicking or producing incorrect results. A replica should prefer panicking over producing incorrect results.
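The properties above can be checked mechanically. The sketch below does so on a heavily simplified, hypothetical DataflowSketch type: IDs and timestamps are plain u64 values, times are assumed to be totally ordered so that "not beyond" becomes `<=`, and source and index imports are not distinguished. None of these names exist in the real crate.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified stand-in for `DataflowDescription`.
pub struct DataflowSketch {
    /// (collection id, `since` of that collection) for every import.
    pub imports: Vec<(u64, u64)>,
    /// (object id, ids it depends on), in build order.
    pub objects_to_build: Vec<(u64, Vec<u64>)>,
    pub export_ids: Vec<u64>,
    pub as_of: u64,
}

/// Checks the listed properties. `known` holds collections that exist on the
/// replica; `used_export_ids` holds export IDs from earlier dataflows.
pub fn validate(
    desc: &DataflowSketch,
    known: &HashSet<u64>,
    used_export_ids: &HashSet<u64>,
) -> Result<(), String> {
    // Imports must exist and be readable at `as_of`.
    for &(id, since) in &desc.imports {
        if !known.contains(&id) {
            return Err(format!("import {} does not exist", id));
        }
        if since > desc.as_of {
            return Err(format!("import {} has since {} beyond as_of", id, since));
        }
    }
    // Export IDs must not repeat within the protocol iteration.
    let mut seen = used_export_ids.clone();
    for &id in &desc.export_ids {
        if !seen.insert(id) {
            return Err(format!("export id {} is not unique", id));
        }
    }
    // Every object may depend only on imports or on objects built before it.
    let mut available: HashSet<u64> = desc.imports.iter().map(|&(id, _)| id).collect();
    for (id, deps) in &desc.objects_to_build {
        if let Some(missing) = deps.iter().find(|d| !available.contains(*d)) {
            return Err(format!("object {} depends on {} before it is built", id, missing));
        }
        available.insert(*id);
    }
    Ok(())
}
```

Returning an error here is a choice made for the sketch; the protocol itself only states that violations may lead to undefined behavior.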
After receiving a CreateDataflow command, if the created dataflow exports indexes or storage sinks, the replica must produce FrontierUpper responses that report the advancement of the upper frontiers of these compute collections.
After receiving a CreateDataflow command, if the created dataflow exports subscribes, the replica must produce SubscribeResponses that report the progress and results of the subscribes.
The replica may create the dataflow in a suspended state and defer starting the computation until it receives a corresponding Schedule command. Thus, to ensure dataflow execution, the compute controller should eventually send a Schedule command for each sent CreateDataflow command.
Schedule(GlobalId)
Schedule allows the replica to start computation for a compute collection.
It is invalid to send a Schedule command that references a collection that was not created by a corresponding CreateDataflow command before. Doing so may cause the replica to exhibit undefined behavior.
It is also invalid to send a Schedule command that references a collection that has, through an AllowCompaction command, been allowed to compact to the empty frontier before.
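The interplay between CreateDataflow, Schedule, and a drop through AllowCompaction can be sketched with a small state table. ReplicaSketch and CollectionState are hypothetical names; the sketch reports invalid Schedule commands as errors purely for illustration, whereas the protocol leaves the replica's behavior undefined in those cases. Treating a repeated Schedule as harmless is also an assumption.

```rust
use std::collections::HashMap;

/// Hypothetical lifecycle states of a compute collection on a replica.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollectionState {
    /// Created, but computation has not been allowed to start.
    Suspended,
    Running,
    /// Compaction to the empty frontier was allowed.
    Dropped,
}

/// Hypothetical replica-side bookkeeping, keyed by collection id.
#[derive(Debug, Default)]
pub struct ReplicaSketch {
    collections: HashMap<u64, CollectionState>,
}

impl ReplicaSketch {
    /// Models `CreateDataflow`: the export starts out suspended.
    pub fn create_dataflow(&mut self, export_id: u64) {
        self.collections.insert(export_id, CollectionState::Suspended);
    }

    /// Models `AllowCompaction` with the empty frontier.
    pub fn drop_collection(&mut self, id: u64) {
        if let Some(state) = self.collections.get_mut(&id) {
            *state = CollectionState::Dropped;
        }
    }

    /// Models `Schedule`: only known, not-yet-dropped collections may start.
    pub fn schedule(&mut self, id: u64) -> Result<(), String> {
        match self.collections.get_mut(&id) {
            None => Err(format!("collection {} was never created", id)),
            Some(CollectionState::Dropped) => {
                Err(format!("collection {} was already dropped", id))
            }
            Some(state) => {
                *state = CollectionState::Running;
                Ok(())
            }
        }
    }

    pub fn state(&self, id: u64) -> Option<CollectionState> {
        self.collections.get(&id).copied()
    }
}
```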
AllowCompaction
Fields
id: GlobalId
frontier: Antichain<T>
AllowCompaction informs the replica about the relaxation of external read capabilities on a compute collection exported by one of the replica’s dataflows.
The command names a collection and provides a frontier after which accumulations must be correct. The replica gains the liberty of compacting the corresponding maintained trace up through that frontier.
It is invalid to send an AllowCompaction command that references a compute collection that was not created by a corresponding CreateDataflow command before. Doing so may cause the replica to exhibit undefined behavior.
The AllowCompaction command only informs about external read requirements, not internal ones. The replica is responsible for ensuring that internal requirements are fulfilled at all times, so local dataflow inputs are not compacted beyond times at which they are still being read from.
The read frontiers transmitted through AllowCompactions may be beyond the corresponding collections’ current upper frontiers. This signals that external readers are not interested in times up to the specified new read frontiers. Consequently, an empty read frontier signals that external readers are not interested in updates from the corresponding collection ever again, so the collection is not required anymore.
Sending an AllowCompaction command with the empty frontier is the canonical way to drop compute collections.
A replica that receives an AllowCompaction command with the empty frontier must eventually respond with a FrontierUpper response reporting the empty frontier for the same collection. (#16275)
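The drop-by-empty-frontier rule can be sketched as follows. Everything here is hypothetical: times are assumed to be totally ordered u64 values, so a frontier is modeled as Option<u64> with None standing for the empty frontier, and the sketch responds immediately although the protocol only requires an eventual response.

```rust
use std::collections::HashMap;

/// Frontier over totally ordered `u64` times; `None` models the empty frontier.
pub type Frontier = Option<u64>;

/// Hypothetical, simplified response type.
#[derive(Debug, PartialEq, Eq)]
pub enum ResponseSketch {
    FrontierUpper { id: u64, upper: Frontier },
}

/// Hypothetical replica-side record of external read frontiers.
#[derive(Debug, Default)]
pub struct ReplicaSketch {
    read_frontiers: HashMap<u64, Frontier>,
}

impl ReplicaSketch {
    /// Registers a collection that is readable from `as_of` onward.
    pub fn create_collection(&mut self, id: u64, as_of: u64) {
        self.read_frontiers.insert(id, Some(as_of));
    }

    /// Models `AllowCompaction`. A non-empty frontier is recorded; the empty
    /// frontier drops the collection and reports an empty upper for it.
    pub fn allow_compaction(
        &mut self,
        id: u64,
        frontier: Frontier,
    ) -> Result<Option<ResponseSketch>, String> {
        if !self.read_frontiers.contains_key(&id) {
            return Err(format!("collection {} is unknown", id));
        }
        match frontier {
            Some(time) => {
                self.read_frontiers.insert(id, Some(time));
                Ok(None)
            }
            None => {
                self.read_frontiers.remove(&id);
                Ok(Some(ResponseSketch::FrontierUpper { id, upper: None }))
            }
        }
    }

    pub fn read_frontier(&self, id: u64) -> Option<Frontier> {
        self.read_frontiers.get(&id).copied()
    }
}
```

Once a collection has been dropped, any later command naming it is treated as referencing an unknown collection in this sketch.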
Peek(Peek<T>)
Peek instructs the replica to perform a peek on a collection: either an index or a Persist-backed collection.
The Peek description must have the following properties:
- If targeting an index, it has previously been created by a corresponding CreateDataflow command. (If targeting a persist collection, that collection should exist.)
- The Peek::uuid is unique, i.e., the UUIDs of peeks a replica gets instructed to perform do not repeat (within a single protocol iteration).
A Peek description that violates any of the above properties can cause the replica to exhibit undefined behavior.
Specifying a Peek::timestamp that is less than the target index’s since frontier does not provoke undefined behavior. Instead, the replica must produce a PeekResponse::Error in response.
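The timestamp rule can be illustrated with a short sketch. IndexSketch, PeekResponseSketch, and the peek function are hypothetical; the Rows variant is an assumed name for a successful result, times are plain u64 values, and rows are plain strings tagged with the time they were added.

```rust
/// Hypothetical, simplified peek result.
#[derive(Debug, PartialEq, Eq)]
pub enum PeekResponseSketch {
    Rows(Vec<String>),
    Error(String),
}

/// Hypothetical index: a `since` time plus rows tagged with their insert time.
pub struct IndexSketch {
    pub since: u64,
    pub rows: Vec<(u64, String)>,
}

/// Answers a peek at `timestamp`. A timestamp below `since` is a regular
/// error response, not undefined behavior.
pub fn peek(index: &IndexSketch, timestamp: u64) -> PeekResponseSketch {
    if timestamp < index.since {
        return PeekResponseSketch::Error(format!(
            "timestamp {} is less than since {}",
            timestamp, index.since
        ));
    }
    // Return every row that was present at or before the requested time.
    let rows = index
        .rows
        .iter()
        .filter(|(time, _)| *time <= timestamp)
        .map(|(_, row)| row.clone())
        .collect();
    PeekResponseSketch::Rows(rows)
}
```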
After receiving a Peek command, the replica must eventually produce a single PeekResponse:
CancelPeek
Fields
uuid: Uuid
The identifier of the peek request to cancel.
This value must match a Peek::uuid value transmitted in a previous Peek command.
CancelPeek instructs the replica to cancel the identified pending peek.
It is invalid to send a CancelPeek command that references a peek that was not created by a corresponding Peek command before. Doing so may cause the replica to exhibit undefined behavior.
If a replica cancels a peek in response to a CancelPeek command, it must respond with a PeekResponse::Canceled. The replica may also decide to fulfill the peek instead and return a different PeekResponse, or it may already have returned a response to the specified peek. In these cases it must not return another PeekResponse.
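The "exactly one response per peek" rule can be sketched by tracking pending peeks in a set and removing an entry whenever a response is produced. PendingPeeks and PeekOutcome are hypothetical names, UUIDs are modeled as u128 values, and Rows is an assumed name for a fulfilled peek.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified peek responses.
#[derive(Debug, PartialEq, Eq)]
pub enum PeekOutcome {
    Rows(Vec<String>),
    Canceled,
}

/// Hypothetical set of peeks that have not been answered yet.
#[derive(Debug, Default)]
pub struct PendingPeeks {
    pending: HashSet<u128>,
}

impl PendingPeeks {
    /// Models `Peek`: registers a peek that still awaits its response.
    pub fn peek(&mut self, uuid: u128) {
        self.pending.insert(uuid);
    }

    /// Fulfills a peek. Returns a response only if none was sent before.
    pub fn fulfill(&mut self, uuid: u128, rows: Vec<String>) -> Option<PeekOutcome> {
        if self.pending.remove(&uuid) {
            Some(PeekOutcome::Rows(rows))
        } else {
            None
        }
    }

    /// Models `CancelPeek`. Returns `Canceled` only if the peek was still
    /// pending; an already answered peek yields no second response.
    pub fn cancel(&mut self, uuid: u128) -> Option<PeekOutcome> {
        if self.pending.remove(&uuid) {
            Some(PeekOutcome::Canceled)
        } else {
            None
        }
    }
}
```

Removing the entry on the first response is what guarantees that a late CancelPeek cannot trigger a duplicate PeekResponse in this sketch.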
Trait Implementations
impl Arbitrary for ComputeCommand<Timestamp>
type Strategy = Union<BoxedStrategy<ComputeCommand>>
The type of Strategy used to generate values of type Self.
type Parameters = ()
The type of parameters that arbitrary_with accepts for configuration of the generated Strategy. Parameters must implement Default.
fn arbitrary_with(_: Self::Parameters) -> Self::Strategy
impl<T: Clone> Clone for ComputeCommand<T>
fn clone(&self) -> ComputeCommand<T>
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
impl<T: Debug> Debug for ComputeCommand<T>
impl<'de, T> Deserialize<'de> for ComputeCommand<T>
where
    T: Deserialize<'de>,
fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>
where
    __D: Deserializer<'de>,
impl<T: Send> GenericClient<ComputeCommand<T>, ComputeResponse<T>> for Box<dyn ComputeClient<T>>
fn send<'life0, 'async_trait>(
    &'life0 mut self,
    cmd: ComputeCommand<T>
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
impl<T: PartialEq> PartialEq for ComputeCommand<T>
fn eq(&self, other: &ComputeCommand<T>) -> bool
This method tests for self and other values to be equal, and is used by ==.
impl<T> Partitionable<ComputeCommand<T>, ComputeResponse<T>> for (ComputeCommand<T>, ComputeResponse<T>)
type PartitionedState = PartitionedComputeState<T>
fn new(parts: usize) -> PartitionedComputeState<T>
Construct a PartitionedState for the command–response pair.
impl<T> PartitionedState<ComputeCommand<T>, ComputeResponse<T>> for PartitionedComputeState<T>
fn split_command(
    &mut self,
    command: ComputeCommand<T>
) -> Vec<Option<ComputeCommand<T>>>
fn absorb_response(
    &mut self,
    shard_id: usize,
    message: ComputeResponse<T>
) -> Option<Result<ComputeResponse<T>, Error>>
impl RustType<ProtoComputeCommand> for ComputeCommand<Timestamp>
fn into_proto(&self) -> ProtoComputeCommand
Convert a Self into a Proto value.
fn from_proto(proto: ProtoComputeCommand) -> Result<Self, TryFromProtoError>
impl<T> Serialize for ComputeCommand<T>
where
    T: Serialize,
impl TryIntoTimelyConfig for ComputeCommand
fn try_into_timely_config(
    self
) -> Result<(TimelyConfig, ClusterStartupEpoch), Self>
Attempt to unpack self into a (TimelyConfig, ClusterStartupEpoch). Otherwise, fail and return self back.
impl<T> StructuralPartialEq for ComputeCommand<T>
Auto Trait Implementations
impl<T> RefUnwindSafe for ComputeCommand<T>
where
    T: RefUnwindSafe,
impl<T> Send for ComputeCommand<T>
where
    T: Send,
impl<T> Sync for ComputeCommand<T>
where
    T: Sync,
impl<T> Unpin for ComputeCommand<T>
where
    T: Unpin,
impl<T> UnwindSafe for ComputeCommand<T>
where
    T: UnwindSafe + RefUnwindSafe,
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<R, O, T> CopyOnto<ConsecutiveOffsetPairs<R, O>> for T
fn copy_onto(
    self,
    target: &mut ConsecutiveOffsetPairs<R, O>
) -> <ConsecutiveOffsetPairs<R, O> as Region>::Index
impl<T> FmtForward for T
fn fmt_binary(self) -> FmtBinary<Self>
where
    Self: Binary,
Causes self to use its Binary implementation when Debug-formatted.
fn fmt_display(self) -> FmtDisplay<Self>
where
    Self: Display,
Causes self to use its Display implementation when Debug-formatted.
fn fmt_lower_exp(self) -> FmtLowerExp<Self>
where
    Self: LowerExp,
Causes self to use its LowerExp implementation when Debug-formatted.
fn fmt_lower_hex(self) -> FmtLowerHex<Self>
where
    Self: LowerHex,
Causes self to use its LowerHex implementation when Debug-formatted.
fn fmt_octal(self) -> FmtOctal<Self>
where
    Self: Octal,
Causes self to use its Octal implementation when Debug-formatted.
fn fmt_pointer(self) -> FmtPointer<Self>
where
    Self: Pointer,
Causes self to use its Pointer implementation when Debug-formatted.
fn fmt_upper_exp(self) -> FmtUpperExp<Self>
where
    Self: UpperExp,
Causes self to use its UpperExp implementation when Debug-formatted.
fn fmt_upper_hex(self) -> FmtUpperHex<Self>
where
    Self: UpperHex,
Causes self to use its UpperHex implementation when Debug-formatted.
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request
impl<T, U> OverrideFrom<Option<&T>> for U
where
    U: OverrideFrom<T>,
impl<T> Pipe for T
where
    T: ?Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
where
    Self: Sized,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R
where
    R: 'a,
Borrows self and passes that borrow into the pipe function.
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R
where
    R: 'a,
Mutably borrows self and passes that borrow into the pipe function.
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow_mut<'a, B, R>(
    &'a mut self,
    func: impl FnOnce(&'a mut B) -> R
) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
Borrows self, then passes self.as_ref() into the pipe function.
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
Mutably borrows self, then passes self.as_mut() into the pipe function.
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
Borrows self, then passes self.deref() into the pipe function.
impl<T> Pointable for T
impl<T> ProgressEventTimestamp for T
impl<P, R> ProtoType<R> for P
where
    R: RustType<P>,
fn into_rust(self) -> Result<R, TryFromProtoError>
See RustType::from_proto.
fn from_rust(rust: &R) -> P
See RustType::into_proto.
impl<T> Tap for T
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Immutable access to the Borrow<B> of a value.
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
Mutable access to the BorrowMut<B> of a value.
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
Immutable access to the AsRef<R> view of a value.
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
Mutable access to the AsMut<R> view of a value.
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Immutable access to the Deref::Target of a value.
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Mutable access to the Deref::Target of a value.
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release builds.
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
Calls .tap_borrow() only in debug builds, and is erased in release builds.
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
Calls .tap_ref() only in debug builds, and is erased in release builds.
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
Calls .tap_deref() only in debug builds, and is erased in release builds.