Skip to main content

mz_adapter/
error.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::error::Error;
12use std::fmt;
13use std::num::TryFromIntError;
14
15use dec::TryFromDecimalError;
16use itertools::Itertools;
17use mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER;
18use mz_compute_client::controller::error as compute_error;
19use mz_compute_client::controller::error::InstanceMissing;
20
21use mz_compute_types::ComputeInstanceId;
22use mz_expr::EvalError;
23use mz_ore::error::ErrorExt;
24use mz_ore::stack::RecursionLimitError;
25use mz_ore::str::StrExt;
26use mz_pgwire_common::{ErrorResponse, Severity};
27use mz_repr::adt::array::InvalidArrayError;
28use mz_repr::adt::range::InvalidRangeError;
29use mz_repr::adt::timestamp::TimestampError;
30use mz_repr::explain::ExplainError;
31use mz_repr::strconv::{ParseError, ParseErrorKind};
32use mz_repr::{ColumnDiff, ColumnName, KeyDiff, NotNullViolation, RelationDescDiff, Timestamp};
33use mz_sql::plan::PlanError;
34use mz_sql::rbac;
35use mz_sql::session::vars::VarError;
36use mz_storage_types::connections::ConnectionValidationError;
37use mz_storage_types::controller::StorageError;
38use mz_storage_types::errors::CollectionMissing;
39use smallvec::SmallVec;
40use timely::progress::Antichain;
41use tokio::sync::oneshot;
42use tokio_postgres::error::SqlState;
43
44use crate::coord::NetworkPolicyError;
45use crate::optimize::OptimizerError;
46use crate::peek_client::CollectionLookupError;
47
48/// Errors that can occur in the coordinator.
49#[derive(Debug)]
50pub enum AdapterError {
51    /// A `SUBSCRIBE` was requested whose `UP TO` bound precedes its `as_of` timestamp
52    AbsurdSubscribeBounds {
53        as_of: mz_repr::Timestamp,
54        up_to: mz_repr::Timestamp,
55    },
56    /// Attempted to use a potentially ambiguous column reference expression with a system table.
57    // We don't allow this until https://github.com/MaterializeInc/database-issues/issues/4824 is
58    // resolved because it prevents us from adding columns to system tables.
59    AmbiguousSystemColumnReference,
60    /// An error occurred in a catalog operation.
61    Catalog(mz_catalog::memory::error::Error),
62    /// 1. The cached plan or descriptor changed,
63    /// 2. or some dependency of a statement disappeared during sequencing.
64    /// TODO(ggevay): we should refactor 2. usages to use `ConcurrentDependencyDrop` instead
65    /// (e.g., in MV sequencing)
66    ChangedPlan(String),
67    /// The cursor already exists.
68    DuplicateCursor(String),
69    /// An error while evaluating an expression.
70    Eval(EvalError),
71    /// An error occurred while planning the statement.
72    Explain(ExplainError),
73    /// The ID allocator exhausted all valid IDs.
74    IdExhaustionError,
75    /// Unexpected internal state was encountered.
76    Internal(String),
77    /// Attempted to read from log sources of a replica with disabled introspection.
78    IntrospectionDisabled {
79        log_names: Vec<String>,
80    },
81    /// Attempted to create an object dependent on log sources that doesn't support
82    /// log dependencies.
83    InvalidLogDependency {
84        object_type: String,
85        log_names: Vec<String>,
86    },
87    /// No such cluster replica size has been configured.
88    InvalidClusterReplicaAz {
89        az: String,
90        expected: Vec<String>,
91    },
92    /// SET TRANSACTION ISOLATION LEVEL was called in the middle of a transaction.
93    InvalidSetIsolationLevel,
94    /// SET cluster was called in the middle of a transaction.
95    InvalidSetCluster,
96    /// No such storage instance size has been configured.
97    InvalidStorageClusterSize {
98        size: String,
99        expected: Vec<String>,
100    },
101    /// Creating a source or sink without specifying its size is forbidden.
102    SourceOrSinkSizeRequired {
103        expected: Vec<String>,
104    },
105    /// The selection value for a table mutation operation refers to an invalid object.
106    InvalidTableMutationSelection {
107        /// The full name of the problematic object (e.g. a source or source-export table).
108        object_name: String,
109        /// Human-readable type of the object (e.g. "source", "source-export table").
110        object_type: String,
111    },
112    /// Expression violated a column's constraint
113    ConstraintViolation(NotNullViolation),
114    /// An error occurred while decoding COPY data.
115    CopyFormatError(String),
116    /// Transaction cluster was dropped in the middle of a transaction.
117    ConcurrentClusterDrop,
118    /// A dependency was dropped while sequencing a statement.
119    ConcurrentDependencyDrop {
120        dependency_kind: &'static str,
121        dependency_id: String,
122    },
123    CollectionUnreadable {
124        id: String,
125    },
126    /// Target cluster has no replicas to service query.
127    NoClusterReplicasAvailable {
128        name: String,
129        is_managed: bool,
130    },
131    /// The named operation cannot be run in a transaction.
132    OperationProhibitsTransaction(String),
133    /// The named operation requires an active transaction.
134    OperationRequiresTransaction(String),
135    /// An error occurred while planning the statement.
136    PlanError(PlanError),
137    /// The named prepared statement already exists.
138    PreparedStatementExists(String),
139    /// Wrapper around parsing error
140    ParseError(mz_sql_parser::parser::ParserStatementError),
141    /// The transaction is in read-only mode.
142    ReadOnlyTransaction,
143    /// The transaction in in read-only mode and a read already occurred.
144    ReadWriteUnavailable,
145    /// The recursion limit of some operation was exceeded.
146    RecursionLimit(RecursionLimitError),
147    /// A query in a transaction referenced a relation outside the first query's
148    /// time domain.
149    RelationOutsideTimeDomain {
150        relations: Vec<String>,
151        names: Vec<String>,
152    },
153    /// A query tried to create more resources than is allowed in the system configuration.
154    ResourceExhaustion {
155        resource_type: String,
156        limit_name: String,
157        desired: String,
158        limit: String,
159        current: String,
160    },
161    /// Result size of a query is too large.
162    ResultSize(String),
163    /// The specified feature is not permitted in safe mode.
164    SafeModeViolation(String),
165    /// The current transaction had the wrong set of write locks.
166    WrongSetOfLocks,
167    /// Waiting on a query timed out.
168    ///
169    /// Note this differs slightly from PG's implementation/semantics.
170    StatementTimeout,
171    /// The user canceled the query
172    Canceled,
173    /// An idle session in a transaction has timed out.
174    IdleInTransactionSessionTimeout,
175    /// The transaction is in single-subscribe mode.
176    SubscribeOnlyTransaction,
177    /// An error occurred in the optimizer.
178    Optimizer(OptimizerError),
179    /// A query depends on items which are not allowed to be referenced from the current cluster.
180    UnallowedOnCluster {
181        depends_on: SmallVec<[String; 2]>,
182        cluster: String,
183    },
184    /// A user tried to perform an action that they were unauthorized to do.
185    Unauthorized(rbac::UnauthorizedError),
186    /// The named cursor does not exist.
187    UnknownCursor(String),
188    /// The named role does not exist.
189    UnknownLoginRole(String),
190    UnknownPreparedStatement(String),
191    /// The named cluster replica does not exist.
192    UnknownClusterReplica {
193        cluster_name: String,
194        replica_name: String,
195    },
196    /// The named setting does not exist.
197    UnrecognizedConfigurationParam(String),
198    /// A generic error occurred.
199    //
200    // TODO(benesch): convert all those errors to structured errors.
201    Unstructured(anyhow::Error),
202    /// The named feature is not supported and will (probably) not be.
203    Unsupported(&'static str),
204    /// Some feature isn't available for a (potentially opaque) reason.
205    /// For example, in cloud Self-Managed auth features aren't available,
206    /// but we don't want to mention self managed auth.
207    UnavailableFeature {
208        feature: String,
209        docs: Option<String>,
210    },
211    /// Attempted to read from log sources without selecting a target replica.
212    UntargetedLogRead {
213        log_names: Vec<String>,
214    },
215    /// The transaction is in write-only mode.
216    WriteOnlyTransaction,
217    /// The transaction can only execute a single statement.
218    SingleStatementTransaction,
219    /// The transaction can only execute simple DDL.
220    DDLOnlyTransaction,
221    /// Another session modified the Catalog while this transaction was open.
222    DDLTransactionRace,
223    /// An error occurred in the storage layer
224    Storage(mz_storage_types::controller::StorageError),
225    /// An error occurred in the compute layer
226    Compute(anyhow::Error),
227    /// An error in the orchestrator layer
228    Orchestrator(anyhow::Error),
229    /// A statement tried to drop a role that had dependent objects.
230    ///
231    /// The map keys are role names and values are detailed error messages.
232    DependentObject(BTreeMap<String, Vec<String>>),
233    /// When performing an `ALTER` of some variety, re-planning the statement
234    /// errored.
235    InvalidAlter(&'static str, PlanError),
236    /// An error occurred while validating a connection.
237    ConnectionValidation(ConnectionValidationError),
238    /// We refuse to create the materialized view, because it would never be refreshed, so it would
239    /// never be queryable. This can happen when the only specified refreshes are further back in
240    /// the past than the initial compaction window of the materialized view.
241    MaterializedViewWouldNeverRefresh(Timestamp, Timestamp),
242    /// A CREATE MATERIALIZED VIEW statement tried to acquire a read hold at a REFRESH AT time,
243    /// but was unable to get a precise read hold.
244    InputNotReadableAtRefreshAtTime(Timestamp, Antichain<Timestamp>),
245    /// A humanized version of [`StorageError::RtrTimeout`].
246    RtrTimeout(String),
247    /// A humanized version of [`StorageError::RtrDropFailure`].
248    RtrDropFailure(String),
249    /// The collection requested to be sinked cannot be read at any timestamp
250    UnreadableSinkCollection,
251    /// User sessions have been blocked.
252    UserSessionsDisallowed,
253    /// This use session has been denied by a NetworkPolicy.
254    NetworkPolicyDenied(NetworkPolicyError),
255    /// Something attempted a write (to catalog, storage, tables, etc.) while in
256    /// read-only mode.
257    ReadOnly,
258    AlterClusterTimeout,
259    AlterClusterWhilePendingReplicas,
260    AuthenticationError(AuthenticationError),
261    /// Schema of a replacement is incompatible with the target.
262    ReplacementSchemaMismatch(RelationDescDiff),
263    /// Attempt to apply a replacement to a sealed materialized view.
264    ReplaceMaterializedViewSealed {
265        name: String,
266    },
267    /// Could not find a valid timestamp satisfying all constraints.
268    ImpossibleTimestampConstraints {
269        constraints: String,
270    },
271    /// OIDC group-to-role sync failed and strict mode is enabled.
272    OidcGroupSyncFailed(String),
273    /// Returned when bounded staleness was selected but the input frontiers lag
274    /// further than the bound permits, so no timestamp in the no-wait window is
275    /// at most `bound` stale.
276    BoundedStalenessExceeded {
277        bound: std::time::Duration,
278        gap_ms: u64,
279        slowest_input: Option<mz_repr::GlobalId>,
280    },
281    /// A write was attempted in a session whose isolation level is bounded
282    /// staleness. Bounded staleness is read-only.
283    BoundedStalenessReadOnly,
284    /// `real_time_recency = on` and bounded staleness were both requested in
285    /// the same session; they are mutually exclusive.
286    BoundedStalenessRealTimeRecencyConflict,
287    /// A bounded-staleness query touched a timeline whose timestamps are not
288    /// the `EpochMilliseconds` wall-clock timeline.
289    BoundedStalenessTimelineUnsupported,
290}
291
292#[derive(Debug, thiserror::Error)]
293pub enum AuthenticationError {
294    #[error("invalid credentials")]
295    InvalidCredentials,
296    #[error("role is not allowed to login")]
297    NonLogin,
298    #[error("role does not exist")]
299    RoleNotFound,
300    #[error("password is required")]
301    PasswordRequired,
302}
303
304/// Maps a parse/cast [`ParseError`] to the appropriate `DATA_EXCEPTION` (class
305/// 22) SQLSTATE. Out-of-range values map to overflow codes and malformed input
306/// maps to invalid-representation codes, in both cases distinguishing datetime
307/// types from everything else, matching PostgreSQL.
308fn parse_error_code(err: &ParseError) -> SqlState {
309    let is_datetime = matches!(
310        &*err.type_name,
311        "date" | "time" | "timestamp" | "timestamp with time zone" | "interval"
312    );
313    match (err.kind, is_datetime) {
314        (ParseErrorKind::OutOfRange, true) => SqlState::DATETIME_FIELD_OVERFLOW,
315        (ParseErrorKind::OutOfRange, false) => SqlState::NUMERIC_VALUE_OUT_OF_RANGE,
316        (ParseErrorKind::InvalidInputSyntax, true) => SqlState::INVALID_DATETIME_FORMAT,
317        (ParseErrorKind::InvalidInputSyntax, false) => SqlState::INVALID_TEXT_REPRESENTATION,
318    }
319}
320
321/// Maps an [`EvalError`] to the appropriate SQLSTATE.
322///
323/// Historically every `EvalError` fell through to `INTERNAL_ERROR` (`XX000`),
324/// but the overwhelming majority are user-facing data exceptions (class 22) or
325/// other well-defined conditions, not internal errors. This match is
326/// deliberately exhaustive — with no wildcard — so that adding a new
327/// `EvalError` variant is a compile error until it is assigned a code, and we
328/// never silently regress to `XX000`. Codes are chosen to match PostgreSQL
329/// where an equivalent error exists.
330fn eval_error_code(err: &EvalError) -> SqlState {
331    match err {
332        // Division and mathematical domain errors.
333        EvalError::DivisionByZero => SqlState::DIVISION_BY_ZERO,
334        EvalError::NegSqrt | EvalError::ComplexOutOfRange(_) => {
335            SqlState::INVALID_ARGUMENT_FOR_POWER_FUNCTION
336        }
337        EvalError::InfinityOutOfDomain(_)
338        | EvalError::NegativeOutOfDomain(_)
339        | EvalError::ZeroOutOfDomain(_)
340        | EvalError::OutOfDomain(..)
341        | EvalError::Undefined(_) => SqlState::INVALID_PARAMETER_VALUE,
342
343        // Out-of-range numeric, integer, and float values.
344        EvalError::FloatOverflow
345        | EvalError::FloatUnderflow
346        | EvalError::NumericFieldOverflow
347        | EvalError::Float32OutOfRange(_)
348        | EvalError::Float64OutOfRange(_)
349        | EvalError::Int16OutOfRange(_)
350        | EvalError::Int32OutOfRange(_)
351        | EvalError::Int64OutOfRange(_)
352        | EvalError::UInt16OutOfRange(_)
353        | EvalError::UInt32OutOfRange(_)
354        | EvalError::UInt64OutOfRange(_)
355        | EvalError::OidOutOfRange(_)
356        | EvalError::MzTimestampOutOfRange(_)
357        | EvalError::MzTimestampStepOverflow
358        | EvalError::CharOutOfRange => SqlState::NUMERIC_VALUE_OUT_OF_RANGE,
359
360        // Out-of-range datetime and interval values.
361        EvalError::DateOutOfRange
362        | EvalError::TimestampOutOfRange
363        | EvalError::TimestampCannotBeNan
364        | EvalError::DateBinOutOfRange(_)
365        | EvalError::DateDiffOverflow { .. } => SqlState::DATETIME_FIELD_OVERFLOW,
366        EvalError::IntervalOutOfRange(_) => SqlState::INTERVAL_FIELD_OVERFLOW,
367
368        // Parse/cast failures, plus other malformed textual/encoded input.
369        EvalError::Parse(e) => parse_error_code(e),
370        EvalError::ParseHex(_)
371        | EvalError::InvalidBase64Equals
372        | EvalError::InvalidBase64Symbol(_)
373        | EvalError::InvalidBase64EndSequence => SqlState::INVALID_TEXT_REPRESENTATION,
374        EvalError::InvalidByteSequence { .. } => SqlState::CHARACTER_NOT_IN_REPERTOIRE,
375        EvalError::CharacterNotValidForEncoding(_) | EvalError::CharacterTooLargeForEncoding(_) => {
376            SqlState::PROGRAM_LIMIT_EXCEEDED
377        }
378
379        // Invalid argument values.
380        EvalError::InvalidTimezone(_)
381        | EvalError::InvalidTimezoneInterval
382        | EvalError::InvalidTimezoneConversion
383        | EvalError::InvalidIanaTimezoneId(_)
384        | EvalError::InvalidLayer { .. }
385        | EvalError::InvalidEncodingName(_)
386        | EvalError::InvalidHashAlgorithm(_)
387        | EvalError::InvalidDatePart(_)
388        | EvalError::InvalidParameterValue(_)
389        | EvalError::InvalidJsonbCast { .. }
390        | EvalError::UnknownUnits(_)
391        | EvalError::UnsupportedUnits(..)
392        | EvalError::InvalidIdentifier { .. }
393        | EvalError::InvalidRoleId(_)
394        | EvalError::InvalidPrivileges(_)
395        | EvalError::NegLimit => SqlState::INVALID_PARAMETER_VALUE,
396
397        // Regular expressions.
398        EvalError::InvalidRegex(_) | EvalError::InvalidRegexFlag(_) => {
399            SqlState::INVALID_REGULAR_EXPRESSION
400        }
401
402        // LIKE escape sequences.
403        EvalError::UnterminatedLikeEscapeSequence | EvalError::LikeEscapeTooLong => {
404            SqlState::INVALID_ESCAPE_SEQUENCE
405        }
406
407        // NULL values where they are not permitted.
408        EvalError::KeyCannotBeNull
409        | EvalError::MustNotBeNull(_)
410        | EvalError::AclArrayNullElement
411        | EvalError::MzAclArrayNullElement => SqlState::NULL_VALUE_NOT_ALLOWED,
412
413        // Array subscript/dimension errors.
414        EvalError::IndexOutOfRange { .. }
415        | EvalError::ArrayFillWrongArraySubscripts
416        | EvalError::IncompatibleArrayDimensions { .. } => SqlState::ARRAY_SUBSCRIPT_ERROR,
417        EvalError::InvalidArray(e) => match e {
418            InvalidArrayError::TooManyDimensions(_) => SqlState::PROGRAM_LIMIT_EXCEEDED,
419            InvalidArrayError::WrongCardinality { .. } => SqlState::ARRAY_SUBSCRIPT_ERROR,
420        },
421
422        // Range errors.
423        EvalError::InvalidRange(e) => match e {
424            InvalidRangeError::CanonicalizationOverflow(_) => SqlState::NUMERIC_VALUE_OUT_OF_RANGE,
425            InvalidRangeError::MisorderedRangeBounds
426            | InvalidRangeError::InvalidRangeBoundFlags
427            | InvalidRangeError::NullRangeBoundFlags
428            | InvalidRangeError::DiscontiguousUnion
429            | InvalidRangeError::DiscontiguousDifference
430            | InvalidRangeError::InvalidRangeData => SqlState::DATA_EXCEPTION,
431        },
432
433        // Cardinality violations from scalar subqueries.
434        EvalError::MultipleRowsFromSubquery | EvalError::NegativeRowsFromSubquery => {
435            SqlState::CARDINALITY_VIOLATION
436        }
437
438        // Length, size, and resource limits.
439        EvalError::StringValueTooLong { .. } => SqlState::STRING_DATA_RIGHT_TRUNCATION,
440        EvalError::LikePatternTooLong
441        | EvalError::LengthTooLarge
442        | EvalError::NullCharacterNotPermitted
443        | EvalError::MaxArraySizeExceeded(_)
444        | EvalError::LetRecLimitExceeded(_) => SqlState::PROGRAM_LIMIT_EXCEEDED,
445
446        // Unsupported features.
447        EvalError::Unsupported { .. }
448        | EvalError::MultidimensionalArrayRemovalNotSupported
449        | EvalError::MultiDimensionalArraySearch => SqlState::FEATURE_NOT_SUPPORTED,
450
451        // User-raised errors (e.g. `error_if_null`).
452        EvalError::IfNullError(_) => SqlState::DATA_EXCEPTION,
453
454        // Genuinely internal errors.
455        EvalError::Internal(_)
456        | EvalError::InvalidCatalogJson(_)
457        | EvalError::TypeFromOid(_)
458        | EvalError::PrettyError(_)
459        | EvalError::RedactError(_) => SqlState::INTERNAL_ERROR,
460    }
461}
462
463impl AdapterError {
464    pub fn into_response(self, severity: Severity) -> ErrorResponse {
465        ErrorResponse {
466            severity,
467            code: self.code(),
468            message: self.to_string(),
469            detail: self.detail(),
470            hint: self.hint(),
471            position: self.position(),
472        }
473    }
474
475    pub fn position(&self) -> Option<usize> {
476        match self {
477            AdapterError::ParseError(err) => Some(err.error.pos),
478            _ => None,
479        }
480    }
481
482    /// Reports additional details about the error, if any are available.
483    pub fn detail(&self) -> Option<String> {
484        match self {
485            AdapterError::AmbiguousSystemColumnReference => {
486                Some("This is a current limitation in Materialize".into())
487            }
488            AdapterError::Catalog(c) => c.detail(),
489            AdapterError::Eval(e) => e.detail(),
490            AdapterError::RelationOutsideTimeDomain { relations, names } => Some(format!(
491                "The following relations in the query are outside the transaction's time domain:\n{}\n{}",
492                relations
493                    .iter()
494                    .map(|r| r.quoted().to_string())
495                    .collect::<Vec<_>>()
496                    .join("\n"),
497                match names.is_empty() {
498                    true => "No relations are available.".to_string(),
499                    false => format!(
500                        "Only the following relations are available:\n{}",
501                        names
502                            .iter()
503                            .map(|name| name.quoted().to_string())
504                            .collect::<Vec<_>>()
505                            .join("\n")
506                    ),
507                }
508            )),
509            AdapterError::SourceOrSinkSizeRequired { .. } => Some(
510                "Either specify the cluster that will maintain this object via IN CLUSTER or \
511                specify size via SIZE option."
512                    .into(),
513            ),
514            AdapterError::InvalidTableMutationSelection {
515                object_name,
516                object_type,
517            } => Some(format!(
518                "{object_type} '{}' may not be used in this operation; \
519                     the selection may refer to views and materialized views, but transitive \
520                     dependencies must not include sources or source-export tables",
521                object_name.quoted()
522            )),
523            AdapterError::SafeModeViolation(_) => Some(
524                "The Materialize server you are connected to is running in \
525                 safe mode, which limits the features that are available."
526                    .into(),
527            ),
528            AdapterError::IntrospectionDisabled { log_names }
529            | AdapterError::UntargetedLogRead { log_names } => Some(format!(
530                "The query references the following log sources:\n    {}",
531                log_names.join("\n    "),
532            )),
533            AdapterError::InvalidLogDependency { log_names, .. } => Some(format!(
534                "The object depends on the following log sources:\n    {}",
535                log_names.join("\n    "),
536            )),
537            AdapterError::PlanError(e) => e.detail(),
538            AdapterError::Unauthorized(unauthorized) => unauthorized.detail(),
539            AdapterError::DependentObject(dependent_objects) => Some(
540                dependent_objects
541                    .iter()
542                    .map(|(role_name, err_msgs)| {
543                        err_msgs
544                            .iter()
545                            .map(|err_msg| format!("{role_name}: {err_msg}"))
546                            .join("\n")
547                    })
548                    .join("\n"),
549            ),
550            AdapterError::Storage(storage_error) => storage_error
551                .source()
552                .map(|source_error| source_error.to_string_with_causes()),
553            AdapterError::ReadOnlyTransaction => Some(
554                "SELECT queries cannot be combined with other query types, including SUBSCRIBE."
555                    .into(),
556            ),
557            AdapterError::InvalidAlter(_, e) => e.detail(),
558            AdapterError::Optimizer(e) => e.detail(),
559            AdapterError::ConnectionValidation(e) => e.detail(),
560            AdapterError::MaterializedViewWouldNeverRefresh(last_refresh, earliest_possible) => {
561                Some(format!(
562                    "The specified last refresh is at {}, while the earliest possible time to compute the materialized \
563                    view is {}.",
564                    last_refresh, earliest_possible,
565                ))
566            }
567            AdapterError::UnallowedOnCluster { cluster, .. } => {
568                (cluster == MZ_CATALOG_SERVER_CLUSTER.name).then(|| {
569                    format!(
570                        "The transaction is executing on the \
571                        {cluster} cluster, maybe having been routed \
572                        there by the first statement in the transaction."
573                    )
574                })
575            }
576            AdapterError::InputNotReadableAtRefreshAtTime(oracle_read_ts, least_valid_read) => {
577                Some(format!(
578                    "The requested REFRESH AT time is {}, \
579                    but not all input collections are readable earlier than [{}].",
580                    oracle_read_ts,
581                    if least_valid_read.len() == 1 {
582                        format!(
583                            "{}",
584                            least_valid_read
585                                .as_option()
586                                .expect("antichain contains exactly 1 timestamp")
587                        )
588                    } else {
589                        // This can't occur currently
590                        format!("{:?}", least_valid_read)
591                    }
592                ))
593            }
594            AdapterError::RtrTimeout(name) => Some(format!(
595                "{name} failed to ingest data up to the real-time recency point"
596            )),
597            AdapterError::RtrDropFailure(name) => Some(format!(
598                "{name} dropped before ingesting data to the real-time recency point"
599            )),
600            AdapterError::UserSessionsDisallowed => {
601                Some("Your organization has been blocked. Please contact support.".to_string())
602            }
603            AdapterError::NetworkPolicyDenied(reason) => Some(format!("{reason}.")),
604            AdapterError::ReplacementSchemaMismatch(diff) => {
605                let mut lines: Vec<_> = diff.column_diffs.iter().map(|(idx, diff)| {
606                    let pos = idx + 1;
607                    match diff {
608                        ColumnDiff::Missing { name } => {
609                            let name = name.as_str().quoted();
610                            format!("missing column {name} at position {pos}")
611                        }
612                        ColumnDiff::Extra { name } => {
613                            let name = name.as_str().quoted();
614                            format!("extra column {name} at position {pos}")
615                        }
616                        ColumnDiff::TypeMismatch { name, left, right } => {
617                            let name = name.as_str().quoted();
618                            format!("column {name} at position {pos}: type mismatch (target: {left:?}, replacement: {right:?})")
619                        }
620                        ColumnDiff::NullabilityMismatch { name, left, right } => {
621                            let name = name.as_str().quoted();
622                            let left = if *left { "NULL" } else { "NOT NULL" };
623                            let right = if *right { "NULL" } else { "NOT NULL" };
624                            format!("column {name} at position {pos}: nullability mismatch (target: {left}, replacement: {right})")
625                        }
626                        ColumnDiff::NameMismatch { left, right } => {
627                            let left = left.as_str().quoted();
628                            let right = right.as_str().quoted();
629                            format!("column at position {pos}: name mismatch (target: {left}, replacement: {right})")
630                        }
631                    }
632                }).collect();
633
634                if let Some(KeyDiff { left, right }) = &diff.key_diff {
635                    let format_keys = |keys: &BTreeSet<Vec<ColumnName>>| {
636                        if keys.is_empty() {
637                            "(none)".to_string()
638                        } else {
639                            keys.iter()
640                                .map(|key| {
641                                    let cols = key.iter().map(|c| c.as_str()).join(", ");
642                                    format!("{{{cols}}}")
643                                })
644                                .join(", ")
645                        }
646                    };
647                    lines.push(format!(
648                        "keys differ (target: {}, replacement: {})",
649                        format_keys(left),
650                        format_keys(right)
651                    ));
652                }
653                Some(lines.join("\n"))
654            }
655            AdapterError::ReplaceMaterializedViewSealed { .. } => Some(
656                "The materialized view has already computed its output until the end of time, \
657                 so replacing its definition would have no effect."
658                    .into(),
659            ),
660            AdapterError::ImpossibleTimestampConstraints { constraints } => {
661                Some(format!("Constraints:\n{}", constraints))
662            }
663            AdapterError::BoundedStalenessExceeded {
664                gap_ms,
665                slowest_input,
666                ..
667            } => {
668                let mut detail = format!(
669                    "Freshest available timestamp is {}ms older than the bound.",
670                    gap_ms,
671                );
672                if let Some(id) = slowest_input {
673                    detail.push_str(&format!(" Slowest input: {}.", id));
674                }
675                Some(detail)
676            }
677            AdapterError::BoundedStalenessTimelineUnsupported => Some(
678                "This query touches a timeline other than the EpochMilliseconds wall-clock \
679                 timeline."
680                    .into(),
681            ),
682            _ => None,
683        }
684    }
685
686    /// Reports a hint for the user about how the error could be fixed.
687    pub fn hint(&self) -> Option<String> {
688        match self {
689            AdapterError::AmbiguousSystemColumnReference => Some(
690                "Rewrite the view to refer to all columns by name. Expand all wildcards and \
691                convert all NATURAL JOINs to USING joins."
692                    .to_string(),
693            ),
694            AdapterError::Catalog(c) => c.hint(),
695            AdapterError::Eval(e) => e.hint(),
696            AdapterError::InvalidClusterReplicaAz { expected, az: _ } => {
697                Some(if expected.is_empty() {
698                    "No availability zones configured; do not specify AVAILABILITY ZONE".into()
699                } else {
700                    format!("Valid availability zones are: {}", expected.join(", "))
701                })
702            }
703            AdapterError::InvalidStorageClusterSize { expected, .. } => {
704                Some(format!("Valid sizes are: {}", expected.join(", ")))
705            }
706            AdapterError::SourceOrSinkSizeRequired { expected } => Some(format!(
707                "Try choosing one of the smaller sizes to start. Available sizes: {}",
708                expected.join(", ")
709            )),
710            AdapterError::NoClusterReplicasAvailable { is_managed, .. } => {
711                Some(if *is_managed {
712                    "Use ALTER CLUSTER to adjust the replication factor of the cluster. \
713                    Example:`ALTER CLUSTER <cluster-name> SET (REPLICATION FACTOR 1)`".into()
714                } else {
715                    "Use CREATE CLUSTER REPLICA to attach cluster replicas to the cluster".into()
716                })
717            }
718            AdapterError::UntargetedLogRead { .. } => Some(
719                "Use `SET cluster_replica = <replica-name>` to target a specific replica in the \
720                 active cluster. Note that subsequent queries will only be answered by \
721                 the selected replica, which might reduce availability. To undo the replica \
722                 selection, use `RESET cluster_replica`."
723                    .into(),
724            ),
725            AdapterError::ResourceExhaustion { resource_type, .. } => Some(format!(
726                "Drop an existing {resource_type} or contact support to request a limit increase."
727            )),
728            AdapterError::StatementTimeout => Some(
729                "Consider increasing the maximum allowed statement duration for this session by \
730                 setting the statement_timeout session variable. For example, `SET \
731                 statement_timeout = '120s'`."
732                    .into(),
733            ),
734            AdapterError::PlanError(e) => e.hint(),
735            AdapterError::UnallowedOnCluster { cluster, .. } => {
736                (cluster != MZ_CATALOG_SERVER_CLUSTER.name).then(||
737                    "Use `SET CLUSTER = <cluster-name>` to change your cluster and re-run the query."
738                    .to_string()
739                )
740            }
741            AdapterError::InvalidAlter(_, e) => e.hint(),
742            AdapterError::Optimizer(e) => e.hint(),
743            AdapterError::ConnectionValidation(e) => e.hint(),
744            AdapterError::InputNotReadableAtRefreshAtTime(_, _) => Some(
745                "You can use `REFRESH AT greatest(mz_now(), <explicit timestamp>)` to refresh \
746                 either at the explicitly specified timestamp, or now if the given timestamp would \
747                 be in the past.".to_string()
748            ),
749            AdapterError::AlterClusterTimeout => Some(
750                "Consider increasing the timeout duration in the alter cluster statement.".into(),
751            ),
752            AdapterError::DDLTransactionRace => Some(
753                "Currently, DDL transactions fail when any other DDL happens concurrently, \
754                 even on unrelated schemas/clusters.".into()
755            ),
756            AdapterError::CollectionUnreadable { .. } => Some(
757                "This could be because the collection has recently been dropped.".into()
758            ),
759            _ => None,
760        }
761    }
762
763    pub fn code(&self) -> SqlState {
764        // We define this up here to make sure `AdapterError::` and `OptimizerError::` act the same way.
765        const RECURSION_LIMIT_ERROR_CODE: SqlState = SqlState::INTERNAL_ERROR;
766
767        // TODO(benesch): we should only use `SqlState::INTERNAL_ERROR` for
768        // those errors that are truly internal errors. At the moment we have
769        // a various classes of uncategorized errors that use this error code
770        // inappropriately.
771        match self {
772            // DATA_EXCEPTION to match what Postgres returns for degenerate
773            // range bounds
774            AdapterError::AbsurdSubscribeBounds { .. } => SqlState::DATA_EXCEPTION,
775            AdapterError::AmbiguousSystemColumnReference => SqlState::FEATURE_NOT_SUPPORTED,
776            AdapterError::Catalog(e) => match &e.kind {
777                mz_catalog::memory::error::ErrorKind::VarError(e) => match e {
778                    VarError::ConstrainedParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
779                    VarError::FixedValueParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
780                    VarError::InvalidParameterType { .. } => SqlState::INVALID_PARAMETER_VALUE,
781                    VarError::InvalidParameterValue { .. } => SqlState::INVALID_PARAMETER_VALUE,
782                    VarError::ReadOnlyParameter(_) => SqlState::CANT_CHANGE_RUNTIME_PARAM,
783                    VarError::UnknownParameter(_) => SqlState::UNDEFINED_OBJECT,
784                    VarError::RequiresUnsafeMode { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
785                    VarError::RequiresFeatureFlag { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
786                },
787                _ => SqlState::INTERNAL_ERROR,
788            },
789            AdapterError::ChangedPlan(_) => SqlState::FEATURE_NOT_SUPPORTED,
790            AdapterError::DuplicateCursor(_) => SqlState::DUPLICATE_CURSOR,
791            // Evaluation errors are almost all user-facing data exceptions, not
792            // internal errors. `eval_error_code` matches every variant
793            // exhaustively so the catch-all `INTERNAL_ERROR` no longer applies
794            // to errors that are really the user's fault. See SQL-326.
795            AdapterError::Eval(e) => eval_error_code(e),
796            AdapterError::Explain(_) => SqlState::INTERNAL_ERROR,
797            AdapterError::IdExhaustionError => SqlState::INTERNAL_ERROR,
798            AdapterError::Internal(_) => SqlState::INTERNAL_ERROR,
799            AdapterError::IntrospectionDisabled { .. } => SqlState::FEATURE_NOT_SUPPORTED,
800            AdapterError::InvalidLogDependency { .. } => SqlState::FEATURE_NOT_SUPPORTED,
801            AdapterError::InvalidClusterReplicaAz { .. } => SqlState::FEATURE_NOT_SUPPORTED,
802            AdapterError::InvalidSetIsolationLevel => SqlState::ACTIVE_SQL_TRANSACTION,
803            AdapterError::InvalidSetCluster => SqlState::ACTIVE_SQL_TRANSACTION,
804            AdapterError::InvalidStorageClusterSize { .. } => SqlState::FEATURE_NOT_SUPPORTED,
805            AdapterError::SourceOrSinkSizeRequired { .. } => SqlState::FEATURE_NOT_SUPPORTED,
806            AdapterError::InvalidTableMutationSelection { .. } => {
807                SqlState::INVALID_TRANSACTION_STATE
808            }
809            AdapterError::ConstraintViolation(NotNullViolation(_)) => SqlState::NOT_NULL_VIOLATION,
810            AdapterError::CopyFormatError(_) => SqlState::BAD_COPY_FILE_FORMAT,
811            AdapterError::ConcurrentClusterDrop => SqlState::INVALID_TRANSACTION_STATE,
812            AdapterError::ConcurrentDependencyDrop { .. } => SqlState::UNDEFINED_OBJECT,
813            AdapterError::CollectionUnreadable { .. } => SqlState::NO_DATA_FOUND,
814            AdapterError::NoClusterReplicasAvailable { .. } => SqlState::FEATURE_NOT_SUPPORTED,
815            AdapterError::OperationProhibitsTransaction(_) => SqlState::ACTIVE_SQL_TRANSACTION,
816            AdapterError::OperationRequiresTransaction(_) => SqlState::NO_ACTIVE_SQL_TRANSACTION,
817            AdapterError::ParseError(_) => SqlState::SYNTAX_ERROR,
818            AdapterError::PlanError(PlanError::InvalidSchemaName) => SqlState::INVALID_SCHEMA_NAME,
819            AdapterError::PlanError(PlanError::ColumnAlreadyExists { .. }) => {
820                SqlState::DUPLICATE_COLUMN
821            }
822            AdapterError::PlanError(PlanError::UnknownParameter(_)) => {
823                SqlState::UNDEFINED_PARAMETER
824            }
825            AdapterError::PlanError(PlanError::ParameterNotAllowed(_)) => {
826                SqlState::UNDEFINED_PARAMETER
827            }
828            AdapterError::PlanError(_) => SqlState::INTERNAL_ERROR,
829            AdapterError::PreparedStatementExists(_) => SqlState::DUPLICATE_PSTATEMENT,
830            AdapterError::ReadOnlyTransaction => SqlState::READ_ONLY_SQL_TRANSACTION,
831            AdapterError::ReadWriteUnavailable => SqlState::INVALID_TRANSACTION_STATE,
832            AdapterError::SingleStatementTransaction => SqlState::INVALID_TRANSACTION_STATE,
833            AdapterError::WrongSetOfLocks => SqlState::LOCK_NOT_AVAILABLE,
834            AdapterError::StatementTimeout => SqlState::QUERY_CANCELED,
835            AdapterError::Canceled => SqlState::QUERY_CANCELED,
836            AdapterError::IdleInTransactionSessionTimeout => {
837                SqlState::IDLE_IN_TRANSACTION_SESSION_TIMEOUT
838            }
839            AdapterError::RecursionLimit(_) => RECURSION_LIMIT_ERROR_CODE,
840            AdapterError::RelationOutsideTimeDomain { .. } => SqlState::INVALID_TRANSACTION_STATE,
841            AdapterError::ResourceExhaustion { .. } => SqlState::INSUFFICIENT_RESOURCES,
842            AdapterError::ResultSize(_) => SqlState::OUT_OF_MEMORY,
843            AdapterError::SafeModeViolation(_) => SqlState::INTERNAL_ERROR,
844            AdapterError::SubscribeOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
845            AdapterError::Optimizer(e) => match e {
846                OptimizerError::PlanError(PlanError::InvalidSchemaName) => {
847                    SqlState::INVALID_SCHEMA_NAME
848                }
849                OptimizerError::PlanError(PlanError::ColumnAlreadyExists { .. }) => {
850                    SqlState::DUPLICATE_COLUMN
851                }
852                OptimizerError::PlanError(PlanError::UnknownParameter(_)) => {
853                    SqlState::UNDEFINED_PARAMETER
854                }
855                OptimizerError::PlanError(PlanError::ParameterNotAllowed(_)) => {
856                    SqlState::UNDEFINED_PARAMETER
857                }
858                OptimizerError::PlanError(_) => SqlState::INTERNAL_ERROR,
859                OptimizerError::RecursionLimitError(_) => RECURSION_LIMIT_ERROR_CODE,
860                OptimizerError::Internal(s) => {
861                    AdapterError::Internal(s.clone()).code() // Delegate to outer
862                }
863                OptimizerError::EvalError(e) => {
864                    AdapterError::Eval(e.clone()).code() // Delegate to outer
865                }
866                OptimizerError::TransformError(_) => SqlState::INTERNAL_ERROR,
867                OptimizerError::UnmaterializableFunction(_) => SqlState::FEATURE_NOT_SUPPORTED,
868                OptimizerError::UncallableFunction { .. } => SqlState::FEATURE_NOT_SUPPORTED,
869                OptimizerError::UnsupportedTemporalExpression(_) => SqlState::FEATURE_NOT_SUPPORTED,
870                OptimizerError::RestrictedFunction(_) => SqlState::INSUFFICIENT_PRIVILEGE,
871                // This should be handled by peek optimization, so it's an internal error if it
872                // reaches the user.
873                OptimizerError::InternalUnsafeMfpPlan(_) => SqlState::INTERNAL_ERROR,
874            },
875            AdapterError::UnallowedOnCluster { .. } => {
876                SqlState::S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
877            }
878            AdapterError::Unauthorized(_) => SqlState::INSUFFICIENT_PRIVILEGE,
879            AdapterError::UnknownCursor(_) => SqlState::INVALID_CURSOR_NAME,
880            AdapterError::UnknownPreparedStatement(_) => SqlState::UNDEFINED_PSTATEMENT,
881            AdapterError::UnknownLoginRole(_) => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
882            AdapterError::UnknownClusterReplica { .. } => SqlState::UNDEFINED_OBJECT,
883            AdapterError::UnrecognizedConfigurationParam(_) => SqlState::UNDEFINED_OBJECT,
884            AdapterError::Unsupported(..) => SqlState::FEATURE_NOT_SUPPORTED,
885            AdapterError::UnavailableFeature { .. } => SqlState::FEATURE_NOT_SUPPORTED,
886            AdapterError::Unstructured(_) => SqlState::INTERNAL_ERROR,
887            AdapterError::UntargetedLogRead { .. } => SqlState::FEATURE_NOT_SUPPORTED,
888            AdapterError::DDLTransactionRace => SqlState::T_R_SERIALIZATION_FAILURE,
889            // It's not immediately clear which error code to use here because a
890            // "write-only transaction", "single table write transaction", or "ddl only
891            // transaction" are not things in Postgres. This error code is the generic "bad txn
892            // thing" code, so it's probably the best choice.
893            AdapterError::WriteOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
894            AdapterError::DDLOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
895            AdapterError::Storage(_) | AdapterError::Compute(_) | AdapterError::Orchestrator(_) => {
896                SqlState::INTERNAL_ERROR
897            }
898            AdapterError::DependentObject(_) => SqlState::DEPENDENT_OBJECTS_STILL_EXIST,
899            AdapterError::InvalidAlter(_, _) => SqlState::FEATURE_NOT_SUPPORTED,
900            AdapterError::ConnectionValidation(_) => SqlState::SYSTEM_ERROR,
901            // `DATA_EXCEPTION`, similarly to `AbsurdSubscribeBounds`.
902            AdapterError::MaterializedViewWouldNeverRefresh(_, _) => SqlState::DATA_EXCEPTION,
903            AdapterError::InputNotReadableAtRefreshAtTime(_, _) => SqlState::DATA_EXCEPTION,
904            AdapterError::RtrTimeout(_) => SqlState::QUERY_CANCELED,
905            AdapterError::RtrDropFailure(_) => SqlState::UNDEFINED_OBJECT,
906            AdapterError::UnreadableSinkCollection => SqlState::from_code("MZ009"),
907            AdapterError::UserSessionsDisallowed => SqlState::from_code("MZ010"),
908            AdapterError::NetworkPolicyDenied(_) => SqlState::from_code("MZ011"),
909            // In read-only mode all transactions are implicitly read-only
910            // transactions.
911            AdapterError::ReadOnly => SqlState::READ_ONLY_SQL_TRANSACTION,
912            AdapterError::AlterClusterTimeout => SqlState::QUERY_CANCELED,
913            AdapterError::AlterClusterWhilePendingReplicas => SqlState::OBJECT_IN_USE,
914            AdapterError::ReplacementSchemaMismatch(_) => SqlState::FEATURE_NOT_SUPPORTED,
915            AdapterError::AuthenticationError(AuthenticationError::InvalidCredentials) => {
916                SqlState::INVALID_PASSWORD
917            }
918            AdapterError::AuthenticationError(_) => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
919            AdapterError::ReplaceMaterializedViewSealed { .. } => {
920                SqlState::OBJECT_NOT_IN_PREREQUISITE_STATE
921            }
922            // similar to AbsurdSubscribeBounds
923            AdapterError::ImpossibleTimestampConstraints { .. } => SqlState::DATA_EXCEPTION,
924            AdapterError::OidcGroupSyncFailed(_) => SqlState::INTERNAL_ERROR,
925            AdapterError::BoundedStalenessExceeded { .. } => SqlState::T_R_SERIALIZATION_FAILURE,
926            // Matches ReadOnlyTransaction/ReadOnly: a write was rejected
927            // because the session is effectively read-only.
928            AdapterError::BoundedStalenessReadOnly => SqlState::READ_ONLY_SQL_TRANSACTION,
929            AdapterError::BoundedStalenessRealTimeRecencyConflict => {
930                SqlState::FEATURE_NOT_SUPPORTED
931            }
932            AdapterError::BoundedStalenessTimelineUnsupported => SqlState::FEATURE_NOT_SUPPORTED,
933        }
934    }
935
936    pub fn internal<E: std::fmt::Display>(context: &str, e: E) -> AdapterError {
937        AdapterError::Internal(format!("{context}: {e}"))
938    }
939
940    // We don't want the following error conversions to `ConcurrentDependencyDrop` to happen
941    // automatically, because it might depend on the context whether `ConcurrentDependencyDrop`
942    // is appropriate, so we want to make the conversion target explicit at the call site.
943    // For example, maybe we get an `InstanceMissing` if the user specifies a non-existing cluster,
944    // in which case `ConcurrentDependencyDrop` would not be appropriate.
945
946    pub fn concurrent_dependency_drop_from_instance_missing(e: InstanceMissing) -> Self {
947        AdapterError::ConcurrentDependencyDrop {
948            dependency_kind: "cluster",
949            dependency_id: e.0.to_string(),
950        }
951    }
952
953    pub fn concurrent_dependency_drop_from_collection_missing(e: CollectionMissing) -> Self {
954        AdapterError::ConcurrentDependencyDrop {
955            dependency_kind: "collection",
956            dependency_id: e.0.to_string(),
957        }
958    }
959
960    pub fn concurrent_dependency_drop_from_collection_lookup_error(
961        e: CollectionLookupError,
962        compute_instance: ComputeInstanceId,
963    ) -> Self {
964        match e {
965            CollectionLookupError::InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
966                dependency_kind: "cluster",
967                dependency_id: id.to_string(),
968            },
969            CollectionLookupError::CollectionMissing(id) => {
970                AdapterError::ConcurrentDependencyDrop {
971                    dependency_kind: "collection",
972                    dependency_id: id.to_string(),
973                }
974            }
975            CollectionLookupError::InstanceShutDown => AdapterError::ConcurrentDependencyDrop {
976                dependency_kind: "cluster",
977                dependency_id: compute_instance.to_string(),
978            },
979        }
980    }
981
982    pub fn concurrent_dependency_drop_from_watch_set_install_error(
983        e: compute_error::CollectionLookupError,
984    ) -> Self {
985        match e {
986            compute_error::CollectionLookupError::InstanceMissing(id) => {
987                AdapterError::ConcurrentDependencyDrop {
988                    dependency_kind: "cluster",
989                    dependency_id: id.to_string(),
990                }
991            }
992            compute_error::CollectionLookupError::CollectionMissing(id) => {
993                AdapterError::ConcurrentDependencyDrop {
994                    dependency_kind: "collection",
995                    dependency_id: id.to_string(),
996                }
997            }
998        }
999    }
1000
1001    pub fn concurrent_dependency_drop_from_instance_peek_error(
1002        e: mz_compute_client::controller::instance_client::PeekError,
1003        compute_instance: ComputeInstanceId,
1004    ) -> AdapterError {
1005        use mz_compute_client::controller::instance_client::PeekError::*;
1006        match e {
1007            ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
1008                dependency_kind: "replica",
1009                dependency_id: id.to_string(),
1010            },
1011            InstanceShutDown => AdapterError::ConcurrentDependencyDrop {
1012                dependency_kind: "cluster",
1013                dependency_id: compute_instance.to_string(),
1014            },
1015            e @ ReadHoldIdMismatch(_) => AdapterError::internal("instance peek error", e),
1016            e @ ReadHoldInsufficient(_) => AdapterError::internal("instance peek error", e),
1017        }
1018    }
1019
1020    pub fn concurrent_dependency_drop_from_collection_update_error(
1021        e: compute_error::CollectionUpdateError,
1022    ) -> Self {
1023        use compute_error::CollectionUpdateError::*;
1024        match e {
1025            InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
1026                dependency_kind: "cluster",
1027                dependency_id: id.to_string(),
1028            },
1029            CollectionMissing(id) => AdapterError::ConcurrentDependencyDrop {
1030                dependency_kind: "collection",
1031                dependency_id: id.to_string(),
1032            },
1033        }
1034    }
1035
1036    pub fn concurrent_dependency_drop_from_peek_error(
1037        e: mz_compute_client::controller::error::PeekError,
1038    ) -> AdapterError {
1039        use mz_compute_client::controller::error::PeekError::*;
1040        match e {
1041            InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
1042                dependency_kind: "cluster",
1043                dependency_id: id.to_string(),
1044            },
1045            CollectionMissing(id) => AdapterError::ConcurrentDependencyDrop {
1046                dependency_kind: "collection",
1047                dependency_id: id.to_string(),
1048            },
1049            ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
1050                dependency_kind: "replica",
1051                dependency_id: id.to_string(),
1052            },
1053            e @ (ReadHoldIdMismatch(_) | SinceViolation(_)) => {
1054                AdapterError::internal("peek error", e)
1055            }
1056        }
1057    }
1058
1059    pub fn concurrent_dependency_drop_from_dataflow_creation_error(
1060        e: compute_error::DataflowCreationError,
1061    ) -> Self {
1062        use compute_error::DataflowCreationError::*;
1063        match e {
1064            InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
1065                dependency_kind: "cluster",
1066                dependency_id: id.to_string(),
1067            },
1068            CollectionMissing(id) => AdapterError::ConcurrentDependencyDrop {
1069                dependency_kind: "collection",
1070                dependency_id: id.to_string(),
1071            },
1072            ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
1073                dependency_kind: "replica",
1074                dependency_id: id.to_string(),
1075            },
1076            MissingAsOf | SinceViolation(..) | EmptyAsOfForSubscribe | EmptyAsOfForCopyTo => {
1077                AdapterError::internal("dataflow creation error", e)
1078            }
1079        }
1080    }
1081}
1082
1083impl fmt::Display for AdapterError {
1084    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1085        match self {
1086            AdapterError::AbsurdSubscribeBounds { as_of, up_to } => {
1087                write!(
1088                    f,
1089                    "subscription lower bound (`AS OF`) is greater than its upper bound (`UP TO`): \
1090                     {as_of} > {up_to}",
1091                )
1092            }
1093            AdapterError::AmbiguousSystemColumnReference => {
1094                write!(
1095                    f,
1096                    "cannot use wildcard expansions or NATURAL JOINs in a view that depends on \
1097                    system objects"
1098                )
1099            }
1100            AdapterError::ChangedPlan(e) => write!(f, "{}", e),
1101            AdapterError::Catalog(e) => e.fmt(f),
1102            AdapterError::DuplicateCursor(name) => {
1103                write!(f, "cursor {} already exists", name.quoted())
1104            }
1105            AdapterError::Eval(e) => e.fmt(f),
1106            AdapterError::Explain(e) => e.fmt(f),
1107            AdapterError::IdExhaustionError => f.write_str("ID allocator exhausted all valid IDs"),
1108            AdapterError::Internal(e) => write!(f, "internal error: {}", e),
1109            AdapterError::IntrospectionDisabled { .. } => write!(
1110                f,
1111                "cannot read log sources of replica with disabled introspection"
1112            ),
1113            AdapterError::InvalidLogDependency { object_type, .. } => {
1114                write!(f, "{object_type} objects cannot depend on log sources")
1115            }
1116            AdapterError::InvalidClusterReplicaAz { az, expected: _ } => {
1117                write!(f, "unknown cluster replica availability zone {az}",)
1118            }
1119            AdapterError::InvalidSetIsolationLevel => write!(
1120                f,
1121                "SET TRANSACTION ISOLATION LEVEL must be called before any query"
1122            ),
1123            AdapterError::InvalidSetCluster => {
1124                write!(f, "SET cluster cannot be called in an active transaction")
1125            }
1126            AdapterError::InvalidStorageClusterSize { size, .. } => {
1127                write!(f, "unknown source size {size}")
1128            }
1129            AdapterError::SourceOrSinkSizeRequired { .. } => {
1130                write!(f, "must specify either cluster or size option")
1131            }
1132            AdapterError::InvalidTableMutationSelection { .. } => {
1133                write!(
1134                    f,
1135                    "invalid selection: operation may only (transitively) refer to non-source, non-system tables"
1136                )
1137            }
1138            AdapterError::ReplaceMaterializedViewSealed { name } => {
1139                write!(
1140                    f,
1141                    "materialized view {name} is sealed and thus cannot be replaced"
1142                )
1143            }
1144            AdapterError::ConstraintViolation(not_null_violation) => {
1145                write!(f, "{}", not_null_violation)
1146            }
1147            AdapterError::CopyFormatError(e) => write!(f, "{e}"),
1148            AdapterError::ConcurrentClusterDrop => {
1149                write!(f, "the transaction's active cluster has been dropped")
1150            }
1151            AdapterError::ConcurrentDependencyDrop {
1152                dependency_kind,
1153                dependency_id,
1154            } => {
1155                write!(f, "{dependency_kind} '{dependency_id}' was dropped")
1156            }
1157            AdapterError::CollectionUnreadable { id } => {
1158                write!(f, "collection '{id}' is not readable at any timestamp")
1159            }
1160            AdapterError::NoClusterReplicasAvailable { name, .. } => {
1161                write!(
1162                    f,
1163                    "CLUSTER {} has no replicas available to service request",
1164                    name.quoted()
1165                )
1166            }
1167            AdapterError::OperationProhibitsTransaction(op) => {
1168                write!(f, "{} cannot be run inside a transaction block", op)
1169            }
1170            AdapterError::OperationRequiresTransaction(op) => {
1171                write!(f, "{} can only be used in transaction blocks", op)
1172            }
1173            AdapterError::ParseError(e) => e.fmt(f),
1174            AdapterError::PlanError(e) => e.fmt(f),
1175            AdapterError::PreparedStatementExists(name) => {
1176                write!(f, "prepared statement {} already exists", name.quoted())
1177            }
1178            AdapterError::ReadOnlyTransaction => f.write_str("transaction in read-only mode"),
1179            AdapterError::SingleStatementTransaction => {
1180                f.write_str("this transaction can only execute a single statement")
1181            }
1182            AdapterError::ReadWriteUnavailable => {
1183                f.write_str("transaction read-write mode must be set before any query")
1184            }
1185            AdapterError::WrongSetOfLocks => {
1186                write!(f, "internal error, wrong set of locks acquired")
1187            }
1188            AdapterError::StatementTimeout => {
1189                write!(f, "canceling statement due to statement timeout")
1190            }
1191            AdapterError::Canceled => {
1192                write!(f, "canceling statement due to user request")
1193            }
1194            AdapterError::IdleInTransactionSessionTimeout => {
1195                write!(
1196                    f,
1197                    "terminating connection due to idle-in-transaction timeout"
1198                )
1199            }
1200            AdapterError::RecursionLimit(e) => e.fmt(f),
1201            AdapterError::RelationOutsideTimeDomain { .. } => {
1202                write!(
1203                    f,
1204                    "Transactions can only reference objects in the same timedomain. \
1205                     See https://materialize.com/docs/sql/begin/#same-timedomain-error",
1206                )
1207            }
1208            AdapterError::ResourceExhaustion {
1209                resource_type,
1210                limit_name,
1211                desired,
1212                limit,
1213                current,
1214            } => {
1215                write!(
1216                    f,
1217                    "creating {resource_type} would violate {limit_name} limit (desired: {desired}, limit: {limit}, current: {current})"
1218                )
1219            }
1220            AdapterError::ResultSize(e) => write!(f, "{e}"),
1221            AdapterError::SafeModeViolation(feature) => {
1222                write!(f, "cannot create {} in safe mode", feature)
1223            }
1224            AdapterError::SubscribeOnlyTransaction => {
1225                f.write_str("SUBSCRIBE in transactions must be the only read statement")
1226            }
1227            AdapterError::Optimizer(e) => e.fmt(f),
1228            AdapterError::UnallowedOnCluster {
1229                depends_on,
1230                cluster,
1231            } => {
1232                let items = depends_on.into_iter().map(|item| item.quoted()).join(", ");
1233                write!(
1234                    f,
1235                    "querying the following items {items} is not allowed from the {} cluster",
1236                    cluster.quoted()
1237                )
1238            }
1239            AdapterError::Unauthorized(unauthorized) => {
1240                write!(f, "{unauthorized}")
1241            }
1242            AdapterError::UnknownCursor(name) => {
1243                write!(f, "cursor {} does not exist", name.quoted())
1244            }
1245            AdapterError::UnknownLoginRole(name) => {
1246                write!(f, "role {} does not exist", name.quoted())
1247            }
1248            AdapterError::Unsupported(features) => write!(f, "{} are not supported", features),
1249            AdapterError::Unstructured(e) => write!(f, "{}", e.display_with_causes()),
1250            AdapterError::WriteOnlyTransaction => f.write_str("transaction in write-only mode"),
1251            AdapterError::UnknownPreparedStatement(name) => {
1252                write!(f, "prepared statement {} does not exist", name.quoted())
1253            }
1254            AdapterError::UnknownClusterReplica {
1255                cluster_name,
1256                replica_name,
1257            } => write!(
1258                f,
1259                "cluster replica '{cluster_name}.{replica_name}' does not exist"
1260            ),
1261            AdapterError::UnrecognizedConfigurationParam(setting_name) => write!(
1262                f,
1263                "unrecognized configuration parameter {}",
1264                setting_name.quoted()
1265            ),
1266            AdapterError::UntargetedLogRead { .. } => {
1267                f.write_str("log source reads must target a replica")
1268            }
1269            AdapterError::DDLOnlyTransaction => f.write_str(
1270                "transactions which modify objects are restricted to just modifying objects",
1271            ),
1272            AdapterError::DDLTransactionRace => f.write_str(
1273                "another session modified the catalog while this DDL transaction was open",
1274            ),
1275            AdapterError::Storage(e) => e.fmt(f),
1276            AdapterError::Compute(e) => e.fmt(f),
1277            AdapterError::Orchestrator(e) => e.fmt(f),
1278            AdapterError::DependentObject(dependent_objects) => {
1279                let role_str = if dependent_objects.keys().count() == 1 {
1280                    "role"
1281                } else {
1282                    "roles"
1283                };
1284                write!(
1285                    f,
1286                    "{role_str} \"{}\" cannot be dropped because some objects depend on it",
1287                    dependent_objects.keys().join(", ")
1288                )
1289            }
1290            AdapterError::InvalidAlter(t, e) => {
1291                write!(f, "invalid ALTER {t}: {e}")
1292            }
1293            AdapterError::ConnectionValidation(e) => e.fmt(f),
1294            AdapterError::MaterializedViewWouldNeverRefresh(_, _) => {
1295                write!(
1296                    f,
1297                    "all the specified refreshes of the materialized view would be too far in the past, and thus they \
1298                    would never happen"
1299                )
1300            }
1301            AdapterError::InputNotReadableAtRefreshAtTime(_, _) => {
1302                write!(
1303                    f,
1304                    "REFRESH AT requested for a time where not all the inputs are readable"
1305                )
1306            }
1307            AdapterError::RtrTimeout(_) => {
1308                write!(
1309                    f,
1310                    "timed out before ingesting the source's visible frontier when real-time-recency query issued"
1311                )
1312            }
1313            AdapterError::RtrDropFailure(_) => write!(
1314                f,
1315                "real-time source dropped before ingesting the upstream system's visible frontier"
1316            ),
1317            AdapterError::UnreadableSinkCollection => {
1318                write!(f, "collection is not readable at any time")
1319            }
1320            AdapterError::UserSessionsDisallowed => write!(f, "login blocked"),
1321            AdapterError::NetworkPolicyDenied(_) => write!(f, "session denied"),
1322            AdapterError::ReadOnly => write!(f, "cannot write in read-only mode"),
1323            AdapterError::AlterClusterTimeout => {
1324                write!(f, "canceling statement, provided timeout lapsed")
1325            }
1326            AdapterError::AuthenticationError(e) => {
1327                write!(f, "authentication error {e}")
1328            }
1329            AdapterError::UnavailableFeature { feature, docs } => {
1330                write!(f, "{} is not supported in this environment.", feature)?;
1331                if let Some(docs) = docs {
1332                    write!(
1333                        f,
1334                        " For more information consult the documentation at {docs}"
1335                    )?;
1336                }
1337                Ok(())
1338            }
1339            AdapterError::AlterClusterWhilePendingReplicas => {
1340                write!(f, "cannot alter clusters with pending updates")
1341            }
1342            AdapterError::ReplacementSchemaMismatch(_) => {
1343                write!(f, "replacement schema differs from target schema")
1344            }
1345            AdapterError::ImpossibleTimestampConstraints { .. } => {
1346                write!(f, "could not find a valid timestamp for the query")
1347            }
1348            AdapterError::OidcGroupSyncFailed(msg) => {
1349                write!(f, "OIDC group-to-role sync failed: {msg}")
1350            }
1351            AdapterError::BoundedStalenessExceeded { bound, .. } => {
1352                write!(
1353                    f,
1354                    "cannot serve query under bounded staleness {}",
1355                    humantime::format_duration(*bound),
1356                )
1357            }
1358            AdapterError::BoundedStalenessReadOnly => {
1359                f.write_str("writes are not permitted under bounded staleness isolation")
1360            }
1361            AdapterError::BoundedStalenessRealTimeRecencyConflict => {
1362                f.write_str("real_time_recency cannot be combined with bounded staleness isolation")
1363            }
1364            AdapterError::BoundedStalenessTimelineUnsupported => {
1365                f.write_str("bounded staleness isolation requires the EpochMilliseconds timeline")
1366            }
1367        }
1368    }
1369}
1370
1371impl From<anyhow::Error> for AdapterError {
1372    fn from(e: anyhow::Error) -> AdapterError {
1373        match e.downcast::<PlanError>() {
1374            Ok(plan_error) => AdapterError::PlanError(plan_error),
1375            Err(e) => AdapterError::Unstructured(e),
1376        }
1377    }
1378}
1379
1380impl From<TryFromIntError> for AdapterError {
1381    fn from(e: TryFromIntError) -> AdapterError {
1382        AdapterError::Unstructured(e.into())
1383    }
1384}
1385
1386impl From<TryFromDecimalError> for AdapterError {
1387    fn from(e: TryFromDecimalError) -> AdapterError {
1388        AdapterError::Unstructured(e.into())
1389    }
1390}
1391
1392impl From<mz_catalog::memory::error::Error> for AdapterError {
1393    fn from(e: mz_catalog::memory::error::Error) -> AdapterError {
1394        AdapterError::Catalog(e)
1395    }
1396}
1397
1398impl From<mz_catalog::durable::CatalogError> for AdapterError {
1399    fn from(e: mz_catalog::durable::CatalogError) -> Self {
1400        mz_catalog::memory::error::Error::from(e).into()
1401    }
1402}
1403
1404impl From<mz_catalog::durable::DurableCatalogError> for AdapterError {
1405    fn from(e: mz_catalog::durable::DurableCatalogError) -> Self {
1406        mz_catalog::durable::CatalogError::from(e).into()
1407    }
1408}
1409
1410impl From<EvalError> for AdapterError {
1411    fn from(e: EvalError) -> AdapterError {
1412        AdapterError::Eval(e)
1413    }
1414}
1415
1416impl From<ExplainError> for AdapterError {
1417    fn from(e: ExplainError) -> AdapterError {
1418        match e {
1419            ExplainError::RecursionLimitError(e) => AdapterError::RecursionLimit(e),
1420            e => AdapterError::Explain(e),
1421        }
1422    }
1423}
1424
1425impl From<mz_sql::catalog::CatalogError> for AdapterError {
1426    fn from(e: mz_sql::catalog::CatalogError) -> AdapterError {
1427        AdapterError::Catalog(mz_catalog::memory::error::Error::from(e))
1428    }
1429}
1430
1431impl From<PlanError> for AdapterError {
1432    fn from(e: PlanError) -> AdapterError {
1433        match e {
1434            PlanError::UnknownCursor(name) => AdapterError::UnknownCursor(name),
1435            _ => AdapterError::PlanError(e),
1436        }
1437    }
1438}
1439
1440impl From<OptimizerError> for AdapterError {
1441    fn from(e: OptimizerError) -> AdapterError {
1442        use OptimizerError::*;
1443        match e {
1444            PlanError(e) => Self::PlanError(e),
1445            RecursionLimitError(e) => Self::RecursionLimit(e),
1446            EvalError(e) => Self::Eval(e),
1447            InternalUnsafeMfpPlan(e) => Self::Internal(e),
1448            Internal(e) => Self::Internal(e),
1449            RestrictedFunction(func) => {
1450                Self::Unauthorized(mz_sql::rbac::UnauthorizedError::RestrictedSystemObject {
1451                    object_name: format!("function {func}"),
1452                })
1453            }
1454            e => Self::Optimizer(e),
1455        }
1456    }
1457}
1458
1459impl From<NotNullViolation> for AdapterError {
1460    fn from(e: NotNullViolation) -> AdapterError {
1461        AdapterError::ConstraintViolation(e)
1462    }
1463}
1464
1465impl From<RecursionLimitError> for AdapterError {
1466    fn from(e: RecursionLimitError) -> AdapterError {
1467        AdapterError::RecursionLimit(e)
1468    }
1469}
1470
1471impl From<oneshot::error::RecvError> for AdapterError {
1472    fn from(e: oneshot::error::RecvError) -> AdapterError {
1473        AdapterError::Unstructured(e.into())
1474    }
1475}
1476
1477impl From<StorageError> for AdapterError {
1478    fn from(e: StorageError) -> Self {
1479        AdapterError::Storage(e)
1480    }
1481}
1482
1483impl From<compute_error::InstanceExists> for AdapterError {
1484    fn from(e: compute_error::InstanceExists) -> Self {
1485        AdapterError::Compute(e.into())
1486    }
1487}
1488
1489impl From<TimestampError> for AdapterError {
1490    fn from(e: TimestampError) -> Self {
1491        let e: EvalError = e.into();
1492        e.into()
1493    }
1494}
1495
1496impl From<mz_sql_parser::parser::ParserStatementError> for AdapterError {
1497    fn from(e: mz_sql_parser::parser::ParserStatementError) -> Self {
1498        AdapterError::ParseError(e)
1499    }
1500}
1501
1502impl From<VarError> for AdapterError {
1503    fn from(e: VarError) -> Self {
1504        let e: mz_catalog::memory::error::Error = e.into();
1505        e.into()
1506    }
1507}
1508
1509impl From<rbac::UnauthorizedError> for AdapterError {
1510    fn from(e: rbac::UnauthorizedError) -> Self {
1511        AdapterError::Unauthorized(e)
1512    }
1513}
1514
1515impl From<mz_sql_parser::ast::IdentError> for AdapterError {
1516    fn from(value: mz_sql_parser::ast::IdentError) -> Self {
1517        AdapterError::PlanError(PlanError::InvalidIdent(value))
1518    }
1519}
1520
1521impl From<mz_pgwire_common::ConnectionError> for AdapterError {
1522    fn from(value: mz_pgwire_common::ConnectionError) -> Self {
1523        match value {
1524            mz_pgwire_common::ConnectionError::TooManyConnections { current, limit } => {
1525                AdapterError::ResourceExhaustion {
1526                    resource_type: "connection".into(),
1527                    limit_name: "max_connections".into(),
1528                    desired: (current + 1).to_string(),
1529                    limit: limit.to_string(),
1530                    current: current.to_string(),
1531                }
1532            }
1533        }
1534    }
1535}
1536
1537impl From<NetworkPolicyError> for AdapterError {
1538    fn from(value: NetworkPolicyError) -> Self {
1539        AdapterError::NetworkPolicyDenied(value)
1540    }
1541}
1542
1543impl From<ConnectionValidationError> for AdapterError {
1544    fn from(e: ConnectionValidationError) -> AdapterError {
1545        AdapterError::ConnectionValidation(e)
1546    }
1547}
1548
1549impl Error for AdapterError {}