// mz_adapter/error.rs — coordinator error types.

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::error::Error;
12use std::fmt;
13use std::num::TryFromIntError;
14
15use dec::TryFromDecimalError;
16use itertools::Itertools;
17use mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER;
18use mz_compute_client::controller::error as compute_error;
19use mz_compute_client::controller::error::InstanceMissing;
20
21use mz_compute_types::ComputeInstanceId;
22use mz_expr::EvalError;
23use mz_ore::error::ErrorExt;
24use mz_ore::stack::RecursionLimitError;
25use mz_ore::str::StrExt;
26use mz_pgwire_common::{ErrorResponse, Severity};
27use mz_repr::adt::timestamp::TimestampError;
28use mz_repr::explain::ExplainError;
29use mz_repr::{ColumnDiff, ColumnName, KeyDiff, NotNullViolation, RelationDescDiff, Timestamp};
30use mz_sql::plan::PlanError;
31use mz_sql::rbac;
32use mz_sql::session::vars::VarError;
33use mz_storage_types::connections::ConnectionValidationError;
34use mz_storage_types::controller::StorageError;
35use mz_storage_types::errors::CollectionMissing;
36use smallvec::SmallVec;
37use timely::progress::Antichain;
38use tokio::sync::oneshot;
39use tokio_postgres::error::SqlState;
40
41use crate::coord::NetworkPolicyError;
42use crate::optimize::OptimizerError;
43use crate::peek_client::CollectionLookupError;
44
/// Errors that can occur in the coordinator.
///
/// Each variant maps to a SQLSTATE code via [`AdapterError::code`] and may
/// additionally carry user-facing DETAIL and HINT text via
/// [`AdapterError::detail`] and [`AdapterError::hint`].
#[derive(Debug)]
pub enum AdapterError {
    /// A `SUBSCRIBE` was requested whose `UP TO` bound precedes its `as_of` timestamp
    AbsurdSubscribeBounds {
        as_of: mz_repr::Timestamp,
        up_to: mz_repr::Timestamp,
    },
    /// Attempted to use a potentially ambiguous column reference expression with a system table.
    // We don't allow this until https://github.com/MaterializeInc/database-issues/issues/4824 is
    // resolved because it prevents us from adding columns to system tables.
    AmbiguousSystemColumnReference,
    /// An error occurred in a catalog operation.
    Catalog(mz_catalog::memory::error::Error),
    /// 1. The cached plan or descriptor changed,
    /// 2. or some dependency of a statement disappeared during sequencing.
    /// TODO(ggevay): we should refactor 2. usages to use `ConcurrentDependencyDrop` instead
    /// (e.g., in MV sequencing)
    ChangedPlan(String),
    /// The cursor already exists.
    DuplicateCursor(String),
    /// An error while evaluating an expression.
    Eval(EvalError),
    /// An error occurred while producing an `EXPLAIN` output.
    Explain(ExplainError),
    /// The ID allocator exhausted all valid IDs.
    IdExhaustionError,
    /// Unexpected internal state was encountered.
    Internal(String),
    /// Attempted to read from log sources of a replica with disabled introspection.
    IntrospectionDisabled {
        log_names: Vec<String>,
    },
    /// Attempted to create an object dependent on log sources that doesn't support
    /// log dependencies.
    InvalidLogDependency {
        object_type: String,
        log_names: Vec<String>,
    },
    /// The specified cluster replica availability zone is not one of the
    /// configured availability zones.
    InvalidClusterReplicaAz {
        az: String,
        expected: Vec<String>,
    },
    /// SET TRANSACTION ISOLATION LEVEL was called in the middle of a transaction.
    InvalidSetIsolationLevel,
    /// SET cluster was called in the middle of a transaction.
    InvalidSetCluster,
    /// No such storage instance size has been configured.
    InvalidStorageClusterSize {
        size: String,
        expected: Vec<String>,
    },
    /// Creating a source or sink without specifying its size is forbidden.
    SourceOrSinkSizeRequired {
        expected: Vec<String>,
    },
    /// The selection value for a table mutation operation refers to an invalid object.
    InvalidTableMutationSelection {
        /// The full name of the problematic object (e.g. a source or source-export table).
        object_name: String,
        /// Human-readable type of the object (e.g. "source", "source-export table").
        object_type: String,
    },
    /// Expression violated a column's constraint.
    ConstraintViolation(NotNullViolation),
    /// An error occurred while decoding COPY data.
    CopyFormatError(String),
    /// Transaction cluster was dropped in the middle of a transaction.
    ConcurrentClusterDrop,
    /// A dependency was dropped while sequencing a statement.
    ConcurrentDependencyDrop {
        dependency_kind: &'static str,
        dependency_id: String,
    },
    /// The identified collection could not be read, possibly because it was
    /// recently dropped (see the corresponding hint).
    CollectionUnreadable {
        id: String,
    },
    /// Target cluster has no replicas to service query.
    NoClusterReplicasAvailable {
        name: String,
        is_managed: bool,
    },
    /// The named operation cannot be run in a transaction.
    OperationProhibitsTransaction(String),
    /// The named operation requires an active transaction.
    OperationRequiresTransaction(String),
    /// An error occurred while planning the statement.
    PlanError(PlanError),
    /// The named prepared statement already exists.
    PreparedStatementExists(String),
    /// Wrapper around parsing error.
    ParseError(mz_sql_parser::parser::ParserStatementError),
    /// The transaction is in read-only mode.
    ReadOnlyTransaction,
    /// The transaction is in read-only mode and a read already occurred.
    ReadWriteUnavailable,
    /// The recursion limit of some operation was exceeded.
    RecursionLimit(RecursionLimitError),
    /// A query in a transaction referenced a relation outside the first query's
    /// time domain.
    RelationOutsideTimeDomain {
        relations: Vec<String>,
        names: Vec<String>,
    },
    /// A query tried to create more resources than is allowed in the system configuration.
    ResourceExhaustion {
        resource_type: String,
        limit_name: String,
        desired: String,
        limit: String,
        current: String,
    },
    /// Result size of a query is too large.
    ResultSize(String),
    /// The specified feature is not permitted in safe mode.
    SafeModeViolation(String),
    /// The current transaction had the wrong set of write locks.
    WrongSetOfLocks,
    /// Waiting on a query timed out.
    ///
    /// Note this differs slightly from PG's implementation/semantics.
    StatementTimeout,
    /// The user canceled the query.
    Canceled,
    /// An idle session in a transaction has timed out.
    IdleInTransactionSessionTimeout,
    /// The transaction is in single-subscribe mode.
    SubscribeOnlyTransaction,
    /// An error occurred in the optimizer.
    Optimizer(OptimizerError),
    /// A query depends on items which are not allowed to be referenced from the current cluster.
    UnallowedOnCluster {
        depends_on: SmallVec<[String; 2]>,
        cluster: String,
    },
    /// A user tried to perform an action that they were unauthorized to do.
    Unauthorized(rbac::UnauthorizedError),
    /// The named cursor does not exist.
    UnknownCursor(String),
    /// The named role does not exist.
    UnknownLoginRole(String),
    /// The named prepared statement does not exist.
    UnknownPreparedStatement(String),
    /// The named cluster replica does not exist.
    UnknownClusterReplica {
        cluster_name: String,
        replica_name: String,
    },
    /// The named setting does not exist.
    UnrecognizedConfigurationParam(String),
    /// A generic error occurred.
    //
    // TODO(benesch): convert all those errors to structured errors.
    Unstructured(anyhow::Error),
    /// The named feature is not supported and will (probably) not be.
    Unsupported(&'static str),
    /// Some feature isn't available for a (potentially opaque) reason.
    /// For example, in cloud Self-Managed auth features aren't available,
    /// but we don't want to mention self managed auth.
    UnavailableFeature {
        feature: String,
        docs: Option<String>,
    },
    /// Attempted to read from log sources without selecting a target replica.
    UntargetedLogRead {
        log_names: Vec<String>,
    },
    /// The transaction is in write-only mode.
    WriteOnlyTransaction,
    /// The transaction can only execute a single statement.
    SingleStatementTransaction,
    /// The transaction can only execute simple DDL.
    DDLOnlyTransaction,
    /// Another session modified the Catalog while this transaction was open.
    DDLTransactionRace,
    /// Used to prevent us from durably committing state while a DDL transaction is open, should
    /// never be returned to the user.
    TransactionDryRun {
        /// New operations that were run in the transaction.
        new_ops: Vec<crate::catalog::Op>,
        /// New resulting `CatalogState`.
        new_state: crate::catalog::CatalogState,
    },
    /// An error occurred in the storage layer.
    Storage(mz_storage_types::controller::StorageError<mz_repr::Timestamp>),
    /// An error occurred in the compute layer.
    Compute(anyhow::Error),
    /// An error in the orchestrator layer.
    Orchestrator(anyhow::Error),
    /// A statement tried to drop a role that had dependent objects.
    ///
    /// The map keys are role names and values are detailed error messages.
    DependentObject(BTreeMap<String, Vec<String>>),
    /// When performing an `ALTER` of some variety, re-planning the statement
    /// errored.
    InvalidAlter(&'static str, PlanError),
    /// An error occurred while validating a connection.
    ConnectionValidation(ConnectionValidationError),
    /// We refuse to create the materialized view, because it would never be refreshed, so it would
    /// never be queryable. This can happen when the only specified refreshes are further back in
    /// the past than the initial compaction window of the materialized view.
    MaterializedViewWouldNeverRefresh(Timestamp, Timestamp),
    /// A CREATE MATERIALIZED VIEW statement tried to acquire a read hold at a REFRESH AT time,
    /// but was unable to get a precise read hold.
    InputNotReadableAtRefreshAtTime(Timestamp, Antichain<Timestamp>),
    /// A humanized version of [`StorageError::RtrTimeout`].
    RtrTimeout(String),
    /// A humanized version of [`StorageError::RtrDropFailure`].
    RtrDropFailure(String),
    /// The collection requested to be sinked cannot be read at any timestamp.
    UnreadableSinkCollection,
    /// User sessions have been blocked.
    UserSessionsDisallowed,
    /// This user session has been denied by a NetworkPolicy.
    NetworkPolicyDenied(NetworkPolicyError),
    /// Something attempted a write (to catalog, storage, tables, etc.) while in
    /// read-only mode.
    ReadOnly,
    /// An `ALTER CLUSTER` operation timed out (see the corresponding hint
    /// about the statement's timeout duration).
    AlterClusterTimeout,
    /// An `ALTER CLUSTER` was rejected because replicas are still pending.
    // NOTE(review): exact trigger inferred from the variant name — confirm
    // against the alter-cluster sequencing code.
    AlterClusterWhilePendingReplicas,
    /// Authentication failed; see [`AuthenticationError`] for the reason.
    AuthenticationError(AuthenticationError),
    /// Schema of a replacement is incompatible with the target.
    ReplacementSchemaMismatch(RelationDescDiff),
    /// Attempt to apply a replacement to a sealed materialized view.
    ReplaceMaterializedViewSealed {
        name: String,
    },
    /// Could not find a valid timestamp satisfying all constraints.
    ImpossibleTimestampConstraints {
        constraints: String,
    },
}
277
/// Reasons a client can fail to authenticate.
///
/// The `#[error(...)]` strings are the user-facing messages rendered by the
/// derived `Display` impl.
#[derive(Debug, thiserror::Error)]
pub enum AuthenticationError {
    /// The supplied credentials did not match.
    #[error("invalid credentials")]
    InvalidCredentials,
    /// The role exists but is not allowed to log in.
    #[error("role is not allowed to login")]
    NonLogin,
    /// No role with the given name exists.
    #[error("role does not exist")]
    RoleNotFound,
    /// The client attempted to authenticate without supplying a password.
    #[error("password is required")]
    PasswordRequired,
}
289
290impl AdapterError {
291    pub fn into_response(self, severity: Severity) -> ErrorResponse {
292        ErrorResponse {
293            severity,
294            code: self.code(),
295            message: self.to_string(),
296            detail: self.detail(),
297            hint: self.hint(),
298            position: self.position(),
299        }
300    }
301
302    pub fn position(&self) -> Option<usize> {
303        match self {
304            AdapterError::ParseError(err) => Some(err.error.pos),
305            _ => None,
306        }
307    }
308
    /// Reports additional details about the error, if any are available.
    ///
    /// This text populates the DETAIL field of a pgwire error response.
    /// Variants without extra context return `None`.
    pub fn detail(&self) -> Option<String> {
        match self {
            AdapterError::AmbiguousSystemColumnReference => {
                Some("This is a current limitation in Materialize".into())
            }
            // Wrapped errors delegate to their own `detail` implementations.
            AdapterError::Catalog(c) => c.detail(),
            AdapterError::Eval(e) => e.detail(),
            AdapterError::RelationOutsideTimeDomain { relations, names } => Some(format!(
                "The following relations in the query are outside the transaction's time domain:\n{}\n{}",
                relations
                    .iter()
                    .map(|r| r.quoted().to_string())
                    .collect::<Vec<_>>()
                    .join("\n"),
                match names.is_empty() {
                    true => "No relations are available.".to_string(),
                    false => format!(
                        "Only the following relations are available:\n{}",
                        names
                            .iter()
                            .map(|name| name.quoted().to_string())
                            .collect::<Vec<_>>()
                            .join("\n")
                    ),
                }
            )),
            AdapterError::SourceOrSinkSizeRequired { .. } => Some(
                "Either specify the cluster that will maintain this object via IN CLUSTER or \
                specify size via SIZE option."
                    .into(),
            ),
            AdapterError::InvalidTableMutationSelection {
                object_name,
                object_type,
            } => Some(format!(
                "{object_type} '{}' may not be used in this operation; \
                     the selection may refer to views and materialized views, but transitive \
                     dependencies must not include sources or source-export tables",
                object_name.quoted()
            )),
            AdapterError::SafeModeViolation(_) => Some(
                "The Materialize server you are connected to is running in \
                 safe mode, which limits the features that are available."
                    .into(),
            ),
            AdapterError::IntrospectionDisabled { log_names }
            | AdapterError::UntargetedLogRead { log_names } => Some(format!(
                "The query references the following log sources:\n    {}",
                log_names.join("\n    "),
            )),
            AdapterError::InvalidLogDependency { log_names, .. } => Some(format!(
                "The object depends on the following log sources:\n    {}",
                log_names.join("\n    "),
            )),
            AdapterError::PlanError(e) => e.detail(),
            AdapterError::Unauthorized(unauthorized) => unauthorized.detail(),
            // One line per role, each prefixed with the role name.
            AdapterError::DependentObject(dependent_objects) => Some(
                dependent_objects
                    .iter()
                    .map(|(role_name, err_msgs)| {
                        err_msgs
                            .iter()
                            .map(|err_msg| format!("{role_name}: {err_msg}"))
                            .join("\n")
                    })
                    .join("\n"),
            ),
            // Surface the underlying cause chain of the storage error, if any.
            AdapterError::Storage(storage_error) => storage_error
                .source()
                .map(|source_error| source_error.to_string_with_causes()),
            AdapterError::ReadOnlyTransaction => Some(
                "SELECT queries cannot be combined with other query types, including SUBSCRIBE."
                    .into(),
            ),
            AdapterError::InvalidAlter(_, e) => e.detail(),
            AdapterError::Optimizer(e) => e.detail(),
            AdapterError::ConnectionValidation(e) => e.detail(),
            AdapterError::MaterializedViewWouldNeverRefresh(last_refresh, earliest_possible) => {
                Some(format!(
                    "The specified last refresh is at {}, while the earliest possible time to compute the materialized \
                    view is {}.",
                    last_refresh, earliest_possible,
                ))
            }
            // Only relevant when the query ended up on the catalog server
            // cluster; otherwise the hint (not the detail) applies.
            AdapterError::UnallowedOnCluster { cluster, .. } => {
                (cluster == MZ_CATALOG_SERVER_CLUSTER.name).then(|| {
                    format!(
                        "The transaction is executing on the \
                        {cluster} cluster, maybe having been routed \
                        there by the first statement in the transaction."
                    )
                })
            }
            AdapterError::InputNotReadableAtRefreshAtTime(oracle_read_ts, least_valid_read) => {
                Some(format!(
                    "The requested REFRESH AT time is {}, \
                    but not all input collections are readable earlier than [{}].",
                    oracle_read_ts,
                    if least_valid_read.len() == 1 {
                        format!(
                            "{}",
                            least_valid_read
                                .as_option()
                                .expect("antichain contains exactly 1 timestamp")
                        )
                    } else {
                        // This can't occur currently
                        format!("{:?}", least_valid_read)
                    }
                ))
            }
            AdapterError::RtrTimeout(name) => Some(format!(
                "{name} failed to ingest data up to the real-time recency point"
            )),
            AdapterError::RtrDropFailure(name) => Some(format!(
                "{name} dropped before ingesting data to the real-time recency point"
            )),
            AdapterError::UserSessionsDisallowed => {
                Some("Your organization has been blocked. Please contact support.".to_string())
            }
            AdapterError::NetworkPolicyDenied(reason) => Some(format!("{reason}.")),
            // Render one human-readable line per column difference; positions
            // reported to the user are 1-based.
            AdapterError::ReplacementSchemaMismatch(diff) => {
                let mut lines: Vec<_> = diff.column_diffs.iter().map(|(idx, diff)| {
                    let pos = idx + 1;
                    match diff {
                        ColumnDiff::Missing { name } => {
                            let name = name.as_str().quoted();
                            format!("missing column {name} at position {pos}")
                        }
                        ColumnDiff::Extra { name } => {
                            let name = name.as_str().quoted();
                            format!("extra column {name} at position {pos}")
                        }
                        ColumnDiff::TypeMismatch { name, left, right } => {
                            let name = name.as_str().quoted();
                            format!("column {name} at position {pos}: type mismatch (target: {left:?}, replacement: {right:?})")
                        }
                        ColumnDiff::NullabilityMismatch { name, left, right } => {
                            let name = name.as_str().quoted();
                            let left = if *left { "NULL" } else { "NOT NULL" };
                            let right = if *right { "NULL" } else { "NOT NULL" };
                            format!("column {name} at position {pos}: nullability mismatch (target: {left}, replacement: {right})")
                        }
                        ColumnDiff::NameMismatch { left, right } => {
                            let left = left.as_str().quoted();
                            let right = right.as_str().quoted();
                            format!("column at position {pos}: name mismatch (target: {left}, replacement: {right})")
                        }
                    }
                }).collect();

                // Append a single summary line if the key sets differ too.
                if let Some(KeyDiff { left, right }) = &diff.key_diff {
                    let format_keys = |keys: &BTreeSet<Vec<ColumnName>>| {
                        if keys.is_empty() {
                            "(none)".to_string()
                        } else {
                            keys.iter()
                                .map(|key| {
                                    let cols = key.iter().map(|c| c.as_str()).join(", ");
                                    format!("{{{cols}}}")
                                })
                                .join(", ")
                        }
                    };
                    lines.push(format!(
                        "keys differ (target: {}, replacement: {})",
                        format_keys(left),
                        format_keys(right)
                    ));
                }
                Some(lines.join("\n"))
            }
            AdapterError::ReplaceMaterializedViewSealed { .. } => Some(
                "The materialized view has already computed its output until the end of time, \
                 so replacing its definition would have no effect."
                    .into(),
            ),
            AdapterError::ImpossibleTimestampConstraints { constraints } => {
                Some(format!("Constraints:\n{}", constraints))
            }
            _ => None,
        }
    }
493
    /// Reports a hint for the user about how the error could be fixed.
    ///
    /// This text populates the HINT field of a pgwire error response.
    /// Variants with no actionable advice return `None`.
    pub fn hint(&self) -> Option<String> {
        match self {
            AdapterError::AmbiguousSystemColumnReference => Some(
                "Rewrite the view to refer to all columns by name. Expand all wildcards and \
                convert all NATURAL JOINs to USING joins."
                    .to_string(),
            ),
            // Wrapped errors delegate to their own `hint` implementations.
            AdapterError::Catalog(c) => c.hint(),
            AdapterError::Eval(e) => e.hint(),
            AdapterError::InvalidClusterReplicaAz { expected, az: _ } => {
                Some(if expected.is_empty() {
                    "No availability zones configured; do not specify AVAILABILITY ZONE".into()
                } else {
                    format!("Valid availability zones are: {}", expected.join(", "))
                })
            }
            AdapterError::InvalidStorageClusterSize { expected, .. } => {
                Some(format!("Valid sizes are: {}", expected.join(", ")))
            }
            AdapterError::SourceOrSinkSizeRequired { expected } => Some(format!(
                "Try choosing one of the smaller sizes to start. Available sizes: {}",
                expected.join(", ")
            )),
            // Managed clusters are resized via ALTER CLUSTER; unmanaged ones
            // take explicit CREATE CLUSTER REPLICA statements.
            AdapterError::NoClusterReplicasAvailable { is_managed, .. } => {
                Some(if *is_managed {
                    "Use ALTER CLUSTER to adjust the replication factor of the cluster. \
                    Example:`ALTER CLUSTER <cluster-name> SET (REPLICATION FACTOR 1)`".into()
                } else {
                    "Use CREATE CLUSTER REPLICA to attach cluster replicas to the cluster".into()
                })
            }
            AdapterError::UntargetedLogRead { .. } => Some(
                "Use `SET cluster_replica = <replica-name>` to target a specific replica in the \
                 active cluster. Note that subsequent queries will only be answered by \
                 the selected replica, which might reduce availability. To undo the replica \
                 selection, use `RESET cluster_replica`."
                    .into(),
            ),
            AdapterError::ResourceExhaustion { resource_type, .. } => Some(format!(
                "Drop an existing {resource_type} or contact support to request a limit increase."
            )),
            AdapterError::StatementTimeout => Some(
                "Consider increasing the maximum allowed statement duration for this session by \
                 setting the statement_timeout session variable. For example, `SET \
                 statement_timeout = '120s'`."
                    .into(),
            ),
            AdapterError::PlanError(e) => e.hint(),
            // Mirror image of the `detail` arm: only hint when the query is
            // NOT on the catalog server cluster.
            AdapterError::UnallowedOnCluster { cluster, .. } => {
                (cluster != MZ_CATALOG_SERVER_CLUSTER.name).then(||
                    "Use `SET CLUSTER = <cluster-name>` to change your cluster and re-run the query."
                    .to_string()
                )
            }
            AdapterError::InvalidAlter(_, e) => e.hint(),
            AdapterError::Optimizer(e) => e.hint(),
            AdapterError::ConnectionValidation(e) => e.hint(),
            AdapterError::InputNotReadableAtRefreshAtTime(_, _) => Some(
                "You can use `REFRESH AT greatest(mz_now(), <explicit timestamp>)` to refresh \
                 either at the explicitly specified timestamp, or now if the given timestamp would \
                 be in the past.".to_string()
            ),
            AdapterError::AlterClusterTimeout => Some(
                "Consider increasing the timeout duration in the alter cluster statement.".into(),
            ),
            AdapterError::DDLTransactionRace => Some(
                "Currently, DDL transactions fail when any other DDL happens concurrently, \
                 even on unrelated schemas/clusters.".into()
            ),
            AdapterError::CollectionUnreadable { .. } => Some(
                "This could be because the collection has recently been dropped.".into()
            ),
            _ => None,
        }
    }
570
571    pub fn code(&self) -> SqlState {
572        // TODO(benesch): we should only use `SqlState::INTERNAL_ERROR` for
573        // those errors that are truly internal errors. At the moment we have
574        // a various classes of uncategorized errors that use this error code
575        // inappropriately.
576        match self {
577            // DATA_EXCEPTION to match what Postgres returns for degenerate
578            // range bounds
579            AdapterError::AbsurdSubscribeBounds { .. } => SqlState::DATA_EXCEPTION,
580            AdapterError::AmbiguousSystemColumnReference => SqlState::FEATURE_NOT_SUPPORTED,
581            AdapterError::Catalog(e) => match &e.kind {
582                mz_catalog::memory::error::ErrorKind::VarError(e) => match e {
583                    VarError::ConstrainedParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
584                    VarError::FixedValueParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
585                    VarError::InvalidParameterType { .. } => SqlState::INVALID_PARAMETER_VALUE,
586                    VarError::InvalidParameterValue { .. } => SqlState::INVALID_PARAMETER_VALUE,
587                    VarError::ReadOnlyParameter(_) => SqlState::CANT_CHANGE_RUNTIME_PARAM,
588                    VarError::UnknownParameter(_) => SqlState::UNDEFINED_OBJECT,
589                    VarError::RequiresUnsafeMode { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
590                    VarError::RequiresFeatureFlag { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
591                },
592                _ => SqlState::INTERNAL_ERROR,
593            },
594            AdapterError::ChangedPlan(_) => SqlState::FEATURE_NOT_SUPPORTED,
595            AdapterError::DuplicateCursor(_) => SqlState::DUPLICATE_CURSOR,
596            AdapterError::Eval(EvalError::CharacterNotValidForEncoding(_)) => {
597                SqlState::PROGRAM_LIMIT_EXCEEDED
598            }
599            AdapterError::Eval(EvalError::CharacterTooLargeForEncoding(_)) => {
600                SqlState::PROGRAM_LIMIT_EXCEEDED
601            }
602            AdapterError::Eval(EvalError::LengthTooLarge) => SqlState::PROGRAM_LIMIT_EXCEEDED,
603            AdapterError::Eval(EvalError::NullCharacterNotPermitted) => {
604                SqlState::PROGRAM_LIMIT_EXCEEDED
605            }
606            AdapterError::Eval(_) => SqlState::INTERNAL_ERROR,
607            AdapterError::Explain(_) => SqlState::INTERNAL_ERROR,
608            AdapterError::IdExhaustionError => SqlState::INTERNAL_ERROR,
609            AdapterError::Internal(_) => SqlState::INTERNAL_ERROR,
610            AdapterError::IntrospectionDisabled { .. } => SqlState::FEATURE_NOT_SUPPORTED,
611            AdapterError::InvalidLogDependency { .. } => SqlState::FEATURE_NOT_SUPPORTED,
612            AdapterError::InvalidClusterReplicaAz { .. } => SqlState::FEATURE_NOT_SUPPORTED,
613            AdapterError::InvalidSetIsolationLevel => SqlState::ACTIVE_SQL_TRANSACTION,
614            AdapterError::InvalidSetCluster => SqlState::ACTIVE_SQL_TRANSACTION,
615            AdapterError::InvalidStorageClusterSize { .. } => SqlState::FEATURE_NOT_SUPPORTED,
616            AdapterError::SourceOrSinkSizeRequired { .. } => SqlState::FEATURE_NOT_SUPPORTED,
617            AdapterError::InvalidTableMutationSelection { .. } => {
618                SqlState::INVALID_TRANSACTION_STATE
619            }
620            AdapterError::ConstraintViolation(NotNullViolation(_)) => SqlState::NOT_NULL_VIOLATION,
621            AdapterError::CopyFormatError(_) => SqlState::BAD_COPY_FILE_FORMAT,
622            AdapterError::ConcurrentClusterDrop => SqlState::INVALID_TRANSACTION_STATE,
623            AdapterError::ConcurrentDependencyDrop { .. } => SqlState::UNDEFINED_OBJECT,
624            AdapterError::CollectionUnreadable { .. } => SqlState::NO_DATA_FOUND,
625            AdapterError::NoClusterReplicasAvailable { .. } => SqlState::FEATURE_NOT_SUPPORTED,
626            AdapterError::OperationProhibitsTransaction(_) => SqlState::ACTIVE_SQL_TRANSACTION,
627            AdapterError::OperationRequiresTransaction(_) => SqlState::NO_ACTIVE_SQL_TRANSACTION,
628            AdapterError::ParseError(_) => SqlState::SYNTAX_ERROR,
629            AdapterError::PlanError(PlanError::InvalidSchemaName) => SqlState::INVALID_SCHEMA_NAME,
630            AdapterError::PlanError(PlanError::ColumnAlreadyExists { .. }) => {
631                SqlState::DUPLICATE_COLUMN
632            }
633            AdapterError::PlanError(PlanError::UnknownParameter(_)) => {
634                SqlState::UNDEFINED_PARAMETER
635            }
636            AdapterError::PlanError(PlanError::ParameterNotAllowed(_)) => {
637                SqlState::UNDEFINED_PARAMETER
638            }
639            AdapterError::PlanError(_) => SqlState::INTERNAL_ERROR,
640            AdapterError::PreparedStatementExists(_) => SqlState::DUPLICATE_PSTATEMENT,
641            AdapterError::ReadOnlyTransaction => SqlState::READ_ONLY_SQL_TRANSACTION,
642            AdapterError::ReadWriteUnavailable => SqlState::INVALID_TRANSACTION_STATE,
643            AdapterError::SingleStatementTransaction => SqlState::INVALID_TRANSACTION_STATE,
644            AdapterError::WrongSetOfLocks => SqlState::LOCK_NOT_AVAILABLE,
645            AdapterError::StatementTimeout => SqlState::QUERY_CANCELED,
646            AdapterError::Canceled => SqlState::QUERY_CANCELED,
647            AdapterError::IdleInTransactionSessionTimeout => {
648                SqlState::IDLE_IN_TRANSACTION_SESSION_TIMEOUT
649            }
650            AdapterError::RecursionLimit(_) => SqlState::INTERNAL_ERROR,
651            AdapterError::RelationOutsideTimeDomain { .. } => SqlState::INVALID_TRANSACTION_STATE,
652            AdapterError::ResourceExhaustion { .. } => SqlState::INSUFFICIENT_RESOURCES,
653            AdapterError::ResultSize(_) => SqlState::OUT_OF_MEMORY,
654            AdapterError::SafeModeViolation(_) => SqlState::INTERNAL_ERROR,
655            AdapterError::SubscribeOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
656            AdapterError::Optimizer(e) => match e {
657                OptimizerError::PlanError(PlanError::InvalidSchemaName) => {
658                    SqlState::INVALID_SCHEMA_NAME
659                }
660                OptimizerError::PlanError(PlanError::ColumnAlreadyExists { .. }) => {
661                    SqlState::DUPLICATE_COLUMN
662                }
663                OptimizerError::PlanError(PlanError::UnknownParameter(_)) => {
664                    SqlState::UNDEFINED_PARAMETER
665                }
666                OptimizerError::PlanError(PlanError::ParameterNotAllowed(_)) => {
667                    SqlState::UNDEFINED_PARAMETER
668                }
669                OptimizerError::PlanError(_) => SqlState::INTERNAL_ERROR,
670                OptimizerError::RecursionLimitError(e) => {
671                    AdapterError::RecursionLimit(e.clone()).code() // Delegate to outer
672                }
673                OptimizerError::Internal(s) => {
674                    AdapterError::Internal(s.clone()).code() // Delegate to outer
675                }
676                OptimizerError::EvalError(e) => {
677                    AdapterError::Eval(e.clone()).code() // Delegate to outer
678                }
679                OptimizerError::TransformError(_) => SqlState::INTERNAL_ERROR,
680                OptimizerError::UnmaterializableFunction(_) => SqlState::FEATURE_NOT_SUPPORTED,
681                OptimizerError::UncallableFunction { .. } => SqlState::FEATURE_NOT_SUPPORTED,
682                OptimizerError::UnsupportedTemporalExpression(_) => SqlState::FEATURE_NOT_SUPPORTED,
683                // This should be handled by peek optimization, so it's an internal error if it
684                // reaches the user.
685                OptimizerError::InternalUnsafeMfpPlan(_) => SqlState::INTERNAL_ERROR,
686            },
687            AdapterError::UnallowedOnCluster { .. } => {
688                SqlState::S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
689            }
690            AdapterError::Unauthorized(_) => SqlState::INSUFFICIENT_PRIVILEGE,
691            AdapterError::UnknownCursor(_) => SqlState::INVALID_CURSOR_NAME,
692            AdapterError::UnknownPreparedStatement(_) => SqlState::UNDEFINED_PSTATEMENT,
693            AdapterError::UnknownLoginRole(_) => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
694            AdapterError::UnknownClusterReplica { .. } => SqlState::UNDEFINED_OBJECT,
695            AdapterError::UnrecognizedConfigurationParam(_) => SqlState::UNDEFINED_OBJECT,
696            AdapterError::Unsupported(..) => SqlState::FEATURE_NOT_SUPPORTED,
697            AdapterError::UnavailableFeature { .. } => SqlState::FEATURE_NOT_SUPPORTED,
698            AdapterError::Unstructured(_) => SqlState::INTERNAL_ERROR,
699            AdapterError::UntargetedLogRead { .. } => SqlState::FEATURE_NOT_SUPPORTED,
700            AdapterError::DDLTransactionRace => SqlState::T_R_SERIALIZATION_FAILURE,
701            AdapterError::TransactionDryRun { .. } => SqlState::T_R_SERIALIZATION_FAILURE,
702            // It's not immediately clear which error code to use here because a
703            // "write-only transaction", "single table write transaction", or "ddl only
704            // transaction" are not things in Postgres. This error code is the generic "bad txn
705            // thing" code, so it's probably the best choice.
706            AdapterError::WriteOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
707            AdapterError::DDLOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
708            AdapterError::Storage(_) | AdapterError::Compute(_) | AdapterError::Orchestrator(_) => {
709                SqlState::INTERNAL_ERROR
710            }
711            AdapterError::DependentObject(_) => SqlState::DEPENDENT_OBJECTS_STILL_EXIST,
712            AdapterError::InvalidAlter(_, _) => SqlState::FEATURE_NOT_SUPPORTED,
713            AdapterError::ConnectionValidation(_) => SqlState::SYSTEM_ERROR,
714            // `DATA_EXCEPTION`, similarly to `AbsurdSubscribeBounds`.
715            AdapterError::MaterializedViewWouldNeverRefresh(_, _) => SqlState::DATA_EXCEPTION,
716            AdapterError::InputNotReadableAtRefreshAtTime(_, _) => SqlState::DATA_EXCEPTION,
717            AdapterError::RtrTimeout(_) => SqlState::QUERY_CANCELED,
718            AdapterError::RtrDropFailure(_) => SqlState::UNDEFINED_OBJECT,
719            AdapterError::UnreadableSinkCollection => SqlState::from_code("MZ009"),
720            AdapterError::UserSessionsDisallowed => SqlState::from_code("MZ010"),
721            AdapterError::NetworkPolicyDenied(_) => SqlState::from_code("MZ011"),
722            // In read-only mode all transactions are implicitly read-only
723            // transactions.
724            AdapterError::ReadOnly => SqlState::READ_ONLY_SQL_TRANSACTION,
725            AdapterError::AlterClusterTimeout => SqlState::QUERY_CANCELED,
726            AdapterError::AlterClusterWhilePendingReplicas => SqlState::OBJECT_IN_USE,
727            AdapterError::ReplacementSchemaMismatch(_) => SqlState::FEATURE_NOT_SUPPORTED,
728            AdapterError::AuthenticationError(AuthenticationError::InvalidCredentials) => {
729                SqlState::INVALID_PASSWORD
730            }
731            AdapterError::AuthenticationError(_) => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
732            AdapterError::ReplaceMaterializedViewSealed { .. } => {
733                SqlState::OBJECT_NOT_IN_PREREQUISITE_STATE
734            }
735            // similar to AbsurdSubscribeBounds
736            AdapterError::ImpossibleTimestampConstraints { .. } => SqlState::DATA_EXCEPTION,
737        }
738    }
739
740    pub fn internal<E: std::fmt::Display>(context: &str, e: E) -> AdapterError {
741        AdapterError::Internal(format!("{context}: {e}"))
742    }
743
744    // We don't want the following error conversions to `ConcurrentDependencyDrop` to happen
745    // automatically, because it might depend on the context whether `ConcurrentDependencyDrop`
746    // is appropriate, so we want to make the conversion target explicit at the call site.
747    // For example, maybe we get an `InstanceMissing` if the user specifies a non-existing cluster,
748    // in which case `ConcurrentDependencyDrop` would not be appropriate.
749
750    pub fn concurrent_dependency_drop_from_instance_missing(e: InstanceMissing) -> Self {
751        AdapterError::ConcurrentDependencyDrop {
752            dependency_kind: "cluster",
753            dependency_id: e.0.to_string(),
754        }
755    }
756
757    pub fn concurrent_dependency_drop_from_collection_missing(e: CollectionMissing) -> Self {
758        AdapterError::ConcurrentDependencyDrop {
759            dependency_kind: "collection",
760            dependency_id: e.0.to_string(),
761        }
762    }
763
764    pub fn concurrent_dependency_drop_from_collection_lookup_error(
765        e: CollectionLookupError,
766        compute_instance: ComputeInstanceId,
767    ) -> Self {
768        match e {
769            CollectionLookupError::InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
770                dependency_kind: "cluster",
771                dependency_id: id.to_string(),
772            },
773            CollectionLookupError::CollectionMissing(id) => {
774                AdapterError::ConcurrentDependencyDrop {
775                    dependency_kind: "collection",
776                    dependency_id: id.to_string(),
777                }
778            }
779            CollectionLookupError::InstanceShutDown => AdapterError::ConcurrentDependencyDrop {
780                dependency_kind: "cluster",
781                dependency_id: compute_instance.to_string(),
782            },
783        }
784    }
785
786    pub fn concurrent_dependency_drop_from_watch_set_install_error(
787        e: compute_error::CollectionLookupError,
788    ) -> Self {
789        match e {
790            compute_error::CollectionLookupError::InstanceMissing(id) => {
791                AdapterError::ConcurrentDependencyDrop {
792                    dependency_kind: "cluster",
793                    dependency_id: id.to_string(),
794                }
795            }
796            compute_error::CollectionLookupError::CollectionMissing(id) => {
797                AdapterError::ConcurrentDependencyDrop {
798                    dependency_kind: "collection",
799                    dependency_id: id.to_string(),
800                }
801            }
802        }
803    }
804
805    pub fn concurrent_dependency_drop_from_instance_peek_error(
806        e: mz_compute_client::controller::instance_client::PeekError,
807        compute_instance: ComputeInstanceId,
808    ) -> AdapterError {
809        use mz_compute_client::controller::instance_client::PeekError::*;
810        match e {
811            ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
812                dependency_kind: "replica",
813                dependency_id: id.to_string(),
814            },
815            InstanceShutDown => AdapterError::ConcurrentDependencyDrop {
816                dependency_kind: "cluster",
817                dependency_id: compute_instance.to_string(),
818            },
819            e @ ReadHoldIdMismatch(_) => AdapterError::internal("instance peek error", e),
820            e @ ReadHoldInsufficient(_) => AdapterError::internal("instance peek error", e),
821        }
822    }
823
824    pub fn concurrent_dependency_drop_from_peek_error(
825        e: mz_compute_client::controller::error::PeekError,
826    ) -> AdapterError {
827        use mz_compute_client::controller::error::PeekError::*;
828        match e {
829            InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
830                dependency_kind: "cluster",
831                dependency_id: id.to_string(),
832            },
833            CollectionMissing(id) => AdapterError::ConcurrentDependencyDrop {
834                dependency_kind: "collection",
835                dependency_id: id.to_string(),
836            },
837            ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
838                dependency_kind: "replica",
839                dependency_id: id.to_string(),
840            },
841            e @ SinceViolation(_) => AdapterError::internal("peek error", e),
842        }
843    }
844
845    pub fn concurrent_dependency_drop_from_dataflow_creation_error(
846        e: compute_error::DataflowCreationError,
847    ) -> Self {
848        use compute_error::DataflowCreationError::*;
849        match e {
850            InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
851                dependency_kind: "cluster",
852                dependency_id: id.to_string(),
853            },
854            CollectionMissing(id) => AdapterError::ConcurrentDependencyDrop {
855                dependency_kind: "collection",
856                dependency_id: id.to_string(),
857            },
858            ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
859                dependency_kind: "replica",
860                dependency_id: id.to_string(),
861            },
862            MissingAsOf | SinceViolation(..) | EmptyAsOfForSubscribe | EmptyAsOfForCopyTo => {
863                AdapterError::internal("dataflow creation error", e)
864            }
865        }
866    }
867}
868
// Renders the user-facing message for each `AdapterError` variant. These
// strings are sent verbatim to SQL clients, so any change here is a
// user-visible behavior change; keep them in sync with the SQLSTATE codes
// assigned in `AdapterError::code`.
impl fmt::Display for AdapterError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            AdapterError::AbsurdSubscribeBounds { as_of, up_to } => {
                write!(
                    f,
                    "subscription lower bound (`AS OF`) is greater than its upper bound (`UP TO`): \
                     {as_of} > {up_to}",
                )
            }
            AdapterError::AmbiguousSystemColumnReference => {
                write!(
                    f,
                    "cannot use wildcard expansions or NATURAL JOINs in a view that depends on \
                    system objects"
                )
            }
            // Wrapped-error variants delegate to the inner error's `Display`.
            AdapterError::ChangedPlan(e) => write!(f, "{}", e),
            AdapterError::Catalog(e) => e.fmt(f),
            AdapterError::DuplicateCursor(name) => {
                write!(f, "cursor {} already exists", name.quoted())
            }
            AdapterError::Eval(e) => e.fmt(f),
            AdapterError::Explain(e) => e.fmt(f),
            AdapterError::IdExhaustionError => f.write_str("ID allocator exhausted all valid IDs"),
            AdapterError::Internal(e) => write!(f, "internal error: {}", e),
            AdapterError::IntrospectionDisabled { .. } => write!(
                f,
                "cannot read log sources of replica with disabled introspection"
            ),
            AdapterError::InvalidLogDependency { object_type, .. } => {
                write!(f, "{object_type} objects cannot depend on log sources")
            }
            AdapterError::InvalidClusterReplicaAz { az, expected: _ } => {
                write!(f, "unknown cluster replica availability zone {az}",)
            }
            AdapterError::InvalidSetIsolationLevel => write!(
                f,
                "SET TRANSACTION ISOLATION LEVEL must be called before any query"
            ),
            AdapterError::InvalidSetCluster => {
                write!(f, "SET cluster cannot be called in an active transaction")
            }
            AdapterError::InvalidStorageClusterSize { size, .. } => {
                write!(f, "unknown source size {size}")
            }
            AdapterError::SourceOrSinkSizeRequired { .. } => {
                write!(f, "must specify either cluster or size option")
            }
            AdapterError::InvalidTableMutationSelection { .. } => {
                write!(
                    f,
                    "invalid selection: operation may only (transitively) refer to non-source, non-system tables"
                )
            }
            AdapterError::ReplaceMaterializedViewSealed { name } => {
                write!(
                    f,
                    "materialized view {name} is sealed and thus cannot be replaced"
                )
            }
            AdapterError::ConstraintViolation(not_null_violation) => {
                write!(f, "{}", not_null_violation)
            }
            AdapterError::CopyFormatError(e) => write!(f, "{e}"),
            AdapterError::ConcurrentClusterDrop => {
                write!(f, "the transaction's active cluster has been dropped")
            }
            AdapterError::ConcurrentDependencyDrop {
                dependency_kind,
                dependency_id,
            } => {
                write!(f, "{dependency_kind} '{dependency_id}' was dropped")
            }
            AdapterError::CollectionUnreadable { id } => {
                write!(f, "collection '{id}' is not readable at any timestamp")
            }
            AdapterError::NoClusterReplicasAvailable { name, .. } => {
                write!(
                    f,
                    "CLUSTER {} has no replicas available to service request",
                    name.quoted()
                )
            }
            AdapterError::OperationProhibitsTransaction(op) => {
                write!(f, "{} cannot be run inside a transaction block", op)
            }
            AdapterError::OperationRequiresTransaction(op) => {
                write!(f, "{} can only be used in transaction blocks", op)
            }
            AdapterError::ParseError(e) => e.fmt(f),
            AdapterError::PlanError(e) => e.fmt(f),
            AdapterError::PreparedStatementExists(name) => {
                write!(f, "prepared statement {} already exists", name.quoted())
            }
            AdapterError::ReadOnlyTransaction => f.write_str("transaction in read-only mode"),
            AdapterError::SingleStatementTransaction => {
                f.write_str("this transaction can only execute a single statement")
            }
            AdapterError::ReadWriteUnavailable => {
                f.write_str("transaction read-write mode must be set before any query")
            }
            AdapterError::WrongSetOfLocks => {
                write!(f, "internal error, wrong set of locks acquired")
            }
            AdapterError::StatementTimeout => {
                write!(f, "canceling statement due to statement timeout")
            }
            AdapterError::Canceled => {
                write!(f, "canceling statement due to user request")
            }
            AdapterError::IdleInTransactionSessionTimeout => {
                write!(
                    f,
                    "terminating connection due to idle-in-transaction timeout"
                )
            }
            AdapterError::RecursionLimit(e) => e.fmt(f),
            AdapterError::RelationOutsideTimeDomain { .. } => {
                write!(
                    f,
                    "Transactions can only reference objects in the same timedomain. \
                     See https://materialize.com/docs/sql/begin/#same-timedomain-error",
                )
            }
            AdapterError::ResourceExhaustion {
                resource_type,
                limit_name,
                desired,
                limit,
                current,
            } => {
                write!(
                    f,
                    "creating {resource_type} would violate {limit_name} limit (desired: {desired}, limit: {limit}, current: {current})"
                )
            }
            AdapterError::ResultSize(e) => write!(f, "{e}"),
            AdapterError::SafeModeViolation(feature) => {
                write!(f, "cannot create {} in safe mode", feature)
            }
            AdapterError::SubscribeOnlyTransaction => {
                f.write_str("SUBSCRIBE in transactions must be the only read statement")
            }
            AdapterError::Optimizer(e) => e.fmt(f),
            AdapterError::UnallowedOnCluster {
                depends_on,
                cluster,
            } => {
                // `depends_on` is borrowed here, so `into_iter` yields
                // references; `iter()` would be the more explicit spelling.
                let items = depends_on.into_iter().map(|item| item.quoted()).join(", ");
                write!(
                    f,
                    "querying the following items {items} is not allowed from the {} cluster",
                    cluster.quoted()
                )
            }
            AdapterError::Unauthorized(unauthorized) => {
                write!(f, "{unauthorized}")
            }
            AdapterError::UnknownCursor(name) => {
                write!(f, "cursor {} does not exist", name.quoted())
            }
            AdapterError::UnknownLoginRole(name) => {
                write!(f, "role {} does not exist", name.quoted())
            }
            AdapterError::Unsupported(features) => write!(f, "{} are not supported", features),
            AdapterError::Unstructured(e) => write!(f, "{}", e.display_with_causes()),
            AdapterError::WriteOnlyTransaction => f.write_str("transaction in write-only mode"),
            AdapterError::UnknownPreparedStatement(name) => {
                write!(f, "prepared statement {} does not exist", name.quoted())
            }
            AdapterError::UnknownClusterReplica {
                cluster_name,
                replica_name,
            } => write!(
                f,
                "cluster replica '{cluster_name}.{replica_name}' does not exist"
            ),
            AdapterError::UnrecognizedConfigurationParam(setting_name) => write!(
                f,
                "unrecognized configuration parameter {}",
                setting_name.quoted()
            ),
            AdapterError::UntargetedLogRead { .. } => {
                f.write_str("log source reads must target a replica")
            }
            AdapterError::DDLOnlyTransaction => f.write_str(
                "transactions which modify objects are restricted to just modifying objects",
            ),
            AdapterError::DDLTransactionRace => f.write_str(
                "another session modified the catalog while this DDL transaction was open",
            ),
            AdapterError::TransactionDryRun { .. } => f.write_str("transaction dry run"),
            AdapterError::Storage(e) => e.fmt(f),
            AdapterError::Compute(e) => e.fmt(f),
            AdapterError::Orchestrator(e) => e.fmt(f),
            AdapterError::DependentObject(dependent_objects) => {
                // Pluralize "role" based on how many roles still have
                // dependents. NOTE(review): `keys().count()` walks the map;
                // `len()` would be equivalent and O(1) — behavior identical.
                let role_str = if dependent_objects.keys().count() == 1 {
                    "role"
                } else {
                    "roles"
                };
                write!(
                    f,
                    "{role_str} \"{}\" cannot be dropped because some objects depend on it",
                    dependent_objects.keys().join(", ")
                )
            }
            AdapterError::InvalidAlter(t, e) => {
                write!(f, "invalid ALTER {t}: {e}")
            }
            AdapterError::ConnectionValidation(e) => e.fmt(f),
            AdapterError::MaterializedViewWouldNeverRefresh(_, _) => {
                write!(
                    f,
                    "all the specified refreshes of the materialized view would be too far in the past, and thus they \
                    would never happen"
                )
            }
            AdapterError::InputNotReadableAtRefreshAtTime(_, _) => {
                write!(
                    f,
                    "REFRESH AT requested for a time where not all the inputs are readable"
                )
            }
            AdapterError::RtrTimeout(_) => {
                write!(
                    f,
                    "timed out before ingesting the source's visible frontier when real-time-recency query issued"
                )
            }
            AdapterError::RtrDropFailure(_) => write!(
                f,
                "real-time source dropped before ingesting the upstream system's visible frontier"
            ),
            AdapterError::UnreadableSinkCollection => {
                write!(f, "collection is not readable at any time")
            }
            AdapterError::UserSessionsDisallowed => write!(f, "login blocked"),
            AdapterError::NetworkPolicyDenied(_) => write!(f, "session denied"),
            AdapterError::ReadOnly => write!(f, "cannot write in read-only mode"),
            AdapterError::AlterClusterTimeout => {
                write!(f, "canceling statement, provided timeout lapsed")
            }
            AdapterError::AuthenticationError(e) => {
                write!(f, "authentication error {e}")
            }
            AdapterError::UnavailableFeature { feature, docs } => {
                // Optionally append a documentation pointer when one exists.
                write!(f, "{} is not supported in this environment.", feature)?;
                if let Some(docs) = docs {
                    write!(
                        f,
                        " For more information consult the documentation at {docs}"
                    )?;
                }
                Ok(())
            }
            AdapterError::AlterClusterWhilePendingReplicas => {
                write!(f, "cannot alter clusters with pending updates")
            }
            AdapterError::ReplacementSchemaMismatch(_) => {
                write!(f, "replacement schema differs from target schema")
            }
            AdapterError::ImpossibleTimestampConstraints { .. } => {
                write!(f, "could not find a valid timestamp for the query")
            }
        }
    }
}
1138
1139impl From<anyhow::Error> for AdapterError {
1140    fn from(e: anyhow::Error) -> AdapterError {
1141        match e.downcast::<PlanError>() {
1142            Ok(plan_error) => AdapterError::PlanError(plan_error),
1143            Err(e) => AdapterError::Unstructured(e),
1144        }
1145    }
1146}
1147
1148impl From<TryFromIntError> for AdapterError {
1149    fn from(e: TryFromIntError) -> AdapterError {
1150        AdapterError::Unstructured(e.into())
1151    }
1152}
1153
1154impl From<TryFromDecimalError> for AdapterError {
1155    fn from(e: TryFromDecimalError) -> AdapterError {
1156        AdapterError::Unstructured(e.into())
1157    }
1158}
1159
1160impl From<mz_catalog::memory::error::Error> for AdapterError {
1161    fn from(e: mz_catalog::memory::error::Error) -> AdapterError {
1162        AdapterError::Catalog(e)
1163    }
1164}
1165
1166impl From<mz_catalog::durable::CatalogError> for AdapterError {
1167    fn from(e: mz_catalog::durable::CatalogError) -> Self {
1168        mz_catalog::memory::error::Error::from(e).into()
1169    }
1170}
1171
1172impl From<mz_catalog::durable::DurableCatalogError> for AdapterError {
1173    fn from(e: mz_catalog::durable::DurableCatalogError) -> Self {
1174        mz_catalog::durable::CatalogError::from(e).into()
1175    }
1176}
1177
1178impl From<EvalError> for AdapterError {
1179    fn from(e: EvalError) -> AdapterError {
1180        AdapterError::Eval(e)
1181    }
1182}
1183
1184impl From<ExplainError> for AdapterError {
1185    fn from(e: ExplainError) -> AdapterError {
1186        match e {
1187            ExplainError::RecursionLimitError(e) => AdapterError::RecursionLimit(e),
1188            e => AdapterError::Explain(e),
1189        }
1190    }
1191}
1192
1193impl From<mz_sql::catalog::CatalogError> for AdapterError {
1194    fn from(e: mz_sql::catalog::CatalogError) -> AdapterError {
1195        AdapterError::Catalog(mz_catalog::memory::error::Error::from(e))
1196    }
1197}
1198
1199impl From<PlanError> for AdapterError {
1200    fn from(e: PlanError) -> AdapterError {
1201        match e {
1202            PlanError::UnknownCursor(name) => AdapterError::UnknownCursor(name),
1203            _ => AdapterError::PlanError(e),
1204        }
1205    }
1206}
1207
1208impl From<OptimizerError> for AdapterError {
1209    fn from(e: OptimizerError) -> AdapterError {
1210        use OptimizerError::*;
1211        match e {
1212            PlanError(e) => Self::PlanError(e),
1213            RecursionLimitError(e) => Self::RecursionLimit(e),
1214            EvalError(e) => Self::Eval(e),
1215            InternalUnsafeMfpPlan(e) => Self::Internal(e),
1216            Internal(e) => Self::Internal(e),
1217            e => Self::Optimizer(e),
1218        }
1219    }
1220}
1221
1222impl From<NotNullViolation> for AdapterError {
1223    fn from(e: NotNullViolation) -> AdapterError {
1224        AdapterError::ConstraintViolation(e)
1225    }
1226}
1227
1228impl From<RecursionLimitError> for AdapterError {
1229    fn from(e: RecursionLimitError) -> AdapterError {
1230        AdapterError::RecursionLimit(e)
1231    }
1232}
1233
1234impl From<oneshot::error::RecvError> for AdapterError {
1235    fn from(e: oneshot::error::RecvError) -> AdapterError {
1236        AdapterError::Unstructured(e.into())
1237    }
1238}
1239
1240impl From<StorageError<mz_repr::Timestamp>> for AdapterError {
1241    fn from(e: StorageError<mz_repr::Timestamp>) -> Self {
1242        AdapterError::Storage(e)
1243    }
1244}
1245
1246impl From<compute_error::InstanceExists> for AdapterError {
1247    fn from(e: compute_error::InstanceExists) -> Self {
1248        AdapterError::Compute(e.into())
1249    }
1250}
1251
1252impl From<TimestampError> for AdapterError {
1253    fn from(e: TimestampError) -> Self {
1254        let e: EvalError = e.into();
1255        e.into()
1256    }
1257}
1258
1259impl From<mz_sql_parser::parser::ParserStatementError> for AdapterError {
1260    fn from(e: mz_sql_parser::parser::ParserStatementError) -> Self {
1261        AdapterError::ParseError(e)
1262    }
1263}
1264
1265impl From<VarError> for AdapterError {
1266    fn from(e: VarError) -> Self {
1267        let e: mz_catalog::memory::error::Error = e.into();
1268        e.into()
1269    }
1270}
1271
1272impl From<rbac::UnauthorizedError> for AdapterError {
1273    fn from(e: rbac::UnauthorizedError) -> Self {
1274        AdapterError::Unauthorized(e)
1275    }
1276}
1277
1278impl From<mz_sql_parser::ast::IdentError> for AdapterError {
1279    fn from(value: mz_sql_parser::ast::IdentError) -> Self {
1280        AdapterError::PlanError(PlanError::InvalidIdent(value))
1281    }
1282}
1283
1284impl From<mz_pgwire_common::ConnectionError> for AdapterError {
1285    fn from(value: mz_pgwire_common::ConnectionError) -> Self {
1286        match value {
1287            mz_pgwire_common::ConnectionError::TooManyConnections { current, limit } => {
1288                AdapterError::ResourceExhaustion {
1289                    resource_type: "connection".into(),
1290                    limit_name: "max_connections".into(),
1291                    desired: (current + 1).to_string(),
1292                    limit: limit.to_string(),
1293                    current: current.to_string(),
1294                }
1295            }
1296        }
1297    }
1298}
1299
1300impl From<NetworkPolicyError> for AdapterError {
1301    fn from(value: NetworkPolicyError) -> Self {
1302        AdapterError::NetworkPolicyDenied(value)
1303    }
1304}
1305
1306impl From<ConnectionValidationError> for AdapterError {
1307    fn from(e: ConnectionValidationError) -> AdapterError {
1308        AdapterError::ConnectionValidation(e)
1309    }
1310}
1311
// Marker impl: `AdapterError` provides `Debug` and `Display`, so the default
// `std::error::Error` methods suffice; no `source` chaining is exposed here.
impl Error for AdapterError {}