1use std::cmp::Ordering;
13use std::collections::BTreeSet;
14use std::num::NonZeroUsize;
15
16use mz_adapter_types::connection::ConnectionId;
17use mz_compute_client::protocol::response::SubscribeBatch;
18use mz_controller_types::ClusterId;
19use mz_expr::row::RowCollection;
20use mz_expr::{RowComparator, compare_columns};
21use mz_ore::cast::CastFrom;
22use mz_ore::now::EpochMillis;
23use mz_repr::adt::numeric;
24use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, Row, RowRef, Timestamp};
25use mz_sql::plan::SubscribeOutput;
26use mz_storage_types::instances::StorageInstanceId;
27use timely::progress::Antichain;
28use tokio::sync::{mpsc, oneshot};
29use uuid::Uuid;
30
31use crate::coord::peek::{DroppedDependency, PeekResponseUnary};
32use crate::{AdapterError, ExecuteContext, ExecuteResponse};
33
/// A compute sink that is currently running and streaming results back to an
/// active client connection.
#[derive(Debug)]
pub enum ActiveComputeSink {
    /// An active `SUBSCRIBE` sink.
    Subscribe(ActiveSubscribe),
    /// An active `COPY ... TO` sink.
    CopyTo(ActiveCopyTo),
}
42
43impl ActiveComputeSink {
44 pub fn cluster_id(&self) -> ClusterId {
46 match &self {
47 ActiveComputeSink::Subscribe(subscribe) => subscribe.cluster_id,
48 ActiveComputeSink::CopyTo(copy_to) => copy_to.cluster_id,
49 }
50 }
51
52 pub fn connection_id(&self) -> &ConnectionId {
54 match &self {
55 ActiveComputeSink::Subscribe(subscribe) => &subscribe.conn_id,
56 ActiveComputeSink::CopyTo(copy_to) => ©_to.conn_id,
57 }
58 }
59
60 pub fn depends_on(&self) -> &BTreeSet<GlobalId> {
62 match &self {
63 ActiveComputeSink::Subscribe(subscribe) => &subscribe.depends_on,
64 ActiveComputeSink::CopyTo(copy_to) => ©_to.depends_on,
65 }
66 }
67
68 pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
74 match self {
75 ActiveComputeSink::Subscribe(subscribe) => subscribe.retire(reason),
76 ActiveComputeSink::CopyTo(copy_to) => copy_to.retire(reason),
77 }
78 }
79}
80
/// The reason for retiring an [`ActiveComputeSink`].
#[derive(Debug, Clone)]
pub enum ActiveComputeSinkRetireReason {
    /// The sink completed on its own; no message is sent to the client.
    Finished,
    /// The sink was canceled.
    Canceled,
    /// An object the sink depends on was dropped.
    DependencyDropped(DroppedDependency),
}
92
/// State tracked for an in-progress `SUBSCRIBE`.
#[derive(Debug)]
pub struct ActiveSubscribe {
    /// The ID of the connection that created the subscribe.
    pub conn_id: ConnectionId,
    /// The UUID of the session that created the subscribe.
    pub session_uuid: Uuid,
    /// The ID of the cluster on which the subscribe is running.
    pub cluster_id: ClusterId,
    /// The IDs of the objects the subscribe depends on.
    pub depends_on: BTreeSet<GlobalId>,
    /// Channel over which result batches are streamed back to the client.
    pub channel: mpsc::UnboundedSender<PeekResponseUnary>,
    /// Whether to emit progress rows (an `mz_progressed` column) in the
    /// output.
    pub emit_progress: bool,
    /// The timestamp at which the subscribe started; all emitted updates have
    /// timestamps at or after this (asserted in `process_response`).
    pub as_of: Timestamp,
    /// The number of columns in the relation being subscribed to.
    pub arity: usize,
    // NOTE(review): presumably the wall-clock start time of the subscribe;
    // not read anywhere in this chunk — confirm against callers.
    pub start_time: EpochMillis,
    /// The requested output format/envelope for the subscribe.
    pub output: SubscribeOutput,
}
119
impl ActiveSubscribe {
    /// Initializes the subscribe by sending a progress message reporting the
    /// subscribe's `as_of` frontier to the client (a no-op unless
    /// `emit_progress` is set).
    pub fn initialize(&self) {
        self.send_progress_message(&Antichain::from_elem(self.as_of));
    }

    /// Sends a progress row for the given `upper` frontier to the client, if
    /// `emit_progress` is set.
    ///
    /// The progress row contains: the frontier timestamp as a numeric,
    /// `true` for the progress column, a null for the column that carries the
    /// diff (`Diffs`-style outputs) or state string (envelope outputs) on
    /// data rows, and nulls for all data columns. An empty frontier produces
    /// no message.
    fn send_progress_message(&self, upper: &Antichain<Timestamp>) {
        if !self.emit_progress {
            return;
        }
        if let Some(upper) = upper.as_option() {
            let mut row_buf = Row::default();
            let mut packer = row_buf.packer();
            packer.push(Datum::from(numeric::Numeric::from(*upper)));
            packer.push(Datum::True);

            // Null for the diff/state column present on data rows.
            packer.push(Datum::Null);

            // Nulls for all data columns.
            for _ in 0..self.arity {
                packer.push(Datum::Null);
            }

            // The Debezium envelope emits two copies of the non-key value
            // columns ("before" and "after"), so pad with that many extra
            // nulls.
            if let SubscribeOutput::EnvelopeDebezium { order_by_keys } = &self.output {
                for _ in 0..(self.arity - order_by_keys.len()) {
                    packer.push(Datum::Null);
                }
            }

            let row_iter = Box::new(row_buf.into_row_iter());
            self.send(PeekResponseUnary::Rows(row_iter));
        }
    }

    /// Processes a batch of updates from the compute layer, formats the rows
    /// according to the requested [`SubscribeOutput`], sends them to the
    /// client, and reports progress when the batch's upper has advanced past
    /// the `as_of`.
    ///
    /// Returns `true` if the subscribe is finished: either the batch carried
    /// an error (forwarded to the client) or its upper is the empty frontier.
    pub fn process_response(&self, batch: SubscribeBatch) -> bool {
        let comparator = RowComparator::new(self.output.row_order());
        let rows = match batch.updates {
            Ok(ref rows) => {
                // Merge the per-collection update iterators into a single
                // stream ordered by time, then by the output's row order,
                // then consolidate updates with equal (row, time).
                let iters = rows.iter().map(|r| r.iter());
                let merged = mz_ore::iter::merge_iters_by(
                    iters,
                    |(left_row, left_time, _), (right_row, right_time, _)| {
                        left_time.cmp(right_time).then_with(|| {
                            comparator.compare_rows(left_row, right_row, || left_row.cmp(right_row))
                        })
                    },
                );
                mz_ore::iter::consolidate_update_iter(merged)
            }
            Err(s) => {
                // An error from the compute layer terminates the subscribe.
                self.send(PeekResponseUnary::Error(s));
                return true;
            }
        };

        let mut output_buf = Row::default();
        let mut output_builder = RowCollection::builder(0, 0);
        let mut left_datum_vec = mz_repr::DatumVec::new();
        let mut right_datum_vec = mz_repr::DatumVec::new();
        // Appends one output row: the timestamp, an optional
        // `mz_progressed = false` column, the diff for `Diffs`-style outputs,
        // and then the provided data columns.
        let mut push_row = |row: &RowRef, time: Timestamp, diff: Diff| {
            // Updates before the `as_of` would violate the subscribe's
            // contract with the client.
            assert!(self.as_of <= time);
            let mut packer = output_buf.packer();
            packer.push(Datum::from(numeric::Numeric::from(time)));
            if self.emit_progress {
                // Data rows are never progress rows.
                packer.push(Datum::False);
            }

            match &self.output {
                // Envelope outputs carry no diff column; the change kind is
                // encoded in the state string that the caller packed into
                // `row` (and these call sites pass `Diff::ZERO`).
                SubscribeOutput::EnvelopeUpsert { .. }
                | SubscribeOutput::EnvelopeDebezium { .. } => {}
                SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
                    packer.push(Datum::Int64(diff.into_inner()));
                }
            }

            packer.extend_by_row_ref(row);

            output_builder.push(output_buf.as_row_ref(), NonZeroUsize::MIN);
        };

        match &self.output {
            SubscribeOutput::WithinTimestampOrderBy { order_by } => {
                // Re-sort rows within each timestamp according to the
                // requested ORDER BY, with the diff prepended as a leading
                // pseudo-column so `order_by` column indices line up with the
                // emitted output.
                let mut rows: Vec<_> = rows.collect();
                rows.sort_by(
                    |(left_row, left_time, left_diff), (right_row, right_time, right_diff)| {
                        left_time.cmp(right_time).then_with(|| {
                            let mut left_datums = left_datum_vec.borrow();
                            left_datums.extend(&[Datum::Int64(left_diff.into_inner())]);
                            left_datums.extend(left_row.iter());
                            let mut right_datums = right_datum_vec.borrow();
                            right_datums.extend(&[Datum::Int64(right_diff.into_inner())]);
                            right_datums.extend(right_row.iter());
                            compare_columns(order_by, &left_datums, &right_datums, || {
                                left_row.cmp(right_row).then(left_diff.cmp(right_diff))
                            })
                        })
                    },
                );
                for (row, time, diff) in rows {
                    push_row(row, *time, diff);
                }
            }
            SubscribeOutput::EnvelopeUpsert { order_by_keys }
            | SubscribeOutput::EnvelopeDebezium { order_by_keys } => {
                let debezium = matches!(self.output, SubscribeOutput::EnvelopeDebezium { .. });
                let mut it = rows.peekable();
                let mut datum_vec = mz_repr::DatumVec::new();
                let mut old_datum_vec = mz_repr::DatumVec::new();
                let comparator = RowComparator::new(order_by_keys.as_slice());
                let mut group = Vec::with_capacity(2);
                let mut row_buf = Row::default();
                // Group consecutive updates that share a timestamp and key
                // (the merged stream is ordered by time, then row order, so
                // such updates are adjacent).
                while let Some(start) = it.next() {
                    group.clear();
                    group.push(start);
                    while let Some(row) = it.peek()
                        && start.1 == row.1
                        && {
                            comparator
                                .compare_rows(start.0, row.0, || Ordering::Equal)
                                .is_eq()
                        }
                    {
                        group.extend(it.next());
                    }
                    // Sort by diff so a retraction (-1) precedes an insertion
                    // (+1), making the match arms below order-insensitive.
                    group.sort_by_key(|(_, _, d)| *d);

                    // Number of non-key ("value") columns.
                    let value_columns = self.arity - order_by_keys.len();
                    let mut packer = row_buf.packer();
                    // Interpret the group: a lone insertion, a lone
                    // retraction, a retraction/insertion pair (an update), or
                    // anything else (a key violation).
                    match &group[..] {
                        // Lone insertion.
                        [(row, _, Diff::ONE)] => {
                            packer.push(if debezium {
                                Datum::String("insert")
                            } else {
                                Datum::String("upsert")
                            });
                            let datums = datum_vec.borrow_with(row);
                            // Key columns first.
                            for column_order in order_by_keys {
                                packer.push(datums[column_order.column]);
                            }
                            // Debezium "before" values: null for an insert.
                            if debezium {
                                for _ in 0..value_columns {
                                    packer.push(Datum::Null);
                                }
                            }
                            // "After" values: every non-key column.
                            for idx in 0..self.arity {
                                if !order_by_keys.iter().any(|co| co.column == idx) {
                                    packer.push(datums[idx]);
                                }
                            }
                            push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
                        }
                        // Lone retraction.
                        [(_, _, Diff::MINUS_ONE)] => {
                            packer.push(Datum::String("delete"));
                            let datums = datum_vec.borrow_with(start.0);
                            // Key columns first.
                            for column_order in order_by_keys {
                                packer.push(datums[column_order.column]);
                            }
                            // Debezium "before" values: the retracted row's
                            // non-key columns.
                            if debezium {
                                for idx in 0..self.arity {
                                    if !order_by_keys.iter().any(|co| co.column == idx) {
                                        packer.push(datums[idx]);
                                    }
                                }
                            }
                            // "After" values: null for a delete.
                            for _ in 0..self.arity - order_by_keys.len() {
                                packer.push(Datum::Null);
                            }
                            push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
                        }
                        // Retraction/insertion pair: an update to the key.
                        [(old_row, _, Diff::MINUS_ONE), (row, _, Diff::ONE)] => {
                            packer.push(Datum::String("upsert"));
                            let datums = datum_vec.borrow_with(row);
                            let old_datums = old_datum_vec.borrow_with(old_row);

                            // Key columns first.
                            for column_order in order_by_keys {
                                packer.push(datums[column_order.column]);
                            }
                            // Debezium "before" values from the retracted row.
                            if debezium {
                                for idx in 0..self.arity {
                                    if !order_by_keys.iter().any(|co| co.column == idx) {
                                        packer.push(old_datums[idx]);
                                    }
                                }
                            }
                            // "After" values from the inserted row.
                            for idx in 0..self.arity {
                                if !order_by_keys.iter().any(|co| co.column == idx) {
                                    packer.push(datums[idx]);
                                }
                            }
                            push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
                        }
                        // Any other multiplicity means the key was not
                        // actually a key at this timestamp.
                        _ => {
                            packer.push(Datum::String("key_violation"));
                            let datums = datum_vec.borrow_with(start.0);
                            for column_order in order_by_keys {
                                packer.push(datums[column_order.column]);
                            }
                            // Null "before" values (Debezium only), then null
                            // "after" values.
                            if debezium {
                                for _ in 0..(self.arity - order_by_keys.len()) {
                                    packer.push(Datum::Null);
                                }
                            }
                            for _ in 0..(self.arity - order_by_keys.len()) {
                                packer.push(Datum::Null);
                            }
                            push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO)
                        }
                    };
                }
            }
            SubscribeOutput::Diffs => {
                // Pass updates through unchanged.
                for (row, time, diff) in rows {
                    push_row(row, *time, diff)
                }
            }
        };

        let rows = output_builder.build();
        let rows = Box::new(rows.into_row_iter());
        self.send(PeekResponseUnary::Rows(rows));

        // Only report progress once the upper has advanced past the `as_of`;
        // the client already learned the `as_of` frontier at initialization.
        if !batch.upper.less_equal(&self.as_of) {
            self.send_progress_message(&batch.upper);
        }

        // The subscribe is finished when the upper reaches the empty
        // frontier.
        batch.upper.is_empty()
    }

    /// Retires the subscribe for the given reason, notifying the client when
    /// the reason warrants a message. `Finished` sends nothing —
    /// NOTE(review): presumably because completion was already communicated
    /// via the result stream; confirm against callers.
    pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
        let message = match reason {
            ActiveComputeSinkRetireReason::Finished => return,
            ActiveComputeSinkRetireReason::Canceled => PeekResponseUnary::Canceled,
            ActiveComputeSinkRetireReason::DependencyDropped(d) => {
                PeekResponseUnary::DependencyDropped(d)
            }
        };
        self.send(message);
    }

    /// Sends a response to the client, ignoring send failures (the receiver
    /// may already have hung up).
    fn send(&self, response: PeekResponseUnary) {
        let _ = self.channel.send(response);
    }
}
403
/// State tracked for an in-progress `COPY ... TO` sink.
#[derive(Debug)]
pub struct ActiveCopyTo {
    /// The ID of the connection that created the copy.
    pub conn_id: ConnectionId,
    /// One-shot channel over which the final result is delivered to the
    /// client.
    pub tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    /// The ID of the cluster on which the copy is running.
    pub cluster_id: ClusterId,
    /// The IDs of the objects the copy depends on.
    pub depends_on: BTreeSet<GlobalId>,
}
416
417impl ActiveCopyTo {
418 pub fn retire_with_response(self, response: Result<u64, anyhow::Error>) {
427 let response = match response {
428 Ok(n) => Ok(ExecuteResponse::Copied(usize::cast_from(n))),
429 Err(error) => Err(AdapterError::Unstructured(error)),
430 };
431 let _ = self.tx.send(response);
432 }
433
434 pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
439 let message = match reason {
440 ActiveComputeSinkRetireReason::Finished => return,
441 ActiveComputeSinkRetireReason::Canceled => Err(AdapterError::Canceled),
442 ActiveComputeSinkRetireReason::DependencyDropped(dep) => {
443 Err(dep.to_concurrent_dependency_drop())
444 }
445 };
446 let _ = self.tx.send(message);
447 }
448}
449
/// State tracked for an in-progress `COPY ... FROM` ingestion.
#[derive(Debug)]
pub(crate) struct ActiveCopyFrom {
    /// Unique identifier for this ingestion.
    pub ingestion_id: uuid::Uuid,
    /// The ID of the storage instance handling the ingestion.
    pub cluster_id: StorageInstanceId,
    /// The ID of the table being copied into.
    pub table_id: CatalogItemId,
    /// The execution context of the statement that started the copy.
    pub ctx: ExecuteContext,
}