mz_adapter/
util.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::fmt::Debug;
11
12use itertools::Itertools;
13use mz_catalog::durable::{DurableCatalogError, FenceError};
14use mz_compute_client::controller::error::{
15    CollectionUpdateError, DataflowCreationError, InstanceMissing, PeekError, ReadPolicyError,
16};
17use mz_controller_types::ClusterId;
18use mz_ore::tracing::OpenTelemetryContext;
19use mz_ore::{exit, soft_assert_no_log};
20use mz_repr::{RelationDesc, RowIterator, ScalarType};
21use mz_sql::names::FullItemName;
22use mz_sql::plan::StatementDesc;
23use mz_sql::session::metadata::SessionMetadata;
24use mz_sql::session::vars::Var;
25use mz_sql_parser::ast::display::AstDisplay;
26use mz_sql_parser::ast::{
27    CreateIndexStatement, Ident, Raw, RawClusterName, RawItemName, Statement,
28};
29use mz_storage_types::controller::StorageError;
30use mz_transform::TransformError;
31use tokio::sync::mpsc::UnboundedSender;
32use tokio::sync::oneshot;
33
34use crate::catalog::{Catalog, CatalogState};
35use crate::command::{Command, Response};
36use crate::coord::{Message, PendingTxnResponse};
37use crate::error::AdapterError;
38use crate::session::{EndTransactionAction, Session};
39use crate::{ExecuteContext, ExecuteResponse};
40
41/// Handles responding to clients.
42#[derive(Debug)]
43pub struct ClientTransmitter<T>
44where
45    T: Transmittable,
46    <T as Transmittable>::Allowed: 'static,
47{
48    tx: Option<oneshot::Sender<Response<T>>>,
49    internal_cmd_tx: UnboundedSender<Message>,
50    /// Expresses an optional soft-assert on the set of values allowed to be
51    /// sent from `self`.
52    allowed: Option<&'static [T::Allowed]>,
53}
54
55impl<T: Transmittable + std::fmt::Debug> ClientTransmitter<T> {
56    /// Creates a new client transmitter.
57    pub fn new(
58        tx: oneshot::Sender<Response<T>>,
59        internal_cmd_tx: UnboundedSender<Message>,
60    ) -> ClientTransmitter<T> {
61        ClientTransmitter {
62            tx: Some(tx),
63            internal_cmd_tx,
64            allowed: None,
65        }
66    }
67
68    /// Transmits `result` to the client, returning ownership of the session
69    /// `session` as well.
70    ///
71    /// # Panics
72    /// - If in `soft_assert`, `result.is_ok()`, `self.allowed.is_some()`, and
73    ///   the result value is not in the set of allowed values.
74    #[mz_ore::instrument(level = "debug")]
75    pub fn send(mut self, result: Result<T, AdapterError>, session: Session) {
76        // Guarantee that the value sent is of an allowed type.
77        soft_assert_no_log!(
78            match (&result, self.allowed.take()) {
79                (Ok(t), Some(allowed)) => allowed.contains(&t.to_allowed()),
80                _ => true,
81            },
82            "tried to send disallowed value {result:?} through ClientTransmitter; \
83            see ClientTransmitter::set_allowed"
84        );
85
86        // If we were not able to send a message, we must clean up the session
87        // ourselves. Return it to the caller for disposal.
88        if let Err(res) = self
89            .tx
90            .take()
91            .expect("tx will always be `Some` unless `self` has been consumed")
92            .send(Response {
93                result,
94                session,
95                otel_ctx: OpenTelemetryContext::obtain(),
96            })
97        {
98            self.internal_cmd_tx
99                .send(Message::Command(
100                    OpenTelemetryContext::obtain(),
101                    Command::Terminate {
102                        conn_id: res.session.conn_id().clone(),
103                        tx: None,
104                    },
105                ))
106                .expect("coordinator unexpectedly gone");
107        }
108    }
109
110    pub fn take(mut self) -> oneshot::Sender<Response<T>> {
111        self.tx
112            .take()
113            .expect("tx will always be `Some` unless `self` has been consumed")
114    }
115
116    /// Sets `self` so that the next call to [`Self::send`] will soft-assert
117    /// that, if `Ok`, the value is one of `allowed`, as determined by
118    /// [`Transmittable::to_allowed`].
119    pub fn set_allowed(&mut self, allowed: &'static [T::Allowed]) {
120        self.allowed = Some(allowed);
121    }
122}
123
/// Implemented by every type that can be sent through a
/// [`ClientTransmitter`].
pub trait Transmittable {
    /// The type used to describe which values are permitted to be sent.
    ///
    /// `Eq` already implies `PartialEq`, so values can be compared with `==`
    /// and looked up with `slice::contains`.
    type Allowed: Eq + std::fmt::Debug;

    /// Maps a value of the transmitted type to its `Allowed` representation.
    ///
    /// Expressing this as a trait method, rather than as a bound on
    /// `Allowed`, has two benefits:
    /// - No clone of the value is required.
    /// - Types that do not intend to use the `allowed` feature can implement
    ///   the trait trivially, using `bool` as `Allowed` and returning `true`,
    ///   without having to expose a semantically questionable
    ///   `From<&Self> for bool`.
    fn to_allowed(&self) -> Self::Allowed;
}
139
140impl Transmittable for () {
141    type Allowed = bool;
142
143    fn to_allowed(&self) -> Self::Allowed {
144        true
145    }
146}
147
148/// `ClientTransmitter` with a response to send.
149#[derive(Debug)]
150pub struct CompletedClientTransmitter {
151    ctx: ExecuteContext,
152    response: Result<PendingTxnResponse, AdapterError>,
153    action: EndTransactionAction,
154}
155
156impl CompletedClientTransmitter {
157    /// Creates a new completed client transmitter.
158    pub fn new(
159        ctx: ExecuteContext,
160        response: Result<PendingTxnResponse, AdapterError>,
161        action: EndTransactionAction,
162    ) -> Self {
163        CompletedClientTransmitter {
164            ctx,
165            response,
166            action,
167        }
168    }
169
170    /// Returns the execute context to be finalized, and the result to send it.
171    pub fn finalize(mut self) -> (ExecuteContext, Result<ExecuteResponse, AdapterError>) {
172        let changed = self
173            .ctx
174            .session_mut()
175            .vars_mut()
176            .end_transaction(self.action);
177
178        // Append any parameters that changed to the response.
179        let response = self.response.map(|mut r| {
180            r.extend_params(changed);
181            ExecuteResponse::from(r)
182        });
183
184        (self.ctx, response)
185    }
186}
187
188impl<T: Transmittable> Drop for ClientTransmitter<T> {
189    fn drop(&mut self) {
190        if self.tx.is_some() {
191            panic!("client transmitter dropped without send")
192        }
193    }
194}
195
196// TODO(benesch): constructing the canonical CREATE INDEX statement should be
197// the responsibility of the SQL package.
198pub fn index_sql(
199    index_name: String,
200    cluster_id: ClusterId,
201    view_name: FullItemName,
202    view_desc: &RelationDesc,
203    keys: &[usize],
204) -> String {
205    use mz_sql::ast::{Expr, Value};
206
207    CreateIndexStatement::<Raw> {
208        name: Some(Ident::new_unchecked(index_name)),
209        on_name: RawItemName::Name(mz_sql::normalize::unresolve(view_name)),
210        in_cluster: Some(RawClusterName::Resolved(cluster_id.to_string())),
211        key_parts: Some(
212            keys.iter()
213                .map(|i| match view_desc.get_unambiguous_name(*i) {
214                    Some(n) => Expr::Identifier(vec![Ident::new_unchecked(n.to_string())]),
215                    _ => Expr::Value(Value::Number((i + 1).to_string())),
216                })
217                .collect(),
218        ),
219        with_options: vec![],
220        if_not_exists: false,
221    }
222    .to_ast_string_stable()
223}
224
225/// Creates a description of the statement `stmt`.
226pub fn describe(
227    catalog: &Catalog,
228    stmt: Statement<Raw>,
229    param_types: &[Option<ScalarType>],
230    session: &Session,
231) -> Result<StatementDesc, AdapterError> {
232    let catalog = &catalog.for_session(session);
233    let (stmt, _) = mz_sql::names::resolve(catalog, stmt)?;
234    Ok(mz_sql::plan::describe(
235        session.pcx(),
236        catalog,
237        stmt,
238        param_types,
239    )?)
240}
241
/// Extension methods for results whose errors may call for terminating the
/// process gracefully instead of panicking.
pub trait ResultExt<T> {
    /// Unwraps the success value, like [`Result::expect`].
    ///
    /// If the error indicates that it should cause a halt or graceful
    /// termination, the process is terminated gracefully instead of
    /// panicking. `context` is included in the emitted message either way.
    fn unwrap_or_terminate(self, context: &str) -> T;

    /// Terminates the process gracefully if `self` is an error that should
    /// halt or cause graceful termination. Otherwise returns `self`
    /// unchanged.
    fn maybe_terminate(self, context: &str) -> Self;
}
253
254impl<T, E> ResultExt<T> for Result<T, E>
255where
256    E: ShouldTerminateGracefully + Debug,
257{
258    fn unwrap_or_terminate(self, context: &str) -> T {
259        match self {
260            Ok(t) => t,
261            Err(e) if e.should_terminate_gracefully() => exit!(0, "{context}: {e:?}"),
262            Err(e) => panic!("{context}: {e:?}"),
263        }
264    }
265
266    fn maybe_terminate(self, context: &str) -> Self {
267        if let Err(e) = &self {
268            if e.should_terminate_gracefully() {
269                exit!(0, "{context}: {e:?}");
270            }
271        }
272
273        self
274    }
275}
276
/// Implemented by error types that can tell whether they warrant a graceful
/// termination of the process instead of a panic.
trait ShouldTerminateGracefully {
    /// Returns `true` if this error should terminate the process gracefully,
    /// and `false` if it should be treated as a panic.
    fn should_terminate_gracefully(&self) -> bool;
}
284
285impl ShouldTerminateGracefully for AdapterError {
286    fn should_terminate_gracefully(&self) -> bool {
287        match self {
288            AdapterError::Catalog(e) => e.should_terminate_gracefully(),
289            _ => false,
290        }
291    }
292}
293
294impl ShouldTerminateGracefully for mz_catalog::memory::error::Error {
295    fn should_terminate_gracefully(&self) -> bool {
296        match &self.kind {
297            mz_catalog::memory::error::ErrorKind::Durable(e) => e.should_terminate_gracefully(),
298            _ => false,
299        }
300    }
301}
302
303impl ShouldTerminateGracefully for mz_catalog::durable::CatalogError {
304    fn should_terminate_gracefully(&self) -> bool {
305        match &self {
306            Self::Durable(e) => e.should_terminate_gracefully(),
307            _ => false,
308        }
309    }
310}
311
312impl ShouldTerminateGracefully for DurableCatalogError {
313    fn should_terminate_gracefully(&self) -> bool {
314        match self {
315            DurableCatalogError::Fence(err) => err.should_terminate_gracefully(),
316            DurableCatalogError::IncompatibleDataVersion { .. }
317            | DurableCatalogError::IncompatiblePersistVersion { .. }
318            | DurableCatalogError::Proto(_)
319            | DurableCatalogError::Uninitialized
320            | DurableCatalogError::NotWritable(_)
321            | DurableCatalogError::DuplicateKey
322            | DurableCatalogError::UniquenessViolation
323            | DurableCatalogError::Storage(_)
324            | DurableCatalogError::Internal(_) => false,
325        }
326    }
327}
328
329impl ShouldTerminateGracefully for FenceError {
330    fn should_terminate_gracefully(&self) -> bool {
331        match self {
332            FenceError::DeployGeneration { .. } => true,
333            FenceError::Epoch { .. } | FenceError::MigrationUpper { .. } => false,
334        }
335    }
336}
337
338impl<T> ShouldTerminateGracefully for StorageError<T> {
339    fn should_terminate_gracefully(&self) -> bool {
340        match self {
341            StorageError::ResourceExhausted(_)
342            | StorageError::CollectionMetadataAlreadyExists(_)
343            | StorageError::PersistShardAlreadyInUse(_)
344            | StorageError::PersistSchemaEvolveRace { .. }
345            | StorageError::PersistInvalidSchemaEvolve { .. }
346            | StorageError::TxnWalShardAlreadyExists
347            | StorageError::UpdateBeyondUpper(_)
348            | StorageError::ReadBeforeSince(_)
349            | StorageError::InvalidUppers(_)
350            | StorageError::InvalidUsage(_)
351            | StorageError::CollectionIdReused(_)
352            | StorageError::SinkIdReused(_)
353            | StorageError::IdentifierMissing(_)
354            | StorageError::IdentifierInvalid(_)
355            | StorageError::IngestionInstanceMissing { .. }
356            | StorageError::ExportInstanceMissing { .. }
357            | StorageError::Generic(_)
358            | StorageError::ReadOnly
359            | StorageError::DataflowError(_)
360            | StorageError::InvalidAlter { .. }
361            | StorageError::ShuttingDown(_)
362            | StorageError::MissingSubsourceReference { .. }
363            | StorageError::RtrTimeout(_)
364            | StorageError::RtrDropFailure(_) => false,
365        }
366    }
367}
368
369impl ShouldTerminateGracefully for DataflowCreationError {
370    fn should_terminate_gracefully(&self) -> bool {
371        match self {
372            DataflowCreationError::SinceViolation(_)
373            | DataflowCreationError::InstanceMissing(_)
374            | DataflowCreationError::CollectionMissing(_)
375            | DataflowCreationError::ReplicaMissing(_)
376            | DataflowCreationError::MissingAsOf
377            | DataflowCreationError::EmptyAsOfForSubscribe
378            | DataflowCreationError::EmptyAsOfForCopyTo => false,
379        }
380    }
381}
382
383impl ShouldTerminateGracefully for CollectionUpdateError {
384    fn should_terminate_gracefully(&self) -> bool {
385        match self {
386            CollectionUpdateError::InstanceMissing(_)
387            | CollectionUpdateError::CollectionMissing(_) => false,
388        }
389    }
390}
391
392impl ShouldTerminateGracefully for PeekError {
393    fn should_terminate_gracefully(&self) -> bool {
394        match self {
395            PeekError::SinceViolation(_)
396            | PeekError::InstanceMissing(_)
397            | PeekError::CollectionMissing(_)
398            | PeekError::ReplicaMissing(_) => false,
399        }
400    }
401}
402
403impl ShouldTerminateGracefully for ReadPolicyError {
404    fn should_terminate_gracefully(&self) -> bool {
405        match self {
406            ReadPolicyError::InstanceMissing(_)
407            | ReadPolicyError::CollectionMissing(_)
408            | ReadPolicyError::WriteOnlyCollection(_) => false,
409        }
410    }
411}
412
413impl ShouldTerminateGracefully for TransformError {
414    fn should_terminate_gracefully(&self) -> bool {
415        match self {
416            TransformError::Internal(_)
417            | TransformError::IdentifierMissing(_)
418            | TransformError::CallerShouldPanic(_) => false,
419        }
420    }
421}
422
423impl ShouldTerminateGracefully for InstanceMissing {
424    fn should_terminate_gracefully(&self) -> bool {
425        false
426    }
427}
428
429/// Returns the viewable session and system variables.
430pub(crate) fn viewable_variables<'a>(
431    catalog: &'a CatalogState,
432    session: &'a dyn SessionMetadata,
433) -> impl Iterator<Item = &'a dyn Var> {
434    session
435        .vars()
436        .iter()
437        .chain(catalog.system_config().iter())
438        .filter(|v| v.visible(session.user(), catalog.system_config()).is_ok())
439}
440
441/// Verify that the rows in [`RowIterator`] match the expected [`RelationDesc`].
442pub fn verify_datum_desc(
443    desc: &RelationDesc,
444    rows: &mut dyn RowIterator,
445) -> Result<(), AdapterError> {
446    // Verify the first row is of the expected type. This is often good enough to
447    // find problems.
448    //
449    // Notably it failed to find database-issues#1946 when "FETCH 2" was used in a test, instead
450    // we had to use "FETCH 1" twice.
451
452    let Some(row) = rows.peek() else {
453        return Ok(());
454    };
455
456    let datums = row.unpack();
457    let col_types = &desc.typ().column_types;
458    if datums.len() != col_types.len() {
459        let msg = format!(
460            "internal error: row descriptor has {} columns but row has {} columns",
461            col_types.len(),
462            datums.len(),
463        );
464        return Err(AdapterError::Internal(msg));
465    }
466
467    for (i, (d, t)) in datums.iter().zip_eq(col_types).enumerate() {
468        if !d.is_instance_of(t) {
469            let msg = format!(
470                "internal error: column {} is not of expected type {:?}: {:?}",
471                i, t, d
472            );
473            return Err(AdapterError::Internal(msg));
474        }
475    }
476
477    Ok(())
478}