1use std::cell::RefCell;
17use std::collections::VecDeque;
18use std::rc::Rc;
19use std::time::Duration;
20
21use differential_dataflow::capture::{Message, Progress};
22use differential_dataflow::{AsCollection, Hashable, VecCollection};
23use futures::StreamExt;
24use mz_ore::error::ErrorExt;
25use mz_ore::future::InTask;
26use mz_repr::{Datum, Diff, Row};
27use mz_storage_types::configuration::StorageConfiguration;
28use mz_storage_types::errors::{CsrConnectError, DecodeError, DecodeErrorKind};
29use mz_storage_types::sources::encoding::{AvroEncoding, DataEncoding, RegexEncoding};
30use mz_timely_util::builder_async::{
31 Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
32};
33use regex::Regex;
34use timely::container::CapacityContainerBuilder;
35use timely::dataflow::StreamVec;
36use timely::dataflow::channels::pact::Exchange;
37use timely::dataflow::operators::Operator;
38use timely::dataflow::operators::vec::Map;
39use timely::progress::Timestamp;
40use timely::scheduling::SyncActivator;
41use tracing::error;
42
43use crate::decode::avro::AvroDecoderState;
44use crate::decode::csv::CsvDecoderState;
45use crate::decode::protobuf::ProtobufDecoderState;
46use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
47use crate::metrics::decode::DecodeMetricDefs;
48use crate::source::types::{DecodeResult, SourceOutput};
49
50mod avro;
51mod csv;
52mod protobuf;
53
/// Unpacks a collection of decoded CDCv2 records into a collection of `Row` updates.
///
/// Each input record's `value` is expected to be a row whose first two datums encode
/// either a batch of updates (`(List, Null)`) or a progress statement (`(Null, List)`).
/// Records whose value is an error are logged and skipped; records with no value are
/// skipped. The unpacked `Message`s are handed, through an in-memory queue, to a source
/// built with `differential_dataflow::capture::source::build`, whose output is returned
/// as a collection.
///
/// Returns that collection together with a `PressOnDropButton` for a helper operator that
/// owns the capture source's token.
///
/// # Panics
///
/// Panics (via `unwrap`, `expect`, `unreachable!`, and presumably the `unwrap_*` datum
/// accessors) if a record does not have the shape described above, or if any timestamp
/// or count is negative.
pub fn render_decode_cdcv2<'scope, FromTime: Timestamp>(
    input: &VecCollection<'scope, mz_repr::Timestamp, DecodeResult<FromTime>, Diff>,
) -> (
    VecCollection<'scope, mz_repr::Timestamp, Row, Diff>,
    PressOnDropButton,
) {
    // Queue of unpacked messages: the `CDCv2Unpack` sink pushes, the capture source pops.
    // `Rc<RefCell<_>>` means both ends live on the same worker thread.
    let channel_rx = Rc::new(RefCell::new(VecDeque::new()));
    // Filled in by the capture source once it is built, so the sink can wake it up.
    let activator_set: Rc<RefCell<Option<SyncActivator>>> = Rc::new(RefCell::new(None));

    // Scratch row reused to pack the data of each update.
    let mut row_buf = Row::default();
    let channel_tx = Rc::clone(&channel_rx);
    let activator_get = Rc::clone(&activator_set);
    // Route records to workers by the hash of their key.
    let pact = Exchange::new(|(x, _, _): &(DecodeResult<FromTime>, _, _)| x.key.hashed());
    let input2 = input.inner.clone();
    input2.sink(pact, "CDCv2Unpack", move |(input, _)| {
        input.for_each(|_time, data| {
            for (row, _time, _diff) in data.drain(..) {
                let mut record = match &row.value {
                    Some(Ok(row)) => row.iter(),
                    Some(Err(err)) => {
                        error!("Ignoring errored record: {err}");
                        continue;
                    }
                    None => continue,
                };
                // The first two datums act as a tagged union: exactly one of them is
                // expected to be a list, the other null.
                let message = match (record.next().unwrap(), record.next().unwrap()) {
                    // A batch of updates: each element is a list `[data, time, diff]`.
                    (Datum::List(datum_updates), Datum::Null) => {
                        let mut updates = vec![];
                        for update in datum_updates.iter() {
                            let mut update = update.unwrap_list().iter();
                            let data = update.next().unwrap().unwrap_list();
                            let time = update.next().unwrap().unwrap_int64();
                            let diff = Diff::from(update.next().unwrap().unwrap_int64());

                            row_buf.packer().extend(data);
                            let data = row_buf.clone();
                            let time = u64::try_from(time).expect("non-negative");
                            let time = mz_repr::Timestamp::from(time);
                            updates.push((data, time, diff));
                        }
                        Message::Updates(updates)
                    }
                    // A progress statement: lists of `lower` times, `upper` times and
                    // `(time, count)` pairs, in that order.
                    (Datum::Null, Datum::List(progress)) => {
                        let mut progress = progress.iter();
                        let mut lower = vec![];
                        for time in progress.next().unwrap().unwrap_list() {
                            let time = u64::try_from(time.unwrap_int64()).expect("non-negative");
                            lower.push(mz_repr::Timestamp::from(time));
                        }
                        let mut upper = vec![];
                        for time in progress.next().unwrap().unwrap_list() {
                            let time = u64::try_from(time.unwrap_int64()).expect("non-negative");
                            upper.push(mz_repr::Timestamp::from(time));
                        }
                        let mut counts = vec![];
                        for pair in progress.next().unwrap().unwrap_list() {
                            let mut pair = pair.unwrap_list().iter();
                            let time = pair.next().unwrap().unwrap_int64();
                            let count = pair.next().unwrap().unwrap_int64();

                            let time = u64::try_from(time).expect("non-negative");
                            let count = usize::try_from(count).expect("non-negative");
                            counts.push((mz_repr::Timestamp::from(time), count));
                        }
                        let progress = Progress {
                            lower,
                            upper,
                            counts,
                        };
                        Message::Progress(progress)
                    }
                    _ => unreachable!("invalid input"),
                };
                channel_tx.borrow_mut().push_back(message);
            }
        });
        // Wake the capture source so it drains the queue.
        // NOTE(review): the result of `activate()` is unwrapped; confirm it cannot fail
        // (e.g. during shutdown after the source has been dropped).
        if let Some(activator) = activator_get.borrow_mut().as_mut() {
            activator.activate().unwrap()
        }
    });

    // Iterator view over the shared queue: yields messages until it is (currently) empty.
    struct VdIterator<T>(Rc<RefCell<VecDeque<T>>>);
    impl<T> Iterator for VdIterator<T> {
        type Item = T;
        fn next(&mut self) -> Option<T> {
            self.0.borrow_mut().pop_front()
        }
    }
    let (token, stream) = differential_dataflow::capture::source::build(input.scope(), move |ac| {
        // Hand the source's activator to the sink above, and read the queue in bursts
        // of at most 10ms per `YieldingIter` pass.
        *activator_set.borrow_mut() = Some(ac);
        YieldingIter::new_from(VdIterator(channel_rx), Duration::from_millis(10))
    });

    // An operator whose only job is to hold the capture source's token; its future never
    // completes. NOTE(review): presumably dropping the returned button shuts this operator
    // down and thereby drops the token — confirm against `PressOnDropButton`.
    let builder = AsyncOperatorBuilder::new("CDCv2-Token".to_owned(), input.scope());
    let button = builder.build(move |_caps| async move {
        let _dd_token = token;
        std::future::pending::<()>().await;
    });
    (stream.as_collection(), button.press_on_drop())
}
167
/// An iterator adaptor that hands out items of `iter` in time-bounded bursts.
///
/// The first call to `next` in a burst records the current `Instant`. Once more than
/// `after` has elapsed since then, `next` returns `None` without touching the inner
/// iterator, and the burst ends so the following call starts a new one. A burst also ends
/// whenever the inner iterator itself returns `None`; the adaptor does not fuse it.
pub struct YieldingIter<I> {
    /// When the current burst started, or `None` if no burst is in progress.
    start: Option<std::time::Instant>,
    /// Maximum duration of a single burst.
    after: Duration,
    /// The wrapped iterator.
    iter: I,
}

impl<I> YieldingIter<I> {
    /// Wraps `iter` so that each burst of `next` calls lasts at most `yield_after`.
    pub fn new_from(iter: I, yield_after: Duration) -> Self {
        YieldingIter {
            iter,
            after: yield_after,
            start: None,
        }
    }
}

impl<I: Iterator> Iterator for YieldingIter<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        let burst_start = *self.start.get_or_insert_with(std::time::Instant::now);
        let item = if burst_start.elapsed() > self.after {
            None
        } else {
            self.iter.next()
        };
        if item.is_none() {
            // Either the time budget is spent or the inner iterator has nothing right now:
            // end the burst so the next call starts a fresh timer.
            self.start = None;
        }
        item
    }
}
208
209#[derive(Debug)]
213pub(crate) enum PreDelimitedFormat {
214 Bytes,
215 Text,
216 Json,
217 Regex(Regex, Row),
218 Protobuf(ProtobufDecoderState),
219}
220
221impl PreDelimitedFormat {
222 pub fn decode(&mut self, bytes: &[u8]) -> Result<Option<Row>, DecodeErrorKind> {
223 match self {
224 PreDelimitedFormat::Bytes => Ok(Some(Row::pack(Some(Datum::Bytes(bytes))))),
225 PreDelimitedFormat::Json => {
226 let j = mz_repr::adt::jsonb::Jsonb::from_slice(bytes).map_err(|e| {
227 DecodeErrorKind::Bytes(
228 format!("Failed to decode JSON: {}", e.display_with_causes(),).into(),
229 )
230 })?;
231 Ok(Some(j.into_row()))
232 }
233 PreDelimitedFormat::Text => {
234 let s = std::str::from_utf8(bytes)
235 .map_err(|_| DecodeErrorKind::Text("Failed to decode UTF-8".into()))?;
236 Ok(Some(Row::pack(Some(Datum::String(s)))))
237 }
238 PreDelimitedFormat::Regex(regex, row_buf) => {
239 let s = std::str::from_utf8(bytes)
240 .map_err(|_| DecodeErrorKind::Text("Failed to decode UTF-8".into()))?;
241 let captures = match regex.captures(s) {
242 Some(captures) => captures,
243 None => return Ok(None),
244 };
245 row_buf.packer().extend(
246 captures
247 .iter()
248 .skip(1)
249 .map(|c| Datum::from(c.map(|c| c.as_str()))),
250 );
251 Ok(Some(row_buf.clone()))
252 }
253 PreDelimitedFormat::Protobuf(pb) => pb.get_value(bytes).transpose(),
254 }
255 }
256}
257
/// The decoding strategy, and its state, behind a `DataDecoder`.
#[derive(Debug)]
pub(crate) enum DataDecoderInner {
    /// Avro decoding; the state is built from the source's `AvroEncoding`.
    Avro(AvroDecoderState),
    /// The input is a stream of messages separated by `delimiter`; each message is
    /// decoded with `format`.
    DelimitedBytes {
        /// Byte separating consecutive messages.
        delimiter: u8,
        /// Format of each individual message.
        format: PreDelimitedFormat,
    },
    /// CSV decoding.
    Csv(CsvDecoderState),

    /// The whole input is one message (already framed by the connection), decoded with
    /// the given format.
    PreDelimited(PreDelimitedFormat),
}
269
270#[derive(Debug)]
271struct DataDecoder {
272 inner: DataDecoderInner,
273 metrics: DecodeMetricDefs,
274}
275
276impl DataDecoder {
277 pub async fn next(
278 &mut self,
279 bytes: &mut &[u8],
280 ) -> Result<Result<Option<Row>, DecodeErrorKind>, CsrConnectError> {
281 let result = match &mut self.inner {
282 DataDecoderInner::DelimitedBytes { delimiter, format } => {
283 match bytes.iter().position(|&byte| byte == *delimiter) {
284 Some(chunk_idx) => {
285 let data = &bytes[0..chunk_idx];
286 *bytes = &bytes[chunk_idx + 1..];
287 format.decode(data)
288 }
289 None => Ok(None),
290 }
291 }
292 DataDecoderInner::Avro(avro) => avro.decode(bytes).await?,
293 DataDecoderInner::Csv(csv) => csv.decode(bytes),
294 DataDecoderInner::PreDelimited(format) => {
295 let result = format.decode(*bytes);
296 *bytes = &[];
297 result
298 }
299 };
300 Ok(result)
301 }
302
303 pub fn eof(
308 &mut self,
309 bytes: &mut &[u8],
310 ) -> Result<Result<Option<Row>, DecodeErrorKind>, CsrConnectError> {
311 let result = match &mut self.inner {
312 DataDecoderInner::Csv(csv) => {
313 let result = csv.decode(bytes);
314 csv.reset_for_new_object();
315 result
316 }
317 DataDecoderInner::DelimitedBytes { format, .. } => {
318 let data = std::mem::take(bytes);
319 if data.is_empty() {
322 Ok(None)
323 } else {
324 format.decode(data)
325 }
326 }
327 _ => Ok(None),
328 };
329 Ok(result)
330 }
331
332 pub fn log_errors(&self, n: usize) {
333 self.metrics.count_errors(&self.inner, n);
334 }
335
336 pub fn log_successes(&self, n: usize) {
337 self.metrics.count_successes(&self.inner, n);
338 }
339}
340
341async fn get_decoder(
342 encoding: DataEncoding,
343 debug_name: &str,
344 is_connection_delimited: bool,
348 metrics: DecodeMetricDefs,
349 storage_configuration: &StorageConfiguration,
350) -> Result<DataDecoder, CsrConnectError> {
351 let decoder = match encoding {
352 DataEncoding::Avro(AvroEncoding {
353 schema,
354 reference_schemas,
355 csr_connection,
356 confluent_wire_format,
357 }) => {
358 let csr_client = match csr_connection {
359 None => None,
360 Some(csr_connection) => {
361 let csr_client = csr_connection
362 .connect(storage_configuration, InTask::Yes)
363 .await?;
364 Some(csr_client)
365 }
366 };
367 let state = avro::AvroDecoderState::new(
368 &schema,
369 &reference_schemas,
370 csr_client,
371 debug_name.to_string(),
372 confluent_wire_format,
373 )
374 .expect("Failed to create avro decoder, even though we validated ccsr client creation in purification.");
375 DataDecoder {
376 inner: DataDecoderInner::Avro(state),
377 metrics,
378 }
379 }
380 DataEncoding::Text
381 | DataEncoding::Bytes
382 | DataEncoding::Json
383 | DataEncoding::Protobuf(_)
384 | DataEncoding::Regex(_) => {
385 let after_delimiting = match encoding {
386 DataEncoding::Regex(RegexEncoding { regex }) => {
387 PreDelimitedFormat::Regex(regex.regex, Default::default())
388 }
389 DataEncoding::Protobuf(encoding) => {
390 PreDelimitedFormat::Protobuf(ProtobufDecoderState::new(encoding).expect(
391 "Failed to create protobuf decoder, even though we validated ccsr \
392 client creation in purification.",
393 ))
394 }
395 DataEncoding::Bytes => PreDelimitedFormat::Bytes,
396 DataEncoding::Json => PreDelimitedFormat::Json,
397 DataEncoding::Text => PreDelimitedFormat::Text,
398 _ => unreachable!(),
399 };
400 let inner = if is_connection_delimited {
401 DataDecoderInner::PreDelimited(after_delimiting)
402 } else {
403 DataDecoderInner::DelimitedBytes {
404 delimiter: b'\n',
405 format: after_delimiting,
406 }
407 };
408 DataDecoder { inner, metrics }
409 }
410 DataEncoding::Csv(enc) => {
411 let state = CsvDecoderState::new(enc);
412 DataDecoder {
413 inner: DataDecoderInner::Csv(state),
414 metrics,
415 }
416 }
417 };
418 Ok(decoder)
419}
420
421async fn decode_delimited(
422 decoder: &mut DataDecoder,
423 buf: &[u8],
424) -> Result<Result<Option<Row>, DecodeError>, CsrConnectError> {
425 let mut remaining_buf = buf;
426 let value = decoder.next(&mut remaining_buf).await?;
427
428 let result = match value {
429 Ok(value) => {
430 if remaining_buf.is_empty() {
431 match value {
432 Some(value) => Ok(Some(value)),
433 None => decoder.eof(&mut remaining_buf)?,
434 }
435 } else {
436 Err(DecodeErrorKind::Text(
437 format!("Unexpected bytes remaining for decoded value: {remaining_buf:?}")
438 .into(),
439 ))
440 }
441 }
442 Err(err) => Err(err),
443 };
444
445 Ok(result.map_err(|inner| DecodeError {
446 kind: inner,
447 raw: buf.to_vec(),
448 }))
449}
450
/// Decodes the raw key and value bytes of each `SourceOutput` in `input` using
/// `key_encoding` and `value_encoding`, producing a collection of `DecodeResult`s.
///
/// Both decoders are built with `is_connection_delimited = true`, so each key/value datum
/// is treated as one complete message. A null key or value, or a key when no
/// `key_encoding` is given, produces `None` in the corresponding `DecodeResult` field.
/// Per-record decode failures are emitted as `Some(Err(_))` rather than failing the
/// operator.
///
/// The second returned stream carries a halting `HealthStatusMessage` for each
/// `CsrConnectError` that makes the operator fail. SSH errors go to the `Ssh` namespace;
/// all others go to the `Decode` namespace.
///
/// # Panics
///
/// Panics if a key or value datum is neither `Bytes` nor `Null`.
pub fn render_decode_delimited<'scope, T: Timestamp, FromTime: Timestamp>(
    input: VecCollection<'scope, T, SourceOutput<FromTime>, Diff>,
    key_encoding: Option<DataEncoding>,
    value_encoding: DataEncoding,
    debug_name: String,
    metrics: DecodeMetricDefs,
    storage_configuration: StorageConfiguration,
) -> (
    VecCollection<'scope, T, DecodeResult<FromTime>, Diff>,
    StreamVec<'scope, T, HealthStatusMessage>,
) {
    // `<key op name><value op name>DecodeDelimited`; the key part is empty without a key
    // encoding.
    let op_name = format!(
        "{}{}DecodeDelimited",
        key_encoding
            .as_ref()
            .map(|key_encoding| key_encoding.op_name())
            .unwrap_or(""),
        value_encoding.op_name()
    );
    // Distribute records across workers by the hash of their (still encoded) value.
    let dist = |(x, _, _): &(SourceOutput<FromTime>, _, _)| x.value.hashed();

    let mut builder = AsyncOperatorBuilder::new(op_name, input.scope());

    let (output_handle, output) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut input = builder.new_input_for(input.inner, Exchange::new(dist), &output_handle);

    let (_, transient_errors) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            // A single output means a single capability set.
            let [cap_set]: &mut [_; 1] = caps.try_into().unwrap();

            // Decoders are created inside the operator because creation is async and
            // fallible (for Avro it may connect via the `csr_connection`).
            let mut key_decoder = match key_encoding {
                Some(encoding) => Some(
                    get_decoder(
                        encoding,
                        &debug_name,
                        true,
                        metrics.clone(),
                        &storage_configuration,
                    )
                    .await?,
                ),
                None => None,
            };

            let mut value_decoder = get_decoder(
                value_encoding,
                &debug_name,
                true,
                metrics,
                &storage_configuration,
            )
            .await?;

            // Reused across batches to stage decoded records before emitting them.
            let mut output_container = Vec::new();

            while let Some(event) = input.next().await {
                match event {
                    AsyncEvent::Data(cap, data) => {
                        let mut n_errors = 0;
                        let mut n_successes = 0;
                        for (output, ts, diff) in data.iter() {
                            let key_buf = match output.key.unpack_first() {
                                Datum::Bytes(buf) => Some(buf),
                                Datum::Null => None,
                                d => unreachable!("invalid datum: {d}"),
                            };

                            // Only decode the key when both a key decoder and key bytes exist.
                            let key = match key_decoder.as_mut().zip(key_buf) {
                                Some((decoder, buf)) => {
                                    decode_delimited(decoder, buf).await?.transpose()
                                }
                                None => None,
                            };

                            let value = match output.value.unpack_first() {
                                Datum::Bytes(buf) => {
                                    decode_delimited(&mut value_decoder, buf).await?.transpose()
                                }
                                Datum::Null => None,
                                d => unreachable!("invalid datum: {d}"),
                            };

                            // A failure in either the key or the value counts as one error;
                            // a success requires a successfully decoded value.
                            if matches!(&key, Some(Err(_))) || matches!(&value, Some(Err(_))) {
                                n_errors += 1;
                            } else if matches!(&value, Some(Ok(_))) {
                                n_successes += 1;
                            }

                            let result = DecodeResult {
                                key,
                                value,
                                metadata: output.metadata.clone(),
                                from_time: output.from_time.clone(),
                            };
                            output_container.push((result, ts.clone(), *diff));
                        }

                        // Counts for both key and value are recorded through the value
                        // decoder's metrics.
                        if n_errors > 0 {
                            value_decoder.log_errors(n_errors);
                        }
                        if n_successes > 0 {
                            value_decoder.log_successes(n_successes);
                        }

                        output_handle.give_container(&cap, &mut output_container);
                    }
                    // Follow the input frontier so downstream operators can make progress.
                    AsyncEvent::Progress(frontier) => cap_set.downgrade(frontier.iter()),
                }
            }

            Ok(())
        })
    });

    // Turn each transient error into a halting health status.
    let health = transient_errors.map(|err: Rc<CsrConnectError>| {
        let halt_status = HealthStatusUpdate::halting(err.display_with_causes().to_string(), None);
        HealthStatusMessage {
            id: None,
            namespace: if matches!(&*err, CsrConnectError::Ssh(_)) {
                StatusNamespace::Ssh
            } else {
                StatusNamespace::Decode
            },
            update: halt_status,
        }
    });

    (output.as_collection(), health)
}