1use std::cell::RefCell;
17use std::collections::VecDeque;
18use std::rc::Rc;
19use std::time::Duration;
20
21use differential_dataflow::capture::{Message, Progress};
22use differential_dataflow::{AsCollection, Hashable, VecCollection};
23use futures::StreamExt;
24use mz_ore::error::ErrorExt;
25use mz_ore::future::InTask;
26use mz_repr::{Datum, Diff, Row};
27use mz_storage_types::configuration::StorageConfiguration;
28use mz_storage_types::errors::{CsrConnectError, DecodeError, DecodeErrorKind};
29use mz_storage_types::sources::encoding::{AvroEncoding, DataEncoding, RegexEncoding};
30use mz_timely_util::builder_async::{
31 Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
32};
33use regex::Regex;
34use timely::container::CapacityContainerBuilder;
35use timely::dataflow::channels::pact::Exchange;
36use timely::dataflow::operators::{Map, Operator};
37use timely::dataflow::{Scope, Stream};
38use timely::progress::Timestamp;
39use timely::scheduling::SyncActivator;
40use tracing::error;
41
42use crate::decode::avro::AvroDecoderState;
43use crate::decode::csv::CsvDecoderState;
44use crate::decode::protobuf::ProtobufDecoderState;
45use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
46use crate::metrics::decode::DecodeMetricDefs;
47use crate::source::types::{DecodeResult, SourceOutput};
48
49mod avro;
50mod csv;
51mod protobuf;
52
/// Unpacks a collection of decoded CDCv2 records into a collection of rows.
///
/// Each successfully decoded input value is expected to be a row whose first two
/// datums are either `(List(updates), Null)` or `(Null, List(progress))`:
///
/// * `updates` is a list of `[data, time, diff]` lists, where `data` is a list of
///   datums packed into a `Row`, and `time`/`diff` are `int64`s.
/// * `progress` is a list `[lower, upper, counts]`; `lower` and `upper` are lists
///   of `int64` times and `counts` is a list of `[time, count]` pairs.
///
/// Values that failed to decode are logged and skipped; records without a value
/// are skipped silently. Input is exchanged by the hash of the record key.
///
/// Parsed messages are queued on a shared `VecDeque` by a sink operator and drained
/// by a `differential_dataflow::capture::source` operator, which the sink activates
/// after each batch once the source has registered its activator.
///
/// The token returned by the capture source is held by an async operator whose body
/// never completes on its own. The returned [`PressOnDropButton`] is that operator's
/// button, configured to be pressed on drop.
///
/// # Panics
///
/// Panics on malformed input: missing datums (`unwrap`), datums of the wrong type
/// (`unwrap_list`/`unwrap_int64`), negative times or counts (`expect("non-negative")`),
/// or any other shape of the first two datums (`unreachable!("invalid input")`).
/// Also panics if activating the capture source fails (`activate().unwrap()`).
pub fn render_decode_cdcv2<G: Scope<Timestamp = mz_repr::Timestamp>, FromTime: Timestamp>(
    input: &VecCollection<G, DecodeResult<FromTime>, Diff>,
) -> (VecCollection<G, Row, Diff>, PressOnDropButton) {
    // Queue shared between the unpacking sink (producer) and the capture source (consumer).
    let channel_rx = Rc::new(RefCell::new(VecDeque::new()));
    // Filled in by the capture source's constructor closure; read by the sink to wake it.
    let activator_set: Rc<RefCell<Option<SyncActivator>>> = Rc::new(RefCell::new(None));

    // Scratch row reused to pack each update's data.
    let mut row_buf = Row::default();
    let channel_tx = Rc::clone(&channel_rx);
    let activator_get = Rc::clone(&activator_set);
    let pact = Exchange::new(|(x, _, _): &(DecodeResult<FromTime>, _, _)| x.key.hashed());
    input.inner.sink(pact, "CDCv2Unpack", move |(input, _)| {
        input.for_each(|_time, data| {
            for (row, _time, _diff) in data.drain(..) {
                let mut record = match &row.value {
                    Some(Ok(row)) => row.iter(),
                    Some(Err(err)) => {
                        error!("Ignoring errored record: {err}");
                        continue;
                    }
                    None => continue,
                };
                let message = match (record.next().unwrap(), record.next().unwrap()) {
                    // An updates message: a list of (data, time, diff) triples.
                    (Datum::List(datum_updates), Datum::Null) => {
                        let mut updates = vec![];
                        for update in datum_updates.iter() {
                            let mut update = update.unwrap_list().iter();
                            let data = update.next().unwrap().unwrap_list();
                            let time = update.next().unwrap().unwrap_int64();
                            let diff = Diff::from(update.next().unwrap().unwrap_int64());

                            row_buf.packer().extend(data);
                            let data = row_buf.clone();
                            let time = u64::try_from(time).expect("non-negative");
                            let time = mz_repr::Timestamp::from(time);
                            updates.push((data, time, diff));
                        }
                        Message::Updates(updates)
                    }
                    // A progress message: lower frontier, upper frontier, per-time counts.
                    (Datum::Null, Datum::List(progress)) => {
                        let mut progress = progress.iter();
                        let mut lower = vec![];
                        for time in progress.next().unwrap().unwrap_list() {
                            let time = u64::try_from(time.unwrap_int64()).expect("non-negative");
                            lower.push(mz_repr::Timestamp::from(time));
                        }
                        let mut upper = vec![];
                        for time in progress.next().unwrap().unwrap_list() {
                            let time = u64::try_from(time.unwrap_int64()).expect("non-negative");
                            upper.push(mz_repr::Timestamp::from(time));
                        }
                        let mut counts = vec![];
                        for pair in progress.next().unwrap().unwrap_list() {
                            let mut pair = pair.unwrap_list().iter();
                            let time = pair.next().unwrap().unwrap_int64();
                            let count = pair.next().unwrap().unwrap_int64();

                            let time = u64::try_from(time).expect("non-negative");
                            let count = usize::try_from(count).expect("non-negative");
                            counts.push((mz_repr::Timestamp::from(time), count));
                        }
                        let progress = Progress {
                            lower,
                            upper,
                            counts,
                        };
                        Message::Progress(progress)
                    }
                    _ => unreachable!("invalid input"),
                };
                channel_tx.borrow_mut().push_back(message);
            }
        });
        // Wake the capture source so it drains the newly queued messages. This is a
        // no-op until the source's constructor closure has stored its activator.
        if let Some(activator) = activator_get.borrow_mut().as_mut() {
            activator.activate().unwrap()
        }
    });

    /// Adapts the shared queue into an iterator that pops from the front; it returns
    /// `None` whenever the queue is currently empty.
    struct VdIterator<T>(Rc<RefCell<VecDeque<T>>>);
    impl<T> Iterator for VdIterator<T> {
        type Item = T;
        fn next(&mut self) -> Option<T> {
            self.0.borrow_mut().pop_front()
        }
    }
    let (token, stream) = differential_dataflow::capture::source::build(input.scope(), move |ac| {
        // Hand the source's activator to the sink above.
        *activator_set.borrow_mut() = Some(ac);
        // Bound how long a single pull from the queue may run before yielding.
        YieldingIter::new_from(VdIterator(channel_rx), Duration::from_millis(10))
    });

    let builder = AsyncOperatorBuilder::new("CDCv2-Token".to_owned(), input.scope());
    // This operator exists only to own the capture source's token. Its body never
    // completes on its own, so the token stays alive for as long as the operator runs.
    let button = builder.build(move |_caps| async move {
        let _dd_token = token;
        std::future::pending::<()>().await;
    });
    (stream.as_collection(), button.press_on_drop())
}
162
/// An iterator adapter that hands out items from an inner iterator only until a
/// per-round time budget is spent, and then reports `None` so the consumer stops
/// pulling.
///
/// A round begins on the first call to `next` after construction or after a
/// reset. The round ends, and the timer is cleared, when the budget is exceeded
/// or when the inner iterator returns `None`. The following call to `next`
/// starts a fresh round.
pub struct YieldingIter<I> {
    /// When the current round began; `None` between rounds.
    start: Option<std::time::Instant>,
    /// Maximum time a round may run before `next` reports `None`.
    after: Duration,
    /// The wrapped iterator.
    iter: I,
}

impl<I> YieldingIter<I> {
    /// Wraps `iter` so that each round of pulling lasts at most `yield_after`.
    pub fn new_from(iter: I, yield_after: Duration) -> Self {
        YieldingIter {
            iter,
            after: yield_after,
            start: None,
        }
    }
}

impl<I: Iterator> Iterator for YieldingIter<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        // Lazily stamp the beginning of a round on the first pull after a reset.
        let round_began = *self.start.get_or_insert_with(std::time::Instant::now);
        if round_began.elapsed() > self.after {
            // Budget spent: end this round; the next pull starts a new one.
            self.start = None;
            return None;
        }
        let item = self.iter.next();
        if item.is_none() {
            // Inner iterator has nothing for now: end the round as well.
            self.start = None;
        }
        item
    }
}
203
/// A format that decodes a single chunk of bytes whose boundaries have already
/// been determined, either by the connection or by splitting on a delimiter.
#[derive(Debug)]
pub(crate) enum PreDelimitedFormat {
    /// Emit the chunk unchanged as a single `Datum::Bytes`.
    Bytes,
    /// Interpret the chunk as UTF-8 and emit it as a single `Datum::String`.
    Text,
    /// Parse the chunk as JSON and emit the resulting jsonb row.
    Json,
    /// Match the chunk (as UTF-8) against the regex and emit one datum per capture
    /// group. The `Row` is a scratch buffer reused for packing.
    Regex(Regex, Row),
    /// Decode the chunk with the protobuf decoder state.
    Protobuf(ProtobufDecoderState),
}
215
216impl PreDelimitedFormat {
217 pub fn decode(&mut self, bytes: &[u8]) -> Result<Option<Row>, DecodeErrorKind> {
218 match self {
219 PreDelimitedFormat::Bytes => Ok(Some(Row::pack(Some(Datum::Bytes(bytes))))),
220 PreDelimitedFormat::Json => {
221 let j = mz_repr::adt::jsonb::Jsonb::from_slice(bytes).map_err(|e| {
222 DecodeErrorKind::Bytes(
223 format!("Failed to decode JSON: {}", e.display_with_causes(),).into(),
224 )
225 })?;
226 Ok(Some(j.into_row()))
227 }
228 PreDelimitedFormat::Text => {
229 let s = std::str::from_utf8(bytes)
230 .map_err(|_| DecodeErrorKind::Text("Failed to decode UTF-8".into()))?;
231 Ok(Some(Row::pack(Some(Datum::String(s)))))
232 }
233 PreDelimitedFormat::Regex(regex, row_buf) => {
234 let s = std::str::from_utf8(bytes)
235 .map_err(|_| DecodeErrorKind::Text("Failed to decode UTF-8".into()))?;
236 let captures = match regex.captures(s) {
237 Some(captures) => captures,
238 None => return Ok(None),
239 };
240 row_buf.packer().extend(
241 captures
242 .iter()
243 .skip(1)
244 .map(|c| Datum::from(c.map(|c| c.as_str()))),
245 );
246 Ok(Some(row_buf.clone()))
247 }
248 PreDelimitedFormat::Protobuf(pb) => pb.get_value(bytes).transpose(),
249 }
250 }
251}
252
/// The format-specific state behind a [`DataDecoder`].
#[derive(Debug)]
pub(crate) enum DataDecoderInner {
    /// Avro decoding. Its `decode` is async, and its outer error is propagated as a
    /// `CsrConnectError`.
    Avro(AvroDecoderState),
    /// Splits the input on `delimiter` and decodes each chunk with `format`.
    DelimitedBytes {
        /// Byte that terminates each chunk (`get_decoder` uses `b'\n'`).
        delimiter: u8,
        /// Format applied to each delimited chunk.
        format: PreDelimitedFormat,
    },
    /// CSV decoding, whose state is reset at the end of each input object.
    Csv(CsvDecoderState),

    /// Treats the entire input as a single chunk decoded with the given format.
    PreDelimited(PreDelimitedFormat),
}
264
/// A decoder for one encoding, paired with the metrics used to count its decode
/// successes and errors.
#[derive(Debug)]
struct DataDecoder {
    /// Format-specific decoding state.
    inner: DataDecoderInner,
    /// Metric definitions. `inner` is passed along when counting (presumably to
    /// label counts by format; verify in `DecodeMetricDefs`).
    metrics: DecodeMetricDefs,
}
270
271impl DataDecoder {
272 pub async fn next(
273 &mut self,
274 bytes: &mut &[u8],
275 ) -> Result<Result<Option<Row>, DecodeErrorKind>, CsrConnectError> {
276 let result = match &mut self.inner {
277 DataDecoderInner::DelimitedBytes { delimiter, format } => {
278 match bytes.iter().position(|&byte| byte == *delimiter) {
279 Some(chunk_idx) => {
280 let data = &bytes[0..chunk_idx];
281 *bytes = &bytes[chunk_idx + 1..];
282 format.decode(data)
283 }
284 None => Ok(None),
285 }
286 }
287 DataDecoderInner::Avro(avro) => avro.decode(bytes).await?,
288 DataDecoderInner::Csv(csv) => csv.decode(bytes),
289 DataDecoderInner::PreDelimited(format) => {
290 let result = format.decode(*bytes);
291 *bytes = &[];
292 result
293 }
294 };
295 Ok(result)
296 }
297
298 pub fn eof(
303 &mut self,
304 bytes: &mut &[u8],
305 ) -> Result<Result<Option<Row>, DecodeErrorKind>, CsrConnectError> {
306 let result = match &mut self.inner {
307 DataDecoderInner::Csv(csv) => {
308 let result = csv.decode(bytes);
309 csv.reset_for_new_object();
310 result
311 }
312 DataDecoderInner::DelimitedBytes { format, .. } => {
313 let data = std::mem::take(bytes);
314 if data.is_empty() {
317 Ok(None)
318 } else {
319 format.decode(data)
320 }
321 }
322 _ => Ok(None),
323 };
324 Ok(result)
325 }
326
327 pub fn log_errors(&self, n: usize) {
328 self.metrics.count_errors(&self.inner, n);
329 }
330
331 pub fn log_successes(&self, n: usize) {
332 self.metrics.count_successes(&self.inner, n);
333 }
334}
335
336async fn get_decoder(
337 encoding: DataEncoding,
338 debug_name: &str,
339 is_connection_delimited: bool,
343 metrics: DecodeMetricDefs,
344 storage_configuration: &StorageConfiguration,
345) -> Result<DataDecoder, CsrConnectError> {
346 let decoder = match encoding {
347 DataEncoding::Avro(AvroEncoding {
348 schema,
349 reference_schemas,
350 csr_connection,
351 confluent_wire_format,
352 }) => {
353 let csr_client = match csr_connection {
354 None => None,
355 Some(csr_connection) => {
356 let csr_client = csr_connection
357 .connect(storage_configuration, InTask::Yes)
358 .await?;
359 Some(csr_client)
360 }
361 };
362 let state = avro::AvroDecoderState::new(
363 &schema,
364 &reference_schemas,
365 csr_client,
366 debug_name.to_string(),
367 confluent_wire_format,
368 )
369 .expect("Failed to create avro decoder, even though we validated ccsr client creation in purification.");
370 DataDecoder {
371 inner: DataDecoderInner::Avro(state),
372 metrics,
373 }
374 }
375 DataEncoding::Text
376 | DataEncoding::Bytes
377 | DataEncoding::Json
378 | DataEncoding::Protobuf(_)
379 | DataEncoding::Regex(_) => {
380 let after_delimiting = match encoding {
381 DataEncoding::Regex(RegexEncoding { regex }) => {
382 PreDelimitedFormat::Regex(regex.regex, Default::default())
383 }
384 DataEncoding::Protobuf(encoding) => {
385 PreDelimitedFormat::Protobuf(ProtobufDecoderState::new(encoding).expect(
386 "Failed to create protobuf decoder, even though we validated ccsr \
387 client creation in purification.",
388 ))
389 }
390 DataEncoding::Bytes => PreDelimitedFormat::Bytes,
391 DataEncoding::Json => PreDelimitedFormat::Json,
392 DataEncoding::Text => PreDelimitedFormat::Text,
393 _ => unreachable!(),
394 };
395 let inner = if is_connection_delimited {
396 DataDecoderInner::PreDelimited(after_delimiting)
397 } else {
398 DataDecoderInner::DelimitedBytes {
399 delimiter: b'\n',
400 format: after_delimiting,
401 }
402 };
403 DataDecoder { inner, metrics }
404 }
405 DataEncoding::Csv(enc) => {
406 let state = CsvDecoderState::new(enc);
407 DataDecoder {
408 inner: DataDecoderInner::Csv(state),
409 metrics,
410 }
411 }
412 };
413 Ok(decoder)
414}
415
416async fn decode_delimited(
417 decoder: &mut DataDecoder,
418 buf: &[u8],
419) -> Result<Result<Option<Row>, DecodeError>, CsrConnectError> {
420 let mut remaining_buf = buf;
421 let value = decoder.next(&mut remaining_buf).await?;
422
423 let result = match value {
424 Ok(value) => {
425 if remaining_buf.is_empty() {
426 match value {
427 Some(value) => Ok(Some(value)),
428 None => decoder.eof(&mut remaining_buf)?,
429 }
430 } else {
431 Err(DecodeErrorKind::Text(
432 format!("Unexpected bytes remaining for decoded value: {remaining_buf:?}")
433 .into(),
434 ))
435 }
436 }
437 Err(err) => Err(err),
438 };
439
440 Ok(result.map_err(|inner| DecodeError {
441 kind: inner,
442 raw: buf.to_vec(),
443 }))
444}
445
/// Decodes the key and value bytes of each source record with decoders built for
/// `key_encoding` and `value_encoding`.
///
/// Records are exchanged by the hash of their value. Both decoders are built with
/// `is_connection_delimited = true`, so each key or value datum is decoded as one
/// message (see `decode_delimited`). A key is decoded only when `key_encoding` is
/// `Some` and the key datum is `Bytes`. A `Null` key or value datum decodes to
/// `None`.
///
/// Returns the decoded collection and a health stream. A `CsrConnectError` raised
/// while building a decoder or while decoding makes the async operator fail. That
/// error is reported on the health stream as a halting status, in the `Ssh`
/// namespace for SSH errors and the `Decode` namespace otherwise.
///
/// # Panics
///
/// Panics if a key or value datum is neither `Bytes` nor `Null`.
pub fn render_decode_delimited<G: Scope, FromTime: Timestamp>(
    input: &VecCollection<G, SourceOutput<FromTime>, Diff>,
    key_encoding: Option<DataEncoding>,
    value_encoding: DataEncoding,
    debug_name: String,
    metrics: DecodeMetricDefs,
    storage_configuration: StorageConfiguration,
) -> (
    VecCollection<G, DecodeResult<FromTime>, Diff>,
    Stream<G, HealthStatusMessage>,
) {
    // Operator name: "<key op name><value op name>DecodeDelimited".
    let op_name = format!(
        "{}{}DecodeDelimited",
        key_encoding
            .as_ref()
            .map(|key_encoding| key_encoding.op_name())
            .unwrap_or(""),
        value_encoding.op_name()
    );
    let dist = |(x, _, _): &(SourceOutput<FromTime>, _, _)| x.value.hashed();

    let mut builder = AsyncOperatorBuilder::new(op_name, input.scope());

    let (output_handle, output) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut input = builder.new_input_for(&input.inner, Exchange::new(dist), &output_handle);

    let (_, transient_errors) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            let [cap_set]: &mut [_; 1] = caps.try_into().unwrap();

            // Decoder construction may need to connect to a schema registry, so any
            // connection error fails the operator via `?`.
            let mut key_decoder = match key_encoding {
                Some(encoding) => Some(
                    get_decoder(
                        encoding,
                        &debug_name,
                        true,
                        metrics.clone(),
                        &storage_configuration,
                    )
                    .await?,
                ),
                None => None,
            };

            let mut value_decoder = get_decoder(
                value_encoding,
                &debug_name,
                true,
                metrics,
                &storage_configuration,
            )
            .await?;

            // Reused across batches; handed to `give_container` after each batch.
            let mut output_container = Vec::new();

            while let Some(event) = input.next().await {
                match event {
                    AsyncEvent::Data(cap, data) => {
                        let mut n_errors = 0;
                        let mut n_successes = 0;
                        for (output, ts, diff) in data.iter() {
                            let key_buf = match output.key.unpack_first() {
                                Datum::Bytes(buf) => Some(buf),
                                Datum::Null => None,
                                d => unreachable!("invalid datum: {d}"),
                            };

                            // Decode the key only when both a key decoder and key bytes exist.
                            let key = match key_decoder.as_mut().zip(key_buf) {
                                Some((decoder, buf)) => {
                                    decode_delimited(decoder, buf).await?.transpose()
                                }
                                None => None,
                            };

                            let value = match output.value.unpack_first() {
                                Datum::Bytes(buf) => {
                                    decode_delimited(&mut value_decoder, buf).await?.transpose()
                                }
                                Datum::Null => None,
                                d => unreachable!("invalid datum: {d}"),
                            };

                            // A record counts as an error if either half failed to decode,
                            // and as a success only if it has a successfully decoded value.
                            if matches!(&key, Some(Err(_))) || matches!(&value, Some(Err(_))) {
                                n_errors += 1;
                            } else if matches!(&value, Some(Ok(_))) {
                                n_successes += 1;
                            }

                            let result = DecodeResult {
                                key,
                                value,
                                metadata: output.metadata.clone(),
                                from_time: output.from_time.clone(),
                            };
                            output_container.push((result, ts.clone(), *diff));
                        }

                        // Per-batch counts, including key errors, are recorded through
                        // the value decoder's metrics.
                        if n_errors > 0 {
                            value_decoder.log_errors(n_errors);
                        }
                        if n_successes > 0 {
                            value_decoder.log_successes(n_successes);
                        }

                        output_handle.give_container(&cap, &mut output_container);
                    }
                    AsyncEvent::Progress(frontier) => cap_set.downgrade(frontier.iter()),
                }
            }

            Ok(())
        })
    });

    // Map each operator failure to a halting health status.
    let health = transient_errors.map(|err: Rc<CsrConnectError>| {
        let halt_status = HealthStatusUpdate::halting(err.display_with_causes().to_string(), None);
        HealthStatusMessage {
            id: None,
            namespace: if matches!(&*err, CsrConnectError::Ssh(_)) {
                StatusNamespace::Ssh
            } else {
                StatusNamespace::Decode
            },
            update: halt_status,
        }
    });

    (output.as_collection(), health)
}