mz_adapter/
active_compute_sink.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Coordinator bookkeeping for active compute sinks.
11
12use std::cmp::Ordering;
13use std::collections::BTreeSet;
14use std::iter;
15
16use anyhow::anyhow;
17use itertools::Itertools;
18use mz_adapter_types::connection::ConnectionId;
19use mz_compute_client::protocol::response::SubscribeBatch;
20use mz_controller_types::ClusterId;
21use mz_expr::compare_columns;
22use mz_ore::cast::CastFrom;
23use mz_ore::now::EpochMillis;
24use mz_repr::adt::numeric;
25use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, Row, Timestamp};
26use mz_sql::plan::SubscribeOutput;
27use mz_storage_types::instances::StorageInstanceId;
28use timely::progress::Antichain;
29use tokio::sync::{mpsc, oneshot};
30use uuid::Uuid;
31
32use crate::coord::peek::PeekResponseUnary;
33use crate::{AdapterError, ExecuteContext, ExecuteResponse};
34
/// A description of an active compute sink from the coordinator's perspective.
///
/// Each variant wraps the coordinator-side bookkeeping for one kind of sink.
/// The methods on this type dispatch to the wrapped value.
#[derive(Debug)]
pub enum ActiveComputeSink {
    /// An active subscribe sink.
    Subscribe(ActiveSubscribe),
    /// An active copy to sink.
    CopyTo(ActiveCopyTo),
}
43
44impl ActiveComputeSink {
45    /// Reports the ID of the cluster on which the sink is running.
46    pub fn cluster_id(&self) -> ClusterId {
47        match &self {
48            ActiveComputeSink::Subscribe(subscribe) => subscribe.cluster_id,
49            ActiveComputeSink::CopyTo(copy_to) => copy_to.cluster_id,
50        }
51    }
52
53    /// Reports the ID of the connection which created the sink.
54    pub fn connection_id(&self) -> &ConnectionId {
55        match &self {
56            ActiveComputeSink::Subscribe(subscribe) => &subscribe.conn_id,
57            ActiveComputeSink::CopyTo(copy_to) => &copy_to.conn_id,
58        }
59    }
60
61    /// Reports the IDs of the objects on which the sink depends.
62    pub fn depends_on(&self) -> &BTreeSet<GlobalId> {
63        match &self {
64            ActiveComputeSink::Subscribe(subscribe) => &subscribe.depends_on,
65            ActiveComputeSink::CopyTo(copy_to) => &copy_to.depends_on,
66        }
67    }
68
69    /// Retires the sink with the specified reason.
70    ///
71    /// This method must be called on every sink before it is dropped. It
72    /// informs the end client that the sink is finished for the specified
73    /// reason.
74    pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
75        match self {
76            ActiveComputeSink::Subscribe(subscribe) => subscribe.retire(reason),
77            ActiveComputeSink::CopyTo(copy_to) => copy_to.retire(reason),
78        }
79    }
80}
81
/// The reason for removing an [`ActiveComputeSink`].
///
/// Passed to [`ActiveComputeSink::retire`], which forwards it to the wrapped
/// subscribe or copy to so that the client can be told why the sink ended.
#[derive(Debug, Clone)]
pub enum ActiveComputeSinkRetireReason {
    /// The compute sink completed successfully.
    ///
    /// Retiring with this reason sends no additional message to the client.
    Finished,
    /// The compute sink was canceled due to a user request.
    Canceled,
    /// The compute sink was forcibly terminated because an object it depended on
    /// was dropped.
    ///
    /// The string describes the dropped object and is interpolated into the
    /// error message reported to the client
    /// ("... because underlying {description} was dropped").
    DependencyDropped(String),
}
93
/// A description of an active subscribe from the coordinator's perspective.
///
/// Holds everything needed to turn batches of updates from the controller
/// into rows for the client: the response channel, the requested output
/// format, and the shape of the subscribed relation.
#[derive(Debug)]
pub struct ActiveSubscribe {
    /// The ID of the connection which created the subscribe.
    pub conn_id: ConnectionId,
    /// The UUID of the session which created the subscribe.
    pub session_uuid: Uuid,
    /// The ID of the cluster on which the subscribe is running.
    pub cluster_id: ClusterId,
    /// The IDs of the objects on which the subscribe depends.
    pub depends_on: BTreeSet<GlobalId>,
    /// Channel on which to send responses to the client.
    ///
    /// The channel is unbounded, so sending never waits for the client to
    /// catch up.
    // The responses have the form `PeekResponseUnary` but should perhaps
    // become `SubscribeResponse`.
    pub channel: mpsc::UnboundedSender<PeekResponseUnary>,
    /// Whether progress information should be emitted.
    ///
    /// When set, progress rows are sent to the client and every data row
    /// carries an extra boolean column (always `false` for data).
    pub emit_progress: bool,
    /// The logical timestamp at which the subscribe began execution.
    ///
    /// Reported as the first progress timestamp. Processing a response panics
    /// if it contains an update at an earlier time.
    pub as_of: Timestamp,
    /// The number of columns in the relation that was subscribed to.
    pub arity: usize,
    /// The time when the subscribe started.
    pub start_time: EpochMillis,
    /// How to present the subscribe's output.
    ///
    /// Selects both the ordering of updates within a batch and the column
    /// layout of the rows sent to the client.
    pub output: SubscribeOutput,
}
120
121impl ActiveSubscribe {
122    /// Initializes the subscription.
123    ///
124    /// This method must be called exactly once, after constructing an
125    /// `ActiveSubscribe` and before calling `process_response`.
126    pub fn initialize(&self) {
127        // Always emit progress message indicating snapshot timestamp.
128        self.send_progress_message(&Antichain::from_elem(self.as_of));
129    }
130
131    fn send_progress_message(&self, upper: &Antichain<Timestamp>) {
132        if !self.emit_progress {
133            return;
134        }
135        if let Some(upper) = upper.as_option() {
136            let mut row_buf = Row::default();
137            let mut packer = row_buf.packer();
138            packer.push(Datum::from(numeric::Numeric::from(*upper)));
139            packer.push(Datum::True);
140
141            // Fill in the mz_diff or mz_state column
142            packer.push(Datum::Null);
143
144            // Fill all table columns with NULL.
145            for _ in 0..self.arity {
146                packer.push(Datum::Null);
147            }
148
149            if let SubscribeOutput::EnvelopeDebezium { order_by_keys } = &self.output {
150                for _ in 0..(self.arity - order_by_keys.len()) {
151                    packer.push(Datum::Null);
152                }
153            }
154
155            let row_iter = Box::new(row_buf.into_row_iter());
156            self.send(PeekResponseUnary::Rows(row_iter));
157        }
158    }
159
160    /// Processes a subscribe response from the controller.
161    ///
162    /// Returns `true` if the subscribe is finished.
163    pub fn process_response(&self, batch: SubscribeBatch) -> bool {
164        let mut rows = match batch.updates {
165            Ok(rows) => rows,
166            Err(s) => {
167                self.send(PeekResponseUnary::Error(s));
168                return true;
169            }
170        };
171
172        // Sort results by time. We use stable sort here because it will produce
173        // deterministic results since the cursor will always produce rows in
174        // the same order. Compute doesn't guarantee that the results are sorted
175        // (materialize#18936)
176        let mut row_buf = Row::default();
177        match &self.output {
178            SubscribeOutput::WithinTimestampOrderBy { order_by } => {
179                let mut left_datum_vec = mz_repr::DatumVec::new();
180                let mut right_datum_vec = mz_repr::DatumVec::new();
181                rows.sort_by(
182                    |(left_time, left_row, left_diff), (right_time, right_row, right_diff)| {
183                        left_time.cmp(right_time).then_with(|| {
184                            let mut left_datums = left_datum_vec.borrow();
185                            left_datums.extend(&[Datum::Int64(left_diff.into_inner())]);
186                            left_datums.extend(left_row.iter());
187                            let mut right_datums = right_datum_vec.borrow();
188                            right_datums.extend(&[Datum::Int64(right_diff.into_inner())]);
189                            right_datums.extend(right_row.iter());
190                            compare_columns(order_by, &left_datums, &right_datums, || {
191                                left_row.cmp(right_row).then(left_diff.cmp(right_diff))
192                            })
193                        })
194                    },
195                );
196            }
197            SubscribeOutput::EnvelopeUpsert { order_by_keys }
198            | SubscribeOutput::EnvelopeDebezium { order_by_keys } => {
199                let debezium = matches!(self.output, SubscribeOutput::EnvelopeDebezium { .. });
200                let mut left_datum_vec = mz_repr::DatumVec::new();
201                let mut right_datum_vec = mz_repr::DatumVec::new();
202                rows.sort_by(
203                    |(left_time, left_row, left_diff), (right_time, right_row, right_diff)| {
204                        left_time.cmp(right_time).then_with(|| {
205                            let left_datums = left_datum_vec.borrow_with(left_row);
206                            let right_datums = right_datum_vec.borrow_with(right_row);
207                            compare_columns(order_by_keys, &left_datums, &right_datums, || {
208                                left_diff.cmp(right_diff)
209                            })
210                        })
211                    },
212                );
213
214                let mut new_rows = Vec::new();
215                let mut it = rows.iter();
216                let mut datum_vec = mz_repr::DatumVec::new();
217                let mut old_datum_vec = mz_repr::DatumVec::new();
218                while let Some(start) = it.next() {
219                    let group = iter::once(start)
220                        .chain(it.take_while_ref(|row| {
221                            let left_datums = left_datum_vec.borrow_with(&start.1);
222                            let right_datums = right_datum_vec.borrow_with(&row.1);
223                            start.0 == row.0
224                                && compare_columns(
225                                    order_by_keys,
226                                    &left_datums,
227                                    &right_datums,
228                                    || Ordering::Equal,
229                                ) == Ordering::Equal
230                        }))
231                        .collect_vec();
232
233                    // Four cases:
234                    // [(key, value, +1)] => ("insert", key, NULL, value)
235                    // [(key, v1, -1), (key, v2, +1)] => ("upsert", key, v1, v2)
236                    // [(key, value, -1)] => ("delete", key, value, NULL)
237                    // everything else => ("key_violation", key, NULL, NULL)
238                    let value_columns = self.arity - order_by_keys.len();
239                    let mut packer = row_buf.packer();
240                    new_rows.push(match &group[..] {
241                        [(_, row, Diff::ONE)] => {
242                            packer.push(if debezium {
243                                Datum::String("insert")
244                            } else {
245                                Datum::String("upsert")
246                            });
247                            let datums = datum_vec.borrow_with(row);
248                            for column_order in order_by_keys {
249                                packer.push(datums[column_order.column]);
250                            }
251                            if debezium {
252                                for _ in 0..value_columns {
253                                    packer.push(Datum::Null);
254                                }
255                            }
256                            for idx in 0..self.arity {
257                                if !order_by_keys.iter().any(|co| co.column == idx) {
258                                    packer.push(datums[idx]);
259                                }
260                            }
261                            (start.0, row_buf.clone(), Diff::ZERO)
262                        }
263                        [(_, _, Diff::MINUS_ONE)] => {
264                            packer.push(Datum::String("delete"));
265                            let datums = datum_vec.borrow_with(&start.1);
266                            for column_order in order_by_keys {
267                                packer.push(datums[column_order.column]);
268                            }
269                            if debezium {
270                                for idx in 0..self.arity {
271                                    if !order_by_keys.iter().any(|co| co.column == idx) {
272                                        packer.push(datums[idx]);
273                                    }
274                                }
275                            }
276                            for _ in 0..self.arity - order_by_keys.len() {
277                                packer.push(Datum::Null);
278                            }
279                            (start.0, row_buf.clone(), Diff::ZERO)
280                        }
281                        [(_, old_row, Diff::MINUS_ONE), (_, row, Diff::ONE)] => {
282                            packer.push(Datum::String("upsert"));
283                            let datums = datum_vec.borrow_with(row);
284                            let old_datums = old_datum_vec.borrow_with(old_row);
285
286                            for column_order in order_by_keys {
287                                packer.push(datums[column_order.column]);
288                            }
289                            if debezium {
290                                for idx in 0..self.arity {
291                                    if !order_by_keys.iter().any(|co| co.column == idx) {
292                                        packer.push(old_datums[idx]);
293                                    }
294                                }
295                            }
296                            for idx in 0..self.arity {
297                                if !order_by_keys.iter().any(|co| co.column == idx) {
298                                    packer.push(datums[idx]);
299                                }
300                            }
301                            (start.0, row_buf.clone(), Diff::ZERO)
302                        }
303                        _ => {
304                            packer.push(Datum::String("key_violation"));
305                            let datums = datum_vec.borrow_with(&start.1);
306                            for column_order in order_by_keys {
307                                packer.push(datums[column_order.column]);
308                            }
309                            if debezium {
310                                for _ in 0..(self.arity - order_by_keys.len()) {
311                                    packer.push(Datum::Null);
312                                }
313                            }
314                            for _ in 0..(self.arity - order_by_keys.len()) {
315                                packer.push(Datum::Null);
316                            }
317                            (start.0, row_buf.clone(), Diff::ZERO)
318                        }
319                    });
320                }
321                rows = new_rows;
322            }
323            SubscribeOutput::Diffs => rows.sort_by_key(|(time, _, _)| *time),
324        }
325
326        let rows: Vec<Row> = rows
327            .into_iter()
328            .map(|(time, row, diff)| {
329                assert!(self.as_of <= time);
330                let mut packer = row_buf.packer();
331                // TODO: Change to MzTimestamp.
332                packer.push(Datum::from(numeric::Numeric::from(time)));
333                if self.emit_progress {
334                    // When sinking with PROGRESS, the output includes an
335                    // additional column that indicates whether a timestamp is
336                    // complete. For regular "data" updates this is always
337                    // `false`.
338                    packer.push(Datum::False);
339                }
340
341                match &self.output {
342                    SubscribeOutput::EnvelopeUpsert { .. }
343                    | SubscribeOutput::EnvelopeDebezium { .. } => {}
344                    SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
345                        packer.push(Datum::Int64(diff.into_inner()));
346                    }
347                }
348
349                packer.extend_by_row(&row);
350
351                row_buf.clone()
352            })
353            .collect();
354        let rows = Box::new(rows.into_row_iter());
355
356        self.send(PeekResponseUnary::Rows(rows));
357
358        // Emit progress message if requested. Don't emit progress for the first
359        // batch if the upper is exactly `as_of` (we're guaranteed it is not
360        // less than `as_of`, but it might be exactly `as_of`) as we've already
361        // emitted that progress message in `initialize`.
362        if !batch.upper.less_equal(&self.as_of) {
363            self.send_progress_message(&batch.upper);
364        }
365
366        batch.upper.is_empty()
367    }
368
369    /// Retires the subscribe with the specified reason.
370    ///
371    /// This method must be called on every subscribe before it is dropped. It
372    /// informs the end client that the subscribe is finished for the specified
373    /// reason.
374    pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
375        let message = match reason {
376            ActiveComputeSinkRetireReason::Finished => return,
377            ActiveComputeSinkRetireReason::Canceled => PeekResponseUnary::Canceled,
378            ActiveComputeSinkRetireReason::DependencyDropped(d) => PeekResponseUnary::Error(
379                format!("subscribe has been terminated because underlying {d} was dropped"),
380            ),
381        };
382        self.send(message);
383    }
384
385    /// Sends a message to the client if the subscribe has not already completed
386    /// and if the client has not already gone away.
387    fn send(&self, response: PeekResponseUnary) {
388        // TODO(benesch): the lack of backpressure here can result in
389        // unbounded memory usage.
390        let _ = self.channel.send(response);
391    }
392}
393
/// A description of an active copy to sink from the coordinator's perspective.
///
/// The outcome of the copy is delivered on `tx`, which accepts a single
/// message. The methods that deliver it consume the `ActiveCopyTo`.
#[derive(Debug)]
pub struct ActiveCopyTo {
    /// The ID of the connection which created the copy to.
    pub conn_id: ConnectionId,
    /// The result channel for the `COPY ... TO` statement that created the copy to sink.
    pub tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    /// The ID of the cluster on which the copy to is running.
    pub cluster_id: ClusterId,
    /// The IDs of the objects on which the copy to depends.
    pub depends_on: BTreeSet<GlobalId>,
}
406
407impl ActiveCopyTo {
408    /// Retires the copy to with a response from the controller.
409    ///
410    /// Unlike subscribes, copy tos only expect a single response from the
411    /// controller, so `process_response` and `retire` are unified into a single
412    /// operation.
413    ///
414    /// Either this method or `retire` must be called on every copy to before it
415    /// is dropped.
416    pub fn retire_with_response(self, response: Result<u64, anyhow::Error>) {
417        let response = match response {
418            Ok(n) => Ok(ExecuteResponse::Copied(usize::cast_from(n))),
419            Err(error) => Err(AdapterError::Unstructured(error)),
420        };
421        let _ = self.tx.send(response);
422    }
423
424    /// Retires the copy to with the specified reason.
425    ///
426    /// Either this method or `retire_with_response` must be called on every
427    /// copy to before it is dropped.
428    pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
429        let message = match reason {
430            ActiveComputeSinkRetireReason::Finished => return,
431            ActiveComputeSinkRetireReason::Canceled => Err(AdapterError::Canceled),
432            ActiveComputeSinkRetireReason::DependencyDropped(d) => Err(AdapterError::Unstructured(
433                anyhow!("copy has been terminated because underlying {d} was dropped"),
434            )),
435        };
436        let _ = self.tx.send(message);
437    }
438}
439
/// State we keep in the `Coordinator` to track active `COPY FROM` statements.
///
/// Unlike the compute sink types in this file, this type is only visible
/// within the crate.
#[derive(Debug)]
pub(crate) struct ActiveCopyFrom {
    /// ID of the ingestion running in clusterd.
    pub ingestion_id: uuid::Uuid,
    /// The cluster this is currently running on.
    pub cluster_id: StorageInstanceId,
    /// The table we're currently copying into.
    pub table_id: CatalogItemId,
    /// Context of the SQL session that ran the statement.
    pub ctx: ExecuteContext,
}