mz_compute_client/protocol/command.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Compute protocol commands.

use std::time::Duration;

use mz_cluster_client::client::TryIntoProtocolNonce;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_dyncfg::ConfigUpdates;
use mz_expr::RowSetFinishing;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_types::PersistLocation;
use mz_repr::{GlobalId, RelationDesc, Row};
use mz_service::params::GrpcClientParameters;
use mz_storage_types::controller::CollectionMetadata;
use mz_tracing::params::TracingParameters;
use serde::{Deserialize, Serialize};
use timely::progress::frontier::Antichain;
use uuid::Uuid;

use crate::logging::LoggingConfig;

/// Compute protocol commands, sent by the compute controller to replicas.
///
/// Command sequences sent by the compute controller must be valid according to the [Protocol
/// Stages].
///
/// [Protocol Stages]: super#protocol-stages
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ComputeCommand<T = mz_repr::Timestamp> {
    /// `Hello` is the first command sent to a replica after a connection was established. It
    /// provides the replica with meta information about the connection.
    ///
    /// This command is special in that it is broadcast to all workers of a multi-worker replica.
    /// All subsequent commands, except `UpdateConfiguration`, are only sent to the first worker,
    /// which then distributes them to the other workers using a dataflow.
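    ///
    /// A minimal sketch of the first command sent on a new connection, assuming the `v4`
    /// feature of the `uuid` crate is available for nonce generation:
    ///
    /// ```ignore
    /// // Each (re)connection must carry a nonce that differs from all previously sent ones.
    /// let hello = ComputeCommand::<mz_repr::Timestamp>::Hello { nonce: Uuid::new_v4() };
    /// ```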
    Hello {
        /// A nonce unique to the current iteration of the compute protocol.
        ///
        /// The nonce allows identifying different iterations of the compute protocol. When the
        /// compute controller connects to a replica, it must send a nonce that is different from
        /// all nonces it sent to the same replica on previous connections. Multi-worker replicas
        /// should use the nonce to ensure that their individual workers agree on which protocol
        /// iteration they are in.
        nonce: Uuid,
    },

    /// `CreateInstance` must be sent after `Hello` to complete the [Creation Stage] of the compute
    /// protocol. Unlike `Hello`, it is only sent to the first worker of the replica, and then
    /// distributed through the timely runtime. `CreateInstance` instructs the replica to
    /// initialize its state to a point where it is ready to start maintaining dataflows.
    ///
    /// Upon receiving a `CreateInstance` command, the replica must further initialize logging
    /// dataflows according to the given [`LoggingConfig`].
    ///
    /// [Creation Stage]: super#creation-stage
    CreateInstance(Box<InstanceConfig>),

    /// `InitializationComplete` informs the replica about the end of the [Initialization Stage].
    /// Upon receiving this command, the replica should perform a reconciliation process, to ensure
    /// its dataflow state matches the state requested by the computation commands it received
    /// previously. The replica must now start sending responses to commands received previously,
    /// if it opted to defer them during the [Initialization Stage].
    ///
    /// [Initialization Stage]: super#initialization-stage
    InitializationComplete,

    /// `AllowWrites` informs the replica that it can transition out of the
    /// read-only computation stage and into the read-write computation stage.
    /// It is now allowed to effect changes to external systems (writes).
    ///
    /// After initialization is complete, an instance starts out in the
    /// read-only computation stage. Only when receiving this command will it go
    /// out of that and allow running operations to do writes.
    ///
    /// An instance that has once been told that it can go into read-write mode
    /// can never go out of that mode again. It is okay for a read-only
    /// controller to re-connect to an instance that is already in read-write
    /// mode: _someone_ has already told the instance that it is okay to write
    /// and there is no way in the protocol to transition an instance back to
    /// read-only mode.
    ///
    /// NOTE: We don't have a protocol in place that allows writes only after a
    /// certain, controller-determined, timestamp. Such a protocol would allow
    /// tighter control and could allow the instance to avoid work. However, it
    /// is more work to put in place the logic for that so we leave it as future
    /// work for now.
    AllowWrites,

    /// `UpdateConfiguration` instructs the replica to update its configuration, according to the
    /// given [`ComputeParameters`].
    ///
    /// This command is special in that, like `Hello`, it is broadcast to all workers of the
    /// replica. However, unlike `Hello`, it is ignored by all workers except the first one, which
    /// distributes the command to the other workers through the timely runtime.
    /// `UpdateConfiguration` commands are broadcast only to allow the intermediary parts of the
    /// networking fabric to observe them and learn of configuration updates.
    ///
    /// Parameter updates transmitted through this command must be applied by the replica as soon
    /// as it receives the command, and they must be applied globally to all replica state, even
    /// dataflows and pending peeks that were created before the parameter update. This property
    /// allows the replica to hoist `UpdateConfiguration` commands during reconciliation.
    ///
    /// Configuration parameters that should not be applied globally, but only to specific
    /// dataflows or peeks, should be added to the [`DataflowDescription`] or [`Peek`] types,
    /// rather than as [`ComputeParameters`].
    UpdateConfiguration(Box<ComputeParameters>),

    /// `CreateDataflow` instructs the replica to create a dataflow according to the given
    /// [`DataflowDescription`].
    ///
    /// The [`DataflowDescription`] must have the following properties:
    ///
    /// * Dataflow imports are valid:
    ///   * Imported storage collections specified in [`source_imports`] exist and are readable by
    ///     the compute replica.
    ///   * Imported indexes specified in [`index_imports`] have been created on the replica
    ///     previously, by previous `CreateDataflow` commands.
    /// * Dataflow imports are readable at the specified [`as_of`]. In other words: The `since`s of
    ///   imported collections are not beyond the dataflow [`as_of`].
    /// * Dataflow exports have unique IDs, i.e., the IDs of exports from dataflows a replica is
    ///   instructed to create do not repeat (within a single protocol iteration).
    /// * The dataflow objects defined in [`objects_to_build`] are topologically ordered according
    ///   to the dependency relation.
    ///
    /// A dataflow description that violates any of the above properties can cause the replica to
    /// exhibit undefined behavior, such as panicking or production of incorrect results. A replica
    /// should prefer panicking over producing incorrect results.
    ///
    /// After receiving a `CreateDataflow` command, if the created dataflow exports indexes or
    /// storage sinks, the replica must produce [`Frontiers`] responses that report the
    /// advancement of the frontiers of these compute collections.
    ///
    /// After receiving a `CreateDataflow` command, if the created dataflow exports subscribes, the
    /// replica must produce [`SubscribeResponse`]s that report the progress and results of the
    /// subscribes.
    ///
    /// After receiving a `CreateDataflow` command, if the created dataflow exports copy-to sinks,
    /// the replica must produce [`CopyToResponse`]s that report the results and completion of the
    /// copy-to sinks.
    ///
    /// The replica may create the dataflow in a suspended state and defer starting the computation
    /// until it receives a corresponding `Schedule` command. Thus, to ensure dataflow execution,
    /// the compute controller should eventually send a `Schedule` command for each sent
    /// `CreateDataflow` command.
    ///
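    /// A minimal sketch of that pairing, assuming a hypothetical `send` helper, a dataflow
    /// description `dataflow`, and its export IDs `export_ids`:
    ///
    /// ```ignore
    /// send(ComputeCommand::CreateDataflow(Box::new(dataflow)));
    /// for id in export_ids {
    ///     // Allow the replica to start computing each exported collection.
    ///     send(ComputeCommand::Schedule(id));
    /// }
    /// ```
    ///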
    /// [`objects_to_build`]: DataflowDescription::objects_to_build
    /// [`source_imports`]: DataflowDescription::source_imports
    /// [`index_imports`]: DataflowDescription::index_imports
    /// [`as_of`]: DataflowDescription::as_of
    /// [`Frontiers`]: super::response::ComputeResponse::Frontiers
    /// [`SubscribeResponse`]: super::response::ComputeResponse::SubscribeResponse
    /// [`CopyToResponse`]: super::response::ComputeResponse::CopyToResponse
    CreateDataflow(Box<DataflowDescription<RenderPlan<T>, CollectionMetadata, T>>),

    /// `Schedule` allows the replica to start computation for a compute collection.
    ///
    /// It is invalid to send a `Schedule` command that references a collection that was not
    /// created by a corresponding `CreateDataflow` command before. Doing so may cause the replica
    /// to exhibit undefined behavior.
    ///
    /// It is also invalid to send a `Schedule` command that references a collection that has,
    /// through an `AllowCompaction` command, been allowed to compact to the empty frontier before.
    Schedule(GlobalId),

    /// `AllowCompaction` informs the replica about the relaxation of external read capabilities on
    /// a compute collection exported by one of the replica's dataflows.
    ///
    /// The command names a collection and provides a frontier after which accumulations must be
    /// correct. The replica gains the liberty of compacting the corresponding maintained trace up
    /// through that frontier.
    ///
    /// It is invalid to send an `AllowCompaction` command that references a compute collection
    /// that was not created by a corresponding `CreateDataflow` command before. Doing so may cause
    /// the replica to exhibit undefined behavior.
    ///
    /// The `AllowCompaction` command only informs about external read requirements, not internal
    /// ones. The replica is responsible for ensuring that internal requirements are fulfilled at
    /// all times, so local dataflow inputs are not compacted beyond times at which they are still
    /// being read from.
    ///
    /// The read frontiers transmitted through `AllowCompaction`s may be beyond the corresponding
    /// collections' current `upper` frontiers. This signals that external readers are not
    /// interested in times up to the specified new read frontiers. Consequently, an empty read
    /// frontier signals that external readers are not interested in updates from the corresponding
    /// collection ever again, so the collection is not required anymore.
    ///
    /// Sending an `AllowCompaction` command with the empty frontier is the canonical way to drop
    /// compute collections.
    ///
    /// A replica that receives an `AllowCompaction` command with the empty frontier must
    /// eventually respond with [`Frontiers`] responses reporting empty frontiers for the
    /// same collection. ([#16271])
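    ///
    /// A minimal sketch of dropping a collection this way, assuming a hypothetical `send` helper
    /// and a `collection_id` to drop (the empty `Antichain` is the empty frontier):
    ///
    /// ```ignore
    /// send(ComputeCommand::AllowCompaction {
    ///     id: collection_id,
    ///     frontier: Antichain::new(),
    /// });
    /// ```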
    ///
    /// [`Frontiers`]: super::response::ComputeResponse::Frontiers
    /// [#16271]: https://github.com/MaterializeInc/database-issues/issues/4699
    AllowCompaction {
        /// The ID of the compute collection whose read capabilities are relaxed.
        id: GlobalId,
        /// The frontier after which accumulations of the collection must be correct; the replica
        /// may compact the collection up through this frontier.
        frontier: Antichain<T>,
    },

    /// `Peek` instructs the replica to perform a peek on a collection: either an index or a
    /// Persist-backed collection.
    ///
    /// The [`Peek`] description must have the following properties:
    ///
    /// * If targeting an index, it has previously been created by a corresponding `CreateDataflow`
    ///   command. (If targeting a persist collection, that collection should exist.)
    /// * The [`Peek::uuid`] is unique, i.e., the UUIDs of peeks a replica gets instructed to
    ///   perform do not repeat (within a single protocol iteration).
    ///
    /// A [`Peek`] description that violates any of the above properties can cause the replica to
    /// exhibit undefined behavior.
    ///
    /// Specifying a [`Peek::timestamp`] that is less than the target index's `since` frontier does
    /// not provoke undefined behavior. Instead, the replica must produce a [`PeekResponse::Error`]
    /// in response.
    ///
    /// After receiving a `Peek` command, the replica must eventually produce a single
    /// [`PeekResponse`]:
    ///
    /// * For peeks that were not cancelled: either [`Rows`] or [`Error`].
    /// * For peeks that were cancelled: either [`Rows`], or [`Error`], or [`Canceled`].
    ///
    /// [`PeekResponse`]: super::response::PeekResponse
    /// [`PeekResponse::Error`]: super::response::PeekResponse::Error
    /// [`Rows`]: super::response::PeekResponse::Rows
    /// [`Error`]: super::response::PeekResponse::Error
    /// [`Canceled`]: super::response::PeekResponse::Canceled
    Peek(Box<Peek<T>>),

    /// `CancelPeek` instructs the replica to cancel the identified pending peek.
    ///
    /// It is invalid to send a `CancelPeek` command that references a peek that was not created
    /// by a corresponding `Peek` command before. Doing so may cause the replica to exhibit
    /// undefined behavior.
    ///
    /// If a replica cancels a peek in response to a `CancelPeek` command, it must respond with a
    /// [`PeekResponse::Canceled`]. The replica may also decide to fulfill the peek instead and
    /// return a different [`PeekResponse`], or it may already have returned a response to the
    /// specified peek. In these cases it must *not* return another [`PeekResponse`].
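    ///
    /// A minimal sketch of the UUID correlation between the two commands, assuming a hypothetical
    /// `send` helper and an existing `peek` description:
    ///
    /// ```ignore
    /// let uuid = Uuid::new_v4();
    /// send(ComputeCommand::Peek(Box::new(Peek { uuid, ..peek })));
    /// // Later: cancel the pending peek by referencing the same UUID.
    /// send(ComputeCommand::CancelPeek { uuid });
    /// ```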
    ///
    /// [`PeekResponse`]: super::response::PeekResponse
    /// [`PeekResponse::Canceled`]: super::response::PeekResponse::Canceled
    CancelPeek {
        /// The identifier of the peek request to cancel.
        ///
        /// This value must match a [`Peek::uuid`] value transmitted in a previous `Peek` command.
        uuid: Uuid,
    },
}

/// Configuration for a replica, passed with the `CreateInstance` command. Replicas should halt
/// if the controller attempts to reconcile them with different values for anything in this
/// struct.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct InstanceConfig {
    /// Specification of introspection logging.
    pub logging: LoggingConfig,
    /// The offset relative to the replica startup at which it should expire. `None` disables
    /// the feature.
    pub expiration_offset: Option<Duration>,
    /// The persist location where we can stash large peek results.
    pub peek_stash_persist_location: PersistLocation,
}

impl InstanceConfig {
    /// Check if the configuration is compatible with another configuration. This is true iff the
    /// logging configuration is equivalent, and the other configuration (non-strictly) strengthens
    /// the expiration offset.
    ///
    /// We consider a stricter offset compatible, which allows us to strengthen the value without
    /// forcing replica restarts. However, it also means that replicas will only pick up the new
    /// value after a restart.
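    ///
    /// A minimal sketch of the intended semantics, assuming two configs `current` and `incoming`
    /// that differ only in their `expiration_offset`:
    ///
    /// ```ignore
    /// // current: Some(1 hour), incoming: Some(30 minutes)
    /// assert!(current.compatible_with(&incoming)); // a stricter (smaller) offset is compatible
    /// assert!(!incoming.compatible_with(&current)); // relaxing the offset is not
    /// ```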
    pub fn compatible_with(&self, other: &InstanceConfig) -> bool {
        // Destructure to protect against adding fields in the future.
        let InstanceConfig {
            logging: self_logging,
            expiration_offset: self_offset,
            peek_stash_persist_location: self_peek_stash_persist_location,
        } = self;
        let InstanceConfig {
            logging: other_logging,
            expiration_offset: other_offset,
            peek_stash_persist_location: other_peek_stash_persist_location,
        } = other;

        // Logging is compatible if exactly the same.
        let logging_compatible = self_logging == other_logging;

        // The offsets are compatible if `other_offset` is less than or equal to `self_offset`,
        // i.e., it is a smaller offset and strengthens the expiration.
        let self_offset = Antichain::from_iter(*self_offset);
        let other_offset = Antichain::from_iter(*other_offset);
        let offset_compatible = timely::PartialOrder::less_equal(&other_offset, &self_offset);

        let persist_location_compatible =
            self_peek_stash_persist_location == other_peek_stash_persist_location;

        logging_compatible && offset_compatible && persist_location_compatible
    }
}

/// Compute instance configuration parameters.
///
/// Parameters can be set (`Some`) or unset (`None`).
/// Unset parameters should be interpreted to mean "use the previous value".
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ComputeParameters {
    /// An optional arbitrary string that describes the class of the workload
    /// this compute instance is running (e.g., `production` or `staging`).
    ///
    /// When `Some(x)`, a `workload_class=x` label is applied to all metrics
    /// exported by the metrics registry associated with the compute instance.
    pub workload_class: Option<Option<String>>,
    /// The maximum allowed size in bytes for results of peeks and subscribes.
    ///
    /// Peeks and subscribes that would return results larger than this maximum return the
    /// respective error responses instead:
    /// * [`PeekResponse::Rows`] is replaced by [`PeekResponse::Error`].
    /// * The [`SubscribeBatch::updates`] field is populated with an [`Err`] value.
    ///
    /// [`PeekResponse::Rows`]: super::response::PeekResponse::Rows
    /// [`PeekResponse::Error`]: super::response::PeekResponse::Error
    /// [`SubscribeBatch::updates`]: super::response::SubscribeBatch::updates
    pub max_result_size: Option<u64>,
    /// Tracing configuration.
    pub tracing: TracingParameters,
    /// gRPC client configuration.
    pub grpc_client: GrpcClientParameters,

    /// Config updates for components migrated to `mz_dyncfg`.
    pub dyncfg_updates: ConfigUpdates,
}

impl ComputeParameters {
    /// Update the parameter values with the set ones from `other`.
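    ///
    /// A minimal sketch of the merge behavior (hypothetical values):
    ///
    /// ```ignore
    /// let mut params = ComputeParameters {
    ///     workload_class: Some(Some("production".into())),
    ///     ..Default::default()
    /// };
    /// params.update(ComputeParameters {
    ///     max_result_size: Some(1 << 30),
    ///     ..Default::default()
    /// });
    /// // Unset fields in `other` leave the previous values untouched.
    /// assert_eq!(params.workload_class, Some(Some("production".into())));
    /// assert_eq!(params.max_result_size, Some(1 << 30));
    /// ```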
    pub fn update(&mut self, other: ComputeParameters) {
        let ComputeParameters {
            workload_class,
            max_result_size,
            tracing,
            grpc_client,
            dyncfg_updates,
        } = other;

        if workload_class.is_some() {
            self.workload_class = workload_class;
        }
        if max_result_size.is_some() {
            self.max_result_size = max_result_size;
        }

        self.tracing.update(tracing);
        self.grpc_client.update(grpc_client);

        self.dyncfg_updates.extend(dyncfg_updates);
    }

    /// Return whether all parameters are unset.
    pub fn all_unset(&self) -> bool {
        *self == Self::default()
    }
}

/// Metadata specific to the peek variant.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum PeekTarget {
    /// This peek is against an index. Since this should be held in memory on
    /// the target cluster, no additional coordinates are necessary.
    Index {
        /// The id of the (possibly transient) index.
        id: GlobalId,
    },
    /// This peek is against a Persist collection.
    Persist {
        /// The id of the backing Persist collection.
        id: GlobalId,
        /// The identifying metadata of the Persist shard.
        metadata: CollectionMetadata,
    },
}

impl PeekTarget {
    /// Returns the ID of the peeked collection.
    pub fn id(&self) -> GlobalId {
        match self {
            Self::Index { id } => *id,
            Self::Persist { id, .. } => *id,
        }
    }
}

/// Peek a collection, either in an arrangement or Persist.
///
/// This request elicits data from the worker, by naming the
/// collection and some actions to apply to the results before
/// returning them.
///
/// The `timestamp` member must be valid for the arrangement that
/// is referenced by `id`. This means that `AllowCompaction` for
/// this arrangement should not pass `timestamp` before this command.
/// Subsequent commands may arbitrarily compact the arrangements;
/// the dataflow runners are responsible for ensuring that they can
/// correctly answer the `Peek`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Peek<T = mz_repr::Timestamp> {
    /// Target-specific metadata.
    pub target: PeekTarget,
    /// The relation description for the rows returned by this peek, before
    /// applying the [RowSetFinishing] but _after_ applying the given
    /// `map_filter_project`.
    pub result_desc: RelationDesc,
    /// If `Some`, then look up only the given keys from the collection (instead of a full scan).
    /// The vector is never empty.
    pub literal_constraints: Option<Vec<Row>>,
    /// The identifier of this peek request.
    ///
    /// Used in responses and cancellation requests.
    pub uuid: Uuid,
    /// The logical timestamp at which the collection is queried.
    pub timestamp: T,
    /// Actions to apply to the result set before returning them.
    pub finishing: RowSetFinishing,
    /// Linear operation to apply in-line on each result.
    pub map_filter_project: mz_expr::SafeMfpPlan,
    /// An `OpenTelemetryContext` to forward trace information along
    /// to the compute worker to allow associating traces between
    /// the compute controller and the compute worker.
    pub otel_ctx: OpenTelemetryContext,
}

impl TryIntoProtocolNonce for ComputeCommand {
    fn try_into_protocol_nonce(self) -> Result<Uuid, Self> {
        match self {
            ComputeCommand::Hello { nonce } => Ok(nonce),
            cmd => Err(cmd),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Test to ensure the size of the `ComputeCommand` enum doesn't regress.
    #[mz_ore::test]
    fn test_compute_command_size() {
        assert_eq!(std::mem::size_of::<ComputeCommand>(), 40);
    }
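
    /// Illustrative sketch: `try_into_protocol_nonce` yields the nonce for `Hello`
    /// and returns any other command unchanged.
    #[mz_ore::test]
    fn test_try_into_protocol_nonce() {
        let nonce = Uuid::from_u128(42);
        let cmd: ComputeCommand = ComputeCommand::Hello { nonce };
        assert_eq!(cmd.try_into_protocol_nonce(), Ok(nonce));

        let cmd: ComputeCommand = ComputeCommand::InitializationComplete;
        assert_eq!(
            cmd.try_into_protocol_nonce(),
            Err(ComputeCommand::InitializationComplete)
        );
    }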
}