mz_compute/render/
errors.rs1use columnar::Columnar;
28use columnation::{Columnation, Region};
29use mz_expr::EvalError;
30use mz_proto::{ProtoType, RustType};
31use mz_repr::Row;
32use mz_storage_types::errors::{DataflowError, ProtoDataflowError};
33use prost::Message;
34use serde::{Deserialize, Serialize};
35use std::fmt;
36
37#[derive(
44 Clone,
45 Eq,
46 PartialEq,
47 Ord,
48 PartialOrd,
49 Hash,
50 Serialize,
51 Deserialize,
52 Columnar
53)]
54#[columnar(derive(Eq, PartialEq, Ord, PartialOrd))]
55pub struct DataflowErrorSer(Vec<u8>);
56
57impl DataflowErrorSer {
58 pub fn deserialize(&self) -> DataflowError {
64 let proto = ProtoDataflowError::decode(self.0.as_slice())
65 .expect("DataflowErrorSer: invalid proto bytes");
66 proto
67 .into_rust()
68 .expect("DataflowErrorSer: failed to convert proto to DataflowError")
69 }
70}
71
72impl From<DataflowError> for DataflowErrorSer {
73 fn from(err: DataflowError) -> Self {
74 DataflowErrorSer(err.into_proto().encode_to_vec())
75 }
76}
77
78impl From<EvalError> for DataflowErrorSer {
79 fn from(err: EvalError) -> Self {
80 DataflowErrorSer::from(DataflowError::from(err))
83 }
84}
85
86impl fmt::Display for DataflowErrorSer {
87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 self.deserialize().fmt(f)
89 }
90}
91
92impl fmt::Debug for DataflowErrorSer {
93 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94 f.debug_tuple("DataflowErrorSer")
95 .field(&format_args!("{} bytes", self.0.len()))
96 .finish()
97 }
98}
99
100impl Columnation for DataflowErrorSer {
101 type InnerRegion = DataflowErrorSerRegion;
102}
103
104#[derive(Default)]
106pub struct DataflowErrorSerRegion {
107 inner: <Vec<u8> as Columnation>::InnerRegion,
108}
109
110impl Region for DataflowErrorSerRegion {
111 type Item = DataflowErrorSer;
112
113 unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
114 DataflowErrorSer(unsafe { self.inner.copy(&item.0) })
116 }
117
118 fn clear(&mut self) {
119 self.inner.clear();
120 }
121
122 fn reserve_items<'a, I>(&mut self, items: I)
123 where
124 I: Iterator<Item = &'a Self::Item> + Clone,
125 {
126 self.inner.reserve_items(items.map(|item| &item.0));
127 }
128
129 fn reserve_regions<'a, I>(&mut self, regions: I)
130 where
131 I: Iterator<Item = &'a Self> + Clone,
132 {
133 self.inner.reserve_regions(regions.map(|r| &r.inner));
134 }
135
136 fn heap_size(&self, callback: impl FnMut(usize, usize)) {
137 self.inner.heap_size(callback);
138 }
139}
140
141pub(super) trait MaybeValidatingRow<T, E> {
145 fn ok(t: T) -> Self;
146 fn into_error() -> Option<fn(E) -> Self>;
147}
148
149impl<E> MaybeValidatingRow<Row, E> for Row {
150 fn ok(t: Row) -> Self {
151 t
152 }
153
154 fn into_error() -> Option<fn(E) -> Self> {
155 None
156 }
157}
158
159impl<E> MaybeValidatingRow<(), E> for () {
160 fn ok(t: ()) -> Self {
161 t
162 }
163
164 fn into_error() -> Option<fn(E) -> Self> {
165 None
166 }
167}
168
169impl<E, R> MaybeValidatingRow<Vec<R>, E> for Vec<R> {
170 fn ok(t: Vec<R>) -> Self {
171 t
172 }
173
174 fn into_error() -> Option<fn(E) -> Self> {
175 None
176 }
177}
178
179impl<T, E> MaybeValidatingRow<T, E> for Result<T, E> {
180 fn ok(row: T) -> Self {
181 Ok(row)
182 }
183
184 fn into_error() -> Option<fn(E) -> Self> {
185 Some(Err)
186 }
187}
188
189#[derive(Clone)]
192pub(super) struct ErrorLogger {
193 dataflow_name: String,
194}
195
196impl ErrorLogger {
197 pub fn new(dataflow_name: String) -> Self {
198 Self { dataflow_name }
199 }
200
201 pub fn log(&self, message: &'static str, details: &str) {
218 tracing::warn!(
219 dataflow = self.dataflow_name,
220 "[customer-data] {message} ({details})"
221 );
222 tracing::error!(message);
223 }
224
225 pub fn soft_panic_or_log(&self, message: &'static str, details: &str) {
229 tracing::warn!(
230 dataflow = self.dataflow_name,
231 "[customer-data] {message} ({details})"
232 );
233 mz_ore::soft_panic_or_log!("{}", message);
234 }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use mz_storage_types::errors::DataflowError;
    use proptest::prelude::*;

    /// The byte encoding must be canonical: encoding is deterministic, and a decode followed
    /// by a re-encode reproduces the same bytes.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn proptest_roundtrip_canonical() {
        proptest!(|(err in any::<DataflowError>())| {
            let first = DataflowErrorSer::from(err.clone());

            // bytes -> error -> bytes must be the identity on the encoded form.
            let round_tripped = DataflowErrorSer::from(first.deserialize());
            prop_assert_eq!(&first, &round_tripped,
                "Canonicality violation: round-trip produced different bytes");

            // Encoding the same error a second time must give the same bytes.
            let second = DataflowErrorSer::from(err);
            prop_assert_eq!(&first, &second,
                "Canonicality violation: same error produced different bytes");
        });
    }

    /// Displaying the serialized form must match displaying the underlying `DataflowError`.
    #[mz_ore::test]
    fn display_roundtrip() {
        let eval_err = EvalError::DivisionByZero;
        let expected = DataflowError::from(eval_err.clone()).to_string();
        let actual = DataflowErrorSer::from(eval_err).to_string();

        assert_eq!(expected, actual);
    }
}