// mz_compute_client/protocol/response.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Compute protocol responses.

use std::num::NonZeroUsize;

use mz_compute_types::plan::LirId;
use mz_expr::row::RowCollection;
use mz_ore::cast::CastFrom;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_types::ShardId;
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError, any_uuid};
use mz_repr::{Diff, GlobalId, RelationDesc, Row};
use mz_timely_util::progress::any_antichain;
use proptest::prelude::{Arbitrary, any};
use proptest::strategy::{BoxedStrategy, Just, Strategy, Union};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use timely::progress::frontier::Antichain;
use uuid::Uuid;

include!(concat!(
    env!("OUT_DIR"),
    "/mz_compute_client.protocol.response.rs"
));

/// Compute protocol responses, sent by replicas to the compute controller.
///
/// Replicas send `ComputeResponse`s in response to [`ComputeCommand`]s they previously received
/// from the compute controller.
///
/// [`ComputeCommand`]: super::command::ComputeCommand
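///
/// The sketch below shows how a receiver might dispatch on the response variants; the
/// `handle_*` functions are hypothetical stand-ins for controller logic.
///
/// ```ignore
/// match response {
///     ComputeResponse::Frontiers(id, frontiers) => handle_frontiers(id, frontiers),
///     ComputeResponse::PeekResponse(uuid, peek, otel_ctx) => handle_peek(uuid, peek, otel_ctx),
///     ComputeResponse::SubscribeResponse(id, batch) => handle_subscribe(id, batch),
///     ComputeResponse::CopyToResponse(id, copy_to) => handle_copy_to(id, copy_to),
///     ComputeResponse::Status(status) => handle_status(status),
/// }
/// ```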
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ComputeResponse<T = mz_repr::Timestamp> {
    /// `Frontiers` announces the advancement of the various frontiers of the specified compute
    /// collection.
    ///
    /// Replicas must send `Frontiers` responses for compute collections that are indexes or
    /// storage sinks. Replicas must not send `Frontiers` responses for subscribes and copy-tos
    /// ([#16274]).
    ///
    /// Replicas must never report regressing frontiers. Specifically:
    ///
    ///   * The first frontier of any kind reported for a collection must not be less than that
    ///     collection's initial `as_of` frontier.
    ///   * Subsequent reported frontiers for a collection must not be less than any frontier of
    ///     the same kind reported previously for the same collection.
    ///
    /// Replicas must send `Frontiers` responses that report each frontier kind to have advanced to
    /// the empty frontier in response to an [`AllowCompaction` command] that allows compaction of
    /// the collection to the empty frontier, unless the frontier has previously advanced to the
    /// empty frontier as part of the regular dataflow computation. ([#16271])
    ///
    /// Once a frontier has been reported to have advanced to the empty frontier, the replica must
    /// not send further `Frontiers` responses with non-`None` values for that frontier kind.
    ///
    /// The replica must not send `Frontiers` responses for collections that have not
    /// been created previously by a [`CreateDataflow` command] or by a [`CreateInstance`
    /// command].
    ///
    /// [`AllowCompaction` command]: super::command::ComputeCommand::AllowCompaction
    /// [`CreateDataflow` command]: super::command::ComputeCommand::CreateDataflow
    /// [`CreateInstance` command]: super::command::ComputeCommand::CreateInstance
    /// [#16271]: https://github.com/MaterializeInc/database-issues/issues/4699
    /// [#16274]: https://github.com/MaterializeInc/database-issues/issues/4701
    Frontiers(GlobalId, FrontiersResponse<T>),

    /// `PeekResponse` reports the result of a previous [`Peek` command]. The peek is identified by
    /// a `Uuid` that matches the command's [`Peek::uuid`].
    ///
    /// The replica must send exactly one `PeekResponse` for every [`Peek` command] it received.
    ///
    /// If the replica did not receive a [`CancelPeek` command] for a peek, it must not send a
    /// [`Canceled`] response for that peek. If the replica did receive a [`CancelPeek` command]
    /// for a peek, it may send any of the three [`PeekResponse`] variants.
    ///
    /// The replica must not send `PeekResponse`s for peek IDs that were not previously specified
    /// in a [`Peek` command].
    ///
    /// [`Peek` command]: super::command::ComputeCommand::Peek
    /// [`CancelPeek` command]: super::command::ComputeCommand::CancelPeek
    /// [`Peek::uuid`]: super::command::Peek::uuid
    /// [`Canceled`]: PeekResponse::Canceled
    PeekResponse(Uuid, PeekResponse, OpenTelemetryContext),

    /// `SubscribeResponse` reports the results emitted by an active subscribe over some time
    /// interval.
    ///
    /// For each subscribe that was installed by a previous [`CreateDataflow` command], the
    /// replica must emit [`Batch`] responses that cover the entire time interval from the
    /// minimum time until the subscribe advances to the empty frontier or is
    /// dropped. The time intervals of consecutive [`Batch`]es must be increasing, contiguous,
    /// non-overlapping, and non-empty. All updates transmitted in a batch must be consolidated and
    /// have times within that batch's time interval. All updates' times must be greater than or
    /// equal to `as_of`. The `upper` of the first [`Batch`] of a subscribe must not be less than
    /// that subscribe's initial `as_of` frontier.
    ///
    /// The replica must send [`DroppedAt`] responses if the subscribe was dropped in response to
    /// an [`AllowCompaction` command] that advanced its read frontier to the empty frontier. The
    /// [`DroppedAt`] frontier must be the upper frontier of the last emitted batch.
    ///
    /// The replica must not send a [`DroppedAt`] response if the subscribe's upper frontier
    /// (reported by [`Batch`] responses) has advanced to the empty frontier (e.g. because its
    /// inputs advanced to the empty frontier).
    ///
    /// Once a subscribe has been reported to have advanced to the empty frontier, or has been dropped:
    ///
    ///   * It must no longer read from its inputs.
    ///   * The replica must not send further `SubscribeResponse`s for that subscribe.
    ///
    /// The replica must not send `SubscribeResponse`s for subscribes that have not been
    /// created previously by a [`CreateDataflow` command].
    ///
    /// [`Batch`]: SubscribeResponse::Batch
    /// [`DroppedAt`]: SubscribeResponse::DroppedAt
    /// [`CreateDataflow` command]: super::command::ComputeCommand::CreateDataflow
    /// [`AllowCompaction` command]: super::command::ComputeCommand::AllowCompaction
    SubscribeResponse(GlobalId, SubscribeResponse<T>),

    /// `CopyToResponse` reports the completion of an S3-oneshot sink.
    ///
    /// The replica must send exactly one `CopyToResponse` for every S3-oneshot sink previously
    /// created by a [`CreateDataflow` command].
    ///
    /// The replica must not send `CopyToResponse`s for S3-oneshot sinks that were not previously
    /// created by a [`CreateDataflow` command].
    ///
    /// [`CreateDataflow` command]: super::command::ComputeCommand::CreateDataflow
    CopyToResponse(GlobalId, CopyToResponse),

    /// `Status` reports status updates from replicas to the controller.
    ///
    /// `Status` responses are a way for replicas to stream back introspection data that the
    /// controller can then announce to its clients. They have no effect on the lifecycles of
    /// compute collections. Correct operation of the Compute layer must not rely on `Status`
    /// responses being sent or received.
    ///
    /// `Status` responses that are specific to collections must only be sent for collections that
    /// (a) have previously been created by a [`CreateDataflow` command] and (b) have not yet
    /// been reported to have advanced to the empty frontier.
    ///
    /// [`CreateDataflow` command]: super::command::ComputeCommand::CreateDataflow
    Status(StatusResponse),
}

impl RustType<ProtoComputeResponse> for ComputeResponse<mz_repr::Timestamp> {
    fn into_proto(&self) -> ProtoComputeResponse {
        use proto_compute_response::Kind::*;
        use proto_compute_response::*;
        ProtoComputeResponse {
            kind: Some(match self {
                ComputeResponse::Frontiers(id, resp) => Frontiers(ProtoFrontiersKind {
                    id: Some(id.into_proto()),
                    resp: Some(resp.into_proto()),
                }),
                ComputeResponse::PeekResponse(id, resp, otel_ctx) => {
                    PeekResponse(ProtoPeekResponseKind {
                        id: Some(id.into_proto()),
                        resp: Some(resp.into_proto()),
                        otel_ctx: otel_ctx.clone().into(),
                    })
                }
                ComputeResponse::SubscribeResponse(id, resp) => {
                    SubscribeResponse(ProtoSubscribeResponseKind {
                        id: Some(id.into_proto()),
                        resp: Some(resp.into_proto()),
                    })
                }
                ComputeResponse::CopyToResponse(id, resp) => {
                    CopyToResponse(ProtoCopyToResponseKind {
                        id: Some(id.into_proto()),
                        resp: Some(resp.into_proto()),
                    })
                }
                ComputeResponse::Status(resp) => Status(resp.into_proto()),
            }),
        }
    }

    fn from_proto(proto: ProtoComputeResponse) -> Result<Self, TryFromProtoError> {
        use proto_compute_response::Kind::*;
        match proto.kind {
            Some(Frontiers(resp)) => Ok(ComputeResponse::Frontiers(
                resp.id.into_rust_if_some("ProtoFrontiersKind::id")?,
                resp.resp.into_rust_if_some("ProtoFrontiersKind::resp")?,
            )),
            Some(PeekResponse(resp)) => Ok(ComputeResponse::PeekResponse(
                resp.id.into_rust_if_some("ProtoPeekResponseKind::id")?,
                resp.resp.into_rust_if_some("ProtoPeekResponseKind::resp")?,
                resp.otel_ctx.into(),
            )),
            Some(SubscribeResponse(resp)) => Ok(ComputeResponse::SubscribeResponse(
                resp.id
                    .into_rust_if_some("ProtoSubscribeResponseKind::id")?,
                resp.resp
                    .into_rust_if_some("ProtoSubscribeResponseKind::resp")?,
            )),
            Some(CopyToResponse(resp)) => Ok(ComputeResponse::CopyToResponse(
                resp.id.into_rust_if_some("ProtoCopyToResponseKind::id")?,
                resp.resp
                    .into_rust_if_some("ProtoCopyToResponseKind::resp")?,
            )),
            Some(Status(resp)) => Ok(ComputeResponse::Status(resp.into_rust()?)),
            None => Err(TryFromProtoError::missing_field(
                "ProtoComputeResponse::kind",
            )),
        }
    }
}

impl Arbitrary for ComputeResponse<mz_repr::Timestamp> {
    type Strategy = Union<BoxedStrategy<Self>>;
    type Parameters = ();

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        Union::new(vec![
            (any::<GlobalId>(), any::<FrontiersResponse>())
                .prop_map(|(id, resp)| ComputeResponse::Frontiers(id, resp))
                .boxed(),
            (any_uuid(), any::<PeekResponse>())
                .prop_map(|(id, resp)| {
                    ComputeResponse::PeekResponse(id, resp, OpenTelemetryContext::empty())
                })
                .boxed(),
            (any::<GlobalId>(), any::<SubscribeResponse>())
                .prop_map(|(id, resp)| ComputeResponse::SubscribeResponse(id, resp))
                .boxed(),
            any::<StatusResponse>()
                .prop_map(ComputeResponse::Status)
                .boxed(),
        ])
    }
}

/// A response reporting advancement of frontiers of a compute collection.
///
/// All contained frontier fields are optional. `None` values imply that the respective frontier
/// has not advanced and the previously reported value is still current.
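///
/// For example, a response that only advances the write frontier leaves the other fields `None`.
/// A minimal sketch, using `Antichain::from_elem` and a `u64` timestamp for brevity:
///
/// ```ignore
/// let resp = FrontiersResponse {
///     write_frontier: Some(Antichain::from_elem(42u64)),
///     input_frontier: None,
///     output_frontier: None,
/// };
/// assert!(resp.has_updates());
/// ```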
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FrontiersResponse<T = mz_repr::Timestamp> {
    /// The collection's new write frontier, if any.
    ///
    /// Upon receiving an updated `write_frontier`, the controller may assume that the contents of the
    /// collection are sealed for all times less than that frontier. Once the replica has reported
    /// the `write_frontier` as the empty frontier, it must no longer change the contents of the
    /// collection.
    pub write_frontier: Option<Antichain<T>>,
    /// The collection's new input frontier, if any.
    ///
    /// Upon receiving an updated `input_frontier`, the controller may assume that the replica has
    /// finished reading from the collection's inputs up to that frontier. Once the replica has
    /// reported the `input_frontier` as the empty frontier, it must no longer read from the
    /// collection's inputs.
    pub input_frontier: Option<Antichain<T>>,
    /// The collection's new output frontier, if any.
    ///
    /// Upon receiving an updated `output_frontier`, the controller may assume that the replica
    /// has finished processing the collection's input up to that frontier.
    ///
    /// The `output_frontier` is often equal to the `write_frontier`, but not always. Some
    /// collections can jump their write frontiers ahead of the times they have finished
    /// processing, causing the `output_frontier` to lag behind the `write_frontier`. Collections
    /// writing materialized views do so in two cases:
    ///
    ///  * `REFRESH` MVs jump their write frontier ahead to the next refresh time.
    ///  * In a multi-replica cluster, slower replicas observe and report the write frontier of the
    ///    fastest replica, by witnessing advancements of the target persist shard's `upper`.
    pub output_frontier: Option<Antichain<T>>,
}

impl<T> FrontiersResponse<T> {
    /// Returns whether there are any contained updates.
    pub fn has_updates(&self) -> bool {
        self.write_frontier.is_some()
            || self.input_frontier.is_some()
            || self.output_frontier.is_some()
    }
}

impl RustType<ProtoFrontiersResponse> for FrontiersResponse {
    fn into_proto(&self) -> ProtoFrontiersResponse {
        ProtoFrontiersResponse {
            write_frontier: self.write_frontier.into_proto(),
            input_frontier: self.input_frontier.into_proto(),
            output_frontier: self.output_frontier.into_proto(),
        }
    }

    fn from_proto(proto: ProtoFrontiersResponse) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            write_frontier: proto.write_frontier.into_rust()?,
            input_frontier: proto.input_frontier.into_rust()?,
            output_frontier: proto.output_frontier.into_rust()?,
        })
    }
}

impl Arbitrary for FrontiersResponse {
    type Strategy = BoxedStrategy<Self>;
    type Parameters = ();

    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        (any_antichain(), any_antichain(), any_antichain())
            .prop_map(|(write, input, compute)| Self {
                write_frontier: Some(write),
                input_frontier: Some(input),
                output_frontier: Some(compute),
            })
            .boxed()
    }
}

/// The response from a `Peek`.
///
/// Note that each `Peek` expects to generate exactly one `PeekResponse`, i.e.
/// we expect a 1:1 contract between `Peek` and `PeekResponse`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum PeekResponse {
    /// Returned rows of a successful peek.
    Rows(RowCollection),
    /// Results of the peek were stashed in persist batches.
    Stashed(Box<StashedPeekResponse>),
    /// Error of an unsuccessful peek.
    Error(String),
    /// The peek was canceled.
    Canceled,
}

impl PeekResponse {
    /// Returns the size, in bytes, of the rows stored inline in this response.
    pub fn inline_byte_len(&self) -> usize {
        match self {
            Self::Rows(rows) => rows.byte_len(),
            Self::Stashed(stashed) => stashed.inline_rows.byte_len(),
            Self::Error(_) | Self::Canceled => 0,
        }
    }
}

impl RustType<ProtoPeekResponse> for PeekResponse {
    fn into_proto(&self) -> ProtoPeekResponse {
        use proto_peek_response::Kind::*;
        ProtoPeekResponse {
            kind: Some(match self {
                PeekResponse::Rows(rows) => Rows(rows.into_proto()),
                PeekResponse::Stashed(stashed) => Stashed(stashed.as_ref().into_proto()),
                PeekResponse::Error(err) => proto_peek_response::Kind::Error(err.clone()),
                PeekResponse::Canceled => Canceled(()),
            }),
        }
    }

    fn from_proto(proto: ProtoPeekResponse) -> Result<Self, TryFromProtoError> {
        use proto_peek_response::Kind::*;
        match proto.kind {
            Some(Rows(rows)) => Ok(PeekResponse::Rows(rows.into_rust()?)),
            Some(Stashed(stashed)) => Ok(PeekResponse::Stashed(Box::new(stashed.into_rust()?))),
            Some(proto_peek_response::Kind::Error(err)) => Ok(PeekResponse::Error(err)),
            Some(Canceled(())) => Ok(PeekResponse::Canceled),
            None => Err(TryFromProtoError::missing_field("ProtoPeekResponse::kind")),
        }
    }
}

impl Arbitrary for PeekResponse {
    type Strategy = Union<BoxedStrategy<Self>>;
    type Parameters = ();

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        Union::new(vec![
            proptest::collection::vec(
                (
                    any::<Row>(),
                    (1..usize::MAX).prop_map(|u| NonZeroUsize::try_from(u).unwrap()),
                ),
                1..11,
            )
            .prop_map(|rows| PeekResponse::Rows(RowCollection::new(rows, &[])))
            .boxed(),
            ".*".prop_map(PeekResponse::Error).boxed(),
            Just(PeekResponse::Canceled).boxed(),
        ])
    }
}

/// Response from a peek whose results have been stashed into persist.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct StashedPeekResponse {
    /// The number of rows stored in response batches. This is the sum of the
    /// diff values of the contained rows.
    ///
    /// This does _NOT_ include rows in `inline_rows`.
    pub num_rows_batches: u64,
    /// The sum of the encoded sizes of all batches in this response.
    pub encoded_size_bytes: usize,
    /// [RelationDesc] for the rows in these stashed batches of results.
    pub relation_desc: RelationDesc,
    /// The [ShardId] under which result batches have been stashed.
    pub shard_id: ShardId,
    /// Batches of rows; these must be combined with responses from other workers and
    /// consolidated before being sent back to the client.
    pub batches: Vec<ProtoBatch>,
    /// Rows that have not been uploaded to the stash because their total size
    /// did not exceed the threshold for using the peek stash.
    ///
    /// A response can contain a mix of stashed and inline rows because result
    /// sizes vary across workers.
    pub inline_rows: RowCollection,
}

impl StashedPeekResponse {
    /// Total count of [`Row`]s represented by this collection, considering a
    /// possible `OFFSET` and `LIMIT`.
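    ///
    /// For example, for a hypothetical `stashed` response holding 70 stashed rows and 30 inline
    /// rows, a `LIMIT` of 50 with no `OFFSET` caps the combined total of 100 rows:
    ///
    /// ```ignore
    /// assert_eq!(stashed.num_rows(0, Some(50)), 50);
    /// ```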
    pub fn num_rows(&self, offset: usize, limit: Option<usize>) -> usize {
        let num_stashed_rows: usize = usize::cast_from(self.num_rows_batches);
        let num_inline_rows = self.inline_rows.count(offset, limit);
        let mut num_rows = num_stashed_rows + num_inline_rows;

        // Consider a possible OFFSET.
        num_rows = num_rows.saturating_sub(offset);

        // Consider a possible LIMIT.
        if let Some(limit) = limit {
            num_rows = std::cmp::min(limit, num_rows);
        }

        num_rows
    }

    /// The size in bytes of the encoded rows in this result.
    pub fn size_bytes(&self) -> usize {
        let inline_size = self.inline_rows.byte_len();

        self.encoded_size_bytes + inline_size
    }
}

impl RustType<ProtoStashedPeekResponse> for StashedPeekResponse {
    fn into_proto(&self) -> ProtoStashedPeekResponse {
        ProtoStashedPeekResponse {
            relation_desc: Some(self.relation_desc.into_proto()),
            shard_id: self.shard_id.into_proto(),
            batches: self.batches.clone(),
            num_rows: self.num_rows_batches.into_proto(),
            encoded_size_bytes: self.encoded_size_bytes.into_proto(),
            inline_rows: Some(self.inline_rows.into_proto()),
        }
    }

    fn from_proto(proto: ProtoStashedPeekResponse) -> Result<Self, TryFromProtoError> {
        let shard_id: ShardId = proto
            .shard_id
            .into_rust()
            .expect("valid transmittable shard_id");
        Ok(StashedPeekResponse {
            relation_desc: proto
                .relation_desc
                .into_rust_if_some("ProtoStashedPeekResponse::relation_desc")?,
            shard_id,
            batches: proto.batches,
            num_rows_batches: proto.num_rows,
            encoded_size_bytes: usize::cast_from(proto.encoded_size_bytes),
            inline_rows: proto
                .inline_rows
                .into_rust_if_some("ProtoStashedPeekResponse::inline_rows")?,
        })
    }
}

/// Various responses that can be communicated after a COPY TO command.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum CopyToResponse {
    /// Returned number of rows for a successful COPY TO.
    RowCount(u64),
    /// Error of an unsuccessful COPY TO.
    Error(String),
    /// The COPY TO sink dataflow was dropped.
    Dropped,
}

impl RustType<ProtoCopyToResponse> for CopyToResponse {
    fn into_proto(&self) -> ProtoCopyToResponse {
        use proto_copy_to_response::Kind::*;
        ProtoCopyToResponse {
            kind: Some(match self {
                CopyToResponse::RowCount(rows) => Rows(*rows),
                CopyToResponse::Error(error) => Error(error.clone()),
                CopyToResponse::Dropped => Dropped(()),
            }),
        }
    }

    fn from_proto(proto: ProtoCopyToResponse) -> Result<Self, TryFromProtoError> {
        use proto_copy_to_response::Kind::*;
        match proto.kind {
            Some(Rows(rows)) => Ok(CopyToResponse::RowCount(rows)),
            Some(Error(error)) => Ok(CopyToResponse::Error(error)),
            Some(Dropped(())) => Ok(CopyToResponse::Dropped),
            None => Err(TryFromProtoError::missing_field(
                "ProtoCopyToResponse::kind",
            )),
        }
    }
}

/// Various responses that can be communicated about the progress of a SUBSCRIBE command.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum SubscribeResponse<T = mz_repr::Timestamp> {
    /// A batch of updates over a non-empty interval of time.
    Batch(SubscribeBatch<T>),
    /// The SUBSCRIBE dataflow was dropped, leaving updates from this frontier onward unspecified.
    DroppedAt(Antichain<T>),
}

impl<T> SubscribeResponse<T> {
    /// Converts `self` to an error if a maximum size is exceeded.
    pub fn to_error_if_exceeds(&mut self, max_result_size: usize) {
        if let SubscribeResponse::Batch(batch) = self {
            batch.to_error_if_exceeds(max_result_size);
        }
    }
}

impl RustType<ProtoSubscribeResponse> for SubscribeResponse<mz_repr::Timestamp> {
    fn into_proto(&self) -> ProtoSubscribeResponse {
        use proto_subscribe_response::Kind::*;
        ProtoSubscribeResponse {
            kind: Some(match self {
                SubscribeResponse::Batch(subscribe_batch) => Batch(subscribe_batch.into_proto()),
                SubscribeResponse::DroppedAt(antichain) => DroppedAt(antichain.into_proto()),
            }),
        }
    }

    fn from_proto(proto: ProtoSubscribeResponse) -> Result<Self, TryFromProtoError> {
        use proto_subscribe_response::Kind::*;
        match proto.kind {
            Some(Batch(subscribe_batch)) => {
                Ok(SubscribeResponse::Batch(subscribe_batch.into_rust()?))
            }
            Some(DroppedAt(antichain)) => Ok(SubscribeResponse::DroppedAt(antichain.into_rust()?)),
            None => Err(TryFromProtoError::missing_field(
                "ProtoSubscribeResponse::kind",
            )),
        }
    }
}

impl Arbitrary for SubscribeResponse<mz_repr::Timestamp> {
    type Strategy = Union<BoxedStrategy<Self>>;
    type Parameters = ();

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        Union::new(vec![
            any::<SubscribeBatch<mz_repr::Timestamp>>()
                .prop_map(SubscribeResponse::Batch)
                .boxed(),
            proptest::collection::vec(any::<mz_repr::Timestamp>(), 1..4)
                .prop_map(|antichain| SubscribeResponse::DroppedAt(Antichain::from(antichain)))
                .boxed(),
        ])
    }
}

/// A batch of updates for the interval `[lower, upper)`.
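///
/// Consecutive batches for the same subscribe tile the timeline without gaps: each batch's
/// `lower` equals the previous batch's `upper`. A minimal sketch, assuming two hypothetical
/// batches `first` and `second` received in order:
///
/// ```ignore
/// assert_eq!(first.upper, second.lower);
/// ```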
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct SubscribeBatch<T = mz_repr::Timestamp> {
    /// The lower frontier of the batch of updates.
    pub lower: Antichain<T>,
    /// The upper frontier of the batch of updates.
    pub upper: Antichain<T>,
    /// All updates greater than `lower` and not greater than `upper`.
    ///
    /// An `Err` variant can be used to indicate e.g. that the size of the updates exceeds internal limits.
    pub updates: Result<Vec<(T, Row, Diff)>, String>,
}

impl<T> SubscribeBatch<T> {
    /// Converts `self` to an error if a maximum size is exceeded.
    fn to_error_if_exceeds(&mut self, max_result_size: usize) {
        use bytesize::ByteSize;
        if let Ok(updates) = &self.updates {
            let total_size: usize = updates
                .iter()
                .map(|(_time, row, _diff)| row.byte_len())
                .sum();
            if total_size > max_result_size {
                self.updates = Err(format!(
                    "result exceeds max size of {}",
                    ByteSize::b(u64::cast_from(max_result_size))
                ));
            }
        }
    }
}

impl RustType<ProtoSubscribeBatch> for SubscribeBatch<mz_repr::Timestamp> {
    fn into_proto(&self) -> ProtoSubscribeBatch {
        use proto_subscribe_batch::ProtoUpdate;
        ProtoSubscribeBatch {
            lower: Some(self.lower.into_proto()),
            upper: Some(self.upper.into_proto()),
            updates: Some(proto_subscribe_batch::ProtoSubscribeBatchContents {
                kind: match &self.updates {
                    Ok(updates) => {
                        let updates = updates
                            .iter()
                            .map(|(t, r, d)| ProtoUpdate {
                                timestamp: t.into(),
                                row: Some(r.into_proto()),
                                diff: d.into_proto(),
                            })
                            .collect();

                        Some(
                            proto_subscribe_batch::proto_subscribe_batch_contents::Kind::Updates(
                                proto_subscribe_batch::ProtoSubscribeUpdates { updates },
                            ),
                        )
                    }
                    Err(text) => Some(
                        proto_subscribe_batch::proto_subscribe_batch_contents::Kind::Error(
                            text.clone(),
                        ),
                    ),
                },
            }),
        }
    }

    fn from_proto(proto: ProtoSubscribeBatch) -> Result<Self, TryFromProtoError> {
        Ok(SubscribeBatch {
            lower: proto.lower.into_rust_if_some("ProtoTailUpdate::lower")?,
            upper: proto.upper.into_rust_if_some("ProtoTailUpdate::upper")?,
            updates: match proto.updates.unwrap().kind {
                Some(proto_subscribe_batch::proto_subscribe_batch_contents::Kind::Updates(
                    updates,
                )) => Ok(updates
                    .updates
                    .into_iter()
                    .map(|update| {
                        Ok((
                            update.timestamp.into(),
                            update.row.into_rust_if_some("ProtoUpdate::row")?,
                            update.diff.into(),
                        ))
                    })
                    .collect::<Result<Vec<_>, TryFromProtoError>>()?),
                Some(proto_subscribe_batch::proto_subscribe_batch_contents::Kind::Error(text)) => {
                    Err(text)
                }
                None => Err(TryFromProtoError::missing_field(
                    "ProtoSubscribeBatchContents::kind",
                ))?,
            },
        })
    }
}

impl Arbitrary for SubscribeBatch<mz_repr::Timestamp> {
    type Strategy = BoxedStrategy<Self>;
    type Parameters = ();

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        (
            proptest::collection::vec(any::<mz_repr::Timestamp>(), 1..4),
            proptest::collection::vec(any::<mz_repr::Timestamp>(), 1..4),
            proptest::collection::vec(
                (any::<mz_repr::Timestamp>(), any::<Row>(), any::<Diff>()),
                1..4,
            ),
        )
            .prop_map(|(lower, upper, updates)| SubscribeBatch {
                lower: Antichain::from(lower),
                upper: Antichain::from(upper),
                updates: Ok(updates),
            })
            .boxed()
    }
}

/// Status updates replicas can report to the controller.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub enum StatusResponse {
    /// Reports the hydration status of dataflow operators.
    OperatorHydration(OperatorHydrationStatus),
}

impl RustType<ProtoStatusResponse> for StatusResponse {
    fn into_proto(&self) -> ProtoStatusResponse {
        use proto_status_response::Kind;

        let kind = match self {
            Self::OperatorHydration(status) => Kind::OperatorHydration(status.into_proto()),
        };
        ProtoStatusResponse { kind: Some(kind) }
    }

    fn from_proto(proto: ProtoStatusResponse) -> Result<Self, TryFromProtoError> {
        use proto_status_response::Kind;

        match proto.kind {
            Some(Kind::OperatorHydration(status)) => {
                Ok(Self::OperatorHydration(status.into_rust()?))
            }
            None => Err(TryFromProtoError::missing_field(
                "ProtoStatusResponse::kind",
            )),
        }
    }
}

/// An update about the hydration status of a set of dataflow operators.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct OperatorHydrationStatus {
    /// The ID of the compute collection exported by the dataflow.
    pub collection_id: GlobalId,
    /// The ID of the LIR node for which the hydration status changed.
    pub lir_id: LirId,
    /// The ID of the worker for which the hydration status changed.
    pub worker_id: usize,
    /// Whether the node is hydrated on the worker.
    pub hydrated: bool,
}

impl RustType<ProtoOperatorHydrationStatus> for OperatorHydrationStatus {
    fn into_proto(&self) -> ProtoOperatorHydrationStatus {
        ProtoOperatorHydrationStatus {
            collection_id: Some(self.collection_id.into_proto()),
            lir_id: self.lir_id.into_proto(),
            worker_id: self.worker_id.into_proto(),
            hydrated: self.hydrated.into_proto(),
        }
    }

    fn from_proto(proto: ProtoOperatorHydrationStatus) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            collection_id: proto
                .collection_id
                .into_rust_if_some("ProtoOperatorHydrationStatus::collection_id")?,
            lir_id: proto.lir_id.into_rust()?,
            worker_id: proto.worker_id.into_rust()?,
            hydrated: proto.hydrated.into_rust()?,
        })
    }
}

#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::ProptestConfig;
    use proptest::proptest;

    use super::*;

    /// Test to ensure the size of the `ComputeResponse` enum doesn't regress.
    #[mz_ore::test]
    fn test_compute_response_size() {
        assert_eq!(std::mem::size_of::<ComputeResponse>(), 120);
    }
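
    /// A sanity check of `FrontiersResponse::has_updates`; a minimal sketch assuming the derived
    /// `Default` impl and `Antichain::from_elem`.
    #[mz_ore::test]
    fn test_frontiers_response_has_updates() {
        // A response with no frontier fields set reports no updates.
        let empty = FrontiersResponse::<u64>::default();
        assert!(!empty.has_updates());

        // Setting any single frontier field counts as an update.
        let resp = FrontiersResponse::<u64> {
            write_frontier: Some(Antichain::from_elem(1)),
            ..Default::default()
        };
        assert!(resp.has_updates());
    }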

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(32))]

        #[mz_ore::test]
        fn compute_response_protobuf_roundtrip(expect in any::<ComputeResponse<mz_repr::Timestamp>>() ) {
            let actual = protobuf_roundtrip::<_, ProtoComputeResponse>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
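
        // Protobuf roundtrip for `FrontiersResponse`; a sketch mirroring the `ComputeResponse`
        // roundtrip above.
        #[mz_ore::test]
        fn frontiers_response_protobuf_roundtrip(expect in any::<FrontiersResponse>() ) {
            let actual = protobuf_roundtrip::<_, ProtoFrontiersResponse>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }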
    }
}