// mz_sql/plan/error.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

10use std::collections::BTreeSet;
11use std::error::Error;
12use std::num::{ParseIntError, TryFromIntError};
13use std::sync::Arc;
14use std::time::Duration;
15use std::{fmt, io};
16
17use itertools::Itertools;
18use mz_expr::EvalError;
19use mz_mysql_util::MySqlError;
20use mz_ore::error::ErrorExt;
21use mz_ore::stack::RecursionLimitError;
22use mz_ore::str::{StrExt, separated};
23use mz_postgres_util::PostgresError;
24use mz_repr::adt::char::InvalidCharLengthError;
25use mz_repr::adt::mz_acl_item::AclMode;
26use mz_repr::adt::numeric::InvalidNumericMaxScaleError;
27use mz_repr::adt::timestamp::InvalidTimestampPrecisionError;
28use mz_repr::adt::varchar::InvalidVarCharMaxLengthError;
29use mz_repr::{CatalogItemId, ColumnName, strconv};
30use mz_sql_parser::ast::display::AstDisplay;
31use mz_sql_parser::ast::{IdentError, UnresolvedItemName};
32use mz_sql_parser::parser::{ParserError, ParserStatementError};
33use mz_sql_server_util::SqlServerError;
34use mz_storage_types::sources::ExternalReferenceResolutionError;
35
36use crate::catalog::{
37    CatalogError, CatalogItemType, ErrorMessageObjectDescription, SystemObjectType,
38};
39use crate::names::{PartialItemName, ResolvedItemName};
40use crate::plan::ObjectType;
41use crate::plan::plan_utils::JoinSide;
42use crate::plan::scope::ScopeItem;
43use crate::plan::typeconv::CastContext;
44use crate::pure::error::{
45    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
46    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
47    SqlServerSourcePurificationError,
48};
49use crate::session::vars::VarError;
50
/// An error that can occur while planning a SQL statement.
///
/// Where possible, variants are structured (carrying names, types, and other
/// context) rather than stringly-typed; the inherent `detail` and `hint`
/// methods on this type derive additional user-facing text from that context.
#[derive(Clone, Debug)]
pub enum PlanError {
    /// This feature is not yet supported, but may be supported at some point in the future.
    Unsupported {
        feature: String,
        // Optional GitHub discussion number; rendered as a discussions URL in
        // the error message.
        discussion_no: Option<usize>,
    },
    /// This feature is not supported, and will likely never be supported.
    NeverSupported {
        feature: String,
        // Path fragment appended to the documentation base URL in the message.
        documentation_link: Option<String>,
        details: Option<String>,
    },
    /// A referenced column does not exist; `similar` holds close matches used
    /// to build a hint.
    UnknownColumn {
        table: Option<PartialItemName>,
        column: ColumnName,
        similar: Box<[ColumnName]>,
    },
    UngroupedColumn {
        table: Option<PartialItemName>,
        column: ColumnName,
    },
    WrongJoinTypeForLateralColumn {
        table: Option<PartialItemName>,
        column: ColumnName,
    },
    AmbiguousColumn(ColumnName),
    TooManyColumns {
        max_num_columns: usize,
        req_num_columns: usize,
    },
    ColumnAlreadyExists {
        column_name: ColumnName,
        object_name: String,
    },
    AmbiguousTable(PartialItemName),
    UnknownColumnInUsingClause {
        column: ColumnName,
        join_side: JoinSide,
    },
    AmbiguousColumnInUsingClause {
        column: ColumnName,
        join_side: JoinSide,
    },
    MisqualifiedName(String),
    OverqualifiedDatabaseName(String),
    OverqualifiedSchemaName(String),
    UnderqualifiedColumnName(String),
    SubqueriesDisallowed {
        context: String,
    },
    UnknownParameter(usize),
    RecursionLimit(RecursionLimitError),
    StrconvParse(strconv::ParseError),
    Catalog(CatalogError),
    UpsertSinkWithoutKey,
    /// The requested upsert envelope key could not be proven to be a unique
    /// key of the underlying relation; `valid_keys` lists known unique keys.
    UpsertSinkWithInvalidKey {
        name: String,
        desired_key: Vec<String>,
        valid_keys: Vec<Vec<String>>,
    },
    InvalidWmrRecursionLimit(String),
    InvalidNumericMaxScale(InvalidNumericMaxScaleError),
    InvalidCharLength(InvalidCharLengthError),
    InvalidId(CatalogItemId),
    InvalidIdent(IdentError),
    InvalidObject(Box<ResolvedItemName>),
    InvalidObjectType {
        expected_type: SystemObjectType,
        actual_type: SystemObjectType,
        object_name: String,
    },
    InvalidPrivilegeTypes {
        invalid_privileges: AclMode,
        object_description: ErrorMessageObjectDescription,
    },
    InvalidVarCharMaxLength(InvalidVarCharMaxLengthError),
    InvalidTimestampPrecision(InvalidTimestampPrecisionError),
    InvalidSecret(Box<ResolvedItemName>),
    InvalidTemporarySchema,
    InvalidCast {
        name: String,
        ccx: CastContext,
        from: String,
        to: String,
    },
    InvalidTable {
        name: String,
    },
    InvalidVersion {
        name: String,
        version: String,
    },
    // NOTE(review): "Manged" appears to be a typo of "Managed"; renaming the
    // variant would break pattern matches elsewhere, so it is preserved as-is.
    MangedReplicaName(String),
    ParserStatement(ParserStatementError),
    Parser(ParserError),
    DropViewOnMaterializedView(String),
    DependentObjectsStillExist {
        object_type: String,
        object_name: String,
        // (dependent type, name)
        dependents: Vec<(String, String)>,
    },
    AlterViewOnMaterializedView(String),
    ShowCreateViewOnMaterializedView(String),
    ExplainViewOnMaterializedView(String),
    UnacceptableTimelineName(String),
    FetchingCsrSchemaFailed {
        schema_lookup: String,
        cause: Arc<dyn Error + Send + Sync>,
    },
    PostgresConnectionErr {
        cause: Arc<mz_postgres_util::PostgresError>,
    },
    MySqlConnectionErr {
        cause: Arc<MySqlError>,
    },
    SqlServerConnectionErr {
        cause: Arc<SqlServerError>,
    },
    SubsourceNameConflict {
        name: UnresolvedItemName,
        upstream_references: Vec<UnresolvedItemName>,
    },
    SubsourceDuplicateReference {
        name: UnresolvedItemName,
        target_names: Vec<UnresolvedItemName>,
    },
    NoTablesFoundForSchemas(Vec<String>),
    InvalidProtobufSchema {
        cause: protobuf_native::OperationFailedError,
    },
    InvalidOptionValue {
        // Expected to be generated from the `to_ast_string` value on the option
        // name.
        option_name: String,
        err: Box<PlanError>,
    },
    UnexpectedDuplicateReference {
        name: UnresolvedItemName,
    },
    /// Declaration of a recursive type did not match the inferred type.
    RecursiveTypeMismatch(String, Vec<String>, Vec<String>),
    UnknownFunction {
        name: String,
        arg_types: Vec<String>,
    },
    IndistinctFunction {
        name: String,
        arg_types: Vec<String>,
    },
    UnknownOperator {
        name: String,
        arg_types: Vec<String>,
    },
    IndistinctOperator {
        name: String,
        arg_types: Vec<String>,
    },
    InvalidPrivatelinkAvailabilityZone {
        name: String,
        // Known availability zone IDs, listed in the hint.
        supported_azs: BTreeSet<String>,
    },
    DuplicatePrivatelinkAvailabilityZone {
        duplicate_azs: BTreeSet<String>,
    },
    InvalidSchemaName,
    ItemAlreadyExists {
        name: String,
        item_type: CatalogItemType,
    },
    ManagedCluster {
        cluster_name: String,
    },
    InvalidKeysInSubscribeEnvelopeUpsert,
    InvalidKeysInSubscribeEnvelopeDebezium,
    InvalidPartitionByEnvelopeDebezium {
        column_name: String,
    },
    InvalidOrderByInSubscribeWithinTimestampOrderBy,
    FromValueRequiresParen,
    VarError(VarError),
    UnsolvablePolymorphicFunctionInput,
    ShowCommandInView,
    WebhookValidationDoesNotUseColumns,
    WebhookValidationNonDeterministic,
    InternalFunctionCall,
    CommentTooLong {
        length: usize,
        max_size: usize,
    },
    InvalidTimestampInterval {
        min: Duration,
        max: Duration,
        requested: Duration,
    },
    InvalidGroupSizeHints,
    PgSourcePurification(PgSourcePurificationError),
    KafkaSourcePurification(KafkaSourcePurificationError),
    KafkaSinkPurification(KafkaSinkPurificationError),
    LoadGeneratorSourcePurification(LoadGeneratorSourcePurificationError),
    CsrPurification(CsrPurificationError),
    MySqlSourcePurification(MySqlSourcePurificationError),
    SqlServerSourcePurificationError(SqlServerSourcePurificationError),
    UseTablesForSources(String),
    MissingName(CatalogItemType),
    InvalidRefreshAt,
    InvalidRefreshEveryAlignedTo,
    CreateReplicaFailStorageObjects {
        /// The current number of replicas on the cluster
        current_replica_count: usize,
        /// The number of internal replicas on the cluster
        internal_replica_count: usize,
        /// The number of replicas that executing this command would have
        /// created
        hypothetical_replica_count: usize,
    },
    MismatchedObjectType {
        name: PartialItemName,
        is_type: ObjectType,
        expected_type: ObjectType,
    },
    /// MZ failed to generate cast for the data type.
    TableContainsUningestableTypes {
        name: String,
        type_: String,
        column: String,
    },
    RetainHistoryLow {
        limit: Duration,
    },
    RetainHistoryRequired,
    UntilReadyTimeoutRequired,
    SubsourceResolutionError(ExternalReferenceResolutionError),
    Replan(String),
    NetworkPolicyLockoutError,
    NetworkPolicyInUse,
    // TODO(benesch): eventually all errors should be structured.
    Unstructured(String),
}
291
impl PlanError {
    /// Constructs a [`PlanError::UngroupedColumn`] from a scope item, cloning
    /// its table and column names.
    pub(crate) fn ungrouped_column(item: &ScopeItem) -> PlanError {
        PlanError::UngroupedColumn {
            table: item.table_name.clone(),
            column: item.column_name.clone(),
        }
    }

    /// Returns additional detail text for this error, if any.
    ///
    /// The detail supplements the primary `Display` message with longer
    /// explanatory prose. Variants without extra detail return `None`; wrapper
    /// variants (e.g. `InvalidOptionValue`, `VarError`, the purification
    /// errors) delegate to the wrapped error's own `detail`.
    pub fn detail(&self) -> Option<String> {
        match self {
            Self::NeverSupported { details, .. } => details.clone(),
            Self::FetchingCsrSchemaFailed { cause, .. } => Some(cause.to_string_with_causes()),
            Self::PostgresConnectionErr { cause } => Some(cause.to_string_with_causes()),
            Self::InvalidProtobufSchema { cause } => Some(cause.to_string_with_causes()),
            Self::InvalidOptionValue { err, .. } => err.detail(),
            Self::UpsertSinkWithInvalidKey {
                name,
                desired_key,
                valid_keys,
            } => {
                // Enumerate the known unique keys so the user can pick one,
                // or explain that none are known.
                let valid_keys = if valid_keys.is_empty() {
                    "There are no known valid unique keys for the underlying relation.".into()
                } else {
                    format!(
                        "The following keys are known to be unique for the underlying relation:\n{}",
                        valid_keys
                            .iter()
                            .map(|k|
                                format!("  ({})", k.iter().map(|c| c.as_str().quoted()).join(", "))
                            )
                            .join("\n"),
                    )
                };
                Some(format!(
                    "Materialize could not prove that the specified upsert envelope key ({}) \
                    was a unique key of the underlying relation {}. {valid_keys}",
                    separated(", ", desired_key.iter().map(|c| c.as_str().quoted())),
                    name.quoted()
                ))
            }
            Self::VarError(e) => e.detail(),
            Self::InternalFunctionCall => Some("This function is for the internal use of the database system and cannot be called directly.".into()),
            Self::PgSourcePurification(e) => e.detail(),
            Self::MySqlSourcePurification(e) => e.detail(),
            Self::SqlServerSourcePurificationError(e) => e.detail(),
            Self::KafkaSourcePurification(e) => e.detail(),
            Self::LoadGeneratorSourcePurification(e) => e.detail(),
            Self::CsrPurification(e) => e.detail(),
            Self::KafkaSinkPurification(e) => e.detail(),
            Self::CreateReplicaFailStorageObjects { current_replica_count: current, internal_replica_count: internal, hypothetical_replica_count: target } => {
                Some(format!(
                    "Currently have {} replica{}{}; command would result in {}",
                    current,
                    // Pluralize "replica" when the count is not exactly one.
                    if *current != 1 { "s" } else { "" },
                    if *internal > 0 {
                        format!(" ({} internal)", internal)
                    } else {
                        "".to_string()
                    },
                    target
                ))
            },
            Self::SubsourceNameConflict {
                name: _,
                upstream_references,
            } => Some(format!(
                "referenced tables with duplicate name: {}",
                itertools::join(upstream_references, ", ")
            )),
            Self::SubsourceDuplicateReference {
                name: _,
                target_names,
            } => Some(format!(
                "subsources referencing table: {}",
                itertools::join(target_names, ", ")
            )),
            Self::InvalidPartitionByEnvelopeDebezium { .. } => Some(
                "When using ENVELOPE DEBEZIUM, only columns in the key can be referenced in the PARTITION BY expression.".to_string()
            ),
            Self::NoTablesFoundForSchemas(schemas) => Some(format!(
                "missing schemas: {}",
                separated(", ", schemas.iter().map(|c| c.quoted()))
            )),
            _ => None,
        }
    }

    /// Returns a hint for this error, if any.
    ///
    /// Hints suggest a concrete next step to the user (an alternative command,
    /// a cast to add, a documentation link). Variants without a hint return
    /// `None`; wrapper variants delegate to the wrapped error's own `hint`.
    pub fn hint(&self) -> Option<String> {
        match self {
            Self::DropViewOnMaterializedView(_) => {
                Some("Use DROP MATERIALIZED VIEW to remove a materialized view.".into())
            }
            Self::DependentObjectsStillExist {..} => Some("Use DROP ... CASCADE to drop the dependent objects too.".into()),
            Self::AlterViewOnMaterializedView(_) => {
                Some("Use ALTER MATERIALIZED VIEW to rename a materialized view.".into())
            }
            Self::ShowCreateViewOnMaterializedView(_) => {
                Some("Use SHOW CREATE MATERIALIZED VIEW to show a materialized view.".into())
            }
            Self::ExplainViewOnMaterializedView(_) => {
                Some("Use EXPLAIN [...] MATERIALIZED VIEW to explain a materialized view.".into())
            }
            Self::UnacceptableTimelineName(_) => {
                Some("The prefix \"mz_\" is reserved for system timelines.".into())
            }
            Self::PostgresConnectionErr { cause } => {
                // Special-case I/O timeouts in the error chain: these usually
                // indicate a firewall or security group blocking the connection.
                if let Some(cause) = cause.source() {
                    if let Some(cause) = cause.downcast_ref::<io::Error>() {
                        if cause.kind() == io::ErrorKind::TimedOut {
                            return Some(
                                "Do you have a firewall or security group that is \
                                preventing Materialize from connecting to your PostgreSQL server?"
                                    .into(),
                            );
                        }
                    }
                }
                None
            }
            Self::InvalidOptionValue { err, .. } => err.hint(),
            Self::UnknownFunction { ..} => Some("No function matches the given name and argument types. You might need to add explicit type casts.".into()),
            Self::IndistinctFunction {..} => {
                Some("Could not choose a best candidate function. You might need to add explicit type casts.".into())
            }
            Self::UnknownOperator {..} => {
                Some("No operator matches the given name and argument types. You might need to add explicit type casts.".into())
            }
            Self::IndistinctOperator {..} => {
                Some("Could not choose a best candidate operator. You might need to add explicit type casts.".into())
            },
            Self::InvalidPrivatelinkAvailabilityZone { supported_azs, ..} => {
                let supported_azs_str = supported_azs.iter().join("\n  ");
                Some(format!("Did you supply an availability zone name instead of an ID? Known availability zone IDs:\n  {}", supported_azs_str))
            }
            Self::DuplicatePrivatelinkAvailabilityZone { duplicate_azs, ..} => {
                let duplicate_azs  = duplicate_azs.iter().join("\n  ");
                Some(format!("Duplicated availability zones:\n  {}", duplicate_azs))
            }
            Self::InvalidKeysInSubscribeEnvelopeUpsert => {
                Some("All keys must be columns on the underlying relation.".into())
            }
            Self::InvalidKeysInSubscribeEnvelopeDebezium => {
                Some("All keys must be columns on the underlying relation.".into())
            }
            Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
                Some("All order bys must be output columns.".into())
            }
            Self::UpsertSinkWithInvalidKey { .. } | Self::UpsertSinkWithoutKey => {
                Some("See: https://materialize.com/s/sink-key-selection".into())
            }
            Self::Catalog(e) => e.hint(),
            Self::VarError(e) => e.hint(),
            Self::PgSourcePurification(e) => e.hint(),
            Self::MySqlSourcePurification(e) => e.hint(),
            Self::SqlServerSourcePurificationError(e) => e.hint(),
            Self::KafkaSourcePurification(e) => e.hint(),
            Self::LoadGeneratorSourcePurification(e) => e.hint(),
            Self::CsrPurification(e) => e.hint(),
            Self::KafkaSinkPurification(e) => e.hint(),
            Self::UnknownColumn { table, similar, .. } => {
                // Suggest near-miss column names collected during name
                // resolution; no hint when there are none.
                let suffix = "Make sure to surround case sensitive names in double quotes.";
                match &similar[..] {
                    [] => None,
                    [column] => Some(format!("The similarly named column {} does exist. {suffix}", ColumnDisplay { table, column })),
                    names => {
                        let similar = names.into_iter().map(|column| ColumnDisplay { table, column }).join(", ");
                        Some(format!("There are similarly named columns that do exist: {similar}. {suffix}"))
                    }
                }
            }
            Self::RecursiveTypeMismatch(..) => {
                Some("You will need to rewrite or cast the query's expressions.".into())
            },
            Self::InvalidRefreshAt
            | Self::InvalidRefreshEveryAlignedTo => {
                Some("Calling `mz_now()` is allowed.".into())
            },
            Self::TableContainsUningestableTypes { column,.. } => {
                Some(format!("Remove the table or use TEXT COLUMNS ({column}, ..) to ingest this column as text"))
            }
            Self::RetainHistoryLow { .. } | Self::RetainHistoryRequired => {
                Some("Use ALTER ... RESET (RETAIN HISTORY) to set the retain history to its default and lowest value.".into())
            }
            Self::NetworkPolicyInUse => {
                Some("Use ALTER SYSTEM SET 'network_policy' to change the default network policy.".into())
            }
            _ => None,
        }
    }
}
482
483impl fmt::Display for PlanError {
484    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
485        match self {
486            Self::Unsupported { feature, discussion_no } => {
487                write!(f, "{} not yet supported", feature)?;
488                if let Some(discussion_no) = discussion_no {
489                    write!(f, ", see https://github.com/MaterializeInc/materialize/discussions/{} for more details", discussion_no)?;
490                }
491                Ok(())
492            }
493            Self::NeverSupported { feature, documentation_link: documentation_path,.. } => {
494                write!(f, "{feature} is not supported",)?;
495                if let Some(documentation_path) = documentation_path {
496                    write!(f, ", for more information consult the documentation at https://materialize.com/docs/{documentation_path}")?;
497                }
498                Ok(())
499            }
500            Self::UnknownColumn { table, column, similar: _ } => write!(
501                f,
502                "column {} does not exist",
503                ColumnDisplay { table, column }
504            ),
505            Self::UngroupedColumn { table, column } => write!(
506                f,
507                "column {} must appear in the GROUP BY clause or be used in an aggregate function",
508                ColumnDisplay { table, column },
509            ),
510            Self::WrongJoinTypeForLateralColumn { table, column } => write!(
511                f,
512                "column {} cannot be referenced from this part of the query: \
513                the combining JOIN type must be INNER or LEFT for a LATERAL reference",
514                ColumnDisplay { table, column },
515            ),
516            Self::AmbiguousColumn(column) => write!(
517                f,
518                "column reference {} is ambiguous",
519                column.as_str().quoted()
520            ),
521            Self::TooManyColumns { max_num_columns, req_num_columns } => write!(
522                f,
523                "attempt to create relation with too many columns, {} max: {}",
524                req_num_columns, max_num_columns
525            ),
526            Self::ColumnAlreadyExists { column_name, object_name } => write!(
527                f,
528                "column {} of relation {} already exists",
529                column_name.as_str().quoted(), object_name.quoted(),
530            ),
531            Self::AmbiguousTable(table) => write!(
532                f,
533                "table reference {} is ambiguous",
534                table.item.as_str().quoted()
535            ),
536            Self::UnknownColumnInUsingClause { column, join_side } => write!(
537                f,
538                "column {} specified in USING clause does not exist in {} table",
539                column.as_str().quoted(),
540                join_side,
541            ),
542            Self::AmbiguousColumnInUsingClause { column, join_side } => write!(
543                f,
544                "common column name {} appears more than once in {} table",
545                column.as_str().quoted(),
546                join_side,
547            ),
548            Self::MisqualifiedName(name) => write!(
549                f,
550                "qualified name did not have between 1 and 3 components: {}",
551                name
552            ),
553            Self::OverqualifiedDatabaseName(name) => write!(
554                f,
555                "database name '{}' does not have exactly one component",
556                name
557            ),
558            Self::OverqualifiedSchemaName(name) => write!(
559                f,
560                "schema name '{}' cannot have more than two components",
561                name
562            ),
563            Self::UnderqualifiedColumnName(name) => write!(
564                f,
565                "column name '{}' must have at least a table qualification",
566                name
567            ),
568            Self::UnacceptableTimelineName(name) => {
569                write!(f, "unacceptable timeline name {}", name.quoted(),)
570            }
571            Self::SubqueriesDisallowed { context } => {
572                write!(f, "{} does not allow subqueries", context)
573            }
574            Self::UnknownParameter(n) => write!(f, "there is no parameter ${}", n),
575            Self::RecursionLimit(e) => write!(f, "{}", e),
576            Self::StrconvParse(e) => write!(f, "{}", e),
577            Self::Catalog(e) => write!(f, "{}", e),
578            Self::UpsertSinkWithoutKey => write!(f, "upsert sinks must specify a key"),
579            Self::UpsertSinkWithInvalidKey { .. } => {
580                write!(f, "upsert key could not be validated as unique")
581            }
582            Self::InvalidWmrRecursionLimit(msg) => write!(f, "Invalid WITH MUTUALLY RECURSIVE recursion limit. {}", msg),
583            Self::InvalidNumericMaxScale(e) => e.fmt(f),
584            Self::InvalidCharLength(e) => e.fmt(f),
585            Self::InvalidVarCharMaxLength(e) => e.fmt(f),
586            Self::InvalidTimestampPrecision(e) => e.fmt(f),
587            Self::Parser(e) => e.fmt(f),
588            Self::ParserStatement(e) => e.fmt(f),
589            Self::Unstructured(e) => write!(f, "{}", e),
590            Self::InvalidId(id) => write!(f, "invalid id {}", id),
591            Self::InvalidIdent(err) => write!(f, "invalid identifier, {err}"),
592            Self::InvalidObject(i) => write!(f, "{} is not a database object", i.full_name_str()),
593            Self::InvalidObjectType{expected_type, actual_type, object_name} => write!(f, "{actual_type} {object_name} is not a {expected_type}"),
594            Self::InvalidPrivilegeTypes{ invalid_privileges, object_description, } => {
595                write!(f, "invalid privilege types {} for {}", invalid_privileges.to_error_string(), object_description)
596            },
597            Self::InvalidSecret(i) => write!(f, "{} is not a secret", i.full_name_str()),
598            Self::InvalidTemporarySchema => {
599                write!(f, "cannot create temporary item in non-temporary schema")
600            }
601            Self::InvalidCast { name, ccx, from, to } =>{
602                write!(
603                    f,
604                    "{name} does not support {ccx}casting from {from} to {to}",
605                    ccx = if matches!(ccx, CastContext::Implicit) {
606                        "implicitly "
607                    } else {
608                        ""
609                    },
610                )
611            }
612            Self::InvalidTable { name } => {
613                write!(f, "invalid table definition for {}", name.quoted())
614            },
615            Self::InvalidVersion { name, version } => {
616                write!(f, "invalid version {} for {}", version.quoted(), name.quoted())
617            },
618            Self::DropViewOnMaterializedView(name)
619            | Self::AlterViewOnMaterializedView(name)
620            | Self::ShowCreateViewOnMaterializedView(name)
621            | Self::ExplainViewOnMaterializedView(name) => write!(f, "{name} is not a view"),
622            Self::FetchingCsrSchemaFailed { schema_lookup, .. } => {
623                write!(f, "failed to fetch schema {schema_lookup} from schema registry")
624            }
625            Self::PostgresConnectionErr { .. } => {
626                write!(f, "failed to connect to PostgreSQL database")
627            }
628            Self::MySqlConnectionErr { cause } => {
629                write!(f, "failed to connect to MySQL database: {}", cause)
630            }
631            Self::SqlServerConnectionErr { cause } => {
632                write!(f, "failed to connect to SQL Server database: {}", cause)
633            }
634            Self::SubsourceNameConflict {
635                name , upstream_references: _,
636            } => {
637                write!(f, "multiple subsources would be named {}", name)
638            },
639            Self::SubsourceDuplicateReference {
640                name,
641                target_names: _,
642            } => {
643                write!(f, "multiple subsources refer to table {}", name)
644            },
645            Self::NoTablesFoundForSchemas(schemas) => {
646                write!(f, "no tables found in referenced schemas: {}",
647                    separated(", ", schemas.iter().map(|c| c.quoted()))
648                )
649            },
650            Self::InvalidProtobufSchema { .. } => {
651                write!(f, "invalid protobuf schema")
652            }
653            Self::DependentObjectsStillExist {object_type, object_name, dependents} => {
654                let reason = match &dependents[..] {
655                    [] => " because other objects depend on it".to_string(),
656                    dependents => {
657                        let dependents = dependents.iter().map(|(dependent_type, dependent_name)| format!("{} {}", dependent_type, dependent_name.quoted())).join(", ");
658                        format!(": still depended upon by {dependents}")
659                    },
660                };
661                let object_name = object_name.quoted();
662                write!(f, "cannot drop {object_type} {object_name}{reason}")
663            }
664            Self::InvalidOptionValue { option_name, err } => write!(f, "invalid {} option value: {}", option_name, err),
665            Self::UnexpectedDuplicateReference { name } => write!(f, "unexpected multiple references to {}", name.to_ast_string_simple()),
666            Self::RecursiveTypeMismatch(name, declared, inferred) => {
667                let declared = separated(", ", declared);
668                let inferred = separated(", ", inferred);
669                let name = name.quoted();
670                write!(f, "WITH MUTUALLY RECURSIVE query {name} declared types ({declared}), but query returns types ({inferred})")
671            },
672            Self::UnknownFunction {name, arg_types, ..} => {
673                write!(f, "function {}({}) does not exist", name, arg_types.join(", "))
674            },
675            Self::IndistinctFunction {name, arg_types, ..} => {
676                write!(f, "function {}({}) is not unique", name, arg_types.join(", "))
677            },
678            Self::UnknownOperator {name, arg_types, ..} => {
679                write!(f, "operator does not exist: {}", match arg_types.as_slice(){
680                    [typ] => format!("{} {}", name, typ),
681                    [ltyp, rtyp] => {
682                        format!("{} {} {}", ltyp, name, rtyp)
683                    }
684                    _ => unreachable!("non-unary non-binary operator"),
685                })
686            },
687            Self::IndistinctOperator {name, arg_types, ..} => {
688                write!(f, "operator is not unique: {}", match arg_types.as_slice(){
689                    [typ] => format!("{} {}", name, typ),
690                    [ltyp, rtyp] => {
691                        format!("{} {} {}", ltyp, name, rtyp)
692                    }
693                    _ => unreachable!("non-unary non-binary operator"),
694                })
695            },
696            Self::InvalidPrivatelinkAvailabilityZone { name, ..} => write!(f, "invalid AWS PrivateLink availability zone {}", name.quoted()),
697            Self::DuplicatePrivatelinkAvailabilityZone {..} =>   write!(f, "connection cannot contain duplicate availability zones"),
698            Self::InvalidSchemaName => write!(f, "no schema has been selected to create in"),
699            Self::ItemAlreadyExists { name, item_type } => write!(f, "{item_type} {} already exists", name.quoted()),
700            Self::ManagedCluster {cluster_name} => write!(f, "cannot modify managed cluster {cluster_name}"),
701            Self::InvalidKeysInSubscribeEnvelopeUpsert => {
702                write!(f, "invalid keys in SUBSCRIBE ENVELOPE UPSERT (KEY (..))")
703            }
704            Self::InvalidKeysInSubscribeEnvelopeDebezium => {
705                write!(f, "invalid keys in SUBSCRIBE ENVELOPE DEBEZIUM (KEY (..))")
706            }
707            Self::InvalidPartitionByEnvelopeDebezium { column_name } => {
708                write!(
709                    f,
710                    "PARTITION BY expression cannot refer to non-key column {}",
711                    column_name.quoted(),
712                )
713            }
714            Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
715                write!(f, "invalid ORDER BY in SUBSCRIBE WITHIN TIMESTAMP ORDER BY")
716            }
717            Self::FromValueRequiresParen => f.write_str(
718                "VALUES expression in FROM clause must be surrounded by parentheses"
719            ),
720            Self::VarError(e) => e.fmt(f),
721            Self::UnsolvablePolymorphicFunctionInput => f.write_str(
722                "could not determine polymorphic type because input has type unknown"
723            ),
724            Self::ShowCommandInView => f.write_str("SHOW commands are not allowed in views"),
725            Self::WebhookValidationDoesNotUseColumns => f.write_str(
726                "expression provided in CHECK does not reference any columns"
727            ),
728            Self::WebhookValidationNonDeterministic => f.write_str(
729                "expression provided in CHECK is not deterministic"
730            ),
731            Self::InternalFunctionCall => f.write_str("cannot call function with arguments of type internal"),
732            Self::CommentTooLong { length, max_size } => {
733                write!(f, "provided comment was {length} bytes long, max size is {max_size} bytes")
734            }
735            Self::InvalidTimestampInterval { min, max, requested } => {
736                write!(f, "invalid timestamp interval of {}ms, must be in the range [{}ms, {}ms]", requested.as_millis(), min.as_millis(), max.as_millis())
737            }
738            Self::InvalidGroupSizeHints => f.write_str("EXPECTED GROUP SIZE cannot be provided \
739                simultaneously with any of AGGREGATE INPUT GROUP SIZE, DISTINCT ON INPUT GROUP SIZE, \
740                or LIMIT INPUT GROUP SIZE"),
741            Self::PgSourcePurification(e) => write!(f, "POSTGRES source validation: {}", e),
742            Self::KafkaSourcePurification(e) => write!(f, "KAFKA source validation: {}", e),
743            Self::LoadGeneratorSourcePurification(e) => write!(f, "LOAD GENERATOR source validation: {}", e),
744            Self::KafkaSinkPurification(e) => write!(f, "KAFKA sink validation: {}", e),
745            Self::CsrPurification(e) => write!(f, "CONFLUENT SCHEMA REGISTRY validation: {}", e),
746            Self::MySqlSourcePurification(e) => write!(f, "MYSQL source validation: {}", e),
747            Self::SqlServerSourcePurificationError(e) => write!(f, "SQL SERVER source validation: {}", e),
748            Self::UseTablesForSources(command) => write!(f, "{command} not supported; use CREATE TABLE .. FROM SOURCE instead"),
749            Self::MangedReplicaName(name) => {
750                write!(f, "{name} is reserved for replicas of managed clusters")
751            }
752            Self::MissingName(item_type) => {
753                write!(f, "unspecified name for {item_type}")
754            }
755            Self::InvalidRefreshAt => {
756                write!(f, "REFRESH AT argument must be an expression that can be simplified \
757                           and/or cast to a constant whose type is mz_timestamp")
758            }
759            Self::InvalidRefreshEveryAlignedTo => {
760                write!(f, "REFRESH EVERY ... ALIGNED TO argument must be an expression that can be simplified \
761                           and/or cast to a constant whose type is mz_timestamp")
762            }
763            Self::CreateReplicaFailStorageObjects {..} => {
764                write!(f, "cannot create more than one replica of a cluster containing sources or sinks")
765            },
766            Self::MismatchedObjectType {
767                name,
768                is_type,
769                expected_type,
770            } => {
771                write!(
772                    f,
773                    "{name} is {} {} not {} {}",
774                    if *is_type == ObjectType::Index {
775                        "an"
776                    } else {
777                        "a"
778                    },
779                    is_type.to_string().to_lowercase(),
780                    if *expected_type == ObjectType::Index {
781                        "an"
782                    } else {
783                        "a"
784                    },
785                    expected_type.to_string().to_lowercase()
786                )
787            }
788            Self::TableContainsUningestableTypes { name, type_, column } => {
789                write!(f, "table {name} contains column {column} of type {type_} which Materialize cannot currently ingest")
790            },
791            Self::RetainHistoryLow { limit } => {
792                write!(f, "RETAIN HISTORY cannot be set lower than {}ms", limit.as_millis())
793            },
794            Self::RetainHistoryRequired => {
795                write!(f, "RETAIN HISTORY cannot be disabled or set to 0")
796            },
797            Self::SubsourceResolutionError(e) => write!(f, "{}", e),
798            Self::Replan(msg) => write!(f, "internal error while replanning, please contact support: {msg}"),
799            Self::NetworkPolicyLockoutError => write!(f, "policy would block current session IP"),
800            Self::NetworkPolicyInUse => write!(f, "network policy is currently in use"),
801            Self::UntilReadyTimeoutRequired => {
802                write!(f, "TIMEOUT=<duration> option is required for ALTER CLUSTER ... WITH (WAIT UNTIL READY ( ... ))")
803            },
804        }
805    }
806}
807
// Marker impl so `PlanError` can be used as a `dyn std::error::Error`; all
// trait methods fall back to their defaults (no structured `source()` chain).
impl Error for PlanError {}
809
810impl From<CatalogError> for PlanError {
811    fn from(e: CatalogError) -> PlanError {
812        PlanError::Catalog(e)
813    }
814}
815
816impl From<strconv::ParseError> for PlanError {
817    fn from(e: strconv::ParseError) -> PlanError {
818        PlanError::StrconvParse(e)
819    }
820}
821
822impl From<RecursionLimitError> for PlanError {
823    fn from(e: RecursionLimitError) -> PlanError {
824        PlanError::RecursionLimit(e)
825    }
826}
827
828impl From<InvalidNumericMaxScaleError> for PlanError {
829    fn from(e: InvalidNumericMaxScaleError) -> PlanError {
830        PlanError::InvalidNumericMaxScale(e)
831    }
832}
833
834impl From<InvalidCharLengthError> for PlanError {
835    fn from(e: InvalidCharLengthError) -> PlanError {
836        PlanError::InvalidCharLength(e)
837    }
838}
839
840impl From<InvalidVarCharMaxLengthError> for PlanError {
841    fn from(e: InvalidVarCharMaxLengthError) -> PlanError {
842        PlanError::InvalidVarCharMaxLength(e)
843    }
844}
845
846impl From<InvalidTimestampPrecisionError> for PlanError {
847    fn from(e: InvalidTimestampPrecisionError) -> PlanError {
848        PlanError::InvalidTimestampPrecision(e)
849    }
850}
851
impl From<anyhow::Error> for PlanError {
    fn from(e: anyhow::Error) -> PlanError {
        // Flatten the error together with its full cause chain into a single
        // message, since an `anyhow::Error`'s structure has no dedicated
        // `PlanError` variant to preserve it in.
        // TODO(review): an earlier note asked whether the alternate (`{:#}`)
        // selector should be kept for these — confirm the desired rendering.
        sql_err!("{}", e.display_with_causes())
    }
}
858
859impl From<TryFromIntError> for PlanError {
860    fn from(e: TryFromIntError) -> PlanError {
861        sql_err!("{}", e.display_with_causes())
862    }
863}
864
865impl From<ParseIntError> for PlanError {
866    fn from(e: ParseIntError) -> PlanError {
867        sql_err!("{}", e.display_with_causes())
868    }
869}
870
871impl From<EvalError> for PlanError {
872    fn from(e: EvalError) -> PlanError {
873        sql_err!("{}", e.display_with_causes())
874    }
875}
876
877impl From<ParserError> for PlanError {
878    fn from(e: ParserError) -> PlanError {
879        PlanError::Parser(e)
880    }
881}
882
883impl From<ParserStatementError> for PlanError {
884    fn from(e: ParserStatementError) -> PlanError {
885        PlanError::ParserStatement(e)
886    }
887}
888
889impl From<PostgresError> for PlanError {
890    fn from(e: PostgresError) -> PlanError {
891        PlanError::PostgresConnectionErr { cause: Arc::new(e) }
892    }
893}
894
895impl From<MySqlError> for PlanError {
896    fn from(e: MySqlError) -> PlanError {
897        PlanError::MySqlConnectionErr { cause: Arc::new(e) }
898    }
899}
900
901impl From<SqlServerError> for PlanError {
902    fn from(e: SqlServerError) -> PlanError {
903        PlanError::SqlServerConnectionErr { cause: Arc::new(e) }
904    }
905}
906
907impl From<VarError> for PlanError {
908    fn from(e: VarError) -> Self {
909        PlanError::VarError(e)
910    }
911}
912
913impl From<PgSourcePurificationError> for PlanError {
914    fn from(e: PgSourcePurificationError) -> Self {
915        PlanError::PgSourcePurification(e)
916    }
917}
918
919impl From<KafkaSourcePurificationError> for PlanError {
920    fn from(e: KafkaSourcePurificationError) -> Self {
921        PlanError::KafkaSourcePurification(e)
922    }
923}
924
925impl From<KafkaSinkPurificationError> for PlanError {
926    fn from(e: KafkaSinkPurificationError) -> Self {
927        PlanError::KafkaSinkPurification(e)
928    }
929}
930
931impl From<CsrPurificationError> for PlanError {
932    fn from(e: CsrPurificationError) -> Self {
933        PlanError::CsrPurification(e)
934    }
935}
936
937impl From<LoadGeneratorSourcePurificationError> for PlanError {
938    fn from(e: LoadGeneratorSourcePurificationError) -> Self {
939        PlanError::LoadGeneratorSourcePurification(e)
940    }
941}
942
943impl From<MySqlSourcePurificationError> for PlanError {
944    fn from(e: MySqlSourcePurificationError) -> Self {
945        PlanError::MySqlSourcePurification(e)
946    }
947}
948
949impl From<SqlServerSourcePurificationError> for PlanError {
950    fn from(e: SqlServerSourcePurificationError) -> Self {
951        PlanError::SqlServerSourcePurificationError(e)
952    }
953}
954
955impl From<IdentError> for PlanError {
956    fn from(e: IdentError) -> Self {
957        PlanError::InvalidIdent(e)
958    }
959}
960
961impl From<ExternalReferenceResolutionError> for PlanError {
962    fn from(e: ExternalReferenceResolutionError) -> Self {
963        PlanError::SubsourceResolutionError(e)
964    }
965}
966
// Helper for rendering a column reference in error messages: the column name,
// optionally qualified by the table it belongs to, quoted for display.
struct ColumnDisplay<'a> {
    // When `Some`, the column is rendered as `"<table.item>.<column>"`.
    table: &'a Option<PartialItemName>,
    column: &'a ColumnName,
}
971
972impl<'a> fmt::Display for ColumnDisplay<'a> {
973    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
974        if let Some(table) = &self.table {
975            format!("{}.{}", table.item, self.column).quoted().fmt(f)
976        } else {
977            self.column.as_str().quoted().fmt(f)
978        }
979    }
980}