Skip to main content

mz_storage/
decode.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! This module provides functions that build decoding pipelines from raw source streams.
//!
//! The primary exports are [`render_decode_delimited`] and [`render_decode_cdcv2`]. See their
//! docs for details on how they differ.
15
16use std::cell::RefCell;
17use std::collections::VecDeque;
18use std::rc::Rc;
19use std::time::Duration;
20
21use differential_dataflow::capture::{Message, Progress};
22use differential_dataflow::{AsCollection, Hashable, VecCollection};
23use futures::StreamExt;
24use mz_interchange::avro::WriterSchemaProvider;
25use mz_ore::error::ErrorExt;
26use mz_ore::future::InTask;
27use mz_repr::{Datum, Diff, Row};
28use mz_storage_types::configuration::StorageConfiguration;
29use mz_storage_types::errors::{CsrConnectError, DecodeError, DecodeErrorKind};
30use mz_storage_types::sources::encoding::{
31    AvroEncoding, CsvDecoderState, DataEncoding, RegexEncoding,
32};
33use mz_storage_types::wire_format::WireFormat;
34use mz_timely_util::builder_async::{
35    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
36};
37use regex::Regex;
38use timely::container::CapacityContainerBuilder;
39use timely::dataflow::StreamVec;
40use timely::dataflow::channels::pact::Exchange;
41use timely::dataflow::operators::Operator;
42use timely::dataflow::operators::vec::Map;
43use timely::progress::Timestamp;
44use timely::scheduling::SyncActivator;
45use tracing::error;
46
47use crate::decode::avro::AvroDecoderState;
48use crate::decode::protobuf::ProtobufDecoderState;
49use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
50use crate::metrics::decode::DecodeMetricDefs;
51use crate::source::types::{DecodeResult, SourceOutput};
52
53mod avro;
54mod protobuf;
55
/// Decode delimited CDCv2 messages.
///
/// This not only literally decodes the avro-encoded messages, but
/// also builds a differential dataflow collection that respects the
/// data and progress messages in the underlying CDCv2 stream.
///
/// Each input value is expected to be a row with two columns encoding an enum. Either the first
/// column holds a list of `[data, time, diff]` updates and the second is null, or the first is
/// null and the second holds a `[lower, upper, counts]` progress record.
///
/// Records whose value is an error are logged and skipped. Records without a value are skipped.
/// The input's own timestamps and diffs are ignored: the output's times and diffs come from the
/// CDCv2 payload.
///
/// Internally, a sink operator unpacks each record into a [`Message`] and pushes it onto a queue.
/// A differential-dataflow capture source (`capture::source::build`) drains that queue. The queue
/// is shared through an `Rc`.
///
/// The returned [`PressOnDropButton`] belongs to a dummy operator that holds the capture source's
/// token. This ties the token's lifetime to storage's shutdown mechanism.
///
/// # Panics
///
/// Panics on records that do not match the shape described above, and on negative times or
/// counts.
pub fn render_decode_cdcv2<'scope, FromTime: Timestamp>(
    input: &VecCollection<'scope, mz_repr::Timestamp, DecodeResult<FromTime>, Diff>,
) -> (
    VecCollection<'scope, mz_repr::Timestamp, Row, Diff>,
    PressOnDropButton,
) {
    // Queue of decoded CDCv2 messages. The `CDCv2Unpack` sink fills it and the capture source
    // below drains it. Both ends hold an `Rc` (not `Send`), so they share the same worker thread.
    let channel_rx = Rc::new(RefCell::new(VecDeque::new()));
    // Filled in by the capture source once it is built, so the sink can wake it up.
    let activator_set: Rc<RefCell<Option<SyncActivator>>> = Rc::new(RefCell::new(None));

    // Scratch buffer reused to pack each update's data row.
    let mut row_buf = Row::default();
    let channel_tx = Rc::clone(&channel_rx);
    let activator_get = Rc::clone(&activator_set);
    // Route records to workers by the hash of their key.
    let pact = Exchange::new(|(x, _, _): &(DecodeResult<FromTime>, _, _)| x.key.hashed());
    let input2 = input.inner.clone();
    input2.sink(pact, "CDCv2Unpack", move |(input, _)| {
        input.for_each(|_time, data| {
            // The inputs are rows containing two columns that encode an enum, i.e. only one of them
            // is ever set while the other is unset. This is the convention we follow in our Avro
            // decoder. When the first field of the record is set then we have a data message.
            // Otherwise we have a progress message.
            for (row, _time, _diff) in data.drain(..) {
                let mut record = match &row.value {
                    Some(Ok(row)) => row.iter(),
                    Some(Err(err)) => {
                        error!("Ignoring errored record: {err}");
                        continue;
                    }
                    None => continue,
                };
                let message = match (record.next().unwrap(), record.next().unwrap()) {
                    (Datum::List(datum_updates), Datum::Null) => {
                        // Each update is a `[data, time, diff]` list.
                        let mut updates = vec![];
                        for update in datum_updates.iter() {
                            let mut update = update.unwrap_list().iter();
                            let data = update.next().unwrap().unwrap_list();
                            let time = update.next().unwrap().unwrap_int64();
                            let diff = Diff::from(update.next().unwrap().unwrap_int64());

                            row_buf.packer().extend(data);
                            let data = row_buf.clone();
                            let time = u64::try_from(time).expect("non-negative");
                            let time = mz_repr::Timestamp::from(time);
                            updates.push((data, time, diff));
                        }
                        Message::Updates(updates)
                    }
                    (Datum::Null, Datum::List(progress)) => {
                        // A progress record is `[lower, upper, counts]`. `lower` and `upper` are
                        // lists of times, and `counts` is a list of `[time, count]` pairs.
                        let mut progress = progress.iter();
                        let mut lower = vec![];
                        for time in progress.next().unwrap().unwrap_list() {
                            let time = u64::try_from(time.unwrap_int64()).expect("non-negative");
                            lower.push(mz_repr::Timestamp::from(time));
                        }
                        let mut upper = vec![];
                        for time in progress.next().unwrap().unwrap_list() {
                            let time = u64::try_from(time.unwrap_int64()).expect("non-negative");
                            upper.push(mz_repr::Timestamp::from(time));
                        }
                        let mut counts = vec![];
                        for pair in progress.next().unwrap().unwrap_list() {
                            let mut pair = pair.unwrap_list().iter();
                            let time = pair.next().unwrap().unwrap_int64();
                            let count = pair.next().unwrap().unwrap_int64();

                            let time = u64::try_from(time).expect("non-negative");
                            let count = usize::try_from(count).expect("non-negative");
                            counts.push((mz_repr::Timestamp::from(time), count));
                        }
                        let progress = Progress {
                            lower,
                            upper,
                            counts,
                        };
                        Message::Progress(progress)
                    }
                    _ => unreachable!("invalid input"),
                };
                channel_tx.borrow_mut().push_back(message);
            }
        });
        // Wake the capture source, if it has registered its activator yet, so it drains the queue.
        if let Some(activator) = activator_get.borrow_mut().as_mut() {
            activator.activate().unwrap()
        }
    });

    // Pops messages off the shared queue front-to-back. It returns `None` whenever the queue is
    // empty, but yields again after the sink enqueues more.
    struct VdIterator<T>(Rc<RefCell<VecDeque<T>>>);
    impl<T> Iterator for VdIterator<T> {
        type Item = T;
        fn next(&mut self) -> Option<T> {
            self.0.borrow_mut().pop_front()
        }
    }
    // this operator returns a thread-safe drop-token
    let (token, stream) = differential_dataflow::capture::source::build(input.scope(), move |ac| {
        *activator_set.borrow_mut() = Some(ac);
        // Bound each burst of reading to ~10ms (see `YieldingIter`). Presumably this keeps the
        // source from hogging the worker.
        YieldingIter::new_from(VdIterator(channel_rx), Duration::from_millis(10))
    });

    // The token returned by DD's operator is not compatible with the shutdown mechanism used in
    // storage so we create a dummy operator to hold onto that token.
    let builder = AsyncOperatorBuilder::new("CDCv2-Token".to_owned(), input.scope());
    let button = builder.build(move |_caps| async move {
        let _dd_token = token;
        // Keep this operator around until shutdown
        std::future::pending::<()>().await;
    });
    (stream.as_collection(), button.press_on_drop())
}
169
/// An iterator adaptor that periodically yields control by returning `None`.
///
/// Items are forwarded from the wrapped iterator until one of two things happens:
/// - the wrapped iterator returns `None`, or
/// - more than `after` has elapsed since the first `next` call of the current burst.
///
/// In either case `None` is returned and the burst timer is reset. The adaptor is not fused.
/// Calling `next` again after a `None` starts a new burst and resumes pulling from the wrapped
/// iterator.
pub struct YieldingIter<I> {
    /// The instant at which the current burst of `next` calls began. It is `None` when no burst
    /// is in progress: before the first call, and right after each `None` is returned.
    start: Option<std::time::Instant>,
    /// How long a single burst may run before `None` is returned to yield.
    after: Duration,
    /// The wrapped iterator.
    iter: I,
}

impl<I> YieldingIter<I> {
    /// Wraps `iter`, returning `None` once a burst of `next` calls has run for longer than
    /// `yield_after`.
    pub fn new_from(iter: I, yield_after: Duration) -> Self {
        Self {
            start: None,
            after: yield_after,
            iter,
        }
    }
}

impl<I: Iterator> Iterator for YieldingIter<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        // Start the burst timer on the first call after a reset.
        let start = *self.start.get_or_insert_with(std::time::Instant::now);
        let item = if start.elapsed() > self.after {
            None
        } else {
            self.iter.next()
        };
        if item.is_none() {
            // Whether we ran out of time or the inner iterator ran dry, end the burst so the
            // next call starts a fresh timer.
            self.start = None;
        }
        item
    }
}
210
211// These don't know how to find delimiters --
212// they just go from sequences of vectors of bytes (for which we already know the delimiters)
213// to rows, and can eventually just be planned as `HirRelationExpr::Map`. (TODO)
214#[derive(Debug)]
215pub(crate) enum PreDelimitedFormat {
216    Bytes,
217    Text,
218    Json,
219    Regex(Regex, Row),
220    Protobuf(ProtobufDecoderState),
221}
222
223impl PreDelimitedFormat {
224    pub fn decode(&mut self, bytes: &[u8]) -> Result<Option<Row>, DecodeErrorKind> {
225        match self {
226            PreDelimitedFormat::Bytes => Ok(Some(Row::pack(Some(Datum::Bytes(bytes))))),
227            PreDelimitedFormat::Json => {
228                let j = mz_repr::adt::jsonb::Jsonb::from_slice(bytes).map_err(|e| {
229                    DecodeErrorKind::Bytes(
230                        format!("Failed to decode JSON: {}", e.display_with_causes(),).into(),
231                    )
232                })?;
233                Ok(Some(j.into_row()))
234            }
235            PreDelimitedFormat::Text => {
236                let s = std::str::from_utf8(bytes)
237                    .map_err(|_| DecodeErrorKind::Text("Failed to decode UTF-8".into()))?;
238                Ok(Some(Row::pack(Some(Datum::String(s)))))
239            }
240            PreDelimitedFormat::Regex(regex, row_buf) => {
241                let s = std::str::from_utf8(bytes)
242                    .map_err(|_| DecodeErrorKind::Text("Failed to decode UTF-8".into()))?;
243                let captures = match regex.captures(s) {
244                    Some(captures) => captures,
245                    None => return Ok(None),
246                };
247                row_buf.packer().extend(
248                    captures
249                        .iter()
250                        .skip(1)
251                        .map(|c| Datum::from(c.map(|c| c.as_str()))),
252                );
253                Ok(Some(row_buf.clone()))
254            }
255            PreDelimitedFormat::Protobuf(pb) => pb.get_value(bytes).transpose(),
256        }
257    }
258}
259
260#[derive(Debug)]
261pub(crate) enum DataDecoderInner {
262    Avro(AvroDecoderState),
263    DelimitedBytes {
264        delimiter: u8,
265        format: PreDelimitedFormat,
266    },
267    Csv(CsvDecoderState),
268
269    PreDelimited(PreDelimitedFormat),
270}
271
272#[derive(Debug)]
273struct DataDecoder {
274    inner: DataDecoderInner,
275    metrics: DecodeMetricDefs,
276}
277
278impl DataDecoder {
279    pub async fn next(
280        &mut self,
281        bytes: &mut &[u8],
282    ) -> Result<Result<Option<Row>, DecodeErrorKind>, CsrConnectError> {
283        let result = match &mut self.inner {
284            DataDecoderInner::DelimitedBytes { delimiter, format } => {
285                match bytes.iter().position(|&byte| byte == *delimiter) {
286                    Some(chunk_idx) => {
287                        let data = &bytes[0..chunk_idx];
288                        *bytes = &bytes[chunk_idx + 1..];
289                        format.decode(data)
290                    }
291                    None => Ok(None),
292                }
293            }
294            DataDecoderInner::Avro(avro) => avro.decode(bytes).await?,
295            DataDecoderInner::Csv(csv) => csv.decode(bytes),
296            DataDecoderInner::PreDelimited(format) => {
297                let result = format.decode(*bytes);
298                *bytes = &[];
299                result
300            }
301        };
302        Ok(result)
303    }
304
305    /// Get the next record if it exists, assuming an EOF has occurred.
306    ///
307    /// This is distinct from `next` because, for example, a CSV record should be returned even if it
308    /// does not end in a newline.
309    pub fn eof(
310        &mut self,
311        bytes: &mut &[u8],
312    ) -> Result<Result<Option<Row>, DecodeErrorKind>, CsrConnectError> {
313        let result = match &mut self.inner {
314            DataDecoderInner::Csv(csv) => {
315                let result = csv.decode(bytes);
316                csv.reset_for_new_object();
317                result
318            }
319            DataDecoderInner::DelimitedBytes { format, .. } => {
320                let data = std::mem::take(bytes);
321                // If we hit EOF with no bytes left in the buffer it means the file had a trailing
322                // \n character that can be ignored. Otherwise, we decode the final bytes as normal
323                if data.is_empty() {
324                    Ok(None)
325                } else {
326                    format.decode(data)
327                }
328            }
329            _ => Ok(None),
330        };
331        Ok(result)
332    }
333
334    pub fn log_errors(&self, n: usize) {
335        self.metrics.count_errors(&self.inner, n);
336    }
337
338    pub fn log_successes(&self, n: usize) {
339        self.metrics.count_successes(&self.inner, n);
340    }
341}
342
343async fn get_decoder(
344    encoding: DataEncoding,
345    debug_name: &str,
346    // Information about optional transformations that can be eagerly done.
347    // If the decoding elects to perform them, it should replace this with
348    // `None`.
349    is_connection_delimited: bool,
350    metrics: DecodeMetricDefs,
351    storage_configuration: &StorageConfiguration,
352) -> Result<DataDecoder, CsrConnectError> {
353    let decoder = match encoding {
354        DataEncoding::Avro(AvroEncoding {
355            schema,
356            reference_schemas,
357            wire_format,
358        }) => {
359            // Only the Confluent variant is reachable from SQL today; Glue
360            // is not yet wired to the planner.
361
362            let writer_schemas = match wire_format {
363                WireFormat::None => WriterSchemaProvider::None,
364                WireFormat::Confluent { registry } => {
365                    let client = match registry {
366                        None => None,
367                        Some(csr_connection) => Some(
368                            csr_connection
369                                .connect(storage_configuration, InTask::Yes)
370                                .await?,
371                        ),
372                    };
373                    WriterSchemaProvider::confluent(client)
374                }
375                WireFormat::Glue { registry } => {
376                    let client_with_registry = match registry {
377                        None => None,
378                        Some(glue_connection) => {
379                            let enforce_external_addresses =
380                                mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
381                                    .get(storage_configuration.config_set());
382                            let sdk_config = glue_connection
383                                .aws_connection
384                                .connection
385                                .load_sdk_config(
386                                    &storage_configuration.connection_context,
387                                    glue_connection.aws_connection.connection_id,
388                                    InTask::Yes,
389                                    enforce_external_addresses,
390                                )
391                                .await?;
392                            let client =
393                                mz_aws_glue_schema_registry::ClientConfig::new(sdk_config).build();
394                            Some((client, glue_connection.registry_name.clone()))
395                        }
396                    };
397                    WriterSchemaProvider::glue(client_with_registry)
398                }
399            };
400            let state = avro::AvroDecoderState::new(
401                &schema,
402                &reference_schemas,
403                writer_schemas,
404                debug_name.to_string(),
405            )
406            .expect(
407                "avro decoder construction is infallible once writer-schema setup has succeeded",
408            );
409            DataDecoder {
410                inner: DataDecoderInner::Avro(state),
411                metrics,
412            }
413        }
414        DataEncoding::Text
415        | DataEncoding::Bytes
416        | DataEncoding::Json
417        | DataEncoding::Protobuf(_)
418        | DataEncoding::Regex(_) => {
419            let after_delimiting = match encoding {
420                DataEncoding::Regex(RegexEncoding { regex }) => {
421                    PreDelimitedFormat::Regex(regex.regex, Default::default())
422                }
423                DataEncoding::Protobuf(encoding) => {
424                    PreDelimitedFormat::Protobuf(ProtobufDecoderState::new(encoding).expect(
425                        "Failed to create protobuf decoder, even though we validated ccsr \
426                                    client creation in purification.",
427                    ))
428                }
429                DataEncoding::Bytes => PreDelimitedFormat::Bytes,
430                DataEncoding::Json => PreDelimitedFormat::Json,
431                DataEncoding::Text => PreDelimitedFormat::Text,
432                _ => unreachable!(),
433            };
434            let inner = if is_connection_delimited {
435                DataDecoderInner::PreDelimited(after_delimiting)
436            } else {
437                DataDecoderInner::DelimitedBytes {
438                    delimiter: b'\n',
439                    format: after_delimiting,
440                }
441            };
442            DataDecoder { inner, metrics }
443        }
444        DataEncoding::Csv(enc) => {
445            let state = CsvDecoderState::new(enc);
446            DataDecoder {
447                inner: DataDecoderInner::Csv(state),
448                metrics,
449            }
450        }
451    };
452    Ok(decoder)
453}
454
455async fn decode_delimited(
456    decoder: &mut DataDecoder,
457    buf: &[u8],
458) -> Result<Result<Option<Row>, DecodeError>, CsrConnectError> {
459    let mut remaining_buf = buf;
460    let value = decoder.next(&mut remaining_buf).await?;
461
462    let result = match value {
463        Ok(value) => {
464            if remaining_buf.is_empty() {
465                match value {
466                    Some(value) => Ok(Some(value)),
467                    None => decoder.eof(&mut remaining_buf)?,
468                }
469            } else {
470                Err(DecodeErrorKind::Text(
471                    format!("Unexpected bytes remaining for decoded value: {remaining_buf:?}")
472                        .into(),
473                ))
474            }
475        }
476        Err(err) => Err(err),
477    };
478
479    Ok(result.map_err(|inner| DecodeError {
480        kind: inner,
481        raw: buf.to_vec(),
482    }))
483}
484
/// Decode already delimited records of data.
///
/// Precondition: each record in the stream has at most one key and at most one value.
/// This function is useful mainly for decoding data from systems like Kafka,
/// that have already separated the stream into records/messages/etc. before we
/// decode them.
///
/// Because we expect the upstream connection to have already delimited the data,
/// we return an error here if the decoder does not consume all the bytes. This
/// often lets us, for example, detect when Avro decoding has gone off the rails
/// (which is not always possible otherwise, since often gibberish strings can be interpreted as Avro,
///  so the only signal is how many bytes you managed to decode).
///
/// Each record's key is decoded with `key_encoding` only when both the encoding and non-null
/// key bytes are present. Its value is decoded with `value_encoding`. `metadata` and `from_time`
/// are passed through unchanged. Records are exchanged across workers by the hash of their value.
/// Per-record decode errors are reported in-band, inside each [`DecodeResult`].
///
/// Returns the decoded collection and a stream of health updates. If building a decoder or
/// decoding hits a [`CsrConnectError`], the operator's future returns early and a halting status
/// is emitted. SSH errors use the `Ssh` namespace; all other errors use the `Decode` namespace.
///
/// # Panics
///
/// Panics if a record's key or value datum is neither bytes nor null.
pub fn render_decode_delimited<'scope, T: Timestamp, FromTime: Timestamp>(
    input: VecCollection<'scope, T, SourceOutput<FromTime>, Diff>,
    key_encoding: Option<DataEncoding>,
    value_encoding: DataEncoding,
    debug_name: String,
    metrics: DecodeMetricDefs,
    storage_configuration: StorageConfiguration,
) -> (
    VecCollection<'scope, T, DecodeResult<FromTime>, Diff>,
    StreamVec<'scope, T, HealthStatusMessage>,
) {
    // Name the operator after the key (if any) and value encodings.
    let op_name = format!(
        "{}{}DecodeDelimited",
        key_encoding
            .as_ref()
            .map(|key_encoding| key_encoding.op_name())
            .unwrap_or(""),
        value_encoding.op_name()
    );
    // Distribute records across workers by the hash of their value.
    let dist = |(x, _, _): &(SourceOutput<FromTime>, _, _)| x.value.hashed();

    let mut builder = AsyncOperatorBuilder::new(op_name, input.scope());

    let (output_handle, output) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut input = builder.new_input_for(input.inner, Exchange::new(dist), &output_handle);

    // Errors returned from the async body below surface on `transient_errors`.
    let (_, transient_errors) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            let [cap_set]: &mut [_; 1] = caps.try_into().unwrap();

            // Both decoders are built as connection-delimited (`true`): each key/value datum is
            // exactly one record.
            let mut key_decoder = match key_encoding {
                Some(encoding) => Some(
                    get_decoder(
                        encoding,
                        &debug_name,
                        true,
                        metrics.clone(),
                        &storage_configuration,
                    )
                    .await?,
                ),
                None => None,
            };

            let mut value_decoder = get_decoder(
                value_encoding,
                &debug_name,
                true,
                metrics,
                &storage_configuration,
            )
            .await?;

            // Reused across batches to stage decoded records before handing them to the output.
            let mut output_container = Vec::new();

            while let Some(event) = input.next().await {
                match event {
                    AsyncEvent::Data(cap, data) => {
                        let mut n_errors = 0;
                        let mut n_successes = 0;
                        for (output, ts, diff) in data.iter() {
                            // Keys and values arrive as a single bytes datum, or null when absent.
                            let key_buf = match output.key.unpack_first() {
                                Datum::Bytes(buf) => Some(buf),
                                Datum::Null => None,
                                d => unreachable!("invalid datum: {d}"),
                            };

                            let key = match key_decoder.as_mut().zip(key_buf) {
                                Some((decoder, buf)) => {
                                    decode_delimited(decoder, buf).await?.transpose()
                                }
                                None => None,
                            };

                            let value = match output.value.unpack_first() {
                                Datum::Bytes(buf) => {
                                    decode_delimited(&mut value_decoder, buf).await?.transpose()
                                }
                                Datum::Null => None,
                                d => unreachable!("invalid datum: {d}"),
                            };

                            // A record counts as an error if either side failed. It counts as a
                            // success only if the value decoded.
                            if matches!(&key, Some(Err(_))) || matches!(&value, Some(Err(_))) {
                                n_errors += 1;
                            } else if matches!(&value, Some(Ok(_))) {
                                n_successes += 1;
                            }

                            let result = DecodeResult {
                                key,
                                value,
                                metadata: output.metadata.clone(),
                                from_time: output.from_time.clone(),
                            };
                            output_container.push((result, ts.clone(), *diff));
                        }

                        // Matching historical practice, we only log metrics on the value decoder.
                        if n_errors > 0 {
                            value_decoder.log_errors(n_errors);
                        }
                        if n_successes > 0 {
                            value_decoder.log_successes(n_successes);
                        }

                        output_handle.give_container(&cap, &mut output_container);
                    }
                    AsyncEvent::Progress(frontier) => cap_set.downgrade(frontier.iter()),
                }
            }

            Ok(())
        })
    });

    // Turn each transient error into a halting health status.
    let health = transient_errors.map(|err: Rc<CsrConnectError>| {
        let halt_status = HealthStatusUpdate::halting(err.display_with_causes().to_string(), None);
        HealthStatusMessage {
            id: None,
            namespace: if matches!(&*err, CsrConnectError::Ssh(_)) {
                StatusNamespace::Ssh
            } else {
                StatusNamespace::Decode
            },
            update: halt_status,
        }
    });

    (output.as_collection(), health)
}