1use std::cmp::Ordering;
13use std::collections::BTreeSet;
14use std::num::NonZeroUsize;
15
16use anyhow::anyhow;
17use mz_adapter_types::connection::ConnectionId;
18use mz_compute_client::protocol::response::SubscribeBatch;
19use mz_controller_types::ClusterId;
20use mz_expr::row::RowCollection;
21use mz_expr::{RowComparator, compare_columns};
22use mz_ore::cast::CastFrom;
23use mz_ore::now::EpochMillis;
24use mz_repr::adt::numeric;
25use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, Row, RowRef, Timestamp};
26use mz_sql::plan::SubscribeOutput;
27use mz_storage_types::instances::StorageInstanceId;
28use timely::progress::Antichain;
29use tokio::sync::{mpsc, oneshot};
30use uuid::Uuid;
31
32use crate::coord::peek::PeekResponseUnary;
33use crate::{AdapterError, ExecuteContext, ExecuteResponse};
34
35#[derive(Debug)]
36pub enum ActiveComputeSink {
38 Subscribe(ActiveSubscribe),
40 CopyTo(ActiveCopyTo),
42}
43
44impl ActiveComputeSink {
45 pub fn cluster_id(&self) -> ClusterId {
47 match &self {
48 ActiveComputeSink::Subscribe(subscribe) => subscribe.cluster_id,
49 ActiveComputeSink::CopyTo(copy_to) => copy_to.cluster_id,
50 }
51 }
52
53 pub fn connection_id(&self) -> &ConnectionId {
55 match &self {
56 ActiveComputeSink::Subscribe(subscribe) => &subscribe.conn_id,
57 ActiveComputeSink::CopyTo(copy_to) => ©_to.conn_id,
58 }
59 }
60
61 pub fn depends_on(&self) -> &BTreeSet<GlobalId> {
63 match &self {
64 ActiveComputeSink::Subscribe(subscribe) => &subscribe.depends_on,
65 ActiveComputeSink::CopyTo(copy_to) => ©_to.depends_on,
66 }
67 }
68
69 pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
75 match self {
76 ActiveComputeSink::Subscribe(subscribe) => subscribe.retire(reason),
77 ActiveComputeSink::CopyTo(copy_to) => copy_to.retire(reason),
78 }
79 }
80}
81
/// The reason an [`ActiveComputeSink`] is being retired.
#[derive(Debug, Clone)]
pub enum ActiveComputeSinkRetireReason {
    /// The sink finished on its own; retiring sends nothing to the client.
    Finished,
    /// The sink was canceled; the client is sent a cancellation response.
    Canceled,
    /// An object the sink depended on was dropped.
    ///
    /// The string is interpolated into the error message sent to the client
    /// ("... because underlying {d} was dropped"), so it should describe the
    /// dropped object.
    DependencyDropped(String),
}
93
94#[derive(Debug)]
96pub struct ActiveSubscribe {
97 pub conn_id: ConnectionId,
99 pub session_uuid: Uuid,
101 pub cluster_id: ClusterId,
103 pub depends_on: BTreeSet<GlobalId>,
105 pub channel: mpsc::UnboundedSender<PeekResponseUnary>,
109 pub emit_progress: bool,
111 pub as_of: Timestamp,
113 pub arity: usize,
115 pub start_time: EpochMillis,
117 pub output: SubscribeOutput,
119}
120
121impl ActiveSubscribe {
122 pub fn initialize(&self) {
127 self.send_progress_message(&Antichain::from_elem(self.as_of));
129 }
130
131 fn send_progress_message(&self, upper: &Antichain<Timestamp>) {
132 if !self.emit_progress {
133 return;
134 }
135 if let Some(upper) = upper.as_option() {
136 let mut row_buf = Row::default();
137 let mut packer = row_buf.packer();
138 packer.push(Datum::from(numeric::Numeric::from(*upper)));
139 packer.push(Datum::True);
140
141 packer.push(Datum::Null);
143
144 for _ in 0..self.arity {
146 packer.push(Datum::Null);
147 }
148
149 if let SubscribeOutput::EnvelopeDebezium { order_by_keys } = &self.output {
150 for _ in 0..(self.arity - order_by_keys.len()) {
151 packer.push(Datum::Null);
152 }
153 }
154
155 let row_iter = Box::new(row_buf.into_row_iter());
156 self.send(PeekResponseUnary::Rows(row_iter));
157 }
158 }
159
160 pub fn process_response(&self, batch: SubscribeBatch) -> bool {
164 let comparator = RowComparator::new(self.output.row_order());
165 let rows = match batch.updates {
166 Ok(ref rows) => {
167 let iters = rows.iter().map(|r| r.iter());
168 let merged = mz_ore::iter::merge_iters_by(
169 iters,
170 |(left_row, left_time, _), (right_row, right_time, _)| {
171 left_time.cmp(right_time).then_with(|| {
172 comparator.compare_rows(left_row, right_row, || left_row.cmp(right_row))
173 })
174 },
175 );
176 mz_ore::iter::consolidate_update_iter(merged)
177 }
178 Err(s) => {
179 self.send(PeekResponseUnary::Error(s));
180 return true;
181 }
182 };
183
184 let mut output_buf = Row::default();
189 let mut output_builder = RowCollection::builder(0, 0);
190 let mut left_datum_vec = mz_repr::DatumVec::new();
191 let mut right_datum_vec = mz_repr::DatumVec::new();
192 let mut push_row = |row: &RowRef, time: Timestamp, diff: Diff| {
193 assert!(self.as_of <= time);
194 let mut packer = output_buf.packer();
195 packer.push(Datum::from(numeric::Numeric::from(time)));
197 if self.emit_progress {
198 packer.push(Datum::False);
203 }
204
205 match &self.output {
206 SubscribeOutput::EnvelopeUpsert { .. }
207 | SubscribeOutput::EnvelopeDebezium { .. } => {}
208 SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
209 packer.push(Datum::Int64(diff.into_inner()));
210 }
211 }
212
213 packer.extend_by_row_ref(row);
214
215 output_builder.push(output_buf.as_row_ref(), NonZeroUsize::MIN);
216 };
217
218 match &self.output {
219 SubscribeOutput::WithinTimestampOrderBy { order_by } => {
220 let mut rows: Vec<_> = rows.collect();
221 rows.sort_by(
224 |(left_row, left_time, left_diff), (right_row, right_time, right_diff)| {
225 left_time.cmp(right_time).then_with(|| {
226 let mut left_datums = left_datum_vec.borrow();
227 left_datums.extend(&[Datum::Int64(left_diff.into_inner())]);
228 left_datums.extend(left_row.iter());
229 let mut right_datums = right_datum_vec.borrow();
230 right_datums.extend(&[Datum::Int64(right_diff.into_inner())]);
231 right_datums.extend(right_row.iter());
232 compare_columns(order_by, &left_datums, &right_datums, || {
233 left_row.cmp(right_row).then(left_diff.cmp(right_diff))
234 })
235 })
236 },
237 );
238 for (row, time, diff) in rows {
239 push_row(row, *time, diff);
240 }
241 }
242 SubscribeOutput::EnvelopeUpsert { order_by_keys }
243 | SubscribeOutput::EnvelopeDebezium { order_by_keys } => {
244 let debezium = matches!(self.output, SubscribeOutput::EnvelopeDebezium { .. });
245 let mut it = rows.peekable();
246 let mut datum_vec = mz_repr::DatumVec::new();
247 let mut old_datum_vec = mz_repr::DatumVec::new();
248 let comparator = RowComparator::new(order_by_keys.as_slice());
249 let mut group = Vec::with_capacity(2);
250 let mut row_buf = Row::default();
251 while let Some(start) = it.next() {
254 group.clear();
255 group.push(start);
256 while let Some(row) = it.peek()
257 && start.1 == row.1
258 && {
259 comparator
260 .compare_rows(start.0, row.0, || Ordering::Equal)
261 .is_eq()
262 }
263 {
264 group.extend(it.next());
265 }
266 group.sort_by_key(|(_, _, d)| *d);
267
268 let value_columns = self.arity - order_by_keys.len();
274 let mut packer = row_buf.packer();
275 match &group[..] {
276 [(row, _, Diff::ONE)] => {
277 packer.push(if debezium {
278 Datum::String("insert")
279 } else {
280 Datum::String("upsert")
281 });
282 let datums = datum_vec.borrow_with(row);
283 for column_order in order_by_keys {
284 packer.push(datums[column_order.column]);
285 }
286 if debezium {
287 for _ in 0..value_columns {
288 packer.push(Datum::Null);
289 }
290 }
291 for idx in 0..self.arity {
292 if !order_by_keys.iter().any(|co| co.column == idx) {
293 packer.push(datums[idx]);
294 }
295 }
296 push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
297 }
298 [(_, _, Diff::MINUS_ONE)] => {
299 packer.push(Datum::String("delete"));
300 let datums = datum_vec.borrow_with(start.0);
301 for column_order in order_by_keys {
302 packer.push(datums[column_order.column]);
303 }
304 if debezium {
305 for idx in 0..self.arity {
306 if !order_by_keys.iter().any(|co| co.column == idx) {
307 packer.push(datums[idx]);
308 }
309 }
310 }
311 for _ in 0..self.arity - order_by_keys.len() {
312 packer.push(Datum::Null);
313 }
314 push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
315 }
316 [(old_row, _, Diff::MINUS_ONE), (row, _, Diff::ONE)] => {
317 packer.push(Datum::String("upsert"));
318 let datums = datum_vec.borrow_with(row);
319 let old_datums = old_datum_vec.borrow_with(old_row);
320
321 for column_order in order_by_keys {
322 packer.push(datums[column_order.column]);
323 }
324 if debezium {
325 for idx in 0..self.arity {
326 if !order_by_keys.iter().any(|co| co.column == idx) {
327 packer.push(old_datums[idx]);
328 }
329 }
330 }
331 for idx in 0..self.arity {
332 if !order_by_keys.iter().any(|co| co.column == idx) {
333 packer.push(datums[idx]);
334 }
335 }
336 push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
337 }
338 _ => {
339 packer.push(Datum::String("key_violation"));
340 let datums = datum_vec.borrow_with(start.0);
341 for column_order in order_by_keys {
342 packer.push(datums[column_order.column]);
343 }
344 if debezium {
345 for _ in 0..(self.arity - order_by_keys.len()) {
346 packer.push(Datum::Null);
347 }
348 }
349 for _ in 0..(self.arity - order_by_keys.len()) {
350 packer.push(Datum::Null);
351 }
352 push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
353 }
354 };
355 }
356 }
357 SubscribeOutput::Diffs => {
358 for (row, time, diff) in rows {
360 push_row(row, *time, diff)
361 }
362 }
363 };
364
365 let rows = output_builder.build();
366 let rows = Box::new(rows.into_row_iter());
367 self.send(PeekResponseUnary::Rows(rows));
368
369 if !batch.upper.less_equal(&self.as_of) {
374 self.send_progress_message(&batch.upper);
375 }
376
377 batch.upper.is_empty()
378 }
379
380 pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
386 let message = match reason {
387 ActiveComputeSinkRetireReason::Finished => return,
388 ActiveComputeSinkRetireReason::Canceled => PeekResponseUnary::Canceled,
389 ActiveComputeSinkRetireReason::DependencyDropped(d) => PeekResponseUnary::Error(
390 format!("subscribe has been terminated because underlying {d} was dropped"),
391 ),
392 };
393 self.send(message);
394 }
395
396 fn send(&self, response: PeekResponseUnary) {
399 let _ = self.channel.send(response);
402 }
403}
404
405#[derive(Debug)]
407pub struct ActiveCopyTo {
408 pub conn_id: ConnectionId,
410 pub tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
412 pub cluster_id: ClusterId,
414 pub depends_on: BTreeSet<GlobalId>,
416}
417
418impl ActiveCopyTo {
419 pub fn retire_with_response(self, response: Result<u64, anyhow::Error>) {
428 let response = match response {
429 Ok(n) => Ok(ExecuteResponse::Copied(usize::cast_from(n))),
430 Err(error) => Err(AdapterError::Unstructured(error)),
431 };
432 let _ = self.tx.send(response);
433 }
434
435 pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
440 let message = match reason {
441 ActiveComputeSinkRetireReason::Finished => return,
442 ActiveComputeSinkRetireReason::Canceled => Err(AdapterError::Canceled),
443 ActiveComputeSinkRetireReason::DependencyDropped(d) => Err(AdapterError::Unstructured(
444 anyhow!("copy has been terminated because underlying {d} was dropped"),
445 )),
446 };
447 let _ = self.tx.send(message);
448 }
449}
450
451#[derive(Debug)]
453pub(crate) struct ActiveCopyFrom {
454 pub ingestion_id: uuid::Uuid,
456 pub cluster_id: StorageInstanceId,
458 pub table_id: CatalogItemId,
460 pub ctx: ExecuteContext,
462}