mz_adapter/error.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::error::Error;
use std::fmt;
use std::num::TryFromIntError;

use dec::TryFromDecimalError;
use itertools::Itertools;
use mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER;
use mz_compute_client::controller::error as compute_error;
use mz_compute_client::controller::error::{CollectionLookupError, InstanceMissing};
use mz_compute_client::controller::instance::PeekError;
use mz_compute_types::ComputeInstanceId;
use mz_expr::EvalError;
use mz_ore::error::ErrorExt;
use mz_ore::stack::RecursionLimitError;
use mz_ore::str::StrExt;
use mz_pgwire_common::{ErrorResponse, Severity};
use mz_repr::adt::timestamp::TimestampError;
use mz_repr::explain::ExplainError;
use mz_repr::{NotNullViolation, Timestamp};
use mz_sql::plan::PlanError;
use mz_sql::rbac;
use mz_sql::session::vars::VarError;
use mz_storage_types::connections::ConnectionValidationError;
use mz_storage_types::controller::StorageError;
use mz_storage_types::errors::CollectionMissing;
use smallvec::SmallVec;
use timely::progress::Antichain;
use tokio::sync::oneshot;
use tokio_postgres::error::SqlState;

use crate::coord::NetworkPolicyError;
use crate::optimize::OptimizerError;

/// Errors that can occur in the coordinator.
#[derive(Debug)]
pub enum AdapterError {
    /// A `SUBSCRIBE` was requested whose `UP TO` bound precedes its `as_of` timestamp
    AbsurdSubscribeBounds {
        as_of: mz_repr::Timestamp,
        up_to: mz_repr::Timestamp,
    },
    /// Attempted to use a potentially ambiguous column reference expression with a system table.
    // We don't allow this until https://github.com/MaterializeInc/database-issues/issues/4824 is
    // resolved, because allowing such references would prevent us from adding columns to system tables.
    AmbiguousSystemColumnReference,
    /// An error occurred in a catalog operation.
    Catalog(mz_catalog::memory::error::Error),
    /// 1. The cached plan or descriptor changed, or
    /// 2. some dependency of a statement disappeared during sequencing.
    ///
    /// TODO(ggevay): we should refactor the case-2 usages to use `ConcurrentDependencyDrop`
    /// instead (e.g., in MV sequencing).
    ChangedPlan(String),
    /// The cursor already exists.
    DuplicateCursor(String),
    /// An error while evaluating an expression.
    Eval(EvalError),
    /// An error occurred while explaining a statement.
    Explain(ExplainError),
    /// The ID allocator exhausted all valid IDs.
    IdExhaustionError,
    /// Unexpected internal state was encountered.
    Internal(String),
    /// Attempted to read from log sources of a replica with disabled introspection.
    IntrospectionDisabled {
        log_names: Vec<String>,
    },
    /// Attempted to create an object dependent on log sources that doesn't support
    /// log dependencies.
    InvalidLogDependency {
        object_type: String,
        log_names: Vec<String>,
    },
    /// No such cluster replica availability zone has been configured.
    InvalidClusterReplicaAz {
        az: String,
        expected: Vec<String>,
    },
    /// SET TRANSACTION ISOLATION LEVEL was called in the middle of a transaction.
    InvalidSetIsolationLevel,
    /// SET cluster was called in the middle of a transaction.
    InvalidSetCluster,
    /// No such storage instance size has been configured.
    InvalidStorageClusterSize {
        size: String,
        expected: Vec<String>,
    },
    /// Creating a source or sink without specifying its size is forbidden.
    SourceOrSinkSizeRequired {
        expected: Vec<String>,
    },
    /// The selection value for a table mutation operation refers to an invalid object.
    InvalidTableMutationSelection,
    /// Expression violated a column's constraint
    ConstraintViolation(NotNullViolation),
    /// Transaction cluster was dropped in the middle of a transaction.
    ConcurrentClusterDrop,
    /// A dependency was dropped while sequencing a statement.
    ConcurrentDependencyDrop {
        dependency_kind: &'static str,
        dependency_id: String,
    },
    /// Target cluster has no replicas to service query.
    NoClusterReplicasAvailable {
        name: String,
        is_managed: bool,
    },
    /// The named operation cannot be run in a transaction.
    OperationProhibitsTransaction(String),
    /// The named operation requires an active transaction.
    OperationRequiresTransaction(String),
    /// An error occurred while planning the statement.
    PlanError(PlanError),
    /// The named prepared statement already exists.
    PreparedStatementExists(String),
    /// Wrapper around parsing error
    ParseError(mz_sql_parser::parser::ParserStatementError),
    /// The transaction is in read-only mode.
    ReadOnlyTransaction,
    /// The transaction is in read-only mode and a read already occurred.
    ReadWriteUnavailable,
    /// The recursion limit of some operation was exceeded.
    RecursionLimit(RecursionLimitError),
    /// A query in a transaction referenced a relation outside the first query's
    /// time domain.
    RelationOutsideTimeDomain {
        relations: Vec<String>,
        names: Vec<String>,
    },
    /// A query tried to create more resources than is allowed in the system configuration.
    ResourceExhaustion {
        resource_type: String,
        limit_name: String,
        desired: String,
        limit: String,
        current: String,
    },
    /// Result size of a query is too large.
    ResultSize(String),
    /// The specified feature is not permitted in safe mode.
    SafeModeViolation(String),
    /// The current transaction had the wrong set of write locks.
    WrongSetOfLocks,
    /// Waiting on a query timed out.
    ///
    /// Note this differs slightly from PG's implementation/semantics.
    StatementTimeout,
    /// The user canceled the query
    Canceled,
    /// An idle session in a transaction has timed out.
    IdleInTransactionSessionTimeout,
    /// The transaction is in single-subscribe mode.
    SubscribeOnlyTransaction,
    /// An error occurred in the optimizer.
    Optimizer(OptimizerError),
    /// A query depends on items which are not allowed to be referenced from the current cluster.
    UnallowedOnCluster {
        depends_on: SmallVec<[String; 2]>,
        cluster: String,
    },
    /// A user tried to perform an action that they were unauthorized to do.
    Unauthorized(rbac::UnauthorizedError),
    /// The named cursor does not exist.
    UnknownCursor(String),
    /// The named role does not exist.
    UnknownLoginRole(String),
    /// The named prepared statement does not exist.
    UnknownPreparedStatement(String),
    /// The named cluster replica does not exist.
    UnknownClusterReplica {
        cluster_name: String,
        replica_name: String,
    },
    /// The named setting does not exist.
    UnrecognizedConfigurationParam(String),
    /// A generic error occurred.
    //
    // TODO(benesch): convert all those errors to structured errors.
    Unstructured(anyhow::Error),
    /// The named feature is not supported and will (probably) not be.
    Unsupported(&'static str),
    /// Some feature isn't available for a (potentially opaque) reason.
    /// For example, in cloud, Self-Managed auth features aren't available,
    /// but we don't want to mention self-managed auth in the error message.
    UnavailableFeature {
        feature: String,
        docs: Option<String>,
    },
    /// Attempted to read from log sources without selecting a target replica.
    UntargetedLogRead {
        log_names: Vec<String>,
    },
    /// The transaction is in write-only mode.
    WriteOnlyTransaction,
    /// The transaction can only execute a single statement.
    SingleStatementTransaction,
    /// The transaction can only execute simple DDL.
    DDLOnlyTransaction,
    /// Another session modified the Catalog while this transaction was open.
    DDLTransactionRace,
    /// Used to prevent us from durably committing state while a DDL transaction is open, should
    /// never be returned to the user.
    TransactionDryRun {
        /// New operations that were run in the transaction.
        new_ops: Vec<crate::catalog::Op>,
        /// New resulting `CatalogState`.
        new_state: crate::catalog::CatalogState,
    },
    /// An error occurred in the storage layer
    Storage(mz_storage_types::controller::StorageError<mz_repr::Timestamp>),
    /// An error occurred in the compute layer
    Compute(anyhow::Error),
    /// An error in the orchestrator layer
    Orchestrator(anyhow::Error),
    /// A statement tried to drop a role that had dependent objects.
    ///
    /// The map keys are role names and values are detailed error messages.
    DependentObject(BTreeMap<String, Vec<String>>),
    /// When performing an `ALTER` of some variety, re-planning the statement
    /// errored.
    InvalidAlter(&'static str, PlanError),
    /// An error occurred while validating a connection.
    ConnectionValidation(ConnectionValidationError),
    /// We refuse to create the materialized view, because it would never be refreshed, so it would
    /// never be queryable. This can happen when the only specified refreshes are further back in
    /// the past than the initial compaction window of the materialized view.
    MaterializedViewWouldNeverRefresh(Timestamp, Timestamp),
    /// A CREATE MATERIALIZED VIEW statement tried to acquire a read hold at a REFRESH AT time,
    /// but was unable to get a precise read hold.
    InputNotReadableAtRefreshAtTime(Timestamp, Antichain<Timestamp>),
    /// A humanized version of [`StorageError::RtrTimeout`].
    RtrTimeout(String),
    /// A humanized version of [`StorageError::RtrDropFailure`].
    RtrDropFailure(String),
    /// The collection requested to be sinked cannot be read at any timestamp
    UnreadableSinkCollection,
    /// User sessions have been blocked.
    UserSessionsDisallowed,
    /// This user session has been denied by a NetworkPolicy.
    NetworkPolicyDenied(NetworkPolicyError),
    /// Something attempted a write (to catalog, storage, tables, etc.) while in
    /// read-only mode.
    ReadOnly,
    /// An `ALTER CLUSTER` statement timed out before completing.
    AlterClusterTimeout,
    /// An `ALTER CLUSTER` was attempted while the cluster still has pending replica updates.
    AlterClusterWhilePendingReplicas,
    /// An error occurred while authenticating a session.
    AuthenticationError(AuthenticationError),
}

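/// Errors that can occur while authenticating a session.
///
/// The user-facing messages come from the `thiserror` derive below; an illustrative sketch,
/// marked `ignore` so it is not compiled as a doctest:
///
/// ```ignore
/// assert_eq!(
///     AuthenticationError::NonLogin.to_string(),
///     "role is not allowed to login",
/// );
/// ```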
#[derive(Debug, thiserror::Error)]
pub enum AuthenticationError {
    #[error("invalid credentials")]
    InvalidCredentials,
    #[error("role is not allowed to login")]
    NonLogin,
    #[error("role does not exist")]
    RoleNotFound,
    #[error("password is required")]
    PasswordRequired,
}

impl AdapterError {
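    /// Converts this error into a pgwire [`ErrorResponse`] carrying the given severity, the
    /// SQLSTATE from [`Self::code`], and this error's message, detail, hint, and position.
    ///
    /// Illustrative sketch, marked `ignore` so it is not compiled as a doctest; the exact
    /// `Severity` variant used here is an assumption:
    ///
    /// ```ignore
    /// let resp = AdapterError::Canceled.into_response(Severity::Error);
    /// assert_eq!(resp.code, SqlState::QUERY_CANCELED);
    /// assert_eq!(resp.message, "canceling statement due to user request");
    /// ```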
    pub fn into_response(self, severity: Severity) -> ErrorResponse {
        ErrorResponse {
            severity,
            code: self.code(),
            message: self.to_string(),
            detail: self.detail(),
            hint: self.hint(),
            position: self.position(),
        }
    }

    pub fn position(&self) -> Option<usize> {
        match self {
            AdapterError::ParseError(err) => Some(err.error.pos),
            _ => None,
        }
    }

    /// Reports additional details about the error, if any are available.
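    ///
    /// Illustrative sketch, marked `ignore` so it is not compiled as a doctest:
    ///
    /// ```ignore
    /// let err = AdapterError::AmbiguousSystemColumnReference;
    /// assert_eq!(
    ///     err.detail().as_deref(),
    ///     Some("This is a current limitation in Materialize"),
    /// );
    /// ```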
    pub fn detail(&self) -> Option<String> {
        match self {
            AdapterError::AmbiguousSystemColumnReference => {
                Some("This is a current limitation in Materialize".into())
            },
            AdapterError::Catalog(c) => c.detail(),
            AdapterError::Eval(e) => e.detail(),
            AdapterError::RelationOutsideTimeDomain { relations, names } => Some(format!(
                "The following relations in the query are outside the transaction's time domain:\n{}\n{}",
                relations
                    .iter()
                    .map(|r| r.quoted().to_string())
                    .collect::<Vec<_>>()
                    .join("\n"),
                match names.is_empty() {
                    true => "No relations are available.".to_string(),
                    false => format!(
                        "Only the following relations are available:\n{}",
                        names
                            .iter()
                            .map(|name| name.quoted().to_string())
                            .collect::<Vec<_>>()
                            .join("\n")
                    ),
                }
            )),
            AdapterError::SourceOrSinkSizeRequired { .. } => Some(
                "Either specify the cluster that will maintain this object via IN CLUSTER or \
                specify size via SIZE option."
                    .into(),
            ),
            AdapterError::SafeModeViolation(_) => Some(
                "The Materialize server you are connected to is running in \
                 safe mode, which limits the features that are available."
                    .into(),
            ),
            AdapterError::IntrospectionDisabled { log_names }
            | AdapterError::UntargetedLogRead { log_names } => Some(format!(
                "The query references the following log sources:\n    {}",
                log_names.join("\n    "),
            )),
            AdapterError::InvalidLogDependency { log_names, .. } => Some(format!(
                "The object depends on the following log sources:\n    {}",
                log_names.join("\n    "),
            )),
            AdapterError::PlanError(e) => e.detail(),
            AdapterError::Unauthorized(unauthorized) => unauthorized.detail(),
            AdapterError::DependentObject(dependent_objects) => {
                Some(dependent_objects
                    .iter()
                    .map(|(role_name, err_msgs)| err_msgs
                        .iter()
                        .map(|err_msg| format!("{role_name}: {err_msg}"))
                        .join("\n"))
                    .join("\n"))
            },
            AdapterError::Storage(storage_error) => {
                storage_error.source().map(|source_error| source_error.to_string_with_causes())
            }
            AdapterError::ReadOnlyTransaction => Some("SELECT queries cannot be combined with other query types, including SUBSCRIBE.".into()),
            AdapterError::InvalidAlter(_, e) => e.detail(),
            AdapterError::Optimizer(e) => e.detail(),
            AdapterError::ConnectionValidation(e) => e.detail(),
            AdapterError::MaterializedViewWouldNeverRefresh(last_refresh, earliest_possible) => {
                Some(format!(
                    "The specified last refresh is at {}, while the earliest possible time to compute the materialized \
                    view is {}.",
                    last_refresh,
                    earliest_possible,
                ))
            }
            AdapterError::UnallowedOnCluster { cluster, .. } => (cluster == MZ_CATALOG_SERVER_CLUSTER.name).then(||
                format!("The transaction is executing on the {cluster} cluster, maybe having been routed there by the first statement in the transaction.")
            ),
            AdapterError::InputNotReadableAtRefreshAtTime(oracle_read_ts, least_valid_read) => {
                Some(format!(
                    "The requested REFRESH AT time is {}, \
                    but not all input collections are readable earlier than [{}].",
                    oracle_read_ts,
                    if least_valid_read.len() == 1 {
                        format!("{}", least_valid_read.as_option().expect("antichain contains exactly 1 timestamp"))
                    } else {
                        // This can't occur currently
                        format!("{:?}", least_valid_read)
                    }
                ))
            }
            AdapterError::RtrTimeout(name) => Some(format!("{name} failed to ingest data up to the real-time recency point")),
            AdapterError::RtrDropFailure(name) => Some(format!("{name} dropped before ingesting data to the real-time recency point")),
            AdapterError::UserSessionsDisallowed => Some("Your organization has been blocked. Please contact support.".to_string()),
            AdapterError::NetworkPolicyDenied(reason) => Some(format!("{reason}.")),
            _ => None,
        }
    }

    /// Reports a hint for the user about how the error could be fixed.
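    ///
    /// Illustrative sketch, marked `ignore` so it is not compiled as a doctest:
    ///
    /// ```ignore
    /// let err = AdapterError::InvalidStorageClusterSize {
    ///     size: "huge".into(),
    ///     expected: vec!["small".into(), "medium".into()],
    /// };
    /// assert_eq!(err.hint().as_deref(), Some("Valid sizes are: small, medium"));
    /// ```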
    pub fn hint(&self) -> Option<String> {
        match self {
            AdapterError::AmbiguousSystemColumnReference => Some(
                "Rewrite the view to refer to all columns by name. Expand all wildcards and \
                convert all NATURAL JOINs to USING joins."
                    .to_string(),
            ),
            AdapterError::Catalog(c) => c.hint(),
            AdapterError::Eval(e) => e.hint(),
            AdapterError::InvalidClusterReplicaAz { expected, az: _ } => {
                Some(if expected.is_empty() {
                    "No availability zones configured; do not specify AVAILABILITY ZONE".into()
                } else {
                    format!("Valid availability zones are: {}", expected.join(", "))
                })
            }
            AdapterError::InvalidStorageClusterSize { expected, .. } => {
                Some(format!("Valid sizes are: {}", expected.join(", ")))
            }
            AdapterError::SourceOrSinkSizeRequired { expected } => Some(format!(
                "Try choosing one of the smaller sizes to start. Available sizes: {}",
                expected.join(", ")
            )),
            AdapterError::NoClusterReplicasAvailable { is_managed, .. } => {
                Some(if *is_managed {
                    "Use ALTER CLUSTER to adjust the replication factor of the cluster. \
                    Example: `ALTER CLUSTER <cluster-name> SET (REPLICATION FACTOR 1)`".into()
                } else {
                    "Use CREATE CLUSTER REPLICA to attach cluster replicas to the cluster".into()
                })
            }
            AdapterError::UntargetedLogRead { .. } => Some(
                "Use `SET cluster_replica = <replica-name>` to target a specific replica in the \
                 active cluster. Note that subsequent queries will only be answered by \
                 the selected replica, which might reduce availability. To undo the replica \
                 selection, use `RESET cluster_replica`."
                    .into(),
            ),
            AdapterError::ResourceExhaustion { resource_type, .. } => Some(format!(
                "Drop an existing {resource_type} or contact support to request a limit increase."
            )),
            AdapterError::StatementTimeout => Some(
                "Consider increasing the maximum allowed statement duration for this session by \
                 setting the statement_timeout session variable. For example, `SET \
                 statement_timeout = '120s'`."
                    .into(),
            ),
            AdapterError::PlanError(e) => e.hint(),
            AdapterError::UnallowedOnCluster { cluster, .. } => {
                (cluster != MZ_CATALOG_SERVER_CLUSTER.name).then(||
                    "Use `SET CLUSTER = <cluster-name>` to change your cluster and re-run the query."
                    .to_string()
                )
            }
            AdapterError::InvalidAlter(_, e) => e.hint(),
            AdapterError::Optimizer(e) => e.hint(),
            AdapterError::ConnectionValidation(e) => e.hint(),
            AdapterError::InputNotReadableAtRefreshAtTime(_, _) => Some(
                "You can use `REFRESH AT greatest(mz_now(), <explicit timestamp>)` to refresh \
                 either at the explicitly specified timestamp, or now if the given timestamp would \
                 be in the past.".to_string()
            ),
            AdapterError::AlterClusterTimeout => Some(
                "Consider increasing the timeout duration in the alter cluster statement.".into(),
            ),
            AdapterError::DDLTransactionRace => Some(
                "Currently, DDL transactions fail when any other DDL happens concurrently, \
                 even on unrelated schemas/clusters.".into()
            ),
            _ => None,
        }
    }

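    /// Returns the SQLSTATE error code that most closely matches this error.
    ///
    /// Illustrative sketch, marked `ignore` so it is not compiled as a doctest:
    ///
    /// ```ignore
    /// assert_eq!(AdapterError::Canceled.code(), SqlState::QUERY_CANCELED);
    /// assert_eq!(
    ///     AdapterError::DuplicateCursor("c".into()).code(),
    ///     SqlState::DUPLICATE_CURSOR,
    /// );
    /// ```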
    pub fn code(&self) -> SqlState {
        // TODO(benesch): we should only use `SqlState::INTERNAL_ERROR` for
        // those errors that are truly internal errors. At the moment we have
        // various classes of uncategorized errors that use this error code
        // inappropriately.
        match self {
            // DATA_EXCEPTION to match what Postgres returns for degenerate
            // range bounds
            AdapterError::AbsurdSubscribeBounds { .. } => SqlState::DATA_EXCEPTION,
            AdapterError::AmbiguousSystemColumnReference => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::Catalog(e) => match &e.kind {
                mz_catalog::memory::error::ErrorKind::VarError(e) => match e {
                    VarError::ConstrainedParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
                    VarError::FixedValueParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
                    VarError::InvalidParameterType { .. } => SqlState::INVALID_PARAMETER_VALUE,
                    VarError::InvalidParameterValue { .. } => SqlState::INVALID_PARAMETER_VALUE,
                    VarError::ReadOnlyParameter(_) => SqlState::CANT_CHANGE_RUNTIME_PARAM,
                    VarError::UnknownParameter(_) => SqlState::UNDEFINED_OBJECT,
                    VarError::RequiresUnsafeMode { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
                    VarError::RequiresFeatureFlag { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
                },
                _ => SqlState::INTERNAL_ERROR,
            },
            AdapterError::ChangedPlan(_) => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::DuplicateCursor(_) => SqlState::DUPLICATE_CURSOR,
            AdapterError::Eval(EvalError::CharacterNotValidForEncoding(_)) => {
                SqlState::PROGRAM_LIMIT_EXCEEDED
            }
            AdapterError::Eval(EvalError::CharacterTooLargeForEncoding(_)) => {
                SqlState::PROGRAM_LIMIT_EXCEEDED
            }
            AdapterError::Eval(EvalError::LengthTooLarge) => SqlState::PROGRAM_LIMIT_EXCEEDED,
            AdapterError::Eval(EvalError::NullCharacterNotPermitted) => {
                SqlState::PROGRAM_LIMIT_EXCEEDED
            }
            AdapterError::Eval(_) => SqlState::INTERNAL_ERROR,
            AdapterError::Explain(_) => SqlState::INTERNAL_ERROR,
            AdapterError::IdExhaustionError => SqlState::INTERNAL_ERROR,
            AdapterError::Internal(_) => SqlState::INTERNAL_ERROR,
            AdapterError::IntrospectionDisabled { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::InvalidLogDependency { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::InvalidClusterReplicaAz { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::InvalidSetIsolationLevel => SqlState::ACTIVE_SQL_TRANSACTION,
            AdapterError::InvalidSetCluster => SqlState::ACTIVE_SQL_TRANSACTION,
            AdapterError::InvalidStorageClusterSize { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::SourceOrSinkSizeRequired { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::InvalidTableMutationSelection => SqlState::INVALID_TRANSACTION_STATE,
            AdapterError::ConstraintViolation(NotNullViolation(_)) => SqlState::NOT_NULL_VIOLATION,
            AdapterError::ConcurrentClusterDrop => SqlState::INVALID_TRANSACTION_STATE,
            AdapterError::ConcurrentDependencyDrop { .. } => SqlState::UNDEFINED_OBJECT,
            AdapterError::NoClusterReplicasAvailable { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::OperationProhibitsTransaction(_) => SqlState::ACTIVE_SQL_TRANSACTION,
            AdapterError::OperationRequiresTransaction(_) => SqlState::NO_ACTIVE_SQL_TRANSACTION,
            AdapterError::ParseError(_) => SqlState::SYNTAX_ERROR,
            AdapterError::PlanError(PlanError::InvalidSchemaName) => SqlState::INVALID_SCHEMA_NAME,
            AdapterError::PlanError(PlanError::ColumnAlreadyExists { .. }) => {
                SqlState::DUPLICATE_COLUMN
            }
            AdapterError::PlanError(PlanError::UnknownParameter(_)) => {
                SqlState::UNDEFINED_PARAMETER
            }
            AdapterError::PlanError(PlanError::ParameterNotAllowed(_)) => {
                SqlState::UNDEFINED_PARAMETER
            }
            AdapterError::PlanError(_) => SqlState::INTERNAL_ERROR,
            AdapterError::PreparedStatementExists(_) => SqlState::DUPLICATE_PSTATEMENT,
            AdapterError::ReadOnlyTransaction => SqlState::READ_ONLY_SQL_TRANSACTION,
            AdapterError::ReadWriteUnavailable => SqlState::INVALID_TRANSACTION_STATE,
            AdapterError::SingleStatementTransaction => SqlState::INVALID_TRANSACTION_STATE,
            AdapterError::WrongSetOfLocks => SqlState::LOCK_NOT_AVAILABLE,
            AdapterError::StatementTimeout => SqlState::QUERY_CANCELED,
            AdapterError::Canceled => SqlState::QUERY_CANCELED,
            AdapterError::IdleInTransactionSessionTimeout => {
                SqlState::IDLE_IN_TRANSACTION_SESSION_TIMEOUT
            }
            AdapterError::RecursionLimit(_) => SqlState::INTERNAL_ERROR,
            AdapterError::RelationOutsideTimeDomain { .. } => SqlState::INVALID_TRANSACTION_STATE,
            AdapterError::ResourceExhaustion { .. } => SqlState::INSUFFICIENT_RESOURCES,
            AdapterError::ResultSize(_) => SqlState::OUT_OF_MEMORY,
            AdapterError::SafeModeViolation(_) => SqlState::INTERNAL_ERROR,
            AdapterError::SubscribeOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
            AdapterError::Optimizer(e) => match e {
                OptimizerError::PlanError(PlanError::InvalidSchemaName) => {
                    SqlState::INVALID_SCHEMA_NAME
                }
                OptimizerError::PlanError(PlanError::ColumnAlreadyExists { .. }) => {
                    SqlState::DUPLICATE_COLUMN
                }
                OptimizerError::PlanError(PlanError::UnknownParameter(_)) => {
                    SqlState::UNDEFINED_PARAMETER
                }
                OptimizerError::PlanError(PlanError::ParameterNotAllowed(_)) => {
                    SqlState::UNDEFINED_PARAMETER
                }
                OptimizerError::PlanError(_) => SqlState::INTERNAL_ERROR,
                OptimizerError::RecursionLimitError(e) => {
                    AdapterError::RecursionLimit(e.clone()).code() // Delegate to outer
                }
                OptimizerError::Internal(s) => {
                    AdapterError::Internal(s.clone()).code() // Delegate to outer
                }
                OptimizerError::EvalError(e) => {
                    AdapterError::Eval(e.clone()).code() // Delegate to outer
                }
                OptimizerError::TransformError(_) => SqlState::INTERNAL_ERROR,
                OptimizerError::UnmaterializableFunction(_) => SqlState::FEATURE_NOT_SUPPORTED,
                OptimizerError::UncallableFunction { .. } => SqlState::FEATURE_NOT_SUPPORTED,
                OptimizerError::UnsupportedTemporalExpression(_) => SqlState::FEATURE_NOT_SUPPORTED,
                // This should be handled by peek optimization, so it's an internal error if it
                // reaches the user.
                OptimizerError::InternalUnsafeMfpPlan(_) => SqlState::INTERNAL_ERROR,
            },
            AdapterError::UnallowedOnCluster { .. } => {
                SqlState::S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
            }
            AdapterError::Unauthorized(_) => SqlState::INSUFFICIENT_PRIVILEGE,
            AdapterError::UnknownCursor(_) => SqlState::INVALID_CURSOR_NAME,
            AdapterError::UnknownPreparedStatement(_) => SqlState::UNDEFINED_PSTATEMENT,
            AdapterError::UnknownLoginRole(_) => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
            AdapterError::UnknownClusterReplica { .. } => SqlState::UNDEFINED_OBJECT,
            AdapterError::UnrecognizedConfigurationParam(_) => SqlState::UNDEFINED_OBJECT,
            AdapterError::Unsupported(..) => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::UnavailableFeature { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::Unstructured(_) => SqlState::INTERNAL_ERROR,
            AdapterError::UntargetedLogRead { .. } => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::DDLTransactionRace => SqlState::T_R_SERIALIZATION_FAILURE,
            AdapterError::TransactionDryRun { .. } => SqlState::T_R_SERIALIZATION_FAILURE,
            // It's not immediately clear which error code to use here, because a "write-only
            // transaction", a "single table write transaction", and a "ddl only transaction"
            // are not things in Postgres. This error code is the generic "bad txn thing" code,
            // so it's probably the best choice.
            AdapterError::WriteOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
            AdapterError::DDLOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
            AdapterError::Storage(_) | AdapterError::Compute(_) | AdapterError::Orchestrator(_) => {
                SqlState::INTERNAL_ERROR
            }
            AdapterError::DependentObject(_) => SqlState::DEPENDENT_OBJECTS_STILL_EXIST,
            AdapterError::InvalidAlter(_, _) => SqlState::FEATURE_NOT_SUPPORTED,
            AdapterError::ConnectionValidation(_) => SqlState::SYSTEM_ERROR,
            // `DATA_EXCEPTION`, similarly to `AbsurdSubscribeBounds`.
            AdapterError::MaterializedViewWouldNeverRefresh(_, _) => SqlState::DATA_EXCEPTION,
            AdapterError::InputNotReadableAtRefreshAtTime(_, _) => SqlState::DATA_EXCEPTION,
            AdapterError::RtrTimeout(_) => SqlState::QUERY_CANCELED,
            AdapterError::RtrDropFailure(_) => SqlState::UNDEFINED_OBJECT,
            AdapterError::UnreadableSinkCollection => SqlState::from_code("MZ009"),
            AdapterError::UserSessionsDisallowed => SqlState::from_code("MZ010"),
            AdapterError::NetworkPolicyDenied(_) => SqlState::from_code("MZ011"),
            // In read-only mode all transactions are implicitly read-only
            // transactions.
            AdapterError::ReadOnly => SqlState::READ_ONLY_SQL_TRANSACTION,
            AdapterError::AlterClusterTimeout => SqlState::QUERY_CANCELED,
            AdapterError::AlterClusterWhilePendingReplicas => SqlState::OBJECT_IN_USE,
            AdapterError::AuthenticationError(AuthenticationError::InvalidCredentials) => {
                SqlState::INVALID_PASSWORD
            }
            AdapterError::AuthenticationError(_) => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
        }
    }

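    /// Constructs an [`AdapterError::Internal`] whose message is the given context followed by
    /// the error's display output.
    ///
    /// Illustrative sketch, marked `ignore` so it is not compiled as a doctest:
    ///
    /// ```ignore
    /// let err = AdapterError::internal("peek error", "read hold mismatch");
    /// assert_eq!(err.to_string(), "internal error: peek error: read hold mismatch");
    /// ```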
    pub fn internal<E: std::fmt::Display>(context: &str, e: E) -> AdapterError {
        AdapterError::Internal(format!("{context}: {e}"))
    }

    // We don't want the following error conversions to `ConcurrentDependencyDrop` to happen
    // automatically, because it might depend on the context whether `ConcurrentDependencyDrop`
    // is appropriate, so we want to make the conversion target explicit at the call site.
    // For example, maybe we get an `InstanceMissing` if the user specifies a non-existing cluster,
    // in which case `ConcurrentDependencyDrop` would not be appropriate.
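    //
    // A call site therefore picks the conversion explicitly, roughly like this (illustrative
    // sketch; `lookup_instance` is a hypothetical helper returning `Result<_, InstanceMissing>`):
    //
    //     let instance = lookup_instance(cluster_id)
    //         .map_err(AdapterError::concurrent_dependency_drop_from_instance_missing)?;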
    pub fn concurrent_dependency_drop_from_instance_missing(e: InstanceMissing) -> Self {
        AdapterError::ConcurrentDependencyDrop {
            dependency_kind: "cluster",
            dependency_id: e.0.to_string(),
        }
    }

    pub fn concurrent_dependency_drop_from_collection_missing(e: CollectionMissing) -> Self {
        AdapterError::ConcurrentDependencyDrop {
            dependency_kind: "collection",
            dependency_id: e.0.to_string(),
        }
    }

    pub fn concurrent_dependency_drop_from_collection_lookup_error(
        e: CollectionLookupError,
        compute_instance: ComputeInstanceId,
    ) -> Self {
        match e {
            CollectionLookupError::InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "cluster",
                dependency_id: id.to_string(),
            },
            CollectionLookupError::CollectionMissing(id) => {
                AdapterError::ConcurrentDependencyDrop {
                    dependency_kind: "collection",
                    dependency_id: id.to_string(),
                }
            }
            CollectionLookupError::InstanceShutDown => AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "cluster",
                dependency_id: compute_instance.to_string(),
            },
        }
    }

    pub fn concurrent_dependency_drop_from_peek_error(
        e: PeekError,
        compute_instance: ComputeInstanceId,
    ) -> AdapterError {
        match e {
            PeekError::ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "replica",
                dependency_id: id.to_string(),
            },
            PeekError::InstanceShutDown => AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "cluster",
                dependency_id: compute_instance.to_string(),
            },
            e @ PeekError::ReadHoldIdMismatch(_) => AdapterError::internal("peek error", e),
            e @ PeekError::ReadHoldInsufficient(_) => AdapterError::internal("peek error", e),
        }
    }

    pub fn concurrent_dependency_drop_from_dataflow_creation_error(
        e: compute_error::DataflowCreationError,
    ) -> Self {
        use compute_error::DataflowCreationError::*;
        match e {
            InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "cluster",
                dependency_id: id.to_string(),
            },
            CollectionMissing(id) => AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "collection",
                dependency_id: id.to_string(),
            },
            ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "replica",
                dependency_id: id.to_string(),
            },
            MissingAsOf | SinceViolation(..) | EmptyAsOfForSubscribe | EmptyAsOfForCopyTo => {
                AdapterError::internal("dataflow creation error", e)
            }
        }
    }
}

impl fmt::Display for AdapterError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            AdapterError::AbsurdSubscribeBounds { as_of, up_to } => {
                assert!(up_to < as_of);
                write!(
                    f,
                    r#"subscription lower ("as of") bound is beyond its upper ("up to") bound: {} < {}"#,
                    up_to, as_of
                )
            }
            AdapterError::AmbiguousSystemColumnReference => {
                write!(
                    f,
                    "cannot use wildcard expansions or NATURAL JOINs in a view that depends on \
                    system objects"
                )
            }
            AdapterError::ChangedPlan(e) => write!(f, "{}", e),
            AdapterError::Catalog(e) => e.fmt(f),
            AdapterError::DuplicateCursor(name) => {
                write!(f, "cursor {} already exists", name.quoted())
            }
            AdapterError::Eval(e) => e.fmt(f),
            AdapterError::Explain(e) => e.fmt(f),
            AdapterError::IdExhaustionError => f.write_str("ID allocator exhausted all valid IDs"),
            AdapterError::Internal(e) => write!(f, "internal error: {}", e),
            AdapterError::IntrospectionDisabled { .. } => write!(
                f,
                "cannot read log sources of replica with disabled introspection"
            ),
            AdapterError::InvalidLogDependency { object_type, .. } => {
                write!(f, "{object_type} objects cannot depend on log sources")
            }
            AdapterError::InvalidClusterReplicaAz { az, expected: _ } => {
                write!(f, "unknown cluster replica availability zone {az}",)
            }
            AdapterError::InvalidSetIsolationLevel => write!(
                f,
                "SET TRANSACTION ISOLATION LEVEL must be called before any query"
            ),
            AdapterError::InvalidSetCluster => {
                write!(f, "SET cluster cannot be called in an active transaction")
            }
            AdapterError::InvalidStorageClusterSize { size, .. } => {
                write!(f, "unknown source size {size}")
            }
            AdapterError::SourceOrSinkSizeRequired { .. } => {
                write!(f, "must specify either cluster or size option")
            }
            AdapterError::InvalidTableMutationSelection => {
                f.write_str("invalid selection: operation may only refer to user-defined tables")
            }
            AdapterError::ConstraintViolation(not_null_violation) => {
                write!(f, "{}", not_null_violation)
            }
            AdapterError::ConcurrentClusterDrop => {
                write!(f, "the transaction's active cluster has been dropped")
            }
            AdapterError::ConcurrentDependencyDrop {
                dependency_kind,
                dependency_id,
            } => {
                write!(f, "{dependency_kind} '{dependency_id}' was dropped")
            }
            AdapterError::NoClusterReplicasAvailable { name, .. } => {
                write!(
                    f,
                    "CLUSTER {} has no replicas available to service request",
                    name.quoted()
                )
            }
            AdapterError::OperationProhibitsTransaction(op) => {
                write!(f, "{} cannot be run inside a transaction block", op)
            }
            AdapterError::OperationRequiresTransaction(op) => {
                write!(f, "{} can only be used in transaction blocks", op)
            }
            AdapterError::ParseError(e) => e.fmt(f),
            AdapterError::PlanError(e) => e.fmt(f),
            AdapterError::PreparedStatementExists(name) => {
                write!(f, "prepared statement {} already exists", name.quoted())
            }
            AdapterError::ReadOnlyTransaction => f.write_str("transaction in read-only mode"),
            AdapterError::SingleStatementTransaction => {
                f.write_str("this transaction can only execute a single statement")
            }
            AdapterError::ReadWriteUnavailable => {
                f.write_str("transaction read-write mode must be set before any query")
            }
            AdapterError::WrongSetOfLocks => {
                write!(f, "internal error, wrong set of locks acquired")
            }
            AdapterError::StatementTimeout => {
                write!(f, "canceling statement due to statement timeout")
            }
            AdapterError::Canceled => {
                write!(f, "canceling statement due to user request")
            }
            AdapterError::IdleInTransactionSessionTimeout => {
                write!(
                    f,
                    "terminating connection due to idle-in-transaction timeout"
                )
            }
            AdapterError::RecursionLimit(e) => e.fmt(f),
            AdapterError::RelationOutsideTimeDomain { .. } => {
                write!(
                    f,
                    "Transactions can only reference objects in the same timedomain. \
                     See https://materialize.com/docs/sql/begin/#same-timedomain-error",
                )
            }
            AdapterError::ResourceExhaustion {
                resource_type,
                limit_name,
                desired,
                limit,
                current,
            } => {
                write!(
                    f,
                    "creating {resource_type} would violate {limit_name} limit (desired: {desired}, limit: {limit}, current: {current})"
                )
            }
            AdapterError::ResultSize(e) => write!(f, "{e}"),
            AdapterError::SafeModeViolation(feature) => {
                write!(f, "cannot create {} in safe mode", feature)
            }
            AdapterError::SubscribeOnlyTransaction => {
                f.write_str("SUBSCRIBE in transactions must be the only read statement")
            }
            AdapterError::Optimizer(e) => e.fmt(f),
            AdapterError::UnallowedOnCluster {
                depends_on,
                cluster,
            } => {
                let items = depends_on.into_iter().map(|item| item.quoted()).join(", ");
                write!(
                    f,
                    "querying the following items {items} is not allowed from the {} cluster",
                    cluster.quoted()
                )
            }
            AdapterError::Unauthorized(unauthorized) => {
                write!(f, "{unauthorized}")
            }
            AdapterError::UnknownCursor(name) => {
                write!(f, "cursor {} does not exist", name.quoted())
            }
            AdapterError::UnknownLoginRole(name) => {
                write!(f, "role {} does not exist", name.quoted())
            }
            AdapterError::Unsupported(features) => write!(f, "{} are not supported", features),
            AdapterError::Unstructured(e) => write!(f, "{}", e.display_with_causes()),
            AdapterError::WriteOnlyTransaction => f.write_str("transaction in write-only mode"),
            AdapterError::UnknownPreparedStatement(name) => {
                write!(f, "prepared statement {} does not exist", name.quoted())
            }
            AdapterError::UnknownClusterReplica {
                cluster_name,
                replica_name,
            } => write!(
                f,
                "cluster replica '{cluster_name}.{replica_name}' does not exist"
            ),
            AdapterError::UnrecognizedConfigurationParam(setting_name) => write!(
                f,
                "unrecognized configuration parameter {}",
                setting_name.quoted()
            ),
            AdapterError::UntargetedLogRead { .. } => {
                f.write_str("log source reads must target a replica")
            }
            AdapterError::DDLOnlyTransaction => f.write_str(
                "transactions which modify objects are restricted to just modifying objects",
            ),
            AdapterError::DDLTransactionRace => f.write_str(
                "another session modified the catalog while this DDL transaction was open",
            ),
            AdapterError::TransactionDryRun { .. } => f.write_str("transaction dry run"),
            AdapterError::Storage(e) => e.fmt(f),
            AdapterError::Compute(e) => e.fmt(f),
            AdapterError::Orchestrator(e) => e.fmt(f),
            AdapterError::DependentObject(dependent_objects) => {
                let role_str = if dependent_objects.keys().count() == 1 {
                    "role"
                } else {
                    "roles"
                };
                write!(
                    f,
                    "{role_str} \"{}\" cannot be dropped because some objects depend on it",
                    dependent_objects.keys().join(", ")
                )
            }
            AdapterError::InvalidAlter(t, e) => {
                write!(f, "invalid ALTER {t}: {e}")
            }
            AdapterError::ConnectionValidation(e) => e.fmt(f),
            AdapterError::MaterializedViewWouldNeverRefresh(_, _) => {
                write!(
                    f,
                    "all the specified refreshes of the materialized view would be too far in the past, and thus they \
                    would never happen"
                )
            }
            AdapterError::InputNotReadableAtRefreshAtTime(_, _) => {
                write!(
                    f,
                    "REFRESH AT requested for a time where not all the inputs are readable"
                )
            }
            AdapterError::RtrTimeout(_) => {
                write!(
                    f,
                    "timed out before ingesting the source's visible frontier when a real-time recency query was issued"
                )
            }
            AdapterError::RtrDropFailure(_) => write!(
                f,
                "real-time source dropped before ingesting the upstream system's visible frontier"
            ),
            AdapterError::UnreadableSinkCollection => {
                write!(f, "collection is not readable at any time")
            }
            AdapterError::UserSessionsDisallowed => write!(f, "login blocked"),
            AdapterError::NetworkPolicyDenied(_) => write!(f, "session denied"),
            AdapterError::ReadOnly => write!(f, "cannot write in read-only mode"),
            AdapterError::AlterClusterTimeout => {
                write!(f, "canceling statement, provided timeout lapsed")
            }
            AdapterError::AuthenticationError(e) => {
                write!(f, "authentication error: {e}")
            }
            AdapterError::UnavailableFeature { feature, docs } => {
                write!(f, "{} is not supported in this environment.", feature)?;
                if let Some(docs) = docs {
                    write!(
                        f,
                        " For more information consult the documentation at {docs}"
                    )?;
                }
                Ok(())
            }
            AdapterError::AlterClusterWhilePendingReplicas => {
                write!(f, "cannot alter clusters with pending updates")
            }
        }
    }
}

impl From<anyhow::Error> for AdapterError {
    fn from(e: anyhow::Error) -> AdapterError {
        match e.downcast::<PlanError>() {
            Ok(plan_error) => AdapterError::PlanError(plan_error),
            Err(e) => AdapterError::Unstructured(e),
        }
    }
}

impl From<TryFromIntError> for AdapterError {
    fn from(e: TryFromIntError) -> AdapterError {
        AdapterError::Unstructured(e.into())
    }
}

impl From<TryFromDecimalError> for AdapterError {
    fn from(e: TryFromDecimalError) -> AdapterError {
        AdapterError::Unstructured(e.into())
    }
}

impl From<mz_catalog::memory::error::Error> for AdapterError {
    fn from(e: mz_catalog::memory::error::Error) -> AdapterError {
        AdapterError::Catalog(e)
    }
}

impl From<mz_catalog::durable::CatalogError> for AdapterError {
    fn from(e: mz_catalog::durable::CatalogError) -> Self {
        mz_catalog::memory::error::Error::from(e).into()
    }
}

impl From<mz_catalog::durable::DurableCatalogError> for AdapterError {
    fn from(e: mz_catalog::durable::DurableCatalogError) -> Self {
        mz_catalog::durable::CatalogError::from(e).into()
    }
}

impl From<EvalError> for AdapterError {
    fn from(e: EvalError) -> AdapterError {
        AdapterError::Eval(e)
    }
}

impl From<ExplainError> for AdapterError {
    fn from(e: ExplainError) -> AdapterError {
        match e {
            ExplainError::RecursionLimitError(e) => AdapterError::RecursionLimit(e),
            e => AdapterError::Explain(e),
        }
    }
}

impl From<mz_sql::catalog::CatalogError> for AdapterError {
    fn from(e: mz_sql::catalog::CatalogError) -> AdapterError {
        AdapterError::Catalog(mz_catalog::memory::error::Error::from(e))
    }
}

impl From<PlanError> for AdapterError {
    fn from(e: PlanError) -> AdapterError {
        match e {
            PlanError::UnknownCursor(name) => AdapterError::UnknownCursor(name),
            _ => AdapterError::PlanError(e),
        }
    }
}

impl From<OptimizerError> for AdapterError {
    fn from(e: OptimizerError) -> AdapterError {
        use OptimizerError::*;
        match e {
            PlanError(e) => Self::PlanError(e),
            RecursionLimitError(e) => Self::RecursionLimit(e),
            EvalError(e) => Self::Eval(e),
            InternalUnsafeMfpPlan(e) => Self::Internal(e),
            Internal(e) => Self::Internal(e),
            e => Self::Optimizer(e),
        }
    }
}

impl From<NotNullViolation> for AdapterError {
    fn from(e: NotNullViolation) -> AdapterError {
        AdapterError::ConstraintViolation(e)
    }
}

impl From<RecursionLimitError> for AdapterError {
    fn from(e: RecursionLimitError) -> AdapterError {
        AdapterError::RecursionLimit(e)
    }
}

impl From<oneshot::error::RecvError> for AdapterError {
    fn from(e: oneshot::error::RecvError) -> AdapterError {
        AdapterError::Unstructured(e.into())
    }
}

impl From<StorageError<mz_repr::Timestamp>> for AdapterError {
    fn from(e: StorageError<mz_repr::Timestamp>) -> Self {
        AdapterError::Storage(e)
    }
}

impl From<compute_error::InstanceExists> for AdapterError {
    fn from(e: compute_error::InstanceExists) -> Self {
        AdapterError::Compute(e.into())
    }
}

impl From<TimestampError> for AdapterError {
    fn from(e: TimestampError) -> Self {
        let e: EvalError = e.into();
        e.into()
    }
}

impl From<mz_sql_parser::parser::ParserStatementError> for AdapterError {
    fn from(e: mz_sql_parser::parser::ParserStatementError) -> Self {
        AdapterError::ParseError(e)
    }
}

impl From<VarError> for AdapterError {
    fn from(e: VarError) -> Self {
        let e: mz_catalog::memory::error::Error = e.into();
        e.into()
    }
}

impl From<rbac::UnauthorizedError> for AdapterError {
    fn from(e: rbac::UnauthorizedError) -> Self {
        AdapterError::Unauthorized(e)
    }
}

impl From<mz_sql_parser::ast::IdentError> for AdapterError {
    fn from(value: mz_sql_parser::ast::IdentError) -> Self {
        AdapterError::PlanError(PlanError::InvalidIdent(value))
    }
}

impl From<mz_pgwire_common::ConnectionError> for AdapterError {
    fn from(value: mz_pgwire_common::ConnectionError) -> Self {
        match value {
            mz_pgwire_common::ConnectionError::TooManyConnections { current, limit } => {
                AdapterError::ResourceExhaustion {
                    resource_type: "connection".into(),
                    limit_name: "max_connections".into(),
                    desired: (current + 1).to_string(),
                    limit: limit.to_string(),
                    current: current.to_string(),
                }
            }
        }
    }
}

impl From<NetworkPolicyError> for AdapterError {
    fn from(value: NetworkPolicyError) -> Self {
        AdapterError::NetworkPolicyDenied(value)
    }
}

impl From<ConnectionValidationError> for AdapterError {
    fn from(e: ConnectionValidationError) -> AdapterError {
        AdapterError::ConnectionValidation(e)
    }
}

impl Error for AdapterError {}