mz_compute_client/protocol/command.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Compute protocol commands.

use std::time::Duration;

use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig, TryIntoTimelyConfig};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_dyncfg::ConfigUpdates;
use mz_expr::RowSetFinishing;
use mz_ore::tracing::OpenTelemetryContext;
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError, any_uuid};
use mz_repr::{GlobalId, Row};
use mz_service::params::GrpcClientParameters;
use mz_storage_client::client::ProtoCompaction;
use mz_storage_types::controller::CollectionMetadata;
use mz_timely_util::progress::any_antichain;
use mz_tracing::params::TracingParameters;
use proptest::prelude::{Arbitrary, any};
use proptest::strategy::{BoxedStrategy, Strategy, Union};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use timely::progress::frontier::Antichain;
use uuid::Uuid;

use crate::logging::LoggingConfig;

include!(concat!(
    env!("OUT_DIR"),
    "/mz_compute_client.protocol.command.rs"
));
/// Compute protocol commands, sent by the compute controller to replicas.
///
/// Command sequences sent by the compute controller must be valid according to the [Protocol
/// Stages].
///
/// [Protocol Stages]: super#protocol-stages
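///
/// The following is an illustrative sketch, not part of the protocol definition: one plausible
/// command sequence for a single protocol iteration, assuming hypothetical `config`, `epoch`,
/// `instance_config`, `dataflow`, `export_id`, and `frontier` values.
///
/// ```ignore
/// let commands = vec![
///     // Broadcast to all workers; initializes the timely runtime.
///     ComputeCommand::CreateTimely { config, epoch },
///     // Completes the Creation Stage.
///     ComputeCommand::CreateInstance(instance_config),
///     // Ends the Initialization Stage; the replica reconciles and starts responding.
///     ComputeCommand::InitializationComplete,
///     // Computation Stage: install a dataflow and start its computation.
///     ComputeCommand::CreateDataflow(dataflow),
///     ComputeCommand::Schedule(export_id),
///     // Eventually, relax read capabilities; an empty `frontier` drops the collection.
///     ComputeCommand::AllowCompaction { id: export_id, frontier },
/// ];
/// ```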
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ComputeCommand<T = mz_repr::Timestamp> {
    /// `CreateTimely` is the first command sent to a replica after a connection has been
    /// established. It instructs the replica to initialize the timely dataflow runtime using the
    /// given `config`.
    ///
    /// This command is special in that it is broadcast to all workers of a multi-worker replica.
    /// All subsequent commands, except `UpdateConfiguration`, are only sent to the first worker,
    /// which then distributes them to the other workers using a dataflow. This method of command
    /// distribution requires the timely dataflow runtime to be initialized, which is why the
    /// `CreateTimely` command exists.
    ///
    /// The `epoch` value imposes an ordering on iterations of the compute protocol. When the
    /// compute controller connects to a replica, it must send an `epoch` that is greater than all
    /// epochs it sent to the same replica on previous connections. Multi-process replicas should
    /// use the `epoch` to ensure that their individual processes agree on which protocol
    /// iteration they are in.
    CreateTimely {
        /// TODO(database-issues#7533): Add documentation.
        config: TimelyConfig,
        /// TODO(database-issues#7533): Add documentation.
        epoch: ClusterStartupEpoch,
    },

    /// `CreateInstance` must be sent after `CreateTimely` to complete the [Creation Stage] of the
    /// compute protocol. Unlike `CreateTimely`, it is only sent to the first worker of the
    /// replica, and then distributed through the timely runtime. `CreateInstance` instructs the
    /// replica to initialize its state to a point where it is ready to start maintaining
    /// dataflows.
    ///
    /// Upon receiving a `CreateInstance` command, the replica must further initialize logging
    /// dataflows according to the given [`LoggingConfig`].
    ///
    /// [Creation Stage]: super#creation-stage
    CreateInstance(InstanceConfig),

    /// `InitializationComplete` informs the replica about the end of the [Initialization Stage].
    /// Upon receiving this command, the replica should perform a reconciliation process, to ensure
    /// its dataflow state matches the state requested by the computation commands it received
    /// previously. The replica must now start sending responses to commands received previously,
    /// if it opted to defer them during the [Initialization Stage].
    ///
    /// [Initialization Stage]: super#initialization-stage
    InitializationComplete,

    /// `AllowWrites` informs the replica that it can transition out of the
    /// read-only computation stage and into the read-write computation stage.
    /// It is now allowed to effect changes to external systems (writes).
    ///
    /// After initialization is complete, an instance starts out in the
    /// read-only computation stage. Only upon receiving this command will it
    /// leave that stage and allow running operations to perform writes.
    ///
    /// An instance that has once been told that it can go into read-write mode
    /// can never go out of that mode again. It is okay for a read-only
    /// controller to re-connect to an instance that is already in read-write
    /// mode: _someone_ has already told the instance that it is okay to write
    /// and there is no way in the protocol to transition an instance back to
    /// read-only mode.
    ///
    /// NOTE: We don't have a protocol in place that allows writes only after a
    /// certain, controller-determined, timestamp. Such a protocol would allow
    /// tighter control and could allow the instance to avoid work. However,
    /// putting the logic for that in place is more work, so we leave it as
    /// future work for now.
    AllowWrites,

    /// `UpdateConfiguration` instructs the replica to update its configuration, according to the
    /// given [`ComputeParameters`].
    ///
    /// This command is special in that, like `CreateTimely`, it is broadcast to all workers of the
    /// replica. However, unlike `CreateTimely`, it is ignored by all workers except the first one,
    /// which distributes the command to the other workers through the timely runtime.
    /// `UpdateConfiguration` commands are broadcast only to allow the intermediary parts of the
    /// networking fabric to observe them and learn of configuration updates.
    ///
    /// Parameter updates transmitted through this command must be applied by the replica as soon
    /// as it receives the command, and they must be applied globally to all replica state, even
    /// dataflows and pending peeks that were created before the parameter update. This property
    /// allows the replica to hoist `UpdateConfiguration` commands during reconciliation.
    ///
    /// Configuration parameters that should not be applied globally, but only to specific
    /// dataflows or peeks, should be added to the [`DataflowDescription`] or [`Peek`] types,
    /// rather than as [`ComputeParameters`].
    UpdateConfiguration(ComputeParameters),

    /// `CreateDataflow` instructs the replica to create a dataflow according to the given
    /// [`DataflowDescription`].
    ///
    /// The [`DataflowDescription`] must have the following properties:
    ///
    /// * Dataflow imports are valid:
    ///   * Imported storage collections specified in [`source_imports`] exist and are readable by
    ///     the compute replica.
    ///   * Imported indexes specified in [`index_imports`] have been created on the replica
    ///     previously, by previous `CreateDataflow` commands.
    /// * Dataflow imports are readable at the specified [`as_of`]. In other words: The `since`s of
    ///   imported collections are not beyond the dataflow [`as_of`].
    /// * Dataflow exports have unique IDs, i.e., the IDs of exports from dataflows a replica is
    ///   instructed to create do not repeat (within a single protocol iteration).
    /// * The dataflow objects defined in [`objects_to_build`] are topologically ordered according
    ///   to the dependency relation.
    ///
    /// A dataflow description that violates any of the above properties can cause the replica to
    /// exhibit undefined behavior, such as panicking or production of incorrect results. A replica
    /// should prefer panicking over producing incorrect results.
    ///
    /// After receiving a `CreateDataflow` command, if the created dataflow exports indexes or
    /// storage sinks, the replica must produce [`Frontiers`] responses that report the
    /// advancement of the frontiers of these compute collections.
    ///
    /// After receiving a `CreateDataflow` command, if the created dataflow exports subscribes, the
    /// replica must produce [`SubscribeResponse`]s that report the progress and results of the
    /// subscribes.
    ///
    /// The replica may create the dataflow in a suspended state and defer starting the computation
    /// until it receives a corresponding `Schedule` command. Thus, to ensure dataflow execution,
    /// the compute controller should eventually send a `Schedule` command for each sent
    /// `CreateDataflow` command.
    ///
    /// [`objects_to_build`]: DataflowDescription::objects_to_build
    /// [`source_imports`]: DataflowDescription::source_imports
    /// [`index_imports`]: DataflowDescription::index_imports
    /// [`as_of`]: DataflowDescription::as_of
    /// [`Frontiers`]: super::response::ComputeResponse::Frontiers
    /// [`SubscribeResponse`]: super::response::ComputeResponse::SubscribeResponse
    CreateDataflow(DataflowDescription<RenderPlan<T>, CollectionMetadata, T>),

    /// `Schedule` allows the replica to start computation for a compute collection.
    ///
    /// It is invalid to send a `Schedule` command that references a collection that was not
    /// created by a corresponding `CreateDataflow` command before. Doing so may cause the replica
    /// to exhibit undefined behavior.
    ///
    /// It is also invalid to send a `Schedule` command that references a collection that has,
    /// through an `AllowCompaction` command, been allowed to compact to the empty frontier before.
    Schedule(GlobalId),

    /// `AllowCompaction` informs the replica about the relaxation of external read capabilities on
    /// a compute collection exported by one of the replica’s dataflows.
    ///
    /// The command names a collection and provides a frontier after which accumulations must be
    /// correct. The replica gains the liberty of compacting the corresponding maintained trace up
    /// through that frontier.
    ///
    /// It is invalid to send an `AllowCompaction` command that references a compute collection
    /// that was not created by a corresponding `CreateDataflow` command before. Doing so may cause
    /// the replica to exhibit undefined behavior.
    ///
    /// The `AllowCompaction` command only informs about external read requirements, not internal
    /// ones. The replica is responsible for ensuring that internal requirements are fulfilled at
    /// all times, so local dataflow inputs are not compacted beyond times at which they are still
    /// being read from.
    ///
    /// The read frontiers transmitted through `AllowCompaction`s may be beyond the corresponding
    /// collections' current `upper` frontiers. This signals that external readers are not
    /// interested in times up to the specified new read frontiers. Consequently, an empty read
    /// frontier signals that external readers are not interested in updates from the corresponding
    /// collection ever again, so the collection is not required anymore.
    ///
    /// Sending an `AllowCompaction` command with the empty frontier is the canonical way to drop
    /// compute collections.
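    ///
    /// A minimal sketch, not taken from the protocol definition: dropping a collection by
    /// allowing compaction to the empty frontier (assuming a hypothetical `collection_id`).
    ///
    /// ```ignore
    /// // An empty frontier tells the replica that the collection will never be
    /// // read again, so it may be dropped entirely.
    /// let drop_command = ComputeCommand::AllowCompaction {
    ///     id: collection_id,
    ///     frontier: Antichain::new(),
    /// };
    /// ```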
    ///
    /// A replica that receives an `AllowCompaction` command with the empty frontier must
    /// eventually respond with [`Frontiers`] responses reporting empty frontiers for the
    /// same collection. ([#16271])
    ///
    /// [`Frontiers`]: super::response::ComputeResponse::Frontiers
    /// [#16271]: https://github.com/MaterializeInc/database-issues/issues/4699
    AllowCompaction {
        /// TODO(database-issues#7533): Add documentation.
        id: GlobalId,
        /// TODO(database-issues#7533): Add documentation.
        frontier: Antichain<T>,
    },

    /// `Peek` instructs the replica to perform a peek on a collection: either an index or a
    /// Persist-backed collection.
    ///
    /// The [`Peek`] description must have the following properties:
    ///
    /// * If targeting an index, it has previously been created by a corresponding
    ///   `CreateDataflow` command. (If targeting a persist collection, that collection should
    ///   exist.)
    /// * The [`Peek::uuid`] is unique, i.e., the UUIDs of peeks a replica gets instructed to
    ///   perform do not repeat (within a single protocol iteration).
    ///
    /// A [`Peek`] description that violates any of the above properties can cause the replica to
    /// exhibit undefined behavior.
    ///
    /// Specifying a [`Peek::timestamp`] that is less than the target index’s `since` frontier does
    /// not provoke undefined behavior. Instead, the replica must produce a [`PeekResponse::Error`]
    /// in response.
    ///
    /// After receiving a `Peek` command, the replica must eventually produce a single
    /// [`PeekResponse`]:
    ///
    /// * For peeks that were not cancelled: either [`Rows`] or [`Error`].
    /// * For peeks that were cancelled: either [`Rows`], or [`Error`], or [`Canceled`].
    ///
    /// [`PeekResponse`]: super::response::PeekResponse
    /// [`PeekResponse::Error`]: super::response::PeekResponse::Error
    /// [`Rows`]: super::response::PeekResponse::Rows
    /// [`Error`]: super::response::PeekResponse::Error
    /// [`Canceled`]: super::response::PeekResponse::Canceled
    Peek(Peek<T>),

    /// `CancelPeek` instructs the replica to cancel the identified pending peek.
    ///
    /// It is invalid to send a `CancelPeek` command that references a peek that was not created
    /// by a corresponding `Peek` command before. Doing so may cause the replica to exhibit
    /// undefined behavior.
    ///
    /// If a replica cancels a peek in response to a `CancelPeek` command, it must respond with a
    /// [`PeekResponse::Canceled`]. The replica may also decide to fulfill the peek instead and
    /// return a different [`PeekResponse`], or it may already have returned a response to the
    /// specified peek. In these cases it must *not* return another [`PeekResponse`].
    ///
    /// [`PeekResponse`]: super::response::PeekResponse
    /// [`PeekResponse::Canceled`]: super::response::PeekResponse::Canceled
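    ///
    /// A minimal sketch (the `peek` value is a hypothetical, previously sent [`Peek`]):
    ///
    /// ```ignore
    /// // The `uuid` must match the one transmitted in the original `Peek` command.
    /// let cancel = ComputeCommand::<mz_repr::Timestamp>::CancelPeek { uuid: peek.uuid };
    /// ```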
    CancelPeek {
        /// The identifier of the peek request to cancel.
        ///
        /// This value must match a [`Peek::uuid`] value transmitted in a previous `Peek` command.
        uuid: Uuid,
    },
}

impl RustType<ProtoComputeCommand> for ComputeCommand<mz_repr::Timestamp> {
    fn into_proto(&self) -> ProtoComputeCommand {
        use proto_compute_command::Kind::*;
        use proto_compute_command::*;
        ProtoComputeCommand {
            kind: Some(match self {
                ComputeCommand::CreateTimely { config, epoch } => CreateTimely(ProtoCreateTimely {
                    config: Some(config.into_proto()),
                    epoch: Some(epoch.into_proto()),
                }),
                ComputeCommand::CreateInstance(config) => CreateInstance(config.into_proto()),
                ComputeCommand::InitializationComplete => InitializationComplete(()),
                ComputeCommand::UpdateConfiguration(params) => {
                    UpdateConfiguration(params.into_proto())
                }
                ComputeCommand::CreateDataflow(dataflow) => CreateDataflow(dataflow.into_proto()),
                ComputeCommand::Schedule(id) => Schedule(id.into_proto()),
                ComputeCommand::AllowCompaction { id, frontier } => {
                    AllowCompaction(ProtoCompaction {
                        id: Some(id.into_proto()),
                        frontier: Some(frontier.into_proto()),
                    })
                }
                ComputeCommand::Peek(peek) => Peek(peek.into_proto()),
                ComputeCommand::CancelPeek { uuid } => CancelPeek(uuid.into_proto()),
                ComputeCommand::AllowWrites => AllowWrites(()),
            }),
        }
    }

    fn from_proto(proto: ProtoComputeCommand) -> Result<Self, TryFromProtoError> {
        use proto_compute_command::Kind::*;
        use proto_compute_command::*;
        match proto.kind {
            Some(CreateTimely(ProtoCreateTimely { config, epoch })) => {
                Ok(ComputeCommand::CreateTimely {
                    config: config.into_rust_if_some("ProtoCreateTimely::config")?,
                    epoch: epoch.into_rust_if_some("ProtoCreateTimely::epoch")?,
                })
            }
            Some(CreateInstance(config)) => {
                Ok(ComputeCommand::CreateInstance(config.into_rust()?))
            }
            Some(InitializationComplete(())) => Ok(ComputeCommand::InitializationComplete),
            Some(UpdateConfiguration(params)) => {
                Ok(ComputeCommand::UpdateConfiguration(params.into_rust()?))
            }
            Some(CreateDataflow(dataflow)) => {
                Ok(ComputeCommand::CreateDataflow(dataflow.into_rust()?))
            }
            Some(Schedule(id)) => Ok(ComputeCommand::Schedule(id.into_rust()?)),
            Some(AllowCompaction(ProtoCompaction { id, frontier })) => {
                Ok(ComputeCommand::AllowCompaction {
                    id: id.into_rust_if_some("ProtoAllowCompaction::id")?,
                    frontier: frontier.into_rust_if_some("ProtoAllowCompaction::frontier")?,
                })
            }
            Some(Peek(peek)) => Ok(ComputeCommand::Peek(peek.into_rust()?)),
            Some(CancelPeek(uuid)) => Ok(ComputeCommand::CancelPeek {
                uuid: uuid.into_rust()?,
            }),
            Some(AllowWrites(())) => Ok(ComputeCommand::AllowWrites),
            None => Err(TryFromProtoError::missing_field(
                "ProtoComputeCommand::kind",
            )),
        }
    }
}

impl Arbitrary for ComputeCommand<mz_repr::Timestamp> {
    type Strategy = Union<BoxedStrategy<Self>>;
    type Parameters = ();

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        Union::new(vec![
            any::<InstanceConfig>()
                .prop_map(ComputeCommand::CreateInstance)
                .boxed(),
            any::<ComputeParameters>()
                .prop_map(ComputeCommand::UpdateConfiguration)
                .boxed(),
            any::<DataflowDescription<RenderPlan, CollectionMetadata, mz_repr::Timestamp>>()
                .prop_map(ComputeCommand::CreateDataflow)
                .boxed(),
            any::<GlobalId>().prop_map(ComputeCommand::Schedule).boxed(),
            (any::<GlobalId>(), any_antichain())
                .prop_map(|(id, frontier)| ComputeCommand::AllowCompaction { id, frontier })
                .boxed(),
            any::<Peek>().prop_map(ComputeCommand::Peek).boxed(),
            any_uuid()
                .prop_map(|uuid| ComputeCommand::CancelPeek { uuid })
                .boxed(),
        ])
    }
}

/// Configuration for a replica, passed with the `CreateInstance` command. Replicas should halt
/// if the controller attempts to reconcile them with different values
/// for anything in this struct.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct InstanceConfig {
    /// Specification of introspection logging.
    pub logging: LoggingConfig,
    /// The offset relative to the replica startup at which it should expire. `None` disables the
    /// feature.
    pub expiration_offset: Option<Duration>,
}

impl InstanceConfig {
    /// Check if the configuration is compatible with another configuration. This is true iff the
    /// logging configuration is equivalent, and the other configuration (non-strictly) strengthens
    /// the expiration offset.
    ///
    /// We consider a stricter offset compatible, which allows us to strengthen the value without
    /// forcing replica restarts. However, it also means that replicas will only pick up the new
    /// value after a restart.
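    ///
    /// A minimal sketch of the rule, not from the original docs; the `logging` value is assumed
    /// to be some pre-existing [`LoggingConfig`]:
    ///
    /// ```ignore
    /// let old = InstanceConfig { logging: logging.clone(), expiration_offset: Some(Duration::from_secs(60)) };
    /// let strengthened = InstanceConfig { logging: logging.clone(), expiration_offset: Some(Duration::from_secs(30)) };
    /// let weakened = InstanceConfig { logging, expiration_offset: None };
    ///
    /// assert!(old.compatible_with(&strengthened)); // smaller offset: compatible
    /// assert!(!old.compatible_with(&weakened)); // removing the offset weakens it: incompatible
    /// ```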
    pub fn compatible_with(&self, other: &InstanceConfig) -> bool {
        // Destructure to protect against adding fields in the future.
        let InstanceConfig {
            logging: self_logging,
            expiration_offset: self_offset,
        } = self;
        let InstanceConfig {
            logging: other_logging,
            expiration_offset: other_offset,
        } = other;

        // Logging is compatible if exactly the same.
        let logging_compatible = self_logging == other_logging;

        // The offsets are compatible if `other_offset` is less than or equal to `self_offset`,
        // i.e., it is a smaller offset and thus strengthens the expiration.
        let self_offset = Antichain::from_iter(*self_offset);
        let other_offset = Antichain::from_iter(*other_offset);
        let offset_compatible = timely::PartialOrder::less_equal(&other_offset, &self_offset);

        logging_compatible && offset_compatible
    }
}

impl RustType<ProtoInstanceConfig> for InstanceConfig {
    fn into_proto(&self) -> ProtoInstanceConfig {
        ProtoInstanceConfig {
            logging: Some(self.logging.into_proto()),
            expiration_offset: self.expiration_offset.into_proto(),
        }
    }

    fn from_proto(proto: ProtoInstanceConfig) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            logging: proto
                .logging
                .into_rust_if_some("ProtoCreateInstance::logging")?,
            expiration_offset: proto.expiration_offset.into_rust()?,
        })
    }
}

/// Compute instance configuration parameters.
///
/// Parameters can be set (`Some`) or unset (`None`).
/// Unset parameters should be interpreted to mean "use the previous value".
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct ComputeParameters {
    /// An optional arbitrary string that describes the class of the workload
    /// this compute instance is running (e.g., `production` or `staging`).
    ///
    /// When `Some(x)`, a `workload_class=x` label is applied to all metrics
    /// exported by the metrics registry associated with the compute instance.
    pub workload_class: Option<Option<String>>,
    /// The maximum allowed size in bytes for results of peeks and subscribes.
    ///
    /// Peeks and subscribes that would return results larger than this maximum return the
    /// respective error responses instead:
    /// * [`PeekResponse::Rows`] is replaced by [`PeekResponse::Error`].
    /// * The [`SubscribeBatch::updates`] field is populated with an [`Err`] value.
    ///
    /// [`PeekResponse::Rows`]: super::response::PeekResponse::Rows
    /// [`PeekResponse::Error`]: super::response::PeekResponse::Error
    /// [`SubscribeBatch::updates`]: super::response::SubscribeBatch::updates
    pub max_result_size: Option<u64>,
    /// Tracing configuration.
    pub tracing: TracingParameters,
    /// gRPC client configuration.
    pub grpc_client: GrpcClientParameters,

    /// Config updates for components migrated to `mz_dyncfg`.
    pub dyncfg_updates: ConfigUpdates,
}

impl ComputeParameters {
    /// Update the parameter values with the set ones from `other`.
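    ///
    /// A minimal sketch of the merge semantics, not from the original docs (the field values are
    /// hypothetical):
    ///
    /// ```ignore
    /// let mut params = ComputeParameters {
    ///     max_result_size: Some(1 << 20),
    ///     ..Default::default()
    /// };
    /// let update = ComputeParameters {
    ///     workload_class: Some(Some("staging".into())),
    ///     ..Default::default()
    /// };
    /// params.update(update);
    ///
    /// // Set parameters overwrite; unset (`None`) parameters keep the previous value.
    /// assert_eq!(params.workload_class, Some(Some("staging".into())));
    /// assert_eq!(params.max_result_size, Some(1 << 20));
    /// ```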
    pub fn update(&mut self, other: ComputeParameters) {
        let ComputeParameters {
            workload_class,
            max_result_size,
            tracing,
            grpc_client,
            dyncfg_updates,
        } = other;

        if workload_class.is_some() {
            self.workload_class = workload_class;
        }
        if max_result_size.is_some() {
            self.max_result_size = max_result_size;
        }

        self.tracing.update(tracing);
        self.grpc_client.update(grpc_client);

        self.dyncfg_updates.extend(dyncfg_updates);
    }

    /// Return whether all parameters are unset.
    pub fn all_unset(&self) -> bool {
        *self == Self::default()
    }
}

impl RustType<ProtoComputeParameters> for ComputeParameters {
    fn into_proto(&self) -> ProtoComputeParameters {
        ProtoComputeParameters {
            workload_class: self.workload_class.into_proto(),
            max_result_size: self.max_result_size.into_proto(),
            tracing: Some(self.tracing.into_proto()),
            grpc_client: Some(self.grpc_client.into_proto()),
            dyncfg_updates: Some(self.dyncfg_updates.clone()),
        }
    }

    fn from_proto(proto: ProtoComputeParameters) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            workload_class: proto.workload_class.into_rust()?,
            max_result_size: proto.max_result_size.into_rust()?,
            tracing: proto
                .tracing
                .into_rust_if_some("ProtoComputeParameters::tracing")?,
            grpc_client: proto
                .grpc_client
                .into_rust_if_some("ProtoComputeParameters::grpc_client")?,
            dyncfg_updates: proto.dyncfg_updates.ok_or_else(|| {
                TryFromProtoError::missing_field("ProtoComputeParameters::dyncfg_updates")
            })?,
        })
    }
}

impl RustType<ProtoWorkloadClass> for Option<String> {
    fn into_proto(&self) -> ProtoWorkloadClass {
        ProtoWorkloadClass {
            value: self.clone(),
        }
    }

    fn from_proto(proto: ProtoWorkloadClass) -> Result<Self, TryFromProtoError> {
        Ok(proto.value)
    }
}

/// Compute parameters supplied to new replicas as part of the Timely instantiation. These
/// parameters usually cannot be changed once set.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct InitialComputeParameters {
    /// `arrangement_exert_proportionality` value passed to new replicas.
    pub arrangement_exert_proportionality: u32,
    /// Enable the zero copy allocator.
    pub enable_zero_copy: bool,
    /// Enable lgalloc to back the zero copy allocator.
    pub enable_zero_copy_lgalloc: bool,
    /// Optional limit on the number of empty buffers retained by the zero copy allocator.
    pub zero_copy_limit: Option<usize>,
}

/// Metadata specific to the peek variant.
#[derive(Arbitrary, Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum PeekTarget {
    /// This peek is against an index. Since this should be held in memory on
    /// the target cluster, no additional coordinates are necessary.
    Index {
        /// The id of the (possibly transient) index.
        id: GlobalId,
    },
    /// This peek is against a Persist collection.
    Persist {
        /// The id of the backing Persist collection.
        id: GlobalId,
        /// The identifying metadata of the Persist shard.
        metadata: CollectionMetadata,
    },
}

impl PeekTarget {
    /// Returns the ID of the peeked collection.
    pub fn id(&self) -> GlobalId {
        match self {
            Self::Index { id } => *id,
            Self::Persist { id, .. } => *id,
        }
    }
}

/// Peek a collection, either in an arrangement or Persist.
///
/// This request elicits data from the worker, by naming the
/// collection and some actions to apply to the results before
/// returning them.
///
/// The `timestamp` member must be valid for the arrangement that
/// is referenced by `id`. This means that `AllowCompaction` for
/// this arrangement should not pass `timestamp` before this command.
/// Subsequent commands may arbitrarily compact the arrangements;
/// the dataflow runners are responsible for ensuring that they can
/// correctly answer the `Peek`.
#[derive(Arbitrary, Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Peek<T = mz_repr::Timestamp> {
    /// Target-specific metadata.
    pub target: PeekTarget,
    /// If `Some`, then look up only the given keys from the collection (instead of a full scan).
    /// The vector is never empty.
    #[proptest(strategy = "proptest::option::of(proptest::collection::vec(any::<Row>(), 1..5))")]
    pub literal_constraints: Option<Vec<Row>>,
    /// The identifier of this peek request.
    ///
    /// Used in responses and cancellation requests.
    #[proptest(strategy = "any_uuid()")]
    pub uuid: Uuid,
    /// The logical timestamp at which the collection is queried.
    pub timestamp: T,
    /// Actions to apply to the result set before returning them.
    pub finishing: RowSetFinishing,
    /// Linear operation to apply in-line on each result.
    pub map_filter_project: mz_expr::SafeMfpPlan,
    /// An `OpenTelemetryContext` to forward trace information along
    /// to the compute worker to allow associating traces between
    /// the compute controller and the compute worker.
    #[proptest(strategy = "empty_otel_ctx()")]
    pub otel_ctx: OpenTelemetryContext,
}

impl RustType<ProtoPeek> for Peek {
    fn into_proto(&self) -> ProtoPeek {
        ProtoPeek {
            key: match &self.literal_constraints {
                // In the Some case, the vector is never empty, so it's safe to encode None as an
                // empty vector, and Some(vector) as just the vector.
                Some(vec) => {
                    assert!(!vec.is_empty());
                    vec.into_proto()
                }
                None => Vec::<Row>::new().into_proto(),
            },
            uuid: Some(self.uuid.into_proto()),
            timestamp: self.timestamp.into(),
            finishing: Some(self.finishing.into_proto()),
            map_filter_project: Some(self.map_filter_project.into_proto()),
            otel_ctx: self.otel_ctx.clone().into(),
            target: Some(match &self.target {
                PeekTarget::Index { id } => proto_peek::Target::Index(ProtoIndexTarget {
                    id: Some(id.into_proto()),
                }),
                PeekTarget::Persist { id, metadata } => {
                    proto_peek::Target::Persist(ProtoPersistTarget {
                        id: Some(id.into_proto()),
                        metadata: Some(metadata.into_proto()),
                    })
                }
            }),
        }
    }

    fn from_proto(x: ProtoPeek) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            literal_constraints: {
                let vec: Vec<Row> = x.key.into_rust()?;
                if vec.is_empty() { None } else { Some(vec) }
            },
            uuid: x.uuid.into_rust_if_some("ProtoPeek::uuid")?,
            timestamp: x.timestamp.into(),
            finishing: x.finishing.into_rust_if_some("ProtoPeek::finishing")?,
            map_filter_project: x
                .map_filter_project
                .into_rust_if_some("ProtoPeek::map_filter_project")?,
            otel_ctx: x.otel_ctx.into(),
            target: match x.target {
                Some(proto_peek::Target::Index(target)) => PeekTarget::Index {
                    id: target.id.into_rust_if_some("ProtoIndexTarget::id")?,
                },
                Some(proto_peek::Target::Persist(target)) => PeekTarget::Persist {
                    id: target.id.into_rust_if_some("ProtoPersistTarget::id")?,
                    metadata: target
                        .metadata
                        .into_rust_if_some("ProtoPersistTarget::metadata")?,
                },
                None => return Err(TryFromProtoError::missing_field("ProtoPeek::target")),
            },
        })
    }
}

fn empty_otel_ctx() -> impl Strategy<Value = OpenTelemetryContext> {
    (0..1).prop_map(|_| OpenTelemetryContext::empty())
}

impl TryIntoTimelyConfig for ComputeCommand {
    fn try_into_timely_config(self) -> Result<(TimelyConfig, ClusterStartupEpoch), Self> {
        match self {
            ComputeCommand::CreateTimely { config, epoch } => Ok((config, epoch)),
            cmd => Err(cmd),
        }
    }
}

#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::ProptestConfig;
    use proptest::proptest;

    use super::*;

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(32))]

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
        fn peek_protobuf_roundtrip(expect in any::<Peek>()) {
            let actual = protobuf_roundtrip::<_, ProtoPeek>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }

        #[mz_ore::test]
        fn compute_command_protobuf_roundtrip(expect in any::<ComputeCommand<mz_repr::Timestamp>>()) {
            let actual = protobuf_roundtrip::<_, ProtoComputeCommand>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
}