1use anyhow::anyhow;
11use std::error::Error;
12use std::fmt::Display;
13
14use bytes::BufMut;
15use mz_expr::EvalError;
16use mz_kafka_util::client::TunnelingClientContext;
17use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
18use mz_repr::{GlobalId, Row};
19use mz_ssh_util::tunnel::SshTunnelStatus;
20use proptest_derive::Arbitrary;
21use prost::Message;
22use rdkafka::error::KafkaError;
23use serde::{Deserialize, Serialize};
24use thiserror::Error;
25use tracing::warn;
26
27include!(concat!(env!("OUT_DIR"), "/mz_storage_types.errors.rs"));
28
29#[derive(
32 Arbitrary,
33 Ord,
34 PartialOrd,
35 Clone,
36 Debug,
37 Eq,
38 PartialEq,
39 Serialize,
40 Deserialize,
41 Hash
42)]
43pub struct DecodeError {
44 pub kind: DecodeErrorKind,
45 pub raw: Vec<u8>,
46}
47
48impl RustType<ProtoDecodeError> for DecodeError {
49 fn into_proto(&self) -> ProtoDecodeError {
50 ProtoDecodeError {
51 kind: Some(RustType::into_proto(&self.kind)),
52 raw: Some(self.raw.clone()),
53 }
54 }
55
56 fn from_proto(proto: ProtoDecodeError) -> Result<Self, TryFromProtoError> {
57 let kind = match proto.kind {
58 Some(kind) => RustType::from_proto(kind)?,
59 None => return Err(TryFromProtoError::missing_field("ProtoDecodeError::kind")),
60 };
61 let raw = proto.raw.into_rust_if_some("raw")?;
62 Ok(Self { kind, raw })
63 }
64}
65
66impl DecodeError {
67 pub fn encode<B>(&self, buf: &mut B)
68 where
69 B: BufMut,
70 {
71 self.into_proto()
72 .encode(buf)
73 .expect("no required fields means no initialization errors")
74 }
75
76 pub fn decode(buf: &[u8]) -> Result<Self, String> {
77 let proto = ProtoDecodeError::decode(buf).map_err(|err| err.to_string())?;
78 proto.into_rust().map_err(|err| err.to_string())
79 }
80}
81
82impl Display for DecodeError {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 let str_repr = std::str::from_utf8(&self.raw).ok();
86 let str_repr = str_repr.map(|s| s.replace('\0', "NULL"));
89 let bytes_repr = hex::encode(&self.raw);
90 match str_repr {
91 Some(s) => write!(
92 f,
93 "{} (original text: {}, original bytes: {:x?})",
94 self.kind, s, bytes_repr
95 ),
96 None => write!(f, "{} (original bytes: {:x?})", self.kind, bytes_repr),
97 }
98 }
99}
100
101#[derive(
102 Arbitrary,
103 Ord,
104 PartialOrd,
105 Clone,
106 Debug,
107 Eq,
108 PartialEq,
109 Serialize,
110 Deserialize,
111 Hash
112)]
113pub enum DecodeErrorKind {
114 Text(Box<str>),
115 Bytes(Box<str>),
116}
117
118impl RustType<ProtoDecodeErrorKind> for DecodeErrorKind {
119 fn into_proto(&self) -> ProtoDecodeErrorKind {
120 use proto_decode_error_kind::Kind::*;
121 ProtoDecodeErrorKind {
122 kind: Some(match self {
123 DecodeErrorKind::Text(v) => Text(v.into_proto()),
124 DecodeErrorKind::Bytes(v) => Bytes(v.into_proto()),
125 }),
126 }
127 }
128
129 fn from_proto(proto: ProtoDecodeErrorKind) -> Result<Self, TryFromProtoError> {
130 use proto_decode_error_kind::Kind::*;
131 match proto.kind {
132 Some(Text(v)) => Ok(DecodeErrorKind::Text(v.into())),
133 Some(Bytes(v)) => Ok(DecodeErrorKind::Bytes(v.into())),
134 None => Err(TryFromProtoError::missing_field("ProtoDecodeError::kind")),
135 }
136 }
137}
138
139impl Display for DecodeErrorKind {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 match self {
142 DecodeErrorKind::Text(e) => f.write_str(e),
143 DecodeErrorKind::Bytes(e) => f.write_str(e),
144 }
145 }
146}
147
148#[derive(
150 Arbitrary,
151 Ord,
152 PartialOrd,
153 Clone,
154 Debug,
155 Eq,
156 Deserialize,
157 Serialize,
158 PartialEq,
159 Hash
160)]
161pub enum EnvelopeError {
162 Upsert(UpsertError),
164 Flat(Box<str>),
167}
168
169impl RustType<ProtoEnvelopeErrorV1> for EnvelopeError {
170 fn into_proto(&self) -> ProtoEnvelopeErrorV1 {
171 use proto_envelope_error_v1::Kind;
172 ProtoEnvelopeErrorV1 {
173 kind: Some(match self {
174 EnvelopeError::Upsert(rust) => Kind::Upsert(rust.into_proto()),
175 EnvelopeError::Flat(text) => Kind::Flat(text.into_proto()),
176 }),
177 }
178 }
179
180 fn from_proto(proto: ProtoEnvelopeErrorV1) -> Result<Self, TryFromProtoError> {
181 use proto_envelope_error_v1::Kind;
182 match proto.kind {
183 Some(Kind::Upsert(proto)) => {
184 let rust = RustType::from_proto(proto)?;
185 Ok(Self::Upsert(rust))
186 }
187 Some(Kind::Flat(text)) => Ok(Self::Flat(text.into())),
188 None => Err(TryFromProtoError::missing_field(
189 "ProtoEnvelopeErrorV1::kind",
190 )),
191 }
192 }
193}
194
195impl Display for EnvelopeError {
196 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
197 match self {
198 EnvelopeError::Upsert(err) => write!(f, "Upsert: {err}"),
199 EnvelopeError::Flat(err) => write!(f, "Flat: {err}"),
200 }
201 }
202}
203
204#[derive(
207 Arbitrary,
208 Ord,
209 PartialOrd,
210 Clone,
211 Debug,
212 Eq,
213 PartialEq,
214 Serialize,
215 Deserialize,
216 Hash
217)]
218pub struct UpsertValueError {
219 pub inner: DecodeError,
221 pub for_key: Row,
223}
224
225impl RustType<ProtoUpsertValueError> for UpsertValueError {
226 fn into_proto(&self) -> ProtoUpsertValueError {
227 ProtoUpsertValueError {
228 inner: Some(self.inner.into_proto()),
229 for_key: Some(self.for_key.into_proto()),
230 }
231 }
232
233 fn from_proto(proto: ProtoUpsertValueError) -> Result<Self, TryFromProtoError> {
234 Ok(UpsertValueError {
235 inner: proto
236 .inner
237 .into_rust_if_some("ProtoUpsertValueError::inner")?,
238 for_key: proto
239 .for_key
240 .into_rust_if_some("ProtoUpsertValueError::for_key")?,
241 })
242 }
243}
244
245impl Display for UpsertValueError {
246 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
247 let UpsertValueError { inner, for_key } = self;
248 write!(f, "{inner}, decoded key: {for_key:?}")?;
249 Ok(())
250 }
251}
252
253#[derive(
255 Arbitrary,
256 Ord,
257 PartialOrd,
258 Copy,
259 Clone,
260 Debug,
261 Eq,
262 PartialEq,
263 Serialize,
264 Deserialize,
265 Hash
266)]
267pub struct UpsertNullKeyError;
268
269impl RustType<ProtoUpsertNullKeyError> for UpsertNullKeyError {
270 fn into_proto(&self) -> ProtoUpsertNullKeyError {
271 ProtoUpsertNullKeyError {}
272 }
273
274 fn from_proto(_proto: ProtoUpsertNullKeyError) -> Result<Self, TryFromProtoError> {
275 Ok(UpsertNullKeyError)
276 }
277}
278
279impl Display for UpsertNullKeyError {
280 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
281 write!(
282 f,
283 "record with NULL key in UPSERT source; to retract this error, "
284 )?;
285 write!(
286 f,
287 "produce a record upstream with a NULL key and NULL value"
288 )
289 }
290}
291
292#[derive(
294 Arbitrary,
295 Ord,
296 PartialOrd,
297 Clone,
298 Debug,
299 Eq,
300 PartialEq,
301 Serialize,
302 Deserialize,
303 Hash
304)]
305pub enum UpsertError {
306 KeyDecode(DecodeError),
315 Value(UpsertValueError),
317 NullKey(UpsertNullKeyError),
318}
319
320impl RustType<ProtoUpsertError> for UpsertError {
321 fn into_proto(&self) -> ProtoUpsertError {
322 use proto_upsert_error::Kind;
323 ProtoUpsertError {
324 kind: Some(match self {
325 UpsertError::KeyDecode(err) => Kind::KeyDecode(err.into_proto()),
326 UpsertError::Value(err) => Kind::Value(err.into_proto()),
327 UpsertError::NullKey(err) => Kind::NullKey(err.into_proto()),
328 }),
329 }
330 }
331
332 fn from_proto(proto: ProtoUpsertError) -> Result<Self, TryFromProtoError> {
333 use proto_upsert_error::Kind;
334 match proto.kind {
335 Some(Kind::KeyDecode(proto)) => {
336 let rust = RustType::from_proto(proto)?;
337 Ok(Self::KeyDecode(rust))
338 }
339 Some(Kind::Value(proto)) => {
340 let rust = RustType::from_proto(proto)?;
341 Ok(Self::Value(rust))
342 }
343 Some(Kind::NullKey(proto)) => {
344 let rust = RustType::from_proto(proto)?;
345 Ok(Self::NullKey(rust))
346 }
347 None => Err(TryFromProtoError::missing_field("ProtoUpsertError::kind")),
348 }
349 }
350}
351
352impl Display for UpsertError {
353 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
354 match self {
355 UpsertError::KeyDecode(err) => write!(f, "Key decode: {err}"),
356 UpsertError::Value(err) => write!(f, "Value error: {err}"),
357 UpsertError::NullKey(err) => write!(f, "Null key: {err}"),
358 }
359 }
360}
361
362#[derive(
365 Arbitrary,
366 Ord,
367 PartialOrd,
368 Clone,
369 Debug,
370 Eq,
371 PartialEq,
372 Serialize,
373 Deserialize,
374 Hash
375)]
376pub struct SourceError {
377 pub error: SourceErrorDetails,
378}
379
380impl RustType<ProtoSourceError> for SourceError {
381 fn into_proto(&self) -> ProtoSourceError {
382 ProtoSourceError {
383 error: Some(self.error.into_proto()),
384 }
385 }
386
387 fn from_proto(proto: ProtoSourceError) -> Result<Self, TryFromProtoError> {
388 Ok(SourceError {
389 error: proto.error.into_rust_if_some("ProtoSourceError::error")?,
390 })
391 }
392}
393
394impl Display for SourceError {
395 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
396 self.error.fmt(f)
397 }
398}
399
400#[derive(
401 Arbitrary,
402 Ord,
403 PartialOrd,
404 Clone,
405 Debug,
406 Eq,
407 PartialEq,
408 Serialize,
409 Deserialize,
410 Hash
411)]
412pub enum SourceErrorDetails {
413 Initialization(Box<str>),
414 Other(Box<str>),
415}
416
417impl RustType<ProtoSourceErrorDetails> for SourceErrorDetails {
418 fn into_proto(&self) -> ProtoSourceErrorDetails {
419 use proto_source_error_details::Kind;
420 ProtoSourceErrorDetails {
421 kind: Some(match self {
422 SourceErrorDetails::Initialization(s) => Kind::Initialization(s.into_proto()),
423 SourceErrorDetails::Other(s) => Kind::Other(s.into_proto()),
424 }),
425 }
426 }
427
428 fn from_proto(proto: ProtoSourceErrorDetails) -> Result<Self, TryFromProtoError> {
429 use proto_source_error_details::Kind;
430 match proto.kind {
431 Some(kind) => match kind {
432 Kind::Initialization(s) => Ok(SourceErrorDetails::Initialization(s.into())),
433 Kind::DeprecatedFileIo(s) | Kind::DeprecatedPersistence(s) => {
434 warn!("Deprecated source error kind: {s}");
435 Ok(SourceErrorDetails::Other(s.into()))
436 }
437 Kind::Other(s) => Ok(SourceErrorDetails::Other(s.into())),
438 },
439 None => Err(TryFromProtoError::missing_field(
440 "ProtoSourceErrorDetails::kind",
441 )),
442 }
443 }
444}
445
446impl Display for SourceErrorDetails {
447 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
448 match self {
449 SourceErrorDetails::Initialization(e) => {
450 write!(
451 f,
452 "failed during initialization, source must be dropped and recreated: {}",
453 e
454 )
455 }
456 SourceErrorDetails::Other(e) => {
457 write!(
458 f,
459 "source must be dropped and recreated due to failure: {}",
460 e
461 )
462 }
463 }
464 }
465}
466
467#[derive(
474 Arbitrary,
475 Ord,
476 PartialOrd,
477 Clone,
478 Debug,
479 Eq,
480 Deserialize,
481 Serialize,
482 PartialEq,
483 Hash
484)]
485pub enum DataflowError {
486 DecodeError(Box<DecodeError>),
487 EvalError(Box<EvalError>),
488 SourceError(Box<SourceError>),
489 EnvelopeError(Box<EnvelopeError>),
490}
491
492impl Error for DataflowError {}
493
494mod boxed_str {
495
496 use differential_dataflow::containers::Region;
497 use differential_dataflow::containers::StableRegion;
498
499 #[derive(Default)]
504 pub struct BoxStrStack {
505 region: StableRegion<u8>,
506 }
507
508 impl Region for BoxStrStack {
509 type Item = Box<str>;
510 #[inline]
511 fn clear(&mut self) {
512 self.region.clear();
513 }
514 #[inline(always)]
517 unsafe fn copy(&mut self, item: &Box<str>) -> Box<str> {
518 let bytes = self.region.copy_slice(item.as_bytes());
519 std::str::from_boxed_utf8_unchecked(Box::from_raw(bytes))
522 }
523 #[inline(always)]
524 fn reserve_items<'a, I>(&mut self, items: I)
525 where
526 Self: 'a,
527 I: Iterator<Item = &'a Self::Item> + Clone,
528 {
529 self.region.reserve(items.map(|x| x.len()).sum());
530 }
531
532 fn reserve_regions<'a, I>(&mut self, regions: I)
533 where
534 Self: 'a,
535 I: Iterator<Item = &'a Self> + Clone,
536 {
537 self.region.reserve(regions.map(|r| r.region.len()).sum());
538 }
539 #[inline]
540 fn heap_size(&self, callback: impl FnMut(usize, usize)) {
541 self.region.heap_size(callback)
542 }
543 }
544}
545
546mod columnation {
547 use std::iter::once;
548
549 use differential_dataflow::containers::{Columnation, Region, StableRegion};
550 use mz_expr::EvalError;
551 use mz_repr::Row;
552 use mz_repr::adt::range::InvalidRangeError;
553 use mz_repr::strconv::ParseError;
554
555 use crate::errors::boxed_str::BoxStrStack;
556 use crate::errors::{
557 DataflowError, DecodeError, DecodeErrorKind, EnvelopeError, SourceError,
558 SourceErrorDetails, UpsertError, UpsertValueError,
559 };
560
561 impl Columnation for DataflowError {
562 type InnerRegion = DataflowErrorRegion;
563 }
564
565 #[derive(Default)]
567 pub struct DataflowErrorRegion {
568 decode_error_region: StableRegion<DecodeError>,
570 envelope_error_region: StableRegion<EnvelopeError>,
572 eval_error_region: StableRegion<EvalError>,
574 row_region: <Row as Columnation>::InnerRegion,
576 source_error_region: StableRegion<SourceError>,
578 string_region: BoxStrStack,
580 u8_region: <Vec<u8> as Columnation>::InnerRegion,
582 }
583
584 impl DataflowErrorRegion {
585 unsafe fn copy_decode_error(&mut self, decode_error: &DecodeError) -> DecodeError {
589 DecodeError {
590 kind: match &decode_error.kind {
591 DecodeErrorKind::Text(string) => {
592 DecodeErrorKind::Text(self.string_region.copy(string))
593 }
594 DecodeErrorKind::Bytes(string) => {
595 DecodeErrorKind::Bytes(self.string_region.copy(string))
596 }
597 },
598 raw: self.u8_region.copy(&decode_error.raw),
599 }
600 }
601 }
602
603 fn assert_copy<T: Copy>(_: &T) {}
605
606 impl Region for DataflowErrorRegion {
607 type Item = DataflowError;
608
609 unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
610 let err = match item {
622 DataflowError::DecodeError(err) => {
623 let err = self.copy_decode_error(&*err);
624 let reference = self.decode_error_region.copy_iter(once(err));
625 let boxed = unsafe { Box::from_raw(reference.as_mut_ptr()) };
626 DataflowError::DecodeError(boxed)
627 }
628 DataflowError::EvalError(err) => {
629 let err: &EvalError = &*err;
630 let err = match err {
631 e @ EvalError::CharacterNotValidForEncoding(x) => {
632 assert_copy(x);
633 e.clone()
634 }
635 e @ EvalError::CharacterTooLargeForEncoding(x) => {
636 assert_copy(x);
637 e.clone()
638 }
639 EvalError::DateBinOutOfRange(string) => {
640 EvalError::DateBinOutOfRange(self.string_region.copy(string))
641 }
642 e @ EvalError::DivisionByZero
643 | e @ EvalError::FloatOverflow
644 | e @ EvalError::FloatUnderflow
645 | e @ EvalError::NumericFieldOverflow
646 | e @ EvalError::MzTimestampStepOverflow
647 | e @ EvalError::TimestampCannotBeNan
648 | e @ EvalError::TimestampOutOfRange
649 | e @ EvalError::NegSqrt
650 | e @ EvalError::NegLimit
651 | e @ EvalError::NullCharacterNotPermitted
652 | e @ EvalError::KeyCannotBeNull
653 | e @ EvalError::UnterminatedLikeEscapeSequence
654 | e @ EvalError::MultipleRowsFromSubquery
655 | e @ EvalError::LikePatternTooLong
656 | e @ EvalError::LikeEscapeTooLong
657 | e @ EvalError::MultidimensionalArrayRemovalNotSupported
658 | e @ EvalError::MultiDimensionalArraySearch
659 | e @ EvalError::ArrayFillWrongArraySubscripts
660 | e @ EvalError::DateOutOfRange
661 | e @ EvalError::CharOutOfRange
662 | e @ EvalError::InvalidBase64Equals
663 | e @ EvalError::InvalidBase64EndSequence
664 | e @ EvalError::InvalidTimezoneInterval
665 | e @ EvalError::InvalidTimezoneConversion
666 | e @ EvalError::LengthTooLarge
667 | e @ EvalError::AclArrayNullElement
668 | e @ EvalError::MzAclArrayNullElement => e.clone(),
669 EvalError::Unsupported {
670 feature,
671 discussion_no,
672 } => EvalError::Unsupported {
673 feature: self.string_region.copy(feature),
674 discussion_no: *discussion_no,
675 },
676 EvalError::Float32OutOfRange(string) => {
677 EvalError::Float32OutOfRange(self.string_region.copy(string))
678 }
679 EvalError::Float64OutOfRange(string) => {
680 EvalError::Float64OutOfRange(self.string_region.copy(string))
681 }
682 EvalError::Int16OutOfRange(string) => {
683 EvalError::Int16OutOfRange(self.string_region.copy(string))
684 }
685 EvalError::Int32OutOfRange(string) => {
686 EvalError::Int32OutOfRange(self.string_region.copy(string))
687 }
688 EvalError::Int64OutOfRange(string) => {
689 EvalError::Int64OutOfRange(self.string_region.copy(string))
690 }
691 EvalError::UInt16OutOfRange(string) => {
692 EvalError::UInt16OutOfRange(self.string_region.copy(string))
693 }
694 EvalError::UInt32OutOfRange(string) => {
695 EvalError::UInt32OutOfRange(self.string_region.copy(string))
696 }
697 EvalError::UInt64OutOfRange(string) => {
698 EvalError::UInt64OutOfRange(self.string_region.copy(string))
699 }
700 EvalError::MzTimestampOutOfRange(string) => {
701 EvalError::MzTimestampOutOfRange(self.string_region.copy(string))
702 }
703 EvalError::OidOutOfRange(string) => {
704 EvalError::OidOutOfRange(self.string_region.copy(string))
705 }
706 EvalError::IntervalOutOfRange(string) => {
707 EvalError::IntervalOutOfRange(self.string_region.copy(string))
708 }
709 e @ EvalError::IndexOutOfRange {
710 provided,
711 valid_end,
712 } => {
713 assert_copy(provided);
714 assert_copy(valid_end);
715 e.clone()
716 }
717 e @ EvalError::InvalidBase64Symbol(c) => {
718 assert_copy(c);
719 e.clone()
720 }
721 EvalError::InvalidTimezone(x) => {
722 EvalError::InvalidTimezone(self.string_region.copy(x))
723 }
724 e @ EvalError::InvalidLayer { max_layer, val } => {
725 assert_copy(max_layer);
726 assert_copy(val);
727 e.clone()
728 }
729 EvalError::InvalidArray(err) => EvalError::InvalidArray(*err),
730 EvalError::InvalidEncodingName(x) => {
731 EvalError::InvalidEncodingName(self.string_region.copy(x))
732 }
733 EvalError::InvalidHashAlgorithm(x) => {
734 EvalError::InvalidHashAlgorithm(self.string_region.copy(x))
735 }
736 EvalError::InvalidByteSequence {
737 byte_sequence,
738 encoding_name,
739 } => EvalError::InvalidByteSequence {
740 byte_sequence: self.string_region.copy(byte_sequence),
741 encoding_name: self.string_region.copy(encoding_name),
742 },
743 EvalError::InvalidJsonbCast { from, to } => EvalError::InvalidJsonbCast {
744 from: self.string_region.copy(from),
745 to: self.string_region.copy(to),
746 },
747 EvalError::InvalidRegex(x) => {
748 EvalError::InvalidRegex(self.string_region.copy(x))
749 }
750 e @ EvalError::InvalidRegexFlag(x) => {
751 assert_copy(x);
752 e.clone()
753 }
754 EvalError::InvalidParameterValue(x) => {
755 EvalError::InvalidParameterValue(self.string_region.copy(x))
756 }
757 EvalError::InvalidDatePart(x) => {
758 EvalError::InvalidDatePart(self.string_region.copy(x))
759 }
760 EvalError::UnknownUnits(x) => {
761 EvalError::UnknownUnits(self.string_region.copy(x))
762 }
763 EvalError::UnsupportedUnits(x, y) => EvalError::UnsupportedUnits(
764 self.string_region.copy(x),
765 self.string_region.copy(y),
766 ),
767 EvalError::Parse(ParseError {
768 kind,
769 type_name,
770 input,
771 details,
772 }) => EvalError::Parse(ParseError {
773 kind: *kind,
774 type_name: self.string_region.copy(type_name),
775 input: self.string_region.copy(input),
776 details: details
777 .as_ref()
778 .map(|details| self.string_region.copy(details)),
779 }),
780 e @ EvalError::ParseHex(x) => {
781 assert_copy(x);
782 e.clone()
783 }
784 EvalError::Internal(x) => EvalError::Internal(self.string_region.copy(x)),
785 EvalError::InfinityOutOfDomain(x) => {
786 EvalError::InfinityOutOfDomain(self.string_region.copy(x))
787 }
788 EvalError::NegativeOutOfDomain(x) => {
789 EvalError::NegativeOutOfDomain(self.string_region.copy(x))
790 }
791 EvalError::ZeroOutOfDomain(x) => {
792 EvalError::ZeroOutOfDomain(self.string_region.copy(x))
793 }
794 EvalError::OutOfDomain(x, y, z) => {
795 assert_copy(x);
796 assert_copy(y);
797 EvalError::OutOfDomain(*x, *y, self.string_region.copy(z))
798 }
799 EvalError::ComplexOutOfRange(x) => {
800 EvalError::ComplexOutOfRange(self.string_region.copy(x))
801 }
802 EvalError::Undefined(x) => EvalError::Undefined(self.string_region.copy(x)),
803 EvalError::StringValueTooLong {
804 target_type,
805 length,
806 } => EvalError::StringValueTooLong {
807 target_type: self.string_region.copy(target_type),
808 length: *length,
809 },
810 e @ EvalError::IncompatibleArrayDimensions { dims } => {
811 assert_copy(dims);
812 e.clone()
813 }
814 EvalError::TypeFromOid(x) => {
815 EvalError::TypeFromOid(self.string_region.copy(x))
816 }
817 EvalError::InvalidRange(x) => {
818 let err = match x {
819 e @ InvalidRangeError::MisorderedRangeBounds
820 | e @ InvalidRangeError::InvalidRangeBoundFlags
821 | e @ InvalidRangeError::DiscontiguousUnion
822 | e @ InvalidRangeError::DiscontiguousDifference
823 | e @ InvalidRangeError::NullRangeBoundFlags => e.clone(),
824 InvalidRangeError::CanonicalizationOverflow(string) => {
825 InvalidRangeError::CanonicalizationOverflow(
826 self.string_region.copy(string),
827 )
828 }
829 };
830 EvalError::InvalidRange(err)
831 }
832 EvalError::InvalidRoleId(x) => {
833 EvalError::InvalidRoleId(self.string_region.copy(x))
834 }
835 EvalError::InvalidPrivileges(x) => {
836 EvalError::InvalidPrivileges(self.string_region.copy(x))
837 }
838 EvalError::LetRecLimitExceeded(x) => {
839 EvalError::LetRecLimitExceeded(self.string_region.copy(x))
840 }
841 EvalError::MustNotBeNull(x) => {
842 EvalError::MustNotBeNull(self.string_region.copy(x))
843 }
844 EvalError::InvalidIdentifier { ident, detail } => {
845 EvalError::InvalidIdentifier {
846 ident: self.string_region.copy(ident),
847 detail: detail
848 .as_ref()
849 .map(|detail| self.string_region.copy(detail)),
850 }
851 }
852 e @ EvalError::MaxArraySizeExceeded(x) => {
853 assert_copy(x);
854 e.clone()
855 }
856 EvalError::DateDiffOverflow { unit, a, b } => EvalError::DateDiffOverflow {
857 unit: self.string_region.copy(unit),
858 a: self.string_region.copy(a),
859 b: self.string_region.copy(b),
860 },
861 EvalError::IfNullError(x) => {
862 EvalError::IfNullError(self.string_region.copy(x))
863 }
864 EvalError::InvalidIanaTimezoneId(x) => {
865 EvalError::InvalidIanaTimezoneId(self.string_region.copy(x))
866 }
867 EvalError::PrettyError(x) => {
868 EvalError::PrettyError(self.string_region.copy(x))
869 }
870 };
871 let reference = self.eval_error_region.copy_iter(once(err));
872 let boxed = unsafe { Box::from_raw(reference.as_mut_ptr()) };
873 DataflowError::EvalError(boxed)
874 }
875 DataflowError::SourceError(err) => {
876 let err: &SourceError = &*err;
877 let err = SourceError {
878 error: match &err.error {
879 SourceErrorDetails::Initialization(string) => {
880 SourceErrorDetails::Initialization(self.string_region.copy(string))
881 }
882 SourceErrorDetails::Other(string) => {
883 SourceErrorDetails::Other(self.string_region.copy(string))
884 }
885 },
886 };
887 let reference = self.source_error_region.copy_iter(once(err));
888 let boxed = unsafe { Box::from_raw(reference.as_mut_ptr()) };
889 DataflowError::SourceError(boxed)
890 }
891 DataflowError::EnvelopeError(err) => {
892 let err: &EnvelopeError = &*err;
893 let err = match err {
894 EnvelopeError::Upsert(err) => {
895 let err = match err {
896 UpsertError::KeyDecode(err) => {
897 UpsertError::KeyDecode(self.copy_decode_error(err))
898 }
899 UpsertError::Value(err) => UpsertError::Value(UpsertValueError {
900 inner: self.copy_decode_error(&err.inner),
901 for_key: self.row_region.copy(&err.for_key),
902 }),
903 UpsertError::NullKey(err) => UpsertError::NullKey(*err),
904 };
905 EnvelopeError::Upsert(err)
906 }
907 EnvelopeError::Flat(string) => {
908 EnvelopeError::Flat(self.string_region.copy(string))
909 }
910 };
911 let reference = self.envelope_error_region.copy_iter(once(err));
912 let boxed = unsafe { Box::from_raw(reference.as_mut_ptr()) };
913 DataflowError::EnvelopeError(boxed)
914 }
915 };
916 debug_assert_eq!(item, &err);
918 err
919 }
920
921 fn clear(&mut self) {
922 let Self {
924 decode_error_region,
925 envelope_error_region,
926 eval_error_region,
927 row_region,
928 source_error_region,
929 string_region,
930 u8_region,
931 } = self;
932 decode_error_region.clear();
933 envelope_error_region.clear();
934 eval_error_region.clear();
935 row_region.clear();
936 source_error_region.clear();
937 string_region.clear();
938 u8_region.clear();
939 }
940
941 fn reserve_items<'a, I>(&mut self, items: I)
942 where
943 Self: 'a,
944 I: Iterator<Item = &'a Self::Item> + Clone,
945 {
946 self.decode_error_region.reserve(
948 items
949 .clone()
950 .filter(|x| matches!(x, DataflowError::DecodeError(_)))
951 .count(),
952 );
953 self.envelope_error_region.reserve(
954 items
955 .clone()
956 .filter(|x| matches!(x, DataflowError::EnvelopeError(_)))
957 .count(),
958 );
959 self.eval_error_region.reserve(
960 items
961 .clone()
962 .filter(|x| matches!(x, DataflowError::EvalError(_)))
963 .count(),
964 );
965 self.source_error_region.reserve(
966 items
967 .filter(|x| matches!(x, DataflowError::SourceError(_)))
968 .count(),
969 );
970 }
971
972 fn reserve_regions<'a, I>(&mut self, regions: I)
973 where
974 Self: 'a,
975 I: Iterator<Item = &'a Self> + Clone,
976 {
977 self.row_region
979 .reserve_regions(regions.clone().map(|r| &r.row_region));
980 self.string_region
981 .reserve_regions(regions.clone().map(|r| &r.string_region));
982 self.u8_region
983 .reserve_regions(regions.clone().map(|r| &r.u8_region));
984 }
985
986 fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
987 let Self {
989 decode_error_region,
990 envelope_error_region,
991 eval_error_region,
992 row_region,
993 source_error_region,
994 string_region,
995 u8_region,
996 } = &self;
997 decode_error_region.heap_size(&mut callback);
998 envelope_error_region.heap_size(&mut callback);
999 eval_error_region.heap_size(&mut callback);
1000 row_region.heap_size(&mut callback);
1001 source_error_region.heap_size(&mut callback);
1002 string_region.heap_size(&mut callback);
1003 u8_region.heap_size(&mut callback);
1004 }
1005 }
1006
1007 #[cfg(test)]
1008 mod tests {
1009 use differential_dataflow::containers::TimelyStack;
1010 use proptest::prelude::*;
1011
1012 use super::*;
1013
1014 fn columnation_roundtrip<T: Columnation>(item: &T) -> TimelyStack<T> {
1015 let mut container = TimelyStack::with_capacity(1);
1016 container.copy(item);
1017 container
1018 }
1019
1020 proptest! {
1021 #[mz_ore::test]
1022 #[cfg_attr(miri, ignore)] fn dataflow_error_roundtrip(expect in any::<DataflowError>()) {
1024 let actual = columnation_roundtrip(&expect);
1025 proptest::prop_assert_eq!(&expect, &actual[0])
1026 }
1027 }
1028 }
1029}
1030
1031impl RustType<ProtoDataflowError> for DataflowError {
1032 fn into_proto(&self) -> ProtoDataflowError {
1033 use proto_dataflow_error::Kind::*;
1034 ProtoDataflowError {
1035 kind: Some(match self {
1036 DataflowError::DecodeError(err) => DecodeError(*err.into_proto()),
1037 DataflowError::EvalError(err) => EvalError(*err.into_proto()),
1038 DataflowError::SourceError(err) => SourceError(*err.into_proto()),
1039 DataflowError::EnvelopeError(err) => EnvelopeErrorV1(*err.into_proto()),
1040 }),
1041 }
1042 }
1043
1044 fn from_proto(proto: ProtoDataflowError) -> Result<Self, TryFromProtoError> {
1045 use proto_dataflow_error::Kind::*;
1046 match proto.kind {
1047 Some(kind) => match kind {
1048 DecodeError(err) => Ok(DataflowError::DecodeError(Box::new(err.into_rust()?))),
1049 EvalError(err) => Ok(DataflowError::EvalError(Box::new(err.into_rust()?))),
1050 SourceError(err) => Ok(DataflowError::SourceError(Box::new(err.into_rust()?))),
1051 EnvelopeErrorV1(err) => {
1052 Ok(DataflowError::EnvelopeError(Box::new(err.into_rust()?)))
1053 }
1054 },
1055 None => Err(TryFromProtoError::missing_field("ProtoDataflowError::kind")),
1056 }
1057 }
1058}
1059
1060impl Display for DataflowError {
1061 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1062 match self {
1063 DataflowError::DecodeError(e) => write!(f, "Decode error: {}", e),
1064 DataflowError::EvalError(e) => write!(
1065 f,
1066 "{}{}",
1067 match **e {
1068 EvalError::IfNullError(_) => "",
1069 _ => "Evaluation error: ",
1070 },
1071 e
1072 ),
1073 DataflowError::SourceError(e) => write!(f, "Source error: {}", e),
1074 DataflowError::EnvelopeError(e) => write!(f, "Envelope error: {}", e),
1075 }
1076 }
1077}
1078
1079impl From<DecodeError> for DataflowError {
1080 fn from(e: DecodeError) -> Self {
1081 Self::DecodeError(Box::new(e))
1082 }
1083}
1084
1085impl From<EvalError> for DataflowError {
1086 fn from(e: EvalError) -> Self {
1087 Self::EvalError(Box::new(e))
1088 }
1089}
1090
1091impl From<SourceError> for DataflowError {
1092 fn from(e: SourceError) -> Self {
1093 Self::SourceError(Box::new(e))
1094 }
1095}
1096
1097impl From<EnvelopeError> for DataflowError {
1098 fn from(e: EnvelopeError) -> Self {
1099 Self::EnvelopeError(Box::new(e))
1100 }
1101}
1102
1103#[derive(thiserror::Error, Debug)]
1105pub enum ContextCreationError {
1106 #[error("ssh: {0}")]
1109 Ssh(#[source] anyhow::Error),
1110 #[error(transparent)]
1111 KafkaError(#[from] KafkaError),
1112 #[error(transparent)]
1113 Dns(#[from] mz_ore::netio::DnsResolutionError),
1114 #[error(transparent)]
1115 Other(#[from] anyhow::Error),
1116 #[error(transparent)]
1117 Io(#[from] std::io::Error),
1118}
1119
1120pub trait ContextCreationErrorExt<T> {
1122 fn check_ssh_status<C>(self, cx: &TunnelingClientContext<C>)
1124 -> Result<T, ContextCreationError>;
1125 fn add_context(self, msg: &'static str) -> Result<T, ContextCreationError>;
1128}
1129
1130impl<T, E> ContextCreationErrorExt<T> for Result<T, E>
1131where
1132 ContextCreationError: From<E>,
1133{
1134 fn check_ssh_status<C>(
1135 self,
1136 cx: &TunnelingClientContext<C>,
1137 ) -> Result<T, ContextCreationError> {
1138 self.map_err(|e| {
1139 if let SshTunnelStatus::Errored(e) = cx.tunnel_status() {
1140 ContextCreationError::Ssh(anyhow!(e))
1141 } else {
1142 ContextCreationError::from(e)
1143 }
1144 })
1145 }
1146
1147 fn add_context(self, msg: &'static str) -> Result<T, ContextCreationError> {
1148 self.map_err(|e| {
1149 let e = ContextCreationError::from(e);
1150 match e {
1151 ContextCreationError::Ssh(e) => ContextCreationError::Ssh(anyhow!(e.context(msg))),
1153 ContextCreationError::Other(e) => {
1154 ContextCreationError::Other(anyhow!(e.context(msg)))
1155 }
1156 ContextCreationError::KafkaError(e) => {
1157 ContextCreationError::Other(anyhow!(anyhow!(e).context(msg)))
1158 }
1159 ContextCreationError::Io(e) => {
1160 ContextCreationError::Other(anyhow!(anyhow!(e).context(msg)))
1161 }
1162 ContextCreationError::Dns(e) => {
1163 ContextCreationError::Other(anyhow!(e).context(msg))
1164 }
1165 }
1166 })
1167 }
1168}
1169
1170impl From<CsrConnectError> for ContextCreationError {
1171 fn from(csr: CsrConnectError) -> ContextCreationError {
1172 use ContextCreationError::*;
1173
1174 match csr {
1175 CsrConnectError::Ssh(e) => Ssh(e),
1176 other => Other(anyhow::anyhow!(other)),
1177 }
1178 }
1179}
1180
1181#[derive(thiserror::Error, Debug)]
1183pub enum CsrConnectError {
1184 #[error("ssh: {0}")]
1187 Ssh(#[source] anyhow::Error),
1188 #[error(transparent)]
1189 NativeTls(#[from] native_tls::Error),
1190 #[error(transparent)]
1191 Openssl(#[from] openssl::error::ErrorStack),
1192 #[error(transparent)]
1193 Dns(#[from] mz_ore::netio::DnsResolutionError),
1194 #[error(transparent)]
1195 Io(#[from] std::io::Error),
1196 #[error(transparent)]
1197 Other(#[from] anyhow::Error),
1198}
1199
1200#[derive(Error, Debug)]
1202#[error("collection does not exist: {0}")]
1203pub struct CollectionMissing(pub GlobalId);
1204
#[cfg(test)]
mod tests {
    use super::DecodeError;
    use crate::errors::DecodeErrorKind;

    /// Encoding a `DecodeError` and decoding the bytes again must yield the
    /// same value.
    #[mz_ore::test]
    fn test_decode_error_codec_roundtrip() -> Result<(), String> {
        let original = DecodeError {
            kind: DecodeErrorKind::Text("ciao".into()),
            raw: b"oaic".to_vec(),
        };

        let mut buf = Vec::new();
        original.encode(&mut buf);
        let roundtripped = DecodeError::decode(&buf)?;

        assert_eq!(roundtripped, original);
        Ok(())
    }
}