Skip to main content

mz_adapter/
active_compute_sink.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Coordinator bookkeeping for active compute sinks.
11
12use std::cmp::Ordering;
13use std::collections::BTreeSet;
14use std::num::NonZeroUsize;
15
16use mz_adapter_types::connection::ConnectionId;
17use mz_compute_client::protocol::response::SubscribeBatch;
18use mz_controller_types::ClusterId;
19use mz_expr::row::RowCollection;
20use mz_expr::{RowComparator, compare_columns};
21use mz_ore::cast::CastFrom;
22use mz_ore::now::EpochMillis;
23use mz_repr::adt::numeric;
24use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, Row, RowRef, Timestamp};
25use mz_sql::plan::SubscribeOutput;
26use mz_storage_types::instances::StorageInstanceId;
27use timely::progress::Antichain;
28use tokio::sync::{mpsc, oneshot};
29use uuid::Uuid;
30
31use crate::coord::peek::{DroppedDependency, PeekResponseUnary};
32use crate::{AdapterError, ExecuteContext, ExecuteResponse};
33
/// A description of an active compute sink from the coordinator's perspective.
///
/// Each variant wraps the bookkeeping state for one kind of sink. The
/// accessors on this type dispatch to the wrapped state.
#[derive(Debug)]
pub enum ActiveComputeSink {
    /// An active subscribe sink.
    Subscribe(ActiveSubscribe),
    /// An active copy to sink.
    CopyTo(ActiveCopyTo),
}
42
43impl ActiveComputeSink {
44    /// Reports the ID of the cluster on which the sink is running.
45    pub fn cluster_id(&self) -> ClusterId {
46        match &self {
47            ActiveComputeSink::Subscribe(subscribe) => subscribe.cluster_id,
48            ActiveComputeSink::CopyTo(copy_to) => copy_to.cluster_id,
49        }
50    }
51
52    /// Reports the ID of the connection which created the sink.
53    pub fn connection_id(&self) -> &ConnectionId {
54        match &self {
55            ActiveComputeSink::Subscribe(subscribe) => &subscribe.conn_id,
56            ActiveComputeSink::CopyTo(copy_to) => &copy_to.conn_id,
57        }
58    }
59
60    /// Reports the IDs of the objects on which the sink depends.
61    pub fn depends_on(&self) -> &BTreeSet<GlobalId> {
62        match &self {
63            ActiveComputeSink::Subscribe(subscribe) => &subscribe.depends_on,
64            ActiveComputeSink::CopyTo(copy_to) => &copy_to.depends_on,
65        }
66    }
67
68    /// Retires the sink with the specified reason.
69    ///
70    /// This method must be called on every sink before it is dropped. It
71    /// informs the end client that the sink is finished for the specified
72    /// reason.
73    pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
74        match self {
75            ActiveComputeSink::Subscribe(subscribe) => subscribe.retire(reason),
76            ActiveComputeSink::CopyTo(copy_to) => copy_to.retire(reason),
77        }
78    }
79}
80
/// The reason for removing an [`ActiveComputeSink`].
///
/// The reason determines what, if anything, the `retire` methods in this
/// module report to the end client.
#[derive(Debug, Clone)]
pub enum ActiveComputeSinkRetireReason {
    /// The compute sink completed successfully.
    ///
    /// The `retire` methods in this module send nothing for this reason.
    Finished,
    /// The compute sink was canceled due to a user request.
    Canceled,
    /// The compute sink was forcibly terminated because an object it depended on
    /// was dropped. The payload describes the dropped dependency.
    DependencyDropped(DroppedDependency),
}
92
/// A description of an active subscribe from the coordinator's perspective.
#[derive(Debug)]
pub struct ActiveSubscribe {
    /// The ID of the connection which created the subscribe.
    pub conn_id: ConnectionId,
    /// The UUID of the session which created the subscribe.
    pub session_uuid: Uuid,
    /// The ID of the cluster on which the subscribe is running.
    pub cluster_id: ClusterId,
    /// The IDs of the objects on which the subscribe depends.
    pub depends_on: BTreeSet<GlobalId>,
    /// Channel on which to send responses to the client.
    ///
    /// The channel is unbounded, so sending never waits for the client.
    // The responses have the form `PeekResponseUnary` but should perhaps
    // become `SubscribeResponse`.
    pub channel: mpsc::UnboundedSender<PeekResponseUnary>,
    /// Whether progress information should be emitted.
    ///
    /// When set, progress rows are sent to the client and every data row
    /// carries an additional progress column set to `false`.
    pub emit_progress: bool,
    /// The logical timestamp at which the subscribe began execution.
    ///
    /// `process_response` asserts that no update is earlier than this time.
    pub as_of: Timestamp,
    /// The number of columns in the relation that was subscribed to.
    pub arity: usize,
    /// The time when the subscribe started.
    pub start_time: EpochMillis,
    /// How to present the subscribe's output.
    pub output: SubscribeOutput,
    /// If true, this is an internal subscribe that should not appear in
    /// introspection tables like mz_subscriptions.
    pub internal: bool,
}
122
123impl ActiveSubscribe {
124    /// Initializes the subscription.
125    ///
126    /// This method must be called exactly once, after constructing an
127    /// `ActiveSubscribe` and before calling `process_response`.
128    pub fn initialize(&self) {
129        // Always emit progress message indicating snapshot timestamp.
130        self.send_progress_message(&Antichain::from_elem(self.as_of));
131    }
132
133    fn send_progress_message(&self, upper: &Antichain<Timestamp>) {
134        if !self.emit_progress {
135            return;
136        }
137        if let Some(upper) = upper.as_option() {
138            let mut row_buf = Row::default();
139            let mut packer = row_buf.packer();
140            packer.push(Datum::from(numeric::Numeric::from(*upper)));
141            packer.push(Datum::True);
142
143            // Fill in the mz_diff or mz_state column
144            packer.push(Datum::Null);
145
146            // Fill all table columns with NULL.
147            for _ in 0..self.arity {
148                packer.push(Datum::Null);
149            }
150
151            if let SubscribeOutput::EnvelopeDebezium { order_by_keys } = &self.output {
152                for _ in 0..(self.arity - order_by_keys.len()) {
153                    packer.push(Datum::Null);
154                }
155            }
156
157            let row_iter = Box::new(row_buf.into_row_iter());
158            self.send(PeekResponseUnary::Rows(row_iter));
159        }
160    }
161
    /// Processes a subscribe response from the controller.
    ///
    /// On success, the batch's updates are merged, consolidated, and formatted
    /// according to `self.output`, then sent to the client as a single
    /// `PeekResponseUnary::Rows` message. A progress message follows when the
    /// batch's upper has advanced beyond `self.as_of`. If the batch carries an
    /// error, the error is forwarded to the client instead.
    ///
    /// Returns `true` if the subscribe is finished, i.e. if the batch carried
    /// an error or if its upper frontier is empty.
    ///
    /// # Panics
    ///
    /// Panics if the batch contains an update at a time earlier than
    /// `self.as_of`.
    pub fn process_response(&self, batch: SubscribeBatch) -> bool {
        let comparator = RowComparator::new(self.output.row_order());
        let rows = match batch.updates {
            Ok(ref rows) => {
                // Merge the individual update iterators into a single one,
                // ordered by time and then by the output's row order (ties
                // broken by comparing the rows themselves), and consolidate
                // the result.
                let iters = rows.iter().map(|r| r.iter());
                let merged = mz_ore::iter::merge_iters_by(
                    iters,
                    |(left_row, left_time, _), (right_row, right_time, _)| {
                        left_time.cmp(right_time).then_with(|| {
                            comparator.compare_rows(left_row, right_row, || left_row.cmp(right_row))
                        })
                    },
                );
                mz_ore::iter::consolidate_update_iter(merged)
            }
            Err(s) => {
                // An error terminates the subscribe: forward it and report
                // that we are finished.
                self.send(PeekResponseUnary::Error(s));
                return true;
            }
        };

        // Buffers for assembling the output. Compute doesn't guarantee that
        // the results are sorted (materialize#18936), so the output order is
        // established here: by the merge above and, for
        // `WithinTimestampOrderBy`, by the explicit sort below.
        let mut output_buf = Row::default();
        let mut output_builder = RowCollection::builder(0, 0);
        let mut left_datum_vec = mz_repr::DatumVec::new();
        let mut right_datum_vec = mz_repr::DatumVec::new();
        // Appends one output row: the timestamp, the optional progress column,
        // the optional diff column, and finally the columns of `row`.
        let mut push_row = |row: &RowRef, time: Timestamp, diff: Diff| {
            assert!(self.as_of <= time);
            let mut packer = output_buf.packer();
            // TODO: Change to MzTimestamp.
            packer.push(Datum::from(numeric::Numeric::from(time)));
            if self.emit_progress {
                // When sinking with PROGRESS, the output includes an
                // additional column that indicates whether a timestamp is
                // complete. For regular "data" updates this is always
                // `false`.
                packer.push(Datum::False);
            }

            match &self.output {
                // The envelope outputs carry a state column in place of the
                // diff; the caller has already packed it as the first column
                // of `row`.
                SubscribeOutput::EnvelopeUpsert { .. }
                | SubscribeOutput::EnvelopeDebezium { .. } => {}
                SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
                    packer.push(Datum::Int64(diff.into_inner()));
                }
            }

            packer.extend_by_row_ref(row);

            output_builder.push(output_buf.as_row_ref(), NonZeroUsize::MIN);
        };

        match &self.output {
            SubscribeOutput::WithinTimestampOrderBy { order_by } => {
                let mut rows: Vec<_> = rows.collect();
                // Since the diff is inserted as the first column, we can't take advantage of the
                // known ordering. (Aside from timestamp, I suppose.)
                rows.sort_by(
                    |(left_row, left_time, left_diff), (right_row, right_time, right_diff)| {
                        left_time.cmp(right_time).then_with(|| {
                            // Compare on `[diff, row columns...]`, which is the
                            // column layout that `order_by` is applied to.
                            let mut left_datums = left_datum_vec.borrow();
                            left_datums.extend(&[Datum::Int64(left_diff.into_inner())]);
                            left_datums.extend(left_row.iter());
                            let mut right_datums = right_datum_vec.borrow();
                            right_datums.extend(&[Datum::Int64(right_diff.into_inner())]);
                            right_datums.extend(right_row.iter());
                            compare_columns(order_by, &left_datums, &right_datums, || {
                                left_row.cmp(right_row).then(left_diff.cmp(right_diff))
                            })
                        })
                    },
                );
                for (row, time, diff) in rows {
                    push_row(row, *time, diff);
                }
            }
            SubscribeOutput::EnvelopeUpsert { order_by_keys }
            | SubscribeOutput::EnvelopeDebezium { order_by_keys } => {
                let debezium = matches!(self.output, SubscribeOutput::EnvelopeDebezium { .. });
                let mut it = rows.peekable();
                let mut datum_vec = mz_repr::DatumVec::new();
                let mut old_datum_vec = mz_repr::DatumVec::new();
                let comparator = RowComparator::new(order_by_keys.as_slice());
                let mut group = Vec::with_capacity(2);
                let mut row_buf = Row::default();
                // The iterator is sorted by time and key, so elements in the same group should be
                // adjacent already.
                while let Some(start) = it.next() {
                    // Collect every update that shares `start`'s timestamp and key.
                    group.clear();
                    group.push(start);
                    while let Some(row) = it.peek()
                        && start.1 == row.1
                        && {
                            comparator
                                .compare_rows(start.0, row.0, || Ordering::Equal)
                                .is_eq()
                        }
                    {
                        group.extend(it.next());
                    }
                    // Order the group by diff so that a retraction precedes
                    // the matching insertion, as the patterns below expect.
                    group.sort_by_key(|(_, _, d)| *d);

                    // Defense in depth: the planner ensures that KEY columns are
                    // distinct columns of the underlying relation, so this
                    // subtraction must never underflow. If it does, we'd OOM
                    // the coordinator with a giant loop, so check it here.
                    mz_ore::soft_assert_or_log!(
                        order_by_keys.len() <= self.arity,
                        "SUBSCRIBE ENVELOPE has more KEY columns ({}) than \
                         relation arity ({}); planner should have rejected this",
                        order_by_keys.len(),
                        self.arity,
                    );
                    let value_columns = self.arity.saturating_sub(order_by_keys.len());
                    let mut packer = row_buf.packer();
                    // Four cases:
                    // [(key, value, +1)] => ("insert", key, NULL, value)
                    // [(key, v1, -1), (key, v2, +1)] => ("upsert", key, v1, v2)
                    // [(key, value, -1)] => ("delete", key, value, NULL)
                    // everything else => ("key_violation", key, NULL, NULL)
                    //
                    // The first value slot (the old value) is only present for
                    // the Debezium envelope. Without it, a lone insertion is
                    // reported as "upsert" rather than "insert".
                    match &group[..] {
                        [(row, _, Diff::ONE)] => {
                            packer.push(if debezium {
                                Datum::String("insert")
                            } else {
                                Datum::String("upsert")
                            });
                            let datums = datum_vec.borrow_with(row);
                            for column_order in order_by_keys {
                                packer.push(datums[column_order.column]);
                            }
                            // Debezium: there is no old value for an insert.
                            if debezium {
                                for _ in 0..value_columns {
                                    packer.push(Datum::Null);
                                }
                            }
                            // The new value: every non-key column.
                            for idx in 0..self.arity {
                                if !order_by_keys.iter().any(|co| co.column == idx) {
                                    packer.push(datums[idx]);
                                }
                            }
                            push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
                        }
                        [(_, _, Diff::MINUS_ONE)] => {
                            packer.push(Datum::String("delete"));
                            let datums = datum_vec.borrow_with(start.0);
                            for column_order in order_by_keys {
                                packer.push(datums[column_order.column]);
                            }
                            // Debezium: the old value is the retracted row's
                            // non-key columns.
                            if debezium {
                                for idx in 0..self.arity {
                                    if !order_by_keys.iter().any(|co| co.column == idx) {
                                        packer.push(datums[idx]);
                                    }
                                }
                            }
                            // There is no new value for a delete.
                            for _ in 0..value_columns {
                                packer.push(Datum::Null);
                            }
                            push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
                        }
                        [(old_row, _, Diff::MINUS_ONE), (row, _, Diff::ONE)] => {
                            packer.push(Datum::String("upsert"));
                            let datums = datum_vec.borrow_with(row);
                            let old_datums = old_datum_vec.borrow_with(old_row);

                            for column_order in order_by_keys {
                                packer.push(datums[column_order.column]);
                            }
                            // Debezium: the old value comes from the retracted row.
                            if debezium {
                                for idx in 0..self.arity {
                                    if !order_by_keys.iter().any(|co| co.column == idx) {
                                        packer.push(old_datums[idx]);
                                    }
                                }
                            }
                            // The new value comes from the inserted row.
                            for idx in 0..self.arity {
                                if !order_by_keys.iter().any(|co| co.column == idx) {
                                    packer.push(datums[idx]);
                                }
                            }
                            push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
                        }
                        _ => {
                            // Any other combination of updates for one key at
                            // one time is reported as a key violation, with
                            // all value columns NULL.
                            packer.push(Datum::String("key_violation"));
                            let datums = datum_vec.borrow_with(start.0);
                            for column_order in order_by_keys {
                                packer.push(datums[column_order.column]);
                            }
                            if debezium {
                                for _ in 0..value_columns {
                                    packer.push(Datum::Null);
                                }
                            }
                            for _ in 0..value_columns {
                                packer.push(Datum::Null);
                            }
                            push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
                        }
                    };
                }
            }
            SubscribeOutput::Diffs => {
                // Diffs output is sorted by time and row, so it can be pushed directly.
                for (row, time, diff) in rows {
                    push_row(row, *time, diff)
                }
            }
        };

        // Send everything assembled for this batch as a single response.
        let rows = output_builder.build();
        let rows = Box::new(rows.into_row_iter());
        self.send(PeekResponseUnary::Rows(rows));

        // Emit progress message if requested. Don't emit progress for the first
        // batch if the upper is exactly `as_of` (we're guaranteed it is not
        // less than `as_of`, but it might be exactly `as_of`) as we've already
        // emitted that progress message in `initialize`.
        if !batch.upper.less_equal(&self.as_of) {
            self.send_progress_message(&batch.upper);
        }

        // An empty upper means no further updates will arrive.
        batch.upper.is_empty()
    }
392
393    /// Retires the subscribe with the specified reason.
394    ///
395    /// This method must be called on every subscribe before it is dropped. It
396    /// informs the end client that the subscribe is finished for the specified
397    /// reason.
398    pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
399        let message = match reason {
400            ActiveComputeSinkRetireReason::Finished => return,
401            ActiveComputeSinkRetireReason::Canceled => PeekResponseUnary::Canceled,
402            ActiveComputeSinkRetireReason::DependencyDropped(d) => {
403                PeekResponseUnary::DependencyDropped(d)
404            }
405        };
406        self.send(message);
407    }
408
409    /// Sends a message to the client if the subscribe has not already completed
410    /// and if the client has not already gone away.
411    fn send(&self, response: PeekResponseUnary) {
412        // TODO(benesch): the lack of backpressure here can result in
413        // unbounded memory usage.
414        let _ = self.channel.send(response);
415    }
416}
417
/// A description of an active copy to sink from the coordinator's perspective.
#[derive(Debug)]
pub struct ActiveCopyTo {
    /// The ID of the connection which created the copy to.
    pub conn_id: ConnectionId,
    /// The result channel for the `COPY ... TO` statement that created the copy to sink.
    ///
    /// This is a oneshot channel: retiring the copy to consumes it.
    pub tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    /// The ID of the cluster on which the copy to is running.
    pub cluster_id: ClusterId,
    /// The IDs of the objects on which the copy to depends.
    pub depends_on: BTreeSet<GlobalId>,
}
430
431impl ActiveCopyTo {
432    /// Retires the copy to with a response from the controller.
433    ///
434    /// Unlike subscribes, copy tos only expect a single response from the
435    /// controller, so `process_response` and `retire` are unified into a single
436    /// operation.
437    ///
438    /// Either this method or `retire` must be called on every copy to before it
439    /// is dropped.
440    pub fn retire_with_response(self, response: Result<u64, anyhow::Error>) {
441        let response = match response {
442            Ok(n) => Ok(ExecuteResponse::Copied(usize::cast_from(n))),
443            Err(error) => Err(AdapterError::Unstructured(error)),
444        };
445        let _ = self.tx.send(response);
446    }
447
448    /// Retires the copy to with the specified reason.
449    ///
450    /// Either this method or `retire_with_response` must be called on every
451    /// copy to before it is dropped.
452    pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
453        let message = match reason {
454            ActiveComputeSinkRetireReason::Finished => return,
455            ActiveComputeSinkRetireReason::Canceled => Err(AdapterError::Canceled),
456            ActiveComputeSinkRetireReason::DependencyDropped(dep) => {
457                Err(dep.to_concurrent_dependency_drop())
458            }
459        };
460        let _ = self.tx.send(message);
461    }
462}
463
/// State we keep in the `Coordinator` to track active `COPY FROM` statements.
#[derive(Debug)]
pub(crate) struct ActiveCopyFrom {
    /// The ID of the ingestion running in clusterd.
    pub ingestion_id: uuid::Uuid,
    /// The cluster this is currently running on.
    pub cluster_id: StorageInstanceId,
    /// The table we're currently copying into.
    pub table_id: CatalogItemId,
    /// The context of the SQL session that ran the statement.
    pub ctx: ExecuteContext,
}
475}