mz_compute/render/
errors.rs1use columnar::Columnar;
28use columnation::{Columnation, Region};
29use mz_expr::EvalError;
30use mz_proto::{ProtoType, RustType};
31use mz_repr::Row;
32use mz_storage_types::errors::{DataflowError, ProtoDataflowError};
33use prost::Message;
34use serde::{Deserialize, Serialize};
35use std::fmt;
36
37#[derive(
44 Clone,
45 Eq,
46 PartialEq,
47 Ord,
48 PartialOrd,
49 Hash,
50 Serialize,
51 Deserialize,
52 Columnar
53)]
54pub struct DataflowErrorSer(Vec<u8>);
55
56impl DataflowErrorSer {
57 pub fn deserialize(&self) -> DataflowError {
63 let proto = ProtoDataflowError::decode(self.0.as_slice())
64 .expect("DataflowErrorSer: invalid proto bytes");
65 proto
66 .into_rust()
67 .expect("DataflowErrorSer: failed to convert proto to DataflowError")
68 }
69}
70
71impl From<DataflowError> for DataflowErrorSer {
72 fn from(err: DataflowError) -> Self {
73 DataflowErrorSer(err.into_proto().encode_to_vec())
74 }
75}
76
77impl From<EvalError> for DataflowErrorSer {
78 fn from(err: EvalError) -> Self {
79 DataflowErrorSer::from(DataflowError::from(err))
82 }
83}
84
85impl fmt::Display for DataflowErrorSer {
86 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87 self.deserialize().fmt(f)
88 }
89}
90
91impl fmt::Debug for DataflowErrorSer {
92 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93 f.debug_tuple("DataflowErrorSer")
94 .field(&format_args!("{} bytes", self.0.len()))
95 .finish()
96 }
97}
98
99impl Columnation for DataflowErrorSer {
100 type InnerRegion = DataflowErrorSerRegion;
101}
102
103#[derive(Default)]
105pub struct DataflowErrorSerRegion {
106 inner: <Vec<u8> as Columnation>::InnerRegion,
107}
108
109impl Region for DataflowErrorSerRegion {
110 type Item = DataflowErrorSer;
111
112 unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
113 DataflowErrorSer(unsafe { self.inner.copy(&item.0) })
115 }
116
117 fn clear(&mut self) {
118 self.inner.clear();
119 }
120
121 fn reserve_items<'a, I>(&mut self, items: I)
122 where
123 I: Iterator<Item = &'a Self::Item> + Clone,
124 {
125 self.inner.reserve_items(items.map(|item| &item.0));
126 }
127
128 fn reserve_regions<'a, I>(&mut self, regions: I)
129 where
130 I: Iterator<Item = &'a Self> + Clone,
131 {
132 self.inner.reserve_regions(regions.map(|r| &r.inner));
133 }
134
135 fn heap_size(&self, callback: impl FnMut(usize, usize)) {
136 self.inner.heap_size(callback);
137 }
138}
139
140pub(super) trait MaybeValidatingRow<T, E> {
144 fn ok(t: T) -> Self;
145 fn into_error() -> Option<fn(E) -> Self>;
146}
147
148impl<E> MaybeValidatingRow<Row, E> for Row {
149 fn ok(t: Row) -> Self {
150 t
151 }
152
153 fn into_error() -> Option<fn(E) -> Self> {
154 None
155 }
156}
157
158impl<E> MaybeValidatingRow<(), E> for () {
159 fn ok(t: ()) -> Self {
160 t
161 }
162
163 fn into_error() -> Option<fn(E) -> Self> {
164 None
165 }
166}
167
168impl<E, R> MaybeValidatingRow<Vec<R>, E> for Vec<R> {
169 fn ok(t: Vec<R>) -> Self {
170 t
171 }
172
173 fn into_error() -> Option<fn(E) -> Self> {
174 None
175 }
176}
177
178impl<T, E> MaybeValidatingRow<T, E> for Result<T, E> {
179 fn ok(row: T) -> Self {
180 Ok(row)
181 }
182
183 fn into_error() -> Option<fn(E) -> Self> {
184 Some(Err)
185 }
186}
187
188#[derive(Clone)]
191pub(super) struct ErrorLogger {
192 dataflow_name: String,
193}
194
195impl ErrorLogger {
196 pub fn new(dataflow_name: String) -> Self {
197 Self { dataflow_name }
198 }
199
200 pub fn log(&self, message: &'static str, details: &str) {
217 tracing::warn!(
218 dataflow = self.dataflow_name,
219 "[customer-data] {message} ({details})"
220 );
221 tracing::error!(message);
222 }
223
224 pub fn soft_panic_or_log(&self, message: &'static str, details: &str) {
228 tracing::warn!(
229 dataflow = self.dataflow_name,
230 "[customer-data] {message} ({details})"
231 );
232 mz_ore::soft_panic_or_log!("{}", message);
233 }
234}
235
#[cfg(test)]
mod tests {
    use super::*;
    use mz_storage_types::errors::DataflowError;
    use proptest::prelude::*;

    /// Serialization must be canonical: decoding and re-encoding yields the same
    /// bytes, and encoding the same error twice yields the same bytes.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn proptest_roundtrip_canonical() {
        proptest!(|(err in any::<DataflowError>())| {
            let first = DataflowErrorSer::from(err.clone());
            let second = DataflowErrorSer::from(err);

            // Decode, then encode again: the bytes must not change.
            let round_tripped = DataflowErrorSer::from(first.deserialize());
            prop_assert_eq!(&first, &round_tripped,
                "Canonicality violation: round-trip produced different bytes");

            // Two encodings of the same error must agree.
            prop_assert_eq!(&first, &second,
                "Canonicality violation: same error produced different bytes");
        });
    }

    /// The serialized form must display exactly like the error it wraps.
    #[mz_ore::test]
    fn display_roundtrip() {
        let expected = DataflowError::from(EvalError::DivisionByZero).to_string();
        let actual = DataflowErrorSer::from(EvalError::DivisionByZero).to_string();

        assert_eq!(expected, actual);
    }
}