1use std::collections::BTreeSet;
11use std::error::Error;
12use std::num::{ParseIntError, TryFromIntError};
13use std::sync::Arc;
14use std::time::Duration;
15use std::{fmt, io};
16
17use itertools::Itertools;
18use mz_expr::EvalError;
19use mz_mysql_util::MySqlError;
20use mz_ore::error::ErrorExt;
21use mz_ore::stack::RecursionLimitError;
22use mz_ore::str::{StrExt, separated};
23use mz_postgres_util::PostgresError;
24use mz_repr::adt::char::InvalidCharLengthError;
25use mz_repr::adt::mz_acl_item::AclMode;
26use mz_repr::adt::numeric::InvalidNumericMaxScaleError;
27use mz_repr::adt::timestamp::InvalidTimestampPrecisionError;
28use mz_repr::adt::varchar::InvalidVarCharMaxLengthError;
29use mz_repr::{CatalogItemId, ColumnName, strconv};
30use mz_sql_parser::ast::display::AstDisplay;
31use mz_sql_parser::ast::{IdentError, UnresolvedItemName};
32use mz_sql_parser::parser::{ParserError, ParserStatementError};
33use mz_sql_server_util::SqlServerError;
34use mz_storage_types::sources::ExternalReferenceResolutionError;
35
36use crate::catalog::{
37 CatalogError, CatalogItemType, ErrorMessageObjectDescription, SystemObjectType,
38};
39use crate::names::{PartialItemName, ResolvedItemName};
40use crate::plan::ObjectType;
41use crate::plan::plan_utils::JoinSide;
42use crate::plan::scope::ScopeItem;
43use crate::plan::typeconv::CastContext;
44use crate::pure::error::{
45 CsrPurificationError, IcebergSinkPurificationError, KafkaSinkPurificationError,
46 KafkaSourcePurificationError, LoadGeneratorSourcePurificationError,
47 MySqlSourcePurificationError, PgSourcePurificationError, SqlServerSourcePurificationError,
48};
49use crate::session::vars::VarError;
50
51#[derive(Debug)]
52pub enum PlanError {
53 Unsupported {
55 feature: String,
56 discussion_no: Option<usize>,
57 },
58 NeverSupported {
60 feature: String,
61 documentation_link: Option<String>,
62 details: Option<String>,
63 },
64 UnknownColumn {
65 table: Option<PartialItemName>,
66 column: ColumnName,
67 similar: Box<[ColumnName]>,
68 },
69 UngroupedColumn {
70 table: Option<PartialItemName>,
71 column: ColumnName,
72 },
73 ItemWithoutColumns {
74 name: String,
75 item_type: CatalogItemType,
76 },
77 WrongJoinTypeForLateralColumn {
78 table: Option<PartialItemName>,
79 column: ColumnName,
80 },
81 AmbiguousColumn(ColumnName),
82 TooManyColumns {
83 max_num_columns: usize,
84 req_num_columns: usize,
85 },
86 ColumnAlreadyExists {
87 column_name: ColumnName,
88 object_name: String,
89 },
90 AmbiguousTable(PartialItemName),
91 UnknownColumnInUsingClause {
92 column: ColumnName,
93 join_side: JoinSide,
94 },
95 AmbiguousColumnInUsingClause {
96 column: ColumnName,
97 join_side: JoinSide,
98 },
99 MisqualifiedName(String),
100 OverqualifiedDatabaseName(String),
101 OverqualifiedSchemaName(String),
102 UnderqualifiedColumnName(String),
103 SubqueriesDisallowed {
104 context: String,
105 },
106 UnknownParameter(usize),
107 ParameterNotAllowed(String),
108 WrongParameterType(usize, String, String),
109 RecursionLimit(RecursionLimitError),
110 StrconvParse(strconv::ParseError),
111 Catalog(CatalogError),
112 UpsertSinkWithoutKey,
113 UpsertSinkWithInvalidKey {
114 name: String,
115 desired_key: Vec<String>,
116 valid_keys: Vec<Vec<String>>,
117 },
118 IcebergSinkUnsupportedKeyType {
119 column: String,
120 column_type: String,
121 },
122 InvalidWmrRecursionLimit(String),
123 InvalidNumericMaxScale(InvalidNumericMaxScaleError),
124 InvalidCharLength(InvalidCharLengthError),
125 InvalidId(CatalogItemId),
126 InvalidIdent(IdentError),
127 InvalidObject(Box<ResolvedItemName>),
128 InvalidObjectType {
129 expected_type: SystemObjectType,
130 actual_type: SystemObjectType,
131 object_name: String,
132 },
133 InvalidPrivilegeTypes {
134 invalid_privileges: AclMode,
135 object_description: ErrorMessageObjectDescription,
136 },
137 InvalidVarCharMaxLength(InvalidVarCharMaxLengthError),
138 InvalidTimestampPrecision(InvalidTimestampPrecisionError),
139 InvalidSecret(Box<ResolvedItemName>),
140 InvalidTemporarySchema,
141 InvalidCast {
142 name: String,
143 ccx: CastContext,
144 from: String,
145 to: String,
146 },
147 UnsupportedRangeElementType {
149 element_type_name: String,
150 },
151 InvalidTable {
152 name: String,
153 },
154 InvalidVersion {
155 name: String,
156 version: String,
157 },
158 InvalidSinkFrom {
159 name: String,
160 item_type: String,
161 },
162 InvalidDependency {
163 name: String,
164 item_type: String,
165 },
166 MangedReplicaName(String),
167 ParserStatement(ParserStatementError),
168 Parser(ParserError),
169 DropViewOnMaterializedView(String),
170 DependentObjectsStillExist {
171 object_type: String,
172 object_name: String,
173 dependents: Vec<(String, String)>,
175 },
176 AlterViewOnMaterializedView(String),
177 ShowCreateViewOnMaterializedView(String),
178 ExplainViewOnMaterializedView(String),
179 UnacceptableTimelineName(String),
180 FetchingCsrSchemaFailed {
181 schema_lookup: String,
182 cause: Arc<dyn Error + Send + Sync>,
183 },
184 PostgresConnectionErr {
185 cause: Arc<mz_postgres_util::PostgresError>,
186 },
187 MySqlConnectionErr {
188 cause: Arc<MySqlError>,
189 },
190 SqlServerConnectionErr {
191 cause: Arc<SqlServerError>,
192 },
193 SubsourceNameConflict {
194 name: UnresolvedItemName,
195 upstream_references: Vec<UnresolvedItemName>,
196 },
197 SubsourceDuplicateReference {
198 name: UnresolvedItemName,
199 target_names: Vec<UnresolvedItemName>,
200 },
201 NoTablesFoundForSchemas(Vec<String>),
202 InvalidProtobufSchema {
203 cause: protobuf_native::OperationFailedError,
204 },
205 InvalidOptionValue {
206 option_name: String,
209 err: Box<PlanError>,
210 },
211 UnexpectedDuplicateReference {
212 name: UnresolvedItemName,
213 },
214 RecursiveTypeMismatch(String, Vec<String>, Vec<String>),
216 UnknownFunction {
217 name: String,
218 arg_types: Vec<String>,
219 },
220 IndistinctFunction {
221 name: String,
222 arg_types: Vec<String>,
223 },
224 UnknownOperator {
225 name: String,
226 arg_types: Vec<String>,
227 },
228 IndistinctOperator {
229 name: String,
230 arg_types: Vec<String>,
231 },
232 InvalidPrivatelinkAvailabilityZone {
233 name: String,
234 supported_azs: BTreeSet<String>,
235 },
236 DuplicatePrivatelinkAvailabilityZone {
237 duplicate_azs: BTreeSet<String>,
238 },
239 InvalidSchemaName,
240 ItemAlreadyExists {
241 name: String,
242 item_type: CatalogItemType,
243 },
244 ManagedCluster {
245 cluster_name: String,
246 },
247 InvalidKeysInSubscribeEnvelopeUpsert,
248 InvalidKeysInSubscribeEnvelopeDebezium,
249 InvalidPartitionByEnvelopeDebezium {
250 column_name: String,
251 },
252 InvalidOrderByInSubscribeWithinTimestampOrderBy,
253 FromValueRequiresParen,
254 VarError(VarError),
255 UnsolvablePolymorphicFunctionInput,
256 ShowCommandInView,
257 WebhookValidationDoesNotUseColumns,
258 WebhookValidationNonDeterministic,
259 InternalFunctionCall,
260 CommentTooLong {
261 length: usize,
262 max_size: usize,
263 },
264 InvalidTimestampInterval {
265 min: Duration,
266 max: Duration,
267 requested: Duration,
268 },
269 InvalidGroupSizeHints,
270 PgSourcePurification(PgSourcePurificationError),
271 KafkaSourcePurification(KafkaSourcePurificationError),
272 KafkaSinkPurification(KafkaSinkPurificationError),
273 IcebergSinkPurification(IcebergSinkPurificationError),
274 LoadGeneratorSourcePurification(LoadGeneratorSourcePurificationError),
275 CsrPurification(CsrPurificationError),
276 MySqlSourcePurification(MySqlSourcePurificationError),
277 SqlServerSourcePurificationError(SqlServerSourcePurificationError),
278 UseTablesForSources(String),
279 MissingName(CatalogItemType),
280 InvalidRefreshAt,
281 InvalidRefreshEveryAlignedTo,
282 CreateReplicaFailStorageObjects {
283 current_replica_count: usize,
285 internal_replica_count: usize,
287 hypothetical_replica_count: usize,
290 },
291 MismatchedObjectType {
292 name: PartialItemName,
293 is_type: ObjectType,
294 expected_type: ObjectType,
295 },
296 TableContainsUningestableTypes {
298 name: String,
299 type_: String,
300 column: String,
301 },
302 RetainHistoryLow {
303 limit: Duration,
304 },
305 RetainHistoryRequired,
306 UntilReadyTimeoutRequired,
307 SubsourceResolutionError(ExternalReferenceResolutionError),
308 Replan(String),
309 NetworkPolicyLockoutError,
310 NetworkPolicyInUse,
311 ConstantExpressionSimplificationFailed(String),
313 InvalidOffset(String),
314 UnknownCursor(String),
316 CopyFromTargetTableDropped {
317 target_name: String,
318 },
319 InvalidAsOfUpTo,
321 InvalidReplacement {
322 item_type: CatalogItemType,
323 item_name: PartialItemName,
324 replacement_type: CatalogItemType,
325 replacement_name: PartialItemName,
326 },
327 Unstructured(String),
329}
330
331impl PlanError {
332 pub(crate) fn ungrouped_column(item: &ScopeItem) -> PlanError {
333 PlanError::UngroupedColumn {
334 table: item.table_name.clone(),
335 column: item.column_name.clone(),
336 }
337 }
338
339 pub fn detail(&self) -> Option<String> {
340 match self {
341 Self::NeverSupported { details, .. } => details.clone(),
342 Self::FetchingCsrSchemaFailed { cause, .. } => Some(cause.to_string_with_causes()),
343 Self::PostgresConnectionErr { cause } => Some(cause.to_string_with_causes()),
344 Self::InvalidProtobufSchema { cause } => Some(cause.to_string_with_causes()),
345 Self::InvalidOptionValue { err, .. } => err.detail(),
346 Self::UpsertSinkWithInvalidKey {
347 name,
348 desired_key,
349 valid_keys,
350 } => {
351 let valid_keys = if valid_keys.is_empty() {
352 "There are no known valid unique keys for the underlying relation.".into()
353 } else {
354 format!(
355 "The following keys are known to be unique for the underlying relation:\n{}",
356 valid_keys
357 .iter()
358 .map(|k|
359 format!(" ({})", k.iter().map(|c| c.as_str().quoted()).join(", "))
360 )
361 .join("\n"),
362 )
363 };
364 Some(format!(
365 "Materialize could not prove that the specified upsert envelope key ({}) \
366 was a unique key of the underlying relation {}. {valid_keys}",
367 separated(", ", desired_key.iter().map(|c| c.as_str().quoted())),
368 name.quoted()
369 ))
370 }
371 Self::VarError(e) => e.detail(),
372 Self::InternalFunctionCall => Some("This function is for the internal use of the database system and cannot be called directly.".into()),
373 Self::PgSourcePurification(e) => e.detail(),
374 Self::MySqlSourcePurification(e) => e.detail(),
375 Self::SqlServerSourcePurificationError(e) => e.detail(),
376 Self::KafkaSourcePurification(e) => e.detail(),
377 Self::LoadGeneratorSourcePurification(e) => e.detail(),
378 Self::CsrPurification(e) => e.detail(),
379 Self::KafkaSinkPurification(e) => e.detail(),
380 Self::IcebergSinkPurification(e) => e.detail(),
381 Self::CreateReplicaFailStorageObjects {
382 current_replica_count: current,
383 internal_replica_count: internal,
384 hypothetical_replica_count: target,
385 } => {
386 Some(format!(
387 "Currently have {} replica{}{}; command would result in {}",
388 current,
389 if *current != 1 { "s" } else { "" },
390 if *internal > 0 {
391 format!(" ({} internal)", internal)
392 } else {
393 "".to_string()
394 },
395 target
396 ))
397 },
398 Self::SubsourceNameConflict {
399 name: _,
400 upstream_references,
401 } => Some(format!(
402 "referenced tables with duplicate name: {}",
403 itertools::join(upstream_references, ", ")
404 )),
405 Self::SubsourceDuplicateReference {
406 name: _,
407 target_names,
408 } => Some(format!(
409 "subsources referencing table: {}",
410 itertools::join(target_names, ", ")
411 )),
412 Self::InvalidPartitionByEnvelopeDebezium { .. } => Some(
413 "When using ENVELOPE DEBEZIUM, only columns in the key can be referenced in the PARTITION BY expression.".to_string()
414 ),
415 Self::NoTablesFoundForSchemas(schemas) => Some(format!(
416 "missing schemas: {}",
417 separated(", ", schemas.iter().map(|c| c.quoted()))
418 )),
419 _ => None,
420 }
421 }
422
423 pub fn hint(&self) -> Option<String> {
424 match self {
425 Self::DropViewOnMaterializedView(_) => {
426 Some("Use DROP MATERIALIZED VIEW to remove a materialized view.".into())
427 }
428 Self::DependentObjectsStillExist {..} => Some("Use DROP ... CASCADE to drop the dependent objects too.".into()),
429 Self::AlterViewOnMaterializedView(_) => {
430 Some("Use ALTER MATERIALIZED VIEW to rename a materialized view.".into())
431 }
432 Self::ShowCreateViewOnMaterializedView(_) => {
433 Some("Use SHOW CREATE MATERIALIZED VIEW to show a materialized view.".into())
434 }
435 Self::ExplainViewOnMaterializedView(_) => {
436 Some("Use EXPLAIN [...] MATERIALIZED VIEW to explain a materialized view.".into())
437 }
438 Self::UnacceptableTimelineName(_) => {
439 Some("The prefix \"mz_\" is reserved for system timelines.".into())
440 }
441 Self::PostgresConnectionErr { cause } => {
442 if let Some(cause) = cause.source() {
443 if let Some(cause) = cause.downcast_ref::<io::Error>() {
444 if cause.kind() == io::ErrorKind::TimedOut {
445 return Some(
446 "Do you have a firewall or security group that is \
447 preventing Materialize from connecting to your PostgreSQL server?"
448 .into(),
449 );
450 }
451 }
452 }
453 None
454 }
455 Self::InvalidOptionValue { err, .. } => err.hint(),
456 Self::UnknownFunction { ..} => Some("No function matches the given name and argument types. You might need to add explicit type casts.".into()),
457 Self::IndistinctFunction {..} => {
458 Some("Could not choose a best candidate function. You might need to add explicit type casts.".into())
459 }
460 Self::UnknownOperator {..} => {
461 Some("No operator matches the given name and argument types. You might need to add explicit type casts.".into())
462 }
463 Self::IndistinctOperator {..} => {
464 Some("Could not choose a best candidate operator. You might need to add explicit type casts.".into())
465 },
466 Self::InvalidPrivatelinkAvailabilityZone { supported_azs, ..} => {
467 let supported_azs_str = supported_azs.iter().join("\n ");
468 Some(format!("Did you supply an availability zone name instead of an ID? Known availability zone IDs:\n {}", supported_azs_str))
469 }
470 Self::DuplicatePrivatelinkAvailabilityZone { duplicate_azs, ..} => {
471 let duplicate_azs = duplicate_azs.iter().join("\n ");
472 Some(format!("Duplicated availability zones:\n {}", duplicate_azs))
473 }
474 Self::InvalidKeysInSubscribeEnvelopeUpsert => {
475 Some("All keys must be columns on the underlying relation.".into())
476 }
477 Self::InvalidKeysInSubscribeEnvelopeDebezium => {
478 Some("All keys must be columns on the underlying relation.".into())
479 }
480 Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
481 Some("All order bys must be output columns.".into())
482 }
483 Self::UpsertSinkWithInvalidKey { .. } | Self::UpsertSinkWithoutKey => {
484 Some("See: https://materialize.com/s/sink-key-selection".into())
485 }
486 Self::IcebergSinkUnsupportedKeyType { .. } => {
487 Some("Iceberg equality delete keys must be primitive, non-floating-point columns.".into())
488 }
489 Self::Catalog(e) => e.hint(),
490 Self::VarError(e) => e.hint(),
491 Self::PgSourcePurification(e) => e.hint(),
492 Self::MySqlSourcePurification(e) => e.hint(),
493 Self::SqlServerSourcePurificationError(e) => e.hint(),
494 Self::KafkaSourcePurification(e) => e.hint(),
495 Self::LoadGeneratorSourcePurification(e) => e.hint(),
496 Self::CsrPurification(e) => e.hint(),
497 Self::KafkaSinkPurification(e) => e.hint(),
498 Self::UnknownColumn { table, similar, .. } => {
499 let suffix = "Make sure to surround case sensitive names in double quotes.";
500 match &similar[..] {
501 [] => None,
502 [column] => Some(format!("The similarly named column {} does exist. {suffix}", ColumnDisplay { table, column })),
503 names => {
504 let similar = names.into_iter().map(|column| ColumnDisplay { table, column }).join(", ");
505 Some(format!("There are similarly named columns that do exist: {similar}. {suffix}"))
506 }
507 }
508 }
509 Self::RecursiveTypeMismatch(..) => {
510 Some("You will need to rewrite or cast the query's expressions.".into())
511 },
512 Self::InvalidRefreshAt
513 | Self::InvalidRefreshEveryAlignedTo => {
514 Some("Calling `mz_now()` is allowed.".into())
515 },
516 Self::TableContainsUningestableTypes { column,.. } => {
517 Some(format!("Remove the table or use TEXT COLUMNS ({column}, ..) to ingest this column as text"))
518 }
519 Self::RetainHistoryLow { .. } | Self::RetainHistoryRequired => {
520 Some("Use ALTER ... RESET (RETAIN HISTORY) to set the retain history to its default and lowest value.".into())
521 }
522 Self::NetworkPolicyInUse => {
523 Some("Use ALTER SYSTEM SET 'network_policy' to change the default network policy.".into())
524 }
525 Self::WrongParameterType(_, _, _) => {
526 Some("EXECUTE automatically inserts only such casts that are allowed in an assignment cast context. Try adding an explicit cast.".into())
527 }
528 Self::InvalidSchemaName => {
529 Some("Use SET schema = name to select a schema. Use SHOW SCHEMAS to list available schemas. Use SHOW search_path to show the schema names that we looked for, but none of them existed.".into())
530 }
531 _ => None,
532 }
533 }
534}
535
536impl fmt::Display for PlanError {
537 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
538 match self {
539 Self::Unsupported { feature, discussion_no } => {
540 write!(f, "{} not yet supported", feature)?;
541 if let Some(discussion_no) = discussion_no {
542 write!(f, ", see https://github.com/MaterializeInc/materialize/discussions/{} for more details", discussion_no)?;
543 }
544 Ok(())
545 }
546 Self::NeverSupported { feature, documentation_link: documentation_path,.. } => {
547 write!(f, "{feature} is not supported",)?;
548 if let Some(documentation_path) = documentation_path {
549 write!(f, ", for more information consult the documentation at https://materialize.com/docs/{documentation_path}")?;
550 }
551 Ok(())
552 }
553 Self::UnknownColumn { table, column, similar: _ } => write!(
554 f,
555 "column {} does not exist",
556 ColumnDisplay { table, column }
557 ),
558 Self::UngroupedColumn { table, column } => write!(
559 f,
560 "column {} must appear in the GROUP BY clause or be used in an aggregate function",
561 ColumnDisplay { table, column },
562 ),
563 Self::ItemWithoutColumns { name, item_type } => {
564 let name = name.quoted();
565 write!(f, "{item_type} {name} does not have columns")
566 }
567 Self::WrongJoinTypeForLateralColumn { table, column } => write!(
568 f,
569 "column {} cannot be referenced from this part of the query: \
570 the combining JOIN type must be INNER or LEFT for a LATERAL reference",
571 ColumnDisplay { table, column },
572 ),
573 Self::AmbiguousColumn(column) => write!(
574 f,
575 "column reference {} is ambiguous",
576 column.quoted()
577 ),
578 Self::TooManyColumns { max_num_columns, req_num_columns } => write!(
579 f,
580 "attempt to create relation with too many columns, {} max: {}",
581 req_num_columns, max_num_columns
582 ),
583 Self::ColumnAlreadyExists { column_name, object_name } => write!(
584 f,
585 "column {} of relation {} already exists",
586 column_name.quoted(), object_name.quoted(),
587 ),
588 Self::AmbiguousTable(table) => write!(
589 f,
590 "table reference {} is ambiguous",
591 table.item.as_str().quoted()
592 ),
593 Self::UnknownColumnInUsingClause { column, join_side } => write!(
594 f,
595 "column {} specified in USING clause does not exist in {} table",
596 column.quoted(),
597 join_side,
598 ),
599 Self::AmbiguousColumnInUsingClause { column, join_side } => write!(
600 f,
601 "common column name {} appears more than once in {} table",
602 column.quoted(),
603 join_side,
604 ),
605 Self::MisqualifiedName(name) => write!(
606 f,
607 "qualified name did not have between 1 and 3 components: {}",
608 name
609 ),
610 Self::OverqualifiedDatabaseName(name) => write!(
611 f,
612 "database name '{}' does not have exactly one component",
613 name
614 ),
615 Self::OverqualifiedSchemaName(name) => write!(
616 f,
617 "schema name '{}' cannot have more than two components",
618 name
619 ),
620 Self::UnderqualifiedColumnName(name) => write!(
621 f,
622 "column name '{}' must have at least a table qualification",
623 name
624 ),
625 Self::UnacceptableTimelineName(name) => {
626 write!(f, "unacceptable timeline name {}", name.quoted(),)
627 }
628 Self::SubqueriesDisallowed { context } => {
629 write!(f, "{} does not allow subqueries", context)
630 }
631 Self::UnknownParameter(n) => write!(f, "there is no parameter ${}", n),
632 Self::ParameterNotAllowed(object_type) => write!(f, "{} cannot have parameters", object_type),
633 Self::WrongParameterType(i, expected_ty, actual_ty) => write!(f, "unable to cast given parameter ${}: expected {}, got {}", i, expected_ty, actual_ty),
634 Self::RecursionLimit(e) => write!(f, "{}", e),
635 Self::StrconvParse(e) => write!(f, "{}", e),
636 Self::Catalog(e) => write!(f, "{}", e),
637 Self::UpsertSinkWithoutKey => write!(f, "upsert sinks must specify a key"),
638 Self::UpsertSinkWithInvalidKey { .. } => {
639 write!(f, "upsert key could not be validated as unique")
640 }
641 Self::IcebergSinkUnsupportedKeyType { column, column_type } => {
642 write!(f, "column {column} has type {column_type} which cannot be used as an Iceberg equality delete key")
643 }
644 Self::InvalidWmrRecursionLimit(msg) => write!(f, "Invalid WITH MUTUALLY RECURSIVE recursion limit. {}", msg),
645 Self::InvalidNumericMaxScale(e) => e.fmt(f),
646 Self::InvalidCharLength(e) => e.fmt(f),
647 Self::InvalidVarCharMaxLength(e) => e.fmt(f),
648 Self::InvalidTimestampPrecision(e) => e.fmt(f),
649 Self::Parser(e) => e.fmt(f),
650 Self::ParserStatement(e) => e.fmt(f),
651 Self::Unstructured(e) => write!(f, "{}", e),
652 Self::InvalidId(id) => write!(f, "invalid id {}", id),
653 Self::InvalidIdent(err) => write!(f, "invalid identifier, {err}"),
654 Self::InvalidObject(i) => write!(f, "{} is not a database object", i.full_name_str()),
655 Self::InvalidObjectType{expected_type, actual_type, object_name} => write!(f, "{actual_type} {object_name} is not a {expected_type}"),
656 Self::InvalidPrivilegeTypes{ invalid_privileges, object_description, } => {
657 write!(f, "invalid privilege types {} for {}", invalid_privileges.to_error_string(), object_description)
658 },
659 Self::InvalidSecret(i) => write!(f, "{} is not a secret", i.full_name_str()),
660 Self::InvalidTemporarySchema => {
661 write!(f, "cannot create temporary item in non-temporary schema")
662 }
663 Self::InvalidCast { name, ccx, from, to } =>{
664 write!(
665 f,
666 "{name} does not support {ccx}casting from {from} to {to}",
667 ccx = if matches!(ccx, CastContext::Implicit) {
668 "implicitly "
669 } else {
670 ""
671 },
672 )
673 }
674 Self::UnsupportedRangeElementType { element_type_name } => {
675 write!(f, "range type over {} is not supported", element_type_name)
676 }
677 Self::InvalidTable { name } => {
678 write!(f, "invalid table definition for {}", name.quoted())
679 },
680 Self::InvalidVersion { name, version } => {
681 write!(f, "invalid version {} for {}", version.quoted(), name.quoted())
682 },
683 Self::InvalidSinkFrom { name, item_type } => {
684 write!(f, "{item_type} {name} cannot be exported as a sink")
685 },
686 Self::InvalidDependency { name, item_type } => {
687 write!(f, "{item_type} {name} cannot be depended upon")
688 },
689 Self::DropViewOnMaterializedView(name)
690 | Self::AlterViewOnMaterializedView(name)
691 | Self::ShowCreateViewOnMaterializedView(name)
692 | Self::ExplainViewOnMaterializedView(name) => write!(f, "{name} is not a view"),
693 Self::FetchingCsrSchemaFailed { schema_lookup, .. } => {
694 write!(f, "failed to fetch schema {schema_lookup} from schema registry")
695 }
696 Self::PostgresConnectionErr { .. } => {
697 write!(f, "failed to connect to PostgreSQL database")
698 }
699 Self::MySqlConnectionErr { cause } => {
700 write!(f, "failed to connect to MySQL database: {}", cause)
701 }
702 Self::SqlServerConnectionErr { cause } => {
703 write!(f, "failed to connect to SQL Server database: {}", cause)
704 }
705 Self::SubsourceNameConflict {
706 name , upstream_references: _,
707 } => {
708 write!(f, "multiple subsources would be named {}", name)
709 },
710 Self::SubsourceDuplicateReference {
711 name,
712 target_names: _,
713 } => {
714 write!(f, "multiple subsources refer to table {}", name)
715 },
716 Self::NoTablesFoundForSchemas(schemas) => {
717 write!(f, "no tables found in referenced schemas: {}",
718 separated(", ", schemas.iter().map(|c| c.quoted()))
719 )
720 },
721 Self::InvalidProtobufSchema { .. } => {
722 write!(f, "invalid protobuf schema")
723 }
724 Self::DependentObjectsStillExist {object_type, object_name, dependents} => {
725 let reason = match &dependents[..] {
726 [] => " because other objects depend on it".to_string(),
727 dependents => {
728 let dependents = dependents.iter().map(|(dependent_type, dependent_name)| format!("{} {}", dependent_type, dependent_name.quoted())).join(", ");
729 format!(": still depended upon by {dependents}")
730 },
731 };
732 let object_name = object_name.quoted();
733 write!(f, "cannot drop {object_type} {object_name}{reason}")
734 }
735 Self::InvalidOptionValue { option_name, err } => write!(f, "invalid {} option value: {}", option_name, err),
736 Self::UnexpectedDuplicateReference { name } => write!(f, "unexpected multiple references to {}", name.to_ast_string_simple()),
737 Self::RecursiveTypeMismatch(name, declared, inferred) => {
738 let declared = separated(", ", declared);
739 let inferred = separated(", ", inferred);
740 let name = name.quoted();
741 write!(f, "WITH MUTUALLY RECURSIVE query {name} declared types ({declared}), but query returns types ({inferred})")
742 },
743 Self::UnknownFunction {name, arg_types, ..} => {
744 write!(f, "function {}({}) does not exist", name, arg_types.join(", "))
745 },
746 Self::IndistinctFunction {name, arg_types, ..} => {
747 write!(f, "function {}({}) is not unique", name, arg_types.join(", "))
748 },
749 Self::UnknownOperator {name, arg_types, ..} => {
750 write!(f, "operator does not exist: {}", match arg_types.as_slice(){
751 [typ] => format!("{} {}", name, typ),
752 [ltyp, rtyp] => {
753 format!("{} {} {}", ltyp, name, rtyp)
754 }
755 _ => unreachable!("non-unary non-binary operator"),
756 })
757 },
758 Self::IndistinctOperator {name, arg_types, ..} => {
759 write!(f, "operator is not unique: {}", match arg_types.as_slice(){
760 [typ] => format!("{} {}", name, typ),
761 [ltyp, rtyp] => {
762 format!("{} {} {}", ltyp, name, rtyp)
763 }
764 _ => unreachable!("non-unary non-binary operator"),
765 })
766 },
767 Self::InvalidPrivatelinkAvailabilityZone { name, ..} => write!(f, "invalid AWS PrivateLink availability zone {}", name.quoted()),
768 Self::DuplicatePrivatelinkAvailabilityZone {..} => write!(f, "connection cannot contain duplicate availability zones"),
769 Self::InvalidSchemaName => write!(f, "no valid schema selected"),
770 Self::ItemAlreadyExists { name, item_type } => write!(f, "{item_type} {} already exists", name.quoted()),
771 Self::ManagedCluster {cluster_name} => write!(f, "cannot modify managed cluster {cluster_name}"),
772 Self::InvalidKeysInSubscribeEnvelopeUpsert => {
773 write!(f, "invalid keys in SUBSCRIBE ENVELOPE UPSERT (KEY (..))")
774 }
775 Self::InvalidKeysInSubscribeEnvelopeDebezium => {
776 write!(f, "invalid keys in SUBSCRIBE ENVELOPE DEBEZIUM (KEY (..))")
777 }
778 Self::InvalidPartitionByEnvelopeDebezium { column_name } => {
779 write!(
780 f,
781 "PARTITION BY expression cannot refer to non-key column {}",
782 column_name.quoted(),
783 )
784 }
785 Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
786 write!(f, "invalid ORDER BY in SUBSCRIBE WITHIN TIMESTAMP ORDER BY")
787 }
788 Self::FromValueRequiresParen => f.write_str(
789 "VALUES expression in FROM clause must be surrounded by parentheses"
790 ),
791 Self::VarError(e) => e.fmt(f),
792 Self::UnsolvablePolymorphicFunctionInput => f.write_str(
793 "could not determine polymorphic type because input has type unknown"
794 ),
795 Self::ShowCommandInView => f.write_str("SHOW commands are not allowed in views"),
796 Self::WebhookValidationDoesNotUseColumns => f.write_str(
797 "expression provided in CHECK does not reference any columns"
798 ),
799 Self::WebhookValidationNonDeterministic => f.write_str(
800 "expression provided in CHECK is not deterministic"
801 ),
802 Self::InternalFunctionCall => f.write_str("cannot call function with arguments of type internal"),
803 Self::CommentTooLong { length, max_size } => {
804 write!(f, "provided comment was {length} bytes long, max size is {max_size} bytes")
805 }
806 Self::InvalidTimestampInterval { min, max, requested } => {
807 write!(f, "invalid timestamp interval of {}ms, must be in the range [{}ms, {}ms]", requested.as_millis(), min.as_millis(), max.as_millis())
808 }
809 Self::InvalidGroupSizeHints => f.write_str("EXPECTED GROUP SIZE cannot be provided \
810 simultaneously with any of AGGREGATE INPUT GROUP SIZE, DISTINCT ON INPUT GROUP SIZE, \
811 or LIMIT INPUT GROUP SIZE"),
812 Self::PgSourcePurification(e) => write!(f, "POSTGRES source validation: {}", e),
813 Self::KafkaSourcePurification(e) => write!(f, "KAFKA source validation: {}", e),
814 Self::LoadGeneratorSourcePurification(e) => write!(f, "LOAD GENERATOR source validation: {}", e),
815 Self::KafkaSinkPurification(e) => write!(f, "KAFKA sink validation: {}", e),
816 Self::IcebergSinkPurification(e) => write!(f, "ICEBERG sink validation: {}", e),
817 Self::CsrPurification(e) => write!(f, "CONFLUENT SCHEMA REGISTRY validation: {}", e),
818 Self::MySqlSourcePurification(e) => write!(f, "MYSQL source validation: {}", e),
819 Self::SqlServerSourcePurificationError(e) => write!(f, "SQL SERVER source validation: {}", e),
820 Self::UseTablesForSources(command) => write!(f, "{command} not supported; use CREATE TABLE .. FROM SOURCE instead"),
821 Self::MangedReplicaName(name) => {
822 write!(f, "{name} is reserved for replicas of managed clusters")
823 }
824 Self::MissingName(item_type) => {
825 write!(f, "unspecified name for {item_type}")
826 }
827 Self::InvalidRefreshAt => {
828 write!(f, "REFRESH AT argument must be an expression that can be simplified \
829 and/or cast to a constant whose type is mz_timestamp")
830 }
831 Self::InvalidRefreshEveryAlignedTo => {
832 write!(f, "REFRESH EVERY ... ALIGNED TO argument must be an expression that can be simplified \
833 and/or cast to a constant whose type is mz_timestamp")
834 }
835 Self::CreateReplicaFailStorageObjects {..} => {
836 write!(f, "cannot create more than one replica of a cluster containing sources or sinks")
837 },
838 Self::MismatchedObjectType {
839 name,
840 is_type,
841 expected_type,
842 } => {
843 write!(
844 f,
845 "{name} is {} {} not {} {}",
846 if *is_type == ObjectType::Index {
847 "an"
848 } else {
849 "a"
850 },
851 is_type.to_string().to_lowercase(),
852 if *expected_type == ObjectType::Index {
853 "an"
854 } else {
855 "a"
856 },
857 expected_type.to_string().to_lowercase()
858 )
859 }
860 Self::TableContainsUningestableTypes { name, type_, column } => {
861 write!(f, "table {name} contains column {column} of type {type_} which Materialize cannot currently ingest")
862 },
863 Self::RetainHistoryLow { limit } => {
864 write!(f, "RETAIN HISTORY cannot be set lower than {}ms", limit.as_millis())
865 },
866 Self::RetainHistoryRequired => {
867 write!(f, "RETAIN HISTORY cannot be disabled or set to 0")
868 },
869 Self::SubsourceResolutionError(e) => write!(f, "{}", e),
870 Self::Replan(msg) => write!(f, "internal error while replanning, please contact support: {msg}"),
871 Self::NetworkPolicyLockoutError => write!(f, "policy would block current session IP"),
872 Self::NetworkPolicyInUse => write!(f, "network policy is currently in use"),
873 Self::UntilReadyTimeoutRequired => {
874 write!(f, "TIMEOUT=<duration> option is required for ALTER CLUSTER ... WITH (WAIT UNTIL READY ( ... ))")
875 },
876 Self::ConstantExpressionSimplificationFailed(e) => write!(f, "{}", e),
877 Self::InvalidOffset(e) => write!(f, "Invalid OFFSET clause: {}", e),
878 Self::UnknownCursor(name) => {
879 write!(f, "cursor {} does not exist", name.quoted())
880 }
881 Self::CopyFromTargetTableDropped { target_name: name } => {
882 write!(f, "COPY FROM's target table {} was dropped", name.quoted())
883 }
884 Self::InvalidAsOfUpTo => write!(
885 f,
886 "AS OF or UP TO should be castable to a (non-null) mz_timestamp value",
887 ),
888 Self::InvalidReplacement {
889 item_type, item_name, replacement_type, replacement_name,
890 } => {
891 write!(
892 f,
893 "cannot replace {item_type} {item_name} \
894 with {replacement_type} {replacement_name}",
895 )
896 }
897 }
898 }
899}
900
901impl Error for PlanError {}
902
903impl From<CatalogError> for PlanError {
904 fn from(e: CatalogError) -> PlanError {
905 PlanError::Catalog(e)
906 }
907}
908
909impl From<strconv::ParseError> for PlanError {
910 fn from(e: strconv::ParseError) -> PlanError {
911 PlanError::StrconvParse(e)
912 }
913}
914
915impl From<RecursionLimitError> for PlanError {
916 fn from(e: RecursionLimitError) -> PlanError {
917 PlanError::RecursionLimit(e)
918 }
919}
920
921impl From<InvalidNumericMaxScaleError> for PlanError {
922 fn from(e: InvalidNumericMaxScaleError) -> PlanError {
923 PlanError::InvalidNumericMaxScale(e)
924 }
925}
926
927impl From<InvalidCharLengthError> for PlanError {
928 fn from(e: InvalidCharLengthError) -> PlanError {
929 PlanError::InvalidCharLength(e)
930 }
931}
932
933impl From<InvalidVarCharMaxLengthError> for PlanError {
934 fn from(e: InvalidVarCharMaxLengthError) -> PlanError {
935 PlanError::InvalidVarCharMaxLength(e)
936 }
937}
938
939impl From<InvalidTimestampPrecisionError> for PlanError {
940 fn from(e: InvalidTimestampPrecisionError) -> PlanError {
941 PlanError::InvalidTimestampPrecision(e)
942 }
943}
944
945impl From<anyhow::Error> for PlanError {
946 fn from(e: anyhow::Error) -> PlanError {
947 sql_err!("{}", e.display_with_causes())
949 }
950}
951
952impl From<TryFromIntError> for PlanError {
953 fn from(e: TryFromIntError) -> PlanError {
954 sql_err!("{}", e.display_with_causes())
955 }
956}
957
958impl From<ParseIntError> for PlanError {
959 fn from(e: ParseIntError) -> PlanError {
960 sql_err!("{}", e.display_with_causes())
961 }
962}
963
964impl From<EvalError> for PlanError {
965 fn from(e: EvalError) -> PlanError {
966 sql_err!("{}", e.display_with_causes())
967 }
968}
969
970impl From<ParserError> for PlanError {
971 fn from(e: ParserError) -> PlanError {
972 PlanError::Parser(e)
973 }
974}
975
976impl From<ParserStatementError> for PlanError {
977 fn from(e: ParserStatementError) -> PlanError {
978 PlanError::ParserStatement(e)
979 }
980}
981
982impl From<PostgresError> for PlanError {
983 fn from(e: PostgresError) -> PlanError {
984 PlanError::PostgresConnectionErr { cause: Arc::new(e) }
985 }
986}
987
988impl From<MySqlError> for PlanError {
989 fn from(e: MySqlError) -> PlanError {
990 PlanError::MySqlConnectionErr { cause: Arc::new(e) }
991 }
992}
993
994impl From<SqlServerError> for PlanError {
995 fn from(e: SqlServerError) -> PlanError {
996 PlanError::SqlServerConnectionErr { cause: Arc::new(e) }
997 }
998}
999
1000impl From<VarError> for PlanError {
1001 fn from(e: VarError) -> Self {
1002 PlanError::VarError(e)
1003 }
1004}
1005
1006impl From<PgSourcePurificationError> for PlanError {
1007 fn from(e: PgSourcePurificationError) -> Self {
1008 PlanError::PgSourcePurification(e)
1009 }
1010}
1011
1012impl From<KafkaSourcePurificationError> for PlanError {
1013 fn from(e: KafkaSourcePurificationError) -> Self {
1014 PlanError::KafkaSourcePurification(e)
1015 }
1016}
1017
1018impl From<KafkaSinkPurificationError> for PlanError {
1019 fn from(e: KafkaSinkPurificationError) -> Self {
1020 PlanError::KafkaSinkPurification(e)
1021 }
1022}
1023
1024impl From<IcebergSinkPurificationError> for PlanError {
1025 fn from(e: IcebergSinkPurificationError) -> Self {
1026 PlanError::IcebergSinkPurification(e)
1027 }
1028}
1029
1030impl From<CsrPurificationError> for PlanError {
1031 fn from(e: CsrPurificationError) -> Self {
1032 PlanError::CsrPurification(e)
1033 }
1034}
1035
1036impl From<LoadGeneratorSourcePurificationError> for PlanError {
1037 fn from(e: LoadGeneratorSourcePurificationError) -> Self {
1038 PlanError::LoadGeneratorSourcePurification(e)
1039 }
1040}
1041
1042impl From<MySqlSourcePurificationError> for PlanError {
1043 fn from(e: MySqlSourcePurificationError) -> Self {
1044 PlanError::MySqlSourcePurification(e)
1045 }
1046}
1047
1048impl From<SqlServerSourcePurificationError> for PlanError {
1049 fn from(e: SqlServerSourcePurificationError) -> Self {
1050 PlanError::SqlServerSourcePurificationError(e)
1051 }
1052}
1053
1054impl From<IdentError> for PlanError {
1055 fn from(e: IdentError) -> Self {
1056 PlanError::InvalidIdent(e)
1057 }
1058}
1059
1060impl From<ExternalReferenceResolutionError> for PlanError {
1061 fn from(e: ExternalReferenceResolutionError) -> Self {
1062 PlanError::SubsourceResolutionError(e)
1063 }
1064}
1065
1066struct ColumnDisplay<'a> {
1067 table: &'a Option<PartialItemName>,
1068 column: &'a ColumnName,
1069}
1070
1071impl<'a> fmt::Display for ColumnDisplay<'a> {
1072 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1073 if let Some(table) = &self.table {
1074 format!("{}.{}", table.item, self.column).quoted().fmt(f)
1075 } else {
1076 self.column.quoted().fmt(f)
1077 }
1078 }
1079}