1use std::cell::RefCell;
17use std::collections::VecDeque;
18use std::rc::Rc;
19use std::time::Duration;
20
21use differential_dataflow::capture::{Message, Progress};
22use differential_dataflow::{AsCollection, Hashable, VecCollection};
23use futures::StreamExt;
24use mz_ore::error::ErrorExt;
25use mz_ore::future::InTask;
26use mz_repr::{Datum, Diff, Row};
27use mz_storage_types::configuration::StorageConfiguration;
28use mz_storage_types::errors::{CsrConnectError, DecodeError, DecodeErrorKind};
29use mz_storage_types::sources::encoding::{AvroEncoding, DataEncoding, RegexEncoding};
30use mz_timely_util::builder_async::{
31 Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
32};
33use regex::Regex;
34use timely::container::CapacityContainerBuilder;
35use timely::dataflow::channels::pact::Exchange;
36use timely::dataflow::operators::Operator;
37use timely::dataflow::operators::vec::Map;
38use timely::dataflow::{Scope, StreamVec};
39use timely::progress::Timestamp;
40use timely::scheduling::SyncActivator;
41use tracing::error;
42
43use crate::decode::avro::AvroDecoderState;
44use crate::decode::csv::CsvDecoderState;
45use crate::decode::protobuf::ProtobufDecoderState;
46use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
47use crate::metrics::decode::DecodeMetricDefs;
48use crate::source::types::{DecodeResult, SourceOutput};
49
50mod avro;
51mod csv;
52mod protobuf;
53
/// Unpacks CDCv2-encoded records from `input` and reassembles them into a differential
/// collection of `Row`s.
///
/// Each successfully decoded value is expected to be a row whose first two columns encode an
/// enum. A list in the first column (second column null) is a batch of updates. Each update is
/// a list of `(data, time, diff)`. A list in the second column (first column null) is a
/// progress message of `(lower, upper, counts)`. Errored values are logged and skipped, and
/// records without a value are skipped silently.
///
/// The parsed messages are handed to `differential_dataflow::capture::source::build` through
/// a queue shared with a sink operator. Returns the reconstructed collection and a
/// [`PressOnDropButton`] for the operator that holds the capture source's token.
///
/// # Panics
///
/// Panics in the following cases:
/// - a value has fewer than two columns, or the columns do not have the shape described above;
/// - any time or count is negative;
/// - waking the capture source through its activator fails.
pub fn render_decode_cdcv2<G: Scope<Timestamp = mz_repr::Timestamp>, FromTime: Timestamp>(
    input: &VecCollection<G, DecodeResult<FromTime>, Diff>,
) -> (VecCollection<G, Row, Diff>, PressOnDropButton) {
    // Queue of parsed messages: the sink below pushes, the capture source pops. `Rc` is not
    // `Send`, so both ends necessarily live on this worker thread.
    let channel_rx = Rc::new(RefCell::new(VecDeque::new()));
    // Filled in by the capture source's constructor; read by the sink to wake the source.
    let activator_set: Rc<RefCell<Option<SyncActivator>>> = Rc::new(RefCell::new(None));

    // Scratch row reused while re-packing the data of each update.
    let mut row_buf = Row::default();
    let channel_tx = Rc::clone(&channel_rx);
    let activator_get = Rc::clone(&activator_set);
    // Route records between workers by the hash of their decoded key.
    let pact = Exchange::new(|(x, _, _): &(DecodeResult<FromTime>, _, _)| x.key.hashed());
    let input2 = input.inner.clone();
    input2.sink(pact, "CDCv2Unpack", move |(input, _)| {
        input.for_each(|_time, data| {
            for (row, _time, _diff) in data.drain(..) {
                // Only successfully decoded values are unpacked.
                let mut record = match &row.value {
                    Some(Ok(row)) => row.iter(),
                    Some(Err(err)) => {
                        error!("Ignoring errored record: {err}");
                        continue;
                    }
                    None => continue,
                };
                // Exactly one of the first two columns is expected to be a list; which one it
                // is determines whether this is an update batch or a progress message.
                let message = match (record.next().unwrap(), record.next().unwrap()) {
                    (Datum::List(datum_updates), Datum::Null) => {
                        let mut updates = vec![];
                        for update in datum_updates.iter() {
                            // Each update is a list of (data, time, diff).
                            let mut update = update.unwrap_list().iter();
                            let data = update.next().unwrap().unwrap_list();
                            let time = update.next().unwrap().unwrap_int64();
                            let diff = Diff::from(update.next().unwrap().unwrap_int64());

                            // Re-pack the nested list into a standalone row.
                            row_buf.packer().extend(data);
                            let data = row_buf.clone();
                            let time = u64::try_from(time).expect("non-negative");
                            let time = mz_repr::Timestamp::from(time);
                            updates.push((data, time, diff));
                        }
                        Message::Updates(updates)
                    }
                    (Datum::Null, Datum::List(progress)) => {
                        // Progress is a list of (lower times, upper times, (time, count) pairs).
                        let mut progress = progress.iter();
                        let mut lower = vec![];
                        for time in progress.next().unwrap().unwrap_list() {
                            let time = u64::try_from(time.unwrap_int64()).expect("non-negative");
                            lower.push(mz_repr::Timestamp::from(time));
                        }
                        let mut upper = vec![];
                        for time in progress.next().unwrap().unwrap_list() {
                            let time = u64::try_from(time.unwrap_int64()).expect("non-negative");
                            upper.push(mz_repr::Timestamp::from(time));
                        }
                        let mut counts = vec![];
                        for pair in progress.next().unwrap().unwrap_list() {
                            let mut pair = pair.unwrap_list().iter();
                            let time = pair.next().unwrap().unwrap_int64();
                            let count = pair.next().unwrap().unwrap_int64();

                            let time = u64::try_from(time).expect("non-negative");
                            let count = usize::try_from(count).expect("non-negative");
                            counts.push((mz_repr::Timestamp::from(time), count));
                        }
                        let progress = Progress {
                            lower,
                            upper,
                            counts,
                        };
                        Message::Progress(progress)
                    }
                    _ => unreachable!("invalid input"),
                };
                channel_tx.borrow_mut().push_back(message);
            }
        });
        // Wake the capture source (once it has registered its activator) so it drains the queue.
        if let Some(activator) = activator_get.borrow_mut().as_mut() {
            activator.activate().unwrap()
        }
    });

    /// Pops messages from the front of the shared queue; yields `None` whenever it is empty.
    struct VdIterator<T>(Rc<RefCell<VecDeque<T>>>);
    impl<T> Iterator for VdIterator<T> {
        type Item = T;
        fn next(&mut self) -> Option<T> {
            self.0.borrow_mut().pop_front()
        }
    }
    // The capture source registers its activator for the sink above and then reads messages
    // from the queue. `YieldingIter` bounds each round of draining to 10ms.
    let (token, stream) = differential_dataflow::capture::source::build(input.scope(), move |ac| {
        *activator_set.borrow_mut() = Some(ac);
        YieldingIter::new_from(VdIterator(channel_rx), Duration::from_millis(10))
    });

    // An operator whose future never completes, used solely to own the capture source's token.
    // NOTE(review): the returned button presumably shuts this operator down (dropping the
    // token) when it is dropped — confirm against `PressOnDropButton`.
    let builder = AsyncOperatorBuilder::new("CDCv2-Token".to_owned(), input.scope());
    let button = builder.build(move |_caps| async move {
        let _dd_token = token;
        std::future::pending::<()>().await;
    });
    (stream.as_collection(), button.press_on_drop())
}
164
/// An iterator adapter that ends each "round" of iteration once a time budget is spent.
///
/// A round starts on the first call to `next` after construction, or after the previous round
/// ended. The round ends, and `next` returns `None`, in either of two cases: more than the
/// budget has elapsed since the round started (checked before pulling from the wrapped
/// iterator), or the wrapped iterator returns `None`. In both cases the timer is cleared, so
/// the following call to `next` starts a fresh round. The wrapped iterator is polled again in
/// later rounds, so it may still produce items after an earlier `None`.
pub struct YieldingIter<I> {
    /// When the current round began, or `None` if no round is in progress.
    start: Option<std::time::Instant>,
    /// The time budget for a single round.
    after: Duration,
    /// The wrapped iterator.
    iter: I,
}

impl<I> YieldingIter<I> {
    /// Wraps `iter` so that each round of iteration lasts at most roughly `yield_after`.
    pub fn new_from(iter: I, yield_after: Duration) -> Self {
        YieldingIter {
            iter,
            after: yield_after,
            start: None,
        }
    }
}

impl<I: Iterator> Iterator for YieldingIter<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        let round_start = *self.start.get_or_insert_with(std::time::Instant::now);
        let item = if round_start.elapsed() > self.after {
            // Budget exhausted: end the round without touching the wrapped iterator.
            None
        } else {
            self.iter.next()
        };
        if item.is_none() {
            // Either the budget ran out or the source is drained; reset for the next round.
            self.start = None;
        }
        item
    }
}
205
206#[derive(Debug)]
210pub(crate) enum PreDelimitedFormat {
211 Bytes,
212 Text,
213 Json,
214 Regex(Regex, Row),
215 Protobuf(ProtobufDecoderState),
216}
217
218impl PreDelimitedFormat {
219 pub fn decode(&mut self, bytes: &[u8]) -> Result<Option<Row>, DecodeErrorKind> {
220 match self {
221 PreDelimitedFormat::Bytes => Ok(Some(Row::pack(Some(Datum::Bytes(bytes))))),
222 PreDelimitedFormat::Json => {
223 let j = mz_repr::adt::jsonb::Jsonb::from_slice(bytes).map_err(|e| {
224 DecodeErrorKind::Bytes(
225 format!("Failed to decode JSON: {}", e.display_with_causes(),).into(),
226 )
227 })?;
228 Ok(Some(j.into_row()))
229 }
230 PreDelimitedFormat::Text => {
231 let s = std::str::from_utf8(bytes)
232 .map_err(|_| DecodeErrorKind::Text("Failed to decode UTF-8".into()))?;
233 Ok(Some(Row::pack(Some(Datum::String(s)))))
234 }
235 PreDelimitedFormat::Regex(regex, row_buf) => {
236 let s = std::str::from_utf8(bytes)
237 .map_err(|_| DecodeErrorKind::Text("Failed to decode UTF-8".into()))?;
238 let captures = match regex.captures(s) {
239 Some(captures) => captures,
240 None => return Ok(None),
241 };
242 row_buf.packer().extend(
243 captures
244 .iter()
245 .skip(1)
246 .map(|c| Datum::from(c.map(|c| c.as_str()))),
247 );
248 Ok(Some(row_buf.clone()))
249 }
250 PreDelimitedFormat::Protobuf(pb) => pb.get_value(bytes).transpose(),
251 }
252 }
253}
254
255#[derive(Debug)]
256pub(crate) enum DataDecoderInner {
257 Avro(AvroDecoderState),
258 DelimitedBytes {
259 delimiter: u8,
260 format: PreDelimitedFormat,
261 },
262 Csv(CsvDecoderState),
263
264 PreDelimited(PreDelimitedFormat),
265}
266
267#[derive(Debug)]
268struct DataDecoder {
269 inner: DataDecoderInner,
270 metrics: DecodeMetricDefs,
271}
272
273impl DataDecoder {
274 pub async fn next(
275 &mut self,
276 bytes: &mut &[u8],
277 ) -> Result<Result<Option<Row>, DecodeErrorKind>, CsrConnectError> {
278 let result = match &mut self.inner {
279 DataDecoderInner::DelimitedBytes { delimiter, format } => {
280 match bytes.iter().position(|&byte| byte == *delimiter) {
281 Some(chunk_idx) => {
282 let data = &bytes[0..chunk_idx];
283 *bytes = &bytes[chunk_idx + 1..];
284 format.decode(data)
285 }
286 None => Ok(None),
287 }
288 }
289 DataDecoderInner::Avro(avro) => avro.decode(bytes).await?,
290 DataDecoderInner::Csv(csv) => csv.decode(bytes),
291 DataDecoderInner::PreDelimited(format) => {
292 let result = format.decode(*bytes);
293 *bytes = &[];
294 result
295 }
296 };
297 Ok(result)
298 }
299
300 pub fn eof(
305 &mut self,
306 bytes: &mut &[u8],
307 ) -> Result<Result<Option<Row>, DecodeErrorKind>, CsrConnectError> {
308 let result = match &mut self.inner {
309 DataDecoderInner::Csv(csv) => {
310 let result = csv.decode(bytes);
311 csv.reset_for_new_object();
312 result
313 }
314 DataDecoderInner::DelimitedBytes { format, .. } => {
315 let data = std::mem::take(bytes);
316 if data.is_empty() {
319 Ok(None)
320 } else {
321 format.decode(data)
322 }
323 }
324 _ => Ok(None),
325 };
326 Ok(result)
327 }
328
329 pub fn log_errors(&self, n: usize) {
330 self.metrics.count_errors(&self.inner, n);
331 }
332
333 pub fn log_successes(&self, n: usize) {
334 self.metrics.count_successes(&self.inner, n);
335 }
336}
337
338async fn get_decoder(
339 encoding: DataEncoding,
340 debug_name: &str,
341 is_connection_delimited: bool,
345 metrics: DecodeMetricDefs,
346 storage_configuration: &StorageConfiguration,
347) -> Result<DataDecoder, CsrConnectError> {
348 let decoder = match encoding {
349 DataEncoding::Avro(AvroEncoding {
350 schema,
351 reference_schemas,
352 csr_connection,
353 confluent_wire_format,
354 }) => {
355 let csr_client = match csr_connection {
356 None => None,
357 Some(csr_connection) => {
358 let csr_client = csr_connection
359 .connect(storage_configuration, InTask::Yes)
360 .await?;
361 Some(csr_client)
362 }
363 };
364 let state = avro::AvroDecoderState::new(
365 &schema,
366 &reference_schemas,
367 csr_client,
368 debug_name.to_string(),
369 confluent_wire_format,
370 )
371 .expect("Failed to create avro decoder, even though we validated ccsr client creation in purification.");
372 DataDecoder {
373 inner: DataDecoderInner::Avro(state),
374 metrics,
375 }
376 }
377 DataEncoding::Text
378 | DataEncoding::Bytes
379 | DataEncoding::Json
380 | DataEncoding::Protobuf(_)
381 | DataEncoding::Regex(_) => {
382 let after_delimiting = match encoding {
383 DataEncoding::Regex(RegexEncoding { regex }) => {
384 PreDelimitedFormat::Regex(regex.regex, Default::default())
385 }
386 DataEncoding::Protobuf(encoding) => {
387 PreDelimitedFormat::Protobuf(ProtobufDecoderState::new(encoding).expect(
388 "Failed to create protobuf decoder, even though we validated ccsr \
389 client creation in purification.",
390 ))
391 }
392 DataEncoding::Bytes => PreDelimitedFormat::Bytes,
393 DataEncoding::Json => PreDelimitedFormat::Json,
394 DataEncoding::Text => PreDelimitedFormat::Text,
395 _ => unreachable!(),
396 };
397 let inner = if is_connection_delimited {
398 DataDecoderInner::PreDelimited(after_delimiting)
399 } else {
400 DataDecoderInner::DelimitedBytes {
401 delimiter: b'\n',
402 format: after_delimiting,
403 }
404 };
405 DataDecoder { inner, metrics }
406 }
407 DataEncoding::Csv(enc) => {
408 let state = CsvDecoderState::new(enc);
409 DataDecoder {
410 inner: DataDecoderInner::Csv(state),
411 metrics,
412 }
413 }
414 };
415 Ok(decoder)
416}
417
418async fn decode_delimited(
419 decoder: &mut DataDecoder,
420 buf: &[u8],
421) -> Result<Result<Option<Row>, DecodeError>, CsrConnectError> {
422 let mut remaining_buf = buf;
423 let value = decoder.next(&mut remaining_buf).await?;
424
425 let result = match value {
426 Ok(value) => {
427 if remaining_buf.is_empty() {
428 match value {
429 Some(value) => Ok(Some(value)),
430 None => decoder.eof(&mut remaining_buf)?,
431 }
432 } else {
433 Err(DecodeErrorKind::Text(
434 format!("Unexpected bytes remaining for decoded value: {remaining_buf:?}")
435 .into(),
436 ))
437 }
438 }
439 Err(err) => Err(err),
440 };
441
442 Ok(result.map_err(|inner| DecodeError {
443 kind: inner,
444 raw: buf.to_vec(),
445 }))
446}
447
/// Decodes the raw key and value bytes of each `SourceOutput` in `input` into a
/// [`DecodeResult`], using an async operator.
///
/// How records are handled:
/// - Records are distributed across workers by the hash of their value.
/// - A key is decoded only when `key_encoding` is `Some` and the key datum is non-null.
/// - A value is decoded when the value datum is non-null.
/// - Both decoders are built with `is_connection_delimited = true`, so each key or value
///   buffer is decoded as exactly one record.
/// - Per-record decode failures are emitted in-band as `Some(Err(_))` in the `DecodeResult`.
///
/// Returns the decoded collection and a stream of health messages. A `CsrConnectError` raised
/// while building a decoder or decoding a record ends the operator's future. It is reported as
/// a halting status, in the `Ssh` namespace for SSH errors and the `Decode` namespace otherwise.
///
/// # Panics
///
/// Panics if a key or value datum is neither `Bytes` nor `Null`.
pub fn render_decode_delimited<G: Scope, FromTime: Timestamp>(
    input: VecCollection<G, SourceOutput<FromTime>, Diff>,
    key_encoding: Option<DataEncoding>,
    value_encoding: DataEncoding,
    debug_name: String,
    metrics: DecodeMetricDefs,
    storage_configuration: StorageConfiguration,
) -> (
    VecCollection<G, DecodeResult<FromTime>, Diff>,
    StreamVec<G, HealthStatusMessage>,
) {
    // Operator name: "<key op name><value op name>DecodeDelimited".
    let op_name = format!(
        "{}{}DecodeDelimited",
        key_encoding
            .as_ref()
            .map(|key_encoding| key_encoding.op_name())
            .unwrap_or(""),
        value_encoding.op_name()
    );
    // Route records between workers by the hash of their raw value.
    let dist = |(x, _, _): &(SourceOutput<FromTime>, _, _)| x.value.hashed();

    let mut builder = AsyncOperatorBuilder::new(op_name, input.scope());

    let (output_handle, output) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut input = builder.new_input_for(input.inner, Exchange::new(dist), &output_handle);

    let (_, transient_errors) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            // One output, hence exactly one capability set.
            let [cap_set]: &mut [_; 1] = caps.try_into().unwrap();

            let mut key_decoder = match key_encoding {
                Some(encoding) => Some(
                    get_decoder(
                        encoding,
                        &debug_name,
                        true,
                        metrics.clone(),
                        &storage_configuration,
                    )
                    .await?,
                ),
                None => None,
            };

            let mut value_decoder = get_decoder(
                value_encoding,
                &debug_name,
                true,
                metrics,
                &storage_configuration,
            )
            .await?;

            // Buffer for outgoing records, handed to `give_container` once per input batch.
            let mut output_container = Vec::new();

            while let Some(event) = input.next().await {
                match event {
                    AsyncEvent::Data(cap, data) => {
                        let mut n_errors = 0;
                        let mut n_successes = 0;
                        for (output, ts, diff) in data.iter() {
                            let key_buf = match output.key.unpack_first() {
                                Datum::Bytes(buf) => Some(buf),
                                Datum::Null => None,
                                d => unreachable!("invalid datum: {d}"),
                            };

                            // Only decode a key if there is both a key decoder and a key.
                            let key = match key_decoder.as_mut().zip(key_buf) {
                                Some((decoder, buf)) => {
                                    decode_delimited(decoder, buf).await?.transpose()
                                }
                                None => None,
                            };

                            let value = match output.value.unpack_first() {
                                Datum::Bytes(buf) => {
                                    decode_delimited(&mut value_decoder, buf).await?.transpose()
                                }
                                Datum::Null => None,
                                d => unreachable!("invalid datum: {d}"),
                            };

                            // A record is an error if either side failed to decode, and a
                            // success only if a value decoded without any error.
                            if matches!(&key, Some(Err(_))) || matches!(&value, Some(Err(_))) {
                                n_errors += 1;
                            } else if matches!(&value, Some(Ok(_))) {
                                n_successes += 1;
                            }

                            let result = DecodeResult {
                                key,
                                value,
                                metadata: output.metadata.clone(),
                                from_time: output.from_time.clone(),
                            };
                            output_container.push((result, ts.clone(), *diff));
                        }

                        // Key and value outcomes are both recorded through the value decoder.
                        if n_errors > 0 {
                            value_decoder.log_errors(n_errors);
                        }
                        if n_successes > 0 {
                            value_decoder.log_successes(n_successes);
                        }

                        output_handle.give_container(&cap, &mut output_container);
                    }
                    // Let the output capabilities follow the input frontier.
                    AsyncEvent::Progress(frontier) => cap_set.downgrade(frontier.iter()),
                }
            }

            Ok(())
        })
    });

    // Each transient error becomes a halting health update.
    let health = transient_errors.map(|err: Rc<CsrConnectError>| {
        let halt_status = HealthStatusUpdate::halting(err.display_with_causes().to_string(), None);
        HealthStatusMessage {
            id: None,
            namespace: if matches!(&*err, CsrConnectError::Ssh(_)) {
                StatusNamespace::Ssh
            } else {
                StatusNamespace::Decode
            },
            update: halt_status,
        }
    });

    (output.as_collection(), health)
}