Module mz_compute_client::protocol
source · Expand description
The compute protocol defines the communication between the compute controller and indiviual compute replicas.
§Overview
The compute protocol consists of ComputeCommand
s and ComputeResponse
s.
ComputeCommand
s are sent from the compute controller to the replicas and instruct the
receivers to perform some action. ComputeResponse
s are sent in the opposite direction and
inform the receiver about status changes of their senders.
The compute protocol is an asynchronous protocol. Both participants must expect and be able to
gracefully handle messages that don’t reflect the state of the world described by the messages
they have previously sent. In other words, messages sent take effect only eventually. For
example, after the compute controller has instructed the replica to cancel a peek (by sending a
CancelPeek
command, it must still be ready to accept non-Canceled
responses to the
peek. Similarly, if a replica receives a CancelPeek
command for a peek it has already
answered, it must handle that command gracefully (e.g., by ignoring it).
While the protocol does not provide any guarantees about the delay between sending a message
and it being received and processed, it does guarantee that messages are delivered in the same
order they are sent in. For example, if the compute controller sends a Peek
command
followed by a CancelPeek
command, it is guaranteed that CancelPeek
is only received by
the replica after the Peek
command.
§Message Transport and Encoding
To initiate communication, the replica starts listening on a known host and port, to which the
compute controller then connects. We use the gRPC framework for transmitting Protobuf-encoded
messages. The replica exposes a single gRPC service (ProtoCompute
) which contains a single
RPC (CommandResponseStream
). The compute controller invokes this RPC to finalize the
connection setup. After the streams have been established, compute commands and responses are
transmitted over these streams.
§Protocol Stages
The compute protocol consists of three stages that must be transitioned in order:
- Creation
- Initialization
- Computation
§Creation Stage
The creation stage is the first stage of the compute protocol. It is initiated by the successful establishment of a gRPC connection between compute controller and replica. In this stage, the compute controller must send two creation commands in order:
- A
CreateTimely
command, which instructs the replica to create the timely dataflow runtime. - A
CreateInstance
command, which instructs the replica to initialize the rest of its state.
The replica must not send any responses.
§Initialization Stage
The initialization stage begins as soon as the compute controller has sent the
CreateInstance
command. In this stage, the compute controller informs the replica about its
expected dataflow state. It does so by sending any number of computation
commands, followed by an InitializationComplete
command, which marks
the end of the initialization stage.
Upon receiving computation commands during the initialization phase, the replica is obligated to ensure its state matches what is requested through the commands. It is up to the replica whether it ensures that by initializing new state or by reusing existing state (through a reconciliation process).
The replica may send responses to computation commands it receives. It may also opt to defer sending responses to the computation stage instead.
§Computation Stage
The computation stage begins as soon as the compute controller has sent the
InitializationComplete
command. In this stage, the compute controller instructs the replica
to create and maintain dataflows, and to perform peeks on indexes exported by dataflows.
The compute controller may send any number of computation commands:
The compute controller must respect dependencies between commands. For example, it must send a
CreateDataflow
command before it sends AllowCompaction
or Peek
commands that target
the created dataflow.
The replica must send the required responses to computation commands. This includes commands it has received in the initialization phase that have not already been responded to.
Modules§
- Compute protocol commands.
- A reducible history of compute commands.
- Compute protocol responses.