Skip to main content

mz_compute/render/
errors.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Helpers for handling errors encountered by operators.
11//!
12//! # `DataflowErrorSer`
13//!
14//! [`DataflowErrorSer`] is a serialized byte representation of
15//! [`DataflowError`] used on compute-internal dataflow edges instead of
16//! `DataflowError` directly.
17//!
//! It is backed by proto-encoded [`ProtoDataflowError`] bytes. Because the
//! encoding is deterministic (proto3, encoded by prost, with no map fields),
//! semantically equal errors always encode to identical bytes. Byte-equality
//! therefore coincides with semantic equality, which lets us use `Ord`, `Hash`,
//! etc. directly on the bytes.
22//!
23//! **Invariant**: NEVER add `map` fields to `ProtoDataflowError` or any of its
24//! transitive message types, as map fields have non-deterministic encoding
25//! order in protobuf.
26
27use columnar::Columnar;
28use columnation::{Columnation, Region};
29use mz_expr::EvalError;
30use mz_proto::{ProtoType, RustType};
31use mz_repr::Row;
32use mz_storage_types::errors::{DataflowError, ProtoDataflowError};
33use prost::Message;
34use serde::{Deserialize, Serialize};
35use std::fmt;
36
/// Serialized representation of a [`DataflowError`], backed by proto-encoded bytes.
///
/// This type is used on compute-internal dataflow edges to avoid the cost of
/// carrying a full `DataflowError` enum through the dataflow graph. Because the
/// proto encoding is canonical (proto3 + prost + no map fields), equal errors
/// always encode to identical bytes, so byte-wise comparison coincides with
/// semantic equality.
///
/// All derived comparison and hashing traits operate on the encoded bytes. In
/// particular, the derived `Ord` is the lexicographic order of those bytes,
/// which need not match any ordering defined on `DataflowError` itself.
#[derive(
    Clone,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Hash,
    Serialize,
    Deserialize,
    Columnar
)]
pub struct DataflowErrorSer(Vec<u8>);
55
56impl DataflowErrorSer {
57    /// Decode the serialized bytes back into a [`DataflowError`].
58    ///
59    /// # Panics
60    ///
61    /// Panics if the bytes do not represent a valid `ProtoDataflowError`.
62    pub fn deserialize(&self) -> DataflowError {
63        let proto = ProtoDataflowError::decode(self.0.as_slice())
64            .expect("DataflowErrorSer: invalid proto bytes");
65        proto
66            .into_rust()
67            .expect("DataflowErrorSer: failed to convert proto to DataflowError")
68    }
69}
70
71impl From<DataflowError> for DataflowErrorSer {
72    fn from(err: DataflowError) -> Self {
73        DataflowErrorSer(err.into_proto().encode_to_vec())
74    }
75}
76
77impl From<EvalError> for DataflowErrorSer {
78    fn from(err: EvalError) -> Self {
79        // Note: this allocates a Box via DataflowError::EvalError(Box::new(e)).
80        // Acceptable in v1.
81        DataflowErrorSer::from(DataflowError::from(err))
82    }
83}
84
85impl fmt::Display for DataflowErrorSer {
86    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87        self.deserialize().fmt(f)
88    }
89}
90
91impl fmt::Debug for DataflowErrorSer {
92    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93        f.debug_tuple("DataflowErrorSer")
94            .field(&format_args!("{} bytes", self.0.len()))
95            .finish()
96    }
97}
98
impl Columnation for DataflowErrorSer {
    // Region type that stores `DataflowErrorSer` values; it wraps the region
    // for the underlying `Vec<u8>`.
    type InnerRegion = DataflowErrorSerRegion;
}
102
/// A [`Region`] for [`DataflowErrorSer`], delegating to the region for `Vec<u8>`.
///
/// The `Default` instance wraps the default inner region.
#[derive(Default)]
pub struct DataflowErrorSerRegion {
    /// Region for the serialized bytes; every `Region` operation on this type
    /// is forwarded to it.
    inner: <Vec<u8> as Columnation>::InnerRegion,
}
108
109impl Region for DataflowErrorSerRegion {
110    type Item = DataflowErrorSer;
111
112    unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
113        // SAFETY: delegating to the inner Vec<u8> region which handles the allocation.
114        DataflowErrorSer(unsafe { self.inner.copy(&item.0) })
115    }
116
117    fn clear(&mut self) {
118        self.inner.clear();
119    }
120
121    fn reserve_items<'a, I>(&mut self, items: I)
122    where
123        I: Iterator<Item = &'a Self::Item> + Clone,
124    {
125        self.inner.reserve_items(items.map(|item| &item.0));
126    }
127
128    fn reserve_regions<'a, I>(&mut self, regions: I)
129    where
130        I: Iterator<Item = &'a Self> + Clone,
131    {
132        self.inner.reserve_regions(regions.map(|r| &r.inner));
133    }
134
135    fn heap_size(&self, callback: impl FnMut(usize, usize)) {
136        self.inner.heap_size(callback);
137    }
138}
139
/// Used to make possibly-validating code generic: think of this as a kind of `MaybeResult`,
/// specialized for use in compute.  Validation code will only run when the error constructor is
/// Some.
///
/// `T` is the type of a successful value and `E` the type of a validation error.
pub(super) trait MaybeValidatingRow<T, E> {
    /// Wrap a successful value `t` as `Self`.
    fn ok(t: T) -> Self;
    /// Return the constructor that turns an error `E` into `Self`, or `None` if the
    /// implementing type has no way to represent an error.
    fn into_error() -> Option<fn(E) -> Self>;
}
147
/// Non-validating implementation: a bare `Row` cannot carry an error.
impl<E> MaybeValidatingRow<Row, E> for Row {
    /// Return the row unchanged.
    fn ok(t: Row) -> Self {
        t
    }

    /// Always `None`: there is no error constructor for a bare `Row`.
    fn into_error() -> Option<fn(E) -> Self> {
        None
    }
}
157
/// Non-validating implementation for the unit type, which cannot carry an error.
impl<E> MaybeValidatingRow<(), E> for () {
    /// Return the unit value unchanged.
    fn ok(t: ()) -> Self {
        t
    }

    /// Always `None`: there is no error constructor for `()`.
    fn into_error() -> Option<fn(E) -> Self> {
        None
    }
}
167
/// Non-validating implementation: a bare `Vec<R>` cannot carry an error.
impl<E, R> MaybeValidatingRow<Vec<R>, E> for Vec<R> {
    /// Return the vector unchanged.
    fn ok(t: Vec<R>) -> Self {
        t
    }

    /// Always `None`: there is no error constructor for a bare `Vec<R>`.
    fn into_error() -> Option<fn(E) -> Self> {
        None
    }
}
177
/// Validating implementation: a `Result` can represent both outcomes.
impl<T, E> MaybeValidatingRow<T, E> for Result<T, E> {
    /// Wrap the value in `Ok`.
    fn ok(row: T) -> Self {
        Ok(row)
    }

    /// Return `Some(Err)`, i.e. the `Err` variant constructor, which signals to generic
    /// callers that an error constructor is available.
    fn into_error() -> Option<fn(E) -> Self> {
        Some(Err)
    }
}
187
/// Error logger to be used by rendering code.
// TODO: Consider removing this struct.
#[derive(Clone)]
pub(super) struct ErrorLogger {
    /// Name of the dataflow; attached as the `dataflow` field to the WARN-level
    /// events emitted by this logger.
    dataflow_name: String,
}
194
impl ErrorLogger {
    /// Create a logger that tags its WARN-level events with `dataflow_name`.
    pub fn new(dataflow_name: String) -> Self {
        Self { dataflow_name }
    }

    /// Log the given error.
    ///
    /// The logging format is optimized for surfacing errors with Sentry:
    ///  * `message` is logged at ERROR level and will appear as the error title in Sentry.
    ///    We require it to be a static string, to ensure that Sentry always merges instances of
    ///    the same error together.
    ///  * `details` is logged at WARN level and will appear in the breadcrumbs.
    ///    Put relevant dynamic information here.
    ///
    /// The message that's logged at WARN level has the format
    ///   "[customer-data] {message} ({details})"
    /// We include the [customer-data] tag out of the expectation that `details` will always
    /// contain some sensitive customer data. We include the `message` to make it possible to match
    /// the breadcrumbs to their associated error in Sentry.
    ///
    // TODO(database-issues#5362): Rethink or justify our error logging strategy.
    pub fn log(&self, message: &'static str, details: &str) {
        // The WARN event (with the dynamic details) is emitted before the ERROR event.
        tracing::warn!(
            dataflow = self.dataflow_name,
            "[customer-data] {message} ({details})"
        );
        tracing::error!(message);
    }

    /// Like [`Self::log`], but panics in debug mode.
    ///
    /// Use this method to notify about errors that are certainly caused by bugs in Materialize.
    ///
    /// The WARN-level event with `details` is emitted the same way as in [`Self::log`]; only
    /// the static `message` is handed to `mz_ore::soft_panic_or_log!`.
    pub fn soft_panic_or_log(&self, message: &'static str, details: &str) {
        tracing::warn!(
            dataflow = self.dataflow_name,
            "[customer-data] {message} ({details})"
        );
        mz_ore::soft_panic_or_log!("{}", message);
    }
}
235
#[cfg(test)]
mod tests {
    use super::*;
    use mz_storage_types::errors::DataflowError;
    use proptest::prelude::*;

    /// Property test: the serialized form of an arbitrary `DataflowError` is
    /// stable, both across a decode/re-encode cycle and across repeated encodings
    /// of the same value.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn proptest_roundtrip_canonical() {
        proptest!(|(err in any::<DataflowError>())| {
            let encoded = DataflowErrorSer::from(err.clone());

            // Decoding and then encoding again must reproduce the same bytes.
            let decoded = encoded.deserialize();
            let re_encoded = DataflowErrorSer::from(decoded);
            prop_assert_eq!(&encoded, &re_encoded,
                "Canonicality violation: round-trip produced different bytes");

            // Encoding the very same error a second time must give the same bytes.
            let encoded_again = DataflowErrorSer::from(err);
            prop_assert_eq!(&encoded, &encoded_again,
                "Canonicality violation: same error produced different bytes");
        });
    }

    /// The serialized form must display exactly like the error it was built from.
    #[mz_ore::test]
    fn display_roundtrip() {
        let eval_err = EvalError::DivisionByZero;
        let expected = DataflowError::from(eval_err.clone()).to_string();
        let actual = DataflowErrorSer::from(eval_err).to_string();

        assert_eq!(expected, actual);
    }
}