mz_adapter/
error.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::error::Error;
12use std::fmt;
13use std::num::TryFromIntError;
14
15use dec::TryFromDecimalError;
16use itertools::Itertools;
17use mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER;
18use mz_compute_client::controller::error as compute_error;
19use mz_expr::EvalError;
20use mz_ore::error::ErrorExt;
21use mz_ore::stack::RecursionLimitError;
22use mz_ore::str::StrExt;
23use mz_pgwire_common::{ErrorResponse, Severity};
24use mz_repr::adt::timestamp::TimestampError;
25use mz_repr::explain::ExplainError;
26use mz_repr::{NotNullViolation, Timestamp};
27use mz_sql::plan::PlanError;
28use mz_sql::rbac;
29use mz_sql::session::vars::VarError;
30use mz_storage_types::connections::ConnectionValidationError;
31use mz_storage_types::controller::StorageError;
32use smallvec::SmallVec;
33use timely::progress::Antichain;
34use tokio::sync::oneshot;
35use tokio_postgres::error::SqlState;
36
37use crate::coord::NetworkPolicyError;
38use crate::optimize::OptimizerError;
39
/// Errors that can occur in the coordinator.
///
/// Each variant is mapped to a SQLSTATE code by [`AdapterError::code`] and may carry extra
/// user-facing context via [`AdapterError::detail`] and [`AdapterError::hint`].
#[derive(Debug)]
pub enum AdapterError {
    /// A `SUBSCRIBE` was requested whose `UP TO` bound precedes its `as_of` timestamp.
    AbsurdSubscribeBounds {
        as_of: mz_repr::Timestamp,
        up_to: mz_repr::Timestamp,
    },
    /// Attempted to use a potentially ambiguous column reference expression with a system table.
    // We don't allow this until https://github.com/MaterializeInc/database-issues/issues/4824 is
    // resolved because it prevents us from adding columns to system tables.
    AmbiguousSystemColumnReference,
    /// An error occurred in a catalog operation.
    Catalog(mz_catalog::memory::error::Error),
    /// The cached plan or descriptor changed.
    ChangedPlan(String),
    /// The cursor already exists.
    DuplicateCursor(String),
    /// An error while evaluating an expression.
    Eval(EvalError),
    /// Wraps an [`ExplainError`].
    Explain(ExplainError),
    /// The ID allocator exhausted all valid IDs.
    IdExhaustionError,
    /// Unexpected internal state was encountered.
    Internal(String),
    /// Attempted to read from log sources of a replica with disabled introspection.
    IntrospectionDisabled {
        /// Names of the log sources referenced by the query.
        log_names: Vec<String>,
    },
    /// Attempted to create an object dependent on log sources that doesn't support
    /// log dependencies.
    InvalidLogDependency {
        object_type: String,
        /// Names of the log sources the object depends on.
        log_names: Vec<String>,
    },
    /// No such cluster replica availability zone has been configured.
    InvalidClusterReplicaAz {
        /// The availability zone that was requested.
        az: String,
        /// The valid availability zones; may be empty when none are configured.
        expected: Vec<String>,
    },
    /// SET TRANSACTION ISOLATION LEVEL was called in the middle of a transaction.
    InvalidSetIsolationLevel,
    /// SET cluster was called in the middle of a transaction.
    InvalidSetCluster,
    /// No such storage instance size has been configured.
    InvalidStorageClusterSize {
        size: String,
        /// The valid sizes, reported to the user as a hint.
        expected: Vec<String>,
    },
    /// Creating a source or sink without specifying its size is forbidden.
    SourceOrSinkSizeRequired {
        /// The available sizes, reported to the user as a hint.
        expected: Vec<String>,
    },
    /// The selection value for a table mutation operation refers to an invalid object.
    InvalidTableMutationSelection,
    /// Expression violated a column's constraint.
    ConstraintViolation(NotNullViolation),
    /// Transaction cluster was dropped in the middle of a transaction.
    ConcurrentClusterDrop,
    /// Target cluster has no replicas to service query.
    NoClusterReplicasAvailable {
        name: String,
        /// Selects which hint is shown: `ALTER CLUSTER` for managed clusters,
        /// `CREATE CLUSTER REPLICA` otherwise.
        is_managed: bool,
    },
    /// The named operation cannot be run in a transaction.
    OperationProhibitsTransaction(String),
    /// The named operation requires an active transaction.
    OperationRequiresTransaction(String),
    /// An error occurred while planning the statement.
    PlanError(PlanError),
    /// The named prepared statement already exists.
    PreparedStatementExists(String),
    /// Wrapper around a parsing error.
    ParseError(mz_sql_parser::parser::ParserStatementError),
    /// The transaction is in read-only mode.
    ReadOnlyTransaction,
    /// The transaction is in read-only mode and a read already occurred.
    ReadWriteUnavailable,
    /// The recursion limit of some operation was exceeded.
    RecursionLimit(RecursionLimitError),
    /// A query in a transaction referenced a relation outside the first query's
    /// time domain.
    RelationOutsideTimeDomain {
        /// The relations in the query that are outside the transaction's time domain.
        relations: Vec<String>,
        /// The relations that are available to the transaction; may be empty.
        names: Vec<String>,
    },
    /// A query tried to create more resources than is allowed in the system configuration.
    ResourceExhaustion {
        resource_type: String,
        limit_name: String,
        desired: String,
        limit: String,
        current: String,
    },
    /// Result size of a query is too large.
    ResultSize(String),
    /// The specified feature is not permitted in safe mode.
    SafeModeViolation(String),
    /// The current transaction had the wrong set of write locks.
    WrongSetOfLocks,
    /// Waiting on a query timed out.
    ///
    /// Note this differs slightly from PG's implementation/semantics.
    StatementTimeout,
    /// The user canceled the query.
    Canceled,
    /// An idle session in a transaction has timed out.
    IdleInTransactionSessionTimeout,
    /// The transaction is in single-subscribe mode.
    SubscribeOnlyTransaction,
    /// An error occurred in the optimizer.
    Optimizer(OptimizerError),
    /// A query depends on items which are not allowed to be referenced from the current cluster.
    UnallowedOnCluster {
        depends_on: SmallVec<[String; 2]>,
        cluster: String,
    },
    /// A user tried to perform an action that they were unauthorized to do.
    Unauthorized(rbac::UnauthorizedError),
    /// The named cursor does not exist.
    UnknownCursor(String),
    /// The named role does not exist.
    UnknownLoginRole(String),
    /// The named prepared statement does not exist.
    UnknownPreparedStatement(String),
    /// The named cluster replica does not exist.
    UnknownClusterReplica {
        cluster_name: String,
        replica_name: String,
    },
    /// The named setting does not exist.
    UnrecognizedConfigurationParam(String),
    /// A generic error occurred.
    //
    // TODO(benesch): convert all those errors to structured errors.
    Unstructured(anyhow::Error),
    /// The named feature is not supported and will (probably) not be.
    Unsupported(&'static str),
    /// Some feature isn't available for a (potentially opaque) reason.
    /// For example, in cloud Self-Managed auth features aren't available,
    /// but we don't want to mention self managed auth.
    UnavailableFeature {
        feature: String,
        docs: Option<String>,
    },
    /// Attempted to read from log sources without selecting a target replica.
    UntargetedLogRead {
        /// Names of the log sources referenced by the query.
        log_names: Vec<String>,
    },
    /// The transaction is in write-only mode.
    WriteOnlyTransaction,
    /// The transaction can only execute a single statement.
    SingleStatementTransaction,
    /// The transaction can only execute simple DDL.
    DDLOnlyTransaction,
    /// Another session modified the Catalog while this transaction was open.
    DDLTransactionRace,
    /// Used to prevent us from durably committing state while a DDL transaction is open, should
    /// never be returned to the user.
    TransactionDryRun {
        /// New operations that were run in the transaction.
        new_ops: Vec<crate::catalog::Op>,
        /// New resulting `CatalogState`.
        new_state: crate::catalog::CatalogState,
    },
    /// An error occurred in the storage layer.
    Storage(mz_storage_types::controller::StorageError<mz_repr::Timestamp>),
    /// An error occurred in the compute layer.
    Compute(anyhow::Error),
    /// An error in the orchestrator layer.
    Orchestrator(anyhow::Error),
    /// A statement tried to drop a role that had dependent objects.
    ///
    /// The map keys are role names and values are detailed error messages.
    DependentObject(BTreeMap<String, Vec<String>>),
    /// When performing an `ALTER` of some variety, re-planning the statement
    /// errored.
    InvalidAlter(&'static str, PlanError),
    /// An error occurred while validating a connection.
    ConnectionValidation(ConnectionValidationError),
    /// We refuse to create the materialized view, because it would never be refreshed, so it would
    /// never be queryable. This can happen when the only specified refreshes are further back in
    /// the past than the initial compaction window of the materialized view.
    ///
    /// The fields are, in order, the specified last refresh and the earliest possible time at
    /// which the materialized view could be computed.
    MaterializedViewWouldNeverRefresh(Timestamp, Timestamp),
    /// A CREATE MATERIALIZED VIEW statement tried to acquire a read hold at a REFRESH AT time,
    /// but was unable to get a precise read hold.
    ///
    /// The fields are, in order, the requested REFRESH AT time and the frontier before which
    /// not all input collections are readable.
    InputNotReadableAtRefreshAtTime(Timestamp, Antichain<Timestamp>),
    /// A humanized version of [`StorageError::RtrTimeout`].
    RtrTimeout(String),
    /// A humanized version of [`StorageError::RtrDropFailure`].
    RtrDropFailure(String),
    /// The collection requested to be sinked cannot be read at any timestamp.
    UnreadableSinkCollection,
    /// User sessions have been blocked.
    UserSessionsDisallowed,
    /// This user session has been denied by a NetworkPolicy.
    NetworkPolicyDenied(NetworkPolicyError),
    /// Something attempted a write (to catalog, storage, tables, etc.) while in
    /// read-only mode.
    ReadOnly,
    /// An `ALTER CLUSTER` statement ran into its timeout; the hint suggests increasing the
    /// timeout duration given in the statement.
    AlterClusterTimeout,
    /// NOTE(review): presumably an `ALTER CLUSTER` was attempted while the cluster still had
    /// pending replicas -- inferred from the variant name; confirm against the call sites.
    /// Reported with SQLSTATE `OBJECT_IN_USE`.
    AlterClusterWhilePendingReplicas,
    /// Authentication failed; the wrapped [`AuthenticationError`] carries the specific reason.
    AuthenticationError(AuthenticationError),
}
244
/// Reasons an authentication attempt can be rejected.
///
/// Wrapped by [`AdapterError::AuthenticationError`]. `InvalidCredentials` is reported with
/// SQLSTATE `INVALID_PASSWORD`; every other variant is reported with
/// `INVALID_AUTHORIZATION_SPECIFICATION`.
#[derive(Debug, thiserror::Error)]
pub enum AuthenticationError {
    /// The supplied credentials were not valid.
    #[error("invalid credentials")]
    InvalidCredentials,
    /// The role is not allowed to log in.
    #[error("role is not allowed to login")]
    NonLogin,
    /// The role does not exist.
    #[error("role does not exist")]
    RoleNotFound,
    /// A password is required but was not provided.
    #[error("password is required")]
    PasswordRequired,
}
256
257impl AdapterError {
258    pub fn into_response(self, severity: Severity) -> ErrorResponse {
259        ErrorResponse {
260            severity,
261            code: self.code(),
262            message: self.to_string(),
263            detail: self.detail(),
264            hint: self.hint(),
265            position: self.position(),
266        }
267    }
268
269    pub fn position(&self) -> Option<usize> {
270        match self {
271            AdapterError::ParseError(err) => Some(err.error.pos),
272            _ => None,
273        }
274    }
275
276    /// Reports additional details about the error, if any are available.
277    pub fn detail(&self) -> Option<String> {
278        match self {
279            AdapterError::AmbiguousSystemColumnReference => {
280                Some("This is a current limitation in Materialize".into())
281            },
282            AdapterError::Catalog(c) => c.detail(),
283            AdapterError::Eval(e) => e.detail(),
284            AdapterError::RelationOutsideTimeDomain { relations, names } => Some(format!(
285                "The following relations in the query are outside the transaction's time domain:\n{}\n{}",
286                relations
287                    .iter()
288                    .map(|r| r.quoted().to_string())
289                    .collect::<Vec<_>>()
290                    .join("\n"),
291                match names.is_empty() {
292                    true => "No relations are available.".to_string(),
293                    false => format!(
294                        "Only the following relations are available:\n{}",
295                        names
296                            .iter()
297                            .map(|name| name.quoted().to_string())
298                            .collect::<Vec<_>>()
299                            .join("\n")
300                    ),
301                }
302            )),
303            AdapterError::SourceOrSinkSizeRequired { .. } => Some(
304                "Either specify the cluster that will maintain this object via IN CLUSTER or \
305                specify size via SIZE option."
306                    .into(),
307            ),
308            AdapterError::SafeModeViolation(_) => Some(
309                "The Materialize server you are connected to is running in \
310                 safe mode, which limits the features that are available."
311                    .into(),
312            ),
313            AdapterError::IntrospectionDisabled { log_names }
314            | AdapterError::UntargetedLogRead { log_names } => Some(format!(
315                "The query references the following log sources:\n    {}",
316                log_names.join("\n    "),
317            )),
318            AdapterError::InvalidLogDependency { log_names, .. } => Some(format!(
319                "The object depends on the following log sources:\n    {}",
320                log_names.join("\n    "),
321            )),
322            AdapterError::PlanError(e) => e.detail(),
323            AdapterError::Unauthorized(unauthorized) => unauthorized.detail(),
324            AdapterError::DependentObject(dependent_objects) => {
325                Some(dependent_objects
326                    .iter()
327                    .map(|(role_name, err_msgs)| err_msgs
328                        .iter()
329                        .map(|err_msg| format!("{role_name}: {err_msg}"))
330                        .join("\n"))
331                    .join("\n"))
332            },
333            AdapterError::Storage(storage_error) => {
334                storage_error.source().map(|source_error| source_error.to_string_with_causes())
335            }
336            AdapterError::ReadOnlyTransaction => Some("SELECT queries cannot be combined with other query types, including SUBSCRIBE.".into()),
337            AdapterError::InvalidAlter(_, e) => e.detail(),
338            AdapterError::Optimizer(e) => e.detail(),
339            AdapterError::ConnectionValidation(e) => e.detail(),
340            AdapterError::MaterializedViewWouldNeverRefresh(last_refresh, earliest_possible) => {
341                Some(format!(
342                    "The specified last refresh is at {}, while the earliest possible time to compute the materialized \
343                    view is {}.",
344                    last_refresh,
345                    earliest_possible,
346                ))
347            }
348            AdapterError::UnallowedOnCluster { cluster, .. } => (cluster == MZ_CATALOG_SERVER_CLUSTER.name).then(||
349                format!("The transaction is executing on the {cluster} cluster, maybe having been routed there by the first statement in the transaction.")
350            ),
351            AdapterError::InputNotReadableAtRefreshAtTime(oracle_read_ts, least_valid_read) => {
352                Some(format!(
353                    "The requested REFRESH AT time is {}, \
354                    but not all input collections are readable earlier than [{}].",
355                    oracle_read_ts,
356                    if least_valid_read.len() == 1 {
357                        format!("{}", least_valid_read.as_option().expect("antichain contains exactly 1 timestamp"))
358                    } else {
359                        // This can't occur currently
360                        format!("{:?}", least_valid_read)
361                    }
362                ))
363            }
364            AdapterError::RtrTimeout(name) => Some(format!("{name} failed to ingest data up to the real-time recency point")),
365            AdapterError::RtrDropFailure(name) => Some(format!("{name} dropped before ingesting data to the real-time recency point")),
366            AdapterError::UserSessionsDisallowed => Some("Your organization has been blocked. Please contact support.".to_string()),
367            AdapterError::NetworkPolicyDenied(reason)=> Some(format!("{reason}.")),
368            _ => None,
369        }
370    }
371
372    /// Reports a hint for the user about how the error could be fixed.
373    pub fn hint(&self) -> Option<String> {
374        match self {
375            AdapterError::AmbiguousSystemColumnReference => Some(
376                "Rewrite the view to refer to all columns by name. Expand all wildcards and \
377                convert all NATURAL JOINs to USING joins."
378                    .to_string(),
379            ),
380            AdapterError::Catalog(c) => c.hint(),
381            AdapterError::Eval(e) => e.hint(),
382            AdapterError::InvalidClusterReplicaAz { expected, az: _ } => {
383                Some(if expected.is_empty() {
384                    "No availability zones configured; do not specify AVAILABILITY ZONE".into()
385                } else {
386                    format!("Valid availability zones are: {}", expected.join(", "))
387                })
388            }
389            AdapterError::InvalidStorageClusterSize { expected, .. } => {
390                Some(format!("Valid sizes are: {}", expected.join(", ")))
391            }
392            AdapterError::SourceOrSinkSizeRequired { expected } => Some(format!(
393                "Try choosing one of the smaller sizes to start. Available sizes: {}",
394                expected.join(", ")
395            )),
396            AdapterError::NoClusterReplicasAvailable { is_managed, .. } => {
397                Some(if *is_managed {
398                    "Use ALTER CLUSTER to adjust the replication factor of the cluster. \
399                    Example:`ALTER CLUSTER <cluster-name> SET (REPLICATION FACTOR 1)`".into()
400                } else {
401                    "Use CREATE CLUSTER REPLICA to attach cluster replicas to the cluster".into()
402                })
403            }
404            AdapterError::UntargetedLogRead { .. } => Some(
405                "Use `SET cluster_replica = <replica-name>` to target a specific replica in the \
406                 active cluster. Note that subsequent queries will only be answered by \
407                 the selected replica, which might reduce availability. To undo the replica \
408                 selection, use `RESET cluster_replica`."
409                    .into(),
410            ),
411            AdapterError::ResourceExhaustion { resource_type, .. } => Some(format!(
412                "Drop an existing {resource_type} or contact support to request a limit increase."
413            )),
414            AdapterError::StatementTimeout => Some(
415                "Consider increasing the maximum allowed statement duration for this session by \
416                 setting the statement_timeout session variable. For example, `SET \
417                 statement_timeout = '120s'`."
418                    .into(),
419            ),
420            AdapterError::PlanError(e) => e.hint(),
421            AdapterError::UnallowedOnCluster { cluster, .. } => {
422                (cluster != MZ_CATALOG_SERVER_CLUSTER.name).then(||
423                    "Use `SET CLUSTER = <cluster-name>` to change your cluster and re-run the query."
424                    .to_string()
425                )
426            }
427            AdapterError::InvalidAlter(_, e) => e.hint(),
428            AdapterError::Optimizer(e) => e.hint(),
429            AdapterError::ConnectionValidation(e) => e.hint(),
430            AdapterError::InputNotReadableAtRefreshAtTime(_, _) => Some(
431                "You can use `REFRESH AT greatest(mz_now(), <explicit timestamp>)` to refresh \
432                 either at the explicitly specified timestamp, or now if the given timestamp would \
433                 be in the past.".to_string()
434            ),
435            AdapterError::AlterClusterTimeout => Some(
436                "Consider increasing the timeout duration in the alter cluster statement.".into(),
437            ),
438            AdapterError::DDLTransactionRace => Some(
439                "Currently, DDL transactions fail when any other DDL happens concurrently, \
440                 even on unrelated schemas/clusters.".into()
441            ),
442            _ => None,
443        }
444    }
445
    /// Returns the SQLSTATE code reported to the client for this error.
    ///
    /// The match is exhaustive (no wildcard arm over `AdapterError` itself), so adding a new
    /// variant forces a decision about its code here. Within a variant, the more specific
    /// patterns (e.g. particular `EvalError`, `PlanError` or `AuthenticationError` cases) must
    /// stay ahead of that variant's catch-all arm, because match arms are tried in order.
    pub fn code(&self) -> SqlState {
        // TODO(benesch): we should only use `SqlState::INTERNAL_ERROR` for
        // those errors that are truly internal errors. At the moment we have
        // a various classes of uncategorized errors that use this error code
        // inappropriately.
        match self {
            // DATA_EXCEPTION to match what Postgres returns for degenerate
            // range bounds
            AdapterError::AbsurdSubscribeBounds { .. } => SqlState::DATA_EXCEPTION,
            AdapterError::AmbiguousSystemColumnReference => SqlState::FEATURE_NOT_SUPPORTED,
            // Only catalog errors that wrap a `VarError` get a specific code; every other
            // catalog error kind is reported as an internal error.
            AdapterError::Catalog(e) => match &e.kind {
                mz_catalog::memory::error::ErrorKind::VarError(e) => match e {
                    VarError::ConstrainedParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
                    VarError::FixedValueParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
                    VarError::InvalidParameterType { .. } => SqlState::INVALID_PARAMETER_VALUE,
                    VarError::InvalidParameterValue { .. } => SqlState::INVALID_PARAMETER_VALUE,
                    VarError::ReadOnlyParameter(_) => SqlState::CANT_CHANGE_RUNTIME_PARAM,
                    VarError::UnknownParameter(_) => SqlState::UNDEFINED_OBJECT,
                    VarError::RequiresUnsafeMode { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
                    VarError::RequiresFeatureFlag { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
                },
                _ => SqlState::INTERNAL_ERROR,
            },
            AdapterError::ChangedPlan(_) => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::DuplicateCursor(_) => SqlState::DUPLICATE_CURSOR,
            // Specific evaluation errors first; the `Eval(_)` catch-all follows below.
            AdapterError::Eval(EvalError::CharacterNotValidForEncoding(_)) => {
                SqlState::PROGRAM_LIMIT_EXCEEDED
            }
            AdapterError::Eval(EvalError::CharacterTooLargeForEncoding(_)) => {
                SqlState::PROGRAM_LIMIT_EXCEEDED
            }
            AdapterError::Eval(EvalError::LengthTooLarge) => SqlState::PROGRAM_LIMIT_EXCEEDED,
            AdapterError::Eval(EvalError::NullCharacterNotPermitted) => {
                SqlState::PROGRAM_LIMIT_EXCEEDED
            }
            AdapterError::Eval(_) => SqlState::INTERNAL_ERROR,
            AdapterError::Explain(_) => SqlState::INTERNAL_ERROR,
            AdapterError::IdExhaustionError => SqlState::INTERNAL_ERROR,
            AdapterError::Internal(_) => SqlState::INTERNAL_ERROR,
            AdapterError::IntrospectionDisabled { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::InvalidLogDependency { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::InvalidClusterReplicaAz { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::InvalidSetIsolationLevel => SqlState::ACTIVE_SQL_TRANSACTION,
            AdapterError::InvalidSetCluster => SqlState::ACTIVE_SQL_TRANSACTION,
            AdapterError::InvalidStorageClusterSize { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::SourceOrSinkSizeRequired { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::InvalidTableMutationSelection => SqlState::INVALID_TRANSACTION_STATE,
            AdapterError::ConstraintViolation(NotNullViolation(_)) => SqlState::NOT_NULL_VIOLATION,
            AdapterError::ConcurrentClusterDrop => SqlState::INVALID_TRANSACTION_STATE,
            AdapterError::NoClusterReplicasAvailable { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::OperationProhibitsTransaction(_) => SqlState::ACTIVE_SQL_TRANSACTION,
            AdapterError::OperationRequiresTransaction(_) => SqlState::NO_ACTIVE_SQL_TRANSACTION,
            AdapterError::ParseError(_) => SqlState::SYNTAX_ERROR,
            // Specific planning errors first; the `PlanError(_)` catch-all follows below.
            AdapterError::PlanError(PlanError::InvalidSchemaName) => SqlState::INVALID_SCHEMA_NAME,
            AdapterError::PlanError(PlanError::ColumnAlreadyExists { .. }) => {
                SqlState::DUPLICATE_COLUMN
            }
            AdapterError::PlanError(PlanError::UnknownParameter(_)) => {
                SqlState::UNDEFINED_PARAMETER
            }
            AdapterError::PlanError(PlanError::ParameterNotAllowed(_)) => {
                SqlState::UNDEFINED_PARAMETER
            }
            AdapterError::PlanError(_) => SqlState::INTERNAL_ERROR,
            AdapterError::PreparedStatementExists(_) => SqlState::DUPLICATE_PSTATEMENT,
            AdapterError::ReadOnlyTransaction => SqlState::READ_ONLY_SQL_TRANSACTION,
            AdapterError::ReadWriteUnavailable => SqlState::INVALID_TRANSACTION_STATE,
            AdapterError::SingleStatementTransaction => SqlState::INVALID_TRANSACTION_STATE,
            AdapterError::WrongSetOfLocks => SqlState::LOCK_NOT_AVAILABLE,
            AdapterError::StatementTimeout => SqlState::QUERY_CANCELED,
            AdapterError::Canceled => SqlState::QUERY_CANCELED,
            AdapterError::IdleInTransactionSessionTimeout => {
                SqlState::IDLE_IN_TRANSACTION_SESSION_TIMEOUT
            }
            AdapterError::RecursionLimit(_) => SqlState::INTERNAL_ERROR,
            AdapterError::RelationOutsideTimeDomain { .. } => SqlState::INVALID_TRANSACTION_STATE,
            AdapterError::ResourceExhaustion { .. } => SqlState::INSUFFICIENT_RESOURCES,
            AdapterError::ResultSize(_) => SqlState::OUT_OF_MEMORY,
            AdapterError::SafeModeViolation(_) => SqlState::INTERNAL_ERROR,
            AdapterError::SubscribeOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
            // Optimizer errors wrapping a `PlanError` use the same codes as the
            // `AdapterError::PlanError` arms above; recursion-limit, internal and eval errors
            // are re-wrapped in the matching `AdapterError` variant so their code is decided
            // in exactly one place.
            AdapterError::Optimizer(e) => match e {
                OptimizerError::PlanError(PlanError::InvalidSchemaName) => {
                    SqlState::INVALID_SCHEMA_NAME
                }
                OptimizerError::PlanError(PlanError::ColumnAlreadyExists { .. }) => {
                    SqlState::DUPLICATE_COLUMN
                }
                OptimizerError::PlanError(PlanError::UnknownParameter(_)) => {
                    SqlState::UNDEFINED_PARAMETER
                }
                OptimizerError::PlanError(PlanError::ParameterNotAllowed(_)) => {
                    SqlState::UNDEFINED_PARAMETER
                }
                OptimizerError::PlanError(_) => SqlState::INTERNAL_ERROR,
                OptimizerError::RecursionLimitError(e) => {
                    AdapterError::RecursionLimit(e.clone()).code() // Delegate to outer
                }
                OptimizerError::Internal(s) => {
                    AdapterError::Internal(s.clone()).code() // Delegate to outer
                }
                OptimizerError::EvalError(e) => {
                    AdapterError::Eval(e.clone()).code() // Delegate to outer
                }
                OptimizerError::TransformError(_) => SqlState::INTERNAL_ERROR,
                OptimizerError::UnmaterializableFunction(_) => SqlState::FEATURE_NOT_SUPPORTED,
                OptimizerError::UncallableFunction { .. } => SqlState::FEATURE_NOT_SUPPORTED,
                OptimizerError::UnsupportedTemporalExpression(_) => SqlState::FEATURE_NOT_SUPPORTED,
                // This should be handled by peek optimization, so it's an internal error if it
                // reaches the user.
                OptimizerError::InternalUnsafeMfpPlan(_) => SqlState::INTERNAL_ERROR,
            },
            AdapterError::UnallowedOnCluster { .. } => {
                SqlState::S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
            }
            AdapterError::Unauthorized(_) => SqlState::INSUFFICIENT_PRIVILEGE,
            AdapterError::UnknownCursor(_) => SqlState::INVALID_CURSOR_NAME,
            AdapterError::UnknownPreparedStatement(_) => SqlState::UNDEFINED_PSTATEMENT,
            AdapterError::UnknownLoginRole(_) => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
            AdapterError::UnknownClusterReplica { .. } => SqlState::UNDEFINED_OBJECT,
            AdapterError::UnrecognizedConfigurationParam(_) => SqlState::UNDEFINED_OBJECT,
            AdapterError::Unsupported(..) => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::UnavailableFeature { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::Unstructured(_) => SqlState::INTERNAL_ERROR,
            AdapterError::UntargetedLogRead { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::DDLTransactionRace => SqlState::T_R_SERIALIZATION_FAILURE,
            AdapterError::TransactionDryRun { .. } => SqlState::T_R_SERIALIZATION_FAILURE,
            // It's not immediately clear which error code to use here because a
            // "write-only transaction", "single table write transaction", or "ddl only
            // transaction" are not things in Postgres. This error code is the generic "bad txn
            // thing" code, so it's probably the best choice.
            AdapterError::WriteOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
            AdapterError::DDLOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
            AdapterError::Storage(_) | AdapterError::Compute(_) | AdapterError::Orchestrator(_) => {
                SqlState::INTERNAL_ERROR
            }
            AdapterError::DependentObject(_) => SqlState::DEPENDENT_OBJECTS_STILL_EXIST,
            AdapterError::InvalidAlter(_, _) => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::ConnectionValidation(_) => SqlState::SYSTEM_ERROR,
            // `DATA_EXCEPTION`, similarly to `AbsurdSubscribeBounds`.
            AdapterError::MaterializedViewWouldNeverRefresh(_, _) => SqlState::DATA_EXCEPTION,
            AdapterError::InputNotReadableAtRefreshAtTime(_, _) => SqlState::DATA_EXCEPTION,
            AdapterError::RtrTimeout(_) => SqlState::QUERY_CANCELED,
            AdapterError::RtrDropFailure(_) => SqlState::UNDEFINED_OBJECT,
            // The `MZ…` codes below are built from raw strings rather than named `SqlState`
            // constants.
            AdapterError::UnreadableSinkCollection => SqlState::from_code("MZ009"),
            AdapterError::UserSessionsDisallowed => SqlState::from_code("MZ010"),
            AdapterError::NetworkPolicyDenied(_) => SqlState::from_code("MZ011"),
            // In read-only mode all transactions are implicitly read-only
            // transactions.
            AdapterError::ReadOnly => SqlState::READ_ONLY_SQL_TRANSACTION,
            AdapterError::AlterClusterTimeout => SqlState::QUERY_CANCELED,
            AdapterError::AlterClusterWhilePendingReplicas => SqlState::OBJECT_IN_USE,
            // Bad credentials get the password-specific code; every other authentication
            // failure falls through to the generic authorization code.
            AdapterError::AuthenticationError(AuthenticationError::InvalidCredentials) => {
                SqlState::INVALID_PASSWORD
            }
            AdapterError::AuthenticationError(_) => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
        }
    }
603
604    pub fn internal<E: std::fmt::Display>(context: &str, e: E) -> AdapterError {
605        AdapterError::Internal(format!("{context}: {e}"))
606    }
607}
608
609impl fmt::Display for AdapterError {
610    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
611        match self {
612            AdapterError::AbsurdSubscribeBounds { as_of, up_to } => {
613                assert!(up_to < as_of);
614                write!(
615                    f,
616                    r#"subscription lower ("as of") bound is beyond its upper ("up to") bound: {} < {}"#,
617                    up_to, as_of
618                )
619            }
620            AdapterError::AmbiguousSystemColumnReference => {
621                write!(
622                    f,
623                    "cannot use wildcard expansions or NATURAL JOINs in a view that depends on \
624                    system objects"
625                )
626            }
627            AdapterError::ChangedPlan(e) => write!(f, "{}", e),
628            AdapterError::Catalog(e) => e.fmt(f),
629            AdapterError::DuplicateCursor(name) => {
630                write!(f, "cursor {} already exists", name.quoted())
631            }
632            AdapterError::Eval(e) => e.fmt(f),
633            AdapterError::Explain(e) => e.fmt(f),
634            AdapterError::IdExhaustionError => f.write_str("ID allocator exhausted all valid IDs"),
635            AdapterError::Internal(e) => write!(f, "internal error: {}", e),
636            AdapterError::IntrospectionDisabled { .. } => write!(
637                f,
638                "cannot read log sources of replica with disabled introspection"
639            ),
640            AdapterError::InvalidLogDependency { object_type, .. } => {
641                write!(f, "{object_type} objects cannot depend on log sources")
642            }
643            AdapterError::InvalidClusterReplicaAz { az, expected: _ } => {
644                write!(f, "unknown cluster replica availability zone {az}",)
645            }
646            AdapterError::InvalidSetIsolationLevel => write!(
647                f,
648                "SET TRANSACTION ISOLATION LEVEL must be called before any query"
649            ),
650            AdapterError::InvalidSetCluster => {
651                write!(f, "SET cluster cannot be called in an active transaction")
652            }
653            AdapterError::InvalidStorageClusterSize { size, .. } => {
654                write!(f, "unknown source size {size}")
655            }
656            AdapterError::SourceOrSinkSizeRequired { .. } => {
657                write!(f, "must specify either cluster or size option")
658            }
659            AdapterError::InvalidTableMutationSelection => {
660                f.write_str("invalid selection: operation may only refer to user-defined tables")
661            }
662            AdapterError::ConstraintViolation(not_null_violation) => {
663                write!(f, "{}", not_null_violation)
664            }
665            AdapterError::ConcurrentClusterDrop => {
666                write!(f, "the transaction's active cluster has been dropped")
667            }
668            AdapterError::NoClusterReplicasAvailable { name, .. } => {
669                write!(
670                    f,
671                    "CLUSTER {} has no replicas available to service request",
672                    name.quoted()
673                )
674            }
675            AdapterError::OperationProhibitsTransaction(op) => {
676                write!(f, "{} cannot be run inside a transaction block", op)
677            }
678            AdapterError::OperationRequiresTransaction(op) => {
679                write!(f, "{} can only be used in transaction blocks", op)
680            }
681            AdapterError::ParseError(e) => e.fmt(f),
682            AdapterError::PlanError(e) => e.fmt(f),
683            AdapterError::PreparedStatementExists(name) => {
684                write!(f, "prepared statement {} already exists", name.quoted())
685            }
686            AdapterError::ReadOnlyTransaction => f.write_str("transaction in read-only mode"),
687            AdapterError::SingleStatementTransaction => {
688                f.write_str("this transaction can only execute a single statement")
689            }
690            AdapterError::ReadWriteUnavailable => {
691                f.write_str("transaction read-write mode must be set before any query")
692            }
693            AdapterError::WrongSetOfLocks => {
694                write!(f, "internal error, wrong set of locks acquired")
695            }
696            AdapterError::StatementTimeout => {
697                write!(f, "canceling statement due to statement timeout")
698            }
699            AdapterError::Canceled => {
700                write!(f, "canceling statement due to user request")
701            }
702            AdapterError::IdleInTransactionSessionTimeout => {
703                write!(
704                    f,
705                    "terminating connection due to idle-in-transaction timeout"
706                )
707            }
708            AdapterError::RecursionLimit(e) => e.fmt(f),
709            AdapterError::RelationOutsideTimeDomain { .. } => {
710                write!(
711                    f,
712                    "Transactions can only reference objects in the same timedomain. \
713                     See https://materialize.com/docs/sql/begin/#same-timedomain-error",
714                )
715            }
716            AdapterError::ResourceExhaustion {
717                resource_type,
718                limit_name,
719                desired,
720                limit,
721                current,
722            } => {
723                write!(
724                    f,
725                    "creating {resource_type} would violate {limit_name} limit (desired: {desired}, limit: {limit}, current: {current})"
726                )
727            }
728            AdapterError::ResultSize(e) => write!(f, "{e}"),
729            AdapterError::SafeModeViolation(feature) => {
730                write!(f, "cannot create {} in safe mode", feature)
731            }
732            AdapterError::SubscribeOnlyTransaction => {
733                f.write_str("SUBSCRIBE in transactions must be the only read statement")
734            }
735            AdapterError::Optimizer(e) => e.fmt(f),
736            AdapterError::UnallowedOnCluster {
737                depends_on,
738                cluster,
739            } => {
740                let items = depends_on.into_iter().map(|item| item.quoted()).join(", ");
741                write!(
742                    f,
743                    "querying the following items {items} is not allowed from the {} cluster",
744                    cluster.quoted()
745                )
746            }
747            AdapterError::Unauthorized(unauthorized) => {
748                write!(f, "{unauthorized}")
749            }
750            AdapterError::UnknownCursor(name) => {
751                write!(f, "cursor {} does not exist", name.quoted())
752            }
753            AdapterError::UnknownLoginRole(name) => {
754                write!(f, "role {} does not exist", name.quoted())
755            }
756            AdapterError::Unsupported(features) => write!(f, "{} are not supported", features),
757            AdapterError::Unstructured(e) => write!(f, "{}", e.display_with_causes()),
758            AdapterError::WriteOnlyTransaction => f.write_str("transaction in write-only mode"),
759            AdapterError::UnknownPreparedStatement(name) => {
760                write!(f, "prepared statement {} does not exist", name.quoted())
761            }
762            AdapterError::UnknownClusterReplica {
763                cluster_name,
764                replica_name,
765            } => write!(
766                f,
767                "cluster replica '{cluster_name}.{replica_name}' does not exist"
768            ),
769            AdapterError::UnrecognizedConfigurationParam(setting_name) => write!(
770                f,
771                "unrecognized configuration parameter {}",
772                setting_name.quoted()
773            ),
774            AdapterError::UntargetedLogRead { .. } => {
775                f.write_str("log source reads must target a replica")
776            }
777            AdapterError::DDLOnlyTransaction => f.write_str(
778                "transactions which modify objects are restricted to just modifying objects",
779            ),
780            AdapterError::DDLTransactionRace => f.write_str(
781                "another session modified the catalog while this DDL transaction was open",
782            ),
783            AdapterError::TransactionDryRun { .. } => f.write_str("transaction dry run"),
784            AdapterError::Storage(e) => e.fmt(f),
785            AdapterError::Compute(e) => e.fmt(f),
786            AdapterError::Orchestrator(e) => e.fmt(f),
787            AdapterError::DependentObject(dependent_objects) => {
788                let role_str = if dependent_objects.keys().count() == 1 {
789                    "role"
790                } else {
791                    "roles"
792                };
793                write!(
794                    f,
795                    "{role_str} \"{}\" cannot be dropped because some objects depend on it",
796                    dependent_objects.keys().join(", ")
797                )
798            }
799            AdapterError::InvalidAlter(t, e) => {
800                write!(f, "invalid ALTER {t}: {e}")
801            }
802            AdapterError::ConnectionValidation(e) => e.fmt(f),
803            AdapterError::MaterializedViewWouldNeverRefresh(_, _) => {
804                write!(
805                    f,
806                    "all the specified refreshes of the materialized view would be too far in the past, and thus they \
807                    would never happen"
808                )
809            }
810            AdapterError::InputNotReadableAtRefreshAtTime(_, _) => {
811                write!(
812                    f,
813                    "REFRESH AT requested for a time where not all the inputs are readable"
814                )
815            }
816            AdapterError::RtrTimeout(_) => {
817                write!(
818                    f,
819                    "timed out before ingesting the source's visible frontier when real-time-recency query issued"
820                )
821            }
822            AdapterError::RtrDropFailure(_) => write!(
823                f,
824                "real-time source dropped before ingesting the upstream system's visible frontier"
825            ),
826            AdapterError::UnreadableSinkCollection => {
827                write!(f, "collection is not readable at any time")
828            }
829            AdapterError::UserSessionsDisallowed => write!(f, "login blocked"),
830            AdapterError::NetworkPolicyDenied(_) => write!(f, "session denied"),
831            AdapterError::ReadOnly => write!(f, "cannot write in read-only mode"),
832            AdapterError::AlterClusterTimeout => {
833                write!(f, "canceling statement, provided timeout lapsed")
834            }
835            AdapterError::AuthenticationError(e) => {
836                write!(f, "authentication error {e}")
837            }
838            AdapterError::UnavailableFeature { feature, docs } => {
839                write!(f, "{} is not supported in this environment.", feature)?;
840                if let Some(docs) = docs {
841                    write!(
842                        f,
843                        " For more information consult the documentation at {docs}"
844                    )?;
845                }
846                Ok(())
847            }
848            AdapterError::AlterClusterWhilePendingReplicas => {
849                write!(f, "cannot alter clusters with pending updates")
850            }
851        }
852    }
853}
854
855impl From<anyhow::Error> for AdapterError {
856    fn from(e: anyhow::Error) -> AdapterError {
857        match e.downcast::<PlanError>() {
858            Ok(plan_error) => AdapterError::PlanError(plan_error),
859            Err(e) => AdapterError::Unstructured(e),
860        }
861    }
862}
863
864impl From<TryFromIntError> for AdapterError {
865    fn from(e: TryFromIntError) -> AdapterError {
866        AdapterError::Unstructured(e.into())
867    }
868}
869
870impl From<TryFromDecimalError> for AdapterError {
871    fn from(e: TryFromDecimalError) -> AdapterError {
872        AdapterError::Unstructured(e.into())
873    }
874}
875
876impl From<mz_catalog::memory::error::Error> for AdapterError {
877    fn from(e: mz_catalog::memory::error::Error) -> AdapterError {
878        AdapterError::Catalog(e)
879    }
880}
881
882impl From<mz_catalog::durable::CatalogError> for AdapterError {
883    fn from(e: mz_catalog::durable::CatalogError) -> Self {
884        mz_catalog::memory::error::Error::from(e).into()
885    }
886}
887
888impl From<mz_catalog::durable::DurableCatalogError> for AdapterError {
889    fn from(e: mz_catalog::durable::DurableCatalogError) -> Self {
890        mz_catalog::durable::CatalogError::from(e).into()
891    }
892}
893
894impl From<EvalError> for AdapterError {
895    fn from(e: EvalError) -> AdapterError {
896        AdapterError::Eval(e)
897    }
898}
899
900impl From<ExplainError> for AdapterError {
901    fn from(e: ExplainError) -> AdapterError {
902        match e {
903            ExplainError::RecursionLimitError(e) => AdapterError::RecursionLimit(e),
904            e => AdapterError::Explain(e),
905        }
906    }
907}
908
909impl From<mz_sql::catalog::CatalogError> for AdapterError {
910    fn from(e: mz_sql::catalog::CatalogError) -> AdapterError {
911        AdapterError::Catalog(mz_catalog::memory::error::Error::from(e))
912    }
913}
914
915impl From<PlanError> for AdapterError {
916    fn from(e: PlanError) -> AdapterError {
917        match e {
918            PlanError::UnknownCursor(name) => AdapterError::UnknownCursor(name),
919            _ => AdapterError::PlanError(e),
920        }
921    }
922}
923
924impl From<OptimizerError> for AdapterError {
925    fn from(e: OptimizerError) -> AdapterError {
926        use OptimizerError::*;
927        match e {
928            PlanError(e) => Self::PlanError(e),
929            RecursionLimitError(e) => Self::RecursionLimit(e),
930            EvalError(e) => Self::Eval(e),
931            InternalUnsafeMfpPlan(e) => Self::Internal(e),
932            Internal(e) => Self::Internal(e),
933            e => Self::Optimizer(e),
934        }
935    }
936}
937
938impl From<NotNullViolation> for AdapterError {
939    fn from(e: NotNullViolation) -> AdapterError {
940        AdapterError::ConstraintViolation(e)
941    }
942}
943
944impl From<RecursionLimitError> for AdapterError {
945    fn from(e: RecursionLimitError) -> AdapterError {
946        AdapterError::RecursionLimit(e)
947    }
948}
949
950impl From<oneshot::error::RecvError> for AdapterError {
951    fn from(e: oneshot::error::RecvError) -> AdapterError {
952        AdapterError::Unstructured(e.into())
953    }
954}
955
956impl From<StorageError<mz_repr::Timestamp>> for AdapterError {
957    fn from(e: StorageError<mz_repr::Timestamp>) -> Self {
958        AdapterError::Storage(e)
959    }
960}
961
962impl From<compute_error::InstanceExists> for AdapterError {
963    fn from(e: compute_error::InstanceExists) -> Self {
964        AdapterError::Compute(e.into())
965    }
966}
967
968impl From<TimestampError> for AdapterError {
969    fn from(e: TimestampError) -> Self {
970        let e: EvalError = e.into();
971        e.into()
972    }
973}
974
975impl From<mz_sql_parser::parser::ParserStatementError> for AdapterError {
976    fn from(e: mz_sql_parser::parser::ParserStatementError) -> Self {
977        AdapterError::ParseError(e)
978    }
979}
980
981impl From<VarError> for AdapterError {
982    fn from(e: VarError) -> Self {
983        let e: mz_catalog::memory::error::Error = e.into();
984        e.into()
985    }
986}
987
988impl From<rbac::UnauthorizedError> for AdapterError {
989    fn from(e: rbac::UnauthorizedError) -> Self {
990        AdapterError::Unauthorized(e)
991    }
992}
993
994impl From<mz_sql_parser::ast::IdentError> for AdapterError {
995    fn from(value: mz_sql_parser::ast::IdentError) -> Self {
996        AdapterError::PlanError(PlanError::InvalidIdent(value))
997    }
998}
999
1000impl From<mz_pgwire_common::ConnectionError> for AdapterError {
1001    fn from(value: mz_pgwire_common::ConnectionError) -> Self {
1002        match value {
1003            mz_pgwire_common::ConnectionError::TooManyConnections { current, limit } => {
1004                AdapterError::ResourceExhaustion {
1005                    resource_type: "connection".into(),
1006                    limit_name: "max_connections".into(),
1007                    desired: (current + 1).to_string(),
1008                    limit: limit.to_string(),
1009                    current: current.to_string(),
1010                }
1011            }
1012        }
1013    }
1014}
1015
1016impl From<NetworkPolicyError> for AdapterError {
1017    fn from(value: NetworkPolicyError) -> Self {
1018        AdapterError::NetworkPolicyDenied(value)
1019    }
1020}
1021
1022impl From<ConnectionValidationError> for AdapterError {
1023    fn from(e: ConnectionValidationError) -> AdapterError {
1024        AdapterError::ConnectionValidation(e)
1025    }
1026}
1027
// `AdapterError` overrides none of `Error`'s provided methods, so the trait's
// defaults apply (in particular, `source()` reports no underlying cause).
impl Error for AdapterError {}