mz_compute_client/protocol/command.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Compute protocol commands.

use std::time::Duration;

use mz_cluster_client::client::TryIntoProtocolNonce;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_dyncfg::ConfigUpdates;
use mz_expr::RowSetFinishing;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_types::PersistLocation;
use mz_repr::{GlobalId, RelationDesc, Row};
use mz_service::params::GrpcClientParameters;
use mz_storage_types::controller::CollectionMetadata;
use mz_tracing::params::TracingParameters;
use serde::{Deserialize, Serialize};
use timely::progress::frontier::Antichain;
use uuid::Uuid;

use crate::logging::LoggingConfig;

/// Compute protocol commands, sent by the compute controller to replicas.
///
/// Command sequences sent by the compute controller must be valid according to the [Protocol
/// Stages].
///
/// [Protocol Stages]: super#protocol-stages
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ComputeCommand<T = mz_repr::Timestamp> {
    /// `Hello` is the first command sent to a replica after a connection was established. It
    /// provides the replica with meta information about the connection.
    ///
    /// This command is special in that it is broadcast to all workers of a multi-worker replica.
    /// All subsequent commands, except `UpdateConfiguration`, are only sent to the first worker,
    /// which then distributes them to the other workers using a dataflow.
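    ///
    /// A hedged sketch of what this command looks like when constructed (the
    /// controller generates the nonce; `Uuid::new_v4()` here is illustrative):
    ///
    /// ```ignore
    /// let hello = ComputeCommand::<mz_repr::Timestamp>::Hello {
    ///     nonce: Uuid::new_v4(),
    /// };
    /// ```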
    Hello {
        /// A nonce unique to the current iteration of the compute protocol.
        ///
        /// The nonce allows identifying different iterations of the compute protocol. When the
        /// compute controller connects to a replica, it must send a nonce that is different from
        /// all nonces it sent to the same replica on previous connections. Multi-worker replicas
        /// should use the nonce to ensure that their individual workers agree on which protocol
        /// iteration they are in.
        nonce: Uuid,
    },

    /// `CreateInstance` must be sent after `Hello` to complete the [Creation Stage] of the compute
    /// protocol. Unlike `Hello`, it is only sent to the first worker of the replica, and then
    /// distributed through the timely runtime. `CreateInstance` instructs the replica to
    /// initialize its state to a point where it is ready to start maintaining dataflows.
    ///
    /// Upon receiving a `CreateInstance` command, the replica must further initialize logging
    /// dataflows according to the given [`LoggingConfig`].
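    ///
    /// A hedged sketch of the command (`logging` and `peek_stash_persist_location`
    /// are assumed to be in scope):
    ///
    /// ```ignore
    /// let cmd = ComputeCommand::CreateInstance(Box::new(InstanceConfig {
    ///     logging,
    ///     expiration_offset: None, // replica expiration disabled
    ///     peek_stash_persist_location,
    /// }));
    /// ```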
    ///
    /// [Creation Stage]: super#creation-stage
    CreateInstance(Box<InstanceConfig>),

    /// `InitializationComplete` informs the replica about the end of the [Initialization Stage].
    /// Upon receiving this command, the replica should perform a reconciliation process, to ensure
    /// its dataflow state matches the state requested by the computation commands it received
    /// previously. The replica must now start sending responses to commands received previously,
    /// if it opted to defer them during the [Initialization Stage].
    ///
    /// [Initialization Stage]: super#initialization-stage
    InitializationComplete,

    /// `AllowWrites` informs the replica that it can transition out of the
    /// read-only computation stage and into the read-write computation stage.
    /// It is now allowed to effect changes to external systems (writes).
    ///
    /// After initialization is complete, an instance starts out in the
    /// read-only computation stage. Only when receiving this command will it
    /// leave that stage and allow running operations to perform writes.
    ///
    /// An instance that has once been told that it can go into read-write mode
    /// can never go out of that mode again. It is okay for a read-only
    /// controller to re-connect to an instance that is already in read-write
    /// mode: _someone_ has already told the instance that it is okay to write
    /// and there is no way in the protocol to transition an instance back to
    /// read-only mode.
    ///
    /// NOTE: We don't have a protocol in place that allows writes only after a
    /// certain, controller-determined, timestamp. Such a protocol would allow
    /// tighter control and could allow the instance to avoid work. However, it
    /// is more work to put in place the logic for that, so we leave it as
    /// future work for now.
    AllowWrites,

    /// `UpdateConfiguration` instructs the replica to update its configuration, according to the
    /// given [`ComputeParameters`].
    ///
    /// This command is special in that, like `Hello`, it is broadcast to all workers of the
    /// replica. However, unlike `Hello`, it is ignored by all workers except the first one, which
    /// distributes the command to the other workers through the timely runtime.
    /// `UpdateConfiguration` commands are broadcast only to allow the intermediary parts of the
    /// networking fabric to observe them and learn of configuration updates.
    ///
    /// Parameter updates transmitted through this command must be applied by the replica as soon
    /// as it receives the command, and they must be applied globally to all replica state, even
    /// dataflows and pending peeks that were created before the parameter update. This property
    /// allows the replica to hoist `UpdateConfiguration` commands during reconciliation.
    ///
    /// Configuration parameters that should not be applied globally, but only to specific
    /// dataflows or peeks, should be added to the [`DataflowDescription`] or [`Peek`] types,
    /// rather than as [`ComputeParameters`].
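    ///
    /// A hedged sketch of issuing a configuration update (`client.send` is
    /// illustrative; unset fields mean "keep the previous value"):
    ///
    /// ```ignore
    /// let mut params = ComputeParameters::default();
    /// params.max_result_size = Some(1 << 30);
    /// client.send(ComputeCommand::UpdateConfiguration(Box::new(params)));
    /// ```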
    UpdateConfiguration(Box<ComputeParameters>),

    /// `CreateDataflow` instructs the replica to create a dataflow according to the given
    /// [`DataflowDescription`].
    ///
    /// The [`DataflowDescription`] must have the following properties:
    ///
    ///   * Dataflow imports are valid:
    ///     * Imported storage collections specified in [`source_imports`] exist and are readable by
    ///       the compute replica.
    ///     * Imported indexes specified in [`index_imports`] have been created on the replica
    ///       previously, by previous `CreateDataflow` commands.
    ///   * Dataflow imports are readable at the specified [`as_of`]. In other words: The `since`s of
    ///     imported collections are not beyond the dataflow [`as_of`].
    ///   * Dataflow exports have unique IDs, i.e., the IDs of exports from dataflows a replica is
    ///     instructed to create do not repeat (within a single protocol iteration).
    ///   * The dataflow objects defined in [`objects_to_build`] are topologically ordered according
    ///     to the dependency relation.
    ///
    /// A dataflow description that violates any of the above properties can cause the replica to
    /// exhibit undefined behavior, such as panicking or production of incorrect results. A replica
    /// should prefer panicking over producing incorrect results.
    ///
    /// After receiving a `CreateDataflow` command, if the created dataflow exports indexes or
    /// storage sinks, the replica must produce [`Frontiers`] responses that report the
    /// advancement of the frontiers of these compute collections.
    ///
    /// After receiving a `CreateDataflow` command, if the created dataflow exports subscribes, the
    /// replica must produce [`SubscribeResponse`]s that report the progress and results of the
    /// subscribes.
    ///
    /// After receiving a `CreateDataflow` command, if the created dataflow exports copy-to sinks,
    /// the replica must produce [`CopyToResponse`]s that report the results and completion of the
    /// copy-to sinks.
    ///
    /// The replica may create the dataflow in a suspended state and defer starting the computation
    /// until it receives a corresponding `Schedule` command. Thus, to ensure dataflow execution,
    /// the compute controller should eventually send a `Schedule` command for each sent
    /// `CreateDataflow` command.
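    ///
    /// A hedged sketch of the expected command pairing (`client.send`,
    /// `dataflow`, and `export_id` are illustrative):
    ///
    /// ```ignore
    /// client.send(ComputeCommand::CreateDataflow(Box::new(dataflow)));
    /// // ... later, allow the possibly suspended dataflow to start computing:
    /// client.send(ComputeCommand::Schedule(export_id));
    /// ```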
    ///
    /// [`objects_to_build`]: DataflowDescription::objects_to_build
    /// [`source_imports`]: DataflowDescription::source_imports
    /// [`index_imports`]: DataflowDescription::index_imports
    /// [`as_of`]: DataflowDescription::as_of
    /// [`Frontiers`]: super::response::ComputeResponse::Frontiers
    /// [`SubscribeResponse`]: super::response::ComputeResponse::SubscribeResponse
    /// [`CopyToResponse`]: super::response::ComputeResponse::CopyToResponse
    CreateDataflow(Box<DataflowDescription<RenderPlan<T>, CollectionMetadata, T>>),

    /// `Schedule` allows the replica to start computation for a compute collection.
    ///
    /// It is invalid to send a `Schedule` command that references a collection that was not
    /// created by a corresponding `CreateDataflow` command before. Doing so may cause the replica
    /// to exhibit undefined behavior.
    ///
    /// It is also invalid to send a `Schedule` command that references a collection that has,
    /// through an `AllowCompaction` command, been allowed to compact to the empty frontier before.
    Schedule(GlobalId),

    /// `AllowCompaction` informs the replica about the relaxation of external read capabilities on
    /// a compute collection exported by one of the replica's dataflows.
    ///
    /// The command names a collection and provides a frontier after which accumulations must be
    /// correct. The replica gains the liberty of compacting the corresponding maintained trace up
    /// through that frontier.
    ///
    /// It is invalid to send an `AllowCompaction` command that references a compute collection
    /// that was not created by a corresponding `CreateDataflow` command before. Doing so may cause
    /// the replica to exhibit undefined behavior.
    ///
    /// The `AllowCompaction` command only informs about external read requirements, not internal
    /// ones. The replica is responsible for ensuring that internal requirements are fulfilled at
    /// all times, so local dataflow inputs are not compacted beyond times at which they are still
    /// being read from.
    ///
    /// The read frontiers transmitted through `AllowCompaction`s may be beyond the corresponding
    /// collections' current `upper` frontiers. This signals that external readers are not
    /// interested in times up to the specified new read frontiers. Consequently, an empty read
    /// frontier signals that external readers are not interested in updates from the corresponding
    /// collection ever again, so the collection is not required anymore.
    ///
    /// Sending an `AllowCompaction` command with the empty frontier is the canonical way to drop
    /// compute collections.
    ///
    /// A replica that receives an `AllowCompaction` command with the empty frontier must
    /// eventually respond with [`Frontiers`] responses reporting empty frontiers for the
    /// same collection. ([#16271])
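    ///
    /// A hedged sketch of dropping a collection by compacting it to the empty
    /// frontier (`client.send` and `collection_id` are illustrative):
    ///
    /// ```ignore
    /// client.send(ComputeCommand::AllowCompaction {
    ///     id: collection_id,
    ///     frontier: Antichain::new(), // the empty frontier drops the collection
    /// });
    /// ```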
    ///
    /// [`Frontiers`]: super::response::ComputeResponse::Frontiers
    /// [#16271]: https://github.com/MaterializeInc/database-issues/issues/4699
    AllowCompaction {
        /// The identifier of the compute collection to compact.
        id: GlobalId,
        /// The frontier up through which the replica is allowed to compact the collection.
        frontier: Antichain<T>,
    },

    /// `Peek` instructs the replica to perform a peek on a collection: either an index or a
    /// Persist-backed collection.
    ///
    /// The [`Peek`] description must have the following properties:
    ///
    ///   * If targeting an index, it has previously been created by a corresponding `CreateDataflow`
    ///     command. (If targeting a Persist collection, that collection should exist.)
    ///   * The [`Peek::uuid`] is unique, i.e., the UUIDs of peeks a replica gets instructed to
    ///     perform do not repeat (within a single protocol iteration).
    ///
    /// A [`Peek`] description that violates any of the above properties can cause the replica to
    /// exhibit undefined behavior.
    ///
    /// Specifying a [`Peek::timestamp`] that is less than the target index's `since` frontier does
    /// not provoke undefined behavior. Instead, the replica must produce a [`PeekResponse::Error`]
    /// in response.
    ///
    /// After receiving a `Peek` command, the replica must eventually produce a single
    /// [`PeekResponse`]:
    ///
    ///    * For peeks that were not cancelled: either [`Rows`] or [`Error`].
    ///    * For peeks that were cancelled: either [`Rows`], or [`Error`], or [`Canceled`].
    ///
    /// [`PeekResponse`]: super::response::PeekResponse
    /// [`PeekResponse::Error`]: super::response::PeekResponse::Error
    /// [`Rows`]: super::response::PeekResponse::Rows
    /// [`Error`]: super::response::PeekResponse::Error
    /// [`Canceled`]: super::response::PeekResponse::Canceled
    Peek(Box<Peek<T>>),

    /// `CancelPeek` instructs the replica to cancel the identified pending peek.
    ///
    /// It is invalid to send a `CancelPeek` command that references a peek that was not created
    /// by a corresponding `Peek` command before. Doing so may cause the replica to exhibit
    /// undefined behavior.
    ///
    /// If a replica cancels a peek in response to a `CancelPeek` command, it must respond with a
    /// [`PeekResponse::Canceled`]. The replica may also decide to fulfill the peek instead and
    /// return a different [`PeekResponse`], or it may already have returned a response to the
    /// specified peek. In these cases it must *not* return another [`PeekResponse`].
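    ///
    /// A hedged sketch of cancelling a previously issued peek (`client.send`
    /// and `peek` are illustrative; the UUIDs must match):
    ///
    /// ```ignore
    /// let uuid = peek.uuid;
    /// client.send(ComputeCommand::Peek(Box::new(peek)));
    /// client.send(ComputeCommand::CancelPeek { uuid });
    /// ```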
    ///
    /// [`PeekResponse`]: super::response::PeekResponse
    /// [`PeekResponse::Canceled`]: super::response::PeekResponse::Canceled
    CancelPeek {
        /// The identifier of the peek request to cancel.
        ///
        /// This value must match a [`Peek::uuid`] value transmitted in a previous `Peek` command.
        uuid: Uuid,
    },
}

/// Configuration for a replica, passed with the `CreateInstance` command. Replicas should halt
/// if the controller attempts to reconcile them with different values
/// for anything in this struct.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct InstanceConfig {
    /// Specification of introspection logging.
    pub logging: LoggingConfig,
    /// The offset relative to the replica startup at which it should expire. `None` disables the
    /// feature.
    pub expiration_offset: Option<Duration>,
    /// The persist location where we can stash large peek results.
    pub peek_stash_persist_location: PersistLocation,
}

impl InstanceConfig {
    /// Check if the configuration is compatible with another configuration. This is true iff the
    /// logging configuration is equivalent, and the other configuration (non-strictly) strengthens
    /// the expiration offset.
    ///
    /// We consider a stricter offset compatible, which allows us to strengthen the value without
    /// forcing replica restarts. However, it also means that replicas will only pick up the new
    /// value after a restart.
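    ///
    /// A hedged sketch of the intended semantics (`config` is illustrative):
    ///
    /// ```ignore
    /// // An otherwise identical configuration with a shorter (stricter)
    /// // expiration offset is considered compatible.
    /// let mut other = config.clone();
    /// other.expiration_offset = Some(Duration::from_secs(3600));
    /// assert!(config.compatible_with(&other)); // assuming config's offset >= 1h
    /// ```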
    pub fn compatible_with(&self, other: &InstanceConfig) -> bool {
        // Destructure to protect against adding fields in the future.
        let InstanceConfig {
            logging: self_logging,
            expiration_offset: self_offset,
            peek_stash_persist_location: self_peek_stash_persist_location,
        } = self;
        let InstanceConfig {
            logging: other_logging,
            expiration_offset: other_offset,
            peek_stash_persist_location: other_peek_stash_persist_location,
        } = other;

        // Logging is compatible if exactly the same.
        let logging_compatible = self_logging == other_logging;

        // The offsets are compatible if other_offset is less than or equal to self_offset, i.e.,
        // it is a smaller offset and strengthens the offset.
        let self_offset = Antichain::from_iter(*self_offset);
        let other_offset = Antichain::from_iter(*other_offset);
        let offset_compatible = timely::PartialOrder::less_equal(&other_offset, &self_offset);

        let persist_location_compatible =
            self_peek_stash_persist_location == other_peek_stash_persist_location;

        logging_compatible && offset_compatible && persist_location_compatible
    }
}

/// Compute instance configuration parameters.
///
/// Parameters can be set (`Some`) or unset (`None`).
/// Unset parameters should be interpreted to mean "use the previous value".
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ComputeParameters {
    /// An optional arbitrary string that describes the class of the workload
    /// this compute instance is running (e.g., `production` or `staging`).
    ///
    /// When `Some(x)`, a `workload_class=x` label is applied to all metrics
    /// exported by the metrics registry associated with the compute instance.
    pub workload_class: Option<Option<String>>,
    /// The maximum allowed size in bytes for results of peeks and subscribes.
    ///
    /// Peeks and subscribes that would return results larger than this maximum return the
    /// respective error responses instead:
    ///   * [`PeekResponse::Rows`] is replaced by [`PeekResponse::Error`].
    ///   * The [`SubscribeBatch::updates`] field is populated with an [`Err`] value.
    ///
    /// [`PeekResponse::Rows`]: super::response::PeekResponse::Rows
    /// [`PeekResponse::Error`]: super::response::PeekResponse::Error
    /// [`SubscribeBatch::updates`]: super::response::SubscribeBatch::updates
    pub max_result_size: Option<u64>,
    /// Tracing configuration.
    pub tracing: TracingParameters,
    /// gRPC client configuration.
    pub grpc_client: GrpcClientParameters,

    /// Config updates for components migrated to `mz_dyncfg`.
    pub dyncfg_updates: ConfigUpdates,
}

impl ComputeParameters {
    /// Update the parameter values with the set ones from `other`.
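    ///
    /// A hedged sketch of the merge semantics (field values illustrative):
    ///
    /// ```ignore
    /// let mut params = ComputeParameters::default();
    /// params.max_result_size = Some(1 << 20);
    ///
    /// let mut update = ComputeParameters::default();
    /// update.workload_class = Some(Some("staging".into()));
    /// params.update(update);
    ///
    /// // Set fields are taken from `update`; unset fields keep their values.
    /// assert_eq!(params.workload_class, Some(Some("staging".into())));
    /// assert_eq!(params.max_result_size, Some(1 << 20));
    /// ```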
    pub fn update(&mut self, other: ComputeParameters) {
        let ComputeParameters {
            workload_class,
            max_result_size,
            tracing,
            grpc_client,
            dyncfg_updates,
        } = other;

        if workload_class.is_some() {
            self.workload_class = workload_class;
        }
        if max_result_size.is_some() {
            self.max_result_size = max_result_size;
        }

        self.tracing.update(tracing);
        self.grpc_client.update(grpc_client);

        self.dyncfg_updates.extend(dyncfg_updates);
    }

    /// Return whether all parameters are unset.
    pub fn all_unset(&self) -> bool {
        *self == Self::default()
    }
}

/// Metadata specific to the peek variant.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum PeekTarget {
    /// This peek is against an index. Since this should be held in memory on
    /// the target cluster, no additional coordinates are necessary.
    Index {
        /// The id of the (possibly transient) index.
        id: GlobalId,
    },
    /// This peek is against a Persist collection.
    Persist {
        /// The id of the backing Persist collection.
        id: GlobalId,
        /// The identifying metadata of the Persist shard.
        metadata: CollectionMetadata,
    },
}

impl PeekTarget {
    /// Returns the ID of the peeked collection.
    pub fn id(&self) -> GlobalId {
        match self {
            Self::Index { id } => *id,
            Self::Persist { id, .. } => *id,
        }
    }
}

/// Peek a collection, either in an arrangement or Persist.
///
/// This request elicits data from the worker, by naming the
/// collection and some actions to apply to the results before
/// returning them.
///
/// The `timestamp` member must be valid for the arrangement that
/// is referenced by `id`. This means that `AllowCompaction` for
/// this arrangement should not pass `timestamp` before this command.
/// Subsequent commands may arbitrarily compact the arrangements;
/// the dataflow runners are responsible for ensuring that they can
/// correctly answer the `Peek`.
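///
/// A hedged sketch of the required ordering between `Peek` and
/// `AllowCompaction` (`client.send`, `peek`, and `id` are illustrative):
///
/// ```ignore
/// // Valid: compaction past the peek timestamp is only allowed after the
/// // peek command has been sent.
/// client.send(ComputeCommand::Peek(Box::new(peek))); // peek.timestamp == 5
/// client.send(ComputeCommand::AllowCompaction {
///     id,
///     frontier: Antichain::from_elem(6.into()),
/// });
/// ```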
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Peek<T = mz_repr::Timestamp> {
    /// Target-specific metadata.
    pub target: PeekTarget,
    /// The relation description for the rows returned by this peek, before
    /// applying the [RowSetFinishing] but _after_ applying the given
    /// `map_filter_project`.
    pub result_desc: RelationDesc,
    /// If `Some`, then look up only the given keys from the collection (instead of a full scan).
    /// The vector is never empty.
    pub literal_constraints: Option<Vec<Row>>,
    /// The identifier of this peek request.
    ///
    /// Used in responses and cancellation requests.
    pub uuid: Uuid,
    /// The logical timestamp at which the collection is queried.
    pub timestamp: T,
    /// Actions to apply to the result set before returning them.
    pub finishing: RowSetFinishing,
    /// Linear operation to apply in-line on each result.
    pub map_filter_project: mz_expr::SafeMfpPlan,
    /// An `OpenTelemetryContext` to forward trace information along
    /// to the compute worker to allow associating traces between
    /// the compute controller and the compute worker.
    pub otel_ctx: OpenTelemetryContext,
}

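// A hedged sketch of how a consumer might split off the protocol nonce; the
// names `start_new_protocol_iteration` and `forward_to_worker` are
// hypothetical. `try_into_protocol_nonce` returns the command back on
// failure, so no commands are lost:
//
//     match cmd.try_into_protocol_nonce() {
//         Ok(nonce) => start_new_protocol_iteration(nonce),
//         Err(cmd) => forward_to_worker(cmd),
//     }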
impl TryIntoProtocolNonce for ComputeCommand {
    fn try_into_protocol_nonce(self) -> Result<Uuid, Self> {
        match self {
            ComputeCommand::Hello { nonce } => Ok(nonce),
            cmd => Err(cmd),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Test to ensure the size of the `ComputeCommand` enum doesn't regress.
    #[mz_ore::test]
    fn test_compute_command_size() {
        assert_eq!(std::mem::size_of::<ComputeCommand>(), 40);
    }
}