1use anyhow::anyhow;
11use std::error::Error;
12use std::fmt::Display;
13
14use bytes::BufMut;
15use mz_expr::EvalError;
16use mz_kafka_util::client::TunnelingClientContext;
17use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
18use mz_repr::Row;
19use mz_ssh_util::tunnel::SshTunnelStatus;
20use proptest_derive::Arbitrary;
21use prost::Message;
22use rdkafka::error::KafkaError;
23use serde::{Deserialize, Serialize};
24use tracing::warn;
25
26include!(concat!(env!("OUT_DIR"), "/mz_storage_types.errors.rs"));
27
/// A decoding failure: the kind of failure plus the bytes it was reported for.
///
/// The `Display` implementation in this file prints `raw` as text (when it is
/// valid UTF-8) and as a hex string.
#[derive(Arbitrary, Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct DecodeError {
    /// The category of the failure, including its message.
    pub kind: DecodeErrorKind,
    /// The bytes that `Display` reports as the "original bytes".
    pub raw: Vec<u8>,
}
35
36impl RustType<ProtoDecodeError> for DecodeError {
37 fn into_proto(&self) -> ProtoDecodeError {
38 ProtoDecodeError {
39 kind: Some(RustType::into_proto(&self.kind)),
40 raw: Some(self.raw.clone()),
41 }
42 }
43
44 fn from_proto(proto: ProtoDecodeError) -> Result<Self, TryFromProtoError> {
45 let kind = match proto.kind {
46 Some(kind) => RustType::from_proto(kind)?,
47 None => return Err(TryFromProtoError::missing_field("ProtoDecodeError::kind")),
48 };
49 let raw = proto.raw.into_rust_if_some("raw")?;
50 Ok(Self { kind, raw })
51 }
52}
53
54impl DecodeError {
55 pub fn encode<B>(&self, buf: &mut B)
56 where
57 B: BufMut,
58 {
59 self.into_proto()
60 .encode(buf)
61 .expect("no required fields means no initialization errors")
62 }
63
64 pub fn decode(buf: &[u8]) -> Result<Self, String> {
65 let proto = ProtoDecodeError::decode(buf).map_err(|err| err.to_string())?;
66 proto.into_rust().map_err(|err| err.to_string())
67 }
68}
69
70impl Display for DecodeError {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 let str_repr = std::str::from_utf8(&self.raw).ok();
74 let str_repr = str_repr.map(|s| s.replace('\0', "NULL"));
77 let bytes_repr = hex::encode(&self.raw);
78 match str_repr {
79 Some(s) => write!(
80 f,
81 "{} (original text: {}, original bytes: {:x?})",
82 self.kind, s, bytes_repr
83 ),
84 None => write!(f, "{} (original bytes: {:x?})", self.kind, bytes_repr),
85 }
86 }
87}
88
/// The category of a [`DecodeError`]; each variant carries a message that
/// `Display` prints verbatim.
#[derive(Arbitrary, Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum DecodeErrorKind {
    /// NOTE(review): presumably a failure decoding textual input; confirm
    /// against the code that constructs this variant.
    Text(Box<str>),
    /// NOTE(review): presumably a failure decoding binary input; confirm
    /// against the code that constructs this variant.
    Bytes(Box<str>),
}
94
95impl RustType<ProtoDecodeErrorKind> for DecodeErrorKind {
96 fn into_proto(&self) -> ProtoDecodeErrorKind {
97 use proto_decode_error_kind::Kind::*;
98 ProtoDecodeErrorKind {
99 kind: Some(match self {
100 DecodeErrorKind::Text(v) => Text(v.into_proto()),
101 DecodeErrorKind::Bytes(v) => Bytes(v.into_proto()),
102 }),
103 }
104 }
105
106 fn from_proto(proto: ProtoDecodeErrorKind) -> Result<Self, TryFromProtoError> {
107 use proto_decode_error_kind::Kind::*;
108 match proto.kind {
109 Some(Text(v)) => Ok(DecodeErrorKind::Text(v.into())),
110 Some(Bytes(v)) => Ok(DecodeErrorKind::Bytes(v.into())),
111 None => Err(TryFromProtoError::missing_field("ProtoDecodeError::kind")),
112 }
113 }
114}
115
116impl Display for DecodeErrorKind {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 match self {
119 DecodeErrorKind::Text(e) => f.write_str(e),
120 DecodeErrorKind::Bytes(e) => f.write_str(e),
121 }
122 }
123}
124
/// Envelope-level errors; serialized as `ProtoEnvelopeErrorV1`.
#[derive(Arbitrary, Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)]
pub enum EnvelopeError {
    /// An upsert failure; see [`UpsertError`]. Displayed with an `Upsert: ` prefix.
    Upsert(UpsertError),
    /// A free-form message, displayed with a `Flat: ` prefix.
    /// NOTE(review): the exact envelope this variant corresponds to is not
    /// visible here; confirm against the callers.
    Flat(Box<str>),
}
134
135impl RustType<ProtoEnvelopeErrorV1> for EnvelopeError {
136 fn into_proto(&self) -> ProtoEnvelopeErrorV1 {
137 use proto_envelope_error_v1::Kind;
138 ProtoEnvelopeErrorV1 {
139 kind: Some(match self {
140 EnvelopeError::Upsert(rust) => Kind::Upsert(rust.into_proto()),
141 EnvelopeError::Flat(text) => Kind::Flat(text.into_proto()),
142 }),
143 }
144 }
145
146 fn from_proto(proto: ProtoEnvelopeErrorV1) -> Result<Self, TryFromProtoError> {
147 use proto_envelope_error_v1::Kind;
148 match proto.kind {
149 Some(Kind::Upsert(proto)) => {
150 let rust = RustType::from_proto(proto)?;
151 Ok(Self::Upsert(rust))
152 }
153 Some(Kind::Flat(text)) => Ok(Self::Flat(text.into())),
154 None => Err(TryFromProtoError::missing_field(
155 "ProtoEnvelopeErrorV1::kind",
156 )),
157 }
158 }
159}
160
161impl Display for EnvelopeError {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163 match self {
164 EnvelopeError::Upsert(err) => write!(f, "Upsert: {err}"),
165 EnvelopeError::Flat(err) => write!(f, "Flat: {err}"),
166 }
167 }
168}
169
/// A decode failure together with the key it was reported for.
#[derive(Arbitrary, Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct UpsertValueError {
    /// The underlying decode failure.
    pub inner: DecodeError,
    /// The key attached to the failure; `Display` labels it "decoded key".
    pub for_key: Row,
}
179
180impl RustType<ProtoUpsertValueError> for UpsertValueError {
181 fn into_proto(&self) -> ProtoUpsertValueError {
182 ProtoUpsertValueError {
183 inner: Some(self.inner.into_proto()),
184 for_key: Some(self.for_key.into_proto()),
185 }
186 }
187
188 fn from_proto(proto: ProtoUpsertValueError) -> Result<Self, TryFromProtoError> {
189 Ok(UpsertValueError {
190 inner: proto
191 .inner
192 .into_rust_if_some("ProtoUpsertValueError::inner")?,
193 for_key: proto
194 .for_key
195 .into_rust_if_some("ProtoUpsertValueError::for_key")?,
196 })
197 }
198}
199
200impl Display for UpsertValueError {
201 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202 let UpsertValueError { inner, for_key } = self;
203 write!(f, "{inner}, decoded key: {for_key:?}")?;
204 Ok(())
205 }
206}
207
/// Marker error for a record with a NULL key in an UPSERT source (see the
/// `Display` message). Carries no data.
#[derive(
    Arbitrary, Ord, PartialOrd, Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash,
)]
pub struct UpsertNullKeyError;
213
214impl RustType<ProtoUpsertNullKeyError> for UpsertNullKeyError {
215 fn into_proto(&self) -> ProtoUpsertNullKeyError {
216 ProtoUpsertNullKeyError {}
217 }
218
219 fn from_proto(_proto: ProtoUpsertNullKeyError) -> Result<Self, TryFromProtoError> {
220 Ok(UpsertNullKeyError)
221 }
222}
223
224impl Display for UpsertNullKeyError {
225 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
226 write!(
227 f,
228 "record with NULL key in UPSERT source; to retract this error, "
229 )?;
230 write!(
231 f,
232 "produce a record upstream with a NULL key and NULL value"
233 )
234 }
235}
236
/// The ways an upsert record can fail.
#[derive(Arbitrary, Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum UpsertError {
    /// A decode failure, displayed with a `Key decode: ` prefix.
    /// NOTE(review): presumably the record's key could not be decoded; confirm.
    KeyDecode(DecodeError),
    /// A decode failure that also records the key it belongs to; displayed
    /// with a `Value error: ` prefix.
    Value(UpsertValueError),
    /// The record had a NULL key; displayed with a `Null key: ` prefix.
    NullKey(UpsertNullKeyError),
}
253
254impl RustType<ProtoUpsertError> for UpsertError {
255 fn into_proto(&self) -> ProtoUpsertError {
256 use proto_upsert_error::Kind;
257 ProtoUpsertError {
258 kind: Some(match self {
259 UpsertError::KeyDecode(err) => Kind::KeyDecode(err.into_proto()),
260 UpsertError::Value(err) => Kind::Value(err.into_proto()),
261 UpsertError::NullKey(err) => Kind::NullKey(err.into_proto()),
262 }),
263 }
264 }
265
266 fn from_proto(proto: ProtoUpsertError) -> Result<Self, TryFromProtoError> {
267 use proto_upsert_error::Kind;
268 match proto.kind {
269 Some(Kind::KeyDecode(proto)) => {
270 let rust = RustType::from_proto(proto)?;
271 Ok(Self::KeyDecode(rust))
272 }
273 Some(Kind::Value(proto)) => {
274 let rust = RustType::from_proto(proto)?;
275 Ok(Self::Value(rust))
276 }
277 Some(Kind::NullKey(proto)) => {
278 let rust = RustType::from_proto(proto)?;
279 Ok(Self::NullKey(rust))
280 }
281 None => Err(TryFromProtoError::missing_field("ProtoUpsertError::kind")),
282 }
283 }
284}
285
286impl Display for UpsertError {
287 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
288 match self {
289 UpsertError::KeyDecode(err) => write!(f, "Key decode: {err}"),
290 UpsertError::Value(err) => write!(f, "Value error: {err}"),
291 UpsertError::NullKey(err) => write!(f, "Null key: {err}"),
292 }
293 }
294}
295
/// A source-level error; a thin wrapper around [`SourceErrorDetails`].
#[derive(Arbitrary, Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct SourceError {
    /// What went wrong; `Display` for `SourceError` delegates to this field.
    pub error: SourceErrorDetails,
}
302
303impl RustType<ProtoSourceError> for SourceError {
304 fn into_proto(&self) -> ProtoSourceError {
305 ProtoSourceError {
306 error: Some(self.error.into_proto()),
307 }
308 }
309
310 fn from_proto(proto: ProtoSourceError) -> Result<Self, TryFromProtoError> {
311 Ok(SourceError {
312 error: proto.error.into_rust_if_some("ProtoSourceError::error")?,
313 })
314 }
315}
316
317impl Display for SourceError {
318 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
319 self.error.fmt(f)
320 }
321}
322
/// The reason a source failed; in both cases `Display` tells the user that
/// the source must be dropped and recreated.
#[derive(Arbitrary, Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum SourceErrorDetails {
    /// A failure during initialization, with its message.
    Initialization(Box<str>),
    /// Any other failure, with its message. Deprecated protobuf kinds are
    /// also decoded into this variant.
    Other(Box<str>),
}
328
329impl RustType<ProtoSourceErrorDetails> for SourceErrorDetails {
330 fn into_proto(&self) -> ProtoSourceErrorDetails {
331 use proto_source_error_details::Kind;
332 ProtoSourceErrorDetails {
333 kind: Some(match self {
334 SourceErrorDetails::Initialization(s) => Kind::Initialization(s.into_proto()),
335 SourceErrorDetails::Other(s) => Kind::Other(s.into_proto()),
336 }),
337 }
338 }
339
340 fn from_proto(proto: ProtoSourceErrorDetails) -> Result<Self, TryFromProtoError> {
341 use proto_source_error_details::Kind;
342 match proto.kind {
343 Some(kind) => match kind {
344 Kind::Initialization(s) => Ok(SourceErrorDetails::Initialization(s.into())),
345 Kind::DeprecatedFileIo(s) | Kind::DeprecatedPersistence(s) => {
346 warn!("Deprecated source error kind: {s}");
347 Ok(SourceErrorDetails::Other(s.into()))
348 }
349 Kind::Other(s) => Ok(SourceErrorDetails::Other(s.into())),
350 },
351 None => Err(TryFromProtoError::missing_field(
352 "ProtoSourceErrorDetails::kind",
353 )),
354 }
355 }
356}
357
358impl Display for SourceErrorDetails {
359 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
360 match self {
361 SourceErrorDetails::Initialization(e) => {
362 write!(
363 f,
364 "failed during initialization, source must be dropped and recreated: {}",
365 e
366 )
367 }
368 SourceErrorDetails::Other(e) => {
369 write!(
370 f,
371 "source must be dropped and recreated due to failure: {}",
372 e
373 )
374 }
375 }
376 }
377}
378
/// The union of the error types defined or used in this file.
///
/// Every payload is boxed, so the enum itself stays pointer-sized plus a
/// discriminant regardless of how large the individual error types are.
#[derive(Arbitrary, Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)]
pub enum DataflowError {
    /// A decoding failure; see [`DecodeError`].
    DecodeError(Box<DecodeError>),
    /// An expression evaluation failure (`mz_expr::EvalError`).
    EvalError(Box<EvalError>),
    /// A source failure; see [`SourceError`].
    SourceError(Box<SourceError>),
    /// An envelope failure; see [`EnvelopeError`].
    EnvelopeError(Box<EnvelopeError>),
}

// Uses the default `Error` methods, so `source()` reports no underlying cause.
impl Error for DataflowError {}
394
395mod boxed_str {
396
397 use differential_dataflow::containers::Region;
398 use differential_dataflow::containers::StableRegion;
399
400 #[derive(Default)]
405 pub struct BoxStrStack {
406 region: StableRegion<u8>,
407 }
408
409 impl Region for BoxStrStack {
410 type Item = Box<str>;
411 #[inline]
412 fn clear(&mut self) {
413 self.region.clear();
414 }
415 #[inline(always)]
418 unsafe fn copy(&mut self, item: &Box<str>) -> Box<str> {
419 let bytes = self.region.copy_slice(item.as_bytes());
420 std::str::from_boxed_utf8_unchecked(Box::from_raw(bytes))
423 }
424 #[inline(always)]
425 fn reserve_items<'a, I>(&mut self, items: I)
426 where
427 Self: 'a,
428 I: Iterator<Item = &'a Self::Item> + Clone,
429 {
430 self.region.reserve(items.map(|x| x.len()).sum());
431 }
432
433 fn reserve_regions<'a, I>(&mut self, regions: I)
434 where
435 Self: 'a,
436 I: Iterator<Item = &'a Self> + Clone,
437 {
438 self.region.reserve(regions.map(|r| r.region.len()).sum());
439 }
440 #[inline]
441 fn heap_size(&self, callback: impl FnMut(usize, usize)) {
442 self.region.heap_size(callback)
443 }
444 }
445}
446
447mod columnation {
448 use std::iter::once;
449
450 use differential_dataflow::containers::{Columnation, Region, StableRegion};
451 use mz_expr::EvalError;
452 use mz_repr::Row;
453 use mz_repr::adt::range::InvalidRangeError;
454 use mz_repr::strconv::ParseError;
455
456 use crate::errors::boxed_str::BoxStrStack;
457 use crate::errors::{
458 DataflowError, DecodeError, DecodeErrorKind, EnvelopeError, SourceError,
459 SourceErrorDetails, UpsertError, UpsertValueError,
460 };
461
    /// Selects [`DataflowErrorRegion`] as the region type for `DataflowError`.
    impl Columnation for DataflowError {
        type InnerRegion = DataflowErrorRegion;
    }
465
    /// Region for [`DataflowError`] values: one sub-region per boxed payload
    /// type plus shared sub-regions for the data nested inside them.
    #[derive(Default)]
    pub struct DataflowErrorRegion {
        /// Storage that boxed `DecodeError` payloads point into.
        decode_error_region: StableRegion<DecodeError>,
        /// Storage that boxed `EnvelopeError` payloads point into.
        envelope_error_region: StableRegion<EnvelopeError>,
        /// Storage that boxed `EvalError` payloads point into.
        eval_error_region: StableRegion<EvalError>,
        /// Region for `Row` values (the `for_key` of upsert value errors).
        row_region: <Row as Columnation>::InnerRegion,
        /// Storage that boxed `SourceError` payloads point into.
        source_error_region: StableRegion<SourceError>,
        /// Region for every `Box<str>` nested in an error.
        string_region: BoxStrStack,
        /// Region for the `raw` bytes of decode errors.
        u8_region: <Vec<u8> as Columnation>::InnerRegion,
    }
484
485 impl DataflowErrorRegion {
486 unsafe fn copy_decode_error(&mut self, decode_error: &DecodeError) -> DecodeError {
490 DecodeError {
491 kind: match &decode_error.kind {
492 DecodeErrorKind::Text(string) => {
493 DecodeErrorKind::Text(self.string_region.copy(string))
494 }
495 DecodeErrorKind::Bytes(string) => {
496 DecodeErrorKind::Bytes(self.string_region.copy(string))
497 }
498 },
499 raw: self.u8_region.copy(&decode_error.raw),
500 }
501 }
502 }
503
504 fn assert_copy<T: Copy>(_: &T) {}
506
    /// Region implementation that stores every heap-allocated part of a
    /// [`DataflowError`] in one of the sub-regions of [`DataflowErrorRegion`].
    impl Region for DataflowErrorRegion {
        type Item = DataflowError;

        /// Copies `item` into the region and returns a `DataflowError` whose
        /// boxes and nested strings/bytes/rows point at the copies.
        ///
        /// Each arm rebuilds the payload field by field, stores it in the
        /// matching `*_error_region`, and wraps a raw pointer to the stored
        /// value in a `Box`.
        ///
        /// # Safety
        ///
        /// The returned value is backed by region memory rather than by
        /// allocations of its own.
        /// NOTE(review): presumably it must never be dropped or outlive the
        /// region; confirm against the `Region` contract.
        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            let err = match item {
                DataflowError::DecodeError(err) => {
                    let err = self.copy_decode_error(&*err);
                    // Store the rebuilt payload, then box a pointer to its slot.
                    let reference = self.decode_error_region.copy_iter(once(err));
                    let boxed = unsafe { Box::from_raw(reference.as_mut_ptr()) };
                    DataflowError::DecodeError(boxed)
                }
                DataflowError::EvalError(err) => {
                    let err: &EvalError = &*err;
                    // Variants are handled one by one. Payloads checked with
                    // `assert_copy` (or copied with `*`) must be `Copy`, so a
                    // plain clone is enough; `Box<str>` payloads are routed
                    // through `string_region`.
                    let err = match err {
                        e @ EvalError::CharacterNotValidForEncoding(x) => {
                            assert_copy(x);
                            e.clone()
                        }
                        e @ EvalError::CharacterTooLargeForEncoding(x) => {
                            assert_copy(x);
                            e.clone()
                        }
                        EvalError::DateBinOutOfRange(string) => {
                            EvalError::DateBinOutOfRange(self.string_region.copy(string))
                        }
                        // Variants without a payload.
                        e @ EvalError::DivisionByZero
                        | e @ EvalError::FloatOverflow
                        | e @ EvalError::FloatUnderflow
                        | e @ EvalError::NumericFieldOverflow
                        | e @ EvalError::MzTimestampStepOverflow
                        | e @ EvalError::TimestampCannotBeNan
                        | e @ EvalError::TimestampOutOfRange
                        | e @ EvalError::NegSqrt
                        | e @ EvalError::NegLimit
                        | e @ EvalError::NullCharacterNotPermitted
                        | e @ EvalError::KeyCannotBeNull
                        | e @ EvalError::UnterminatedLikeEscapeSequence
                        | e @ EvalError::MultipleRowsFromSubquery
                        | e @ EvalError::LikePatternTooLong
                        | e @ EvalError::LikeEscapeTooLong
                        | e @ EvalError::MultidimensionalArrayRemovalNotSupported
                        | e @ EvalError::MultiDimensionalArraySearch
                        | e @ EvalError::ArrayFillWrongArraySubscripts
                        | e @ EvalError::DateOutOfRange
                        | e @ EvalError::CharOutOfRange
                        | e @ EvalError::InvalidBase64Equals
                        | e @ EvalError::InvalidBase64EndSequence
                        | e @ EvalError::InvalidTimezoneInterval
                        | e @ EvalError::InvalidTimezoneConversion
                        | e @ EvalError::LengthTooLarge
                        | e @ EvalError::AclArrayNullElement
                        | e @ EvalError::MzAclArrayNullElement => e.clone(),
                        EvalError::Unsupported {
                            feature,
                            discussion_no,
                        } => EvalError::Unsupported {
                            feature: self.string_region.copy(feature),
                            discussion_no: *discussion_no,
                        },
                        EvalError::Float32OutOfRange(string) => {
                            EvalError::Float32OutOfRange(self.string_region.copy(string))
                        }
                        EvalError::Float64OutOfRange(string) => {
                            EvalError::Float64OutOfRange(self.string_region.copy(string))
                        }
                        EvalError::Int16OutOfRange(string) => {
                            EvalError::Int16OutOfRange(self.string_region.copy(string))
                        }
                        EvalError::Int32OutOfRange(string) => {
                            EvalError::Int32OutOfRange(self.string_region.copy(string))
                        }
                        EvalError::Int64OutOfRange(string) => {
                            EvalError::Int64OutOfRange(self.string_region.copy(string))
                        }
                        EvalError::UInt16OutOfRange(string) => {
                            EvalError::UInt16OutOfRange(self.string_region.copy(string))
                        }
                        EvalError::UInt32OutOfRange(string) => {
                            EvalError::UInt32OutOfRange(self.string_region.copy(string))
                        }
                        EvalError::UInt64OutOfRange(string) => {
                            EvalError::UInt64OutOfRange(self.string_region.copy(string))
                        }
                        EvalError::MzTimestampOutOfRange(string) => {
                            EvalError::MzTimestampOutOfRange(self.string_region.copy(string))
                        }
                        EvalError::OidOutOfRange(string) => {
                            EvalError::OidOutOfRange(self.string_region.copy(string))
                        }
                        EvalError::IntervalOutOfRange(string) => {
                            EvalError::IntervalOutOfRange(self.string_region.copy(string))
                        }
                        e @ EvalError::IndexOutOfRange {
                            provided,
                            valid_end,
                        } => {
                            assert_copy(provided);
                            assert_copy(valid_end);
                            e.clone()
                        }
                        e @ EvalError::InvalidBase64Symbol(c) => {
                            assert_copy(c);
                            e.clone()
                        }
                        EvalError::InvalidTimezone(x) => {
                            EvalError::InvalidTimezone(self.string_region.copy(x))
                        }
                        e @ EvalError::InvalidLayer { max_layer, val } => {
                            assert_copy(max_layer);
                            assert_copy(val);
                            e.clone()
                        }
                        // `*err` only compiles while the payload is `Copy`.
                        EvalError::InvalidArray(err) => EvalError::InvalidArray(*err),
                        EvalError::InvalidEncodingName(x) => {
                            EvalError::InvalidEncodingName(self.string_region.copy(x))
                        }
                        EvalError::InvalidHashAlgorithm(x) => {
                            EvalError::InvalidHashAlgorithm(self.string_region.copy(x))
                        }
                        EvalError::InvalidByteSequence {
                            byte_sequence,
                            encoding_name,
                        } => EvalError::InvalidByteSequence {
                            byte_sequence: self.string_region.copy(byte_sequence),
                            encoding_name: self.string_region.copy(encoding_name),
                        },
                        EvalError::InvalidJsonbCast { from, to } => EvalError::InvalidJsonbCast {
                            from: self.string_region.copy(from),
                            to: self.string_region.copy(to),
                        },
                        EvalError::InvalidRegex(x) => {
                            EvalError::InvalidRegex(self.string_region.copy(x))
                        }
                        e @ EvalError::InvalidRegexFlag(x) => {
                            assert_copy(x);
                            e.clone()
                        }
                        EvalError::InvalidParameterValue(x) => {
                            EvalError::InvalidParameterValue(self.string_region.copy(x))
                        }
                        EvalError::InvalidDatePart(x) => {
                            EvalError::InvalidDatePart(self.string_region.copy(x))
                        }
                        EvalError::UnknownUnits(x) => {
                            EvalError::UnknownUnits(self.string_region.copy(x))
                        }
                        EvalError::UnsupportedUnits(x, y) => EvalError::UnsupportedUnits(
                            self.string_region.copy(x),
                            self.string_region.copy(y),
                        ),
                        EvalError::Parse(ParseError {
                            kind,
                            type_name,
                            input,
                            details,
                        }) => EvalError::Parse(ParseError {
                            kind: *kind,
                            type_name: self.string_region.copy(type_name),
                            input: self.string_region.copy(input),
                            // The optional detail string is copied only when present.
                            details: details
                                .as_ref()
                                .map(|details| self.string_region.copy(details)),
                        }),
                        e @ EvalError::ParseHex(x) => {
                            assert_copy(x);
                            e.clone()
                        }
                        EvalError::Internal(x) => EvalError::Internal(self.string_region.copy(x)),
                        EvalError::InfinityOutOfDomain(x) => {
                            EvalError::InfinityOutOfDomain(self.string_region.copy(x))
                        }
                        EvalError::NegativeOutOfDomain(x) => {
                            EvalError::NegativeOutOfDomain(self.string_region.copy(x))
                        }
                        EvalError::ZeroOutOfDomain(x) => {
                            EvalError::ZeroOutOfDomain(self.string_region.copy(x))
                        }
                        EvalError::OutOfDomain(x, y, z) => {
                            assert_copy(x);
                            assert_copy(y);
                            EvalError::OutOfDomain(*x, *y, self.string_region.copy(z))
                        }
                        EvalError::ComplexOutOfRange(x) => {
                            EvalError::ComplexOutOfRange(self.string_region.copy(x))
                        }
                        EvalError::Undefined(x) => EvalError::Undefined(self.string_region.copy(x)),
                        EvalError::StringValueTooLong {
                            target_type,
                            length,
                        } => EvalError::StringValueTooLong {
                            target_type: self.string_region.copy(target_type),
                            length: *length,
                        },
                        e @ EvalError::IncompatibleArrayDimensions { dims } => {
                            assert_copy(dims);
                            e.clone()
                        }
                        EvalError::TypeFromOid(x) => {
                            EvalError::TypeFromOid(self.string_region.copy(x))
                        }
                        EvalError::InvalidRange(x) => {
                            // Nested enum: only `CanonicalizationOverflow`
                            // carries a string that needs the region.
                            let err = match x {
                                e @ InvalidRangeError::MisorderedRangeBounds
                                | e @ InvalidRangeError::InvalidRangeBoundFlags
                                | e @ InvalidRangeError::DiscontiguousUnion
                                | e @ InvalidRangeError::DiscontiguousDifference
                                | e @ InvalidRangeError::NullRangeBoundFlags => e.clone(),
                                InvalidRangeError::CanonicalizationOverflow(string) => {
                                    InvalidRangeError::CanonicalizationOverflow(
                                        self.string_region.copy(string),
                                    )
                                }
                            };
                            EvalError::InvalidRange(err)
                        }
                        EvalError::InvalidRoleId(x) => {
                            EvalError::InvalidRoleId(self.string_region.copy(x))
                        }
                        EvalError::InvalidPrivileges(x) => {
                            EvalError::InvalidPrivileges(self.string_region.copy(x))
                        }
                        EvalError::LetRecLimitExceeded(x) => {
                            EvalError::LetRecLimitExceeded(self.string_region.copy(x))
                        }
                        EvalError::MustNotBeNull(x) => {
                            EvalError::MustNotBeNull(self.string_region.copy(x))
                        }
                        EvalError::InvalidIdentifier { ident, detail } => {
                            EvalError::InvalidIdentifier {
                                ident: self.string_region.copy(ident),
                                detail: detail
                                    .as_ref()
                                    .map(|detail| self.string_region.copy(detail)),
                            }
                        }
                        e @ EvalError::MaxArraySizeExceeded(x) => {
                            assert_copy(x);
                            e.clone()
                        }
                        EvalError::DateDiffOverflow { unit, a, b } => EvalError::DateDiffOverflow {
                            unit: self.string_region.copy(unit),
                            a: self.string_region.copy(a),
                            b: self.string_region.copy(b),
                        },
                        EvalError::IfNullError(x) => {
                            EvalError::IfNullError(self.string_region.copy(x))
                        }
                        EvalError::InvalidIanaTimezoneId(x) => {
                            EvalError::InvalidIanaTimezoneId(self.string_region.copy(x))
                        }
                        EvalError::PrettyError(x) => {
                            EvalError::PrettyError(self.string_region.copy(x))
                        }
                    };
                    let reference = self.eval_error_region.copy_iter(once(err));
                    let boxed = unsafe { Box::from_raw(reference.as_mut_ptr()) };
                    DataflowError::EvalError(boxed)
                }
                DataflowError::SourceError(err) => {
                    let err: &SourceError = &*err;
                    let err = SourceError {
                        error: match &err.error {
                            SourceErrorDetails::Initialization(string) => {
                                SourceErrorDetails::Initialization(self.string_region.copy(string))
                            }
                            SourceErrorDetails::Other(string) => {
                                SourceErrorDetails::Other(self.string_region.copy(string))
                            }
                        },
                    };
                    let reference = self.source_error_region.copy_iter(once(err));
                    let boxed = unsafe { Box::from_raw(reference.as_mut_ptr()) };
                    DataflowError::SourceError(boxed)
                }
                DataflowError::EnvelopeError(err) => {
                    let err: &EnvelopeError = &*err;
                    let err = match err {
                        EnvelopeError::Upsert(err) => {
                            let err = match err {
                                UpsertError::KeyDecode(err) => {
                                    UpsertError::KeyDecode(self.copy_decode_error(err))
                                }
                                UpsertError::Value(err) => UpsertError::Value(UpsertValueError {
                                    inner: self.copy_decode_error(&err.inner),
                                    for_key: self.row_region.copy(&err.for_key),
                                }),
                                // `*err` copies the data-less marker by value.
                                UpsertError::NullKey(err) => UpsertError::NullKey(*err),
                            };
                            EnvelopeError::Upsert(err)
                        }
                        EnvelopeError::Flat(string) => {
                            EnvelopeError::Flat(self.string_region.copy(string))
                        }
                    };
                    let reference = self.envelope_error_region.copy_iter(once(err));
                    let boxed = unsafe { Box::from_raw(reference.as_mut_ptr()) };
                    DataflowError::EnvelopeError(boxed)
                }
            };
            // In debug builds, verify that the copy compares equal to the input.
            debug_assert_eq!(item, &err);
            err
        }

        /// Clears every sub-region.
        fn clear(&mut self) {
            // Exhaustive destructuring: adding a field without handling it
            // here becomes a compile error.
            let Self {
                decode_error_region,
                envelope_error_region,
                eval_error_region,
                row_region,
                source_error_region,
                string_region,
                u8_region,
            } = self;
            decode_error_region.clear();
            envelope_error_region.clear();
            eval_error_region.clear();
            row_region.clear();
            source_error_region.clear();
            string_region.clear();
            u8_region.clear();
        }

        /// Reserves one slot per item in the payload region matching the
        /// item's variant. The nested row/string/byte regions are not
        /// touched here.
        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            self.decode_error_region.reserve(
                items
                    .clone()
                    .filter(|x| matches!(x, DataflowError::DecodeError(_)))
                    .count(),
            );
            self.envelope_error_region.reserve(
                items
                    .clone()
                    .filter(|x| matches!(x, DataflowError::EnvelopeError(_)))
                    .count(),
            );
            self.eval_error_region.reserve(
                items
                    .clone()
                    .filter(|x| matches!(x, DataflowError::EvalError(_)))
                    .count(),
            );
            // Last pass: the iterator can be consumed instead of cloned.
            self.source_error_region.reserve(
                items
                    .filter(|x| matches!(x, DataflowError::SourceError(_)))
                    .count(),
            );
        }

        /// Forwards the reservation to the nested row, string and byte
        /// regions. The per-variant payload regions are not reserved here.
        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            self.row_region
                .reserve_regions(regions.clone().map(|r| &r.row_region));
            self.string_region
                .reserve_regions(regions.clone().map(|r| &r.string_region));
            self.u8_region
                .reserve_regions(regions.clone().map(|r| &r.u8_region));
        }

        /// Reports the heap usage of every sub-region through `callback`.
        fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
            // Exhaustive destructuring, as in `clear`.
            let Self {
                decode_error_region,
                envelope_error_region,
                eval_error_region,
                row_region,
                source_error_region,
                string_region,
                u8_region,
            } = &self;
            decode_error_region.heap_size(&mut callback);
            envelope_error_region.heap_size(&mut callback);
            eval_error_region.heap_size(&mut callback);
            row_region.heap_size(&mut callback);
            source_error_region.heap_size(&mut callback);
            string_region.heap_size(&mut callback);
            u8_region.heap_size(&mut callback);
        }
    }
907
908 #[cfg(test)]
909 mod tests {
910 use differential_dataflow::containers::TimelyStack;
911 use proptest::prelude::*;
912
913 use super::*;
914
915 fn columnation_roundtrip<T: Columnation>(item: &T) -> TimelyStack<T> {
916 let mut container = TimelyStack::with_capacity(1);
917 container.copy(item);
918 container
919 }
920
921 proptest! {
922 #[mz_ore::test]
923 #[cfg_attr(miri, ignore)] fn dataflow_error_roundtrip(expect in any::<DataflowError>()) {
925 let actual = columnation_roundtrip(&expect);
926 proptest::prop_assert_eq!(&expect, &actual[0])
927 }
928 }
929 }
930}
931
932impl RustType<ProtoDataflowError> for DataflowError {
933 fn into_proto(&self) -> ProtoDataflowError {
934 use proto_dataflow_error::Kind::*;
935 ProtoDataflowError {
936 kind: Some(match self {
937 DataflowError::DecodeError(err) => DecodeError(*err.into_proto()),
938 DataflowError::EvalError(err) => EvalError(*err.into_proto()),
939 DataflowError::SourceError(err) => SourceError(*err.into_proto()),
940 DataflowError::EnvelopeError(err) => EnvelopeErrorV1(*err.into_proto()),
941 }),
942 }
943 }
944
945 fn from_proto(proto: ProtoDataflowError) -> Result<Self, TryFromProtoError> {
946 use proto_dataflow_error::Kind::*;
947 match proto.kind {
948 Some(kind) => match kind {
949 DecodeError(err) => Ok(DataflowError::DecodeError(Box::new(err.into_rust()?))),
950 EvalError(err) => Ok(DataflowError::EvalError(Box::new(err.into_rust()?))),
951 SourceError(err) => Ok(DataflowError::SourceError(Box::new(err.into_rust()?))),
952 EnvelopeErrorV1(err) => {
953 Ok(DataflowError::EnvelopeError(Box::new(err.into_rust()?)))
954 }
955 },
956 None => Err(TryFromProtoError::missing_field("ProtoDataflowError::kind")),
957 }
958 }
959}
960
961impl Display for DataflowError {
962 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
963 match self {
964 DataflowError::DecodeError(e) => write!(f, "Decode error: {}", e),
965 DataflowError::EvalError(e) => write!(
966 f,
967 "{}{}",
968 match **e {
969 EvalError::IfNullError(_) => "",
970 _ => "Evaluation error: ",
971 },
972 e
973 ),
974 DataflowError::SourceError(e) => write!(f, "Source error: {}", e),
975 DataflowError::EnvelopeError(e) => write!(f, "Envelope error: {}", e),
976 }
977 }
978}
979
980impl From<DecodeError> for DataflowError {
981 fn from(e: DecodeError) -> Self {
982 Self::DecodeError(Box::new(e))
983 }
984}
985
986impl From<EvalError> for DataflowError {
987 fn from(e: EvalError) -> Self {
988 Self::EvalError(Box::new(e))
989 }
990}
991
992impl From<SourceError> for DataflowError {
993 fn from(e: SourceError) -> Self {
994 Self::SourceError(Box::new(e))
995 }
996}
997
998impl From<EnvelopeError> for DataflowError {
999 fn from(e: EnvelopeError) -> Self {
1000 Self::EnvelopeError(Box::new(e))
1001 }
1002}
1003
/// Errors produced while creating a context (see [`ContextCreationErrorExt`]).
#[derive(thiserror::Error, Debug)]
pub enum ContextCreationError {
    /// An SSH failure, displayed as `ssh: <error>`; the wrapped error is also
    /// exposed as the source. There is no `From` conversion for this variant
    /// (`Other` already claims `From<anyhow::Error>`), so it is always built
    /// explicitly.
    #[error("ssh: {0}")]
    Ssh(#[source] anyhow::Error),
    /// A Kafka client error, displayed transparently.
    #[error(transparent)]
    KafkaError(#[from] KafkaError),
    /// A DNS resolution error, displayed transparently.
    #[error(transparent)]
    Dns(#[from] mz_ore::netio::DnsResolutionError),
    /// Any other error, displayed transparently.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
    /// An I/O error, displayed transparently.
    #[error(transparent)]
    Io(#[from] std::io::Error),
}
1020
/// Extension methods for results whose error converts into
/// [`ContextCreationError`].
pub trait ContextCreationErrorExt<T> {
    /// Converts the error into a [`ContextCreationError`]. In the
    /// implementation in this file, an errored tunnel status on `cx` takes
    /// precedence and is reported as the `Ssh` variant.
    fn check_ssh_status<C>(self, cx: &TunnelingClientContext<C>)
        -> Result<T, ContextCreationError>;
    /// Attaches `msg` as context to the error. In the implementation in this
    /// file, `Ssh` errors stay `Ssh`; every other variant is returned as `Other`.
    fn add_context(self, msg: &'static str) -> Result<T, ContextCreationError>;
}
1030
1031impl<T, E> ContextCreationErrorExt<T> for Result<T, E>
1032where
1033 ContextCreationError: From<E>,
1034{
1035 fn check_ssh_status<C>(
1036 self,
1037 cx: &TunnelingClientContext<C>,
1038 ) -> Result<T, ContextCreationError> {
1039 self.map_err(|e| {
1040 if let SshTunnelStatus::Errored(e) = cx.tunnel_status() {
1041 ContextCreationError::Ssh(anyhow!(e))
1042 } else {
1043 ContextCreationError::from(e)
1044 }
1045 })
1046 }
1047
1048 fn add_context(self, msg: &'static str) -> Result<T, ContextCreationError> {
1049 self.map_err(|e| {
1050 let e = ContextCreationError::from(e);
1051 match e {
1052 ContextCreationError::Ssh(e) => ContextCreationError::Ssh(anyhow!(e.context(msg))),
1054 ContextCreationError::Other(e) => {
1055 ContextCreationError::Other(anyhow!(e.context(msg)))
1056 }
1057 ContextCreationError::KafkaError(e) => {
1058 ContextCreationError::Other(anyhow!(anyhow!(e).context(msg)))
1059 }
1060 ContextCreationError::Io(e) => {
1061 ContextCreationError::Other(anyhow!(anyhow!(e).context(msg)))
1062 }
1063 ContextCreationError::Dns(e) => {
1064 ContextCreationError::Other(anyhow!(e).context(msg))
1065 }
1066 }
1067 })
1068 }
1069}
1070
1071impl From<CsrConnectError> for ContextCreationError {
1072 fn from(csr: CsrConnectError) -> ContextCreationError {
1073 use ContextCreationError::*;
1074
1075 match csr {
1076 CsrConnectError::Ssh(e) => Ssh(e),
1077 other => Other(anyhow::anyhow!(other)),
1078 }
1079 }
1080}
1081
/// Errors produced while connecting to a CSR.
/// NOTE(review): "CSR" presumably stands for Confluent Schema Registry; confirm.
#[derive(thiserror::Error, Debug)]
pub enum CsrConnectError {
    /// An SSH failure, displayed as `ssh: <error>`; the wrapped error is also
    /// exposed as the source. There is no `From` conversion for this variant
    /// (`Other` already claims `From<anyhow::Error>`), so it is always built
    /// explicitly.
    #[error("ssh: {0}")]
    Ssh(#[source] anyhow::Error),
    /// A `native_tls` error, displayed transparently.
    #[error(transparent)]
    NativeTls(#[from] native_tls::Error),
    /// An OpenSSL error stack, displayed transparently.
    #[error(transparent)]
    Openssl(#[from] openssl::error::ErrorStack),
    /// A DNS resolution error, displayed transparently.
    #[error(transparent)]
    Dns(#[from] mz_ore::netio::DnsResolutionError),
    /// An I/O error, displayed transparently.
    #[error(transparent)]
    Io(#[from] std::io::Error),
    /// Any other error, displayed transparently.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
1100
#[cfg(test)]
mod tests {
    use super::DecodeError;
    use crate::errors::DecodeErrorKind;

    /// Encoding a `DecodeError` and decoding the bytes must give back an equal value.
    #[mz_ore::test]
    fn test_decode_error_codec_roundtrip() -> Result<(), String> {
        let original = DecodeError {
            kind: DecodeErrorKind::Text("ciao".into()),
            raw: b"oaic".to_vec(),
        };

        let encoded = {
            let mut buf = Vec::new();
            original.encode(&mut buf);
            buf
        };
        let decoded = DecodeError::decode(&encoded)?;

        assert_eq!(decoded, original);
        Ok(())
    }
}