mz_adapter/error.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::error::Error;
12use std::fmt;
13use std::num::TryFromIntError;
14
15use dec::TryFromDecimalError;
16use itertools::Itertools;
17use mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER;
18use mz_compute_client::controller::error as compute_error;
19use mz_compute_client::controller::error::{CollectionLookupError, InstanceMissing};
20use mz_compute_types::ComputeInstanceId;
21use mz_expr::EvalError;
22use mz_ore::error::ErrorExt;
23use mz_ore::stack::RecursionLimitError;
24use mz_ore::str::StrExt;
25use mz_pgwire_common::{ErrorResponse, Severity};
26use mz_repr::adt::timestamp::TimestampError;
27use mz_repr::explain::ExplainError;
28use mz_repr::{ColumnDiff, ColumnName, KeyDiff, NotNullViolation, RelationDescDiff, Timestamp};
29use mz_sql::plan::PlanError;
30use mz_sql::rbac;
31use mz_sql::session::vars::VarError;
32use mz_storage_types::connections::ConnectionValidationError;
33use mz_storage_types::controller::StorageError;
34use mz_storage_types::errors::CollectionMissing;
35use smallvec::SmallVec;
36use timely::progress::Antichain;
37use tokio::sync::oneshot;
38use tokio_postgres::error::SqlState;
39
40use crate::coord::NetworkPolicyError;
41use crate::optimize::OptimizerError;
42
43/// Errors that can occur in the coordinator.
44#[derive(Debug)]
45pub enum AdapterError {
46    /// A `SUBSCRIBE` was requested whose `UP TO` bound precedes its `as_of` timestamp
47    AbsurdSubscribeBounds {
48        as_of: mz_repr::Timestamp,
49        up_to: mz_repr::Timestamp,
50    },
51    /// Attempted to use a potentially ambiguous column reference expression with a system table.
52    // We don't allow this until https://github.com/MaterializeInc/database-issues/issues/4824 is
53    // resolved, because allowing such references would prevent us from adding columns to system tables.
54    AmbiguousSystemColumnReference,
55    /// An error occurred in a catalog operation.
56    Catalog(mz_catalog::memory::error::Error),
57    /// 1. The cached plan or descriptor changed,
58    /// 2. or some dependency of a statement disappeared during sequencing.
59    /// TODO(ggevay): we should refactor the usages of 2. to use `ConcurrentDependencyDrop`
60    /// instead (e.g., in MV sequencing).
61    ChangedPlan(String),
62    /// The cursor already exists.
63    DuplicateCursor(String),
64    /// An error while evaluating an expression.
65    Eval(EvalError),
66    /// An error occurred while planning the statement.
67    Explain(ExplainError),
68    /// The ID allocator exhausted all valid IDs.
69    IdExhaustionError,
70    /// Unexpected internal state was encountered.
71    Internal(String),
72    /// Attempted to read from log sources of a replica with disabled introspection.
73    IntrospectionDisabled {
74        log_names: Vec<String>,
75    },
76    /// Attempted to create an object that depends on log sources, even though its object
77    /// type does not support log dependencies.
78    InvalidLogDependency {
79        object_type: String,
80        log_names: Vec<String>,
81    },
82    /// No such cluster replica availability zone has been configured.
83    InvalidClusterReplicaAz {
84        az: String,
85        expected: Vec<String>,
86    },
87    /// SET TRANSACTION ISOLATION LEVEL was called in the middle of a transaction.
88    InvalidSetIsolationLevel,
89    /// SET cluster was called in the middle of a transaction.
90    InvalidSetCluster,
91    /// No such storage instance size has been configured.
92    InvalidStorageClusterSize {
93        size: String,
94        expected: Vec<String>,
95    },
96    /// Creating a source or sink without specifying its size is forbidden.
97    SourceOrSinkSizeRequired {
98        expected: Vec<String>,
99    },
100    /// The selection value for a table mutation operation refers to an invalid object.
101    InvalidTableMutationSelection {
102        /// The full name of the problematic object (e.g. a source or source-export table).
103        object_name: String,
104        /// Human-readable type of the object (e.g. "source", "source-export table").
105        object_type: String,
106    },
107    /// An expression violated a column's not-null constraint.
108    ConstraintViolation(NotNullViolation),
109    /// The transaction's active cluster was dropped in the middle of the transaction.
110    ConcurrentClusterDrop,
111    /// A dependency was dropped while sequencing a statement.
112    ConcurrentDependencyDrop {
113        dependency_kind: &'static str,
114        dependency_id: String,
115    },
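    /// The collection is not readable at any timestamp.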
116    CollectionUnreadable {
117        id: String,
118    },
119    /// Target cluster has no replicas to service query.
120    NoClusterReplicasAvailable {
121        name: String,
122        is_managed: bool,
123    },
124    /// The named operation cannot be run in a transaction.
125    OperationProhibitsTransaction(String),
126    /// The named operation requires an active transaction.
127    OperationRequiresTransaction(String),
128    /// An error occurred while planning the statement.
129    PlanError(PlanError),
130    /// The named prepared statement already exists.
131    PreparedStatementExists(String),
132    /// A wrapper around a parsing error.
133    ParseError(mz_sql_parser::parser::ParserStatementError),
134    /// The transaction is in read-only mode.
135    ReadOnlyTransaction,
136    /// The transaction is in read-only mode and a read already occurred.
137    ReadWriteUnavailable,
138    /// The recursion limit of some operation was exceeded.
139    RecursionLimit(RecursionLimitError),
140    /// A query in a transaction referenced a relation outside the first query's
141    /// time domain.
142    RelationOutsideTimeDomain {
143        relations: Vec<String>,
144        names: Vec<String>,
145    },
146    /// A query tried to create more resources than is allowed in the system configuration.
147    ResourceExhaustion {
148        resource_type: String,
149        limit_name: String,
150        desired: String,
151        limit: String,
152        current: String,
153    },
154    /// Result size of a query is too large.
155    ResultSize(String),
156    /// The specified feature is not permitted in safe mode.
157    SafeModeViolation(String),
158    /// The current transaction had the wrong set of write locks.
159    WrongSetOfLocks,
160    /// Waiting on a query timed out.
161    ///
162    /// Note this differs slightly from PG's implementation/semantics.
163    StatementTimeout,
164    /// The user canceled the query
165    Canceled,
166    /// An idle session in a transaction has timed out.
167    IdleInTransactionSessionTimeout,
168    /// The transaction is in single-subscribe mode.
169    SubscribeOnlyTransaction,
170    /// An error occurred in the optimizer.
171    Optimizer(OptimizerError),
172    /// A query depends on items which are not allowed to be referenced from the current cluster.
173    UnallowedOnCluster {
174        depends_on: SmallVec<[String; 2]>,
175        cluster: String,
176    },
177    /// A user tried to perform an action that they were unauthorized to do.
178    Unauthorized(rbac::UnauthorizedError),
179    /// The named cursor does not exist.
180    UnknownCursor(String),
181    /// The named role does not exist.
182    UnknownLoginRole(String),
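    /// The named prepared statement does not exist.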
183    UnknownPreparedStatement(String),
184    /// The named cluster replica does not exist.
185    UnknownClusterReplica {
186        cluster_name: String,
187        replica_name: String,
188    },
189    /// The named setting does not exist.
190    UnrecognizedConfigurationParam(String),
191    /// A generic error occurred.
192    //
193    // TODO(benesch): convert all those errors to structured errors.
194    Unstructured(anyhow::Error),
195    /// The named feature is not supported and will (probably) not be.
196    Unsupported(&'static str),
197    /// Some feature isn't available for a (potentially opaque) reason.
198    /// For example, in the cloud product, Self-Managed auth features aren't available,
199    /// but we don't want to mention Self-Managed auth in the error message.
200    UnavailableFeature {
201        feature: String,
202        docs: Option<String>,
203    },
204    /// Attempted to read from log sources without selecting a target replica.
205    UntargetedLogRead {
206        log_names: Vec<String>,
207    },
208    /// The transaction is in write-only mode.
209    WriteOnlyTransaction,
210    /// The transaction can only execute a single statement.
211    SingleStatementTransaction,
212    /// The transaction can only execute simple DDL.
213    DDLOnlyTransaction,
214    /// Another session modified the Catalog while this transaction was open.
215    DDLTransactionRace,
216    /// Used to prevent us from durably committing state while a DDL transaction is open; it should
217    /// never be returned to the user.
218    TransactionDryRun {
219        /// New operations that were run in the transaction.
220        new_ops: Vec<crate::catalog::Op>,
221        /// New resulting `CatalogState`.
222        new_state: crate::catalog::CatalogState,
223    },
224    /// An error occurred in the storage layer.
225    Storage(mz_storage_types::controller::StorageError<mz_repr::Timestamp>),
226    /// An error occurred in the compute layer.
227    Compute(anyhow::Error),
228    /// An error occurred in the orchestrator layer.
229    Orchestrator(anyhow::Error),
230    /// A statement tried to drop a role that had dependent objects.
231    ///
232    /// The map keys are role names and values are detailed error messages.
233    DependentObject(BTreeMap<String, Vec<String>>),
234    /// When performing an `ALTER` of some variety, re-planning the statement
235    /// errored.
236    InvalidAlter(&'static str, PlanError),
237    /// An error occurred while validating a connection.
238    ConnectionValidation(ConnectionValidationError),
239    /// We refuse to create the materialized view, because it would never be refreshed, so it would
240    /// never be queryable. This can happen when the only specified refreshes are further back in
241    /// the past than the initial compaction window of the materialized view.
242    MaterializedViewWouldNeverRefresh(Timestamp, Timestamp),
243    /// A CREATE MATERIALIZED VIEW statement tried to acquire a read hold at a REFRESH AT time,
244    /// but was unable to get a precise read hold.
245    InputNotReadableAtRefreshAtTime(Timestamp, Antichain<Timestamp>),
246    /// A humanized version of [`StorageError::RtrTimeout`].
247    RtrTimeout(String),
248    /// A humanized version of [`StorageError::RtrDropFailure`].
249    RtrDropFailure(String),
250    /// The collection to be exported by the sink cannot be read at any timestamp.
251    UnreadableSinkCollection,
252    /// User sessions have been blocked.
253    UserSessionsDisallowed,
254    /// This user session has been denied by a NetworkPolicy.
255    NetworkPolicyDenied(NetworkPolicyError),
256    /// Something attempted a write (to catalog, storage, tables, etc.) while in
257    /// read-only mode.
258    ReadOnly,
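    /// An `ALTER CLUSTER` operation timed out before completing.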
259    AlterClusterTimeout,
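    /// Attempted to alter a cluster that still has pending replica changes.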
260    AlterClusterWhilePendingReplicas,
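    /// An error occurred while authenticating the session.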
261    AuthenticationError(AuthenticationError),
262    /// Schema of a replacement is incompatible with the target.
263    ReplacementSchemaMismatch(RelationDescDiff),
264    /// Attempt to apply a replacement to a sealed materialized view.
265    ReplaceMaterializedViewSealed {
266        name: String,
267    },
268}
269
270#[derive(Debug, thiserror::Error)]
271pub enum AuthenticationError {
272    #[error("invalid credentials")]
273    InvalidCredentials,
274    #[error("role is not allowed to login")]
275    NonLogin,
276    #[error("role does not exist")]
277    RoleNotFound,
278    #[error("password is required")]
279    PasswordRequired,
280}
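// A small illustrative check of the `Display` strings that `thiserror` derives from the
// `#[error(...)]` attributes above; a minimal sketch, assuming the attributes stay as written.
#[cfg(test)]
mod authentication_error_display_example {
    use super::*;

    #[test]
    fn display_matches_error_attributes() {
        assert_eq!(
            AuthenticationError::InvalidCredentials.to_string(),
            "invalid credentials"
        );
        assert_eq!(
            AuthenticationError::NonLogin.to_string(),
            "role is not allowed to login"
        );
    }
}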
281
282impl AdapterError {
283    pub fn into_response(self, severity: Severity) -> ErrorResponse {
284        ErrorResponse {
285            severity,
286            code: self.code(),
287            message: self.to_string(),
288            detail: self.detail(),
289            hint: self.hint(),
290            position: self.position(),
291        }
292    }
293
294    pub fn position(&self) -> Option<usize> {
295        match self {
296            AdapterError::ParseError(err) => Some(err.error.pos),
297            _ => None,
298        }
299    }
300
301    /// Reports additional details about the error, if any are available.
302    pub fn detail(&self) -> Option<String> {
303        match self {
304            AdapterError::AmbiguousSystemColumnReference => {
305                Some("This is a current limitation in Materialize".into())
306            },
307            AdapterError::Catalog(c) => c.detail(),
308            AdapterError::Eval(e) => e.detail(),
309            AdapterError::RelationOutsideTimeDomain { relations, names } => Some(format!(
310                "The following relations in the query are outside the transaction's time domain:\n{}\n{}",
311                relations
312                    .iter()
313                    .map(|r| r.quoted().to_string())
314                    .collect::<Vec<_>>()
315                    .join("\n"),
316                match names.is_empty() {
317                    true => "No relations are available.".to_string(),
318                    false => format!(
319                        "Only the following relations are available:\n{}",
320                        names
321                            .iter()
322                            .map(|name| name.quoted().to_string())
323                            .collect::<Vec<_>>()
324                            .join("\n")
325                    ),
326                }
327            )),
328            AdapterError::SourceOrSinkSizeRequired { .. } => Some(
329                "Either specify the cluster that will maintain this object via IN CLUSTER or \
330                specify size via SIZE option."
331                    .into(),
332            ),
333            AdapterError::InvalidTableMutationSelection {
334                object_name,
335                object_type,
336            } => Some(
337                format!(
338                    "{object_type} '{}' may not be used in this operation; \
339                     the selection may refer to views and materialized views, but transitive \
340                     dependencies must not include sources or source-export tables",
341                    object_name.quoted()
342                )
343            ),
344            AdapterError::SafeModeViolation(_) => Some(
345                "The Materialize server you are connected to is running in \
346                 safe mode, which limits the features that are available."
347                    .into(),
348            ),
349            AdapterError::IntrospectionDisabled { log_names }
350            | AdapterError::UntargetedLogRead { log_names } => Some(format!(
351                "The query references the following log sources:\n    {}",
352                log_names.join("\n    "),
353            )),
354            AdapterError::InvalidLogDependency { log_names, .. } => Some(format!(
355                "The object depends on the following log sources:\n    {}",
356                log_names.join("\n    "),
357            )),
358            AdapterError::PlanError(e) => e.detail(),
359            AdapterError::Unauthorized(unauthorized) => unauthorized.detail(),
360            AdapterError::DependentObject(dependent_objects) => {
361                Some(dependent_objects
362                    .iter()
363                    .map(|(role_name, err_msgs)| err_msgs
364                        .iter()
365                        .map(|err_msg| format!("{role_name}: {err_msg}"))
366                        .join("\n"))
367                    .join("\n"))
368            },
369            AdapterError::Storage(storage_error) => {
370                storage_error.source().map(|source_error| source_error.to_string_with_causes())
371            }
372            AdapterError::ReadOnlyTransaction => Some("SELECT queries cannot be combined with other query types, including SUBSCRIBE.".into()),
373            AdapterError::InvalidAlter(_, e) => e.detail(),
374            AdapterError::Optimizer(e) => e.detail(),
375            AdapterError::ConnectionValidation(e) => e.detail(),
376            AdapterError::MaterializedViewWouldNeverRefresh(last_refresh, earliest_possible) => {
377                Some(format!(
378                    "The specified last refresh is at {}, while the earliest possible time to compute the materialized \
379                    view is {}.",
380                    last_refresh,
381                    earliest_possible,
382                ))
383            }
384            AdapterError::UnallowedOnCluster { cluster, .. } => (cluster == MZ_CATALOG_SERVER_CLUSTER.name).then(||
385                format!("The transaction is executing on the {cluster} cluster, maybe having been routed there by the first statement in the transaction.")
386            ),
387            AdapterError::InputNotReadableAtRefreshAtTime(oracle_read_ts, least_valid_read) => {
388                Some(format!(
389                    "The requested REFRESH AT time is {}, \
390                    but not all input collections are readable earlier than [{}].",
391                    oracle_read_ts,
392                    if least_valid_read.len() == 1 {
393                        format!("{}", least_valid_read.as_option().expect("antichain contains exactly 1 timestamp"))
394                    } else {
395                        // This can't occur currently
396                        format!("{:?}", least_valid_read)
397                    }
398                ))
399            }
400            AdapterError::RtrTimeout(name) => Some(format!("{name} failed to ingest data up to the real-time recency point")),
401            AdapterError::RtrDropFailure(name) => Some(format!("{name} dropped before ingesting data to the real-time recency point")),
402            AdapterError::UserSessionsDisallowed => Some("Your organization has been blocked. Please contact support.".to_string()),
403            AdapterError::NetworkPolicyDenied(reason) => Some(format!("{reason}.")),
404            AdapterError::ReplacementSchemaMismatch(diff) => {
405                let mut lines: Vec<_> = diff.column_diffs.iter().map(|(idx, diff)| {
406                    let pos = idx + 1;
407                    match diff {
408                        ColumnDiff::Missing { name } => {
409                            let name = name.as_str().quoted();
410                            format!("missing column {name} at position {pos}")
411                        }
412                        ColumnDiff::Extra { name } => {
413                            let name = name.as_str().quoted();
414                            format!("extra column {name} at position {pos}")
415                        }
416                        ColumnDiff::TypeMismatch { name, left, right } => {
417                            let name = name.as_str().quoted();
418                            format!("column {name} at position {pos}: type mismatch (target: {left:?}, replacement: {right:?})")
419                        }
420                        ColumnDiff::NullabilityMismatch { name, left, right } => {
421                            let name = name.as_str().quoted();
422                            let left = if *left { "NULL" } else { "NOT NULL" };
423                            let right = if *right { "NULL" } else { "NOT NULL" };
424                            format!("column {name} at position {pos}: nullability mismatch (target: {left}, replacement: {right})")
425                        }
426                        ColumnDiff::NameMismatch { left, right } => {
427                            let left = left.as_str().quoted();
428                            let right = right.as_str().quoted();
429                            format!("column at position {pos}: name mismatch (target: {left}, replacement: {right})")
430                        }
431                    }
432                }).collect();
433
434                if let Some(KeyDiff { left, right }) = &diff.key_diff {
435                    let format_keys = |keys: &BTreeSet<Vec<ColumnName>>| {
436                        if keys.is_empty() {
437                            "(none)".to_string()
438                        } else {
439                            keys.iter()
440                                .map(|key| {
441                                    let cols = key.iter().map(|c| c.as_str()).join(", ");
442                                    format!("{{{cols}}}")
443                                })
444                                .join(", ")
445                        }
446                    };
447                    lines.push(format!(
448                        "keys differ (target: {}, replacement: {})",
449                        format_keys(left),
450                        format_keys(right)
451                    ));
452                }
453                Some(lines.join("\n"))
454            }
455            AdapterError::ReplaceMaterializedViewSealed { .. } => Some(
456                "The materialized view has already computed its output until the end of time, \
457                 so replacing its definition would have no effect."
458                .into(),
459            ),
460            _ => None,
461        }
462    }
463
464    /// Reports a hint for the user about how the error could be fixed.
465    pub fn hint(&self) -> Option<String> {
466        match self {
467            AdapterError::AmbiguousSystemColumnReference => Some(
468                "Rewrite the view to refer to all columns by name. Expand all wildcards and \
469                convert all NATURAL JOINs to USING joins."
470                    .to_string(),
471            ),
472            AdapterError::Catalog(c) => c.hint(),
473            AdapterError::Eval(e) => e.hint(),
474            AdapterError::InvalidClusterReplicaAz { expected, az: _ } => {
475                Some(if expected.is_empty() {
476                    "No availability zones configured; do not specify AVAILABILITY ZONE".into()
477                } else {
478                    format!("Valid availability zones are: {}", expected.join(", "))
479                })
480            }
481            AdapterError::InvalidStorageClusterSize { expected, .. } => {
482                Some(format!("Valid sizes are: {}", expected.join(", ")))
483            }
484            AdapterError::SourceOrSinkSizeRequired { expected } => Some(format!(
485                "Try choosing one of the smaller sizes to start. Available sizes: {}",
486                expected.join(", ")
487            )),
488            AdapterError::NoClusterReplicasAvailable { is_managed, .. } => {
489                Some(if *is_managed {
490                    "Use ALTER CLUSTER to adjust the replication factor of the cluster. \
491                    Example: `ALTER CLUSTER <cluster-name> SET (REPLICATION FACTOR 1)`".into()
492                } else {
493                    "Use CREATE CLUSTER REPLICA to attach cluster replicas to the cluster".into()
494                })
495            }
496            AdapterError::UntargetedLogRead { .. } => Some(
497                "Use `SET cluster_replica = <replica-name>` to target a specific replica in the \
498                 active cluster. Note that subsequent queries will only be answered by \
499                 the selected replica, which might reduce availability. To undo the replica \
500                 selection, use `RESET cluster_replica`."
501                    .into(),
502            ),
503            AdapterError::ResourceExhaustion { resource_type, .. } => Some(format!(
504                "Drop an existing {resource_type} or contact support to request a limit increase."
505            )),
506            AdapterError::StatementTimeout => Some(
507                "Consider increasing the maximum allowed statement duration for this session by \
508                 setting the statement_timeout session variable. For example, `SET \
509                 statement_timeout = '120s'`."
510                    .into(),
511            ),
512            AdapterError::PlanError(e) => e.hint(),
513            AdapterError::UnallowedOnCluster { cluster, .. } => {
514                (cluster != MZ_CATALOG_SERVER_CLUSTER.name).then(||
515                    "Use `SET CLUSTER = <cluster-name>` to change your cluster and re-run the query."
516                    .to_string()
517                )
518            }
519            AdapterError::InvalidAlter(_, e) => e.hint(),
520            AdapterError::Optimizer(e) => e.hint(),
521            AdapterError::ConnectionValidation(e) => e.hint(),
522            AdapterError::InputNotReadableAtRefreshAtTime(_, _) => Some(
523                "You can use `REFRESH AT greatest(mz_now(), <explicit timestamp>)` to refresh \
524                 either at the explicitly specified timestamp, or now if the given timestamp would \
525                 be in the past.".to_string()
526            ),
527            AdapterError::AlterClusterTimeout => Some(
528                "Consider increasing the timeout duration in the alter cluster statement.".into(),
529            ),
530            AdapterError::DDLTransactionRace => Some(
531                "Currently, DDL transactions fail when any other DDL happens concurrently, \
532                 even on unrelated schemas/clusters.".into()
533            ),
534            AdapterError::CollectionUnreadable { .. } => Some(
535                "This could be because the collection has recently been dropped.".into()
536            ),
537            _ => None,
538        }
539    }
540
541    pub fn code(&self) -> SqlState {
542        // TODO(benesch): we should only use `SqlState::INTERNAL_ERROR` for
543        // those errors that are truly internal errors. At the moment we have
544        // various classes of uncategorized errors that use this error code
545        // inappropriately.
546        match self {
547            // DATA_EXCEPTION to match what Postgres returns for degenerate
548            // range bounds
549            AdapterError::AbsurdSubscribeBounds { .. } => SqlState::DATA_EXCEPTION,
550            AdapterError::AmbiguousSystemColumnReference => SqlState::FEATURE_NOT_SUPPORTED,
551            AdapterError::Catalog(e) => match &e.kind {
552                mz_catalog::memory::error::ErrorKind::VarError(e) => match e {
553                    VarError::ConstrainedParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
554                    VarError::FixedValueParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
555                    VarError::InvalidParameterType { .. } => SqlState::INVALID_PARAMETER_VALUE,
556                    VarError::InvalidParameterValue { .. } => SqlState::INVALID_PARAMETER_VALUE,
557                    VarError::ReadOnlyParameter(_) => SqlState::CANT_CHANGE_RUNTIME_PARAM,
558                    VarError::UnknownParameter(_) => SqlState::UNDEFINED_OBJECT,
559                    VarError::RequiresUnsafeMode { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
560                    VarError::RequiresFeatureFlag { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
561                },
562                _ => SqlState::INTERNAL_ERROR,
563            },
564            AdapterError::ChangedPlan(_) => SqlState::FEATURE_NOT_SUPPORTED,
565            AdapterError::DuplicateCursor(_) => SqlState::DUPLICATE_CURSOR,
566            AdapterError::Eval(EvalError::CharacterNotValidForEncoding(_)) => {
567                SqlState::PROGRAM_LIMIT_EXCEEDED
568            }
569            AdapterError::Eval(EvalError::CharacterTooLargeForEncoding(_)) => {
570                SqlState::PROGRAM_LIMIT_EXCEEDED
571            }
572            AdapterError::Eval(EvalError::LengthTooLarge) => SqlState::PROGRAM_LIMIT_EXCEEDED,
573            AdapterError::Eval(EvalError::NullCharacterNotPermitted) => {
574                SqlState::PROGRAM_LIMIT_EXCEEDED
575            }
576            AdapterError::Eval(_) => SqlState::INTERNAL_ERROR,
577            AdapterError::Explain(_) => SqlState::INTERNAL_ERROR,
578            AdapterError::IdExhaustionError => SqlState::INTERNAL_ERROR,
579            AdapterError::Internal(_) => SqlState::INTERNAL_ERROR,
580            AdapterError::IntrospectionDisabled { .. } => SqlState::FEATURE_NOT_SUPPORTED,
581            AdapterError::InvalidLogDependency { .. } => SqlState::FEATURE_NOT_SUPPORTED,
582            AdapterError::InvalidClusterReplicaAz { .. } => SqlState::FEATURE_NOT_SUPPORTED,
583            AdapterError::InvalidSetIsolationLevel => SqlState::ACTIVE_SQL_TRANSACTION,
584            AdapterError::InvalidSetCluster => SqlState::ACTIVE_SQL_TRANSACTION,
585            AdapterError::InvalidStorageClusterSize { .. } => SqlState::FEATURE_NOT_SUPPORTED,
586            AdapterError::SourceOrSinkSizeRequired { .. } => SqlState::FEATURE_NOT_SUPPORTED,
587            AdapterError::InvalidTableMutationSelection { .. } => {
588                SqlState::INVALID_TRANSACTION_STATE
589            }
590            AdapterError::ConstraintViolation(NotNullViolation(_)) => SqlState::NOT_NULL_VIOLATION,
591            AdapterError::ConcurrentClusterDrop => SqlState::INVALID_TRANSACTION_STATE,
592            AdapterError::ConcurrentDependencyDrop { .. } => SqlState::UNDEFINED_OBJECT,
593            AdapterError::CollectionUnreadable { .. } => SqlState::NO_DATA_FOUND,
594            AdapterError::NoClusterReplicasAvailable { .. } => SqlState::FEATURE_NOT_SUPPORTED,
595            AdapterError::OperationProhibitsTransaction(_) => SqlState::ACTIVE_SQL_TRANSACTION,
596            AdapterError::OperationRequiresTransaction(_) => SqlState::NO_ACTIVE_SQL_TRANSACTION,
597            AdapterError::ParseError(_) => SqlState::SYNTAX_ERROR,
598            AdapterError::PlanError(PlanError::InvalidSchemaName) => SqlState::INVALID_SCHEMA_NAME,
599            AdapterError::PlanError(PlanError::ColumnAlreadyExists { .. }) => {
600                SqlState::DUPLICATE_COLUMN
601            }
602            AdapterError::PlanError(PlanError::UnknownParameter(_)) => {
603                SqlState::UNDEFINED_PARAMETER
604            }
605            AdapterError::PlanError(PlanError::ParameterNotAllowed(_)) => {
606                SqlState::UNDEFINED_PARAMETER
607            }
608            AdapterError::PlanError(_) => SqlState::INTERNAL_ERROR,
609            AdapterError::PreparedStatementExists(_) => SqlState::DUPLICATE_PSTATEMENT,
610            AdapterError::ReadOnlyTransaction => SqlState::READ_ONLY_SQL_TRANSACTION,
611            AdapterError::ReadWriteUnavailable => SqlState::INVALID_TRANSACTION_STATE,
612            AdapterError::SingleStatementTransaction => SqlState::INVALID_TRANSACTION_STATE,
613            AdapterError::WrongSetOfLocks => SqlState::LOCK_NOT_AVAILABLE,
614            AdapterError::StatementTimeout => SqlState::QUERY_CANCELED,
615            AdapterError::Canceled => SqlState::QUERY_CANCELED,
616            AdapterError::IdleInTransactionSessionTimeout => {
617                SqlState::IDLE_IN_TRANSACTION_SESSION_TIMEOUT
618            }
619            AdapterError::RecursionLimit(_) => SqlState::INTERNAL_ERROR,
620            AdapterError::RelationOutsideTimeDomain { .. } => SqlState::INVALID_TRANSACTION_STATE,
621            AdapterError::ResourceExhaustion { .. } => SqlState::INSUFFICIENT_RESOURCES,
622            AdapterError::ResultSize(_) => SqlState::OUT_OF_MEMORY,
623            AdapterError::SafeModeViolation(_) => SqlState::INTERNAL_ERROR,
624            AdapterError::SubscribeOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
625            AdapterError::Optimizer(e) => match e {
626                OptimizerError::PlanError(PlanError::InvalidSchemaName) => {
627                    SqlState::INVALID_SCHEMA_NAME
628                }
629                OptimizerError::PlanError(PlanError::ColumnAlreadyExists { .. }) => {
630                    SqlState::DUPLICATE_COLUMN
631                }
632                OptimizerError::PlanError(PlanError::UnknownParameter(_)) => {
633                    SqlState::UNDEFINED_PARAMETER
634                }
635                OptimizerError::PlanError(PlanError::ParameterNotAllowed(_)) => {
636                    SqlState::UNDEFINED_PARAMETER
637                }
638                OptimizerError::PlanError(_) => SqlState::INTERNAL_ERROR,
639                OptimizerError::RecursionLimitError(e) => {
640                    AdapterError::RecursionLimit(e.clone()).code() // Delegate to outer
641                }
642                OptimizerError::Internal(s) => {
643                    AdapterError::Internal(s.clone()).code() // Delegate to outer
644                }
645                OptimizerError::EvalError(e) => {
646                    AdapterError::Eval(e.clone()).code() // Delegate to outer
647                }
648                OptimizerError::TransformError(_) => SqlState::INTERNAL_ERROR,
649                OptimizerError::UnmaterializableFunction(_) => SqlState::FEATURE_NOT_SUPPORTED,
650                OptimizerError::UncallableFunction { .. } => SqlState::FEATURE_NOT_SUPPORTED,
651                OptimizerError::UnsupportedTemporalExpression(_) => SqlState::FEATURE_NOT_SUPPORTED,
652                // This should be handled by peek optimization, so it's an internal error if it
653                // reaches the user.
654                OptimizerError::InternalUnsafeMfpPlan(_) => SqlState::INTERNAL_ERROR,
655            },
656            AdapterError::UnallowedOnCluster { .. } => {
657                SqlState::S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
658            }
659            AdapterError::Unauthorized(_) => SqlState::INSUFFICIENT_PRIVILEGE,
660            AdapterError::UnknownCursor(_) => SqlState::INVALID_CURSOR_NAME,
661            AdapterError::UnknownPreparedStatement(_) => SqlState::UNDEFINED_PSTATEMENT,
662            AdapterError::UnknownLoginRole(_) => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
663            AdapterError::UnknownClusterReplica { .. } => SqlState::UNDEFINED_OBJECT,
664            AdapterError::UnrecognizedConfigurationParam(_) => SqlState::UNDEFINED_OBJECT,
665            AdapterError::Unsupported(..) => SqlState::FEATURE_NOT_SUPPORTED,
666            AdapterError::UnavailableFeature { .. } => SqlState::FEATURE_NOT_SUPPORTED,
667            AdapterError::Unstructured(_) => SqlState::INTERNAL_ERROR,
668            AdapterError::UntargetedLogRead { .. } => SqlState::FEATURE_NOT_SUPPORTED,
669            AdapterError::DDLTransactionRace => SqlState::T_R_SERIALIZATION_FAILURE,
670            AdapterError::TransactionDryRun { .. } => SqlState::T_R_SERIALIZATION_FAILURE,
671            // It's not immediately clear which error code to use here because
672            // "write-only transaction", "single table write transaction", and "ddl only
673            // transaction" are not things in Postgres. This error code is the generic "bad txn
674            // thing" code, so it's probably the best choice.
675            AdapterError::WriteOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
676            AdapterError::DDLOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
677            AdapterError::Storage(_) | AdapterError::Compute(_) | AdapterError::Orchestrator(_) => {
678                SqlState::INTERNAL_ERROR
679            }
680            AdapterError::DependentObject(_) => SqlState::DEPENDENT_OBJECTS_STILL_EXIST,
681            AdapterError::InvalidAlter(_, _) => SqlState::FEATURE_NOT_SUPPORTED,
682            AdapterError::ConnectionValidation(_) => SqlState::SYSTEM_ERROR,
683            // `DATA_EXCEPTION`, similarly to `AbsurdSubscribeBounds`.
684            AdapterError::MaterializedViewWouldNeverRefresh(_, _) => SqlState::DATA_EXCEPTION,
685            AdapterError::InputNotReadableAtRefreshAtTime(_, _) => SqlState::DATA_EXCEPTION,
686            AdapterError::RtrTimeout(_) => SqlState::QUERY_CANCELED,
687            AdapterError::RtrDropFailure(_) => SqlState::UNDEFINED_OBJECT,
688            AdapterError::UnreadableSinkCollection => SqlState::from_code("MZ009"),
689            AdapterError::UserSessionsDisallowed => SqlState::from_code("MZ010"),
690            AdapterError::NetworkPolicyDenied(_) => SqlState::from_code("MZ011"),
691            // In read-only mode all transactions are implicitly read-only
692            // transactions.
693            AdapterError::ReadOnly => SqlState::READ_ONLY_SQL_TRANSACTION,
694            AdapterError::AlterClusterTimeout => SqlState::QUERY_CANCELED,
695            AdapterError::AlterClusterWhilePendingReplicas => SqlState::OBJECT_IN_USE,
696            AdapterError::ReplacementSchemaMismatch(_) => SqlState::FEATURE_NOT_SUPPORTED,
697            AdapterError::AuthenticationError(AuthenticationError::InvalidCredentials) => {
698                SqlState::INVALID_PASSWORD
699            }
700            AdapterError::AuthenticationError(_) => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
701            AdapterError::ReplaceMaterializedViewSealed { .. } => {
702                SqlState::OBJECT_NOT_IN_PREREQUISITE_STATE
703            }
704        }
705    }
706
707    pub fn internal<E: std::fmt::Display>(context: &str, e: E) -> AdapterError {
708        AdapterError::Internal(format!("{context}: {e}"))
709    }
710
711    // We don't want the following error conversions to `ConcurrentDependencyDrop` to happen
712    // automatically (e.g., via `From` impls), because whether `ConcurrentDependencyDrop` is
713    // appropriate can depend on the context, so we make the conversion target explicit at the
714    // call site. For example, an `InstanceMissing` might occur because the user specified a
715    // non-existent cluster, in which case `ConcurrentDependencyDrop` would not be appropriate.
716    pub fn concurrent_dependency_drop_from_instance_missing(e: InstanceMissing) -> Self {
717        AdapterError::ConcurrentDependencyDrop {
718            dependency_kind: "cluster",
719            dependency_id: e.0.to_string(),
720        }
721    }
722    pub fn concurrent_dependency_drop_from_collection_missing(e: CollectionMissing) -> Self {
723        AdapterError::ConcurrentDependencyDrop {
724            dependency_kind: "collection",
725            dependency_id: e.0.to_string(),
726        }
727    }
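    // A minimal call-site sketch of the explicit-conversion pattern described above. The
    // `lookup_cluster` call is hypothetical (not a real controller method); the point is that
    // the caller opts into the `ConcurrentDependencyDrop` interpretation via `map_err`:
    //
    //     let instance = lookup_cluster(cluster_id)
    //         .map_err(AdapterError::concurrent_dependency_drop_from_instance_missing)?;
    //
    // A caller that instead resolved a user-provided cluster name would map the same
    // `InstanceMissing` error to an "unknown cluster" style error rather than to
    // `ConcurrentDependencyDrop`.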
728
729    pub fn concurrent_dependency_drop_from_collection_lookup_error(
730        e: CollectionLookupError,
731        compute_instance: ComputeInstanceId,
732    ) -> Self {
733        match e {
734            CollectionLookupError::InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
735                dependency_kind: "cluster",
736                dependency_id: id.to_string(),
737            },
738            CollectionLookupError::CollectionMissing(id) => {
739                AdapterError::ConcurrentDependencyDrop {
740                    dependency_kind: "collection",
741                    dependency_id: id.to_string(),
742                }
743            }
744            CollectionLookupError::InstanceShutDown => AdapterError::ConcurrentDependencyDrop {
745                dependency_kind: "cluster",
746                dependency_id: compute_instance.to_string(),
747            },
748        }
749    }
750
751    pub fn concurrent_dependency_drop_from_instance_peek_error(
752        e: mz_compute_client::controller::instance::PeekError,
753        compute_instance: ComputeInstanceId,
754    ) -> AdapterError {
755        use mz_compute_client::controller::instance::PeekError::*;
756        match e {
757            ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
758                dependency_kind: "replica",
759                dependency_id: id.to_string(),
760            },
761            InstanceShutDown => AdapterError::ConcurrentDependencyDrop {
762                dependency_kind: "cluster",
763                dependency_id: compute_instance.to_string(),
764            },
765            e @ ReadHoldIdMismatch(_) => AdapterError::internal("instance peek error", e),
766            e @ ReadHoldInsufficient(_) => AdapterError::internal("instance peek error", e),
767        }
768    }
769
770    pub fn concurrent_dependency_drop_from_peek_error(
771        e: mz_compute_client::controller::error::PeekError,
772    ) -> AdapterError {
773        use mz_compute_client::controller::error::PeekError::*;
774        match e {
775            InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
776                dependency_kind: "cluster",
777                dependency_id: id.to_string(),
778            },
779            CollectionMissing(id) => AdapterError::ConcurrentDependencyDrop {
780                dependency_kind: "collection",
781                dependency_id: id.to_string(),
782            },
783            ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
784                dependency_kind: "replica",
785                dependency_id: id.to_string(),
786            },
787            e @ SinceViolation(_) => AdapterError::internal("peek error", e),
788        }
789    }
790
791    pub fn concurrent_dependency_drop_from_dataflow_creation_error(
792        e: compute_error::DataflowCreationError,
793    ) -> Self {
794        use compute_error::DataflowCreationError::*;
795        match e {
796            InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
797                dependency_kind: "cluster",
798                dependency_id: id.to_string(),
799            },
800            CollectionMissing(id) => AdapterError::ConcurrentDependencyDrop {
801                dependency_kind: "collection",
802                dependency_id: id.to_string(),
803            },
804            ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
805                dependency_kind: "replica",
806                dependency_id: id.to_string(),
807            },
808            MissingAsOf | SinceViolation(..) | EmptyAsOfForSubscribe | EmptyAsOfForCopyTo => {
809                AdapterError::internal("dataflow creation error", e)
810            }
811        }
812    }
813}
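// A minimal sketch of how the pieces above fit together for a single error value: the SQLSTATE
// comes from `code()`, the human-readable message from `Display`, and optional `detail()` /
// `hint()` text rides along in the eventual `ErrorResponse`. The assertions mirror the match
// arms in this file and assume they stay as written.
#[cfg(test)]
mod adapter_error_mapping_example {
    use super::*;

    #[test]
    fn statement_timeout_maps_to_query_canceled() {
        let err = AdapterError::StatementTimeout;
        assert_eq!(err.code(), SqlState::QUERY_CANCELED);
        assert_eq!(
            err.to_string(),
            "canceling statement due to statement timeout"
        );
        // A hint is provided (raise `statement_timeout`), but no extra detail.
        assert!(err.hint().is_some());
        assert!(err.detail().is_none());
    }
}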
814
815impl fmt::Display for AdapterError {
816    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
817        match self {
818            AdapterError::AbsurdSubscribeBounds { as_of, up_to } => {
819                write!(
820                    f,
821                    "subscription lower bound (`AS OF`) is greater than its upper bound (`UP TO`): \
822                     {as_of} > {up_to}",
823                )
824            }
825            AdapterError::AmbiguousSystemColumnReference => {
826                write!(
827                    f,
828                    "cannot use wildcard expansions or NATURAL JOINs in a view that depends on \
829                    system objects"
830                )
831            }
832            AdapterError::ChangedPlan(e) => write!(f, "{}", e),
833            AdapterError::Catalog(e) => e.fmt(f),
834            AdapterError::DuplicateCursor(name) => {
835                write!(f, "cursor {} already exists", name.quoted())
836            }
837            AdapterError::Eval(e) => e.fmt(f),
838            AdapterError::Explain(e) => e.fmt(f),
839            AdapterError::IdExhaustionError => f.write_str("ID allocator exhausted all valid IDs"),
840            AdapterError::Internal(e) => write!(f, "internal error: {}", e),
841            AdapterError::IntrospectionDisabled { .. } => write!(
842                f,
843                "cannot read log sources of replica with disabled introspection"
844            ),
845            AdapterError::InvalidLogDependency { object_type, .. } => {
846                write!(f, "{object_type} objects cannot depend on log sources")
847            }
848            AdapterError::InvalidClusterReplicaAz { az, expected: _ } => {
849                write!(f, "unknown cluster replica availability zone {az}",)
850            }
851            AdapterError::InvalidSetIsolationLevel => write!(
852                f,
853                "SET TRANSACTION ISOLATION LEVEL must be called before any query"
854            ),
855            AdapterError::InvalidSetCluster => {
856                write!(f, "SET cluster cannot be called in an active transaction")
857            }
858            AdapterError::InvalidStorageClusterSize { size, .. } => {
859                write!(f, "unknown source size {size}")
860            }
861            AdapterError::SourceOrSinkSizeRequired { .. } => {
862                write!(f, "must specify either cluster or size option")
863            }
864            AdapterError::InvalidTableMutationSelection { .. } => {
865                write!(
866                    f,
867                    "invalid selection: operation may only (transitively) refer to non-source, non-system tables"
868                )
869            }
870            AdapterError::ReplaceMaterializedViewSealed { name } => {
871                write!(
872                    f,
873                    "materialized view {name} is sealed and thus cannot be replaced"
874                )
875            }
876            AdapterError::ConstraintViolation(not_null_violation) => {
877                write!(f, "{}", not_null_violation)
878            }
879            AdapterError::ConcurrentClusterDrop => {
880                write!(f, "the transaction's active cluster has been dropped")
881            }
882            AdapterError::ConcurrentDependencyDrop {
883                dependency_kind,
884                dependency_id,
885            } => {
886                write!(f, "{dependency_kind} '{dependency_id}' was dropped")
887            }
888            AdapterError::CollectionUnreadable { id } => {
889                write!(f, "collection '{id}' is not readable at any timestamp")
890            }
891            AdapterError::NoClusterReplicasAvailable { name, .. } => {
892                write!(
893                    f,
894                    "CLUSTER {} has no replicas available to service request",
895                    name.quoted()
896                )
897            }
898            AdapterError::OperationProhibitsTransaction(op) => {
899                write!(f, "{} cannot be run inside a transaction block", op)
900            }
901            AdapterError::OperationRequiresTransaction(op) => {
902                write!(f, "{} can only be used in transaction blocks", op)
903            }
904            AdapterError::ParseError(e) => e.fmt(f),
905            AdapterError::PlanError(e) => e.fmt(f),
906            AdapterError::PreparedStatementExists(name) => {
907                write!(f, "prepared statement {} already exists", name.quoted())
908            }
909            AdapterError::ReadOnlyTransaction => f.write_str("transaction in read-only mode"),
910            AdapterError::SingleStatementTransaction => {
911                f.write_str("this transaction can only execute a single statement")
912            }
913            AdapterError::ReadWriteUnavailable => {
914                f.write_str("transaction read-write mode must be set before any query")
915            }
916            AdapterError::WrongSetOfLocks => {
917                write!(f, "internal error, wrong set of locks acquired")
918            }
919            AdapterError::StatementTimeout => {
920                write!(f, "canceling statement due to statement timeout")
921            }
922            AdapterError::Canceled => {
923                write!(f, "canceling statement due to user request")
924            }
925            AdapterError::IdleInTransactionSessionTimeout => {
926                write!(
927                    f,
928                    "terminating connection due to idle-in-transaction timeout"
929                )
930            }
931            AdapterError::RecursionLimit(e) => e.fmt(f),
932            AdapterError::RelationOutsideTimeDomain { .. } => {
933                write!(
934                    f,
935                    "Transactions can only reference objects in the same timedomain. \
936                     See https://materialize.com/docs/sql/begin/#same-timedomain-error",
937                )
938            }
939            AdapterError::ResourceExhaustion {
940                resource_type,
941                limit_name,
942                desired,
943                limit,
944                current,
945            } => {
946                write!(
947                    f,
948                    "creating {resource_type} would violate {limit_name} limit (desired: {desired}, limit: {limit}, current: {current})"
949                )
950            }
951            AdapterError::ResultSize(e) => write!(f, "{e}"),
952            AdapterError::SafeModeViolation(feature) => {
953                write!(f, "cannot create {} in safe mode", feature)
954            }
955            AdapterError::SubscribeOnlyTransaction => {
956                f.write_str("SUBSCRIBE in transactions must be the only read statement")
957            }
958            AdapterError::Optimizer(e) => e.fmt(f),
959            AdapterError::UnallowedOnCluster {
960                depends_on,
961                cluster,
962            } => {
963                let items = depends_on.into_iter().map(|item| item.quoted()).join(", ");
964                write!(
965                    f,
966                    "querying the following items {items} is not allowed from the {} cluster",
967                    cluster.quoted()
968                )
969            }
970            AdapterError::Unauthorized(unauthorized) => {
971                write!(f, "{unauthorized}")
972            }
973            AdapterError::UnknownCursor(name) => {
974                write!(f, "cursor {} does not exist", name.quoted())
975            }
976            AdapterError::UnknownLoginRole(name) => {
977                write!(f, "role {} does not exist", name.quoted())
978            }
979            AdapterError::Unsupported(features) => write!(f, "{} are not supported", features),
980            AdapterError::Unstructured(e) => write!(f, "{}", e.display_with_causes()),
981            AdapterError::WriteOnlyTransaction => f.write_str("transaction in write-only mode"),
982            AdapterError::UnknownPreparedStatement(name) => {
983                write!(f, "prepared statement {} does not exist", name.quoted())
984            }
985            AdapterError::UnknownClusterReplica {
986                cluster_name,
987                replica_name,
988            } => write!(
989                f,
990                "cluster replica '{cluster_name}.{replica_name}' does not exist"
991            ),
992            AdapterError::UnrecognizedConfigurationParam(setting_name) => write!(
993                f,
994                "unrecognized configuration parameter {}",
995                setting_name.quoted()
996            ),
997            AdapterError::UntargetedLogRead { .. } => {
998                f.write_str("log source reads must target a replica")
999            }
1000            AdapterError::DDLOnlyTransaction => f.write_str(
1001                "transactions which modify objects are restricted to just modifying objects",
1002            ),
1003            AdapterError::DDLTransactionRace => f.write_str(
1004                "another session modified the catalog while this DDL transaction was open",
1005            ),
1006            AdapterError::TransactionDryRun { .. } => f.write_str("transaction dry run"),
1007            AdapterError::Storage(e) => e.fmt(f),
1008            AdapterError::Compute(e) => e.fmt(f),
1009            AdapterError::Orchestrator(e) => e.fmt(f),
1010            AdapterError::DependentObject(dependent_objects) => {
1011                let role_str = if dependent_objects.keys().count() == 1 {
1012                    "role"
1013                } else {
1014                    "roles"
1015                };
1016                write!(
1017                    f,
1018                    "{role_str} \"{}\" cannot be dropped because some objects depend on it",
1019                    dependent_objects.keys().join(", ")
1020                )
1021            }
1022            AdapterError::InvalidAlter(t, e) => {
1023                write!(f, "invalid ALTER {t}: {e}")
1024            }
1025            AdapterError::ConnectionValidation(e) => e.fmt(f),
1026            AdapterError::MaterializedViewWouldNeverRefresh(_, _) => {
1027                write!(
1028                    f,
1029                    "all the specified refreshes of the materialized view would be too far in the past, and thus they \
1030                    would never happen"
1031                )
1032            }
1033            AdapterError::InputNotReadableAtRefreshAtTime(_, _) => {
1034                write!(
1035                    f,
1036                    "REFRESH AT requested for a time where not all the inputs are readable"
1037                )
1038            }
1039            AdapterError::RtrTimeout(_) => {
1040                write!(
1041                    f,
1042                    "timed out before ingesting the source's visible frontier while serving a real-time recency query"
1043                )
1044            }
1045            AdapterError::RtrDropFailure(_) => write!(
1046                f,
1047                "real-time source dropped before ingesting the upstream system's visible frontier"
1048            ),
1049            AdapterError::UnreadableSinkCollection => {
1050                write!(f, "collection is not readable at any time")
1051            }
1052            AdapterError::UserSessionsDisallowed => write!(f, "login blocked"),
1053            AdapterError::NetworkPolicyDenied(_) => write!(f, "session denied"),
1054            AdapterError::ReadOnly => write!(f, "cannot write in read-only mode"),
1055            AdapterError::AlterClusterTimeout => {
1056                write!(f, "canceling statement, provided timeout lapsed")
1057            }
1058            AdapterError::AuthenticationError(e) => {
1059                write!(f, "authentication error: {e}")
1060            }
1061            AdapterError::UnavailableFeature { feature, docs } => {
1062                write!(f, "{} is not supported in this environment.", feature)?;
1063                if let Some(docs) = docs {
1064                    write!(
1065                        f,
1066                        " For more information, consult the documentation at {docs}"
1067                    )?;
1068                }
1069                Ok(())
1070            }
1071            AdapterError::AlterClusterWhilePendingReplicas => {
1072                write!(f, "cannot alter clusters with pending updates")
1073            }
1074            AdapterError::ReplacementSchemaMismatch(_) => {
1075                write!(f, "replacement schema differs from target schema")
1076            }
1077        }
1078    }
1079}
1080
1081impl From<anyhow::Error> for AdapterError {
1082    fn from(e: anyhow::Error) -> AdapterError {
1083        match e.downcast::<PlanError>() {
1084            Ok(plan_error) => AdapterError::PlanError(plan_error),
1085            Err(e) => AdapterError::Unstructured(e),
1086        }
1087    }
1088}
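// A minimal sketch of the downcast behavior above: an `anyhow::Error` that actually wraps a
// `PlanError` comes back out as `AdapterError::PlanError` rather than the catch-all
// `Unstructured` variant. This assumes `PlanError` implements `std::error::Error + Send + Sync`,
// which the `downcast` call above already requires.
#[cfg(test)]
mod anyhow_conversion_example {
    use super::*;

    #[test]
    fn wrapped_plan_error_is_recovered() {
        let wrapped = anyhow::Error::new(PlanError::InvalidSchemaName);
        assert!(matches!(
            AdapterError::from(wrapped),
            AdapterError::PlanError(PlanError::InvalidSchemaName)
        ));

        let other = anyhow::anyhow!("something else went wrong");
        assert!(matches!(
            AdapterError::from(other),
            AdapterError::Unstructured(_)
        ));
    }
}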
1089
1090impl From<TryFromIntError> for AdapterError {
1091    fn from(e: TryFromIntError) -> AdapterError {
1092        AdapterError::Unstructured(e.into())
1093    }
1094}
1095
1096impl From<TryFromDecimalError> for AdapterError {
1097    fn from(e: TryFromDecimalError) -> AdapterError {
1098        AdapterError::Unstructured(e.into())
1099    }
1100}
1101
1102impl From<mz_catalog::memory::error::Error> for AdapterError {
1103    fn from(e: mz_catalog::memory::error::Error) -> AdapterError {
1104        AdapterError::Catalog(e)
1105    }
1106}
1107
1108impl From<mz_catalog::durable::CatalogError> for AdapterError {
1109    fn from(e: mz_catalog::durable::CatalogError) -> Self {
1110        mz_catalog::memory::error::Error::from(e).into()
1111    }
1112}
1113
1114impl From<mz_catalog::durable::DurableCatalogError> for AdapterError {
1115    fn from(e: mz_catalog::durable::DurableCatalogError) -> Self {
1116        mz_catalog::durable::CatalogError::from(e).into()
1117    }
1118}
1119
1120impl From<EvalError> for AdapterError {
1121    fn from(e: EvalError) -> AdapterError {
1122        AdapterError::Eval(e)
1123    }
1124}
1125
1126impl From<ExplainError> for AdapterError {
1127    fn from(e: ExplainError) -> AdapterError {
1128        match e {
1129            ExplainError::RecursionLimitError(e) => AdapterError::RecursionLimit(e),
1130            e => AdapterError::Explain(e),
1131        }
1132    }
1133}
1134
1135impl From<mz_sql::catalog::CatalogError> for AdapterError {
1136    fn from(e: mz_sql::catalog::CatalogError) -> AdapterError {
1137        AdapterError::Catalog(mz_catalog::memory::error::Error::from(e))
1138    }
1139}
1140
1141impl From<PlanError> for AdapterError {
1142    fn from(e: PlanError) -> AdapterError {
1143        match e {
1144            PlanError::UnknownCursor(name) => AdapterError::UnknownCursor(name),
1145            _ => AdapterError::PlanError(e),
1146        }
1147    }
1148}
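// A minimal sketch of the special case above: `PlanError::UnknownCursor` is surfaced as the
// dedicated `AdapterError::UnknownCursor` variant (and thus gets the INVALID_CURSOR_NAME
// SQLSTATE), while other plan errors are wrapped as `AdapterError::PlanError`. This assumes
// `PlanError::UnknownCursor` carries a plain `String`, as the match arm above implies.
#[cfg(test)]
mod plan_error_conversion_example {
    use super::*;

    #[test]
    fn unknown_cursor_gets_its_own_variant() {
        let err = AdapterError::from(PlanError::UnknownCursor("c".into()));
        assert!(matches!(err, AdapterError::UnknownCursor(_)));
        assert_eq!(err.code(), SqlState::INVALID_CURSOR_NAME);
    }
}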
1149
1150impl From<OptimizerError> for AdapterError {
1151    fn from(e: OptimizerError) -> AdapterError {
1152        use OptimizerError::*;
1153        match e {
1154            PlanError(e) => Self::PlanError(e),
1155            RecursionLimitError(e) => Self::RecursionLimit(e),
1156            EvalError(e) => Self::Eval(e),
1157            InternalUnsafeMfpPlan(e) => Self::Internal(e),
1158            Internal(e) => Self::Internal(e),
1159            e => Self::Optimizer(e),
1160        }
1161    }
1162}
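// A minimal sketch of the flattening above: optimizer errors that have a more specific
// `AdapterError` counterpart are unwrapped into it instead of being kept under
// `AdapterError::Optimizer`. The variants used here mirror the match arms above.
#[cfg(test)]
mod optimizer_error_conversion_example {
    use super::*;

    #[test]
    fn specific_variants_are_flattened() {
        assert!(matches!(
            AdapterError::from(OptimizerError::Internal("oops".to_string())),
            AdapterError::Internal(_)
        ));
        assert!(matches!(
            AdapterError::from(OptimizerError::EvalError(EvalError::LengthTooLarge)),
            AdapterError::Eval(_)
        ));
    }
}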
1163
1164impl From<NotNullViolation> for AdapterError {
1165    fn from(e: NotNullViolation) -> AdapterError {
1166        AdapterError::ConstraintViolation(e)
1167    }
1168}
1169
1170impl From<RecursionLimitError> for AdapterError {
1171    fn from(e: RecursionLimitError) -> AdapterError {
1172        AdapterError::RecursionLimit(e)
1173    }
1174}
1175
1176impl From<oneshot::error::RecvError> for AdapterError {
1177    fn from(e: oneshot::error::RecvError) -> AdapterError {
1178        AdapterError::Unstructured(e.into())
1179    }
1180}
1181
1182impl From<StorageError<mz_repr::Timestamp>> for AdapterError {
1183    fn from(e: StorageError<mz_repr::Timestamp>) -> Self {
1184        AdapterError::Storage(e)
1185    }
1186}
1187
1188impl From<compute_error::InstanceExists> for AdapterError {
1189    fn from(e: compute_error::InstanceExists) -> Self {
1190        AdapterError::Compute(e.into())
1191    }
1192}
1193
1194impl From<TimestampError> for AdapterError {
1195    fn from(e: TimestampError) -> Self {
1196        let e: EvalError = e.into();
1197        e.into()
1198    }
1199}
1200
1201impl From<mz_sql_parser::parser::ParserStatementError> for AdapterError {
1202    fn from(e: mz_sql_parser::parser::ParserStatementError) -> Self {
1203        AdapterError::ParseError(e)
1204    }
1205}
1206
1207impl From<VarError> for AdapterError {
1208    fn from(e: VarError) -> Self {
1209        let e: mz_catalog::memory::error::Error = e.into();
1210        e.into()
1211    }
1212}
1213
1214impl From<rbac::UnauthorizedError> for AdapterError {
1215    fn from(e: rbac::UnauthorizedError) -> Self {
1216        AdapterError::Unauthorized(e)
1217    }
1218}
1219
1220impl From<mz_sql_parser::ast::IdentError> for AdapterError {
1221    fn from(value: mz_sql_parser::ast::IdentError) -> Self {
1222        AdapterError::PlanError(PlanError::InvalidIdent(value))
1223    }
1224}
1225
1226impl From<mz_pgwire_common::ConnectionError> for AdapterError {
1227    fn from(value: mz_pgwire_common::ConnectionError) -> Self {
1228        match value {
1229            mz_pgwire_common::ConnectionError::TooManyConnections { current, limit } => {
1230                AdapterError::ResourceExhaustion {
1231                    resource_type: "connection".into(),
1232                    limit_name: "max_connections".into(),
1233                    desired: (current + 1).to_string(),
1234                    limit: limit.to_string(),
1235                    current: current.to_string(),
1236                }
1237            }
1238        }
1239    }
1240}
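// A minimal sketch of the mapping above: exceeding the connection limit surfaces as the generic
// `ResourceExhaustion` error, with `desired` set to one more than the current count. This
// assumes the `TooManyConnections` fields are plain integers, as the arithmetic above implies.
#[cfg(test)]
mod connection_error_conversion_example {
    use super::*;

    #[test]
    fn too_many_connections_becomes_resource_exhaustion() {
        let err = AdapterError::from(mz_pgwire_common::ConnectionError::TooManyConnections {
            current: 10,
            limit: 10,
        });
        match err {
            AdapterError::ResourceExhaustion {
                resource_type,
                desired,
                limit,
                ..
            } => {
                assert_eq!(resource_type, "connection");
                assert_eq!(desired, "11");
                assert_eq!(limit, "10");
            }
            other => panic!("unexpected error: {other}"),
        }
    }
}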
1241
1242impl From<NetworkPolicyError> for AdapterError {
1243    fn from(value: NetworkPolicyError) -> Self {
1244        AdapterError::NetworkPolicyDenied(value)
1245    }
1246}
1247
1248impl From<ConnectionValidationError> for AdapterError {
1249    fn from(e: ConnectionValidationError) -> AdapterError {
1250        AdapterError::ConnectionValidation(e)
1251    }
1252}
1253
1254impl Error for AdapterError {}