1use std::cmp::Ordering;
13use std::collections::BTreeSet;
14use std::iter;
15
16use anyhow::anyhow;
17use itertools::Itertools;
18use mz_adapter_types::connection::ConnectionId;
19use mz_compute_client::protocol::response::SubscribeBatch;
20use mz_controller_types::ClusterId;
21use mz_expr::compare_columns;
22use mz_ore::cast::CastFrom;
23use mz_ore::now::EpochMillis;
24use mz_repr::adt::numeric;
25use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, Row, Timestamp};
26use mz_sql::plan::SubscribeOutput;
27use mz_storage_types::instances::StorageInstanceId;
28use timely::progress::Antichain;
29use tokio::sync::{mpsc, oneshot};
30use uuid::Uuid;
31
32use crate::coord::peek::PeekResponseUnary;
33use crate::{AdapterError, ExecuteContext, ExecuteResponse};
34
35#[derive(Debug)]
36pub enum ActiveComputeSink {
38 Subscribe(ActiveSubscribe),
40 CopyTo(ActiveCopyTo),
42}
43
44impl ActiveComputeSink {
45 pub fn cluster_id(&self) -> ClusterId {
47 match &self {
48 ActiveComputeSink::Subscribe(subscribe) => subscribe.cluster_id,
49 ActiveComputeSink::CopyTo(copy_to) => copy_to.cluster_id,
50 }
51 }
52
53 pub fn connection_id(&self) -> &ConnectionId {
55 match &self {
56 ActiveComputeSink::Subscribe(subscribe) => &subscribe.conn_id,
57 ActiveComputeSink::CopyTo(copy_to) => ©_to.conn_id,
58 }
59 }
60
61 pub fn depends_on(&self) -> &BTreeSet<GlobalId> {
63 match &self {
64 ActiveComputeSink::Subscribe(subscribe) => &subscribe.depends_on,
65 ActiveComputeSink::CopyTo(copy_to) => ©_to.depends_on,
66 }
67 }
68
69 pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
75 match self {
76 ActiveComputeSink::Subscribe(subscribe) => subscribe.retire(reason),
77 ActiveComputeSink::CopyTo(copy_to) => copy_to.retire(reason),
78 }
79 }
80}
81
/// The reason an active compute sink is being retired.
#[derive(Debug, Clone)]
pub enum ActiveComputeSinkRetireReason {
    /// The sink ran to completion. The sinks' `retire` methods send nothing to
    /// the client in this case.
    Finished,
    /// The sink was canceled.
    Canceled,
    /// An object the sink depends on was dropped. The payload describes that
    /// object and is interpolated into the error reported to the client.
    DependencyDropped(String),
}
93
94#[derive(Debug)]
96pub struct ActiveSubscribe {
97 pub conn_id: ConnectionId,
99 pub session_uuid: Uuid,
101 pub cluster_id: ClusterId,
103 pub depends_on: BTreeSet<GlobalId>,
105 pub channel: mpsc::UnboundedSender<PeekResponseUnary>,
109 pub emit_progress: bool,
111 pub as_of: Timestamp,
113 pub arity: usize,
115 pub start_time: EpochMillis,
117 pub output: SubscribeOutput,
119}
120
121impl ActiveSubscribe {
122 pub fn initialize(&self) {
127 self.send_progress_message(&Antichain::from_elem(self.as_of));
129 }
130
131 fn send_progress_message(&self, upper: &Antichain<Timestamp>) {
132 if !self.emit_progress {
133 return;
134 }
135 if let Some(upper) = upper.as_option() {
136 let mut row_buf = Row::default();
137 let mut packer = row_buf.packer();
138 packer.push(Datum::from(numeric::Numeric::from(*upper)));
139 packer.push(Datum::True);
140
141 packer.push(Datum::Null);
143
144 for _ in 0..self.arity {
146 packer.push(Datum::Null);
147 }
148
149 if let SubscribeOutput::EnvelopeDebezium { order_by_keys } = &self.output {
150 for _ in 0..(self.arity - order_by_keys.len()) {
151 packer.push(Datum::Null);
152 }
153 }
154
155 let row_iter = Box::new(row_buf.into_row_iter());
156 self.send(PeekResponseUnary::Rows(row_iter));
157 }
158 }
159
160 pub fn process_response(&self, batch: SubscribeBatch) -> bool {
164 let mut rows = match batch.updates {
165 Ok(rows) => rows,
166 Err(s) => {
167 self.send(PeekResponseUnary::Error(s));
168 return true;
169 }
170 };
171
172 let mut row_buf = Row::default();
177 match &self.output {
178 SubscribeOutput::WithinTimestampOrderBy { order_by } => {
179 let mut left_datum_vec = mz_repr::DatumVec::new();
180 let mut right_datum_vec = mz_repr::DatumVec::new();
181 rows.sort_by(
182 |(left_time, left_row, left_diff), (right_time, right_row, right_diff)| {
183 left_time.cmp(right_time).then_with(|| {
184 let mut left_datums = left_datum_vec.borrow();
185 left_datums.extend(&[Datum::Int64(left_diff.into_inner())]);
186 left_datums.extend(left_row.iter());
187 let mut right_datums = right_datum_vec.borrow();
188 right_datums.extend(&[Datum::Int64(right_diff.into_inner())]);
189 right_datums.extend(right_row.iter());
190 compare_columns(order_by, &left_datums, &right_datums, || {
191 left_row.cmp(right_row).then(left_diff.cmp(right_diff))
192 })
193 })
194 },
195 );
196 }
197 SubscribeOutput::EnvelopeUpsert { order_by_keys }
198 | SubscribeOutput::EnvelopeDebezium { order_by_keys } => {
199 let debezium = matches!(self.output, SubscribeOutput::EnvelopeDebezium { .. });
200 let mut left_datum_vec = mz_repr::DatumVec::new();
201 let mut right_datum_vec = mz_repr::DatumVec::new();
202 rows.sort_by(
203 |(left_time, left_row, left_diff), (right_time, right_row, right_diff)| {
204 left_time.cmp(right_time).then_with(|| {
205 let left_datums = left_datum_vec.borrow_with(left_row);
206 let right_datums = right_datum_vec.borrow_with(right_row);
207 compare_columns(order_by_keys, &left_datums, &right_datums, || {
208 left_diff.cmp(right_diff)
209 })
210 })
211 },
212 );
213
214 let mut new_rows = Vec::new();
215 let mut it = rows.iter();
216 let mut datum_vec = mz_repr::DatumVec::new();
217 let mut old_datum_vec = mz_repr::DatumVec::new();
218 while let Some(start) = it.next() {
219 let group = iter::once(start)
220 .chain(it.take_while_ref(|row| {
221 let left_datums = left_datum_vec.borrow_with(&start.1);
222 let right_datums = right_datum_vec.borrow_with(&row.1);
223 start.0 == row.0
224 && compare_columns(
225 order_by_keys,
226 &left_datums,
227 &right_datums,
228 || Ordering::Equal,
229 ) == Ordering::Equal
230 }))
231 .collect_vec();
232
233 let value_columns = self.arity - order_by_keys.len();
239 let mut packer = row_buf.packer();
240 new_rows.push(match &group[..] {
241 [(_, row, Diff::ONE)] => {
242 packer.push(if debezium {
243 Datum::String("insert")
244 } else {
245 Datum::String("upsert")
246 });
247 let datums = datum_vec.borrow_with(row);
248 for column_order in order_by_keys {
249 packer.push(datums[column_order.column]);
250 }
251 if debezium {
252 for _ in 0..value_columns {
253 packer.push(Datum::Null);
254 }
255 }
256 for idx in 0..self.arity {
257 if !order_by_keys.iter().any(|co| co.column == idx) {
258 packer.push(datums[idx]);
259 }
260 }
261 (start.0, row_buf.clone(), Diff::ZERO)
262 }
263 [(_, _, Diff::MINUS_ONE)] => {
264 packer.push(Datum::String("delete"));
265 let datums = datum_vec.borrow_with(&start.1);
266 for column_order in order_by_keys {
267 packer.push(datums[column_order.column]);
268 }
269 if debezium {
270 for idx in 0..self.arity {
271 if !order_by_keys.iter().any(|co| co.column == idx) {
272 packer.push(datums[idx]);
273 }
274 }
275 }
276 for _ in 0..self.arity - order_by_keys.len() {
277 packer.push(Datum::Null);
278 }
279 (start.0, row_buf.clone(), Diff::ZERO)
280 }
281 [(_, old_row, Diff::MINUS_ONE), (_, row, Diff::ONE)] => {
282 packer.push(Datum::String("upsert"));
283 let datums = datum_vec.borrow_with(row);
284 let old_datums = old_datum_vec.borrow_with(old_row);
285
286 for column_order in order_by_keys {
287 packer.push(datums[column_order.column]);
288 }
289 if debezium {
290 for idx in 0..self.arity {
291 if !order_by_keys.iter().any(|co| co.column == idx) {
292 packer.push(old_datums[idx]);
293 }
294 }
295 }
296 for idx in 0..self.arity {
297 if !order_by_keys.iter().any(|co| co.column == idx) {
298 packer.push(datums[idx]);
299 }
300 }
301 (start.0, row_buf.clone(), Diff::ZERO)
302 }
303 _ => {
304 packer.push(Datum::String("key_violation"));
305 let datums = datum_vec.borrow_with(&start.1);
306 for column_order in order_by_keys {
307 packer.push(datums[column_order.column]);
308 }
309 if debezium {
310 for _ in 0..(self.arity - order_by_keys.len()) {
311 packer.push(Datum::Null);
312 }
313 }
314 for _ in 0..(self.arity - order_by_keys.len()) {
315 packer.push(Datum::Null);
316 }
317 (start.0, row_buf.clone(), Diff::ZERO)
318 }
319 });
320 }
321 rows = new_rows;
322 }
323 SubscribeOutput::Diffs => rows.sort_by_key(|(time, _, _)| *time),
324 }
325
326 let rows: Vec<Row> = rows
327 .into_iter()
328 .map(|(time, row, diff)| {
329 assert!(self.as_of <= time);
330 let mut packer = row_buf.packer();
331 packer.push(Datum::from(numeric::Numeric::from(time)));
333 if self.emit_progress {
334 packer.push(Datum::False);
339 }
340
341 match &self.output {
342 SubscribeOutput::EnvelopeUpsert { .. }
343 | SubscribeOutput::EnvelopeDebezium { .. } => {}
344 SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
345 packer.push(Datum::Int64(diff.into_inner()));
346 }
347 }
348
349 packer.extend_by_row(&row);
350
351 row_buf.clone()
352 })
353 .collect();
354 let rows = Box::new(rows.into_row_iter());
355
356 self.send(PeekResponseUnary::Rows(rows));
357
358 if !batch.upper.less_equal(&self.as_of) {
363 self.send_progress_message(&batch.upper);
364 }
365
366 batch.upper.is_empty()
367 }
368
369 pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
375 let message = match reason {
376 ActiveComputeSinkRetireReason::Finished => return,
377 ActiveComputeSinkRetireReason::Canceled => PeekResponseUnary::Canceled,
378 ActiveComputeSinkRetireReason::DependencyDropped(d) => PeekResponseUnary::Error(
379 format!("subscribe has been terminated because underlying {d} was dropped"),
380 ),
381 };
382 self.send(message);
383 }
384
385 fn send(&self, response: PeekResponseUnary) {
388 let _ = self.channel.send(response);
391 }
392}
393
394#[derive(Debug)]
396pub struct ActiveCopyTo {
397 pub conn_id: ConnectionId,
399 pub tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
401 pub cluster_id: ClusterId,
403 pub depends_on: BTreeSet<GlobalId>,
405}
406
407impl ActiveCopyTo {
408 pub fn retire_with_response(self, response: Result<u64, anyhow::Error>) {
417 let response = match response {
418 Ok(n) => Ok(ExecuteResponse::Copied(usize::cast_from(n))),
419 Err(error) => Err(AdapterError::Unstructured(error)),
420 };
421 let _ = self.tx.send(response);
422 }
423
424 pub fn retire(self, reason: ActiveComputeSinkRetireReason) {
429 let message = match reason {
430 ActiveComputeSinkRetireReason::Finished => return,
431 ActiveComputeSinkRetireReason::Canceled => Err(AdapterError::Canceled),
432 ActiveComputeSinkRetireReason::DependencyDropped(d) => Err(AdapterError::Unstructured(
433 anyhow!("copy has been terminated because underlying {d} was dropped"),
434 )),
435 };
436 let _ = self.tx.send(message);
437 }
438}
439
440#[derive(Debug)]
442pub(crate) struct ActiveCopyFrom {
443 pub ingestion_id: uuid::Uuid,
445 pub cluster_id: StorageInstanceId,
447 pub table_id: CatalogItemId,
449 pub ctx: ExecuteContext,
451}