1use std::collections::BTreeSet;
11use std::error::Error;
12use std::num::{ParseIntError, TryFromIntError};
13use std::sync::Arc;
14use std::time::Duration;
15use std::{fmt, io};
16
17use itertools::Itertools;
18use mz_expr::EvalError;
19use mz_mysql_util::MySqlError;
20use mz_ore::error::ErrorExt;
21use mz_ore::stack::RecursionLimitError;
22use mz_ore::str::{StrExt, separated};
23use mz_postgres_util::PostgresError;
24use mz_repr::adt::char::InvalidCharLengthError;
25use mz_repr::adt::mz_acl_item::AclMode;
26use mz_repr::adt::numeric::InvalidNumericMaxScaleError;
27use mz_repr::adt::timestamp::InvalidTimestampPrecisionError;
28use mz_repr::adt::varchar::InvalidVarCharMaxLengthError;
29use mz_repr::{CatalogItemId, ColumnName, strconv};
30use mz_sql_parser::ast::display::AstDisplay;
31use mz_sql_parser::ast::{IdentError, UnresolvedItemName};
32use mz_sql_parser::parser::{ParserError, ParserStatementError};
33use mz_sql_server_util::SqlServerError;
34use mz_storage_types::sources::ExternalReferenceResolutionError;
35
36use crate::catalog::{
37 CatalogError, CatalogItemType, ErrorMessageObjectDescription, SystemObjectType,
38};
39use crate::names::{PartialItemName, ResolvedItemName};
40use crate::plan::ObjectType;
41use crate::plan::plan_utils::JoinSide;
42use crate::plan::scope::ScopeItem;
43use crate::plan::typeconv::CastContext;
44use crate::pure::error::{
45 CsrPurificationError, IcebergSinkPurificationError, KafkaSinkPurificationError,
46 KafkaSourcePurificationError, LoadGeneratorSourcePurificationError,
47 MySqlSourcePurificationError, PgSourcePurificationError, SqlServerSourcePurificationError,
48};
49use crate::session::vars::VarError;
50
/// Error type whose variants cover name resolution, typing, catalog access,
/// option validation, and source/sink purification failures.
///
/// The `Display` implementation renders the primary message for every
/// variant; `detail()` and `hint()` return optional supplementary text for
/// some of them.
#[derive(Debug)]
pub enum PlanError {
    /// A feature that is not yet supported. When `discussion_no` is set, the
    /// message links to that discussion number in the
    /// MaterializeInc/materialize GitHub repository.
    Unsupported {
        feature: String,
        discussion_no: Option<usize>,
    },
    /// A feature that is not supported. `documentation_link` is rendered as a
    /// path beneath `https://materialize.com/docs/`, and `details` is what
    /// `detail()` returns for this variant.
    NeverSupported {
        feature: String,
        documentation_link: Option<String>,
        details: Option<String>,
    },
    /// A referenced column does not exist. `similar` lists similarly named
    /// columns that do exist; `hint()` mentions them when non-empty.
    UnknownColumn {
        table: Option<PartialItemName>,
        column: ColumnName,
        similar: Box<[ColumnName]>,
    },
    /// A column that must appear in the GROUP BY clause or be used in an
    /// aggregate function.
    UngroupedColumn {
        table: Option<PartialItemName>,
        column: ColumnName,
    },
    /// The named item of the given type does not have columns.
    ItemWithoutColumns {
        name: String,
        item_type: CatalogItemType,
    },
    /// A LATERAL column reference combined with a JOIN type other than INNER
    /// or LEFT.
    WrongJoinTypeForLateralColumn {
        table: Option<PartialItemName>,
        column: ColumnName,
    },
    /// A column reference that is ambiguous.
    AmbiguousColumn(ColumnName),
    /// An attempt to create a relation with `req_num_columns` columns when
    /// the maximum is `max_num_columns`.
    TooManyColumns {
        max_num_columns: usize,
        req_num_columns: usize,
    },
    /// The column already exists on the named relation.
    ColumnAlreadyExists {
        column_name: ColumnName,
        object_name: String,
    },
    /// A table reference that is ambiguous.
    AmbiguousTable(PartialItemName),
    /// A column named in a USING clause does not exist in the table on
    /// `join_side`.
    UnknownColumnInUsingClause {
        column: ColumnName,
        join_side: JoinSide,
    },
    /// A common column name appears more than once in the table on
    /// `join_side`.
    AmbiguousColumnInUsingClause {
        column: ColumnName,
        join_side: JoinSide,
    },
    /// A qualified name that did not have between 1 and 3 components.
    MisqualifiedName(String),
    /// A database name that does not have exactly one component.
    OverqualifiedDatabaseName(String),
    /// A schema name with more than two components.
    OverqualifiedSchemaName(String),
    /// A column name lacking at least a table qualification.
    UnderqualifiedColumnName(String),
    /// Subqueries are not allowed in `context`.
    SubqueriesDisallowed {
        context: String,
    },
    /// There is no parameter with the given number (rendered as `$n`).
    UnknownParameter(usize),
    /// The named kind of object cannot have parameters.
    ParameterNotAllowed(String),
    /// A parameter could not be cast. Fields are, in order: the parameter
    /// number, the expected type, and the actual type.
    WrongParameterType(usize, String, String),
    /// Wraps a [`RecursionLimitError`]; its message is rendered verbatim.
    RecursionLimit(RecursionLimitError),
    /// Wraps a `strconv::ParseError`; its message is rendered verbatim.
    StrconvParse(strconv::ParseError),
    /// Wraps a [`CatalogError`]; its message and hint are forwarded.
    Catalog(CatalogError),
    /// An upsert sink was specified without a key.
    UpsertSinkWithoutKey,
    /// The upsert key could not be validated as unique. `detail()` reports
    /// `desired_key`, the relation `name`, and any `valid_keys`.
    UpsertSinkWithInvalidKey {
        name: String,
        desired_key: Vec<String>,
        valid_keys: Vec<Vec<String>>,
    },
    /// An invalid WITH MUTUALLY RECURSIVE recursion limit; the payload is
    /// appended to the message.
    InvalidWmrRecursionLimit(String),
    /// Wraps an [`InvalidNumericMaxScaleError`].
    InvalidNumericMaxScale(InvalidNumericMaxScaleError),
    /// Wraps an [`InvalidCharLengthError`].
    InvalidCharLength(InvalidCharLengthError),
    /// An invalid catalog item id.
    InvalidId(CatalogItemId),
    /// An invalid identifier.
    InvalidIdent(IdentError),
    /// The resolved name is not a database object.
    InvalidObject(Box<ResolvedItemName>),
    /// The object `object_name` has type `actual_type` rather than
    /// `expected_type`.
    InvalidObjectType {
        expected_type: SystemObjectType,
        actual_type: SystemObjectType,
        object_name: String,
    },
    /// The given privilege types are invalid for the described object.
    InvalidPrivilegeTypes {
        invalid_privileges: AclMode,
        object_description: ErrorMessageObjectDescription,
    },
    /// Wraps an [`InvalidVarCharMaxLengthError`].
    InvalidVarCharMaxLength(InvalidVarCharMaxLengthError),
    /// Wraps an [`InvalidTimestampPrecisionError`].
    InvalidTimestampPrecision(InvalidTimestampPrecisionError),
    /// The resolved name is not a secret.
    InvalidSecret(Box<ResolvedItemName>),
    /// An attempt to create a temporary item in a non-temporary schema.
    InvalidTemporarySchema,
    /// `name` does not support casting from `from` to `to`. The message says
    /// "implicitly" when `ccx` is `CastContext::Implicit`.
    InvalidCast {
        name: String,
        ccx: CastContext,
        from: String,
        to: String,
    },
    /// A range type over the named element type is not supported.
    UnsupportedRangeElementType {
        element_type_name: String,
    },
    /// An invalid table definition for `name`.
    InvalidTable {
        name: String,
    },
    /// An invalid `version` for `name`.
    InvalidVersion {
        name: String,
        version: String,
    },
    /// The named item cannot be exported as a sink.
    InvalidSinkFrom {
        name: String,
        item_type: String,
    },
    /// The named item cannot be depended upon.
    InvalidDependency {
        name: String,
        item_type: String,
    },
    /// The name is reserved for replicas of managed clusters.
    // NOTE(review): the variant name looks like a typo for
    // `ManagedReplicaName`; renaming it would change the public interface.
    MangedReplicaName(String),
    /// Wraps a [`ParserStatementError`].
    ParserStatement(ParserStatementError),
    /// Wraps a [`ParserError`].
    Parser(ParserError),
    /// DROP VIEW was used on something that is not a view; the hint points
    /// at DROP MATERIALIZED VIEW.
    DropViewOnMaterializedView(String),
    /// The object cannot be dropped because other objects depend on it.
    /// Each entry of `dependents` is a `(type, name)` pair.
    DependentObjectsStillExist {
        object_type: String,
        object_name: String,
        dependents: Vec<(String, String)>,
    },
    /// ALTER VIEW was used on something that is not a view; the hint points
    /// at ALTER MATERIALIZED VIEW.
    AlterViewOnMaterializedView(String),
    /// SHOW CREATE VIEW was used on something that is not a view; the hint
    /// points at SHOW CREATE MATERIALIZED VIEW.
    ShowCreateViewOnMaterializedView(String),
    /// EXPLAIN ... VIEW was used on something that is not a view; the hint
    /// points at EXPLAIN [...] MATERIALIZED VIEW.
    ExplainViewOnMaterializedView(String),
    /// An unacceptable timeline name; the hint notes that the `mz_` prefix is
    /// reserved for system timelines.
    UnacceptableTimelineName(String),
    /// Fetching a schema from the schema registry failed; `cause` is
    /// reported through `detail()`.
    FetchingCsrSchemaFailed {
        schema_lookup: String,
        cause: Arc<dyn Error + Send + Sync>,
    },
    /// Connecting to a PostgreSQL database failed; `cause` is reported
    /// through `detail()`.
    PostgresConnectionErr {
        cause: Arc<mz_postgres_util::PostgresError>,
    },
    /// Connecting to a MySQL database failed; `cause` is included in the
    /// message.
    MySqlConnectionErr {
        cause: Arc<MySqlError>,
    },
    /// Connecting to a SQL Server database failed; `cause` is included in
    /// the message.
    SqlServerConnectionErr {
        cause: Arc<SqlServerError>,
    },
    /// Multiple subsources would be named `name`; `upstream_references` are
    /// listed by `detail()`.
    SubsourceNameConflict {
        name: UnresolvedItemName,
        upstream_references: Vec<UnresolvedItemName>,
    },
    /// Multiple subsources refer to table `name`; `target_names` are listed
    /// by `detail()`.
    SubsourceDuplicateReference {
        name: UnresolvedItemName,
        target_names: Vec<UnresolvedItemName>,
    },
    /// No tables were found in the referenced schemas.
    NoTablesFoundForSchemas(Vec<String>),
    /// An invalid protobuf schema; `cause` is reported through `detail()`.
    InvalidProtobufSchema {
        cause: protobuf_native::OperationFailedError,
    },
    /// An invalid value for the option `option_name`. The nested `err` is
    /// included in the message, and its detail and hint are forwarded.
    InvalidOptionValue {
        option_name: String,
        err: Box<PlanError>,
    },
    /// Unexpected multiple references to `name`.
    UnexpectedDuplicateReference {
        name: UnresolvedItemName,
    },
    /// A WITH MUTUALLY RECURSIVE query whose declared types differ from the
    /// types it returns. Fields are, in order: the query name, the declared
    /// types, and the inferred types.
    RecursiveTypeMismatch(String, Vec<String>, Vec<String>),
    /// No function with this name and argument types exists.
    UnknownFunction {
        name: String,
        arg_types: Vec<String>,
    },
    /// The function with this name and argument types is not unique.
    IndistinctFunction {
        name: String,
        arg_types: Vec<String>,
    },
    /// No operator with this name and argument types exists.
    UnknownOperator {
        name: String,
        arg_types: Vec<String>,
    },
    /// The operator with this name and argument types is not unique.
    IndistinctOperator {
        name: String,
        arg_types: Vec<String>,
    },
    /// An invalid AWS PrivateLink availability zone; `supported_azs` are
    /// listed by `hint()`.
    InvalidPrivatelinkAvailabilityZone {
        name: String,
        supported_azs: BTreeSet<String>,
    },
    /// A connection containing duplicate availability zones; the duplicates
    /// are listed by `hint()`.
    DuplicatePrivatelinkAvailabilityZone {
        duplicate_azs: BTreeSet<String>,
    },
    /// No valid schema is selected.
    InvalidSchemaName,
    /// An item of the given type and name already exists.
    ItemAlreadyExists {
        name: String,
        item_type: CatalogItemType,
    },
    /// An attempt to modify a managed cluster.
    ManagedCluster {
        cluster_name: String,
    },
    /// Invalid keys in SUBSCRIBE ENVELOPE UPSERT (KEY (..)).
    InvalidKeysInSubscribeEnvelopeUpsert,
    /// Invalid keys in SUBSCRIBE ENVELOPE DEBEZIUM (KEY (..)).
    InvalidKeysInSubscribeEnvelopeDebezium,
    /// A PARTITION BY expression refers to the non-key column `column_name`.
    InvalidPartitionByEnvelopeDebezium {
        column_name: String,
    },
    /// Invalid ORDER BY in SUBSCRIBE WITHIN TIMESTAMP ORDER BY.
    InvalidOrderByInSubscribeWithinTimestampOrderBy,
    /// A VALUES expression in a FROM clause that is not surrounded by
    /// parentheses.
    FromValueRequiresParen,
    /// Wraps a [`VarError`]; its message, detail, and hint are forwarded.
    VarError(VarError),
    /// A polymorphic type could not be determined because the input has type
    /// unknown.
    UnsolvablePolymorphicFunctionInput,
    /// A SHOW command was used in a view.
    ShowCommandInView,
    /// The expression provided in CHECK does not reference any columns.
    WebhookValidationDoesNotUseColumns,
    /// The expression provided in CHECK is not deterministic.
    WebhookValidationNonDeterministic,
    /// A call to a function that is for internal use and cannot be called
    /// directly.
    InternalFunctionCall,
    /// A comment of `length` bytes exceeding the maximum of `max_size` bytes.
    CommentTooLong {
        length: usize,
        max_size: usize,
    },
    /// A timestamp interval `requested` outside the range [`min`, `max`];
    /// all three are reported in milliseconds.
    InvalidTimestampInterval {
        min: Duration,
        max: Duration,
        requested: Duration,
    },
    /// EXPECTED GROUP SIZE was provided together with one of the more
    /// specific group size hints.
    InvalidGroupSizeHints,
    /// POSTGRES source validation failure.
    PgSourcePurification(PgSourcePurificationError),
    /// KAFKA source validation failure.
    KafkaSourcePurification(KafkaSourcePurificationError),
    /// KAFKA sink validation failure.
    KafkaSinkPurification(KafkaSinkPurificationError),
    /// ICEBERG sink validation failure.
    IcebergSinkPurification(IcebergSinkPurificationError),
    /// LOAD GENERATOR source validation failure.
    LoadGeneratorSourcePurification(LoadGeneratorSourcePurificationError),
    /// CONFLUENT SCHEMA REGISTRY validation failure.
    CsrPurification(CsrPurificationError),
    /// MYSQL source validation failure.
    MySqlSourcePurification(MySqlSourcePurificationError),
    /// SQL SERVER source validation failure.
    SqlServerSourcePurificationError(SqlServerSourcePurificationError),
    /// The named command is not supported; the message recommends
    /// CREATE TABLE .. FROM SOURCE instead.
    UseTablesForSources(String),
    /// No name was specified for an item of the given type.
    MissingName(CatalogItemType),
    /// The REFRESH AT argument could not be simplified and/or cast to a
    /// constant of type mz_timestamp.
    InvalidRefreshAt,
    /// The REFRESH EVERY ... ALIGNED TO argument could not be simplified
    /// and/or cast to a constant of type mz_timestamp.
    InvalidRefreshEveryAlignedTo,
    /// More than one replica was requested for a cluster containing sources
    /// or sinks. `detail()` reports the current count, how many of those are
    /// internal, and the count the command would result in.
    CreateReplicaFailStorageObjects {
        current_replica_count: usize,
        internal_replica_count: usize,
        hypothetical_replica_count: usize,
    },
    /// `name` is of type `is_type`, not `expected_type`.
    MismatchedObjectType {
        name: PartialItemName,
        is_type: ObjectType,
        expected_type: ObjectType,
    },
    /// Table `name` contains `column` of type `type_`, which cannot
    /// currently be ingested.
    TableContainsUningestableTypes {
        name: String,
        type_: String,
        column: String,
    },
    /// RETAIN HISTORY was set lower than `limit`.
    RetainHistoryLow {
        limit: Duration,
    },
    /// RETAIN HISTORY was disabled or set to 0.
    RetainHistoryRequired,
    /// The TIMEOUT option is required for
    /// ALTER CLUSTER ... WITH (WAIT UNTIL READY ( ... )).
    UntilReadyTimeoutRequired,
    /// Wraps an [`ExternalReferenceResolutionError`]; its message is rendered
    /// verbatim.
    SubsourceResolutionError(ExternalReferenceResolutionError),
    /// An internal error while replanning.
    Replan(String),
    /// The network policy would block the current session IP.
    NetworkPolicyLockoutError,
    /// The network policy is currently in use.
    NetworkPolicyInUse,
    /// Constant expression simplification failed; the payload is rendered
    /// verbatim.
    ConstantExpressionSimplificationFailed(String),
    /// An invalid OFFSET clause.
    InvalidOffset(String),
    /// The named cursor does not exist.
    UnknownCursor(String),
    /// The target table of a COPY FROM was dropped.
    CopyFromTargetTableDropped {
        target_name: String,
    },
    /// An AS OF or UP TO value that is not castable to a non-null
    /// mz_timestamp.
    InvalidAsOfUpTo,
    /// The item cannot be replaced with the given replacement.
    InvalidReplacement {
        item_type: CatalogItemType,
        item_name: PartialItemName,
        replacement_type: CatalogItemType,
        replacement_name: PartialItemName,
    },
    /// A free-form message that is rendered verbatim.
    Unstructured(String),
}
326
327impl PlanError {
328 pub(crate) fn ungrouped_column(item: &ScopeItem) -> PlanError {
329 PlanError::UngroupedColumn {
330 table: item.table_name.clone(),
331 column: item.column_name.clone(),
332 }
333 }
334
335 pub fn detail(&self) -> Option<String> {
336 match self {
337 Self::NeverSupported { details, .. } => details.clone(),
338 Self::FetchingCsrSchemaFailed { cause, .. } => Some(cause.to_string_with_causes()),
339 Self::PostgresConnectionErr { cause } => Some(cause.to_string_with_causes()),
340 Self::InvalidProtobufSchema { cause } => Some(cause.to_string_with_causes()),
341 Self::InvalidOptionValue { err, .. } => err.detail(),
342 Self::UpsertSinkWithInvalidKey {
343 name,
344 desired_key,
345 valid_keys,
346 } => {
347 let valid_keys = if valid_keys.is_empty() {
348 "There are no known valid unique keys for the underlying relation.".into()
349 } else {
350 format!(
351 "The following keys are known to be unique for the underlying relation:\n{}",
352 valid_keys
353 .iter()
354 .map(|k|
355 format!(" ({})", k.iter().map(|c| c.as_str().quoted()).join(", "))
356 )
357 .join("\n"),
358 )
359 };
360 Some(format!(
361 "Materialize could not prove that the specified upsert envelope key ({}) \
362 was a unique key of the underlying relation {}. {valid_keys}",
363 separated(", ", desired_key.iter().map(|c| c.as_str().quoted())),
364 name.quoted()
365 ))
366 }
367 Self::VarError(e) => e.detail(),
368 Self::InternalFunctionCall => Some("This function is for the internal use of the database system and cannot be called directly.".into()),
369 Self::PgSourcePurification(e) => e.detail(),
370 Self::MySqlSourcePurification(e) => e.detail(),
371 Self::SqlServerSourcePurificationError(e) => e.detail(),
372 Self::KafkaSourcePurification(e) => e.detail(),
373 Self::LoadGeneratorSourcePurification(e) => e.detail(),
374 Self::CsrPurification(e) => e.detail(),
375 Self::KafkaSinkPurification(e) => e.detail(),
376 Self::IcebergSinkPurification(e) => e.detail(),
377 Self::CreateReplicaFailStorageObjects {
378 current_replica_count: current,
379 internal_replica_count: internal,
380 hypothetical_replica_count: target,
381 } => {
382 Some(format!(
383 "Currently have {} replica{}{}; command would result in {}",
384 current,
385 if *current != 1 { "s" } else { "" },
386 if *internal > 0 {
387 format!(" ({} internal)", internal)
388 } else {
389 "".to_string()
390 },
391 target
392 ))
393 },
394 Self::SubsourceNameConflict {
395 name: _,
396 upstream_references,
397 } => Some(format!(
398 "referenced tables with duplicate name: {}",
399 itertools::join(upstream_references, ", ")
400 )),
401 Self::SubsourceDuplicateReference {
402 name: _,
403 target_names,
404 } => Some(format!(
405 "subsources referencing table: {}",
406 itertools::join(target_names, ", ")
407 )),
408 Self::InvalidPartitionByEnvelopeDebezium { .. } => Some(
409 "When using ENVELOPE DEBEZIUM, only columns in the key can be referenced in the PARTITION BY expression.".to_string()
410 ),
411 Self::NoTablesFoundForSchemas(schemas) => Some(format!(
412 "missing schemas: {}",
413 separated(", ", schemas.iter().map(|c| c.quoted()))
414 )),
415 _ => None,
416 }
417 }
418
419 pub fn hint(&self) -> Option<String> {
420 match self {
421 Self::DropViewOnMaterializedView(_) => {
422 Some("Use DROP MATERIALIZED VIEW to remove a materialized view.".into())
423 }
424 Self::DependentObjectsStillExist {..} => Some("Use DROP ... CASCADE to drop the dependent objects too.".into()),
425 Self::AlterViewOnMaterializedView(_) => {
426 Some("Use ALTER MATERIALIZED VIEW to rename a materialized view.".into())
427 }
428 Self::ShowCreateViewOnMaterializedView(_) => {
429 Some("Use SHOW CREATE MATERIALIZED VIEW to show a materialized view.".into())
430 }
431 Self::ExplainViewOnMaterializedView(_) => {
432 Some("Use EXPLAIN [...] MATERIALIZED VIEW to explain a materialized view.".into())
433 }
434 Self::UnacceptableTimelineName(_) => {
435 Some("The prefix \"mz_\" is reserved for system timelines.".into())
436 }
437 Self::PostgresConnectionErr { cause } => {
438 if let Some(cause) = cause.source() {
439 if let Some(cause) = cause.downcast_ref::<io::Error>() {
440 if cause.kind() == io::ErrorKind::TimedOut {
441 return Some(
442 "Do you have a firewall or security group that is \
443 preventing Materialize from connecting to your PostgreSQL server?"
444 .into(),
445 );
446 }
447 }
448 }
449 None
450 }
451 Self::InvalidOptionValue { err, .. } => err.hint(),
452 Self::UnknownFunction { ..} => Some("No function matches the given name and argument types. You might need to add explicit type casts.".into()),
453 Self::IndistinctFunction {..} => {
454 Some("Could not choose a best candidate function. You might need to add explicit type casts.".into())
455 }
456 Self::UnknownOperator {..} => {
457 Some("No operator matches the given name and argument types. You might need to add explicit type casts.".into())
458 }
459 Self::IndistinctOperator {..} => {
460 Some("Could not choose a best candidate operator. You might need to add explicit type casts.".into())
461 },
462 Self::InvalidPrivatelinkAvailabilityZone { supported_azs, ..} => {
463 let supported_azs_str = supported_azs.iter().join("\n ");
464 Some(format!("Did you supply an availability zone name instead of an ID? Known availability zone IDs:\n {}", supported_azs_str))
465 }
466 Self::DuplicatePrivatelinkAvailabilityZone { duplicate_azs, ..} => {
467 let duplicate_azs = duplicate_azs.iter().join("\n ");
468 Some(format!("Duplicated availability zones:\n {}", duplicate_azs))
469 }
470 Self::InvalidKeysInSubscribeEnvelopeUpsert => {
471 Some("All keys must be columns on the underlying relation.".into())
472 }
473 Self::InvalidKeysInSubscribeEnvelopeDebezium => {
474 Some("All keys must be columns on the underlying relation.".into())
475 }
476 Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
477 Some("All order bys must be output columns.".into())
478 }
479 Self::UpsertSinkWithInvalidKey { .. } | Self::UpsertSinkWithoutKey => {
480 Some("See: https://materialize.com/s/sink-key-selection".into())
481 }
482 Self::Catalog(e) => e.hint(),
483 Self::VarError(e) => e.hint(),
484 Self::PgSourcePurification(e) => e.hint(),
485 Self::MySqlSourcePurification(e) => e.hint(),
486 Self::SqlServerSourcePurificationError(e) => e.hint(),
487 Self::KafkaSourcePurification(e) => e.hint(),
488 Self::LoadGeneratorSourcePurification(e) => e.hint(),
489 Self::CsrPurification(e) => e.hint(),
490 Self::KafkaSinkPurification(e) => e.hint(),
491 Self::UnknownColumn { table, similar, .. } => {
492 let suffix = "Make sure to surround case sensitive names in double quotes.";
493 match &similar[..] {
494 [] => None,
495 [column] => Some(format!("The similarly named column {} does exist. {suffix}", ColumnDisplay { table, column })),
496 names => {
497 let similar = names.into_iter().map(|column| ColumnDisplay { table, column }).join(", ");
498 Some(format!("There are similarly named columns that do exist: {similar}. {suffix}"))
499 }
500 }
501 }
502 Self::RecursiveTypeMismatch(..) => {
503 Some("You will need to rewrite or cast the query's expressions.".into())
504 },
505 Self::InvalidRefreshAt
506 | Self::InvalidRefreshEveryAlignedTo => {
507 Some("Calling `mz_now()` is allowed.".into())
508 },
509 Self::TableContainsUningestableTypes { column,.. } => {
510 Some(format!("Remove the table or use TEXT COLUMNS ({column}, ..) to ingest this column as text"))
511 }
512 Self::RetainHistoryLow { .. } | Self::RetainHistoryRequired => {
513 Some("Use ALTER ... RESET (RETAIN HISTORY) to set the retain history to its default and lowest value.".into())
514 }
515 Self::NetworkPolicyInUse => {
516 Some("Use ALTER SYSTEM SET 'network_policy' to change the default network policy.".into())
517 }
518 Self::WrongParameterType(_, _, _) => {
519 Some("EXECUTE automatically inserts only such casts that are allowed in an assignment cast context. Try adding an explicit cast.".into())
520 }
521 Self::InvalidSchemaName => {
522 Some("Use SET schema = name to select a schema. Use SHOW SCHEMAS to list available schemas. Use SHOW search_path to show the schema names that we looked for, but none of them existed.".into())
523 }
524 _ => None,
525 }
526 }
527}
528
529impl fmt::Display for PlanError {
530 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
531 match self {
532 Self::Unsupported { feature, discussion_no } => {
533 write!(f, "{} not yet supported", feature)?;
534 if let Some(discussion_no) = discussion_no {
535 write!(f, ", see https://github.com/MaterializeInc/materialize/discussions/{} for more details", discussion_no)?;
536 }
537 Ok(())
538 }
539 Self::NeverSupported { feature, documentation_link: documentation_path,.. } => {
540 write!(f, "{feature} is not supported",)?;
541 if let Some(documentation_path) = documentation_path {
542 write!(f, ", for more information consult the documentation at https://materialize.com/docs/{documentation_path}")?;
543 }
544 Ok(())
545 }
546 Self::UnknownColumn { table, column, similar: _ } => write!(
547 f,
548 "column {} does not exist",
549 ColumnDisplay { table, column }
550 ),
551 Self::UngroupedColumn { table, column } => write!(
552 f,
553 "column {} must appear in the GROUP BY clause or be used in an aggregate function",
554 ColumnDisplay { table, column },
555 ),
556 Self::ItemWithoutColumns { name, item_type } => {
557 let name = name.quoted();
558 write!(f, "{item_type} {name} does not have columns")
559 }
560 Self::WrongJoinTypeForLateralColumn { table, column } => write!(
561 f,
562 "column {} cannot be referenced from this part of the query: \
563 the combining JOIN type must be INNER or LEFT for a LATERAL reference",
564 ColumnDisplay { table, column },
565 ),
566 Self::AmbiguousColumn(column) => write!(
567 f,
568 "column reference {} is ambiguous",
569 column.quoted()
570 ),
571 Self::TooManyColumns { max_num_columns, req_num_columns } => write!(
572 f,
573 "attempt to create relation with too many columns, {} max: {}",
574 req_num_columns, max_num_columns
575 ),
576 Self::ColumnAlreadyExists { column_name, object_name } => write!(
577 f,
578 "column {} of relation {} already exists",
579 column_name.quoted(), object_name.quoted(),
580 ),
581 Self::AmbiguousTable(table) => write!(
582 f,
583 "table reference {} is ambiguous",
584 table.item.as_str().quoted()
585 ),
586 Self::UnknownColumnInUsingClause { column, join_side } => write!(
587 f,
588 "column {} specified in USING clause does not exist in {} table",
589 column.quoted(),
590 join_side,
591 ),
592 Self::AmbiguousColumnInUsingClause { column, join_side } => write!(
593 f,
594 "common column name {} appears more than once in {} table",
595 column.quoted(),
596 join_side,
597 ),
598 Self::MisqualifiedName(name) => write!(
599 f,
600 "qualified name did not have between 1 and 3 components: {}",
601 name
602 ),
603 Self::OverqualifiedDatabaseName(name) => write!(
604 f,
605 "database name '{}' does not have exactly one component",
606 name
607 ),
608 Self::OverqualifiedSchemaName(name) => write!(
609 f,
610 "schema name '{}' cannot have more than two components",
611 name
612 ),
613 Self::UnderqualifiedColumnName(name) => write!(
614 f,
615 "column name '{}' must have at least a table qualification",
616 name
617 ),
618 Self::UnacceptableTimelineName(name) => {
619 write!(f, "unacceptable timeline name {}", name.quoted(),)
620 }
621 Self::SubqueriesDisallowed { context } => {
622 write!(f, "{} does not allow subqueries", context)
623 }
624 Self::UnknownParameter(n) => write!(f, "there is no parameter ${}", n),
625 Self::ParameterNotAllowed(object_type) => write!(f, "{} cannot have parameters", object_type),
626 Self::WrongParameterType(i, expected_ty, actual_ty) => write!(f, "unable to cast given parameter ${}: expected {}, got {}", i, expected_ty, actual_ty),
627 Self::RecursionLimit(e) => write!(f, "{}", e),
628 Self::StrconvParse(e) => write!(f, "{}", e),
629 Self::Catalog(e) => write!(f, "{}", e),
630 Self::UpsertSinkWithoutKey => write!(f, "upsert sinks must specify a key"),
631 Self::UpsertSinkWithInvalidKey { .. } => {
632 write!(f, "upsert key could not be validated as unique")
633 }
634 Self::InvalidWmrRecursionLimit(msg) => write!(f, "Invalid WITH MUTUALLY RECURSIVE recursion limit. {}", msg),
635 Self::InvalidNumericMaxScale(e) => e.fmt(f),
636 Self::InvalidCharLength(e) => e.fmt(f),
637 Self::InvalidVarCharMaxLength(e) => e.fmt(f),
638 Self::InvalidTimestampPrecision(e) => e.fmt(f),
639 Self::Parser(e) => e.fmt(f),
640 Self::ParserStatement(e) => e.fmt(f),
641 Self::Unstructured(e) => write!(f, "{}", e),
642 Self::InvalidId(id) => write!(f, "invalid id {}", id),
643 Self::InvalidIdent(err) => write!(f, "invalid identifier, {err}"),
644 Self::InvalidObject(i) => write!(f, "{} is not a database object", i.full_name_str()),
645 Self::InvalidObjectType{expected_type, actual_type, object_name} => write!(f, "{actual_type} {object_name} is not a {expected_type}"),
646 Self::InvalidPrivilegeTypes{ invalid_privileges, object_description, } => {
647 write!(f, "invalid privilege types {} for {}", invalid_privileges.to_error_string(), object_description)
648 },
649 Self::InvalidSecret(i) => write!(f, "{} is not a secret", i.full_name_str()),
650 Self::InvalidTemporarySchema => {
651 write!(f, "cannot create temporary item in non-temporary schema")
652 }
653 Self::InvalidCast { name, ccx, from, to } =>{
654 write!(
655 f,
656 "{name} does not support {ccx}casting from {from} to {to}",
657 ccx = if matches!(ccx, CastContext::Implicit) {
658 "implicitly "
659 } else {
660 ""
661 },
662 )
663 }
664 Self::UnsupportedRangeElementType { element_type_name } => {
665 write!(f, "range type over {} is not supported", element_type_name)
666 }
667 Self::InvalidTable { name } => {
668 write!(f, "invalid table definition for {}", name.quoted())
669 },
670 Self::InvalidVersion { name, version } => {
671 write!(f, "invalid version {} for {}", version.quoted(), name.quoted())
672 },
673 Self::InvalidSinkFrom { name, item_type } => {
674 write!(f, "{item_type} {name} cannot be exported as a sink")
675 },
676 Self::InvalidDependency { name, item_type } => {
677 write!(f, "{item_type} {name} cannot be depended upon")
678 },
679 Self::DropViewOnMaterializedView(name)
680 | Self::AlterViewOnMaterializedView(name)
681 | Self::ShowCreateViewOnMaterializedView(name)
682 | Self::ExplainViewOnMaterializedView(name) => write!(f, "{name} is not a view"),
683 Self::FetchingCsrSchemaFailed { schema_lookup, .. } => {
684 write!(f, "failed to fetch schema {schema_lookup} from schema registry")
685 }
686 Self::PostgresConnectionErr { .. } => {
687 write!(f, "failed to connect to PostgreSQL database")
688 }
689 Self::MySqlConnectionErr { cause } => {
690 write!(f, "failed to connect to MySQL database: {}", cause)
691 }
692 Self::SqlServerConnectionErr { cause } => {
693 write!(f, "failed to connect to SQL Server database: {}", cause)
694 }
695 Self::SubsourceNameConflict {
696 name , upstream_references: _,
697 } => {
698 write!(f, "multiple subsources would be named {}", name)
699 },
700 Self::SubsourceDuplicateReference {
701 name,
702 target_names: _,
703 } => {
704 write!(f, "multiple subsources refer to table {}", name)
705 },
706 Self::NoTablesFoundForSchemas(schemas) => {
707 write!(f, "no tables found in referenced schemas: {}",
708 separated(", ", schemas.iter().map(|c| c.quoted()))
709 )
710 },
711 Self::InvalidProtobufSchema { .. } => {
712 write!(f, "invalid protobuf schema")
713 }
714 Self::DependentObjectsStillExist {object_type, object_name, dependents} => {
715 let reason = match &dependents[..] {
716 [] => " because other objects depend on it".to_string(),
717 dependents => {
718 let dependents = dependents.iter().map(|(dependent_type, dependent_name)| format!("{} {}", dependent_type, dependent_name.quoted())).join(", ");
719 format!(": still depended upon by {dependents}")
720 },
721 };
722 let object_name = object_name.quoted();
723 write!(f, "cannot drop {object_type} {object_name}{reason}")
724 }
725 Self::InvalidOptionValue { option_name, err } => write!(f, "invalid {} option value: {}", option_name, err),
726 Self::UnexpectedDuplicateReference { name } => write!(f, "unexpected multiple references to {}", name.to_ast_string_simple()),
727 Self::RecursiveTypeMismatch(name, declared, inferred) => {
728 let declared = separated(", ", declared);
729 let inferred = separated(", ", inferred);
730 let name = name.quoted();
731 write!(f, "WITH MUTUALLY RECURSIVE query {name} declared types ({declared}), but query returns types ({inferred})")
732 },
733 Self::UnknownFunction {name, arg_types, ..} => {
734 write!(f, "function {}({}) does not exist", name, arg_types.join(", "))
735 },
736 Self::IndistinctFunction {name, arg_types, ..} => {
737 write!(f, "function {}({}) is not unique", name, arg_types.join(", "))
738 },
739 Self::UnknownOperator {name, arg_types, ..} => {
740 write!(f, "operator does not exist: {}", match arg_types.as_slice(){
741 [typ] => format!("{} {}", name, typ),
742 [ltyp, rtyp] => {
743 format!("{} {} {}", ltyp, name, rtyp)
744 }
745 _ => unreachable!("non-unary non-binary operator"),
746 })
747 },
748 Self::IndistinctOperator {name, arg_types, ..} => {
749 write!(f, "operator is not unique: {}", match arg_types.as_slice(){
750 [typ] => format!("{} {}", name, typ),
751 [ltyp, rtyp] => {
752 format!("{} {} {}", ltyp, name, rtyp)
753 }
754 _ => unreachable!("non-unary non-binary operator"),
755 })
756 },
757 Self::InvalidPrivatelinkAvailabilityZone { name, ..} => write!(f, "invalid AWS PrivateLink availability zone {}", name.quoted()),
758 Self::DuplicatePrivatelinkAvailabilityZone {..} => write!(f, "connection cannot contain duplicate availability zones"),
759 Self::InvalidSchemaName => write!(f, "no valid schema selected"),
760 Self::ItemAlreadyExists { name, item_type } => write!(f, "{item_type} {} already exists", name.quoted()),
761 Self::ManagedCluster {cluster_name} => write!(f, "cannot modify managed cluster {cluster_name}"),
762 Self::InvalidKeysInSubscribeEnvelopeUpsert => {
763 write!(f, "invalid keys in SUBSCRIBE ENVELOPE UPSERT (KEY (..))")
764 }
765 Self::InvalidKeysInSubscribeEnvelopeDebezium => {
766 write!(f, "invalid keys in SUBSCRIBE ENVELOPE DEBEZIUM (KEY (..))")
767 }
768 Self::InvalidPartitionByEnvelopeDebezium { column_name } => {
769 write!(
770 f,
771 "PARTITION BY expression cannot refer to non-key column {}",
772 column_name.quoted(),
773 )
774 }
775 Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
776 write!(f, "invalid ORDER BY in SUBSCRIBE WITHIN TIMESTAMP ORDER BY")
777 }
778 Self::FromValueRequiresParen => f.write_str(
779 "VALUES expression in FROM clause must be surrounded by parentheses"
780 ),
781 Self::VarError(e) => e.fmt(f),
782 Self::UnsolvablePolymorphicFunctionInput => f.write_str(
783 "could not determine polymorphic type because input has type unknown"
784 ),
785 Self::ShowCommandInView => f.write_str("SHOW commands are not allowed in views"),
786 Self::WebhookValidationDoesNotUseColumns => f.write_str(
787 "expression provided in CHECK does not reference any columns"
788 ),
789 Self::WebhookValidationNonDeterministic => f.write_str(
790 "expression provided in CHECK is not deterministic"
791 ),
792 Self::InternalFunctionCall => f.write_str("cannot call function with arguments of type internal"),
793 Self::CommentTooLong { length, max_size } => {
794 write!(f, "provided comment was {length} bytes long, max size is {max_size} bytes")
795 }
796 Self::InvalidTimestampInterval { min, max, requested } => {
797 write!(f, "invalid timestamp interval of {}ms, must be in the range [{}ms, {}ms]", requested.as_millis(), min.as_millis(), max.as_millis())
798 }
799 Self::InvalidGroupSizeHints => f.write_str("EXPECTED GROUP SIZE cannot be provided \
800 simultaneously with any of AGGREGATE INPUT GROUP SIZE, DISTINCT ON INPUT GROUP SIZE, \
801 or LIMIT INPUT GROUP SIZE"),
802 Self::PgSourcePurification(e) => write!(f, "POSTGRES source validation: {}", e),
803 Self::KafkaSourcePurification(e) => write!(f, "KAFKA source validation: {}", e),
804 Self::LoadGeneratorSourcePurification(e) => write!(f, "LOAD GENERATOR source validation: {}", e),
805 Self::KafkaSinkPurification(e) => write!(f, "KAFKA sink validation: {}", e),
806 Self::IcebergSinkPurification(e) => write!(f, "ICEBERG sink validation: {}", e),
807 Self::CsrPurification(e) => write!(f, "CONFLUENT SCHEMA REGISTRY validation: {}", e),
808 Self::MySqlSourcePurification(e) => write!(f, "MYSQL source validation: {}", e),
809 Self::SqlServerSourcePurificationError(e) => write!(f, "SQL SERVER source validation: {}", e),
810 Self::UseTablesForSources(command) => write!(f, "{command} not supported; use CREATE TABLE .. FROM SOURCE instead"),
811 Self::MangedReplicaName(name) => {
812 write!(f, "{name} is reserved for replicas of managed clusters")
813 }
814 Self::MissingName(item_type) => {
815 write!(f, "unspecified name for {item_type}")
816 }
817 Self::InvalidRefreshAt => {
818 write!(f, "REFRESH AT argument must be an expression that can be simplified \
819 and/or cast to a constant whose type is mz_timestamp")
820 }
821 Self::InvalidRefreshEveryAlignedTo => {
822 write!(f, "REFRESH EVERY ... ALIGNED TO argument must be an expression that can be simplified \
823 and/or cast to a constant whose type is mz_timestamp")
824 }
825 Self::CreateReplicaFailStorageObjects {..} => {
826 write!(f, "cannot create more than one replica of a cluster containing sources or sinks")
827 },
828 Self::MismatchedObjectType {
829 name,
830 is_type,
831 expected_type,
832 } => {
833 write!(
834 f,
835 "{name} is {} {} not {} {}",
836 if *is_type == ObjectType::Index {
837 "an"
838 } else {
839 "a"
840 },
841 is_type.to_string().to_lowercase(),
842 if *expected_type == ObjectType::Index {
843 "an"
844 } else {
845 "a"
846 },
847 expected_type.to_string().to_lowercase()
848 )
849 }
850 Self::TableContainsUningestableTypes { name, type_, column } => {
851 write!(f, "table {name} contains column {column} of type {type_} which Materialize cannot currently ingest")
852 },
853 Self::RetainHistoryLow { limit } => {
854 write!(f, "RETAIN HISTORY cannot be set lower than {}ms", limit.as_millis())
855 },
856 Self::RetainHistoryRequired => {
857 write!(f, "RETAIN HISTORY cannot be disabled or set to 0")
858 },
859 Self::SubsourceResolutionError(e) => write!(f, "{}", e),
860 Self::Replan(msg) => write!(f, "internal error while replanning, please contact support: {msg}"),
861 Self::NetworkPolicyLockoutError => write!(f, "policy would block current session IP"),
862 Self::NetworkPolicyInUse => write!(f, "network policy is currently in use"),
863 Self::UntilReadyTimeoutRequired => {
864 write!(f, "TIMEOUT=<duration> option is required for ALTER CLUSTER ... WITH (WAIT UNTIL READY ( ... ))")
865 },
866 Self::ConstantExpressionSimplificationFailed(e) => write!(f, "{}", e),
867 Self::InvalidOffset(e) => write!(f, "Invalid OFFSET clause: {}", e),
868 Self::UnknownCursor(name) => {
869 write!(f, "cursor {} does not exist", name.quoted())
870 }
871 Self::CopyFromTargetTableDropped { target_name: name } => {
872 write!(f, "COPY FROM's target table {} was dropped", name.quoted())
873 }
874 Self::InvalidAsOfUpTo => write!(
875 f,
876 "AS OF or UP TO should be castable to a (non-null) mz_timestamp value",
877 ),
878 Self::InvalidReplacement {
879 item_type, item_name, replacement_type, replacement_name,
880 } => {
881 write!(
882 f,
883 "cannot replace {item_type} {item_name} \
884 with {replacement_type} {replacement_name}",
885 )
886 }
887 }
888 }
889}
890
891impl Error for PlanError {}
892
893impl From<CatalogError> for PlanError {
894 fn from(e: CatalogError) -> PlanError {
895 PlanError::Catalog(e)
896 }
897}
898
899impl From<strconv::ParseError> for PlanError {
900 fn from(e: strconv::ParseError) -> PlanError {
901 PlanError::StrconvParse(e)
902 }
903}
904
905impl From<RecursionLimitError> for PlanError {
906 fn from(e: RecursionLimitError) -> PlanError {
907 PlanError::RecursionLimit(e)
908 }
909}
910
911impl From<InvalidNumericMaxScaleError> for PlanError {
912 fn from(e: InvalidNumericMaxScaleError) -> PlanError {
913 PlanError::InvalidNumericMaxScale(e)
914 }
915}
916
917impl From<InvalidCharLengthError> for PlanError {
918 fn from(e: InvalidCharLengthError) -> PlanError {
919 PlanError::InvalidCharLength(e)
920 }
921}
922
923impl From<InvalidVarCharMaxLengthError> for PlanError {
924 fn from(e: InvalidVarCharMaxLengthError) -> PlanError {
925 PlanError::InvalidVarCharMaxLength(e)
926 }
927}
928
929impl From<InvalidTimestampPrecisionError> for PlanError {
930 fn from(e: InvalidTimestampPrecisionError) -> PlanError {
931 PlanError::InvalidTimestampPrecision(e)
932 }
933}
934
935impl From<anyhow::Error> for PlanError {
936 fn from(e: anyhow::Error) -> PlanError {
937 sql_err!("{}", e.display_with_causes())
939 }
940}
941
942impl From<TryFromIntError> for PlanError {
943 fn from(e: TryFromIntError) -> PlanError {
944 sql_err!("{}", e.display_with_causes())
945 }
946}
947
948impl From<ParseIntError> for PlanError {
949 fn from(e: ParseIntError) -> PlanError {
950 sql_err!("{}", e.display_with_causes())
951 }
952}
953
954impl From<EvalError> for PlanError {
955 fn from(e: EvalError) -> PlanError {
956 sql_err!("{}", e.display_with_causes())
957 }
958}
959
960impl From<ParserError> for PlanError {
961 fn from(e: ParserError) -> PlanError {
962 PlanError::Parser(e)
963 }
964}
965
966impl From<ParserStatementError> for PlanError {
967 fn from(e: ParserStatementError) -> PlanError {
968 PlanError::ParserStatement(e)
969 }
970}
971
972impl From<PostgresError> for PlanError {
973 fn from(e: PostgresError) -> PlanError {
974 PlanError::PostgresConnectionErr { cause: Arc::new(e) }
975 }
976}
977
978impl From<MySqlError> for PlanError {
979 fn from(e: MySqlError) -> PlanError {
980 PlanError::MySqlConnectionErr { cause: Arc::new(e) }
981 }
982}
983
984impl From<SqlServerError> for PlanError {
985 fn from(e: SqlServerError) -> PlanError {
986 PlanError::SqlServerConnectionErr { cause: Arc::new(e) }
987 }
988}
989
990impl From<VarError> for PlanError {
991 fn from(e: VarError) -> Self {
992 PlanError::VarError(e)
993 }
994}
995
996impl From<PgSourcePurificationError> for PlanError {
997 fn from(e: PgSourcePurificationError) -> Self {
998 PlanError::PgSourcePurification(e)
999 }
1000}
1001
1002impl From<KafkaSourcePurificationError> for PlanError {
1003 fn from(e: KafkaSourcePurificationError) -> Self {
1004 PlanError::KafkaSourcePurification(e)
1005 }
1006}
1007
1008impl From<KafkaSinkPurificationError> for PlanError {
1009 fn from(e: KafkaSinkPurificationError) -> Self {
1010 PlanError::KafkaSinkPurification(e)
1011 }
1012}
1013
1014impl From<IcebergSinkPurificationError> for PlanError {
1015 fn from(e: IcebergSinkPurificationError) -> Self {
1016 PlanError::IcebergSinkPurification(e)
1017 }
1018}
1019
1020impl From<CsrPurificationError> for PlanError {
1021 fn from(e: CsrPurificationError) -> Self {
1022 PlanError::CsrPurification(e)
1023 }
1024}
1025
1026impl From<LoadGeneratorSourcePurificationError> for PlanError {
1027 fn from(e: LoadGeneratorSourcePurificationError) -> Self {
1028 PlanError::LoadGeneratorSourcePurification(e)
1029 }
1030}
1031
1032impl From<MySqlSourcePurificationError> for PlanError {
1033 fn from(e: MySqlSourcePurificationError) -> Self {
1034 PlanError::MySqlSourcePurification(e)
1035 }
1036}
1037
1038impl From<SqlServerSourcePurificationError> for PlanError {
1039 fn from(e: SqlServerSourcePurificationError) -> Self {
1040 PlanError::SqlServerSourcePurificationError(e)
1041 }
1042}
1043
1044impl From<IdentError> for PlanError {
1045 fn from(e: IdentError) -> Self {
1046 PlanError::InvalidIdent(e)
1047 }
1048}
1049
1050impl From<ExternalReferenceResolutionError> for PlanError {
1051 fn from(e: ExternalReferenceResolutionError) -> Self {
1052 PlanError::SubsourceResolutionError(e)
1053 }
1054}
1055
1056struct ColumnDisplay<'a> {
1057 table: &'a Option<PartialItemName>,
1058 column: &'a ColumnName,
1059}
1060
1061impl<'a> fmt::Display for ColumnDisplay<'a> {
1062 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1063 if let Some(table) = &self.table {
1064 format!("{}.{}", table.item, self.column).quoted().fmt(f)
1065 } else {
1066 self.column.quoted().fmt(f)
1067 }
1068 }
1069}