mz_compute_client/protocol/
response.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Compute protocol responses.
11
12use std::num::NonZeroUsize;
13
14use mz_compute_types::plan::LirId;
15use mz_expr::row::RowCollection;
16use mz_ore::cast::CastFrom;
17use mz_ore::tracing::OpenTelemetryContext;
18use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError, any_uuid};
19use mz_repr::{Diff, GlobalId, Row};
20use mz_timely_util::progress::any_antichain;
21use proptest::prelude::{Arbitrary, any};
22use proptest::strategy::{BoxedStrategy, Just, Strategy, Union};
23use proptest_derive::Arbitrary;
24use serde::{Deserialize, Serialize};
25use timely::progress::frontier::Antichain;
26use uuid::Uuid;
27
28include!(concat!(
29    env!("OUT_DIR"),
30    "/mz_compute_client.protocol.response.rs"
31));
32
33/// Compute protocol responses, sent by replicas to the compute controller.
34///
35/// Replicas send `ComputeResponse`s in response to [`ComputeCommand`]s they previously received
36/// from the compute controller.
37///
38/// [`ComputeCommand`]: super::command::ComputeCommand
39#[derive(Clone, Debug, PartialEq)]
40pub enum ComputeResponse<T = mz_repr::Timestamp> {
41    /// `Frontiers` announces the advancement of the various frontiers of the specified compute
42    /// collection.
43    ///
44    /// Replicas must send `Frontiers` responses for compute collections that are indexes or
45    /// storage sinks. Replicas must not send `Frontiers` responses for subscribes and copy-tos
46    /// ([#16274]).
47    ///
48    /// Replicas must never report regressing frontiers. Specifically:
49    ///
50    ///   * The first frontier of any kind reported for a collection must not be less than that
51    ///     collection's initial `as_of` frontier.
52    ///   * Subsequent reported frontiers for a collection must not be less than any frontier of
53    ///     the same kind reported previously for the same collection.
54    ///
55    /// Replicas must send `Frontiers` responses that report each frontier kind to have advanced to
56    /// the empty frontier in response to an [`AllowCompaction` command] that allows compaction of
57    /// the collection to to the empty frontier, unless the frontier has previously advanced to the
58    /// empty frontier as part of the regular dataflow computation. ([#16271])
59    ///
60    /// Once a frontier was reported to have been advanced to the empty frontier, the replica must
61    /// not send further `Frontiers` responses with non-`None` values for that frontier kind.
62    ///
63    /// The replica must not send `Frontiers` responses for collections that have not
64    /// been created previously by a [`CreateDataflow` command] or by a [`CreateInstance`
65    /// command].
66    ///
67    /// [`AllowCompaction` command]: super::command::ComputeCommand::AllowCompaction
68    /// [`CreateDataflow` command]: super::command::ComputeCommand::CreateDataflow
69    /// [`CreateInstance` command]: super::command::ComputeCommand::CreateInstance
70    /// [#16271]: https://github.com/MaterializeInc/database-issues/issues/4699
71    /// [#16274]: https://github.com/MaterializeInc/database-issues/issues/4701
72    Frontiers(GlobalId, FrontiersResponse<T>),
73
74    /// `PeekResponse` reports the result of a previous [`Peek` command]. The peek is identified by
75    /// a `Uuid` that matches the command's [`Peek::uuid`].
76    ///
77    /// The replica must send exactly one `PeekResponse` for every [`Peek` command] it received.
78    ///
79    /// If the replica did not receive a [`CancelPeek` command] for a peek, it must not send a
80    /// [`Canceled`] response for that peek. If the replica did receive a [`CancelPeek` command]
81    /// for a peek, it may send any of the three [`PeekResponse`] variants.
82    ///
83    /// The replica must not send `PeekResponse`s for peek IDs that were not previously specified
84    /// in a [`Peek` command].
85    ///
86    /// [`Peek` command]: super::command::ComputeCommand::Peek
87    /// [`CancelPeek` command]: super::command::ComputeCommand::CancelPeek
88    /// [`Peek::uuid`]: super::command::Peek::uuid
89    /// [`Canceled`]: PeekResponse::Canceled
90    PeekResponse(Uuid, PeekResponse, OpenTelemetryContext),
91
92    /// `SubscribeResponse` reports the results emitted by an active subscribe over some time
93    /// interval.
94    ///
95    /// For each subscribe that was installed by a previous [`CreateDataflow` command], the
96    /// replica must emit [`Batch`] responses that cover the entire time interval from the
97    /// minimum time until the subscribe advances to the empty frontier or is
98    /// dropped. The time intervals of consecutive [`Batch`]es must be increasing, contiguous,
99    /// non-overlapping, and non-empty. All updates transmitted in a batch must be consolidated and
100    /// have times within that batch’s time interval. All updates' times must be greater than or
101    /// equal to `as_of`. The `upper` of the first [`Batch`] of a subscribe must not be less than
102    /// that subscribe's initial `as_of` frontier.
103    ///
104    /// The replica must send [`DroppedAt`] responses if the subscribe was dropped in response to
105    /// an [`AllowCompaction` command] that advanced its read frontier to the empty frontier. The
106    /// [`DroppedAt`] frontier must be the upper frontier of the last emitted batch.
107    ///
108    /// The replica must not send a [`DroppedAt`] response if the subscribe’s upper frontier
109    /// (reported by [`Batch`] responses) has advanced to the empty frontier (e.g. because its
110    /// inputs advanced to the empty frontier).
111    ///
112    /// Once a subscribe was reported to have advanced to the empty frontier, or has been dropped:
113    ///
114    ///   * It must no longer read from its inputs.
115    ///   * The replica must not send further `SubscribeResponse`s for that subscribe.
116    ///
117    /// The replica must not send `SubscribeResponse`s for subscribes that have not been
118    /// created previously by a [`CreateDataflow` command].
119    ///
120    /// [`Batch`]: SubscribeResponse::Batch
121    /// [`DroppedAt`]: SubscribeResponse::DroppedAt
122    /// [`CreateDataflow` command]: super::command::ComputeCommand::CreateDataflow
123    /// [`AllowCompaction` command]: super::command::ComputeCommand::AllowCompaction
124    SubscribeResponse(GlobalId, SubscribeResponse<T>),
125
126    /// `CopyToResponse` reports the completion of an S3-oneshot sink.
127    ///
128    /// The replica must send exactly one `CopyToResponse` for every S3-oneshot sink previously
129    /// created by a [`CreateDataflow` command].
130    ///
131    /// The replica must not send `CopyToResponse`s for S3-oneshot sinks that were not previously
132    /// created by a [`CreateDataflow` command].
133    ///
134    /// [`CreateDataflow` command]: super::command::ComputeCommand::CreateDataflow
135    CopyToResponse(GlobalId, CopyToResponse),
136
137    /// `Status` reports status updates from replicas to the controller.
138    ///
139    /// `Status` responses are a way for replicas to stream back introspection data that the
140    /// controller can then announce to its clients. They have no effect on the lifecycles of
141    /// compute collections. Correct operation of the Compute layer must not rely on `Status`
142    /// responses being sent or received.
143    ///
144    /// `Status` responses that are specific to collections must only be sent for collections that
145    /// (a) have previously been created by a [`CreateDataflow` command] and (b) have not yet
146    /// been reported to have advanced to the empty frontier.
147    ///
148    /// [`CreateDataflow` command]: super::command::ComputeCommand::CreateDataflow
149    Status(StatusResponse),
150}
151
152impl RustType<ProtoComputeResponse> for ComputeResponse<mz_repr::Timestamp> {
153    fn into_proto(&self) -> ProtoComputeResponse {
154        use proto_compute_response::Kind::*;
155        use proto_compute_response::*;
156        ProtoComputeResponse {
157            kind: Some(match self {
158                ComputeResponse::Frontiers(id, resp) => Frontiers(ProtoFrontiersKind {
159                    id: Some(id.into_proto()),
160                    resp: Some(resp.into_proto()),
161                }),
162                ComputeResponse::PeekResponse(id, resp, otel_ctx) => {
163                    PeekResponse(ProtoPeekResponseKind {
164                        id: Some(id.into_proto()),
165                        resp: Some(resp.into_proto()),
166                        otel_ctx: otel_ctx.clone().into(),
167                    })
168                }
169                ComputeResponse::SubscribeResponse(id, resp) => {
170                    SubscribeResponse(ProtoSubscribeResponseKind {
171                        id: Some(id.into_proto()),
172                        resp: Some(resp.into_proto()),
173                    })
174                }
175                ComputeResponse::CopyToResponse(id, resp) => {
176                    CopyToResponse(ProtoCopyToResponseKind {
177                        id: Some(id.into_proto()),
178                        resp: Some(resp.into_proto()),
179                    })
180                }
181                ComputeResponse::Status(resp) => Status(resp.into_proto()),
182            }),
183        }
184    }
185
186    fn from_proto(proto: ProtoComputeResponse) -> Result<Self, TryFromProtoError> {
187        use proto_compute_response::Kind::*;
188        match proto.kind {
189            Some(Frontiers(resp)) => Ok(ComputeResponse::Frontiers(
190                resp.id.into_rust_if_some("ProtoFrontiersKind::id")?,
191                resp.resp.into_rust_if_some("ProtoFrontiersKind::resp")?,
192            )),
193            Some(PeekResponse(resp)) => Ok(ComputeResponse::PeekResponse(
194                resp.id.into_rust_if_some("ProtoPeekResponseKind::id")?,
195                resp.resp.into_rust_if_some("ProtoPeekResponseKind::resp")?,
196                resp.otel_ctx.into(),
197            )),
198            Some(SubscribeResponse(resp)) => Ok(ComputeResponse::SubscribeResponse(
199                resp.id
200                    .into_rust_if_some("ProtoSubscribeResponseKind::id")?,
201                resp.resp
202                    .into_rust_if_some("ProtoSubscribeResponseKind::resp")?,
203            )),
204            Some(CopyToResponse(resp)) => Ok(ComputeResponse::CopyToResponse(
205                resp.id.into_rust_if_some("ProtoCopyToResponseKind::id")?,
206                resp.resp
207                    .into_rust_if_some("ProtoCopyToResponseKind::resp")?,
208            )),
209            Some(Status(resp)) => Ok(ComputeResponse::Status(resp.into_rust()?)),
210            None => Err(TryFromProtoError::missing_field(
211                "ProtoComputeResponse::kind",
212            )),
213        }
214    }
215}
216
217impl Arbitrary for ComputeResponse<mz_repr::Timestamp> {
218    type Strategy = Union<BoxedStrategy<Self>>;
219    type Parameters = ();
220
221    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
222        Union::new(vec![
223            (any::<GlobalId>(), any::<FrontiersResponse>())
224                .prop_map(|(id, resp)| ComputeResponse::Frontiers(id, resp))
225                .boxed(),
226            (any_uuid(), any::<PeekResponse>())
227                .prop_map(|(id, resp)| {
228                    ComputeResponse::PeekResponse(id, resp, OpenTelemetryContext::empty())
229                })
230                .boxed(),
231            (any::<GlobalId>(), any::<SubscribeResponse>())
232                .prop_map(|(id, resp)| ComputeResponse::SubscribeResponse(id, resp))
233                .boxed(),
234            any::<StatusResponse>()
235                .prop_map(ComputeResponse::Status)
236                .boxed(),
237        ])
238    }
239}
240
241/// A response reporting advancement of frontiers of a compute collection.
242///
243/// All contained frontier fields are optional. `None` values imply that the respective frontier
244/// has not advanced and the previously reported value is still current.
245#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
246pub struct FrontiersResponse<T = mz_repr::Timestamp> {
247    /// The collection's new write frontier, if any.
248    ///
249    /// Upon receiving an updated `write_frontier`, the controller may assume that the contents of the
250    /// collection are sealed for all times less than that frontier. Once it has reported the
251    /// `write_frontier` as the empty frontier, the replica must no longer change the contents of the
252    /// collection.
253    pub write_frontier: Option<Antichain<T>>,
254    /// The collection's new input frontier, if any.
255    ///
256    /// Upon receiving an updated `input_frontier`, the controller may assume that the replica has
257    /// finished reading from the collection’s inputs up to that frontier. Once it has reported the
258    /// `input_frontier` as the empty frontier, the replica must no longer read from the
259    /// collection's inputs.
260    pub input_frontier: Option<Antichain<T>>,
261    /// The collection's new output frontier, if any.
262    ///
263    /// Upon receiving an updated `output_frontier`, the controller may assume that the replica
264    /// has finished processing the collection's input up to that frontier.
265    ///
266    /// The `output_frontier` is often equal to the `write_frontier`, but not always. Some
267    /// collections can jump their write frontiers ahead of the times they have finished
268    /// processing, causing the `output_frontier` to lag behind the `write_frontier`. Collections
269    /// writing materialized views do so in two cases:
270    ///
271    ///  * `REFRESH` MVs jump their write frontier ahead to the next refresh time.
272    ///  * In a multi-replica cluster, slower replicas observe and report the write frontier of the
273    ///    fastest replica, by witnessing advancements of the target persist shard's `upper`.
274    pub output_frontier: Option<Antichain<T>>,
275}
276
277impl<T> FrontiersResponse<T> {
278    /// Returns whether there are any contained updates.
279    pub fn has_updates(&self) -> bool {
280        self.write_frontier.is_some()
281            || self.input_frontier.is_some()
282            || self.output_frontier.is_some()
283    }
284}
285
286impl RustType<ProtoFrontiersResponse> for FrontiersResponse {
287    fn into_proto(&self) -> ProtoFrontiersResponse {
288        ProtoFrontiersResponse {
289            write_frontier: self.write_frontier.into_proto(),
290            input_frontier: self.input_frontier.into_proto(),
291            output_frontier: self.output_frontier.into_proto(),
292        }
293    }
294
295    fn from_proto(proto: ProtoFrontiersResponse) -> Result<Self, TryFromProtoError> {
296        Ok(Self {
297            write_frontier: proto.write_frontier.into_rust()?,
298            input_frontier: proto.input_frontier.into_rust()?,
299            output_frontier: proto.output_frontier.into_rust()?,
300        })
301    }
302}
303
304impl Arbitrary for FrontiersResponse {
305    type Strategy = BoxedStrategy<Self>;
306    type Parameters = ();
307
308    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
309        (any_antichain(), any_antichain(), any_antichain())
310            .prop_map(|(write, input, compute)| Self {
311                write_frontier: Some(write),
312                input_frontier: Some(input),
313                output_frontier: Some(compute),
314            })
315            .boxed()
316    }
317}
318
319/// The response from a `Peek`.
320///
321/// Note that each `Peek` expects to generate exactly one `PeekResponse`, i.e.
322/// we expect a 1:1 contract between `Peek` and `PeekResponse`.
323#[derive(Clone, Debug, PartialEq)]
324pub enum PeekResponse {
325    /// Returned rows of a successful peek.
326    Rows(RowCollection),
327    /// Error of an unsuccessful peek.
328    Error(String),
329    /// The peek was canceled.
330    Canceled,
331}
332
333impl RustType<ProtoPeekResponse> for PeekResponse {
334    fn into_proto(&self) -> ProtoPeekResponse {
335        use proto_peek_response::Kind::*;
336        ProtoPeekResponse {
337            kind: Some(match self {
338                PeekResponse::Rows(rows) => Rows(rows.into_proto()),
339                PeekResponse::Error(err) => proto_peek_response::Kind::Error(err.clone()),
340                PeekResponse::Canceled => Canceled(()),
341            }),
342        }
343    }
344
345    fn from_proto(proto: ProtoPeekResponse) -> Result<Self, TryFromProtoError> {
346        use proto_peek_response::Kind::*;
347        match proto.kind {
348            Some(Rows(rows)) => Ok(PeekResponse::Rows(rows.into_rust()?)),
349            Some(proto_peek_response::Kind::Error(err)) => Ok(PeekResponse::Error(err)),
350            Some(Canceled(())) => Ok(PeekResponse::Canceled),
351            None => Err(TryFromProtoError::missing_field("ProtoPeekResponse::kind")),
352        }
353    }
354}
355
356impl Arbitrary for PeekResponse {
357    type Strategy = Union<BoxedStrategy<Self>>;
358    type Parameters = ();
359
360    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
361        Union::new(vec![
362            proptest::collection::vec(
363                (
364                    any::<Row>(),
365                    (1..usize::MAX).prop_map(|u| NonZeroUsize::try_from(u).unwrap()),
366                ),
367                1..11,
368            )
369            .prop_map(|rows| PeekResponse::Rows(RowCollection::new(rows, &[])))
370            .boxed(),
371            ".*".prop_map(PeekResponse::Error).boxed(),
372            Just(PeekResponse::Canceled).boxed(),
373        ])
374    }
375}
376
377/// Various responses that can be communicated after a COPY TO command.
378#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
379pub enum CopyToResponse {
380    /// Returned number of rows for a successful COPY TO.
381    RowCount(u64),
382    /// Error of an unsuccessful COPY TO.
383    Error(String),
384    /// The COPY TO sink dataflow was dropped.
385    Dropped,
386}
387
388impl RustType<ProtoCopyToResponse> for CopyToResponse {
389    fn into_proto(&self) -> ProtoCopyToResponse {
390        use proto_copy_to_response::Kind::*;
391        ProtoCopyToResponse {
392            kind: Some(match self {
393                CopyToResponse::RowCount(rows) => Rows(*rows),
394                CopyToResponse::Error(error) => Error(error.clone()),
395                CopyToResponse::Dropped => Dropped(()),
396            }),
397        }
398    }
399
400    fn from_proto(proto: ProtoCopyToResponse) -> Result<Self, TryFromProtoError> {
401        use proto_copy_to_response::Kind::*;
402        match proto.kind {
403            Some(Rows(rows)) => Ok(CopyToResponse::RowCount(rows)),
404            Some(Error(error)) => Ok(CopyToResponse::Error(error)),
405            Some(Dropped(())) => Ok(CopyToResponse::Dropped),
406            None => Err(TryFromProtoError::missing_field(
407                "ProtoCopyToResponse::kind",
408            )),
409        }
410    }
411}
412
413/// Various responses that can be communicated about the progress of a SUBSCRIBE command.
414#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
415pub enum SubscribeResponse<T = mz_repr::Timestamp> {
416    /// A batch of updates over a non-empty interval of time.
417    Batch(SubscribeBatch<T>),
418    /// The SUBSCRIBE dataflow was dropped, leaving updates from this frontier onward unspecified.
419    DroppedAt(Antichain<T>),
420}
421
422impl<T> SubscribeResponse<T> {
423    /// Converts `self` to an error if a maximum size is exceeded.
424    pub fn to_error_if_exceeds(&mut self, max_result_size: usize) {
425        if let SubscribeResponse::Batch(batch) = self {
426            batch.to_error_if_exceeds(max_result_size);
427        }
428    }
429}
430
431impl RustType<ProtoSubscribeResponse> for SubscribeResponse<mz_repr::Timestamp> {
432    fn into_proto(&self) -> ProtoSubscribeResponse {
433        use proto_subscribe_response::Kind::*;
434        ProtoSubscribeResponse {
435            kind: Some(match self {
436                SubscribeResponse::Batch(subscribe_batch) => Batch(subscribe_batch.into_proto()),
437                SubscribeResponse::DroppedAt(antichain) => DroppedAt(antichain.into_proto()),
438            }),
439        }
440    }
441
442    fn from_proto(proto: ProtoSubscribeResponse) -> Result<Self, TryFromProtoError> {
443        use proto_subscribe_response::Kind::*;
444        match proto.kind {
445            Some(Batch(subscribe_batch)) => {
446                Ok(SubscribeResponse::Batch(subscribe_batch.into_rust()?))
447            }
448            Some(DroppedAt(antichain)) => Ok(SubscribeResponse::DroppedAt(antichain.into_rust()?)),
449            None => Err(TryFromProtoError::missing_field(
450                "ProtoSubscribeResponse::kind",
451            )),
452        }
453    }
454}
455
456impl Arbitrary for SubscribeResponse<mz_repr::Timestamp> {
457    type Strategy = Union<BoxedStrategy<Self>>;
458    type Parameters = ();
459
460    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
461        Union::new(vec![
462            any::<SubscribeBatch<mz_repr::Timestamp>>()
463                .prop_map(SubscribeResponse::Batch)
464                .boxed(),
465            proptest::collection::vec(any::<mz_repr::Timestamp>(), 1..4)
466                .prop_map(|antichain| SubscribeResponse::DroppedAt(Antichain::from(antichain)))
467                .boxed(),
468        ])
469    }
470}
471
472/// A batch of updates for the interval `[lower, upper)`.
473#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
474pub struct SubscribeBatch<T = mz_repr::Timestamp> {
475    /// The lower frontier of the batch of updates.
476    pub lower: Antichain<T>,
477    /// The upper frontier of the batch of updates.
478    pub upper: Antichain<T>,
479    /// All updates greater than `lower` and not greater than `upper`.
480    ///
481    /// An `Err` variant can be used to indicate e.g. that the size of the updates exceeds internal limits.
482    pub updates: Result<Vec<(T, Row, Diff)>, String>,
483}
484
485impl<T> SubscribeBatch<T> {
486    /// Converts `self` to an error if a maximum size is exceeded.
487    fn to_error_if_exceeds(&mut self, max_result_size: usize) {
488        use bytesize::ByteSize;
489        if let Ok(updates) = &self.updates {
490            let total_size: usize = updates
491                .iter()
492                .map(|(_time, row, _diff)| row.byte_len())
493                .sum();
494            if total_size > max_result_size {
495                self.updates = Err(format!(
496                    "result exceeds max size of {}",
497                    ByteSize::b(u64::cast_from(max_result_size))
498                ));
499            }
500        }
501    }
502}
503
504impl RustType<ProtoSubscribeBatch> for SubscribeBatch<mz_repr::Timestamp> {
505    fn into_proto(&self) -> ProtoSubscribeBatch {
506        use proto_subscribe_batch::ProtoUpdate;
507        ProtoSubscribeBatch {
508            lower: Some(self.lower.into_proto()),
509            upper: Some(self.upper.into_proto()),
510            updates: Some(proto_subscribe_batch::ProtoSubscribeBatchContents {
511                kind: match &self.updates {
512                    Ok(updates) => {
513                        let updates = updates
514                            .iter()
515                            .map(|(t, r, d)| ProtoUpdate {
516                                timestamp: t.into(),
517                                row: Some(r.into_proto()),
518                                diff: d.into_proto(),
519                            })
520                            .collect();
521
522                        Some(
523                            proto_subscribe_batch::proto_subscribe_batch_contents::Kind::Updates(
524                                proto_subscribe_batch::ProtoSubscribeUpdates { updates },
525                            ),
526                        )
527                    }
528                    Err(text) => Some(
529                        proto_subscribe_batch::proto_subscribe_batch_contents::Kind::Error(
530                            text.clone(),
531                        ),
532                    ),
533                },
534            }),
535        }
536    }
537
538    fn from_proto(proto: ProtoSubscribeBatch) -> Result<Self, TryFromProtoError> {
539        Ok(SubscribeBatch {
540            lower: proto.lower.into_rust_if_some("ProtoTailUpdate::lower")?,
541            upper: proto.upper.into_rust_if_some("ProtoTailUpdate::upper")?,
542            updates: match proto.updates.unwrap().kind {
543                Some(proto_subscribe_batch::proto_subscribe_batch_contents::Kind::Updates(
544                    updates,
545                )) => Ok(updates
546                    .updates
547                    .into_iter()
548                    .map(|update| {
549                        Ok((
550                            update.timestamp.into(),
551                            update.row.into_rust_if_some("ProtoUpdate::row")?,
552                            update.diff.into(),
553                        ))
554                    })
555                    .collect::<Result<Vec<_>, TryFromProtoError>>()?),
556                Some(proto_subscribe_batch::proto_subscribe_batch_contents::Kind::Error(text)) => {
557                    Err(text)
558                }
559                None => Err(TryFromProtoError::missing_field("ProtoPeekResponse::kind"))?,
560            },
561        })
562    }
563}
564
565impl Arbitrary for SubscribeBatch<mz_repr::Timestamp> {
566    type Strategy = BoxedStrategy<Self>;
567    type Parameters = ();
568
569    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
570        (
571            proptest::collection::vec(any::<mz_repr::Timestamp>(), 1..4),
572            proptest::collection::vec(any::<mz_repr::Timestamp>(), 1..4),
573            proptest::collection::vec(
574                (any::<mz_repr::Timestamp>(), any::<Row>(), any::<Diff>()),
575                1..4,
576            ),
577        )
578            .prop_map(|(lower, upper, updates)| SubscribeBatch {
579                lower: Antichain::from(lower),
580                upper: Antichain::from(upper),
581                updates: Ok(updates),
582            })
583            .boxed()
584    }
585}
586
587/// Status updates replicas can report to the controller.
588#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
589pub enum StatusResponse {
590    /// Reports the hydration status of dataflow operators.
591    OperatorHydration(OperatorHydrationStatus),
592}
593
594impl RustType<ProtoStatusResponse> for StatusResponse {
595    fn into_proto(&self) -> ProtoStatusResponse {
596        use proto_status_response::Kind;
597
598        let kind = match self {
599            Self::OperatorHydration(status) => Kind::OperatorHydration(status.into_proto()),
600        };
601        ProtoStatusResponse { kind: Some(kind) }
602    }
603
604    fn from_proto(proto: ProtoStatusResponse) -> Result<Self, TryFromProtoError> {
605        use proto_status_response::Kind;
606
607        match proto.kind {
608            Some(Kind::OperatorHydration(status)) => {
609                Ok(Self::OperatorHydration(status.into_rust()?))
610            }
611            None => Err(TryFromProtoError::missing_field(
612                "ProtoStatusResponse::kind",
613            )),
614        }
615    }
616}
617
618/// An update about the hydration status of a set of dataflow operators.
619#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
620pub struct OperatorHydrationStatus {
621    /// The ID of the compute collection exported by the dataflow.
622    pub collection_id: GlobalId,
623    /// The ID of the LIR node for which the hydration status changed.
624    pub lir_id: LirId,
625    /// The ID of the worker for which the hydration status changed.
626    pub worker_id: usize,
627    /// Whether the node is hydrated on the worker.
628    pub hydrated: bool,
629}
630
631impl RustType<ProtoOperatorHydrationStatus> for OperatorHydrationStatus {
632    fn into_proto(&self) -> ProtoOperatorHydrationStatus {
633        ProtoOperatorHydrationStatus {
634            collection_id: Some(self.collection_id.into_proto()),
635            lir_id: self.lir_id.into_proto(),
636            worker_id: self.worker_id.into_proto(),
637            hydrated: self.hydrated.into_proto(),
638        }
639    }
640
641    fn from_proto(proto: ProtoOperatorHydrationStatus) -> Result<Self, TryFromProtoError> {
642        Ok(Self {
643            collection_id: proto
644                .collection_id
645                .into_rust_if_some("ProtoOperatorHydrationStatus::collection_id")?,
646            lir_id: proto.lir_id.into_rust()?,
647            worker_id: proto.worker_id.into_rust()?,
648            hydrated: proto.hydrated.into_rust()?,
649        })
650    }
651}
652
653#[cfg(test)]
654mod tests {
655    use mz_ore::assert_ok;
656    use mz_proto::protobuf_roundtrip;
657    use proptest::prelude::ProptestConfig;
658    use proptest::proptest;
659
660    use super::*;
661
662    /// Test to ensure the size of the `ComputeResponse` enum doesn't regress.
663    #[mz_ore::test]
664    fn test_compute_response_size() {
665        assert_eq!(std::mem::size_of::<ComputeResponse>(), 120);
666    }
667
668    proptest! {
669        #![proptest_config(ProptestConfig::with_cases(32))]
670
671        #[mz_ore::test]
672        fn compute_response_protobuf_roundtrip(expect in any::<ComputeResponse<mz_repr::Timestamp>>() ) {
673            let actual = protobuf_roundtrip::<_, ProtoComputeResponse>(&expect);
674            assert_ok!(actual);
675            assert_eq!(actual.unwrap(), expect);
676        }
677    }
678}