1use std::cell::RefCell;
17use std::collections::VecDeque;
18use std::rc::Rc;
19use std::time::Duration;
20
21use differential_dataflow::capture::{Message, Progress};
22use differential_dataflow::{AsCollection, Collection, Hashable};
23use futures::StreamExt;
24use mz_ore::error::ErrorExt;
25use mz_ore::future::InTask;
26use mz_repr::{Datum, Diff, Row};
27use mz_storage_types::configuration::StorageConfiguration;
28use mz_storage_types::errors::{CsrConnectError, DecodeError, DecodeErrorKind};
29use mz_storage_types::sources::encoding::{AvroEncoding, DataEncoding, RegexEncoding};
30use mz_timely_util::builder_async::{
31    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
32};
33use regex::Regex;
34use timely::container::CapacityContainerBuilder;
35use timely::dataflow::channels::pact::Exchange;
36use timely::dataflow::operators::{Map, Operator};
37use timely::dataflow::{Scope, Stream};
38use timely::progress::Timestamp;
39use timely::scheduling::SyncActivator;
40use tracing::error;
41
42use crate::decode::avro::AvroDecoderState;
43use crate::decode::csv::CsvDecoderState;
44use crate::decode::protobuf::ProtobufDecoderState;
45use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
46use crate::metrics::decode::DecodeMetricDefs;
47use crate::source::types::{DecodeResult, SourceOutput};
48
49mod avro;
50mod csv;
51mod protobuf;
52
53pub fn render_decode_cdcv2<G: Scope<Timestamp = mz_repr::Timestamp>, FromTime: Timestamp>(
59    input: &Collection<G, DecodeResult<FromTime>, Diff>,
60) -> (Collection<G, Row, Diff>, PressOnDropButton) {
61    let channel_rx = Rc::new(RefCell::new(VecDeque::new()));
62    let activator_set: Rc<RefCell<Option<SyncActivator>>> = Rc::new(RefCell::new(None));
63
64    let mut row_buf = Row::default();
65    let channel_tx = Rc::clone(&channel_rx);
66    let activator_get = Rc::clone(&activator_set);
67    let pact = Exchange::new(|(x, _, _): &(DecodeResult<FromTime>, _, _)| x.key.hashed());
68    input.inner.sink(pact, "CDCv2Unpack", move |input| {
69        while let Some((_, data)) = input.next() {
70            for (row, _time, _diff) in data.drain(..) {
75                let mut record = match &row.value {
76                    Some(Ok(row)) => row.iter(),
77                    Some(Err(err)) => {
78                        error!("Ignoring errored record: {err}");
79                        continue;
80                    }
81                    None => continue,
82                };
83                let message = match (record.next().unwrap(), record.next().unwrap()) {
84                    (Datum::List(datum_updates), Datum::Null) => {
85                        let mut updates = vec![];
86                        for update in datum_updates.iter() {
87                            let mut update = update.unwrap_list().iter();
88                            let data = update.next().unwrap().unwrap_list();
89                            let time = update.next().unwrap().unwrap_int64();
90                            let diff = Diff::from(update.next().unwrap().unwrap_int64());
91
92                            row_buf.packer().extend(&data);
93                            let data = row_buf.clone();
94                            let time = u64::try_from(time).expect("non-negative");
95                            let time = mz_repr::Timestamp::from(time);
96                            updates.push((data, time, diff));
97                        }
98                        Message::Updates(updates)
99                    }
100                    (Datum::Null, Datum::List(progress)) => {
101                        let mut progress = progress.iter();
102                        let mut lower = vec![];
103                        for time in &progress.next().unwrap().unwrap_list() {
104                            let time = u64::try_from(time.unwrap_int64()).expect("non-negative");
105                            lower.push(mz_repr::Timestamp::from(time));
106                        }
107                        let mut upper = vec![];
108                        for time in &progress.next().unwrap().unwrap_list() {
109                            let time = u64::try_from(time.unwrap_int64()).expect("non-negative");
110                            upper.push(mz_repr::Timestamp::from(time));
111                        }
112                        let mut counts = vec![];
113                        for pair in &progress.next().unwrap().unwrap_list() {
114                            let mut pair = pair.unwrap_list().iter();
115                            let time = pair.next().unwrap().unwrap_int64();
116                            let count = pair.next().unwrap().unwrap_int64();
117
118                            let time = u64::try_from(time).expect("non-negative");
119                            let count = usize::try_from(count).expect("non-negative");
120                            counts.push((mz_repr::Timestamp::from(time), count));
121                        }
122                        let progress = Progress {
123                            lower,
124                            upper,
125                            counts,
126                        };
127                        Message::Progress(progress)
128                    }
129                    _ => unreachable!("invalid input"),
130                };
131                channel_tx.borrow_mut().push_back(message);
132            }
133        }
134        if let Some(activator) = activator_get.borrow_mut().as_mut() {
135            activator.activate().unwrap()
136        }
137    });
138
139    struct VdIterator<T>(Rc<RefCell<VecDeque<T>>>);
140    impl<T> Iterator for VdIterator<T> {
141        type Item = T;
142        fn next(&mut self) -> Option<T> {
143            self.0.borrow_mut().pop_front()
144        }
145    }
146    let (token, stream) = differential_dataflow::capture::source::build(input.scope(), move |ac| {
148        *activator_set.borrow_mut() = Some(ac);
149        YieldingIter::new_from(VdIterator(channel_rx), Duration::from_millis(10))
150    });
151
152    let builder = AsyncOperatorBuilder::new("CDCv2-Token".to_owned(), input.scope());
155    let button = builder.build(move |_caps| async move {
156        let _dd_token = token;
157        std::future::pending::<()>().await;
159    });
160    (stream.as_collection(), button.press_on_drop())
161}
162
163pub struct YieldingIter<I> {
165    start: Option<std::time::Instant>,
167    after: Duration,
168    iter: I,
169}
170
171impl<I> YieldingIter<I> {
172    pub fn new_from(iter: I, yield_after: Duration) -> Self {
174        Self {
175            start: None,
176            after: yield_after,
177            iter,
178        }
179    }
180}
181
182impl<I: Iterator> Iterator for YieldingIter<I> {
183    type Item = I::Item;
184    fn next(&mut self) -> Option<Self::Item> {
185        if self.start.is_none() {
186            self.start = Some(std::time::Instant::now());
187        }
188        let start = self.start.as_ref().unwrap();
189        if start.elapsed() > self.after {
190            self.start = None;
191            None
192        } else {
193            match self.iter.next() {
194                Some(x) => Some(x),
195                None => {
196                    self.start = None;
197                    None
198                }
199            }
200        }
201    }
202}
203
204#[derive(Debug)]
208pub(crate) enum PreDelimitedFormat {
209    Bytes,
210    Text,
211    Json,
212    Regex(Regex, Row),
213    Protobuf(ProtobufDecoderState),
214}
215
216impl PreDelimitedFormat {
217    pub fn decode(&mut self, bytes: &[u8]) -> Result<Option<Row>, DecodeErrorKind> {
218        match self {
219            PreDelimitedFormat::Bytes => Ok(Some(Row::pack(Some(Datum::Bytes(bytes))))),
220            PreDelimitedFormat::Json => {
221                let j = mz_repr::adt::jsonb::Jsonb::from_slice(bytes).map_err(|e| {
222                    DecodeErrorKind::Bytes(
223                        format!("Failed to decode JSON: {}", e.display_with_causes(),).into(),
224                    )
225                })?;
226                Ok(Some(j.into_row()))
227            }
228            PreDelimitedFormat::Text => {
229                let s = std::str::from_utf8(bytes)
230                    .map_err(|_| DecodeErrorKind::Text("Failed to decode UTF-8".into()))?;
231                Ok(Some(Row::pack(Some(Datum::String(s)))))
232            }
233            PreDelimitedFormat::Regex(regex, row_buf) => {
234                let s = std::str::from_utf8(bytes)
235                    .map_err(|_| DecodeErrorKind::Text("Failed to decode UTF-8".into()))?;
236                let captures = match regex.captures(s) {
237                    Some(captures) => captures,
238                    None => return Ok(None),
239                };
240                row_buf.packer().extend(
241                    captures
242                        .iter()
243                        .skip(1)
244                        .map(|c| Datum::from(c.map(|c| c.as_str()))),
245                );
246                Ok(Some(row_buf.clone()))
247            }
248            PreDelimitedFormat::Protobuf(pb) => pb.get_value(bytes).transpose(),
249        }
250    }
251}
252
253#[derive(Debug)]
254pub(crate) enum DataDecoderInner {
255    Avro(AvroDecoderState),
256    DelimitedBytes {
257        delimiter: u8,
258        format: PreDelimitedFormat,
259    },
260    Csv(CsvDecoderState),
261
262    PreDelimited(PreDelimitedFormat),
263}
264
265#[derive(Debug)]
266struct DataDecoder {
267    inner: DataDecoderInner,
268    metrics: DecodeMetricDefs,
269}
270
271impl DataDecoder {
272    pub async fn next(
273        &mut self,
274        bytes: &mut &[u8],
275    ) -> Result<Result<Option<Row>, DecodeErrorKind>, CsrConnectError> {
276        let result = match &mut self.inner {
277            DataDecoderInner::DelimitedBytes { delimiter, format } => {
278                match bytes.iter().position(|&byte| byte == *delimiter) {
279                    Some(chunk_idx) => {
280                        let data = &bytes[0..chunk_idx];
281                        *bytes = &bytes[chunk_idx + 1..];
282                        format.decode(data)
283                    }
284                    None => Ok(None),
285                }
286            }
287            DataDecoderInner::Avro(avro) => avro.decode(bytes).await?,
288            DataDecoderInner::Csv(csv) => csv.decode(bytes),
289            DataDecoderInner::PreDelimited(format) => {
290                let result = format.decode(*bytes);
291                *bytes = &[];
292                result
293            }
294        };
295        Ok(result)
296    }
297
298    pub fn eof(
303        &mut self,
304        bytes: &mut &[u8],
305    ) -> Result<Result<Option<Row>, DecodeErrorKind>, CsrConnectError> {
306        let result = match &mut self.inner {
307            DataDecoderInner::Csv(csv) => {
308                let result = csv.decode(bytes);
309                csv.reset_for_new_object();
310                result
311            }
312            DataDecoderInner::DelimitedBytes { format, .. } => {
313                let data = std::mem::take(bytes);
314                if data.is_empty() {
317                    Ok(None)
318                } else {
319                    format.decode(data)
320                }
321            }
322            _ => Ok(None),
323        };
324        Ok(result)
325    }
326
327    pub fn log_errors(&self, n: usize) {
328        self.metrics.count_errors(&self.inner, n);
329    }
330
331    pub fn log_successes(&self, n: usize) {
332        self.metrics.count_successes(&self.inner, n);
333    }
334}
335
336async fn get_decoder(
337    encoding: DataEncoding,
338    debug_name: &str,
339    is_connection_delimited: bool,
343    metrics: DecodeMetricDefs,
344    storage_configuration: &StorageConfiguration,
345) -> Result<DataDecoder, CsrConnectError> {
346    let decoder = match encoding {
347        DataEncoding::Avro(AvroEncoding {
348            schema,
349            csr_connection,
350            confluent_wire_format,
351        }) => {
352            let csr_client = match csr_connection {
353                None => None,
354                Some(csr_connection) => {
355                    let csr_client = csr_connection
356                        .connect(storage_configuration, InTask::Yes)
357                        .await?;
358                    Some(csr_client)
359                }
360            };
361            let state = avro::AvroDecoderState::new(
362                &schema,
363                csr_client,
364                debug_name.to_string(),
365                confluent_wire_format,
366            )
367            .expect("Failed to create avro decoder, even though we validated ccsr client creation in purification.");
368            DataDecoder {
369                inner: DataDecoderInner::Avro(state),
370                metrics,
371            }
372        }
373        DataEncoding::Text
374        | DataEncoding::Bytes
375        | DataEncoding::Json
376        | DataEncoding::Protobuf(_)
377        | DataEncoding::Regex(_) => {
378            let after_delimiting = match encoding {
379                DataEncoding::Regex(RegexEncoding { regex }) => {
380                    PreDelimitedFormat::Regex(regex.regex, Default::default())
381                }
382                DataEncoding::Protobuf(encoding) => {
383                    PreDelimitedFormat::Protobuf(ProtobufDecoderState::new(encoding).expect(
384                        "Failed to create protobuf decoder, even though we validated ccsr \
385                                    client creation in purification.",
386                    ))
387                }
388                DataEncoding::Bytes => PreDelimitedFormat::Bytes,
389                DataEncoding::Json => PreDelimitedFormat::Json,
390                DataEncoding::Text => PreDelimitedFormat::Text,
391                _ => unreachable!(),
392            };
393            let inner = if is_connection_delimited {
394                DataDecoderInner::PreDelimited(after_delimiting)
395            } else {
396                DataDecoderInner::DelimitedBytes {
397                    delimiter: b'\n',
398                    format: after_delimiting,
399                }
400            };
401            DataDecoder { inner, metrics }
402        }
403        DataEncoding::Csv(enc) => {
404            let state = CsvDecoderState::new(enc);
405            DataDecoder {
406                inner: DataDecoderInner::Csv(state),
407                metrics,
408            }
409        }
410    };
411    Ok(decoder)
412}
413
414async fn decode_delimited(
415    decoder: &mut DataDecoder,
416    buf: &[u8],
417) -> Result<Result<Option<Row>, DecodeError>, CsrConnectError> {
418    let mut remaining_buf = buf;
419    let value = decoder.next(&mut remaining_buf).await?;
420
421    let result = match value {
422        Ok(value) => {
423            if remaining_buf.is_empty() {
424                match value {
425                    Some(value) => Ok(Some(value)),
426                    None => decoder.eof(&mut remaining_buf)?,
427                }
428            } else {
429                Err(DecodeErrorKind::Text(
430                    format!("Unexpected bytes remaining for decoded value: {remaining_buf:?}")
431                        .into(),
432                ))
433            }
434        }
435        Err(err) => Err(err),
436    };
437
438    Ok(result.map_err(|inner| DecodeError {
439        kind: inner,
440        raw: buf.to_vec(),
441    }))
442}
443
444pub fn render_decode_delimited<G: Scope, FromTime: Timestamp>(
457    input: &Collection<G, SourceOutput<FromTime>, Diff>,
458    key_encoding: Option<DataEncoding>,
459    value_encoding: DataEncoding,
460    debug_name: String,
461    metrics: DecodeMetricDefs,
462    storage_configuration: StorageConfiguration,
463) -> (
464    Collection<G, DecodeResult<FromTime>, Diff>,
465    Stream<G, HealthStatusMessage>,
466) {
467    let op_name = format!(
468        "{}{}DecodeDelimited",
469        key_encoding
470            .as_ref()
471            .map(|key_encoding| key_encoding.op_name())
472            .unwrap_or(""),
473        value_encoding.op_name()
474    );
475    let dist = |(x, _, _): &(SourceOutput<FromTime>, _, _)| x.value.hashed();
476
477    let mut builder = AsyncOperatorBuilder::new(op_name, input.scope());
478
479    let (output_handle, output) = builder.new_output::<CapacityContainerBuilder<_>>();
480    let mut input = builder.new_input_for(&input.inner, Exchange::new(dist), &output_handle);
481
482    let (_, transient_errors) = builder.build_fallible(move |caps| {
483        Box::pin(async move {
484            let [cap_set]: &mut [_; 1] = caps.try_into().unwrap();
485
486            let mut key_decoder = match key_encoding {
487                Some(encoding) => Some(
488                    get_decoder(
489                        encoding,
490                        &debug_name,
491                        true,
492                        metrics.clone(),
493                        &storage_configuration,
494                    )
495                    .await?,
496                ),
497                None => None,
498            };
499
500            let mut value_decoder = get_decoder(
501                value_encoding,
502                &debug_name,
503                true,
504                metrics,
505                &storage_configuration,
506            )
507            .await?;
508
509            let mut output_container = Vec::new();
510
511            while let Some(event) = input.next().await {
512                match event {
513                    AsyncEvent::Data(cap, data) => {
514                        let mut n_errors = 0;
515                        let mut n_successes = 0;
516                        for (output, ts, diff) in data.iter() {
517                            let key_buf = match output.key.unpack_first() {
518                                Datum::Bytes(buf) => Some(buf),
519                                Datum::Null => None,
520                                d => unreachable!("invalid datum: {d}"),
521                            };
522
523                            let key = match key_decoder.as_mut().zip(key_buf) {
524                                Some((decoder, buf)) => {
525                                    decode_delimited(decoder, buf).await?.transpose()
526                                }
527                                None => None,
528                            };
529
530                            let value = match output.value.unpack_first() {
531                                Datum::Bytes(buf) => {
532                                    decode_delimited(&mut value_decoder, buf).await?.transpose()
533                                }
534                                Datum::Null => None,
535                                d => unreachable!("invalid datum: {d}"),
536                            };
537
538                            if matches!(&key, Some(Err(_))) || matches!(&value, Some(Err(_))) {
539                                n_errors += 1;
540                            } else if matches!(&value, Some(Ok(_))) {
541                                n_successes += 1;
542                            }
543
544                            let result = DecodeResult {
545                                key,
546                                value,
547                                metadata: output.metadata.clone(),
548                                from_time: output.from_time.clone(),
549                            };
550                            output_container.push((result, ts.clone(), *diff));
551                        }
552
553                        if n_errors > 0 {
555                            value_decoder.log_errors(n_errors);
556                        }
557                        if n_successes > 0 {
558                            value_decoder.log_successes(n_successes);
559                        }
560
561                        output_handle.give_container(&cap, &mut output_container);
562                    }
563                    AsyncEvent::Progress(frontier) => cap_set.downgrade(frontier.iter()),
564                }
565            }
566
567            Ok(())
568        })
569    });
570
571    let health = transient_errors.map(|err: Rc<CsrConnectError>| {
572        let halt_status = HealthStatusUpdate::halting(err.display_with_causes().to_string(), None);
573        HealthStatusMessage {
574            id: None,
575            namespace: if matches!(&*err, CsrConnectError::Ssh(_)) {
576                StatusNamespace::Ssh
577            } else {
578                StatusNamespace::Decode
579            },
580            update: halt_status,
581        }
582    });
583
584    (output.as_collection(), health)
585}