// mz_storage/decode.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! This module provides functions that
11//! build decoding pipelines from raw source streams.
12//!
13//! The primary exports are [`render_decode_delimited`], and
14//! [`render_decode_cdcv2`]. See their docs for more details about their differences.
15
16use std::cell::RefCell;
17use std::collections::VecDeque;
18use std::rc::Rc;
19use std::time::Duration;
20
21use differential_dataflow::capture::{Message, Progress};
22use differential_dataflow::{AsCollection, Hashable, VecCollection};
23use futures::StreamExt;
24use mz_ore::error::ErrorExt;
25use mz_ore::future::InTask;
26use mz_repr::{Datum, Diff, Row};
27use mz_storage_types::configuration::StorageConfiguration;
28use mz_storage_types::errors::{CsrConnectError, DecodeError, DecodeErrorKind};
29use mz_storage_types::sources::encoding::{AvroEncoding, DataEncoding, RegexEncoding};
30use mz_timely_util::builder_async::{
31    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
32};
33use regex::Regex;
34use timely::container::CapacityContainerBuilder;
35use timely::dataflow::channels::pact::Exchange;
36use timely::dataflow::operators::Operator;
37use timely::dataflow::operators::vec::Map;
38use timely::dataflow::{Scope, StreamVec};
39use timely::progress::Timestamp;
40use timely::scheduling::SyncActivator;
41use tracing::error;
42
43use crate::decode::avro::AvroDecoderState;
44use crate::decode::csv::CsvDecoderState;
45use crate::decode::protobuf::ProtobufDecoderState;
46use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
47use crate::metrics::decode::DecodeMetricDefs;
48use crate::source::types::{DecodeResult, SourceOutput};
49
50mod avro;
51mod csv;
52mod protobuf;
53
/// Decode delimited CDCv2 messages.
///
/// This not only literally decodes the avro-encoded messages, but
/// also builds a differential dataflow collection that respects the
/// data and progress messages in the underlying CDCv2 stream.
///
/// Returns the reconstructed collection of rows along with a
/// [`PressOnDropButton`] that keeps the dataflow alive until it is dropped.
pub fn render_decode_cdcv2<G: Scope<Timestamp = mz_repr::Timestamp>, FromTime: Timestamp>(
    input: &VecCollection<G, DecodeResult<FromTime>, Diff>,
) -> (VecCollection<G, Row, Diff>, PressOnDropButton) {
    // Worker-local queue connecting the unpacking sink below to the
    // differential `capture::source` operator that replays the messages.
    let channel_rx = Rc::new(RefCell::new(VecDeque::new()));
    // Slot for the source operator's activator; it is filled in when the
    // source is built, and the sink uses it to wake the source.
    let activator_set: Rc<RefCell<Option<SyncActivator>>> = Rc::new(RefCell::new(None));

    // Reused to pack update rows without reallocating per record.
    let mut row_buf = Row::default();
    let channel_tx = Rc::clone(&channel_rx);
    let activator_get = Rc::clone(&activator_set);
    // Route each record to a worker by the hash of its key.
    let pact = Exchange::new(|(x, _, _): &(DecodeResult<FromTime>, _, _)| x.key.hashed());
    let input2 = input.inner.clone();
    input2.sink(pact, "CDCv2Unpack", move |(input, _)| {
        input.for_each(|_time, data| {
            // The inputs are rows containing two columns that encode an enum, i.e only one of them
            // is ever set while the other is unset. This is the convention we follow in our Avro
            // decoder. When the first field of the record is set then we have a data message.
            // Otherwise we have a progress message.
            for (row, _time, _diff) in data.drain(..) {
                let mut record = match &row.value {
                    Some(Ok(row)) => row.iter(),
                    Some(Err(err)) => {
                        // Errored records are logged and dropped, not propagated.
                        error!("Ignoring errored record: {err}");
                        continue;
                    }
                    None => continue,
                };
                let message = match (record.next().unwrap(), record.next().unwrap()) {
                    // Data message: a list of (data, time, diff) updates.
                    (Datum::List(datum_updates), Datum::Null) => {
                        let mut updates = vec![];
                        for update in datum_updates.iter() {
                            let mut update = update.unwrap_list().iter();
                            let data = update.next().unwrap().unwrap_list();
                            let time = update.next().unwrap().unwrap_int64();
                            let diff = Diff::from(update.next().unwrap().unwrap_int64());

                            row_buf.packer().extend(data);
                            let data = row_buf.clone();
                            let time = u64::try_from(time).expect("non-negative");
                            let time = mz_repr::Timestamp::from(time);
                            updates.push((data, time, diff));
                        }
                        Message::Updates(updates)
                    }
                    // Progress message: lower/upper frontiers plus per-time
                    // update counts.
                    (Datum::Null, Datum::List(progress)) => {
                        let mut progress = progress.iter();
                        let mut lower = vec![];
                        for time in progress.next().unwrap().unwrap_list() {
                            let time = u64::try_from(time.unwrap_int64()).expect("non-negative");
                            lower.push(mz_repr::Timestamp::from(time));
                        }
                        let mut upper = vec![];
                        for time in progress.next().unwrap().unwrap_list() {
                            let time = u64::try_from(time.unwrap_int64()).expect("non-negative");
                            upper.push(mz_repr::Timestamp::from(time));
                        }
                        let mut counts = vec![];
                        for pair in progress.next().unwrap().unwrap_list() {
                            let mut pair = pair.unwrap_list().iter();
                            let time = pair.next().unwrap().unwrap_int64();
                            let count = pair.next().unwrap().unwrap_int64();

                            let time = u64::try_from(time).expect("non-negative");
                            let count = usize::try_from(count).expect("non-negative");
                            counts.push((mz_repr::Timestamp::from(time), count));
                        }
                        let progress = Progress {
                            lower,
                            upper,
                            counts,
                        };
                        Message::Progress(progress)
                    }
                    _ => unreachable!("invalid input"),
                };
                channel_tx.borrow_mut().push_back(message);
            }
        });
        // Wake the downstream source (if it has been built yet) so it drains
        // the queue promptly.
        if let Some(activator) = activator_get.borrow_mut().as_mut() {
            activator.activate().unwrap()
        }
    });

    // Adapter exposing the shared queue as an iterator for `capture::source`.
    struct VdIterator<T>(Rc<RefCell<VecDeque<T>>>);
    impl<T> Iterator for VdIterator<T> {
        type Item = T;
        fn next(&mut self) -> Option<T> {
            self.0.borrow_mut().pop_front()
        }
    }
    // this operator returns a thread-safe drop-token
    let (token, stream) = differential_dataflow::capture::source::build(input.scope(), move |ac| {
        *activator_set.borrow_mut() = Some(ac);
        // Yield control back every 10ms so the source does not monopolize
        // the worker while draining a long queue.
        YieldingIter::new_from(VdIterator(channel_rx), Duration::from_millis(10))
    });

    // The token returned by DD's operator is not compatible with the shutdown mechanism used in
    // storage so we create a dummy operator to hold onto that token.
    let builder = AsyncOperatorBuilder::new("CDCv2-Token".to_owned(), input.scope());
    let button = builder.build(move |_caps| async move {
        let _dd_token = token;
        // Keep this operator around until shutdown
        std::future::pending::<()>().await;
    });
    (stream.as_collection(), button.press_on_drop())
}
164
/// An iterator that yields with a `None` every so often.
///
/// Wrapping a (possibly unbounded) iterator in this adapter guarantees that a
/// consumer polling it in a loop gets control back at least once per
/// configured interval: `next` returns `None` once the interval has elapsed
/// and re-arms the clock on the following call.
pub struct YieldingIter<I> {
    /// The instant the current batch began, or `None` if the clock should be
    /// restarted on the next call to `next`.
    window_start: Option<std::time::Instant>,
    /// How long to keep yielding items before forcing a `None`.
    budget: Duration,
    /// The underlying iterator.
    inner: I,
}

impl<I> YieldingIter<I> {
    /// Construct a yielding iterator from an inter-yield duration.
    pub fn new_from(iter: I, yield_after: Duration) -> Self {
        Self {
            window_start: None,
            budget: yield_after,
            inner: iter,
        }
    }
}

impl<I: Iterator> Iterator for YieldingIter<I> {
    type Item = I::Item;
    fn next(&mut self) -> Option<Self::Item> {
        // Start (or restart) the clock the first time we are polled after a
        // forced yield.
        let started = *self
            .window_start
            .get_or_insert_with(std::time::Instant::now);
        if started.elapsed() > self.budget {
            // Time is up: force a yield and re-arm the clock for next time.
            self.window_start = None;
            return None;
        }
        let item = self.inner.next();
        if item.is_none() {
            // The underlying iterator is (currently) empty; restart the
            // clock on the next poll.
            self.window_start = None;
        }
        item
    }
}
205
// These don't know how to find delimiters --
// they just go from sequences of vectors of bytes (for which we already know the delimiters)
// to rows, and can eventually just be planned as `HirRelationExpr::Map`. (TODO)
/// A format for records whose boundaries are already known.
#[derive(Debug)]
pub(crate) enum PreDelimitedFormat {
    /// Pass the raw bytes through as a single bytes datum.
    Bytes,
    /// Interpret the bytes as a single UTF-8 string datum.
    Text,
    /// Interpret the bytes as a JSON document.
    Json,
    /// Match UTF-8 text against a regex; capture groups become columns.
    /// The `Row` is a reusable packing buffer, not part of the format.
    Regex(Regex, Row),
    /// Decode a protobuf message using the given decoder state.
    Protobuf(ProtobufDecoderState),
}
217
218impl PreDelimitedFormat {
219    pub fn decode(&mut self, bytes: &[u8]) -> Result<Option<Row>, DecodeErrorKind> {
220        match self {
221            PreDelimitedFormat::Bytes => Ok(Some(Row::pack(Some(Datum::Bytes(bytes))))),
222            PreDelimitedFormat::Json => {
223                let j = mz_repr::adt::jsonb::Jsonb::from_slice(bytes).map_err(|e| {
224                    DecodeErrorKind::Bytes(
225                        format!("Failed to decode JSON: {}", e.display_with_causes(),).into(),
226                    )
227                })?;
228                Ok(Some(j.into_row()))
229            }
230            PreDelimitedFormat::Text => {
231                let s = std::str::from_utf8(bytes)
232                    .map_err(|_| DecodeErrorKind::Text("Failed to decode UTF-8".into()))?;
233                Ok(Some(Row::pack(Some(Datum::String(s)))))
234            }
235            PreDelimitedFormat::Regex(regex, row_buf) => {
236                let s = std::str::from_utf8(bytes)
237                    .map_err(|_| DecodeErrorKind::Text("Failed to decode UTF-8".into()))?;
238                let captures = match regex.captures(s) {
239                    Some(captures) => captures,
240                    None => return Ok(None),
241                };
242                row_buf.packer().extend(
243                    captures
244                        .iter()
245                        .skip(1)
246                        .map(|c| Datum::from(c.map(|c| c.as_str()))),
247                );
248                Ok(Some(row_buf.clone()))
249            }
250            PreDelimitedFormat::Protobuf(pb) => pb.get_value(bytes).transpose(),
251        }
252    }
253}
254
/// The per-format decoding state backing a [`DataDecoder`].
#[derive(Debug)]
pub(crate) enum DataDecoderInner {
    /// Avro decoding; may use a schema-registry client for schema lookup.
    Avro(AvroDecoderState),
    /// A pre-delimited format applied to records separated by `delimiter`
    /// within a continuous byte stream.
    DelimitedBytes {
        delimiter: u8,
        format: PreDelimitedFormat,
    },
    /// CSV decoding, which manages its own record/field delimiting.
    Csv(CsvDecoderState),

    /// A format whose records are already delimited by the upstream
    /// connection (one record per message).
    PreDelimited(PreDelimitedFormat),
}
266
/// A decoder for a single data format, bundled with the metrics used to
/// report its successes and failures.
#[derive(Debug)]
struct DataDecoder {
    /// Format-specific decoding state.
    inner: DataDecoderInner,
    /// Metric definitions used by `log_errors`/`log_successes`.
    metrics: DecodeMetricDefs,
}
272
273impl DataDecoder {
274    pub async fn next(
275        &mut self,
276        bytes: &mut &[u8],
277    ) -> Result<Result<Option<Row>, DecodeErrorKind>, CsrConnectError> {
278        let result = match &mut self.inner {
279            DataDecoderInner::DelimitedBytes { delimiter, format } => {
280                match bytes.iter().position(|&byte| byte == *delimiter) {
281                    Some(chunk_idx) => {
282                        let data = &bytes[0..chunk_idx];
283                        *bytes = &bytes[chunk_idx + 1..];
284                        format.decode(data)
285                    }
286                    None => Ok(None),
287                }
288            }
289            DataDecoderInner::Avro(avro) => avro.decode(bytes).await?,
290            DataDecoderInner::Csv(csv) => csv.decode(bytes),
291            DataDecoderInner::PreDelimited(format) => {
292                let result = format.decode(*bytes);
293                *bytes = &[];
294                result
295            }
296        };
297        Ok(result)
298    }
299
300    /// Get the next record if it exists, assuming an EOF has occurred.
301    ///
302    /// This is distinct from `next` because, for example, a CSV record should be returned even if it
303    /// does not end in a newline.
304    pub fn eof(
305        &mut self,
306        bytes: &mut &[u8],
307    ) -> Result<Result<Option<Row>, DecodeErrorKind>, CsrConnectError> {
308        let result = match &mut self.inner {
309            DataDecoderInner::Csv(csv) => {
310                let result = csv.decode(bytes);
311                csv.reset_for_new_object();
312                result
313            }
314            DataDecoderInner::DelimitedBytes { format, .. } => {
315                let data = std::mem::take(bytes);
316                // If we hit EOF with no bytes left in the buffer it means the file had a trailing
317                // \n character that can be ignored. Otherwise, we decode the final bytes as normal
318                if data.is_empty() {
319                    Ok(None)
320                } else {
321                    format.decode(data)
322                }
323            }
324            _ => Ok(None),
325        };
326        Ok(result)
327    }
328
329    pub fn log_errors(&self, n: usize) {
330        self.metrics.count_errors(&self.inner, n);
331    }
332
333    pub fn log_successes(&self, n: usize) {
334        self.metrics.count_successes(&self.inner, n);
335    }
336}
337
/// Build a [`DataDecoder`] for the given `encoding`.
///
/// For Avro encodings this may connect to a schema registry — hence the
/// async, fallible signature; all other formats are constructed locally.
/// `debug_name` is forwarded to the Avro decoder for diagnostics.
async fn get_decoder(
    encoding: DataEncoding,
    debug_name: &str,
    // Whether the upstream connection already delimits individual records.
    // If not, the byte-stream formats below fall back to newline-delimited
    // records.
    is_connection_delimited: bool,
    metrics: DecodeMetricDefs,
    storage_configuration: &StorageConfiguration,
) -> Result<DataDecoder, CsrConnectError> {
    let decoder = match encoding {
        DataEncoding::Avro(AvroEncoding {
            schema,
            reference_schemas,
            csr_connection,
            confluent_wire_format,
        }) => {
            // Connect to the schema registry only when the encoding carries
            // a CSR connection.
            let csr_client = match csr_connection {
                None => None,
                Some(csr_connection) => {
                    let csr_client = csr_connection
                        .connect(storage_configuration, InTask::Yes)
                        .await?;
                    Some(csr_client)
                }
            };
            let state = avro::AvroDecoderState::new(
                &schema,
                &reference_schemas,
                csr_client,
                debug_name.to_string(),
                confluent_wire_format,
            )
            .expect("Failed to create avro decoder, even though we validated ccsr client creation in purification.");
            DataDecoder {
                inner: DataDecoderInner::Avro(state),
                metrics,
            }
        }
        DataEncoding::Text
        | DataEncoding::Bytes
        | DataEncoding::Json
        | DataEncoding::Protobuf(_)
        | DataEncoding::Regex(_) => {
            let after_delimiting = match encoding {
                DataEncoding::Regex(RegexEncoding { regex }) => {
                    PreDelimitedFormat::Regex(regex.regex, Default::default())
                }
                DataEncoding::Protobuf(encoding) => {
                    PreDelimitedFormat::Protobuf(ProtobufDecoderState::new(encoding).expect(
                        "Failed to create protobuf decoder, even though we validated ccsr \
                                    client creation in purification.",
                    ))
                }
                DataEncoding::Bytes => PreDelimitedFormat::Bytes,
                DataEncoding::Json => PreDelimitedFormat::Json,
                DataEncoding::Text => PreDelimitedFormat::Text,
                // The outer match arm restricts which encodings reach here.
                _ => unreachable!(),
            };
            // A delimited connection hands us one record per message;
            // otherwise split the byte stream on newlines.
            let inner = if is_connection_delimited {
                DataDecoderInner::PreDelimited(after_delimiting)
            } else {
                DataDecoderInner::DelimitedBytes {
                    delimiter: b'\n',
                    format: after_delimiting,
                }
            };
            DataDecoder { inner, metrics }
        }
        DataEncoding::Csv(enc) => {
            let state = CsvDecoderState::new(enc);
            DataDecoder {
                inner: DataDecoderInner::Csv(state),
                metrics,
            }
        }
    };
    Ok(decoder)
}
417
418async fn decode_delimited(
419    decoder: &mut DataDecoder,
420    buf: &[u8],
421) -> Result<Result<Option<Row>, DecodeError>, CsrConnectError> {
422    let mut remaining_buf = buf;
423    let value = decoder.next(&mut remaining_buf).await?;
424
425    let result = match value {
426        Ok(value) => {
427            if remaining_buf.is_empty() {
428                match value {
429                    Some(value) => Ok(Some(value)),
430                    None => decoder.eof(&mut remaining_buf)?,
431                }
432            } else {
433                Err(DecodeErrorKind::Text(
434                    format!("Unexpected bytes remaining for decoded value: {remaining_buf:?}")
435                        .into(),
436                ))
437            }
438        }
439        Err(err) => Err(err),
440    };
441
442    Ok(result.map_err(|inner| DecodeError {
443        kind: inner,
444        raw: buf.to_vec(),
445    }))
446}
447
/// Decode already delimited records of data.
///
/// Precondition: each record in the stream has at most one key and at most one value.
/// This function is useful mainly for decoding data from systems like Kafka,
/// that have already separated the stream into records/messages/etc. before we
/// decode them.
///
/// Because we expect the upstream connection to have already delimited the data,
/// we return an error here if the decoder does not consume all the bytes. This
/// often lets us, for example, detect when Avro decoding has gone off the rails
/// (which is not always possible otherwise, since often gibberish strings can be interpreted as Avro,
///  so the only signal is how many bytes you managed to decode).
///
/// Returns the decoded collection plus a stream of health updates that carry
/// halting statuses for transient CSR connection errors.
pub fn render_decode_delimited<G: Scope, FromTime: Timestamp>(
    input: VecCollection<G, SourceOutput<FromTime>, Diff>,
    key_encoding: Option<DataEncoding>,
    value_encoding: DataEncoding,
    debug_name: String,
    metrics: DecodeMetricDefs,
    storage_configuration: StorageConfiguration,
) -> (
    VecCollection<G, DecodeResult<FromTime>, Diff>,
    StreamVec<G, HealthStatusMessage>,
) {
    // Name the operator after the key/value encodings it decodes.
    let op_name = format!(
        "{}{}DecodeDelimited",
        key_encoding
            .as_ref()
            .map(|key_encoding| key_encoding.op_name())
            .unwrap_or(""),
        value_encoding.op_name()
    );
    // Distribute records among workers by the hash of their value.
    let dist = |(x, _, _): &(SourceOutput<FromTime>, _, _)| x.value.hashed();

    let mut builder = AsyncOperatorBuilder::new(op_name, input.scope());

    let (output_handle, output) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut input = builder.new_input_for(input.inner, Exchange::new(dist), &output_handle);

    // `transient_errors` carries any `CsrConnectError` that escapes the
    // operator body; it is translated into health statuses below.
    let (_, transient_errors) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            let [cap_set]: &mut [_; 1] = caps.try_into().unwrap();

            // Decoders are built inside the operator because establishing a
            // CSR connection (for Avro) is async and fallible.
            let mut key_decoder = match key_encoding {
                Some(encoding) => Some(
                    get_decoder(
                        encoding,
                        &debug_name,
                        true,
                        metrics.clone(),
                        &storage_configuration,
                    )
                    .await?,
                ),
                None => None,
            };

            let mut value_decoder = get_decoder(
                value_encoding,
                &debug_name,
                true,
                metrics,
                &storage_configuration,
            )
            .await?;

            let mut output_container = Vec::new();

            while let Some(event) = input.next().await {
                match event {
                    AsyncEvent::Data(cap, data) => {
                        let mut n_errors = 0;
                        let mut n_successes = 0;
                        for (output, ts, diff) in data.iter() {
                            // Keys arrive packed as a single-datum row:
                            // either raw bytes or NULL when absent.
                            let key_buf = match output.key.unpack_first() {
                                Datum::Bytes(buf) => Some(buf),
                                Datum::Null => None,
                                d => unreachable!("invalid datum: {d}"),
                            };

                            // Decode the key only when both a key decoder
                            // and key bytes are present.
                            let key = match key_decoder.as_mut().zip(key_buf) {
                                Some((decoder, buf)) => {
                                    decode_delimited(decoder, buf).await?.transpose()
                                }
                                None => None,
                            };

                            let value = match output.value.unpack_first() {
                                Datum::Bytes(buf) => {
                                    decode_delimited(&mut value_decoder, buf).await?.transpose()
                                }
                                Datum::Null => None,
                                d => unreachable!("invalid datum: {d}"),
                            };

                            // A record counts as an error if either side
                            // failed, and as a success only if the value
                            // decoded to a row.
                            if matches!(&key, Some(Err(_))) || matches!(&value, Some(Err(_))) {
                                n_errors += 1;
                            } else if matches!(&value, Some(Ok(_))) {
                                n_successes += 1;
                            }

                            let result = DecodeResult {
                                key,
                                value,
                                metadata: output.metadata.clone(),
                                from_time: output.from_time.clone(),
                            };
                            output_container.push((result, ts.clone(), *diff));
                        }

                        // Matching historical practice, we only log metrics on the value decoder.
                        if n_errors > 0 {
                            value_decoder.log_errors(n_errors);
                        }
                        if n_successes > 0 {
                            value_decoder.log_successes(n_successes);
                        }

                        output_handle.give_container(&cap, &mut output_container);
                    }
                    AsyncEvent::Progress(frontier) => cap_set.downgrade(frontier.iter()),
                }
            }

            Ok(())
        })
    });

    // Surface transient errors as halting health updates, attributing SSH
    // tunnel failures to their own status namespace.
    let health = transient_errors.map(|err: Rc<CsrConnectError>| {
        let halt_status = HealthStatusUpdate::halting(err.display_with_causes().to_string(), None);
        HealthStatusMessage {
            id: None,
            namespace: if matches!(&*err, CsrConnectError::Ssh(_)) {
                StatusNamespace::Ssh
            } else {
                StatusNamespace::Decode
            },
            update: halt_status,
        }
    });

    (output.as_collection(), health)
}