Skip to main content

mz_compute/render/
errors.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! Helpers for handling errors encountered by operators.
//!
//! # `DataflowErrorSer`
//!
//! [`DataflowErrorSer`] is a serialized byte representation of
//! [`DataflowError`] used on compute-internal dataflow edges instead of
//! `DataflowError` directly.
//!
//! It is backed by proto-encoded [`ProtoDataflowError`] bytes. Because the
//! combination of proto3, prost, and the absence of map fields yields a
//! deterministic encoding, byte-equality implies semantic equality, which
//! lets us use `Ord`, `Hash`, etc. directly on the bytes.
//!
//! **Invariant**: NEVER add `map` fields to `ProtoDataflowError` or any of its
//! transitive message types, as map fields have non-deterministic encoding
//! order in protobuf.
26
27use columnar::Columnar;
28use columnation::{Columnation, Region};
29use mz_expr::EvalError;
30use mz_proto::{ProtoType, RustType};
31use mz_repr::Row;
32use mz_storage_types::errors::{DataflowError, ProtoDataflowError};
33use prost::Message;
34use serde::{Deserialize, Serialize};
35use std::fmt;
36
37/// Serialized representation of a [`DataflowError`], backed by proto-encoded bytes.
38///
39/// This type is used on compute-internal dataflow edges to avoid the cost of
40/// carrying a full `DataflowError` enum through the dataflow graph. Because the
41/// proto encoding is canonical (proto3 + prost + no map fields), byte-equality
42/// implies semantic equality.
43#[derive(
44    Clone,
45    Eq,
46    PartialEq,
47    Ord,
48    PartialOrd,
49    Hash,
50    Serialize,
51    Deserialize,
52    Columnar
53)]
54#[columnar(derive(Eq, PartialEq, Ord, PartialOrd))]
55pub struct DataflowErrorSer(Vec<u8>);
56
57impl DataflowErrorSer {
58    /// Decode the serialized bytes back into a [`DataflowError`].
59    ///
60    /// # Panics
61    ///
62    /// Panics if the bytes do not represent a valid `ProtoDataflowError`.
63    pub fn deserialize(&self) -> DataflowError {
64        let proto = ProtoDataflowError::decode(self.0.as_slice())
65            .expect("DataflowErrorSer: invalid proto bytes");
66        proto
67            .into_rust()
68            .expect("DataflowErrorSer: failed to convert proto to DataflowError")
69    }
70}
71
72impl From<DataflowError> for DataflowErrorSer {
73    fn from(err: DataflowError) -> Self {
74        DataflowErrorSer(err.into_proto().encode_to_vec())
75    }
76}
77
78impl From<EvalError> for DataflowErrorSer {
79    fn from(err: EvalError) -> Self {
80        // Note: this allocates a Box via DataflowError::EvalError(Box::new(e)).
81        // Acceptable in v1.
82        DataflowErrorSer::from(DataflowError::from(err))
83    }
84}
85
86impl fmt::Display for DataflowErrorSer {
87    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88        self.deserialize().fmt(f)
89    }
90}
91
92impl fmt::Debug for DataflowErrorSer {
93    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94        f.debug_tuple("DataflowErrorSer")
95            .field(&format_args!("{} bytes", self.0.len()))
96            .finish()
97    }
98}
99
100impl Columnation for DataflowErrorSer {
101    type InnerRegion = DataflowErrorSerRegion;
102}
103
104/// A [`Region`] for [`DataflowErrorSer`], delegating to the region for `Vec<u8>`.
105#[derive(Default)]
106pub struct DataflowErrorSerRegion {
107    inner: <Vec<u8> as Columnation>::InnerRegion,
108}
109
110impl Region for DataflowErrorSerRegion {
111    type Item = DataflowErrorSer;
112
113    unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
114        // SAFETY: delegating to the inner Vec<u8> region which handles the allocation.
115        DataflowErrorSer(unsafe { self.inner.copy(&item.0) })
116    }
117
118    fn clear(&mut self) {
119        self.inner.clear();
120    }
121
122    fn reserve_items<'a, I>(&mut self, items: I)
123    where
124        I: Iterator<Item = &'a Self::Item> + Clone,
125    {
126        self.inner.reserve_items(items.map(|item| &item.0));
127    }
128
129    fn reserve_regions<'a, I>(&mut self, regions: I)
130    where
131        I: Iterator<Item = &'a Self> + Clone,
132    {
133        self.inner.reserve_regions(regions.map(|r| &r.inner));
134    }
135
136    fn heap_size(&self, callback: impl FnMut(usize, usize)) {
137        self.inner.heap_size(callback);
138    }
139}
140
141/// Used to make possibly-validating code generic: think of this as a kind of `MaybeResult`,
142/// specialized for use in compute.  Validation code will only run when the error constructor is
143/// Some.
144pub(super) trait MaybeValidatingRow<T, E> {
145    fn ok(t: T) -> Self;
146    fn into_error() -> Option<fn(E) -> Self>;
147}
148
149impl<E> MaybeValidatingRow<Row, E> for Row {
150    fn ok(t: Row) -> Self {
151        t
152    }
153
154    fn into_error() -> Option<fn(E) -> Self> {
155        None
156    }
157}
158
159impl<E> MaybeValidatingRow<(), E> for () {
160    fn ok(t: ()) -> Self {
161        t
162    }
163
164    fn into_error() -> Option<fn(E) -> Self> {
165        None
166    }
167}
168
169impl<E, R> MaybeValidatingRow<Vec<R>, E> for Vec<R> {
170    fn ok(t: Vec<R>) -> Self {
171        t
172    }
173
174    fn into_error() -> Option<fn(E) -> Self> {
175        None
176    }
177}
178
179impl<T, E> MaybeValidatingRow<T, E> for Result<T, E> {
180    fn ok(row: T) -> Self {
181        Ok(row)
182    }
183
184    fn into_error() -> Option<fn(E) -> Self> {
185        Some(Err)
186    }
187}
188
189/// Error logger to be used by rendering code.
190// TODO: Consider removing this struct.
191#[derive(Clone)]
192pub(super) struct ErrorLogger {
193    dataflow_name: String,
194}
195
196impl ErrorLogger {
197    pub fn new(dataflow_name: String) -> Self {
198        Self { dataflow_name }
199    }
200
201    /// Log the given error.
202    ///
203    /// The logging format is optimized for surfacing errors with Sentry:
204    ///  * `error` is logged at ERROR level and will appear as the error title in Sentry.
205    ///    We require it to be a static string, to ensure that Sentry always merges instances of
206    ///    the same error together.
207    ///  * `details` is logged at WARN level and will appear in the breadcrumbs.
208    ///    Put relevant dynamic information here.
209    ///
210    /// The message that's logged at WARN level has the format
211    ///   "[customer-data] {message} ({details})"
212    /// We include the [customer-data] tag out of the expectation that `details` will always
213    /// contain some sensitive customer data. We include the `message` to make it possible to match
214    /// the breadcrumbs to their associated error in Sentry.
215    ///
216    // TODO(database-issues#5362): Rethink or justify our error logging strategy.
217    pub fn log(&self, message: &'static str, details: &str) {
218        tracing::warn!(
219            dataflow = self.dataflow_name,
220            "[customer-data] {message} ({details})"
221        );
222        tracing::error!(message);
223    }
224
225    /// Like [`Self::log`], but panics in debug mode.
226    ///
227    /// Use this method to notify about errors that are certainly caused by bugs in Materialize.
228    pub fn soft_panic_or_log(&self, message: &'static str, details: &str) {
229        tracing::warn!(
230            dataflow = self.dataflow_name,
231            "[customer-data] {message} ({details})"
232        );
233        mz_ore::soft_panic_or_log!("{}", message);
234    }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use mz_storage_types::errors::DataflowError;
    use proptest::prelude::*;

    /// Serializing is canonical: re-serializing a decoded value, or serializing
    /// the same error a second time, must yield identical bytes.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn proptest_roundtrip_canonical() {
        proptest!(|(original in any::<DataflowError>())| {
            let first_pass = DataflowErrorSer::from(original.clone());

            // Round-trip: ser -> deser -> ser must produce identical bytes.
            let second_pass = DataflowErrorSer::from(first_pass.deserialize());
            prop_assert_eq!(&first_pass, &second_pass,
                "Canonicality violation: round-trip produced different bytes");

            // Equality: equal errors must produce equal bytes.
            let repeated = DataflowErrorSer::from(original);
            prop_assert_eq!(&first_pass, &repeated,
                "Canonicality violation: same error produced different bytes");
        });
    }

    /// `Display` on the serialized form matches `Display` on the error itself.
    #[mz_ore::test]
    fn display_roundtrip() {
        let eval_err = EvalError::DivisionByZero;

        let expected = DataflowError::from(eval_err.clone()).to_string();
        let actual = DataflowErrorSer::from(eval_err).to_string();

        assert_eq!(expected, actual);
    }
}