mz_adapter/active_compute_sink.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Coordinator bookkeeping for active compute sinks.
11
12use std::cmp::Ordering;
13use std::collections::BTreeSet;
14use std::num::NonZeroUsize;
15
16use mz_adapter_types::connection::ConnectionId;
17use mz_compute_client::protocol::response::SubscribeBatch;
18use mz_controller_types::ClusterId;
19use mz_expr::row::RowCollection;
20use mz_expr::{RowComparator, compare_columns};
21use mz_ore::cast::CastFrom;
22use mz_ore::now::EpochMillis;
23use mz_repr::adt::numeric;
24use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, Row, RowRef, Timestamp};
25use mz_sql::plan::SubscribeOutput;
26use mz_storage_types::instances::StorageInstanceId;
27use timely::progress::Antichain;
28use tokio::sync::{mpsc, oneshot};
29use uuid::Uuid;
30
31use crate::coord::peek::{DroppedDependency, PeekResponseUnary};
32use crate::{AdapterError, ExecuteContext, ExecuteResponse};
33
34#[derive(Debug)]
35/// A description of an active compute sink from the coordinator's perspective.
36pub enum ActiveComputeSink {
37 /// An active subscribe sink.
38 Subscribe(ActiveSubscribe),
39 /// An active copy to sink.
40 CopyTo(ActiveCopyTo),
41}
42
43impl ActiveComputeSink {
44 /// Reports the ID of the cluster on which the sink is running.
45 pub fn cluster_id(&self) -> ClusterId {
46 match &self {
47 ActiveComputeSink::Subscribe(subscribe) => subscribe.cluster_id,
48 ActiveComputeSink::CopyTo(copy_to) => copy_to.cluster_id,
49 }
50 }
51
52 /// Reports the ID of the connection which created the sink.
53 pub fn connection_id(&self) -> &ConnectionId {
54 match &self {
55 ActiveComputeSink::Subscribe(subscribe) => &subscribe.conn_id,
56 ActiveComputeSink::CopyTo(copy_to) => ©_to.conn_id,
57 }
58 }
59
60 /// Reports the IDs of the objects on which the sink depends.
61 pub fn depends_on(&self) -> &BTreeSet<GlobalId> {
62 match &self {
63 ActiveComputeSink::Subscribe(subscribe) => &subscribe.depends_on,
64 ActiveComputeSink::CopyTo(copy_to) => ©_to.depends_on,
65 }
66 }
67
68 /// Retires the sink with the specified reason.
69 ///
70 /// This method must be called on every sink before it is dropped. It
71 /// informs the end client that the sink is finished for the specified
72 /// reason.
73 pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
74 match self {
75 ActiveComputeSink::Subscribe(subscribe) => subscribe.retire(reason),
76 ActiveComputeSink::CopyTo(copy_to) => copy_to.retire(reason),
77 }
78 }
79}
80
81/// The reason for removing an [`ActiveComputeSink`].
82#[derive(Debug, Clone)]
83pub enum ActiveComputeSinkRetireReason {
84 /// The compute sink completed successfully.
85 Finished,
86 /// The compute sink was canceled due to a user request.
87 Canceled,
88 /// The compute sink was forcibly terminated because an object it depended on
89 /// was dropped.
90 DependencyDropped(DroppedDependency),
91}
92
93/// A description of an active subscribe from coord's perspective
94#[derive(Debug)]
95pub struct ActiveSubscribe {
96 /// The ID of the connection which created the subscribe.
97 pub conn_id: ConnectionId,
98 /// The UUID of the session which created the subscribe.
99 pub session_uuid: Uuid,
100 /// The ID of the cluster on which the subscribe is running.
101 pub cluster_id: ClusterId,
102 /// The IDs of the objects on which the subscribe depends.
103 pub depends_on: BTreeSet<GlobalId>,
104 /// Channel on which to send responses to the client.
105 // The responses have the form `PeekResponseUnary` but should perhaps
106 // become `SubscribeResponse`.
107 pub channel: mpsc::UnboundedSender<PeekResponseUnary>,
108 /// Whether progress information should be emitted.
109 pub emit_progress: bool,
110 /// The logical timestamp at which the subscribe began execution.
111 pub as_of: Timestamp,
112 /// The number of columns in the relation that was subscribed to.
113 pub arity: usize,
114 /// The time when the subscribe started.
115 pub start_time: EpochMillis,
116 /// How to present the subscribe's output.
117 pub output: SubscribeOutput,
118 /// If true, this is an internal subscribe that should not appear in
119 /// introspection tables like mz_subscriptions.
120 pub internal: bool,
121}
122
123impl ActiveSubscribe {
124 /// Initializes the subscription.
125 ///
126 /// This method must be called exactly once, after constructing an
127 /// `ActiveSubscribe` and before calling `process_response`.
128 pub fn initialize(&self) {
129 // Always emit progress message indicating snapshot timestamp.
130 self.send_progress_message(&Antichain::from_elem(self.as_of));
131 }
132
133 fn send_progress_message(&self, upper: &Antichain<Timestamp>) {
134 if !self.emit_progress {
135 return;
136 }
137 if let Some(upper) = upper.as_option() {
138 let mut row_buf = Row::default();
139 let mut packer = row_buf.packer();
140 packer.push(Datum::from(numeric::Numeric::from(*upper)));
141 packer.push(Datum::True);
142
143 // Fill in the mz_diff or mz_state column
144 packer.push(Datum::Null);
145
146 // Fill all table columns with NULL.
147 for _ in 0..self.arity {
148 packer.push(Datum::Null);
149 }
150
151 if let SubscribeOutput::EnvelopeDebezium { order_by_keys } = &self.output {
152 for _ in 0..(self.arity - order_by_keys.len()) {
153 packer.push(Datum::Null);
154 }
155 }
156
157 let row_iter = Box::new(row_buf.into_row_iter());
158 self.send(PeekResponseUnary::Rows(row_iter));
159 }
160 }
161
162 /// Processes a subscribe response from the controller.
163 ///
164 /// Returns `true` if the subscribe is finished.
165 pub fn process_response(&self, batch: SubscribeBatch) -> bool {
166 let comparator = RowComparator::new(self.output.row_order());
167 let rows = match batch.updates {
168 Ok(ref rows) => {
169 let iters = rows.iter().map(|r| r.iter());
170 let merged = mz_ore::iter::merge_iters_by(
171 iters,
172 |(left_row, left_time, _), (right_row, right_time, _)| {
173 left_time.cmp(right_time).then_with(|| {
174 comparator.compare_rows(left_row, right_row, || left_row.cmp(right_row))
175 })
176 },
177 );
178 mz_ore::iter::consolidate_update_iter(merged)
179 }
180 Err(s) => {
181 self.send(PeekResponseUnary::Error(s));
182 return true;
183 }
184 };
185
186 // Sort results by time. We use stable sort here because it will produce
187 // deterministic results since the cursor will always produce rows in
188 // the same order. Compute doesn't guarantee that the results are sorted
189 // (materialize#18936)
190 let mut output_buf = Row::default();
191 let mut output_builder = RowCollection::builder(0, 0);
192 let mut left_datum_vec = mz_repr::DatumVec::new();
193 let mut right_datum_vec = mz_repr::DatumVec::new();
194 let mut push_row = |row: &RowRef, time: Timestamp, diff: Diff| {
195 assert!(self.as_of <= time);
196 let mut packer = output_buf.packer();
197 // TODO: Change to MzTimestamp.
198 packer.push(Datum::from(numeric::Numeric::from(time)));
199 if self.emit_progress {
200 // When sinking with PROGRESS, the output includes an
201 // additional column that indicates whether a timestamp is
202 // complete. For regular "data" updates this is always
203 // `false`.
204 packer.push(Datum::False);
205 }
206
207 match &self.output {
208 SubscribeOutput::EnvelopeUpsert { .. }
209 | SubscribeOutput::EnvelopeDebezium { .. } => {}
210 SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
211 packer.push(Datum::Int64(diff.into_inner()));
212 }
213 }
214
215 packer.extend_by_row_ref(row);
216
217 output_builder.push(output_buf.as_row_ref(), NonZeroUsize::MIN);
218 };
219
220 match &self.output {
221 SubscribeOutput::WithinTimestampOrderBy { order_by } => {
222 let mut rows: Vec<_> = rows.collect();
223 // Since the diff is inserted as the first column, we can't take advantage of the
224 // known ordering. (Aside from timestamp, I suppose.)
225 rows.sort_by(
226 |(left_row, left_time, left_diff), (right_row, right_time, right_diff)| {
227 left_time.cmp(right_time).then_with(|| {
228 let mut left_datums = left_datum_vec.borrow();
229 left_datums.extend(&[Datum::Int64(left_diff.into_inner())]);
230 left_datums.extend(left_row.iter());
231 let mut right_datums = right_datum_vec.borrow();
232 right_datums.extend(&[Datum::Int64(right_diff.into_inner())]);
233 right_datums.extend(right_row.iter());
234 compare_columns(order_by, &left_datums, &right_datums, || {
235 left_row.cmp(right_row).then(left_diff.cmp(right_diff))
236 })
237 })
238 },
239 );
240 for (row, time, diff) in rows {
241 push_row(row, *time, diff);
242 }
243 }
244 SubscribeOutput::EnvelopeUpsert { order_by_keys }
245 | SubscribeOutput::EnvelopeDebezium { order_by_keys } => {
246 let debezium = matches!(self.output, SubscribeOutput::EnvelopeDebezium { .. });
247 let mut it = rows.peekable();
248 let mut datum_vec = mz_repr::DatumVec::new();
249 let mut old_datum_vec = mz_repr::DatumVec::new();
250 let comparator = RowComparator::new(order_by_keys.as_slice());
251 let mut group = Vec::with_capacity(2);
252 let mut row_buf = Row::default();
253 // The iterator is sorted by time and key, so elements in the same group should be
254 // adjacent already.
255 while let Some(start) = it.next() {
256 group.clear();
257 group.push(start);
258 while let Some(row) = it.peek()
259 && start.1 == row.1
260 && {
261 comparator
262 .compare_rows(start.0, row.0, || Ordering::Equal)
263 .is_eq()
264 }
265 {
266 group.extend(it.next());
267 }
268 group.sort_by_key(|(_, _, d)| *d);
269
270 // Four cases:
271 // [(key, value, +1)] => ("insert", key, NULL, value)
272 // [(key, v1, -1), (key, v2, +1)] => ("upsert", key, v1, v2)
273 // [(key, value, -1)] => ("delete", key, value, NULL)
274 // everything else => ("key_violation", key, NULL, NULL)
275 // Defense in depth: the planner ensures that KEY columns are
276 // distinct columns of the underlying relation, so this
277 // subtraction must never underflow. If it does, we'd OOM
278 // the coordinator with a giant loop, so check it here.
279 mz_ore::soft_assert_or_log!(
280 order_by_keys.len() <= self.arity,
281 "SUBSCRIBE ENVELOPE has more KEY columns ({}) than \
282 relation arity ({}); planner should have rejected this",
283 order_by_keys.len(),
284 self.arity,
285 );
286 let value_columns = self.arity.saturating_sub(order_by_keys.len());
287 let mut packer = row_buf.packer();
288 match &group[..] {
289 [(row, _, Diff::ONE)] => {
290 packer.push(if debezium {
291 Datum::String("insert")
292 } else {
293 Datum::String("upsert")
294 });
295 let datums = datum_vec.borrow_with(row);
296 for column_order in order_by_keys {
297 packer.push(datums[column_order.column]);
298 }
299 if debezium {
300 for _ in 0..value_columns {
301 packer.push(Datum::Null);
302 }
303 }
304 for idx in 0..self.arity {
305 if !order_by_keys.iter().any(|co| co.column == idx) {
306 packer.push(datums[idx]);
307 }
308 }
309 push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
310 }
311 [(_, _, Diff::MINUS_ONE)] => {
312 packer.push(Datum::String("delete"));
313 let datums = datum_vec.borrow_with(start.0);
314 for column_order in order_by_keys {
315 packer.push(datums[column_order.column]);
316 }
317 if debezium {
318 for idx in 0..self.arity {
319 if !order_by_keys.iter().any(|co| co.column == idx) {
320 packer.push(datums[idx]);
321 }
322 }
323 }
324 for _ in 0..value_columns {
325 packer.push(Datum::Null);
326 }
327 push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
328 }
329 [(old_row, _, Diff::MINUS_ONE), (row, _, Diff::ONE)] => {
330 packer.push(Datum::String("upsert"));
331 let datums = datum_vec.borrow_with(row);
332 let old_datums = old_datum_vec.borrow_with(old_row);
333
334 for column_order in order_by_keys {
335 packer.push(datums[column_order.column]);
336 }
337 if debezium {
338 for idx in 0..self.arity {
339 if !order_by_keys.iter().any(|co| co.column == idx) {
340 packer.push(old_datums[idx]);
341 }
342 }
343 }
344 for idx in 0..self.arity {
345 if !order_by_keys.iter().any(|co| co.column == idx) {
346 packer.push(datums[idx]);
347 }
348 }
349 push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
350 }
351 _ => {
352 packer.push(Datum::String("key_violation"));
353 let datums = datum_vec.borrow_with(start.0);
354 for column_order in order_by_keys {
355 packer.push(datums[column_order.column]);
356 }
357 if debezium {
358 for _ in 0..value_columns {
359 packer.push(Datum::Null);
360 }
361 }
362 for _ in 0..value_columns {
363 packer.push(Datum::Null);
364 }
365 push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
366 }
367 };
368 }
369 }
370 SubscribeOutput::Diffs => {
371 // Diffs output is sorted by time and row, so it can be pushed directly.
372 for (row, time, diff) in rows {
373 push_row(row, *time, diff)
374 }
375 }
376 };
377
378 let rows = output_builder.build();
379 let rows = Box::new(rows.into_row_iter());
380 self.send(PeekResponseUnary::Rows(rows));
381
382 // Emit progress message if requested. Don't emit progress for the first
383 // batch if the upper is exactly `as_of` (we're guaranteed it is not
384 // less than `as_of`, but it might be exactly `as_of`) as we've already
385 // emitted that progress message in `initialize`.
386 if !batch.upper.less_equal(&self.as_of) {
387 self.send_progress_message(&batch.upper);
388 }
389
390 batch.upper.is_empty()
391 }
392
393 /// Retires the subscribe with the specified reason.
394 ///
395 /// This method must be called on every subscribe before it is dropped. It
396 /// informs the end client that the subscribe is finished for the specified
397 /// reason.
398 pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
399 let message = match reason {
400 ActiveComputeSinkRetireReason::Finished => return,
401 ActiveComputeSinkRetireReason::Canceled => PeekResponseUnary::Canceled,
402 ActiveComputeSinkRetireReason::DependencyDropped(d) => {
403 PeekResponseUnary::DependencyDropped(d)
404 }
405 };
406 self.send(message);
407 }
408
409 /// Sends a message to the client if the subscribe has not already completed
410 /// and if the client has not already gone away.
411 fn send(&self, response: PeekResponseUnary) {
412 // TODO(benesch): the lack of backpressure here can result in
413 // unbounded memory usage.
414 let _ = self.channel.send(response);
415 }
416}
417
418/// A description of an active copy to sink from the coordinator's perspective.
419#[derive(Debug)]
420pub struct ActiveCopyTo {
421 /// The ID of the connection which created the subscribe.
422 pub conn_id: ConnectionId,
423 /// The result channel for the `COPY ... TO` statement that created the copy to sink.
424 pub tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
425 /// The ID of the cluster on which the copy to is running.
426 pub cluster_id: ClusterId,
427 /// The IDs of the objects on which the copy to depends.
428 pub depends_on: BTreeSet<GlobalId>,
429}
430
431impl ActiveCopyTo {
432 /// Retires the copy to with a response from the controller.
433 ///
434 /// Unlike subscribes, copy tos only expect a single response from the
435 /// controller, so `process_response` and `retire` are unified into a single
436 /// operation.
437 ///
438 /// Either this method or `retire` must be called on every copy to before it
439 /// is dropped.
440 pub fn retire_with_response(self, response: Result<u64, anyhow::Error>) {
441 let response = match response {
442 Ok(n) => Ok(ExecuteResponse::Copied(usize::cast_from(n))),
443 Err(error) => Err(AdapterError::Unstructured(error)),
444 };
445 let _ = self.tx.send(response);
446 }
447
448 /// Retires the copy to with the specified reason.
449 ///
450 /// Either this method or `retire_with_response` must be called on every
451 /// copy to before it is dropped.
452 pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
453 let message = match reason {
454 ActiveComputeSinkRetireReason::Finished => return,
455 ActiveComputeSinkRetireReason::Canceled => Err(AdapterError::Canceled),
456 ActiveComputeSinkRetireReason::DependencyDropped(dep) => {
457 Err(dep.to_concurrent_dependency_drop())
458 }
459 };
460 let _ = self.tx.send(message);
461 }
462}
463
464/// State we keep in the `Coordinator` to track active `COPY FROM` statements.
465#[derive(Debug)]
466pub(crate) struct ActiveCopyFrom {
467 /// ID of the ingestion running in clusterd.
468 pub ingestion_id: uuid::Uuid,
469 /// The cluster this is currently running on.
470 pub cluster_id: StorageInstanceId,
471 /// The table we're currently copying into.
472 pub table_id: CatalogItemId,
473 /// Context of the SQL session that ran the statement.
474 pub ctx: ExecuteContext,
475}