// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Compute protocol commands.

use std::str::FromStr;
use std::time::Duration;

use mz_cluster_client::client::TryIntoProtocolNonce;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_dyncfg::ConfigUpdates;
use mz_expr::RowSetFinishing;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::url::SensitiveUrl;
use mz_persist_types::PersistLocation;
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError, any_uuid};
use mz_repr::{GlobalId, RelationDesc, Row};
use mz_service::params::GrpcClientParameters;
use mz_storage_client::client::ProtoCompaction;
use mz_storage_types::controller::CollectionMetadata;
use mz_timely_util::progress::any_antichain;
use mz_tracing::params::TracingParameters;
use proptest::prelude::{Arbitrary, any};
use proptest::strategy::{BoxedStrategy, Strategy, Union};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use timely::progress::frontier::Antichain;
use uuid::Uuid;

use crate::logging::LoggingConfig;

include!(concat!(
    env!("OUT_DIR"),
    "/mz_compute_client.protocol.command.rs"
));

/// Compute protocol commands, sent by the compute controller to replicas.
///
/// Command sequences sent by the compute controller must be valid according to the [Protocol
/// Stages].
///
/// [Protocol Stages]: super#protocol-stages
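///
/// # Example
///
/// A minimal sketch of one protocol iteration, assuming a hypothetical `send`
/// helper that transmits a command to the replica:
///
/// ```text
/// send(ComputeCommand::Hello { nonce });
/// send(ComputeCommand::CreateInstance(config));
/// send(ComputeCommand::InitializationComplete);
/// send(ComputeCommand::CreateDataflow(dataflow));
/// send(ComputeCommand::Schedule(export_id));
/// // ... peeks, compaction, and configuration updates ...
/// send(ComputeCommand::AllowCompaction { id: export_id, frontier: Antichain::new() });
/// ```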
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ComputeCommand<T = mz_repr::Timestamp> {
    /// `Hello` is the first command sent to a replica after a connection was established. It
    /// provides the replica with meta information about the connection.
    ///
    /// This command is special in that it is broadcast to all workers of a multi-worker replica.
    /// All subsequent commands, except `UpdateConfiguration`, are only sent to the first worker,
    /// which then distributes them to the other workers using a dataflow.
    Hello {
        /// A nonce unique to the current iteration of the compute protocol.
        ///
        /// The nonce allows identifying different iterations of the compute protocol. When the
        /// compute controller connects to a replica, it must send a nonce that is different from
        /// all nonces it sent to the same replica on previous connections. Multi-worker replicas
        /// should use the nonce to ensure that their individual workers agree on which protocol
        /// iteration they are in.
        nonce: Uuid,
    },

    /// `CreateInstance` must be sent after `Hello` to complete the [Creation Stage] of the compute
    /// protocol. Unlike `Hello`, it is only sent to the first worker of the replica, and then
    /// distributed through the timely runtime. `CreateInstance` instructs the replica to
    /// initialize its state to a point where it is ready to start maintaining dataflows.
    ///
    /// Upon receiving a `CreateInstance` command, the replica must further initialize logging
    /// dataflows according to the given [`LoggingConfig`].
    ///
    /// [Creation Stage]: super#creation-stage
    CreateInstance(Box<InstanceConfig>),

    /// `InitializationComplete` informs the replica about the end of the [Initialization Stage].
    /// Upon receiving this command, the replica should perform a reconciliation process, to ensure
    /// its dataflow state matches the state requested by the computation commands it received
    /// previously. The replica must now start sending responses to commands received previously,
    /// if it opted to defer them during the [Initialization Stage].
    ///
    /// [Initialization Stage]: super#initialization-stage
    InitializationComplete,

    /// `AllowWrites` informs the replica that it can transition out of the
    /// read-only computation stage and into the read-write computation stage.
    /// It is now allowed to effect changes to external systems (writes).
    ///
    /// After initialization is complete, an instance starts out in the
    /// read-only computation stage. Only upon receiving this command will it
    /// leave that stage and allow running operations to perform writes.
    ///
    /// An instance that has once been told that it can go into read-write mode
    /// can never leave that mode again. It is okay for a read-only
    /// controller to re-connect to an instance that is already in read-write
    /// mode: _someone_ has already told the instance that it is okay to write
    /// and there is no way in the protocol to transition an instance back to
    /// read-only mode.
    ///
    /// NOTE: We don't have a protocol in place that allows writes only after a
    /// certain, controller-determined, timestamp. Such a protocol would allow
    /// tighter control and could allow the instance to avoid work. However, it
    /// is more work to put in place the logic for that, so we leave it as future
    /// work for now.
    AllowWrites,

    /// `UpdateConfiguration` instructs the replica to update its configuration, according to the
    /// given [`ComputeParameters`].
    ///
    /// This command is special in that, like `Hello`, it is broadcast to all workers of the
    /// replica. However, unlike `Hello`, it is ignored by all workers except the first one, which
    /// distributes the command to the other workers through the timely runtime.
    /// `UpdateConfiguration` commands are broadcast only to allow the intermediary parts of the
    /// networking fabric to observe them and learn of configuration updates.
    ///
    /// Parameter updates transmitted through this command must be applied by the replica as soon
    /// as it receives the command, and they must be applied globally to all replica state, even
    /// dataflows and pending peeks that were created before the parameter update. This property
    /// allows the replica to hoist `UpdateConfiguration` commands during reconciliation.
    ///
    /// Configuration parameters that should not be applied globally, but only to specific
    /// dataflows or peeks, should be added to the [`DataflowDescription`] or [`Peek`] types,
    /// rather than as [`ComputeParameters`].
    UpdateConfiguration(Box<ComputeParameters>),

    /// `CreateDataflow` instructs the replica to create a dataflow according to the given
    /// [`DataflowDescription`].
    ///
    /// The [`DataflowDescription`] must have the following properties:
    ///
    ///   * Dataflow imports are valid:
    ///     * Imported storage collections specified in [`source_imports`] exist and are readable by
    ///       the compute replica.
    ///     * Imported indexes specified in [`index_imports`] have been created on the replica
    ///       previously, by previous `CreateDataflow` commands.
    ///   * Dataflow imports are readable at the specified [`as_of`]. In other words: The `since`s of
    ///     imported collections are not beyond the dataflow [`as_of`].
    ///   * Dataflow exports have unique IDs, i.e., the IDs of exports from dataflows a replica is
    ///     instructed to create do not repeat (within a single protocol iteration).
    ///   * The dataflow objects defined in [`objects_to_build`] are topologically ordered according
    ///     to the dependency relation.
    ///
    /// A dataflow description that violates any of the above properties can cause the replica to
    /// exhibit undefined behavior, such as panicking or producing incorrect results. A replica
    /// should prefer panicking over producing incorrect results.
    ///
    /// After receiving a `CreateDataflow` command, if the created dataflow exports indexes or
    /// storage sinks, the replica must produce [`Frontiers`] responses that report the
    /// advancement of the frontiers of these compute collections.
    ///
    /// After receiving a `CreateDataflow` command, if the created dataflow exports subscribes, the
    /// replica must produce [`SubscribeResponse`]s that report the progress and results of the
    /// subscribes.
    ///
    /// After receiving a `CreateDataflow` command, if the created dataflow exports copy-to sinks,
    /// the replica must produce [`CopyToResponse`]s that report the results and completion of the
    /// copy-to sinks.
    ///
    /// The replica may create the dataflow in a suspended state and defer starting the computation
    /// until it receives a corresponding `Schedule` command. Thus, to ensure dataflow execution,
    /// the compute controller should eventually send a `Schedule` command for each sent
    /// `CreateDataflow` command.
    ///
    /// [`objects_to_build`]: DataflowDescription::objects_to_build
    /// [`source_imports`]: DataflowDescription::source_imports
    /// [`index_imports`]: DataflowDescription::index_imports
    /// [`as_of`]: DataflowDescription::as_of
    /// [`Frontiers`]: super::response::ComputeResponse::Frontiers
    /// [`SubscribeResponse`]: super::response::ComputeResponse::SubscribeResponse
    /// [`CopyToResponse`]: super::response::ComputeResponse::CopyToResponse
    CreateDataflow(Box<DataflowDescription<RenderPlan<T>, CollectionMetadata, T>>),

    /// `Schedule` allows the replica to start computation for a compute collection.
    ///
    /// It is invalid to send a `Schedule` command that references a collection that was not
    /// created by a corresponding `CreateDataflow` command before. Doing so may cause the replica
    /// to exhibit undefined behavior.
    ///
    /// It is also invalid to send a `Schedule` command that references a collection that has,
    /// through an `AllowCompaction` command, been allowed to compact to the empty frontier before.
    Schedule(GlobalId),

    /// `AllowCompaction` informs the replica about the relaxation of external read capabilities on
    /// a compute collection exported by one of the replica's dataflows.
    ///
    /// The command names a collection and provides a frontier after which accumulations must be
    /// correct. The replica gains the liberty of compacting the corresponding maintained trace up
    /// through that frontier.
    ///
    /// It is invalid to send an `AllowCompaction` command that references a compute collection
    /// that was not created by a corresponding `CreateDataflow` command before. Doing so may cause
    /// the replica to exhibit undefined behavior.
    ///
    /// The `AllowCompaction` command only informs about external read requirements, not internal
    /// ones. The replica is responsible for ensuring that internal requirements are fulfilled at
    /// all times, so local dataflow inputs are not compacted beyond times at which they are still
    /// being read from.
    ///
    /// The read frontiers transmitted through `AllowCompaction`s may be beyond the corresponding
    /// collections' current `upper` frontiers. This signals that external readers are not
    /// interested in times up to the specified new read frontiers. Consequently, an empty read
    /// frontier signals that external readers are not interested in updates from the corresponding
    /// collection ever again, so the collection is not required anymore.
    ///
    /// Sending an `AllowCompaction` command with the empty frontier is the canonical way to drop
    /// compute collections.
    ///
    /// A replica that receives an `AllowCompaction` command with the empty frontier must
    /// eventually respond with [`Frontiers`] responses reporting empty frontiers for the
    /// same collection. ([#16271])
    ///
    /// [`Frontiers`]: super::response::ComputeResponse::Frontiers
    /// [#16271]: https://github.com/MaterializeInc/database-issues/issues/4699
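    ///
    /// For example, dropping a collection is expressed as (sketch):
    ///
    /// ```text
    /// ComputeCommand::AllowCompaction { id, frontier: Antichain::new() }
    /// ```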
    AllowCompaction {
        /// The identifier of the compute collection.
        id: GlobalId,
        /// The new read frontier; accumulations must be correct for times at or beyond this
        /// frontier, and the replica may compact the collection up to it.
        frontier: Antichain<T>,
    },

    /// `Peek` instructs the replica to perform a peek on a collection: either an index or a
    /// Persist-backed collection.
    ///
    /// The [`Peek`] description must have the following properties:
    ///
    ///   * If targeting an index, it has previously been created by a corresponding `CreateDataflow`
    ///     command. (If targeting a persist collection, that collection should exist.)
    ///   * The [`Peek::uuid`] is unique, i.e., the UUIDs of peeks a replica gets instructed to
    ///     perform do not repeat (within a single protocol iteration).
    ///
    /// A [`Peek`] description that violates any of the above properties can cause the replica to
    /// exhibit undefined behavior.
    ///
    /// Specifying a [`Peek::timestamp`] that is less than the target index's `since` frontier does
    /// not provoke undefined behavior. Instead, the replica must produce a [`PeekResponse::Error`]
    /// in response.
    ///
    /// After receiving a `Peek` command, the replica must eventually produce a single
    /// [`PeekResponse`]:
    ///
    ///    * For peeks that were not cancelled: either [`Rows`] or [`Error`].
    ///    * For peeks that were cancelled: either [`Rows`], or [`Error`], or [`Canceled`].
    ///
    /// [`PeekResponse`]: super::response::PeekResponse
    /// [`PeekResponse::Error`]: super::response::PeekResponse::Error
    /// [`Rows`]: super::response::PeekResponse::Rows
    /// [`Error`]: super::response::PeekResponse::Error
    /// [`Canceled`]: super::response::PeekResponse::Canceled
    Peek(Box<Peek<T>>),

    /// `CancelPeek` instructs the replica to cancel the identified pending peek.
    ///
    /// It is invalid to send a `CancelPeek` command that references a peek that was not created
    /// by a corresponding `Peek` command before. Doing so may cause the replica to exhibit
    /// undefined behavior.
    ///
    /// If a replica cancels a peek in response to a `CancelPeek` command, it must respond with a
    /// [`PeekResponse::Canceled`]. The replica may also decide to fulfill the peek instead and
    /// return a different [`PeekResponse`], or it may already have returned a response to the
    /// specified peek. In these cases it must *not* return another [`PeekResponse`].
    ///
    /// [`PeekResponse`]: super::response::PeekResponse
    /// [`PeekResponse::Canceled`]: super::response::PeekResponse::Canceled
    CancelPeek {
        /// The identifier of the peek request to cancel.
        ///
        /// This value must match a [`Peek::uuid`] value transmitted in a previous `Peek` command.
        uuid: Uuid,
    },
}

impl RustType<ProtoComputeCommand> for ComputeCommand<mz_repr::Timestamp> {
    fn into_proto(&self) -> ProtoComputeCommand {
        use proto_compute_command::Kind::*;
        use proto_compute_command::*;
        ProtoComputeCommand {
            kind: Some(match self {
                ComputeCommand::Hello { nonce } => Hello(ProtoHello {
                    nonce: Some(nonce.into_proto()),
                }),
                ComputeCommand::CreateInstance(config) => CreateInstance(*config.into_proto()),
                ComputeCommand::InitializationComplete => InitializationComplete(()),
                ComputeCommand::UpdateConfiguration(params) => {
                    UpdateConfiguration(*params.into_proto())
                }
                ComputeCommand::CreateDataflow(dataflow) => CreateDataflow(*dataflow.into_proto()),
                ComputeCommand::Schedule(id) => Schedule(id.into_proto()),
                ComputeCommand::AllowCompaction { id, frontier } => {
                    AllowCompaction(ProtoCompaction {
                        id: Some(id.into_proto()),
                        frontier: Some(frontier.into_proto()),
                    })
                }
                ComputeCommand::Peek(peek) => Peek(*peek.into_proto()),
                ComputeCommand::CancelPeek { uuid } => CancelPeek(uuid.into_proto()),
                ComputeCommand::AllowWrites => AllowWrites(()),
            }),
        }
    }

    fn from_proto(proto: ProtoComputeCommand) -> Result<Self, TryFromProtoError> {
        use proto_compute_command::Kind::*;
        use proto_compute_command::*;
        match proto.kind {
            Some(Hello(ProtoHello { nonce })) => Ok(ComputeCommand::Hello {
                nonce: nonce.into_rust_if_some("ProtoHello::nonce")?,
            }),
            Some(CreateInstance(config)) => {
                let config = Box::new(config.into_rust()?);
                Ok(ComputeCommand::CreateInstance(config))
            }
            Some(InitializationComplete(())) => Ok(ComputeCommand::InitializationComplete),
            Some(UpdateConfiguration(params)) => {
                let params = Box::new(params.into_rust()?);
                Ok(ComputeCommand::UpdateConfiguration(params))
            }
            Some(CreateDataflow(dataflow)) => {
                let dataflow = Box::new(dataflow.into_rust()?);
                Ok(ComputeCommand::CreateDataflow(dataflow))
            }
            Some(Schedule(id)) => Ok(ComputeCommand::Schedule(id.into_rust()?)),
            Some(AllowCompaction(ProtoCompaction { id, frontier })) => {
                Ok(ComputeCommand::AllowCompaction {
                    id: id.into_rust_if_some("ProtoAllowCompaction::id")?,
                    frontier: frontier.into_rust_if_some("ProtoAllowCompaction::frontier")?,
                })
            }
            Some(Peek(peek)) => {
                let peek = Box::new(peek.into_rust()?);
                Ok(ComputeCommand::Peek(peek))
            }
            Some(CancelPeek(uuid)) => Ok(ComputeCommand::CancelPeek {
                uuid: uuid.into_rust()?,
            }),
            Some(AllowWrites(())) => Ok(ComputeCommand::AllowWrites),
            None => Err(TryFromProtoError::missing_field(
                "ProtoComputeCommand::kind",
            )),
        }
    }
}

impl Arbitrary for ComputeCommand<mz_repr::Timestamp> {
    type Strategy = Union<BoxedStrategy<Self>>;
    type Parameters = ();

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        Union::new(vec![
            any::<InstanceConfig>()
                .prop_map(Box::new)
                .prop_map(ComputeCommand::CreateInstance)
                .boxed(),
            any::<ComputeParameters>()
                .prop_map(Box::new)
                .prop_map(ComputeCommand::UpdateConfiguration)
                .boxed(),
            any::<DataflowDescription<RenderPlan, CollectionMetadata, mz_repr::Timestamp>>()
                .prop_map(Box::new)
                .prop_map(ComputeCommand::CreateDataflow)
                .boxed(),
            any::<GlobalId>().prop_map(ComputeCommand::Schedule).boxed(),
            (any::<GlobalId>(), any_antichain())
                .prop_map(|(id, frontier)| ComputeCommand::AllowCompaction { id, frontier })
                .boxed(),
            any::<Peek>()
                .prop_map(Box::new)
                .prop_map(ComputeCommand::Peek)
                .boxed(),
            any_uuid()
                .prop_map(|uuid| ComputeCommand::CancelPeek { uuid })
                .boxed(),
        ])
    }
}

/// Configuration for a replica, passed with the `CreateInstance` command. Replicas should
/// halt if the controller attempts to reconcile them with different values
/// for anything in this struct.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct InstanceConfig {
    /// Specification of introspection logging.
    pub logging: LoggingConfig,
    /// The offset, relative to replica startup, at which the replica should expire. `None`
    /// disables expiration.
    pub expiration_offset: Option<Duration>,
    /// The persist location where we can stash large peek results.
    pub peek_stash_persist_location: PersistLocation,
}

impl InstanceConfig {
    /// Check if the configuration is compatible with another configuration. This is true iff the
    /// logging configuration is equivalent, and the other configuration (non-strictly) strengthens
    /// the expiration offset.
    ///
    /// We consider a stricter offset compatible, which allows us to strengthen the value without
    /// forcing replica restarts. However, it also means that replicas will only pick up the new
    /// value after a restart.
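    ///
    /// For example (sketch): a configuration with an `expiration_offset` of one hour is
    /// compatible with an otherwise equal `other` whose offset is 30 minutes (a stricter
    /// offset), but not with one whose offset is `None` (no expiration), since that would
    /// weaken the offset.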
    pub fn compatible_with(&self, other: &InstanceConfig) -> bool {
        // Destructure to protect against adding fields in the future.
        let InstanceConfig {
            logging: self_logging,
            expiration_offset: self_offset,
            peek_stash_persist_location: self_peek_stash_persist_location,
        } = self;
        let InstanceConfig {
            logging: other_logging,
            expiration_offset: other_offset,
            peek_stash_persist_location: other_peek_stash_persist_location,
        } = other;

        // Logging is compatible if exactly the same.
        let logging_compatible = self_logging == other_logging;

        // The offsets are compatible if `other_offset` is less than or equal to `self_offset`,
        // i.e., it is a smaller offset and thus strengthens the expiration.
        let self_offset = Antichain::from_iter(*self_offset);
        let other_offset = Antichain::from_iter(*other_offset);
        let offset_compatible = timely::PartialOrder::less_equal(&other_offset, &self_offset);

        let persist_location_compatible =
            self_peek_stash_persist_location == other_peek_stash_persist_location;

        logging_compatible && offset_compatible && persist_location_compatible
    }
}

impl RustType<ProtoInstanceConfig> for InstanceConfig {
    fn into_proto(&self) -> ProtoInstanceConfig {
        ProtoInstanceConfig {
            logging: Some(self.logging.into_proto()),
            expiration_offset: self.expiration_offset.into_proto(),
            peek_stash_blob_uri: self
                .peek_stash_persist_location
                .blob_uri
                .to_string_unredacted(),
            peek_stash_consensus_uri: self
                .peek_stash_persist_location
                .consensus_uri
                .to_string_unredacted(),
        }
    }

    fn from_proto(proto: ProtoInstanceConfig) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            logging: proto
                .logging
                .into_rust_if_some("ProtoCreateInstance::logging")?,
            expiration_offset: proto.expiration_offset.into_rust()?,
            peek_stash_persist_location: PersistLocation {
                blob_uri: SensitiveUrl::from_str(&proto.peek_stash_blob_uri)?,
                consensus_uri: SensitiveUrl::from_str(&proto.peek_stash_consensus_uri)?,
            },
        })
    }
}

/// Compute instance configuration parameters.
///
/// Parameters can be set (`Some`) or unset (`None`).
/// Unset parameters should be interpreted to mean "use the previous value".
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct ComputeParameters {
    /// An optional arbitrary string that describes the class of the workload
    /// this compute instance is running (e.g., `production` or `staging`).
    ///
    /// When `Some(x)`, a `workload_class=x` label is applied to all metrics
    /// exported by the metrics registry associated with the compute instance.
    pub workload_class: Option<Option<String>>,
    /// The maximum allowed size in bytes for results of peeks and subscribes.
    ///
    /// Peeks and subscribes that would return results larger than this maximum return the
    /// respective error responses instead:
    ///   * [`PeekResponse::Rows`] is replaced by [`PeekResponse::Error`].
    ///   * The [`SubscribeBatch::updates`] field is populated with an [`Err`] value.
    ///
    /// [`PeekResponse::Rows`]: super::response::PeekResponse::Rows
    /// [`PeekResponse::Error`]: super::response::PeekResponse::Error
    /// [`SubscribeBatch::updates`]: super::response::SubscribeBatch::updates
    pub max_result_size: Option<u64>,
    /// Tracing configuration.
    pub tracing: TracingParameters,
    /// gRPC client configuration.
    pub grpc_client: GrpcClientParameters,

    /// Config updates for components migrated to `mz_dyncfg`.
    pub dyncfg_updates: ConfigUpdates,
}

impl ComputeParameters {
    /// Update the parameter values with the set ones from `other`.
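    ///
    /// A minimal sketch of the merge semantics:
    ///
    /// ```text
    /// let mut params = ComputeParameters {
    ///     max_result_size: Some(1 << 20),
    ///     ..Default::default()
    /// };
    /// params.update(ComputeParameters {
    ///     workload_class: Some(Some("staging".into())),
    ///     ..Default::default()
    /// });
    /// // `workload_class` is now set; `max_result_size` keeps its previous value.
    /// ```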
    pub fn update(&mut self, other: ComputeParameters) {
        let ComputeParameters {
            workload_class,
            max_result_size,
            tracing,
            grpc_client,
            dyncfg_updates,
        } = other;

        if workload_class.is_some() {
            self.workload_class = workload_class;
        }
        if max_result_size.is_some() {
            self.max_result_size = max_result_size;
        }

        self.tracing.update(tracing);
        self.grpc_client.update(grpc_client);

        self.dyncfg_updates.extend(dyncfg_updates);
    }

    /// Return whether all parameters are unset.
    pub fn all_unset(&self) -> bool {
        *self == Self::default()
    }
}

impl RustType<ProtoComputeParameters> for ComputeParameters {
    fn into_proto(&self) -> ProtoComputeParameters {
        ProtoComputeParameters {
            workload_class: self.workload_class.into_proto(),
            max_result_size: self.max_result_size.into_proto(),
            tracing: Some(self.tracing.into_proto()),
            grpc_client: Some(self.grpc_client.into_proto()),
            dyncfg_updates: Some(self.dyncfg_updates.clone()),
        }
    }

    fn from_proto(proto: ProtoComputeParameters) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            workload_class: proto.workload_class.into_rust()?,
            max_result_size: proto.max_result_size.into_rust()?,
            tracing: proto
                .tracing
                .into_rust_if_some("ProtoComputeParameters::tracing")?,
            grpc_client: proto
                .grpc_client
                .into_rust_if_some("ProtoComputeParameters::grpc_client")?,
            dyncfg_updates: proto.dyncfg_updates.ok_or_else(|| {
                TryFromProtoError::missing_field("ProtoComputeParameters::dyncfg_updates")
            })?,
        })
    }
}

impl RustType<ProtoWorkloadClass> for Option<String> {
    fn into_proto(&self) -> ProtoWorkloadClass {
        ProtoWorkloadClass {
            value: self.clone(),
        }
    }

    fn from_proto(proto: ProtoWorkloadClass) -> Result<Self, TryFromProtoError> {
        Ok(proto.value)
    }
}

/// Metadata specific to the peek variant.
#[derive(Arbitrary, Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum PeekTarget {
    /// This peek is against an index. Since this should be held in memory on
    /// the target cluster, no additional coordinates are necessary.
    Index {
        /// The id of the (possibly transient) index.
        id: GlobalId,
    },
    /// This peek is against a Persist collection.
    Persist {
        /// The id of the backing Persist collection.
        id: GlobalId,
        /// The identifying metadata of the Persist shard.
        metadata: CollectionMetadata,
    },
}

impl PeekTarget {
    /// Returns the ID of the peeked collection.
    pub fn id(&self) -> GlobalId {
        match self {
            Self::Index { id } => *id,
            Self::Persist { id, .. } => *id,
        }
    }
}

/// Peek a collection, either in an arrangement or Persist.
///
/// This request elicits data from the worker, by naming the
/// collection and some actions to apply to the results before
/// returning them.
///
/// The `timestamp` member must be valid for the arrangement that
/// is referenced by the peek's `target`. This means that `AllowCompaction` for
/// this arrangement should not pass `timestamp` before this command.
/// Subsequent commands may arbitrarily compact the arrangements;
/// the dataflow runners are responsible for ensuring that they can
/// correctly answer the `Peek`.
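///
/// For example (sketch): a `Peek` with `timestamp` 42 against an index can only be
/// answered correctly while the index's `since` frontier is at or before 42, so the
/// controller must not allow the index to compact beyond 42 until the peek is answered.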
#[derive(Arbitrary, Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Peek<T = mz_repr::Timestamp> {
    /// Target-specific metadata.
    pub target: PeekTarget,
    /// The relation description for the rows returned by this peek, before
    /// applying the [RowSetFinishing] but _after_ applying the given
    /// `map_filter_project`.
    pub result_desc: RelationDesc,
    /// If `Some`, then look up only the given keys from the collection (instead of a full scan).
    /// The vector is never empty.
    #[proptest(strategy = "proptest::option::of(proptest::collection::vec(any::<Row>(), 1..5))")]
    pub literal_constraints: Option<Vec<Row>>,
    /// The identifier of this peek request.
    ///
    /// Used in responses and cancellation requests.
    #[proptest(strategy = "any_uuid()")]
    pub uuid: Uuid,
    /// The logical timestamp at which the collection is queried.
    pub timestamp: T,
    /// Actions to apply to the result set before returning them.
    pub finishing: RowSetFinishing,
    /// Linear operation to apply in-line on each result.
    pub map_filter_project: mz_expr::SafeMfpPlan,
    /// An `OpenTelemetryContext` to forward trace information along
    /// to the compute worker to allow associating traces between
    /// the compute controller and the compute worker.
    #[proptest(strategy = "empty_otel_ctx()")]
    pub otel_ctx: OpenTelemetryContext,
}

impl RustType<ProtoPeek> for Peek {
    fn into_proto(&self) -> ProtoPeek {
        ProtoPeek {
            key: match &self.literal_constraints {
                // In the Some case, the vector is never empty, so it's safe to encode None as an
                // empty vector, and Some(vector) as just the vector.
                Some(vec) => {
                    assert!(!vec.is_empty());
                    vec.into_proto()
                }
                None => Vec::<Row>::new().into_proto(),
            },
            uuid: Some(self.uuid.into_proto()),
            timestamp: self.timestamp.into(),
            finishing: Some(self.finishing.into_proto()),
            map_filter_project: Some(self.map_filter_project.into_proto()),
            otel_ctx: self.otel_ctx.clone().into(),
            result_desc: Some(self.result_desc.into_proto()),
            target: Some(match &self.target {
                PeekTarget::Index { id } => proto_peek::Target::Index(ProtoIndexTarget {
                    id: Some(id.into_proto()),
                }),

                PeekTarget::Persist { id, metadata } => {
                    proto_peek::Target::Persist(ProtoPersistTarget {
                        id: Some(id.into_proto()),
                        metadata: Some(metadata.into_proto()),
                    })
                }
            }),
        }
    }

    fn from_proto(x: ProtoPeek) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            literal_constraints: {
                let vec: Vec<Row> = x.key.into_rust()?;
                if vec.is_empty() { None } else { Some(vec) }
            },
            uuid: x.uuid.into_rust_if_some("ProtoPeek::uuid")?,
            timestamp: x.timestamp.into(),
            finishing: x.finishing.into_rust_if_some("ProtoPeek::finishing")?,
            map_filter_project: x
                .map_filter_project
                .into_rust_if_some("ProtoPeek::map_filter_project")?,
            otel_ctx: x.otel_ctx.into(),
            result_desc: x.result_desc.into_rust_if_some("ProtoPeek::result_desc")?,
            target: match x.target {
                Some(proto_peek::Target::Index(target)) => PeekTarget::Index {
                    id: target.id.into_rust_if_some("ProtoIndexTarget::id")?,
                },
                Some(proto_peek::Target::Persist(target)) => PeekTarget::Persist {
                    id: target.id.into_rust_if_some("ProtoPersistTarget::id")?,
                    metadata: target
                        .metadata
                        .into_rust_if_some("ProtoPersistTarget::metadata")?,
                },
                None => return Err(TryFromProtoError::missing_field("ProtoPeek::target")),
            },
        })
    }
}

/// A proptest strategy that always produces an empty `OpenTelemetryContext`.
fn empty_otel_ctx() -> impl Strategy<Value = OpenTelemetryContext> {
    (0..1).prop_map(|_| OpenTelemetryContext::empty())
}

impl TryIntoProtocolNonce for ComputeCommand {
    fn try_into_protocol_nonce(self) -> Result<Uuid, Self> {
        match self {
            ComputeCommand::Hello { nonce } => Ok(nonce),
            cmd => Err(cmd),
        }
    }
}

#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::ProptestConfig;
    use proptest::proptest;

    use super::*;

    /// Test to ensure the size of the `ComputeCommand` enum doesn't regress.
    #[mz_ore::test]
    fn test_compute_command_size() {
        assert_eq!(std::mem::size_of::<ComputeCommand>(), 40);
    }
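
    /// A sketch of the `TryIntoProtocolNonce` contract exercised against the impl
    /// above: `Hello` yields its nonce, while any other command is returned unchanged.
    #[mz_ore::test]
    fn test_try_into_protocol_nonce() {
        let nonce = Uuid::from_u128(42);
        let cmd: ComputeCommand = ComputeCommand::Hello { nonce };
        assert_eq!(cmd.try_into_protocol_nonce(), Ok(nonce));

        let cmd: ComputeCommand = ComputeCommand::InitializationComplete;
        assert_eq!(cmd.clone().try_into_protocol_nonce(), Err(cmd));
    }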

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(32))]

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
        fn peek_protobuf_roundtrip(expect in any::<Peek>()) {
            let actual = protobuf_roundtrip::<_, ProtoPeek>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }

        #[mz_ore::test]
        fn compute_command_protobuf_roundtrip(expect in any::<ComputeCommand<mz_repr::Timestamp>>()) {
            let actual = protobuf_roundtrip::<_, ProtoComputeCommand>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
}