mz_compute_client/protocol/response.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Compute protocol responses.

use mz_expr::row::RowCollection;
use mz_ore::cast::CastFrom;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_types::ShardId;
use mz_repr::{Diff, GlobalId, RelationDesc, Row};
use serde::{Deserialize, Serialize};
use timely::progress::frontier::Antichain;
use uuid::Uuid;

/// Compute protocol responses, sent by replicas to the compute controller.
///
/// Replicas send `ComputeResponse`s in response to [`ComputeCommand`]s they previously received
/// from the compute controller.
///
/// [`ComputeCommand`]: super::command::ComputeCommand
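///
/// For example, a replica announcing that a collection's write frontier has advanced to
/// time 5 might send the following (a sketch; the identifier and time are illustrative):
///
/// ```ignore
/// ComputeResponse::Frontiers(
///     GlobalId::User(1),
///     FrontiersResponse {
///         write_frontier: Some(Antichain::from_elem(5.into())),
///         ..Default::default()
///     },
/// )
/// ```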
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ComputeResponse<T = mz_repr::Timestamp> {
    /// `Frontiers` announces the advancement of the various frontiers of the specified compute
    /// collection.
    ///
    /// Replicas must send `Frontiers` responses for compute collections that are indexes or
    /// storage sinks. Replicas must not send `Frontiers` responses for subscribes and copy-tos
    /// ([#16274]).
    ///
    /// Replicas must never report regressing frontiers. Specifically:
    ///
    ///   * The first frontier of any kind reported for a collection must not be less than that
    ///     collection's initial `as_of` frontier.
    ///   * Subsequent reported frontiers for a collection must not be less than any frontier of
    ///     the same kind reported previously for the same collection.
    ///
    /// Replicas must send `Frontiers` responses that report each frontier kind to have advanced to
    /// the empty frontier in response to an [`AllowCompaction` command] that allows compaction of
    /// the collection to the empty frontier, unless the frontier has previously advanced to the
    /// empty frontier as part of the regular dataflow computation. ([#16271])
    ///
    /// Once a frontier has been reported to have advanced to the empty frontier, the replica must
    /// not send further `Frontiers` responses with non-`None` values for that frontier kind.
    ///
    /// The replica must not send `Frontiers` responses for collections that have not been
    /// created previously by a [`CreateDataflow` command] or by a [`CreateInstance` command].
    ///
    /// [`AllowCompaction` command]: super::command::ComputeCommand::AllowCompaction
    /// [`CreateDataflow` command]: super::command::ComputeCommand::CreateDataflow
    /// [`CreateInstance` command]: super::command::ComputeCommand::CreateInstance
    /// [#16271]: https://github.com/MaterializeInc/database-issues/issues/4699
    /// [#16274]: https://github.com/MaterializeInc/database-issues/issues/4701
    Frontiers(GlobalId, FrontiersResponse<T>),

    /// `PeekResponse` reports the result of a previous [`Peek` command]. The peek is identified by
    /// a `Uuid` that matches the command's [`Peek::uuid`].
    ///
    /// The replica must send exactly one `PeekResponse` for every [`Peek` command] it received.
    ///
    /// If the replica did not receive a [`CancelPeek` command] for a peek, it must not send a
    /// [`Canceled`] response for that peek. If the replica did receive a [`CancelPeek` command]
    /// for a peek, it may send any of the three [`PeekResponse`] variants.
    ///
    /// The replica must not send `PeekResponse`s for peek IDs that were not previously specified
    /// in a [`Peek` command].
    ///
    /// [`Peek` command]: super::command::ComputeCommand::Peek
    /// [`CancelPeek` command]: super::command::ComputeCommand::CancelPeek
    /// [`Peek::uuid`]: super::command::Peek::uuid
    /// [`Canceled`]: PeekResponse::Canceled
    PeekResponse(Uuid, PeekResponse, OpenTelemetryContext),

    /// `SubscribeResponse` reports the results emitted by an active subscribe over some time
    /// interval.
    ///
    /// For each subscribe that was installed by a previous [`CreateDataflow` command], the replica
    /// must emit [`Batch`] responses that cover the entire time interval from the minimum time
    /// until the subscribe advances to the empty frontier or is dropped. The time intervals of
    /// consecutive [`Batch`]es must be increasing, contiguous, non-overlapping, and non-empty. All
    /// updates transmitted in a batch must be consolidated and have times within that batch's time
    /// interval. All updates' times must be greater than or equal to `as_of`. The `upper` of the
    /// first [`Batch`] of a subscribe must not be less than that subscribe's initial `as_of`
    /// frontier.
    ///
    /// The replica must send [`DroppedAt`] responses if the subscribe was dropped in response to
    /// an [`AllowCompaction` command] that advanced its read frontier to the empty frontier. The
    /// [`DroppedAt`] frontier must be the upper frontier of the last emitted batch.
    ///
    /// The replica must not send a [`DroppedAt`] response if the subscribe's upper frontier
    /// (reported by [`Batch`] responses) has advanced to the empty frontier (e.g. because its
    /// inputs advanced to the empty frontier).
    ///
    /// Once a subscribe has been reported to have advanced to the empty frontier, or has been
    /// dropped:
    ///
    ///   * It must no longer read from its inputs.
    ///   * The replica must not send further `SubscribeResponse`s for that subscribe.
    ///
    /// The replica must not send `SubscribeResponse`s for subscribes that have not been
    /// created previously by a [`CreateDataflow` command].
    ///
    /// [`Batch`]: SubscribeResponse::Batch
    /// [`DroppedAt`]: SubscribeResponse::DroppedAt
    /// [`CreateDataflow` command]: super::command::ComputeCommand::CreateDataflow
    /// [`AllowCompaction` command]: super::command::ComputeCommand::AllowCompaction
    SubscribeResponse(GlobalId, SubscribeResponse<T>),

    /// `CopyToResponse` reports the completion of an S3-oneshot sink.
    ///
    /// The replica must send exactly one `CopyToResponse` for every S3-oneshot sink previously
    /// created by a [`CreateDataflow` command].
    ///
    /// The replica must not send `CopyToResponse`s for S3-oneshot sinks that were not previously
    /// created by a [`CreateDataflow` command].
    ///
    /// [`CreateDataflow` command]: super::command::ComputeCommand::CreateDataflow
    CopyToResponse(GlobalId, CopyToResponse),

    /// `Status` reports status updates from replicas to the controller.
    ///
    /// `Status` responses are a way for replicas to stream back introspection data that the
    /// controller can then announce to its clients. They have no effect on the lifecycles of
    /// compute collections. Correct operation of the Compute layer must not rely on `Status`
    /// responses being sent or received.
    ///
    /// `Status` responses that are specific to collections must only be sent for collections that
    /// (a) have previously been created by a [`CreateDataflow` command] and (b) have not yet
    /// been reported to have advanced to the empty frontier.
    ///
    /// [`CreateDataflow` command]: super::command::ComputeCommand::CreateDataflow
    Status(StatusResponse),
}

/// A response reporting advancement of frontiers of a compute collection.
///
/// All contained frontier fields are optional. `None` values imply that the respective frontier
/// has not advanced and the previously reported value is still current.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FrontiersResponse<T = mz_repr::Timestamp> {
    /// The collection's new write frontier, if any.
    ///
    /// Upon receiving an updated `write_frontier`, the controller may assume that the contents
    /// of the collection are sealed for all times less than that frontier. Once the replica has
    /// reported the `write_frontier` as the empty frontier, it must no longer change the
    /// contents of the collection.
    pub write_frontier: Option<Antichain<T>>,
    /// The collection's new input frontier, if any.
    ///
    /// Upon receiving an updated `input_frontier`, the controller may assume that the replica
    /// has finished reading from the collection's inputs up to that frontier. Once the replica
    /// has reported the `input_frontier` as the empty frontier, it must no longer read from the
    /// collection's inputs.
    pub input_frontier: Option<Antichain<T>>,
    /// The collection's new output frontier, if any.
    ///
    /// Upon receiving an updated `output_frontier`, the controller may assume that the replica
    /// has finished processing the collection's input up to that frontier.
    ///
    /// The `output_frontier` is often equal to the `write_frontier`, but not always. Some
    /// collections can jump their write frontiers ahead of the times they have finished
    /// processing, causing the `output_frontier` to lag behind the `write_frontier`. Collections
    /// writing materialized views do so in two cases:
    ///
    ///  * `REFRESH` MVs jump their write frontier ahead to the next refresh time.
    ///  * In a multi-replica cluster, slower replicas observe and report the write frontier of the
    ///    fastest replica, by witnessing advancements of the target persist shard's `upper`.
    pub output_frontier: Option<Antichain<T>>,
}

impl<T> FrontiersResponse<T> {
    /// Returns whether there are any contained updates.
    pub fn has_updates(&self) -> bool {
        self.write_frontier.is_some()
            || self.input_frontier.is_some()
            || self.output_frontier.is_some()
    }
}

/// The response from a `Peek`.
///
/// Note that each `Peek` expects to generate exactly one `PeekResponse`, i.e.
/// we expect a 1:1 contract between `Peek` and `PeekResponse`.
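///
/// A consumer on the controller side might dispatch on the variants like this (a sketch;
/// the handling logic is illustrative):
///
/// ```ignore
/// match response {
///     PeekResponse::Rows(rows) => println!("{} inline result bytes", rows.byte_len()),
///     PeekResponse::Stashed(stashed) => println!("results stashed in shard {}", stashed.shard_id),
///     PeekResponse::Error(error) => eprintln!("peek failed: {error}"),
///     PeekResponse::Canceled => eprintln!("peek was canceled"),
/// }
/// ```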
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum PeekResponse {
    /// Returned rows of a successful peek.
    Rows(RowCollection),
    /// Results of the peek were stashed in persist batches.
    Stashed(Box<StashedPeekResponse>),
    /// Error of an unsuccessful peek.
    Error(String),
    /// The peek was canceled.
    Canceled,
}

impl PeekResponse {
    /// Return the size of row bytes stored inline in this response.
    pub fn inline_byte_len(&self) -> usize {
        match self {
            Self::Rows(rows) => rows.byte_len(),
            Self::Stashed(stashed) => stashed.inline_rows.byte_len(),
            Self::Error(_) | Self::Canceled => 0,
        }
    }
}

/// Response from a peek whose results have been stashed into persist.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct StashedPeekResponse {
    /// The number of rows stored in response batches. This is the sum of the
    /// diff values of the contained rows.
    ///
    /// This does _NOT_ include rows in `inline_rows`.
    pub num_rows_batches: u64,
    /// The sum of the encoded sizes of all batches in this response.
    pub encoded_size_bytes: usize,
    /// [RelationDesc] for the rows in these stashed batches of results.
    pub relation_desc: RelationDesc,
    /// The [ShardId] under which result batches have been stashed.
    pub shard_id: ShardId,
    /// Batches of rows; these must be combined with responses from other workers and
    /// consolidated before being sent back via a client.
    pub batches: Vec<ProtoBatch>,
    /// Rows that have not been uploaded to the stash because their total size
    /// did not exceed the threshold for using the peek stash.
    ///
    /// We will have a mix of stashed responses and inline responses because
    /// result sizes can and will vary across workers.
    pub inline_rows: RowCollection,
}

impl StashedPeekResponse {
    /// Total count of [`Row`]s represented by this collection, considering a
    /// possible `OFFSET` and `LIMIT`.
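    ///
    /// For example (illustrative numbers): with 100 stashed rows, 10 inline rows, no
    /// `OFFSET`, and a `LIMIT` of 20, this reports `min(20, 100 + 10) = 20` rows.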
    pub fn num_rows(&self, offset: usize, limit: Option<usize>) -> usize {
        let num_stashed_rows: usize = usize::cast_from(self.num_rows_batches);
        let num_inline_rows = self.inline_rows.count(offset, limit);
        let mut num_rows = num_stashed_rows + num_inline_rows;

        // Consider a possible OFFSET.
        num_rows = num_rows.saturating_sub(offset);

        // Consider a possible LIMIT.
        if let Some(limit) = limit {
            num_rows = std::cmp::min(limit, num_rows);
        }

        num_rows
    }

    /// The size in bytes of the encoded rows in this result.
    pub fn size_bytes(&self) -> usize {
        let inline_size = self.inline_rows.byte_len();

        self.encoded_size_bytes + inline_size
    }
}

/// Various responses that can be communicated after a COPY TO command.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum CopyToResponse {
    /// Returned number of rows for a successful COPY TO.
    RowCount(u64),
    /// Error of an unsuccessful COPY TO.
    Error(String),
    /// The COPY TO sink dataflow was dropped.
    Dropped,
}

/// Various responses that can be communicated about the progress of a SUBSCRIBE command.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum SubscribeResponse<T = mz_repr::Timestamp> {
    /// A batch of updates over a non-empty interval of time.
    Batch(SubscribeBatch<T>),
    /// The SUBSCRIBE dataflow was dropped, leaving updates from this frontier onward unspecified.
    DroppedAt(Antichain<T>),
}

impl<T> SubscribeResponse<T> {
    /// Converts `self` to an error if a maximum size is exceeded.
    pub fn to_error_if_exceeds(&mut self, max_result_size: usize) {
        if let SubscribeResponse::Batch(batch) = self {
            batch.to_error_if_exceeds(max_result_size);
        }
    }
}

/// A batch of updates for the interval `[lower, upper)`.
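///
/// For example, two consecutive batches covering the contiguous intervals `[0, 5)` and
/// `[5, 8)` might look like this (a sketch; times are illustrative):
///
/// ```ignore
/// let first = SubscribeBatch {
///     lower: Antichain::from_elem(0.into()),
///     upper: Antichain::from_elem(5.into()),
///     updates: Ok(vec![/* consolidated updates at times in [0, 5) */]),
/// };
/// let second = SubscribeBatch {
///     lower: Antichain::from_elem(5.into()),
///     upper: Antichain::from_elem(8.into()),
///     updates: Ok(vec![/* consolidated updates at times in [5, 8) */]),
/// };
/// ```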
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct SubscribeBatch<T = mz_repr::Timestamp> {
    /// The lower frontier of the batch of updates.
    pub lower: Antichain<T>,
    /// The upper frontier of the batch of updates.
    pub upper: Antichain<T>,
    /// All updates at times greater than or equal to `lower` and less than `upper`.
    ///
    /// An `Err` variant can be used to indicate e.g. that the size of the updates exceeds
    /// internal limits.
    pub updates: Result<Vec<(T, Row, Diff)>, String>,
}

impl<T> SubscribeBatch<T> {
    /// Converts `self` to an error if a maximum size is exceeded.
    fn to_error_if_exceeds(&mut self, max_result_size: usize) {
        use bytesize::ByteSize;
        if let Ok(updates) = &self.updates {
            let total_size: usize = updates
                .iter()
                .map(|(_time, row, _diff)| row.byte_len())
                .sum();
            if total_size > max_result_size {
                self.updates = Err(format!(
                    "result exceeds max size of {}",
                    ByteSize::b(u64::cast_from(max_result_size))
                ));
            }
        }
    }
}

/// Status updates replicas can report to the controller.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum StatusResponse {
    /// No status responses are implemented currently, but we're leaving the infrastructure around
    /// in anticipation of materialize#31246.
    Placeholder,
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Test to ensure the size of the `ComputeResponse` enum doesn't regress.
    #[mz_ore::test]
    fn test_compute_response_size() {
        assert_eq!(std::mem::size_of::<ComputeResponse>(), 120);
    }
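
    /// A minimal sketch of `FrontiersResponse::has_updates` (illustrative, not part of the
    /// protocol contract): a response with all fields `None` carries no updates, while
    /// setting any single frontier field counts as an update.
    #[mz_ore::test]
    fn test_frontiers_response_has_updates() {
        let mut frontiers = FrontiersResponse::<mz_repr::Timestamp> {
            write_frontier: None,
            input_frontier: None,
            output_frontier: None,
        };
        assert!(!frontiers.has_updates());

        frontiers.write_frontier = Some(Antichain::new());
        assert!(frontiers.has_updates());
    }

    /// A minimal sketch of `SubscribeBatch::to_error_if_exceeds` (illustrative): an empty
    /// batch has zero total size, so it never exceeds the limit and its updates stay `Ok`.
    #[mz_ore::test]
    fn test_subscribe_batch_within_size_limit() {
        let mut batch = SubscribeBatch::<mz_repr::Timestamp> {
            lower: Antichain::from_elem(mz_repr::Timestamp::from(0u64)),
            upper: Antichain::from_elem(mz_repr::Timestamp::from(1u64)),
            updates: Ok(Vec::new()),
        };
        batch.to_error_if_exceeds(0);
        assert!(batch.updates.is_ok());
    }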
}