mz_compute_client/protocol/response.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Compute protocol responses.

use mz_expr::row::RowCollection;
use mz_ore::cast::CastFrom;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_types::ShardId;
use mz_repr::{Diff, GlobalId, RelationDesc, Row};
use serde::{Deserialize, Serialize};
use timely::progress::frontier::Antichain;
use uuid::Uuid;

/// Compute protocol responses, sent by replicas to the compute controller.
///
/// Replicas send `ComputeResponse`s in response to [`ComputeCommand`]s they previously received
/// from the compute controller.
///
/// [`ComputeCommand`]: super::command::ComputeCommand
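///
/// The sketch below (not compiled; the handler is hypothetical) illustrates how a
/// controller-side recipient might dispatch on the response variants:
///
/// ```ignore
/// fn handle_response(response: ComputeResponse) {
///     match response {
///         ComputeResponse::Frontiers(id, frontiers) => {
///             // Record any advanced write/input/output frontiers for collection `id`.
///         }
///         ComputeResponse::PeekResponse(uuid, peek, _otel_ctx) => {
///             // Route the rows, stashed batches, error, or cancellation to the
///             // client waiting on peek `uuid`.
///         }
///         ComputeResponse::SubscribeResponse(id, subscribe) => {
///             // Forward the batch or the `DroppedAt` notification for subscribe `id`.
///         }
///         ComputeResponse::CopyToResponse(id, copy_to) => {
///             // Report completion, failure, or drop of the S3-oneshot sink `id`.
///         }
///         ComputeResponse::Status(status) => {
///             // Surface replica status updates as introspection data.
///         }
///     }
/// }
/// ```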
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ComputeResponse<T = mz_repr::Timestamp> {
    /// `Frontiers` announces the advancement of the various frontiers of the specified compute
    /// collection.
    ///
    /// Replicas must send `Frontiers` responses for compute collections that are indexes or
    /// storage sinks. Replicas must not send `Frontiers` responses for subscribes and copy-tos
    /// ([#16274]).
    ///
    /// Replicas must never report regressing frontiers. Specifically:
    ///
    /// * The first frontier of any kind reported for a collection must not be less than that
    ///   collection's initial `as_of` frontier.
    /// * Subsequent reported frontiers for a collection must not be less than any frontier of
    ///   the same kind reported previously for the same collection.
    ///
    /// Replicas must send `Frontiers` responses that report each frontier kind to have advanced to
    /// the empty frontier in response to an [`AllowCompaction` command] that allows compaction of
    /// the collection to the empty frontier, unless the frontier has previously advanced to the
    /// empty frontier as part of the regular dataflow computation. ([#16271])
    ///
    /// Once a frontier was reported to have been advanced to the empty frontier, the replica must
    /// not send further `Frontiers` responses with non-`None` values for that frontier kind.
    ///
    /// The replica must not send `Frontiers` responses for collections that have not
    /// been created previously by a [`CreateDataflow` command] or by a [`CreateInstance`
    /// command].
    ///
    /// [`AllowCompaction` command]: super::command::ComputeCommand::AllowCompaction
    /// [`CreateDataflow` command]: super::command::ComputeCommand::CreateDataflow
    /// [`CreateInstance` command]: super::command::ComputeCommand::CreateInstance
    /// [#16271]: https://github.com/MaterializeInc/database-issues/issues/4699
    /// [#16274]: https://github.com/MaterializeInc/database-issues/issues/4701
    Frontiers(GlobalId, FrontiersResponse<T>),

    /// `PeekResponse` reports the result of a previous [`Peek` command]. The peek is identified by
    /// a `Uuid` that matches the command's [`Peek::uuid`].
    ///
    /// The replica must send exactly one `PeekResponse` for every [`Peek` command] it received.
    ///
    /// If the replica did not receive a [`CancelPeek` command] for a peek, it must not send a
    /// [`Canceled`] response for that peek. If the replica did receive a [`CancelPeek` command]
    /// for a peek, it may send any of the [`PeekResponse`] variants.
    ///
    /// The replica must not send `PeekResponse`s for peek IDs that were not previously specified
    /// in a [`Peek` command].
    ///
    /// [`Peek` command]: super::command::ComputeCommand::Peek
    /// [`CancelPeek` command]: super::command::ComputeCommand::CancelPeek
    /// [`Peek::uuid`]: super::command::Peek::uuid
    /// [`Canceled`]: PeekResponse::Canceled
    PeekResponse(Uuid, PeekResponse, OpenTelemetryContext),

    /// `SubscribeResponse` reports the results emitted by an active subscribe over some time
    /// interval.
    ///
    /// For each subscribe that was installed by a previous [`CreateDataflow` command], the
    /// replica must emit [`Batch`] responses that cover the entire time interval from the
    /// minimum time until the subscribe advances to the empty frontier or is
    /// dropped. The time intervals of consecutive [`Batch`]es must be increasing, contiguous,
    /// non-overlapping, and non-empty. All updates transmitted in a batch must be consolidated and
    /// have times within that batch's time interval. All updates' times must be greater than or
    /// equal to `as_of`. The `upper` of the first [`Batch`] of a subscribe must not be less than
    /// that subscribe's initial `as_of` frontier.
    ///
    /// The replica must send [`DroppedAt`] responses if the subscribe was dropped in response to
    /// an [`AllowCompaction` command] that advanced its read frontier to the empty frontier. The
    /// [`DroppedAt`] frontier must be the upper frontier of the last emitted batch.
    ///
    /// The replica must not send a [`DroppedAt`] response if the subscribe's upper frontier
    /// (reported by [`Batch`] responses) has advanced to the empty frontier (e.g. because its
    /// inputs advanced to the empty frontier).
    ///
    /// Once a subscribe was reported to have advanced to the empty frontier, or has been dropped:
    ///
    /// * It must no longer read from its inputs.
    /// * The replica must not send further `SubscribeResponse`s for that subscribe.
    ///
    /// The replica must not send `SubscribeResponse`s for subscribes that have not been
    /// created previously by a [`CreateDataflow` command].
    ///
    /// [`Batch`]: SubscribeResponse::Batch
    /// [`DroppedAt`]: SubscribeResponse::DroppedAt
    /// [`CreateDataflow` command]: super::command::ComputeCommand::CreateDataflow
    /// [`AllowCompaction` command]: super::command::ComputeCommand::AllowCompaction
    SubscribeResponse(GlobalId, SubscribeResponse<T>),

    /// `CopyToResponse` reports the completion of an S3-oneshot sink.
    ///
    /// The replica must send exactly one `CopyToResponse` for every S3-oneshot sink previously
    /// created by a [`CreateDataflow` command].
    ///
    /// The replica must not send `CopyToResponse`s for S3-oneshot sinks that were not previously
    /// created by a [`CreateDataflow` command].
    ///
    /// [`CreateDataflow` command]: super::command::ComputeCommand::CreateDataflow
    CopyToResponse(GlobalId, CopyToResponse),

    /// `Status` reports status updates from replicas to the controller.
    ///
    /// `Status` responses are a way for replicas to stream back introspection data that the
    /// controller can then announce to its clients. They have no effect on the lifecycles of
    /// compute collections. Correct operation of the Compute layer must not rely on `Status`
    /// responses being sent or received.
    ///
    /// `Status` responses that are specific to collections must only be sent for collections that
    /// (a) have previously been created by a [`CreateDataflow` command] and (b) have not yet
    /// been reported to have advanced to the empty frontier.
    ///
    /// [`CreateDataflow` command]: super::command::ComputeCommand::CreateDataflow
    Status(StatusResponse),
}

/// A response reporting advancement of frontiers of a compute collection.
///
/// All contained frontier fields are optional. `None` values imply that the respective frontier
/// has not advanced and the previously reported value is still current.
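///
/// A minimal sketch (not compiled) of a partial update that only advances the write
/// frontier; the timestamp value is illustrative:
///
/// ```ignore
/// use timely::progress::Antichain;
///
/// let update = FrontiersResponse {
///     write_frontier: Some(Antichain::from_elem(mz_repr::Timestamp::from(42u64))),
///     input_frontier: None,
///     output_frontier: None,
/// };
/// assert!(update.has_updates());
/// ```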
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FrontiersResponse<T = mz_repr::Timestamp> {
    /// The collection's new write frontier, if any.
    ///
    /// Upon receiving an updated `write_frontier`, the controller may assume that the contents
    /// of the collection are sealed for all times less than that frontier. Once the replica has
    /// reported the `write_frontier` as the empty frontier, it must no longer change the
    /// contents of the collection.
    pub write_frontier: Option<Antichain<T>>,
    /// The collection's new input frontier, if any.
    ///
    /// Upon receiving an updated `input_frontier`, the controller may assume that the replica
    /// has finished reading from the collection's inputs up to that frontier. Once the replica
    /// has reported the `input_frontier` as the empty frontier, it must no longer read from the
    /// collection's inputs.
    pub input_frontier: Option<Antichain<T>>,
    /// The collection's new output frontier, if any.
    ///
    /// Upon receiving an updated `output_frontier`, the controller may assume that the replica
    /// has finished processing the collection's input up to that frontier.
    ///
    /// The `output_frontier` is often equal to the `write_frontier`, but not always. Some
    /// collections can jump their write frontiers ahead of the times they have finished
    /// processing, causing the `output_frontier` to lag behind the `write_frontier`. Collections
    /// writing materialized views do so in two cases:
    ///
    /// * `REFRESH` MVs jump their write frontier ahead to the next refresh time.
    /// * In a multi-replica cluster, slower replicas observe and report the write frontier of the
    ///   fastest replica, by witnessing advancements of the target persist shard's `upper`.
    pub output_frontier: Option<Antichain<T>>,
}

impl<T> FrontiersResponse<T> {
    /// Returns whether there are any contained updates.
    pub fn has_updates(&self) -> bool {
        self.write_frontier.is_some()
            || self.input_frontier.is_some()
            || self.output_frontier.is_some()
    }
}

/// The response from a `Peek`.
///
/// Note that each `Peek` expects to generate exactly one `PeekResponse`, i.e.
/// we expect a 1:1 contract between `Peek` and `PeekResponse`.
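///
/// A sketch (not compiled) of how a recipient might distinguish the variants; the handling
/// in each arm is illustrative:
///
/// ```ignore
/// fn summarize(response: &PeekResponse) -> String {
///     match response {
///         PeekResponse::Rows(rows) => format!("{} inline bytes", rows.byte_len()),
///         PeekResponse::Stashed(stashed) => {
///             format!("{} stashed batches", stashed.batches.len())
///         }
///         PeekResponse::Error(err) => format!("error: {err}"),
///         PeekResponse::Canceled => "canceled".into(),
///     }
/// }
/// ```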
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum PeekResponse {
    /// Returned rows of a successful peek.
    Rows(RowCollection),
    /// Results of the peek were stashed in persist batches.
    Stashed(Box<StashedPeekResponse>),
    /// Error of an unsuccessful peek.
    Error(String),
    /// The peek was canceled.
    Canceled,
}

impl PeekResponse {
    /// Return the size of row bytes stored inline in this response.
    pub fn inline_byte_len(&self) -> usize {
        match self {
            Self::Rows(rows) => rows.byte_len(),
            Self::Stashed(stashed) => stashed.inline_rows.byte_len(),
            Self::Error(_) | Self::Canceled => 0,
        }
    }
}

/// Response from a peek whose results have been stashed into persist.
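///
/// A sketch (not compiled) of how a recipient might account for both the stashed and the
/// inline portion of such a response:
///
/// ```ignore
/// fn describe(stashed: &StashedPeekResponse) -> String {
///     format!(
///         "{} stashed batches under shard {}, {} total result bytes",
///         stashed.batches.len(),
///         stashed.shard_id,
///         stashed.size_bytes(),
///     )
/// }
/// ```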
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct StashedPeekResponse {
    /// The number of rows stored in response batches. This is the sum of the
    /// diff values of the contained rows.
    ///
    /// This does _NOT_ include rows in `inline_rows`.
    pub num_rows_batches: u64,
    /// The sum of the encoded sizes of all batches in this response.
    pub encoded_size_bytes: usize,
    /// [RelationDesc] for the rows in these stashed batches of results.
    pub relation_desc: RelationDesc,
    /// The [ShardId] under which result batches have been stashed.
    pub shard_id: ShardId,
    /// Batches of [`Row`]s; these must be combined with responses from other workers
    /// and consolidated before being sent back to a client.
    pub batches: Vec<ProtoBatch>,
    /// Rows that have not been uploaded to the stash, because their total size
    /// did not exceed the threshold for using the peek stash.
    ///
    /// We will have a mix of stashed responses and inline responses because the
    /// result sizes across different workers can and will vary.
    pub inline_rows: RowCollection,
}

impl StashedPeekResponse {
    /// Total count of [`Row`]s represented by this collection, considering a
    /// possible `OFFSET` and `LIMIT`.
    pub fn num_rows(&self, offset: usize, limit: Option<usize>) -> usize {
        let num_stashed_rows: usize = usize::cast_from(self.num_rows_batches);
        let num_inline_rows = self.inline_rows.count(offset, limit);
        let mut num_rows = num_stashed_rows + num_inline_rows;

        // Consider a possible OFFSET.
        num_rows = num_rows.saturating_sub(offset);

        // Consider a possible LIMIT.
        if let Some(limit) = limit {
            num_rows = std::cmp::min(limit, num_rows);
        }

        num_rows
    }

    /// The size in bytes of the encoded rows in this result.
    pub fn size_bytes(&self) -> usize {
        let inline_size = self.inline_rows.byte_len();

        self.encoded_size_bytes + inline_size
    }
}

/// Various responses that can be communicated after a COPY TO command.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum CopyToResponse {
    /// Returned number of rows for a successful COPY TO.
    RowCount(u64),
    /// Error of an unsuccessful COPY TO.
    Error(String),
    /// The COPY TO sink dataflow was dropped.
    Dropped,
}

/// Various responses that can be communicated about the progress of a SUBSCRIBE command.
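///
/// A sketch (not compiled) of consuming these responses on the controller side;
/// `next_subscribe_response` is a hypothetical helper that yields responses in order:
///
/// ```ignore
/// let mut upper: Option<Antichain<mz_repr::Timestamp>> = None;
/// while let Some(response) = next_subscribe_response() {
///     match response {
///         SubscribeResponse::Batch(batch) => {
///             // Consecutive batches are contiguous: each batch's `lower` equals the
///             // previous batch's `upper`.
///             if let Some(previous_upper) = &upper {
///                 assert_eq!(&batch.lower, previous_upper);
///             }
///             let done = batch.upper.is_empty();
///             upper = Some(batch.upper);
///             // Forward `batch.updates` to the client here.
///             if done {
///                 break; // The subscribe has completed; no further responses arrive.
///             }
///         }
///         SubscribeResponse::DroppedAt(_frontier) => {
///             break; // The subscribe was dropped; no further responses arrive.
///         }
///     }
/// }
/// ```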
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum SubscribeResponse<T = mz_repr::Timestamp> {
    /// A batch of updates over a non-empty interval of time.
    Batch(SubscribeBatch<T>),
    /// The SUBSCRIBE dataflow was dropped, leaving updates from this frontier onward unspecified.
    DroppedAt(Antichain<T>),
}

impl<T> SubscribeResponse<T> {
    /// Converts `self` to an error if a maximum size is exceeded.
    pub fn to_error_if_exceeds(&mut self, max_result_size: usize) {
        if let SubscribeResponse::Batch(batch) = self {
            batch.to_error_if_exceeds(max_result_size);
        }
    }
}

/// A batch of updates for the interval `[lower, upper)`.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct SubscribeBatch<T = mz_repr::Timestamp> {
    /// The lower frontier of the batch of updates.
    pub lower: Antichain<T>,
    /// The upper frontier of the batch of updates.
    pub upper: Antichain<T>,
    /// All updates greater than `lower` and not greater than `upper`.
    ///
    /// An `Err` variant can be used to indicate e.g. that the size of the updates exceeds internal limits.
    pub updates: Result<Vec<(T, Row, Diff)>, String>,
}

impl<T> SubscribeBatch<T> {
    /// Converts `self` to an error if a maximum size is exceeded.
    fn to_error_if_exceeds(&mut self, max_result_size: usize) {
        use bytesize::ByteSize;
        if let Ok(updates) = &self.updates {
            let total_size: usize = updates
                .iter()
                .map(|(_time, row, _diff)| row.byte_len())
                .sum();
            if total_size > max_result_size {
                self.updates = Err(format!(
                    "result exceeds max size of {}",
                    ByteSize::b(u64::cast_from(max_result_size))
                ));
            }
        }
    }
}

/// Status updates replicas can report to the controller.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum StatusResponse {
    /// No status responses are implemented currently, but we're leaving the infrastructure around
    /// in anticipation of materialize#31246.
    Placeholder,
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Test to ensure the size of the `ComputeResponse` enum doesn't regress.
    #[mz_ore::test]
    fn test_compute_response_size() {
        assert_eq!(std::mem::size_of::<ComputeResponse>(), 120);
    }
}