Expand description

The compute protocol defines the communication between the compute controller and indiviual compute replicas.


The compute protocol consists of ComputeCommands and ComputeResponses. ComputeCommands are sent from the compute controller to the replicas and instruct the receivers to perform some action. ComputeResponses are sent in the opposite direction and inform the receiver about status changes of their senders.

The compute protocol is an asynchronous protocol. Both participants must expect and be able to gracefully handle messages that don’t reflect the state of the world described by the messages they have previously sent. In other words, messages sent take effect only eventually. For example, after the compute controller has instructed the replica to cancel a peek (by sending a CancelPeek command, it must still be ready to accept non-Canceled responses to the peek. Similarly, if a replica receives a CancelPeek command for a peek it has already answered, it must handle that command gracefully (e.g., by ignoring it).

While the protocol does not provide any guarantees about the delay between sending a message and it being received and processed, it does guarantee that messages are delivered in the same order they are sent in. For example, if the compute controller sends a Peek command followed by a CancelPeek command, it is guaranteed that CancelPeek is only received by the replica after the Peek command.

§Message Transport and Encoding

To initiate communication, the replica starts listening on a known host and port, to which the compute controller then connects. We use the gRPC framework for transmitting Protobuf-encoded messages. The replica exposes a single gRPC service (ProtoCompute) which contains a single RPC (CommandResponseStream). The compute controller invokes this RPC to finalize the connection setup. After the streams have been established, compute commands and responses are transmitted over these streams.

§Protocol Stages

The compute protocol consists of three stages that must be transitioned in order:

  1. Creation
  2. Initialization
  3. Computation

§Creation Stage

The creation stage is the first stage of the compute protocol. It is initiated by the successful establishment of a gRPC connection between compute controller and replica. In this stage, the compute controller must send two creation commands in order:

  1. A CreateTimely command, which instructs the replica to create the timely dataflow runtime.
  2. A CreateInstance command, which instructs the replica to initialize the rest of its state.

The replica must not send any responses.

§Initialization Stage

The initialization stage begins as soon as the compute controller has sent the CreateInstance command. In this stage, the compute controller informs the replica about its expected dataflow state. It does so by sending any number of computation commands, followed by an InitializationComplete command, which marks the end of the initialization stage.

Upon receiving computation commands during the initialization phase, the replica is obligated to ensure its state matches what is requested through the commands. It is up to the replica whether it ensures that by initializing new state or by reusing existing state (through a reconciliation process).

The replica may send responses to computation commands it receives. It may also opt to defer sending responses to the computation stage instead.

§Computation Stage

The computation stage begins as soon as the compute controller has sent the InitializationComplete command. In this stage, the compute controller instructs the replica to create and maintain dataflows, and to perform peeks on indexes exported by dataflows.

The compute controller may send any number of computation commands:

The compute controller must respect dependencies between commands. For example, it must send a CreateDataflow command before it sends AllowCompaction or Peek commands that target the created dataflow.

The replica must send the required responses to computation commands. This includes commands it has received in the initialization phase that have not already been responded to.


  • Compute protocol commands.
  • A reducible history of compute commands.
  • Compute protocol responses.