Skip to main content

mz_storage_types/
errors.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use anyhow::anyhow;
11use std::error::Error;
12use std::fmt::Display;
13
14use bytes::BufMut;
15use mz_expr::EvalError;
16use mz_kafka_util::client::TunnelingClientContext;
17use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
18use mz_repr::{GlobalId, Row};
19use mz_ssh_util::tunnel::SshTunnelStatus;
20use proptest_derive::Arbitrary;
21use prost::Message;
22use rdkafka::error::KafkaError;
23use serde::{Deserialize, Serialize};
24use thiserror::Error;
25use tracing::warn;
26
27include!(concat!(env!("OUT_DIR"), "/mz_storage_types.errors.rs"));
28
29/// The underlying data was not decodable in the format we expected: eg.
30/// invalid JSON or Avro data that doesn't match a schema.
31#[derive(
32    Arbitrary,
33    Ord,
34    PartialOrd,
35    Clone,
36    Debug,
37    Eq,
38    PartialEq,
39    Serialize,
40    Deserialize,
41    Hash
42)]
43pub struct DecodeError {
44    pub kind: DecodeErrorKind,
45    pub raw: Vec<u8>,
46}
47
48impl RustType<ProtoDecodeError> for DecodeError {
49    fn into_proto(&self) -> ProtoDecodeError {
50        ProtoDecodeError {
51            kind: Some(RustType::into_proto(&self.kind)),
52            raw: Some(self.raw.clone()),
53        }
54    }
55
56    fn from_proto(proto: ProtoDecodeError) -> Result<Self, TryFromProtoError> {
57        let kind = match proto.kind {
58            Some(kind) => RustType::from_proto(kind)?,
59            None => return Err(TryFromProtoError::missing_field("ProtoDecodeError::kind")),
60        };
61        let raw = proto.raw.into_rust_if_some("raw")?;
62        Ok(Self { kind, raw })
63    }
64}
65
66impl DecodeError {
67    pub fn encode<B>(&self, buf: &mut B)
68    where
69        B: BufMut,
70    {
71        self.into_proto()
72            .encode(buf)
73            .expect("no required fields means no initialization errors")
74    }
75
76    pub fn decode(buf: &[u8]) -> Result<Self, String> {
77        let proto = ProtoDecodeError::decode(buf).map_err(|err| err.to_string())?;
78        proto.into_rust().map_err(|err| err.to_string())
79    }
80}
81
82impl Display for DecodeError {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        // See if we can output the bytes that failed to decode as a string.
85        let str_repr = std::str::from_utf8(&self.raw).ok();
86        // strip any NUL characters from the str_repr to prevent an error
87        // in the postgres protocol decoding
88        let str_repr = str_repr.map(|s| s.replace('\0', "NULL"));
89        let bytes_repr = hex::encode(&self.raw);
90        match str_repr {
91            Some(s) => write!(
92                f,
93                "{} (original text: {}, original bytes: {:x?})",
94                self.kind, s, bytes_repr
95            ),
96            None => write!(f, "{} (original bytes: {:x?})", self.kind, bytes_repr),
97        }
98    }
99}
100
101#[derive(
102    Arbitrary,
103    Ord,
104    PartialOrd,
105    Clone,
106    Debug,
107    Eq,
108    PartialEq,
109    Serialize,
110    Deserialize,
111    Hash
112)]
113pub enum DecodeErrorKind {
114    Text(Box<str>),
115    Bytes(Box<str>),
116}
117
118impl RustType<ProtoDecodeErrorKind> for DecodeErrorKind {
119    fn into_proto(&self) -> ProtoDecodeErrorKind {
120        use proto_decode_error_kind::Kind::*;
121        ProtoDecodeErrorKind {
122            kind: Some(match self {
123                DecodeErrorKind::Text(v) => Text(v.into_proto()),
124                DecodeErrorKind::Bytes(v) => Bytes(v.into_proto()),
125            }),
126        }
127    }
128
129    fn from_proto(proto: ProtoDecodeErrorKind) -> Result<Self, TryFromProtoError> {
130        use proto_decode_error_kind::Kind::*;
131        match proto.kind {
132            Some(Text(v)) => Ok(DecodeErrorKind::Text(v.into())),
133            Some(Bytes(v)) => Ok(DecodeErrorKind::Bytes(v.into())),
134            None => Err(TryFromProtoError::missing_field("ProtoDecodeError::kind")),
135        }
136    }
137}
138
139impl Display for DecodeErrorKind {
140    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141        match self {
142            DecodeErrorKind::Text(e) => f.write_str(e),
143            DecodeErrorKind::Bytes(e) => f.write_str(e),
144        }
145    }
146}
147
148/// Errors arising during envelope processing.
149#[derive(
150    Arbitrary,
151    Ord,
152    PartialOrd,
153    Clone,
154    Debug,
155    Eq,
156    Deserialize,
157    Serialize,
158    PartialEq,
159    Hash
160)]
161pub enum EnvelopeError {
162    /// An error that can be retracted by a future message using upsert logic.
163    Upsert(UpsertError),
164    /// Errors corresponding to `ENVELOPE NONE`. Naming this
165    /// `None`, though, would have been too confusing.
166    Flat(Box<str>),
167}
168
169impl RustType<ProtoEnvelopeErrorV1> for EnvelopeError {
170    fn into_proto(&self) -> ProtoEnvelopeErrorV1 {
171        use proto_envelope_error_v1::Kind;
172        ProtoEnvelopeErrorV1 {
173            kind: Some(match self {
174                EnvelopeError::Upsert(rust) => Kind::Upsert(rust.into_proto()),
175                EnvelopeError::Flat(text) => Kind::Flat(text.into_proto()),
176            }),
177        }
178    }
179
180    fn from_proto(proto: ProtoEnvelopeErrorV1) -> Result<Self, TryFromProtoError> {
181        use proto_envelope_error_v1::Kind;
182        match proto.kind {
183            Some(Kind::Upsert(proto)) => {
184                let rust = RustType::from_proto(proto)?;
185                Ok(Self::Upsert(rust))
186            }
187            Some(Kind::Flat(text)) => Ok(Self::Flat(text.into())),
188            None => Err(TryFromProtoError::missing_field(
189                "ProtoEnvelopeErrorV1::kind",
190            )),
191        }
192    }
193}
194
195impl Display for EnvelopeError {
196    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
197        match self {
198            EnvelopeError::Upsert(err) => write!(f, "Upsert: {err}"),
199            EnvelopeError::Flat(err) => write!(f, "Flat: {err}"),
200        }
201    }
202}
203
204/// An error from a value in an upsert source. The corresponding key is included, allowing
205/// us to reconstruct their entry in the upsert map upon restart.
206#[derive(
207    Arbitrary,
208    Ord,
209    PartialOrd,
210    Clone,
211    Debug,
212    Eq,
213    PartialEq,
214    Serialize,
215    Deserialize,
216    Hash
217)]
218pub struct UpsertValueError {
219    /// The underlying error.
220    pub inner: DecodeError,
221    /// The (good) key associated with the errored value.
222    pub for_key: Row,
223}
224
225impl RustType<ProtoUpsertValueError> for UpsertValueError {
226    fn into_proto(&self) -> ProtoUpsertValueError {
227        ProtoUpsertValueError {
228            inner: Some(self.inner.into_proto()),
229            for_key: Some(self.for_key.into_proto()),
230        }
231    }
232
233    fn from_proto(proto: ProtoUpsertValueError) -> Result<Self, TryFromProtoError> {
234        Ok(UpsertValueError {
235            inner: proto
236                .inner
237                .into_rust_if_some("ProtoUpsertValueError::inner")?,
238            for_key: proto
239                .for_key
240                .into_rust_if_some("ProtoUpsertValueError::for_key")?,
241        })
242    }
243}
244
245impl Display for UpsertValueError {
246    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
247        let UpsertValueError { inner, for_key } = self;
248        write!(f, "{inner}, decoded key: {for_key:?}")?;
249        Ok(())
250    }
251}
252
253/// A source contained a record with a NULL key, which we don't support.
254#[derive(
255    Arbitrary,
256    Ord,
257    PartialOrd,
258    Copy,
259    Clone,
260    Debug,
261    Eq,
262    PartialEq,
263    Serialize,
264    Deserialize,
265    Hash
266)]
267pub struct UpsertNullKeyError;
268
269impl RustType<ProtoUpsertNullKeyError> for UpsertNullKeyError {
270    fn into_proto(&self) -> ProtoUpsertNullKeyError {
271        ProtoUpsertNullKeyError {}
272    }
273
274    fn from_proto(_proto: ProtoUpsertNullKeyError) -> Result<Self, TryFromProtoError> {
275        Ok(UpsertNullKeyError)
276    }
277}
278
279impl Display for UpsertNullKeyError {
280    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
281        write!(
282            f,
283            "record with NULL key in UPSERT source; to retract this error, "
284        )?;
285        write!(
286            f,
287            "produce a record upstream with a NULL key and NULL value"
288        )
289    }
290}
291
292/// An error that can be retracted by a future message using upsert logic.
293#[derive(
294    Arbitrary,
295    Ord,
296    PartialOrd,
297    Clone,
298    Debug,
299    Eq,
300    PartialEq,
301    Serialize,
302    Deserialize,
303    Hash
304)]
305pub enum UpsertError {
306    /// Wrapper around a key decoding error.
307    /// We use this instead of emitting the underlying `DataflowError::DecodeError` because with only
308    /// the underlying error, we can't distinguish between an error with the key and an error
309    /// with the value.
310    ///
311    /// It is necessary to distinguish them because the necessary record to retract them is different.
312    /// `(K, <errored V>)` is retracted by `(K, null)`, whereas `(<errored K>, anything)` is retracted by
313    /// `("bytes", null)`, where "bytes" is the string that failed to correctly decode as a key.
314    KeyDecode(DecodeError),
315    /// Wrapper around an error related to the value.
316    Value(UpsertValueError),
317    NullKey(UpsertNullKeyError),
318}
319
320impl RustType<ProtoUpsertError> for UpsertError {
321    fn into_proto(&self) -> ProtoUpsertError {
322        use proto_upsert_error::Kind;
323        ProtoUpsertError {
324            kind: Some(match self {
325                UpsertError::KeyDecode(err) => Kind::KeyDecode(err.into_proto()),
326                UpsertError::Value(err) => Kind::Value(err.into_proto()),
327                UpsertError::NullKey(err) => Kind::NullKey(err.into_proto()),
328            }),
329        }
330    }
331
332    fn from_proto(proto: ProtoUpsertError) -> Result<Self, TryFromProtoError> {
333        use proto_upsert_error::Kind;
334        match proto.kind {
335            Some(Kind::KeyDecode(proto)) => {
336                let rust = RustType::from_proto(proto)?;
337                Ok(Self::KeyDecode(rust))
338            }
339            Some(Kind::Value(proto)) => {
340                let rust = RustType::from_proto(proto)?;
341                Ok(Self::Value(rust))
342            }
343            Some(Kind::NullKey(proto)) => {
344                let rust = RustType::from_proto(proto)?;
345                Ok(Self::NullKey(rust))
346            }
347            None => Err(TryFromProtoError::missing_field("ProtoUpsertError::kind")),
348        }
349    }
350}
351
352impl Display for UpsertError {
353    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
354        match self {
355            UpsertError::KeyDecode(err) => write!(f, "Key decode: {err}"),
356            UpsertError::Value(err) => write!(f, "Value error: {err}"),
357            UpsertError::NullKey(err) => write!(f, "Null key: {err}"),
358        }
359    }
360}
361
362/// Source-wide durable errors; for example, a replication log being meaningless or corrupted.
363/// This should _not_ include transient source errors, like connection issues or misconfigurations.
364#[derive(
365    Arbitrary,
366    Ord,
367    PartialOrd,
368    Clone,
369    Debug,
370    Eq,
371    PartialEq,
372    Serialize,
373    Deserialize,
374    Hash
375)]
376pub struct SourceError {
377    pub error: SourceErrorDetails,
378}
379
380impl RustType<ProtoSourceError> for SourceError {
381    fn into_proto(&self) -> ProtoSourceError {
382        ProtoSourceError {
383            error: Some(self.error.into_proto()),
384        }
385    }
386
387    fn from_proto(proto: ProtoSourceError) -> Result<Self, TryFromProtoError> {
388        Ok(SourceError {
389            error: proto.error.into_rust_if_some("ProtoSourceError::error")?,
390        })
391    }
392}
393
394impl Display for SourceError {
395    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
396        self.error.fmt(f)
397    }
398}
399
400#[derive(
401    Arbitrary,
402    Ord,
403    PartialOrd,
404    Clone,
405    Debug,
406    Eq,
407    PartialEq,
408    Serialize,
409    Deserialize,
410    Hash
411)]
412pub enum SourceErrorDetails {
413    Initialization(Box<str>),
414    Other(Box<str>),
415}
416
417impl RustType<ProtoSourceErrorDetails> for SourceErrorDetails {
418    fn into_proto(&self) -> ProtoSourceErrorDetails {
419        use proto_source_error_details::Kind;
420        ProtoSourceErrorDetails {
421            kind: Some(match self {
422                SourceErrorDetails::Initialization(s) => Kind::Initialization(s.into_proto()),
423                SourceErrorDetails::Other(s) => Kind::Other(s.into_proto()),
424            }),
425        }
426    }
427
428    fn from_proto(proto: ProtoSourceErrorDetails) -> Result<Self, TryFromProtoError> {
429        use proto_source_error_details::Kind;
430        match proto.kind {
431            Some(kind) => match kind {
432                Kind::Initialization(s) => Ok(SourceErrorDetails::Initialization(s.into())),
433                Kind::DeprecatedFileIo(s) | Kind::DeprecatedPersistence(s) => {
434                    warn!("Deprecated source error kind: {s}");
435                    Ok(SourceErrorDetails::Other(s.into()))
436                }
437                Kind::Other(s) => Ok(SourceErrorDetails::Other(s.into())),
438            },
439            None => Err(TryFromProtoError::missing_field(
440                "ProtoSourceErrorDetails::kind",
441            )),
442        }
443    }
444}
445
446impl Display for SourceErrorDetails {
447    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
448        match self {
449            SourceErrorDetails::Initialization(e) => {
450                write!(
451                    f,
452                    "failed during initialization, source must be dropped and recreated: {}",
453                    e
454                )
455            }
456            SourceErrorDetails::Other(e) => {
457                write!(
458                    f,
459                    "source must be dropped and recreated due to failure: {}",
460                    e
461                )
462            }
463        }
464    }
465}
466
467/// An error that's destined to be presented to the user in a differential dataflow collection.
468/// For example, a divide by zero will be visible in the error collection for a particular row.
469///
470/// All of the variants are boxed to minimize the memory size of `DataflowError`. This type is
471/// likely to appear in `Result<Row, DataflowError>`s on high-throughput code paths, so keeping its
472/// size less than or equal to that of `Row` is important to ensure we are not wasting memory.
473#[derive(
474    Arbitrary,
475    Ord,
476    PartialOrd,
477    Clone,
478    Debug,
479    Eq,
480    Deserialize,
481    Serialize,
482    PartialEq,
483    Hash
484)]
485pub enum DataflowError {
486    DecodeError(Box<DecodeError>),
487    EvalError(Box<EvalError>),
488    SourceError(Box<SourceError>),
489    EnvelopeError(Box<EnvelopeError>),
490}
491
/// Marks `DataflowError` as a standard error type using only the trait's
/// default methods (no `source`). This relies on `DataflowError`'s `Debug` and
/// `Display` impls, which `std::error::Error` requires.
impl Error for DataflowError {}
493
mod boxed_str {
    //! A columnation region that stores `Box<str>` contents in shared,
    //! stable byte storage instead of individual heap allocations.

    use differential_dataflow::containers::Region;
    use differential_dataflow::containers::StableRegion;

    /// Region allocation for `Box<str>` data.
    ///
    /// The bytes of each string are copied into stable, contiguous memory held by
    /// `region`, and a `Box<str>` pointing at that memory is fabricated. Such a box
    /// does not own a heap allocation of its own, so it must never be dropped.
    #[derive(Default)]
    pub struct BoxStrStack {
        /// Backing storage for the UTF-8 bytes of every copied string.
        region: StableRegion<u8>,
    }

    impl Region for BoxStrStack {
        type Item = Box<str>;
        /// Clears the backing byte region. Boxes previously returned by `copy`
        /// point into this storage and should not be used afterwards.
        #[inline]
        fn clear(&mut self) {
            self.region.clear();
        }
        /// Copies `item`'s bytes into the region and returns a `Box<str>` that
        /// aliases the copy.
        ///
        /// # Safety
        ///
        /// The returned box does not own its memory. It must not be dropped, and
        /// it should not be used after this region is cleared.
        // Removing `(always)` is a 20% performance regression in
        // the `string10_copy` benchmark.
        #[inline(always)]
        unsafe fn copy(&mut self, item: &Box<str>) -> Box<str> {
            let bytes = self.region.copy_slice(item.as_bytes());
            // SAFETY: `bytes` is a fresh copy, made into `self.region`, of the bytes
            // of a valid `str`, so it is valid UTF-8. The region keeps the copy at a
            // stable address. The box does not own a heap allocation, so it must
            // never be dropped.
            std::str::from_boxed_utf8_unchecked(Box::from_raw(bytes))
        }
        /// Reserves room for the total byte length of `items`.
        #[inline(always)]
        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            self.region.reserve(items.map(|x| x.len()).sum());
        }

        /// Reserves room for the total byte length held by the given regions.
        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            self.region.reserve(regions.map(|r| r.region.len()).sum());
        }
        /// Reports heap usage by delegating to the backing byte region.
        #[inline]
        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.region.heap_size(callback)
        }
    }
}
545
546mod columnation {
547    use std::iter::once;
548
549    use differential_dataflow::containers::{Columnation, Region, StableRegion};
550    use mz_expr::EvalError;
551    use mz_repr::Row;
552    use mz_repr::adt::range::InvalidRangeError;
553    use mz_repr::strconv::ParseError;
554
555    use crate::errors::boxed_str::BoxStrStack;
556    use crate::errors::{
557        DataflowError, DecodeError, DecodeErrorKind, EnvelopeError, SourceError,
558        SourceErrorDetails, UpsertError, UpsertValueError,
559    };
560
561    impl Columnation for DataflowError {
562        type InnerRegion = DataflowErrorRegion;
563    }
564
565    /// A region to store [`DataflowError`].
566    #[derive(Default)]
567    pub struct DataflowErrorRegion {
568        /// Stable location for [`DecodeError`] for inserting into a box.
569        decode_error_region: StableRegion<DecodeError>,
570        /// Stable location for [`EnvelopeError`] for inserting into a box.
571        envelope_error_region: StableRegion<EnvelopeError>,
572        /// Stable location for [`EvalError`] for inserting into a box.
573        eval_error_region: StableRegion<EvalError>,
574        /// Region for storing rows.
575        row_region: <Row as Columnation>::InnerRegion,
576        /// Stable location for [`SourceError`] for inserting into a box.
577        source_error_region: StableRegion<SourceError>,
578        /// Region for storing strings.
579        string_region: BoxStrStack,
580        /// Region for storing u8 vectors.
581        u8_region: <Vec<u8> as Columnation>::InnerRegion,
582    }
583
584    impl DataflowErrorRegion {
585        /// Copy a decode error into its region, return an owned object.
586        ///
587        /// This is unsafe because the returned value must not be dropped.
588        unsafe fn copy_decode_error(&mut self, decode_error: &DecodeError) -> DecodeError {
589            DecodeError {
590                kind: match &decode_error.kind {
591                    DecodeErrorKind::Text(string) => {
592                        DecodeErrorKind::Text(self.string_region.copy(string))
593                    }
594                    DecodeErrorKind::Bytes(string) => {
595                        DecodeErrorKind::Bytes(self.string_region.copy(string))
596                    }
597                },
598                raw: self.u8_region.copy(&decode_error.raw),
599            }
600        }
601    }
602
603    /// Compile-time assertion that a value is `Copy`.
604    fn assert_copy<T: Copy>(_: &T) {}
605
606    impl Region for DataflowErrorRegion {
607        type Item = DataflowError;
608
609        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
610            // Unsafe Box::from_raw reasoning:
611            // Construct a box from a provided value. This is safe because a box is
612            // a pointer to a memory address, and the value is stored on the heap.
613            // Note that the box must not be dropped.
614
615            // SAFETY: When adding new enum variants, care must be taken that all types containing
616            // references are region-allocated, otherwise we'll leak memory.
617
618            // Types that are `Copy` should be asserted using `assert_copy`, or copied, to detect
619            // changes that introduce pointers.
620
621            let err = match item {
622                DataflowError::DecodeError(err) => {
623                    let err = self.copy_decode_error(&*err);
624                    let reference = self.decode_error_region.copy_iter(once(err));
625                    let boxed = unsafe { Box::from_raw(reference.as_mut_ptr()) };
626                    DataflowError::DecodeError(boxed)
627                }
628                DataflowError::EvalError(err) => {
629                    let err: &EvalError = &*err;
630                    let err = match err {
631                        e @ EvalError::CharacterNotValidForEncoding(x) => {
632                            assert_copy(x);
633                            e.clone()
634                        }
635                        e @ EvalError::CharacterTooLargeForEncoding(x) => {
636                            assert_copy(x);
637                            e.clone()
638                        }
639                        EvalError::DateBinOutOfRange(string) => {
640                            EvalError::DateBinOutOfRange(self.string_region.copy(string))
641                        }
642                        e @ EvalError::DivisionByZero
643                        | e @ EvalError::FloatOverflow
644                        | e @ EvalError::FloatUnderflow
645                        | e @ EvalError::NumericFieldOverflow
646                        | e @ EvalError::MzTimestampStepOverflow
647                        | e @ EvalError::TimestampCannotBeNan
648                        | e @ EvalError::TimestampOutOfRange
649                        | e @ EvalError::NegSqrt
650                        | e @ EvalError::NegLimit
651                        | e @ EvalError::NullCharacterNotPermitted
652                        | e @ EvalError::KeyCannotBeNull
653                        | e @ EvalError::UnterminatedLikeEscapeSequence
654                        | e @ EvalError::MultipleRowsFromSubquery
655                        | e @ EvalError::LikePatternTooLong
656                        | e @ EvalError::LikeEscapeTooLong
657                        | e @ EvalError::MultidimensionalArrayRemovalNotSupported
658                        | e @ EvalError::MultiDimensionalArraySearch
659                        | e @ EvalError::ArrayFillWrongArraySubscripts
660                        | e @ EvalError::DateOutOfRange
661                        | e @ EvalError::CharOutOfRange
662                        | e @ EvalError::InvalidBase64Equals
663                        | e @ EvalError::InvalidBase64EndSequence
664                        | e @ EvalError::InvalidTimezoneInterval
665                        | e @ EvalError::InvalidTimezoneConversion
666                        | e @ EvalError::LengthTooLarge
667                        | e @ EvalError::AclArrayNullElement
668                        | e @ EvalError::MzAclArrayNullElement => e.clone(),
669                        EvalError::Unsupported {
670                            feature,
671                            discussion_no,
672                        } => EvalError::Unsupported {
673                            feature: self.string_region.copy(feature),
674                            discussion_no: *discussion_no,
675                        },
676                        EvalError::Float32OutOfRange(string) => {
677                            EvalError::Float32OutOfRange(self.string_region.copy(string))
678                        }
679                        EvalError::Float64OutOfRange(string) => {
680                            EvalError::Float64OutOfRange(self.string_region.copy(string))
681                        }
682                        EvalError::Int16OutOfRange(string) => {
683                            EvalError::Int16OutOfRange(self.string_region.copy(string))
684                        }
685                        EvalError::Int32OutOfRange(string) => {
686                            EvalError::Int32OutOfRange(self.string_region.copy(string))
687                        }
688                        EvalError::Int64OutOfRange(string) => {
689                            EvalError::Int64OutOfRange(self.string_region.copy(string))
690                        }
691                        EvalError::UInt16OutOfRange(string) => {
692                            EvalError::UInt16OutOfRange(self.string_region.copy(string))
693                        }
694                        EvalError::UInt32OutOfRange(string) => {
695                            EvalError::UInt32OutOfRange(self.string_region.copy(string))
696                        }
697                        EvalError::UInt64OutOfRange(string) => {
698                            EvalError::UInt64OutOfRange(self.string_region.copy(string))
699                        }
700                        EvalError::MzTimestampOutOfRange(string) => {
701                            EvalError::MzTimestampOutOfRange(self.string_region.copy(string))
702                        }
703                        EvalError::OidOutOfRange(string) => {
704                            EvalError::OidOutOfRange(self.string_region.copy(string))
705                        }
706                        EvalError::IntervalOutOfRange(string) => {
707                            EvalError::IntervalOutOfRange(self.string_region.copy(string))
708                        }
709                        e @ EvalError::IndexOutOfRange {
710                            provided,
711                            valid_end,
712                        } => {
713                            assert_copy(provided);
714                            assert_copy(valid_end);
715                            e.clone()
716                        }
717                        e @ EvalError::InvalidBase64Symbol(c) => {
718                            assert_copy(c);
719                            e.clone()
720                        }
721                        EvalError::InvalidTimezone(x) => {
722                            EvalError::InvalidTimezone(self.string_region.copy(x))
723                        }
724                        e @ EvalError::InvalidLayer { max_layer, val } => {
725                            assert_copy(max_layer);
726                            assert_copy(val);
727                            e.clone()
728                        }
729                        EvalError::InvalidArray(err) => EvalError::InvalidArray(*err),
730                        EvalError::InvalidEncodingName(x) => {
731                            EvalError::InvalidEncodingName(self.string_region.copy(x))
732                        }
733                        EvalError::InvalidHashAlgorithm(x) => {
734                            EvalError::InvalidHashAlgorithm(self.string_region.copy(x))
735                        }
736                        EvalError::InvalidByteSequence {
737                            byte_sequence,
738                            encoding_name,
739                        } => EvalError::InvalidByteSequence {
740                            byte_sequence: self.string_region.copy(byte_sequence),
741                            encoding_name: self.string_region.copy(encoding_name),
742                        },
743                        EvalError::InvalidJsonbCast { from, to } => EvalError::InvalidJsonbCast {
744                            from: self.string_region.copy(from),
745                            to: self.string_region.copy(to),
746                        },
747                        EvalError::InvalidRegex(x) => {
748                            EvalError::InvalidRegex(self.string_region.copy(x))
749                        }
750                        e @ EvalError::InvalidRegexFlag(x) => {
751                            assert_copy(x);
752                            e.clone()
753                        }
754                        EvalError::InvalidParameterValue(x) => {
755                            EvalError::InvalidParameterValue(self.string_region.copy(x))
756                        }
757                        EvalError::InvalidDatePart(x) => {
758                            EvalError::InvalidDatePart(self.string_region.copy(x))
759                        }
760                        EvalError::UnknownUnits(x) => {
761                            EvalError::UnknownUnits(self.string_region.copy(x))
762                        }
763                        EvalError::UnsupportedUnits(x, y) => EvalError::UnsupportedUnits(
764                            self.string_region.copy(x),
765                            self.string_region.copy(y),
766                        ),
767                        EvalError::Parse(ParseError {
768                            kind,
769                            type_name,
770                            input,
771                            details,
772                        }) => EvalError::Parse(ParseError {
773                            kind: *kind,
774                            type_name: self.string_region.copy(type_name),
775                            input: self.string_region.copy(input),
776                            details: details
777                                .as_ref()
778                                .map(|details| self.string_region.copy(details)),
779                        }),
780                        e @ EvalError::ParseHex(x) => {
781                            assert_copy(x);
782                            e.clone()
783                        }
784                        EvalError::Internal(x) => EvalError::Internal(self.string_region.copy(x)),
785                        EvalError::InfinityOutOfDomain(x) => {
786                            EvalError::InfinityOutOfDomain(self.string_region.copy(x))
787                        }
788                        EvalError::NegativeOutOfDomain(x) => {
789                            EvalError::NegativeOutOfDomain(self.string_region.copy(x))
790                        }
791                        EvalError::ZeroOutOfDomain(x) => {
792                            EvalError::ZeroOutOfDomain(self.string_region.copy(x))
793                        }
794                        EvalError::OutOfDomain(x, y, z) => {
795                            assert_copy(x);
796                            assert_copy(y);
797                            EvalError::OutOfDomain(*x, *y, self.string_region.copy(z))
798                        }
799                        EvalError::ComplexOutOfRange(x) => {
800                            EvalError::ComplexOutOfRange(self.string_region.copy(x))
801                        }
802                        EvalError::Undefined(x) => EvalError::Undefined(self.string_region.copy(x)),
803                        EvalError::StringValueTooLong {
804                            target_type,
805                            length,
806                        } => EvalError::StringValueTooLong {
807                            target_type: self.string_region.copy(target_type),
808                            length: *length,
809                        },
810                        e @ EvalError::IncompatibleArrayDimensions { dims } => {
811                            assert_copy(dims);
812                            e.clone()
813                        }
814                        EvalError::TypeFromOid(x) => {
815                            EvalError::TypeFromOid(self.string_region.copy(x))
816                        }
817                        EvalError::InvalidRange(x) => {
818                            let err = match x {
819                                e @ InvalidRangeError::MisorderedRangeBounds
820                                | e @ InvalidRangeError::InvalidRangeBoundFlags
821                                | e @ InvalidRangeError::DiscontiguousUnion
822                                | e @ InvalidRangeError::DiscontiguousDifference
823                                | e @ InvalidRangeError::NullRangeBoundFlags => e.clone(),
824                                InvalidRangeError::CanonicalizationOverflow(string) => {
825                                    InvalidRangeError::CanonicalizationOverflow(
826                                        self.string_region.copy(string),
827                                    )
828                                }
829                            };
830                            EvalError::InvalidRange(err)
831                        }
832                        EvalError::InvalidRoleId(x) => {
833                            EvalError::InvalidRoleId(self.string_region.copy(x))
834                        }
835                        EvalError::InvalidPrivileges(x) => {
836                            EvalError::InvalidPrivileges(self.string_region.copy(x))
837                        }
838                        EvalError::LetRecLimitExceeded(x) => {
839                            EvalError::LetRecLimitExceeded(self.string_region.copy(x))
840                        }
841                        EvalError::MustNotBeNull(x) => {
842                            EvalError::MustNotBeNull(self.string_region.copy(x))
843                        }
844                        EvalError::InvalidIdentifier { ident, detail } => {
845                            EvalError::InvalidIdentifier {
846                                ident: self.string_region.copy(ident),
847                                detail: detail
848                                    .as_ref()
849                                    .map(|detail| self.string_region.copy(detail)),
850                            }
851                        }
852                        e @ EvalError::MaxArraySizeExceeded(x) => {
853                            assert_copy(x);
854                            e.clone()
855                        }
856                        EvalError::DateDiffOverflow { unit, a, b } => EvalError::DateDiffOverflow {
857                            unit: self.string_region.copy(unit),
858                            a: self.string_region.copy(a),
859                            b: self.string_region.copy(b),
860                        },
861                        EvalError::IfNullError(x) => {
862                            EvalError::IfNullError(self.string_region.copy(x))
863                        }
864                        EvalError::InvalidIanaTimezoneId(x) => {
865                            EvalError::InvalidIanaTimezoneId(self.string_region.copy(x))
866                        }
867                        EvalError::PrettyError(x) => {
868                            EvalError::PrettyError(self.string_region.copy(x))
869                        }
870                    };
871                    let reference = self.eval_error_region.copy_iter(once(err));
872                    let boxed = unsafe { Box::from_raw(reference.as_mut_ptr()) };
873                    DataflowError::EvalError(boxed)
874                }
875                DataflowError::SourceError(err) => {
876                    let err: &SourceError = &*err;
877                    let err = SourceError {
878                        error: match &err.error {
879                            SourceErrorDetails::Initialization(string) => {
880                                SourceErrorDetails::Initialization(self.string_region.copy(string))
881                            }
882                            SourceErrorDetails::Other(string) => {
883                                SourceErrorDetails::Other(self.string_region.copy(string))
884                            }
885                        },
886                    };
887                    let reference = self.source_error_region.copy_iter(once(err));
888                    let boxed = unsafe { Box::from_raw(reference.as_mut_ptr()) };
889                    DataflowError::SourceError(boxed)
890                }
891                DataflowError::EnvelopeError(err) => {
892                    let err: &EnvelopeError = &*err;
893                    let err = match err {
894                        EnvelopeError::Upsert(err) => {
895                            let err = match err {
896                                UpsertError::KeyDecode(err) => {
897                                    UpsertError::KeyDecode(self.copy_decode_error(err))
898                                }
899                                UpsertError::Value(err) => UpsertError::Value(UpsertValueError {
900                                    inner: self.copy_decode_error(&err.inner),
901                                    for_key: self.row_region.copy(&err.for_key),
902                                }),
903                                UpsertError::NullKey(err) => UpsertError::NullKey(*err),
904                            };
905                            EnvelopeError::Upsert(err)
906                        }
907                        EnvelopeError::Flat(string) => {
908                            EnvelopeError::Flat(self.string_region.copy(string))
909                        }
910                    };
911                    let reference = self.envelope_error_region.copy_iter(once(err));
912                    let boxed = unsafe { Box::from_raw(reference.as_mut_ptr()) };
913                    DataflowError::EnvelopeError(boxed)
914                }
915            };
916            // Debug-only check that we're returning an equal object.
917            debug_assert_eq!(item, &err);
918            err
919        }
920
921        fn clear(&mut self) {
922            // De-structure `self` to make sure we're clearing all regions.
923            let Self {
924                decode_error_region,
925                envelope_error_region,
926                eval_error_region,
927                row_region,
928                source_error_region,
929                string_region,
930                u8_region,
931            } = self;
932            decode_error_region.clear();
933            envelope_error_region.clear();
934            eval_error_region.clear();
935            row_region.clear();
936            source_error_region.clear();
937            string_region.clear();
938            u8_region.clear();
939        }
940
941        fn reserve_items<'a, I>(&mut self, items: I)
942        where
943            Self: 'a,
944            I: Iterator<Item = &'a Self::Item> + Clone,
945        {
946            // Reserve space on all stable regions.
947            self.decode_error_region.reserve(
948                items
949                    .clone()
950                    .filter(|x| matches!(x, DataflowError::DecodeError(_)))
951                    .count(),
952            );
953            self.envelope_error_region.reserve(
954                items
955                    .clone()
956                    .filter(|x| matches!(x, DataflowError::EnvelopeError(_)))
957                    .count(),
958            );
959            self.eval_error_region.reserve(
960                items
961                    .clone()
962                    .filter(|x| matches!(x, DataflowError::EvalError(_)))
963                    .count(),
964            );
965            self.source_error_region.reserve(
966                items
967                    .filter(|x| matches!(x, DataflowError::SourceError(_)))
968                    .count(),
969            );
970        }
971
972        fn reserve_regions<'a, I>(&mut self, regions: I)
973        where
974            Self: 'a,
975            I: Iterator<Item = &'a Self> + Clone,
976        {
977            // Reserve space on all region allocators.
978            self.row_region
979                .reserve_regions(regions.clone().map(|r| &r.row_region));
980            self.string_region
981                .reserve_regions(regions.clone().map(|r| &r.string_region));
982            self.u8_region
983                .reserve_regions(regions.clone().map(|r| &r.u8_region));
984        }
985
986        fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
987            // De-structure `self` to make sure we're counting all regions.
988            let Self {
989                decode_error_region,
990                envelope_error_region,
991                eval_error_region,
992                row_region,
993                source_error_region,
994                string_region,
995                u8_region,
996            } = &self;
997            decode_error_region.heap_size(&mut callback);
998            envelope_error_region.heap_size(&mut callback);
999            eval_error_region.heap_size(&mut callback);
1000            row_region.heap_size(&mut callback);
1001            source_error_region.heap_size(&mut callback);
1002            string_region.heap_size(&mut callback);
1003            u8_region.heap_size(&mut callback);
1004        }
1005    }
1006
1007    #[cfg(test)]
1008    mod tests {
1009        use differential_dataflow::containers::TimelyStack;
1010        use proptest::prelude::*;
1011
1012        use super::*;
1013
1014        fn columnation_roundtrip<T: Columnation>(item: &T) -> TimelyStack<T> {
1015            let mut container = TimelyStack::with_capacity(1);
1016            container.copy(item);
1017            container
1018        }
1019
1020        proptest! {
1021            #[mz_ore::test]
1022            #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
1023            fn dataflow_error_roundtrip(expect in any::<DataflowError>()) {
1024                let actual = columnation_roundtrip(&expect);
1025                proptest::prop_assert_eq!(&expect, &actual[0])
1026            }
1027        }
1028    }
1029}
1030
1031impl RustType<ProtoDataflowError> for DataflowError {
1032    fn into_proto(&self) -> ProtoDataflowError {
1033        use proto_dataflow_error::Kind::*;
1034        ProtoDataflowError {
1035            kind: Some(match self {
1036                DataflowError::DecodeError(err) => DecodeError(*err.into_proto()),
1037                DataflowError::EvalError(err) => EvalError(*err.into_proto()),
1038                DataflowError::SourceError(err) => SourceError(*err.into_proto()),
1039                DataflowError::EnvelopeError(err) => EnvelopeErrorV1(*err.into_proto()),
1040            }),
1041        }
1042    }
1043
1044    fn from_proto(proto: ProtoDataflowError) -> Result<Self, TryFromProtoError> {
1045        use proto_dataflow_error::Kind::*;
1046        match proto.kind {
1047            Some(kind) => match kind {
1048                DecodeError(err) => Ok(DataflowError::DecodeError(Box::new(err.into_rust()?))),
1049                EvalError(err) => Ok(DataflowError::EvalError(Box::new(err.into_rust()?))),
1050                SourceError(err) => Ok(DataflowError::SourceError(Box::new(err.into_rust()?))),
1051                EnvelopeErrorV1(err) => {
1052                    Ok(DataflowError::EnvelopeError(Box::new(err.into_rust()?)))
1053                }
1054            },
1055            None => Err(TryFromProtoError::missing_field("ProtoDataflowError::kind")),
1056        }
1057    }
1058}
1059
1060impl Display for DataflowError {
1061    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1062        match self {
1063            DataflowError::DecodeError(e) => write!(f, "Decode error: {}", e),
1064            DataflowError::EvalError(e) => write!(
1065                f,
1066                "{}{}",
1067                match **e {
1068                    EvalError::IfNullError(_) => "",
1069                    _ => "Evaluation error: ",
1070                },
1071                e
1072            ),
1073            DataflowError::SourceError(e) => write!(f, "Source error: {}", e),
1074            DataflowError::EnvelopeError(e) => write!(f, "Envelope error: {}", e),
1075        }
1076    }
1077}
1078
1079impl From<DecodeError> for DataflowError {
1080    fn from(e: DecodeError) -> Self {
1081        Self::DecodeError(Box::new(e))
1082    }
1083}
1084
1085impl From<EvalError> for DataflowError {
1086    fn from(e: EvalError) -> Self {
1087        Self::EvalError(Box::new(e))
1088    }
1089}
1090
1091impl From<SourceError> for DataflowError {
1092    fn from(e: SourceError) -> Self {
1093        Self::SourceError(Box::new(e))
1094    }
1095}
1096
1097impl From<EnvelopeError> for DataflowError {
1098    fn from(e: EnvelopeError) -> Self {
1099        Self::EnvelopeError(Box::new(e))
1100    }
1101}
1102
/// An error returned by `KafkaConnection::create_with_context`.
///
/// Foreign errors reach this type through the `#[from]` conversions on its
/// variants, through `From<CsrConnectError>`, and through the
/// [`ContextCreationErrorExt`] helpers.
#[derive(thiserror::Error, Debug)]
pub enum ContextCreationError {
    /// An error attributed to the SSH tunnel. Displayed as `ssh: <error>`; the
    /// wrapped error is also reported as this error's `source`.
    // This ends up double-printing `ssh:` in status tables, but makes for
    // a better experience during ddl.
    #[error("ssh: {0}")]
    Ssh(#[source] anyhow::Error),
    /// A [`KafkaError`], displayed transparently.
    #[error(transparent)]
    KafkaError(#[from] KafkaError),
    /// A `mz_ore::netio::DnsResolutionError`, displayed transparently.
    #[error(transparent)]
    Dns(#[from] mz_ore::netio::DnsResolutionError),
    /// Any other error, carried as an [`anyhow::Error`] and displayed transparently.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
    /// A [`std::io::Error`], displayed transparently.
    #[error(transparent)]
    Io(#[from] std::io::Error),
}
1119
1120/// An extension trait for `Result<T, E>` that makes producing `ContextCreationError`s easier.
1121pub trait ContextCreationErrorExt<T> {
1122    /// Override the error case with an ssh error from `cx`, if there is one.
1123    fn check_ssh_status<C>(self, cx: &TunnelingClientContext<C>)
1124    -> Result<T, ContextCreationError>;
1125    /// Add context to the errors within the variants of `ContextCreationError`, without
1126    /// altering the `Ssh` variant.
1127    fn add_context(self, msg: &'static str) -> Result<T, ContextCreationError>;
1128}
1129
1130impl<T, E> ContextCreationErrorExt<T> for Result<T, E>
1131where
1132    ContextCreationError: From<E>,
1133{
1134    fn check_ssh_status<C>(
1135        self,
1136        cx: &TunnelingClientContext<C>,
1137    ) -> Result<T, ContextCreationError> {
1138        self.map_err(|e| {
1139            if let SshTunnelStatus::Errored(e) = cx.tunnel_status() {
1140                ContextCreationError::Ssh(anyhow!(e))
1141            } else {
1142                ContextCreationError::from(e)
1143            }
1144        })
1145    }
1146
1147    fn add_context(self, msg: &'static str) -> Result<T, ContextCreationError> {
1148        self.map_err(|e| {
1149            let e = ContextCreationError::from(e);
1150            match e {
1151                // We need to preserve the `Ssh` variant here, so we add the context inside of it.
1152                ContextCreationError::Ssh(e) => ContextCreationError::Ssh(anyhow!(e.context(msg))),
1153                ContextCreationError::Other(e) => {
1154                    ContextCreationError::Other(anyhow!(e.context(msg)))
1155                }
1156                ContextCreationError::KafkaError(e) => {
1157                    ContextCreationError::Other(anyhow!(anyhow!(e).context(msg)))
1158                }
1159                ContextCreationError::Io(e) => {
1160                    ContextCreationError::Other(anyhow!(anyhow!(e).context(msg)))
1161                }
1162                ContextCreationError::Dns(e) => {
1163                    ContextCreationError::Other(anyhow!(e).context(msg))
1164                }
1165            }
1166        })
1167    }
1168}
1169
1170impl From<CsrConnectError> for ContextCreationError {
1171    fn from(csr: CsrConnectError) -> ContextCreationError {
1172        use ContextCreationError::*;
1173
1174        match csr {
1175            CsrConnectError::Ssh(e) => Ssh(e),
1176            other => Other(anyhow::anyhow!(other)),
1177        }
1178    }
1179}
1180
/// An error returned by `CsrConnection::connect`.
///
/// When converted into a [`ContextCreationError`], the `Ssh` variant maps to
/// `ContextCreationError::Ssh` and every other variant to
/// `ContextCreationError::Other`.
#[derive(thiserror::Error, Debug)]
pub enum CsrConnectError {
    /// An error attributed to the SSH tunnel. Displayed as `ssh: <error>`; the
    /// wrapped error is also reported as this error's `source`.
    // This ends up double-printing `ssh:` in status tables, but makes for
    // a better experience during ddl.
    #[error("ssh: {0}")]
    Ssh(#[source] anyhow::Error),
    /// A `native_tls::Error`, displayed transparently.
    #[error(transparent)]
    NativeTls(#[from] native_tls::Error),
    /// An OpenSSL `ErrorStack`, displayed transparently.
    #[error(transparent)]
    Openssl(#[from] openssl::error::ErrorStack),
    /// A `mz_ore::netio::DnsResolutionError`, displayed transparently.
    #[error(transparent)]
    Dns(#[from] mz_ore::netio::DnsResolutionError),
    /// A [`std::io::Error`], displayed transparently.
    #[error(transparent)]
    Io(#[from] std::io::Error),
    /// Any other error, carried as an [`anyhow::Error`] and displayed transparently.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
1199
/// Error returned in response to a reference to an unknown collection.
///
/// Displays as `collection does not exist: <id>`, where `<id>` is the wrapped
/// [`GlobalId`].
#[derive(Error, Debug)]
#[error("collection does not exist: {0}")]
pub struct CollectionMissing(pub GlobalId);
1204
#[cfg(test)]
mod tests {
    use super::{DecodeError, DecodeErrorKind};

    /// A `DecodeError` must compare equal to itself after an encode/decode
    /// round trip.
    #[mz_ore::test]
    fn test_decode_error_codec_roundtrip() -> Result<(), String> {
        let original = DecodeError {
            kind: DecodeErrorKind::Text("ciao".into()),
            raw: b"oaic".to_vec(),
        };

        let mut buf = Vec::new();
        original.encode(&mut buf);
        let roundtripped = DecodeError::decode(&buf)?;

        assert_eq!(roundtripped, original);
        Ok(())
    }
}