mz_compute_client/protocol.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! The compute protocol defines the communication between the compute controller and individual
//! compute replicas.
//!
//! # Overview
//!
//! The compute protocol consists of [`ComputeCommand`]s and [`ComputeResponse`]s.
//! [`ComputeCommand`]s are sent from the compute controller to the replicas and instruct the
//! receivers to perform some action. [`ComputeResponse`]s are sent in the opposite direction and
//! inform the receiver about status changes of their senders.
//!
//! The compute protocol is an asynchronous protocol. Both participants must expect and be able to
//! gracefully handle messages that don’t reflect the state of the world described by the messages
//! they have previously sent. In other words, messages sent take effect only eventually. For
//! example, after the compute controller has instructed the replica to cancel a peek (by sending a
//! [`CancelPeek`] command), it must still be ready to accept non-[`Canceled`] responses to the
//! peek. Similarly, if a replica receives a [`CancelPeek`] command for a peek it has already
//! answered, it must handle that command gracefully (e.g., by ignoring it).
//!
//! While the protocol does not provide any guarantees about the delay between sending a message
//! and it being received and processed, it does guarantee that messages are delivered in the same
//! order they are sent in. For example, if the compute controller sends a [`Peek`] command
//! followed by a [`CancelPeek`] command, it is guaranteed that [`CancelPeek`] is only received by
//! the replica after the [`Peek`] command.
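//!
//! The two properties above can be illustrated with a minimal sketch. The types `Cmd`, `Resp`,
//! and `Replica` are hypothetical, simplified stand-ins invented for this example; they are not
//! the actual protocol types. The sketch assumes the replica tracks its pending peeks by ID and
//! shows one possible way to ignore a cancellation that arrives after the peek was answered.
//!
//! ```rust
//! use std::collections::BTreeSet;
//!
//! /// Hypothetical, simplified stand-ins for the `Peek` and `CancelPeek` commands.
//! #[derive(Debug, Clone, Copy, PartialEq, Eq)]
//! pub enum Cmd {
//!     Peek { id: u64 },
//!     CancelPeek { id: u64 },
//! }
//!
//! /// Hypothetical, simplified stand-in for a peek response.
//! #[derive(Debug, Clone, Copy, PartialEq, Eq)]
//! pub enum Resp {
//!     Rows { id: u64 },
//!     Canceled { id: u64 },
//! }
//!
//! /// Toy replica that only tracks the IDs of its pending peeks.
//! #[derive(Default)]
//! pub struct Replica {
//!     pending: BTreeSet<u64>,
//! }
//!
//! impl Replica {
//!     /// Handles one command, returning a response if one is due immediately.
//!     pub fn handle(&mut self, cmd: Cmd) -> Option<Resp> {
//!         match cmd {
//!             Cmd::Peek { id } => {
//!                 self.pending.insert(id);
//!                 None
//!             }
//!             Cmd::CancelPeek { id } => {
//!                 // A cancellation for a peek that is not pending is ignored.
//!                 self.pending.remove(&id).then_some(Resp::Canceled { id })
//!             }
//!         }
//!     }
//!
//!     /// Answers a pending peek with rows; returns `None` if the peek is not pending.
//!     pub fn answer(&mut self, id: u64) -> Option<Resp> {
//!         self.pending.remove(&id).then_some(Resp::Rows { id })
//!     }
//! }
//! ```
//!
//! Because commands are delivered in order, a `CancelPeek` cannot reach this toy replica before
//! its `Peek`. Assuming the controller only cancels peeks it has sent, a cancellation that
//! matches no pending peek refers to a peek that was already served, so dropping it is safe.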
//!
//! # Message Transport and Encoding
//!
//! To initiate communication, the replica starts listening on a known host and port, to which the
//! compute controller then connects. We use the gRPC framework for transmitting Protobuf-encoded
//! messages. The replica exposes a single gRPC service (`ProtoCompute`) which contains a single
//! RPC (`CommandResponseStream`). The compute controller invokes this RPC to finalize the
//! connection setup. After the streams have been established, compute commands and responses are
//! transmitted over these streams.
//!
//! # Protocol Stages
//!
//! The compute protocol consists of four stages that must be traversed in order:
//!
//! 1. Creation
//! 2. Initialization
//! 3. Computation (read-only)
//! 4. Computation (read-write)
//!
//! ## Creation Stage
//!
//! The creation stage is the first stage of the compute protocol. It is initiated by the
//! successful establishment of a gRPC connection between compute controller and replica. In this
//! stage, the compute controller must send two creation commands in order:
//!
//! 1. A [`CreateTimely`] command, which instructs the replica to create the timely dataflow
//!    runtime.
//! 2. A [`CreateInstance`] command, which instructs the replica to initialize the rest of its
//!    state.
//!
//! The replica must not send any responses.
//!
//! ## Initialization Stage
//!
//! The initialization stage begins as soon as the compute controller has sent the
//! [`CreateInstance`] command. In this stage, the compute controller informs the replica about its
//! expected dataflow state. It does so by sending any number of [computation
//! commands](#computation-stage-read-only), followed by an [`InitializationComplete`] command,
//! which marks the end of the initialization stage.
//!
//! Upon receiving computation commands during the initialization stage, the replica is obligated
//! to ensure its state matches what is requested through the commands. It is up to the replica
//! whether it ensures that by initializing new state or by reusing existing state (through a
//! reconciliation process).
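//!
//! One possible shape of such a reconciliation is sketched below. This is an illustration under
//! stated assumptions, not the replica's actual logic: `Action` and `reconcile` are hypothetical
//! names, dataflows are identified by a plain `u64`, and a dataflow description is reduced to a
//! string that is compared for equality to decide whether existing state can be reused.
//!
//! ```rust
//! use std::collections::BTreeMap;
//!
//! /// Outcome of reconciling one requested dataflow (hypothetical type).
//! #[derive(Debug, PartialEq, Eq)]
//! pub enum Action {
//!     /// Matching state already exists and is kept.
//!     Reuse,
//!     /// No matching state exists, so it is initialized from scratch.
//!     Create,
//! }
//!
//! /// Reconciles `existing` state against the `requested` dataflows.
//! ///
//! /// Both maps go from a dataflow ID to its description. On return, `existing` equals
//! /// `requested`, and the result records how each requested dataflow was obtained.
//! pub fn reconcile(
//!     existing: &mut BTreeMap<u64, String>,
//!     requested: &BTreeMap<u64, String>,
//! ) -> BTreeMap<u64, Action> {
//!     // Drop state that was not requested or whose description differs.
//!     existing.retain(|id, desc| requested.get(id) == Some(&*desc));
//!
//!     let mut actions = BTreeMap::new();
//!     for (id, desc) in requested {
//!         if existing.contains_key(id) {
//!             actions.insert(*id, Action::Reuse);
//!         } else {
//!             existing.insert(*id, desc.clone());
//!             actions.insert(*id, Action::Create);
//!         }
//!     }
//!     actions
//! }
//! ```
//!
//! Either branch satisfies the obligation stated above: after `reconcile` returns, the state
//! matches what was requested, regardless of how much of it was reused.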
//!
//! The replica may send responses to computation commands it receives. It may also opt to defer
//! sending responses to the computation stages instead.
//!
//! ## Computation Stage (read-only)
//!
//! The read-only computation stage begins as soon as the compute controller has sent the
//! [`InitializationComplete`] command. In this stage, the compute controller instructs the replica
//! to create and maintain dataflows, and to perform peeks on indexes exported by dataflows.
//!
//! While in the read-only computation stage, the replica must not effect changes to external
//! systems (writes). For the most part, this means it cannot write to persist. The replica can
//! transition out of the read-only stage when it receives an [`AllowWrites`] command.
//!
//! The compute controller may send any number of computation commands:
//!
//! - [`CreateDataflow`]
//! - [`Schedule`]
//! - [`AllowCompaction`]
//! - [`Peek`]
//! - [`CancelPeek`]
//! - [`UpdateConfiguration`]
//!
//! The compute controller must respect dependencies between commands. For example, it must send a
//! [`CreateDataflow`] command before it sends [`AllowCompaction`] or [`Peek`] commands that target
//! the created dataflow.
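//!
//! The dependency rule from the example above can be expressed as a small check over a command
//! sequence. This is a minimal sketch: `Cmd` and `first_violation` are hypothetical names, the
//! commands carry only a `u64` target ID, and only the one dependency named in the example is
//! checked.
//!
//! ```rust
//! use std::collections::BTreeSet;
//!
//! /// Hypothetical, simplified stand-ins for three computation commands.
//! #[derive(Debug, Clone, Copy)]
//! pub enum Cmd {
//!     CreateDataflow { id: u64 },
//!     AllowCompaction { id: u64 },
//!     Peek { id: u64 },
//! }
//!
//! /// Returns the index of the first command that targets a dataflow not created by an earlier
//! /// command, or `None` if the sequence respects the dependency.
//! pub fn first_violation(cmds: &[Cmd]) -> Option<usize> {
//!     let mut created = BTreeSet::new();
//!     for (index, cmd) in cmds.iter().enumerate() {
//!         match *cmd {
//!             Cmd::CreateDataflow { id } => {
//!                 created.insert(id);
//!             }
//!             Cmd::AllowCompaction { id } | Cmd::Peek { id } => {
//!                 if !created.contains(&id) {
//!                     return Some(index);
//!                 }
//!             }
//!         }
//!     }
//!     None
//! }
//! ```
//!
//! A check like this is only meaningful because the protocol delivers commands in the order they
//! were sent, so the order seen by the sender is also the order seen by the replica.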
//!
//! The replica must send the required responses to computation commands. This includes commands it
//! has received in the initialization stage that have not already been responded to.
//!
//! ## Computation Stage (read-write)
//!
//! The read-write computation stage is exactly like the read-only computation stage, with the
//! addition that now the replica _can_ perform writes to external systems. This stage begins once
//! the controller has sent the [`AllowWrites`] command.
//!
//! The replica cannot transition back to the read-only computation stage from the read-write stage.
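//!
//! The stage transitions described in this section can be summarized as a small state machine.
//! The sketch below is an illustration, not the controller's or replica's implementation: `Stage`,
//! `Cmd`, and `advance` are hypothetical names, all computation commands are collapsed into one
//! variant, and treating every command the text does not mention for a stage as out of place is
//! an assumption of the sketch.
//!
//! ```rust
//! /// The four protocol stages, in order.
//! #[derive(Debug, Clone, Copy, PartialEq, Eq)]
//! pub enum Stage {
//!     /// Creation stage; `timely_created` records whether `CreateTimely` was already sent.
//!     Creation { timely_created: bool },
//!     Initialization,
//!     ReadOnly,
//!     ReadWrite,
//! }
//!
//! /// Hypothetical, simplified stand-ins for the commands that matter for stage transitions.
//! #[derive(Debug, Clone, Copy, PartialEq, Eq)]
//! pub enum Cmd {
//!     CreateTimely,
//!     CreateInstance,
//!     InitializationComplete,
//!     AllowWrites,
//!     /// Any computation command, e.g. `CreateDataflow` or `Peek`.
//!     Computation,
//! }
//!
//! /// Returns the stage in effect after `cmd` was sent in `stage`, or `None` if this sketch
//! /// treats the command as out of place in that stage.
//! pub fn advance(stage: Stage, cmd: Cmd) -> Option<Stage> {
//!     match (stage, cmd) {
//!         (Stage::Creation { timely_created: false }, Cmd::CreateTimely) => {
//!             Some(Stage::Creation { timely_created: true })
//!         }
//!         (Stage::Creation { timely_created: true }, Cmd::CreateInstance) => {
//!             Some(Stage::Initialization)
//!         }
//!         (Stage::Initialization, Cmd::InitializationComplete) => Some(Stage::ReadOnly),
//!         (Stage::ReadOnly, Cmd::AllowWrites) => Some(Stage::ReadWrite),
//!         // Computation commands never change the stage.
//!         (Stage::Initialization | Stage::ReadOnly | Stage::ReadWrite, Cmd::Computation) => {
//!             Some(stage)
//!         }
//!         _ => None,
//!     }
//! }
//! ```
//!
//! No arm leads from `ReadWrite` back to `ReadOnly`, which mirrors the rule that the read-write
//! stage cannot be left again.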
//!
//! [`ComputeCommand`]: self::command::ComputeCommand
//! [`CreateTimely`]: self::command::ComputeCommand::CreateTimely
//! [`CreateInstance`]: self::command::ComputeCommand::CreateInstance
//! [`InitializationComplete`]: self::command::ComputeCommand::InitializationComplete
//! [`CreateDataflow`]: self::command::ComputeCommand::CreateDataflow
//! [`Schedule`]: self::command::ComputeCommand::Schedule
//! [`AllowCompaction`]: self::command::ComputeCommand::AllowCompaction
//! [`AllowWrites`]: self::command::ComputeCommand::AllowWrites
//! [`Peek`]: self::command::ComputeCommand::Peek
//! [`CancelPeek`]: self::command::ComputeCommand::CancelPeek
//! [`UpdateConfiguration`]: self::command::ComputeCommand::UpdateConfiguration
//! [`ComputeResponse`]: self::response::ComputeResponse
//! [`Canceled`]: self::response::PeekResponse::Canceled
//! [`SubscribeResponse::DroppedAt`]: self::response::SubscribeResponse::DroppedAt

pub mod command;
pub mod history;
pub mod response;
133pub mod response;