Skip to main content

mz_adapter/
active_compute_sink.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Coordinator bookkeeping for active compute sinks.
11
12use std::cmp::Ordering;
13use std::collections::BTreeSet;
14use std::num::NonZeroUsize;
15
16use mz_adapter_types::connection::ConnectionId;
17use mz_compute_client::protocol::response::SubscribeBatch;
18use mz_controller_types::ClusterId;
19use mz_expr::row::RowCollection;
20use mz_expr::{RowComparator, compare_columns};
21use mz_ore::cast::CastFrom;
22use mz_ore::now::EpochMillis;
23use mz_repr::adt::numeric;
24use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, Row, RowRef, Timestamp};
25use mz_sql::plan::SubscribeOutput;
26use mz_storage_types::instances::StorageInstanceId;
27use timely::progress::Antichain;
28use tokio::sync::{mpsc, oneshot};
29use uuid::Uuid;
30
31use crate::coord::peek::{DroppedDependency, PeekResponseUnary};
32use crate::{AdapterError, ExecuteContext, ExecuteResponse};
33
34#[derive(Debug)]
35/// A description of an active compute sink from the coordinator's perspective.
36pub enum ActiveComputeSink {
37    /// An active subscribe sink.
38    Subscribe(ActiveSubscribe),
39    /// An active copy to sink.
40    CopyTo(ActiveCopyTo),
41}
42
43impl ActiveComputeSink {
44    /// Reports the ID of the cluster on which the sink is running.
45    pub fn cluster_id(&self) -> ClusterId {
46        match &self {
47            ActiveComputeSink::Subscribe(subscribe) => subscribe.cluster_id,
48            ActiveComputeSink::CopyTo(copy_to) => copy_to.cluster_id,
49        }
50    }
51
52    /// Reports the ID of the connection which created the sink.
53    pub fn connection_id(&self) -> &ConnectionId {
54        match &self {
55            ActiveComputeSink::Subscribe(subscribe) => &subscribe.conn_id,
56            ActiveComputeSink::CopyTo(copy_to) => &copy_to.conn_id,
57        }
58    }
59
60    /// Reports the IDs of the objects on which the sink depends.
61    pub fn depends_on(&self) -> &BTreeSet<GlobalId> {
62        match &self {
63            ActiveComputeSink::Subscribe(subscribe) => &subscribe.depends_on,
64            ActiveComputeSink::CopyTo(copy_to) => &copy_to.depends_on,
65        }
66    }
67
68    /// Retires the sink with the specified reason.
69    ///
70    /// This method must be called on every sink before it is dropped. It
71    /// informs the end client that the sink is finished for the specified
72    /// reason.
73    pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
74        match self {
75            ActiveComputeSink::Subscribe(subscribe) => subscribe.retire(reason),
76            ActiveComputeSink::CopyTo(copy_to) => copy_to.retire(reason),
77        }
78    }
79}
80
/// The reason for removing an [`ActiveComputeSink`].
///
/// Passed to `retire` to determine what (if anything) is reported back to
/// the client.
#[derive(Debug, Clone)]
pub enum ActiveComputeSinkRetireReason {
    /// The compute sink completed successfully.
    ///
    /// Retiring with this reason sends no additional message to the client.
    Finished,
    /// The compute sink was canceled due to a user request.
    Canceled,
    /// The compute sink was forcibly terminated because an object it depended on
    /// was dropped.
    DependencyDropped(DroppedDependency),
}
92
/// A description of an active subscribe from coord's perspective
#[derive(Debug)]
pub struct ActiveSubscribe {
    /// The ID of the connection which created the subscribe.
    pub conn_id: ConnectionId,
    /// The UUID of the session which created the subscribe.
    pub session_uuid: Uuid,
    /// The ID of the cluster on which the subscribe is running.
    pub cluster_id: ClusterId,
    /// The IDs of the objects on which the subscribe depends.
    pub depends_on: BTreeSet<GlobalId>,
    /// Channel on which to send responses to the client.
    ///
    /// Sends are best-effort: if the client has gone away, responses are
    /// silently dropped.
    // The responses have the form `PeekResponseUnary` but should perhaps
    // become `SubscribeResponse`.
    pub channel: mpsc::UnboundedSender<PeekResponseUnary>,
    /// Whether progress information should be emitted.
    pub emit_progress: bool,
    /// The logical timestamp at which the subscribe began execution.
    ///
    /// All updates emitted by the subscribe occur at times `>= as_of`.
    pub as_of: Timestamp,
    /// The number of columns in the relation that was subscribed to.
    pub arity: usize,
    /// The time when the subscribe started.
    pub start_time: EpochMillis,
    /// How to present the subscribe's output.
    pub output: SubscribeOutput,
}
119
impl ActiveSubscribe {
    /// Initializes the subscription.
    ///
    /// This method must be called exactly once, after constructing an
    /// `ActiveSubscribe` and before calling `process_response`.
    pub fn initialize(&self) {
        // Always emit progress message indicating snapshot timestamp.
        self.send_progress_message(&Antichain::from_elem(self.as_of));
    }

    /// Sends a progress message for the given `upper` to the client, if
    /// progress emission was requested (`emit_progress`).
    ///
    /// The progress row carries the frontier timestamp, `true` in the column
    /// that data rows set to `false`, and NULLs in all remaining columns so it
    /// matches the arity of data rows. If `upper` is empty (the subscribe is
    /// complete), no message is sent.
    fn send_progress_message(&self, upper: &Antichain<Timestamp>) {
        if !self.emit_progress {
            return;
        }
        if let Some(upper) = upper.as_option() {
            let mut row_buf = Row::default();
            let mut packer = row_buf.packer();
            packer.push(Datum::from(numeric::Numeric::from(*upper)));
            // Progress rows are marked `true` here; data rows built in
            // `process_response` push `false` for this column.
            packer.push(Datum::True);

            // Fill in the mz_diff or mz_state column
            packer.push(Datum::Null);

            // Fill all table columns with NULL.
            for _ in 0..self.arity {
                packer.push(Datum::Null);
            }

            // Debezium output carries an extra set of "before" value columns
            // (see the upsert/delete cases below); pad those with NULLs too.
            if let SubscribeOutput::EnvelopeDebezium { order_by_keys } = &self.output {
                for _ in 0..(self.arity - order_by_keys.len()) {
                    packer.push(Datum::Null);
                }
            }

            let row_iter = Box::new(row_buf.into_row_iter());
            self.send(PeekResponseUnary::Rows(row_iter));
        }
    }

    /// Processes a subscribe response from the controller.
    ///
    /// Returns `true` if the subscribe is finished.
    pub fn process_response(&self, batch: SubscribeBatch) -> bool {
        let comparator = RowComparator::new(self.output.row_order());
        let rows = match batch.updates {
            Ok(ref rows) => {
                // Merge the per-collection iterators into a single stream
                // ordered by (time, row), then consolidate the merged updates.
                let iters = rows.iter().map(|r| r.iter());
                let merged = mz_ore::iter::merge_iters_by(
                    iters,
                    |(left_row, left_time, _), (right_row, right_time, _)| {
                        left_time.cmp(right_time).then_with(|| {
                            comparator.compare_rows(left_row, right_row, || left_row.cmp(right_row))
                        })
                    },
                );
                mz_ore::iter::consolidate_update_iter(merged)
            }
            Err(s) => {
                // An error from the controller terminates the subscribe.
                self.send(PeekResponseUnary::Error(s));
                return true;
            }
        };

        // Sort results by time. We use stable sort here because it will produce
        // deterministic results since the cursor will always produce rows in
        // the same order. Compute doesn't guarantee that the results are sorted
        // (materialize#18936)
        let mut output_buf = Row::default();
        let mut output_builder = RowCollection::builder(0, 0);
        let mut left_datum_vec = mz_repr::DatumVec::new();
        let mut right_datum_vec = mz_repr::DatumVec::new();
        // Appends one output row to `output_builder`, prefixing the data
        // columns with the timestamp and, depending on configuration, the
        // progress flag and the diff.
        let mut push_row = |row: &RowRef, time: Timestamp, diff: Diff| {
            // Updates earlier than the `as_of` would indicate a bug upstream.
            assert!(self.as_of <= time);
            let mut packer = output_buf.packer();
            // TODO: Change to MzTimestamp.
            packer.push(Datum::from(numeric::Numeric::from(time)));
            if self.emit_progress {
                // When sinking with PROGRESS, the output includes an
                // additional column that indicates whether a timestamp is
                // complete. For regular "data" updates this is always
                // `false`.
                packer.push(Datum::False);
            }

            match &self.output {
                // Envelope outputs encode the change as a state string inside
                // the row itself, so no separate diff column is emitted.
                SubscribeOutput::EnvelopeUpsert { .. }
                | SubscribeOutput::EnvelopeDebezium { .. } => {}
                SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
                    packer.push(Datum::Int64(diff.into_inner()));
                }
            }

            packer.extend_by_row_ref(row);

            output_builder.push(output_buf.as_row_ref(), NonZeroUsize::MIN);
        };

        match &self.output {
            SubscribeOutput::WithinTimestampOrderBy { order_by } => {
                let mut rows: Vec<_> = rows.collect();
                // Since the diff is inserted as the first column, we can't take advantage of the
                // known ordering. (Aside from timestamp, I suppose.)
                rows.sort_by(
                    |(left_row, left_time, left_diff), (right_row, right_time, right_diff)| {
                        left_time.cmp(right_time).then_with(|| {
                            // Materialize each side as (diff, columns...) so
                            // the `order_by` column indices line up with the
                            // emitted output.
                            let mut left_datums = left_datum_vec.borrow();
                            left_datums.extend(&[Datum::Int64(left_diff.into_inner())]);
                            left_datums.extend(left_row.iter());
                            let mut right_datums = right_datum_vec.borrow();
                            right_datums.extend(&[Datum::Int64(right_diff.into_inner())]);
                            right_datums.extend(right_row.iter());
                            compare_columns(order_by, &left_datums, &right_datums, || {
                                left_row.cmp(right_row).then(left_diff.cmp(right_diff))
                            })
                        })
                    },
                );
                for (row, time, diff) in rows {
                    push_row(row, *time, diff);
                }
            }
            SubscribeOutput::EnvelopeUpsert { order_by_keys }
            | SubscribeOutput::EnvelopeDebezium { order_by_keys } => {
                let debezium = matches!(self.output, SubscribeOutput::EnvelopeDebezium { .. });
                let mut it = rows.peekable();
                let mut datum_vec = mz_repr::DatumVec::new();
                let mut old_datum_vec = mz_repr::DatumVec::new();
                let comparator = RowComparator::new(order_by_keys.as_slice());
                let mut group = Vec::with_capacity(2);
                let mut row_buf = Row::default();
                // The iterator is sorted by time and key, so elements in the same group should be
                // adjacent already.
                while let Some(start) = it.next() {
                    // Collect all updates that share `start`'s time and key
                    // columns into one group.
                    group.clear();
                    group.push(start);
                    while let Some(row) = it.peek()
                        && start.1 == row.1
                        && {
                            comparator
                                .compare_rows(start.0, row.0, || Ordering::Equal)
                                .is_eq()
                        }
                    {
                        group.extend(it.next());
                    }
                    // Order the group by diff, so retractions (-1) precede
                    // insertions (+1); the match arms below rely on this.
                    group.sort_by_key(|(_, _, d)| *d);

                    // Four cases:
                    // [(key, value, +1)] => ("insert", key, NULL, value)
                    // [(key, v1, -1), (key, v2, +1)] => ("upsert", key, v1, v2)
                    // [(key, value, -1)] => ("delete", key, value, NULL)
                    // everything else => ("key_violation", key, NULL, NULL)
                    let value_columns = self.arity - order_by_keys.len();
                    let mut packer = row_buf.packer();
                    match &group[..] {
                        // A lone insertion. `ENVELOPE UPSERT` reports both
                        // inserts and updates as "upsert".
                        [(row, _, Diff::ONE)] => {
                            packer.push(if debezium {
                                Datum::String("insert")
                            } else {
                                Datum::String("upsert")
                            });
                            let datums = datum_vec.borrow_with(row);
                            // Key columns first, in `order_by_keys` order.
                            for column_order in order_by_keys {
                                packer.push(datums[column_order.column]);
                            }
                            if debezium {
                                // No "before" value for an insert.
                                for _ in 0..value_columns {
                                    packer.push(Datum::Null);
                                }
                            }
                            // Then the non-key ("after") value columns.
                            for idx in 0..self.arity {
                                if !order_by_keys.iter().any(|co| co.column == idx) {
                                    packer.push(datums[idx]);
                                }
                            }
                            push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
                        }
                        // A lone retraction.
                        [(_, _, Diff::MINUS_ONE)] => {
                            packer.push(Datum::String("delete"));
                            let datums = datum_vec.borrow_with(start.0);
                            for column_order in order_by_keys {
                                packer.push(datums[column_order.column]);
                            }
                            if debezium {
                                // The deleted row's values fill the "before"
                                // columns.
                                for idx in 0..self.arity {
                                    if !order_by_keys.iter().any(|co| co.column == idx) {
                                        packer.push(datums[idx]);
                                    }
                                }
                            }
                            // The "after" value columns are NULL for deletes.
                            for _ in 0..self.arity - order_by_keys.len() {
                                packer.push(Datum::Null);
                            }
                            push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
                        }
                        // A retraction paired with an insertion: an update of
                        // the value at this key.
                        [(old_row, _, Diff::MINUS_ONE), (row, _, Diff::ONE)] => {
                            packer.push(Datum::String("upsert"));
                            let datums = datum_vec.borrow_with(row);
                            let old_datums = old_datum_vec.borrow_with(old_row);

                            for column_order in order_by_keys {
                                packer.push(datums[column_order.column]);
                            }
                            if debezium {
                                // "Before" columns: the retracted row's values.
                                for idx in 0..self.arity {
                                    if !order_by_keys.iter().any(|co| co.column == idx) {
                                        packer.push(old_datums[idx]);
                                    }
                                }
                            }
                            // "After" columns: the inserted row's values.
                            for idx in 0..self.arity {
                                if !order_by_keys.iter().any(|co| co.column == idx) {
                                    packer.push(datums[idx]);
                                }
                            }
                            push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
                        }
                        // Any other combination means multiple values live at
                        // the same key: a key violation.
                        _ => {
                            packer.push(Datum::String("key_violation"));
                            let datums = datum_vec.borrow_with(start.0);
                            for column_order in order_by_keys {
                                packer.push(datums[column_order.column]);
                            }
                            if debezium {
                                for _ in 0..(self.arity - order_by_keys.len()) {
                                    packer.push(Datum::Null);
                                }
                            }
                            for _ in 0..(self.arity - order_by_keys.len()) {
                                packer.push(Datum::Null);
                            }
                            push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
                        }
                    };
                }
            }
            SubscribeOutput::Diffs => {
                // Diffs output is sorted by time and row, so it can be pushed directly.
                for (row, time, diff) in rows {
                    push_row(row, *time, diff)
                }
            }
        };

        let rows = output_builder.build();
        let rows = Box::new(rows.into_row_iter());
        self.send(PeekResponseUnary::Rows(rows));

        // Emit progress message if requested. Don't emit progress for the first
        // batch if the upper is exactly `as_of` (we're guaranteed it is not
        // less than `as_of`, but it might be exactly `as_of`) as we've already
        // emitted that progress message in `initialize`.
        if !batch.upper.less_equal(&self.as_of) {
            self.send_progress_message(&batch.upper);
        }

        // An empty upper means no further updates will arrive: finished.
        batch.upper.is_empty()
    }

    /// Retires the subscribe with the specified reason.
    ///
    /// This method must be called on every subscribe before it is dropped. It
    /// informs the end client that the subscribe is finished for the specified
    /// reason.
    pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
        let message = match reason {
            // A successful finish requires no further notification.
            ActiveComputeSinkRetireReason::Finished => return,
            ActiveComputeSinkRetireReason::Canceled => PeekResponseUnary::Canceled,
            ActiveComputeSinkRetireReason::DependencyDropped(d) => {
                PeekResponseUnary::DependencyDropped(d)
            }
        };
        self.send(message);
    }

    /// Sends a message to the client if the subscribe has not already completed
    /// and if the client has not already gone away.
    fn send(&self, response: PeekResponseUnary) {
        // TODO(benesch): the lack of backpressure here can result in
        // unbounded memory usage.
        // Send errors (client hung up) are deliberately ignored.
        let _ = self.channel.send(response);
    }
}
403
/// A description of an active copy to sink from the coordinator's perspective.
#[derive(Debug)]
pub struct ActiveCopyTo {
    /// The ID of the connection which created the copy to.
    pub conn_id: ConnectionId,
    /// The result channel for the `COPY ... TO` statement that created the copy to sink.
    ///
    /// Consumed by `retire_with_response` or `retire`, whichever is called
    /// first.
    pub tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    /// The ID of the cluster on which the copy to is running.
    pub cluster_id: ClusterId,
    /// The IDs of the objects on which the copy to depends.
    pub depends_on: BTreeSet<GlobalId>,
}
416
417impl ActiveCopyTo {
418    /// Retires the copy to with a response from the controller.
419    ///
420    /// Unlike subscribes, copy tos only expect a single response from the
421    /// controller, so `process_response` and `retire` are unified into a single
422    /// operation.
423    ///
424    /// Either this method or `retire` must be called on every copy to before it
425    /// is dropped.
426    pub fn retire_with_response(self, response: Result<u64, anyhow::Error>) {
427        let response = match response {
428            Ok(n) => Ok(ExecuteResponse::Copied(usize::cast_from(n))),
429            Err(error) => Err(AdapterError::Unstructured(error)),
430        };
431        let _ = self.tx.send(response);
432    }
433
434    /// Retires the copy to with the specified reason.
435    ///
436    /// Either this method or `retire_with_response` must be called on every
437    /// copy to before it is dropped.
438    pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
439        let message = match reason {
440            ActiveComputeSinkRetireReason::Finished => return,
441            ActiveComputeSinkRetireReason::Canceled => Err(AdapterError::Canceled),
442            ActiveComputeSinkRetireReason::DependencyDropped(dep) => {
443                Err(dep.to_concurrent_dependency_drop())
444            }
445        };
446        let _ = self.tx.send(message);
447    }
448}
449
/// State we keep in the `Coordinator` to track active `COPY FROM` statements.
#[derive(Debug)]
pub(crate) struct ActiveCopyFrom {
    /// ID of the ingestion running in clusterd.
    pub ingestion_id: uuid::Uuid,
    /// The cluster this is currently running on.
    ///
    /// Note that this is a `StorageInstanceId`, not the compute `ClusterId`
    /// used by the compute sinks.
    pub cluster_id: StorageInstanceId,
    /// The table we're currently copying into.
    pub table_id: CatalogItemId,
    /// Context of the SQL session that ran the statement.
    pub ctx: ExecuteContext,
}
461}