mz_adapter/
error.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::error::Error;
12use std::fmt;
13use std::num::TryFromIntError;
14
15use dec::TryFromDecimalError;
16use itertools::Itertools;
17use mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER;
18use mz_compute_client::controller::error as compute_error;
19use mz_compute_client::controller::error::{CollectionLookupError, InstanceMissing};
20use mz_compute_client::controller::instance::PeekError;
21use mz_compute_types::ComputeInstanceId;
22use mz_expr::EvalError;
23use mz_ore::error::ErrorExt;
24use mz_ore::stack::RecursionLimitError;
25use mz_ore::str::StrExt;
26use mz_pgwire_common::{ErrorResponse, Severity};
27use mz_repr::adt::timestamp::TimestampError;
28use mz_repr::explain::ExplainError;
29use mz_repr::{NotNullViolation, Timestamp};
30use mz_sql::plan::PlanError;
31use mz_sql::rbac;
32use mz_sql::session::vars::VarError;
33use mz_storage_types::connections::ConnectionValidationError;
34use mz_storage_types::controller::StorageError;
35use mz_storage_types::errors::CollectionMissing;
36use smallvec::SmallVec;
37use timely::progress::Antichain;
38use tokio::sync::oneshot;
39use tokio_postgres::error::SqlState;
40
41use crate::coord::NetworkPolicyError;
42use crate::optimize::OptimizerError;
43
44/// Errors that can occur in the coordinator.
45#[derive(Debug)]
46pub enum AdapterError {
47    /// A `SUBSCRIBE` was requested whose `UP TO` bound precedes its `as_of` timestamp
48    AbsurdSubscribeBounds {
49        as_of: mz_repr::Timestamp,
50        up_to: mz_repr::Timestamp,
51    },
52    /// Attempted to use a potentially ambiguous column reference expression with a system table.
53    // We don't allow this until https://github.com/MaterializeInc/database-issues/issues/4824 is
54    // resolved because it prevents us from adding columns to system tables.
55    AmbiguousSystemColumnReference,
56    /// An error occurred in a catalog operation.
57    Catalog(mz_catalog::memory::error::Error),
58    /// 1. The cached plan or descriptor changed,
59    /// 2. or some dependency of a statement disappeared during sequencing.
60    /// TODO(ggevay): we should refactor 2. usages to use `ConcurrentDependencyDrop` instead
61    /// (e.g., in MV sequencing)
62    ChangedPlan(String),
63    /// The cursor already exists.
64    DuplicateCursor(String),
65    /// An error while evaluating an expression.
66    Eval(EvalError),
67    /// An error occurred while planning the statement.
68    Explain(ExplainError),
69    /// The ID allocator exhausted all valid IDs.
70    IdExhaustionError,
71    /// Unexpected internal state was encountered.
72    Internal(String),
73    /// Attempted to read from log sources of a replica with disabled introspection.
74    IntrospectionDisabled {
75        log_names: Vec<String>,
76    },
77    /// Attempted to create an object dependent on log sources that doesn't support
78    /// log dependencies.
79    InvalidLogDependency {
80        object_type: String,
81        log_names: Vec<String>,
82    },
83    /// No such cluster replica size has been configured.
84    InvalidClusterReplicaAz {
85        az: String,
86        expected: Vec<String>,
87    },
88    /// SET TRANSACTION ISOLATION LEVEL was called in the middle of a transaction.
89    InvalidSetIsolationLevel,
90    /// SET cluster was called in the middle of a transaction.
91    InvalidSetCluster,
92    /// No such storage instance size has been configured.
93    InvalidStorageClusterSize {
94        size: String,
95        expected: Vec<String>,
96    },
97    /// Creating a source or sink without specifying its size is forbidden.
98    SourceOrSinkSizeRequired {
99        expected: Vec<String>,
100    },
101    /// The selection value for a table mutation operation refers to an invalid object.
102    InvalidTableMutationSelection,
103    /// Expression violated a column's constraint
104    ConstraintViolation(NotNullViolation),
105    /// Transaction cluster was dropped in the middle of a transaction.
106    ConcurrentClusterDrop,
107    /// A dependency was dropped while sequencing a statement.
108    ConcurrentDependencyDrop {
109        dependency_kind: &'static str,
110        dependency_id: String,
111    },
112    CollectionUnreadable {
113        id: String,
114    },
115    /// Target cluster has no replicas to service query.
116    NoClusterReplicasAvailable {
117        name: String,
118        is_managed: bool,
119    },
120    /// The named operation cannot be run in a transaction.
121    OperationProhibitsTransaction(String),
122    /// The named operation requires an active transaction.
123    OperationRequiresTransaction(String),
124    /// An error occurred while planning the statement.
125    PlanError(PlanError),
126    /// The named prepared statement already exists.
127    PreparedStatementExists(String),
128    /// Wrapper around parsing error
129    ParseError(mz_sql_parser::parser::ParserStatementError),
130    /// The transaction is in read-only mode.
131    ReadOnlyTransaction,
132    /// The transaction in in read-only mode and a read already occurred.
133    ReadWriteUnavailable,
134    /// The recursion limit of some operation was exceeded.
135    RecursionLimit(RecursionLimitError),
136    /// A query in a transaction referenced a relation outside the first query's
137    /// time domain.
138    RelationOutsideTimeDomain {
139        relations: Vec<String>,
140        names: Vec<String>,
141    },
142    /// A query tried to create more resources than is allowed in the system configuration.
143    ResourceExhaustion {
144        resource_type: String,
145        limit_name: String,
146        desired: String,
147        limit: String,
148        current: String,
149    },
150    /// Result size of a query is too large.
151    ResultSize(String),
152    /// The specified feature is not permitted in safe mode.
153    SafeModeViolation(String),
154    /// The current transaction had the wrong set of write locks.
155    WrongSetOfLocks,
156    /// Waiting on a query timed out.
157    ///
158    /// Note this differs slightly from PG's implementation/semantics.
159    StatementTimeout,
160    /// The user canceled the query
161    Canceled,
162    /// An idle session in a transaction has timed out.
163    IdleInTransactionSessionTimeout,
164    /// The transaction is in single-subscribe mode.
165    SubscribeOnlyTransaction,
166    /// An error occurred in the optimizer.
167    Optimizer(OptimizerError),
168    /// A query depends on items which are not allowed to be referenced from the current cluster.
169    UnallowedOnCluster {
170        depends_on: SmallVec<[String; 2]>,
171        cluster: String,
172    },
173    /// A user tried to perform an action that they were unauthorized to do.
174    Unauthorized(rbac::UnauthorizedError),
175    /// The named cursor does not exist.
176    UnknownCursor(String),
177    /// The named role does not exist.
178    UnknownLoginRole(String),
179    UnknownPreparedStatement(String),
180    /// The named cluster replica does not exist.
181    UnknownClusterReplica {
182        cluster_name: String,
183        replica_name: String,
184    },
185    /// The named setting does not exist.
186    UnrecognizedConfigurationParam(String),
187    /// A generic error occurred.
188    //
189    // TODO(benesch): convert all those errors to structured errors.
190    Unstructured(anyhow::Error),
191    /// The named feature is not supported and will (probably) not be.
192    Unsupported(&'static str),
193    /// Some feature isn't available for a (potentially opaque) reason.
194    /// For example, in cloud Self-Managed auth features aren't available,
195    /// but we don't want to mention self managed auth.
196    UnavailableFeature {
197        feature: String,
198        docs: Option<String>,
199    },
200    /// Attempted to read from log sources without selecting a target replica.
201    UntargetedLogRead {
202        log_names: Vec<String>,
203    },
204    /// The transaction is in write-only mode.
205    WriteOnlyTransaction,
206    /// The transaction can only execute a single statement.
207    SingleStatementTransaction,
208    /// The transaction can only execute simple DDL.
209    DDLOnlyTransaction,
210    /// Another session modified the Catalog while this transaction was open.
211    DDLTransactionRace,
212    /// Used to prevent us from durably committing state while a DDL transaction is open, should
213    /// never be returned to the user.
214    TransactionDryRun {
215        /// New operations that were run in the transaction.
216        new_ops: Vec<crate::catalog::Op>,
217        /// New resulting `CatalogState`.
218        new_state: crate::catalog::CatalogState,
219    },
220    /// An error occurred in the storage layer
221    Storage(mz_storage_types::controller::StorageError<mz_repr::Timestamp>),
222    /// An error occurred in the compute layer
223    Compute(anyhow::Error),
224    /// An error in the orchestrator layer
225    Orchestrator(anyhow::Error),
226    /// A statement tried to drop a role that had dependent objects.
227    ///
228    /// The map keys are role names and values are detailed error messages.
229    DependentObject(BTreeMap<String, Vec<String>>),
230    /// When performing an `ALTER` of some variety, re-planning the statement
231    /// errored.
232    InvalidAlter(&'static str, PlanError),
233    /// An error occurred while validating a connection.
234    ConnectionValidation(ConnectionValidationError),
235    /// We refuse to create the materialized view, because it would never be refreshed, so it would
236    /// never be queryable. This can happen when the only specified refreshes are further back in
237    /// the past than the initial compaction window of the materialized view.
238    MaterializedViewWouldNeverRefresh(Timestamp, Timestamp),
239    /// A CREATE MATERIALIZED VIEW statement tried to acquire a read hold at a REFRESH AT time,
240    /// but was unable to get a precise read hold.
241    InputNotReadableAtRefreshAtTime(Timestamp, Antichain<Timestamp>),
242    /// A humanized version of [`StorageError::RtrTimeout`].
243    RtrTimeout(String),
244    /// A humanized version of [`StorageError::RtrDropFailure`].
245    RtrDropFailure(String),
246    /// The collection requested to be sinked cannot be read at any timestamp
247    UnreadableSinkCollection,
248    /// User sessions have been blocked.
249    UserSessionsDisallowed,
250    /// This use session has been deneid by a NetworkPolicy.
251    NetworkPolicyDenied(NetworkPolicyError),
252    /// Something attempted a write (to catalog, storage, tables, etc.) while in
253    /// read-only mode.
254    ReadOnly,
255    AlterClusterTimeout,
256    AlterClusterWhilePendingReplicas,
257    AuthenticationError(AuthenticationError),
258}
259
260#[derive(Debug, thiserror::Error)]
261pub enum AuthenticationError {
262    #[error("invalid credentials")]
263    InvalidCredentials,
264    #[error("role is not allowed to login")]
265    NonLogin,
266    #[error("role does not exist")]
267    RoleNotFound,
268    #[error("password is required")]
269    PasswordRequired,
270}
271
272impl AdapterError {
273    pub fn into_response(self, severity: Severity) -> ErrorResponse {
274        ErrorResponse {
275            severity,
276            code: self.code(),
277            message: self.to_string(),
278            detail: self.detail(),
279            hint: self.hint(),
280            position: self.position(),
281        }
282    }
283
284    pub fn position(&self) -> Option<usize> {
285        match self {
286            AdapterError::ParseError(err) => Some(err.error.pos),
287            _ => None,
288        }
289    }
290
291    /// Reports additional details about the error, if any are available.
292    pub fn detail(&self) -> Option<String> {
293        match self {
294            AdapterError::AmbiguousSystemColumnReference => {
295                Some("This is a current limitation in Materialize".into())
296            },
297            AdapterError::Catalog(c) => c.detail(),
298            AdapterError::Eval(e) => e.detail(),
299            AdapterError::RelationOutsideTimeDomain { relations, names } => Some(format!(
300                "The following relations in the query are outside the transaction's time domain:\n{}\n{}",
301                relations
302                    .iter()
303                    .map(|r| r.quoted().to_string())
304                    .collect::<Vec<_>>()
305                    .join("\n"),
306                match names.is_empty() {
307                    true => "No relations are available.".to_string(),
308                    false => format!(
309                        "Only the following relations are available:\n{}",
310                        names
311                            .iter()
312                            .map(|name| name.quoted().to_string())
313                            .collect::<Vec<_>>()
314                            .join("\n")
315                    ),
316                }
317            )),
318            AdapterError::SourceOrSinkSizeRequired { .. } => Some(
319                "Either specify the cluster that will maintain this object via IN CLUSTER or \
320                specify size via SIZE option."
321                    .into(),
322            ),
323            AdapterError::SafeModeViolation(_) => Some(
324                "The Materialize server you are connected to is running in \
325                 safe mode, which limits the features that are available."
326                    .into(),
327            ),
328            AdapterError::IntrospectionDisabled { log_names }
329            | AdapterError::UntargetedLogRead { log_names } => Some(format!(
330                "The query references the following log sources:\n    {}",
331                log_names.join("\n    "),
332            )),
333            AdapterError::InvalidLogDependency { log_names, .. } => Some(format!(
334                "The object depends on the following log sources:\n    {}",
335                log_names.join("\n    "),
336            )),
337            AdapterError::PlanError(e) => e.detail(),
338            AdapterError::Unauthorized(unauthorized) => unauthorized.detail(),
339            AdapterError::DependentObject(dependent_objects) => {
340                Some(dependent_objects
341                    .iter()
342                    .map(|(role_name, err_msgs)| err_msgs
343                        .iter()
344                        .map(|err_msg| format!("{role_name}: {err_msg}"))
345                        .join("\n"))
346                    .join("\n"))
347            },
348            AdapterError::Storage(storage_error) => {
349                storage_error.source().map(|source_error| source_error.to_string_with_causes())
350            }
351            AdapterError::ReadOnlyTransaction => Some("SELECT queries cannot be combined with other query types, including SUBSCRIBE.".into()),
352            AdapterError::InvalidAlter(_, e) => e.detail(),
353            AdapterError::Optimizer(e) => e.detail(),
354            AdapterError::ConnectionValidation(e) => e.detail(),
355            AdapterError::MaterializedViewWouldNeverRefresh(last_refresh, earliest_possible) => {
356                Some(format!(
357                    "The specified last refresh is at {}, while the earliest possible time to compute the materialized \
358                    view is {}.",
359                    last_refresh,
360                    earliest_possible,
361                ))
362            }
363            AdapterError::UnallowedOnCluster { cluster, .. } => (cluster == MZ_CATALOG_SERVER_CLUSTER.name).then(||
364                format!("The transaction is executing on the {cluster} cluster, maybe having been routed there by the first statement in the transaction.")
365            ),
366            AdapterError::InputNotReadableAtRefreshAtTime(oracle_read_ts, least_valid_read) => {
367                Some(format!(
368                    "The requested REFRESH AT time is {}, \
369                    but not all input collections are readable earlier than [{}].",
370                    oracle_read_ts,
371                    if least_valid_read.len() == 1 {
372                        format!("{}", least_valid_read.as_option().expect("antichain contains exactly 1 timestamp"))
373                    } else {
374                        // This can't occur currently
375                        format!("{:?}", least_valid_read)
376                    }
377                ))
378            }
379            AdapterError::RtrTimeout(name) => Some(format!("{name} failed to ingest data up to the real-time recency point")),
380            AdapterError::RtrDropFailure(name) => Some(format!("{name} dropped before ingesting data to the real-time recency point")),
381            AdapterError::UserSessionsDisallowed => Some("Your organization has been blocked. Please contact support.".to_string()),
382            AdapterError::NetworkPolicyDenied(reason)=> Some(format!("{reason}.")),
383            _ => None,
384        }
385    }
386
387    /// Reports a hint for the user about how the error could be fixed.
388    pub fn hint(&self) -> Option<String> {
389        match self {
390            AdapterError::AmbiguousSystemColumnReference => Some(
391                "Rewrite the view to refer to all columns by name. Expand all wildcards and \
392                convert all NATURAL JOINs to USING joins."
393                    .to_string(),
394            ),
395            AdapterError::Catalog(c) => c.hint(),
396            AdapterError::Eval(e) => e.hint(),
397            AdapterError::InvalidClusterReplicaAz { expected, az: _ } => {
398                Some(if expected.is_empty() {
399                    "No availability zones configured; do not specify AVAILABILITY ZONE".into()
400                } else {
401                    format!("Valid availability zones are: {}", expected.join(", "))
402                })
403            }
404            AdapterError::InvalidStorageClusterSize { expected, .. } => {
405                Some(format!("Valid sizes are: {}", expected.join(", ")))
406            }
407            AdapterError::SourceOrSinkSizeRequired { expected } => Some(format!(
408                "Try choosing one of the smaller sizes to start. Available sizes: {}",
409                expected.join(", ")
410            )),
411            AdapterError::NoClusterReplicasAvailable { is_managed, .. } => {
412                Some(if *is_managed {
413                    "Use ALTER CLUSTER to adjust the replication factor of the cluster. \
414                    Example:`ALTER CLUSTER <cluster-name> SET (REPLICATION FACTOR 1)`".into()
415                } else {
416                    "Use CREATE CLUSTER REPLICA to attach cluster replicas to the cluster".into()
417                })
418            }
419            AdapterError::UntargetedLogRead { .. } => Some(
420                "Use `SET cluster_replica = <replica-name>` to target a specific replica in the \
421                 active cluster. Note that subsequent queries will only be answered by \
422                 the selected replica, which might reduce availability. To undo the replica \
423                 selection, use `RESET cluster_replica`."
424                    .into(),
425            ),
426            AdapterError::ResourceExhaustion { resource_type, .. } => Some(format!(
427                "Drop an existing {resource_type} or contact support to request a limit increase."
428            )),
429            AdapterError::StatementTimeout => Some(
430                "Consider increasing the maximum allowed statement duration for this session by \
431                 setting the statement_timeout session variable. For example, `SET \
432                 statement_timeout = '120s'`."
433                    .into(),
434            ),
435            AdapterError::PlanError(e) => e.hint(),
436            AdapterError::UnallowedOnCluster { cluster, .. } => {
437                (cluster != MZ_CATALOG_SERVER_CLUSTER.name).then(||
438                    "Use `SET CLUSTER = <cluster-name>` to change your cluster and re-run the query."
439                    .to_string()
440                )
441            }
442            AdapterError::InvalidAlter(_, e) => e.hint(),
443            AdapterError::Optimizer(e) => e.hint(),
444            AdapterError::ConnectionValidation(e) => e.hint(),
445            AdapterError::InputNotReadableAtRefreshAtTime(_, _) => Some(
446                "You can use `REFRESH AT greatest(mz_now(), <explicit timestamp>)` to refresh \
447                 either at the explicitly specified timestamp, or now if the given timestamp would \
448                 be in the past.".to_string()
449            ),
450            AdapterError::AlterClusterTimeout => Some(
451                "Consider increasing the timeout duration in the alter cluster statement.".into(),
452            ),
453            AdapterError::DDLTransactionRace => Some(
454                "Currently, DDL transactions fail when any other DDL happens concurrently, \
455                 even on unrelated schemas/clusters.".into()
456            ),
457            AdapterError::CollectionUnreadable { .. } => Some(
458                "This could be because the collection has recently been dropped.".into()
459            ),
460            _ => None,
461        }
462    }
463
464    pub fn code(&self) -> SqlState {
465        // TODO(benesch): we should only use `SqlState::INTERNAL_ERROR` for
466        // those errors that are truly internal errors. At the moment we have
467        // a various classes of uncategorized errors that use this error code
468        // inappropriately.
469        match self {
470            // DATA_EXCEPTION to match what Postgres returns for degenerate
471            // range bounds
472            AdapterError::AbsurdSubscribeBounds { .. } => SqlState::DATA_EXCEPTION,
473            AdapterError::AmbiguousSystemColumnReference => SqlState::FEATURE_NOT_SUPPORTED,
474            AdapterError::Catalog(e) => match &e.kind {
475                mz_catalog::memory::error::ErrorKind::VarError(e) => match e {
476                    VarError::ConstrainedParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
477                    VarError::FixedValueParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
478                    VarError::InvalidParameterType { .. } => SqlState::INVALID_PARAMETER_VALUE,
479                    VarError::InvalidParameterValue { .. } => SqlState::INVALID_PARAMETER_VALUE,
480                    VarError::ReadOnlyParameter(_) => SqlState::CANT_CHANGE_RUNTIME_PARAM,
481                    VarError::UnknownParameter(_) => SqlState::UNDEFINED_OBJECT,
482                    VarError::RequiresUnsafeMode { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
483                    VarError::RequiresFeatureFlag { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
484                },
485                _ => SqlState::INTERNAL_ERROR,
486            },
487            AdapterError::ChangedPlan(_) => SqlState::FEATURE_NOT_SUPPORTED,
488            AdapterError::DuplicateCursor(_) => SqlState::DUPLICATE_CURSOR,
489            AdapterError::Eval(EvalError::CharacterNotValidForEncoding(_)) => {
490                SqlState::PROGRAM_LIMIT_EXCEEDED
491            }
492            AdapterError::Eval(EvalError::CharacterTooLargeForEncoding(_)) => {
493                SqlState::PROGRAM_LIMIT_EXCEEDED
494            }
495            AdapterError::Eval(EvalError::LengthTooLarge) => SqlState::PROGRAM_LIMIT_EXCEEDED,
496            AdapterError::Eval(EvalError::NullCharacterNotPermitted) => {
497                SqlState::PROGRAM_LIMIT_EXCEEDED
498            }
499            AdapterError::Eval(_) => SqlState::INTERNAL_ERROR,
500            AdapterError::Explain(_) => SqlState::INTERNAL_ERROR,
501            AdapterError::IdExhaustionError => SqlState::INTERNAL_ERROR,
502            AdapterError::Internal(_) => SqlState::INTERNAL_ERROR,
503            AdapterError::IntrospectionDisabled { .. } => SqlState::FEATURE_NOT_SUPPORTED,
504            AdapterError::InvalidLogDependency { .. } => SqlState::FEATURE_NOT_SUPPORTED,
505            AdapterError::InvalidClusterReplicaAz { .. } => SqlState::FEATURE_NOT_SUPPORTED,
506            AdapterError::InvalidSetIsolationLevel => SqlState::ACTIVE_SQL_TRANSACTION,
507            AdapterError::InvalidSetCluster => SqlState::ACTIVE_SQL_TRANSACTION,
508            AdapterError::InvalidStorageClusterSize { .. } => SqlState::FEATURE_NOT_SUPPORTED,
509            AdapterError::SourceOrSinkSizeRequired { .. } => SqlState::FEATURE_NOT_SUPPORTED,
510            AdapterError::InvalidTableMutationSelection => SqlState::INVALID_TRANSACTION_STATE,
511            AdapterError::ConstraintViolation(NotNullViolation(_)) => SqlState::NOT_NULL_VIOLATION,
512            AdapterError::ConcurrentClusterDrop => SqlState::INVALID_TRANSACTION_STATE,
513            AdapterError::ConcurrentDependencyDrop { .. } => SqlState::UNDEFINED_OBJECT,
514            AdapterError::CollectionUnreadable { .. } => SqlState::NO_DATA_FOUND,
515            AdapterError::NoClusterReplicasAvailable { .. } => SqlState::FEATURE_NOT_SUPPORTED,
516            AdapterError::OperationProhibitsTransaction(_) => SqlState::ACTIVE_SQL_TRANSACTION,
517            AdapterError::OperationRequiresTransaction(_) => SqlState::NO_ACTIVE_SQL_TRANSACTION,
518            AdapterError::ParseError(_) => SqlState::SYNTAX_ERROR,
519            AdapterError::PlanError(PlanError::InvalidSchemaName) => SqlState::INVALID_SCHEMA_NAME,
520            AdapterError::PlanError(PlanError::ColumnAlreadyExists { .. }) => {
521                SqlState::DUPLICATE_COLUMN
522            }
523            AdapterError::PlanError(PlanError::UnknownParameter(_)) => {
524                SqlState::UNDEFINED_PARAMETER
525            }
526            AdapterError::PlanError(PlanError::ParameterNotAllowed(_)) => {
527                SqlState::UNDEFINED_PARAMETER
528            }
529            AdapterError::PlanError(_) => SqlState::INTERNAL_ERROR,
530            AdapterError::PreparedStatementExists(_) => SqlState::DUPLICATE_PSTATEMENT,
531            AdapterError::ReadOnlyTransaction => SqlState::READ_ONLY_SQL_TRANSACTION,
532            AdapterError::ReadWriteUnavailable => SqlState::INVALID_TRANSACTION_STATE,
533            AdapterError::SingleStatementTransaction => SqlState::INVALID_TRANSACTION_STATE,
534            AdapterError::WrongSetOfLocks => SqlState::LOCK_NOT_AVAILABLE,
535            AdapterError::StatementTimeout => SqlState::QUERY_CANCELED,
536            AdapterError::Canceled => SqlState::QUERY_CANCELED,
537            AdapterError::IdleInTransactionSessionTimeout => {
538                SqlState::IDLE_IN_TRANSACTION_SESSION_TIMEOUT
539            }
540            AdapterError::RecursionLimit(_) => SqlState::INTERNAL_ERROR,
541            AdapterError::RelationOutsideTimeDomain { .. } => SqlState::INVALID_TRANSACTION_STATE,
542            AdapterError::ResourceExhaustion { .. } => SqlState::INSUFFICIENT_RESOURCES,
543            AdapterError::ResultSize(_) => SqlState::OUT_OF_MEMORY,
544            AdapterError::SafeModeViolation(_) => SqlState::INTERNAL_ERROR,
545            AdapterError::SubscribeOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
546            AdapterError::Optimizer(e) => match e {
547                OptimizerError::PlanError(PlanError::InvalidSchemaName) => {
548                    SqlState::INVALID_SCHEMA_NAME
549                }
550                OptimizerError::PlanError(PlanError::ColumnAlreadyExists { .. }) => {
551                    SqlState::DUPLICATE_COLUMN
552                }
553                OptimizerError::PlanError(PlanError::UnknownParameter(_)) => {
554                    SqlState::UNDEFINED_PARAMETER
555                }
556                OptimizerError::PlanError(PlanError::ParameterNotAllowed(_)) => {
557                    SqlState::UNDEFINED_PARAMETER
558                }
559                OptimizerError::PlanError(_) => SqlState::INTERNAL_ERROR,
560                OptimizerError::RecursionLimitError(e) => {
561                    AdapterError::RecursionLimit(e.clone()).code() // Delegate to outer
562                }
563                OptimizerError::Internal(s) => {
564                    AdapterError::Internal(s.clone()).code() // Delegate to outer
565                }
566                OptimizerError::EvalError(e) => {
567                    AdapterError::Eval(e.clone()).code() // Delegate to outer
568                }
569                OptimizerError::TransformError(_) => SqlState::INTERNAL_ERROR,
570                OptimizerError::UnmaterializableFunction(_) => SqlState::FEATURE_NOT_SUPPORTED,
571                OptimizerError::UncallableFunction { .. } => SqlState::FEATURE_NOT_SUPPORTED,
572                OptimizerError::UnsupportedTemporalExpression(_) => SqlState::FEATURE_NOT_SUPPORTED,
573                // This should be handled by peek optimization, so it's an internal error if it
574                // reaches the user.
575                OptimizerError::InternalUnsafeMfpPlan(_) => SqlState::INTERNAL_ERROR,
576            },
577            AdapterError::UnallowedOnCluster { .. } => {
578                SqlState::S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
579            }
580            AdapterError::Unauthorized(_) => SqlState::INSUFFICIENT_PRIVILEGE,
581            AdapterError::UnknownCursor(_) => SqlState::INVALID_CURSOR_NAME,
582            AdapterError::UnknownPreparedStatement(_) => SqlState::UNDEFINED_PSTATEMENT,
583            AdapterError::UnknownLoginRole(_) => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
584            AdapterError::UnknownClusterReplica { .. } => SqlState::UNDEFINED_OBJECT,
585            AdapterError::UnrecognizedConfigurationParam(_) => SqlState::UNDEFINED_OBJECT,
586            AdapterError::Unsupported(..) => SqlState::FEATURE_NOT_SUPPORTED,
587            AdapterError::UnavailableFeature { .. } => SqlState::FEATURE_NOT_SUPPORTED,
588            AdapterError::Unstructured(_) => SqlState::INTERNAL_ERROR,
589            AdapterError::UntargetedLogRead { .. } => SqlState::FEATURE_NOT_SUPPORTED,
590            AdapterError::DDLTransactionRace => SqlState::T_R_SERIALIZATION_FAILURE,
591            AdapterError::TransactionDryRun { .. } => SqlState::T_R_SERIALIZATION_FAILURE,
592            // It's not immediately clear which error code to use here because a
593            // "write-only transaction", "single table write transaction", or "ddl only
594            // transaction" are not things in Postgres. This error code is the generic "bad txn
595            // thing" code, so it's probably the best choice.
596            AdapterError::WriteOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
597            AdapterError::DDLOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
598            AdapterError::Storage(_) | AdapterError::Compute(_) | AdapterError::Orchestrator(_) => {
599                SqlState::INTERNAL_ERROR
600            }
601            AdapterError::DependentObject(_) => SqlState::DEPENDENT_OBJECTS_STILL_EXIST,
602            AdapterError::InvalidAlter(_, _) => SqlState::FEATURE_NOT_SUPPORTED,
603            AdapterError::ConnectionValidation(_) => SqlState::SYSTEM_ERROR,
604            // `DATA_EXCEPTION`, similarly to `AbsurdSubscribeBounds`.
605            AdapterError::MaterializedViewWouldNeverRefresh(_, _) => SqlState::DATA_EXCEPTION,
606            AdapterError::InputNotReadableAtRefreshAtTime(_, _) => SqlState::DATA_EXCEPTION,
607            AdapterError::RtrTimeout(_) => SqlState::QUERY_CANCELED,
608            AdapterError::RtrDropFailure(_) => SqlState::UNDEFINED_OBJECT,
609            AdapterError::UnreadableSinkCollection => SqlState::from_code("MZ009"),
610            AdapterError::UserSessionsDisallowed => SqlState::from_code("MZ010"),
611            AdapterError::NetworkPolicyDenied(_) => SqlState::from_code("MZ011"),
612            // In read-only mode all transactions are implicitly read-only
613            // transactions.
614            AdapterError::ReadOnly => SqlState::READ_ONLY_SQL_TRANSACTION,
615            AdapterError::AlterClusterTimeout => SqlState::QUERY_CANCELED,
616            AdapterError::AlterClusterWhilePendingReplicas => SqlState::OBJECT_IN_USE,
617            AdapterError::AuthenticationError(AuthenticationError::InvalidCredentials) => {
618                SqlState::INVALID_PASSWORD
619            }
620            AdapterError::AuthenticationError(_) => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
621        }
622    }
623
624    pub fn internal<E: std::fmt::Display>(context: &str, e: E) -> AdapterError {
625        AdapterError::Internal(format!("{context}: {e}"))
626    }
627
628    // We don't want the following error conversions to `ConcurrentDependencyDrop` to happen
629    // automatically, because it might depend on the context whether `ConcurrentDependencyDrop`
630    // is appropriate, so we want to make the conversion target explicit at the call site.
631    // For example, maybe we get an `InstanceMissing` if the user specifies a non-existing cluster,
632    // in which case `ConcurrentDependencyDrop` would not be appropriate.
633    pub fn concurrent_dependency_drop_from_instance_missing(e: InstanceMissing) -> Self {
634        AdapterError::ConcurrentDependencyDrop {
635            dependency_kind: "cluster",
636            dependency_id: e.0.to_string(),
637        }
638    }
639    pub fn concurrent_dependency_drop_from_collection_missing(e: CollectionMissing) -> Self {
640        AdapterError::ConcurrentDependencyDrop {
641            dependency_kind: "collection",
642            dependency_id: e.0.to_string(),
643        }
644    }
645
646    pub fn concurrent_dependency_drop_from_collection_lookup_error(
647        e: CollectionLookupError,
648        compute_instance: ComputeInstanceId,
649    ) -> Self {
650        match e {
651            CollectionLookupError::InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
652                dependency_kind: "cluster",
653                dependency_id: id.to_string(),
654            },
655            CollectionLookupError::CollectionMissing(id) => {
656                AdapterError::ConcurrentDependencyDrop {
657                    dependency_kind: "collection",
658                    dependency_id: id.to_string(),
659                }
660            }
661            CollectionLookupError::InstanceShutDown => AdapterError::ConcurrentDependencyDrop {
662                dependency_kind: "cluster",
663                dependency_id: compute_instance.to_string(),
664            },
665        }
666    }
667
668    pub fn concurrent_dependency_drop_from_peek_error(
669        e: PeekError,
670        compute_instance: ComputeInstanceId,
671    ) -> AdapterError {
672        match e {
673            PeekError::ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
674                dependency_kind: "replica",
675                dependency_id: id.to_string(),
676            },
677            PeekError::InstanceShutDown => AdapterError::ConcurrentDependencyDrop {
678                dependency_kind: "cluster",
679                dependency_id: compute_instance.to_string(),
680            },
681            e @ PeekError::ReadHoldIdMismatch(_) => AdapterError::internal("peek error", e),
682            e @ PeekError::ReadHoldInsufficient(_) => AdapterError::internal("peek error", e),
683        }
684    }
685
686    pub fn concurrent_dependency_drop_from_dataflow_creation_error(
687        e: compute_error::DataflowCreationError,
688    ) -> Self {
689        use compute_error::DataflowCreationError::*;
690        match e {
691            InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
692                dependency_kind: "cluster",
693                dependency_id: id.to_string(),
694            },
695            CollectionMissing(id) => AdapterError::ConcurrentDependencyDrop {
696                dependency_kind: "collection",
697                dependency_id: id.to_string(),
698            },
699            ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
700                dependency_kind: "replica",
701                dependency_id: id.to_string(),
702            },
703            MissingAsOf | SinceViolation(..) | EmptyAsOfForSubscribe | EmptyAsOfForCopyTo => {
704                AdapterError::internal("dataflow creation error", e)
705            }
706        }
707    }
708}
709
710impl fmt::Display for AdapterError {
711    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
712        match self {
713            AdapterError::AbsurdSubscribeBounds { as_of, up_to } => {
714                assert!(up_to < as_of);
715                write!(
716                    f,
717                    r#"subscription lower ("as of") bound is beyond its upper ("up to") bound: {} < {}"#,
718                    up_to, as_of
719                )
720            }
721            AdapterError::AmbiguousSystemColumnReference => {
722                write!(
723                    f,
724                    "cannot use wildcard expansions or NATURAL JOINs in a view that depends on \
725                    system objects"
726                )
727            }
728            AdapterError::ChangedPlan(e) => write!(f, "{}", e),
729            AdapterError::Catalog(e) => e.fmt(f),
730            AdapterError::DuplicateCursor(name) => {
731                write!(f, "cursor {} already exists", name.quoted())
732            }
733            AdapterError::Eval(e) => e.fmt(f),
734            AdapterError::Explain(e) => e.fmt(f),
735            AdapterError::IdExhaustionError => f.write_str("ID allocator exhausted all valid IDs"),
736            AdapterError::Internal(e) => write!(f, "internal error: {}", e),
737            AdapterError::IntrospectionDisabled { .. } => write!(
738                f,
739                "cannot read log sources of replica with disabled introspection"
740            ),
741            AdapterError::InvalidLogDependency { object_type, .. } => {
742                write!(f, "{object_type} objects cannot depend on log sources")
743            }
744            AdapterError::InvalidClusterReplicaAz { az, expected: _ } => {
745                write!(f, "unknown cluster replica availability zone {az}",)
746            }
747            AdapterError::InvalidSetIsolationLevel => write!(
748                f,
749                "SET TRANSACTION ISOLATION LEVEL must be called before any query"
750            ),
751            AdapterError::InvalidSetCluster => {
752                write!(f, "SET cluster cannot be called in an active transaction")
753            }
754            AdapterError::InvalidStorageClusterSize { size, .. } => {
755                write!(f, "unknown source size {size}")
756            }
757            AdapterError::SourceOrSinkSizeRequired { .. } => {
758                write!(f, "must specify either cluster or size option")
759            }
760            AdapterError::InvalidTableMutationSelection => {
761                f.write_str("invalid selection: operation may only refer to user-defined tables")
762            }
763            AdapterError::ConstraintViolation(not_null_violation) => {
764                write!(f, "{}", not_null_violation)
765            }
766            AdapterError::ConcurrentClusterDrop => {
767                write!(f, "the transaction's active cluster has been dropped")
768            }
769            AdapterError::ConcurrentDependencyDrop {
770                dependency_kind,
771                dependency_id,
772            } => {
773                write!(f, "{dependency_kind} '{dependency_id}' was dropped")
774            }
775            AdapterError::CollectionUnreadable { id } => {
776                write!(f, "collection '{id}' is not readable at any timestamp")
777            }
778            AdapterError::NoClusterReplicasAvailable { name, .. } => {
779                write!(
780                    f,
781                    "CLUSTER {} has no replicas available to service request",
782                    name.quoted()
783                )
784            }
785            AdapterError::OperationProhibitsTransaction(op) => {
786                write!(f, "{} cannot be run inside a transaction block", op)
787            }
788            AdapterError::OperationRequiresTransaction(op) => {
789                write!(f, "{} can only be used in transaction blocks", op)
790            }
791            AdapterError::ParseError(e) => e.fmt(f),
792            AdapterError::PlanError(e) => e.fmt(f),
793            AdapterError::PreparedStatementExists(name) => {
794                write!(f, "prepared statement {} already exists", name.quoted())
795            }
796            AdapterError::ReadOnlyTransaction => f.write_str("transaction in read-only mode"),
797            AdapterError::SingleStatementTransaction => {
798                f.write_str("this transaction can only execute a single statement")
799            }
800            AdapterError::ReadWriteUnavailable => {
801                f.write_str("transaction read-write mode must be set before any query")
802            }
803            AdapterError::WrongSetOfLocks => {
804                write!(f, "internal error, wrong set of locks acquired")
805            }
806            AdapterError::StatementTimeout => {
807                write!(f, "canceling statement due to statement timeout")
808            }
809            AdapterError::Canceled => {
810                write!(f, "canceling statement due to user request")
811            }
812            AdapterError::IdleInTransactionSessionTimeout => {
813                write!(
814                    f,
815                    "terminating connection due to idle-in-transaction timeout"
816                )
817            }
818            AdapterError::RecursionLimit(e) => e.fmt(f),
819            AdapterError::RelationOutsideTimeDomain { .. } => {
820                write!(
821                    f,
822                    "Transactions can only reference objects in the same timedomain. \
823                     See https://materialize.com/docs/sql/begin/#same-timedomain-error",
824                )
825            }
826            AdapterError::ResourceExhaustion {
827                resource_type,
828                limit_name,
829                desired,
830                limit,
831                current,
832            } => {
833                write!(
834                    f,
835                    "creating {resource_type} would violate {limit_name} limit (desired: {desired}, limit: {limit}, current: {current})"
836                )
837            }
838            AdapterError::ResultSize(e) => write!(f, "{e}"),
839            AdapterError::SafeModeViolation(feature) => {
840                write!(f, "cannot create {} in safe mode", feature)
841            }
842            AdapterError::SubscribeOnlyTransaction => {
843                f.write_str("SUBSCRIBE in transactions must be the only read statement")
844            }
845            AdapterError::Optimizer(e) => e.fmt(f),
846            AdapterError::UnallowedOnCluster {
847                depends_on,
848                cluster,
849            } => {
850                let items = depends_on.into_iter().map(|item| item.quoted()).join(", ");
851                write!(
852                    f,
853                    "querying the following items {items} is not allowed from the {} cluster",
854                    cluster.quoted()
855                )
856            }
857            AdapterError::Unauthorized(unauthorized) => {
858                write!(f, "{unauthorized}")
859            }
860            AdapterError::UnknownCursor(name) => {
861                write!(f, "cursor {} does not exist", name.quoted())
862            }
863            AdapterError::UnknownLoginRole(name) => {
864                write!(f, "role {} does not exist", name.quoted())
865            }
866            AdapterError::Unsupported(features) => write!(f, "{} are not supported", features),
867            AdapterError::Unstructured(e) => write!(f, "{}", e.display_with_causes()),
868            AdapterError::WriteOnlyTransaction => f.write_str("transaction in write-only mode"),
869            AdapterError::UnknownPreparedStatement(name) => {
870                write!(f, "prepared statement {} does not exist", name.quoted())
871            }
872            AdapterError::UnknownClusterReplica {
873                cluster_name,
874                replica_name,
875            } => write!(
876                f,
877                "cluster replica '{cluster_name}.{replica_name}' does not exist"
878            ),
879            AdapterError::UnrecognizedConfigurationParam(setting_name) => write!(
880                f,
881                "unrecognized configuration parameter {}",
882                setting_name.quoted()
883            ),
884            AdapterError::UntargetedLogRead { .. } => {
885                f.write_str("log source reads must target a replica")
886            }
887            AdapterError::DDLOnlyTransaction => f.write_str(
888                "transactions which modify objects are restricted to just modifying objects",
889            ),
890            AdapterError::DDLTransactionRace => f.write_str(
891                "another session modified the catalog while this DDL transaction was open",
892            ),
893            AdapterError::TransactionDryRun { .. } => f.write_str("transaction dry run"),
894            AdapterError::Storage(e) => e.fmt(f),
895            AdapterError::Compute(e) => e.fmt(f),
896            AdapterError::Orchestrator(e) => e.fmt(f),
897            AdapterError::DependentObject(dependent_objects) => {
898                let role_str = if dependent_objects.keys().count() == 1 {
899                    "role"
900                } else {
901                    "roles"
902                };
903                write!(
904                    f,
905                    "{role_str} \"{}\" cannot be dropped because some objects depend on it",
906                    dependent_objects.keys().join(", ")
907                )
908            }
909            AdapterError::InvalidAlter(t, e) => {
910                write!(f, "invalid ALTER {t}: {e}")
911            }
912            AdapterError::ConnectionValidation(e) => e.fmt(f),
913            AdapterError::MaterializedViewWouldNeverRefresh(_, _) => {
914                write!(
915                    f,
916                    "all the specified refreshes of the materialized view would be too far in the past, and thus they \
917                    would never happen"
918                )
919            }
920            AdapterError::InputNotReadableAtRefreshAtTime(_, _) => {
921                write!(
922                    f,
923                    "REFRESH AT requested for a time where not all the inputs are readable"
924                )
925            }
926            AdapterError::RtrTimeout(_) => {
927                write!(
928                    f,
929                    "timed out before ingesting the source's visible frontier when real-time-recency query issued"
930                )
931            }
932            AdapterError::RtrDropFailure(_) => write!(
933                f,
934                "real-time source dropped before ingesting the upstream system's visible frontier"
935            ),
936            AdapterError::UnreadableSinkCollection => {
937                write!(f, "collection is not readable at any time")
938            }
939            AdapterError::UserSessionsDisallowed => write!(f, "login blocked"),
940            AdapterError::NetworkPolicyDenied(_) => write!(f, "session denied"),
941            AdapterError::ReadOnly => write!(f, "cannot write in read-only mode"),
942            AdapterError::AlterClusterTimeout => {
943                write!(f, "canceling statement, provided timeout lapsed")
944            }
945            AdapterError::AuthenticationError(e) => {
946                write!(f, "authentication error {e}")
947            }
948            AdapterError::UnavailableFeature { feature, docs } => {
949                write!(f, "{} is not supported in this environment.", feature)?;
950                if let Some(docs) = docs {
951                    write!(
952                        f,
953                        " For more information consult the documentation at {docs}"
954                    )?;
955                }
956                Ok(())
957            }
958            AdapterError::AlterClusterWhilePendingReplicas => {
959                write!(f, "cannot alter clusters with pending updates")
960            }
961        }
962    }
963}
964
965impl From<anyhow::Error> for AdapterError {
966    fn from(e: anyhow::Error) -> AdapterError {
967        match e.downcast::<PlanError>() {
968            Ok(plan_error) => AdapterError::PlanError(plan_error),
969            Err(e) => AdapterError::Unstructured(e),
970        }
971    }
972}
973
974impl From<TryFromIntError> for AdapterError {
975    fn from(e: TryFromIntError) -> AdapterError {
976        AdapterError::Unstructured(e.into())
977    }
978}
979
980impl From<TryFromDecimalError> for AdapterError {
981    fn from(e: TryFromDecimalError) -> AdapterError {
982        AdapterError::Unstructured(e.into())
983    }
984}
985
986impl From<mz_catalog::memory::error::Error> for AdapterError {
987    fn from(e: mz_catalog::memory::error::Error) -> AdapterError {
988        AdapterError::Catalog(e)
989    }
990}
991
992impl From<mz_catalog::durable::CatalogError> for AdapterError {
993    fn from(e: mz_catalog::durable::CatalogError) -> Self {
994        mz_catalog::memory::error::Error::from(e).into()
995    }
996}
997
998impl From<mz_catalog::durable::DurableCatalogError> for AdapterError {
999    fn from(e: mz_catalog::durable::DurableCatalogError) -> Self {
1000        mz_catalog::durable::CatalogError::from(e).into()
1001    }
1002}
1003
1004impl From<EvalError> for AdapterError {
1005    fn from(e: EvalError) -> AdapterError {
1006        AdapterError::Eval(e)
1007    }
1008}
1009
1010impl From<ExplainError> for AdapterError {
1011    fn from(e: ExplainError) -> AdapterError {
1012        match e {
1013            ExplainError::RecursionLimitError(e) => AdapterError::RecursionLimit(e),
1014            e => AdapterError::Explain(e),
1015        }
1016    }
1017}
1018
1019impl From<mz_sql::catalog::CatalogError> for AdapterError {
1020    fn from(e: mz_sql::catalog::CatalogError) -> AdapterError {
1021        AdapterError::Catalog(mz_catalog::memory::error::Error::from(e))
1022    }
1023}
1024
1025impl From<PlanError> for AdapterError {
1026    fn from(e: PlanError) -> AdapterError {
1027        match e {
1028            PlanError::UnknownCursor(name) => AdapterError::UnknownCursor(name),
1029            _ => AdapterError::PlanError(e),
1030        }
1031    }
1032}
1033
1034impl From<OptimizerError> for AdapterError {
1035    fn from(e: OptimizerError) -> AdapterError {
1036        use OptimizerError::*;
1037        match e {
1038            PlanError(e) => Self::PlanError(e),
1039            RecursionLimitError(e) => Self::RecursionLimit(e),
1040            EvalError(e) => Self::Eval(e),
1041            InternalUnsafeMfpPlan(e) => Self::Internal(e),
1042            Internal(e) => Self::Internal(e),
1043            e => Self::Optimizer(e),
1044        }
1045    }
1046}
1047
1048impl From<NotNullViolation> for AdapterError {
1049    fn from(e: NotNullViolation) -> AdapterError {
1050        AdapterError::ConstraintViolation(e)
1051    }
1052}
1053
1054impl From<RecursionLimitError> for AdapterError {
1055    fn from(e: RecursionLimitError) -> AdapterError {
1056        AdapterError::RecursionLimit(e)
1057    }
1058}
1059
1060impl From<oneshot::error::RecvError> for AdapterError {
1061    fn from(e: oneshot::error::RecvError) -> AdapterError {
1062        AdapterError::Unstructured(e.into())
1063    }
1064}
1065
1066impl From<StorageError<mz_repr::Timestamp>> for AdapterError {
1067    fn from(e: StorageError<mz_repr::Timestamp>) -> Self {
1068        AdapterError::Storage(e)
1069    }
1070}
1071
1072impl From<compute_error::InstanceExists> for AdapterError {
1073    fn from(e: compute_error::InstanceExists) -> Self {
1074        AdapterError::Compute(e.into())
1075    }
1076}
1077
1078impl From<TimestampError> for AdapterError {
1079    fn from(e: TimestampError) -> Self {
1080        let e: EvalError = e.into();
1081        e.into()
1082    }
1083}
1084
1085impl From<mz_sql_parser::parser::ParserStatementError> for AdapterError {
1086    fn from(e: mz_sql_parser::parser::ParserStatementError) -> Self {
1087        AdapterError::ParseError(e)
1088    }
1089}
1090
1091impl From<VarError> for AdapterError {
1092    fn from(e: VarError) -> Self {
1093        let e: mz_catalog::memory::error::Error = e.into();
1094        e.into()
1095    }
1096}
1097
1098impl From<rbac::UnauthorizedError> for AdapterError {
1099    fn from(e: rbac::UnauthorizedError) -> Self {
1100        AdapterError::Unauthorized(e)
1101    }
1102}
1103
1104impl From<mz_sql_parser::ast::IdentError> for AdapterError {
1105    fn from(value: mz_sql_parser::ast::IdentError) -> Self {
1106        AdapterError::PlanError(PlanError::InvalidIdent(value))
1107    }
1108}
1109
1110impl From<mz_pgwire_common::ConnectionError> for AdapterError {
1111    fn from(value: mz_pgwire_common::ConnectionError) -> Self {
1112        match value {
1113            mz_pgwire_common::ConnectionError::TooManyConnections { current, limit } => {
1114                AdapterError::ResourceExhaustion {
1115                    resource_type: "connection".into(),
1116                    limit_name: "max_connections".into(),
1117                    desired: (current + 1).to_string(),
1118                    limit: limit.to_string(),
1119                    current: current.to_string(),
1120                }
1121            }
1122        }
1123    }
1124}
1125
1126impl From<NetworkPolicyError> for AdapterError {
1127    fn from(value: NetworkPolicyError) -> Self {
1128        AdapterError::NetworkPolicyDenied(value)
1129    }
1130}
1131
1132impl From<ConnectionValidationError> for AdapterError {
1133    fn from(e: ConnectionValidationError) -> AdapterError {
1134        AdapterError::ConnectionValidation(e)
1135    }
1136}
1137
1138impl Error for AdapterError {}