// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::{BTreeMap, BTreeSet};
use std::error::Error;
use std::fmt;
use std::num::TryFromIntError;

use dec::TryFromDecimalError;
use itertools::Itertools;
use mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER;
use mz_compute_client::controller::error as compute_error;
use mz_compute_client::controller::error::InstanceMissing;

use mz_compute_types::ComputeInstanceId;
use mz_expr::EvalError;
use mz_ore::error::ErrorExt;
use mz_ore::stack::RecursionLimitError;
use mz_ore::str::StrExt;
use mz_pgwire_common::{ErrorResponse, Severity};
use mz_repr::adt::timestamp::TimestampError;
use mz_repr::explain::ExplainError;
use mz_repr::{ColumnDiff, ColumnName, KeyDiff, NotNullViolation, RelationDescDiff, Timestamp};
use mz_sql::plan::PlanError;
use mz_sql::rbac;
use mz_sql::session::vars::VarError;
use mz_storage_types::connections::ConnectionValidationError;
use mz_storage_types::controller::StorageError;
use mz_storage_types::errors::CollectionMissing;
use smallvec::SmallVec;
use timely::progress::Antichain;
use tokio::sync::oneshot;
use tokio_postgres::error::SqlState;

use crate::coord::NetworkPolicyError;
use crate::optimize::OptimizerError;
use crate::peek_client::CollectionLookupError;

/// Errors that can occur in the coordinator.
#[derive(Debug)]
pub enum AdapterError {
    /// A `SUBSCRIBE` was requested whose `UP TO` bound precedes its `as_of` timestamp
    AbsurdSubscribeBounds {
        as_of: mz_repr::Timestamp,
        up_to: mz_repr::Timestamp,
    },
    /// Attempted to use a potentially ambiguous column reference expression with a system table.
    // We don't allow this until https://github.com/MaterializeInc/database-issues/issues/4824 is
    // resolved because it prevents us from adding columns to system tables.
    AmbiguousSystemColumnReference,
    /// An error occurred in a catalog operation.
    Catalog(mz_catalog::memory::error::Error),
    /// 1. The cached plan or descriptor changed, or
    /// 2. some dependency of a statement disappeared during sequencing.
    /// TODO(ggevay): we should refactor the 2. usages to use `ConcurrentDependencyDrop` instead
    /// (e.g., in MV sequencing).
    ChangedPlan(String),
    /// The cursor already exists.
    DuplicateCursor(String),
    /// An error while evaluating an expression.
    Eval(EvalError),
    /// An error occurred while explaining a statement.
    Explain(ExplainError),
    /// The ID allocator exhausted all valid IDs.
    IdExhaustionError,
    /// Unexpected internal state was encountered.
    Internal(String),
    /// Attempted to read from log sources of a replica with disabled introspection.
    IntrospectionDisabled {
        log_names: Vec<String>,
    },
    /// Attempted to create an object dependent on log sources that doesn't support
    /// log dependencies.
    InvalidLogDependency {
        object_type: String,
        log_names: Vec<String>,
    },
    /// No such cluster replica availability zone has been configured.
    InvalidClusterReplicaAz {
        az: String,
        expected: Vec<String>,
    },
    /// SET TRANSACTION ISOLATION LEVEL was called in the middle of a transaction.
    InvalidSetIsolationLevel,
    /// SET cluster was called in the middle of a transaction.
    InvalidSetCluster,
    /// No such storage instance size has been configured.
    InvalidStorageClusterSize {
        size: String,
        expected: Vec<String>,
    },
    /// Creating a source or sink without specifying its size is forbidden.
    SourceOrSinkSizeRequired {
        expected: Vec<String>,
    },
    /// The selection value for a table mutation operation refers to an invalid object.
    InvalidTableMutationSelection {
        /// The full name of the problematic object (e.g. a source or source-export table).
        object_name: String,
        /// Human-readable type of the object (e.g. "source", "source-export table").
        object_type: String,
    },
    /// An expression violated a column's constraint.
    ConstraintViolation(NotNullViolation),
    /// The transaction's cluster was dropped in the middle of the transaction.
    ConcurrentClusterDrop,
    /// A dependency was dropped while sequencing a statement.
    ConcurrentDependencyDrop {
        dependency_kind: &'static str,
        dependency_id: String,
    },
    /// The named collection is not readable at any timestamp.
    CollectionUnreadable {
        id: String,
    },
    /// The target cluster has no replicas to service the query.
    NoClusterReplicasAvailable {
        name: String,
        is_managed: bool,
    },
    /// The named operation cannot be run in a transaction.
    OperationProhibitsTransaction(String),
    /// The named operation requires an active transaction.
    OperationRequiresTransaction(String),
    /// An error occurred while planning the statement.
    PlanError(PlanError),
    /// The named prepared statement already exists.
    PreparedStatementExists(String),
    /// Wrapper around a parsing error.
    ParseError(mz_sql_parser::parser::ParserStatementError),
    /// The transaction is in read-only mode.
    ReadOnlyTransaction,
    /// The transaction is in read-only mode and a read has already occurred.
    ReadWriteUnavailable,
    /// The recursion limit of some operation was exceeded.
    RecursionLimit(RecursionLimitError),
    /// A query in a transaction referenced a relation outside the first query's
    /// time domain.
    RelationOutsideTimeDomain {
        relations: Vec<String>,
        names: Vec<String>,
    },
    /// A query tried to create more resources than is allowed in the system configuration.
    ResourceExhaustion {
        resource_type: String,
        limit_name: String,
        desired: String,
        limit: String,
        current: String,
    },
    /// Result size of a query is too large.
    ResultSize(String),
    /// The specified feature is not permitted in safe mode.
    SafeModeViolation(String),
    /// The current transaction had the wrong set of write locks.
    WrongSetOfLocks,
    /// Waiting on a query timed out.
    ///
    /// Note this differs slightly from PG's implementation/semantics.
    StatementTimeout,
    /// The user canceled the query
    Canceled,
    /// An idle session in a transaction has timed out.
    IdleInTransactionSessionTimeout,
    /// The transaction is in single-subscribe mode.
    SubscribeOnlyTransaction,
    /// An error occurred in the optimizer.
    Optimizer(OptimizerError),
    /// A query depends on items which are not allowed to be referenced from the current cluster.
    UnallowedOnCluster {
        depends_on: SmallVec<[String; 2]>,
        cluster: String,
    },
    /// A user tried to perform an action that they were unauthorized to do.
    Unauthorized(rbac::UnauthorizedError),
    /// The named cursor does not exist.
    UnknownCursor(String),
    /// The named role does not exist.
    UnknownLoginRole(String),
    /// The named prepared statement does not exist.
    UnknownPreparedStatement(String),
    /// The named cluster replica does not exist.
    UnknownClusterReplica {
        cluster_name: String,
        replica_name: String,
    },
    /// The named setting does not exist.
    UnrecognizedConfigurationParam(String),
    /// A generic error occurred.
    //
    // TODO(benesch): convert all those errors to structured errors.
    Unstructured(anyhow::Error),
    /// The named feature is not supported and will (probably) not be.
    Unsupported(&'static str),
    /// Some feature isn't available for a (potentially opaque) reason.
    /// For example, in cloud, Self-Managed auth features aren't available,
    /// but we don't want to mention self-managed auth.
    UnavailableFeature {
        feature: String,
        docs: Option<String>,
    },
    /// Attempted to read from log sources without selecting a target replica.
    UntargetedLogRead {
        log_names: Vec<String>,
    },
    /// The transaction is in write-only mode.
    WriteOnlyTransaction,
    /// The transaction can only execute a single statement.
    SingleStatementTransaction,
    /// The transaction can only execute simple DDL.
    DDLOnlyTransaction,
    /// Another session modified the Catalog while this transaction was open.
    DDLTransactionRace,
    /// Used to prevent us from durably committing state while a DDL transaction is open, should
    /// never be returned to the user.
    TransactionDryRun {
        /// New operations that were run in the transaction.
        new_ops: Vec<crate::catalog::Op>,
        /// New resulting `CatalogState`.
        new_state: crate::catalog::CatalogState,
    },
    /// An error occurred in the storage layer
    Storage(mz_storage_types::controller::StorageError<mz_repr::Timestamp>),
    /// An error occurred in the compute layer
    Compute(anyhow::Error),
    /// An error in the orchestrator layer
    Orchestrator(anyhow::Error),
    /// A statement tried to drop a role that had dependent objects.
    ///
    /// The map keys are role names and values are detailed error messages.
    DependentObject(BTreeMap<String, Vec<String>>),
    /// When performing an `ALTER` of some variety, re-planning the statement
    /// errored.
    InvalidAlter(&'static str, PlanError),
    /// An error occurred while validating a connection.
    ConnectionValidation(ConnectionValidationError),
    /// We refuse to create the materialized view, because it would never be refreshed, so it would
    /// never be queryable. This can happen when the only specified refreshes are further back in
    /// the past than the initial compaction window of the materialized view.
    MaterializedViewWouldNeverRefresh(Timestamp, Timestamp),
    /// A CREATE MATERIALIZED VIEW statement tried to acquire a read hold at a REFRESH AT time,
    /// but was unable to get a precise read hold.
    InputNotReadableAtRefreshAtTime(Timestamp, Antichain<Timestamp>),
    /// A humanized version of [`StorageError::RtrTimeout`].
    RtrTimeout(String),
    /// A humanized version of [`StorageError::RtrDropFailure`].
    RtrDropFailure(String),
    /// The collection requested to be sinked cannot be read at any timestamp
    UnreadableSinkCollection,
    /// User sessions have been blocked.
    UserSessionsDisallowed,
    /// This user session has been denied by a NetworkPolicy.
    NetworkPolicyDenied(NetworkPolicyError),
    /// Something attempted a write (to catalog, storage, tables, etc.) while in
    /// read-only mode.
    ReadOnly,
    /// An `ALTER CLUSTER` statement was canceled because its provided timeout lapsed.
    AlterClusterTimeout,
    /// Attempted to alter a cluster that still has pending replica updates.
    AlterClusterWhilePendingReplicas,
    /// Authentication failed for the session.
    AuthenticationError(AuthenticationError),
    /// Schema of a replacement is incompatible with the target.
    ReplacementSchemaMismatch(RelationDescDiff),
    /// Attempt to apply a replacement to a sealed materialized view.
    ReplaceMaterializedViewSealed {
        name: String,
    },
}

#[derive(Debug, thiserror::Error)]
pub enum AuthenticationError {
    #[error("invalid credentials")]
    InvalidCredentials,
    #[error("role is not allowed to login")]
    NonLogin,
    #[error("role does not exist")]
    RoleNotFound,
    #[error("password is required")]
    PasswordRequired,
}
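
// Illustrative note (not in the original source): the `thiserror` derive above makes each
// variant's `#[error(..)]` string its `Display` output, e.g. (hypothetical assertion)
//
//     assert_eq!(AuthenticationError::NonLogin.to_string(), "role is not allowed to login");
//
// When wrapped in `AdapterError::AuthenticationError(..)`, `code()` below maps
// `InvalidCredentials` to `SqlState::INVALID_PASSWORD` and every other variant to
// `SqlState::INVALID_AUTHORIZATION_SPECIFICATION`.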

impl AdapterError {
    pub fn into_response(self, severity: Severity) -> ErrorResponse {
        ErrorResponse {
            severity,
            code: self.code(),
            message: self.to_string(),
            detail: self.detail(),
            hint: self.hint(),
            position: self.position(),
        }
    }

    pub fn position(&self) -> Option<usize> {
        match self {
            AdapterError::ParseError(err) => Some(err.error.pos),
            _ => None,
        }
    }

    /// Reports additional details about the error, if any are available.
    pub fn detail(&self) -> Option<String> {
        match self {
            AdapterError::AmbiguousSystemColumnReference => {
                Some("This is a current limitation in Materialize".into())
            }
            AdapterError::Catalog(c) => c.detail(),
            AdapterError::Eval(e) => e.detail(),
            AdapterError::RelationOutsideTimeDomain { relations, names } => Some(format!(
                "The following relations in the query are outside the transaction's time domain:\n{}\n{}",
                relations
                    .iter()
                    .map(|r| r.quoted().to_string())
                    .collect::<Vec<_>>()
                    .join("\n"),
                match names.is_empty() {
                    true => "No relations are available.".to_string(),
                    false => format!(
                        "Only the following relations are available:\n{}",
                        names
                            .iter()
                            .map(|name| name.quoted().to_string())
                            .collect::<Vec<_>>()
                            .join("\n")
                    ),
                }
            )),
            AdapterError::SourceOrSinkSizeRequired { .. } => Some(
                "Either specify the cluster that will maintain this object via IN CLUSTER or \
                specify size via SIZE option."
                    .into(),
            ),
            AdapterError::InvalidTableMutationSelection {
                object_name,
                object_type,
            } => Some(format!(
                "{object_type} '{}' may not be used in this operation; \
                     the selection may refer to views and materialized views, but transitive \
                     dependencies must not include sources or source-export tables",
                object_name.quoted()
            )),
            AdapterError::SafeModeViolation(_) => Some(
                "The Materialize server you are connected to is running in \
                 safe mode, which limits the features that are available."
                    .into(),
            ),
            AdapterError::IntrospectionDisabled { log_names }
            | AdapterError::UntargetedLogRead { log_names } => Some(format!(
                "The query references the following log sources:\n    {}",
                log_names.join("\n    "),
            )),
            AdapterError::InvalidLogDependency { log_names, .. } => Some(format!(
                "The object depends on the following log sources:\n    {}",
                log_names.join("\n    "),
            )),
            AdapterError::PlanError(e) => e.detail(),
            AdapterError::Unauthorized(unauthorized) => unauthorized.detail(),
            AdapterError::DependentObject(dependent_objects) => Some(
                dependent_objects
                    .iter()
                    .map(|(role_name, err_msgs)| {
                        err_msgs
                            .iter()
                            .map(|err_msg| format!("{role_name}: {err_msg}"))
                            .join("\n")
                    })
                    .join("\n"),
            ),
            AdapterError::Storage(storage_error) => storage_error
                .source()
                .map(|source_error| source_error.to_string_with_causes()),
            AdapterError::ReadOnlyTransaction => Some(
                "SELECT queries cannot be combined with other query types, including SUBSCRIBE."
                    .into(),
            ),
            AdapterError::InvalidAlter(_, e) => e.detail(),
            AdapterError::Optimizer(e) => e.detail(),
            AdapterError::ConnectionValidation(e) => e.detail(),
            AdapterError::MaterializedViewWouldNeverRefresh(last_refresh, earliest_possible) => {
                Some(format!(
                    "The specified last refresh is at {}, while the earliest possible time to compute the materialized \
                    view is {}.",
                    last_refresh, earliest_possible,
                ))
            }
            AdapterError::UnallowedOnCluster { cluster, .. } => {
                (cluster == MZ_CATALOG_SERVER_CLUSTER.name).then(|| {
                    format!(
                        "The transaction is executing on the \
                        {cluster} cluster, maybe having been routed \
                        there by the first statement in the transaction."
                    )
                })
            }
            AdapterError::InputNotReadableAtRefreshAtTime(oracle_read_ts, least_valid_read) => {
                Some(format!(
                    "The requested REFRESH AT time is {}, \
                    but not all input collections are readable earlier than [{}].",
                    oracle_read_ts,
                    if least_valid_read.len() == 1 {
                        format!(
                            "{}",
                            least_valid_read
                                .as_option()
                                .expect("antichain contains exactly 1 timestamp")
                        )
                    } else {
                        // This can't occur currently
                        format!("{:?}", least_valid_read)
                    }
                ))
            }
            AdapterError::RtrTimeout(name) => Some(format!(
                "{name} failed to ingest data up to the real-time recency point"
            )),
            AdapterError::RtrDropFailure(name) => Some(format!(
                "{name} dropped before ingesting data to the real-time recency point"
            )),
            AdapterError::UserSessionsDisallowed => {
                Some("Your organization has been blocked. Please contact support.".to_string())
            }
            AdapterError::NetworkPolicyDenied(reason) => Some(format!("{reason}.")),
            AdapterError::ReplacementSchemaMismatch(diff) => {
                let mut lines: Vec<_> = diff.column_diffs.iter().map(|(idx, diff)| {
                    let pos = idx + 1;
                    match diff {
                        ColumnDiff::Missing { name } => {
                            let name = name.as_str().quoted();
                            format!("missing column {name} at position {pos}")
                        }
                        ColumnDiff::Extra { name } => {
                            let name = name.as_str().quoted();
                            format!("extra column {name} at position {pos}")
                        }
                        ColumnDiff::TypeMismatch { name, left, right } => {
                            let name = name.as_str().quoted();
                            format!("column {name} at position {pos}: type mismatch (target: {left:?}, replacement: {right:?})")
                        }
                        ColumnDiff::NullabilityMismatch { name, left, right } => {
                            let name = name.as_str().quoted();
                            let left = if *left { "NULL" } else { "NOT NULL" };
                            let right = if *right { "NULL" } else { "NOT NULL" };
                            format!("column {name} at position {pos}: nullability mismatch (target: {left}, replacement: {right})")
                        }
                        ColumnDiff::NameMismatch { left, right } => {
                            let left = left.as_str().quoted();
                            let right = right.as_str().quoted();
                            format!("column at position {pos}: name mismatch (target: {left}, replacement: {right})")
                        }
                    }
                }).collect();

                if let Some(KeyDiff { left, right }) = &diff.key_diff {
                    let format_keys = |keys: &BTreeSet<Vec<ColumnName>>| {
                        if keys.is_empty() {
                            "(none)".to_string()
                        } else {
                            keys.iter()
                                .map(|key| {
                                    let cols = key.iter().map(|c| c.as_str()).join(", ");
                                    format!("{{{cols}}}")
                                })
                                .join(", ")
                        }
                    };
                    lines.push(format!(
                        "keys differ (target: {}, replacement: {})",
                        format_keys(left),
                        format_keys(right)
                    ));
                }
                Some(lines.join("\n"))
            }
            AdapterError::ReplaceMaterializedViewSealed { .. } => Some(
                "The materialized view has already computed its output until the end of time, \
                 so replacing its definition would have no effect."
                    .into(),
            ),
            _ => None,
        }
    }

    /// Reports a hint for the user about how the error could be fixed.
    pub fn hint(&self) -> Option<String> {
        match self {
            AdapterError::AmbiguousSystemColumnReference => Some(
                "Rewrite the view to refer to all columns by name. Expand all wildcards and \
                convert all NATURAL JOINs to USING joins."
                    .to_string(),
            ),
            AdapterError::Catalog(c) => c.hint(),
            AdapterError::Eval(e) => e.hint(),
            AdapterError::InvalidClusterReplicaAz { expected, az: _ } => {
                Some(if expected.is_empty() {
                    "No availability zones configured; do not specify AVAILABILITY ZONE".into()
                } else {
                    format!("Valid availability zones are: {}", expected.join(", "))
                })
            }
            AdapterError::InvalidStorageClusterSize { expected, .. } => {
                Some(format!("Valid sizes are: {}", expected.join(", ")))
            }
            AdapterError::SourceOrSinkSizeRequired { expected } => Some(format!(
                "Try choosing one of the smaller sizes to start. Available sizes: {}",
                expected.join(", ")
            )),
            AdapterError::NoClusterReplicasAvailable { is_managed, .. } => {
                Some(if *is_managed {
                    "Use ALTER CLUSTER to adjust the replication factor of the cluster. \
                    Example: `ALTER CLUSTER <cluster-name> SET (REPLICATION FACTOR 1)`".into()
                } else {
                    "Use CREATE CLUSTER REPLICA to attach cluster replicas to the cluster".into()
                })
            }
            AdapterError::UntargetedLogRead { .. } => Some(
                "Use `SET cluster_replica = <replica-name>` to target a specific replica in the \
                 active cluster. Note that subsequent queries will only be answered by \
                 the selected replica, which might reduce availability. To undo the replica \
                 selection, use `RESET cluster_replica`."
                    .into(),
            ),
            AdapterError::ResourceExhaustion { resource_type, .. } => Some(format!(
                "Drop an existing {resource_type} or contact support to request a limit increase."
            )),
            AdapterError::StatementTimeout => Some(
                "Consider increasing the maximum allowed statement duration for this session by \
                 setting the statement_timeout session variable. For example, `SET \
                 statement_timeout = '120s'`."
                    .into(),
            ),
            AdapterError::PlanError(e) => e.hint(),
            AdapterError::UnallowedOnCluster { cluster, .. } => {
                (cluster != MZ_CATALOG_SERVER_CLUSTER.name).then(||
                    "Use `SET CLUSTER = <cluster-name>` to change your cluster and re-run the query."
                    .to_string()
                )
            }
            AdapterError::InvalidAlter(_, e) => e.hint(),
            AdapterError::Optimizer(e) => e.hint(),
            AdapterError::ConnectionValidation(e) => e.hint(),
            AdapterError::InputNotReadableAtRefreshAtTime(_, _) => Some(
                "You can use `REFRESH AT greatest(mz_now(), <explicit timestamp>)` to refresh \
                 either at the explicitly specified timestamp, or now if the given timestamp would \
                 be in the past.".to_string()
            ),
            AdapterError::AlterClusterTimeout => Some(
                "Consider increasing the timeout duration in the alter cluster statement.".into(),
            ),
            AdapterError::DDLTransactionRace => Some(
                "Currently, DDL transactions fail when any other DDL happens concurrently, \
                 even on unrelated schemas/clusters.".into()
            ),
            AdapterError::CollectionUnreadable { .. } => Some(
                "This could be because the collection has recently been dropped.".into()
            ),
            _ => None,
        }
    }

    pub fn code(&self) -> SqlState {
        // TODO(benesch): we should only use `SqlState::INTERNAL_ERROR` for
        // those errors that are truly internal errors. At the moment we have
        // various classes of uncategorized errors that use this error code
        // inappropriately.
        match self {
            // DATA_EXCEPTION to match what Postgres returns for degenerate
            // range bounds
            AdapterError::AbsurdSubscribeBounds { .. } => SqlState::DATA_EXCEPTION,
            AdapterError::AmbiguousSystemColumnReference => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::Catalog(e) => match &e.kind {
                mz_catalog::memory::error::ErrorKind::VarError(e) => match e {
                    VarError::ConstrainedParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
                    VarError::FixedValueParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
                    VarError::InvalidParameterType { .. } => SqlState::INVALID_PARAMETER_VALUE,
                    VarError::InvalidParameterValue { .. } => SqlState::INVALID_PARAMETER_VALUE,
                    VarError::ReadOnlyParameter(_) => SqlState::CANT_CHANGE_RUNTIME_PARAM,
                    VarError::UnknownParameter(_) => SqlState::UNDEFINED_OBJECT,
                    VarError::RequiresUnsafeMode { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
                    VarError::RequiresFeatureFlag { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
                },
                _ => SqlState::INTERNAL_ERROR,
            },
            AdapterError::ChangedPlan(_) => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::DuplicateCursor(_) => SqlState::DUPLICATE_CURSOR,
            AdapterError::Eval(EvalError::CharacterNotValidForEncoding(_)) => {
                SqlState::PROGRAM_LIMIT_EXCEEDED
            }
            AdapterError::Eval(EvalError::CharacterTooLargeForEncoding(_)) => {
                SqlState::PROGRAM_LIMIT_EXCEEDED
            }
            AdapterError::Eval(EvalError::LengthTooLarge) => SqlState::PROGRAM_LIMIT_EXCEEDED,
            AdapterError::Eval(EvalError::NullCharacterNotPermitted) => {
                SqlState::PROGRAM_LIMIT_EXCEEDED
            }
            AdapterError::Eval(_) => SqlState::INTERNAL_ERROR,
            AdapterError::Explain(_) => SqlState::INTERNAL_ERROR,
            AdapterError::IdExhaustionError => SqlState::INTERNAL_ERROR,
            AdapterError::Internal(_) => SqlState::INTERNAL_ERROR,
            AdapterError::IntrospectionDisabled { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::InvalidLogDependency { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::InvalidClusterReplicaAz { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::InvalidSetIsolationLevel => SqlState::ACTIVE_SQL_TRANSACTION,
            AdapterError::InvalidSetCluster => SqlState::ACTIVE_SQL_TRANSACTION,
            AdapterError::InvalidStorageClusterSize { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::SourceOrSinkSizeRequired { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::InvalidTableMutationSelection { .. } => {
                SqlState::INVALID_TRANSACTION_STATE
            }
            AdapterError::ConstraintViolation(NotNullViolation(_)) => SqlState::NOT_NULL_VIOLATION,
            AdapterError::ConcurrentClusterDrop => SqlState::INVALID_TRANSACTION_STATE,
            AdapterError::ConcurrentDependencyDrop { .. } => SqlState::UNDEFINED_OBJECT,
            AdapterError::CollectionUnreadable { .. } => SqlState::NO_DATA_FOUND,
            AdapterError::NoClusterReplicasAvailable { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::OperationProhibitsTransaction(_) => SqlState::ACTIVE_SQL_TRANSACTION,
            AdapterError::OperationRequiresTransaction(_) => SqlState::NO_ACTIVE_SQL_TRANSACTION,
            AdapterError::ParseError(_) => SqlState::SYNTAX_ERROR,
            AdapterError::PlanError(PlanError::InvalidSchemaName) => SqlState::INVALID_SCHEMA_NAME,
            AdapterError::PlanError(PlanError::ColumnAlreadyExists { .. }) => {
                SqlState::DUPLICATE_COLUMN
            }
            AdapterError::PlanError(PlanError::UnknownParameter(_)) => {
                SqlState::UNDEFINED_PARAMETER
            }
            AdapterError::PlanError(PlanError::ParameterNotAllowed(_)) => {
                SqlState::UNDEFINED_PARAMETER
            }
            AdapterError::PlanError(_) => SqlState::INTERNAL_ERROR,
            AdapterError::PreparedStatementExists(_) => SqlState::DUPLICATE_PSTATEMENT,
            AdapterError::ReadOnlyTransaction => SqlState::READ_ONLY_SQL_TRANSACTION,
            AdapterError::ReadWriteUnavailable => SqlState::INVALID_TRANSACTION_STATE,
            AdapterError::SingleStatementTransaction => SqlState::INVALID_TRANSACTION_STATE,
            AdapterError::WrongSetOfLocks => SqlState::LOCK_NOT_AVAILABLE,
            AdapterError::StatementTimeout => SqlState::QUERY_CANCELED,
            AdapterError::Canceled => SqlState::QUERY_CANCELED,
            AdapterError::IdleInTransactionSessionTimeout => {
                SqlState::IDLE_IN_TRANSACTION_SESSION_TIMEOUT
            }
            AdapterError::RecursionLimit(_) => SqlState::INTERNAL_ERROR,
            AdapterError::RelationOutsideTimeDomain { .. } => SqlState::INVALID_TRANSACTION_STATE,
            AdapterError::ResourceExhaustion { .. } => SqlState::INSUFFICIENT_RESOURCES,
            AdapterError::ResultSize(_) => SqlState::OUT_OF_MEMORY,
            AdapterError::SafeModeViolation(_) => SqlState::INTERNAL_ERROR,
            AdapterError::SubscribeOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
            AdapterError::Optimizer(e) => match e {
                OptimizerError::PlanError(PlanError::InvalidSchemaName) => {
                    SqlState::INVALID_SCHEMA_NAME
                }
                OptimizerError::PlanError(PlanError::ColumnAlreadyExists { .. }) => {
                    SqlState::DUPLICATE_COLUMN
                }
                OptimizerError::PlanError(PlanError::UnknownParameter(_)) => {
                    SqlState::UNDEFINED_PARAMETER
                }
                OptimizerError::PlanError(PlanError::ParameterNotAllowed(_)) => {
                    SqlState::UNDEFINED_PARAMETER
                }
                OptimizerError::PlanError(_) => SqlState::INTERNAL_ERROR,
                OptimizerError::RecursionLimitError(e) => {
                    AdapterError::RecursionLimit(e.clone()).code() // Delegate to outer
                }
                OptimizerError::Internal(s) => {
                    AdapterError::Internal(s.clone()).code() // Delegate to outer
                }
                OptimizerError::EvalError(e) => {
                    AdapterError::Eval(e.clone()).code() // Delegate to outer
                }
                OptimizerError::TransformError(_) => SqlState::INTERNAL_ERROR,
                OptimizerError::UnmaterializableFunction(_) => SqlState::FEATURE_NOT_SUPPORTED,
                OptimizerError::UncallableFunction { .. } => SqlState::FEATURE_NOT_SUPPORTED,
                OptimizerError::UnsupportedTemporalExpression(_) => SqlState::FEATURE_NOT_SUPPORTED,
                // This should be handled by peek optimization, so it's an internal error if it
                // reaches the user.
                OptimizerError::InternalUnsafeMfpPlan(_) => SqlState::INTERNAL_ERROR,
            },
            AdapterError::UnallowedOnCluster { .. } => {
                SqlState::S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
            }
            AdapterError::Unauthorized(_) => SqlState::INSUFFICIENT_PRIVILEGE,
            AdapterError::UnknownCursor(_) => SqlState::INVALID_CURSOR_NAME,
            AdapterError::UnknownPreparedStatement(_) => SqlState::UNDEFINED_PSTATEMENT,
            AdapterError::UnknownLoginRole(_) => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
            AdapterError::UnknownClusterReplica { .. } => SqlState::UNDEFINED_OBJECT,
            AdapterError::UnrecognizedConfigurationParam(_) => SqlState::UNDEFINED_OBJECT,
            AdapterError::Unsupported(..) => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::UnavailableFeature { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::Unstructured(_) => SqlState::INTERNAL_ERROR,
            AdapterError::UntargetedLogRead { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::DDLTransactionRace => SqlState::T_R_SERIALIZATION_FAILURE,
            AdapterError::TransactionDryRun { .. } => SqlState::T_R_SERIALIZATION_FAILURE,
            // It's not immediately clear which error code to use here because a
            // "write-only transaction", "single table write transaction", or "ddl only
            // transaction" are not things in Postgres. This error code is the generic "bad txn
            // thing" code, so it's probably the best choice.
            AdapterError::WriteOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
            AdapterError::DDLOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
            AdapterError::Storage(_) | AdapterError::Compute(_) | AdapterError::Orchestrator(_) => {
                SqlState::INTERNAL_ERROR
            }
            AdapterError::DependentObject(_) => SqlState::DEPENDENT_OBJECTS_STILL_EXIST,
            AdapterError::InvalidAlter(_, _) => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::ConnectionValidation(_) => SqlState::SYSTEM_ERROR,
            // `DATA_EXCEPTION`, similarly to `AbsurdSubscribeBounds`.
            AdapterError::MaterializedViewWouldNeverRefresh(_, _) => SqlState::DATA_EXCEPTION,
            AdapterError::InputNotReadableAtRefreshAtTime(_, _) => SqlState::DATA_EXCEPTION,
            AdapterError::RtrTimeout(_) => SqlState::QUERY_CANCELED,
            AdapterError::RtrDropFailure(_) => SqlState::UNDEFINED_OBJECT,
            AdapterError::UnreadableSinkCollection => SqlState::from_code("MZ009"),
            AdapterError::UserSessionsDisallowed => SqlState::from_code("MZ010"),
            AdapterError::NetworkPolicyDenied(_) => SqlState::from_code("MZ011"),
            // In read-only mode all transactions are implicitly read-only
            // transactions.
            AdapterError::ReadOnly => SqlState::READ_ONLY_SQL_TRANSACTION,
            AdapterError::AlterClusterTimeout => SqlState::QUERY_CANCELED,
            AdapterError::AlterClusterWhilePendingReplicas => SqlState::OBJECT_IN_USE,
            AdapterError::ReplacementSchemaMismatch(_) => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::AuthenticationError(AuthenticationError::InvalidCredentials) => {
                SqlState::INVALID_PASSWORD
            }
            AdapterError::AuthenticationError(_) => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
            AdapterError::ReplaceMaterializedViewSealed { .. } => {
                SqlState::OBJECT_NOT_IN_PREREQUISITE_STATE
            }
        }
    }

    pub fn internal<E: std::fmt::Display>(context: &str, e: E) -> AdapterError {
        AdapterError::Internal(format!("{context}: {e}"))
    }

    // We don't want the following error conversions to `ConcurrentDependencyDrop` to happen
    // automatically, because it might depend on the context whether `ConcurrentDependencyDrop`
    // is appropriate, so we want to make the conversion target explicit at the call site.
    // For example, maybe we get an `InstanceMissing` if the user specifies a non-existing cluster,
    // in which case `ConcurrentDependencyDrop` would not be appropriate.
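    //
    // A minimal call-site sketch (not from the original source; the controller variable and
    // lookup method names are hypothetical), showing the explicit conversion via `map_err`:
    //
    //     let instance = compute_controller
    //         .instance_ref(cluster_id)
    //         .map_err(AdapterError::concurrent_dependency_drop_from_instance_missing)?;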

    pub fn concurrent_dependency_drop_from_instance_missing(e: InstanceMissing) -> Self {
        AdapterError::ConcurrentDependencyDrop {
            dependency_kind: "cluster",
            dependency_id: e.0.to_string(),
        }
    }

    pub fn concurrent_dependency_drop_from_collection_missing(e: CollectionMissing) -> Self {
        AdapterError::ConcurrentDependencyDrop {
            dependency_kind: "collection",
            dependency_id: e.0.to_string(),
        }
    }

    pub fn concurrent_dependency_drop_from_collection_lookup_error(
        e: CollectionLookupError,
        compute_instance: ComputeInstanceId,
    ) -> Self {
        match e {
            CollectionLookupError::InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "cluster",
                dependency_id: id.to_string(),
            },
            CollectionLookupError::CollectionMissing(id) => {
                AdapterError::ConcurrentDependencyDrop {
                    dependency_kind: "collection",
                    dependency_id: id.to_string(),
                }
            }
            CollectionLookupError::InstanceShutDown => AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "cluster",
                dependency_id: compute_instance.to_string(),
            },
        }
    }

    pub fn concurrent_dependency_drop_from_watch_set_install_error(
        e: compute_error::CollectionLookupError,
    ) -> Self {
        match e {
            compute_error::CollectionLookupError::InstanceMissing(id) => {
                AdapterError::ConcurrentDependencyDrop {
                    dependency_kind: "cluster",
                    dependency_id: id.to_string(),
                }
            }
            compute_error::CollectionLookupError::CollectionMissing(id) => {
                AdapterError::ConcurrentDependencyDrop {
                    dependency_kind: "collection",
                    dependency_id: id.to_string(),
                }
            }
        }
    }

    pub fn concurrent_dependency_drop_from_instance_peek_error(
        e: mz_compute_client::controller::instance_client::PeekError,
        compute_instance: ComputeInstanceId,
    ) -> AdapterError {
        use mz_compute_client::controller::instance_client::PeekError::*;
        match e {
            ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "replica",
                dependency_id: id.to_string(),
            },
            InstanceShutDown => AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "cluster",
                dependency_id: compute_instance.to_string(),
            },
            e @ ReadHoldIdMismatch(_) => AdapterError::internal("instance peek error", e),
            e @ ReadHoldInsufficient(_) => AdapterError::internal("instance peek error", e),
        }
    }

    pub fn concurrent_dependency_drop_from_peek_error(
        e: mz_compute_client::controller::error::PeekError,
    ) -> AdapterError {
        use mz_compute_client::controller::error::PeekError::*;
        match e {
            InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "cluster",
                dependency_id: id.to_string(),
            },
            CollectionMissing(id) => AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "collection",
                dependency_id: id.to_string(),
            },
            ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "replica",
                dependency_id: id.to_string(),
            },
            e @ SinceViolation(_) => AdapterError::internal("peek error", e),
        }
    }

    pub fn concurrent_dependency_drop_from_dataflow_creation_error(
        e: compute_error::DataflowCreationError,
    ) -> Self {
        use compute_error::DataflowCreationError::*;
        match e {
            InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "cluster",
                dependency_id: id.to_string(),
            },
            CollectionMissing(id) => AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "collection",
                dependency_id: id.to_string(),
            },
            ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "replica",
                dependency_id: id.to_string(),
            },
            MissingAsOf | SinceViolation(..) | EmptyAsOfForSubscribe | EmptyAsOfForCopyTo => {
                AdapterError::internal("dataflow creation error", e)
            }
        }
    }
}

impl fmt::Display for AdapterError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            AdapterError::AbsurdSubscribeBounds { as_of, up_to } => {
                write!(
                    f,
                    "subscription lower bound (`AS OF`) is greater than its upper bound (`UP TO`): \
                     {as_of} > {up_to}",
                )
            }
            AdapterError::AmbiguousSystemColumnReference => {
                write!(
                    f,
                    "cannot use wildcard expansions or NATURAL JOINs in a view that depends on \
                    system objects"
                )
            }
            AdapterError::ChangedPlan(e) => write!(f, "{}", e),
            AdapterError::Catalog(e) => e.fmt(f),
            AdapterError::DuplicateCursor(name) => {
                write!(f, "cursor {} already exists", name.quoted())
            }
            AdapterError::Eval(e) => e.fmt(f),
            AdapterError::Explain(e) => e.fmt(f),
            AdapterError::IdExhaustionError => f.write_str("ID allocator exhausted all valid IDs"),
            AdapterError::Internal(e) => write!(f, "internal error: {}", e),
            AdapterError::IntrospectionDisabled { .. } => write!(
                f,
                "cannot read log sources of replica with disabled introspection"
            ),
            AdapterError::InvalidLogDependency { object_type, .. } => {
                write!(f, "{object_type} objects cannot depend on log sources")
            }
            AdapterError::InvalidClusterReplicaAz { az, expected: _ } => {
                write!(f, "unknown cluster replica availability zone {az}",)
            }
            AdapterError::InvalidSetIsolationLevel => write!(
                f,
                "SET TRANSACTION ISOLATION LEVEL must be called before any query"
            ),
            AdapterError::InvalidSetCluster => {
                write!(f, "SET cluster cannot be called in an active transaction")
            }
            AdapterError::InvalidStorageClusterSize { size, .. } => {
                write!(f, "unknown source size {size}")
            }
            AdapterError::SourceOrSinkSizeRequired { .. } => {
                write!(f, "must specify either cluster or size option")
            }
            AdapterError::InvalidTableMutationSelection { .. } => {
                write!(
                    f,
                    "invalid selection: operation may only (transitively) refer to non-source, non-system tables"
                )
            }
            AdapterError::ReplaceMaterializedViewSealed { name } => {
                write!(
                    f,
                    "materialized view {name} is sealed and thus cannot be replaced"
                )
            }
            AdapterError::ConstraintViolation(not_null_violation) => {
                write!(f, "{}", not_null_violation)
            }
            AdapterError::ConcurrentClusterDrop => {
                write!(f, "the transaction's active cluster has been dropped")
            }
            AdapterError::ConcurrentDependencyDrop {
                dependency_kind,
                dependency_id,
            } => {
                write!(f, "{dependency_kind} '{dependency_id}' was dropped")
            }
            AdapterError::CollectionUnreadable { id } => {
                write!(f, "collection '{id}' is not readable at any timestamp")
            }
            AdapterError::NoClusterReplicasAvailable { name, .. } => {
                write!(
                    f,
                    "CLUSTER {} has no replicas available to service request",
                    name.quoted()
                )
            }
            AdapterError::OperationProhibitsTransaction(op) => {
                write!(f, "{} cannot be run inside a transaction block", op)
            }
            AdapterError::OperationRequiresTransaction(op) => {
                write!(f, "{} can only be used in transaction blocks", op)
            }
            AdapterError::ParseError(e) => e.fmt(f),
            AdapterError::PlanError(e) => e.fmt(f),
            AdapterError::PreparedStatementExists(name) => {
                write!(f, "prepared statement {} already exists", name.quoted())
            }
            AdapterError::ReadOnlyTransaction => f.write_str("transaction in read-only mode"),
            AdapterError::SingleStatementTransaction => {
                f.write_str("this transaction can only execute a single statement")
            }
            AdapterError::ReadWriteUnavailable => {
                f.write_str("transaction read-write mode must be set before any query")
            }
            AdapterError::WrongSetOfLocks => {
                write!(f, "internal error, wrong set of locks acquired")
            }
            AdapterError::StatementTimeout => {
                write!(f, "canceling statement due to statement timeout")
            }
            AdapterError::Canceled => {
                write!(f, "canceling statement due to user request")
            }
            AdapterError::IdleInTransactionSessionTimeout => {
                write!(
                    f,
                    "terminating connection due to idle-in-transaction timeout"
                )
            }
            AdapterError::RecursionLimit(e) => e.fmt(f),
            AdapterError::RelationOutsideTimeDomain { .. } => {
                write!(
                    f,
                    "Transactions can only reference objects in the same timedomain. \
                     See https://materialize.com/docs/sql/begin/#same-timedomain-error",
                )
            }
            AdapterError::ResourceExhaustion {
                resource_type,
                limit_name,
                desired,
                limit,
                current,
            } => {
                write!(
                    f,
                    "creating {resource_type} would violate {limit_name} limit (desired: {desired}, limit: {limit}, current: {current})"
                )
            }
            AdapterError::ResultSize(e) => write!(f, "{e}"),
            AdapterError::SafeModeViolation(feature) => {
                write!(f, "cannot create {} in safe mode", feature)
            }
            AdapterError::SubscribeOnlyTransaction => {
                f.write_str("SUBSCRIBE in transactions must be the only read statement")
            }
            AdapterError::Optimizer(e) => e.fmt(f),
            AdapterError::UnallowedOnCluster {
                depends_on,
                cluster,
            } => {
                let items = depends_on.into_iter().map(|item| item.quoted()).join(", ");
                write!(
                    f,
                    "querying the following items {items} is not allowed from the {} cluster",
                    cluster.quoted()
                )
            }
            AdapterError::Unauthorized(unauthorized) => {
                write!(f, "{unauthorized}")
            }
            AdapterError::UnknownCursor(name) => {
                write!(f, "cursor {} does not exist", name.quoted())
            }
            AdapterError::UnknownLoginRole(name) => {
                write!(f, "role {} does not exist", name.quoted())
            }
            AdapterError::Unsupported(features) => write!(f, "{} are not supported", features),
            AdapterError::Unstructured(e) => write!(f, "{}", e.display_with_causes()),
            AdapterError::WriteOnlyTransaction => f.write_str("transaction in write-only mode"),
            AdapterError::UnknownPreparedStatement(name) => {
                write!(f, "prepared statement {} does not exist", name.quoted())
            }
            AdapterError::UnknownClusterReplica {
                cluster_name,
                replica_name,
            } => write!(
                f,
                "cluster replica '{cluster_name}.{replica_name}' does not exist"
            ),
            AdapterError::UnrecognizedConfigurationParam(setting_name) => write!(
                f,
                "unrecognized configuration parameter {}",
                setting_name.quoted()
            ),
            AdapterError::UntargetedLogRead { .. } => {
                f.write_str("log source reads must target a replica")
            }
            AdapterError::DDLOnlyTransaction => f.write_str(
                "transactions which modify objects are restricted to just modifying objects",
            ),
            AdapterError::DDLTransactionRace => f.write_str(
                "another session modified the catalog while this DDL transaction was open",
            ),
            AdapterError::TransactionDryRun { .. } => f.write_str("transaction dry run"),
            AdapterError::Storage(e) => e.fmt(f),
            AdapterError::Compute(e) => e.fmt(f),
            AdapterError::Orchestrator(e) => e.fmt(f),
            AdapterError::DependentObject(dependent_objects) => {
                let role_str = if dependent_objects.keys().count() == 1 {
                    "role"
                } else {
                    "roles"
                };
                write!(
                    f,
                    "{role_str} \"{}\" cannot be dropped because some objects depend on it",
                    dependent_objects.keys().join(", ")
                )
            }
            AdapterError::InvalidAlter(t, e) => {
                write!(f, "invalid ALTER {t}: {e}")
            }
            AdapterError::ConnectionValidation(e) => e.fmt(f),
            AdapterError::MaterializedViewWouldNeverRefresh(_, _) => {
                write!(
                    f,
                    "all the specified refreshes of the materialized view would be too far in the past, and thus they \
                    would never happen"
                )
            }
            AdapterError::InputNotReadableAtRefreshAtTime(_, _) => {
                write!(
                    f,
                    "REFRESH AT requested for a time where not all the inputs are readable"
                )
            }
            AdapterError::RtrTimeout(_) => {
                write!(
                    f,
                    "timed out before ingesting the source's visible frontier when the real-time-recency query was issued"
                )
            }
            AdapterError::RtrDropFailure(_) => write!(
                f,
                "real-time source dropped before ingesting the upstream system's visible frontier"
            ),
            AdapterError::UnreadableSinkCollection => {
                write!(f, "collection is not readable at any time")
            }
            AdapterError::UserSessionsDisallowed => write!(f, "login blocked"),
            AdapterError::NetworkPolicyDenied(_) => write!(f, "session denied"),
            AdapterError::ReadOnly => write!(f, "cannot write in read-only mode"),
            AdapterError::AlterClusterTimeout => {
                write!(f, "canceling statement, provided timeout lapsed")
            }
            AdapterError::AuthenticationError(e) => {
                write!(f, "authentication error: {e}")
            }
            AdapterError::UnavailableFeature { feature, docs } => {
                write!(f, "{} is not supported in this environment.", feature)?;
                if let Some(docs) = docs {
                    write!(
                        f,
                        " For more information consult the documentation at {docs}"
                    )?;
                }
                Ok(())
            }
            AdapterError::AlterClusterWhilePendingReplicas => {
                write!(f, "cannot alter clusters with pending updates")
            }
            AdapterError::ReplacementSchemaMismatch(_) => {
                write!(f, "replacement schema differs from target schema")
            }
        }
    }
}

impl From<anyhow::Error> for AdapterError {
    fn from(e: anyhow::Error) -> AdapterError {
        match e.downcast::<PlanError>() {
            Ok(plan_error) => AdapterError::PlanError(plan_error),
            Err(e) => AdapterError::Unstructured(e),
        }
    }
}
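
// Illustrative note (not in the original source): because `anyhow::Error` preserves the
// concrete error type, an error that started life as a `PlanError` round-trips back to
// `AdapterError::PlanError` rather than `AdapterError::Unstructured`, e.g. (hypothetical)
//
//     let e = anyhow::Error::from(PlanError::InvalidSchemaName);
//     assert!(matches!(AdapterError::from(e), AdapterError::PlanError(_)));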

impl From<TryFromIntError> for AdapterError {
    fn from(e: TryFromIntError) -> AdapterError {
        AdapterError::Unstructured(e.into())
    }
}

impl From<TryFromDecimalError> for AdapterError {
    fn from(e: TryFromDecimalError) -> AdapterError {
        AdapterError::Unstructured(e.into())
    }
}

impl From<mz_catalog::memory::error::Error> for AdapterError {
    fn from(e: mz_catalog::memory::error::Error) -> AdapterError {
        AdapterError::Catalog(e)
    }
}

impl From<mz_catalog::durable::CatalogError> for AdapterError {
    fn from(e: mz_catalog::durable::CatalogError) -> Self {
        mz_catalog::memory::error::Error::from(e).into()
    }
}

impl From<mz_catalog::durable::DurableCatalogError> for AdapterError {
    fn from(e: mz_catalog::durable::DurableCatalogError) -> Self {
        mz_catalog::durable::CatalogError::from(e).into()
    }
}

impl From<EvalError> for AdapterError {
    fn from(e: EvalError) -> AdapterError {
        AdapterError::Eval(e)
    }
}

impl From<ExplainError> for AdapterError {
    fn from(e: ExplainError) -> AdapterError {
        match e {
            ExplainError::RecursionLimitError(e) => AdapterError::RecursionLimit(e),
            e => AdapterError::Explain(e),
        }
    }
}

impl From<mz_sql::catalog::CatalogError> for AdapterError {
    fn from(e: mz_sql::catalog::CatalogError) -> AdapterError {
        AdapterError::Catalog(mz_catalog::memory::error::Error::from(e))
    }
}

impl From<PlanError> for AdapterError {
    fn from(e: PlanError) -> AdapterError {
        match e {
            PlanError::UnknownCursor(name) => AdapterError::UnknownCursor(name),
            _ => AdapterError::PlanError(e),
        }
    }
}

impl From<OptimizerError> for AdapterError {
    fn from(e: OptimizerError) -> AdapterError {
        use OptimizerError::*;
        match e {
            PlanError(e) => Self::PlanError(e),
            RecursionLimitError(e) => Self::RecursionLimit(e),
            EvalError(e) => Self::Eval(e),
            InternalUnsafeMfpPlan(e) => Self::Internal(e),
            Internal(e) => Self::Internal(e),
            e => Self::Optimizer(e),
        }
    }
}

impl From<NotNullViolation> for AdapterError {
    fn from(e: NotNullViolation) -> AdapterError {
        AdapterError::ConstraintViolation(e)
    }
}

impl From<RecursionLimitError> for AdapterError {
    fn from(e: RecursionLimitError) -> AdapterError {
        AdapterError::RecursionLimit(e)
    }
}

impl From<oneshot::error::RecvError> for AdapterError {
    fn from(e: oneshot::error::RecvError) -> AdapterError {
        AdapterError::Unstructured(e.into())
    }
}

impl From<StorageError<mz_repr::Timestamp>> for AdapterError {
    fn from(e: StorageError<mz_repr::Timestamp>) -> Self {
        AdapterError::Storage(e)
    }
}

impl From<compute_error::InstanceExists> for AdapterError {
    fn from(e: compute_error::InstanceExists) -> Self {
        AdapterError::Compute(e.into())
    }
}

impl From<TimestampError> for AdapterError {
    fn from(e: TimestampError) -> Self {
        let e: EvalError = e.into();
        e.into()
    }
}

impl From<mz_sql_parser::parser::ParserStatementError> for AdapterError {
    fn from(e: mz_sql_parser::parser::ParserStatementError) -> Self {
        AdapterError::ParseError(e)
    }
}

impl From<VarError> for AdapterError {
    fn from(e: VarError) -> Self {
        let e: mz_catalog::memory::error::Error = e.into();
        e.into()
    }
}

impl From<rbac::UnauthorizedError> for AdapterError {
    fn from(e: rbac::UnauthorizedError) -> Self {
        AdapterError::Unauthorized(e)
    }
}

impl From<mz_sql_parser::ast::IdentError> for AdapterError {
    fn from(value: mz_sql_parser::ast::IdentError) -> Self {
        AdapterError::PlanError(PlanError::InvalidIdent(value))
    }
}

impl From<mz_pgwire_common::ConnectionError> for AdapterError {
    fn from(value: mz_pgwire_common::ConnectionError) -> Self {
        match value {
            mz_pgwire_common::ConnectionError::TooManyConnections { current, limit } => {
                AdapterError::ResourceExhaustion {
                    resource_type: "connection".into(),
                    limit_name: "max_connections".into(),
                    desired: (current + 1).to_string(),
                    limit: limit.to_string(),
                    current: current.to_string(),
                }
            }
        }
    }
}

impl From<NetworkPolicyError> for AdapterError {
    fn from(value: NetworkPolicyError) -> Self {
        AdapterError::NetworkPolicyDenied(value)
    }
}

impl From<ConnectionValidationError> for AdapterError {
    fn from(e: ConnectionValidationError) -> AdapterError {
        AdapterError::ConnectionValidation(e)
    }
}

impl Error for AdapterError {}
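
// A minimal sketch (not part of the original file) of how the pieces above fit together:
// constructing an error, checking its SQLSTATE, and rendering it as a pgwire response. The
// module name, test names, and use of the plain `#[test]` attribute are illustrative choices,
// not project conventions; the assertions mirror the match arms above.
#[cfg(test)]
mod sketch_tests {
    use mz_pgwire_common::Severity;
    use tokio_postgres::error::SqlState;

    use super::AdapterError;

    #[test]
    fn internal_errors_use_the_generic_sqlstate() {
        // `internal` concatenates the context and the cause into one message.
        let err = AdapterError::internal("refreshing catalog", "connection reset");
        assert_eq!(
            err.to_string(),
            "internal error: refreshing catalog: connection reset"
        );
        // Uncategorized internal errors map to XX000.
        assert_eq!(err.code(), SqlState::INTERNAL_ERROR);
        // They carry no extra detail or hint.
        assert!(err.detail().is_none());
        assert!(err.hint().is_none());
    }

    #[test]
    fn errors_render_as_pgwire_responses() {
        let err = AdapterError::StatementTimeout;
        // `into_response` copies the code, message, detail, hint, and position.
        let resp = err.into_response(Severity::Error);
        assert_eq!(resp.code, SqlState::QUERY_CANCELED);
        assert_eq!(resp.message, "canceling statement due to statement timeout");
        // `StatementTimeout` suggests raising `statement_timeout` as a hint.
        assert!(resp.hint.is_some());
    }
}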