Skip to main content

mz_adapter/
active_compute_sink.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Coordinator bookkeeping for active compute sinks.
11
12use std::cmp::Ordering;
13use std::collections::BTreeSet;
14use std::num::NonZeroUsize;
15
16use anyhow::anyhow;
17use mz_adapter_types::connection::ConnectionId;
18use mz_compute_client::protocol::response::SubscribeBatch;
19use mz_controller_types::ClusterId;
20use mz_expr::row::RowCollection;
21use mz_expr::{RowComparator, compare_columns};
22use mz_ore::cast::CastFrom;
23use mz_ore::now::EpochMillis;
24use mz_repr::adt::numeric;
25use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, Row, RowRef, Timestamp};
26use mz_sql::plan::SubscribeOutput;
27use mz_storage_types::instances::StorageInstanceId;
28use timely::progress::Antichain;
29use tokio::sync::{mpsc, oneshot};
30use uuid::Uuid;
31
32use crate::coord::peek::PeekResponseUnary;
33use crate::{AdapterError, ExecuteContext, ExecuteResponse};
34
35#[derive(Debug)]
36/// A description of an active compute sink from the coordinator's perspective.
37pub enum ActiveComputeSink {
38    /// An active subscribe sink.
39    Subscribe(ActiveSubscribe),
40    /// An active copy to sink.
41    CopyTo(ActiveCopyTo),
42}
43
44impl ActiveComputeSink {
45    /// Reports the ID of the cluster on which the sink is running.
46    pub fn cluster_id(&self) -> ClusterId {
47        match &self {
48            ActiveComputeSink::Subscribe(subscribe) => subscribe.cluster_id,
49            ActiveComputeSink::CopyTo(copy_to) => copy_to.cluster_id,
50        }
51    }
52
53    /// Reports the ID of the connection which created the sink.
54    pub fn connection_id(&self) -> &ConnectionId {
55        match &self {
56            ActiveComputeSink::Subscribe(subscribe) => &subscribe.conn_id,
57            ActiveComputeSink::CopyTo(copy_to) => &copy_to.conn_id,
58        }
59    }
60
61    /// Reports the IDs of the objects on which the sink depends.
62    pub fn depends_on(&self) -> &BTreeSet<GlobalId> {
63        match &self {
64            ActiveComputeSink::Subscribe(subscribe) => &subscribe.depends_on,
65            ActiveComputeSink::CopyTo(copy_to) => &copy_to.depends_on,
66        }
67    }
68
69    /// Retires the sink with the specified reason.
70    ///
71    /// This method must be called on every sink before it is dropped. It
72    /// informs the end client that the sink is finished for the specified
73    /// reason.
74    pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
75        match self {
76            ActiveComputeSink::Subscribe(subscribe) => subscribe.retire(reason),
77            ActiveComputeSink::CopyTo(copy_to) => copy_to.retire(reason),
78        }
79    }
80}
81
/// The reason for removing an [`ActiveComputeSink`].
#[derive(Debug, Clone)]
pub enum ActiveComputeSinkRetireReason {
    /// The compute sink completed successfully.
    Finished,
    /// The compute sink was canceled due to a user request.
    Canceled,
    /// The compute sink was forcibly terminated because an object it depended on
    /// was dropped. The payload is a human-readable description of the dropped
    /// object, interpolated into the error message sent to the client.
    DependencyDropped(String),
}
93
/// A description of an active subscribe from the coordinator's perspective.
#[derive(Debug)]
pub struct ActiveSubscribe {
    /// The ID of the connection which created the subscribe.
    pub conn_id: ConnectionId,
    /// The UUID of the session which created the subscribe.
    pub session_uuid: Uuid,
    /// The ID of the cluster on which the subscribe is running.
    pub cluster_id: ClusterId,
    /// The IDs of the objects on which the subscribe depends.
    pub depends_on: BTreeSet<GlobalId>,
    /// Channel on which to send responses to the client.
    // The responses have the form `PeekResponseUnary` but should perhaps
    // become `SubscribeResponse`.
    pub channel: mpsc::UnboundedSender<PeekResponseUnary>,
    /// Whether progress information should be emitted. When set, output rows
    /// carry an extra "progressed" column and progress messages are sent as
    /// frontiers advance.
    pub emit_progress: bool,
    /// The logical timestamp at which the subscribe began execution. No
    /// update delivered to the client may have a timestamp less than this.
    pub as_of: Timestamp,
    /// The number of columns in the relation that was subscribed to.
    pub arity: usize,
    /// The time when the subscribe started.
    pub start_time: EpochMillis,
    /// How to present the subscribe's output.
    pub output: SubscribeOutput,
}
120
impl ActiveSubscribe {
    /// Initializes the subscription.
    ///
    /// This method must be called exactly once, after constructing an
    /// `ActiveSubscribe` and before calling `process_response`.
    pub fn initialize(&self) {
        // Always emit progress message indicating snapshot timestamp.
        self.send_progress_message(&Antichain::from_elem(self.as_of));
    }

    /// Sends a progress message for the given frontier to the client, if
    /// progress messages were requested (`emit_progress` is set).
    ///
    /// The progress row has the shape `(timestamp, true, NULL, NULL, ...)`:
    /// the frontier timestamp, the "progressed" flag, a NULL diff/state
    /// column, and NULLs for every output column so the row width matches
    /// data rows. Nothing is sent when the frontier is empty.
    fn send_progress_message(&self, upper: &Antichain<Timestamp>) {
        if !self.emit_progress {
            return;
        }
        if let Some(upper) = upper.as_option() {
            let mut row_buf = Row::default();
            let mut packer = row_buf.packer();
            packer.push(Datum::from(numeric::Numeric::from(*upper)));
            packer.push(Datum::True);

            // Fill in the mz_diff or mz_state column
            packer.push(Datum::Null);

            // Fill all table columns with NULL.
            for _ in 0..self.arity {
                packer.push(Datum::Null);
            }

            // Debezium output additionally carries an "old value" column for
            // each non-key column; pad those with NULLs too.
            if let SubscribeOutput::EnvelopeDebezium { order_by_keys } = &self.output {
                for _ in 0..(self.arity - order_by_keys.len()) {
                    packer.push(Datum::Null);
                }
            }

            let row_iter = Box::new(row_buf.into_row_iter());
            self.send(PeekResponseUnary::Rows(row_iter));
        }
    }

    /// Processes a subscribe response from the controller.
    ///
    /// Sends the batch's updates (formatted per `self.output`) to the client,
    /// followed by a progress message if requested.
    ///
    /// Returns `true` if the subscribe is finished (i.e., the batch's upper
    /// frontier is empty).
    pub fn process_response(&self, batch: SubscribeBatch) -> bool {
        let comparator = RowComparator::new(self.output.row_order());
        let rows = match batch.updates {
            Ok(ref rows) => {
                // Merge the per-collection update iterators into a single
                // stream ordered by (time, row), then consolidate updates
                // with equal (row, time) by summing their diffs.
                let iters = rows.iter().map(|r| r.iter());
                let merged = mz_ore::iter::merge_iters_by(
                    iters,
                    |(left_row, left_time, _), (right_row, right_time, _)| {
                        left_time.cmp(right_time).then_with(|| {
                            comparator.compare_rows(left_row, right_row, || left_row.cmp(right_row))
                        })
                    },
                );
                mz_ore::iter::consolidate_update_iter(merged)
            }
            Err(s) => {
                // An error from the controller terminates the subscribe.
                self.send(PeekResponseUnary::Error(s));
                return true;
            }
        };

        // Sort results by time. We use stable sort here because it will produce
        // deterministic results since the cursor will always produce rows in
        // the same order. Compute doesn't guarantee that the results are sorted
        // (materialize#18936)
        let mut output_buf = Row::default();
        let mut output_builder = RowCollection::builder(0, 0);
        let mut left_datum_vec = mz_repr::DatumVec::new();
        let mut right_datum_vec = mz_repr::DatumVec::new();
        // Appends one output row to `output_builder`, prefixing the payload
        // with the timestamp, the optional "progressed" flag (always `false`
        // for data rows), and — for Diffs/ORDER BY output — the diff column.
        let mut push_row = |row: &RowRef, time: Timestamp, diff: Diff| {
            // Guaranteed by the dataflow: no update precedes the as_of.
            assert!(self.as_of <= time);
            let mut packer = output_buf.packer();
            // TODO: Change to MzTimestamp.
            packer.push(Datum::from(numeric::Numeric::from(time)));
            if self.emit_progress {
                // When sinking with PROGRESS, the output includes an
                // additional column that indicates whether a timestamp is
                // complete. For regular "data" updates this is always
                // `false`.
                packer.push(Datum::False);
            }

            match &self.output {
                SubscribeOutput::EnvelopeUpsert { .. }
                | SubscribeOutput::EnvelopeDebezium { .. } => {}
                SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
                    packer.push(Datum::Int64(diff.into_inner()));
                }
            }

            packer.extend_by_row_ref(row);

            output_builder.push(output_buf.as_row_ref(), NonZeroUsize::MIN);
        };

        match &self.output {
            SubscribeOutput::WithinTimestampOrderBy { order_by } => {
                let mut rows: Vec<_> = rows.collect();
                // Since the diff is inserted as the first column, we can't take advantage of the
                // known ordering. (Aside from timestamp, I suppose.)
                rows.sort_by(
                    |(left_row, left_time, left_diff), (right_row, right_time, right_diff)| {
                        left_time.cmp(right_time).then_with(|| {
                            // Materialize the (diff, columns...) tuple each
                            // side exposes to the ORDER BY comparison.
                            let mut left_datums = left_datum_vec.borrow();
                            left_datums.extend(&[Datum::Int64(left_diff.into_inner())]);
                            left_datums.extend(left_row.iter());
                            let mut right_datums = right_datum_vec.borrow();
                            right_datums.extend(&[Datum::Int64(right_diff.into_inner())]);
                            right_datums.extend(right_row.iter());
                            compare_columns(order_by, &left_datums, &right_datums, || {
                                left_row.cmp(right_row).then(left_diff.cmp(right_diff))
                            })
                        })
                    },
                );
                for (row, time, diff) in rows {
                    push_row(row, *time, diff);
                }
            }
            SubscribeOutput::EnvelopeUpsert { order_by_keys }
            | SubscribeOutput::EnvelopeDebezium { order_by_keys } => {
                let debezium = matches!(self.output, SubscribeOutput::EnvelopeDebezium { .. });
                let mut it = rows.peekable();
                let mut datum_vec = mz_repr::DatumVec::new();
                let mut old_datum_vec = mz_repr::DatumVec::new();
                let comparator = RowComparator::new(order_by_keys.as_slice());
                let mut group = Vec::with_capacity(2);
                let mut row_buf = Row::default();
                // The iterator is sorted by time and key, so elements in the same group should be
                // adjacent already.
                while let Some(start) = it.next() {
                    // Collect the run of updates that share `start`'s time and
                    // key columns into `group`.
                    group.clear();
                    group.push(start);
                    while let Some(row) = it.peek()
                        && start.1 == row.1
                        && {
                            comparator
                                .compare_rows(start.0, row.0, || Ordering::Equal)
                                .is_eq()
                        }
                    {
                        group.extend(it.next());
                    }
                    // Order by diff so retractions (-1) precede insertions (+1),
                    // which the match arms below rely on.
                    group.sort_by_key(|(_, _, d)| *d);

                    // Four cases:
                    // [(key, value, +1)] => ("insert", key, NULL, value)
                    // [(key, v1, -1), (key, v2, +1)] => ("upsert", key, v1, v2)
                    // [(key, value, -1)] => ("delete", key, value, NULL)
                    // everything else => ("key_violation", key, NULL, NULL)
                    let value_columns = self.arity - order_by_keys.len();
                    let mut packer = row_buf.packer();
                    match &group[..] {
                        [(row, _, Diff::ONE)] => {
                            packer.push(if debezium {
                                Datum::String("insert")
                            } else {
                                Datum::String("upsert")
                            });
                            let datums = datum_vec.borrow_with(row);
                            for column_order in order_by_keys {
                                packer.push(datums[column_order.column]);
                            }
                            // Debezium: no old value for an insert.
                            if debezium {
                                for _ in 0..value_columns {
                                    packer.push(Datum::Null);
                                }
                            }
                            // New value: every non-key column, in order.
                            for idx in 0..self.arity {
                                if !order_by_keys.iter().any(|co| co.column == idx) {
                                    packer.push(datums[idx]);
                                }
                            }
                            push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
                        }
                        [(_, _, Diff::MINUS_ONE)] => {
                            packer.push(Datum::String("delete"));
                            let datums = datum_vec.borrow_with(start.0);
                            for column_order in order_by_keys {
                                packer.push(datums[column_order.column]);
                            }
                            // Debezium: the retracted row is the old value.
                            if debezium {
                                for idx in 0..self.arity {
                                    if !order_by_keys.iter().any(|co| co.column == idx) {
                                        packer.push(datums[idx]);
                                    }
                                }
                            }
                            // New value: NULL for a delete.
                            for _ in 0..self.arity - order_by_keys.len() {
                                packer.push(Datum::Null);
                            }
                            push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
                        }
                        [(old_row, _, Diff::MINUS_ONE), (row, _, Diff::ONE)] => {
                            packer.push(Datum::String("upsert"));
                            let datums = datum_vec.borrow_with(row);
                            let old_datums = old_datum_vec.borrow_with(old_row);

                            for column_order in order_by_keys {
                                packer.push(datums[column_order.column]);
                            }
                            // Debezium: old value comes from the retraction.
                            if debezium {
                                for idx in 0..self.arity {
                                    if !order_by_keys.iter().any(|co| co.column == idx) {
                                        packer.push(old_datums[idx]);
                                    }
                                }
                            }
                            // New value comes from the insertion.
                            for idx in 0..self.arity {
                                if !order_by_keys.iter().any(|co| co.column == idx) {
                                    packer.push(datums[idx]);
                                }
                            }
                            push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
                        }
                        _ => {
                            // Any other multiplicity means the key wasn't
                            // actually a key; report the violation.
                            packer.push(Datum::String("key_violation"));
                            let datums = datum_vec.borrow_with(start.0);
                            for column_order in order_by_keys {
                                packer.push(datums[column_order.column]);
                            }
                            if debezium {
                                for _ in 0..(self.arity - order_by_keys.len()) {
                                    packer.push(Datum::Null);
                                }
                            }
                            for _ in 0..(self.arity - order_by_keys.len()) {
                                packer.push(Datum::Null);
                            }
                            push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
                        }
                    };
                }
            }
            SubscribeOutput::Diffs => {
                // Diffs output is sorted by time and row, so it can be pushed directly.
                for (row, time, diff) in rows {
                    push_row(row, *time, diff)
                }
            }
        };

        let rows = output_builder.build();
        let rows = Box::new(rows.into_row_iter());
        self.send(PeekResponseUnary::Rows(rows));

        // Emit progress message if requested. Don't emit progress for the first
        // batch if the upper is exactly `as_of` (we're guaranteed it is not
        // less than `as_of`, but it might be exactly `as_of`) as we've already
        // emitted that progress message in `initialize`.
        if !batch.upper.less_equal(&self.as_of) {
            self.send_progress_message(&batch.upper);
        }

        // An empty upper frontier means no more updates can arrive: finished.
        batch.upper.is_empty()
    }

    /// Retires the subscribe with the specified reason.
    ///
    /// This method must be called on every subscribe before it is dropped. It
    /// informs the end client that the subscribe is finished for the specified
    /// reason.
    pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
        let message = match reason {
            // On a successful finish, the client has already received the
            // final batch; no additional message is needed.
            ActiveComputeSinkRetireReason::Finished => return,
            ActiveComputeSinkRetireReason::Canceled => PeekResponseUnary::Canceled,
            ActiveComputeSinkRetireReason::DependencyDropped(d) => PeekResponseUnary::Error(
                format!("subscribe has been terminated because underlying {d} was dropped"),
            ),
        };
        self.send(message);
    }

    /// Sends a message to the client if the subscribe has not already completed
    /// and if the client has not already gone away.
    fn send(&self, response: PeekResponseUnary) {
        // TODO(benesch): the lack of backpressure here can result in
        // unbounded memory usage.
        let _ = self.channel.send(response);
    }
}
404
/// A description of an active copy to sink from the coordinator's perspective.
#[derive(Debug)]
pub struct ActiveCopyTo {
    /// The ID of the connection which created the copy to.
    pub conn_id: ConnectionId,
    /// The result channel for the `COPY ... TO` statement that created the copy to sink.
    pub tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    /// The ID of the cluster on which the copy to is running.
    pub cluster_id: ClusterId,
    /// The IDs of the objects on which the copy to depends.
    pub depends_on: BTreeSet<GlobalId>,
}
417
418impl ActiveCopyTo {
419    /// Retires the copy to with a response from the controller.
420    ///
421    /// Unlike subscribes, copy tos only expect a single response from the
422    /// controller, so `process_response` and `retire` are unified into a single
423    /// operation.
424    ///
425    /// Either this method or `retire` must be called on every copy to before it
426    /// is dropped.
427    pub fn retire_with_response(self, response: Result<u64, anyhow::Error>) {
428        let response = match response {
429            Ok(n) => Ok(ExecuteResponse::Copied(usize::cast_from(n))),
430            Err(error) => Err(AdapterError::Unstructured(error)),
431        };
432        let _ = self.tx.send(response);
433    }
434
435    /// Retires the copy to with the specified reason.
436    ///
437    /// Either this method or `retire_with_response` must be called on every
438    /// copy to before it is dropped.
439    pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
440        let message = match reason {
441            ActiveComputeSinkRetireReason::Finished => return,
442            ActiveComputeSinkRetireReason::Canceled => Err(AdapterError::Canceled),
443            ActiveComputeSinkRetireReason::DependencyDropped(d) => Err(AdapterError::Unstructured(
444                anyhow!("copy has been terminated because underlying {d} was dropped"),
445            )),
446        };
447        let _ = self.tx.send(message);
448    }
449}
450
/// State we keep in the `Coordinator` to track active `COPY FROM` statements.
#[derive(Debug)]
pub(crate) struct ActiveCopyFrom {
    /// UUID identifying the ingestion running in clusterd.
    pub ingestion_id: uuid::Uuid,
    /// The storage cluster this ingestion is currently running on.
    pub cluster_id: StorageInstanceId,
    /// The table we're currently copying into.
    pub table_id: CatalogItemId,
    /// Context of the SQL session that ran the statement, used to deliver the
    /// statement's result.
    pub ctx: ExecuteContext,
}