1use std::collections::BTreeSet;
11use std::error::Error;
12use std::num::{ParseIntError, TryFromIntError};
13use std::sync::Arc;
14use std::time::Duration;
15use std::{fmt, io};
16
17use itertools::Itertools;
18use mz_expr::EvalError;
19use mz_mysql_util::MySqlError;
20use mz_ore::error::ErrorExt;
21use mz_ore::stack::RecursionLimitError;
22use mz_ore::str::{StrExt, separated};
23use mz_postgres_util::PostgresError;
24use mz_repr::adt::char::InvalidCharLengthError;
25use mz_repr::adt::mz_acl_item::AclMode;
26use mz_repr::adt::numeric::InvalidNumericMaxScaleError;
27use mz_repr::adt::timestamp::InvalidTimestampPrecisionError;
28use mz_repr::adt::varchar::InvalidVarCharMaxLengthError;
29use mz_repr::{CatalogItemId, ColumnName, strconv};
30use mz_sql_parser::ast::display::AstDisplay;
31use mz_sql_parser::ast::{IdentError, UnresolvedItemName};
32use mz_sql_parser::parser::{ParserError, ParserStatementError};
33use mz_sql_server_util::SqlServerError;
34use mz_storage_types::sources::ExternalReferenceResolutionError;
35
36use crate::catalog::{
37 CatalogError, CatalogItemType, ErrorMessageObjectDescription, SystemObjectType,
38};
39use crate::names::{PartialItemName, ResolvedItemName};
40use crate::plan::ObjectType;
41use crate::plan::plan_utils::JoinSide;
42use crate::plan::scope::ScopeItem;
43use crate::plan::typeconv::CastContext;
44use crate::pure::error::{
45 CsrPurificationError, IcebergSinkPurificationError, KafkaSinkPurificationError,
46 KafkaSourcePurificationError, LoadGeneratorSourcePurificationError,
47 MySqlSourcePurificationError, PgSourcePurificationError, SqlServerSourcePurificationError,
48};
49use crate::session::vars::VarError;
50
/// An error produced while planning or validating a SQL statement.
///
/// `Display` renders the primary, user-facing message; additional context is
/// exposed separately via [`PlanError::detail`] and [`PlanError::hint`].
#[derive(Debug)]
pub enum PlanError {
    /// Feature that is recognized but not yet implemented; `Display` links the
    /// GitHub discussion when `discussion_no` is set.
    Unsupported {
        feature: String,
        discussion_no: Option<usize>,
    },
    /// Feature that is deliberately not supported; `documentation_link` is a
    /// path under `https://materialize.com/docs/`, `details` feeds `detail()`.
    NeverSupported {
        feature: String,
        documentation_link: Option<String>,
        details: Option<String>,
    },
    /// Reference to a column that does not exist; `similar` carries
    /// близко-named candidates used by `hint()`.
    UnknownColumn {
        table: Option<PartialItemName>,
        column: ColumnName,
        similar: Box<[ColumnName]>,
    },
    /// Column used outside GROUP BY/aggregate where one is required.
    UngroupedColumn {
        table: Option<PartialItemName>,
        column: ColumnName,
    },
    ItemWithoutColumns {
        name: String,
        item_type: CatalogItemType,
    },
    WrongJoinTypeForLateralColumn {
        table: Option<PartialItemName>,
        column: ColumnName,
    },
    AmbiguousColumn(ColumnName),
    TooManyColumns {
        max_num_columns: usize,
        req_num_columns: usize,
    },
    ColumnAlreadyExists {
        column_name: ColumnName,
        object_name: String,
    },
    AmbiguousTable(PartialItemName),
    /// USING-clause column missing from one side of the join.
    UnknownColumnInUsingClause {
        column: ColumnName,
        join_side: JoinSide,
    },
    /// USING-clause column appearing more than once on one side of the join.
    AmbiguousColumnInUsingClause {
        column: ColumnName,
        join_side: JoinSide,
    },
    MisqualifiedName(String),
    OverqualifiedDatabaseName(String),
    OverqualifiedSchemaName(String),
    UnderqualifiedColumnName(String),
    SubqueriesDisallowed {
        context: String,
    },
    UnknownParameter(usize),
    ParameterNotAllowed(String),
    /// Parameter index, expected type name, actual type name (see `Display`).
    WrongParameterType(usize, String, String),
    RecursionLimit(RecursionLimitError),
    StrconvParse(strconv::ParseError),
    Catalog(CatalogError),
    UpsertSinkWithoutKey,
    /// Requested upsert key could not be proven unique; `valid_keys` lists the
    /// known unique keys and is rendered by `detail()`.
    UpsertSinkWithInvalidKey {
        name: String,
        desired_key: Vec<String>,
        valid_keys: Vec<Vec<String>>,
    },
    InvalidWmrRecursionLimit(String),
    InvalidNumericMaxScale(InvalidNumericMaxScaleError),
    InvalidCharLength(InvalidCharLengthError),
    InvalidId(CatalogItemId),
    InvalidIdent(IdentError),
    InvalidObject(Box<ResolvedItemName>),
    InvalidObjectType {
        expected_type: SystemObjectType,
        actual_type: SystemObjectType,
        object_name: String,
    },
    InvalidPrivilegeTypes {
        invalid_privileges: AclMode,
        object_description: ErrorMessageObjectDescription,
    },
    InvalidVarCharMaxLength(InvalidVarCharMaxLengthError),
    InvalidTimestampPrecision(InvalidTimestampPrecisionError),
    InvalidSecret(Box<ResolvedItemName>),
    InvalidTemporarySchema,
    /// Unsupported cast; `ccx` distinguishes implicit from explicit casts in
    /// the rendered message.
    InvalidCast {
        name: String,
        ccx: CastContext,
        from: String,
        to: String,
    },
    InvalidTable {
        name: String,
    },
    InvalidVersion {
        name: String,
        version: String,
    },
    InvalidSinkFrom {
        name: String,
        item_type: CatalogItemType,
    },
    InvalidDependency {
        name: String,
        item_type: CatalogItemType,
    },
    // NOTE(review): variant name has a typo ("Manged" -> "Managed"); renaming
    // would break every caller that constructs or matches it, so it is kept.
    MangedReplicaName(String),
    ParserStatement(ParserStatementError),
    Parser(ParserError),
    DropViewOnMaterializedView(String),
    /// Drop was refused; `dependents` is `(object type, object name)` pairs.
    DependentObjectsStillExist {
        object_type: String,
        object_name: String,
        dependents: Vec<(String, String)>,
    },
    AlterViewOnMaterializedView(String),
    ShowCreateViewOnMaterializedView(String),
    ExplainViewOnMaterializedView(String),
    UnacceptableTimelineName(String),
    /// Schema registry lookup failed; `cause` is surfaced via `detail()`.
    FetchingCsrSchemaFailed {
        schema_lookup: String,
        cause: Arc<dyn Error + Send + Sync>,
    },
    PostgresConnectionErr {
        cause: Arc<mz_postgres_util::PostgresError>,
    },
    MySqlConnectionErr {
        cause: Arc<MySqlError>,
    },
    SqlServerConnectionErr {
        cause: Arc<SqlServerError>,
    },
    SubsourceNameConflict {
        name: UnresolvedItemName,
        upstream_references: Vec<UnresolvedItemName>,
    },
    SubsourceDuplicateReference {
        name: UnresolvedItemName,
        target_names: Vec<UnresolvedItemName>,
    },
    NoTablesFoundForSchemas(Vec<String>),
    InvalidProtobufSchema {
        cause: protobuf_native::OperationFailedError,
    },
    /// Wraps the error produced while evaluating an option's value; `detail`
    /// and `hint` recurse into `err`.
    InvalidOptionValue {
        option_name: String,
        err: Box<PlanError>,
    },
    UnexpectedDuplicateReference {
        name: UnresolvedItemName,
    },
    /// WMR query name, declared column types, inferred column types.
    RecursiveTypeMismatch(String, Vec<String>, Vec<String>),
    UnknownFunction {
        name: String,
        arg_types: Vec<String>,
    },
    IndistinctFunction {
        name: String,
        arg_types: Vec<String>,
    },
    /// Unary or binary operator lookup failure; `Display` panics via
    /// `unreachable!` on other arities.
    UnknownOperator {
        name: String,
        arg_types: Vec<String>,
    },
    IndistinctOperator {
        name: String,
        arg_types: Vec<String>,
    },
    InvalidPrivatelinkAvailabilityZone {
        name: String,
        supported_azs: BTreeSet<String>,
    },
    DuplicatePrivatelinkAvailabilityZone {
        duplicate_azs: BTreeSet<String>,
    },
    InvalidSchemaName,
    ItemAlreadyExists {
        name: String,
        item_type: CatalogItemType,
    },
    ManagedCluster {
        cluster_name: String,
    },
    InvalidKeysInSubscribeEnvelopeUpsert,
    InvalidKeysInSubscribeEnvelopeDebezium,
    InvalidPartitionByEnvelopeDebezium {
        column_name: String,
    },
    InvalidOrderByInSubscribeWithinTimestampOrderBy,
    FromValueRequiresParen,
    VarError(VarError),
    UnsolvablePolymorphicFunctionInput,
    ShowCommandInView,
    WebhookValidationDoesNotUseColumns,
    WebhookValidationNonDeterministic,
    InternalFunctionCall,
    CommentTooLong {
        length: usize,
        max_size: usize,
    },
    InvalidTimestampInterval {
        min: Duration,
        max: Duration,
        requested: Duration,
    },
    InvalidGroupSizeHints,
    // Purification errors for the various source/sink flavors; `Display`,
    // `detail`, and `hint` all delegate to the wrapped error.
    PgSourcePurification(PgSourcePurificationError),
    KafkaSourcePurification(KafkaSourcePurificationError),
    KafkaSinkPurification(KafkaSinkPurificationError),
    IcebergSinkPurification(IcebergSinkPurificationError),
    LoadGeneratorSourcePurification(LoadGeneratorSourcePurificationError),
    CsrPurification(CsrPurificationError),
    MySqlSourcePurification(MySqlSourcePurificationError),
    // NOTE(review): inconsistently keeps the "Error" suffix its siblings drop;
    // renaming would break callers, so it is kept.
    SqlServerSourcePurificationError(SqlServerSourcePurificationError),
    UseTablesForSources(String),
    MissingName(CatalogItemType),
    InvalidRefreshAt,
    InvalidRefreshEveryAlignedTo,
    CreateReplicaFailStorageObjects {
        // Replica counts used by `detail()` to explain the refusal.
        current_replica_count: usize,
        internal_replica_count: usize,
        hypothetical_replica_count: usize,
    },
    MismatchedObjectType {
        name: PartialItemName,
        is_type: ObjectType,
        expected_type: ObjectType,
    },
    TableContainsUningestableTypes {
        name: String,
        type_: String,
        column: String,
    },
    RetainHistoryLow {
        limit: Duration,
    },
    RetainHistoryRequired,
    UntilReadyTimeoutRequired,
    SubsourceResolutionError(ExternalReferenceResolutionError),
    Replan(String),
    NetworkPolicyLockoutError,
    NetworkPolicyInUse,
    ConstantExpressionSimplificationFailed(String),
    InvalidOffset(String),
    UnknownCursor(String),
    CopyFromTargetTableDropped {
        target_name: String,
    },
    InvalidAsOfUpTo,
    InvalidReplacement {
        item_type: CatalogItemType,
        item_name: PartialItemName,
        replacement_type: CatalogItemType,
        replacement_name: PartialItemName,
    },
    /// Catch-all for ad-hoc errors (used by the `sql_err!` conversions below).
    Unstructured(String),
}
322
323impl PlanError {
324 pub(crate) fn ungrouped_column(item: &ScopeItem) -> PlanError {
325 PlanError::UngroupedColumn {
326 table: item.table_name.clone(),
327 column: item.column_name.clone(),
328 }
329 }
330
331 pub fn detail(&self) -> Option<String> {
332 match self {
333 Self::NeverSupported { details, .. } => details.clone(),
334 Self::FetchingCsrSchemaFailed { cause, .. } => Some(cause.to_string_with_causes()),
335 Self::PostgresConnectionErr { cause } => Some(cause.to_string_with_causes()),
336 Self::InvalidProtobufSchema { cause } => Some(cause.to_string_with_causes()),
337 Self::InvalidOptionValue { err, .. } => err.detail(),
338 Self::UpsertSinkWithInvalidKey {
339 name,
340 desired_key,
341 valid_keys,
342 } => {
343 let valid_keys = if valid_keys.is_empty() {
344 "There are no known valid unique keys for the underlying relation.".into()
345 } else {
346 format!(
347 "The following keys are known to be unique for the underlying relation:\n{}",
348 valid_keys
349 .iter()
350 .map(|k|
351 format!(" ({})", k.iter().map(|c| c.as_str().quoted()).join(", "))
352 )
353 .join("\n"),
354 )
355 };
356 Some(format!(
357 "Materialize could not prove that the specified upsert envelope key ({}) \
358 was a unique key of the underlying relation {}. {valid_keys}",
359 separated(", ", desired_key.iter().map(|c| c.as_str().quoted())),
360 name.quoted()
361 ))
362 }
363 Self::VarError(e) => e.detail(),
364 Self::InternalFunctionCall => Some("This function is for the internal use of the database system and cannot be called directly.".into()),
365 Self::PgSourcePurification(e) => e.detail(),
366 Self::MySqlSourcePurification(e) => e.detail(),
367 Self::SqlServerSourcePurificationError(e) => e.detail(),
368 Self::KafkaSourcePurification(e) => e.detail(),
369 Self::LoadGeneratorSourcePurification(e) => e.detail(),
370 Self::CsrPurification(e) => e.detail(),
371 Self::KafkaSinkPurification(e) => e.detail(),
372 Self::IcebergSinkPurification(e) => e.detail(),
373 Self::CreateReplicaFailStorageObjects { current_replica_count: current, internal_replica_count: internal, hypothetical_replica_count: target } => {
374 Some(format!(
375 "Currently have {} replica{}{}; command would result in {}",
376 current,
377 if *current != 1 { "s" } else { "" },
378 if *internal > 0 {
379 format!(" ({} internal)", internal)
380 } else {
381 "".to_string()
382 },
383 target
384 ))
385 },
386 Self::SubsourceNameConflict {
387 name: _,
388 upstream_references,
389 } => Some(format!(
390 "referenced tables with duplicate name: {}",
391 itertools::join(upstream_references, ", ")
392 )),
393 Self::SubsourceDuplicateReference {
394 name: _,
395 target_names,
396 } => Some(format!(
397 "subsources referencing table: {}",
398 itertools::join(target_names, ", ")
399 )),
400 Self::InvalidPartitionByEnvelopeDebezium { .. } => Some(
401 "When using ENVELOPE DEBEZIUM, only columns in the key can be referenced in the PARTITION BY expression.".to_string()
402 ),
403 Self::NoTablesFoundForSchemas(schemas) => Some(format!(
404 "missing schemas: {}",
405 separated(", ", schemas.iter().map(|c| c.quoted()))
406 )),
407 _ => None,
408 }
409 }
410
411 pub fn hint(&self) -> Option<String> {
412 match self {
413 Self::DropViewOnMaterializedView(_) => {
414 Some("Use DROP MATERIALIZED VIEW to remove a materialized view.".into())
415 }
416 Self::DependentObjectsStillExist {..} => Some("Use DROP ... CASCADE to drop the dependent objects too.".into()),
417 Self::AlterViewOnMaterializedView(_) => {
418 Some("Use ALTER MATERIALIZED VIEW to rename a materialized view.".into())
419 }
420 Self::ShowCreateViewOnMaterializedView(_) => {
421 Some("Use SHOW CREATE MATERIALIZED VIEW to show a materialized view.".into())
422 }
423 Self::ExplainViewOnMaterializedView(_) => {
424 Some("Use EXPLAIN [...] MATERIALIZED VIEW to explain a materialized view.".into())
425 }
426 Self::UnacceptableTimelineName(_) => {
427 Some("The prefix \"mz_\" is reserved for system timelines.".into())
428 }
429 Self::PostgresConnectionErr { cause } => {
430 if let Some(cause) = cause.source() {
431 if let Some(cause) = cause.downcast_ref::<io::Error>() {
432 if cause.kind() == io::ErrorKind::TimedOut {
433 return Some(
434 "Do you have a firewall or security group that is \
435 preventing Materialize from connecting to your PostgreSQL server?"
436 .into(),
437 );
438 }
439 }
440 }
441 None
442 }
443 Self::InvalidOptionValue { err, .. } => err.hint(),
444 Self::UnknownFunction { ..} => Some("No function matches the given name and argument types. You might need to add explicit type casts.".into()),
445 Self::IndistinctFunction {..} => {
446 Some("Could not choose a best candidate function. You might need to add explicit type casts.".into())
447 }
448 Self::UnknownOperator {..} => {
449 Some("No operator matches the given name and argument types. You might need to add explicit type casts.".into())
450 }
451 Self::IndistinctOperator {..} => {
452 Some("Could not choose a best candidate operator. You might need to add explicit type casts.".into())
453 },
454 Self::InvalidPrivatelinkAvailabilityZone { supported_azs, ..} => {
455 let supported_azs_str = supported_azs.iter().join("\n ");
456 Some(format!("Did you supply an availability zone name instead of an ID? Known availability zone IDs:\n {}", supported_azs_str))
457 }
458 Self::DuplicatePrivatelinkAvailabilityZone { duplicate_azs, ..} => {
459 let duplicate_azs = duplicate_azs.iter().join("\n ");
460 Some(format!("Duplicated availability zones:\n {}", duplicate_azs))
461 }
462 Self::InvalidKeysInSubscribeEnvelopeUpsert => {
463 Some("All keys must be columns on the underlying relation.".into())
464 }
465 Self::InvalidKeysInSubscribeEnvelopeDebezium => {
466 Some("All keys must be columns on the underlying relation.".into())
467 }
468 Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
469 Some("All order bys must be output columns.".into())
470 }
471 Self::UpsertSinkWithInvalidKey { .. } | Self::UpsertSinkWithoutKey => {
472 Some("See: https://materialize.com/s/sink-key-selection".into())
473 }
474 Self::Catalog(e) => e.hint(),
475 Self::VarError(e) => e.hint(),
476 Self::PgSourcePurification(e) => e.hint(),
477 Self::MySqlSourcePurification(e) => e.hint(),
478 Self::SqlServerSourcePurificationError(e) => e.hint(),
479 Self::KafkaSourcePurification(e) => e.hint(),
480 Self::LoadGeneratorSourcePurification(e) => e.hint(),
481 Self::CsrPurification(e) => e.hint(),
482 Self::KafkaSinkPurification(e) => e.hint(),
483 Self::UnknownColumn { table, similar, .. } => {
484 let suffix = "Make sure to surround case sensitive names in double quotes.";
485 match &similar[..] {
486 [] => None,
487 [column] => Some(format!("The similarly named column {} does exist. {suffix}", ColumnDisplay { table, column })),
488 names => {
489 let similar = names.into_iter().map(|column| ColumnDisplay { table, column }).join(", ");
490 Some(format!("There are similarly named columns that do exist: {similar}. {suffix}"))
491 }
492 }
493 }
494 Self::RecursiveTypeMismatch(..) => {
495 Some("You will need to rewrite or cast the query's expressions.".into())
496 },
497 Self::InvalidRefreshAt
498 | Self::InvalidRefreshEveryAlignedTo => {
499 Some("Calling `mz_now()` is allowed.".into())
500 },
501 Self::TableContainsUningestableTypes { column,.. } => {
502 Some(format!("Remove the table or use TEXT COLUMNS ({column}, ..) to ingest this column as text"))
503 }
504 Self::RetainHistoryLow { .. } | Self::RetainHistoryRequired => {
505 Some("Use ALTER ... RESET (RETAIN HISTORY) to set the retain history to its default and lowest value.".into())
506 }
507 Self::NetworkPolicyInUse => {
508 Some("Use ALTER SYSTEM SET 'network_policy' to change the default network policy.".into())
509 }
510 Self::WrongParameterType(_, _, _) => {
511 Some("EXECUTE automatically inserts only such casts that are allowed in an assignment cast context. Try adding an explicit cast.".into())
512 }
513 Self::InvalidSchemaName => {
514 Some("Use SET schema = name to select a schema. Use SHOW SCHEMAS to list available schemas. Use SHOW search_path to show the schema names that we looked for, but none of them existed.".into())
515 }
516 _ => None,
517 }
518 }
519}
520
impl fmt::Display for PlanError {
    /// Renders the primary, user-facing error message. Supplementary context
    /// lives in `PlanError::detail` and `PlanError::hint`, so messages here
    /// stay single-line and free of remediation advice.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::Unsupported { feature, discussion_no } => {
                write!(f, "{} not yet supported", feature)?;
                // Link the tracking discussion when one exists.
                if let Some(discussion_no) = discussion_no {
                    write!(f, ", see https://github.com/MaterializeInc/materialize/discussions/{} for more details", discussion_no)?;
                }
                Ok(())
            }
            Self::NeverSupported { feature, documentation_link: documentation_path,.. } => {
                write!(f, "{feature} is not supported",)?;
                if let Some(documentation_path) = documentation_path {
                    write!(f, ", for more information consult the documentation at https://materialize.com/docs/{documentation_path}")?;
                }
                Ok(())
            }
            Self::UnknownColumn { table, column, similar: _ } => write!(
                f,
                "column {} does not exist",
                ColumnDisplay { table, column }
            ),
            Self::UngroupedColumn { table, column } => write!(
                f,
                "column {} must appear in the GROUP BY clause or be used in an aggregate function",
                ColumnDisplay { table, column },
            ),
            Self::ItemWithoutColumns { name, item_type } => {
                let name = name.quoted();
                write!(f, "{item_type} {name} does not have columns")
            }
            Self::WrongJoinTypeForLateralColumn { table, column } => write!(
                f,
                "column {} cannot be referenced from this part of the query: \
                the combining JOIN type must be INNER or LEFT for a LATERAL reference",
                ColumnDisplay { table, column },
            ),
            Self::AmbiguousColumn(column) => write!(
                f,
                "column reference {} is ambiguous",
                column.quoted()
            ),
            Self::TooManyColumns { max_num_columns, req_num_columns } => write!(
                f,
                "attempt to create relation with too many columns, {} max: {}",
                req_num_columns, max_num_columns
            ),
            Self::ColumnAlreadyExists { column_name, object_name } => write!(
                f,
                "column {} of relation {} already exists",
                column_name.quoted(), object_name.quoted(),
            ),
            Self::AmbiguousTable(table) => write!(
                f,
                "table reference {} is ambiguous",
                table.item.as_str().quoted()
            ),
            Self::UnknownColumnInUsingClause { column, join_side } => write!(
                f,
                "column {} specified in USING clause does not exist in {} table",
                column.quoted(),
                join_side,
            ),
            Self::AmbiguousColumnInUsingClause { column, join_side } => write!(
                f,
                "common column name {} appears more than once in {} table",
                column.quoted(),
                join_side,
            ),
            Self::MisqualifiedName(name) => write!(
                f,
                "qualified name did not have between 1 and 3 components: {}",
                name
            ),
            Self::OverqualifiedDatabaseName(name) => write!(
                f,
                "database name '{}' does not have exactly one component",
                name
            ),
            Self::OverqualifiedSchemaName(name) => write!(
                f,
                "schema name '{}' cannot have more than two components",
                name
            ),
            Self::UnderqualifiedColumnName(name) => write!(
                f,
                "column name '{}' must have at least a table qualification",
                name
            ),
            Self::UnacceptableTimelineName(name) => {
                write!(f, "unacceptable timeline name {}", name.quoted(),)
            }
            Self::SubqueriesDisallowed { context } => {
                write!(f, "{} does not allow subqueries", context)
            }
            Self::UnknownParameter(n) => write!(f, "there is no parameter ${}", n),
            Self::ParameterNotAllowed(object_type) => write!(f, "{} cannot have parameters", object_type),
            Self::WrongParameterType(i, expected_ty, actual_ty) => write!(f, "unable to cast given parameter ${}: expected {}, got {}", i, expected_ty, actual_ty),
            // Wrapped errors that already render a complete message.
            Self::RecursionLimit(e) => write!(f, "{}", e),
            Self::StrconvParse(e) => write!(f, "{}", e),
            Self::Catalog(e) => write!(f, "{}", e),
            Self::UpsertSinkWithoutKey => write!(f, "upsert sinks must specify a key"),
            // The candidate keys are rendered by `detail`, not here.
            Self::UpsertSinkWithInvalidKey { .. } => {
                write!(f, "upsert key could not be validated as unique")
            }
            Self::InvalidWmrRecursionLimit(msg) => write!(f, "Invalid WITH MUTUALLY RECURSIVE recursion limit. {}", msg),
            Self::InvalidNumericMaxScale(e) => e.fmt(f),
            Self::InvalidCharLength(e) => e.fmt(f),
            Self::InvalidVarCharMaxLength(e) => e.fmt(f),
            Self::InvalidTimestampPrecision(e) => e.fmt(f),
            Self::Parser(e) => e.fmt(f),
            Self::ParserStatement(e) => e.fmt(f),
            Self::Unstructured(e) => write!(f, "{}", e),
            Self::InvalidId(id) => write!(f, "invalid id {}", id),
            Self::InvalidIdent(err) => write!(f, "invalid identifier, {err}"),
            Self::InvalidObject(i) => write!(f, "{} is not a database object", i.full_name_str()),
            Self::InvalidObjectType{expected_type, actual_type, object_name} => write!(f, "{actual_type} {object_name} is not a {expected_type}"),
            Self::InvalidPrivilegeTypes{ invalid_privileges, object_description, } => {
                write!(f, "invalid privilege types {} for {}", invalid_privileges.to_error_string(), object_description)
            },
            Self::InvalidSecret(i) => write!(f, "{} is not a secret", i.full_name_str()),
            Self::InvalidTemporarySchema => {
                write!(f, "cannot create temporary item in non-temporary schema")
            }
            Self::InvalidCast { name, ccx, from, to } =>{
                write!(
                    f,
                    "{name} does not support {ccx}casting from {from} to {to}",
                    // Only implicit casts get the "implicitly " qualifier.
                    ccx = if matches!(ccx, CastContext::Implicit) {
                        "implicitly "
                    } else {
                        ""
                    },
                )
            }
            Self::InvalidTable { name } => {
                write!(f, "invalid table definition for {}", name.quoted())
            },
            Self::InvalidVersion { name, version } => {
                write!(f, "invalid version {} for {}", version.quoted(), name.quoted())
            },
            Self::InvalidSinkFrom { name, item_type } => {
                write!(f, "{name} is a {item_type}, which cannot be exported as a sink")
            },
            Self::InvalidDependency { name, item_type } => {
                // Pick the article that matches the item type ("an index").
                let a = if *item_type == CatalogItemType::Index { "an" } else { "a" };
                write!(f, "{name} is {a} {item_type}, which cannot be depended upon")
            },
            Self::DropViewOnMaterializedView(name)
            | Self::AlterViewOnMaterializedView(name)
            | Self::ShowCreateViewOnMaterializedView(name)
            | Self::ExplainViewOnMaterializedView(name) => write!(f, "{name} is not a view"),
            Self::FetchingCsrSchemaFailed { schema_lookup, .. } => {
                write!(f, "failed to fetch schema {schema_lookup} from schema registry")
            }
            // The underlying Postgres cause is surfaced via `detail`/`hint`.
            Self::PostgresConnectionErr { .. } => {
                write!(f, "failed to connect to PostgreSQL database")
            }
            Self::MySqlConnectionErr { cause } => {
                write!(f, "failed to connect to MySQL database: {}", cause)
            }
            Self::SqlServerConnectionErr { cause } => {
                write!(f, "failed to connect to SQL Server database: {}", cause)
            }
            Self::SubsourceNameConflict {
                name , upstream_references: _,
            } => {
                write!(f, "multiple subsources would be named {}", name)
            },
            Self::SubsourceDuplicateReference {
                name,
                target_names: _,
            } => {
                write!(f, "multiple subsources refer to table {}", name)
            },
            Self::NoTablesFoundForSchemas(schemas) => {
                write!(f, "no tables found in referenced schemas: {}",
                    separated(", ", schemas.iter().map(|c| c.quoted()))
                )
            },
            Self::InvalidProtobufSchema { .. } => {
                write!(f, "invalid protobuf schema")
            }
            Self::DependentObjectsStillExist {object_type, object_name, dependents} => {
                // List the dependents when known; otherwise state the generic reason.
                let reason = match &dependents[..] {
                    [] => " because other objects depend on it".to_string(),
                    dependents => {
                        let dependents = dependents.iter().map(|(dependent_type, dependent_name)| format!("{} {}", dependent_type, dependent_name.quoted())).join(", ");
                        format!(": still depended upon by {dependents}")
                    },
                };
                let object_name = object_name.quoted();
                write!(f, "cannot drop {object_type} {object_name}{reason}")
            }
            Self::InvalidOptionValue { option_name, err } => write!(f, "invalid {} option value: {}", option_name, err),
            Self::UnexpectedDuplicateReference { name } => write!(f, "unexpected multiple references to {}", name.to_ast_string_simple()),
            Self::RecursiveTypeMismatch(name, declared, inferred) => {
                let declared = separated(", ", declared);
                let inferred = separated(", ", inferred);
                let name = name.quoted();
                write!(f, "WITH MUTUALLY RECURSIVE query {name} declared types ({declared}), but query returns types ({inferred})")
            },
            Self::UnknownFunction {name, arg_types, ..} => {
                write!(f, "function {}({}) does not exist", name, arg_types.join(", "))
            },
            Self::IndistinctFunction {name, arg_types, ..} => {
                write!(f, "function {}({}) is not unique", name, arg_types.join(", "))
            },
            // Operators render prefix for one argument, infix for two.
            Self::UnknownOperator {name, arg_types, ..} => {
                write!(f, "operator does not exist: {}", match arg_types.as_slice(){
                    [typ] => format!("{} {}", name, typ),
                    [ltyp, rtyp] => {
                        format!("{} {} {}", ltyp, name, rtyp)
                    }
                    _ => unreachable!("non-unary non-binary operator"),
                })
            },
            Self::IndistinctOperator {name, arg_types, ..} => {
                write!(f, "operator is not unique: {}", match arg_types.as_slice(){
                    [typ] => format!("{} {}", name, typ),
                    [ltyp, rtyp] => {
                        format!("{} {} {}", ltyp, name, rtyp)
                    }
                    _ => unreachable!("non-unary non-binary operator"),
                })
            },
            Self::InvalidPrivatelinkAvailabilityZone { name, ..} => write!(f, "invalid AWS PrivateLink availability zone {}", name.quoted()),
            Self::DuplicatePrivatelinkAvailabilityZone {..} => write!(f, "connection cannot contain duplicate availability zones"),
            Self::InvalidSchemaName => write!(f, "no valid schema selected"),
            Self::ItemAlreadyExists { name, item_type } => write!(f, "{item_type} {} already exists", name.quoted()),
            Self::ManagedCluster {cluster_name} => write!(f, "cannot modify managed cluster {cluster_name}"),
            Self::InvalidKeysInSubscribeEnvelopeUpsert => {
                write!(f, "invalid keys in SUBSCRIBE ENVELOPE UPSERT (KEY (..))")
            }
            Self::InvalidKeysInSubscribeEnvelopeDebezium => {
                write!(f, "invalid keys in SUBSCRIBE ENVELOPE DEBEZIUM (KEY (..))")
            }
            Self::InvalidPartitionByEnvelopeDebezium { column_name } => {
                write!(
                    f,
                    "PARTITION BY expression cannot refer to non-key column {}",
                    column_name.quoted(),
                )
            }
            Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
                write!(f, "invalid ORDER BY in SUBSCRIBE WITHIN TIMESTAMP ORDER BY")
            }
            Self::FromValueRequiresParen => f.write_str(
                "VALUES expression in FROM clause must be surrounded by parentheses"
            ),
            Self::VarError(e) => e.fmt(f),
            Self::UnsolvablePolymorphicFunctionInput => f.write_str(
                "could not determine polymorphic type because input has type unknown"
            ),
            Self::ShowCommandInView => f.write_str("SHOW commands are not allowed in views"),
            Self::WebhookValidationDoesNotUseColumns => f.write_str(
                "expression provided in CHECK does not reference any columns"
            ),
            Self::WebhookValidationNonDeterministic => f.write_str(
                "expression provided in CHECK is not deterministic"
            ),
            Self::InternalFunctionCall => f.write_str("cannot call function with arguments of type internal"),
            Self::CommentTooLong { length, max_size } => {
                write!(f, "provided comment was {length} bytes long, max size is {max_size} bytes")
            }
            Self::InvalidTimestampInterval { min, max, requested } => {
                write!(f, "invalid timestamp interval of {}ms, must be in the range [{}ms, {}ms]", requested.as_millis(), min.as_millis(), max.as_millis())
            }
            Self::InvalidGroupSizeHints => f.write_str("EXPECTED GROUP SIZE cannot be provided \
                simultaneously with any of AGGREGATE INPUT GROUP SIZE, DISTINCT ON INPUT GROUP SIZE, \
                or LIMIT INPUT GROUP SIZE"),
            // Purification errors: prefix with the source/sink flavor.
            Self::PgSourcePurification(e) => write!(f, "POSTGRES source validation: {}", e),
            Self::KafkaSourcePurification(e) => write!(f, "KAFKA source validation: {}", e),
            Self::LoadGeneratorSourcePurification(e) => write!(f, "LOAD GENERATOR source validation: {}", e),
            Self::KafkaSinkPurification(e) => write!(f, "KAFKA sink validation: {}", e),
            Self::IcebergSinkPurification(e) => write!(f, "ICEBERG sink validation: {}", e),
            Self::CsrPurification(e) => write!(f, "CONFLUENT SCHEMA REGISTRY validation: {}", e),
            Self::MySqlSourcePurification(e) => write!(f, "MYSQL source validation: {}", e),
            Self::SqlServerSourcePurificationError(e) => write!(f, "SQL SERVER source validation: {}", e),
            Self::UseTablesForSources(command) => write!(f, "{command} not supported; use CREATE TABLE .. FROM SOURCE instead"),
            Self::MangedReplicaName(name) => {
                write!(f, "{name} is reserved for replicas of managed clusters")
            }
            Self::MissingName(item_type) => {
                write!(f, "unspecified name for {item_type}")
            }
            Self::InvalidRefreshAt => {
                write!(f, "REFRESH AT argument must be an expression that can be simplified \
                    and/or cast to a constant whose type is mz_timestamp")
            }
            Self::InvalidRefreshEveryAlignedTo => {
                write!(f, "REFRESH EVERY ... ALIGNED TO argument must be an expression that can be simplified \
                    and/or cast to a constant whose type is mz_timestamp")
            }
            // The replica counts are rendered by `detail`, not here.
            Self::CreateReplicaFailStorageObjects {..} => {
                write!(f, "cannot create more than one replica of a cluster containing sources or sinks")
            },
            Self::MismatchedObjectType {
                name,
                is_type,
                expected_type,
            } => {
                write!(
                    f,
                    "{name} is {} {} not {} {}",
                    // Article selection: "an index" vs "a <other type>".
                    if *is_type == ObjectType::Index {
                        "an"
                    } else {
                        "a"
                    },
                    is_type.to_string().to_lowercase(),
                    if *expected_type == ObjectType::Index {
                        "an"
                    } else {
                        "a"
                    },
                    expected_type.to_string().to_lowercase()
                )
            }
            Self::TableContainsUningestableTypes { name, type_, column } => {
                write!(f, "table {name} contains column {column} of type {type_} which Materialize cannot currently ingest")
            },
            Self::RetainHistoryLow { limit } => {
                write!(f, "RETAIN HISTORY cannot be set lower than {}ms", limit.as_millis())
            },
            Self::RetainHistoryRequired => {
                write!(f, "RETAIN HISTORY cannot be disabled or set to 0")
            },
            Self::SubsourceResolutionError(e) => write!(f, "{}", e),
            Self::Replan(msg) => write!(f, "internal error while replanning, please contact support: {msg}"),
            Self::NetworkPolicyLockoutError => write!(f, "policy would block current session IP"),
            Self::NetworkPolicyInUse => write!(f, "network policy is currently in use"),
            Self::UntilReadyTimeoutRequired => {
                write!(f, "TIMEOUT=<duration> option is required for ALTER CLUSTER ... WITH (WAIT UNTIL READY ( ... ))")
            },
            Self::ConstantExpressionSimplificationFailed(e) => write!(f, "{}", e),
            Self::InvalidOffset(e) => write!(f, "Invalid OFFSET clause: {}", e),
            Self::UnknownCursor(name) => {
                write!(f, "cursor {} does not exist", name.quoted())
            }
            Self::CopyFromTargetTableDropped { target_name: name } => write!(f, "COPY FROM's target table {} was dropped", name.quoted()),
            Self::InvalidAsOfUpTo => write!(f, "AS OF or UP TO should be castable to a (non-null) mz_timestamp value"),
            Self::InvalidReplacement { item_type, item_name, replacement_type, replacement_name } => {
                write!(f, "cannot replace {item_type} {item_name} with {replacement_type} {replacement_name}")
            }
        }
    }
}
869
// Marker impl: relies entirely on the trait's default methods (`source`
// returns `None`); underlying causes are exposed via `detail`/`hint` instead.
impl Error for PlanError {}
871
872impl From<CatalogError> for PlanError {
873 fn from(e: CatalogError) -> PlanError {
874 PlanError::Catalog(e)
875 }
876}
877
878impl From<strconv::ParseError> for PlanError {
879 fn from(e: strconv::ParseError) -> PlanError {
880 PlanError::StrconvParse(e)
881 }
882}
883
884impl From<RecursionLimitError> for PlanError {
885 fn from(e: RecursionLimitError) -> PlanError {
886 PlanError::RecursionLimit(e)
887 }
888}
889
890impl From<InvalidNumericMaxScaleError> for PlanError {
891 fn from(e: InvalidNumericMaxScaleError) -> PlanError {
892 PlanError::InvalidNumericMaxScale(e)
893 }
894}
895
896impl From<InvalidCharLengthError> for PlanError {
897 fn from(e: InvalidCharLengthError) -> PlanError {
898 PlanError::InvalidCharLength(e)
899 }
900}
901
902impl From<InvalidVarCharMaxLengthError> for PlanError {
903 fn from(e: InvalidVarCharMaxLengthError) -> PlanError {
904 PlanError::InvalidVarCharMaxLength(e)
905 }
906}
907
908impl From<InvalidTimestampPrecisionError> for PlanError {
909 fn from(e: InvalidTimestampPrecisionError) -> PlanError {
910 PlanError::InvalidTimestampPrecision(e)
911 }
912}
913
914impl From<anyhow::Error> for PlanError {
915 fn from(e: anyhow::Error) -> PlanError {
916 sql_err!("{}", e.display_with_causes())
918 }
919}
920
921impl From<TryFromIntError> for PlanError {
922 fn from(e: TryFromIntError) -> PlanError {
923 sql_err!("{}", e.display_with_causes())
924 }
925}
926
927impl From<ParseIntError> for PlanError {
928 fn from(e: ParseIntError) -> PlanError {
929 sql_err!("{}", e.display_with_causes())
930 }
931}
932
933impl From<EvalError> for PlanError {
934 fn from(e: EvalError) -> PlanError {
935 sql_err!("{}", e.display_with_causes())
936 }
937}
938
939impl From<ParserError> for PlanError {
940 fn from(e: ParserError) -> PlanError {
941 PlanError::Parser(e)
942 }
943}
944
945impl From<ParserStatementError> for PlanError {
946 fn from(e: ParserStatementError) -> PlanError {
947 PlanError::ParserStatement(e)
948 }
949}
950
951impl From<PostgresError> for PlanError {
952 fn from(e: PostgresError) -> PlanError {
953 PlanError::PostgresConnectionErr { cause: Arc::new(e) }
954 }
955}
956
957impl From<MySqlError> for PlanError {
958 fn from(e: MySqlError) -> PlanError {
959 PlanError::MySqlConnectionErr { cause: Arc::new(e) }
960 }
961}
962
963impl From<SqlServerError> for PlanError {
964 fn from(e: SqlServerError) -> PlanError {
965 PlanError::SqlServerConnectionErr { cause: Arc::new(e) }
966 }
967}
968
969impl From<VarError> for PlanError {
970 fn from(e: VarError) -> Self {
971 PlanError::VarError(e)
972 }
973}
974
975impl From<PgSourcePurificationError> for PlanError {
976 fn from(e: PgSourcePurificationError) -> Self {
977 PlanError::PgSourcePurification(e)
978 }
979}
980
981impl From<KafkaSourcePurificationError> for PlanError {
982 fn from(e: KafkaSourcePurificationError) -> Self {
983 PlanError::KafkaSourcePurification(e)
984 }
985}
986
987impl From<KafkaSinkPurificationError> for PlanError {
988 fn from(e: KafkaSinkPurificationError) -> Self {
989 PlanError::KafkaSinkPurification(e)
990 }
991}
992
993impl From<IcebergSinkPurificationError> for PlanError {
994 fn from(e: IcebergSinkPurificationError) -> Self {
995 PlanError::IcebergSinkPurification(e)
996 }
997}
998
999impl From<CsrPurificationError> for PlanError {
1000 fn from(e: CsrPurificationError) -> Self {
1001 PlanError::CsrPurification(e)
1002 }
1003}
1004
1005impl From<LoadGeneratorSourcePurificationError> for PlanError {
1006 fn from(e: LoadGeneratorSourcePurificationError) -> Self {
1007 PlanError::LoadGeneratorSourcePurification(e)
1008 }
1009}
1010
1011impl From<MySqlSourcePurificationError> for PlanError {
1012 fn from(e: MySqlSourcePurificationError) -> Self {
1013 PlanError::MySqlSourcePurification(e)
1014 }
1015}
1016
1017impl From<SqlServerSourcePurificationError> for PlanError {
1018 fn from(e: SqlServerSourcePurificationError) -> Self {
1019 PlanError::SqlServerSourcePurificationError(e)
1020 }
1021}
1022
1023impl From<IdentError> for PlanError {
1024 fn from(e: IdentError) -> Self {
1025 PlanError::InvalidIdent(e)
1026 }
1027}
1028
1029impl From<ExternalReferenceResolutionError> for PlanError {
1030 fn from(e: ExternalReferenceResolutionError) -> Self {
1031 PlanError::SubsourceResolutionError(e)
1032 }
1033}
1034
/// Helper for rendering a (possibly table-qualified) column reference in
/// error messages; the `Display` impl below quotes the rendered name.
struct ColumnDisplay<'a> {
    // Optional qualifier; when present, output is the quoted `item.column`.
    table: &'a Option<PartialItemName>,
    // The column being referenced.
    column: &'a ColumnName,
}
1039
1040impl<'a> fmt::Display for ColumnDisplay<'a> {
1041 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1042 if let Some(table) = &self.table {
1043 format!("{}.{}", table.item, self.column).quoted().fmt(f)
1044 } else {
1045 self.column.quoted().fmt(f)
1046 }
1047 }
1048}