mz_sql/plan/
error.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11use std::error::Error;
12use std::num::{ParseIntError, TryFromIntError};
13use std::sync::Arc;
14use std::time::Duration;
15use std::{fmt, io};
16
17use itertools::Itertools;
18use mz_expr::EvalError;
19use mz_mysql_util::MySqlError;
20use mz_ore::error::ErrorExt;
21use mz_ore::stack::RecursionLimitError;
22use mz_ore::str::{StrExt, separated};
23use mz_postgres_util::PostgresError;
24use mz_repr::adt::char::InvalidCharLengthError;
25use mz_repr::adt::mz_acl_item::AclMode;
26use mz_repr::adt::numeric::InvalidNumericMaxScaleError;
27use mz_repr::adt::timestamp::InvalidTimestampPrecisionError;
28use mz_repr::adt::varchar::InvalidVarCharMaxLengthError;
29use mz_repr::{CatalogItemId, ColumnName, strconv};
30use mz_sql_parser::ast::display::AstDisplay;
31use mz_sql_parser::ast::{IdentError, UnresolvedItemName};
32use mz_sql_parser::parser::{ParserError, ParserStatementError};
33use mz_sql_server_util::SqlServerError;
34use mz_storage_types::sources::ExternalReferenceResolutionError;
35
36use crate::catalog::{
37    CatalogError, CatalogItemType, ErrorMessageObjectDescription, SystemObjectType,
38};
39use crate::names::{PartialItemName, ResolvedItemName};
40use crate::plan::ObjectType;
41use crate::plan::plan_utils::JoinSide;
42use crate::plan::scope::ScopeItem;
43use crate::plan::typeconv::CastContext;
44use crate::pure::error::{
45    CsrPurificationError, IcebergSinkPurificationError, KafkaSinkPurificationError,
46    KafkaSourcePurificationError, LoadGeneratorSourcePurificationError,
47    MySqlSourcePurificationError, PgSourcePurificationError, SqlServerSourcePurificationError,
48};
49use crate::session::vars::VarError;
50
/// Error type for the `plan` module.
///
/// The `Display` impl supplies the primary message; [`PlanError::detail`] and
/// [`PlanError::hint`] return optional supplementary text for some variants.
51#[derive(Debug)]
52pub enum PlanError {
53    /// This feature is not yet supported, but may be supported at some point in the future.
54    Unsupported {
55        feature: String,
    /// When set, the message links to the GitHub discussion with this number.
56        discussion_no: Option<usize>,
57    },
58    /// This feature is not supported, and will likely never be supported.
59    NeverSupported {
60        feature: String,
    /// Path below `https://materialize.com/docs/` that the message points to, if any.
61        documentation_link: Option<String>,
    /// Returned as-is by `detail()`.
62        details: Option<String>,
63    },
    /// A referenced column does not exist.
64    UnknownColumn {
65        table: Option<PartialItemName>,
66        column: ColumnName,
    /// Similarly named columns that do exist; listed by `hint()`.
67        similar: Box<[ColumnName]>,
68    },
    /// The column must appear in the GROUP BY clause or be used in an aggregate function.
69    UngroupedColumn {
70        table: Option<PartialItemName>,
71        column: ColumnName,
72    },
    /// A LATERAL column reference was combined with a JOIN type other than INNER or LEFT.
73    WrongJoinTypeForLateralColumn {
74        table: Option<PartialItemName>,
75        column: ColumnName,
76    },
    /// The column reference is ambiguous.
77    AmbiguousColumn(ColumnName),
    /// Attempted to create a relation with `req_num_columns` columns, exceeding `max_num_columns`.
78    TooManyColumns {
79        max_num_columns: usize,
80        req_num_columns: usize,
81    },
    /// Column `column_name` of relation `object_name` already exists.
82    ColumnAlreadyExists {
83        column_name: ColumnName,
84        object_name: String,
85    },
    /// The table reference is ambiguous.
86    AmbiguousTable(PartialItemName),
    /// A column named in a USING clause does not exist in the table on `join_side`.
87    UnknownColumnInUsingClause {
88        column: ColumnName,
89        join_side: JoinSide,
90    },
    /// A common column name in a USING clause appears more than once in the table on `join_side`.
91    AmbiguousColumnInUsingClause {
92        column: ColumnName,
93        join_side: JoinSide,
94    },
    /// A qualified name did not have between 1 and 3 components.
95    MisqualifiedName(String),
    /// A database name did not have exactly one component.
96    OverqualifiedDatabaseName(String),
    /// A schema name had more than two components.
97    OverqualifiedSchemaName(String),
    /// A column name lacked at least a table qualification.
98    UnderqualifiedColumnName(String),
    /// `context` does not allow subqueries.
99    SubqueriesDisallowed {
100        context: String,
101    },
    /// There is no parameter with this number (`$n`).
102    UnknownParameter(usize),
    /// The named kind of object cannot have parameters.
103    ParameterNotAllowed(String),
    /// A parameter could not be cast: (parameter number, expected type, actual type).
104    WrongParameterType(usize, String, String),
105    RecursionLimit(RecursionLimitError),
106    StrconvParse(strconv::ParseError),
107    Catalog(CatalogError),
    /// Upsert sinks must specify a key.
108    UpsertSinkWithoutKey,
    /// The upsert key `desired_key` could not be validated as unique for relation `name`.
    /// `detail()` lists `valid_keys`, the keys known to be unique.
109    UpsertSinkWithInvalidKey {
110        name: String,
111        desired_key: Vec<String>,
112        valid_keys: Vec<Vec<String>>,
113    },
    /// Invalid WITH MUTUALLY RECURSIVE recursion limit; the payload is appended to the message.
114    InvalidWmrRecursionLimit(String),
115    InvalidNumericMaxScale(InvalidNumericMaxScaleError),
116    InvalidCharLength(InvalidCharLengthError),
117    InvalidId(CatalogItemId),
118    InvalidIdent(IdentError),
    /// The named item is not a database object.
119    InvalidObject(Box<ResolvedItemName>),
    /// `object_name` is of type `actual_type`, not `expected_type`.
120    InvalidObjectType {
121        expected_type: SystemObjectType,
122        actual_type: SystemObjectType,
123        object_name: String,
124    },
    /// The given privilege types are invalid for the described object.
125    InvalidPrivilegeTypes {
126        invalid_privileges: AclMode,
127        object_description: ErrorMessageObjectDescription,
128    },
129    InvalidVarCharMaxLength(InvalidVarCharMaxLengthError),
130    InvalidTimestampPrecision(InvalidTimestampPrecisionError),
    /// The named item is not a secret.
131    InvalidSecret(Box<ResolvedItemName>),
    /// Cannot create a temporary item in a non-temporary schema.
132    InvalidTemporarySchema,
    /// `name` does not support casting from `from` to `to` in cast context `ccx`.
133    InvalidCast {
134        name: String,
135        ccx: CastContext,
136        from: String,
137        to: String,
138    },
    /// Invalid table definition for `name`.
139    InvalidTable {
140        name: String,
141    },
    /// Invalid version `version` for `name`.
142    InvalidVersion {
143        name: String,
144        version: String,
145    },
    // NOTE(review): the variant name looks like a typo for `ManagedReplicaName`;
    // renaming it would change the public API, so it is left as-is here.
146    MangedReplicaName(String),
147    ParserStatement(ParserStatementError),
148    Parser(ParserError),
    /// DROP VIEW named something that is not a view; `hint()` suggests DROP MATERIALIZED VIEW.
149    DropViewOnMaterializedView(String),
    /// The object cannot be dropped because other objects still depend on it.
150    DependentObjectsStillExist {
151        object_type: String,
152        object_name: String,
153        // (dependent type, name)
154        dependents: Vec<(String, String)>,
155    },
    /// ALTER VIEW named something that is not a view; `hint()` suggests ALTER MATERIALIZED VIEW.
156    AlterViewOnMaterializedView(String),
    /// SHOW CREATE VIEW named something that is not a view; `hint()` suggests
    /// SHOW CREATE MATERIALIZED VIEW.
157    ShowCreateViewOnMaterializedView(String),
    /// EXPLAIN VIEW named something that is not a view; `hint()` suggests
    /// EXPLAIN [...] MATERIALIZED VIEW.
158    ExplainViewOnMaterializedView(String),
    /// Unacceptable timeline name; `hint()` notes that the `mz_` prefix is reserved
    /// for system timelines.
159    UnacceptableTimelineName(String),
    /// Fetching schema `schema_lookup` from the schema registry failed; `detail()` reports `cause`.
160    FetchingCsrSchemaFailed {
161        schema_lookup: String,
162        cause: Arc<dyn Error + Send + Sync>,
163    },
    /// Failed to connect to a PostgreSQL database; `detail()` reports `cause`.
164    PostgresConnectionErr {
165        cause: Arc<mz_postgres_util::PostgresError>,
166    },
    /// Failed to connect to a MySQL database; the message includes `cause`.
167    MySqlConnectionErr {
168        cause: Arc<MySqlError>,
169    },
    /// Failed to connect to a SQL Server database; the message includes `cause`.
170    SqlServerConnectionErr {
171        cause: Arc<SqlServerError>,
172    },
    /// Multiple subsources would be named `name`; `detail()` lists `upstream_references`.
173    SubsourceNameConflict {
174        name: UnresolvedItemName,
175        upstream_references: Vec<UnresolvedItemName>,
176    },
    /// Multiple subsources refer to table `name`; `detail()` lists `target_names`.
177    SubsourceDuplicateReference {
178        name: UnresolvedItemName,
179        target_names: Vec<UnresolvedItemName>,
180    },
    /// No tables were found in the referenced schemas (the payload lists them).
181    NoTablesFoundForSchemas(Vec<String>),
    /// Invalid protobuf schema; `detail()` reports `cause`.
182    InvalidProtobufSchema {
183        cause: protobuf_native::OperationFailedError,
184    },
    /// Invalid value for option `option_name`; `detail()` and `hint()` defer to `err`.
185    InvalidOptionValue {
186        // Expected to be generated from the `to_ast_string` value on the option
187        // name.
188        option_name: String,
189        err: Box<PlanError>,
190    },
    /// Unexpected multiple references to `name`.
191    UnexpectedDuplicateReference {
192        name: UnresolvedItemName,
193    },
194    /// Declaration of a recursive type did not match the inferred type.
    ///
    /// Fields are (query name, declared types, inferred types).
195    RecursiveTypeMismatch(String, Vec<String>, Vec<String>),
    /// No function matches the given name and argument types.
196    UnknownFunction {
197        name: String,
198        arg_types: Vec<String>,
199    },
    /// The function call is not unique: no best candidate could be chosen.
200    IndistinctFunction {
201        name: String,
202        arg_types: Vec<String>,
203    },
    /// No operator matches the given name and argument types.
    ///
    /// The `Display` impl handles exactly one or two `arg_types` and treats any
    /// other count as unreachable.
204    UnknownOperator {
205        name: String,
206        arg_types: Vec<String>,
207    },
    /// The operator is not unique: no best candidate could be chosen.
    ///
    /// The `Display` impl handles exactly one or two `arg_types` and treats any
    /// other count as unreachable.
208    IndistinctOperator {
209        name: String,
210        arg_types: Vec<String>,
211    },
    /// Invalid AWS PrivateLink availability zone `name`; `hint()` lists `supported_azs`.
212    InvalidPrivatelinkAvailabilityZone {
213        name: String,
214        supported_azs: BTreeSet<String>,
215    },
    /// A connection cannot contain duplicate availability zones; `hint()` lists `duplicate_azs`.
216    DuplicatePrivatelinkAvailabilityZone {
217        duplicate_azs: BTreeSet<String>,
218    },
    /// No valid schema is selected.
219    InvalidSchemaName,
    /// An item of type `item_type` named `name` already exists.
220    ItemAlreadyExists {
221        name: String,
222        item_type: CatalogItemType,
223    },
    /// Cannot modify the managed cluster `cluster_name`.
224    ManagedCluster {
225        cluster_name: String,
226    },
    /// Invalid keys in SUBSCRIBE ENVELOPE UPSERT (KEY (..)).
227    InvalidKeysInSubscribeEnvelopeUpsert,
    /// Invalid keys in SUBSCRIBE ENVELOPE DEBEZIUM (KEY (..)).
228    InvalidKeysInSubscribeEnvelopeDebezium,
    /// With ENVELOPE DEBEZIUM, only key columns may be referenced in the PARTITION BY expression.
229    InvalidPartitionByEnvelopeDebezium {
230        column_name: String,
231    },
232    InvalidOrderByInSubscribeWithinTimestampOrderBy,
233    FromValueRequiresParen,
234    VarError(VarError),
235    UnsolvablePolymorphicFunctionInput,
236    ShowCommandInView,
237    WebhookValidationDoesNotUseColumns,
238    WebhookValidationNonDeterministic,
    /// The function is for internal use of the database system and cannot be called directly.
239    InternalFunctionCall,
240    CommentTooLong {
241        length: usize,
242        max_size: usize,
243    },
244    InvalidTimestampInterval {
245        min: Duration,
246        max: Duration,
247        requested: Duration,
248    },
249    InvalidGroupSizeHints,
250    PgSourcePurification(PgSourcePurificationError),
251    KafkaSourcePurification(KafkaSourcePurificationError),
252    KafkaSinkPurification(KafkaSinkPurificationError),
253    IcebergSinkPurification(IcebergSinkPurificationError),
254    LoadGeneratorSourcePurification(LoadGeneratorSourcePurificationError),
255    CsrPurification(CsrPurificationError),
256    MySqlSourcePurification(MySqlSourcePurificationError),
257    SqlServerSourcePurificationError(SqlServerSourcePurificationError),
258    UseTablesForSources(String),
259    MissingName(CatalogItemType),
260    InvalidRefreshAt,
261    InvalidRefreshEveryAlignedTo,
    /// `detail()` reports the current replica count (noting internal replicas, if any)
    /// and the count the command would result in.
262    CreateReplicaFailStorageObjects {
263        /// The current number of replicas on the cluster
264        current_replica_count: usize,
265        /// The number of internal replicas on the cluster
266        internal_replica_count: usize,
267        /// The number of replicas that executing this command would have
268        /// created
269        hypothetical_replica_count: usize,
270    },
271    MismatchedObjectType {
272        name: PartialItemName,
273        is_type: ObjectType,
274        expected_type: ObjectType,
275    },
276    /// MZ failed to generate cast for the data type.
    ///
    /// `hint()` suggests removing the table or ingesting `column` via TEXT COLUMNS.
277    TableContainsUningestableTypes {
278        name: String,
279        type_: String,
280        column: String,
281    },
282    RetainHistoryLow {
283        limit: Duration,
284    },
285    RetainHistoryRequired,
286    UntilReadyTimeoutRequired,
287    SubsourceResolutionError(ExternalReferenceResolutionError),
288    Replan(String),
289    NetworkPolicyLockoutError,
290    NetworkPolicyInUse,
291    /// Expected a constant expression that evaluates without an error to a non-null value.
292    ConstantExpressionSimplificationFailed(String),
293    InvalidOffset(String),
294    /// The named cursor does not exist.
295    UnknownCursor(String),
296    CopyFromTargetTableDropped {
297        target_name: String,
298    },
299    /// AS OF or UP TO should be an expression that is castable and simplifiable to a non-null mz_timestamp value.
300    InvalidAsOfUpTo,
301    // TODO(benesch): eventually all errors should be structured.
    /// Free-form error message, displayed as-is.
302    Unstructured(String),
303}
304
305impl PlanError {
306    pub(crate) fn ungrouped_column(item: &ScopeItem) -> PlanError {
307        PlanError::UngroupedColumn {
308            table: item.table_name.clone(),
309            column: item.column_name.clone(),
310        }
311    }
312
313    pub fn detail(&self) -> Option<String> {
314        match self {
315            Self::NeverSupported { details, .. } => details.clone(),
316            Self::FetchingCsrSchemaFailed { cause, .. } => Some(cause.to_string_with_causes()),
317            Self::PostgresConnectionErr { cause } => Some(cause.to_string_with_causes()),
318            Self::InvalidProtobufSchema { cause } => Some(cause.to_string_with_causes()),
319            Self::InvalidOptionValue { err, .. } => err.detail(),
320            Self::UpsertSinkWithInvalidKey {
321                name,
322                desired_key,
323                valid_keys,
324            } => {
325                let valid_keys = if valid_keys.is_empty() {
326                    "There are no known valid unique keys for the underlying relation.".into()
327                } else {
328                    format!(
329                        "The following keys are known to be unique for the underlying relation:\n{}",
330                        valid_keys
331                            .iter()
332                            .map(|k|
333                                format!("  ({})", k.iter().map(|c| c.as_str().quoted()).join(", "))
334                            )
335                            .join("\n"),
336                    )
337                };
338                Some(format!(
339                    "Materialize could not prove that the specified upsert envelope key ({}) \
340                    was a unique key of the underlying relation {}. {valid_keys}",
341                    separated(", ", desired_key.iter().map(|c| c.as_str().quoted())),
342                    name.quoted()
343                ))
344            }
345            Self::VarError(e) => e.detail(),
346            Self::InternalFunctionCall => Some("This function is for the internal use of the database system and cannot be called directly.".into()),
347            Self::PgSourcePurification(e) => e.detail(),
348            Self::MySqlSourcePurification(e) => e.detail(),
349            Self::SqlServerSourcePurificationError(e) => e.detail(),
350            Self::KafkaSourcePurification(e) => e.detail(),
351            Self::LoadGeneratorSourcePurification(e) => e.detail(),
352            Self::CsrPurification(e) => e.detail(),
353            Self::KafkaSinkPurification(e) => e.detail(),
354            Self::IcebergSinkPurification(e) => e.detail(),
355            Self::CreateReplicaFailStorageObjects { current_replica_count: current, internal_replica_count: internal, hypothetical_replica_count: target } => {
356                Some(format!(
357                    "Currently have {} replica{}{}; command would result in {}",
358                    current,
359                    if *current != 1 { "s" } else { "" },
360                    if *internal > 0 {
361                        format!(" ({} internal)", internal)
362                    } else {
363                        "".to_string()
364                    },
365                    target
366                ))
367            },
368            Self::SubsourceNameConflict {
369                name: _,
370                upstream_references,
371            } => Some(format!(
372                "referenced tables with duplicate name: {}",
373                itertools::join(upstream_references, ", ")
374            )),
375            Self::SubsourceDuplicateReference {
376                name: _,
377                target_names,
378            } => Some(format!(
379                "subsources referencing table: {}",
380                itertools::join(target_names, ", ")
381            )),
382            Self::InvalidPartitionByEnvelopeDebezium { .. } => Some(
383                "When using ENVELOPE DEBEZIUM, only columns in the key can be referenced in the PARTITION BY expression.".to_string()
384            ),
385            Self::NoTablesFoundForSchemas(schemas) => Some(format!(
386                "missing schemas: {}",
387                separated(", ", schemas.iter().map(|c| c.quoted()))
388            )),
389            _ => None,
390        }
391    }
392
393    pub fn hint(&self) -> Option<String> {
394        match self {
395            Self::DropViewOnMaterializedView(_) => {
396                Some("Use DROP MATERIALIZED VIEW to remove a materialized view.".into())
397            }
398            Self::DependentObjectsStillExist {..} => Some("Use DROP ... CASCADE to drop the dependent objects too.".into()),
399            Self::AlterViewOnMaterializedView(_) => {
400                Some("Use ALTER MATERIALIZED VIEW to rename a materialized view.".into())
401            }
402            Self::ShowCreateViewOnMaterializedView(_) => {
403                Some("Use SHOW CREATE MATERIALIZED VIEW to show a materialized view.".into())
404            }
405            Self::ExplainViewOnMaterializedView(_) => {
406                Some("Use EXPLAIN [...] MATERIALIZED VIEW to explain a materialized view.".into())
407            }
408            Self::UnacceptableTimelineName(_) => {
409                Some("The prefix \"mz_\" is reserved for system timelines.".into())
410            }
411            Self::PostgresConnectionErr { cause } => {
412                if let Some(cause) = cause.source() {
413                    if let Some(cause) = cause.downcast_ref::<io::Error>() {
414                        if cause.kind() == io::ErrorKind::TimedOut {
415                            return Some(
416                                "Do you have a firewall or security group that is \
417                                preventing Materialize from connecting to your PostgreSQL server?"
418                                    .into(),
419                            );
420                        }
421                    }
422                }
423                None
424            }
425            Self::InvalidOptionValue { err, .. } => err.hint(),
426            Self::UnknownFunction { ..} => Some("No function matches the given name and argument types.  You might need to add explicit type casts.".into()),
427            Self::IndistinctFunction {..} => {
428                Some("Could not choose a best candidate function.  You might need to add explicit type casts.".into())
429            }
430            Self::UnknownOperator {..} => {
431                Some("No operator matches the given name and argument types.  You might need to add explicit type casts.".into())
432            }
433            Self::IndistinctOperator {..} => {
434                Some("Could not choose a best candidate operator.  You might need to add explicit type casts.".into())
435            },
436            Self::InvalidPrivatelinkAvailabilityZone { supported_azs, ..} => {
437                let supported_azs_str = supported_azs.iter().join("\n  ");
438                Some(format!("Did you supply an availability zone name instead of an ID? Known availability zone IDs:\n  {}", supported_azs_str))
439            }
440            Self::DuplicatePrivatelinkAvailabilityZone { duplicate_azs, ..} => {
441                let duplicate_azs  = duplicate_azs.iter().join("\n  ");
442                Some(format!("Duplicated availability zones:\n  {}", duplicate_azs))
443            }
444            Self::InvalidKeysInSubscribeEnvelopeUpsert => {
445                Some("All keys must be columns on the underlying relation.".into())
446            }
447            Self::InvalidKeysInSubscribeEnvelopeDebezium => {
448                Some("All keys must be columns on the underlying relation.".into())
449            }
450            Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
451                Some("All order bys must be output columns.".into())
452            }
453            Self::UpsertSinkWithInvalidKey { .. } | Self::UpsertSinkWithoutKey => {
454                Some("See: https://materialize.com/s/sink-key-selection".into())
455            }
456            Self::Catalog(e) => e.hint(),
457            Self::VarError(e) => e.hint(),
458            Self::PgSourcePurification(e) => e.hint(),
459            Self::MySqlSourcePurification(e) => e.hint(),
460            Self::SqlServerSourcePurificationError(e) => e.hint(),
461            Self::KafkaSourcePurification(e) => e.hint(),
462            Self::LoadGeneratorSourcePurification(e) => e.hint(),
463            Self::CsrPurification(e) => e.hint(),
464            Self::KafkaSinkPurification(e) => e.hint(),
465            Self::UnknownColumn { table, similar, .. } => {
466                let suffix = "Make sure to surround case sensitive names in double quotes.";
467                match &similar[..] {
468                    [] => None,
469                    [column] => Some(format!("The similarly named column {} does exist. {suffix}", ColumnDisplay { table, column })),
470                    names => {
471                        let similar = names.into_iter().map(|column| ColumnDisplay { table, column }).join(", ");
472                        Some(format!("There are similarly named columns that do exist: {similar}. {suffix}"))
473                    }
474                }
475            }
476            Self::RecursiveTypeMismatch(..) => {
477                Some("You will need to rewrite or cast the query's expressions.".into())
478            },
479            Self::InvalidRefreshAt
480            | Self::InvalidRefreshEveryAlignedTo => {
481                Some("Calling `mz_now()` is allowed.".into())
482            },
483            Self::TableContainsUningestableTypes { column,.. } => {
484                Some(format!("Remove the table or use TEXT COLUMNS ({column}, ..) to ingest this column as text"))
485            }
486            Self::RetainHistoryLow { .. } | Self::RetainHistoryRequired => {
487                Some("Use ALTER ... RESET (RETAIN HISTORY) to set the retain history to its default and lowest value.".into())
488            }
489            Self::NetworkPolicyInUse => {
490                Some("Use ALTER SYSTEM SET 'network_policy' to change the default network policy.".into())
491            }
492            Self::WrongParameterType(_, _, _) => {
493                Some("EXECUTE automatically inserts only such casts that are allowed in an assignment cast context.  Try adding an explicit cast.".into())
494            }
495            Self::InvalidSchemaName => {
496                Some("Use SET schema = name to select a schema.  Use SHOW SCHEMAS to list available schemas.  Use SHOW search_path to show the schema names that we looked for, but none of them existed.".into())
497            }
498            _ => None,
499        }
500    }
501}
502
503impl fmt::Display for PlanError {
504    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
505        match self {
506            Self::Unsupported { feature, discussion_no } => {
507                write!(f, "{} not yet supported", feature)?;
508                if let Some(discussion_no) = discussion_no {
509                    write!(f, ", see https://github.com/MaterializeInc/materialize/discussions/{} for more details", discussion_no)?;
510                }
511                Ok(())
512            }
513            Self::NeverSupported { feature, documentation_link: documentation_path,.. } => {
514                write!(f, "{feature} is not supported",)?;
515                if let Some(documentation_path) = documentation_path {
516                    write!(f, ", for more information consult the documentation at https://materialize.com/docs/{documentation_path}")?;
517                }
518                Ok(())
519            }
520            Self::UnknownColumn { table, column, similar: _ } => write!(
521                f,
522                "column {} does not exist",
523                ColumnDisplay { table, column }
524            ),
525            Self::UngroupedColumn { table, column } => write!(
526                f,
527                "column {} must appear in the GROUP BY clause or be used in an aggregate function",
528                ColumnDisplay { table, column },
529            ),
530            Self::WrongJoinTypeForLateralColumn { table, column } => write!(
531                f,
532                "column {} cannot be referenced from this part of the query: \
533                the combining JOIN type must be INNER or LEFT for a LATERAL reference",
534                ColumnDisplay { table, column },
535            ),
536            Self::AmbiguousColumn(column) => write!(
537                f,
538                "column reference {} is ambiguous",
539                column.quoted()
540            ),
541            Self::TooManyColumns { max_num_columns, req_num_columns } => write!(
542                f,
543                "attempt to create relation with too many columns, {} max: {}",
544                req_num_columns, max_num_columns
545            ),
546            Self::ColumnAlreadyExists { column_name, object_name } => write!(
547                f,
548                "column {} of relation {} already exists",
549                column_name.quoted(), object_name.quoted(),
550            ),
551            Self::AmbiguousTable(table) => write!(
552                f,
553                "table reference {} is ambiguous",
554                table.item.as_str().quoted()
555            ),
556            Self::UnknownColumnInUsingClause { column, join_side } => write!(
557                f,
558                "column {} specified in USING clause does not exist in {} table",
559                column.quoted(),
560                join_side,
561            ),
562            Self::AmbiguousColumnInUsingClause { column, join_side } => write!(
563                f,
564                "common column name {} appears more than once in {} table",
565                column.quoted(),
566                join_side,
567            ),
568            Self::MisqualifiedName(name) => write!(
569                f,
570                "qualified name did not have between 1 and 3 components: {}",
571                name
572            ),
573            Self::OverqualifiedDatabaseName(name) => write!(
574                f,
575                "database name '{}' does not have exactly one component",
576                name
577            ),
578            Self::OverqualifiedSchemaName(name) => write!(
579                f,
580                "schema name '{}' cannot have more than two components",
581                name
582            ),
583            Self::UnderqualifiedColumnName(name) => write!(
584                f,
585                "column name '{}' must have at least a table qualification",
586                name
587            ),
588            Self::UnacceptableTimelineName(name) => {
589                write!(f, "unacceptable timeline name {}", name.quoted(),)
590            }
591            Self::SubqueriesDisallowed { context } => {
592                write!(f, "{} does not allow subqueries", context)
593            }
594            Self::UnknownParameter(n) => write!(f, "there is no parameter ${}", n),
595            Self::ParameterNotAllowed(object_type) => write!(f, "{} cannot have parameters", object_type),
596            Self::WrongParameterType(i, expected_ty, actual_ty) => write!(f, "unable to cast given parameter ${}: expected {}, got {}", i, expected_ty, actual_ty),
597            Self::RecursionLimit(e) => write!(f, "{}", e),
598            Self::StrconvParse(e) => write!(f, "{}", e),
599            Self::Catalog(e) => write!(f, "{}", e),
600            Self::UpsertSinkWithoutKey => write!(f, "upsert sinks must specify a key"),
601            Self::UpsertSinkWithInvalidKey { .. } => {
602                write!(f, "upsert key could not be validated as unique")
603            }
604            Self::InvalidWmrRecursionLimit(msg) => write!(f, "Invalid WITH MUTUALLY RECURSIVE recursion limit. {}", msg),
605            Self::InvalidNumericMaxScale(e) => e.fmt(f),
606            Self::InvalidCharLength(e) => e.fmt(f),
607            Self::InvalidVarCharMaxLength(e) => e.fmt(f),
608            Self::InvalidTimestampPrecision(e) => e.fmt(f),
609            Self::Parser(e) => e.fmt(f),
610            Self::ParserStatement(e) => e.fmt(f),
611            Self::Unstructured(e) => write!(f, "{}", e),
612            Self::InvalidId(id) => write!(f, "invalid id {}", id),
613            Self::InvalidIdent(err) => write!(f, "invalid identifier, {err}"),
614            Self::InvalidObject(i) => write!(f, "{} is not a database object", i.full_name_str()),
615            Self::InvalidObjectType{expected_type, actual_type, object_name} => write!(f, "{actual_type} {object_name} is not a {expected_type}"),
616            Self::InvalidPrivilegeTypes{ invalid_privileges, object_description, } => {
617                write!(f, "invalid privilege types {} for {}", invalid_privileges.to_error_string(), object_description)
618            },
619            Self::InvalidSecret(i) => write!(f, "{} is not a secret", i.full_name_str()),
620            Self::InvalidTemporarySchema => {
621                write!(f, "cannot create temporary item in non-temporary schema")
622            }
623            Self::InvalidCast { name, ccx, from, to } =>{
624                write!(
625                    f,
626                    "{name} does not support {ccx}casting from {from} to {to}",
627                    ccx = if matches!(ccx, CastContext::Implicit) {
628                        "implicitly "
629                    } else {
630                        ""
631                    },
632                )
633            }
634            Self::InvalidTable { name } => {
635                write!(f, "invalid table definition for {}", name.quoted())
636            },
637            Self::InvalidVersion { name, version } => {
638                write!(f, "invalid version {} for {}", version.quoted(), name.quoted())
639            },
640            Self::DropViewOnMaterializedView(name)
641            | Self::AlterViewOnMaterializedView(name)
642            | Self::ShowCreateViewOnMaterializedView(name)
643            | Self::ExplainViewOnMaterializedView(name) => write!(f, "{name} is not a view"),
644            Self::FetchingCsrSchemaFailed { schema_lookup, .. } => {
645                write!(f, "failed to fetch schema {schema_lookup} from schema registry")
646            }
647            Self::PostgresConnectionErr { .. } => {
648                write!(f, "failed to connect to PostgreSQL database")
649            }
650            Self::MySqlConnectionErr { cause } => {
651                write!(f, "failed to connect to MySQL database: {}", cause)
652            }
653            Self::SqlServerConnectionErr { cause } => {
654                write!(f, "failed to connect to SQL Server database: {}", cause)
655            }
656            Self::SubsourceNameConflict {
657                name , upstream_references: _,
658            } => {
659                write!(f, "multiple subsources would be named {}", name)
660            },
661            Self::SubsourceDuplicateReference {
662                name,
663                target_names: _,
664            } => {
665                write!(f, "multiple subsources refer to table {}", name)
666            },
667            Self::NoTablesFoundForSchemas(schemas) => {
668                write!(f, "no tables found in referenced schemas: {}",
669                    separated(", ", schemas.iter().map(|c| c.quoted()))
670                )
671            },
672            Self::InvalidProtobufSchema { .. } => {
673                write!(f, "invalid protobuf schema")
674            }
675            Self::DependentObjectsStillExist {object_type, object_name, dependents} => {
676                let reason = match &dependents[..] {
677                    [] => " because other objects depend on it".to_string(),
678                    dependents => {
679                        let dependents = dependents.iter().map(|(dependent_type, dependent_name)| format!("{} {}", dependent_type, dependent_name.quoted())).join(", ");
680                        format!(": still depended upon by {dependents}")
681                    },
682                };
683                let object_name = object_name.quoted();
684                write!(f, "cannot drop {object_type} {object_name}{reason}")
685            }
686            Self::InvalidOptionValue { option_name, err } => write!(f, "invalid {} option value: {}", option_name, err),
687            Self::UnexpectedDuplicateReference { name } => write!(f, "unexpected multiple references to {}", name.to_ast_string_simple()),
688            Self::RecursiveTypeMismatch(name, declared, inferred) => {
689                let declared = separated(", ", declared);
690                let inferred = separated(", ", inferred);
691                let name = name.quoted();
692                write!(f, "WITH MUTUALLY RECURSIVE query {name} declared types ({declared}), but query returns types ({inferred})")
693            },
694            Self::UnknownFunction {name, arg_types, ..} => {
695                write!(f, "function {}({}) does not exist", name, arg_types.join(", "))
696            },
697            Self::IndistinctFunction {name, arg_types, ..} => {
698                write!(f, "function {}({}) is not unique", name, arg_types.join(", "))
699            },
700            Self::UnknownOperator {name, arg_types, ..} => {
701                write!(f, "operator does not exist: {}", match arg_types.as_slice(){
702                    [typ] => format!("{} {}", name, typ),
703                    [ltyp, rtyp] => {
704                        format!("{} {} {}", ltyp, name, rtyp)
705                    }
706                    _ => unreachable!("non-unary non-binary operator"),
707                })
708            },
709            Self::IndistinctOperator {name, arg_types, ..} => {
710                write!(f, "operator is not unique: {}", match arg_types.as_slice(){
711                    [typ] => format!("{} {}", name, typ),
712                    [ltyp, rtyp] => {
713                        format!("{} {} {}", ltyp, name, rtyp)
714                    }
715                    _ => unreachable!("non-unary non-binary operator"),
716                })
717            },
718            Self::InvalidPrivatelinkAvailabilityZone { name, ..} => write!(f, "invalid AWS PrivateLink availability zone {}", name.quoted()),
719            Self::DuplicatePrivatelinkAvailabilityZone {..} =>   write!(f, "connection cannot contain duplicate availability zones"),
720            Self::InvalidSchemaName => write!(f, "no valid schema selected"),
721            Self::ItemAlreadyExists { name, item_type } => write!(f, "{item_type} {} already exists", name.quoted()),
722            Self::ManagedCluster {cluster_name} => write!(f, "cannot modify managed cluster {cluster_name}"),
723            Self::InvalidKeysInSubscribeEnvelopeUpsert => {
724                write!(f, "invalid keys in SUBSCRIBE ENVELOPE UPSERT (KEY (..))")
725            }
726            Self::InvalidKeysInSubscribeEnvelopeDebezium => {
727                write!(f, "invalid keys in SUBSCRIBE ENVELOPE DEBEZIUM (KEY (..))")
728            }
729            Self::InvalidPartitionByEnvelopeDebezium { column_name } => {
730                write!(
731                    f,
732                    "PARTITION BY expression cannot refer to non-key column {}",
733                    column_name.quoted(),
734                )
735            }
736            Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
737                write!(f, "invalid ORDER BY in SUBSCRIBE WITHIN TIMESTAMP ORDER BY")
738            }
739            Self::FromValueRequiresParen => f.write_str(
740                "VALUES expression in FROM clause must be surrounded by parentheses"
741            ),
742            Self::VarError(e) => e.fmt(f),
743            Self::UnsolvablePolymorphicFunctionInput => f.write_str(
744                "could not determine polymorphic type because input has type unknown"
745            ),
746            Self::ShowCommandInView => f.write_str("SHOW commands are not allowed in views"),
747            Self::WebhookValidationDoesNotUseColumns => f.write_str(
748                "expression provided in CHECK does not reference any columns"
749            ),
750            Self::WebhookValidationNonDeterministic => f.write_str(
751                "expression provided in CHECK is not deterministic"
752            ),
753            Self::InternalFunctionCall => f.write_str("cannot call function with arguments of type internal"),
754            Self::CommentTooLong { length, max_size } => {
755                write!(f, "provided comment was {length} bytes long, max size is {max_size} bytes")
756            }
757            Self::InvalidTimestampInterval { min, max, requested } => {
758                write!(f, "invalid timestamp interval of {}ms, must be in the range [{}ms, {}ms]", requested.as_millis(), min.as_millis(), max.as_millis())
759            }
760            Self::InvalidGroupSizeHints => f.write_str("EXPECTED GROUP SIZE cannot be provided \
761                simultaneously with any of AGGREGATE INPUT GROUP SIZE, DISTINCT ON INPUT GROUP SIZE, \
762                or LIMIT INPUT GROUP SIZE"),
763            Self::PgSourcePurification(e) => write!(f, "POSTGRES source validation: {}", e),
764            Self::KafkaSourcePurification(e) => write!(f, "KAFKA source validation: {}", e),
765            Self::LoadGeneratorSourcePurification(e) => write!(f, "LOAD GENERATOR source validation: {}", e),
766            Self::KafkaSinkPurification(e) => write!(f, "KAFKA sink validation: {}", e),
767            Self::IcebergSinkPurification(e) => write!(f, "ICEBERG sink validation: {}", e),
768            Self::CsrPurification(e) => write!(f, "CONFLUENT SCHEMA REGISTRY validation: {}", e),
769            Self::MySqlSourcePurification(e) => write!(f, "MYSQL source validation: {}", e),
770            Self::SqlServerSourcePurificationError(e) => write!(f, "SQL SERVER source validation: {}", e),
771            Self::UseTablesForSources(command) => write!(f, "{command} not supported; use CREATE TABLE .. FROM SOURCE instead"),
772            Self::MangedReplicaName(name) => {
773                write!(f, "{name} is reserved for replicas of managed clusters")
774            }
775            Self::MissingName(item_type) => {
776                write!(f, "unspecified name for {item_type}")
777            }
778            Self::InvalidRefreshAt => {
779                write!(f, "REFRESH AT argument must be an expression that can be simplified \
780                           and/or cast to a constant whose type is mz_timestamp")
781            }
782            Self::InvalidRefreshEveryAlignedTo => {
783                write!(f, "REFRESH EVERY ... ALIGNED TO argument must be an expression that can be simplified \
784                           and/or cast to a constant whose type is mz_timestamp")
785            }
786            Self::CreateReplicaFailStorageObjects {..} => {
787                write!(f, "cannot create more than one replica of a cluster containing sources or sinks")
788            },
789            Self::MismatchedObjectType {
790                name,
791                is_type,
792                expected_type,
793            } => {
794                write!(
795                    f,
796                    "{name} is {} {} not {} {}",
797                    if *is_type == ObjectType::Index {
798                        "an"
799                    } else {
800                        "a"
801                    },
802                    is_type.to_string().to_lowercase(),
803                    if *expected_type == ObjectType::Index {
804                        "an"
805                    } else {
806                        "a"
807                    },
808                    expected_type.to_string().to_lowercase()
809                )
810            }
811            Self::TableContainsUningestableTypes { name, type_, column } => {
812                write!(f, "table {name} contains column {column} of type {type_} which Materialize cannot currently ingest")
813            },
814            Self::RetainHistoryLow { limit } => {
815                write!(f, "RETAIN HISTORY cannot be set lower than {}ms", limit.as_millis())
816            },
817            Self::RetainHistoryRequired => {
818                write!(f, "RETAIN HISTORY cannot be disabled or set to 0")
819            },
820            Self::SubsourceResolutionError(e) => write!(f, "{}", e),
821            Self::Replan(msg) => write!(f, "internal error while replanning, please contact support: {msg}"),
822            Self::NetworkPolicyLockoutError => write!(f, "policy would block current session IP"),
823            Self::NetworkPolicyInUse => write!(f, "network policy is currently in use"),
824            Self::UntilReadyTimeoutRequired => {
825                write!(f, "TIMEOUT=<duration> option is required for ALTER CLUSTER ... WITH (WAIT UNTIL READY ( ... ))")
826            },
827            Self::ConstantExpressionSimplificationFailed(e) => write!(f, "{}", e),
828            Self::InvalidOffset(e) => write!(f, "Invalid OFFSET clause: {}", e),
829            Self::UnknownCursor(name) => {
830                write!(f, "cursor {} does not exist", name.quoted())
831            }
832            Self::CopyFromTargetTableDropped { target_name: name } => write!(f, "COPY FROM's target table {} was dropped", name.quoted()),
833            Self::InvalidAsOfUpTo => write!(f, "AS OF or UP TO should be castable to a (non-null) mz_timestamp value")
834        }
835    }
836}
837
838impl Error for PlanError {}
839
840impl From<CatalogError> for PlanError {
841    fn from(e: CatalogError) -> PlanError {
842        PlanError::Catalog(e)
843    }
844}
845
846impl From<strconv::ParseError> for PlanError {
847    fn from(e: strconv::ParseError) -> PlanError {
848        PlanError::StrconvParse(e)
849    }
850}
851
852impl From<RecursionLimitError> for PlanError {
853    fn from(e: RecursionLimitError) -> PlanError {
854        PlanError::RecursionLimit(e)
855    }
856}
857
858impl From<InvalidNumericMaxScaleError> for PlanError {
859    fn from(e: InvalidNumericMaxScaleError) -> PlanError {
860        PlanError::InvalidNumericMaxScale(e)
861    }
862}
863
864impl From<InvalidCharLengthError> for PlanError {
865    fn from(e: InvalidCharLengthError) -> PlanError {
866        PlanError::InvalidCharLength(e)
867    }
868}
869
870impl From<InvalidVarCharMaxLengthError> for PlanError {
871    fn from(e: InvalidVarCharMaxLengthError) -> PlanError {
872        PlanError::InvalidVarCharMaxLength(e)
873    }
874}
875
876impl From<InvalidTimestampPrecisionError> for PlanError {
877    fn from(e: InvalidTimestampPrecisionError) -> PlanError {
878        PlanError::InvalidTimestampPrecision(e)
879    }
880}
881
882impl From<anyhow::Error> for PlanError {
883    fn from(e: anyhow::Error) -> PlanError {
884        // WIP: Do we maybe want to keep the alternate selector for these?
885        sql_err!("{}", e.display_with_causes())
886    }
887}
888
889impl From<TryFromIntError> for PlanError {
890    fn from(e: TryFromIntError) -> PlanError {
891        sql_err!("{}", e.display_with_causes())
892    }
893}
894
895impl From<ParseIntError> for PlanError {
896    fn from(e: ParseIntError) -> PlanError {
897        sql_err!("{}", e.display_with_causes())
898    }
899}
900
901impl From<EvalError> for PlanError {
902    fn from(e: EvalError) -> PlanError {
903        sql_err!("{}", e.display_with_causes())
904    }
905}
906
907impl From<ParserError> for PlanError {
908    fn from(e: ParserError) -> PlanError {
909        PlanError::Parser(e)
910    }
911}
912
913impl From<ParserStatementError> for PlanError {
914    fn from(e: ParserStatementError) -> PlanError {
915        PlanError::ParserStatement(e)
916    }
917}
918
919impl From<PostgresError> for PlanError {
920    fn from(e: PostgresError) -> PlanError {
921        PlanError::PostgresConnectionErr { cause: Arc::new(e) }
922    }
923}
924
925impl From<MySqlError> for PlanError {
926    fn from(e: MySqlError) -> PlanError {
927        PlanError::MySqlConnectionErr { cause: Arc::new(e) }
928    }
929}
930
931impl From<SqlServerError> for PlanError {
932    fn from(e: SqlServerError) -> PlanError {
933        PlanError::SqlServerConnectionErr { cause: Arc::new(e) }
934    }
935}
936
937impl From<VarError> for PlanError {
938    fn from(e: VarError) -> Self {
939        PlanError::VarError(e)
940    }
941}
942
943impl From<PgSourcePurificationError> for PlanError {
944    fn from(e: PgSourcePurificationError) -> Self {
945        PlanError::PgSourcePurification(e)
946    }
947}
948
949impl From<KafkaSourcePurificationError> for PlanError {
950    fn from(e: KafkaSourcePurificationError) -> Self {
951        PlanError::KafkaSourcePurification(e)
952    }
953}
954
955impl From<KafkaSinkPurificationError> for PlanError {
956    fn from(e: KafkaSinkPurificationError) -> Self {
957        PlanError::KafkaSinkPurification(e)
958    }
959}
960
961impl From<IcebergSinkPurificationError> for PlanError {
962    fn from(e: IcebergSinkPurificationError) -> Self {
963        PlanError::IcebergSinkPurification(e)
964    }
965}
966
967impl From<CsrPurificationError> for PlanError {
968    fn from(e: CsrPurificationError) -> Self {
969        PlanError::CsrPurification(e)
970    }
971}
972
973impl From<LoadGeneratorSourcePurificationError> for PlanError {
974    fn from(e: LoadGeneratorSourcePurificationError) -> Self {
975        PlanError::LoadGeneratorSourcePurification(e)
976    }
977}
978
979impl From<MySqlSourcePurificationError> for PlanError {
980    fn from(e: MySqlSourcePurificationError) -> Self {
981        PlanError::MySqlSourcePurification(e)
982    }
983}
984
985impl From<SqlServerSourcePurificationError> for PlanError {
986    fn from(e: SqlServerSourcePurificationError) -> Self {
987        PlanError::SqlServerSourcePurificationError(e)
988    }
989}
990
991impl From<IdentError> for PlanError {
992    fn from(e: IdentError) -> Self {
993        PlanError::InvalidIdent(e)
994    }
995}
996
997impl From<ExternalReferenceResolutionError> for PlanError {
998    fn from(e: ExternalReferenceResolutionError) -> Self {
999        PlanError::SubsourceResolutionError(e)
1000    }
1001}
1002
1003struct ColumnDisplay<'a> {
1004    table: &'a Option<PartialItemName>,
1005    column: &'a ColumnName,
1006}
1007
1008impl<'a> fmt::Display for ColumnDisplay<'a> {
1009    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1010        if let Some(table) = &self.table {
1011            format!("{}.{}", table.item, self.column).quoted().fmt(f)
1012        } else {
1013            self.column.quoted().fmt(f)
1014        }
1015    }
1016}