mz_compute_client/protocol/command.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Compute protocol commands.

use std::time::Duration;

use mz_cluster_client::client::TryIntoProtocolNonce;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_dyncfg::ConfigUpdates;
use mz_expr::RowSetFinishing;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_types::PersistLocation;
use mz_repr::{GlobalId, RelationDesc, Row};
use mz_service::params::GrpcClientParameters;
use mz_storage_types::controller::CollectionMetadata;
use mz_tracing::params::TracingParameters;
use serde::{Deserialize, Serialize};
use timely::progress::frontier::Antichain;
use uuid::Uuid;

use crate::logging::LoggingConfig;

/// Compute protocol commands, sent by the compute controller to replicas.
///
/// Command sequences sent by the compute controller must be valid according to the [Protocol
/// Stages].
///
/// [Protocol Stages]: super#protocol-stages
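///
/// The following is an illustrative sketch, not a verbatim controller implementation, of the
/// command order a replica might observe during a single protocol iteration. The `nonce`,
/// `instance_config`, `dataflow`, and `export_id` values are placeholders assumed to exist:
///
/// ```ignore
/// let commands = vec![
///     // Connection handshake and instance creation.
///     ComputeCommand::Hello { nonce },
///     ComputeCommand::CreateInstance(Box::new(instance_config)),
///     // Commands replayed for reconciliation would appear here, followed by the end of
///     // the initialization stage.
///     ComputeCommand::InitializationComplete,
///     // Regular operation: install a dataflow and allow it to start computing.
///     ComputeCommand::CreateDataflow(Box::new(dataflow)),
///     ComputeCommand::Schedule(export_id),
///     // Later, read capabilities are relaxed via `AllowCompaction` as readers retire.
/// ];
/// ```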
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ComputeCommand<T = mz_repr::Timestamp> {
    /// `Hello` is the first command sent to a replica after a connection was established. It
    /// provides the replica with meta information about the connection.
    ///
    /// This command is special in that it is broadcast to all workers of a multi-worker replica.
    /// All subsequent commands, except `UpdateConfiguration`, are only sent to the first worker,
    /// which then distributes them to the other workers using a dataflow.
    Hello {
        /// A nonce unique to the current iteration of the compute protocol.
        ///
        /// The nonce allows identifying different iterations of the compute protocol. When the
        /// compute controller connects to a replica, it must send a nonce that is different from
        /// all nonces it sent to the same replica on previous connections. Multi-worker replicas
        /// should use the nonce to ensure that their individual workers agree on which protocol
        /// iteration they are in.
        nonce: Uuid,
    },

    /// `CreateInstance` must be sent after `Hello` to complete the [Creation Stage] of the compute
    /// protocol. Unlike `Hello`, it is only sent to the first worker of the replica, and then
    /// distributed through the timely runtime. `CreateInstance` instructs the replica to
    /// initialize its state to a point where it is ready to start maintaining dataflows.
    ///
    /// Upon receiving a `CreateInstance` command, the replica must further initialize logging
    /// dataflows according to the given [`LoggingConfig`].
    ///
    /// [Creation Stage]: super#creation-stage
    CreateInstance(Box<InstanceConfig>),

    /// `InitializationComplete` informs the replica about the end of the [Initialization Stage].
    /// Upon receiving this command, the replica should perform a reconciliation process, to ensure
    /// its dataflow state matches the state requested by the computation commands it received
    /// previously. The replica must now start sending responses to commands received previously,
    /// if it opted to defer them during the [Initialization Stage].
    ///
    /// [Initialization Stage]: super#initialization-stage
    InitializationComplete,

    /// `UpdateConfiguration` instructs the replica to update its configuration, according to the
    /// given [`ComputeParameters`].
    ///
    /// This command is special in that, like `Hello`, it is broadcast to all workers of the
    /// replica. However, unlike `Hello`, it is ignored by all workers except the first one, which
    /// distributes the command to the other workers through the timely runtime.
    /// `UpdateConfiguration` commands are broadcast only to allow the intermediary parts of the
    /// networking fabric to observe them and learn of configuration updates.
    ///
    /// Parameter updates transmitted through this command must be applied by the replica as soon
    /// as it receives the command, and they must be applied globally to all replica state, even
    /// dataflows and pending peeks that were created before the parameter update. This property
    /// allows the replica to hoist `UpdateConfiguration` commands during reconciliation.
    ///
    /// Configuration parameters that should not be applied globally, but only to specific
    /// dataflows or peeks, should be added to the [`DataflowDescription`] or [`Peek`] types,
    /// rather than as [`ComputeParameters`].
    UpdateConfiguration(Box<ComputeParameters>),

    /// `CreateDataflow` instructs the replica to create a dataflow according to the given
    /// [`DataflowDescription`].
    ///
    /// The [`DataflowDescription`] must have the following properties:
    ///
    /// * Dataflow imports are valid:
    ///   * Imported storage collections specified in [`source_imports`] exist and are readable by
    ///     the compute replica.
    ///   * Imported indexes specified in [`index_imports`] have been created on the replica
    ///     previously, by previous `CreateDataflow` commands.
    /// * Dataflow imports are readable at the specified [`as_of`]. In other words: The `since`s of
    ///   imported collections are not beyond the dataflow [`as_of`].
    /// * Dataflow exports have unique IDs, i.e., the IDs of exports from dataflows a replica is
    ///   instructed to create do not repeat (within a single protocol iteration).
    /// * The dataflow objects defined in [`objects_to_build`] are topologically ordered according
    ///   to the dependency relation.
    ///
    /// A dataflow description that violates any of the above properties can cause the replica to
    /// exhibit undefined behavior, such as panicking or production of incorrect results. A replica
    /// should prefer panicking over producing incorrect results.
    ///
    /// After receiving a `CreateDataflow` command, if the created dataflow exports indexes or
    /// storage sinks, the replica must produce [`Frontiers`] responses that report the
    /// advancement of the frontiers of these compute collections.
    ///
    /// After receiving a `CreateDataflow` command, if the created dataflow exports subscribes, the
    /// replica must produce [`SubscribeResponse`]s that report the progress and results of the
    /// subscribes.
    ///
    /// After receiving a `CreateDataflow` command, if the created dataflow exports copy-to sinks,
    /// the replica must produce [`CopyToResponse`]s that report the results and completion of the
    /// copy-to sinks.
    ///
    /// After receiving a `CreateDataflow` command, if the created dataflow exports a persist sink
    /// (materialized view), the dataflow must not write any data to the sink until the replica
    /// receives an `AllowWrites` command for the corresponding collection.
    ///
    /// The replica may create the dataflow in a suspended state and defer starting the computation
    /// until it receives a corresponding `Schedule` command. Thus, to ensure dataflow execution,
    /// the compute controller should eventually send a `Schedule` command for each sent
    /// `CreateDataflow` command.
    ///
    /// [`objects_to_build`]: DataflowDescription::objects_to_build
    /// [`source_imports`]: DataflowDescription::source_imports
    /// [`index_imports`]: DataflowDescription::index_imports
    /// [`as_of`]: DataflowDescription::as_of
    /// [`Frontiers`]: super::response::ComputeResponse::Frontiers
    /// [`SubscribeResponse`]: super::response::ComputeResponse::SubscribeResponse
    /// [`CopyToResponse`]: super::response::ComputeResponse::CopyToResponse
    CreateDataflow(Box<DataflowDescription<RenderPlan<T>, CollectionMetadata, T>>),

    /// `Schedule` allows the replica to start computation for a compute collection.
    ///
    /// It is invalid to send a `Schedule` command that references a collection that was not
    /// created by a corresponding `CreateDataflow` command before. Doing so may cause the replica
    /// to exhibit undefined behavior.
    ///
    /// It is also invalid to send a `Schedule` command that references a collection that has,
    /// through an `AllowCompaction` command, been allowed to compact to the empty frontier before.
    Schedule(GlobalId),

    /// `AllowWrites` allows the replica to start writing external state for a compute collection.
    ///
    /// It is invalid to send an `AllowWrites` command that references a compute collection
    /// that was not created by a corresponding `CreateDataflow` command before. Doing so may cause
    /// the replica to exhibit undefined behavior.
    ///
    /// It is also invalid to send an `AllowWrites` command for a compute collection that has
    /// been allowed to compact to the empty frontier through an `AllowCompaction` command before.
    ///
    /// NOTE: This command only allows writes to persist. Other external state, such as copy-to
    /// sinks, does not need explicit permission to write.
    ///
    /// NOTE: We don't have a protocol in place that allows writes only after a
    /// certain, controller-determined, timestamp. Such a protocol would allow
    /// tighter control and could allow the instance to avoid work. However, it
    /// is more work to put in place the logic for that, so we leave it as future
    /// work for now.
    ///
    /// Note: The `AllowWrites` command is per collection, but a dataflow could host multiple
    /// collections. We're accepting this impedance mismatch for now.
    AllowWrites(GlobalId),

    /// `AllowCompaction` informs the replica about the relaxation of external read capabilities on
    /// a compute collection exported by one of the replica's dataflows.
    ///
    /// The command names a collection and provides a frontier after which accumulations must be
    /// correct. The replica gains the liberty of compacting the corresponding maintained trace up
    /// through that frontier.
    ///
    /// It is invalid to send an `AllowCompaction` command that references a compute collection
    /// that was not created by a corresponding `CreateDataflow` command before. Doing so may cause
    /// the replica to exhibit undefined behavior.
    ///
    /// The `AllowCompaction` command only informs about external read requirements, not internal
    /// ones. The replica is responsible for ensuring that internal requirements are fulfilled at
    /// all times, so local dataflow inputs are not compacted beyond times at which they are still
    /// being read from.
    ///
    /// The read frontiers transmitted through `AllowCompaction`s may be beyond the corresponding
    /// collections' current `upper` frontiers. This signals that external readers are not
    /// interested in times up to the specified new read frontiers. Consequently, an empty read
    /// frontier signals that external readers are not interested in updates from the corresponding
    /// collection ever again, so the collection is not required anymore.
    ///
    /// Sending an `AllowCompaction` command with the empty frontier is the canonical way to drop
    /// compute collections.
    ///
    /// A replica that receives an `AllowCompaction` command with the empty frontier must
    /// eventually respond with [`Frontiers`] responses reporting empty frontiers for the
    /// same collection. ([#16271])
    ///
    /// [`Frontiers`]: super::response::ComputeResponse::Frontiers
    /// [#16271]: https://github.com/MaterializeInc/database-issues/issues/4699
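    ///
    /// An illustrative sketch of dropping a collection this way, where `id` is a placeholder
    /// for the collection's [`GlobalId`]:
    ///
    /// ```ignore
    /// let drop_collection = ComputeCommand::AllowCompaction {
    ///     id,
    ///     frontier: Antichain::new(), // the empty frontier: no further reads expected
    /// };
    /// ```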
    AllowCompaction {
        /// The id of the compute collection whose read capabilities are relaxed.
        id: GlobalId,
        /// The frontier after which accumulations must be correct.
        frontier: Antichain<T>,
    },

    /// `Peek` instructs the replica to perform a peek on a collection: either an index or a
    /// Persist-backed collection.
    ///
    /// The [`Peek`] description must have the following properties:
    ///
    /// * If targeting an index, it has previously been created by a corresponding `CreateDataflow`
    ///   command. (If targeting a persist collection, that collection should exist.)
    /// * The [`Peek::uuid`] is unique, i.e., the UUIDs of peeks a replica gets instructed to
    ///   perform do not repeat (within a single protocol iteration).
    ///
    /// A [`Peek`] description that violates any of the above properties can cause the replica to
    /// exhibit undefined behavior.
    ///
    /// Specifying a [`Peek::timestamp`] that is less than the target index's `since` frontier does
    /// not provoke undefined behavior. Instead, the replica must produce a [`PeekResponse::Error`]
    /// in response.
    ///
    /// After receiving a `Peek` command, the replica must eventually produce a single
    /// [`PeekResponse`]:
    ///
    /// * For peeks that were not cancelled: either [`Rows`] or [`Error`].
    /// * For peeks that were cancelled: either [`Rows`], or [`Error`], or [`Canceled`].
    ///
    /// [`PeekResponse`]: super::response::PeekResponse
    /// [`PeekResponse::Error`]: super::response::PeekResponse::Error
    /// [`Rows`]: super::response::PeekResponse::Rows
    /// [`Error`]: super::response::PeekResponse::Error
    /// [`Canceled`]: super::response::PeekResponse::Canceled
    Peek(Box<Peek<T>>),

    /// `CancelPeek` instructs the replica to cancel the identified pending peek.
    ///
    /// It is invalid to send a `CancelPeek` command that references a peek that was not created
    /// by a corresponding `Peek` command before. Doing so may cause the replica to exhibit
    /// undefined behavior.
    ///
    /// If a replica cancels a peek in response to a `CancelPeek` command, it must respond with a
    /// [`PeekResponse::Canceled`]. The replica may also decide to fulfill the peek instead and
    /// return a different [`PeekResponse`], or it may already have returned a response to the
    /// specified peek. In these cases it must *not* return another [`PeekResponse`].
    ///
    /// [`PeekResponse`]: super::response::PeekResponse
    /// [`PeekResponse::Canceled`]: super::response::PeekResponse::Canceled
    CancelPeek {
        /// The identifier of the peek request to cancel.
        ///
        /// This value must match a [`Peek::uuid`] value transmitted in a previous `Peek` command.
        uuid: Uuid,
    },
}

/// Configuration for a replica, passed with the `CreateInstance` command. Replicas should halt
/// if the controller attempts to reconcile them with different values
/// for anything in this struct.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct InstanceConfig {
    /// Specification of introspection logging.
    pub logging: LoggingConfig,
    /// The offset relative to the replica startup at which it should expire. `None` disables the
    /// feature.
    pub expiration_offset: Option<Duration>,
    /// The persist location where we can stash large peek results.
    pub peek_stash_persist_location: PersistLocation,
}

impl InstanceConfig {
    /// Check if the configuration is compatible with another configuration. This is true iff the
    /// logging configuration and the peek stash persist location are equivalent, and the other
    /// configuration (non-strictly) strengthens the expiration offset.
    ///
    /// We consider a stricter offset compatible, which allows us to strengthen the value without
    /// forcing replica restarts. However, it also means that replicas will only pick up the new
    /// value after a restart.
    pub fn compatible_with(&self, other: &InstanceConfig) -> bool {
        // Destructure to protect against adding fields in the future.
        let InstanceConfig {
            logging: self_logging,
            expiration_offset: self_offset,
            peek_stash_persist_location: self_peek_stash_persist_location,
        } = self;
        let InstanceConfig {
            logging: other_logging,
            expiration_offset: other_offset,
            peek_stash_persist_location: other_peek_stash_persist_location,
        } = other;

        // Logging is compatible if exactly the same.
        let logging_compatible = self_logging == other_logging;

        // The offsets are compatible if `other_offset` is less than or equal to `self_offset`,
        // i.e., it is a smaller offset and strengthens the existing one.
        let self_offset = Antichain::from_iter(*self_offset);
        let other_offset = Antichain::from_iter(*other_offset);
        let offset_compatible = timely::PartialOrder::less_equal(&other_offset, &self_offset);

        let persist_location_compatible =
            self_peek_stash_persist_location == other_peek_stash_persist_location;

        logging_compatible && offset_compatible && persist_location_compatible
    }
}

/// Compute instance configuration parameters.
///
/// Parameters can be set (`Some`) or unset (`None`).
/// Unset parameters should be interpreted to mean "use the previous value".
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ComputeParameters {
    /// An optional arbitrary string that describes the class of the workload
    /// this compute instance is running (e.g., `production` or `staging`).
    ///
    /// When `Some(x)`, a `workload_class=x` label is applied to all metrics
    /// exported by the metrics registry associated with the compute instance.
    pub workload_class: Option<Option<String>>,
    /// The maximum allowed size in bytes for results of peeks and subscribes.
    ///
    /// Peeks and subscribes that would return results larger than this maximum return the
    /// respective error responses instead:
    /// * [`PeekResponse::Rows`] is replaced by [`PeekResponse::Error`].
    /// * The [`SubscribeBatch::updates`] field is populated with an [`Err`] value.
    ///
    /// [`PeekResponse::Rows`]: super::response::PeekResponse::Rows
    /// [`PeekResponse::Error`]: super::response::PeekResponse::Error
    /// [`SubscribeBatch::updates`]: super::response::SubscribeBatch::updates
    pub max_result_size: Option<u64>,
    /// Tracing configuration.
    pub tracing: TracingParameters,
    /// gRPC client configuration.
    pub grpc_client: GrpcClientParameters,

    /// Config updates for components migrated to `mz_dyncfg`.
    pub dyncfg_updates: ConfigUpdates,
}

impl ComputeParameters {
    /// Update the parameter values with the set ones from `other`.
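    ///
    /// An illustrative sketch of the merge semantics (not compiled here); only parameters that
    /// are set in `other` override the existing values:
    ///
    /// ```ignore
    /// let mut params = ComputeParameters {
    ///     max_result_size: Some(1024),
    ///     ..Default::default()
    /// };
    /// params.update(ComputeParameters {
    ///     workload_class: Some(Some("staging".to_string())),
    ///     ..Default::default()
    /// });
    /// // `max_result_size` was unset in `other`, so the previous value is kept.
    /// assert_eq!(params.max_result_size, Some(1024));
    /// assert_eq!(params.workload_class, Some(Some("staging".to_string())));
    /// ```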
    pub fn update(&mut self, other: ComputeParameters) {
        let ComputeParameters {
            workload_class,
            max_result_size,
            tracing,
            grpc_client,
            dyncfg_updates,
        } = other;

        if workload_class.is_some() {
            self.workload_class = workload_class;
        }
        if max_result_size.is_some() {
            self.max_result_size = max_result_size;
        }

        self.tracing.update(tracing);
        self.grpc_client.update(grpc_client);

        self.dyncfg_updates.extend(dyncfg_updates);
    }

    /// Return whether all parameters are unset.
    pub fn all_unset(&self) -> bool {
        *self == Self::default()
    }
}

/// Metadata specific to the peek variant.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum PeekTarget {
    /// This peek is against an index. Since this should be held in memory on
    /// the target cluster, no additional coordinates are necessary.
    Index {
        /// The id of the (possibly transient) index.
        id: GlobalId,
    },
    /// This peek is against a Persist collection.
    Persist {
        /// The id of the backing Persist collection.
        id: GlobalId,
        /// The identifying metadata of the Persist shard.
        metadata: CollectionMetadata,
    },
}

impl PeekTarget {
    /// Returns the ID of the peeked collection.
    pub fn id(&self) -> GlobalId {
        match self {
            Self::Index { id } => *id,
            Self::Persist { id, .. } => *id,
        }
    }
}

/// Peek a collection, either in an arrangement or Persist.
///
/// This request elicits data from the worker, by naming the
/// collection and some actions to apply to the results before
/// returning them.
///
/// The `timestamp` member must be valid for the collection that
/// is referenced by `target`. This means that `AllowCompaction` for
/// this collection should not pass `timestamp` before this command.
/// Subsequent commands may arbitrarily compact the arrangements;
/// the dataflow runners are responsible for ensuring that they can
/// correctly answer the `Peek`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Peek<T = mz_repr::Timestamp> {
    /// Target-specific metadata.
    pub target: PeekTarget,
    /// The relation description for the rows returned by this peek, before
    /// applying the [RowSetFinishing] but _after_ applying the given
    /// `map_filter_project`.
    pub result_desc: RelationDesc,
    /// If `Some`, then look up only the given keys from the collection (instead of a full scan).
    /// The vector is never empty.
    pub literal_constraints: Option<Vec<Row>>,
    /// The identifier of this peek request.
    ///
    /// Used in responses and cancellation requests.
    pub uuid: Uuid,
    /// The logical timestamp at which the collection is queried.
    pub timestamp: T,
    /// Actions to apply to the result set before returning them.
    pub finishing: RowSetFinishing,
    /// Linear operation to apply in-line on each result.
    pub map_filter_project: mz_expr::SafeMfpPlan,
    /// An `OpenTelemetryContext` to forward trace information along
    /// to the compute worker to allow associating traces between
    /// the compute controller and the compute worker.
    pub otel_ctx: OpenTelemetryContext,
}

impl TryIntoProtocolNonce for ComputeCommand {
    fn try_into_protocol_nonce(self) -> Result<Uuid, Self> {
        match self {
            ComputeCommand::Hello { nonce } => Ok(nonce),
            cmd => Err(cmd),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Test to ensure the size of the `ComputeCommand` enum doesn't regress.
    #[mz_ore::test]
    fn test_compute_command_size() {
        assert_eq!(std::mem::size_of::<ComputeCommand>(), 40);
    }
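
    /// Illustrative check of the `Antichain`-based ordering that
    /// `InstanceConfig::compatible_with` uses for `expiration_offset`: a stricter (smaller)
    /// offset is compatible, while relaxing or removing an offset is not. The concrete
    /// durations are arbitrary example values.
    #[mz_ore::test]
    fn test_expiration_offset_compatibility_order() {
        let offset = |secs| Antichain::from_iter(Some(Duration::from_secs(secs)));
        let unset = Antichain::<Duration>::new();

        // Strengthening the offset (smaller value) is compatible.
        assert!(timely::PartialOrder::less_equal(&offset(30), &offset(60)));
        // Relaxing the offset (larger value) is not.
        assert!(!timely::PartialOrder::less_equal(&offset(90), &offset(60)));
        // Setting an offset where none was set before is compatible.
        assert!(timely::PartialOrder::less_equal(&offset(60), &unset));
        // Removing a previously set offset is not.
        assert!(!timely::PartialOrder::less_equal(&unset, &offset(60)));
    }

    /// Check that only `Hello` carries the protocol nonce, as assumed by
    /// `TryIntoProtocolNonce`.
    #[mz_ore::test]
    fn test_try_into_protocol_nonce() {
        let nonce = Uuid::from_u128(42);
        let hello: ComputeCommand = ComputeCommand::Hello { nonce };
        assert_eq!(hello.try_into_protocol_nonce(), Ok(nonce));

        let other: ComputeCommand = ComputeCommand::InitializationComplete;
        assert_eq!(
            other.try_into_protocol_nonce(),
            Err(ComputeCommand::InitializationComplete)
        );
    }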
}