mz_compute_client/protocol/command.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Compute protocol commands.

use std::time::Duration;

use mz_cluster_client::client::TryIntoProtocolNonce;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_dyncfg::ConfigUpdates;
use mz_expr::RowSetFinishing;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_types::PersistLocation;
use mz_repr::{GlobalId, RelationDesc, Row};
use mz_service::params::GrpcClientParameters;
use mz_storage_types::controller::CollectionMetadata;
use mz_tracing::params::TracingParameters;
use serde::{Deserialize, Serialize};
use timely::progress::frontier::Antichain;
use uuid::Uuid;

use crate::logging::LoggingConfig;

/// Compute protocol commands, sent by the compute controller to replicas.
///
/// Command sequences sent by the compute controller must be valid according to the [Protocol
/// Stages].
///
/// [Protocol Stages]: super#protocol-stages
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ComputeCommand<T = mz_repr::Timestamp> {
    /// `Hello` is the first command sent to a replica after a connection was established. It
    /// provides the replica with meta information about the connection.
    ///
    /// This command is special in that it is broadcast to all workers of a multi-worker replica.
    /// All subsequent commands, except `UpdateConfiguration`, are only sent to the first worker,
    /// which then distributes them to the other workers using a dataflow.
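    ///
    /// A minimal sketch of the start of the handshake, assuming a hypothetical
    /// controller-side `client` with a `send` method:
    ///
    /// ```ignore
    /// let nonce = Uuid::new_v4();
    /// client.send(ComputeCommand::Hello { nonce });
    /// ```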
    Hello {
        /// A nonce unique to the current iteration of the compute protocol.
        ///
        /// The nonce allows identifying different iterations of the compute protocol. When the
        /// compute controller connects to a replica, it must send a nonce that is different from
        /// all nonces it sent to the same replica on previous connections. Multi-worker replicas
        /// should use the nonce to ensure that their individual workers agree on which protocol
        /// iteration they are in.
        nonce: Uuid,
    },

    /// `CreateInstance` must be sent after `Hello` to complete the [Creation Stage] of the compute
    /// protocol. Unlike `Hello`, it is only sent to the first worker of the replica, and then
    /// distributed through the timely runtime. `CreateInstance` instructs the replica to
    /// initialize its state to a point where it is ready to start maintaining dataflows.
    ///
    /// Upon receiving a `CreateInstance` command, the replica must further initialize logging
    /// dataflows according to the given [`LoggingConfig`].
    ///
    /// [Creation Stage]: super#creation-stage
    CreateInstance(Box<InstanceConfig>),

    /// `InitializationComplete` informs the replica about the end of the [Initialization Stage].
    /// Upon receiving this command, the replica should perform a reconciliation process, to ensure
    /// its dataflow state matches the state requested by the computation commands it received
    /// previously. The replica must now start sending responses to commands received previously,
    /// if it opted to defer them during the [Initialization Stage].
    ///
    /// [Initialization Stage]: super#initialization-stage
    InitializationComplete,

    /// `UpdateConfiguration` instructs the replica to update its configuration, according to the
    /// given [`ComputeParameters`].
    ///
    /// This command is special in that, like `Hello`, it is broadcast to all workers of the
    /// replica. However, unlike `Hello`, it is ignored by all workers except the first one, which
    /// distributes the command to the other workers through the timely runtime.
    /// `UpdateConfiguration` commands are broadcast only to allow the intermediary parts of the
    /// networking fabric to observe them and learn of configuration updates.
    ///
    /// Parameter updates transmitted through this command must be applied by the replica as soon
    /// as it receives the command, and they must be applied globally to all replica state, even
    /// dataflows and pending peeks that were created before the parameter update. This property
    /// allows the replica to hoist `UpdateConfiguration` commands during reconciliation.
    ///
    /// Configuration parameters that should not be applied globally, but only to specific
    /// dataflows or peeks, should be added to the [`DataflowDescription`] or [`Peek`] types,
    /// rather than as [`ComputeParameters`].
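    ///
    /// A minimal sketch of transmitting a parameter update, again assuming a
    /// hypothetical controller-side `client`:
    ///
    /// ```ignore
    /// let params = ComputeParameters {
    ///     max_result_size: Some(1 << 30), // set this parameter ...
    ///     ..Default::default()            // ... and leave all others unset
    /// };
    /// client.send(ComputeCommand::UpdateConfiguration(Box::new(params)));
    /// ```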
    UpdateConfiguration(Box<ComputeParameters>),

    /// `CreateDataflow` instructs the replica to create a dataflow according to the given
    /// [`DataflowDescription`].
    ///
    /// The [`DataflowDescription`] must have the following properties:
    ///
    ///   * Dataflow imports are valid:
    ///     * Imported storage collections specified in [`source_imports`] exist and are readable by
    ///       the compute replica.
    ///     * Imported indexes specified in [`index_imports`] have been created on the replica
    ///       previously, by previous `CreateDataflow` commands.
    ///   * Dataflow imports are readable at the specified [`as_of`]. In other words: The `since`s of
    ///     imported collections are not beyond the dataflow [`as_of`].
    ///   * Dataflow exports have unique IDs, i.e., the IDs of exports from dataflows a replica is
    ///     instructed to create do not repeat (within a single protocol iteration).
    ///   * The dataflow objects defined in [`objects_to_build`] are topologically ordered according
    ///     to the dependency relation.
    ///
    /// A dataflow description that violates any of the above properties can cause the replica to
    /// exhibit undefined behavior, such as panicking or production of incorrect results. A replica
    /// should prefer panicking over producing incorrect results.
    ///
    /// After receiving a `CreateDataflow` command, if the created dataflow exports indexes or
    /// storage sinks, the replica must produce [`Frontiers`] responses that report the
    /// advancement of the frontiers of these compute collections.
    ///
    /// After receiving a `CreateDataflow` command, if the created dataflow exports subscribes, the
    /// replica must produce [`SubscribeResponse`]s that report the progress and results of the
    /// subscribes.
    ///
    /// After receiving a `CreateDataflow` command, if the created dataflow exports copy-to sinks,
    /// the replica must produce [`CopyToResponse`]s that report the results and completion of the
    /// copy-to sinks.
    ///
    /// After receiving a `CreateDataflow` command, if the created dataflow exports a persist sink
    /// (materialized view), the dataflow must not write any data to the sink until the replica
    /// receives an `AllowWrites` command for the corresponding collection.
    ///
    /// The replica may create the dataflow in a suspended state and defer starting the computation
    /// until it receives a corresponding `Schedule` command. Thus, to ensure dataflow execution,
    /// the compute controller should eventually send a `Schedule` command for each sent
    /// `CreateDataflow` command.
    ///
    /// [`objects_to_build`]: DataflowDescription::objects_to_build
    /// [`source_imports`]: DataflowDescription::source_imports
    /// [`index_imports`]: DataflowDescription::index_imports
    /// [`as_of`]: DataflowDescription::as_of
    /// [`Frontiers`]: super::response::ComputeResponse::Frontiers
    /// [`SubscribeResponse`]: super::response::ComputeResponse::SubscribeResponse
    /// [`CopyToResponse`]: super::response::ComputeResponse::CopyToResponse
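    ///
    /// A minimal sketch of the create/schedule pairing described above,
    /// assuming a hypothetical controller-side `client`, a prebuilt `dataflow`
    /// description, and an `export_id` naming one of its exports:
    ///
    /// ```ignore
    /// client.send(ComputeCommand::CreateDataflow(Box::new(dataflow)));
    /// // The replica may keep the dataflow suspended until it is scheduled.
    /// client.send(ComputeCommand::Schedule(export_id));
    /// ```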
    CreateDataflow(Box<DataflowDescription<RenderPlan<T>, CollectionMetadata, T>>),

    /// `Schedule` allows the replica to start computation for a compute collection.
    ///
    /// It is invalid to send a `Schedule` command that references a collection that was not
    /// created by a corresponding `CreateDataflow` command before. Doing so may cause the replica
    /// to exhibit undefined behavior.
    ///
    /// It is also invalid to send a `Schedule` command that references a collection that has,
    /// through an `AllowCompaction` command, been allowed to compact to the empty frontier before.
    Schedule(GlobalId),

    /// `AllowWrites` allows the replica to start writing external state for a compute collection.
    ///
    /// It is invalid to send an `AllowWrites` command that references a compute collection
    /// that was not created by a corresponding `CreateDataflow` command before. Doing so may cause
    /// the replica to exhibit undefined behavior.
    ///
    /// It is also invalid to send an `AllowWrites` command for a compute collection that has
    /// been allowed to compact to the empty frontier through an `AllowCompaction` command before.
    ///
    /// NOTE: This command only allows writes to persist. Other external state, such as copy-to
    /// sinks, does not need explicit permission to write.
    ///
    /// NOTE: We don't have a protocol in place that allows writes only after a
    /// certain, controller-determined, timestamp. Such a protocol would allow
    /// tighter control and could allow the instance to avoid work. However,
    /// putting the logic for that in place is more work, so we leave it as
    /// future work for now.
    ///
    /// Note: The `AllowWrites` command is per collection, but a dataflow could host multiple
    /// collections. We're accepting this impedance mismatch for now.
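    ///
    /// A minimal sketch of unblocking a materialized view's persist sink,
    /// assuming a hypothetical controller-side `client` and an `mv_id` naming
    /// a previously created persist-sink export:
    ///
    /// ```ignore
    /// client.send(ComputeCommand::AllowWrites(mv_id));
    /// ```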
    AllowWrites(GlobalId),

    /// `AllowCompaction` informs the replica about the relaxation of external read capabilities on
    /// a compute collection exported by one of the replica's dataflows.
    ///
    /// The command names a collection and provides a frontier after which accumulations must be
    /// correct. The replica gains the liberty of compacting the corresponding maintained trace up
    /// through that frontier.
    ///
    /// It is invalid to send an `AllowCompaction` command that references a compute collection
    /// that was not created by a corresponding `CreateDataflow` command before. Doing so may cause
    /// the replica to exhibit undefined behavior.
    ///
    /// The `AllowCompaction` command only informs about external read requirements, not internal
    /// ones. The replica is responsible for ensuring that internal requirements are fulfilled at
    /// all times, so local dataflow inputs are not compacted beyond times at which they are still
    /// being read from.
    ///
    /// The read frontiers transmitted through `AllowCompaction`s may be beyond the corresponding
    /// collections' current `upper` frontiers. This signals that external readers are not
    /// interested in times up to the specified new read frontiers. Consequently, an empty read
    /// frontier signals that external readers are not interested in updates from the corresponding
    /// collection ever again, so the collection is not required anymore.
    ///
    /// Sending an `AllowCompaction` command with the empty frontier is the canonical way to drop
    /// compute collections.
    ///
    /// A replica that receives an `AllowCompaction` command with the empty frontier must
    /// eventually respond with [`Frontiers`] responses reporting empty frontiers for the
    /// same collection. ([#16271])
    ///
    /// [`Frontiers`]: super::response::ComputeResponse::Frontiers
    /// [#16271]: https://github.com/MaterializeInc/database-issues/issues/4699
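    ///
    /// A minimal sketch of dropping a collection by allowing compaction to the
    /// empty frontier, assuming a hypothetical controller-side `client`:
    ///
    /// ```ignore
    /// client.send(ComputeCommand::AllowCompaction {
    ///     id: collection_id,
    ///     frontier: Antichain::new(), // the empty frontier
    /// });
    /// ```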
    AllowCompaction {
        /// The identifier of the compute collection whose read capabilities are relaxed.
        id: GlobalId,
        /// The new external read frontier of the collection.
        frontier: Antichain<T>,
    },

    /// `Peek` instructs the replica to perform a peek on a collection: either an index or a
    /// Persist-backed collection.
    ///
    /// The [`Peek`] description must have the following properties:
    ///
    ///   * If targeting an index, it has previously been created by a corresponding `CreateDataflow`
    ///     command. (If targeting a persist collection, that collection should exist.)
    ///   * The [`Peek::uuid`] is unique, i.e., the UUIDs of peeks a replica gets instructed to
    ///     perform do not repeat (within a single protocol iteration).
    ///
    /// A [`Peek`] description that violates any of the above properties can cause the replica to
    /// exhibit undefined behavior.
    ///
    /// Specifying a [`Peek::timestamp`] that is less than the target index's `since` frontier does
    /// not provoke undefined behavior. Instead, the replica must produce a [`PeekResponse::Error`]
    /// in response.
    ///
    /// After receiving a `Peek` command, the replica must eventually produce a single
    /// [`PeekResponse`]:
    ///
    ///    * For peeks that were not cancelled: either [`Rows`] or [`Error`].
    ///    * For peeks that were cancelled: either [`Rows`], or [`Error`], or [`Canceled`].
    ///
    /// [`PeekResponse`]: super::response::PeekResponse
    /// [`PeekResponse::Error`]: super::response::PeekResponse::Error
    /// [`Rows`]: super::response::PeekResponse::Rows
    /// [`Error`]: super::response::PeekResponse::Error
    /// [`Canceled`]: super::response::PeekResponse::Canceled
    Peek(Box<Peek<T>>),

    /// `CancelPeek` instructs the replica to cancel the identified pending peek.
    ///
    /// It is invalid to send a `CancelPeek` command that references a peek that was not created
    /// by a corresponding `Peek` command before. Doing so may cause the replica to exhibit
    /// undefined behavior.
    ///
    /// If a replica cancels a peek in response to a `CancelPeek` command, it must respond with a
    /// [`PeekResponse::Canceled`]. The replica may also decide to fulfill the peek instead and
    /// return a different [`PeekResponse`], or it may already have returned a response to the
    /// specified peek. In these cases it must *not* return another [`PeekResponse`].
    ///
    /// [`PeekResponse`]: super::response::PeekResponse
    /// [`PeekResponse::Canceled`]: super::response::PeekResponse::Canceled
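    ///
    /// A minimal sketch of cancelling a previously issued peek by its UUID,
    /// assuming a hypothetical controller-side `client` and a prebuilt `peek`:
    ///
    /// ```ignore
    /// let uuid = peek.uuid;
    /// client.send(ComputeCommand::Peek(Box::new(peek)));
    /// // Later: give up on the peek. The replica responds with `Canceled`,
    /// // `Rows`, or `Error`, but only ever with a single response.
    /// client.send(ComputeCommand::CancelPeek { uuid });
    /// ```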
    CancelPeek {
        /// The identifier of the peek request to cancel.
        ///
        /// This value must match a [`Peek::uuid`] value transmitted in a previous `Peek` command.
        uuid: Uuid,
    },
}

/// Configuration for a replica, passed with the `CreateInstance` command. Replicas should halt
/// if the controller attempts to reconcile them with different values
/// for anything in this struct.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct InstanceConfig {
    /// Specification of introspection logging.
    pub logging: LoggingConfig,
    /// The offset, relative to replica startup, at which the replica should expire. `None`
    /// disables the feature.
    pub expiration_offset: Option<Duration>,
    /// The persist location where we can stash large peek results.
    pub peek_stash_persist_location: PersistLocation,
}

impl InstanceConfig {
    /// Check if the configuration is compatible with another configuration. This is true iff the
    /// logging configuration is equivalent, and the other configuration (non-strictly) strengthens
    /// the expiration offset.
    ///
    /// We consider a stricter offset compatible, which allows us to strengthen the value without
    /// forcing replica restarts. However, it also means that replicas will only pick up the new
    /// value after a restart.
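    ///
    /// A minimal sketch of the offset rule, assuming timely's antichain order
    /// (the empty antichain, i.e. `None`, is greater than every non-empty one):
    ///
    /// ```ignore
    /// let old = Antichain::from_iter(Some(Duration::from_secs(600)));
    /// let new = Antichain::from_iter(Some(Duration::from_secs(300)));
    /// // A smaller (stricter) offset is compatible ...
    /// assert!(timely::PartialOrder::less_equal(&new, &old));
    /// // ... but relaxing back to `None` (no expiration) is not.
    /// assert!(!timely::PartialOrder::less_equal(&Antichain::new(), &old));
    /// ```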
    pub fn compatible_with(&self, other: &InstanceConfig) -> bool {
        // Destructure to protect against adding fields in the future.
        let InstanceConfig {
            logging: self_logging,
            expiration_offset: self_offset,
            peek_stash_persist_location: self_peek_stash_persist_location,
        } = self;
        let InstanceConfig {
            logging: other_logging,
            expiration_offset: other_offset,
            peek_stash_persist_location: other_peek_stash_persist_location,
        } = other;

        // Logging is compatible if exactly the same.
        let logging_compatible = self_logging == other_logging;

        // The offsets are compatible if `other_offset` is less than or equal to `self_offset`,
        // i.e., it is a smaller offset and strengthens the expiration.
        let self_offset = Antichain::from_iter(*self_offset);
        let other_offset = Antichain::from_iter(*other_offset);
        let offset_compatible = timely::PartialOrder::less_equal(&other_offset, &self_offset);

        let persist_location_compatible =
            self_peek_stash_persist_location == other_peek_stash_persist_location;

        logging_compatible && offset_compatible && persist_location_compatible
    }
}

/// Compute instance configuration parameters.
///
/// Parameters can be set (`Some`) or unset (`None`).
/// Unset parameters should be interpreted to mean "use the previous value".
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ComputeParameters {
    /// An optional arbitrary string that describes the class of the workload
    /// this compute instance is running (e.g., `production` or `staging`).
    ///
    /// When `Some(x)`, a `workload_class=x` label is applied to all metrics
    /// exported by the metrics registry associated with the compute instance.
    pub workload_class: Option<Option<String>>,
    /// The maximum allowed size in bytes for results of peeks and subscribes.
    ///
    /// Peeks and subscribes that would return results larger than this maximum return the
    /// respective error responses instead:
    ///   * [`PeekResponse::Rows`] is replaced by [`PeekResponse::Error`].
    ///   * The [`SubscribeBatch::updates`] field is populated with an [`Err`] value.
    ///
    /// [`PeekResponse::Rows`]: super::response::PeekResponse::Rows
    /// [`PeekResponse::Error`]: super::response::PeekResponse::Error
    /// [`SubscribeBatch::updates`]: super::response::SubscribeBatch::updates
    pub max_result_size: Option<u64>,
    /// Tracing configuration.
    pub tracing: TracingParameters,
    /// gRPC client configuration.
    pub grpc_client: GrpcClientParameters,

    /// Config updates for components migrated to `mz_dyncfg`.
    pub dyncfg_updates: ConfigUpdates,
}

impl ComputeParameters {
    /// Update the parameter values with the set ones from `other`.
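    ///
    /// A minimal sketch of the merge semantics (unset fields in `other` leave
    /// the previous values in place):
    ///
    /// ```ignore
    /// let mut params = ComputeParameters {
    ///     max_result_size: Some(1 << 20),
    ///     ..Default::default()
    /// };
    /// params.update(ComputeParameters::default()); // everything unset
    /// assert_eq!(params.max_result_size, Some(1 << 20)); // value retained
    /// ```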
    pub fn update(&mut self, other: ComputeParameters) {
        let ComputeParameters {
            workload_class,
            max_result_size,
            tracing,
            grpc_client,
            dyncfg_updates,
        } = other;

        if workload_class.is_some() {
            self.workload_class = workload_class;
        }
        if max_result_size.is_some() {
            self.max_result_size = max_result_size;
        }

        self.tracing.update(tracing);
        self.grpc_client.update(grpc_client);

        self.dyncfg_updates.extend(dyncfg_updates);
    }

    /// Return whether all parameters are unset.
    pub fn all_unset(&self) -> bool {
        *self == Self::default()
    }
}

/// Metadata specific to the peek variant.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum PeekTarget {
    /// This peek is against an index. Since this should be held in memory on
    /// the target cluster, no additional coordinates are necessary.
    Index {
        /// The id of the (possibly transient) index.
        id: GlobalId,
    },
    /// This peek is against a Persist collection.
    Persist {
        /// The id of the backing Persist collection.
        id: GlobalId,
        /// The identifying metadata of the Persist shard.
        metadata: CollectionMetadata,
    },
}

impl PeekTarget {
    /// Returns the ID of the peeked collection.
    pub fn id(&self) -> GlobalId {
        match self {
            Self::Index { id } => *id,
            Self::Persist { id, .. } => *id,
        }
    }
}

/// Peek a collection, either in an arrangement or Persist.
///
/// This request elicits data from the worker, by naming the
/// collection and some actions to apply to the results before
/// returning them.
///
/// The `timestamp` member must be valid for the arrangement that
/// is referenced by `id`. This means that `AllowCompaction` for
/// this arrangement should not pass `timestamp` before this command.
/// Subsequent commands may arbitrarily compact the arrangements;
/// the dataflow runners are responsible for ensuring that they can
/// correctly answer the `Peek`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Peek<T = mz_repr::Timestamp> {
    /// Target-specific metadata.
    pub target: PeekTarget,
    /// The relation description for the rows returned by this peek, before
    /// applying the [RowSetFinishing] but _after_ applying the given
    /// `map_filter_project`.
    pub result_desc: RelationDesc,
    /// If `Some`, then look up only the given keys from the collection (instead of a full scan).
    /// The vector is never empty.
    pub literal_constraints: Option<Vec<Row>>,
    /// The identifier of this peek request.
    ///
    /// Used in responses and cancellation requests.
    pub uuid: Uuid,
    /// The logical timestamp at which the collection is queried.
    pub timestamp: T,
    /// Actions to apply to the result set before returning them.
    pub finishing: RowSetFinishing,
    /// Linear operation to apply in-line on each result.
    pub map_filter_project: mz_expr::SafeMfpPlan,
    /// An `OpenTelemetryContext` to forward trace information along
    /// to the compute worker to allow associating traces between
    /// the compute controller and the compute worker.
    pub otel_ctx: OpenTelemetryContext,
}

impl TryIntoProtocolNonce for ComputeCommand {
    fn try_into_protocol_nonce(self) -> Result<Uuid, Self> {
        match self {
            ComputeCommand::Hello { nonce } => Ok(nonce),
            cmd => Err(cmd),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Test to ensure the size of the `ComputeCommand` enum doesn't regress.
    #[mz_ore::test]
    fn test_compute_command_size() {
        assert_eq!(std::mem::size_of::<ComputeCommand>(), 40);
    }
}