mz_compute_client/protocol/command.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Compute protocol commands.

use std::time::Duration;

use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig, TryIntoTimelyConfig};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_dyncfg::ConfigUpdates;
use mz_expr::RowSetFinishing;
use mz_ore::tracing::OpenTelemetryContext;
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError, any_uuid};
use mz_repr::{GlobalId, Row};
use mz_service::params::GrpcClientParameters;
use mz_storage_client::client::ProtoCompaction;
use mz_storage_types::controller::CollectionMetadata;
use mz_timely_util::progress::any_antichain;
use mz_tracing::params::TracingParameters;
use proptest::prelude::{Arbitrary, any};
use proptest::strategy::{BoxedStrategy, Strategy, Union};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use timely::progress::frontier::Antichain;
use uuid::Uuid;

use crate::logging::LoggingConfig;

include!(concat!(
    env!("OUT_DIR"),
    "/mz_compute_client.protocol.command.rs"
));

/// Compute protocol commands, sent by the compute controller to replicas.
///
/// Command sequences sent by the compute controller must be valid according to the [Protocol
/// Stages].
///
/// [Protocol Stages]: super#protocol-stages
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ComputeCommand<T = mz_repr::Timestamp> {
    /// `CreateTimely` is the first command sent to a replica after a connection was established.
    /// It instructs the replica to initialize the timely dataflow runtime using the given
    /// `config`.
    ///
    /// This command is special in that it is broadcast to all workers of a multi-worker replica.
    /// All subsequent commands, except `UpdateConfiguration`, are only sent to the first worker,
    /// which then distributes them to the other workers using a dataflow. This method of command
    /// distribution requires the timely dataflow runtime to be initialized, which is why the
    /// `CreateTimely` command exists.
    ///
    /// The `epoch` value imposes an ordering on iterations of the compute protocol. When the
    /// compute controller connects to a replica, it must send an `epoch` that is greater than all
    /// epochs it sent to the same replica on previous connections. Multi-process replicas should
    /// use the `epoch` to ensure that their individual processes agree on which protocol iteration
    /// they are in.
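    ///
    /// For illustration, a valid epoch sequence across reconnects might look like this (a
    /// sketch; the concrete `ClusterStartupEpoch` values are hypothetical):
    ///
    /// ```text
    /// connection 1: CreateTimely { epoch: 1, .. }
    /// connection 2: CreateTimely { epoch: 2, .. }  // reconnect: strictly greater epoch
    /// ```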
    CreateTimely {
        /// TODO(database-issues#7533): Add documentation.
        config: Box<TimelyConfig>,
        /// TODO(database-issues#7533): Add documentation.
        epoch: ClusterStartupEpoch,
    },

    /// `CreateInstance` must be sent after `CreateTimely` to complete the [Creation Stage] of the
    /// compute protocol. Unlike `CreateTimely`, it is only sent to the first worker of the
    /// replica, and then distributed through the timely runtime. `CreateInstance` instructs the
    /// replica to initialize its state to a point where it is ready to start maintaining
    /// dataflows.
    ///
    /// Upon receiving a `CreateInstance` command, the replica must further initialize logging
    /// dataflows according to the given [`LoggingConfig`].
    ///
    /// [Creation Stage]: super#creation-stage
    CreateInstance(Box<InstanceConfig>),

    /// `InitializationComplete` informs the replica about the end of the [Initialization Stage].
    /// Upon receiving this command, the replica should perform a reconciliation process, to ensure
    /// its dataflow state matches the state requested by the computation commands it received
    /// previously. The replica must now start sending responses to commands received previously,
    /// if it opted to defer them during the [Initialization Stage].
    ///
    /// [Initialization Stage]: super#initialization-stage
    InitializationComplete,

    /// `AllowWrites` informs the replica that it can transition out of the
    /// read-only computation stage and into the read-write computation stage.
    /// It is now allowed to effect changes to external systems (writes).
    ///
    /// After initialization is complete, an instance starts out in the
    /// read-only computation stage. Only when receiving this command will it go
    /// out of that and allow running operations to do writes.
    ///
    /// An instance that has once been told that it can go into read-write mode
    /// can never go out of that mode again. It is okay for a read-only
    /// controller to re-connect to an instance that is already in read-write
    /// mode: _someone_ has already told the instance that it is okay to write
    /// and there is no way in the protocol to transition an instance back to
    /// read-only mode.
    ///
    /// NOTE: We don't have a protocol in place that allows writes only after a
    /// certain, controller-determined, timestamp. Such a protocol would allow
    /// tighter control and could allow the instance to avoid work. However, it
    /// is more work to put in place the logic for that so we leave it as future
    /// work for now.
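    ///
    /// As a sketch, the stage-relevant commands of one protocol iteration arrive in this
    /// order (other commands are interleaved as needed):
    ///
    /// ```text
    /// CreateTimely → CreateInstance → … → InitializationComplete → … → AllowWrites
    /// ```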
    AllowWrites,

    /// `UpdateConfiguration` instructs the replica to update its configuration, according to the
    /// given [`ComputeParameters`].
    ///
    /// This command is special in that, like `CreateTimely`, it is broadcast to all workers of the
    /// replica. However, unlike `CreateTimely`, it is ignored by all workers except the first one,
    /// which distributes the command to the other workers through the timely runtime.
    /// `UpdateConfiguration` commands are broadcast only to allow the intermediary parts of the
    /// networking fabric to observe them and learn of configuration updates.
    ///
    /// Parameter updates transmitted through this command must be applied by the replica as soon
    /// as it receives the command, and they must be applied globally to all replica state, even
    /// dataflows and pending peeks that were created before the parameter update. This property
    /// allows the replica to hoist `UpdateConfiguration` commands during reconciliation.
    ///
    /// Configuration parameters that should not be applied globally, but only to specific
    /// dataflows or peeks, should be added to the [`DataflowDescription`] or [`Peek`] types,
    /// rather than as [`ComputeParameters`].
    UpdateConfiguration(Box<ComputeParameters>),

    /// `CreateDataflow` instructs the replica to create a dataflow according to the given
    /// [`DataflowDescription`].
    ///
    /// The [`DataflowDescription`] must have the following properties:
    ///
    ///   * Dataflow imports are valid:
    ///     * Imported storage collections specified in [`source_imports`] exist and are readable by
    ///       the compute replica.
    ///     * Imported indexes specified in [`index_imports`] have been created on the replica
    ///       previously, by previous `CreateDataflow` commands.
    ///   * Dataflow imports are readable at the specified [`as_of`]. In other words: The `since`s of
    ///     imported collections are not beyond the dataflow [`as_of`].
    ///   * Dataflow exports have unique IDs, i.e., the IDs of exports from dataflows a replica is
    ///     instructed to create do not repeat (within a single protocol iteration).
    ///   * The dataflow objects defined in [`objects_to_build`] are topologically ordered according
    ///     to the dependency relation.
    ///
    /// A dataflow description that violates any of the above properties can cause the replica to
    /// exhibit undefined behavior, such as panicking or production of incorrect results. A replica
    /// should prefer panicking over producing incorrect results.
    ///
    /// After receiving a `CreateDataflow` command, if the created dataflow exports indexes or
    /// storage sinks, the replica must produce [`Frontiers`] responses that report the
    /// advancement of the frontiers of these compute collections.
    ///
    /// After receiving a `CreateDataflow` command, if the created dataflow exports subscribes, the
    /// replica must produce [`SubscribeResponse`]s that report the progress and results of the
    /// subscribes.
    ///
    /// The replica may create the dataflow in a suspended state and defer starting the computation
    /// until it receives a corresponding `Schedule` command. Thus, to ensure dataflow execution,
    /// the compute controller should eventually send a `Schedule` command for each sent
    /// `CreateDataflow` command.
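    ///
    /// As a sketch (the `send` calls and the `dataflow`/`export_id` values are hypothetical),
    /// the controller-side pairing looks like:
    ///
    /// ```ignore
    /// send(ComputeCommand::CreateDataflow(dataflow)); // may start suspended
    /// send(ComputeCommand::Schedule(export_id));      // eventually: start the computation
    /// ```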
    ///
    /// [`objects_to_build`]: DataflowDescription::objects_to_build
    /// [`source_imports`]: DataflowDescription::source_imports
    /// [`index_imports`]: DataflowDescription::index_imports
    /// [`as_of`]: DataflowDescription::as_of
    /// [`Frontiers`]: super::response::ComputeResponse::Frontiers
    /// [`SubscribeResponse`]: super::response::ComputeResponse::SubscribeResponse
    CreateDataflow(Box<DataflowDescription<RenderPlan<T>, CollectionMetadata, T>>),

    /// `Schedule` allows the replica to start computation for a compute collection.
    ///
    /// It is invalid to send a `Schedule` command that references a collection that was not
    /// created by a corresponding `CreateDataflow` command before. Doing so may cause the replica
    /// to exhibit undefined behavior.
    ///
    /// It is also invalid to send a `Schedule` command that references a collection that has,
    /// through an `AllowCompaction` command, been allowed to compact to the empty frontier before.
    Schedule(GlobalId),

    /// `AllowCompaction` informs the replica about the relaxation of external read capabilities on
    /// a compute collection exported by one of the replica’s dataflows.
    ///
    /// The command names a collection and provides a frontier after which accumulations must be
    /// correct. The replica gains the liberty of compacting the corresponding maintained trace up
    /// through that frontier.
    ///
    /// It is invalid to send an `AllowCompaction` command that references a compute collection
    /// that was not created by a corresponding `CreateDataflow` command before. Doing so may cause
    /// the replica to exhibit undefined behavior.
    ///
    /// The `AllowCompaction` command only informs about external read requirements, not internal
    /// ones. The replica is responsible for ensuring that internal requirements are fulfilled at
    /// all times, so local dataflow inputs are not compacted beyond times at which they are still
    /// being read from.
    ///
    /// The read frontiers transmitted through `AllowCompaction`s may be beyond the corresponding
    /// collections' current `upper` frontiers. This signals that external readers are not
    /// interested in times up to the specified new read frontiers. Consequently, an empty read
    /// frontier signals that external readers are not interested in updates from the corresponding
    /// collection ever again, so the collection is not required anymore.
    ///
    /// Sending an `AllowCompaction` command with the empty frontier is the canonical way to drop
    /// compute collections.
    ///
    /// A replica that receives an `AllowCompaction` command with the empty frontier must
    /// eventually respond with [`Frontiers`] responses reporting empty frontiers for the
    /// same collection. ([#16271])
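    ///
    /// As a sketch (`id` is a hypothetical `GlobalId` of an existing export), dropping a
    /// collection looks like:
    ///
    /// ```ignore
    /// ComputeCommand::AllowCompaction {
    ///     id,
    ///     frontier: Antichain::new(), // the empty frontier: drop the collection
    /// }
    /// ```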
    ///
    /// [`Frontiers`]: super::response::ComputeResponse::Frontiers
    /// [#16271]: https://github.com/MaterializeInc/database-issues/issues/4699
    AllowCompaction {
        /// TODO(database-issues#7533): Add documentation.
        id: GlobalId,
        /// TODO(database-issues#7533): Add documentation.
        frontier: Antichain<T>,
    },

    /// `Peek` instructs the replica to perform a peek on a collection: either an index or a
    /// Persist-backed collection.
    ///
    /// The [`Peek`] description must have the following properties:
    ///
    ///   * If targeting an index, it has previously been created by a corresponding `CreateDataflow`
    ///     command. (If targeting a Persist collection, that collection should exist.)
    ///   * The [`Peek::uuid`] is unique, i.e., the UUIDs of peeks a replica gets instructed to
    ///     perform do not repeat (within a single protocol iteration).
    ///
    /// A [`Peek`] description that violates any of the above properties can cause the replica to
    /// exhibit undefined behavior.
    ///
    /// Specifying a [`Peek::timestamp`] that is less than the target index’s `since` frontier does
    /// not provoke undefined behavior. Instead, the replica must produce a [`PeekResponse::Error`]
    /// in response.
    ///
    /// After receiving a `Peek` command, the replica must eventually produce a single
    /// [`PeekResponse`]:
    ///
    ///    * For peeks that were not cancelled: either [`Rows`] or [`Error`].
    ///    * For peeks that were cancelled: either [`Rows`], or [`Error`], or [`Canceled`].
    ///
    /// [`PeekResponse`]: super::response::PeekResponse
    /// [`PeekResponse::Error`]: super::response::PeekResponse::Error
    /// [`Rows`]: super::response::PeekResponse::Rows
    /// [`Error`]: super::response::PeekResponse::Error
    /// [`Canceled`]: super::response::PeekResponse::Canceled
    Peek(Box<Peek<T>>),

    /// `CancelPeek` instructs the replica to cancel the identified pending peek.
    ///
    /// It is invalid to send a `CancelPeek` command that references a peek that was not created
    /// by a corresponding `Peek` command before. Doing so may cause the replica to exhibit
    /// undefined behavior.
    ///
    /// If a replica cancels a peek in response to a `CancelPeek` command, it must respond with a
    /// [`PeekResponse::Canceled`]. The replica may also decide to fulfill the peek instead and
    /// return a different [`PeekResponse`], or it may already have returned a response to the
    /// specified peek. In these cases it must *not* return another [`PeekResponse`].
    ///
    /// [`PeekResponse`]: super::response::PeekResponse
    /// [`PeekResponse::Canceled`]: super::response::PeekResponse::Canceled
    CancelPeek {
        /// The identifier of the peek request to cancel.
        ///
        /// This value must match a [`Peek::uuid`] value transmitted in a previous `Peek` command.
        uuid: Uuid,
    },
}

impl RustType<ProtoComputeCommand> for ComputeCommand<mz_repr::Timestamp> {
    fn into_proto(&self) -> ProtoComputeCommand {
        use proto_compute_command::Kind::*;
        use proto_compute_command::*;
        ProtoComputeCommand {
            kind: Some(match self {
                ComputeCommand::CreateTimely { config, epoch } => CreateTimely(ProtoCreateTimely {
                    config: Some(*config.into_proto()),
                    epoch: Some(epoch.into_proto()),
                }),
                ComputeCommand::CreateInstance(config) => CreateInstance(*config.into_proto()),
                ComputeCommand::InitializationComplete => InitializationComplete(()),
                ComputeCommand::UpdateConfiguration(params) => {
                    UpdateConfiguration(*params.into_proto())
                }
                ComputeCommand::CreateDataflow(dataflow) => CreateDataflow(*dataflow.into_proto()),
                ComputeCommand::Schedule(id) => Schedule(id.into_proto()),
                ComputeCommand::AllowCompaction { id, frontier } => {
                    AllowCompaction(ProtoCompaction {
                        id: Some(id.into_proto()),
                        frontier: Some(frontier.into_proto()),
                    })
                }
                ComputeCommand::Peek(peek) => Peek(*peek.into_proto()),
                ComputeCommand::CancelPeek { uuid } => CancelPeek(uuid.into_proto()),
                ComputeCommand::AllowWrites => AllowWrites(()),
            }),
        }
    }

    fn from_proto(proto: ProtoComputeCommand) -> Result<Self, TryFromProtoError> {
        use proto_compute_command::Kind::*;
        use proto_compute_command::*;
        match proto.kind {
            Some(CreateTimely(ProtoCreateTimely { config, epoch })) => {
                let config = Box::new(config.into_rust_if_some("ProtoCreateTimely::config")?);
                let epoch = epoch.into_rust_if_some("ProtoCreateTimely::epoch")?;
                Ok(ComputeCommand::CreateTimely { config, epoch })
            }
            Some(CreateInstance(config)) => {
                let config = Box::new(config.into_rust()?);
                Ok(ComputeCommand::CreateInstance(config))
            }
            Some(InitializationComplete(())) => Ok(ComputeCommand::InitializationComplete),
            Some(UpdateConfiguration(params)) => {
                let params = Box::new(params.into_rust()?);
                Ok(ComputeCommand::UpdateConfiguration(params))
            }
            Some(CreateDataflow(dataflow)) => {
                let dataflow = Box::new(dataflow.into_rust()?);
                Ok(ComputeCommand::CreateDataflow(dataflow))
            }
            Some(Schedule(id)) => Ok(ComputeCommand::Schedule(id.into_rust()?)),
            Some(AllowCompaction(ProtoCompaction { id, frontier })) => {
                Ok(ComputeCommand::AllowCompaction {
                    id: id.into_rust_if_some("ProtoAllowCompaction::id")?,
                    frontier: frontier.into_rust_if_some("ProtoAllowCompaction::frontier")?,
                })
            }
            Some(Peek(peek)) => {
                let peek = Box::new(peek.into_rust()?);
                Ok(ComputeCommand::Peek(peek))
            }
            Some(CancelPeek(uuid)) => Ok(ComputeCommand::CancelPeek {
                uuid: uuid.into_rust()?,
            }),
            Some(AllowWrites(())) => Ok(ComputeCommand::AllowWrites),
            None => Err(TryFromProtoError::missing_field(
                "ProtoComputeCommand::kind",
            )),
        }
    }
}

impl Arbitrary for ComputeCommand<mz_repr::Timestamp> {
    type Strategy = Union<BoxedStrategy<Self>>;
    type Parameters = ();

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        Union::new(vec![
            any::<InstanceConfig>()
                .prop_map(Box::new)
                .prop_map(ComputeCommand::CreateInstance)
                .boxed(),
            any::<ComputeParameters>()
                .prop_map(Box::new)
                .prop_map(ComputeCommand::UpdateConfiguration)
                .boxed(),
            any::<DataflowDescription<RenderPlan, CollectionMetadata, mz_repr::Timestamp>>()
                .prop_map(Box::new)
                .prop_map(ComputeCommand::CreateDataflow)
                .boxed(),
            any::<GlobalId>().prop_map(ComputeCommand::Schedule).boxed(),
            (any::<GlobalId>(), any_antichain())
                .prop_map(|(id, frontier)| ComputeCommand::AllowCompaction { id, frontier })
                .boxed(),
            any::<Peek>()
                .prop_map(Box::new)
                .prop_map(ComputeCommand::Peek)
                .boxed(),
            any_uuid()
                .prop_map(|uuid| ComputeCommand::CancelPeek { uuid })
                .boxed(),
        ])
    }
}

/// Configuration for a replica, passed with the `CreateInstance` command. Replicas should halt
/// if the controller attempts to reconcile them with different values
/// for anything in this struct.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct InstanceConfig {
    /// Specification of introspection logging.
    pub logging: LoggingConfig,
    /// The offset relative to the replica startup at which it should expire. `None` disables the
    /// feature.
    pub expiration_offset: Option<Duration>,
}

impl InstanceConfig {
    /// Check if the configuration is compatible with another configuration. This is true iff the
    /// logging configuration is equivalent, and the other configuration (non-strictly) strengthens
    /// the expiration offset.
    ///
    /// We consider a stricter offset compatible, which allows us to strengthen the value without
    /// forcing replica restarts. However, it also means that replicas will only pick up the new
    /// value after a restart.
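    ///
    /// A sketch of the intended behavior (the concrete durations are made up):
    ///
    /// ```ignore
    /// let base = InstanceConfig {
    ///     expiration_offset: Some(Duration::from_secs(3600)),
    ///     ..Default::default()
    /// };
    /// let stricter = InstanceConfig {
    ///     expiration_offset: Some(Duration::from_secs(1800)),
    ///     ..Default::default()
    /// };
    /// // A smaller offset strengthens the expiration, so it is compatible...
    /// assert!(base.compatible_with(&stricter));
    /// // ...but removing the expiration entirely weakens it, so it is not.
    /// let unbounded = InstanceConfig { expiration_offset: None, ..Default::default() };
    /// assert!(!base.compatible_with(&unbounded));
    /// ```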
    pub fn compatible_with(&self, other: &InstanceConfig) -> bool {
        // Destructure to protect against adding fields in the future.
        let InstanceConfig {
            logging: self_logging,
            expiration_offset: self_offset,
        } = self;
        let InstanceConfig {
            logging: other_logging,
            expiration_offset: other_offset,
        } = other;

        // Logging is compatible if exactly the same.
        let logging_compatible = self_logging == other_logging;

        // The offsets are compatible if `other_offset` is less than or equal to `self_offset`,
        // i.e., it is a smaller offset and strengthens the expiration.
        let self_offset = Antichain::from_iter(*self_offset);
        let other_offset = Antichain::from_iter(*other_offset);
        let offset_compatible = timely::PartialOrder::less_equal(&other_offset, &self_offset);

        logging_compatible && offset_compatible
    }
}

impl RustType<ProtoInstanceConfig> for InstanceConfig {
    fn into_proto(&self) -> ProtoInstanceConfig {
        ProtoInstanceConfig {
            logging: Some(self.logging.into_proto()),
            expiration_offset: self.expiration_offset.into_proto(),
        }
    }

    fn from_proto(proto: ProtoInstanceConfig) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            logging: proto
                .logging
                .into_rust_if_some("ProtoCreateInstance::logging")?,
            expiration_offset: proto.expiration_offset.into_rust()?,
        })
    }
}

/// Compute instance configuration parameters.
///
/// Parameters can be set (`Some`) or unset (`None`).
/// Unset parameters should be interpreted to mean "use the previous value".
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct ComputeParameters {
    /// An optional arbitrary string that describes the class of the workload
    /// this compute instance is running (e.g., `production` or `staging`).
    ///
    /// When `Some(x)`, a `workload_class=x` label is applied to all metrics
    /// exported by the metrics registry associated with the compute instance.
    pub workload_class: Option<Option<String>>,
    /// The maximum allowed size in bytes for results of peeks and subscribes.
    ///
    /// Peeks and subscribes that would return results larger than this maximum return the
    /// respective error responses instead:
    ///   * [`PeekResponse::Rows`] is replaced by [`PeekResponse::Error`].
    ///   * The [`SubscribeBatch::updates`] field is populated with an [`Err`] value.
    ///
    /// [`PeekResponse::Rows`]: super::response::PeekResponse::Rows
    /// [`PeekResponse::Error`]: super::response::PeekResponse::Error
    /// [`SubscribeBatch::updates`]: super::response::SubscribeBatch::updates
    pub max_result_size: Option<u64>,
    /// Tracing configuration.
    pub tracing: TracingParameters,
    /// gRPC client configuration.
    pub grpc_client: GrpcClientParameters,

    /// Config updates for components migrated to `mz_dyncfg`.
    pub dyncfg_updates: ConfigUpdates,
}

impl ComputeParameters {
    /// Update the parameter values with the set ones from `other`.
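    ///
    /// A sketch of the `Some`/unset semantics (the field values are made up):
    ///
    /// ```ignore
    /// let mut params = ComputeParameters {
    ///     max_result_size: Some(1 << 20),
    ///     ..Default::default()
    /// };
    /// // `workload_class` is set; `max_result_size` is unset and keeps its value.
    /// params.update(ComputeParameters {
    ///     workload_class: Some(Some("staging".into())),
    ///     ..Default::default()
    /// });
    /// assert_eq!(params.max_result_size, Some(1 << 20));
    /// ```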
    pub fn update(&mut self, other: ComputeParameters) {
        let ComputeParameters {
            workload_class,
            max_result_size,
            tracing,
            grpc_client,
            dyncfg_updates,
        } = other;

        if workload_class.is_some() {
            self.workload_class = workload_class;
        }
        if max_result_size.is_some() {
            self.max_result_size = max_result_size;
        }

        self.tracing.update(tracing);
        self.grpc_client.update(grpc_client);

        self.dyncfg_updates.extend(dyncfg_updates);
    }

    /// Return whether all parameters are unset.
    pub fn all_unset(&self) -> bool {
        *self == Self::default()
    }
}

impl RustType<ProtoComputeParameters> for ComputeParameters {
    fn into_proto(&self) -> ProtoComputeParameters {
        ProtoComputeParameters {
            workload_class: self.workload_class.into_proto(),
            max_result_size: self.max_result_size.into_proto(),
            tracing: Some(self.tracing.into_proto()),
            grpc_client: Some(self.grpc_client.into_proto()),
            dyncfg_updates: Some(self.dyncfg_updates.clone()),
        }
    }

    fn from_proto(proto: ProtoComputeParameters) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            workload_class: proto.workload_class.into_rust()?,
            max_result_size: proto.max_result_size.into_rust()?,
            tracing: proto
                .tracing
                .into_rust_if_some("ProtoComputeParameters::tracing")?,
            grpc_client: proto
                .grpc_client
                .into_rust_if_some("ProtoComputeParameters::grpc_client")?,
            dyncfg_updates: proto.dyncfg_updates.ok_or_else(|| {
                TryFromProtoError::missing_field("ProtoComputeParameters::dyncfg_updates")
            })?,
        })
    }
}

impl RustType<ProtoWorkloadClass> for Option<String> {
    fn into_proto(&self) -> ProtoWorkloadClass {
        ProtoWorkloadClass {
            value: self.clone(),
        }
    }

    fn from_proto(proto: ProtoWorkloadClass) -> Result<Self, TryFromProtoError> {
        Ok(proto.value)
    }
}

/// Compute parameters supplied to new replicas as part of the Timely instantiation. Usually cannot
/// be changed once set.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct InitialComputeParameters {
    /// `arrangement_exert_proportionality` value passed to new replicas.
    pub arrangement_exert_proportionality: u32,
    /// Enable zero copy allocator.
    pub enable_zero_copy: bool,
    /// Enable lgalloc to back the zero copy allocator.
    pub enable_zero_copy_lgalloc: bool,
    /// Optional limit on the number of empty buffers retained by the zero copy allocator.
    pub zero_copy_limit: Option<usize>,
}

/// Metadata specific to the peek variant.
#[derive(Arbitrary, Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum PeekTarget {
    /// This peek is against an index. Since this should be held in memory on
    /// the target cluster, no additional coordinates are necessary.
    Index {
        /// The id of the (possibly transient) index.
        id: GlobalId,
    },
    /// This peek is against a Persist collection.
    Persist {
        /// The id of the backing Persist collection.
        id: GlobalId,
        /// The identifying metadata of the Persist shard.
        metadata: CollectionMetadata,
    },
}

impl PeekTarget {
    /// Returns the ID of the peeked collection.
    pub fn id(&self) -> GlobalId {
        match self {
            Self::Index { id } => *id,
            Self::Persist { id, .. } => *id,
        }
    }
}

/// Peek a collection, either in an arrangement or Persist.
///
/// This request elicits data from the worker, by naming the
/// collection and some actions to apply to the results before
/// returning them.
///
/// The `timestamp` member must be valid for the arrangement that
/// is referenced by `id`. This means that `AllowCompaction` for
/// this arrangement should not pass `timestamp` before this command.
/// Subsequent commands may arbitrarily compact the arrangements;
/// the dataflow runners are responsible for ensuring that they can
/// correctly answer the `Peek`.
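///
/// A sketch of a valid command interleaving (`id`, `t`, and the frontier values are made up):
///
/// ```text
/// AllowCompaction { id, frontier: {t} }      // ok: the since does not pass t
/// Peek { timestamp: t, .. }                  // valid: t is still readable
/// AllowCompaction { id, frontier: {t + 1} }  // ok: replica must still answer the peek
/// ```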
#[derive(Arbitrary, Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Peek<T = mz_repr::Timestamp> {
    /// Target-specific metadata.
    pub target: PeekTarget,
    /// If `Some`, then look up only the given keys from the collection (instead of a full scan).
    /// The vector is never empty.
    #[proptest(strategy = "proptest::option::of(proptest::collection::vec(any::<Row>(), 1..5))")]
    pub literal_constraints: Option<Vec<Row>>,
    /// The identifier of this peek request.
    ///
    /// Used in responses and cancellation requests.
    #[proptest(strategy = "any_uuid()")]
    pub uuid: Uuid,
    /// The logical timestamp at which the collection is queried.
    pub timestamp: T,
    /// Actions to apply to the result set before returning it.
    pub finishing: RowSetFinishing,
    /// Linear operation to apply in-line on each result.
    pub map_filter_project: mz_expr::SafeMfpPlan,
    /// An `OpenTelemetryContext` to forward trace information along
    /// to the compute worker to allow associating traces between
    /// the compute controller and the compute worker.
    #[proptest(strategy = "empty_otel_ctx()")]
    pub otel_ctx: OpenTelemetryContext,
}

impl RustType<ProtoPeek> for Peek {
    fn into_proto(&self) -> ProtoPeek {
        ProtoPeek {
            key: match &self.literal_constraints {
                // In the Some case, the vector is never empty, so it's safe to encode None as an
                // empty vector, and Some(vector) as just the vector.
                Some(vec) => {
                    assert!(!vec.is_empty());
                    vec.into_proto()
                }
                None => Vec::<Row>::new().into_proto(),
            },
            uuid: Some(self.uuid.into_proto()),
            timestamp: self.timestamp.into(),
            finishing: Some(self.finishing.into_proto()),
            map_filter_project: Some(self.map_filter_project.into_proto()),
            otel_ctx: self.otel_ctx.clone().into(),
            target: Some(match &self.target {
                PeekTarget::Index { id } => proto_peek::Target::Index(ProtoIndexTarget {
                    id: Some(id.into_proto()),
                }),

                PeekTarget::Persist { id, metadata } => {
                    proto_peek::Target::Persist(ProtoPersistTarget {
                        id: Some(id.into_proto()),
                        metadata: Some(metadata.into_proto()),
                    })
                }
            }),
        }
    }

    fn from_proto(x: ProtoPeek) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            literal_constraints: {
                let vec: Vec<Row> = x.key.into_rust()?;
                if vec.is_empty() { None } else { Some(vec) }
            },
            uuid: x.uuid.into_rust_if_some("ProtoPeek::uuid")?,
            timestamp: x.timestamp.into(),
            finishing: x.finishing.into_rust_if_some("ProtoPeek::finishing")?,
            map_filter_project: x
                .map_filter_project
                .into_rust_if_some("ProtoPeek::map_filter_project")?,
            otel_ctx: x.otel_ctx.into(),
            target: match x.target {
                Some(proto_peek::Target::Index(target)) => PeekTarget::Index {
                    id: target.id.into_rust_if_some("ProtoIndexTarget::id")?,
                },
                Some(proto_peek::Target::Persist(target)) => PeekTarget::Persist {
                    id: target.id.into_rust_if_some("ProtoPersistTarget::id")?,
                    metadata: target
                        .metadata
                        .into_rust_if_some("ProtoPersistTarget::metadata")?,
                },
                None => return Err(TryFromProtoError::missing_field("ProtoPeek::target")),
            },
        })
    }
}

fn empty_otel_ctx() -> impl Strategy<Value = OpenTelemetryContext> {
    (0..1).prop_map(|_| OpenTelemetryContext::empty())
}

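/// `try_into_timely_config` splits off the timely initialization parameters, as a replica
/// process would for the first command it receives. A sketch of a call site (the `cmd` value
/// and the handler bodies are hypothetical):
///
/// ```ignore
/// match cmd.try_into_timely_config() {
///     Ok((config, epoch)) => { /* bootstrap the timely runtime */ }
///     Err(cmd) => { /* any other command: handle through the regular path */ }
/// }
/// ```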
impl TryIntoTimelyConfig for ComputeCommand {
    fn try_into_timely_config(self) -> Result<(TimelyConfig, ClusterStartupEpoch), Self> {
        match self {
            ComputeCommand::CreateTimely { config, epoch } => Ok((*config, epoch)),
            cmd => Err(cmd),
        }
    }
}

#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::ProptestConfig;
    use proptest::proptest;

    use super::*;

    /// Test to ensure the size of the `ComputeCommand` enum doesn't regress.
    #[mz_ore::test]
    fn test_compute_command_size() {
        assert_eq!(std::mem::size_of::<ComputeCommand>(), 40);
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(32))]

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
        fn peek_protobuf_roundtrip(expect in any::<Peek>() ) {
            let actual = protobuf_roundtrip::<_, ProtoPeek>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }

        #[mz_ore::test]
        fn compute_command_protobuf_roundtrip(expect in any::<ComputeCommand<mz_repr::Timestamp>>() ) {
            let actual = protobuf_roundtrip::<_, ProtoComputeCommand>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
}