The compute protocol defines the communication between the compute controller and individual compute replicas.
The compute protocol consists of ComputeCommands and ComputeResponses.
ComputeCommands are sent from the compute controller to the replicas and instruct the
receivers to perform some action.
ComputeResponses are sent in the opposite direction and inform the receiver about status
changes of their senders.
The compute protocol is an asynchronous protocol. Both participants must expect and be able to
gracefully handle messages that don’t reflect the state of the world described by the messages
they have previously sent. In other words, messages sent take effect only eventually. For
example, after the compute controller has instructed the replica to cancel a peek (by sending a
CancelPeek command), it must still be ready to accept non-Canceled responses to the peek.
Similarly, if a replica receives a CancelPeek command for a peek it has already answered, it
must handle that command gracefully (e.g., by ignoring it).
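The following is a minimal sketch of the replica-side tolerance described above, not the actual implementation. The `Command`, `PeekResponse`, and `Replica` types are hypothetical, simplified stand-ins for the real protocol types, and answering a cancellation with a `Canceled` response is an assumption made for illustration.

```rust
use std::collections::HashSet;

// Hypothetical, simplified message types; the real protocol types carry more data.
#[derive(Debug, Clone, PartialEq)]
enum Command {
    Peek { id: u64 },
    CancelPeek { id: u64 },
}

#[derive(Debug, Clone, PartialEq)]
enum PeekResponse {
    Rows { id: u64 },
    Canceled { id: u64 },
}

/// Replica-side bookkeeping: the set of peeks that have not been answered yet.
#[derive(Default)]
struct Replica {
    pending: HashSet<u64>,
}

impl Replica {
    /// Handles one command and returns a response if one is due immediately.
    fn handle(&mut self, cmd: Command) -> Option<PeekResponse> {
        match cmd {
            Command::Peek { id } => {
                self.pending.insert(id);
                None
            }
            Command::CancelPeek { id } => {
                if self.pending.remove(&id) {
                    // The peek was still pending (assumed behavior: report it as canceled).
                    Some(PeekResponse::Canceled { id })
                } else {
                    // The peek was already answered or is unknown: ignore gracefully.
                    None
                }
            }
        }
    }

    /// Answers a pending peek with rows; returns `None` if the peek is not pending.
    fn answer(&mut self, id: u64) -> Option<PeekResponse> {
        if self.pending.remove(&id) {
            Some(PeekResponse::Rows { id })
        } else {
            None
        }
    }
}
```

In this sketch, a `CancelPeek` that arrives after `answer` has run for the same peek produces no response and no error, which mirrors the "ignore it" option mentioned above.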
While the protocol does not provide any guarantees about the delay between sending a message
and it being received and processed, it does guarantee that messages are delivered in the same
order they are sent in. For example, if the compute controller sends a Peek command followed by a
CancelPeek command, it is guaranteed that CancelPeek is only received by the replica after the
Peek command.
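The ordering guarantee can be illustrated with an in-process channel. This is only a sketch: `std::sync::mpsc` stands in for the real transport, and the `Cmd` type and `deliver` function are hypothetical names introduced for the example.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical, simplified command type used only for this illustration.
#[derive(Debug, PartialEq)]
enum Cmd {
    Peek(u64),
    CancelPeek(u64),
}

/// Sends `cmds` over an ordered channel to a "replica" thread and returns the
/// commands in the order the replica received them.
fn deliver(cmds: Vec<Cmd>) -> Vec<Cmd> {
    let (tx, rx) = mpsc::channel();
    // The replica collects commands until the sender side is dropped.
    let replica = thread::spawn(move || rx.into_iter().collect::<Vec<Cmd>>());
    for cmd in cmds {
        tx.send(cmd).expect("replica hung up");
    }
    // Dropping the sender closes the channel so the replica thread can finish.
    drop(tx);
    replica.join().expect("replica thread panicked")
}
```

The delay before the replica thread processes a command is unspecified, but a `Peek` sent before a `CancelPeek` is always observed first.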
To initiate communication, the replica starts listening on a known host and port, to which the
compute controller then connects. We use the gRPC framework for transmitting Protobuf-encoded
messages. The replica exposes a single gRPC service (ProtoCompute) which contains a single RPC
(CommandResponseStream). The compute controller invokes this RPC to finalize the
connection setup. After the streams have been established, compute commands and responses are
transmitted over these streams.
The compute protocol consists of three stages that must be transitioned in order: creation, initialization, and computation.
The creation stage is the first stage of the compute protocol. It is initiated by the successful establishment of a gRPC connection between compute controller and replica. In this stage, the compute controller must send two creation commands in order:
- A CreateTimely command, which instructs the replica to create the timely dataflow runtime.
- A CreateInstance command, which instructs the replica to initialize the rest of its state.
The replica must not send any responses.
The initialization stage begins as soon as the compute controller has sent the CreateInstance command. In this stage, the compute controller informs the replica about its expected dataflow state. It does so by sending any number of computation commands, followed by an InitializationComplete command, which marks the end of the initialization stage.
Upon receiving computation commands during the initialization stage, the replica is obligated to ensure its state matches what is requested through the commands. It is up to the replica whether it does so by initializing new state or by reusing existing state (through a reconciliation process).
The replica may send responses to computation commands it receives. It may also opt to defer sending responses to the computation stage instead.
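One way to picture reconciliation is as a comparison between the state a replica already holds and the state the commands request. The sketch below is an illustration under stated assumptions, not the replica's actual algorithm: dataflows are identified by a hypothetical numeric ID and compared by a plain string description, and `Plan` and `reconcile` are names invented for the example.

```rust
use std::collections::BTreeMap;

/// The outcome of comparing existing state against requested state.
#[derive(Debug, Default, PartialEq)]
struct Plan {
    /// Requested dataflows that already exist unchanged and can be kept.
    reuse: Vec<u64>,
    /// Requested dataflows that must be initialized from scratch.
    create: Vec<u64>,
    /// Existing dataflows that no longer match any request.
    remove: Vec<u64>,
}

/// Compares `existing` against `requested` (both keyed by a hypothetical dataflow ID).
/// An ID whose description changed appears in both `create` and `remove`.
fn reconcile(
    existing: &BTreeMap<u64, String>,
    requested: &BTreeMap<u64, String>,
) -> Plan {
    let mut plan = Plan::default();
    for (id, desc) in requested {
        if existing.get(id) == Some(desc) {
            plan.reuse.push(*id);
        } else {
            plan.create.push(*id);
        }
    }
    for (id, desc) in existing {
        if requested.get(id) != Some(desc) {
            plan.remove.push(*id);
        }
    }
    plan
}
```

A replica that never reuses state corresponds to the degenerate case where `existing` is empty and every requested dataflow lands in `create`.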
The computation stage begins as soon as the compute controller has sent the
InitializationComplete command. In this stage, the compute controller instructs the replica
to create and maintain dataflows, and to perform peeks on indexes exported by dataflows.
The compute controller may send any number of computation commands.
The compute controller must respect dependencies between commands. For example, it must send a CreateDataflow command before it sends Peek commands that target the created dataflow.
The replica must send the required responses to computation commands. This includes commands it received in the initialization stage that it has not already responded to.
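The stage transitions described above can be sketched as a small state machine that accepts or rejects a command sequence. This is a hedged illustration only: the `Stage` and `Command` enums, `advance`, and `run` are hypothetical, all computation commands are collapsed into a single `Computation` variant, and the creation stage is split into two sub-states to track which creation command is expected next.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Stage {
    /// Creation stage, before any command has been sent.
    AwaitingCreateTimely,
    /// Creation stage, after `CreateTimely` has been sent.
    AwaitingCreateInstance,
    Initialization,
    Computation,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Command {
    CreateTimely,
    CreateInstance,
    InitializationComplete,
    /// Stand-in for any computation command.
    Computation,
}

/// Returns the stage that follows `stage` once `cmd` has been sent, or an error
/// if the command is not allowed in that stage.
fn advance(stage: Stage, cmd: Command) -> Result<Stage, String> {
    match (stage, cmd) {
        (Stage::AwaitingCreateTimely, Command::CreateTimely) => Ok(Stage::AwaitingCreateInstance),
        (Stage::AwaitingCreateInstance, Command::CreateInstance) => Ok(Stage::Initialization),
        (Stage::Initialization, Command::Computation) => Ok(Stage::Initialization),
        (Stage::Initialization, Command::InitializationComplete) => Ok(Stage::Computation),
        (Stage::Computation, Command::Computation) => Ok(Stage::Computation),
        (s, c) => Err(format!("{:?} is not valid in stage {:?}", c, s)),
    }
}

/// Validates a whole command sequence, starting from a fresh connection.
fn run(cmds: &[Command]) -> Result<Stage, String> {
    cmds.iter()
        .try_fold(Stage::AwaitingCreateTimely, |stage, cmd| advance(stage, *cmd))
}
```

A sequence such as `CreateTimely`, `CreateInstance`, any number of computation commands, `InitializationComplete`, and further computation commands ends in `Stage::Computation`; skipping `CreateTimely` or sending `InitializationComplete` twice yields an error.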
- Compute protocol commands.
- A reducible history of compute commands.
- Compute protocol responses.