Enum mz_compute_client::protocol::command::ComputeCommand
pub enum ComputeCommand<T = Timestamp> {
    CreateTimely {
        config: TimelyConfig,
        epoch: ClusterStartupEpoch,
    },
    CreateInstance(LoggingConfig),
    InitializationComplete,
    UpdateConfiguration(ComputeParameters),
    CreateDataflows(Vec<DataflowDescription<Plan<T>, CollectionMetadata, T>>),
    AllowCompaction(Vec<(GlobalId, Antichain<T>)>),
    Peek(Peek<T>),
    CancelPeeks {
        uuids: BTreeSet<Uuid>,
    },
}
Compute protocol commands, sent by the compute controller to replicas.
Command sequences sent by the compute controller must be valid according to the Protocol Stages.
Variants
CreateTimely
CreateTimely is the first command sent to a replica after a connection was established. It instructs the replica to initialize the timely dataflow runtime using the given config.
This command is special in that it is the only one that is broadcast to all processes of a multi-process replica. All subsequent commands are only sent to the first process, which then distributes them to the other processes using a dataflow. This method of command distribution requires the timely dataflow runtime to be initialized, which is why the CreateTimely command exists.
The epoch value imposes an ordering on iterations of the compute protocol. When the compute controller connects to a replica, it must send an epoch that is greater than all epochs it sent to the same replica on previous connections. Multi-process replicas should use the epoch to ensure that their individual processes agree on which protocol iteration they are in.
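
The epoch rule can be illustrated with a minimal sketch. Everything here is hypothetical: Epoch is a plain integer stand-in for ClusterStartupEpoch (whose real structure is not described on this page), and EpochTracker and check_epoch are illustrative names, not part of the crate.

```rust
use std::cmp::Ordering;

/// Hypothetical stand-in for `ClusterStartupEpoch` (assumption: a single integer).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Epoch(pub u64);

/// Controller-side bookkeeping for one replica: remembers the last epoch sent.
#[derive(Debug, Default)]
pub struct EpochTracker {
    last_sent: Option<Epoch>,
}

impl EpochTracker {
    /// Returns an epoch strictly greater than every epoch returned before.
    pub fn next_epoch(&mut self) -> Epoch {
        let next = match self.last_sent {
            Some(Epoch(n)) => Epoch(n + 1),
            None => Epoch(0),
        };
        self.last_sent = Some(next);
        next
    }
}

/// Outcome of comparing an incoming epoch with the one a process already knows.
#[derive(Debug, PartialEq, Eq)]
pub enum EpochCheck {
    NewIteration,
    SameIteration,
    Stale,
}

/// Replica-process-side check: decides which protocol iteration a
/// `CreateTimely` command belongs to, based only on its epoch.
pub fn check_epoch(current: Option<Epoch>, incoming: Epoch) -> EpochCheck {
    match current {
        None => EpochCheck::NewIteration,
        Some(cur) => match incoming.cmp(&cur) {
            Ordering::Greater => EpochCheck::NewIteration,
            Ordering::Equal => EpochCheck::SameIteration,
            Ordering::Less => EpochCheck::Stale,
        },
    }
}
```

Under these assumptions, a process that sees a Stale result knows the command comes from an older connection than the one its peers are already serving.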
CreateInstance(LoggingConfig)
CreateInstance must be sent after CreateTimely to complete the Creation Stage of the compute protocol. Unlike CreateTimely, and like all other commands, it is only sent to the first process of the replica, and then distributed through the timely runtime.
CreateInstance instructs the replica to initialize its state to a point where it is ready to start maintaining dataflows.
Upon receiving a CreateInstance command, the replica must further initialize logging dataflows according to the given LoggingConfig.
InitializationComplete
InitializationComplete informs the replica about the end of the Initialization Stage.
Upon receiving this command, the replica should perform a reconciliation process, to ensure its dataflow state matches the state requested by the computation commands it received previously. The replica must now start sending responses to commands received previously, if it opted to defer them during the Initialization Stage.
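
The ordering of the first three commands can be sketched as a small state machine. This is a simplified model, not the crate's validation logic: Cmd drops all payloads, the Stage names (including "Computation") are assumptions, and the rule that every other command is accepted in both later stages ignores finer restrictions such as the one on subscribes.

```rust
/// Payload-free stand-in for `ComputeCommand` (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Cmd {
    CreateTimely,
    CreateInstance,
    InitializationComplete,
    UpdateConfiguration,
    CreateDataflows,
    AllowCompaction,
    Peek,
    CancelPeeks,
}

/// Where a protocol iteration currently stands (names are assumptions).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Stage {
    AwaitingTimely,
    AwaitingInstance,
    Initialization,
    Computation,
}

/// Advances the stage by one command, or reports why the command is invalid.
pub fn step(stage: Stage, cmd: Cmd) -> Result<Stage, String> {
    use Cmd::*;
    use Stage::*;
    match (stage, cmd) {
        (AwaitingTimely, CreateTimely) => Ok(AwaitingInstance),
        (AwaitingInstance, CreateInstance) => Ok(Initialization),
        (Initialization, InitializationComplete) => Ok(Computation),
        // All remaining commands keep the stage unchanged once the
        // Creation Stage is over.
        (Initialization, c) | (Computation, c)
            if !matches!(c, CreateTimely | CreateInstance | InitializationComplete) =>
        {
            Ok(stage)
        }
        (s, c) => Err(format!("{c:?} is not valid in stage {s:?}")),
    }
}

/// Checks a whole command sequence, starting from a fresh connection.
pub fn validate(cmds: &[Cmd]) -> Result<Stage, String> {
    cmds.iter()
        .try_fold(Stage::AwaitingTimely, |stage, &cmd| step(stage, cmd))
}
```

A sequence that skips CreateTimely, or that sends InitializationComplete twice, is rejected by validate in this model.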
UpdateConfiguration(ComputeParameters)
UpdateConfiguration instructs the replica to update its configuration, according to the given ComputeParameters.
Parameter updates transmitted through this command must be applied by the replica as soon as it receives the command, and they must be applied globally to all replica state, even dataflows and pending peeks that were created before the parameter update. This property allows the replica to hoist UpdateConfiguration commands during reconciliation.
Configuration parameters that should not be applied globally, but only to specific dataflows or peeks, should be added to the DataflowDescription or Peek types, rather than as ComputeParameters.
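
Because parameter updates apply globally regardless of when other state was created, moving them ahead of other commands does not change the resulting state. A minimal sketch of such hoisting follows; the Cmd type and hoist_config_updates are hypothetical, and how the replica actually reconciles is not described on this page.

```rust
/// Simplified command type (hypothetical): configuration updates carry a
/// string standing in for `ComputeParameters`; all other commands are opaque.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Cmd {
    UpdateConfiguration(String),
    Other(&'static str),
}

/// Moves all `UpdateConfiguration` commands to the front of the list.
/// `partition` is stable, so the relative order within each group is kept.
pub fn hoist_config_updates(cmds: Vec<Cmd>) -> Vec<Cmd> {
    let (mut configs, others): (Vec<Cmd>, Vec<Cmd>) = cmds
        .into_iter()
        .partition(|c| matches!(c, Cmd::UpdateConfiguration(_)));
    configs.extend(others);
    configs
}
```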
CreateDataflows(Vec<DataflowDescription<Plan<T>, CollectionMetadata, T>>)
CreateDataflows instructs the replica to create and start maintaining dataflows according to the given DataflowDescriptions.
If a CreateDataflows command defines multiple dataflows, the list of DataflowDescriptions must be topologically ordered according to the dependency relation.
Each DataflowDescription must have the following properties:
- Dataflow imports are valid:
  - Imported storage collections specified in source_imports exist and are readable by the compute replica.
  - Imported indexes specified in index_imports have been created on the replica previously, either by previous CreateDataflows commands, or by the same CreateDataflows command.
- Dataflow imports are readable at the specified as_of. In other words: The sinces of imported collections are not beyond the dataflow as_of.
- Dataflow exports have unique IDs, i.e., the IDs of exports from dataflows a replica is instructed to create do not repeat (within a single protocol iteration).
- The dataflow objects defined in objects_to_build are topologically ordered according to the dependency relation.
A dataflow description that violates any of the above properties can cause the replica to exhibit undefined behavior, such as panicking or production of incorrect results. A replica should prefer panicking over producing incorrect results.
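
Two of the properties above, topological ordering of the descriptions and uniqueness of export IDs, can be checked with a single pass. The sketch below is illustrative only: Dataflow is a hypothetical reduction of DataflowDescription, IDs are plain u64 values instead of GlobalId, and every export is treated as an importable index, which is a simplification.

```rust
use std::collections::BTreeSet;

/// Hypothetical reduction of `DataflowDescription` to the IDs that matter here.
pub struct Dataflow {
    pub index_imports: Vec<u64>,
    pub exports: Vec<u64>,
}

/// Checks one `CreateDataflows` payload against the set of IDs the replica
/// already knows from earlier commands of the same protocol iteration.
pub fn check_dataflows(existing: &BTreeSet<u64>, dataflows: &[Dataflow]) -> Result<(), String> {
    let mut known = existing.clone();
    for (pos, df) in dataflows.iter().enumerate() {
        // Imports must come from earlier commands or earlier list entries.
        for id in &df.index_imports {
            if !known.contains(id) {
                return Err(format!("dataflow {pos} imports index {id} before it is created"));
            }
        }
        // Export IDs must not repeat; `insert` returns false on a repeat.
        for id in &df.exports {
            if !known.insert(*id) {
                return Err(format!("dataflow {pos} repeats export id {id}"));
            }
        }
    }
    Ok(())
}
```

A controller-side check like this would reject an invalid payload before it reaches the replica, where the same violation is undefined behavior.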
After receiving a CreateDataflows command, for created dataflows that export indexes or storage sinks, the replica must produce FrontierUppers responses that report the advancement of the upper frontiers of these compute collections.
After receiving a CreateDataflows command, for created dataflows that export subscribes, the replica must produce SubscribeResponses that report the progress and results of the subscribes.
During the Initialization Stage, the controller must not send CreateDataflows commands that instruct the creation of dataflows exporting subscribes. This is a limitation of our current implementation that we intend to remove (#16247).
AllowCompaction(Vec<(GlobalId, Antichain<T>)>)
AllowCompaction informs the replica about the relaxation of external read capabilities on the compute collections exported by the replica’s dataflow.
Each entry in the vector names a collection and provides a frontier after which accumulations must be correct. The replica gains the liberty of compacting the corresponding maintained traces up through that frontier.
It is invalid to send an AllowCompaction command that references compute collections that were not created by a corresponding CreateDataflows command before. Doing so may cause the replica to exhibit undefined behavior.
The AllowCompaction command only informs about external read requirements, not internal ones. The replica is responsible for ensuring that internal requirements are fulfilled at all times, so local dataflow inputs are not compacted beyond times at which they are still being read from.
The read frontiers transmitted through AllowCompaction commands may be beyond the corresponding collections’ current upper frontiers. This signals that external readers are not interested in times up to the specified new read frontiers. Consequently, an empty read frontier signals that external readers are not interested in updates from the corresponding collection ever again, so the collection is not required anymore.
Sending an AllowCompaction command with the empty frontier is the canonical way to drop compute collections.
A replica that receives an AllowCompaction command with the empty frontier must eventually respond with a FrontierUppers response reporting the empty frontier for the same collection. (#16275)
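
The role of the empty frontier can be sketched with a toy replica state. The model below makes several assumptions: timestamps are totally ordered u64 values, so a frontier is represented as Option<u64> with None standing for the empty Antichain; ReplicaState and its fields are hypothetical; and an unknown collection yields an error here, whereas the protocol leaves that case undefined.

```rust
use std::collections::BTreeMap;

/// Frontier over totally ordered timestamps (assumption): `None` is empty.
pub type Frontier = Option<u64>;

/// Hypothetical per-replica bookkeeping of external read frontiers.
#[derive(Debug, Default)]
pub struct ReplicaState {
    /// Collection ID to the time up through which compaction is allowed.
    pub sinces: BTreeMap<u64, u64>,
    /// Dropped collections that still owe an empty-frontier upper report.
    pub dropped: Vec<u64>,
}

impl ReplicaState {
    /// Applies the payload of one `AllowCompaction` command.
    pub fn allow_compaction(&mut self, updates: Vec<(u64, Frontier)>) -> Result<(), String> {
        for (id, frontier) in updates {
            match (self.sinces.get(&id).copied(), frontier) {
                (None, _) => return Err(format!("unknown collection {id}")),
                // Non-empty frontier: compaction may proceed up to `t`.
                (Some(since), Some(t)) => {
                    self.sinces.insert(id, since.max(t));
                }
                // Empty frontier: nobody reads the collection again, drop it.
                (Some(_), None) => {
                    self.sinces.remove(&id);
                    self.dropped.push(id);
                }
            }
        }
        Ok(())
    }
}
```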
Peek(Peek<T>)
Peek instructs the replica to perform a peek at an index.
The Peek description must have the following properties:
- The target index has previously been created by a corresponding CreateDataflows command.
- The Peek::uuid is unique, i.e., the UUIDs of peeks a replica gets instructed to perform do not repeat (within a single protocol iteration).
A Peek description that violates any of the above properties can cause the replica to exhibit undefined behavior.
Specifying a Peek::timestamp that is less than the target index’s since frontier does not provoke undefined behavior. Instead, the replica must produce a PeekResponse::Error in response.
After receiving a Peek command, the replica must eventually produce a single PeekResponse.
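
The handling of a peek timestamp that lies before the index's since frontier can be sketched as follows. This is a toy model: PeekResponse here is a local stand-in whose Rows payload is an assumption, timestamps are plain u64 values, and serve_peek is a hypothetical helper rather than a function of the crate.

```rust
/// Local stand-in for the response type. `Error` and `Canceled` are named in
/// the text above; the `Rows` variant and all payloads are assumptions.
#[derive(Debug, PartialEq, Eq)]
pub enum PeekResponse {
    Rows(Vec<i64>),
    Error(String),
    Canceled,
}

/// Answers a peek against an index whose contents are valid from `since` on.
/// A timestamp before `since` is answered with an error instead of data.
pub fn serve_peek(since: u64, timestamp: u64, rows: &[i64]) -> PeekResponse {
    if timestamp < since {
        PeekResponse::Error(format!(
            "peek timestamp {timestamp} is less than index since {since}"
        ))
    } else {
        PeekResponse::Rows(rows.to_vec())
    }
}
```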
CancelPeeks
Fields
uuids: BTreeSet<Uuid>
The identifiers of the peek requests to cancel.
Values in this set must match Peek::uuid values transmitted in previous Peek commands.
CancelPeeks instructs the replica to cancel the identified pending peeks.
It is invalid to send a CancelPeeks command that references peeks that were not created by a corresponding Peek command before. Doing so may cause the replica to exhibit undefined behavior.
If a replica cancels a peek in response to a CancelPeeks command, it must respond with a PeekResponse::Canceled. The replica may also decide to fulfill the peek instead and return a different PeekResponse, or it may already have returned a response to the specified peek. In these cases it must not return another PeekResponse.
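
The rule that each peek receives exactly one response, even when a cancellation races with completion, can be sketched with a set of pending UUIDs. The names below are hypothetical, UUIDs are modeled as u128 values, and unknown UUIDs are silently ignored here although the protocol treats them as invalid input.

```rust
use std::collections::BTreeSet;

/// Local stand-in for the response type (payloads omitted; `Rows` is assumed).
#[derive(Debug, PartialEq, Eq)]
pub enum PeekResponse {
    Rows,
    Canceled,
}

/// Tracks which peeks have not been answered yet.
#[derive(Debug, Default)]
pub struct PendingPeeks {
    pending: BTreeSet<u128>,
}

impl PendingPeeks {
    /// Registers a peek when its `Peek` command arrives.
    pub fn start(&mut self, uuid: u128) {
        self.pending.insert(uuid);
    }

    /// Called when a peek completes: responds only if it is still pending.
    pub fn finish(&mut self, uuid: u128) -> Option<PeekResponse> {
        if self.pending.remove(&uuid) {
            Some(PeekResponse::Rows)
        } else {
            None
        }
    }

    /// Handles `CancelPeeks`: emits `Canceled` for peeks that are still
    /// pending and nothing for peeks that were already answered.
    pub fn cancel(&mut self, uuids: &BTreeSet<u128>) -> Vec<(u128, PeekResponse)> {
        let mut responses = Vec::new();
        for uuid in uuids {
            if self.pending.remove(uuid) {
                responses.push((*uuid, PeekResponse::Canceled));
            }
        }
        responses
    }
}
```

Removing the UUID from the pending set is what guarantees the single response: whichever of finish or cancel runs first wins, and the other becomes a no-op.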
Trait Implementations
impl Arbitrary for ComputeCommand<Timestamp>
type Strategy = Union<BoxedStrategy<ComputeCommand<Timestamp>>>
The type of Strategy used to generate values of type Self.
type Parameters = ()
The type of parameters that arbitrary_with accepts for configuration of the generated Strategy. Parameters must implement Default.
fn arbitrary_with(_: Self::Parameters) -> Self::Strategy
impl<T: Clone> Clone for ComputeCommand<T>
fn clone(&self) -> ComputeCommand<T>
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
impl<T: Debug> Debug for ComputeCommand<T>
impl<'de, T> Deserialize<'de> for ComputeCommand<T>
where
    T: Deserialize<'de>,
fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>
where
    __D: Deserializer<'de>,
impl<T: Send> GenericClient<ComputeCommand<T>, ComputeResponse<T>> for Box<dyn ComputeClient<T>>
fn send<'life0, 'async_trait>(
    &'life0 mut self,
    cmd: ComputeCommand<T>
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
impl<T: PartialEq> PartialEq<ComputeCommand<T>> for ComputeCommand<T>
fn eq(&self, other: &ComputeCommand<T>) -> bool
Tests for self and other values to be equal, and is used by ==.
impl<T> Partitionable<ComputeCommand<T>, ComputeResponse<T>> for (ComputeCommand<T>, ComputeResponse<T>)
where
    T: Timestamp + Lattice,
type PartitionedState = PartitionedComputeState<T>
fn new(parts: usize) -> PartitionedComputeState<T>
Construct a PartitionedState for the command–response pair.
impl<T> PartitionedState<ComputeCommand<T>, ComputeResponse<T>> for PartitionedComputeState<T>
where
    T: Timestamp + Lattice,
fn split_command(
    &mut self,
    command: ComputeCommand<T>
) -> Vec<Option<ComputeCommand<T>>>
fn absorb_response(
    &mut self,
    shard_id: usize,
    message: ComputeResponse<T>
) -> Option<Result<ComputeResponse<T>, Error>>
impl RustType<ProtoComputeCommand> for ComputeCommand<Timestamp>
fn into_proto(&self) -> ProtoComputeCommand
Convert a Self into a Proto value.
fn from_proto(proto: ProtoComputeCommand) -> Result<Self, TryFromProtoError>
impl<T> Serialize for ComputeCommand<T>
where
    T: Serialize,
impl<T> StructuralPartialEq for ComputeCommand<T>
Auto Trait Implementations
impl<T> RefUnwindSafe for ComputeCommand<T>
where
    T: RefUnwindSafe,
impl<T> Send for ComputeCommand<T>
where
    T: Send,
impl<T> Sync for ComputeCommand<T>
where
    T: Sync,
impl<T> Unpin for ComputeCommand<T>
where
    T: Unpin,
impl<T> UnwindSafe for ComputeCommand<T>
where
    T: UnwindSafe + RefUnwindSafe,
Blanket Implementations
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request
impl<T> Pointable for T
impl<P, R> ProtoType<R> for P
where
    R: RustType<P>,
fn into_rust(self) -> Result<R, TryFromProtoError>
See RustType::from_proto.
fn from_rust(rust: &R) -> P
See RustType::into_proto.