1use std::cell::RefCell;
17use std::collections::VecDeque;
18use std::rc::Rc;
19use std::time::Duration;
20
21use differential_dataflow::capture::{Message, Progress};
22use differential_dataflow::{AsCollection, Hashable, VecCollection};
23use futures::StreamExt;
24use mz_interchange::avro::WriterSchemaProvider;
25use mz_ore::error::ErrorExt;
26use mz_ore::future::InTask;
27use mz_repr::{Datum, Diff, Row};
28use mz_storage_types::configuration::StorageConfiguration;
29use mz_storage_types::errors::{CsrConnectError, DecodeError, DecodeErrorKind};
30use mz_storage_types::sources::encoding::{
31 AvroEncoding, CsvDecoderState, DataEncoding, RegexEncoding,
32};
33use mz_storage_types::wire_format::WireFormat;
34use mz_timely_util::builder_async::{
35 Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
36};
37use regex::Regex;
38use timely::container::CapacityContainerBuilder;
39use timely::dataflow::StreamVec;
40use timely::dataflow::channels::pact::Exchange;
41use timely::dataflow::operators::Operator;
42use timely::dataflow::operators::vec::Map;
43use timely::progress::Timestamp;
44use timely::scheduling::SyncActivator;
45use tracing::error;
46
47use crate::decode::avro::AvroDecoderState;
48use crate::decode::protobuf::ProtobufDecoderState;
49use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
50use crate::metrics::decode::DecodeMetricDefs;
51use crate::source::types::{DecodeResult, SourceOutput};
52
53mod avro;
54mod protobuf;
55
56pub fn render_decode_cdcv2<'scope, FromTime: Timestamp>(
62 input: &VecCollection<'scope, mz_repr::Timestamp, DecodeResult<FromTime>, Diff>,
63) -> (
64 VecCollection<'scope, mz_repr::Timestamp, Row, Diff>,
65 PressOnDropButton,
66) {
67 let channel_rx = Rc::new(RefCell::new(VecDeque::new()));
68 let activator_set: Rc<RefCell<Option<SyncActivator>>> = Rc::new(RefCell::new(None));
69
70 let mut row_buf = Row::default();
71 let channel_tx = Rc::clone(&channel_rx);
72 let activator_get = Rc::clone(&activator_set);
73 let pact = Exchange::new(|(x, _, _): &(DecodeResult<FromTime>, _, _)| x.key.hashed());
74 let input2 = input.inner.clone();
75 input2.sink(pact, "CDCv2Unpack", move |(input, _)| {
76 input.for_each(|_time, data| {
77 for (row, _time, _diff) in data.drain(..) {
82 let mut record = match &row.value {
83 Some(Ok(row)) => row.iter(),
84 Some(Err(err)) => {
85 error!("Ignoring errored record: {err}");
86 continue;
87 }
88 None => continue,
89 };
90 let message = match (record.next().unwrap(), record.next().unwrap()) {
91 (Datum::List(datum_updates), Datum::Null) => {
92 let mut updates = vec![];
93 for update in datum_updates.iter() {
94 let mut update = update.unwrap_list().iter();
95 let data = update.next().unwrap().unwrap_list();
96 let time = update.next().unwrap().unwrap_int64();
97 let diff = Diff::from(update.next().unwrap().unwrap_int64());
98
99 row_buf.packer().extend(data);
100 let data = row_buf.clone();
101 let time = u64::try_from(time).expect("non-negative");
102 let time = mz_repr::Timestamp::from(time);
103 updates.push((data, time, diff));
104 }
105 Message::Updates(updates)
106 }
107 (Datum::Null, Datum::List(progress)) => {
108 let mut progress = progress.iter();
109 let mut lower = vec![];
110 for time in progress.next().unwrap().unwrap_list() {
111 let time = u64::try_from(time.unwrap_int64()).expect("non-negative");
112 lower.push(mz_repr::Timestamp::from(time));
113 }
114 let mut upper = vec![];
115 for time in progress.next().unwrap().unwrap_list() {
116 let time = u64::try_from(time.unwrap_int64()).expect("non-negative");
117 upper.push(mz_repr::Timestamp::from(time));
118 }
119 let mut counts = vec![];
120 for pair in progress.next().unwrap().unwrap_list() {
121 let mut pair = pair.unwrap_list().iter();
122 let time = pair.next().unwrap().unwrap_int64();
123 let count = pair.next().unwrap().unwrap_int64();
124
125 let time = u64::try_from(time).expect("non-negative");
126 let count = usize::try_from(count).expect("non-negative");
127 counts.push((mz_repr::Timestamp::from(time), count));
128 }
129 let progress = Progress {
130 lower,
131 upper,
132 counts,
133 };
134 Message::Progress(progress)
135 }
136 _ => unreachable!("invalid input"),
137 };
138 channel_tx.borrow_mut().push_back(message);
139 }
140 });
141 if let Some(activator) = activator_get.borrow_mut().as_mut() {
142 activator.activate().unwrap()
143 }
144 });
145
146 struct VdIterator<T>(Rc<RefCell<VecDeque<T>>>);
147 impl<T> Iterator for VdIterator<T> {
148 type Item = T;
149 fn next(&mut self) -> Option<T> {
150 self.0.borrow_mut().pop_front()
151 }
152 }
153 let (token, stream) = differential_dataflow::capture::source::build(input.scope(), move |ac| {
155 *activator_set.borrow_mut() = Some(ac);
156 YieldingIter::new_from(VdIterator(channel_rx), Duration::from_millis(10))
157 });
158
159 let builder = AsyncOperatorBuilder::new("CDCv2-Token".to_owned(), input.scope());
162 let button = builder.build(move |_caps| async move {
163 let _dd_token = token;
164 std::future::pending::<()>().await;
166 });
167 (stream.as_collection(), button.press_on_drop())
168}
169
/// An iterator adapter that reports `None` once it has been producing items for longer than a
/// configured duration, even if the wrapped iterator still has items.
///
/// The clock starts on the first call to `next` after construction or after any `None`, so the
/// adapter can be polled again later to continue where it left off.
pub struct YieldingIter<I> {
    /// When the current burst of `next` calls began; `None` while no burst is in progress.
    start: Option<std::time::Instant>,
    /// How long a burst may last before `next` reports `None`.
    after: Duration,
    /// The wrapped iterator.
    iter: I,
}

impl<I> YieldingIter<I> {
    /// Wraps `iter` so that it yields (returns `None`) once a burst exceeds `yield_after`.
    pub fn new_from(iter: I, yield_after: Duration) -> Self {
        Self {
            iter,
            after: yield_after,
            start: None,
        }
    }
}

impl<I: Iterator> Iterator for YieldingIter<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        // Begin a new burst if none is in progress.
        let burst_start = *self.start.get_or_insert_with(std::time::Instant::now);
        let item = if burst_start.elapsed() > self.after {
            // Over budget: yield without touching the wrapped iterator.
            None
        } else {
            self.iter.next()
        };
        // Any `None`, whether from the budget or the wrapped iterator, ends the burst.
        if item.is_none() {
            self.start = None;
        }
        item
    }
}
210
211#[derive(Debug)]
215pub(crate) enum PreDelimitedFormat {
216 Bytes,
217 Text,
218 Json,
219 Regex(Regex, Row),
220 Protobuf(ProtobufDecoderState),
221}
222
223impl PreDelimitedFormat {
224 pub fn decode(&mut self, bytes: &[u8]) -> Result<Option<Row>, DecodeErrorKind> {
225 match self {
226 PreDelimitedFormat::Bytes => Ok(Some(Row::pack(Some(Datum::Bytes(bytes))))),
227 PreDelimitedFormat::Json => {
228 let j = mz_repr::adt::jsonb::Jsonb::from_slice(bytes).map_err(|e| {
229 DecodeErrorKind::Bytes(
230 format!("Failed to decode JSON: {}", e.display_with_causes(),).into(),
231 )
232 })?;
233 Ok(Some(j.into_row()))
234 }
235 PreDelimitedFormat::Text => {
236 let s = std::str::from_utf8(bytes)
237 .map_err(|_| DecodeErrorKind::Text("Failed to decode UTF-8".into()))?;
238 Ok(Some(Row::pack(Some(Datum::String(s)))))
239 }
240 PreDelimitedFormat::Regex(regex, row_buf) => {
241 let s = std::str::from_utf8(bytes)
242 .map_err(|_| DecodeErrorKind::Text("Failed to decode UTF-8".into()))?;
243 let captures = match regex.captures(s) {
244 Some(captures) => captures,
245 None => return Ok(None),
246 };
247 row_buf.packer().extend(
248 captures
249 .iter()
250 .skip(1)
251 .map(|c| Datum::from(c.map(|c| c.as_str()))),
252 );
253 Ok(Some(row_buf.clone()))
254 }
255 PreDelimitedFormat::Protobuf(pb) => pb.get_value(bytes).transpose(),
256 }
257 }
258}
259
260#[derive(Debug)]
261pub(crate) enum DataDecoderInner {
262 Avro(AvroDecoderState),
263 DelimitedBytes {
264 delimiter: u8,
265 format: PreDelimitedFormat,
266 },
267 Csv(CsvDecoderState),
268
269 PreDelimited(PreDelimitedFormat),
270}
271
272#[derive(Debug)]
273struct DataDecoder {
274 inner: DataDecoderInner,
275 metrics: DecodeMetricDefs,
276}
277
278impl DataDecoder {
279 pub async fn next(
280 &mut self,
281 bytes: &mut &[u8],
282 ) -> Result<Result<Option<Row>, DecodeErrorKind>, CsrConnectError> {
283 let result = match &mut self.inner {
284 DataDecoderInner::DelimitedBytes { delimiter, format } => {
285 match bytes.iter().position(|&byte| byte == *delimiter) {
286 Some(chunk_idx) => {
287 let data = &bytes[0..chunk_idx];
288 *bytes = &bytes[chunk_idx + 1..];
289 format.decode(data)
290 }
291 None => Ok(None),
292 }
293 }
294 DataDecoderInner::Avro(avro) => avro.decode(bytes).await?,
295 DataDecoderInner::Csv(csv) => csv.decode(bytes),
296 DataDecoderInner::PreDelimited(format) => {
297 let result = format.decode(*bytes);
298 *bytes = &[];
299 result
300 }
301 };
302 Ok(result)
303 }
304
305 pub fn eof(
310 &mut self,
311 bytes: &mut &[u8],
312 ) -> Result<Result<Option<Row>, DecodeErrorKind>, CsrConnectError> {
313 let result = match &mut self.inner {
314 DataDecoderInner::Csv(csv) => {
315 let result = csv.decode(bytes);
316 csv.reset_for_new_object();
317 result
318 }
319 DataDecoderInner::DelimitedBytes { format, .. } => {
320 let data = std::mem::take(bytes);
321 if data.is_empty() {
324 Ok(None)
325 } else {
326 format.decode(data)
327 }
328 }
329 _ => Ok(None),
330 };
331 Ok(result)
332 }
333
334 pub fn log_errors(&self, n: usize) {
335 self.metrics.count_errors(&self.inner, n);
336 }
337
338 pub fn log_successes(&self, n: usize) {
339 self.metrics.count_successes(&self.inner, n);
340 }
341}
342
343async fn get_decoder(
344 encoding: DataEncoding,
345 debug_name: &str,
346 is_connection_delimited: bool,
350 metrics: DecodeMetricDefs,
351 storage_configuration: &StorageConfiguration,
352) -> Result<DataDecoder, CsrConnectError> {
353 let decoder = match encoding {
354 DataEncoding::Avro(AvroEncoding {
355 schema,
356 reference_schemas,
357 wire_format,
358 }) => {
359 let writer_schemas = match wire_format {
360 WireFormat::None => WriterSchemaProvider::None,
361 WireFormat::Confluent { registry } => {
362 let client = match registry {
363 None => None,
364 Some(csr_connection) => Some(
365 csr_connection
366 .connect(storage_configuration, InTask::Yes)
367 .await?,
368 ),
369 };
370 WriterSchemaProvider::confluent(client)
371 }
372 WireFormat::Glue { registry } => {
373 let client_with_registry = match registry {
374 None => None,
375 Some(glue_connection) => {
376 let enforce_external_addresses =
377 mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
378 .get(storage_configuration.config_set());
379 let sdk_config = glue_connection
380 .aws_connection
381 .connection
382 .load_sdk_config(
383 &storage_configuration.connection_context,
384 glue_connection.aws_connection.connection_id,
385 InTask::Yes,
386 enforce_external_addresses,
387 )
388 .await?;
389 let client =
390 mz_aws_glue_schema_registry::ClientConfig::new(sdk_config).build();
391 Some((client, glue_connection.registry_name.clone()))
392 }
393 };
394 WriterSchemaProvider::glue(client_with_registry)
395 }
396 };
397 let state = avro::AvroDecoderState::new(
398 &schema,
399 &reference_schemas,
400 writer_schemas,
401 debug_name.to_string(),
402 )
403 .expect(
404 "avro decoder construction is infallible once writer-schema setup has succeeded",
405 );
406 DataDecoder {
407 inner: DataDecoderInner::Avro(state),
408 metrics,
409 }
410 }
411 DataEncoding::Text
412 | DataEncoding::Bytes
413 | DataEncoding::Json
414 | DataEncoding::Protobuf(_)
415 | DataEncoding::Regex(_) => {
416 let after_delimiting = match encoding {
417 DataEncoding::Regex(RegexEncoding { regex }) => {
418 PreDelimitedFormat::Regex(regex.regex, Default::default())
419 }
420 DataEncoding::Protobuf(encoding) => {
421 PreDelimitedFormat::Protobuf(ProtobufDecoderState::new(encoding).expect(
422 "Failed to create protobuf decoder, even though we validated ccsr \
423 client creation in purification.",
424 ))
425 }
426 DataEncoding::Bytes => PreDelimitedFormat::Bytes,
427 DataEncoding::Json => PreDelimitedFormat::Json,
428 DataEncoding::Text => PreDelimitedFormat::Text,
429 _ => unreachable!(),
430 };
431 let inner = if is_connection_delimited {
432 DataDecoderInner::PreDelimited(after_delimiting)
433 } else {
434 DataDecoderInner::DelimitedBytes {
435 delimiter: b'\n',
436 format: after_delimiting,
437 }
438 };
439 DataDecoder { inner, metrics }
440 }
441 DataEncoding::Csv(enc) => {
442 let state = CsvDecoderState::new(enc);
443 DataDecoder {
444 inner: DataDecoderInner::Csv(state),
445 metrics,
446 }
447 }
448 };
449 Ok(decoder)
450}
451
452async fn decode_delimited(
453 decoder: &mut DataDecoder,
454 buf: &[u8],
455) -> Result<Result<Option<Row>, DecodeError>, CsrConnectError> {
456 let mut remaining_buf = buf;
457 let value = decoder.next(&mut remaining_buf).await?;
458
459 let result = match value {
460 Ok(value) => {
461 if remaining_buf.is_empty() {
462 match value {
463 Some(value) => Ok(Some(value)),
464 None => decoder.eof(&mut remaining_buf)?,
465 }
466 } else {
467 Err(DecodeErrorKind::Text(
468 format!("Unexpected bytes remaining for decoded value: {remaining_buf:?}")
469 .into(),
470 ))
471 }
472 }
473 Err(err) => Err(err),
474 };
475
476 Ok(result.map_err(|inner| DecodeError {
477 kind: inner,
478 raw: buf.to_vec(),
479 }))
480}
481
482pub fn render_decode_delimited<'scope, T: Timestamp, FromTime: Timestamp>(
495 input: VecCollection<'scope, T, SourceOutput<FromTime>, Diff>,
496 key_encoding: Option<DataEncoding>,
497 value_encoding: DataEncoding,
498 debug_name: String,
499 metrics: DecodeMetricDefs,
500 storage_configuration: StorageConfiguration,
501) -> (
502 VecCollection<'scope, T, DecodeResult<FromTime>, Diff>,
503 StreamVec<'scope, T, HealthStatusMessage>,
504) {
505 let op_name = format!(
506 "{}{}DecodeDelimited",
507 key_encoding
508 .as_ref()
509 .map(|key_encoding| key_encoding.op_name())
510 .unwrap_or(""),
511 value_encoding.op_name()
512 );
513 let dist = |(x, _, _): &(SourceOutput<FromTime>, _, _)| x.value.hashed();
514
515 let mut builder = AsyncOperatorBuilder::new(op_name, input.scope());
516
517 let (output_handle, output) = builder.new_output::<CapacityContainerBuilder<_>>();
518 let mut input = builder.new_input_for(input.inner, Exchange::new(dist), &output_handle);
519
520 let (_, transient_errors) = builder.build_fallible(move |caps| {
521 Box::pin(async move {
522 let [cap_set]: &mut [_; 1] = caps.try_into().unwrap();
523
524 let mut key_decoder = match key_encoding {
525 Some(encoding) => Some(
526 get_decoder(
527 encoding,
528 &debug_name,
529 true,
530 metrics.clone(),
531 &storage_configuration,
532 )
533 .await?,
534 ),
535 None => None,
536 };
537
538 let mut value_decoder = get_decoder(
539 value_encoding,
540 &debug_name,
541 true,
542 metrics,
543 &storage_configuration,
544 )
545 .await?;
546
547 let mut output_container = Vec::new();
548
549 while let Some(event) = input.next().await {
550 match event {
551 AsyncEvent::Data(cap, data) => {
552 let mut n_errors = 0;
553 let mut n_successes = 0;
554 for (output, ts, diff) in data.iter() {
555 let key_buf = match output.key.unpack_first() {
556 Datum::Bytes(buf) => Some(buf),
557 Datum::Null => None,
558 d => unreachable!("invalid datum: {d}"),
559 };
560
561 let key = match key_decoder.as_mut().zip(key_buf) {
562 Some((decoder, buf)) => {
563 decode_delimited(decoder, buf).await?.transpose()
564 }
565 None => None,
566 };
567
568 let value = match output.value.unpack_first() {
569 Datum::Bytes(buf) => {
570 decode_delimited(&mut value_decoder, buf).await?.transpose()
571 }
572 Datum::Null => None,
573 d => unreachable!("invalid datum: {d}"),
574 };
575
576 if matches!(&key, Some(Err(_))) || matches!(&value, Some(Err(_))) {
577 n_errors += 1;
578 } else if matches!(&value, Some(Ok(_))) {
579 n_successes += 1;
580 }
581
582 let result = DecodeResult {
583 key,
584 value,
585 metadata: output.metadata.clone(),
586 from_time: output.from_time.clone(),
587 };
588 output_container.push((result, ts.clone(), *diff));
589 }
590
591 if n_errors > 0 {
593 value_decoder.log_errors(n_errors);
594 }
595 if n_successes > 0 {
596 value_decoder.log_successes(n_successes);
597 }
598
599 output_handle.give_container(&cap, &mut output_container);
600 }
601 AsyncEvent::Progress(frontier) => cap_set.downgrade(frontier.iter()),
602 }
603 }
604
605 Ok(())
606 })
607 });
608
609 let health = transient_errors.map(|err: Rc<CsrConnectError>| {
610 let halt_status = HealthStatusUpdate::halting(err.display_with_causes().to_string(), None);
611 HealthStatusMessage {
612 id: None,
613 namespace: if matches!(&*err, CsrConnectError::Ssh(_)) {
614 StatusNamespace::Ssh
615 } else {
616 StatusNamespace::Decode
617 },
618 update: halt_status,
619 }
620 });
621
622 (output.as_collection(), health)
623}