Skip to main content

mz_sql/plan/
error.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11use std::error::Error;
12use std::num::{ParseIntError, TryFromIntError};
13use std::sync::Arc;
14use std::time::Duration;
15use std::{fmt, io};
16
17use itertools::Itertools;
18use mz_expr::EvalError;
19use mz_mysql_util::MySqlError;
20use mz_ore::error::ErrorExt;
21use mz_ore::stack::RecursionLimitError;
22use mz_ore::str::{StrExt, separated};
23use mz_postgres_util::PostgresError;
24use mz_repr::adt::char::InvalidCharLengthError;
25use mz_repr::adt::mz_acl_item::AclMode;
26use mz_repr::adt::numeric::InvalidNumericMaxScaleError;
27use mz_repr::adt::timestamp::InvalidTimestampPrecisionError;
28use mz_repr::adt::varchar::InvalidVarCharMaxLengthError;
29use mz_repr::{CatalogItemId, ColumnName, strconv};
30use mz_sql_parser::ast::display::AstDisplay;
31use mz_sql_parser::ast::{IdentError, UnresolvedItemName};
32use mz_sql_parser::parser::{ParserError, ParserStatementError};
33use mz_sql_server_util::SqlServerError;
34use mz_storage_types::sources::ExternalReferenceResolutionError;
35
36use crate::catalog::{
37    CatalogError, CatalogItemType, ErrorMessageObjectDescription, SystemObjectType,
38};
39use crate::names::{PartialItemName, ResolvedItemName};
40use crate::plan::ObjectType;
41use crate::plan::plan_utils::JoinSide;
42use crate::plan::scope::ScopeItem;
43use crate::plan::typeconv::CastContext;
44use crate::pure::error::{
45    CsrPurificationError, IcebergSinkPurificationError, KafkaSinkPurificationError,
46    KafkaSourcePurificationError, LoadGeneratorSourcePurificationError,
47    MySqlSourcePurificationError, PgSourcePurificationError, SqlServerSourcePurificationError,
48};
49use crate::session::vars::VarError;
50
51#[derive(Debug)]
52pub enum PlanError {
53    /// This feature is not yet supported, but may be supported at some point in the future.
54    Unsupported {
55        feature: String,
56        discussion_no: Option<usize>,
57    },
58    /// This feature is not supported, and will likely never be supported.
59    NeverSupported {
60        feature: String,
61        documentation_link: Option<String>,
62        details: Option<String>,
63    },
64    UnknownColumn {
65        table: Option<PartialItemName>,
66        column: ColumnName,
67        similar: Box<[ColumnName]>,
68    },
69    UngroupedColumn {
70        table: Option<PartialItemName>,
71        column: ColumnName,
72    },
73    ItemWithoutColumns {
74        name: String,
75        item_type: CatalogItemType,
76    },
77    WrongJoinTypeForLateralColumn {
78        table: Option<PartialItemName>,
79        column: ColumnName,
80    },
81    AmbiguousColumn(ColumnName),
82    TooManyColumns {
83        max_num_columns: usize,
84        req_num_columns: usize,
85    },
86    ColumnAlreadyExists {
87        column_name: ColumnName,
88        object_name: String,
89    },
90    AmbiguousTable(PartialItemName),
91    UnknownColumnInUsingClause {
92        column: ColumnName,
93        join_side: JoinSide,
94    },
95    AmbiguousColumnInUsingClause {
96        column: ColumnName,
97        join_side: JoinSide,
98    },
99    MisqualifiedName(String),
100    OverqualifiedDatabaseName(String),
101    OverqualifiedSchemaName(String),
102    UnderqualifiedColumnName(String),
103    SubqueriesDisallowed {
104        context: String,
105    },
106    UnknownParameter(usize),
107    ParameterNotAllowed(String),
108    WrongParameterType(usize, String, String),
109    RecursionLimit(RecursionLimitError),
110    StrconvParse(strconv::ParseError),
111    Catalog(CatalogError),
112    UpsertSinkWithoutKey,
113    UpsertSinkWithInvalidKey {
114        name: String,
115        desired_key: Vec<String>,
116        valid_keys: Vec<Vec<String>>,
117    },
118    IcebergSinkUnsupportedKeyType {
119        column: String,
120        column_type: String,
121    },
122    InvalidWmrRecursionLimit(String),
123    InvalidNumericMaxScale(InvalidNumericMaxScaleError),
124    InvalidCharLength(InvalidCharLengthError),
125    InvalidId(CatalogItemId),
126    InvalidIdent(IdentError),
127    InvalidObject(Box<ResolvedItemName>),
128    InvalidObjectType {
129        expected_type: SystemObjectType,
130        actual_type: SystemObjectType,
131        object_name: String,
132    },
133    InvalidPrivilegeTypes {
134        invalid_privileges: AclMode,
135        object_description: ErrorMessageObjectDescription,
136    },
137    InvalidVarCharMaxLength(InvalidVarCharMaxLengthError),
138    InvalidTimestampPrecision(InvalidTimestampPrecisionError),
139    InvalidSecret(Box<ResolvedItemName>),
140    InvalidTemporarySchema,
141    InvalidCast {
142        name: String,
143        ccx: CastContext,
144        from: String,
145        to: String,
146    },
147    /// Range type with an element type that is not supported (e.g. float, uint).
148    UnsupportedRangeElementType {
149        element_type_name: String,
150    },
151    InvalidTable {
152        name: String,
153    },
154    InvalidVersion {
155        name: String,
156        version: String,
157    },
158    InvalidSinkFrom {
159        name: String,
160        item_type: String,
161    },
162    InvalidDependency {
163        name: String,
164        item_type: String,
165    },
166    MangedReplicaName(String),
167    ParserStatement(ParserStatementError),
168    Parser(ParserError),
169    DropViewOnMaterializedView(String),
170    DependentObjectsStillExist {
171        object_type: String,
172        object_name: String,
173        // (dependent type, name)
174        dependents: Vec<(String, String)>,
175    },
176    AlterViewOnMaterializedView(String),
177    ShowCreateViewOnMaterializedView(String),
178    ExplainViewOnMaterializedView(String),
179    UnacceptableTimelineName(String),
180    FetchingCsrSchemaFailed {
181        schema_lookup: String,
182        cause: Arc<dyn Error + Send + Sync>,
183    },
184    PostgresConnectionErr {
185        cause: Arc<mz_postgres_util::PostgresError>,
186    },
187    MySqlConnectionErr {
188        cause: Arc<MySqlError>,
189    },
190    SqlServerConnectionErr {
191        cause: Arc<SqlServerError>,
192    },
193    SubsourceNameConflict {
194        name: UnresolvedItemName,
195        upstream_references: Vec<UnresolvedItemName>,
196    },
197    SubsourceDuplicateReference {
198        name: UnresolvedItemName,
199        target_names: Vec<UnresolvedItemName>,
200    },
201    NoTablesFoundForSchemas(Vec<String>),
202    InvalidProtobufSchema {
203        cause: protobuf_native::OperationFailedError,
204    },
205    InvalidOptionValue {
206        // Expected to be generated from the `to_ast_string` value on the option
207        // name.
208        option_name: String,
209        err: Box<PlanError>,
210    },
211    UnexpectedDuplicateReference {
212        name: UnresolvedItemName,
213    },
214    /// Declaration of a recursive type did not match the inferred type.
215    RecursiveTypeMismatch(String, Vec<String>, Vec<String>),
216    UnknownFunction {
217        name: String,
218        arg_types: Vec<String>,
219    },
220    IndistinctFunction {
221        name: String,
222        arg_types: Vec<String>,
223    },
224    UnknownOperator {
225        name: String,
226        arg_types: Vec<String>,
227    },
228    IndistinctOperator {
229        name: String,
230        arg_types: Vec<String>,
231    },
232    InvalidPrivatelinkAvailabilityZone {
233        name: String,
234        supported_azs: BTreeSet<String>,
235    },
236    DuplicatePrivatelinkAvailabilityZone {
237        duplicate_azs: BTreeSet<String>,
238    },
239    InvalidSchemaName,
240    ItemAlreadyExists {
241        name: String,
242        item_type: CatalogItemType,
243    },
244    ManagedCluster {
245        cluster_name: String,
246    },
247    InvalidKeysInSubscribeEnvelopeUpsert,
248    InvalidKeysInSubscribeEnvelopeDebezium,
249    InvalidPartitionByEnvelopeDebezium {
250        column_name: String,
251    },
252    InvalidOrderByInSubscribeWithinTimestampOrderBy,
253    FromValueRequiresParen,
254    VarError(VarError),
255    UnsolvablePolymorphicFunctionInput,
256    ShowCommandInView,
257    WebhookValidationDoesNotUseColumns,
258    WebhookValidationNonDeterministic,
259    InternalFunctionCall,
260    CommentTooLong {
261        length: usize,
262        max_size: usize,
263    },
264    InvalidTimestampInterval {
265        min: Duration,
266        max: Duration,
267        requested: Duration,
268    },
269    InvalidGroupSizeHints,
270    PgSourcePurification(PgSourcePurificationError),
271    KafkaSourcePurification(KafkaSourcePurificationError),
272    KafkaSinkPurification(KafkaSinkPurificationError),
273    IcebergSinkPurification(IcebergSinkPurificationError),
274    LoadGeneratorSourcePurification(LoadGeneratorSourcePurificationError),
275    CsrPurification(CsrPurificationError),
276    MySqlSourcePurification(MySqlSourcePurificationError),
277    SqlServerSourcePurificationError(SqlServerSourcePurificationError),
278    UseTablesForSources(String),
279    MissingName(CatalogItemType),
280    InvalidRefreshAt,
281    InvalidRefreshEveryAlignedTo,
282    CreateReplicaFailStorageObjects {
283        /// The current number of replicas on the cluster
284        current_replica_count: usize,
285        /// THe number of internal replicas on the cluster
286        internal_replica_count: usize,
287        /// The number of replicas that executing this command would have
288        /// created
289        hypothetical_replica_count: usize,
290    },
291    MismatchedObjectType {
292        name: PartialItemName,
293        is_type: ObjectType,
294        expected_type: ObjectType,
295    },
296    /// MZ failed to generate cast for the data type.
297    TableContainsUningestableTypes {
298        name: String,
299        type_: String,
300        column: String,
301    },
302    RetainHistoryLow {
303        limit: Duration,
304    },
305    RetainHistoryRequired,
306    UntilReadyTimeoutRequired,
307    SubsourceResolutionError(ExternalReferenceResolutionError),
308    Replan(String),
309    Internal(String),
310    NetworkPolicyLockoutError,
311    NetworkPolicyInUse,
312    /// Expected a constant expression that evaluates without an error to a non-null value.
313    ConstantExpressionSimplificationFailed(String),
314    InvalidOffset(String),
315    /// The named cursor does not exist.
316    UnknownCursor(String),
317    CopyFromTargetTableDropped {
318        target_name: String,
319    },
320    /// AS OF or UP TO should be an expression that is castable and simplifiable to a non-null mz_timestamp value.
321    InvalidAsOfUpTo,
322    InvalidReplacement {
323        item_type: CatalogItemType,
324        item_name: PartialItemName,
325        replacement_type: CatalogItemType,
326        replacement_name: PartialItemName,
327    },
328    // TODO(benesch): eventually all errors should be structured.
329    Unstructured(String),
330}
331
332impl PlanError {
333    pub(crate) fn ungrouped_column(item: &ScopeItem) -> PlanError {
334        PlanError::UngroupedColumn {
335            table: item.table_name.clone(),
336            column: item.column_name.clone(),
337        }
338    }
339
340    pub fn detail(&self) -> Option<String> {
341        match self {
342            Self::NeverSupported { details, .. } => details.clone(),
343            Self::FetchingCsrSchemaFailed { cause, .. } => Some(cause.to_string_with_causes()),
344            Self::PostgresConnectionErr { cause } => Some(cause.to_string_with_causes()),
345            Self::InvalidProtobufSchema { cause } => Some(cause.to_string_with_causes()),
346            Self::InvalidOptionValue { err, .. } => err.detail(),
347            Self::UpsertSinkWithInvalidKey {
348                name,
349                desired_key,
350                valid_keys,
351            } => {
352                let valid_keys = if valid_keys.is_empty() {
353                    "There are no known valid unique keys for the underlying relation.".into()
354                } else {
355                    format!(
356                        "The following keys are known to be unique for the underlying relation:\n{}",
357                        valid_keys
358                            .iter()
359                            .map(|k|
360                                format!("  ({})", k.iter().map(|c| c.as_str().quoted()).join(", "))
361                            )
362                            .join("\n"),
363                    )
364                };
365                Some(format!(
366                    "Materialize could not prove that the specified upsert envelope key ({}) \
367                    was a unique key of the underlying relation {}. {valid_keys}",
368                    separated(", ", desired_key.iter().map(|c| c.as_str().quoted())),
369                    name.quoted()
370                ))
371            }
372            Self::VarError(e) => e.detail(),
373            Self::InternalFunctionCall => Some("This function is for the internal use of the database system and cannot be called directly.".into()),
374            Self::PgSourcePurification(e) => e.detail(),
375            Self::MySqlSourcePurification(e) => e.detail(),
376            Self::SqlServerSourcePurificationError(e) => e.detail(),
377            Self::KafkaSourcePurification(e) => e.detail(),
378            Self::LoadGeneratorSourcePurification(e) => e.detail(),
379            Self::CsrPurification(e) => e.detail(),
380            Self::KafkaSinkPurification(e) => e.detail(),
381            Self::IcebergSinkPurification(e) => e.detail(),
382            Self::CreateReplicaFailStorageObjects {
383                current_replica_count: current,
384                internal_replica_count: internal,
385                hypothetical_replica_count: target,
386            } => {
387                Some(format!(
388                    "Currently have {} replica{}{}; command would result in {}",
389                    current,
390                    if *current != 1 { "s" } else { "" },
391                    if *internal > 0 {
392                        format!(" ({} internal)", internal)
393                    } else {
394                        "".to_string()
395                    },
396                    target
397                ))
398            },
399            Self::SubsourceNameConflict {
400                name: _,
401                upstream_references,
402            } => Some(format!(
403                "referenced tables with duplicate name: {}",
404                itertools::join(upstream_references, ", ")
405            )),
406            Self::SubsourceDuplicateReference {
407                name: _,
408                target_names,
409            } => Some(format!(
410                "subsources referencing table: {}",
411                itertools::join(target_names, ", ")
412            )),
413            Self::InvalidPartitionByEnvelopeDebezium { .. } => Some(
414                "When using ENVELOPE DEBEZIUM, only columns in the key can be referenced in the PARTITION BY expression.".to_string()
415            ),
416            Self::NoTablesFoundForSchemas(schemas) => Some(format!(
417                "missing schemas: {}",
418                separated(", ", schemas.iter().map(|c| c.quoted()))
419            )),
420            _ => None,
421        }
422    }
423
424    pub fn hint(&self) -> Option<String> {
425        match self {
426            Self::DropViewOnMaterializedView(_) => {
427                Some("Use DROP MATERIALIZED VIEW to remove a materialized view.".into())
428            }
429            Self::DependentObjectsStillExist {..} => Some("Use DROP ... CASCADE to drop the dependent objects too.".into()),
430            Self::AlterViewOnMaterializedView(_) => {
431                Some("Use ALTER MATERIALIZED VIEW to rename a materialized view.".into())
432            }
433            Self::ShowCreateViewOnMaterializedView(_) => {
434                Some("Use SHOW CREATE MATERIALIZED VIEW to show a materialized view.".into())
435            }
436            Self::ExplainViewOnMaterializedView(_) => {
437                Some("Use EXPLAIN [...] MATERIALIZED VIEW to explain a materialized view.".into())
438            }
439            Self::UnacceptableTimelineName(_) => {
440                Some("The prefix \"mz_\" is reserved for system timelines.".into())
441            }
442            Self::PostgresConnectionErr { cause } => {
443                if let Some(cause) = cause.source() {
444                    if let Some(cause) = cause.downcast_ref::<io::Error>() {
445                        if cause.kind() == io::ErrorKind::TimedOut {
446                            return Some(
447                                "Do you have a firewall or security group that is \
448                                preventing Materialize from connecting to your PostgreSQL server?"
449                                    .into(),
450                            );
451                        }
452                    }
453                }
454                None
455            }
456            Self::InvalidOptionValue { err, .. } => err.hint(),
457            Self::UnknownFunction { ..} => Some("No function matches the given name and argument types.  You might need to add explicit type casts.".into()),
458            Self::IndistinctFunction {..} => {
459                Some("Could not choose a best candidate function.  You might need to add explicit type casts.".into())
460            }
461            Self::UnknownOperator {..} => {
462                Some("No operator matches the given name and argument types.  You might need to add explicit type casts.".into())
463            }
464            Self::IndistinctOperator {..} => {
465                Some("Could not choose a best candidate operator.  You might need to add explicit type casts.".into())
466            },
467            Self::InvalidPrivatelinkAvailabilityZone { supported_azs, ..} => {
468                let supported_azs_str = supported_azs.iter().join("\n  ");
469                Some(format!("Did you supply an availability zone name instead of an ID? Known availability zone IDs:\n  {}", supported_azs_str))
470            }
471            Self::DuplicatePrivatelinkAvailabilityZone { duplicate_azs, ..} => {
472                let duplicate_azs  = duplicate_azs.iter().join("\n  ");
473                Some(format!("Duplicated availability zones:\n  {}", duplicate_azs))
474            }
475            Self::InvalidKeysInSubscribeEnvelopeUpsert => {
476                Some("All keys must be columns on the underlying relation.".into())
477            }
478            Self::InvalidKeysInSubscribeEnvelopeDebezium => {
479                Some("All keys must be columns on the underlying relation.".into())
480            }
481            Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
482                Some("All order bys must be output columns.".into())
483            }
484            Self::UpsertSinkWithInvalidKey { .. } | Self::UpsertSinkWithoutKey => {
485                Some("See: https://materialize.com/s/sink-key-selection".into())
486            }
487            Self::IcebergSinkUnsupportedKeyType { .. } => {
488                Some("Iceberg equality delete keys must be primitive, non-floating-point columns.".into())
489            }
490            Self::Catalog(e) => e.hint(),
491            Self::VarError(e) => e.hint(),
492            Self::PgSourcePurification(e) => e.hint(),
493            Self::MySqlSourcePurification(e) => e.hint(),
494            Self::SqlServerSourcePurificationError(e) => e.hint(),
495            Self::KafkaSourcePurification(e) => e.hint(),
496            Self::LoadGeneratorSourcePurification(e) => e.hint(),
497            Self::CsrPurification(e) => e.hint(),
498            Self::KafkaSinkPurification(e) => e.hint(),
499            Self::UnknownColumn { table, similar, .. } => {
500                let suffix = "Make sure to surround case sensitive names in double quotes.";
501                match &similar[..] {
502                    [] => None,
503                    [column] => Some(format!("The similarly named column {} does exist. {suffix}", ColumnDisplay { table, column })),
504                    names => {
505                        let similar = names.into_iter().map(|column| ColumnDisplay { table, column }).join(", ");
506                        Some(format!("There are similarly named columns that do exist: {similar}. {suffix}"))
507                    }
508                }
509            }
510            Self::RecursiveTypeMismatch(..) => {
511                Some("You will need to rewrite or cast the query's expressions.".into())
512            },
513            Self::InvalidRefreshAt
514            | Self::InvalidRefreshEveryAlignedTo => {
515                Some("Calling `mz_now()` is allowed.".into())
516            },
517            Self::TableContainsUningestableTypes { column,.. } => {
518                Some(format!("Remove the table or use TEXT COLUMNS ({column}, ..) to ingest this column as text"))
519            }
520            Self::RetainHistoryLow { .. } | Self::RetainHistoryRequired => {
521                Some("Use ALTER ... RESET (RETAIN HISTORY) to set the retain history to its default and lowest value.".into())
522            }
523            Self::NetworkPolicyInUse => {
524                Some("Use ALTER SYSTEM SET 'network_policy' to change the default network policy.".into())
525            }
526            Self::WrongParameterType(_, _, _) => {
527                Some("EXECUTE automatically inserts only such casts that are allowed in an assignment cast context.  Try adding an explicit cast.".into())
528            }
529            Self::InvalidSchemaName => {
530                Some("Use SET schema = name to select a schema.  Use SHOW SCHEMAS to list available schemas.  Use SHOW search_path to show the schema names that we looked for, but none of them existed.".into())
531            }
532            _ => None,
533        }
534    }
535}
536
537impl fmt::Display for PlanError {
538    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
539        match self {
540            Self::Unsupported { feature, discussion_no } => {
541                write!(f, "{} not yet supported", feature)?;
542                if let Some(discussion_no) = discussion_no {
543                    write!(f, ", see https://github.com/MaterializeInc/materialize/discussions/{} for more details", discussion_no)?;
544                }
545                Ok(())
546            }
547            Self::NeverSupported { feature, documentation_link: documentation_path,.. } => {
548                write!(f, "{feature} is not supported",)?;
549                if let Some(documentation_path) = documentation_path {
550                    write!(f, ", for more information consult the documentation at https://materialize.com/docs/{documentation_path}")?;
551                }
552                Ok(())
553            }
554            Self::UnknownColumn { table, column, similar: _ } => write!(
555                f,
556                "column {} does not exist",
557                ColumnDisplay { table, column }
558            ),
559            Self::UngroupedColumn { table, column } => write!(
560                f,
561                "column {} must appear in the GROUP BY clause or be used in an aggregate function",
562                ColumnDisplay { table, column },
563            ),
564            Self::ItemWithoutColumns { name, item_type } => {
565                let name = name.quoted();
566                write!(f, "{item_type} {name} does not have columns")
567            }
568            Self::WrongJoinTypeForLateralColumn { table, column } => write!(
569                f,
570                "column {} cannot be referenced from this part of the query: \
571                the combining JOIN type must be INNER or LEFT for a LATERAL reference",
572                ColumnDisplay { table, column },
573            ),
574            Self::AmbiguousColumn(column) => write!(
575                f,
576                "column reference {} is ambiguous",
577                column.quoted()
578            ),
579            Self::TooManyColumns { max_num_columns, req_num_columns } => write!(
580                f,
581                "attempt to create relation with too many columns, {} max: {}",
582                req_num_columns, max_num_columns
583            ),
584            Self::ColumnAlreadyExists { column_name, object_name } => write!(
585                f,
586                "column {} of relation {} already exists",
587                column_name.quoted(), object_name.quoted(),
588            ),
589            Self::AmbiguousTable(table) => write!(
590                f,
591                "table reference {} is ambiguous",
592                table.item.as_str().quoted()
593            ),
594            Self::UnknownColumnInUsingClause { column, join_side } => write!(
595                f,
596                "column {} specified in USING clause does not exist in {} table",
597                column.quoted(),
598                join_side,
599            ),
600            Self::AmbiguousColumnInUsingClause { column, join_side } => write!(
601                f,
602                "common column name {} appears more than once in {} table",
603                column.quoted(),
604                join_side,
605            ),
606            Self::MisqualifiedName(name) => write!(
607                f,
608                "qualified name did not have between 1 and 3 components: {}",
609                name
610            ),
611            Self::OverqualifiedDatabaseName(name) => write!(
612                f,
613                "database name '{}' does not have exactly one component",
614                name
615            ),
616            Self::OverqualifiedSchemaName(name) => write!(
617                f,
618                "schema name '{}' cannot have more than two components",
619                name
620            ),
621            Self::UnderqualifiedColumnName(name) => write!(
622                f,
623                "column name '{}' must have at least a table qualification",
624                name
625            ),
626            Self::UnacceptableTimelineName(name) => {
627                write!(f, "unacceptable timeline name {}", name.quoted(),)
628            }
629            Self::SubqueriesDisallowed { context } => {
630                write!(f, "{} does not allow subqueries", context)
631            }
632            Self::UnknownParameter(n) => write!(f, "there is no parameter ${}", n),
633            Self::ParameterNotAllowed(object_type) => write!(f, "{} cannot have parameters", object_type),
634            Self::WrongParameterType(i, expected_ty, actual_ty) => write!(f, "unable to cast given parameter ${}: expected {}, got {}", i, expected_ty, actual_ty),
635            Self::RecursionLimit(e) => write!(f, "{}", e),
636            Self::StrconvParse(e) => write!(f, "{}", e),
637            Self::Catalog(e) => write!(f, "{}", e),
638            Self::UpsertSinkWithoutKey => write!(f, "upsert sinks must specify a key"),
639            Self::UpsertSinkWithInvalidKey { .. } => {
640                write!(f, "upsert key could not be validated as unique")
641            }
642            Self::IcebergSinkUnsupportedKeyType { column, column_type } => {
643                write!(f, "column {column} has type {column_type} which cannot be used as an Iceberg equality delete key")
644            }
645            Self::InvalidWmrRecursionLimit(msg) => write!(f, "Invalid WITH MUTUALLY RECURSIVE recursion limit. {}", msg),
646            Self::InvalidNumericMaxScale(e) => e.fmt(f),
647            Self::InvalidCharLength(e) => e.fmt(f),
648            Self::InvalidVarCharMaxLength(e) => e.fmt(f),
649            Self::InvalidTimestampPrecision(e) => e.fmt(f),
650            Self::Parser(e) => e.fmt(f),
651            Self::ParserStatement(e) => e.fmt(f),
652            Self::Unstructured(e) => write!(f, "{}", e),
653            Self::InvalidId(id) => write!(f, "invalid id {}", id),
654            Self::InvalidIdent(err) => write!(f, "invalid identifier, {err}"),
655            Self::InvalidObject(i) => write!(f, "{} is not a database object", i.full_name_str()),
656            Self::InvalidObjectType{expected_type, actual_type, object_name} => write!(f, "{actual_type} {object_name} is not a {expected_type}"),
657            Self::InvalidPrivilegeTypes{ invalid_privileges, object_description, } => {
658                write!(f, "invalid privilege types {} for {}", invalid_privileges.to_error_string(), object_description)
659            },
660            Self::InvalidSecret(i) => write!(f, "{} is not a secret", i.full_name_str()),
661            Self::InvalidTemporarySchema => {
662                write!(f, "cannot create temporary item in non-temporary schema")
663            }
664            Self::InvalidCast { name, ccx, from, to } =>{
665                write!(
666                    f,
667                    "{name} does not support {ccx}casting from {from} to {to}",
668                    ccx = if matches!(ccx, CastContext::Implicit) {
669                        "implicitly "
670                    } else {
671                        ""
672                    },
673                )
674            }
675            Self::UnsupportedRangeElementType { element_type_name } => {
676                write!(f, "range type over {} is not supported", element_type_name)
677            }
678            Self::InvalidTable { name } => {
679                write!(f, "invalid table definition for {}", name.quoted())
680            },
681            Self::InvalidVersion { name, version } => {
682                write!(f, "invalid version {} for {}", version.quoted(), name.quoted())
683            },
684            Self::InvalidSinkFrom { name, item_type } => {
685                write!(f, "{item_type} {name} cannot be exported as a sink")
686            },
687            Self::InvalidDependency { name, item_type } => {
688                write!(f, "{item_type} {name} cannot be depended upon")
689            },
690            Self::DropViewOnMaterializedView(name)
691            | Self::AlterViewOnMaterializedView(name)
692            | Self::ShowCreateViewOnMaterializedView(name)
693            | Self::ExplainViewOnMaterializedView(name) => write!(f, "{name} is not a view"),
694            Self::FetchingCsrSchemaFailed { schema_lookup, .. } => {
695                write!(f, "failed to fetch schema {schema_lookup} from schema registry")
696            }
697            Self::PostgresConnectionErr { .. } => {
698                write!(f, "failed to connect to PostgreSQL database")
699            }
700            Self::MySqlConnectionErr { cause } => {
701                write!(f, "failed to connect to MySQL database: {}", cause)
702            }
703            Self::SqlServerConnectionErr { cause } => {
704                write!(f, "failed to connect to SQL Server database: {}", cause)
705            }
706            Self::SubsourceNameConflict {
707                name , upstream_references: _,
708            } => {
709                write!(f, "multiple subsources would be named {}", name)
710            },
711            Self::SubsourceDuplicateReference {
712                name,
713                target_names: _,
714            } => {
715                write!(f, "multiple subsources refer to table {}", name)
716            },
717            Self::NoTablesFoundForSchemas(schemas) => {
718                write!(f, "no tables found in referenced schemas: {}",
719                    separated(", ", schemas.iter().map(|c| c.quoted()))
720                )
721            },
722            Self::InvalidProtobufSchema { .. } => {
723                write!(f, "invalid protobuf schema")
724            }
725            Self::DependentObjectsStillExist {object_type, object_name, dependents} => {
726                let reason = match &dependents[..] {
727                    [] => " because other objects depend on it".to_string(),
728                    dependents => {
729                        let dependents = dependents.iter().map(|(dependent_type, dependent_name)| format!("{} {}", dependent_type, dependent_name.quoted())).join(", ");
730                        format!(": still depended upon by {dependents}")
731                    },
732                };
733                let object_name = object_name.quoted();
734                write!(f, "cannot drop {object_type} {object_name}{reason}")
735            }
736            Self::InvalidOptionValue { option_name, err } => write!(f, "invalid {} option value: {}", option_name, err),
737            Self::UnexpectedDuplicateReference { name } => write!(f, "unexpected multiple references to {}", name.to_ast_string_simple()),
738            Self::RecursiveTypeMismatch(name, declared, inferred) => {
739                let declared = separated(", ", declared);
740                let inferred = separated(", ", inferred);
741                let name = name.quoted();
742                write!(f, "WITH MUTUALLY RECURSIVE query {name} declared types ({declared}), but query returns types ({inferred})")
743            },
744            Self::UnknownFunction {name, arg_types, ..} => {
745                write!(f, "function {}({}) does not exist", name, arg_types.join(", "))
746            },
747            Self::IndistinctFunction {name, arg_types, ..} => {
748                write!(f, "function {}({}) is not unique", name, arg_types.join(", "))
749            },
750            Self::UnknownOperator {name, arg_types, ..} => {
751                write!(f, "operator does not exist: {}", match arg_types.as_slice(){
752                    [typ] => format!("{} {}", name, typ),
753                    [ltyp, rtyp] => {
754                        format!("{} {} {}", ltyp, name, rtyp)
755                    }
756                    _ => unreachable!("non-unary non-binary operator"),
757                })
758            },
759            Self::IndistinctOperator {name, arg_types, ..} => {
760                write!(f, "operator is not unique: {}", match arg_types.as_slice(){
761                    [typ] => format!("{} {}", name, typ),
762                    [ltyp, rtyp] => {
763                        format!("{} {} {}", ltyp, name, rtyp)
764                    }
765                    _ => unreachable!("non-unary non-binary operator"),
766                })
767            },
768            Self::InvalidPrivatelinkAvailabilityZone { name, ..} => write!(f, "invalid AWS PrivateLink availability zone {}", name.quoted()),
769            Self::DuplicatePrivatelinkAvailabilityZone {..} =>   write!(f, "connection cannot contain duplicate availability zones"),
770            Self::InvalidSchemaName => write!(f, "no valid schema selected"),
771            Self::ItemAlreadyExists { name, item_type } => write!(f, "{item_type} {} already exists", name.quoted()),
772            Self::ManagedCluster {cluster_name} => write!(f, "cannot modify managed cluster {cluster_name}"),
773            Self::InvalidKeysInSubscribeEnvelopeUpsert => {
774                write!(f, "invalid keys in SUBSCRIBE ENVELOPE UPSERT (KEY (..))")
775            }
776            Self::InvalidKeysInSubscribeEnvelopeDebezium => {
777                write!(f, "invalid keys in SUBSCRIBE ENVELOPE DEBEZIUM (KEY (..))")
778            }
779            Self::InvalidPartitionByEnvelopeDebezium { column_name } => {
780                write!(
781                    f,
782                    "PARTITION BY expression cannot refer to non-key column {}",
783                    column_name.quoted(),
784                )
785            }
786            Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
787                write!(f, "invalid ORDER BY in SUBSCRIBE WITHIN TIMESTAMP ORDER BY")
788            }
789            Self::FromValueRequiresParen => f.write_str(
790                "VALUES expression in FROM clause must be surrounded by parentheses"
791            ),
792            Self::VarError(e) => e.fmt(f),
793            Self::UnsolvablePolymorphicFunctionInput => f.write_str(
794                "could not determine polymorphic type because input has type unknown"
795            ),
796            Self::ShowCommandInView => f.write_str("SHOW commands are not allowed in views"),
797            Self::WebhookValidationDoesNotUseColumns => f.write_str(
798                "expression provided in CHECK does not reference any columns"
799            ),
800            Self::WebhookValidationNonDeterministic => f.write_str(
801                "expression provided in CHECK is not deterministic"
802            ),
803            Self::InternalFunctionCall => f.write_str("cannot call function with arguments of type internal"),
804            Self::CommentTooLong { length, max_size } => {
805                write!(f, "provided comment was {length} bytes long, max size is {max_size} bytes")
806            }
807            Self::InvalidTimestampInterval { min, max, requested } => {
808                write!(f, "invalid timestamp interval of {}ms, must be in the range [{}ms, {}ms]", requested.as_millis(), min.as_millis(), max.as_millis())
809            }
810            Self::InvalidGroupSizeHints => f.write_str("EXPECTED GROUP SIZE cannot be provided \
811                simultaneously with any of AGGREGATE INPUT GROUP SIZE, DISTINCT ON INPUT GROUP SIZE, \
812                or LIMIT INPUT GROUP SIZE"),
813            Self::PgSourcePurification(e) => write!(f, "POSTGRES source validation: {}", e),
814            Self::KafkaSourcePurification(e) => write!(f, "KAFKA source validation: {}", e),
815            Self::LoadGeneratorSourcePurification(e) => write!(f, "LOAD GENERATOR source validation: {}", e),
816            Self::KafkaSinkPurification(e) => write!(f, "KAFKA sink validation: {}", e),
817            Self::IcebergSinkPurification(e) => write!(f, "ICEBERG sink validation: {}", e),
818            Self::CsrPurification(e) => write!(f, "CONFLUENT SCHEMA REGISTRY validation: {}", e),
819            Self::MySqlSourcePurification(e) => write!(f, "MYSQL source validation: {}", e),
820            Self::SqlServerSourcePurificationError(e) => write!(f, "SQL SERVER source validation: {}", e),
821            Self::UseTablesForSources(command) => write!(f, "{command} not supported; use CREATE TABLE .. FROM SOURCE instead"),
822            Self::MangedReplicaName(name) => {
823                write!(f, "{name} is reserved for replicas of managed clusters")
824            }
825            Self::MissingName(item_type) => {
826                write!(f, "unspecified name for {item_type}")
827            }
828            Self::InvalidRefreshAt => {
829                write!(f, "REFRESH AT argument must be an expression that can be simplified \
830                           and/or cast to a constant whose type is mz_timestamp")
831            }
832            Self::InvalidRefreshEveryAlignedTo => {
833                write!(f, "REFRESH EVERY ... ALIGNED TO argument must be an expression that can be simplified \
834                           and/or cast to a constant whose type is mz_timestamp")
835            }
836            Self::CreateReplicaFailStorageObjects {..} => {
837                write!(f, "cannot create more than one replica of a cluster containing sources or sinks")
838            },
839            Self::MismatchedObjectType {
840                name,
841                is_type,
842                expected_type,
843            } => {
844                write!(
845                    f,
846                    "{name} is {} {} not {} {}",
847                    if *is_type == ObjectType::Index {
848                        "an"
849                    } else {
850                        "a"
851                    },
852                    is_type.to_string().to_lowercase(),
853                    if *expected_type == ObjectType::Index {
854                        "an"
855                    } else {
856                        "a"
857                    },
858                    expected_type.to_string().to_lowercase()
859                )
860            }
861            Self::TableContainsUningestableTypes { name, type_, column } => {
862                write!(f, "table {name} contains column {column} of type {type_} which Materialize cannot currently ingest")
863            },
864            Self::RetainHistoryLow { limit } => {
865                write!(f, "RETAIN HISTORY cannot be set lower than {}ms", limit.as_millis())
866            },
867            Self::RetainHistoryRequired => {
868                write!(f, "RETAIN HISTORY cannot be disabled or set to 0")
869            },
870            Self::SubsourceResolutionError(e) => write!(f, "{}", e),
871            Self::Replan(msg) => write!(f, "internal error while replanning, please contact support: {msg}"),
872            Self::Internal(msg) => write!(f, "internal error: {msg}"),
873            Self::NetworkPolicyLockoutError => write!(f, "policy would block current session IP"),
874            Self::NetworkPolicyInUse => write!(f, "network policy is currently in use"),
875            Self::UntilReadyTimeoutRequired => {
876                write!(f, "TIMEOUT=<duration> option is required for ALTER CLUSTER ... WITH (WAIT UNTIL READY ( ... ))")
877            },
878            Self::ConstantExpressionSimplificationFailed(e) => write!(f, "{}", e),
879            Self::InvalidOffset(e) => write!(f, "Invalid OFFSET clause: {}", e),
880            Self::UnknownCursor(name) => {
881                write!(f, "cursor {} does not exist", name.quoted())
882            }
883            Self::CopyFromTargetTableDropped { target_name: name } => {
884                write!(f, "COPY FROM's target table {} was dropped", name.quoted())
885            }
886            Self::InvalidAsOfUpTo => write!(
887                f,
888                "AS OF or UP TO should be castable to a (non-null) mz_timestamp value",
889            ),
890            Self::InvalidReplacement {
891                item_type, item_name, replacement_type, replacement_name,
892            } => {
893                write!(
894                    f,
895                    "cannot replace {item_type} {item_name} \
896                     with {replacement_type} {replacement_name}",
897                )
898            }
899        }
900    }
901}
902
903impl Error for PlanError {}
904
905impl From<CatalogError> for PlanError {
906    fn from(e: CatalogError) -> PlanError {
907        PlanError::Catalog(e)
908    }
909}
910
911impl From<strconv::ParseError> for PlanError {
912    fn from(e: strconv::ParseError) -> PlanError {
913        PlanError::StrconvParse(e)
914    }
915}
916
917impl From<RecursionLimitError> for PlanError {
918    fn from(e: RecursionLimitError) -> PlanError {
919        PlanError::RecursionLimit(e)
920    }
921}
922
923impl From<InvalidNumericMaxScaleError> for PlanError {
924    fn from(e: InvalidNumericMaxScaleError) -> PlanError {
925        PlanError::InvalidNumericMaxScale(e)
926    }
927}
928
929impl From<InvalidCharLengthError> for PlanError {
930    fn from(e: InvalidCharLengthError) -> PlanError {
931        PlanError::InvalidCharLength(e)
932    }
933}
934
935impl From<InvalidVarCharMaxLengthError> for PlanError {
936    fn from(e: InvalidVarCharMaxLengthError) -> PlanError {
937        PlanError::InvalidVarCharMaxLength(e)
938    }
939}
940
941impl From<InvalidTimestampPrecisionError> for PlanError {
942    fn from(e: InvalidTimestampPrecisionError) -> PlanError {
943        PlanError::InvalidTimestampPrecision(e)
944    }
945}
946
947impl From<anyhow::Error> for PlanError {
948    fn from(e: anyhow::Error) -> PlanError {
949        // WIP: Do we maybe want to keep the alternate selector for these?
950        sql_err!("{}", e.display_with_causes())
951    }
952}
953
954impl From<TryFromIntError> for PlanError {
955    fn from(e: TryFromIntError) -> PlanError {
956        sql_err!("{}", e.display_with_causes())
957    }
958}
959
960impl From<ParseIntError> for PlanError {
961    fn from(e: ParseIntError) -> PlanError {
962        sql_err!("{}", e.display_with_causes())
963    }
964}
965
966impl From<EvalError> for PlanError {
967    fn from(e: EvalError) -> PlanError {
968        sql_err!("{}", e.display_with_causes())
969    }
970}
971
972impl From<ParserError> for PlanError {
973    fn from(e: ParserError) -> PlanError {
974        PlanError::Parser(e)
975    }
976}
977
978impl From<ParserStatementError> for PlanError {
979    fn from(e: ParserStatementError) -> PlanError {
980        PlanError::ParserStatement(e)
981    }
982}
983
984impl From<PostgresError> for PlanError {
985    fn from(e: PostgresError) -> PlanError {
986        PlanError::PostgresConnectionErr { cause: Arc::new(e) }
987    }
988}
989
990impl From<MySqlError> for PlanError {
991    fn from(e: MySqlError) -> PlanError {
992        PlanError::MySqlConnectionErr { cause: Arc::new(e) }
993    }
994}
995
996impl From<SqlServerError> for PlanError {
997    fn from(e: SqlServerError) -> PlanError {
998        PlanError::SqlServerConnectionErr { cause: Arc::new(e) }
999    }
1000}
1001
1002impl From<VarError> for PlanError {
1003    fn from(e: VarError) -> Self {
1004        PlanError::VarError(e)
1005    }
1006}
1007
1008impl From<PgSourcePurificationError> for PlanError {
1009    fn from(e: PgSourcePurificationError) -> Self {
1010        PlanError::PgSourcePurification(e)
1011    }
1012}
1013
1014impl From<KafkaSourcePurificationError> for PlanError {
1015    fn from(e: KafkaSourcePurificationError) -> Self {
1016        PlanError::KafkaSourcePurification(e)
1017    }
1018}
1019
1020impl From<KafkaSinkPurificationError> for PlanError {
1021    fn from(e: KafkaSinkPurificationError) -> Self {
1022        PlanError::KafkaSinkPurification(e)
1023    }
1024}
1025
1026impl From<IcebergSinkPurificationError> for PlanError {
1027    fn from(e: IcebergSinkPurificationError) -> Self {
1028        PlanError::IcebergSinkPurification(e)
1029    }
1030}
1031
1032impl From<CsrPurificationError> for PlanError {
1033    fn from(e: CsrPurificationError) -> Self {
1034        PlanError::CsrPurification(e)
1035    }
1036}
1037
1038impl From<LoadGeneratorSourcePurificationError> for PlanError {
1039    fn from(e: LoadGeneratorSourcePurificationError) -> Self {
1040        PlanError::LoadGeneratorSourcePurification(e)
1041    }
1042}
1043
1044impl From<MySqlSourcePurificationError> for PlanError {
1045    fn from(e: MySqlSourcePurificationError) -> Self {
1046        PlanError::MySqlSourcePurification(e)
1047    }
1048}
1049
1050impl From<SqlServerSourcePurificationError> for PlanError {
1051    fn from(e: SqlServerSourcePurificationError) -> Self {
1052        PlanError::SqlServerSourcePurificationError(e)
1053    }
1054}
1055
1056impl From<IdentError> for PlanError {
1057    fn from(e: IdentError) -> Self {
1058        PlanError::InvalidIdent(e)
1059    }
1060}
1061
1062impl From<ExternalReferenceResolutionError> for PlanError {
1063    fn from(e: ExternalReferenceResolutionError) -> Self {
1064        PlanError::SubsourceResolutionError(e)
1065    }
1066}
1067
1068struct ColumnDisplay<'a> {
1069    table: &'a Option<PartialItemName>,
1070    column: &'a ColumnName,
1071}
1072
1073impl<'a> fmt::Display for ColumnDisplay<'a> {
1074    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1075        if let Some(table) = &self.table {
1076            format!("{}.{}", table.item, self.column).quoted().fmt(f)
1077        } else {
1078            self.column.quoted().fmt(f)
1079        }
1080    }
1081}