mz_adapter/
util.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::fmt::Debug;
11
12use mz_catalog::durable::{DurableCatalogError, FenceError};
13use mz_compute_client::controller::error::{
14    CollectionUpdateError, DataflowCreationError, InstanceMissing, PeekError, ReadPolicyError,
15};
16use mz_controller_types::ClusterId;
17use mz_ore::tracing::OpenTelemetryContext;
18use mz_ore::{exit, soft_assert_no_log};
19use mz_repr::{RelationDesc, RowIterator, ScalarType};
20use mz_sql::names::FullItemName;
21use mz_sql::plan::StatementDesc;
22use mz_sql::session::metadata::SessionMetadata;
23use mz_sql::session::vars::Var;
24use mz_sql_parser::ast::display::AstDisplay;
25use mz_sql_parser::ast::{
26    CreateIndexStatement, FetchStatement, Ident, Raw, RawClusterName, RawItemName, Statement,
27};
28use mz_storage_types::controller::StorageError;
29use mz_transform::TransformError;
30use tokio::sync::mpsc::UnboundedSender;
31use tokio::sync::oneshot;
32
33use crate::catalog::{Catalog, CatalogState};
34use crate::command::{Command, Response};
35use crate::coord::{Message, PendingTxnResponse};
36use crate::error::AdapterError;
37use crate::session::{EndTransactionAction, Session};
38use crate::{ExecuteContext, ExecuteResponse};
39
40/// Handles responding to clients.
41#[derive(Debug)]
42pub struct ClientTransmitter<T>
43where
44    T: Transmittable,
45    <T as Transmittable>::Allowed: 'static,
46{
47    tx: Option<oneshot::Sender<Response<T>>>,
48    internal_cmd_tx: UnboundedSender<Message>,
49    /// Expresses an optional soft-assert on the set of values allowed to be
50    /// sent from `self`.
51    allowed: Option<&'static [T::Allowed]>,
52}
53
54impl<T: Transmittable + std::fmt::Debug> ClientTransmitter<T> {
55    /// Creates a new client transmitter.
56    pub fn new(
57        tx: oneshot::Sender<Response<T>>,
58        internal_cmd_tx: UnboundedSender<Message>,
59    ) -> ClientTransmitter<T> {
60        ClientTransmitter {
61            tx: Some(tx),
62            internal_cmd_tx,
63            allowed: None,
64        }
65    }
66
67    /// Transmits `result` to the client, returning ownership of the session
68    /// `session` as well.
69    ///
70    /// # Panics
71    /// - If in `soft_assert`, `result.is_ok()`, `self.allowed.is_some()`, and
72    ///   the result value is not in the set of allowed values.
73    #[mz_ore::instrument(level = "debug")]
74    pub fn send(mut self, result: Result<T, AdapterError>, session: Session) {
75        // Guarantee that the value sent is of an allowed type.
76        soft_assert_no_log!(
77            match (&result, self.allowed.take()) {
78                (Ok(t), Some(allowed)) => allowed.contains(&t.to_allowed()),
79                _ => true,
80            },
81            "tried to send disallowed value {result:?} through ClientTransmitter; \
82            see ClientTransmitter::set_allowed"
83        );
84
85        // If we were not able to send a message, we must clean up the session
86        // ourselves. Return it to the caller for disposal.
87        if let Err(res) = self
88            .tx
89            .take()
90            .expect("tx will always be `Some` unless `self` has been consumed")
91            .send(Response {
92                result,
93                session,
94                otel_ctx: OpenTelemetryContext::obtain(),
95            })
96        {
97            self.internal_cmd_tx
98                .send(Message::Command(
99                    OpenTelemetryContext::obtain(),
100                    Command::Terminate {
101                        conn_id: res.session.conn_id().clone(),
102                        tx: None,
103                    },
104                ))
105                .expect("coordinator unexpectedly gone");
106        }
107    }
108
109    pub fn take(mut self) -> oneshot::Sender<Response<T>> {
110        self.tx
111            .take()
112            .expect("tx will always be `Some` unless `self` has been consumed")
113    }
114
115    /// Sets `self` so that the next call to [`Self::send`] will soft-assert
116    /// that, if `Ok`, the value is one of `allowed`, as determined by
117    /// [`Transmittable::to_allowed`].
118    pub fn set_allowed(&mut self, allowed: &'static [T::Allowed]) {
119        self.allowed = Some(allowed);
120    }
121}
122
/// Companion trait of [`ClientTransmitter`]: maps a value that is about to be
/// sent onto a key that can be compared against a set of allowed keys.
pub trait Transmittable {
    /// The key type used to express which values are allowed.
    type Allowed: PartialEq + Eq + Debug;

    /// Converts the value carried by a [`ClientTransmitter`] into its
    /// `Allowed` key.
    ///
    /// The benefits of this style of trait, rather than relying on a bound on
    /// `Allowed`, are:
    /// - It does not require a clone.
    /// - It allows facile implementations for types that do not plan to make
    ///   use of the `allowed` feature: they can simply implement this trait
    ///   with `Allowed = bool` and return `true`, even where exposing
    ///   `From<&Self> for bool` would not be semantically appropriate.
    fn to_allowed(&self) -> Self::Allowed;
}
138
139impl Transmittable for () {
140    type Allowed = bool;
141
142    fn to_allowed(&self) -> Self::Allowed {
143        true
144    }
145}
146
/// `ClientTransmitter` with a response to send.
///
/// Bundles an execute context with the response produced for it and the
/// action with which the session's transaction should be ended; see
/// `finalize`.
#[derive(Debug)]
pub struct CompletedClientTransmitter {
    /// The execute context that `finalize` hands back to the caller.
    ctx: ExecuteContext,
    /// The pending response (or error); `finalize` converts an `Ok` value
    /// into an `ExecuteResponse`.
    response: Result<PendingTxnResponse, AdapterError>,
    /// Passed to `end_transaction` on the session's variables in `finalize`.
    action: EndTransactionAction,
}
154
155impl CompletedClientTransmitter {
156    /// Creates a new completed client transmitter.
157    pub fn new(
158        ctx: ExecuteContext,
159        response: Result<PendingTxnResponse, AdapterError>,
160        action: EndTransactionAction,
161    ) -> Self {
162        CompletedClientTransmitter {
163            ctx,
164            response,
165            action,
166        }
167    }
168
169    /// Returns the execute context to be finalized, and the result to send it.
170    pub fn finalize(mut self) -> (ExecuteContext, Result<ExecuteResponse, AdapterError>) {
171        let changed = self
172            .ctx
173            .session_mut()
174            .vars_mut()
175            .end_transaction(self.action);
176
177        // Append any parameters that changed to the response.
178        let response = self.response.map(|mut r| {
179            r.extend_params(changed);
180            ExecuteResponse::from(r)
181        });
182
183        (self.ctx, response)
184    }
185}
186
187impl<T: Transmittable> Drop for ClientTransmitter<T> {
188    fn drop(&mut self) {
189        if self.tx.is_some() {
190            panic!("client transmitter dropped without send")
191        }
192    }
193}
194
195// TODO(benesch): constructing the canonical CREATE INDEX statement should be
196// the responsibility of the SQL package.
197pub fn index_sql(
198    index_name: String,
199    cluster_id: ClusterId,
200    view_name: FullItemName,
201    view_desc: &RelationDesc,
202    keys: &[usize],
203) -> String {
204    use mz_sql::ast::{Expr, Value};
205
206    CreateIndexStatement::<Raw> {
207        name: Some(Ident::new_unchecked(index_name)),
208        on_name: RawItemName::Name(mz_sql::normalize::unresolve(view_name)),
209        in_cluster: Some(RawClusterName::Resolved(cluster_id.to_string())),
210        key_parts: Some(
211            keys.iter()
212                .map(|i| match view_desc.get_unambiguous_name(*i) {
213                    Some(n) => Expr::Identifier(vec![Ident::new_unchecked(n.to_string())]),
214                    _ => Expr::Value(Value::Number((i + 1).to_string())),
215                })
216                .collect(),
217        ),
218        with_options: vec![],
219        if_not_exists: false,
220    }
221    .to_ast_string_stable()
222}
223
224/// Creates a description of the statement `stmt`.
225///
226/// This function is identical to sql::plan::describe except this is also
227/// supports describing FETCH statements which need access to bound portals
228/// through the session.
229pub fn describe(
230    catalog: &Catalog,
231    stmt: Statement<Raw>,
232    param_types: &[Option<ScalarType>],
233    session: &Session,
234) -> Result<StatementDesc, AdapterError> {
235    match stmt {
236        // FETCH's description depends on the current session, which describe_statement
237        // doesn't (and shouldn't?) have access to, so intercept it here.
238        Statement::Fetch(FetchStatement { ref name, .. }) => {
239            // Unverified portal is ok here because Coordinator::execute will verify the
240            // named portal during execution.
241            match session
242                .get_portal_unverified(name.as_str())
243                .map(|p| p.desc.clone())
244            {
245                Some(mut desc) => {
246                    // Parameters are already bound to the portal and will not be accepted through
247                    // FETCH.
248                    desc.param_types = Vec::new();
249                    Ok(desc)
250                }
251                None => Err(AdapterError::UnknownCursor(name.to_string())),
252            }
253        }
254        _ => {
255            let catalog = &catalog.for_session(session);
256            let (stmt, _) = mz_sql::names::resolve(catalog, stmt)?;
257            Ok(mz_sql::plan::describe(
258                session.pcx(),
259                catalog,
260                stmt,
261                param_types,
262            )?)
263        }
264    }
265}
266
/// Extension methods for results whose error may call for graceful process
/// termination instead of a panic.
pub trait ResultExt<T> {
    /// Like [`Result::expect`], but terminates the process gracefully instead
    /// of panicking if the error indicates that it should cause graceful
    /// termination. The `Result` implementation in this file does so via
    /// `exit!` with exit code 0.
    ///
    /// `context` is included in the termination or panic message.
    fn unwrap_or_terminate(self, context: &str) -> T;

    /// Terminates the process gracefully if `self` is an error that should
    /// cause graceful termination (the `Result` implementation in this file
    /// uses `exit!` with exit code 0). Otherwise, does nothing and returns
    /// `self` unchanged.
    fn maybe_terminate(self, context: &str) -> Self;
}
278
279impl<T, E> ResultExt<T> for Result<T, E>
280where
281    E: ShouldTerminateGracefully + Debug,
282{
283    fn unwrap_or_terminate(self, context: &str) -> T {
284        match self {
285            Ok(t) => t,
286            Err(e) if e.should_terminate_gracefully() => exit!(0, "{context}: {e:?}"),
287            Err(e) => panic!("{context}: {e:?}"),
288        }
289    }
290
291    fn maybe_terminate(self, context: &str) -> Self {
292        if let Err(e) = &self {
293            if e.should_terminate_gracefully() {
294                exit!(0, "{context}: {e:?}");
295            }
296        }
297
298        self
299    }
300}
301
/// A trait for errors that should terminate gracefully rather than panic
/// the process.
///
/// It is the bound `ResultExt` places on the error type of a `Result` to
/// decide between a graceful exit and a panic.
trait ShouldTerminateGracefully {
    /// Reports whether the error should terminate the process gracefully
    /// rather than panic.
    ///
    /// Returns `true` for graceful termination and `false` when the error
    /// should be handled the ordinary way (e.g. by panicking).
    fn should_terminate_gracefully(&self) -> bool;
}
309
310impl ShouldTerminateGracefully for AdapterError {
311    fn should_terminate_gracefully(&self) -> bool {
312        match self {
313            AdapterError::Catalog(e) => e.should_terminate_gracefully(),
314            _ => false,
315        }
316    }
317}
318
319impl ShouldTerminateGracefully for mz_catalog::memory::error::Error {
320    fn should_terminate_gracefully(&self) -> bool {
321        match &self.kind {
322            mz_catalog::memory::error::ErrorKind::Durable(e) => e.should_terminate_gracefully(),
323            _ => false,
324        }
325    }
326}
327
328impl ShouldTerminateGracefully for mz_catalog::durable::CatalogError {
329    fn should_terminate_gracefully(&self) -> bool {
330        match &self {
331            Self::Durable(e) => e.should_terminate_gracefully(),
332            _ => false,
333        }
334    }
335}
336
337impl ShouldTerminateGracefully for DurableCatalogError {
338    fn should_terminate_gracefully(&self) -> bool {
339        match self {
340            DurableCatalogError::Fence(err) => err.should_terminate_gracefully(),
341            DurableCatalogError::IncompatibleDataVersion { .. }
342            | DurableCatalogError::IncompatiblePersistVersion { .. }
343            | DurableCatalogError::Proto(_)
344            | DurableCatalogError::Uninitialized
345            | DurableCatalogError::NotWritable(_)
346            | DurableCatalogError::DuplicateKey
347            | DurableCatalogError::UniquenessViolation
348            | DurableCatalogError::Storage(_)
349            | DurableCatalogError::Internal(_) => false,
350        }
351    }
352}
353
354impl ShouldTerminateGracefully for FenceError {
355    fn should_terminate_gracefully(&self) -> bool {
356        match self {
357            FenceError::DeployGeneration { .. } => true,
358            FenceError::Epoch { .. } | FenceError::MigrationUpper { .. } => false,
359        }
360    }
361}
362
363impl<T> ShouldTerminateGracefully for StorageError<T> {
364    fn should_terminate_gracefully(&self) -> bool {
365        match self {
366            StorageError::ResourceExhausted(_)
367            | StorageError::CollectionMetadataAlreadyExists(_)
368            | StorageError::PersistShardAlreadyInUse(_)
369            | StorageError::PersistSchemaEvolveRace { .. }
370            | StorageError::PersistInvalidSchemaEvolve { .. }
371            | StorageError::TxnWalShardAlreadyExists
372            | StorageError::UpdateBeyondUpper(_)
373            | StorageError::ReadBeforeSince(_)
374            | StorageError::InvalidUppers(_)
375            | StorageError::InvalidUsage(_)
376            | StorageError::CollectionIdReused(_)
377            | StorageError::SinkIdReused(_)
378            | StorageError::IdentifierMissing(_)
379            | StorageError::IdentifierInvalid(_)
380            | StorageError::IngestionInstanceMissing { .. }
381            | StorageError::ExportInstanceMissing { .. }
382            | StorageError::Generic(_)
383            | StorageError::ReadOnly
384            | StorageError::DataflowError(_)
385            | StorageError::InvalidAlter { .. }
386            | StorageError::ShuttingDown(_)
387            | StorageError::MissingSubsourceReference { .. }
388            | StorageError::RtrTimeout(_)
389            | StorageError::RtrDropFailure(_) => false,
390        }
391    }
392}
393
394impl ShouldTerminateGracefully for DataflowCreationError {
395    fn should_terminate_gracefully(&self) -> bool {
396        match self {
397            DataflowCreationError::SinceViolation(_)
398            | DataflowCreationError::InstanceMissing(_)
399            | DataflowCreationError::CollectionMissing(_)
400            | DataflowCreationError::ReplicaMissing(_)
401            | DataflowCreationError::MissingAsOf
402            | DataflowCreationError::EmptyAsOfForSubscribe
403            | DataflowCreationError::EmptyAsOfForCopyTo => false,
404        }
405    }
406}
407
408impl ShouldTerminateGracefully for CollectionUpdateError {
409    fn should_terminate_gracefully(&self) -> bool {
410        match self {
411            CollectionUpdateError::InstanceMissing(_)
412            | CollectionUpdateError::CollectionMissing(_) => false,
413        }
414    }
415}
416
417impl ShouldTerminateGracefully for PeekError {
418    fn should_terminate_gracefully(&self) -> bool {
419        match self {
420            PeekError::SinceViolation(_)
421            | PeekError::InstanceMissing(_)
422            | PeekError::CollectionMissing(_)
423            | PeekError::ReplicaMissing(_) => false,
424        }
425    }
426}
427
428impl ShouldTerminateGracefully for ReadPolicyError {
429    fn should_terminate_gracefully(&self) -> bool {
430        match self {
431            ReadPolicyError::InstanceMissing(_)
432            | ReadPolicyError::CollectionMissing(_)
433            | ReadPolicyError::WriteOnlyCollection(_) => false,
434        }
435    }
436}
437
438impl ShouldTerminateGracefully for TransformError {
439    fn should_terminate_gracefully(&self) -> bool {
440        match self {
441            TransformError::Internal(_)
442            | TransformError::IdentifierMissing(_)
443            | TransformError::CallerShouldPanic(_) => false,
444        }
445    }
446}
447
impl ShouldTerminateGracefully for InstanceMissing {
    /// Always returns `false`: this error never requests graceful
    /// termination.
    fn should_terminate_gracefully(&self) -> bool {
        false
    }
}
453
454/// Returns the viewable session and system variables.
455pub(crate) fn viewable_variables<'a>(
456    catalog: &'a CatalogState,
457    session: &'a dyn SessionMetadata,
458) -> impl Iterator<Item = &'a dyn Var> {
459    session
460        .vars()
461        .iter()
462        .chain(catalog.system_config().iter())
463        .filter(|v| v.visible(session.user(), catalog.system_config()).is_ok())
464}
465
466/// Verify that the rows in [`RowIterator`] match the expected [`RelationDesc`].
467pub fn verify_datum_desc(
468    desc: &RelationDesc,
469    rows: &mut dyn RowIterator,
470) -> Result<(), AdapterError> {
471    // Verify the first row is of the expected type. This is often good enough to
472    // find problems.
473    //
474    // Notably it failed to find database-issues#1946 when "FETCH 2" was used in a test, instead
475    // we had to use "FETCH 1" twice.
476
477    let Some(row) = rows.peek() else {
478        return Ok(());
479    };
480
481    let datums = row.unpack();
482    let col_types = &desc.typ().column_types;
483    if datums.len() != col_types.len() {
484        let msg = format!(
485            "internal error: row descriptor has {} columns but row has {} columns",
486            col_types.len(),
487            datums.len(),
488        );
489        return Err(AdapterError::Internal(msg));
490    }
491
492    for (i, (d, t)) in datums.iter().zip(col_types).enumerate() {
493        if !d.is_instance_of(t) {
494            let msg = format!(
495                "internal error: column {} is not of expected type {:?}: {:?}",
496                i, t, d
497            );
498            return Err(AdapterError::Internal(msg));
499        }
500    }
501
502    Ok(())
503}