// mz_storage/src/decode.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! This module provides functions that
//! build decoding pipelines from raw source streams.
//!
//! The primary exports are [`render_decode_delimited`], and
//! [`render_decode_cdcv2`]. See their docs for more details about their differences.
16use std::cell::RefCell;
17use std::collections::VecDeque;
18use std::rc::Rc;
19use std::time::Duration;
20
21use differential_dataflow::capture::{Message, Progress};
22use differential_dataflow::{AsCollection, Hashable, VecCollection};
23use futures::StreamExt;
24use mz_ore::error::ErrorExt;
25use mz_ore::future::InTask;
26use mz_repr::{Datum, Diff, Row};
27use mz_storage_types::configuration::StorageConfiguration;
28use mz_storage_types::errors::{CsrConnectError, DecodeError, DecodeErrorKind};
29use mz_storage_types::sources::encoding::{AvroEncoding, DataEncoding, RegexEncoding};
30use mz_timely_util::builder_async::{
31    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
32};
33use regex::Regex;
34use timely::container::CapacityContainerBuilder;
35use timely::dataflow::StreamVec;
36use timely::dataflow::channels::pact::Exchange;
37use timely::dataflow::operators::Operator;
38use timely::dataflow::operators::vec::Map;
39use timely::progress::Timestamp;
40use timely::scheduling::SyncActivator;
41use tracing::error;
42
43use crate::decode::avro::AvroDecoderState;
44use crate::decode::csv::CsvDecoderState;
45use crate::decode::protobuf::ProtobufDecoderState;
46use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
47use crate::metrics::decode::DecodeMetricDefs;
48use crate::source::types::{DecodeResult, SourceOutput};
49
50mod avro;
51mod csv;
52mod protobuf;
53
/// Decode delimited CDCv2 messages.
///
/// This not only literally decodes the avro-encoded messages, but
/// also builds a differential dataflow collection that respects the
/// data and progress messages in the underlying CDCv2 stream.
pub fn render_decode_cdcv2<'scope, FromTime: Timestamp>(
    input: &VecCollection<'scope, mz_repr::Timestamp, DecodeResult<FromTime>, Diff>,
) -> (
    VecCollection<'scope, mz_repr::Timestamp, Row, Diff>,
    PressOnDropButton,
) {
    // Queue shared between the unpacking sink below (producer) and the DD
    // capture source (consumer).
    let channel_rx = Rc::new(RefCell::new(VecDeque::new()));
    // Activator for the capture source; it is filled in once the source is
    // built, so the sink can wake the source whenever it enqueues messages.
    let activator_set: Rc<RefCell<Option<SyncActivator>>> = Rc::new(RefCell::new(None));

    // Scratch row reused for packing each update's data, to avoid a fresh
    // allocation per update.
    let mut row_buf = Row::default();
    let channel_tx = Rc::clone(&channel_rx);
    let activator_get = Rc::clone(&activator_set);
    // Route records by key hash so each worker handles a stable partition.
    let pact = Exchange::new(|(x, _, _): &(DecodeResult<FromTime>, _, _)| x.key.hashed());
    let input2 = input.inner.clone();
    input2.sink(pact, "CDCv2Unpack", move |(input, _)| {
        input.for_each(|_time, data| {
            // The inputs are rows containing two columns that encode an enum, i.e only one of them
            // is ever set while the other is unset. This is the convention we follow in our Avro
            // decoder. When the first field of the record is set then we have a data message.
            // Otherwise we have a progress message.
            for (row, _time, _diff) in data.drain(..) {
                let mut record = match &row.value {
                    Some(Ok(row)) => row.iter(),
                    Some(Err(err)) => {
                        error!("Ignoring errored record: {err}");
                        continue;
                    }
                    None => continue,
                };
                let message = match (record.next().unwrap(), record.next().unwrap()) {
                    // Data message: a list of (data, time, diff) updates.
                    (Datum::List(datum_updates), Datum::Null) => {
                        let mut updates = vec![];
                        for update in datum_updates.iter() {
                            let mut update = update.unwrap_list().iter();
                            let data = update.next().unwrap().unwrap_list();
                            let time = update.next().unwrap().unwrap_int64();
                            let diff = Diff::from(update.next().unwrap().unwrap_int64());

                            row_buf.packer().extend(data);
                            let data = row_buf.clone();
                            let time = u64::try_from(time).expect("non-negative");
                            let time = mz_repr::Timestamp::from(time);
                            updates.push((data, time, diff));
                        }
                        Message::Updates(updates)
                    }
                    // Progress message: lower/upper frontiers plus per-time
                    // counts of outstanding updates.
                    (Datum::Null, Datum::List(progress)) => {
                        let mut progress = progress.iter();
                        let mut lower = vec![];
                        for time in progress.next().unwrap().unwrap_list() {
                            let time = u64::try_from(time.unwrap_int64()).expect("non-negative");
                            lower.push(mz_repr::Timestamp::from(time));
                        }
                        let mut upper = vec![];
                        for time in progress.next().unwrap().unwrap_list() {
                            let time = u64::try_from(time.unwrap_int64()).expect("non-negative");
                            upper.push(mz_repr::Timestamp::from(time));
                        }
                        let mut counts = vec![];
                        for pair in progress.next().unwrap().unwrap_list() {
                            let mut pair = pair.unwrap_list().iter();
                            let time = pair.next().unwrap().unwrap_int64();
                            let count = pair.next().unwrap().unwrap_int64();

                            let time = u64::try_from(time).expect("non-negative");
                            let count = usize::try_from(count).expect("non-negative");
                            counts.push((mz_repr::Timestamp::from(time), count));
                        }
                        let progress = Progress {
                            lower,
                            upper,
                            counts,
                        };
                        Message::Progress(progress)
                    }
                    _ => unreachable!("invalid input"),
                };
                channel_tx.borrow_mut().push_back(message);
            }
        });
        // Wake the capture source so it drains the queue we just filled.
        if let Some(activator) = activator_get.borrow_mut().as_mut() {
            activator.activate().unwrap()
        }
    });

    // Adapter that drains the shared queue as an iterator.
    struct VdIterator<T>(Rc<RefCell<VecDeque<T>>>);
    impl<T> Iterator for VdIterator<T> {
        type Item = T;
        fn next(&mut self) -> Option<T> {
            self.0.borrow_mut().pop_front()
        }
    }
    // this operator returns a thread-safe drop-token
    let (token, stream) = differential_dataflow::capture::source::build(input.scope(), move |ac| {
        *activator_set.borrow_mut() = Some(ac);
        // Bound each scheduling of the source to 10ms of draining.
        YieldingIter::new_from(VdIterator(channel_rx), Duration::from_millis(10))
    });

    // The token returned by DD's operator is not compatible with the shutdown mechanism used in
    // storage so we create a dummy operator to hold onto that token.
    let builder = AsyncOperatorBuilder::new("CDCv2-Token".to_owned(), input.scope());
    let button = builder.build(move |_caps| async move {
        let _dd_token = token;
        // Keep this operator around until shutdown
        std::future::pending::<()>().await;
    });
    (stream.as_collection(), button.press_on_drop())
}
167
/// An iterator that yields with a `None` every so often.
///
/// Wrapping an inner iterator, it returns its items until a time budget
/// (`after`) is exhausted, at which point it returns `None` and resets the
/// timer, so the next call starts a fresh budget. This lets a dataflow
/// operator drain a queue without monopolizing the scheduler.
pub struct YieldingIter<I> {
    /// When set, a time after which we should return `None`.
    start: Option<std::time::Instant>,
    /// The per-batch time budget.
    after: Duration,
    /// The underlying iterator.
    iter: I,
}

impl<I> YieldingIter<I> {
    /// Construct a yielding iterator from an inter-yield duration.
    pub fn new_from(iter: I, yield_after: Duration) -> Self {
        Self {
            start: None,
            after: yield_after,
            iter,
        }
    }
}

impl<I: Iterator> Iterator for YieldingIter<I> {
    type Item = I::Item;
    fn next(&mut self) -> Option<Self::Item> {
        // Start (or restart) the batch timer on the first call after a yield.
        let start = self.start.get_or_insert_with(std::time::Instant::now);
        if start.elapsed() > self.after {
            // Budget exhausted: yield now; the next call restarts the timer.
            self.start = None;
            None
        } else {
            match self.iter.next() {
                Some(x) => Some(x),
                None => {
                    // Inner iterator is (currently) empty: also reset the
                    // timer so a later batch gets a full budget.
                    self.start = None;
                    None
                }
            }
        }
    }
}
208
// These don't know how to find delimiters --
// they just go from sequences of vectors of bytes (for which we already know the delimiters)
// to rows, and can eventually just be planned as `HirRelationExpr::Map`. (TODO)
#[derive(Debug)]
pub(crate) enum PreDelimitedFormat {
    /// Pass the raw bytes through as a single `Datum::Bytes` column.
    Bytes,
    /// Interpret the bytes as a UTF-8 string column.
    Text,
    /// Parse the bytes as a JSON document.
    Json,
    /// Match UTF-8 text against the regex; capture groups become columns.
    /// The `Row` is a scratch buffer reused across records.
    Regex(Regex, Row),
    /// Decode the bytes as a protobuf message.
    Protobuf(ProtobufDecoderState),
}
220
221impl PreDelimitedFormat {
222    pub fn decode(&mut self, bytes: &[u8]) -> Result<Option<Row>, DecodeErrorKind> {
223        match self {
224            PreDelimitedFormat::Bytes => Ok(Some(Row::pack(Some(Datum::Bytes(bytes))))),
225            PreDelimitedFormat::Json => {
226                let j = mz_repr::adt::jsonb::Jsonb::from_slice(bytes).map_err(|e| {
227                    DecodeErrorKind::Bytes(
228                        format!("Failed to decode JSON: {}", e.display_with_causes(),).into(),
229                    )
230                })?;
231                Ok(Some(j.into_row()))
232            }
233            PreDelimitedFormat::Text => {
234                let s = std::str::from_utf8(bytes)
235                    .map_err(|_| DecodeErrorKind::Text("Failed to decode UTF-8".into()))?;
236                Ok(Some(Row::pack(Some(Datum::String(s)))))
237            }
238            PreDelimitedFormat::Regex(regex, row_buf) => {
239                let s = std::str::from_utf8(bytes)
240                    .map_err(|_| DecodeErrorKind::Text("Failed to decode UTF-8".into()))?;
241                let captures = match regex.captures(s) {
242                    Some(captures) => captures,
243                    None => return Ok(None),
244                };
245                row_buf.packer().extend(
246                    captures
247                        .iter()
248                        .skip(1)
249                        .map(|c| Datum::from(c.map(|c| c.as_str()))),
250                );
251                Ok(Some(row_buf.clone()))
252            }
253            PreDelimitedFormat::Protobuf(pb) => pb.get_value(bytes).transpose(),
254        }
255    }
256}
257
#[derive(Debug)]
pub(crate) enum DataDecoderInner {
    /// Avro-encoded records.
    Avro(AvroDecoderState),
    /// Records separated by a single `delimiter` byte, each decoded with the
    /// given pre-delimited format.
    DelimitedBytes {
        delimiter: u8,
        format: PreDelimitedFormat,
    },
    /// CSV records, which carry their own delimiting state.
    Csv(CsvDecoderState),

    /// Records already delimited by the upstream connection; each buffer is
    /// decoded whole.
    PreDelimited(PreDelimitedFormat),
}
269
/// A decoder for one stream of bytes, pairing format-specific decoding state
/// with the metrics used to report decode outcomes.
#[derive(Debug)]
struct DataDecoder {
    // Format-specific decoder state.
    inner: DataDecoderInner,
    // Metrics used by `log_errors`/`log_successes`.
    metrics: DecodeMetricDefs,
}
275
276impl DataDecoder {
277    pub async fn next(
278        &mut self,
279        bytes: &mut &[u8],
280    ) -> Result<Result<Option<Row>, DecodeErrorKind>, CsrConnectError> {
281        let result = match &mut self.inner {
282            DataDecoderInner::DelimitedBytes { delimiter, format } => {
283                match bytes.iter().position(|&byte| byte == *delimiter) {
284                    Some(chunk_idx) => {
285                        let data = &bytes[0..chunk_idx];
286                        *bytes = &bytes[chunk_idx + 1..];
287                        format.decode(data)
288                    }
289                    None => Ok(None),
290                }
291            }
292            DataDecoderInner::Avro(avro) => avro.decode(bytes).await?,
293            DataDecoderInner::Csv(csv) => csv.decode(bytes),
294            DataDecoderInner::PreDelimited(format) => {
295                let result = format.decode(*bytes);
296                *bytes = &[];
297                result
298            }
299        };
300        Ok(result)
301    }
302
303    /// Get the next record if it exists, assuming an EOF has occurred.
304    ///
305    /// This is distinct from `next` because, for example, a CSV record should be returned even if it
306    /// does not end in a newline.
307    pub fn eof(
308        &mut self,
309        bytes: &mut &[u8],
310    ) -> Result<Result<Option<Row>, DecodeErrorKind>, CsrConnectError> {
311        let result = match &mut self.inner {
312            DataDecoderInner::Csv(csv) => {
313                let result = csv.decode(bytes);
314                csv.reset_for_new_object();
315                result
316            }
317            DataDecoderInner::DelimitedBytes { format, .. } => {
318                let data = std::mem::take(bytes);
319                // If we hit EOF with no bytes left in the buffer it means the file had a trailing
320                // \n character that can be ignored. Otherwise, we decode the final bytes as normal
321                if data.is_empty() {
322                    Ok(None)
323                } else {
324                    format.decode(data)
325                }
326            }
327            _ => Ok(None),
328        };
329        Ok(result)
330    }
331
332    pub fn log_errors(&self, n: usize) {
333        self.metrics.count_errors(&self.inner, n);
334    }
335
336    pub fn log_successes(&self, n: usize) {
337        self.metrics.count_successes(&self.inner, n);
338    }
339}
340
/// Constructs a [`DataDecoder`] for the given `encoding`.
///
/// `debug_name` labels the decoder state for diagnostics. Connecting to the
/// Confluent Schema Registry (for Avro with a CSR connection) can fail
/// transiently, which surfaces as a [`CsrConnectError`].
async fn get_decoder(
    encoding: DataEncoding,
    debug_name: &str,
    // Whether the upstream connection has already delimited individual
    // records. When `true` each buffer holds exactly one record
    // (`PreDelimited`); when `false` records are separated by `\n` bytes
    // (`DelimitedBytes`).
    is_connection_delimited: bool,
    metrics: DecodeMetricDefs,
    storage_configuration: &StorageConfiguration,
) -> Result<DataDecoder, CsrConnectError> {
    let decoder = match encoding {
        DataEncoding::Avro(AvroEncoding {
            schema,
            reference_schemas,
            csr_connection,
            confluent_wire_format,
        }) => {
            // Establish the schema registry client first, if one is configured.
            let csr_client = match csr_connection {
                None => None,
                Some(csr_connection) => {
                    let csr_client = csr_connection
                        .connect(storage_configuration, InTask::Yes)
                        .await?;
                    Some(csr_client)
                }
            };
            let state = avro::AvroDecoderState::new(
                &schema,
                &reference_schemas,
                csr_client,
                debug_name.to_string(),
                confluent_wire_format,
            )
            .expect("Failed to create avro decoder, even though we validated ccsr client creation in purification.");
            DataDecoder {
                inner: DataDecoderInner::Avro(state),
                metrics,
            }
        }
        DataEncoding::Text
        | DataEncoding::Bytes
        | DataEncoding::Json
        | DataEncoding::Protobuf(_)
        | DataEncoding::Regex(_) => {
            let after_delimiting = match encoding {
                DataEncoding::Regex(RegexEncoding { regex }) => {
                    PreDelimitedFormat::Regex(regex.regex, Default::default())
                }
                DataEncoding::Protobuf(encoding) => {
                    PreDelimitedFormat::Protobuf(ProtobufDecoderState::new(encoding).expect(
                        "Failed to create protobuf decoder, even though we validated ccsr \
                                    client creation in purification.",
                    ))
                }
                DataEncoding::Bytes => PreDelimitedFormat::Bytes,
                DataEncoding::Json => PreDelimitedFormat::Json,
                DataEncoding::Text => PreDelimitedFormat::Text,
                // The outer match arm restricts `encoding` to the variants
                // handled above.
                _ => unreachable!(),
            };
            let inner = if is_connection_delimited {
                DataDecoderInner::PreDelimited(after_delimiting)
            } else {
                DataDecoderInner::DelimitedBytes {
                    delimiter: b'\n',
                    format: after_delimiting,
                }
            };
            DataDecoder { inner, metrics }
        }
        DataEncoding::Csv(enc) => {
            let state = CsvDecoderState::new(enc);
            DataDecoder {
                inner: DataDecoderInner::Csv(state),
                metrics,
            }
        }
    };
    Ok(decoder)
}
420
421async fn decode_delimited(
422    decoder: &mut DataDecoder,
423    buf: &[u8],
424) -> Result<Result<Option<Row>, DecodeError>, CsrConnectError> {
425    let mut remaining_buf = buf;
426    let value = decoder.next(&mut remaining_buf).await?;
427
428    let result = match value {
429        Ok(value) => {
430            if remaining_buf.is_empty() {
431                match value {
432                    Some(value) => Ok(Some(value)),
433                    None => decoder.eof(&mut remaining_buf)?,
434                }
435            } else {
436                Err(DecodeErrorKind::Text(
437                    format!("Unexpected bytes remaining for decoded value: {remaining_buf:?}")
438                        .into(),
439                ))
440            }
441        }
442        Err(err) => Err(err),
443    };
444
445    Ok(result.map_err(|inner| DecodeError {
446        kind: inner,
447        raw: buf.to_vec(),
448    }))
449}
450
/// Decode already delimited records of data.
///
/// Precondition: each record in the stream has at most one key and at most one value.
/// This function is useful mainly for decoding data from systems like Kafka,
/// that have already separated the stream into records/messages/etc. before we
/// decode them.
///
/// Because we expect the upstream connection to have already delimited the data,
/// we return an error here if the decoder does not consume all the bytes. This
/// often lets us, for example, detect when Avro decoding has gone off the rails
/// (which is not always possible otherwise, since often gibberish strings can be interpreted as Avro,
///  so the only signal is how many bytes you managed to decode).
pub fn render_decode_delimited<'scope, T: Timestamp, FromTime: Timestamp>(
    input: VecCollection<'scope, T, SourceOutput<FromTime>, Diff>,
    key_encoding: Option<DataEncoding>,
    value_encoding: DataEncoding,
    debug_name: String,
    metrics: DecodeMetricDefs,
    storage_configuration: StorageConfiguration,
) -> (
    VecCollection<'scope, T, DecodeResult<FromTime>, Diff>,
    StreamVec<'scope, T, HealthStatusMessage>,
) {
    // Operator name reflects both formats, e.g. "AvroAvroDecodeDelimited".
    let op_name = format!(
        "{}{}DecodeDelimited",
        key_encoding
            .as_ref()
            .map(|key_encoding| key_encoding.op_name())
            .unwrap_or(""),
        value_encoding.op_name()
    );
    // Distribute records across workers by the hash of the (encoded) value.
    let dist = |(x, _, _): &(SourceOutput<FromTime>, _, _)| x.value.hashed();

    let mut builder = AsyncOperatorBuilder::new(op_name, input.scope());

    let (output_handle, output) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut input = builder.new_input_for(input.inner, Exchange::new(dist), &output_handle);

    let (_, transient_errors) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            let [cap_set]: &mut [_; 1] = caps.try_into().unwrap();

            // Build a key decoder only when a key encoding was supplied.
            let mut key_decoder = match key_encoding {
                Some(encoding) => Some(
                    get_decoder(
                        encoding,
                        &debug_name,
                        true,
                        metrics.clone(),
                        &storage_configuration,
                    )
                    .await?,
                ),
                None => None,
            };

            let mut value_decoder = get_decoder(
                value_encoding,
                &debug_name,
                true,
                metrics,
                &storage_configuration,
            )
            .await?;

            // Reused output buffer; `give_container` below drains it each batch.
            let mut output_container = Vec::new();

            while let Some(event) = input.next().await {
                match event {
                    AsyncEvent::Data(cap, data) => {
                        let mut n_errors = 0;
                        let mut n_successes = 0;
                        for (output, ts, diff) in data.iter() {
                            // Keys arrive as a single-datum row: raw bytes or
                            // null when absent.
                            let key_buf = match output.key.unpack_first() {
                                Datum::Bytes(buf) => Some(buf),
                                Datum::Null => None,
                                d => unreachable!("invalid datum: {d}"),
                            };

                            // Decode the key only when both a key decoder and
                            // key bytes are present.
                            let key = match key_decoder.as_mut().zip(key_buf) {
                                Some((decoder, buf)) => {
                                    decode_delimited(decoder, buf).await?.transpose()
                                }
                                None => None,
                            };

                            let value = match output.value.unpack_first() {
                                Datum::Bytes(buf) => {
                                    decode_delimited(&mut value_decoder, buf).await?.transpose()
                                }
                                Datum::Null => None,
                                d => unreachable!("invalid datum: {d}"),
                            };

                            // Count a record as an error if either side failed
                            // to decode; as a success only on a decoded value.
                            if matches!(&key, Some(Err(_))) || matches!(&value, Some(Err(_))) {
                                n_errors += 1;
                            } else if matches!(&value, Some(Ok(_))) {
                                n_successes += 1;
                            }

                            let result = DecodeResult {
                                key,
                                value,
                                metadata: output.metadata.clone(),
                                from_time: output.from_time.clone(),
                            };
                            output_container.push((result, ts.clone(), *diff));
                        }

                        // Matching historical practice, we only log metrics on the value decoder.
                        if n_errors > 0 {
                            value_decoder.log_errors(n_errors);
                        }
                        if n_successes > 0 {
                            value_decoder.log_successes(n_successes);
                        }

                        output_handle.give_container(&cap, &mut output_container);
                    }
                    AsyncEvent::Progress(frontier) => cap_set.downgrade(frontier.iter()),
                }
            }

            Ok(())
        })
    });

    // Map transient errors into halting health updates, routed to the SSH
    // namespace when the failure came from an SSH tunnel.
    let health = transient_errors.map(|err: Rc<CsrConnectError>| {
        let halt_status = HealthStatusUpdate::halting(err.display_with_causes().to_string(), None);
        HealthStatusMessage {
            id: None,
            namespace: if matches!(&*err, CsrConnectError::Ssh(_)) {
                StatusNamespace::Ssh
            } else {
                StatusNamespace::Decode
            },
            update: halt_status,
        }
    });

    (output.as_collection(), health)
}