pub enum ComputeCommand<T = Timestamp> {
    CreateTimely {
        config: TimelyConfig,
        epoch: ClusterStartupEpoch,
    },
    CreateInstance(LoggingConfig),
    InitializationComplete,
    UpdateConfiguration(ComputeParameters),
    CreateDataflows(Vec<DataflowDescription<Plan<T>, CollectionMetadata, T>>),
    AllowCompaction(Vec<(GlobalId, Antichain<T>)>),
    Peek(Peek<T>),
    CancelPeeks {
        uuids: BTreeSet<Uuid>,
    },
}

Compute protocol commands, sent by the compute controller to replicas.

Command sequences sent by the compute controller must be valid according to the Protocol Stages.
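
The stage ordering described for the variants below can be sketched as a small state machine. This is a minimal illustration under stated assumptions, not the controller's or replica's actual validation logic: `CommandKind`, `Stage`, and `validate` are hypothetical names, command payloads are omitted, and the stage that follows `InitializationComplete` is called `Computation` here for lack of a name in this page.

```rust
/// Hypothetical, payload-free stand-in for `ComputeCommand`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CommandKind {
    CreateTimely,
    CreateInstance,
    InitializationComplete,
    UpdateConfiguration,
    CreateDataflows,
    AllowCompaction,
    Peek,
    CancelPeeks,
}

/// Hypothetical stage tracker; the stage names are assumptions of this sketch.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Stage {
    AwaitingCreateTimely,
    AwaitingCreateInstance,
    Initialization,
    Computation,
}

/// Walks a command sequence and returns the stage it ends in,
/// or an error naming the first command that is out of order.
pub fn validate(commands: &[CommandKind]) -> Result<Stage, String> {
    let mut stage = Stage::AwaitingCreateTimely;
    for (i, cmd) in commands.iter().enumerate() {
        stage = match (stage, *cmd) {
            // Creation Stage: `CreateTimely` first, then `CreateInstance`.
            (Stage::AwaitingCreateTimely, CommandKind::CreateTimely) => {
                Stage::AwaitingCreateInstance
            }
            (Stage::AwaitingCreateInstance, CommandKind::CreateInstance) => {
                Stage::Initialization
            }
            // `InitializationComplete` ends the Initialization Stage.
            (Stage::Initialization, CommandKind::InitializationComplete) => {
                Stage::Computation
            }
            // The remaining commands do not change the stage.
            (
                Stage::Initialization | Stage::Computation,
                CommandKind::UpdateConfiguration
                | CommandKind::CreateDataflows
                | CommandKind::AllowCompaction
                | CommandKind::Peek
                | CommandKind::CancelPeeks,
            ) => stage,
            (stage, cmd) => {
                return Err(format!("command {i} ({cmd:?}) is not valid in stage {stage:?}"))
            }
        };
    }
    Ok(stage)
}
```

The sketch checks ordering only. Payload-level rules, such as the restriction on subscribe-exporting dataflows during the Initialization Stage, are out of its scope.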

Variants

CreateTimely

CreateTimely is the first command sent to a replica after a connection was established. It instructs the replica to initialize the timely dataflow runtime using the given config.

This command is special in that it is the only one that is broadcast to all processes of a multi-process replica. All subsequent commands are only sent to the first process, which then distributes them to the other processes using a dataflow. This method of command distribution requires the timely dataflow runtime to be initialized, which is why the CreateTimely command exists.

The epoch value imposes an ordering on iterations of the compute protocol. When the compute controller connects to a replica, it must send an epoch that is greater than all epochs it sent to the same replica on previous connections. Multi-process replicas should use the epoch to ensure that their individual processes agree on which protocol iteration they are in.
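
The epoch rule amounts to a strictly increasing check per replica. A minimal sketch, assuming a hypothetical `Epoch` newtype in place of `ClusterStartupEpoch` (whose fields are not shown on this page) and a hypothetical `EpochGuard` helper:

```rust
/// Hypothetical stand-in for `ClusterStartupEpoch`; only its ordering matters here.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Epoch(pub u64);

/// Tracks the greatest epoch accepted so far on one replica process.
#[derive(Debug, Default)]
pub struct EpochGuard {
    current: Option<Epoch>,
}

impl EpochGuard {
    /// Accepts `epoch` only if it is strictly greater than every epoch
    /// accepted before, and records it as the current protocol iteration.
    pub fn accept(&mut self, epoch: Epoch) -> bool {
        match self.current {
            Some(seen) if epoch <= seen => false,
            _ => {
                self.current = Some(epoch);
                true
            }
        }
    }
}
```

In this sketch, a process that receives a `CreateTimely` with a stale epoch rejects it, which is one way the processes of a replica could agree on the current protocol iteration.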

CreateInstance(LoggingConfig)

CreateInstance must be sent after CreateTimely to complete the Creation Stage of the compute protocol. Unlike CreateTimely, and like all other commands, it is only sent to the first process of the replica, and then distributed through the timely runtime. CreateInstance instructs the replica to initialize its state to a point where it is ready to start maintaining dataflows.

Upon receiving a CreateInstance command, the replica must further initialize logging dataflows according to the given LoggingConfig.

InitializationComplete

InitializationComplete informs the replica about the end of the Initialization Stage. Upon receiving this command, the replica should perform a reconciliation process, to ensure its dataflow state matches the state requested by the computation commands it received previously. The replica must now start sending responses to commands received previously, if it opted to defer them during the Initialization Stage.
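
The optional deferral of responses can be pictured as a gate that buffers responses until `InitializationComplete` arrives. This is a sketch under assumptions: `ResponseGate` is a hypothetical helper, responses are modeled as plain strings, and reconciliation itself is not shown.

```rust
/// Hypothetical helper that defers responses during the Initialization Stage.
#[derive(Debug, Default)]
pub struct ResponseGate {
    initialized: bool,
    deferred: Vec<String>,
}

impl ResponseGate {
    /// Returns the responses to send now: the response itself once
    /// initialization has completed, nothing before that.
    pub fn emit(&mut self, response: String) -> Vec<String> {
        if self.initialized {
            vec![response]
        } else {
            self.deferred.push(response);
            Vec::new()
        }
    }

    /// Handles `InitializationComplete`: opens the gate and releases
    /// every deferred response in its original order.
    pub fn initialization_complete(&mut self) -> Vec<String> {
        self.initialized = true;
        std::mem::take(&mut self.deferred)
    }
}
```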

UpdateConfiguration(ComputeParameters)

UpdateConfiguration instructs the replica to update its configuration, according to the given ComputeParameters.

Parameter updates transmitted through this command must be applied by the replica as soon as it receives the command, and they must be applied globally to all replica state, including dataflows and pending peeks that were created before the parameter update. This property allows the replica to hoist UpdateConfiguration commands during reconciliation.

Configuration parameters that should not be applied globally, but only to specific dataflows or peeks, should be added to the DataflowDescription or Peek types, rather than as ComputeParameters.
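
Because parameter updates apply globally, moving them ahead of the other commands in a command history does not change the state the history describes. A minimal sketch of such hoisting, assuming a hypothetical `Cmd` type whose `u32` payload stands in for `ComputeParameters`; the replica's real reconciliation code is not shown on this page and may differ:

```rust
/// Hypothetical, simplified command type for this sketch.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Cmd {
    /// The `u32` is a placeholder for `ComputeParameters`.
    UpdateConfiguration(u32),
    /// Any other command, identified by a label.
    Other(&'static str),
}

/// Moves all `UpdateConfiguration` commands to the front of the history.
/// Relative order is preserved within both groups.
pub fn hoist_config_updates(commands: Vec<Cmd>) -> Vec<Cmd> {
    let (mut configs, others): (Vec<Cmd>, Vec<Cmd>) = commands
        .into_iter()
        .partition(|c| matches!(c, Cmd::UpdateConfiguration(_)));
    configs.extend(others);
    configs
}
```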

CreateDataflows(Vec<DataflowDescription<Plan<T>, CollectionMetadata, T>>)

CreateDataflows instructs the replica to create and start maintaining dataflows according to the given DataflowDescriptions.

If a CreateDataflows command defines multiple dataflows, the list of DataflowDescriptions must be topologically ordered according to the dependency relation.

Each DataflowDescription must have the following properties:

  • Dataflow imports are valid:
    • Imported storage collections specified in source_imports exist and are readable by the compute replica.
    • Imported indexes specified in index_imports have been created on the replica previously, either by previous CreateDataflows commands, or by the same CreateDataflows command.
  • Dataflow imports are readable at the specified as_of. In other words: The sinces of imported collections are not beyond the dataflow as_of.
  • Dataflow exports have unique IDs, i.e., the IDs of exports from dataflows a replica is instructed to create do not repeat (within a single protocol iteration).
  • The dataflow objects defined in objects_to_build are topologically ordered according to the dependency relation.

A dataflow description that violates any of the above properties can cause the replica to exhibit undefined behavior, such as panicking or producing incorrect results. A replica should prefer panicking over producing incorrect results.
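
Two of the properties above, topological ordering across dataflows and uniqueness of export IDs, can be checked in a single pass. The following is a sketch under assumptions: `Dataflow` is a hypothetical, heavily simplified stand-in for `DataflowDescription`, IDs are plain `u64` values rather than `GlobalId`, every export is treated as an importable index, and the `as_of` readability check is left out.

```rust
use std::collections::BTreeSet;

/// Hypothetical, simplified stand-in for `DataflowDescription`.
pub struct Dataflow {
    /// IDs of indexes this dataflow imports.
    pub index_imports: Vec<u64>,
    /// IDs of the collections this dataflow exports.
    pub exports: Vec<u64>,
}

/// Checks that every imported index already exists when its dataflow is
/// reached, and that no export ID repeats. `existing` holds the IDs created
/// by earlier `CreateDataflows` commands.
pub fn check_dataflows(existing: &BTreeSet<u64>, dataflows: &[Dataflow]) -> Result<(), String> {
    let mut known = existing.clone();
    for (i, dataflow) in dataflows.iter().enumerate() {
        for id in &dataflow.index_imports {
            if !known.contains(id) {
                return Err(format!("dataflow {i} imports index {id} before it is created"));
            }
        }
        for id in &dataflow.exports {
            // `insert` returns false if the ID was already present.
            if !known.insert(*id) {
                return Err(format!("dataflow {i} exports duplicate ID {id}"));
            }
        }
    }
    Ok(())
}
```

Returning an error is a choice of this sketch. The protocol itself only states that a violation may lead to undefined behavior.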

After receiving a CreateDataflows command, for created dataflows that export indexes or storage sinks, the replica must produce FrontierUppers responses that report the advancement of the upper frontiers of these compute collections.

After receiving a CreateDataflows command, for created dataflows that export subscribes, the replica must produce SubscribeResponses that report the progress and results of the subscribes.

During the Initialization Stage, the controller must not send CreateDataflows commands that instruct the creation of dataflows exporting subscribes. This is a limitation of our current implementation that we intend to remove (#16247).

AllowCompaction(Vec<(GlobalId, Antichain<T>)>)

AllowCompaction informs the replica about the relaxation of external read capabilities on the compute collections exported by the replica’s dataflow.

Each entry in the vector names a collection and provides a frontier after which accumulations must be correct. The replica gains the liberty of compacting the corresponding maintained traces up through that frontier.

It is invalid to send an AllowCompaction command that references compute collections that were not previously created by a corresponding CreateDataflows command. Doing so may cause the replica to exhibit undefined behavior.

The AllowCompaction command only informs about external read requirements, not internal ones. The replica is responsible for ensuring that internal requirements are fulfilled at all times, so local dataflow inputs are not compacted beyond times at which they are still being read from.

The read frontiers transmitted through AllowCompactions may be beyond the corresponding collections’ current upper frontiers. This signals that external readers are not interested in times up to the specified new read frontiers. Consequently, an empty read frontier signals that external readers are not interested in updates from the corresponding collection ever again, so the collection is not required anymore.

Sending an AllowCompaction command with the empty frontier is the canonical way to drop compute collections.

A replica that receives an AllowCompaction command with the empty frontier must eventually respond with a FrontierUppers response reporting the empty frontier for the same collection. (#16275)
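
The handling of read frontiers, including the empty frontier, can be sketched as follows. The sketch makes several assumptions: times are totally ordered `u64` values, so a frontier is modeled as `Option<u64>` with `None` as the empty frontier instead of `Antichain<T>`; collection IDs are `u64` instead of `GlobalId`; `Replica` is a hypothetical type; and the read frontier is only ever advanced, which is a choice of this sketch.

```rust
use std::collections::BTreeMap;

/// Frontier over totally ordered `u64` times; `None` models the empty frontier.
pub type Frontier = Option<u64>;

/// Hypothetical replica state, reduced to what this sketch needs.
#[derive(Debug, Default)]
pub struct Replica {
    /// External read frontier per exported collection.
    pub sinces: BTreeMap<u64, u64>,
    /// Collections dropped through the empty frontier, for which the
    /// empty upper frontier still has to be reported.
    pub dropped: Vec<u64>,
}

impl Replica {
    /// Applies the entries of an `AllowCompaction`-style command.
    pub fn allow_compaction(&mut self, updates: Vec<(u64, Frontier)>) -> Result<(), String> {
        for (id, frontier) in updates {
            if !self.sinces.contains_key(&id) {
                // The protocol leaves this case undefined; the sketch reports it.
                return Err(format!("unknown collection {id}"));
            }
            match frontier {
                Some(time) => {
                    if let Some(since) = self.sinces.get_mut(&id) {
                        // Assumption of this sketch: never move the frontier backwards.
                        *since = (*since).max(time);
                    }
                }
                None => {
                    // Empty frontier: the collection is no longer required.
                    self.sinces.remove(&id);
                    self.dropped.push(id);
                }
            }
        }
        Ok(())
    }
}
```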

Peek(Peek<T>)

Peek instructs the replica to perform a peek at an index.

The Peek description must have the following properties:

  • The target index has previously been created by a corresponding CreateDataflows command.
  • The Peek::uuid is unique, i.e., the UUIDs of peeks a replica gets instructed to perform do not repeat (within a single protocol iteration).

A Peek description that violates any of the above properties can cause the replica to exhibit undefined behavior.

Specifying a Peek::timestamp that is less than the target index’s since frontier does not provoke undefined behavior. Instead, the replica must produce a PeekResponse::Error in response.

After receiving a Peek command, the replica must eventually produce a single PeekResponse.
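
The rule for timestamps below the since frontier can be illustrated with a small function. This is a sketch under assumptions: times are totally ordered `u64` values, `PeekOutcome` is a hypothetical type with a `Rows` variant invented for illustration, and `read` is a caller-supplied closure standing in for the actual index lookup.

```rust
/// Hypothetical, simplified outcome of a peek.
#[derive(Debug, PartialEq, Eq)]
pub enum PeekOutcome {
    /// Illustrative success case carrying the rows that were read.
    Rows(Vec<i64>),
    /// Mirrors the role of `PeekResponse::Error`.
    Error(String),
}

/// Answers a peek at `timestamp` against an index whose since is `since`.
/// A timestamp below the since yields an error rather than undefined behavior.
pub fn answer_peek(
    timestamp: u64,
    since: u64,
    read: impl FnOnce(u64) -> Vec<i64>,
) -> PeekOutcome {
    if timestamp < since {
        PeekOutcome::Error(format!("timestamp {timestamp} is less than since {since}"))
    } else {
        PeekOutcome::Rows(read(timestamp))
    }
}
```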

CancelPeeks

Fields

uuids: BTreeSet<Uuid>

The identifiers of the peek requests to cancel.

Values in this set must match Peek::uuid values transmitted in previous Peek commands.

CancelPeeks instructs the replica to cancel the identified pending peeks.

It is invalid to send a CancelPeeks command that references peeks that were not previously created by a corresponding Peek command. Doing so may cause the replica to exhibit undefined behavior.

If a replica cancels a peek in response to a CancelPeeks command, it must respond with a PeekResponse::Canceled. The replica may also decide to fulfill the peek instead and return a different PeekResponse, or it may already have returned a response to the specified peek. In these cases it must not return another PeekResponse.
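
The rule that each peek receives exactly one response can be sketched with a set of pending peeks. The sketch assumes `u128` values in place of `Uuid` and a hypothetical `PendingPeeks` helper. It does not model a replica that chooses to fulfill a peek after a cancellation request.

```rust
use std::collections::BTreeSet;

/// Hypothetical tracker of peeks that have not been answered yet.
#[derive(Debug, Default)]
pub struct PendingPeeks {
    pending: BTreeSet<u128>,
}

impl PendingPeeks {
    /// Registers a peek received through a `Peek` command.
    pub fn start(&mut self, uuid: u128) {
        self.pending.insert(uuid);
    }

    /// Marks a peek as answered with a regular response. Returns false if
    /// the peek was not pending, in which case no response may be sent.
    pub fn finish(&mut self, uuid: u128) -> bool {
        self.pending.remove(&uuid)
    }

    /// Handles a `CancelPeeks`-style request. Returns the UUIDs that were
    /// still pending, i.e. those that must be answered with a cancellation.
    /// Peeks that already received a response are skipped.
    pub fn cancel(&mut self, uuids: &BTreeSet<u128>) -> Vec<u128> {
        uuids
            .iter()
            .copied()
            .filter(|uuid| self.pending.remove(uuid))
            .collect()
    }
}
```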
