mz_sql/plan/
error.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11use std::error::Error;
12use std::num::{ParseIntError, TryFromIntError};
13use std::sync::Arc;
14use std::time::Duration;
15use std::{fmt, io};
16
17use itertools::Itertools;
18use mz_expr::EvalError;
19use mz_mysql_util::MySqlError;
20use mz_ore::error::ErrorExt;
21use mz_ore::stack::RecursionLimitError;
22use mz_ore::str::{StrExt, separated};
23use mz_postgres_util::PostgresError;
24use mz_repr::adt::char::InvalidCharLengthError;
25use mz_repr::adt::mz_acl_item::AclMode;
26use mz_repr::adt::numeric::InvalidNumericMaxScaleError;
27use mz_repr::adt::timestamp::InvalidTimestampPrecisionError;
28use mz_repr::adt::varchar::InvalidVarCharMaxLengthError;
29use mz_repr::{CatalogItemId, ColumnName, strconv};
30use mz_sql_parser::ast::display::AstDisplay;
31use mz_sql_parser::ast::{IdentError, UnresolvedItemName};
32use mz_sql_parser::parser::{ParserError, ParserStatementError};
33use mz_sql_server_util::SqlServerError;
34use mz_storage_types::sources::ExternalReferenceResolutionError;
35
36use crate::catalog::{
37    CatalogError, CatalogItemType, ErrorMessageObjectDescription, SystemObjectType,
38};
39use crate::names::{PartialItemName, ResolvedItemName};
40use crate::plan::ObjectType;
41use crate::plan::plan_utils::JoinSide;
42use crate::plan::scope::ScopeItem;
43use crate::plan::typeconv::CastContext;
44use crate::pure::error::{
45    CsrPurificationError, IcebergSinkPurificationError, KafkaSinkPurificationError,
46    KafkaSourcePurificationError, LoadGeneratorSourcePurificationError,
47    MySqlSourcePurificationError, PgSourcePurificationError, SqlServerSourcePurificationError,
48};
49use crate::session::vars::VarError;
50
51#[derive(Debug)]
52pub enum PlanError {
53    /// This feature is not yet supported, but may be supported at some point in the future.
54    Unsupported {
55        feature: String,
56        discussion_no: Option<usize>,
57    },
58    /// This feature is not supported, and will likely never be supported.
59    NeverSupported {
60        feature: String,
61        documentation_link: Option<String>,
62        details: Option<String>,
63    },
64    UnknownColumn {
65        table: Option<PartialItemName>,
66        column: ColumnName,
67        similar: Box<[ColumnName]>,
68    },
69    UngroupedColumn {
70        table: Option<PartialItemName>,
71        column: ColumnName,
72    },
73    ItemWithoutColumns {
74        name: String,
75        item_type: CatalogItemType,
76    },
77    WrongJoinTypeForLateralColumn {
78        table: Option<PartialItemName>,
79        column: ColumnName,
80    },
81    AmbiguousColumn(ColumnName),
82    TooManyColumns {
83        max_num_columns: usize,
84        req_num_columns: usize,
85    },
86    ColumnAlreadyExists {
87        column_name: ColumnName,
88        object_name: String,
89    },
90    AmbiguousTable(PartialItemName),
91    UnknownColumnInUsingClause {
92        column: ColumnName,
93        join_side: JoinSide,
94    },
95    AmbiguousColumnInUsingClause {
96        column: ColumnName,
97        join_side: JoinSide,
98    },
99    MisqualifiedName(String),
100    OverqualifiedDatabaseName(String),
101    OverqualifiedSchemaName(String),
102    UnderqualifiedColumnName(String),
103    SubqueriesDisallowed {
104        context: String,
105    },
106    UnknownParameter(usize),
107    ParameterNotAllowed(String),
108    WrongParameterType(usize, String, String),
109    RecursionLimit(RecursionLimitError),
110    StrconvParse(strconv::ParseError),
111    Catalog(CatalogError),
112    UpsertSinkWithoutKey,
113    UpsertSinkWithInvalidKey {
114        name: String,
115        desired_key: Vec<String>,
116        valid_keys: Vec<Vec<String>>,
117    },
118    InvalidWmrRecursionLimit(String),
119    InvalidNumericMaxScale(InvalidNumericMaxScaleError),
120    InvalidCharLength(InvalidCharLengthError),
121    InvalidId(CatalogItemId),
122    InvalidIdent(IdentError),
123    InvalidObject(Box<ResolvedItemName>),
124    InvalidObjectType {
125        expected_type: SystemObjectType,
126        actual_type: SystemObjectType,
127        object_name: String,
128    },
129    InvalidPrivilegeTypes {
130        invalid_privileges: AclMode,
131        object_description: ErrorMessageObjectDescription,
132    },
133    InvalidVarCharMaxLength(InvalidVarCharMaxLengthError),
134    InvalidTimestampPrecision(InvalidTimestampPrecisionError),
135    InvalidSecret(Box<ResolvedItemName>),
136    InvalidTemporarySchema,
137    InvalidCast {
138        name: String,
139        ccx: CastContext,
140        from: String,
141        to: String,
142    },
143    InvalidTable {
144        name: String,
145    },
146    InvalidVersion {
147        name: String,
148        version: String,
149    },
150    InvalidSinkFrom {
151        name: String,
152        item_type: CatalogItemType,
153    },
154    InvalidDependency {
155        name: String,
156        item_type: CatalogItemType,
157    },
158    MangedReplicaName(String),
159    ParserStatement(ParserStatementError),
160    Parser(ParserError),
161    DropViewOnMaterializedView(String),
162    DependentObjectsStillExist {
163        object_type: String,
164        object_name: String,
165        // (dependent type, name)
166        dependents: Vec<(String, String)>,
167    },
168    AlterViewOnMaterializedView(String),
169    ShowCreateViewOnMaterializedView(String),
170    ExplainViewOnMaterializedView(String),
171    UnacceptableTimelineName(String),
172    FetchingCsrSchemaFailed {
173        schema_lookup: String,
174        cause: Arc<dyn Error + Send + Sync>,
175    },
176    PostgresConnectionErr {
177        cause: Arc<mz_postgres_util::PostgresError>,
178    },
179    MySqlConnectionErr {
180        cause: Arc<MySqlError>,
181    },
182    SqlServerConnectionErr {
183        cause: Arc<SqlServerError>,
184    },
185    SubsourceNameConflict {
186        name: UnresolvedItemName,
187        upstream_references: Vec<UnresolvedItemName>,
188    },
189    SubsourceDuplicateReference {
190        name: UnresolvedItemName,
191        target_names: Vec<UnresolvedItemName>,
192    },
193    NoTablesFoundForSchemas(Vec<String>),
194    InvalidProtobufSchema {
195        cause: protobuf_native::OperationFailedError,
196    },
197    InvalidOptionValue {
198        // Expected to be generated from the `to_ast_string` value on the option
199        // name.
200        option_name: String,
201        err: Box<PlanError>,
202    },
203    UnexpectedDuplicateReference {
204        name: UnresolvedItemName,
205    },
206    /// Declaration of a recursive type did not match the inferred type.
207    RecursiveTypeMismatch(String, Vec<String>, Vec<String>),
208    UnknownFunction {
209        name: String,
210        arg_types: Vec<String>,
211    },
212    IndistinctFunction {
213        name: String,
214        arg_types: Vec<String>,
215    },
216    UnknownOperator {
217        name: String,
218        arg_types: Vec<String>,
219    },
220    IndistinctOperator {
221        name: String,
222        arg_types: Vec<String>,
223    },
224    InvalidPrivatelinkAvailabilityZone {
225        name: String,
226        supported_azs: BTreeSet<String>,
227    },
228    DuplicatePrivatelinkAvailabilityZone {
229        duplicate_azs: BTreeSet<String>,
230    },
231    InvalidSchemaName,
232    ItemAlreadyExists {
233        name: String,
234        item_type: CatalogItemType,
235    },
236    ManagedCluster {
237        cluster_name: String,
238    },
239    InvalidKeysInSubscribeEnvelopeUpsert,
240    InvalidKeysInSubscribeEnvelopeDebezium,
241    InvalidPartitionByEnvelopeDebezium {
242        column_name: String,
243    },
244    InvalidOrderByInSubscribeWithinTimestampOrderBy,
245    FromValueRequiresParen,
246    VarError(VarError),
247    UnsolvablePolymorphicFunctionInput,
248    ShowCommandInView,
249    WebhookValidationDoesNotUseColumns,
250    WebhookValidationNonDeterministic,
251    InternalFunctionCall,
252    CommentTooLong {
253        length: usize,
254        max_size: usize,
255    },
256    InvalidTimestampInterval {
257        min: Duration,
258        max: Duration,
259        requested: Duration,
260    },
261    InvalidGroupSizeHints,
262    PgSourcePurification(PgSourcePurificationError),
263    KafkaSourcePurification(KafkaSourcePurificationError),
264    KafkaSinkPurification(KafkaSinkPurificationError),
265    IcebergSinkPurification(IcebergSinkPurificationError),
266    LoadGeneratorSourcePurification(LoadGeneratorSourcePurificationError),
267    CsrPurification(CsrPurificationError),
268    MySqlSourcePurification(MySqlSourcePurificationError),
269    SqlServerSourcePurificationError(SqlServerSourcePurificationError),
270    UseTablesForSources(String),
271    MissingName(CatalogItemType),
272    InvalidRefreshAt,
273    InvalidRefreshEveryAlignedTo,
274    CreateReplicaFailStorageObjects {
275        /// The current number of replicas on the cluster
276        current_replica_count: usize,
277        /// THe number of internal replicas on the cluster
278        internal_replica_count: usize,
279        /// The number of replicas that executing this command would have
280        /// created
281        hypothetical_replica_count: usize,
282    },
283    MismatchedObjectType {
284        name: PartialItemName,
285        is_type: ObjectType,
286        expected_type: ObjectType,
287    },
288    /// MZ failed to generate cast for the data type.
289    TableContainsUningestableTypes {
290        name: String,
291        type_: String,
292        column: String,
293    },
294    RetainHistoryLow {
295        limit: Duration,
296    },
297    RetainHistoryRequired,
298    UntilReadyTimeoutRequired,
299    SubsourceResolutionError(ExternalReferenceResolutionError),
300    Replan(String),
301    NetworkPolicyLockoutError,
302    NetworkPolicyInUse,
303    /// Expected a constant expression that evaluates without an error to a non-null value.
304    ConstantExpressionSimplificationFailed(String),
305    InvalidOffset(String),
306    /// The named cursor does not exist.
307    UnknownCursor(String),
308    CopyFromTargetTableDropped {
309        target_name: String,
310    },
311    /// AS OF or UP TO should be an expression that is castable and simplifiable to a non-null mz_timestamp value.
312    InvalidAsOfUpTo,
313    InvalidReplacement {
314        item_type: CatalogItemType,
315        item_name: PartialItemName,
316        replacement_type: CatalogItemType,
317        replacement_name: PartialItemName,
318    },
319    // TODO(benesch): eventually all errors should be structured.
320    Unstructured(String),
321}
322
323impl PlanError {
324    pub(crate) fn ungrouped_column(item: &ScopeItem) -> PlanError {
325        PlanError::UngroupedColumn {
326            table: item.table_name.clone(),
327            column: item.column_name.clone(),
328        }
329    }
330
331    pub fn detail(&self) -> Option<String> {
332        match self {
333            Self::NeverSupported { details, .. } => details.clone(),
334            Self::FetchingCsrSchemaFailed { cause, .. } => Some(cause.to_string_with_causes()),
335            Self::PostgresConnectionErr { cause } => Some(cause.to_string_with_causes()),
336            Self::InvalidProtobufSchema { cause } => Some(cause.to_string_with_causes()),
337            Self::InvalidOptionValue { err, .. } => err.detail(),
338            Self::UpsertSinkWithInvalidKey {
339                name,
340                desired_key,
341                valid_keys,
342            } => {
343                let valid_keys = if valid_keys.is_empty() {
344                    "There are no known valid unique keys for the underlying relation.".into()
345                } else {
346                    format!(
347                        "The following keys are known to be unique for the underlying relation:\n{}",
348                        valid_keys
349                            .iter()
350                            .map(|k|
351                                format!("  ({})", k.iter().map(|c| c.as_str().quoted()).join(", "))
352                            )
353                            .join("\n"),
354                    )
355                };
356                Some(format!(
357                    "Materialize could not prove that the specified upsert envelope key ({}) \
358                    was a unique key of the underlying relation {}. {valid_keys}",
359                    separated(", ", desired_key.iter().map(|c| c.as_str().quoted())),
360                    name.quoted()
361                ))
362            }
363            Self::VarError(e) => e.detail(),
364            Self::InternalFunctionCall => Some("This function is for the internal use of the database system and cannot be called directly.".into()),
365            Self::PgSourcePurification(e) => e.detail(),
366            Self::MySqlSourcePurification(e) => e.detail(),
367            Self::SqlServerSourcePurificationError(e) => e.detail(),
368            Self::KafkaSourcePurification(e) => e.detail(),
369            Self::LoadGeneratorSourcePurification(e) => e.detail(),
370            Self::CsrPurification(e) => e.detail(),
371            Self::KafkaSinkPurification(e) => e.detail(),
372            Self::IcebergSinkPurification(e) => e.detail(),
373            Self::CreateReplicaFailStorageObjects { current_replica_count: current, internal_replica_count: internal, hypothetical_replica_count: target } => {
374                Some(format!(
375                    "Currently have {} replica{}{}; command would result in {}",
376                    current,
377                    if *current != 1 { "s" } else { "" },
378                    if *internal > 0 {
379                        format!(" ({} internal)", internal)
380                    } else {
381                        "".to_string()
382                    },
383                    target
384                ))
385            },
386            Self::SubsourceNameConflict {
387                name: _,
388                upstream_references,
389            } => Some(format!(
390                "referenced tables with duplicate name: {}",
391                itertools::join(upstream_references, ", ")
392            )),
393            Self::SubsourceDuplicateReference {
394                name: _,
395                target_names,
396            } => Some(format!(
397                "subsources referencing table: {}",
398                itertools::join(target_names, ", ")
399            )),
400            Self::InvalidPartitionByEnvelopeDebezium { .. } => Some(
401                "When using ENVELOPE DEBEZIUM, only columns in the key can be referenced in the PARTITION BY expression.".to_string()
402            ),
403            Self::NoTablesFoundForSchemas(schemas) => Some(format!(
404                "missing schemas: {}",
405                separated(", ", schemas.iter().map(|c| c.quoted()))
406            )),
407            _ => None,
408        }
409    }
410
411    pub fn hint(&self) -> Option<String> {
412        match self {
413            Self::DropViewOnMaterializedView(_) => {
414                Some("Use DROP MATERIALIZED VIEW to remove a materialized view.".into())
415            }
416            Self::DependentObjectsStillExist {..} => Some("Use DROP ... CASCADE to drop the dependent objects too.".into()),
417            Self::AlterViewOnMaterializedView(_) => {
418                Some("Use ALTER MATERIALIZED VIEW to rename a materialized view.".into())
419            }
420            Self::ShowCreateViewOnMaterializedView(_) => {
421                Some("Use SHOW CREATE MATERIALIZED VIEW to show a materialized view.".into())
422            }
423            Self::ExplainViewOnMaterializedView(_) => {
424                Some("Use EXPLAIN [...] MATERIALIZED VIEW to explain a materialized view.".into())
425            }
426            Self::UnacceptableTimelineName(_) => {
427                Some("The prefix \"mz_\" is reserved for system timelines.".into())
428            }
429            Self::PostgresConnectionErr { cause } => {
430                if let Some(cause) = cause.source() {
431                    if let Some(cause) = cause.downcast_ref::<io::Error>() {
432                        if cause.kind() == io::ErrorKind::TimedOut {
433                            return Some(
434                                "Do you have a firewall or security group that is \
435                                preventing Materialize from connecting to your PostgreSQL server?"
436                                    .into(),
437                            );
438                        }
439                    }
440                }
441                None
442            }
443            Self::InvalidOptionValue { err, .. } => err.hint(),
444            Self::UnknownFunction { ..} => Some("No function matches the given name and argument types.  You might need to add explicit type casts.".into()),
445            Self::IndistinctFunction {..} => {
446                Some("Could not choose a best candidate function.  You might need to add explicit type casts.".into())
447            }
448            Self::UnknownOperator {..} => {
449                Some("No operator matches the given name and argument types.  You might need to add explicit type casts.".into())
450            }
451            Self::IndistinctOperator {..} => {
452                Some("Could not choose a best candidate operator.  You might need to add explicit type casts.".into())
453            },
454            Self::InvalidPrivatelinkAvailabilityZone { supported_azs, ..} => {
455                let supported_azs_str = supported_azs.iter().join("\n  ");
456                Some(format!("Did you supply an availability zone name instead of an ID? Known availability zone IDs:\n  {}", supported_azs_str))
457            }
458            Self::DuplicatePrivatelinkAvailabilityZone { duplicate_azs, ..} => {
459                let duplicate_azs  = duplicate_azs.iter().join("\n  ");
460                Some(format!("Duplicated availability zones:\n  {}", duplicate_azs))
461            }
462            Self::InvalidKeysInSubscribeEnvelopeUpsert => {
463                Some("All keys must be columns on the underlying relation.".into())
464            }
465            Self::InvalidKeysInSubscribeEnvelopeDebezium => {
466                Some("All keys must be columns on the underlying relation.".into())
467            }
468            Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
469                Some("All order bys must be output columns.".into())
470            }
471            Self::UpsertSinkWithInvalidKey { .. } | Self::UpsertSinkWithoutKey => {
472                Some("See: https://materialize.com/s/sink-key-selection".into())
473            }
474            Self::Catalog(e) => e.hint(),
475            Self::VarError(e) => e.hint(),
476            Self::PgSourcePurification(e) => e.hint(),
477            Self::MySqlSourcePurification(e) => e.hint(),
478            Self::SqlServerSourcePurificationError(e) => e.hint(),
479            Self::KafkaSourcePurification(e) => e.hint(),
480            Self::LoadGeneratorSourcePurification(e) => e.hint(),
481            Self::CsrPurification(e) => e.hint(),
482            Self::KafkaSinkPurification(e) => e.hint(),
483            Self::UnknownColumn { table, similar, .. } => {
484                let suffix = "Make sure to surround case sensitive names in double quotes.";
485                match &similar[..] {
486                    [] => None,
487                    [column] => Some(format!("The similarly named column {} does exist. {suffix}", ColumnDisplay { table, column })),
488                    names => {
489                        let similar = names.into_iter().map(|column| ColumnDisplay { table, column }).join(", ");
490                        Some(format!("There are similarly named columns that do exist: {similar}. {suffix}"))
491                    }
492                }
493            }
494            Self::RecursiveTypeMismatch(..) => {
495                Some("You will need to rewrite or cast the query's expressions.".into())
496            },
497            Self::InvalidRefreshAt
498            | Self::InvalidRefreshEveryAlignedTo => {
499                Some("Calling `mz_now()` is allowed.".into())
500            },
501            Self::TableContainsUningestableTypes { column,.. } => {
502                Some(format!("Remove the table or use TEXT COLUMNS ({column}, ..) to ingest this column as text"))
503            }
504            Self::RetainHistoryLow { .. } | Self::RetainHistoryRequired => {
505                Some("Use ALTER ... RESET (RETAIN HISTORY) to set the retain history to its default and lowest value.".into())
506            }
507            Self::NetworkPolicyInUse => {
508                Some("Use ALTER SYSTEM SET 'network_policy' to change the default network policy.".into())
509            }
510            Self::WrongParameterType(_, _, _) => {
511                Some("EXECUTE automatically inserts only such casts that are allowed in an assignment cast context.  Try adding an explicit cast.".into())
512            }
513            Self::InvalidSchemaName => {
514                Some("Use SET schema = name to select a schema.  Use SHOW SCHEMAS to list available schemas.  Use SHOW search_path to show the schema names that we looked for, but none of them existed.".into())
515            }
516            _ => None,
517        }
518    }
519}
520
521impl fmt::Display for PlanError {
522    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
523        match self {
524            Self::Unsupported { feature, discussion_no } => {
525                write!(f, "{} not yet supported", feature)?;
526                if let Some(discussion_no) = discussion_no {
527                    write!(f, ", see https://github.com/MaterializeInc/materialize/discussions/{} for more details", discussion_no)?;
528                }
529                Ok(())
530            }
531            Self::NeverSupported { feature, documentation_link: documentation_path,.. } => {
532                write!(f, "{feature} is not supported",)?;
533                if let Some(documentation_path) = documentation_path {
534                    write!(f, ", for more information consult the documentation at https://materialize.com/docs/{documentation_path}")?;
535                }
536                Ok(())
537            }
538            Self::UnknownColumn { table, column, similar: _ } => write!(
539                f,
540                "column {} does not exist",
541                ColumnDisplay { table, column }
542            ),
543            Self::UngroupedColumn { table, column } => write!(
544                f,
545                "column {} must appear in the GROUP BY clause or be used in an aggregate function",
546                ColumnDisplay { table, column },
547            ),
548            Self::ItemWithoutColumns { name, item_type } => {
549                let name = name.quoted();
550                write!(f, "{item_type} {name} does not have columns")
551            }
552            Self::WrongJoinTypeForLateralColumn { table, column } => write!(
553                f,
554                "column {} cannot be referenced from this part of the query: \
555                the combining JOIN type must be INNER or LEFT for a LATERAL reference",
556                ColumnDisplay { table, column },
557            ),
558            Self::AmbiguousColumn(column) => write!(
559                f,
560                "column reference {} is ambiguous",
561                column.quoted()
562            ),
563            Self::TooManyColumns { max_num_columns, req_num_columns } => write!(
564                f,
565                "attempt to create relation with too many columns, {} max: {}",
566                req_num_columns, max_num_columns
567            ),
568            Self::ColumnAlreadyExists { column_name, object_name } => write!(
569                f,
570                "column {} of relation {} already exists",
571                column_name.quoted(), object_name.quoted(),
572            ),
573            Self::AmbiguousTable(table) => write!(
574                f,
575                "table reference {} is ambiguous",
576                table.item.as_str().quoted()
577            ),
578            Self::UnknownColumnInUsingClause { column, join_side } => write!(
579                f,
580                "column {} specified in USING clause does not exist in {} table",
581                column.quoted(),
582                join_side,
583            ),
584            Self::AmbiguousColumnInUsingClause { column, join_side } => write!(
585                f,
586                "common column name {} appears more than once in {} table",
587                column.quoted(),
588                join_side,
589            ),
590            Self::MisqualifiedName(name) => write!(
591                f,
592                "qualified name did not have between 1 and 3 components: {}",
593                name
594            ),
595            Self::OverqualifiedDatabaseName(name) => write!(
596                f,
597                "database name '{}' does not have exactly one component",
598                name
599            ),
600            Self::OverqualifiedSchemaName(name) => write!(
601                f,
602                "schema name '{}' cannot have more than two components",
603                name
604            ),
605            Self::UnderqualifiedColumnName(name) => write!(
606                f,
607                "column name '{}' must have at least a table qualification",
608                name
609            ),
610            Self::UnacceptableTimelineName(name) => {
611                write!(f, "unacceptable timeline name {}", name.quoted(),)
612            }
613            Self::SubqueriesDisallowed { context } => {
614                write!(f, "{} does not allow subqueries", context)
615            }
616            Self::UnknownParameter(n) => write!(f, "there is no parameter ${}", n),
617            Self::ParameterNotAllowed(object_type) => write!(f, "{} cannot have parameters", object_type),
618            Self::WrongParameterType(i, expected_ty, actual_ty) => write!(f, "unable to cast given parameter ${}: expected {}, got {}", i, expected_ty, actual_ty),
619            Self::RecursionLimit(e) => write!(f, "{}", e),
620            Self::StrconvParse(e) => write!(f, "{}", e),
621            Self::Catalog(e) => write!(f, "{}", e),
622            Self::UpsertSinkWithoutKey => write!(f, "upsert sinks must specify a key"),
623            Self::UpsertSinkWithInvalidKey { .. } => {
624                write!(f, "upsert key could not be validated as unique")
625            }
626            Self::InvalidWmrRecursionLimit(msg) => write!(f, "Invalid WITH MUTUALLY RECURSIVE recursion limit. {}", msg),
627            Self::InvalidNumericMaxScale(e) => e.fmt(f),
628            Self::InvalidCharLength(e) => e.fmt(f),
629            Self::InvalidVarCharMaxLength(e) => e.fmt(f),
630            Self::InvalidTimestampPrecision(e) => e.fmt(f),
631            Self::Parser(e) => e.fmt(f),
632            Self::ParserStatement(e) => e.fmt(f),
633            Self::Unstructured(e) => write!(f, "{}", e),
634            Self::InvalidId(id) => write!(f, "invalid id {}", id),
635            Self::InvalidIdent(err) => write!(f, "invalid identifier, {err}"),
636            Self::InvalidObject(i) => write!(f, "{} is not a database object", i.full_name_str()),
637            Self::InvalidObjectType{expected_type, actual_type, object_name} => write!(f, "{actual_type} {object_name} is not a {expected_type}"),
638            Self::InvalidPrivilegeTypes{ invalid_privileges, object_description, } => {
639                write!(f, "invalid privilege types {} for {}", invalid_privileges.to_error_string(), object_description)
640            },
641            Self::InvalidSecret(i) => write!(f, "{} is not a secret", i.full_name_str()),
642            Self::InvalidTemporarySchema => {
643                write!(f, "cannot create temporary item in non-temporary schema")
644            }
645            Self::InvalidCast { name, ccx, from, to } =>{
646                write!(
647                    f,
648                    "{name} does not support {ccx}casting from {from} to {to}",
649                    ccx = if matches!(ccx, CastContext::Implicit) {
650                        "implicitly "
651                    } else {
652                        ""
653                    },
654                )
655            }
656            Self::InvalidTable { name } => {
657                write!(f, "invalid table definition for {}", name.quoted())
658            },
659            Self::InvalidVersion { name, version } => {
660                write!(f, "invalid version {} for {}", version.quoted(), name.quoted())
661            },
662            Self::InvalidSinkFrom { name, item_type } => {
663                write!(f, "{name} is a {item_type}, which cannot be exported as a sink")
664            },
665            Self::InvalidDependency { name, item_type } => {
666                let a = if *item_type == CatalogItemType::Index { "an" } else { "a" };
667                write!(f, "{name} is {a} {item_type}, which cannot be depended upon")
668            },
669            Self::DropViewOnMaterializedView(name)
670            | Self::AlterViewOnMaterializedView(name)
671            | Self::ShowCreateViewOnMaterializedView(name)
672            | Self::ExplainViewOnMaterializedView(name) => write!(f, "{name} is not a view"),
673            Self::FetchingCsrSchemaFailed { schema_lookup, .. } => {
674                write!(f, "failed to fetch schema {schema_lookup} from schema registry")
675            }
676            Self::PostgresConnectionErr { .. } => {
677                write!(f, "failed to connect to PostgreSQL database")
678            }
679            Self::MySqlConnectionErr { cause } => {
680                write!(f, "failed to connect to MySQL database: {}", cause)
681            }
682            Self::SqlServerConnectionErr { cause } => {
683                write!(f, "failed to connect to SQL Server database: {}", cause)
684            }
685            Self::SubsourceNameConflict {
686                name , upstream_references: _,
687            } => {
688                write!(f, "multiple subsources would be named {}", name)
689            },
690            Self::SubsourceDuplicateReference {
691                name,
692                target_names: _,
693            } => {
694                write!(f, "multiple subsources refer to table {}", name)
695            },
696            Self::NoTablesFoundForSchemas(schemas) => {
697                write!(f, "no tables found in referenced schemas: {}",
698                    separated(", ", schemas.iter().map(|c| c.quoted()))
699                )
700            },
701            Self::InvalidProtobufSchema { .. } => {
702                write!(f, "invalid protobuf schema")
703            }
704            Self::DependentObjectsStillExist {object_type, object_name, dependents} => {
705                let reason = match &dependents[..] {
706                    [] => " because other objects depend on it".to_string(),
707                    dependents => {
708                        let dependents = dependents.iter().map(|(dependent_type, dependent_name)| format!("{} {}", dependent_type, dependent_name.quoted())).join(", ");
709                        format!(": still depended upon by {dependents}")
710                    },
711                };
712                let object_name = object_name.quoted();
713                write!(f, "cannot drop {object_type} {object_name}{reason}")
714            }
715            Self::InvalidOptionValue { option_name, err } => write!(f, "invalid {} option value: {}", option_name, err),
716            Self::UnexpectedDuplicateReference { name } => write!(f, "unexpected multiple references to {}", name.to_ast_string_simple()),
717            Self::RecursiveTypeMismatch(name, declared, inferred) => {
718                let declared = separated(", ", declared);
719                let inferred = separated(", ", inferred);
720                let name = name.quoted();
721                write!(f, "WITH MUTUALLY RECURSIVE query {name} declared types ({declared}), but query returns types ({inferred})")
722            },
723            Self::UnknownFunction {name, arg_types, ..} => {
724                write!(f, "function {}({}) does not exist", name, arg_types.join(", "))
725            },
726            Self::IndistinctFunction {name, arg_types, ..} => {
727                write!(f, "function {}({}) is not unique", name, arg_types.join(", "))
728            },
729            Self::UnknownOperator {name, arg_types, ..} => {
730                write!(f, "operator does not exist: {}", match arg_types.as_slice(){
731                    [typ] => format!("{} {}", name, typ),
732                    [ltyp, rtyp] => {
733                        format!("{} {} {}", ltyp, name, rtyp)
734                    }
735                    _ => unreachable!("non-unary non-binary operator"),
736                })
737            },
738            Self::IndistinctOperator {name, arg_types, ..} => {
739                write!(f, "operator is not unique: {}", match arg_types.as_slice(){
740                    [typ] => format!("{} {}", name, typ),
741                    [ltyp, rtyp] => {
742                        format!("{} {} {}", ltyp, name, rtyp)
743                    }
744                    _ => unreachable!("non-unary non-binary operator"),
745                })
746            },
747            Self::InvalidPrivatelinkAvailabilityZone { name, ..} => write!(f, "invalid AWS PrivateLink availability zone {}", name.quoted()),
748            Self::DuplicatePrivatelinkAvailabilityZone {..} =>   write!(f, "connection cannot contain duplicate availability zones"),
749            Self::InvalidSchemaName => write!(f, "no valid schema selected"),
750            Self::ItemAlreadyExists { name, item_type } => write!(f, "{item_type} {} already exists", name.quoted()),
751            Self::ManagedCluster {cluster_name} => write!(f, "cannot modify managed cluster {cluster_name}"),
752            Self::InvalidKeysInSubscribeEnvelopeUpsert => {
753                write!(f, "invalid keys in SUBSCRIBE ENVELOPE UPSERT (KEY (..))")
754            }
755            Self::InvalidKeysInSubscribeEnvelopeDebezium => {
756                write!(f, "invalid keys in SUBSCRIBE ENVELOPE DEBEZIUM (KEY (..))")
757            }
758            Self::InvalidPartitionByEnvelopeDebezium { column_name } => {
759                write!(
760                    f,
761                    "PARTITION BY expression cannot refer to non-key column {}",
762                    column_name.quoted(),
763                )
764            }
765            Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
766                write!(f, "invalid ORDER BY in SUBSCRIBE WITHIN TIMESTAMP ORDER BY")
767            }
768            Self::FromValueRequiresParen => f.write_str(
769                "VALUES expression in FROM clause must be surrounded by parentheses"
770            ),
771            Self::VarError(e) => e.fmt(f),
772            Self::UnsolvablePolymorphicFunctionInput => f.write_str(
773                "could not determine polymorphic type because input has type unknown"
774            ),
775            Self::ShowCommandInView => f.write_str("SHOW commands are not allowed in views"),
776            Self::WebhookValidationDoesNotUseColumns => f.write_str(
777                "expression provided in CHECK does not reference any columns"
778            ),
779            Self::WebhookValidationNonDeterministic => f.write_str(
780                "expression provided in CHECK is not deterministic"
781            ),
782            Self::InternalFunctionCall => f.write_str("cannot call function with arguments of type internal"),
783            Self::CommentTooLong { length, max_size } => {
784                write!(f, "provided comment was {length} bytes long, max size is {max_size} bytes")
785            }
786            Self::InvalidTimestampInterval { min, max, requested } => {
787                write!(f, "invalid timestamp interval of {}ms, must be in the range [{}ms, {}ms]", requested.as_millis(), min.as_millis(), max.as_millis())
788            }
789            Self::InvalidGroupSizeHints => f.write_str("EXPECTED GROUP SIZE cannot be provided \
790                simultaneously with any of AGGREGATE INPUT GROUP SIZE, DISTINCT ON INPUT GROUP SIZE, \
791                or LIMIT INPUT GROUP SIZE"),
792            Self::PgSourcePurification(e) => write!(f, "POSTGRES source validation: {}", e),
793            Self::KafkaSourcePurification(e) => write!(f, "KAFKA source validation: {}", e),
794            Self::LoadGeneratorSourcePurification(e) => write!(f, "LOAD GENERATOR source validation: {}", e),
795            Self::KafkaSinkPurification(e) => write!(f, "KAFKA sink validation: {}", e),
796            Self::IcebergSinkPurification(e) => write!(f, "ICEBERG sink validation: {}", e),
797            Self::CsrPurification(e) => write!(f, "CONFLUENT SCHEMA REGISTRY validation: {}", e),
798            Self::MySqlSourcePurification(e) => write!(f, "MYSQL source validation: {}", e),
799            Self::SqlServerSourcePurificationError(e) => write!(f, "SQL SERVER source validation: {}", e),
800            Self::UseTablesForSources(command) => write!(f, "{command} not supported; use CREATE TABLE .. FROM SOURCE instead"),
801            Self::MangedReplicaName(name) => {
802                write!(f, "{name} is reserved for replicas of managed clusters")
803            }
804            Self::MissingName(item_type) => {
805                write!(f, "unspecified name for {item_type}")
806            }
807            Self::InvalidRefreshAt => {
808                write!(f, "REFRESH AT argument must be an expression that can be simplified \
809                           and/or cast to a constant whose type is mz_timestamp")
810            }
811            Self::InvalidRefreshEveryAlignedTo => {
812                write!(f, "REFRESH EVERY ... ALIGNED TO argument must be an expression that can be simplified \
813                           and/or cast to a constant whose type is mz_timestamp")
814            }
815            Self::CreateReplicaFailStorageObjects {..} => {
816                write!(f, "cannot create more than one replica of a cluster containing sources or sinks")
817            },
818            Self::MismatchedObjectType {
819                name,
820                is_type,
821                expected_type,
822            } => {
823                write!(
824                    f,
825                    "{name} is {} {} not {} {}",
826                    if *is_type == ObjectType::Index {
827                        "an"
828                    } else {
829                        "a"
830                    },
831                    is_type.to_string().to_lowercase(),
832                    if *expected_type == ObjectType::Index {
833                        "an"
834                    } else {
835                        "a"
836                    },
837                    expected_type.to_string().to_lowercase()
838                )
839            }
840            Self::TableContainsUningestableTypes { name, type_, column } => {
841                write!(f, "table {name} contains column {column} of type {type_} which Materialize cannot currently ingest")
842            },
843            Self::RetainHistoryLow { limit } => {
844                write!(f, "RETAIN HISTORY cannot be set lower than {}ms", limit.as_millis())
845            },
846            Self::RetainHistoryRequired => {
847                write!(f, "RETAIN HISTORY cannot be disabled or set to 0")
848            },
849            Self::SubsourceResolutionError(e) => write!(f, "{}", e),
850            Self::Replan(msg) => write!(f, "internal error while replanning, please contact support: {msg}"),
851            Self::NetworkPolicyLockoutError => write!(f, "policy would block current session IP"),
852            Self::NetworkPolicyInUse => write!(f, "network policy is currently in use"),
853            Self::UntilReadyTimeoutRequired => {
854                write!(f, "TIMEOUT=<duration> option is required for ALTER CLUSTER ... WITH (WAIT UNTIL READY ( ... ))")
855            },
856            Self::ConstantExpressionSimplificationFailed(e) => write!(f, "{}", e),
857            Self::InvalidOffset(e) => write!(f, "Invalid OFFSET clause: {}", e),
858            Self::UnknownCursor(name) => {
859                write!(f, "cursor {} does not exist", name.quoted())
860            }
861            Self::CopyFromTargetTableDropped { target_name: name } => write!(f, "COPY FROM's target table {} was dropped", name.quoted()),
862            Self::InvalidAsOfUpTo => write!(f, "AS OF or UP TO should be castable to a (non-null) mz_timestamp value"),
863            Self::InvalidReplacement { item_type, item_name, replacement_type, replacement_name } => {
864                write!(f, "cannot replace {item_type} {item_name} with {replacement_type} {replacement_name}")
865            }
866        }
867    }
868}
869
870impl Error for PlanError {}
871
872impl From<CatalogError> for PlanError {
873    fn from(e: CatalogError) -> PlanError {
874        PlanError::Catalog(e)
875    }
876}
877
878impl From<strconv::ParseError> for PlanError {
879    fn from(e: strconv::ParseError) -> PlanError {
880        PlanError::StrconvParse(e)
881    }
882}
883
884impl From<RecursionLimitError> for PlanError {
885    fn from(e: RecursionLimitError) -> PlanError {
886        PlanError::RecursionLimit(e)
887    }
888}
889
890impl From<InvalidNumericMaxScaleError> for PlanError {
891    fn from(e: InvalidNumericMaxScaleError) -> PlanError {
892        PlanError::InvalidNumericMaxScale(e)
893    }
894}
895
896impl From<InvalidCharLengthError> for PlanError {
897    fn from(e: InvalidCharLengthError) -> PlanError {
898        PlanError::InvalidCharLength(e)
899    }
900}
901
902impl From<InvalidVarCharMaxLengthError> for PlanError {
903    fn from(e: InvalidVarCharMaxLengthError) -> PlanError {
904        PlanError::InvalidVarCharMaxLength(e)
905    }
906}
907
908impl From<InvalidTimestampPrecisionError> for PlanError {
909    fn from(e: InvalidTimestampPrecisionError) -> PlanError {
910        PlanError::InvalidTimestampPrecision(e)
911    }
912}
913
914impl From<anyhow::Error> for PlanError {
915    fn from(e: anyhow::Error) -> PlanError {
916        // WIP: Do we maybe want to keep the alternate selector for these?
917        sql_err!("{}", e.display_with_causes())
918    }
919}
920
921impl From<TryFromIntError> for PlanError {
922    fn from(e: TryFromIntError) -> PlanError {
923        sql_err!("{}", e.display_with_causes())
924    }
925}
926
927impl From<ParseIntError> for PlanError {
928    fn from(e: ParseIntError) -> PlanError {
929        sql_err!("{}", e.display_with_causes())
930    }
931}
932
933impl From<EvalError> for PlanError {
934    fn from(e: EvalError) -> PlanError {
935        sql_err!("{}", e.display_with_causes())
936    }
937}
938
939impl From<ParserError> for PlanError {
940    fn from(e: ParserError) -> PlanError {
941        PlanError::Parser(e)
942    }
943}
944
945impl From<ParserStatementError> for PlanError {
946    fn from(e: ParserStatementError) -> PlanError {
947        PlanError::ParserStatement(e)
948    }
949}
950
951impl From<PostgresError> for PlanError {
952    fn from(e: PostgresError) -> PlanError {
953        PlanError::PostgresConnectionErr { cause: Arc::new(e) }
954    }
955}
956
957impl From<MySqlError> for PlanError {
958    fn from(e: MySqlError) -> PlanError {
959        PlanError::MySqlConnectionErr { cause: Arc::new(e) }
960    }
961}
962
963impl From<SqlServerError> for PlanError {
964    fn from(e: SqlServerError) -> PlanError {
965        PlanError::SqlServerConnectionErr { cause: Arc::new(e) }
966    }
967}
968
969impl From<VarError> for PlanError {
970    fn from(e: VarError) -> Self {
971        PlanError::VarError(e)
972    }
973}
974
975impl From<PgSourcePurificationError> for PlanError {
976    fn from(e: PgSourcePurificationError) -> Self {
977        PlanError::PgSourcePurification(e)
978    }
979}
980
981impl From<KafkaSourcePurificationError> for PlanError {
982    fn from(e: KafkaSourcePurificationError) -> Self {
983        PlanError::KafkaSourcePurification(e)
984    }
985}
986
987impl From<KafkaSinkPurificationError> for PlanError {
988    fn from(e: KafkaSinkPurificationError) -> Self {
989        PlanError::KafkaSinkPurification(e)
990    }
991}
992
993impl From<IcebergSinkPurificationError> for PlanError {
994    fn from(e: IcebergSinkPurificationError) -> Self {
995        PlanError::IcebergSinkPurification(e)
996    }
997}
998
999impl From<CsrPurificationError> for PlanError {
1000    fn from(e: CsrPurificationError) -> Self {
1001        PlanError::CsrPurification(e)
1002    }
1003}
1004
1005impl From<LoadGeneratorSourcePurificationError> for PlanError {
1006    fn from(e: LoadGeneratorSourcePurificationError) -> Self {
1007        PlanError::LoadGeneratorSourcePurification(e)
1008    }
1009}
1010
1011impl From<MySqlSourcePurificationError> for PlanError {
1012    fn from(e: MySqlSourcePurificationError) -> Self {
1013        PlanError::MySqlSourcePurification(e)
1014    }
1015}
1016
1017impl From<SqlServerSourcePurificationError> for PlanError {
1018    fn from(e: SqlServerSourcePurificationError) -> Self {
1019        PlanError::SqlServerSourcePurificationError(e)
1020    }
1021}
1022
1023impl From<IdentError> for PlanError {
1024    fn from(e: IdentError) -> Self {
1025        PlanError::InvalidIdent(e)
1026    }
1027}
1028
1029impl From<ExternalReferenceResolutionError> for PlanError {
1030    fn from(e: ExternalReferenceResolutionError) -> Self {
1031        PlanError::SubsourceResolutionError(e)
1032    }
1033}
1034
1035struct ColumnDisplay<'a> {
1036    table: &'a Option<PartialItemName>,
1037    column: &'a ColumnName,
1038}
1039
1040impl<'a> fmt::Display for ColumnDisplay<'a> {
1041    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1042        if let Some(table) = &self.table {
1043            format!("{}.{}", table.item, self.column).quoted().fmt(f)
1044        } else {
1045            self.column.quoted().fmt(f)
1046        }
1047    }
1048}