1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Helpers for handling errors encountered by operators.

use std::hash::Hash;

use differential_dataflow::ExchangeData;
use mz_repr::Row;
use timely::container::columnation::Columnation;

use crate::render::context::ShutdownToken;

/// Used to make possibly-validating code generic: think of this as a kind of `MaybeResult`,
/// specialized for use in compute.  Validation code will only run when the error constructor is
/// Some.
pub(super) trait MaybeValidatingRow<T, E>: ExchangeData + Columnation + Hash {
    fn ok(t: T) -> Self;
    fn into_error() -> Option<fn(E) -> Self>;
}

impl<E> MaybeValidatingRow<Row, E> for Row {
    fn ok(t: Row) -> Self {
        t
    }

    fn into_error() -> Option<fn(E) -> Self> {
        None
    }
}

impl<E> MaybeValidatingRow<(), E> for () {
    fn ok(t: ()) -> Self {
        t
    }

    fn into_error() -> Option<fn(E) -> Self> {
        None
    }
}

impl<E, R> MaybeValidatingRow<Vec<R>, E> for Vec<R>
where
    R: ExchangeData + Columnation + Hash,
{
    fn ok(t: Vec<R>) -> Self {
        t
    }

    fn into_error() -> Option<fn(E) -> Self> {
        None
    }
}

impl<T, E> MaybeValidatingRow<T, E> for Result<T, E>
where
    T: ExchangeData + Columnation + Hash,
    E: ExchangeData + Columnation + Hash,
{
    fn ok(row: T) -> Self {
        Ok(row)
    }

    fn into_error() -> Option<fn(E) -> Self> {
        Some(Err)
    }
}

/// Error logger to be used by rendering code.
///
/// Holds onto a token to ensure that no false-positive errors are logged while the dataflow is in
/// the process of shutting down.
#[derive(Clone)]
pub(super) struct ErrorLogger {
    token: ShutdownToken,
    dataflow_name: String,
}

impl ErrorLogger {
    pub fn new(token: ShutdownToken, dataflow_name: String) -> Self {
        Self {
            token,
            dataflow_name,
        }
    }

    /// Log the given error, unless the dataflow is shutting down.
    ///
    /// The logging format is optimized for surfacing errors with Sentry:
    ///  * `error` is logged at ERROR level and will appear as the error title in Sentry.
    ///    We require it to be a static string, to ensure that Sentry always merges instances of
    ///    the same error together.
    ///  * `details` is logged at WARN level and will appear in the breadcrumbs.
    ///    Put relevant dynamic information here.
    ///
    /// The message that's logged at WARN level has the format
    ///   "[customer-data] {message} ({details})"
    /// We include the [customer-data] tag out of the expectation that `details` will always
    /// contain some sensitive customer data. We include the `message` to make it possible to match
    /// the breadcrumbs to their associated error in Sentry.
    ///
    // TODO(#18214): Rethink or justify our error logging strategy.
    pub fn log(&self, message: &'static str, details: &str) {
        if !self.token.in_shutdown() {
            self.log_always(message, details);
        }
    }

    /// Like [`Self::log`], but also logs errors when the dataflow is shutting down.
    ///
    /// Use this method to notify about errors that cannot be caused by dataflow shutdown.
    pub fn log_always(&self, message: &'static str, details: &str) {
        tracing::warn!(
            dataflow = self.dataflow_name,
            "[customer-data] {message} ({details})"
        );
        tracing::error!(message);
    }

    /// Like [`Self::log_always`], but panics in debug mode.
    ///
    /// Use this method to notify about errors that are certainly caused by bugs in Materialize.
    pub fn soft_panic_or_log(&self, message: &'static str, details: &str) {
        tracing::warn!(
            dataflow = self.dataflow_name,
            "[customer-data] {message} ({details})"
        );
        mz_ore::soft_panic_or_log!("{}", message);
    }
}