mz_storage_types/
errors.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use anyhow::anyhow;
11use std::error::Error;
12use std::fmt::Display;
13
14use bytes::BufMut;
15use mz_expr::EvalError;
16use mz_kafka_util::client::TunnelingClientContext;
17use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
18use mz_repr::Row;
19use mz_ssh_util::tunnel::SshTunnelStatus;
20use proptest_derive::Arbitrary;
21use prost::Message;
22use rdkafka::error::KafkaError;
23use serde::{Deserialize, Serialize};
24use tracing::warn;
25
26include!(concat!(env!("OUT_DIR"), "/mz_storage_types.errors.rs"));
27
/// The underlying data was not decodable in the format we expected: e.g.
/// invalid JSON or Avro data that doesn't match a schema.
///
/// NOTE: field order is significant — it determines the derived `Ord`/`PartialOrd`
/// comparison order and the serde field order.
#[derive(Arbitrary, Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct DecodeError {
    /// What went wrong while decoding.
    pub kind: DecodeErrorKind,
    /// The bytes that failed to decode.
    pub raw: Vec<u8>,
}
35
36impl RustType<ProtoDecodeError> for DecodeError {
37    fn into_proto(&self) -> ProtoDecodeError {
38        ProtoDecodeError {
39            kind: Some(RustType::into_proto(&self.kind)),
40            raw: Some(self.raw.clone()),
41        }
42    }
43
44    fn from_proto(proto: ProtoDecodeError) -> Result<Self, TryFromProtoError> {
45        let kind = match proto.kind {
46            Some(kind) => RustType::from_proto(kind)?,
47            None => return Err(TryFromProtoError::missing_field("ProtoDecodeError::kind")),
48        };
49        let raw = proto.raw.into_rust_if_some("raw")?;
50        Ok(Self { kind, raw })
51    }
52}
53
54impl DecodeError {
55    pub fn encode<B>(&self, buf: &mut B)
56    where
57        B: BufMut,
58    {
59        self.into_proto()
60            .encode(buf)
61            .expect("no required fields means no initialization errors")
62    }
63
64    pub fn decode(buf: &[u8]) -> Result<Self, String> {
65        let proto = ProtoDecodeError::decode(buf).map_err(|err| err.to_string())?;
66        proto.into_rust().map_err(|err| err.to_string())
67    }
68}
69
70impl Display for DecodeError {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        // See if we can output the bytes that failed to decode as a string.
73        let str_repr = std::str::from_utf8(&self.raw).ok();
74        // strip any NUL characters from the str_repr to prevent an error
75        // in the postgres protocol decoding
76        let str_repr = str_repr.map(|s| s.replace('\0', "NULL"));
77        let bytes_repr = hex::encode(&self.raw);
78        match str_repr {
79            Some(s) => write!(
80                f,
81                "{} (original text: {}, original bytes: {:x?})",
82                self.kind, s, bytes_repr
83            ),
84            None => write!(f, "{} (original bytes: {:x?})", self.kind, bytes_repr),
85        }
86    }
87}
88
/// The category of a [`DecodeError`], carrying a human-readable message.
///
/// NOTE: variant order is significant for the derived `Ord`/`PartialOrd`.
#[derive(Arbitrary, Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum DecodeErrorKind {
    /// A decode failure described by a message string; presumably for text-based formats
    /// (TODO confirm against the producers of this variant).
    Text(Box<str>),
    /// A decode failure described by a message string; presumably for byte-based formats
    /// (TODO confirm against the producers of this variant).
    Bytes(Box<str>),
}
94
95impl RustType<ProtoDecodeErrorKind> for DecodeErrorKind {
96    fn into_proto(&self) -> ProtoDecodeErrorKind {
97        use proto_decode_error_kind::Kind::*;
98        ProtoDecodeErrorKind {
99            kind: Some(match self {
100                DecodeErrorKind::Text(v) => Text(v.into_proto()),
101                DecodeErrorKind::Bytes(v) => Bytes(v.into_proto()),
102            }),
103        }
104    }
105
106    fn from_proto(proto: ProtoDecodeErrorKind) -> Result<Self, TryFromProtoError> {
107        use proto_decode_error_kind::Kind::*;
108        match proto.kind {
109            Some(Text(v)) => Ok(DecodeErrorKind::Text(v.into())),
110            Some(Bytes(v)) => Ok(DecodeErrorKind::Bytes(v.into())),
111            None => Err(TryFromProtoError::missing_field("ProtoDecodeError::kind")),
112        }
113    }
114}
115
116impl Display for DecodeErrorKind {
117    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118        match self {
119            DecodeErrorKind::Text(e) => f.write_str(e),
120            DecodeErrorKind::Bytes(e) => f.write_str(e),
121        }
122    }
123}
124
/// Errors arising during envelope processing.
///
/// NOTE: variant order is significant for the derived `Ord`/`PartialOrd`.
#[derive(Arbitrary, Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)]
pub enum EnvelopeError {
    /// An error that can be retracted by a future message using upsert logic.
    Upsert(UpsertError),
    /// Errors corresponding to `ENVELOPE NONE`. Naming this
    /// `None`, though, would have been too confusing.
    Flat(Box<str>),
}
134
135impl RustType<ProtoEnvelopeErrorV1> for EnvelopeError {
136    fn into_proto(&self) -> ProtoEnvelopeErrorV1 {
137        use proto_envelope_error_v1::Kind;
138        ProtoEnvelopeErrorV1 {
139            kind: Some(match self {
140                EnvelopeError::Upsert(rust) => Kind::Upsert(rust.into_proto()),
141                EnvelopeError::Flat(text) => Kind::Flat(text.into_proto()),
142            }),
143        }
144    }
145
146    fn from_proto(proto: ProtoEnvelopeErrorV1) -> Result<Self, TryFromProtoError> {
147        use proto_envelope_error_v1::Kind;
148        match proto.kind {
149            Some(Kind::Upsert(proto)) => {
150                let rust = RustType::from_proto(proto)?;
151                Ok(Self::Upsert(rust))
152            }
153            Some(Kind::Flat(text)) => Ok(Self::Flat(text.into())),
154            None => Err(TryFromProtoError::missing_field(
155                "ProtoEnvelopeErrorV1::kind",
156            )),
157        }
158    }
159}
160
161impl Display for EnvelopeError {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        match self {
164            EnvelopeError::Upsert(err) => write!(f, "Upsert: {err}"),
165            EnvelopeError::Flat(err) => write!(f, "Flat: {err}"),
166        }
167    }
168}
169
/// An error from a value in an upsert source. The corresponding key is included, allowing
/// us to reconstruct their entry in the upsert map upon restart.
///
/// NOTE: field order is significant for the derived `Ord`/`PartialOrd` and serde.
#[derive(Arbitrary, Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct UpsertValueError {
    /// The underlying error.
    pub inner: DecodeError,
    /// The (good) key associated with the errored value.
    pub for_key: Row,
}
179
180impl RustType<ProtoUpsertValueError> for UpsertValueError {
181    fn into_proto(&self) -> ProtoUpsertValueError {
182        ProtoUpsertValueError {
183            inner: Some(self.inner.into_proto()),
184            for_key: Some(self.for_key.into_proto()),
185        }
186    }
187
188    fn from_proto(proto: ProtoUpsertValueError) -> Result<Self, TryFromProtoError> {
189        Ok(UpsertValueError {
190            inner: proto
191                .inner
192                .into_rust_if_some("ProtoUpsertValueError::inner")?,
193            for_key: proto
194                .for_key
195                .into_rust_if_some("ProtoUpsertValueError::for_key")?,
196        })
197    }
198}
199
200impl Display for UpsertValueError {
201    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202        let UpsertValueError { inner, for_key } = self;
203        write!(f, "{inner}, decoded key: {for_key:?}")?;
204        Ok(())
205    }
206}
207
/// A source contained a record with a NULL key, which we don't support.
///
/// This is a field-less marker type; the error carries no payload.
#[derive(
    Arbitrary, Ord, PartialOrd, Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash,
)]
pub struct UpsertNullKeyError;
213
214impl RustType<ProtoUpsertNullKeyError> for UpsertNullKeyError {
215    fn into_proto(&self) -> ProtoUpsertNullKeyError {
216        ProtoUpsertNullKeyError {}
217    }
218
219    fn from_proto(_proto: ProtoUpsertNullKeyError) -> Result<Self, TryFromProtoError> {
220        Ok(UpsertNullKeyError)
221    }
222}
223
224impl Display for UpsertNullKeyError {
225    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
226        write!(
227            f,
228            "record with NULL key in UPSERT source; to retract this error, "
229        )?;
230        write!(
231            f,
232            "produce a record upstream with a NULL key and NULL value"
233        )
234    }
235}
236
/// An error that can be retracted by a future message using upsert logic.
///
/// NOTE: variant order is significant for the derived `Ord`/`PartialOrd`.
#[derive(Arbitrary, Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum UpsertError {
    /// Wrapper around a key decoding error.
    /// We use this instead of emitting the underlying `DataflowError::DecodeError` because with only
    /// the underlying error, we can't distinguish between an error with the key and an error
    /// with the value.
    ///
    /// It is necessary to distinguish them because the necessary record to retract them is different.
    /// `(K, <errored V>)` is retracted by `(K, null)`, whereas `(<errored K>, anything)` is retracted by
    /// `("bytes", null)`, where "bytes" is the string that failed to correctly decode as a key.
    KeyDecode(DecodeError),
    /// Wrapper around an error related to the value.
    Value(UpsertValueError),
    /// A record arrived with a NULL key; see [`UpsertNullKeyError`].
    NullKey(UpsertNullKeyError),
}
253
254impl RustType<ProtoUpsertError> for UpsertError {
255    fn into_proto(&self) -> ProtoUpsertError {
256        use proto_upsert_error::Kind;
257        ProtoUpsertError {
258            kind: Some(match self {
259                UpsertError::KeyDecode(err) => Kind::KeyDecode(err.into_proto()),
260                UpsertError::Value(err) => Kind::Value(err.into_proto()),
261                UpsertError::NullKey(err) => Kind::NullKey(err.into_proto()),
262            }),
263        }
264    }
265
266    fn from_proto(proto: ProtoUpsertError) -> Result<Self, TryFromProtoError> {
267        use proto_upsert_error::Kind;
268        match proto.kind {
269            Some(Kind::KeyDecode(proto)) => {
270                let rust = RustType::from_proto(proto)?;
271                Ok(Self::KeyDecode(rust))
272            }
273            Some(Kind::Value(proto)) => {
274                let rust = RustType::from_proto(proto)?;
275                Ok(Self::Value(rust))
276            }
277            Some(Kind::NullKey(proto)) => {
278                let rust = RustType::from_proto(proto)?;
279                Ok(Self::NullKey(rust))
280            }
281            None => Err(TryFromProtoError::missing_field("ProtoUpsertError::kind")),
282        }
283    }
284}
285
286impl Display for UpsertError {
287    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
288        match self {
289            UpsertError::KeyDecode(err) => write!(f, "Key decode: {err}"),
290            UpsertError::Value(err) => write!(f, "Value error: {err}"),
291            UpsertError::NullKey(err) => write!(f, "Null key: {err}"),
292        }
293    }
294}
295
/// Source-wide durable errors; for example, a replication log being meaningless or corrupted.
/// This should _not_ include transient source errors, like connection issues or misconfigurations.
#[derive(Arbitrary, Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct SourceError {
    /// What went wrong; this is also what the error displays as.
    pub error: SourceErrorDetails,
}
302
303impl RustType<ProtoSourceError> for SourceError {
304    fn into_proto(&self) -> ProtoSourceError {
305        ProtoSourceError {
306            error: Some(self.error.into_proto()),
307        }
308    }
309
310    fn from_proto(proto: ProtoSourceError) -> Result<Self, TryFromProtoError> {
311        Ok(SourceError {
312            error: proto.error.into_rust_if_some("ProtoSourceError::error")?,
313        })
314    }
315}
316
317impl Display for SourceError {
318    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
319        self.error.fmt(f)
320    }
321}
322
/// The details of a [`SourceError`]. Every variant is rendered as requiring the source to be
/// dropped and recreated.
///
/// NOTE: variant order is significant for the derived `Ord`/`PartialOrd`.
#[derive(Arbitrary, Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum SourceErrorDetails {
    /// The source failed while initializing.
    Initialization(Box<str>),
    /// Any other durable source failure.
    Other(Box<str>),
}
328
329impl RustType<ProtoSourceErrorDetails> for SourceErrorDetails {
330    fn into_proto(&self) -> ProtoSourceErrorDetails {
331        use proto_source_error_details::Kind;
332        ProtoSourceErrorDetails {
333            kind: Some(match self {
334                SourceErrorDetails::Initialization(s) => Kind::Initialization(s.into_proto()),
335                SourceErrorDetails::Other(s) => Kind::Other(s.into_proto()),
336            }),
337        }
338    }
339
340    fn from_proto(proto: ProtoSourceErrorDetails) -> Result<Self, TryFromProtoError> {
341        use proto_source_error_details::Kind;
342        match proto.kind {
343            Some(kind) => match kind {
344                Kind::Initialization(s) => Ok(SourceErrorDetails::Initialization(s.into())),
345                Kind::DeprecatedFileIo(s) | Kind::DeprecatedPersistence(s) => {
346                    warn!("Deprecated source error kind: {s}");
347                    Ok(SourceErrorDetails::Other(s.into()))
348                }
349                Kind::Other(s) => Ok(SourceErrorDetails::Other(s.into())),
350            },
351            None => Err(TryFromProtoError::missing_field(
352                "ProtoSourceErrorDetails::kind",
353            )),
354        }
355    }
356}
357
358impl Display for SourceErrorDetails {
359    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
360        match self {
361            SourceErrorDetails::Initialization(e) => {
362                write!(
363                    f,
364                    "failed during initialization, source must be dropped and recreated: {}",
365                    e
366                )
367            }
368            SourceErrorDetails::Other(e) => {
369                write!(
370                    f,
371                    "source must be dropped and recreated due to failure: {}",
372                    e
373                )
374            }
375        }
376    }
377}
378
/// An error that's destined to be presented to the user in a differential dataflow collection.
/// For example, a divide by zero will be visible in the error collection for a particular row.
///
/// All of the variants are boxed to minimize the memory size of `DataflowError`. This type is
/// likely to appear in `Result<Row, DataflowError>`s on high-throughput code paths, so keeping its
/// size less than or equal to that of `Row` is important to ensure we are not wasting memory.
///
/// NOTE: any new variant must also be handled by the columnation region's `copy`, which
/// re-allocates every heap-owning field into region memory.
#[derive(Arbitrary, Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)]
pub enum DataflowError {
    /// Source data that could not be decoded; see [`DecodeError`].
    DecodeError(Box<DecodeError>),
    /// An expression-evaluation error, e.g. division by zero; see [`EvalError`].
    EvalError(Box<EvalError>),
    /// A source-wide durable error; see [`SourceError`].
    SourceError(Box<SourceError>),
    /// An error produced during envelope processing; see [`EnvelopeError`].
    EnvelopeError(Box<EnvelopeError>),
}
392
// Marker impl: `Error` needs only `Debug` and `Display`. Every method keeps its default, so
// `source()` reports no underlying cause.
impl Error for DataflowError {}
394
/// Region support for `Box<str>` values, used by the columnation region for `DataflowError`
/// to store the strings embedded in errors.
mod boxed_str {

    use differential_dataflow::containers::Region;
    use differential_dataflow::containers::StableRegion;

    /// Region allocation for `Box<str>` data.
    ///
    /// Content bytes are copied into a stable, contiguous byte region, and a `Box<str>` that
    /// points at those bytes is fabricated. Such a box does not own its memory: it belongs to
    /// the region, so the box must never be dropped.
    #[derive(Default)]
    pub struct BoxStrStack {
        /// Backing storage for the bytes of every copied string.
        region: StableRegion<u8>,
    }

    impl Region for BoxStrStack {
        type Item = Box<str>;
        /// Clears the backing byte region. The region may then reuse that memory, so boxes
        /// returned by earlier `copy` calls must not be used afterwards.
        #[inline]
        fn clear(&mut self) {
            self.region.clear();
        }
        // Removing `(always)` is a 20% performance regression in
        // the `string10_copy` benchmark.
        #[inline(always)]
        unsafe fn copy(&mut self, item: &Box<str>) -> Box<str> {
            let bytes = self.region.copy_slice(item.as_bytes());
            // SAFETY: `bytes` is a copy of `item`'s contents and is therefore valid UTF-8. It
            // lives in `self.region`, which is stable (its storage does not move as more data
            // is copied in). The resulting box does not own this memory; per the
            // `Region::copy` contract, the caller must never drop it.
            std::str::from_boxed_utf8_unchecked(Box::from_raw(bytes))
        }
        /// Reserves room for the combined byte length of `items`.
        #[inline(always)]
        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            self.region.reserve(items.map(|x| x.len()).sum());
        }

        /// Reserves room for the combined contents of the byte regions in `regions`.
        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            self.region.reserve(regions.map(|r| r.region.len()).sum());
        }
        /// Reports heap usage by delegating to the backing byte region.
        #[inline]
        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.region.heap_size(callback)
        }
    }
}
446
447mod columnation {
448    use std::iter::once;
449
450    use differential_dataflow::containers::{Columnation, Region, StableRegion};
451    use mz_expr::EvalError;
452    use mz_repr::Row;
453    use mz_repr::adt::range::InvalidRangeError;
454    use mz_repr::strconv::ParseError;
455
456    use crate::errors::boxed_str::BoxStrStack;
457    use crate::errors::{
458        DataflowError, DecodeError, DecodeErrorKind, EnvelopeError, SourceError,
459        SourceErrorDetails, UpsertError, UpsertValueError,
460    };
461
462    impl Columnation for DataflowError {
463        type InnerRegion = DataflowErrorRegion;
464    }
465
    /// A region to store [`DataflowError`].
    ///
    /// Boxed error payloads are placed in the per-type stable regions, and their heap-owning
    /// contents (strings, byte vectors, rows) go in the dedicated sub-regions. A copied
    /// `DataflowError` therefore references only memory owned by this region.
    #[derive(Default)]
    pub struct DataflowErrorRegion {
        /// Stable location for [`DecodeError`] for inserting into a box.
        decode_error_region: StableRegion<DecodeError>,
        /// Stable location for [`EnvelopeError`] for inserting into a box.
        envelope_error_region: StableRegion<EnvelopeError>,
        /// Stable location for [`EvalError`] for inserting into a box.
        eval_error_region: StableRegion<EvalError>,
        /// Region for storing rows.
        row_region: <Row as Columnation>::InnerRegion,
        /// Stable location for [`SourceError`] for inserting into a box.
        source_error_region: StableRegion<SourceError>,
        /// Region for storing strings.
        string_region: BoxStrStack,
        /// Region for storing u8 vectors.
        u8_region: <Vec<u8> as Columnation>::InnerRegion,
    }
484
485    impl DataflowErrorRegion {
486        /// Copy a decode error into its region, return an owned object.
487        ///
488        /// This is unsafe because the returned value must not be dropped.
489        unsafe fn copy_decode_error(&mut self, decode_error: &DecodeError) -> DecodeError {
490            DecodeError {
491                kind: match &decode_error.kind {
492                    DecodeErrorKind::Text(string) => {
493                        DecodeErrorKind::Text(self.string_region.copy(string))
494                    }
495                    DecodeErrorKind::Bytes(string) => {
496                        DecodeErrorKind::Bytes(self.string_region.copy(string))
497                    }
498                },
499                raw: self.u8_region.copy(&decode_error.raw),
500            }
501        }
502    }
503
504    /// Compile-time assertion that a value is `Copy`.
505    fn assert_copy<T: Copy>(_: &T) {}
506
507    impl Region for DataflowErrorRegion {
508        type Item = DataflowError;
509
510        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
511            // Unsafe Box::from_raw reasoning:
512            // Construct a box from a provided value. This is safe because a box is
513            // a pointer to a memory address, and the value is stored on the heap.
514            // Note that the box must not be dropped.
515
516            // SAFETY: When adding new enum variants, care must be taken that all types containing
517            // references are region-allocated, otherwise we'll leak memory.
518
519            // Types that are `Copy` should be asserted using `assert_copy`, or copied, to detect
520            // changes that introduce pointers.
521
522            let err = match item {
523                DataflowError::DecodeError(err) => {
524                    let err = self.copy_decode_error(&*err);
525                    let reference = self.decode_error_region.copy_iter(once(err));
526                    let boxed = unsafe { Box::from_raw(reference.as_mut_ptr()) };
527                    DataflowError::DecodeError(boxed)
528                }
529                DataflowError::EvalError(err) => {
530                    let err: &EvalError = &*err;
531                    let err = match err {
532                        e @ EvalError::CharacterNotValidForEncoding(x) => {
533                            assert_copy(x);
534                            e.clone()
535                        }
536                        e @ EvalError::CharacterTooLargeForEncoding(x) => {
537                            assert_copy(x);
538                            e.clone()
539                        }
540                        EvalError::DateBinOutOfRange(string) => {
541                            EvalError::DateBinOutOfRange(self.string_region.copy(string))
542                        }
543                        e @ EvalError::DivisionByZero
544                        | e @ EvalError::FloatOverflow
545                        | e @ EvalError::FloatUnderflow
546                        | e @ EvalError::NumericFieldOverflow
547                        | e @ EvalError::MzTimestampStepOverflow
548                        | e @ EvalError::TimestampCannotBeNan
549                        | e @ EvalError::TimestampOutOfRange
550                        | e @ EvalError::NegSqrt
551                        | e @ EvalError::NegLimit
552                        | e @ EvalError::NullCharacterNotPermitted
553                        | e @ EvalError::KeyCannotBeNull
554                        | e @ EvalError::UnterminatedLikeEscapeSequence
555                        | e @ EvalError::MultipleRowsFromSubquery
556                        | e @ EvalError::LikePatternTooLong
557                        | e @ EvalError::LikeEscapeTooLong
558                        | e @ EvalError::MultidimensionalArrayRemovalNotSupported
559                        | e @ EvalError::MultiDimensionalArraySearch
560                        | e @ EvalError::ArrayFillWrongArraySubscripts
561                        | e @ EvalError::DateOutOfRange
562                        | e @ EvalError::CharOutOfRange
563                        | e @ EvalError::InvalidBase64Equals
564                        | e @ EvalError::InvalidBase64EndSequence
565                        | e @ EvalError::InvalidTimezoneInterval
566                        | e @ EvalError::InvalidTimezoneConversion
567                        | e @ EvalError::LengthTooLarge
568                        | e @ EvalError::AclArrayNullElement
569                        | e @ EvalError::MzAclArrayNullElement => e.clone(),
570                        EvalError::Unsupported {
571                            feature,
572                            discussion_no,
573                        } => EvalError::Unsupported {
574                            feature: self.string_region.copy(feature),
575                            discussion_no: *discussion_no,
576                        },
577                        EvalError::Float32OutOfRange(string) => {
578                            EvalError::Float32OutOfRange(self.string_region.copy(string))
579                        }
580                        EvalError::Float64OutOfRange(string) => {
581                            EvalError::Float64OutOfRange(self.string_region.copy(string))
582                        }
583                        EvalError::Int16OutOfRange(string) => {
584                            EvalError::Int16OutOfRange(self.string_region.copy(string))
585                        }
586                        EvalError::Int32OutOfRange(string) => {
587                            EvalError::Int32OutOfRange(self.string_region.copy(string))
588                        }
589                        EvalError::Int64OutOfRange(string) => {
590                            EvalError::Int64OutOfRange(self.string_region.copy(string))
591                        }
592                        EvalError::UInt16OutOfRange(string) => {
593                            EvalError::UInt16OutOfRange(self.string_region.copy(string))
594                        }
595                        EvalError::UInt32OutOfRange(string) => {
596                            EvalError::UInt32OutOfRange(self.string_region.copy(string))
597                        }
598                        EvalError::UInt64OutOfRange(string) => {
599                            EvalError::UInt64OutOfRange(self.string_region.copy(string))
600                        }
601                        EvalError::MzTimestampOutOfRange(string) => {
602                            EvalError::MzTimestampOutOfRange(self.string_region.copy(string))
603                        }
604                        EvalError::OidOutOfRange(string) => {
605                            EvalError::OidOutOfRange(self.string_region.copy(string))
606                        }
607                        EvalError::IntervalOutOfRange(string) => {
608                            EvalError::IntervalOutOfRange(self.string_region.copy(string))
609                        }
610                        e @ EvalError::IndexOutOfRange {
611                            provided,
612                            valid_end,
613                        } => {
614                            assert_copy(provided);
615                            assert_copy(valid_end);
616                            e.clone()
617                        }
618                        e @ EvalError::InvalidBase64Symbol(c) => {
619                            assert_copy(c);
620                            e.clone()
621                        }
622                        EvalError::InvalidTimezone(x) => {
623                            EvalError::InvalidTimezone(self.string_region.copy(x))
624                        }
625                        e @ EvalError::InvalidLayer { max_layer, val } => {
626                            assert_copy(max_layer);
627                            assert_copy(val);
628                            e.clone()
629                        }
630                        EvalError::InvalidArray(err) => EvalError::InvalidArray(*err),
631                        EvalError::InvalidEncodingName(x) => {
632                            EvalError::InvalidEncodingName(self.string_region.copy(x))
633                        }
634                        EvalError::InvalidHashAlgorithm(x) => {
635                            EvalError::InvalidHashAlgorithm(self.string_region.copy(x))
636                        }
637                        EvalError::InvalidByteSequence {
638                            byte_sequence,
639                            encoding_name,
640                        } => EvalError::InvalidByteSequence {
641                            byte_sequence: self.string_region.copy(byte_sequence),
642                            encoding_name: self.string_region.copy(encoding_name),
643                        },
644                        EvalError::InvalidJsonbCast { from, to } => EvalError::InvalidJsonbCast {
645                            from: self.string_region.copy(from),
646                            to: self.string_region.copy(to),
647                        },
648                        EvalError::InvalidRegex(x) => {
649                            EvalError::InvalidRegex(self.string_region.copy(x))
650                        }
651                        e @ EvalError::InvalidRegexFlag(x) => {
652                            assert_copy(x);
653                            e.clone()
654                        }
655                        EvalError::InvalidParameterValue(x) => {
656                            EvalError::InvalidParameterValue(self.string_region.copy(x))
657                        }
658                        EvalError::InvalidDatePart(x) => {
659                            EvalError::InvalidDatePart(self.string_region.copy(x))
660                        }
661                        EvalError::UnknownUnits(x) => {
662                            EvalError::UnknownUnits(self.string_region.copy(x))
663                        }
664                        EvalError::UnsupportedUnits(x, y) => EvalError::UnsupportedUnits(
665                            self.string_region.copy(x),
666                            self.string_region.copy(y),
667                        ),
668                        EvalError::Parse(ParseError {
669                            kind,
670                            type_name,
671                            input,
672                            details,
673                        }) => EvalError::Parse(ParseError {
674                            kind: *kind,
675                            type_name: self.string_region.copy(type_name),
676                            input: self.string_region.copy(input),
677                            details: details
678                                .as_ref()
679                                .map(|details| self.string_region.copy(details)),
680                        }),
681                        e @ EvalError::ParseHex(x) => {
682                            assert_copy(x);
683                            e.clone()
684                        }
685                        EvalError::Internal(x) => EvalError::Internal(self.string_region.copy(x)),
686                        EvalError::InfinityOutOfDomain(x) => {
687                            EvalError::InfinityOutOfDomain(self.string_region.copy(x))
688                        }
689                        EvalError::NegativeOutOfDomain(x) => {
690                            EvalError::NegativeOutOfDomain(self.string_region.copy(x))
691                        }
692                        EvalError::ZeroOutOfDomain(x) => {
693                            EvalError::ZeroOutOfDomain(self.string_region.copy(x))
694                        }
695                        EvalError::OutOfDomain(x, y, z) => {
696                            assert_copy(x);
697                            assert_copy(y);
698                            EvalError::OutOfDomain(*x, *y, self.string_region.copy(z))
699                        }
700                        EvalError::ComplexOutOfRange(x) => {
701                            EvalError::ComplexOutOfRange(self.string_region.copy(x))
702                        }
703                        EvalError::Undefined(x) => EvalError::Undefined(self.string_region.copy(x)),
704                        EvalError::StringValueTooLong {
705                            target_type,
706                            length,
707                        } => EvalError::StringValueTooLong {
708                            target_type: self.string_region.copy(target_type),
709                            length: *length,
710                        },
711                        e @ EvalError::IncompatibleArrayDimensions { dims } => {
712                            assert_copy(dims);
713                            e.clone()
714                        }
715                        EvalError::TypeFromOid(x) => {
716                            EvalError::TypeFromOid(self.string_region.copy(x))
717                        }
718                        EvalError::InvalidRange(x) => {
719                            let err = match x {
720                                e @ InvalidRangeError::MisorderedRangeBounds
721                                | e @ InvalidRangeError::InvalidRangeBoundFlags
722                                | e @ InvalidRangeError::DiscontiguousUnion
723                                | e @ InvalidRangeError::DiscontiguousDifference
724                                | e @ InvalidRangeError::NullRangeBoundFlags => e.clone(),
725                                InvalidRangeError::CanonicalizationOverflow(string) => {
726                                    InvalidRangeError::CanonicalizationOverflow(
727                                        self.string_region.copy(string),
728                                    )
729                                }
730                            };
731                            EvalError::InvalidRange(err)
732                        }
733                        EvalError::InvalidRoleId(x) => {
734                            EvalError::InvalidRoleId(self.string_region.copy(x))
735                        }
736                        EvalError::InvalidPrivileges(x) => {
737                            EvalError::InvalidPrivileges(self.string_region.copy(x))
738                        }
739                        EvalError::LetRecLimitExceeded(x) => {
740                            EvalError::LetRecLimitExceeded(self.string_region.copy(x))
741                        }
742                        EvalError::MustNotBeNull(x) => {
743                            EvalError::MustNotBeNull(self.string_region.copy(x))
744                        }
745                        EvalError::InvalidIdentifier { ident, detail } => {
746                            EvalError::InvalidIdentifier {
747                                ident: self.string_region.copy(ident),
748                                detail: detail
749                                    .as_ref()
750                                    .map(|detail| self.string_region.copy(detail)),
751                            }
752                        }
753                        e @ EvalError::MaxArraySizeExceeded(x) => {
754                            assert_copy(x);
755                            e.clone()
756                        }
757                        EvalError::DateDiffOverflow { unit, a, b } => EvalError::DateDiffOverflow {
758                            unit: self.string_region.copy(unit),
759                            a: self.string_region.copy(a),
760                            b: self.string_region.copy(b),
761                        },
762                        EvalError::IfNullError(x) => {
763                            EvalError::IfNullError(self.string_region.copy(x))
764                        }
765                        EvalError::InvalidIanaTimezoneId(x) => {
766                            EvalError::InvalidIanaTimezoneId(self.string_region.copy(x))
767                        }
768                        EvalError::PrettyError(x) => {
769                            EvalError::PrettyError(self.string_region.copy(x))
770                        }
771                    };
772                    let reference = self.eval_error_region.copy_iter(once(err));
773                    let boxed = unsafe { Box::from_raw(reference.as_mut_ptr()) };
774                    DataflowError::EvalError(boxed)
775                }
776                DataflowError::SourceError(err) => {
777                    let err: &SourceError = &*err;
778                    let err = SourceError {
779                        error: match &err.error {
780                            SourceErrorDetails::Initialization(string) => {
781                                SourceErrorDetails::Initialization(self.string_region.copy(string))
782                            }
783                            SourceErrorDetails::Other(string) => {
784                                SourceErrorDetails::Other(self.string_region.copy(string))
785                            }
786                        },
787                    };
788                    let reference = self.source_error_region.copy_iter(once(err));
789                    let boxed = unsafe { Box::from_raw(reference.as_mut_ptr()) };
790                    DataflowError::SourceError(boxed)
791                }
792                DataflowError::EnvelopeError(err) => {
793                    let err: &EnvelopeError = &*err;
794                    let err = match err {
795                        EnvelopeError::Upsert(err) => {
796                            let err = match err {
797                                UpsertError::KeyDecode(err) => {
798                                    UpsertError::KeyDecode(self.copy_decode_error(err))
799                                }
800                                UpsertError::Value(err) => UpsertError::Value(UpsertValueError {
801                                    inner: self.copy_decode_error(&err.inner),
802                                    for_key: self.row_region.copy(&err.for_key),
803                                }),
804                                UpsertError::NullKey(err) => UpsertError::NullKey(*err),
805                            };
806                            EnvelopeError::Upsert(err)
807                        }
808                        EnvelopeError::Flat(string) => {
809                            EnvelopeError::Flat(self.string_region.copy(string))
810                        }
811                    };
812                    let reference = self.envelope_error_region.copy_iter(once(err));
813                    let boxed = unsafe { Box::from_raw(reference.as_mut_ptr()) };
814                    DataflowError::EnvelopeError(boxed)
815                }
816            };
817            // Debug-only check that we're returning an equal object.
818            debug_assert_eq!(item, &err);
819            err
820        }
821
822        fn clear(&mut self) {
823            // De-structure `self` to make sure we're clearing all regions.
824            let Self {
825                decode_error_region,
826                envelope_error_region,
827                eval_error_region,
828                row_region,
829                source_error_region,
830                string_region,
831                u8_region,
832            } = self;
833            decode_error_region.clear();
834            envelope_error_region.clear();
835            eval_error_region.clear();
836            row_region.clear();
837            source_error_region.clear();
838            string_region.clear();
839            u8_region.clear();
840        }
841
842        fn reserve_items<'a, I>(&mut self, items: I)
843        where
844            Self: 'a,
845            I: Iterator<Item = &'a Self::Item> + Clone,
846        {
847            // Reserve space on all stable regions.
848            self.decode_error_region.reserve(
849                items
850                    .clone()
851                    .filter(|x| matches!(x, DataflowError::DecodeError(_)))
852                    .count(),
853            );
854            self.envelope_error_region.reserve(
855                items
856                    .clone()
857                    .filter(|x| matches!(x, DataflowError::EnvelopeError(_)))
858                    .count(),
859            );
860            self.eval_error_region.reserve(
861                items
862                    .clone()
863                    .filter(|x| matches!(x, DataflowError::EvalError(_)))
864                    .count(),
865            );
866            self.source_error_region.reserve(
867                items
868                    .filter(|x| matches!(x, DataflowError::SourceError(_)))
869                    .count(),
870            );
871        }
872
873        fn reserve_regions<'a, I>(&mut self, regions: I)
874        where
875            Self: 'a,
876            I: Iterator<Item = &'a Self> + Clone,
877        {
878            // Reserve space on all region allocators.
879            self.row_region
880                .reserve_regions(regions.clone().map(|r| &r.row_region));
881            self.string_region
882                .reserve_regions(regions.clone().map(|r| &r.string_region));
883            self.u8_region
884                .reserve_regions(regions.clone().map(|r| &r.u8_region));
885        }
886
887        fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
888            // De-structure `self` to make sure we're counting all regions.
889            let Self {
890                decode_error_region,
891                envelope_error_region,
892                eval_error_region,
893                row_region,
894                source_error_region,
895                string_region,
896                u8_region,
897            } = &self;
898            decode_error_region.heap_size(&mut callback);
899            envelope_error_region.heap_size(&mut callback);
900            eval_error_region.heap_size(&mut callback);
901            row_region.heap_size(&mut callback);
902            source_error_region.heap_size(&mut callback);
903            string_region.heap_size(&mut callback);
904            u8_region.heap_size(&mut callback);
905        }
906    }
907
908    #[cfg(test)]
909    mod tests {
910        use differential_dataflow::containers::TimelyStack;
911        use proptest::prelude::*;
912
913        use super::*;
914
915        fn columnation_roundtrip<T: Columnation>(item: &T) -> TimelyStack<T> {
916            let mut container = TimelyStack::with_capacity(1);
917            container.copy(item);
918            container
919        }
920
921        proptest! {
922            #[mz_ore::test]
923            #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
924            fn dataflow_error_roundtrip(expect in any::<DataflowError>()) {
925                let actual = columnation_roundtrip(&expect);
926                proptest::prop_assert_eq!(&expect, &actual[0])
927            }
928        }
929    }
930}
931
932impl RustType<ProtoDataflowError> for DataflowError {
933    fn into_proto(&self) -> ProtoDataflowError {
934        use proto_dataflow_error::Kind::*;
935        ProtoDataflowError {
936            kind: Some(match self {
937                DataflowError::DecodeError(err) => DecodeError(*err.into_proto()),
938                DataflowError::EvalError(err) => EvalError(*err.into_proto()),
939                DataflowError::SourceError(err) => SourceError(*err.into_proto()),
940                DataflowError::EnvelopeError(err) => EnvelopeErrorV1(*err.into_proto()),
941            }),
942        }
943    }
944
945    fn from_proto(proto: ProtoDataflowError) -> Result<Self, TryFromProtoError> {
946        use proto_dataflow_error::Kind::*;
947        match proto.kind {
948            Some(kind) => match kind {
949                DecodeError(err) => Ok(DataflowError::DecodeError(Box::new(err.into_rust()?))),
950                EvalError(err) => Ok(DataflowError::EvalError(Box::new(err.into_rust()?))),
951                SourceError(err) => Ok(DataflowError::SourceError(Box::new(err.into_rust()?))),
952                EnvelopeErrorV1(err) => {
953                    Ok(DataflowError::EnvelopeError(Box::new(err.into_rust()?)))
954                }
955            },
956            None => Err(TryFromProtoError::missing_field("ProtoDataflowError::kind")),
957        }
958    }
959}
960
961impl Display for DataflowError {
962    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
963        match self {
964            DataflowError::DecodeError(e) => write!(f, "Decode error: {}", e),
965            DataflowError::EvalError(e) => write!(
966                f,
967                "{}{}",
968                match **e {
969                    EvalError::IfNullError(_) => "",
970                    _ => "Evaluation error: ",
971                },
972                e
973            ),
974            DataflowError::SourceError(e) => write!(f, "Source error: {}", e),
975            DataflowError::EnvelopeError(e) => write!(f, "Envelope error: {}", e),
976        }
977    }
978}
979
980impl From<DecodeError> for DataflowError {
981    fn from(e: DecodeError) -> Self {
982        Self::DecodeError(Box::new(e))
983    }
984}
985
986impl From<EvalError> for DataflowError {
987    fn from(e: EvalError) -> Self {
988        Self::EvalError(Box::new(e))
989    }
990}
991
992impl From<SourceError> for DataflowError {
993    fn from(e: SourceError) -> Self {
994        Self::SourceError(Box::new(e))
995    }
996}
997
998impl From<EnvelopeError> for DataflowError {
999    fn from(e: EnvelopeError) -> Self {
1000        Self::EnvelopeError(Box::new(e))
1001    }
1002}
1003
/// An error returned by `KafkaConnection::create_with_context`.
///
/// All variants except `Ssh` are `#[error(transparent)]`, so they display exactly
/// like the error they wrap. `#[from]` provides `From` conversions for each of them.
#[derive(thiserror::Error, Debug)]
pub enum ContextCreationError {
    /// A failure attributed to the SSH tunnel, displayed as `ssh: <inner error>`.
    /// There is no `From` conversion into this variant; it must be built explicitly.
    // This ends up double-printing `ssh:` in status tables, but makes for
    // a better experience during ddl.
    #[error("ssh: {0}")]
    Ssh(#[source] anyhow::Error),
    /// An error reported by the `rdkafka` client.
    #[error(transparent)]
    KafkaError(#[from] KafkaError),
    /// A DNS resolution failure.
    #[error(transparent)]
    Dns(#[from] mz_ore::netio::DnsResolutionError),
    /// Any other error. This includes errors that had context attached via
    /// `ContextCreationErrorExt::add_context`.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
    /// An I/O error.
    #[error(transparent)]
    Io(#[from] std::io::Error),
}
1020
/// An extension trait for `Result<T, E>` that makes producing `ContextCreationError`s easier.
pub trait ContextCreationErrorExt<T> {
    /// Converts the error case into a `ContextCreationError`.
    ///
    /// If `cx` reports that its SSH tunnel has errored, that SSH error replaces the
    /// original error as `ContextCreationError::Ssh`. Otherwise the original error is
    /// converted with `From`. An `Ok` value passes through unchanged.
    fn check_ssh_status<C>(self, cx: &TunnelingClientContext<C>)
    -> Result<T, ContextCreationError>;
    /// Converts the error case into a `ContextCreationError` and attaches `msg` as context.
    ///
    /// The `Ssh` variant is preserved, with the context added to its inner error. Every
    /// other variant is returned as `ContextCreationError::Other`. An `Ok` value passes
    /// through unchanged.
    fn add_context(self, msg: &'static str) -> Result<T, ContextCreationError>;
}
1030
1031impl<T, E> ContextCreationErrorExt<T> for Result<T, E>
1032where
1033    ContextCreationError: From<E>,
1034{
1035    fn check_ssh_status<C>(
1036        self,
1037        cx: &TunnelingClientContext<C>,
1038    ) -> Result<T, ContextCreationError> {
1039        self.map_err(|e| {
1040            if let SshTunnelStatus::Errored(e) = cx.tunnel_status() {
1041                ContextCreationError::Ssh(anyhow!(e))
1042            } else {
1043                ContextCreationError::from(e)
1044            }
1045        })
1046    }
1047
1048    fn add_context(self, msg: &'static str) -> Result<T, ContextCreationError> {
1049        self.map_err(|e| {
1050            let e = ContextCreationError::from(e);
1051            match e {
1052                // We need to preserve the `Ssh` variant here, so we add the context inside of it.
1053                ContextCreationError::Ssh(e) => ContextCreationError::Ssh(anyhow!(e.context(msg))),
1054                ContextCreationError::Other(e) => {
1055                    ContextCreationError::Other(anyhow!(e.context(msg)))
1056                }
1057                ContextCreationError::KafkaError(e) => {
1058                    ContextCreationError::Other(anyhow!(anyhow!(e).context(msg)))
1059                }
1060                ContextCreationError::Io(e) => {
1061                    ContextCreationError::Other(anyhow!(anyhow!(e).context(msg)))
1062                }
1063                ContextCreationError::Dns(e) => {
1064                    ContextCreationError::Other(anyhow!(e).context(msg))
1065                }
1066            }
1067        })
1068    }
1069}
1070
1071impl From<CsrConnectError> for ContextCreationError {
1072    fn from(csr: CsrConnectError) -> ContextCreationError {
1073        use ContextCreationError::*;
1074
1075        match csr {
1076            CsrConnectError::Ssh(e) => Ssh(e),
1077            other => Other(anyhow::anyhow!(other)),
1078        }
1079    }
1080}
1081
/// An error returned by `CsrConnection::connect`.
///
/// When converted into a `ContextCreationError`, only the `Ssh` variant keeps its
/// identity; every other variant becomes `ContextCreationError::Other`.
#[derive(thiserror::Error, Debug)]
pub enum CsrConnectError {
    /// A failure attributed to the SSH tunnel, displayed as `ssh: <inner error>`.
    /// There is no `From` conversion into this variant; it must be built explicitly.
    // This ends up double-printing `ssh:` in status tables, but makes for
    // a better experience during ddl.
    #[error("ssh: {0}")]
    Ssh(#[source] anyhow::Error),
    /// A TLS error reported by the `native_tls` crate.
    #[error(transparent)]
    NativeTls(#[from] native_tls::Error),
    /// An error stack reported by OpenSSL.
    #[error(transparent)]
    Openssl(#[from] openssl::error::ErrorStack),
    /// A DNS resolution failure.
    #[error(transparent)]
    Dns(#[from] mz_ore::netio::DnsResolutionError),
    /// An I/O error.
    #[error(transparent)]
    Io(#[from] std::io::Error),
    /// Any other error.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
1100
#[cfg(test)]
mod tests {
    use super::{DecodeError, DecodeErrorKind};

    /// Encoding a `DecodeError` and decoding the resulting bytes yields the original value.
    #[mz_ore::test]
    fn test_decode_error_codec_roundtrip() -> Result<(), String> {
        let original = DecodeError {
            kind: DecodeErrorKind::Text("ciao".into()),
            raw: b"oaic".to_vec(),
        };

        let mut buf = Vec::new();
        original.encode(&mut buf);
        assert_eq!(DecodeError::decode(&buf)?, original);

        Ok(())
    }
}