mz_sql/plan/
error.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11use std::error::Error;
12use std::num::{ParseIntError, TryFromIntError};
13use std::sync::Arc;
14use std::time::Duration;
15use std::{fmt, io};
16
17use itertools::Itertools;
18use mz_expr::EvalError;
19use mz_mysql_util::MySqlError;
20use mz_ore::error::ErrorExt;
21use mz_ore::stack::RecursionLimitError;
22use mz_ore::str::{StrExt, separated};
23use mz_postgres_util::PostgresError;
24use mz_repr::adt::char::InvalidCharLengthError;
25use mz_repr::adt::mz_acl_item::AclMode;
26use mz_repr::adt::numeric::InvalidNumericMaxScaleError;
27use mz_repr::adt::timestamp::InvalidTimestampPrecisionError;
28use mz_repr::adt::varchar::InvalidVarCharMaxLengthError;
29use mz_repr::{CatalogItemId, ColumnName, strconv};
30use mz_sql_parser::ast::display::AstDisplay;
31use mz_sql_parser::ast::{IdentError, UnresolvedItemName};
32use mz_sql_parser::parser::{ParserError, ParserStatementError};
33use mz_sql_server_util::SqlServerError;
34use mz_storage_types::sources::ExternalReferenceResolutionError;
35
36use crate::catalog::{
37    CatalogError, CatalogItemType, ErrorMessageObjectDescription, SystemObjectType,
38};
39use crate::names::{PartialItemName, ResolvedItemName};
40use crate::plan::ObjectType;
41use crate::plan::plan_utils::JoinSide;
42use crate::plan::scope::ScopeItem;
43use crate::plan::typeconv::CastContext;
44use crate::pure::error::{
45    CsrPurificationError, IcebergSinkPurificationError, KafkaSinkPurificationError,
46    KafkaSourcePurificationError, LoadGeneratorSourcePurificationError,
47    MySqlSourcePurificationError, PgSourcePurificationError, SqlServerSourcePurificationError,
48};
49use crate::session::vars::VarError;
50
51#[derive(Debug)]
52pub enum PlanError {
53    /// This feature is not yet supported, but may be supported at some point in the future.
54    Unsupported {
55        feature: String,
56        discussion_no: Option<usize>,
57    },
58    /// This feature is not supported, and will likely never be supported.
59    NeverSupported {
60        feature: String,
61        documentation_link: Option<String>,
62        details: Option<String>,
63    },
64    UnknownColumn {
65        table: Option<PartialItemName>,
66        column: ColumnName,
67        similar: Box<[ColumnName]>,
68    },
69    UngroupedColumn {
70        table: Option<PartialItemName>,
71        column: ColumnName,
72    },
73    TypeWithoutColumns {
74        type_name: PartialItemName,
75    },
76    WrongJoinTypeForLateralColumn {
77        table: Option<PartialItemName>,
78        column: ColumnName,
79    },
80    AmbiguousColumn(ColumnName),
81    TooManyColumns {
82        max_num_columns: usize,
83        req_num_columns: usize,
84    },
85    ColumnAlreadyExists {
86        column_name: ColumnName,
87        object_name: String,
88    },
89    AmbiguousTable(PartialItemName),
90    UnknownColumnInUsingClause {
91        column: ColumnName,
92        join_side: JoinSide,
93    },
94    AmbiguousColumnInUsingClause {
95        column: ColumnName,
96        join_side: JoinSide,
97    },
98    MisqualifiedName(String),
99    OverqualifiedDatabaseName(String),
100    OverqualifiedSchemaName(String),
101    UnderqualifiedColumnName(String),
102    SubqueriesDisallowed {
103        context: String,
104    },
105    UnknownParameter(usize),
106    ParameterNotAllowed(String),
107    WrongParameterType(usize, String, String),
108    RecursionLimit(RecursionLimitError),
109    StrconvParse(strconv::ParseError),
110    Catalog(CatalogError),
111    UpsertSinkWithoutKey,
112    UpsertSinkWithInvalidKey {
113        name: String,
114        desired_key: Vec<String>,
115        valid_keys: Vec<Vec<String>>,
116    },
117    InvalidWmrRecursionLimit(String),
118    InvalidNumericMaxScale(InvalidNumericMaxScaleError),
119    InvalidCharLength(InvalidCharLengthError),
120    InvalidId(CatalogItemId),
121    InvalidIdent(IdentError),
122    InvalidObject(Box<ResolvedItemName>),
123    InvalidObjectType {
124        expected_type: SystemObjectType,
125        actual_type: SystemObjectType,
126        object_name: String,
127    },
128    InvalidPrivilegeTypes {
129        invalid_privileges: AclMode,
130        object_description: ErrorMessageObjectDescription,
131    },
132    InvalidVarCharMaxLength(InvalidVarCharMaxLengthError),
133    InvalidTimestampPrecision(InvalidTimestampPrecisionError),
134    InvalidSecret(Box<ResolvedItemName>),
135    InvalidTemporarySchema,
136    InvalidCast {
137        name: String,
138        ccx: CastContext,
139        from: String,
140        to: String,
141    },
142    InvalidTable {
143        name: String,
144    },
145    InvalidVersion {
146        name: String,
147        version: String,
148    },
149    InvalidSinkFrom {
150        name: String,
151        item_type: CatalogItemType,
152    },
153    MangedReplicaName(String),
154    ParserStatement(ParserStatementError),
155    Parser(ParserError),
156    DropViewOnMaterializedView(String),
157    DependentObjectsStillExist {
158        object_type: String,
159        object_name: String,
160        // (dependent type, name)
161        dependents: Vec<(String, String)>,
162    },
163    AlterViewOnMaterializedView(String),
164    ShowCreateViewOnMaterializedView(String),
165    ExplainViewOnMaterializedView(String),
166    UnacceptableTimelineName(String),
167    FetchingCsrSchemaFailed {
168        schema_lookup: String,
169        cause: Arc<dyn Error + Send + Sync>,
170    },
171    PostgresConnectionErr {
172        cause: Arc<mz_postgres_util::PostgresError>,
173    },
174    MySqlConnectionErr {
175        cause: Arc<MySqlError>,
176    },
177    SqlServerConnectionErr {
178        cause: Arc<SqlServerError>,
179    },
180    SubsourceNameConflict {
181        name: UnresolvedItemName,
182        upstream_references: Vec<UnresolvedItemName>,
183    },
184    SubsourceDuplicateReference {
185        name: UnresolvedItemName,
186        target_names: Vec<UnresolvedItemName>,
187    },
188    NoTablesFoundForSchemas(Vec<String>),
189    InvalidProtobufSchema {
190        cause: protobuf_native::OperationFailedError,
191    },
192    InvalidOptionValue {
193        // Expected to be generated from the `to_ast_string` value on the option
194        // name.
195        option_name: String,
196        err: Box<PlanError>,
197    },
198    UnexpectedDuplicateReference {
199        name: UnresolvedItemName,
200    },
201    /// Declaration of a recursive type did not match the inferred type.
202    RecursiveTypeMismatch(String, Vec<String>, Vec<String>),
203    UnknownFunction {
204        name: String,
205        arg_types: Vec<String>,
206    },
207    IndistinctFunction {
208        name: String,
209        arg_types: Vec<String>,
210    },
211    UnknownOperator {
212        name: String,
213        arg_types: Vec<String>,
214    },
215    IndistinctOperator {
216        name: String,
217        arg_types: Vec<String>,
218    },
219    InvalidPrivatelinkAvailabilityZone {
220        name: String,
221        supported_azs: BTreeSet<String>,
222    },
223    DuplicatePrivatelinkAvailabilityZone {
224        duplicate_azs: BTreeSet<String>,
225    },
226    InvalidSchemaName,
227    ItemAlreadyExists {
228        name: String,
229        item_type: CatalogItemType,
230    },
231    ManagedCluster {
232        cluster_name: String,
233    },
234    InvalidKeysInSubscribeEnvelopeUpsert,
235    InvalidKeysInSubscribeEnvelopeDebezium,
236    InvalidPartitionByEnvelopeDebezium {
237        column_name: String,
238    },
239    InvalidOrderByInSubscribeWithinTimestampOrderBy,
240    FromValueRequiresParen,
241    VarError(VarError),
242    UnsolvablePolymorphicFunctionInput,
243    ShowCommandInView,
244    WebhookValidationDoesNotUseColumns,
245    WebhookValidationNonDeterministic,
246    InternalFunctionCall,
247    CommentTooLong {
248        length: usize,
249        max_size: usize,
250    },
251    InvalidTimestampInterval {
252        min: Duration,
253        max: Duration,
254        requested: Duration,
255    },
256    InvalidGroupSizeHints,
257    PgSourcePurification(PgSourcePurificationError),
258    KafkaSourcePurification(KafkaSourcePurificationError),
259    KafkaSinkPurification(KafkaSinkPurificationError),
260    IcebergSinkPurification(IcebergSinkPurificationError),
261    LoadGeneratorSourcePurification(LoadGeneratorSourcePurificationError),
262    CsrPurification(CsrPurificationError),
263    MySqlSourcePurification(MySqlSourcePurificationError),
264    SqlServerSourcePurificationError(SqlServerSourcePurificationError),
265    UseTablesForSources(String),
266    MissingName(CatalogItemType),
267    InvalidRefreshAt,
268    InvalidRefreshEveryAlignedTo,
269    CreateReplicaFailStorageObjects {
270        /// The current number of replicas on the cluster
271        current_replica_count: usize,
272        /// THe number of internal replicas on the cluster
273        internal_replica_count: usize,
274        /// The number of replicas that executing this command would have
275        /// created
276        hypothetical_replica_count: usize,
277    },
278    MismatchedObjectType {
279        name: PartialItemName,
280        is_type: ObjectType,
281        expected_type: ObjectType,
282    },
283    /// MZ failed to generate cast for the data type.
284    TableContainsUningestableTypes {
285        name: String,
286        type_: String,
287        column: String,
288    },
289    RetainHistoryLow {
290        limit: Duration,
291    },
292    RetainHistoryRequired,
293    UntilReadyTimeoutRequired,
294    SubsourceResolutionError(ExternalReferenceResolutionError),
295    Replan(String),
296    NetworkPolicyLockoutError,
297    NetworkPolicyInUse,
298    /// Expected a constant expression that evaluates without an error to a non-null value.
299    ConstantExpressionSimplificationFailed(String),
300    InvalidOffset(String),
301    /// The named cursor does not exist.
302    UnknownCursor(String),
303    CopyFromTargetTableDropped {
304        target_name: String,
305    },
306    /// AS OF or UP TO should be an expression that is castable and simplifiable to a non-null mz_timestamp value.
307    InvalidAsOfUpTo,
308    InvalidReplacement {
309        item_type: CatalogItemType,
310        item_name: PartialItemName,
311        replacement_type: CatalogItemType,
312        replacement_name: PartialItemName,
313    },
314    // TODO(benesch): eventually all errors should be structured.
315    Unstructured(String),
316}
317
318impl PlanError {
319    pub(crate) fn ungrouped_column(item: &ScopeItem) -> PlanError {
320        PlanError::UngroupedColumn {
321            table: item.table_name.clone(),
322            column: item.column_name.clone(),
323        }
324    }
325
326    pub fn detail(&self) -> Option<String> {
327        match self {
328            Self::NeverSupported { details, .. } => details.clone(),
329            Self::FetchingCsrSchemaFailed { cause, .. } => Some(cause.to_string_with_causes()),
330            Self::PostgresConnectionErr { cause } => Some(cause.to_string_with_causes()),
331            Self::InvalidProtobufSchema { cause } => Some(cause.to_string_with_causes()),
332            Self::InvalidOptionValue { err, .. } => err.detail(),
333            Self::UpsertSinkWithInvalidKey {
334                name,
335                desired_key,
336                valid_keys,
337            } => {
338                let valid_keys = if valid_keys.is_empty() {
339                    "There are no known valid unique keys for the underlying relation.".into()
340                } else {
341                    format!(
342                        "The following keys are known to be unique for the underlying relation:\n{}",
343                        valid_keys
344                            .iter()
345                            .map(|k|
346                                format!("  ({})", k.iter().map(|c| c.as_str().quoted()).join(", "))
347                            )
348                            .join("\n"),
349                    )
350                };
351                Some(format!(
352                    "Materialize could not prove that the specified upsert envelope key ({}) \
353                    was a unique key of the underlying relation {}. {valid_keys}",
354                    separated(", ", desired_key.iter().map(|c| c.as_str().quoted())),
355                    name.quoted()
356                ))
357            }
358            Self::VarError(e) => e.detail(),
359            Self::InternalFunctionCall => Some("This function is for the internal use of the database system and cannot be called directly.".into()),
360            Self::PgSourcePurification(e) => e.detail(),
361            Self::MySqlSourcePurification(e) => e.detail(),
362            Self::SqlServerSourcePurificationError(e) => e.detail(),
363            Self::KafkaSourcePurification(e) => e.detail(),
364            Self::LoadGeneratorSourcePurification(e) => e.detail(),
365            Self::CsrPurification(e) => e.detail(),
366            Self::KafkaSinkPurification(e) => e.detail(),
367            Self::IcebergSinkPurification(e) => e.detail(),
368            Self::CreateReplicaFailStorageObjects { current_replica_count: current, internal_replica_count: internal, hypothetical_replica_count: target } => {
369                Some(format!(
370                    "Currently have {} replica{}{}; command would result in {}",
371                    current,
372                    if *current != 1 { "s" } else { "" },
373                    if *internal > 0 {
374                        format!(" ({} internal)", internal)
375                    } else {
376                        "".to_string()
377                    },
378                    target
379                ))
380            },
381            Self::SubsourceNameConflict {
382                name: _,
383                upstream_references,
384            } => Some(format!(
385                "referenced tables with duplicate name: {}",
386                itertools::join(upstream_references, ", ")
387            )),
388            Self::SubsourceDuplicateReference {
389                name: _,
390                target_names,
391            } => Some(format!(
392                "subsources referencing table: {}",
393                itertools::join(target_names, ", ")
394            )),
395            Self::InvalidPartitionByEnvelopeDebezium { .. } => Some(
396                "When using ENVELOPE DEBEZIUM, only columns in the key can be referenced in the PARTITION BY expression.".to_string()
397            ),
398            Self::NoTablesFoundForSchemas(schemas) => Some(format!(
399                "missing schemas: {}",
400                separated(", ", schemas.iter().map(|c| c.quoted()))
401            )),
402            _ => None,
403        }
404    }
405
406    pub fn hint(&self) -> Option<String> {
407        match self {
408            Self::DropViewOnMaterializedView(_) => {
409                Some("Use DROP MATERIALIZED VIEW to remove a materialized view.".into())
410            }
411            Self::DependentObjectsStillExist {..} => Some("Use DROP ... CASCADE to drop the dependent objects too.".into()),
412            Self::AlterViewOnMaterializedView(_) => {
413                Some("Use ALTER MATERIALIZED VIEW to rename a materialized view.".into())
414            }
415            Self::ShowCreateViewOnMaterializedView(_) => {
416                Some("Use SHOW CREATE MATERIALIZED VIEW to show a materialized view.".into())
417            }
418            Self::ExplainViewOnMaterializedView(_) => {
419                Some("Use EXPLAIN [...] MATERIALIZED VIEW to explain a materialized view.".into())
420            }
421            Self::UnacceptableTimelineName(_) => {
422                Some("The prefix \"mz_\" is reserved for system timelines.".into())
423            }
424            Self::PostgresConnectionErr { cause } => {
425                if let Some(cause) = cause.source() {
426                    if let Some(cause) = cause.downcast_ref::<io::Error>() {
427                        if cause.kind() == io::ErrorKind::TimedOut {
428                            return Some(
429                                "Do you have a firewall or security group that is \
430                                preventing Materialize from connecting to your PostgreSQL server?"
431                                    .into(),
432                            );
433                        }
434                    }
435                }
436                None
437            }
438            Self::InvalidOptionValue { err, .. } => err.hint(),
439            Self::UnknownFunction { ..} => Some("No function matches the given name and argument types.  You might need to add explicit type casts.".into()),
440            Self::IndistinctFunction {..} => {
441                Some("Could not choose a best candidate function.  You might need to add explicit type casts.".into())
442            }
443            Self::UnknownOperator {..} => {
444                Some("No operator matches the given name and argument types.  You might need to add explicit type casts.".into())
445            }
446            Self::IndistinctOperator {..} => {
447                Some("Could not choose a best candidate operator.  You might need to add explicit type casts.".into())
448            },
449            Self::InvalidPrivatelinkAvailabilityZone { supported_azs, ..} => {
450                let supported_azs_str = supported_azs.iter().join("\n  ");
451                Some(format!("Did you supply an availability zone name instead of an ID? Known availability zone IDs:\n  {}", supported_azs_str))
452            }
453            Self::DuplicatePrivatelinkAvailabilityZone { duplicate_azs, ..} => {
454                let duplicate_azs  = duplicate_azs.iter().join("\n  ");
455                Some(format!("Duplicated availability zones:\n  {}", duplicate_azs))
456            }
457            Self::InvalidKeysInSubscribeEnvelopeUpsert => {
458                Some("All keys must be columns on the underlying relation.".into())
459            }
460            Self::InvalidKeysInSubscribeEnvelopeDebezium => {
461                Some("All keys must be columns on the underlying relation.".into())
462            }
463            Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
464                Some("All order bys must be output columns.".into())
465            }
466            Self::UpsertSinkWithInvalidKey { .. } | Self::UpsertSinkWithoutKey => {
467                Some("See: https://materialize.com/s/sink-key-selection".into())
468            }
469            Self::Catalog(e) => e.hint(),
470            Self::VarError(e) => e.hint(),
471            Self::PgSourcePurification(e) => e.hint(),
472            Self::MySqlSourcePurification(e) => e.hint(),
473            Self::SqlServerSourcePurificationError(e) => e.hint(),
474            Self::KafkaSourcePurification(e) => e.hint(),
475            Self::LoadGeneratorSourcePurification(e) => e.hint(),
476            Self::CsrPurification(e) => e.hint(),
477            Self::KafkaSinkPurification(e) => e.hint(),
478            Self::UnknownColumn { table, similar, .. } => {
479                let suffix = "Make sure to surround case sensitive names in double quotes.";
480                match &similar[..] {
481                    [] => None,
482                    [column] => Some(format!("The similarly named column {} does exist. {suffix}", ColumnDisplay { table, column })),
483                    names => {
484                        let similar = names.into_iter().map(|column| ColumnDisplay { table, column }).join(", ");
485                        Some(format!("There are similarly named columns that do exist: {similar}. {suffix}"))
486                    }
487                }
488            }
489            Self::RecursiveTypeMismatch(..) => {
490                Some("You will need to rewrite or cast the query's expressions.".into())
491            },
492            Self::InvalidRefreshAt
493            | Self::InvalidRefreshEveryAlignedTo => {
494                Some("Calling `mz_now()` is allowed.".into())
495            },
496            Self::TableContainsUningestableTypes { column,.. } => {
497                Some(format!("Remove the table or use TEXT COLUMNS ({column}, ..) to ingest this column as text"))
498            }
499            Self::RetainHistoryLow { .. } | Self::RetainHistoryRequired => {
500                Some("Use ALTER ... RESET (RETAIN HISTORY) to set the retain history to its default and lowest value.".into())
501            }
502            Self::NetworkPolicyInUse => {
503                Some("Use ALTER SYSTEM SET 'network_policy' to change the default network policy.".into())
504            }
505            Self::WrongParameterType(_, _, _) => {
506                Some("EXECUTE automatically inserts only such casts that are allowed in an assignment cast context.  Try adding an explicit cast.".into())
507            }
508            Self::InvalidSchemaName => {
509                Some("Use SET schema = name to select a schema.  Use SHOW SCHEMAS to list available schemas.  Use SHOW search_path to show the schema names that we looked for, but none of them existed.".into())
510            }
511            _ => None,
512        }
513    }
514}
515
516impl fmt::Display for PlanError {
517    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
518        match self {
519            Self::Unsupported { feature, discussion_no } => {
520                write!(f, "{} not yet supported", feature)?;
521                if let Some(discussion_no) = discussion_no {
522                    write!(f, ", see https://github.com/MaterializeInc/materialize/discussions/{} for more details", discussion_no)?;
523                }
524                Ok(())
525            }
526            Self::NeverSupported { feature, documentation_link: documentation_path,.. } => {
527                write!(f, "{feature} is not supported",)?;
528                if let Some(documentation_path) = documentation_path {
529                    write!(f, ", for more information consult the documentation at https://materialize.com/docs/{documentation_path}")?;
530                }
531                Ok(())
532            }
533            Self::UnknownColumn { table, column, similar: _ } => write!(
534                f,
535                "column {} does not exist",
536                ColumnDisplay { table, column }
537            ),
538            Self::UngroupedColumn { table, column } => write!(
539                f,
540                "column {} must appear in the GROUP BY clause or be used in an aggregate function",
541                ColumnDisplay { table, column },
542            ),
543            Self::TypeWithoutColumns { type_name } => write!(
544                f,
545                "type {} does not have addressable columns",
546                type_name.item.quoted(),
547            ),
548            Self::WrongJoinTypeForLateralColumn { table, column } => write!(
549                f,
550                "column {} cannot be referenced from this part of the query: \
551                the combining JOIN type must be INNER or LEFT for a LATERAL reference",
552                ColumnDisplay { table, column },
553            ),
554            Self::AmbiguousColumn(column) => write!(
555                f,
556                "column reference {} is ambiguous",
557                column.quoted()
558            ),
559            Self::TooManyColumns { max_num_columns, req_num_columns } => write!(
560                f,
561                "attempt to create relation with too many columns, {} max: {}",
562                req_num_columns, max_num_columns
563            ),
564            Self::ColumnAlreadyExists { column_name, object_name } => write!(
565                f,
566                "column {} of relation {} already exists",
567                column_name.quoted(), object_name.quoted(),
568            ),
569            Self::AmbiguousTable(table) => write!(
570                f,
571                "table reference {} is ambiguous",
572                table.item.as_str().quoted()
573            ),
574            Self::UnknownColumnInUsingClause { column, join_side } => write!(
575                f,
576                "column {} specified in USING clause does not exist in {} table",
577                column.quoted(),
578                join_side,
579            ),
580            Self::AmbiguousColumnInUsingClause { column, join_side } => write!(
581                f,
582                "common column name {} appears more than once in {} table",
583                column.quoted(),
584                join_side,
585            ),
586            Self::MisqualifiedName(name) => write!(
587                f,
588                "qualified name did not have between 1 and 3 components: {}",
589                name
590            ),
591            Self::OverqualifiedDatabaseName(name) => write!(
592                f,
593                "database name '{}' does not have exactly one component",
594                name
595            ),
596            Self::OverqualifiedSchemaName(name) => write!(
597                f,
598                "schema name '{}' cannot have more than two components",
599                name
600            ),
601            Self::UnderqualifiedColumnName(name) => write!(
602                f,
603                "column name '{}' must have at least a table qualification",
604                name
605            ),
606            Self::UnacceptableTimelineName(name) => {
607                write!(f, "unacceptable timeline name {}", name.quoted(),)
608            }
609            Self::SubqueriesDisallowed { context } => {
610                write!(f, "{} does not allow subqueries", context)
611            }
612            Self::UnknownParameter(n) => write!(f, "there is no parameter ${}", n),
613            Self::ParameterNotAllowed(object_type) => write!(f, "{} cannot have parameters", object_type),
614            Self::WrongParameterType(i, expected_ty, actual_ty) => write!(f, "unable to cast given parameter ${}: expected {}, got {}", i, expected_ty, actual_ty),
615            Self::RecursionLimit(e) => write!(f, "{}", e),
616            Self::StrconvParse(e) => write!(f, "{}", e),
617            Self::Catalog(e) => write!(f, "{}", e),
618            Self::UpsertSinkWithoutKey => write!(f, "upsert sinks must specify a key"),
619            Self::UpsertSinkWithInvalidKey { .. } => {
620                write!(f, "upsert key could not be validated as unique")
621            }
622            Self::InvalidWmrRecursionLimit(msg) => write!(f, "Invalid WITH MUTUALLY RECURSIVE recursion limit. {}", msg),
623            Self::InvalidNumericMaxScale(e) => e.fmt(f),
624            Self::InvalidCharLength(e) => e.fmt(f),
625            Self::InvalidVarCharMaxLength(e) => e.fmt(f),
626            Self::InvalidTimestampPrecision(e) => e.fmt(f),
627            Self::Parser(e) => e.fmt(f),
628            Self::ParserStatement(e) => e.fmt(f),
629            Self::Unstructured(e) => write!(f, "{}", e),
630            Self::InvalidId(id) => write!(f, "invalid id {}", id),
631            Self::InvalidIdent(err) => write!(f, "invalid identifier, {err}"),
632            Self::InvalidObject(i) => write!(f, "{} is not a database object", i.full_name_str()),
633            Self::InvalidObjectType{expected_type, actual_type, object_name} => write!(f, "{actual_type} {object_name} is not a {expected_type}"),
634            Self::InvalidPrivilegeTypes{ invalid_privileges, object_description, } => {
635                write!(f, "invalid privilege types {} for {}", invalid_privileges.to_error_string(), object_description)
636            },
637            Self::InvalidSecret(i) => write!(f, "{} is not a secret", i.full_name_str()),
638            Self::InvalidTemporarySchema => {
639                write!(f, "cannot create temporary item in non-temporary schema")
640            }
641            Self::InvalidCast { name, ccx, from, to } =>{
642                write!(
643                    f,
644                    "{name} does not support {ccx}casting from {from} to {to}",
645                    ccx = if matches!(ccx, CastContext::Implicit) {
646                        "implicitly "
647                    } else {
648                        ""
649                    },
650                )
651            }
652            Self::InvalidTable { name } => {
653                write!(f, "invalid table definition for {}", name.quoted())
654            },
655            Self::InvalidVersion { name, version } => {
656                write!(f, "invalid version {} for {}", version.quoted(), name.quoted())
657            },
658            Self::InvalidSinkFrom { name, item_type } => {
659                write!(f, "{name} is a {item_type}, which cannot be exported as a sink")
660            },
661            Self::DropViewOnMaterializedView(name)
662            | Self::AlterViewOnMaterializedView(name)
663            | Self::ShowCreateViewOnMaterializedView(name)
664            | Self::ExplainViewOnMaterializedView(name) => write!(f, "{name} is not a view"),
665            Self::FetchingCsrSchemaFailed { schema_lookup, .. } => {
666                write!(f, "failed to fetch schema {schema_lookup} from schema registry")
667            }
668            Self::PostgresConnectionErr { .. } => {
669                write!(f, "failed to connect to PostgreSQL database")
670            }
671            Self::MySqlConnectionErr { cause } => {
672                write!(f, "failed to connect to MySQL database: {}", cause)
673            }
674            Self::SqlServerConnectionErr { cause } => {
675                write!(f, "failed to connect to SQL Server database: {}", cause)
676            }
677            Self::SubsourceNameConflict {
678                name , upstream_references: _,
679            } => {
680                write!(f, "multiple subsources would be named {}", name)
681            },
682            Self::SubsourceDuplicateReference {
683                name,
684                target_names: _,
685            } => {
686                write!(f, "multiple subsources refer to table {}", name)
687            },
688            Self::NoTablesFoundForSchemas(schemas) => {
689                write!(f, "no tables found in referenced schemas: {}",
690                    separated(", ", schemas.iter().map(|c| c.quoted()))
691                )
692            },
693            Self::InvalidProtobufSchema { .. } => {
694                write!(f, "invalid protobuf schema")
695            }
696            Self::DependentObjectsStillExist {object_type, object_name, dependents} => {
697                let reason = match &dependents[..] {
698                    [] => " because other objects depend on it".to_string(),
699                    dependents => {
700                        let dependents = dependents.iter().map(|(dependent_type, dependent_name)| format!("{} {}", dependent_type, dependent_name.quoted())).join(", ");
701                        format!(": still depended upon by {dependents}")
702                    },
703                };
704                let object_name = object_name.quoted();
705                write!(f, "cannot drop {object_type} {object_name}{reason}")
706            }
707            Self::InvalidOptionValue { option_name, err } => write!(f, "invalid {} option value: {}", option_name, err),
708            Self::UnexpectedDuplicateReference { name } => write!(f, "unexpected multiple references to {}", name.to_ast_string_simple()),
709            Self::RecursiveTypeMismatch(name, declared, inferred) => {
710                let declared = separated(", ", declared);
711                let inferred = separated(", ", inferred);
712                let name = name.quoted();
713                write!(f, "WITH MUTUALLY RECURSIVE query {name} declared types ({declared}), but query returns types ({inferred})")
714            },
715            Self::UnknownFunction {name, arg_types, ..} => {
716                write!(f, "function {}({}) does not exist", name, arg_types.join(", "))
717            },
718            Self::IndistinctFunction {name, arg_types, ..} => {
719                write!(f, "function {}({}) is not unique", name, arg_types.join(", "))
720            },
721            Self::UnknownOperator {name, arg_types, ..} => {
722                write!(f, "operator does not exist: {}", match arg_types.as_slice(){
723                    [typ] => format!("{} {}", name, typ),
724                    [ltyp, rtyp] => {
725                        format!("{} {} {}", ltyp, name, rtyp)
726                    }
727                    _ => unreachable!("non-unary non-binary operator"),
728                })
729            },
730            Self::IndistinctOperator {name, arg_types, ..} => {
731                write!(f, "operator is not unique: {}", match arg_types.as_slice(){
732                    [typ] => format!("{} {}", name, typ),
733                    [ltyp, rtyp] => {
734                        format!("{} {} {}", ltyp, name, rtyp)
735                    }
736                    _ => unreachable!("non-unary non-binary operator"),
737                })
738            },
739            Self::InvalidPrivatelinkAvailabilityZone { name, ..} => write!(f, "invalid AWS PrivateLink availability zone {}", name.quoted()),
740            Self::DuplicatePrivatelinkAvailabilityZone {..} =>   write!(f, "connection cannot contain duplicate availability zones"),
741            Self::InvalidSchemaName => write!(f, "no valid schema selected"),
742            Self::ItemAlreadyExists { name, item_type } => write!(f, "{item_type} {} already exists", name.quoted()),
743            Self::ManagedCluster {cluster_name} => write!(f, "cannot modify managed cluster {cluster_name}"),
744            Self::InvalidKeysInSubscribeEnvelopeUpsert => {
745                write!(f, "invalid keys in SUBSCRIBE ENVELOPE UPSERT (KEY (..))")
746            }
747            Self::InvalidKeysInSubscribeEnvelopeDebezium => {
748                write!(f, "invalid keys in SUBSCRIBE ENVELOPE DEBEZIUM (KEY (..))")
749            }
750            Self::InvalidPartitionByEnvelopeDebezium { column_name } => {
751                write!(
752                    f,
753                    "PARTITION BY expression cannot refer to non-key column {}",
754                    column_name.quoted(),
755                )
756            }
757            Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
758                write!(f, "invalid ORDER BY in SUBSCRIBE WITHIN TIMESTAMP ORDER BY")
759            }
760            Self::FromValueRequiresParen => f.write_str(
761                "VALUES expression in FROM clause must be surrounded by parentheses"
762            ),
763            Self::VarError(e) => e.fmt(f),
764            Self::UnsolvablePolymorphicFunctionInput => f.write_str(
765                "could not determine polymorphic type because input has type unknown"
766            ),
767            Self::ShowCommandInView => f.write_str("SHOW commands are not allowed in views"),
768            Self::WebhookValidationDoesNotUseColumns => f.write_str(
769                "expression provided in CHECK does not reference any columns"
770            ),
771            Self::WebhookValidationNonDeterministic => f.write_str(
772                "expression provided in CHECK is not deterministic"
773            ),
774            Self::InternalFunctionCall => f.write_str("cannot call function with arguments of type internal"),
775            Self::CommentTooLong { length, max_size } => {
776                write!(f, "provided comment was {length} bytes long, max size is {max_size} bytes")
777            }
778            Self::InvalidTimestampInterval { min, max, requested } => {
779                write!(f, "invalid timestamp interval of {}ms, must be in the range [{}ms, {}ms]", requested.as_millis(), min.as_millis(), max.as_millis())
780            }
781            Self::InvalidGroupSizeHints => f.write_str("EXPECTED GROUP SIZE cannot be provided \
782                simultaneously with any of AGGREGATE INPUT GROUP SIZE, DISTINCT ON INPUT GROUP SIZE, \
783                or LIMIT INPUT GROUP SIZE"),
784            Self::PgSourcePurification(e) => write!(f, "POSTGRES source validation: {}", e),
785            Self::KafkaSourcePurification(e) => write!(f, "KAFKA source validation: {}", e),
786            Self::LoadGeneratorSourcePurification(e) => write!(f, "LOAD GENERATOR source validation: {}", e),
787            Self::KafkaSinkPurification(e) => write!(f, "KAFKA sink validation: {}", e),
788            Self::IcebergSinkPurification(e) => write!(f, "ICEBERG sink validation: {}", e),
789            Self::CsrPurification(e) => write!(f, "CONFLUENT SCHEMA REGISTRY validation: {}", e),
790            Self::MySqlSourcePurification(e) => write!(f, "MYSQL source validation: {}", e),
791            Self::SqlServerSourcePurificationError(e) => write!(f, "SQL SERVER source validation: {}", e),
792            Self::UseTablesForSources(command) => write!(f, "{command} not supported; use CREATE TABLE .. FROM SOURCE instead"),
793            Self::MangedReplicaName(name) => {
794                write!(f, "{name} is reserved for replicas of managed clusters")
795            }
796            Self::MissingName(item_type) => {
797                write!(f, "unspecified name for {item_type}")
798            }
799            Self::InvalidRefreshAt => {
800                write!(f, "REFRESH AT argument must be an expression that can be simplified \
801                           and/or cast to a constant whose type is mz_timestamp")
802            }
803            Self::InvalidRefreshEveryAlignedTo => {
804                write!(f, "REFRESH EVERY ... ALIGNED TO argument must be an expression that can be simplified \
805                           and/or cast to a constant whose type is mz_timestamp")
806            }
807            Self::CreateReplicaFailStorageObjects {..} => {
808                write!(f, "cannot create more than one replica of a cluster containing sources or sinks")
809            },
810            Self::MismatchedObjectType {
811                name,
812                is_type,
813                expected_type,
814            } => {
815                write!(
816                    f,
817                    "{name} is {} {} not {} {}",
818                    if *is_type == ObjectType::Index {
819                        "an"
820                    } else {
821                        "a"
822                    },
823                    is_type.to_string().to_lowercase(),
824                    if *expected_type == ObjectType::Index {
825                        "an"
826                    } else {
827                        "a"
828                    },
829                    expected_type.to_string().to_lowercase()
830                )
831            }
832            Self::TableContainsUningestableTypes { name, type_, column } => {
833                write!(f, "table {name} contains column {column} of type {type_} which Materialize cannot currently ingest")
834            },
835            Self::RetainHistoryLow { limit } => {
836                write!(f, "RETAIN HISTORY cannot be set lower than {}ms", limit.as_millis())
837            },
838            Self::RetainHistoryRequired => {
839                write!(f, "RETAIN HISTORY cannot be disabled or set to 0")
840            },
841            Self::SubsourceResolutionError(e) => write!(f, "{}", e),
842            Self::Replan(msg) => write!(f, "internal error while replanning, please contact support: {msg}"),
843            Self::NetworkPolicyLockoutError => write!(f, "policy would block current session IP"),
844            Self::NetworkPolicyInUse => write!(f, "network policy is currently in use"),
845            Self::UntilReadyTimeoutRequired => {
846                write!(f, "TIMEOUT=<duration> option is required for ALTER CLUSTER ... WITH (WAIT UNTIL READY ( ... ))")
847            },
848            Self::ConstantExpressionSimplificationFailed(e) => write!(f, "{}", e),
849            Self::InvalidOffset(e) => write!(f, "Invalid OFFSET clause: {}", e),
850            Self::UnknownCursor(name) => {
851                write!(f, "cursor {} does not exist", name.quoted())
852            }
853            Self::CopyFromTargetTableDropped { target_name: name } => write!(f, "COPY FROM's target table {} was dropped", name.quoted()),
854            Self::InvalidAsOfUpTo => write!(f, "AS OF or UP TO should be castable to a (non-null) mz_timestamp value"),
855            Self::InvalidReplacement { item_type, item_name, replacement_type, replacement_name } => {
856                write!(f, "cannot replace {item_type} {item_name} with {replacement_type} {replacement_name}")
857            }
858        }
859    }
860}
861
862impl Error for PlanError {}
863
864impl From<CatalogError> for PlanError {
865    fn from(e: CatalogError) -> PlanError {
866        PlanError::Catalog(e)
867    }
868}
869
870impl From<strconv::ParseError> for PlanError {
871    fn from(e: strconv::ParseError) -> PlanError {
872        PlanError::StrconvParse(e)
873    }
874}
875
876impl From<RecursionLimitError> for PlanError {
877    fn from(e: RecursionLimitError) -> PlanError {
878        PlanError::RecursionLimit(e)
879    }
880}
881
882impl From<InvalidNumericMaxScaleError> for PlanError {
883    fn from(e: InvalidNumericMaxScaleError) -> PlanError {
884        PlanError::InvalidNumericMaxScale(e)
885    }
886}
887
888impl From<InvalidCharLengthError> for PlanError {
889    fn from(e: InvalidCharLengthError) -> PlanError {
890        PlanError::InvalidCharLength(e)
891    }
892}
893
894impl From<InvalidVarCharMaxLengthError> for PlanError {
895    fn from(e: InvalidVarCharMaxLengthError) -> PlanError {
896        PlanError::InvalidVarCharMaxLength(e)
897    }
898}
899
900impl From<InvalidTimestampPrecisionError> for PlanError {
901    fn from(e: InvalidTimestampPrecisionError) -> PlanError {
902        PlanError::InvalidTimestampPrecision(e)
903    }
904}
905
906impl From<anyhow::Error> for PlanError {
907    fn from(e: anyhow::Error) -> PlanError {
908        // WIP: Do we maybe want to keep the alternate selector for these?
909        sql_err!("{}", e.display_with_causes())
910    }
911}
912
913impl From<TryFromIntError> for PlanError {
914    fn from(e: TryFromIntError) -> PlanError {
915        sql_err!("{}", e.display_with_causes())
916    }
917}
918
919impl From<ParseIntError> for PlanError {
920    fn from(e: ParseIntError) -> PlanError {
921        sql_err!("{}", e.display_with_causes())
922    }
923}
924
925impl From<EvalError> for PlanError {
926    fn from(e: EvalError) -> PlanError {
927        sql_err!("{}", e.display_with_causes())
928    }
929}
930
931impl From<ParserError> for PlanError {
932    fn from(e: ParserError) -> PlanError {
933        PlanError::Parser(e)
934    }
935}
936
937impl From<ParserStatementError> for PlanError {
938    fn from(e: ParserStatementError) -> PlanError {
939        PlanError::ParserStatement(e)
940    }
941}
942
943impl From<PostgresError> for PlanError {
944    fn from(e: PostgresError) -> PlanError {
945        PlanError::PostgresConnectionErr { cause: Arc::new(e) }
946    }
947}
948
949impl From<MySqlError> for PlanError {
950    fn from(e: MySqlError) -> PlanError {
951        PlanError::MySqlConnectionErr { cause: Arc::new(e) }
952    }
953}
954
955impl From<SqlServerError> for PlanError {
956    fn from(e: SqlServerError) -> PlanError {
957        PlanError::SqlServerConnectionErr { cause: Arc::new(e) }
958    }
959}
960
961impl From<VarError> for PlanError {
962    fn from(e: VarError) -> Self {
963        PlanError::VarError(e)
964    }
965}
966
967impl From<PgSourcePurificationError> for PlanError {
968    fn from(e: PgSourcePurificationError) -> Self {
969        PlanError::PgSourcePurification(e)
970    }
971}
972
973impl From<KafkaSourcePurificationError> for PlanError {
974    fn from(e: KafkaSourcePurificationError) -> Self {
975        PlanError::KafkaSourcePurification(e)
976    }
977}
978
979impl From<KafkaSinkPurificationError> for PlanError {
980    fn from(e: KafkaSinkPurificationError) -> Self {
981        PlanError::KafkaSinkPurification(e)
982    }
983}
984
985impl From<IcebergSinkPurificationError> for PlanError {
986    fn from(e: IcebergSinkPurificationError) -> Self {
987        PlanError::IcebergSinkPurification(e)
988    }
989}
990
991impl From<CsrPurificationError> for PlanError {
992    fn from(e: CsrPurificationError) -> Self {
993        PlanError::CsrPurification(e)
994    }
995}
996
997impl From<LoadGeneratorSourcePurificationError> for PlanError {
998    fn from(e: LoadGeneratorSourcePurificationError) -> Self {
999        PlanError::LoadGeneratorSourcePurification(e)
1000    }
1001}
1002
1003impl From<MySqlSourcePurificationError> for PlanError {
1004    fn from(e: MySqlSourcePurificationError) -> Self {
1005        PlanError::MySqlSourcePurification(e)
1006    }
1007}
1008
1009impl From<SqlServerSourcePurificationError> for PlanError {
1010    fn from(e: SqlServerSourcePurificationError) -> Self {
1011        PlanError::SqlServerSourcePurificationError(e)
1012    }
1013}
1014
1015impl From<IdentError> for PlanError {
1016    fn from(e: IdentError) -> Self {
1017        PlanError::InvalidIdent(e)
1018    }
1019}
1020
1021impl From<ExternalReferenceResolutionError> for PlanError {
1022    fn from(e: ExternalReferenceResolutionError) -> Self {
1023        PlanError::SubsourceResolutionError(e)
1024    }
1025}
1026
1027struct ColumnDisplay<'a> {
1028    table: &'a Option<PartialItemName>,
1029    column: &'a ColumnName,
1030}
1031
1032impl<'a> fmt::Display for ColumnDisplay<'a> {
1033    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1034        if let Some(table) = &self.table {
1035            format!("{}.{}", table.item, self.column).quoted().fmt(f)
1036        } else {
1037            self.column.quoted().fmt(f)
1038        }
1039    }
1040}