1use std::collections::BTreeSet;
11use std::error::Error;
12use std::num::{ParseIntError, TryFromIntError};
13use std::sync::Arc;
14use std::time::Duration;
15use std::{fmt, io};
16
17use itertools::Itertools;
18use mz_expr::EvalError;
19use mz_mysql_util::MySqlError;
20use mz_ore::error::ErrorExt;
21use mz_ore::stack::RecursionLimitError;
22use mz_ore::str::{StrExt, separated};
23use mz_postgres_util::PostgresError;
24use mz_repr::adt::char::InvalidCharLengthError;
25use mz_repr::adt::mz_acl_item::AclMode;
26use mz_repr::adt::numeric::InvalidNumericMaxScaleError;
27use mz_repr::adt::timestamp::InvalidTimestampPrecisionError;
28use mz_repr::adt::varchar::InvalidVarCharMaxLengthError;
29use mz_repr::{CatalogItemId, ColumnName, strconv};
30use mz_sql_parser::ast::display::AstDisplay;
31use mz_sql_parser::ast::{IdentError, UnresolvedItemName};
32use mz_sql_parser::parser::{ParserError, ParserStatementError};
33use mz_sql_server_util::SqlServerError;
34use mz_storage_types::sources::ExternalReferenceResolutionError;
35
36use crate::catalog::{
37 CatalogError, CatalogItemType, ErrorMessageObjectDescription, SystemObjectType,
38};
39use crate::names::{PartialItemName, ResolvedItemName};
40use crate::plan::ObjectType;
41use crate::plan::plan_utils::JoinSide;
42use crate::plan::scope::ScopeItem;
43use crate::plan::typeconv::CastContext;
44use crate::pure::error::{
45 CsrPurificationError, IcebergSinkPurificationError, KafkaSinkPurificationError,
46 KafkaSourcePurificationError, LoadGeneratorSourcePurificationError,
47 MySqlSourcePurificationError, PgSourcePurificationError, SqlServerSourcePurificationError,
48};
49use crate::session::vars::VarError;
50
/// The set of failures that can be reported as a `PlanError`.
///
/// The primary, user-facing message for each variant is produced by the
/// `fmt::Display` implementation in this file. `PlanError::detail` and
/// `PlanError::hint` return optional supplementary text for some variants.
#[derive(Debug)]
pub enum PlanError {
    /// Displayed as "`feature` not yet supported", followed by a link to the
    /// GitHub discussion numbered `discussion_no` when one is provided.
    Unsupported {
        feature: String,
        discussion_no: Option<usize>,
    },
    /// Displayed as "`feature` is not supported", followed by a documentation
    /// URL built from `documentation_link` when present. `details` is what
    /// `detail()` returns for this variant.
    NeverSupported {
        feature: String,
        documentation_link: Option<String>,
        details: Option<String>,
    },
    /// A referenced column does not exist. `similar` feeds the hint that
    /// lists similarly named columns that do exist.
    UnknownColumn {
        table: Option<PartialItemName>,
        column: ColumnName,
        similar: Box<[ColumnName]>,
    },
    /// A column must appear in the GROUP BY clause or be used in an aggregate
    /// function.
    UngroupedColumn {
        table: Option<PartialItemName>,
        column: ColumnName,
    },
    /// The named item of type `item_type` does not have columns.
    ItemWithoutColumns {
        name: String,
        item_type: CatalogItemType,
    },
    /// A LATERAL column reference was made where the combining JOIN type is
    /// not INNER or LEFT.
    WrongJoinTypeForLateralColumn {
        table: Option<PartialItemName>,
        column: ColumnName,
    },
    /// A column reference is ambiguous.
    AmbiguousColumn(ColumnName),
    /// A relation was requested with `req_num_columns` columns, more than the
    /// `max_num_columns` reported in the message.
    TooManyColumns {
        max_num_columns: usize,
        req_num_columns: usize,
    },
    /// Column `column_name` of relation `object_name` already exists.
    ColumnAlreadyExists {
        column_name: ColumnName,
        object_name: String,
    },
    /// A table reference is ambiguous.
    AmbiguousTable(PartialItemName),
    /// A column named in a USING clause does not exist in the table on
    /// `join_side`.
    UnknownColumnInUsingClause {
        column: ColumnName,
        join_side: JoinSide,
    },
    /// A common column name appears more than once in the table on
    /// `join_side`.
    AmbiguousColumnInUsingClause {
        column: ColumnName,
        join_side: JoinSide,
    },
    /// A qualified name did not have between 1 and 3 components.
    MisqualifiedName(String),
    /// A database name did not have exactly one component.
    OverqualifiedDatabaseName(String),
    /// A schema name had more than two components.
    OverqualifiedSchemaName(String),
    /// A column name lacked at least a table qualification.
    UnderqualifiedColumnName(String),
    /// The described `context` does not allow subqueries.
    SubqueriesDisallowed {
        context: String,
    },
    /// There is no parameter with the given number (displayed as `$n`).
    UnknownParameter(usize),
    /// The named kind of object cannot have parameters.
    ParameterNotAllowed(String),
    /// A given parameter could not be cast. Fields are, in order: the
    /// parameter number, the expected type, and the actual type.
    WrongParameterType(usize, String, String),
    /// Wraps a `RecursionLimitError`; displayed by forwarding to it.
    RecursionLimit(RecursionLimitError),
    /// Wraps a `strconv::ParseError`; displayed by forwarding to it.
    StrconvParse(strconv::ParseError),
    /// Wraps a `CatalogError`; displayed by forwarding to it, and its hint is
    /// forwarded by `hint()`.
    Catalog(CatalogError),
    /// Upsert sinks must specify a key.
    UpsertSinkWithoutKey,
    /// The upsert key could not be validated as unique. `detail()` lists the
    /// desired key and any keys known to be unique.
    UpsertSinkWithInvalidKey {
        name: String,
        desired_key: Vec<String>,
        valid_keys: Vec<Vec<String>>,
    },
    /// A column's type cannot be used as an Iceberg equality delete key.
    IcebergSinkUnsupportedKeyType {
        column: String,
        column_type: String,
    },
    /// Invalid WITH MUTUALLY RECURSIVE recursion limit; the payload is
    /// appended to the message.
    InvalidWmrRecursionLimit(String),
    /// Wraps an `InvalidNumericMaxScaleError`; displayed by forwarding to it.
    InvalidNumericMaxScale(InvalidNumericMaxScaleError),
    /// Wraps an `InvalidCharLengthError`; displayed by forwarding to it.
    InvalidCharLength(InvalidCharLengthError),
    /// The given catalog item id is invalid.
    InvalidId(CatalogItemId),
    /// Wraps an `IdentError`; displayed as "invalid identifier, ..." followed
    /// by the wrapped error.
    InvalidIdent(IdentError),
    /// The named item is not a database object.
    InvalidObject(Box<ResolvedItemName>),
    /// `object_name` has type `actual_type`, which is not `expected_type`.
    InvalidObjectType {
        expected_type: SystemObjectType,
        actual_type: SystemObjectType,
        object_name: String,
    },
    /// The listed privilege types are invalid for the described object.
    InvalidPrivilegeTypes {
        invalid_privileges: AclMode,
        object_description: ErrorMessageObjectDescription,
    },
    /// Wraps an `InvalidVarCharMaxLengthError`; displayed by forwarding to it.
    InvalidVarCharMaxLength(InvalidVarCharMaxLengthError),
    /// Wraps an `InvalidTimestampPrecisionError`; displayed by forwarding to
    /// it.
    InvalidTimestampPrecision(InvalidTimestampPrecisionError),
    /// The named item is not a secret.
    InvalidSecret(Box<ResolvedItemName>),
    /// A temporary item cannot be created in a non-temporary schema.
    InvalidTemporarySchema,
    /// `name` does not support casting from `from` to `to`. The message says
    /// "implicitly casting" when `ccx` is `CastContext::Implicit`.
    InvalidCast {
        name: String,
        ccx: CastContext,
        from: String,
        to: String,
    },
    /// A range type over the named element type is not supported.
    UnsupportedRangeElementType {
        element_type_name: String,
    },
    /// Invalid table definition for `name`.
    InvalidTable {
        name: String,
    },
    /// Invalid `version` for `name`.
    InvalidVersion {
        name: String,
        version: String,
    },
    /// The named item of type `item_type` cannot be exported as a sink.
    InvalidSinkFrom {
        name: String,
        item_type: String,
    },
    /// The named item of type `item_type` cannot be depended upon.
    InvalidDependency {
        name: String,
        item_type: String,
    },
    /// The given name is reserved for replicas of managed clusters.
    ///
    /// NOTE(review): the variant name looks like a misspelling of
    /// `ManagedReplicaName`; renaming it would affect callers, so confirm
    /// before changing.
    MangedReplicaName(String),
    /// Wraps a `ParserStatementError`; displayed by forwarding to it.
    ParserStatement(ParserStatementError),
    /// Wraps a `ParserError`; displayed by forwarding to it.
    Parser(ParserError),
    /// Displayed as "`name` is not a view"; the hint suggests DROP
    /// MATERIALIZED VIEW.
    DropViewOnMaterializedView(String),
    /// An object cannot be dropped because other objects depend on it.
    /// `dependents` holds (type, name) pairs that are listed in the message
    /// when non-empty.
    DependentObjectsStillExist {
        object_type: String,
        object_name: String,
        dependents: Vec<(String, String)>,
    },
    /// Displayed as "`name` is not a view"; the hint suggests ALTER
    /// MATERIALIZED VIEW.
    AlterViewOnMaterializedView(String),
    /// Displayed as "`name` is not a view"; the hint suggests SHOW CREATE
    /// MATERIALIZED VIEW.
    ShowCreateViewOnMaterializedView(String),
    /// Displayed as "`name` is not a view"; the hint suggests EXPLAIN ...
    /// MATERIALIZED VIEW.
    ExplainViewOnMaterializedView(String),
    /// The timeline name is unacceptable; the hint notes that the "mz_"
    /// prefix is reserved for system timelines.
    UnacceptableTimelineName(String),
    /// Fetching schema `schema_lookup` from the schema registry failed.
    /// `cause` is rendered, with its causes, by `detail()`.
    FetchingCsrSchemaFailed {
        schema_lookup: String,
        cause: Arc<dyn Error + Send + Sync>,
    },
    /// Connecting to a PostgreSQL database failed. `cause` is rendered by
    /// `detail()` and inspected by `hint()` for an I/O timeout.
    PostgresConnectionErr {
        cause: Arc<mz_postgres_util::PostgresError>,
    },
    /// Connecting to a MySQL database failed; `cause` is included in the
    /// message.
    MySqlConnectionErr {
        cause: Arc<MySqlError>,
    },
    /// Connecting to a SQL Server database failed; `cause` is included in
    /// the message.
    SqlServerConnectionErr {
        cause: Arc<SqlServerError>,
    },
    /// Multiple subsources would be named `name`; `detail()` lists
    /// `upstream_references`.
    SubsourceNameConflict {
        name: UnresolvedItemName,
        upstream_references: Vec<UnresolvedItemName>,
    },
    /// Multiple subsources refer to table `name`; `detail()` lists
    /// `target_names`.
    SubsourceDuplicateReference {
        name: UnresolvedItemName,
        target_names: Vec<UnresolvedItemName>,
    },
    /// No tables were found in the referenced schemas, which are listed in
    /// the message.
    NoTablesFoundForSchemas(Vec<String>),
    /// Invalid protobuf schema; `cause` is rendered by `detail()`.
    InvalidProtobufSchema {
        cause: protobuf_native::OperationFailedError,
    },
    /// Invalid value for option `option_name`. The nested `err` is included
    /// in the message, and its detail and hint are forwarded.
    InvalidOptionValue {
        option_name: String,
        err: Box<PlanError>,
    },
    /// Unexpected multiple references to `name`.
    UnexpectedDuplicateReference {
        name: UnresolvedItemName,
    },
    /// A WITH MUTUALLY RECURSIVE query's declared types differ from the types
    /// the query returns. Fields are, in order: the query name, the declared
    /// types, and the inferred types.
    RecursiveTypeMismatch(String, Vec<String>, Vec<String>),
    /// Function `name` with the given argument types does not exist.
    UnknownFunction {
        name: String,
        arg_types: Vec<String>,
    },
    /// Function `name` with the given argument types is not unique.
    IndistinctFunction {
        name: String,
        arg_types: Vec<String>,
    },
    /// Operator `name` does not exist for the given argument types. The
    /// `Display` implementation expects exactly one or two argument types.
    UnknownOperator {
        name: String,
        arg_types: Vec<String>,
    },
    /// Operator `name` is not unique for the given argument types. The
    /// `Display` implementation expects exactly one or two argument types.
    IndistinctOperator {
        name: String,
        arg_types: Vec<String>,
    },
    /// Invalid AWS PrivateLink availability zone `name`; the hint lists
    /// `supported_azs`.
    InvalidPrivatelinkAvailabilityZone {
        name: String,
        supported_azs: BTreeSet<String>,
    },
    /// A connection cannot contain duplicate availability zones; the hint
    /// lists `duplicate_azs`.
    DuplicatePrivatelinkAvailabilityZone {
        duplicate_azs: BTreeSet<String>,
    },
    /// No valid schema is selected.
    InvalidSchemaName,
    /// An item of type `item_type` named `name` already exists.
    ItemAlreadyExists {
        name: String,
        item_type: CatalogItemType,
    },
    /// The managed cluster `cluster_name` cannot be modified.
    ManagedCluster {
        cluster_name: String,
    },
    /// Invalid keys in SUBSCRIBE ENVELOPE UPSERT (KEY (..)).
    InvalidKeysInSubscribeEnvelopeUpsert,
    /// Invalid keys in SUBSCRIBE ENVELOPE DEBEZIUM (KEY (..)).
    InvalidKeysInSubscribeEnvelopeDebezium,
    /// A column appears more than once in a SUBSCRIBE ENVELOPE KEY clause.
    DuplicateKeyColumnInSubscribeEnvelope {
        column_name: String,
    },
    /// A PARTITION BY expression refers to a non-key column.
    InvalidPartitionByEnvelopeDebezium {
        column_name: String,
    },
    /// Invalid ORDER BY in SUBSCRIBE WITHIN TIMESTAMP ORDER BY.
    InvalidOrderByInSubscribeWithinTimestampOrderBy,
    /// A VALUES expression in a FROM clause must be surrounded by
    /// parentheses.
    FromValueRequiresParen,
    /// Wraps a `VarError`; its message, detail, and hint are all forwarded.
    VarError(VarError),
    /// A polymorphic type could not be determined because the input has type
    /// unknown.
    UnsolvablePolymorphicFunctionInput,
    /// SHOW commands are not allowed in views.
    ShowCommandInView,
    /// The expression provided in CHECK does not reference any columns.
    WebhookValidationDoesNotUseColumns,
    /// The expression provided in CHECK is not deterministic.
    WebhookValidationNonDeterministic,
    /// A function taking arguments of type internal cannot be called.
    InternalFunctionCall,
    /// The provided comment was `length` bytes long, exceeding `max_size`
    /// bytes.
    CommentTooLong {
        length: usize,
        max_size: usize,
    },
    /// The `requested` timestamp interval is outside the range from `min` to
    /// `max`; all three are reported in milliseconds.
    InvalidTimestampInterval {
        min: Duration,
        max: Duration,
        requested: Duration,
    },
    /// EXPECTED GROUP SIZE cannot be combined with the other group size
    /// hints named in the message.
    InvalidGroupSizeHints,
    /// POSTGRES source validation failure; wraps the purification error.
    PgSourcePurification(PgSourcePurificationError),
    /// KAFKA source validation failure; wraps the purification error.
    KafkaSourcePurification(KafkaSourcePurificationError),
    /// KAFKA sink validation failure; wraps the purification error.
    KafkaSinkPurification(KafkaSinkPurificationError),
    /// ICEBERG sink validation failure; wraps the purification error.
    IcebergSinkPurification(IcebergSinkPurificationError),
    /// LOAD GENERATOR source validation failure; wraps the purification
    /// error.
    LoadGeneratorSourcePurification(LoadGeneratorSourcePurificationError),
    /// CONFLUENT SCHEMA REGISTRY validation failure; wraps the purification
    /// error.
    CsrPurification(CsrPurificationError),
    /// MYSQL source validation failure; wraps the purification error.
    MySqlSourcePurification(MySqlSourcePurificationError),
    /// SQL SERVER source validation failure; wraps the purification error.
    SqlServerSourcePurificationError(SqlServerSourcePurificationError),
    /// The named command is not supported; the message directs users to
    /// CREATE TABLE .. FROM SOURCE instead.
    UseTablesForSources(String),
    /// A name was not specified for an item of the given type.
    MissingName(CatalogItemType),
    /// The REFRESH AT argument could not be simplified and/or cast to a
    /// constant of type mz_timestamp.
    InvalidRefreshAt,
    /// The REFRESH EVERY ... ALIGNED TO argument could not be simplified
    /// and/or cast to a constant of type mz_timestamp.
    InvalidRefreshEveryAlignedTo,
    /// `name` is of type `is_type`, not `expected_type`.
    MismatchedObjectType {
        name: PartialItemName,
        is_type: ObjectType,
        expected_type: ObjectType,
    },
    /// Table `name` contains `column` of type `type_`, which cannot
    /// currently be ingested.
    TableContainsUningestableTypes {
        name: String,
        type_: String,
        column: String,
    },
    /// RETAIN HISTORY cannot be set lower than `limit`.
    RetainHistoryLow {
        limit: Duration,
    },
    /// RETAIN HISTORY cannot be disabled or set to 0.
    RetainHistoryRequired,
    /// The TIMEOUT option is required for ALTER CLUSTER ... WITH (WAIT UNTIL
    /// READY ( ... )).
    UntilReadyTimeoutRequired,
    /// Wraps an `ExternalReferenceResolutionError`; displayed by forwarding
    /// to it.
    SubsourceResolutionError(ExternalReferenceResolutionError),
    /// An internal error while replanning; the payload is appended to the
    /// message.
    Replan(String),
    /// An internal error; the payload is appended to the message.
    Internal(String),
    /// The network policy would block the current session IP.
    NetworkPolicyLockoutError,
    /// The network policy is currently in use.
    NetworkPolicyInUse,
    /// Displayed as the wrapped string, unchanged.
    ConstantExpressionSimplificationFailed(String),
    /// Invalid OFFSET clause; the payload is appended to the message.
    InvalidOffset(String),
    /// The named cursor does not exist.
    UnknownCursor(String),
    /// The target table of a COPY FROM was dropped.
    CopyFromTargetTableDropped {
        target_name: String,
    },
    /// AS OF or UP TO should be castable to a (non-null) mz_timestamp value.
    InvalidAsOfUpTo,
    /// The item `item_name` of type `item_type` cannot be replaced with
    /// `replacement_name` of type `replacement_type`.
    InvalidReplacement {
        item_type: CatalogItemType,
        item_name: PartialItemName,
        replacement_type: CatalogItemType,
        replacement_name: PartialItemName,
    },
    /// Displayed as the wrapped string, unchanged.
    Unstructured(String),
}
325
326impl PlanError {
327 pub(crate) fn ungrouped_column(item: &ScopeItem) -> PlanError {
328 PlanError::UngroupedColumn {
329 table: item.table_name.clone(),
330 column: item.column_name.clone(),
331 }
332 }
333
334 pub fn detail(&self) -> Option<String> {
335 match self {
336 Self::NeverSupported { details, .. } => details.clone(),
337 Self::FetchingCsrSchemaFailed { cause, .. } => Some(cause.to_string_with_causes()),
338 Self::PostgresConnectionErr { cause } => Some(cause.to_string_with_causes()),
339 Self::InvalidProtobufSchema { cause } => Some(cause.to_string_with_causes()),
340 Self::InvalidOptionValue { err, .. } => err.detail(),
341 Self::UpsertSinkWithInvalidKey {
342 name,
343 desired_key,
344 valid_keys,
345 } => {
346 let valid_keys = if valid_keys.is_empty() {
347 "There are no known valid unique keys for the underlying relation.".into()
348 } else {
349 format!(
350 "The following keys are known to be unique for the underlying relation:\n{}",
351 valid_keys
352 .iter()
353 .map(|k|
354 format!(" ({})", k.iter().map(|c| c.as_str().quoted()).join(", "))
355 )
356 .join("\n"),
357 )
358 };
359 Some(format!(
360 "Materialize could not prove that the specified upsert envelope key ({}) \
361 was a unique key of the underlying relation {}. {valid_keys}",
362 separated(", ", desired_key.iter().map(|c| c.as_str().quoted())),
363 name.quoted()
364 ))
365 }
366 Self::VarError(e) => e.detail(),
367 Self::InternalFunctionCall => Some("This function is for the internal use of the database system and cannot be called directly.".into()),
368 Self::PgSourcePurification(e) => e.detail(),
369 Self::MySqlSourcePurification(e) => e.detail(),
370 Self::SqlServerSourcePurificationError(e) => e.detail(),
371 Self::KafkaSourcePurification(e) => e.detail(),
372 Self::LoadGeneratorSourcePurification(e) => e.detail(),
373 Self::CsrPurification(e) => e.detail(),
374 Self::KafkaSinkPurification(e) => e.detail(),
375 Self::IcebergSinkPurification(e) => e.detail(),
376 Self::SubsourceNameConflict {
377 name: _,
378 upstream_references,
379 } => Some(format!(
380 "referenced tables with duplicate name: {}",
381 itertools::join(upstream_references, ", ")
382 )),
383 Self::SubsourceDuplicateReference {
384 name: _,
385 target_names,
386 } => Some(format!(
387 "subsources referencing table: {}",
388 itertools::join(target_names, ", ")
389 )),
390 Self::InvalidPartitionByEnvelopeDebezium { .. } => Some(
391 "When using ENVELOPE DEBEZIUM, only columns in the key can be referenced in the PARTITION BY expression.".to_string()
392 ),
393 Self::NoTablesFoundForSchemas(schemas) => Some(format!(
394 "missing schemas: {}",
395 separated(", ", schemas.iter().map(|c| c.quoted()))
396 )),
397 _ => None,
398 }
399 }
400
401 pub fn hint(&self) -> Option<String> {
402 match self {
403 Self::DropViewOnMaterializedView(_) => {
404 Some("Use DROP MATERIALIZED VIEW to remove a materialized view.".into())
405 }
406 Self::DependentObjectsStillExist {..} => Some("Use DROP ... CASCADE to drop the dependent objects too.".into()),
407 Self::AlterViewOnMaterializedView(_) => {
408 Some("Use ALTER MATERIALIZED VIEW to rename a materialized view.".into())
409 }
410 Self::ShowCreateViewOnMaterializedView(_) => {
411 Some("Use SHOW CREATE MATERIALIZED VIEW to show a materialized view.".into())
412 }
413 Self::ExplainViewOnMaterializedView(_) => {
414 Some("Use EXPLAIN [...] MATERIALIZED VIEW to explain a materialized view.".into())
415 }
416 Self::UnacceptableTimelineName(_) => {
417 Some("The prefix \"mz_\" is reserved for system timelines.".into())
418 }
419 Self::PostgresConnectionErr { cause } => {
420 if let Some(cause) = cause.source() {
421 if let Some(cause) = cause.downcast_ref::<io::Error>() {
422 if cause.kind() == io::ErrorKind::TimedOut {
423 return Some(
424 "Do you have a firewall or security group that is \
425 preventing Materialize from connecting to your PostgreSQL server?"
426 .into(),
427 );
428 }
429 }
430 }
431 None
432 }
433 Self::InvalidOptionValue { err, .. } => err.hint(),
434 Self::UnknownFunction { ..} => Some("No function matches the given name and argument types. You might need to add explicit type casts.".into()),
435 Self::IndistinctFunction {..} => {
436 Some("Could not choose a best candidate function. You might need to add explicit type casts.".into())
437 }
438 Self::UnknownOperator {..} => {
439 Some("No operator matches the given name and argument types. You might need to add explicit type casts.".into())
440 }
441 Self::IndistinctOperator {..} => {
442 Some("Could not choose a best candidate operator. You might need to add explicit type casts.".into())
443 },
444 Self::InvalidPrivatelinkAvailabilityZone { supported_azs, ..} => {
445 let supported_azs_str = supported_azs.iter().join("\n ");
446 Some(format!("Did you supply an availability zone name instead of an ID? Known availability zone IDs:\n {}", supported_azs_str))
447 }
448 Self::DuplicatePrivatelinkAvailabilityZone { duplicate_azs, ..} => {
449 let duplicate_azs = duplicate_azs.iter().join("\n ");
450 Some(format!("Duplicated availability zones:\n {}", duplicate_azs))
451 }
452 Self::InvalidKeysInSubscribeEnvelopeUpsert => {
453 Some("All keys must be columns on the underlying relation.".into())
454 }
455 Self::InvalidKeysInSubscribeEnvelopeDebezium => {
456 Some("All keys must be columns on the underlying relation.".into())
457 }
458 Self::DuplicateKeyColumnInSubscribeEnvelope { .. } => {
459 Some("Each KEY column must be listed at most once.".into())
460 }
461 Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
462 Some("All order bys must be output columns.".into())
463 }
464 Self::UpsertSinkWithInvalidKey { .. } | Self::UpsertSinkWithoutKey => {
465 Some("See: https://materialize.com/s/sink-key-selection".into())
466 }
467 Self::IcebergSinkUnsupportedKeyType { .. } => {
468 Some("Iceberg equality delete keys must be primitive, non-floating-point columns.".into())
469 }
470 Self::Catalog(e) => e.hint(),
471 Self::VarError(e) => e.hint(),
472 Self::PgSourcePurification(e) => e.hint(),
473 Self::MySqlSourcePurification(e) => e.hint(),
474 Self::SqlServerSourcePurificationError(e) => e.hint(),
475 Self::KafkaSourcePurification(e) => e.hint(),
476 Self::LoadGeneratorSourcePurification(e) => e.hint(),
477 Self::CsrPurification(e) => e.hint(),
478 Self::KafkaSinkPurification(e) => e.hint(),
479 Self::UnknownColumn { table, similar, .. } => {
480 let suffix = "Make sure to surround case sensitive names in double quotes.";
481 match &similar[..] {
482 [] => None,
483 [column] => Some(format!("The similarly named column {} does exist. {suffix}", ColumnDisplay { table, column })),
484 names => {
485 let similar = names.into_iter().map(|column| ColumnDisplay { table, column }).join(", ");
486 Some(format!("There are similarly named columns that do exist: {similar}. {suffix}"))
487 }
488 }
489 }
490 Self::RecursiveTypeMismatch(..) => {
491 Some("You will need to rewrite or cast the query's expressions.".into())
492 },
493 Self::InvalidRefreshAt
494 | Self::InvalidRefreshEveryAlignedTo => {
495 Some("Calling `mz_now()` is allowed.".into())
496 },
497 Self::TableContainsUningestableTypes { column,.. } => {
498 Some(format!("Remove the table or use TEXT COLUMNS ({column}, ..) to ingest this column as text"))
499 }
500 Self::RetainHistoryLow { .. } | Self::RetainHistoryRequired => {
501 Some("Use ALTER ... RESET (RETAIN HISTORY) to set the retain history to its default and lowest value.".into())
502 }
503 Self::NetworkPolicyInUse => {
504 Some("Use ALTER SYSTEM SET 'network_policy' to change the default network policy.".into())
505 }
506 Self::WrongParameterType(_, _, _) => {
507 Some("EXECUTE automatically inserts only such casts that are allowed in an assignment cast context. Try adding an explicit cast.".into())
508 }
509 Self::InvalidSchemaName => {
510 Some("Use SET schema = name to select a schema. Use SHOW SCHEMAS to list available schemas. Use SHOW search_path to show the schema names that we looked for, but none of them existed.".into())
511 }
512 _ => None,
513 }
514 }
515}
516
517impl fmt::Display for PlanError {
518 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
519 match self {
520 Self::Unsupported { feature, discussion_no } => {
521 write!(f, "{} not yet supported", feature)?;
522 if let Some(discussion_no) = discussion_no {
523 write!(f, ", see https://github.com/MaterializeInc/materialize/discussions/{} for more details", discussion_no)?;
524 }
525 Ok(())
526 }
527 Self::NeverSupported { feature, documentation_link: documentation_path,.. } => {
528 write!(f, "{feature} is not supported",)?;
529 if let Some(documentation_path) = documentation_path {
530 write!(f, ", for more information consult the documentation at https://materialize.com/docs/{documentation_path}")?;
531 }
532 Ok(())
533 }
534 Self::UnknownColumn { table, column, similar: _ } => write!(
535 f,
536 "column {} does not exist",
537 ColumnDisplay { table, column }
538 ),
539 Self::UngroupedColumn { table, column } => write!(
540 f,
541 "column {} must appear in the GROUP BY clause or be used in an aggregate function",
542 ColumnDisplay { table, column },
543 ),
544 Self::ItemWithoutColumns { name, item_type } => {
545 let name = name.quoted();
546 write!(f, "{item_type} {name} does not have columns")
547 }
548 Self::WrongJoinTypeForLateralColumn { table, column } => write!(
549 f,
550 "column {} cannot be referenced from this part of the query: \
551 the combining JOIN type must be INNER or LEFT for a LATERAL reference",
552 ColumnDisplay { table, column },
553 ),
554 Self::AmbiguousColumn(column) => write!(
555 f,
556 "column reference {} is ambiguous",
557 column.quoted()
558 ),
559 Self::TooManyColumns { max_num_columns, req_num_columns } => write!(
560 f,
561 "attempt to create relation with too many columns, {} max: {}",
562 req_num_columns, max_num_columns
563 ),
564 Self::ColumnAlreadyExists { column_name, object_name } => write!(
565 f,
566 "column {} of relation {} already exists",
567 column_name.quoted(), object_name.quoted(),
568 ),
569 Self::AmbiguousTable(table) => write!(
570 f,
571 "table reference {} is ambiguous",
572 table.item.as_str().quoted()
573 ),
574 Self::UnknownColumnInUsingClause { column, join_side } => write!(
575 f,
576 "column {} specified in USING clause does not exist in {} table",
577 column.quoted(),
578 join_side,
579 ),
580 Self::AmbiguousColumnInUsingClause { column, join_side } => write!(
581 f,
582 "common column name {} appears more than once in {} table",
583 column.quoted(),
584 join_side,
585 ),
586 Self::MisqualifiedName(name) => write!(
587 f,
588 "qualified name did not have between 1 and 3 components: {}",
589 name
590 ),
591 Self::OverqualifiedDatabaseName(name) => write!(
592 f,
593 "database name '{}' does not have exactly one component",
594 name
595 ),
596 Self::OverqualifiedSchemaName(name) => write!(
597 f,
598 "schema name '{}' cannot have more than two components",
599 name
600 ),
601 Self::UnderqualifiedColumnName(name) => write!(
602 f,
603 "column name '{}' must have at least a table qualification",
604 name
605 ),
606 Self::UnacceptableTimelineName(name) => {
607 write!(f, "unacceptable timeline name {}", name.quoted(),)
608 }
609 Self::SubqueriesDisallowed { context } => {
610 write!(f, "{} does not allow subqueries", context)
611 }
612 Self::UnknownParameter(n) => write!(f, "there is no parameter ${}", n),
613 Self::ParameterNotAllowed(object_type) => write!(f, "{} cannot have parameters", object_type),
614 Self::WrongParameterType(i, expected_ty, actual_ty) => write!(f, "unable to cast given parameter ${}: expected {}, got {}", i, expected_ty, actual_ty),
615 Self::RecursionLimit(e) => write!(f, "{}", e),
616 Self::StrconvParse(e) => write!(f, "{}", e),
617 Self::Catalog(e) => write!(f, "{}", e),
618 Self::UpsertSinkWithoutKey => write!(f, "upsert sinks must specify a key"),
619 Self::UpsertSinkWithInvalidKey { .. } => {
620 write!(f, "upsert key could not be validated as unique")
621 }
622 Self::IcebergSinkUnsupportedKeyType { column, column_type } => {
623 write!(f, "column {column} has type {column_type} which cannot be used as an Iceberg equality delete key")
624 }
625 Self::InvalidWmrRecursionLimit(msg) => write!(f, "Invalid WITH MUTUALLY RECURSIVE recursion limit. {}", msg),
626 Self::InvalidNumericMaxScale(e) => e.fmt(f),
627 Self::InvalidCharLength(e) => e.fmt(f),
628 Self::InvalidVarCharMaxLength(e) => e.fmt(f),
629 Self::InvalidTimestampPrecision(e) => e.fmt(f),
630 Self::Parser(e) => e.fmt(f),
631 Self::ParserStatement(e) => e.fmt(f),
632 Self::Unstructured(e) => write!(f, "{}", e),
633 Self::InvalidId(id) => write!(f, "invalid id {}", id),
634 Self::InvalidIdent(err) => write!(f, "invalid identifier, {err}"),
635 Self::InvalidObject(i) => write!(f, "{} is not a database object", i.full_name_str()),
636 Self::InvalidObjectType{expected_type, actual_type, object_name} => write!(f, "{actual_type} {object_name} is not a {expected_type}"),
637 Self::InvalidPrivilegeTypes{ invalid_privileges, object_description, } => {
638 write!(f, "invalid privilege types {} for {}", invalid_privileges.to_error_string(), object_description)
639 },
640 Self::InvalidSecret(i) => write!(f, "{} is not a secret", i.full_name_str()),
641 Self::InvalidTemporarySchema => {
642 write!(f, "cannot create temporary item in non-temporary schema")
643 }
644 Self::InvalidCast { name, ccx, from, to } =>{
645 write!(
646 f,
647 "{name} does not support {ccx}casting from {from} to {to}",
648 ccx = if matches!(ccx, CastContext::Implicit) {
649 "implicitly "
650 } else {
651 ""
652 },
653 )
654 }
655 Self::UnsupportedRangeElementType { element_type_name } => {
656 write!(f, "range type over {} is not supported", element_type_name)
657 }
658 Self::InvalidTable { name } => {
659 write!(f, "invalid table definition for {}", name.quoted())
660 },
661 Self::InvalidVersion { name, version } => {
662 write!(f, "invalid version {} for {}", version.quoted(), name.quoted())
663 },
664 Self::InvalidSinkFrom { name, item_type } => {
665 write!(f, "{item_type} {name} cannot be exported as a sink")
666 },
667 Self::InvalidDependency { name, item_type } => {
668 write!(f, "{item_type} {name} cannot be depended upon")
669 },
670 Self::DropViewOnMaterializedView(name)
671 | Self::AlterViewOnMaterializedView(name)
672 | Self::ShowCreateViewOnMaterializedView(name)
673 | Self::ExplainViewOnMaterializedView(name) => write!(f, "{name} is not a view"),
674 Self::FetchingCsrSchemaFailed { schema_lookup, .. } => {
675 write!(f, "failed to fetch schema {schema_lookup} from schema registry")
676 }
677 Self::PostgresConnectionErr { .. } => {
678 write!(f, "failed to connect to PostgreSQL database")
679 }
680 Self::MySqlConnectionErr { cause } => {
681 write!(f, "failed to connect to MySQL database: {}", cause)
682 }
683 Self::SqlServerConnectionErr { cause } => {
684 write!(f, "failed to connect to SQL Server database: {}", cause)
685 }
686 Self::SubsourceNameConflict {
687 name , upstream_references: _,
688 } => {
689 write!(f, "multiple subsources would be named {}", name)
690 },
691 Self::SubsourceDuplicateReference {
692 name,
693 target_names: _,
694 } => {
695 write!(f, "multiple subsources refer to table {}", name)
696 },
697 Self::NoTablesFoundForSchemas(schemas) => {
698 write!(f, "no tables found in referenced schemas: {}",
699 separated(", ", schemas.iter().map(|c| c.quoted()))
700 )
701 },
702 Self::InvalidProtobufSchema { .. } => {
703 write!(f, "invalid protobuf schema")
704 }
705 Self::DependentObjectsStillExist {object_type, object_name, dependents} => {
706 let reason = match &dependents[..] {
707 [] => " because other objects depend on it".to_string(),
708 dependents => {
709 let dependents = dependents.iter().map(|(dependent_type, dependent_name)| format!("{} {}", dependent_type, dependent_name.quoted())).join(", ");
710 format!(": still depended upon by {dependents}")
711 },
712 };
713 let object_name = object_name.quoted();
714 write!(f, "cannot drop {object_type} {object_name}{reason}")
715 }
716 Self::InvalidOptionValue { option_name, err } => write!(f, "invalid {} option value: {}", option_name, err),
717 Self::UnexpectedDuplicateReference { name } => write!(f, "unexpected multiple references to {}", name.to_ast_string_simple()),
718 Self::RecursiveTypeMismatch(name, declared, inferred) => {
719 let declared = separated(", ", declared);
720 let inferred = separated(", ", inferred);
721 let name = name.quoted();
722 write!(f, "WITH MUTUALLY RECURSIVE query {name} declared types ({declared}), but query returns types ({inferred})")
723 },
724 Self::UnknownFunction {name, arg_types, ..} => {
725 write!(f, "function {}({}) does not exist", name, arg_types.join(", "))
726 },
727 Self::IndistinctFunction {name, arg_types, ..} => {
728 write!(f, "function {}({}) is not unique", name, arg_types.join(", "))
729 },
730 Self::UnknownOperator {name, arg_types, ..} => {
731 write!(f, "operator does not exist: {}", match arg_types.as_slice(){
732 [typ] => format!("{} {}", name, typ),
733 [ltyp, rtyp] => {
734 format!("{} {} {}", ltyp, name, rtyp)
735 }
736 _ => unreachable!("non-unary non-binary operator"),
737 })
738 },
739 Self::IndistinctOperator {name, arg_types, ..} => {
740 write!(f, "operator is not unique: {}", match arg_types.as_slice(){
741 [typ] => format!("{} {}", name, typ),
742 [ltyp, rtyp] => {
743 format!("{} {} {}", ltyp, name, rtyp)
744 }
745 _ => unreachable!("non-unary non-binary operator"),
746 })
747 },
748 Self::InvalidPrivatelinkAvailabilityZone { name, ..} => write!(f, "invalid AWS PrivateLink availability zone {}", name.quoted()),
749 Self::DuplicatePrivatelinkAvailabilityZone {..} => write!(f, "connection cannot contain duplicate availability zones"),
750 Self::InvalidSchemaName => write!(f, "no valid schema selected"),
751 Self::ItemAlreadyExists { name, item_type } => write!(f, "{item_type} {} already exists", name.quoted()),
752 Self::ManagedCluster {cluster_name} => write!(f, "cannot modify managed cluster {cluster_name}"),
753 Self::InvalidKeysInSubscribeEnvelopeUpsert => {
754 write!(f, "invalid keys in SUBSCRIBE ENVELOPE UPSERT (KEY (..))")
755 }
756 Self::InvalidKeysInSubscribeEnvelopeDebezium => {
757 write!(f, "invalid keys in SUBSCRIBE ENVELOPE DEBEZIUM (KEY (..))")
758 }
759 Self::DuplicateKeyColumnInSubscribeEnvelope { column_name } => {
760 write!(
761 f,
762 "column {} appears more than once in SUBSCRIBE ENVELOPE KEY clause",
763 column_name.quoted(),
764 )
765 }
766 Self::InvalidPartitionByEnvelopeDebezium { column_name } => {
767 write!(
768 f,
769 "PARTITION BY expression cannot refer to non-key column {}",
770 column_name.quoted(),
771 )
772 }
773 Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
774 write!(f, "invalid ORDER BY in SUBSCRIBE WITHIN TIMESTAMP ORDER BY")
775 }
776 Self::FromValueRequiresParen => f.write_str(
777 "VALUES expression in FROM clause must be surrounded by parentheses"
778 ),
779 Self::VarError(e) => e.fmt(f),
780 Self::UnsolvablePolymorphicFunctionInput => f.write_str(
781 "could not determine polymorphic type because input has type unknown"
782 ),
783 Self::ShowCommandInView => f.write_str("SHOW commands are not allowed in views"),
784 Self::WebhookValidationDoesNotUseColumns => f.write_str(
785 "expression provided in CHECK does not reference any columns"
786 ),
787 Self::WebhookValidationNonDeterministic => f.write_str(
788 "expression provided in CHECK is not deterministic"
789 ),
790 Self::InternalFunctionCall => f.write_str("cannot call function with arguments of type internal"),
791 Self::CommentTooLong { length, max_size } => {
792 write!(f, "provided comment was {length} bytes long, max size is {max_size} bytes")
793 }
794 Self::InvalidTimestampInterval { min, max, requested } => {
795 write!(f, "invalid timestamp interval of {}ms, must be in the range [{}ms, {}ms]", requested.as_millis(), min.as_millis(), max.as_millis())
796 }
797 Self::InvalidGroupSizeHints => f.write_str("EXPECTED GROUP SIZE cannot be provided \
798 simultaneously with any of AGGREGATE INPUT GROUP SIZE, DISTINCT ON INPUT GROUP SIZE, \
799 or LIMIT INPUT GROUP SIZE"),
800 Self::PgSourcePurification(e) => write!(f, "POSTGRES source validation: {}", e),
801 Self::KafkaSourcePurification(e) => write!(f, "KAFKA source validation: {}", e),
802 Self::LoadGeneratorSourcePurification(e) => write!(f, "LOAD GENERATOR source validation: {}", e),
803 Self::KafkaSinkPurification(e) => write!(f, "KAFKA sink validation: {}", e),
804 Self::IcebergSinkPurification(e) => write!(f, "ICEBERG sink validation: {}", e),
805 Self::CsrPurification(e) => write!(f, "CONFLUENT SCHEMA REGISTRY validation: {}", e),
806 Self::MySqlSourcePurification(e) => write!(f, "MYSQL source validation: {}", e),
807 Self::SqlServerSourcePurificationError(e) => write!(f, "SQL SERVER source validation: {}", e),
808 Self::UseTablesForSources(command) => write!(f, "{command} not supported; use CREATE TABLE .. FROM SOURCE instead"),
809 Self::MangedReplicaName(name) => {
810 write!(f, "{name} is reserved for replicas of managed clusters")
811 }
812 Self::MissingName(item_type) => {
813 write!(f, "unspecified name for {item_type}")
814 }
815 Self::InvalidRefreshAt => {
816 write!(f, "REFRESH AT argument must be an expression that can be simplified \
817 and/or cast to a constant whose type is mz_timestamp")
818 }
819 Self::InvalidRefreshEveryAlignedTo => {
820 write!(f, "REFRESH EVERY ... ALIGNED TO argument must be an expression that can be simplified \
821 and/or cast to a constant whose type is mz_timestamp")
822 }
823 Self::MismatchedObjectType {
824 name,
825 is_type,
826 expected_type,
827 } => {
828 write!(
829 f,
830 "{name} is {} {} not {} {}",
831 if *is_type == ObjectType::Index {
832 "an"
833 } else {
834 "a"
835 },
836 is_type.to_string().to_lowercase(),
837 if *expected_type == ObjectType::Index {
838 "an"
839 } else {
840 "a"
841 },
842 expected_type.to_string().to_lowercase()
843 )
844 }
845 Self::TableContainsUningestableTypes { name, type_, column } => {
846 write!(f, "table {name} contains column {column} of type {type_} which Materialize cannot currently ingest")
847 },
848 Self::RetainHistoryLow { limit } => {
849 write!(f, "RETAIN HISTORY cannot be set lower than {}ms", limit.as_millis())
850 },
851 Self::RetainHistoryRequired => {
852 write!(f, "RETAIN HISTORY cannot be disabled or set to 0")
853 },
854 Self::SubsourceResolutionError(e) => write!(f, "{}", e),
855 Self::Replan(msg) => write!(f, "internal error while replanning, please contact support: {msg}"),
856 Self::Internal(msg) => write!(f, "internal error: {msg}"),
857 Self::NetworkPolicyLockoutError => write!(f, "policy would block current session IP"),
858 Self::NetworkPolicyInUse => write!(f, "network policy is currently in use"),
859 Self::UntilReadyTimeoutRequired => {
860 write!(f, "TIMEOUT=<duration> option is required for ALTER CLUSTER ... WITH (WAIT UNTIL READY ( ... ))")
861 },
862 Self::ConstantExpressionSimplificationFailed(e) => write!(f, "{}", e),
863 Self::InvalidOffset(e) => write!(f, "Invalid OFFSET clause: {}", e),
864 Self::UnknownCursor(name) => {
865 write!(f, "cursor {} does not exist", name.quoted())
866 }
867 Self::CopyFromTargetTableDropped { target_name: name } => {
868 write!(f, "COPY FROM's target table {} was dropped", name.quoted())
869 }
870 Self::InvalidAsOfUpTo => write!(
871 f,
872 "AS OF or UP TO should be castable to a (non-null) mz_timestamp value",
873 ),
874 Self::InvalidReplacement {
875 item_type, item_name, replacement_type, replacement_name,
876 } => {
877 write!(
878 f,
879 "cannot replace {item_type} {item_name} \
880 with {replacement_type} {replacement_name}",
881 )
882 }
883 }
884 }
885}
886
// `PlanError` opts into `std::error::Error` with an empty impl body, so every
// trait method keeps its default implementation (no `source` override here).
impl Error for PlanError {}
888
889impl From<CatalogError> for PlanError {
890 fn from(e: CatalogError) -> PlanError {
891 PlanError::Catalog(e)
892 }
893}
894
895impl From<strconv::ParseError> for PlanError {
896 fn from(e: strconv::ParseError) -> PlanError {
897 PlanError::StrconvParse(e)
898 }
899}
900
901impl From<RecursionLimitError> for PlanError {
902 fn from(e: RecursionLimitError) -> PlanError {
903 PlanError::RecursionLimit(e)
904 }
905}
906
907impl From<InvalidNumericMaxScaleError> for PlanError {
908 fn from(e: InvalidNumericMaxScaleError) -> PlanError {
909 PlanError::InvalidNumericMaxScale(e)
910 }
911}
912
913impl From<InvalidCharLengthError> for PlanError {
914 fn from(e: InvalidCharLengthError) -> PlanError {
915 PlanError::InvalidCharLength(e)
916 }
917}
918
919impl From<InvalidVarCharMaxLengthError> for PlanError {
920 fn from(e: InvalidVarCharMaxLengthError) -> PlanError {
921 PlanError::InvalidVarCharMaxLength(e)
922 }
923}
924
925impl From<InvalidTimestampPrecisionError> for PlanError {
926 fn from(e: InvalidTimestampPrecisionError) -> PlanError {
927 PlanError::InvalidTimestampPrecision(e)
928 }
929}
930
931impl From<anyhow::Error> for PlanError {
932 fn from(e: anyhow::Error) -> PlanError {
933 sql_err!("{}", e.display_with_causes())
935 }
936}
937
938impl From<TryFromIntError> for PlanError {
939 fn from(e: TryFromIntError) -> PlanError {
940 sql_err!("{}", e.display_with_causes())
941 }
942}
943
944impl From<ParseIntError> for PlanError {
945 fn from(e: ParseIntError) -> PlanError {
946 sql_err!("{}", e.display_with_causes())
947 }
948}
949
950impl From<EvalError> for PlanError {
951 fn from(e: EvalError) -> PlanError {
952 sql_err!("{}", e.display_with_causes())
953 }
954}
955
956impl From<ParserError> for PlanError {
957 fn from(e: ParserError) -> PlanError {
958 PlanError::Parser(e)
959 }
960}
961
962impl From<ParserStatementError> for PlanError {
963 fn from(e: ParserStatementError) -> PlanError {
964 PlanError::ParserStatement(e)
965 }
966}
967
968impl From<PostgresError> for PlanError {
969 fn from(e: PostgresError) -> PlanError {
970 PlanError::PostgresConnectionErr { cause: Arc::new(e) }
971 }
972}
973
974impl From<MySqlError> for PlanError {
975 fn from(e: MySqlError) -> PlanError {
976 PlanError::MySqlConnectionErr { cause: Arc::new(e) }
977 }
978}
979
980impl From<SqlServerError> for PlanError {
981 fn from(e: SqlServerError) -> PlanError {
982 PlanError::SqlServerConnectionErr { cause: Arc::new(e) }
983 }
984}
985
986impl From<VarError> for PlanError {
987 fn from(e: VarError) -> Self {
988 PlanError::VarError(e)
989 }
990}
991
992impl From<PgSourcePurificationError> for PlanError {
993 fn from(e: PgSourcePurificationError) -> Self {
994 PlanError::PgSourcePurification(e)
995 }
996}
997
998impl From<KafkaSourcePurificationError> for PlanError {
999 fn from(e: KafkaSourcePurificationError) -> Self {
1000 PlanError::KafkaSourcePurification(e)
1001 }
1002}
1003
1004impl From<KafkaSinkPurificationError> for PlanError {
1005 fn from(e: KafkaSinkPurificationError) -> Self {
1006 PlanError::KafkaSinkPurification(e)
1007 }
1008}
1009
1010impl From<IcebergSinkPurificationError> for PlanError {
1011 fn from(e: IcebergSinkPurificationError) -> Self {
1012 PlanError::IcebergSinkPurification(e)
1013 }
1014}
1015
1016impl From<CsrPurificationError> for PlanError {
1017 fn from(e: CsrPurificationError) -> Self {
1018 PlanError::CsrPurification(e)
1019 }
1020}
1021
1022impl From<LoadGeneratorSourcePurificationError> for PlanError {
1023 fn from(e: LoadGeneratorSourcePurificationError) -> Self {
1024 PlanError::LoadGeneratorSourcePurification(e)
1025 }
1026}
1027
1028impl From<MySqlSourcePurificationError> for PlanError {
1029 fn from(e: MySqlSourcePurificationError) -> Self {
1030 PlanError::MySqlSourcePurification(e)
1031 }
1032}
1033
1034impl From<SqlServerSourcePurificationError> for PlanError {
1035 fn from(e: SqlServerSourcePurificationError) -> Self {
1036 PlanError::SqlServerSourcePurificationError(e)
1037 }
1038}
1039
1040impl From<IdentError> for PlanError {
1041 fn from(e: IdentError) -> Self {
1042 PlanError::InvalidIdent(e)
1043 }
1044}
1045
1046impl From<ExternalReferenceResolutionError> for PlanError {
1047 fn from(e: ExternalReferenceResolutionError) -> Self {
1048 PlanError::SubsourceResolutionError(e)
1049 }
1050}
1051
1052struct ColumnDisplay<'a> {
1053 table: &'a Option<PartialItemName>,
1054 column: &'a ColumnName,
1055}
1056
1057impl<'a> fmt::Display for ColumnDisplay<'a> {
1058 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1059 if let Some(table) = &self.table {
1060 format!("{}.{}", table.item, self.column).quoted().fmt(f)
1061 } else {
1062 self.column.quoted().fmt(f)
1063 }
1064 }
1065}