1use std::collections::BTreeSet;
11use std::error::Error;
12use std::num::{ParseIntError, TryFromIntError};
13use std::sync::Arc;
14use std::time::Duration;
15use std::{fmt, io};
16
17use itertools::Itertools;
18use mz_expr::EvalError;
19use mz_mysql_util::MySqlError;
20use mz_ore::error::ErrorExt;
21use mz_ore::stack::RecursionLimitError;
22use mz_ore::str::{StrExt, separated};
23use mz_postgres_util::PostgresError;
24use mz_repr::adt::char::InvalidCharLengthError;
25use mz_repr::adt::mz_acl_item::AclMode;
26use mz_repr::adt::numeric::InvalidNumericMaxScaleError;
27use mz_repr::adt::timestamp::InvalidTimestampPrecisionError;
28use mz_repr::adt::varchar::InvalidVarCharMaxLengthError;
29use mz_repr::{CatalogItemId, ColumnName, strconv};
30use mz_sql_parser::ast::display::AstDisplay;
31use mz_sql_parser::ast::{IdentError, UnresolvedItemName};
32use mz_sql_parser::parser::{ParserError, ParserStatementError};
33use mz_sql_server_util::SqlServerError;
34use mz_storage_types::sources::ExternalReferenceResolutionError;
35
36use crate::catalog::{
37 CatalogError, CatalogItemType, ErrorMessageObjectDescription, SystemObjectType,
38};
39use crate::names::{PartialItemName, ResolvedItemName};
40use crate::plan::ObjectType;
41use crate::plan::plan_utils::JoinSide;
42use crate::plan::scope::ScopeItem;
43use crate::plan::typeconv::CastContext;
44use crate::pure::error::{
45 CsrPurificationError, IcebergSinkPurificationError, KafkaSinkPurificationError,
46 KafkaSourcePurificationError, LoadGeneratorSourcePurificationError,
47 MySqlSourcePurificationError, PgSourcePurificationError, SqlServerSourcePurificationError,
48};
49use crate::session::vars::VarError;
50
51#[derive(Debug)]
52pub enum PlanError {
53 Unsupported {
55 feature: String,
56 discussion_no: Option<usize>,
57 },
58 NeverSupported {
60 feature: String,
61 documentation_link: Option<String>,
62 details: Option<String>,
63 },
64 UnknownColumn {
65 table: Option<PartialItemName>,
66 column: ColumnName,
67 similar: Box<[ColumnName]>,
68 },
69 UngroupedColumn {
70 table: Option<PartialItemName>,
71 column: ColumnName,
72 },
73 TypeWithoutColumns {
74 type_name: PartialItemName,
75 },
76 WrongJoinTypeForLateralColumn {
77 table: Option<PartialItemName>,
78 column: ColumnName,
79 },
80 AmbiguousColumn(ColumnName),
81 TooManyColumns {
82 max_num_columns: usize,
83 req_num_columns: usize,
84 },
85 ColumnAlreadyExists {
86 column_name: ColumnName,
87 object_name: String,
88 },
89 AmbiguousTable(PartialItemName),
90 UnknownColumnInUsingClause {
91 column: ColumnName,
92 join_side: JoinSide,
93 },
94 AmbiguousColumnInUsingClause {
95 column: ColumnName,
96 join_side: JoinSide,
97 },
98 MisqualifiedName(String),
99 OverqualifiedDatabaseName(String),
100 OverqualifiedSchemaName(String),
101 UnderqualifiedColumnName(String),
102 SubqueriesDisallowed {
103 context: String,
104 },
105 UnknownParameter(usize),
106 ParameterNotAllowed(String),
107 WrongParameterType(usize, String, String),
108 RecursionLimit(RecursionLimitError),
109 StrconvParse(strconv::ParseError),
110 Catalog(CatalogError),
111 UpsertSinkWithoutKey,
112 UpsertSinkWithInvalidKey {
113 name: String,
114 desired_key: Vec<String>,
115 valid_keys: Vec<Vec<String>>,
116 },
117 InvalidWmrRecursionLimit(String),
118 InvalidNumericMaxScale(InvalidNumericMaxScaleError),
119 InvalidCharLength(InvalidCharLengthError),
120 InvalidId(CatalogItemId),
121 InvalidIdent(IdentError),
122 InvalidObject(Box<ResolvedItemName>),
123 InvalidObjectType {
124 expected_type: SystemObjectType,
125 actual_type: SystemObjectType,
126 object_name: String,
127 },
128 InvalidPrivilegeTypes {
129 invalid_privileges: AclMode,
130 object_description: ErrorMessageObjectDescription,
131 },
132 InvalidVarCharMaxLength(InvalidVarCharMaxLengthError),
133 InvalidTimestampPrecision(InvalidTimestampPrecisionError),
134 InvalidSecret(Box<ResolvedItemName>),
135 InvalidTemporarySchema,
136 InvalidCast {
137 name: String,
138 ccx: CastContext,
139 from: String,
140 to: String,
141 },
142 InvalidTable {
143 name: String,
144 },
145 InvalidVersion {
146 name: String,
147 version: String,
148 },
149 InvalidSinkFrom {
150 name: String,
151 item_type: CatalogItemType,
152 },
153 MangedReplicaName(String),
154 ParserStatement(ParserStatementError),
155 Parser(ParserError),
156 DropViewOnMaterializedView(String),
157 DependentObjectsStillExist {
158 object_type: String,
159 object_name: String,
160 dependents: Vec<(String, String)>,
162 },
163 AlterViewOnMaterializedView(String),
164 ShowCreateViewOnMaterializedView(String),
165 ExplainViewOnMaterializedView(String),
166 UnacceptableTimelineName(String),
167 FetchingCsrSchemaFailed {
168 schema_lookup: String,
169 cause: Arc<dyn Error + Send + Sync>,
170 },
171 PostgresConnectionErr {
172 cause: Arc<mz_postgres_util::PostgresError>,
173 },
174 MySqlConnectionErr {
175 cause: Arc<MySqlError>,
176 },
177 SqlServerConnectionErr {
178 cause: Arc<SqlServerError>,
179 },
180 SubsourceNameConflict {
181 name: UnresolvedItemName,
182 upstream_references: Vec<UnresolvedItemName>,
183 },
184 SubsourceDuplicateReference {
185 name: UnresolvedItemName,
186 target_names: Vec<UnresolvedItemName>,
187 },
188 NoTablesFoundForSchemas(Vec<String>),
189 InvalidProtobufSchema {
190 cause: protobuf_native::OperationFailedError,
191 },
192 InvalidOptionValue {
193 option_name: String,
196 err: Box<PlanError>,
197 },
198 UnexpectedDuplicateReference {
199 name: UnresolvedItemName,
200 },
201 RecursiveTypeMismatch(String, Vec<String>, Vec<String>),
203 UnknownFunction {
204 name: String,
205 arg_types: Vec<String>,
206 },
207 IndistinctFunction {
208 name: String,
209 arg_types: Vec<String>,
210 },
211 UnknownOperator {
212 name: String,
213 arg_types: Vec<String>,
214 },
215 IndistinctOperator {
216 name: String,
217 arg_types: Vec<String>,
218 },
219 InvalidPrivatelinkAvailabilityZone {
220 name: String,
221 supported_azs: BTreeSet<String>,
222 },
223 DuplicatePrivatelinkAvailabilityZone {
224 duplicate_azs: BTreeSet<String>,
225 },
226 InvalidSchemaName,
227 ItemAlreadyExists {
228 name: String,
229 item_type: CatalogItemType,
230 },
231 ManagedCluster {
232 cluster_name: String,
233 },
234 InvalidKeysInSubscribeEnvelopeUpsert,
235 InvalidKeysInSubscribeEnvelopeDebezium,
236 InvalidPartitionByEnvelopeDebezium {
237 column_name: String,
238 },
239 InvalidOrderByInSubscribeWithinTimestampOrderBy,
240 FromValueRequiresParen,
241 VarError(VarError),
242 UnsolvablePolymorphicFunctionInput,
243 ShowCommandInView,
244 WebhookValidationDoesNotUseColumns,
245 WebhookValidationNonDeterministic,
246 InternalFunctionCall,
247 CommentTooLong {
248 length: usize,
249 max_size: usize,
250 },
251 InvalidTimestampInterval {
252 min: Duration,
253 max: Duration,
254 requested: Duration,
255 },
256 InvalidGroupSizeHints,
257 PgSourcePurification(PgSourcePurificationError),
258 KafkaSourcePurification(KafkaSourcePurificationError),
259 KafkaSinkPurification(KafkaSinkPurificationError),
260 IcebergSinkPurification(IcebergSinkPurificationError),
261 LoadGeneratorSourcePurification(LoadGeneratorSourcePurificationError),
262 CsrPurification(CsrPurificationError),
263 MySqlSourcePurification(MySqlSourcePurificationError),
264 SqlServerSourcePurificationError(SqlServerSourcePurificationError),
265 UseTablesForSources(String),
266 MissingName(CatalogItemType),
267 InvalidRefreshAt,
268 InvalidRefreshEveryAlignedTo,
269 CreateReplicaFailStorageObjects {
270 current_replica_count: usize,
272 internal_replica_count: usize,
274 hypothetical_replica_count: usize,
277 },
278 MismatchedObjectType {
279 name: PartialItemName,
280 is_type: ObjectType,
281 expected_type: ObjectType,
282 },
283 TableContainsUningestableTypes {
285 name: String,
286 type_: String,
287 column: String,
288 },
289 RetainHistoryLow {
290 limit: Duration,
291 },
292 RetainHistoryRequired,
293 UntilReadyTimeoutRequired,
294 SubsourceResolutionError(ExternalReferenceResolutionError),
295 Replan(String),
296 NetworkPolicyLockoutError,
297 NetworkPolicyInUse,
298 ConstantExpressionSimplificationFailed(String),
300 InvalidOffset(String),
301 UnknownCursor(String),
303 CopyFromTargetTableDropped {
304 target_name: String,
305 },
306 InvalidAsOfUpTo,
308 InvalidReplacement {
309 item_type: CatalogItemType,
310 item_name: PartialItemName,
311 replacement_type: CatalogItemType,
312 replacement_name: PartialItemName,
313 },
314 Unstructured(String),
316}
317
318impl PlanError {
319 pub(crate) fn ungrouped_column(item: &ScopeItem) -> PlanError {
320 PlanError::UngroupedColumn {
321 table: item.table_name.clone(),
322 column: item.column_name.clone(),
323 }
324 }
325
326 pub fn detail(&self) -> Option<String> {
327 match self {
328 Self::NeverSupported { details, .. } => details.clone(),
329 Self::FetchingCsrSchemaFailed { cause, .. } => Some(cause.to_string_with_causes()),
330 Self::PostgresConnectionErr { cause } => Some(cause.to_string_with_causes()),
331 Self::InvalidProtobufSchema { cause } => Some(cause.to_string_with_causes()),
332 Self::InvalidOptionValue { err, .. } => err.detail(),
333 Self::UpsertSinkWithInvalidKey {
334 name,
335 desired_key,
336 valid_keys,
337 } => {
338 let valid_keys = if valid_keys.is_empty() {
339 "There are no known valid unique keys for the underlying relation.".into()
340 } else {
341 format!(
342 "The following keys are known to be unique for the underlying relation:\n{}",
343 valid_keys
344 .iter()
345 .map(|k|
346 format!(" ({})", k.iter().map(|c| c.as_str().quoted()).join(", "))
347 )
348 .join("\n"),
349 )
350 };
351 Some(format!(
352 "Materialize could not prove that the specified upsert envelope key ({}) \
353 was a unique key of the underlying relation {}. {valid_keys}",
354 separated(", ", desired_key.iter().map(|c| c.as_str().quoted())),
355 name.quoted()
356 ))
357 }
358 Self::VarError(e) => e.detail(),
359 Self::InternalFunctionCall => Some("This function is for the internal use of the database system and cannot be called directly.".into()),
360 Self::PgSourcePurification(e) => e.detail(),
361 Self::MySqlSourcePurification(e) => e.detail(),
362 Self::SqlServerSourcePurificationError(e) => e.detail(),
363 Self::KafkaSourcePurification(e) => e.detail(),
364 Self::LoadGeneratorSourcePurification(e) => e.detail(),
365 Self::CsrPurification(e) => e.detail(),
366 Self::KafkaSinkPurification(e) => e.detail(),
367 Self::IcebergSinkPurification(e) => e.detail(),
368 Self::CreateReplicaFailStorageObjects { current_replica_count: current, internal_replica_count: internal, hypothetical_replica_count: target } => {
369 Some(format!(
370 "Currently have {} replica{}{}; command would result in {}",
371 current,
372 if *current != 1 { "s" } else { "" },
373 if *internal > 0 {
374 format!(" ({} internal)", internal)
375 } else {
376 "".to_string()
377 },
378 target
379 ))
380 },
381 Self::SubsourceNameConflict {
382 name: _,
383 upstream_references,
384 } => Some(format!(
385 "referenced tables with duplicate name: {}",
386 itertools::join(upstream_references, ", ")
387 )),
388 Self::SubsourceDuplicateReference {
389 name: _,
390 target_names,
391 } => Some(format!(
392 "subsources referencing table: {}",
393 itertools::join(target_names, ", ")
394 )),
395 Self::InvalidPartitionByEnvelopeDebezium { .. } => Some(
396 "When using ENVELOPE DEBEZIUM, only columns in the key can be referenced in the PARTITION BY expression.".to_string()
397 ),
398 Self::NoTablesFoundForSchemas(schemas) => Some(format!(
399 "missing schemas: {}",
400 separated(", ", schemas.iter().map(|c| c.quoted()))
401 )),
402 _ => None,
403 }
404 }
405
406 pub fn hint(&self) -> Option<String> {
407 match self {
408 Self::DropViewOnMaterializedView(_) => {
409 Some("Use DROP MATERIALIZED VIEW to remove a materialized view.".into())
410 }
411 Self::DependentObjectsStillExist {..} => Some("Use DROP ... CASCADE to drop the dependent objects too.".into()),
412 Self::AlterViewOnMaterializedView(_) => {
413 Some("Use ALTER MATERIALIZED VIEW to rename a materialized view.".into())
414 }
415 Self::ShowCreateViewOnMaterializedView(_) => {
416 Some("Use SHOW CREATE MATERIALIZED VIEW to show a materialized view.".into())
417 }
418 Self::ExplainViewOnMaterializedView(_) => {
419 Some("Use EXPLAIN [...] MATERIALIZED VIEW to explain a materialized view.".into())
420 }
421 Self::UnacceptableTimelineName(_) => {
422 Some("The prefix \"mz_\" is reserved for system timelines.".into())
423 }
424 Self::PostgresConnectionErr { cause } => {
425 if let Some(cause) = cause.source() {
426 if let Some(cause) = cause.downcast_ref::<io::Error>() {
427 if cause.kind() == io::ErrorKind::TimedOut {
428 return Some(
429 "Do you have a firewall or security group that is \
430 preventing Materialize from connecting to your PostgreSQL server?"
431 .into(),
432 );
433 }
434 }
435 }
436 None
437 }
438 Self::InvalidOptionValue { err, .. } => err.hint(),
439 Self::UnknownFunction { ..} => Some("No function matches the given name and argument types. You might need to add explicit type casts.".into()),
440 Self::IndistinctFunction {..} => {
441 Some("Could not choose a best candidate function. You might need to add explicit type casts.".into())
442 }
443 Self::UnknownOperator {..} => {
444 Some("No operator matches the given name and argument types. You might need to add explicit type casts.".into())
445 }
446 Self::IndistinctOperator {..} => {
447 Some("Could not choose a best candidate operator. You might need to add explicit type casts.".into())
448 },
449 Self::InvalidPrivatelinkAvailabilityZone { supported_azs, ..} => {
450 let supported_azs_str = supported_azs.iter().join("\n ");
451 Some(format!("Did you supply an availability zone name instead of an ID? Known availability zone IDs:\n {}", supported_azs_str))
452 }
453 Self::DuplicatePrivatelinkAvailabilityZone { duplicate_azs, ..} => {
454 let duplicate_azs = duplicate_azs.iter().join("\n ");
455 Some(format!("Duplicated availability zones:\n {}", duplicate_azs))
456 }
457 Self::InvalidKeysInSubscribeEnvelopeUpsert => {
458 Some("All keys must be columns on the underlying relation.".into())
459 }
460 Self::InvalidKeysInSubscribeEnvelopeDebezium => {
461 Some("All keys must be columns on the underlying relation.".into())
462 }
463 Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
464 Some("All order bys must be output columns.".into())
465 }
466 Self::UpsertSinkWithInvalidKey { .. } | Self::UpsertSinkWithoutKey => {
467 Some("See: https://materialize.com/s/sink-key-selection".into())
468 }
469 Self::Catalog(e) => e.hint(),
470 Self::VarError(e) => e.hint(),
471 Self::PgSourcePurification(e) => e.hint(),
472 Self::MySqlSourcePurification(e) => e.hint(),
473 Self::SqlServerSourcePurificationError(e) => e.hint(),
474 Self::KafkaSourcePurification(e) => e.hint(),
475 Self::LoadGeneratorSourcePurification(e) => e.hint(),
476 Self::CsrPurification(e) => e.hint(),
477 Self::KafkaSinkPurification(e) => e.hint(),
478 Self::UnknownColumn { table, similar, .. } => {
479 let suffix = "Make sure to surround case sensitive names in double quotes.";
480 match &similar[..] {
481 [] => None,
482 [column] => Some(format!("The similarly named column {} does exist. {suffix}", ColumnDisplay { table, column })),
483 names => {
484 let similar = names.into_iter().map(|column| ColumnDisplay { table, column }).join(", ");
485 Some(format!("There are similarly named columns that do exist: {similar}. {suffix}"))
486 }
487 }
488 }
489 Self::RecursiveTypeMismatch(..) => {
490 Some("You will need to rewrite or cast the query's expressions.".into())
491 },
492 Self::InvalidRefreshAt
493 | Self::InvalidRefreshEveryAlignedTo => {
494 Some("Calling `mz_now()` is allowed.".into())
495 },
496 Self::TableContainsUningestableTypes { column,.. } => {
497 Some(format!("Remove the table or use TEXT COLUMNS ({column}, ..) to ingest this column as text"))
498 }
499 Self::RetainHistoryLow { .. } | Self::RetainHistoryRequired => {
500 Some("Use ALTER ... RESET (RETAIN HISTORY) to set the retain history to its default and lowest value.".into())
501 }
502 Self::NetworkPolicyInUse => {
503 Some("Use ALTER SYSTEM SET 'network_policy' to change the default network policy.".into())
504 }
505 Self::WrongParameterType(_, _, _) => {
506 Some("EXECUTE automatically inserts only such casts that are allowed in an assignment cast context. Try adding an explicit cast.".into())
507 }
508 Self::InvalidSchemaName => {
509 Some("Use SET schema = name to select a schema. Use SHOW SCHEMAS to list available schemas. Use SHOW search_path to show the schema names that we looked for, but none of them existed.".into())
510 }
511 _ => None,
512 }
513 }
514}
515
516impl fmt::Display for PlanError {
517 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
518 match self {
519 Self::Unsupported { feature, discussion_no } => {
520 write!(f, "{} not yet supported", feature)?;
521 if let Some(discussion_no) = discussion_no {
522 write!(f, ", see https://github.com/MaterializeInc/materialize/discussions/{} for more details", discussion_no)?;
523 }
524 Ok(())
525 }
526 Self::NeverSupported { feature, documentation_link: documentation_path,.. } => {
527 write!(f, "{feature} is not supported",)?;
528 if let Some(documentation_path) = documentation_path {
529 write!(f, ", for more information consult the documentation at https://materialize.com/docs/{documentation_path}")?;
530 }
531 Ok(())
532 }
533 Self::UnknownColumn { table, column, similar: _ } => write!(
534 f,
535 "column {} does not exist",
536 ColumnDisplay { table, column }
537 ),
538 Self::UngroupedColumn { table, column } => write!(
539 f,
540 "column {} must appear in the GROUP BY clause or be used in an aggregate function",
541 ColumnDisplay { table, column },
542 ),
543 Self::TypeWithoutColumns { type_name } => write!(
544 f,
545 "type {} does not have addressable columns",
546 type_name.item.quoted(),
547 ),
548 Self::WrongJoinTypeForLateralColumn { table, column } => write!(
549 f,
550 "column {} cannot be referenced from this part of the query: \
551 the combining JOIN type must be INNER or LEFT for a LATERAL reference",
552 ColumnDisplay { table, column },
553 ),
554 Self::AmbiguousColumn(column) => write!(
555 f,
556 "column reference {} is ambiguous",
557 column.quoted()
558 ),
559 Self::TooManyColumns { max_num_columns, req_num_columns } => write!(
560 f,
561 "attempt to create relation with too many columns, {} max: {}",
562 req_num_columns, max_num_columns
563 ),
564 Self::ColumnAlreadyExists { column_name, object_name } => write!(
565 f,
566 "column {} of relation {} already exists",
567 column_name.quoted(), object_name.quoted(),
568 ),
569 Self::AmbiguousTable(table) => write!(
570 f,
571 "table reference {} is ambiguous",
572 table.item.as_str().quoted()
573 ),
574 Self::UnknownColumnInUsingClause { column, join_side } => write!(
575 f,
576 "column {} specified in USING clause does not exist in {} table",
577 column.quoted(),
578 join_side,
579 ),
580 Self::AmbiguousColumnInUsingClause { column, join_side } => write!(
581 f,
582 "common column name {} appears more than once in {} table",
583 column.quoted(),
584 join_side,
585 ),
586 Self::MisqualifiedName(name) => write!(
587 f,
588 "qualified name did not have between 1 and 3 components: {}",
589 name
590 ),
591 Self::OverqualifiedDatabaseName(name) => write!(
592 f,
593 "database name '{}' does not have exactly one component",
594 name
595 ),
596 Self::OverqualifiedSchemaName(name) => write!(
597 f,
598 "schema name '{}' cannot have more than two components",
599 name
600 ),
601 Self::UnderqualifiedColumnName(name) => write!(
602 f,
603 "column name '{}' must have at least a table qualification",
604 name
605 ),
606 Self::UnacceptableTimelineName(name) => {
607 write!(f, "unacceptable timeline name {}", name.quoted(),)
608 }
609 Self::SubqueriesDisallowed { context } => {
610 write!(f, "{} does not allow subqueries", context)
611 }
612 Self::UnknownParameter(n) => write!(f, "there is no parameter ${}", n),
613 Self::ParameterNotAllowed(object_type) => write!(f, "{} cannot have parameters", object_type),
614 Self::WrongParameterType(i, expected_ty, actual_ty) => write!(f, "unable to cast given parameter ${}: expected {}, got {}", i, expected_ty, actual_ty),
615 Self::RecursionLimit(e) => write!(f, "{}", e),
616 Self::StrconvParse(e) => write!(f, "{}", e),
617 Self::Catalog(e) => write!(f, "{}", e),
618 Self::UpsertSinkWithoutKey => write!(f, "upsert sinks must specify a key"),
619 Self::UpsertSinkWithInvalidKey { .. } => {
620 write!(f, "upsert key could not be validated as unique")
621 }
622 Self::InvalidWmrRecursionLimit(msg) => write!(f, "Invalid WITH MUTUALLY RECURSIVE recursion limit. {}", msg),
623 Self::InvalidNumericMaxScale(e) => e.fmt(f),
624 Self::InvalidCharLength(e) => e.fmt(f),
625 Self::InvalidVarCharMaxLength(e) => e.fmt(f),
626 Self::InvalidTimestampPrecision(e) => e.fmt(f),
627 Self::Parser(e) => e.fmt(f),
628 Self::ParserStatement(e) => e.fmt(f),
629 Self::Unstructured(e) => write!(f, "{}", e),
630 Self::InvalidId(id) => write!(f, "invalid id {}", id),
631 Self::InvalidIdent(err) => write!(f, "invalid identifier, {err}"),
632 Self::InvalidObject(i) => write!(f, "{} is not a database object", i.full_name_str()),
633 Self::InvalidObjectType{expected_type, actual_type, object_name} => write!(f, "{actual_type} {object_name} is not a {expected_type}"),
634 Self::InvalidPrivilegeTypes{ invalid_privileges, object_description, } => {
635 write!(f, "invalid privilege types {} for {}", invalid_privileges.to_error_string(), object_description)
636 },
637 Self::InvalidSecret(i) => write!(f, "{} is not a secret", i.full_name_str()),
638 Self::InvalidTemporarySchema => {
639 write!(f, "cannot create temporary item in non-temporary schema")
640 }
641 Self::InvalidCast { name, ccx, from, to } =>{
642 write!(
643 f,
644 "{name} does not support {ccx}casting from {from} to {to}",
645 ccx = if matches!(ccx, CastContext::Implicit) {
646 "implicitly "
647 } else {
648 ""
649 },
650 )
651 }
652 Self::InvalidTable { name } => {
653 write!(f, "invalid table definition for {}", name.quoted())
654 },
655 Self::InvalidVersion { name, version } => {
656 write!(f, "invalid version {} for {}", version.quoted(), name.quoted())
657 },
658 Self::InvalidSinkFrom { name, item_type } => {
659 write!(f, "{name} is a {item_type}, which cannot be exported as a sink")
660 },
661 Self::DropViewOnMaterializedView(name)
662 | Self::AlterViewOnMaterializedView(name)
663 | Self::ShowCreateViewOnMaterializedView(name)
664 | Self::ExplainViewOnMaterializedView(name) => write!(f, "{name} is not a view"),
665 Self::FetchingCsrSchemaFailed { schema_lookup, .. } => {
666 write!(f, "failed to fetch schema {schema_lookup} from schema registry")
667 }
668 Self::PostgresConnectionErr { .. } => {
669 write!(f, "failed to connect to PostgreSQL database")
670 }
671 Self::MySqlConnectionErr { cause } => {
672 write!(f, "failed to connect to MySQL database: {}", cause)
673 }
674 Self::SqlServerConnectionErr { cause } => {
675 write!(f, "failed to connect to SQL Server database: {}", cause)
676 }
677 Self::SubsourceNameConflict {
678 name , upstream_references: _,
679 } => {
680 write!(f, "multiple subsources would be named {}", name)
681 },
682 Self::SubsourceDuplicateReference {
683 name,
684 target_names: _,
685 } => {
686 write!(f, "multiple subsources refer to table {}", name)
687 },
688 Self::NoTablesFoundForSchemas(schemas) => {
689 write!(f, "no tables found in referenced schemas: {}",
690 separated(", ", schemas.iter().map(|c| c.quoted()))
691 )
692 },
693 Self::InvalidProtobufSchema { .. } => {
694 write!(f, "invalid protobuf schema")
695 }
696 Self::DependentObjectsStillExist {object_type, object_name, dependents} => {
697 let reason = match &dependents[..] {
698 [] => " because other objects depend on it".to_string(),
699 dependents => {
700 let dependents = dependents.iter().map(|(dependent_type, dependent_name)| format!("{} {}", dependent_type, dependent_name.quoted())).join(", ");
701 format!(": still depended upon by {dependents}")
702 },
703 };
704 let object_name = object_name.quoted();
705 write!(f, "cannot drop {object_type} {object_name}{reason}")
706 }
707 Self::InvalidOptionValue { option_name, err } => write!(f, "invalid {} option value: {}", option_name, err),
708 Self::UnexpectedDuplicateReference { name } => write!(f, "unexpected multiple references to {}", name.to_ast_string_simple()),
709 Self::RecursiveTypeMismatch(name, declared, inferred) => {
710 let declared = separated(", ", declared);
711 let inferred = separated(", ", inferred);
712 let name = name.quoted();
713 write!(f, "WITH MUTUALLY RECURSIVE query {name} declared types ({declared}), but query returns types ({inferred})")
714 },
715 Self::UnknownFunction {name, arg_types, ..} => {
716 write!(f, "function {}({}) does not exist", name, arg_types.join(", "))
717 },
718 Self::IndistinctFunction {name, arg_types, ..} => {
719 write!(f, "function {}({}) is not unique", name, arg_types.join(", "))
720 },
721 Self::UnknownOperator {name, arg_types, ..} => {
722 write!(f, "operator does not exist: {}", match arg_types.as_slice(){
723 [typ] => format!("{} {}", name, typ),
724 [ltyp, rtyp] => {
725 format!("{} {} {}", ltyp, name, rtyp)
726 }
727 _ => unreachable!("non-unary non-binary operator"),
728 })
729 },
730 Self::IndistinctOperator {name, arg_types, ..} => {
731 write!(f, "operator is not unique: {}", match arg_types.as_slice(){
732 [typ] => format!("{} {}", name, typ),
733 [ltyp, rtyp] => {
734 format!("{} {} {}", ltyp, name, rtyp)
735 }
736 _ => unreachable!("non-unary non-binary operator"),
737 })
738 },
739 Self::InvalidPrivatelinkAvailabilityZone { name, ..} => write!(f, "invalid AWS PrivateLink availability zone {}", name.quoted()),
740 Self::DuplicatePrivatelinkAvailabilityZone {..} => write!(f, "connection cannot contain duplicate availability zones"),
741 Self::InvalidSchemaName => write!(f, "no valid schema selected"),
742 Self::ItemAlreadyExists { name, item_type } => write!(f, "{item_type} {} already exists", name.quoted()),
743 Self::ManagedCluster {cluster_name} => write!(f, "cannot modify managed cluster {cluster_name}"),
744 Self::InvalidKeysInSubscribeEnvelopeUpsert => {
745 write!(f, "invalid keys in SUBSCRIBE ENVELOPE UPSERT (KEY (..))")
746 }
747 Self::InvalidKeysInSubscribeEnvelopeDebezium => {
748 write!(f, "invalid keys in SUBSCRIBE ENVELOPE DEBEZIUM (KEY (..))")
749 }
750 Self::InvalidPartitionByEnvelopeDebezium { column_name } => {
751 write!(
752 f,
753 "PARTITION BY expression cannot refer to non-key column {}",
754 column_name.quoted(),
755 )
756 }
757 Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
758 write!(f, "invalid ORDER BY in SUBSCRIBE WITHIN TIMESTAMP ORDER BY")
759 }
760 Self::FromValueRequiresParen => f.write_str(
761 "VALUES expression in FROM clause must be surrounded by parentheses"
762 ),
763 Self::VarError(e) => e.fmt(f),
764 Self::UnsolvablePolymorphicFunctionInput => f.write_str(
765 "could not determine polymorphic type because input has type unknown"
766 ),
767 Self::ShowCommandInView => f.write_str("SHOW commands are not allowed in views"),
768 Self::WebhookValidationDoesNotUseColumns => f.write_str(
769 "expression provided in CHECK does not reference any columns"
770 ),
771 Self::WebhookValidationNonDeterministic => f.write_str(
772 "expression provided in CHECK is not deterministic"
773 ),
774 Self::InternalFunctionCall => f.write_str("cannot call function with arguments of type internal"),
775 Self::CommentTooLong { length, max_size } => {
776 write!(f, "provided comment was {length} bytes long, max size is {max_size} bytes")
777 }
778 Self::InvalidTimestampInterval { min, max, requested } => {
779 write!(f, "invalid timestamp interval of {}ms, must be in the range [{}ms, {}ms]", requested.as_millis(), min.as_millis(), max.as_millis())
780 }
781 Self::InvalidGroupSizeHints => f.write_str("EXPECTED GROUP SIZE cannot be provided \
782 simultaneously with any of AGGREGATE INPUT GROUP SIZE, DISTINCT ON INPUT GROUP SIZE, \
783 or LIMIT INPUT GROUP SIZE"),
784 Self::PgSourcePurification(e) => write!(f, "POSTGRES source validation: {}", e),
785 Self::KafkaSourcePurification(e) => write!(f, "KAFKA source validation: {}", e),
786 Self::LoadGeneratorSourcePurification(e) => write!(f, "LOAD GENERATOR source validation: {}", e),
787 Self::KafkaSinkPurification(e) => write!(f, "KAFKA sink validation: {}", e),
788 Self::IcebergSinkPurification(e) => write!(f, "ICEBERG sink validation: {}", e),
789 Self::CsrPurification(e) => write!(f, "CONFLUENT SCHEMA REGISTRY validation: {}", e),
790 Self::MySqlSourcePurification(e) => write!(f, "MYSQL source validation: {}", e),
791 Self::SqlServerSourcePurificationError(e) => write!(f, "SQL SERVER source validation: {}", e),
792 Self::UseTablesForSources(command) => write!(f, "{command} not supported; use CREATE TABLE .. FROM SOURCE instead"),
793 Self::MangedReplicaName(name) => {
794 write!(f, "{name} is reserved for replicas of managed clusters")
795 }
796 Self::MissingName(item_type) => {
797 write!(f, "unspecified name for {item_type}")
798 }
799 Self::InvalidRefreshAt => {
800 write!(f, "REFRESH AT argument must be an expression that can be simplified \
801 and/or cast to a constant whose type is mz_timestamp")
802 }
803 Self::InvalidRefreshEveryAlignedTo => {
804 write!(f, "REFRESH EVERY ... ALIGNED TO argument must be an expression that can be simplified \
805 and/or cast to a constant whose type is mz_timestamp")
806 }
807 Self::CreateReplicaFailStorageObjects {..} => {
808 write!(f, "cannot create more than one replica of a cluster containing sources or sinks")
809 },
810 Self::MismatchedObjectType {
811 name,
812 is_type,
813 expected_type,
814 } => {
815 write!(
816 f,
817 "{name} is {} {} not {} {}",
818 if *is_type == ObjectType::Index {
819 "an"
820 } else {
821 "a"
822 },
823 is_type.to_string().to_lowercase(),
824 if *expected_type == ObjectType::Index {
825 "an"
826 } else {
827 "a"
828 },
829 expected_type.to_string().to_lowercase()
830 )
831 }
832 Self::TableContainsUningestableTypes { name, type_, column } => {
833 write!(f, "table {name} contains column {column} of type {type_} which Materialize cannot currently ingest")
834 },
835 Self::RetainHistoryLow { limit } => {
836 write!(f, "RETAIN HISTORY cannot be set lower than {}ms", limit.as_millis())
837 },
838 Self::RetainHistoryRequired => {
839 write!(f, "RETAIN HISTORY cannot be disabled or set to 0")
840 },
841 Self::SubsourceResolutionError(e) => write!(f, "{}", e),
842 Self::Replan(msg) => write!(f, "internal error while replanning, please contact support: {msg}"),
843 Self::NetworkPolicyLockoutError => write!(f, "policy would block current session IP"),
844 Self::NetworkPolicyInUse => write!(f, "network policy is currently in use"),
845 Self::UntilReadyTimeoutRequired => {
846 write!(f, "TIMEOUT=<duration> option is required for ALTER CLUSTER ... WITH (WAIT UNTIL READY ( ... ))")
847 },
848 Self::ConstantExpressionSimplificationFailed(e) => write!(f, "{}", e),
849 Self::InvalidOffset(e) => write!(f, "Invalid OFFSET clause: {}", e),
850 Self::UnknownCursor(name) => {
851 write!(f, "cursor {} does not exist", name.quoted())
852 }
853 Self::CopyFromTargetTableDropped { target_name: name } => write!(f, "COPY FROM's target table {} was dropped", name.quoted()),
854 Self::InvalidAsOfUpTo => write!(f, "AS OF or UP TO should be castable to a (non-null) mz_timestamp value"),
855 Self::InvalidReplacement { item_type, item_name, replacement_type, replacement_name } => {
856 write!(f, "cannot replace {item_type} {item_name} with {replacement_type} {replacement_name}")
857 }
858 }
859 }
860}
861
862impl Error for PlanError {}
863
864impl From<CatalogError> for PlanError {
865 fn from(e: CatalogError) -> PlanError {
866 PlanError::Catalog(e)
867 }
868}
869
870impl From<strconv::ParseError> for PlanError {
871 fn from(e: strconv::ParseError) -> PlanError {
872 PlanError::StrconvParse(e)
873 }
874}
875
876impl From<RecursionLimitError> for PlanError {
877 fn from(e: RecursionLimitError) -> PlanError {
878 PlanError::RecursionLimit(e)
879 }
880}
881
882impl From<InvalidNumericMaxScaleError> for PlanError {
883 fn from(e: InvalidNumericMaxScaleError) -> PlanError {
884 PlanError::InvalidNumericMaxScale(e)
885 }
886}
887
888impl From<InvalidCharLengthError> for PlanError {
889 fn from(e: InvalidCharLengthError) -> PlanError {
890 PlanError::InvalidCharLength(e)
891 }
892}
893
894impl From<InvalidVarCharMaxLengthError> for PlanError {
895 fn from(e: InvalidVarCharMaxLengthError) -> PlanError {
896 PlanError::InvalidVarCharMaxLength(e)
897 }
898}
899
900impl From<InvalidTimestampPrecisionError> for PlanError {
901 fn from(e: InvalidTimestampPrecisionError) -> PlanError {
902 PlanError::InvalidTimestampPrecision(e)
903 }
904}
905
906impl From<anyhow::Error> for PlanError {
907 fn from(e: anyhow::Error) -> PlanError {
908 sql_err!("{}", e.display_with_causes())
910 }
911}
912
913impl From<TryFromIntError> for PlanError {
914 fn from(e: TryFromIntError) -> PlanError {
915 sql_err!("{}", e.display_with_causes())
916 }
917}
918
919impl From<ParseIntError> for PlanError {
920 fn from(e: ParseIntError) -> PlanError {
921 sql_err!("{}", e.display_with_causes())
922 }
923}
924
925impl From<EvalError> for PlanError {
926 fn from(e: EvalError) -> PlanError {
927 sql_err!("{}", e.display_with_causes())
928 }
929}
930
931impl From<ParserError> for PlanError {
932 fn from(e: ParserError) -> PlanError {
933 PlanError::Parser(e)
934 }
935}
936
937impl From<ParserStatementError> for PlanError {
938 fn from(e: ParserStatementError) -> PlanError {
939 PlanError::ParserStatement(e)
940 }
941}
942
943impl From<PostgresError> for PlanError {
944 fn from(e: PostgresError) -> PlanError {
945 PlanError::PostgresConnectionErr { cause: Arc::new(e) }
946 }
947}
948
949impl From<MySqlError> for PlanError {
950 fn from(e: MySqlError) -> PlanError {
951 PlanError::MySqlConnectionErr { cause: Arc::new(e) }
952 }
953}
954
955impl From<SqlServerError> for PlanError {
956 fn from(e: SqlServerError) -> PlanError {
957 PlanError::SqlServerConnectionErr { cause: Arc::new(e) }
958 }
959}
960
961impl From<VarError> for PlanError {
962 fn from(e: VarError) -> Self {
963 PlanError::VarError(e)
964 }
965}
966
967impl From<PgSourcePurificationError> for PlanError {
968 fn from(e: PgSourcePurificationError) -> Self {
969 PlanError::PgSourcePurification(e)
970 }
971}
972
973impl From<KafkaSourcePurificationError> for PlanError {
974 fn from(e: KafkaSourcePurificationError) -> Self {
975 PlanError::KafkaSourcePurification(e)
976 }
977}
978
979impl From<KafkaSinkPurificationError> for PlanError {
980 fn from(e: KafkaSinkPurificationError) -> Self {
981 PlanError::KafkaSinkPurification(e)
982 }
983}
984
985impl From<IcebergSinkPurificationError> for PlanError {
986 fn from(e: IcebergSinkPurificationError) -> Self {
987 PlanError::IcebergSinkPurification(e)
988 }
989}
990
991impl From<CsrPurificationError> for PlanError {
992 fn from(e: CsrPurificationError) -> Self {
993 PlanError::CsrPurification(e)
994 }
995}
996
997impl From<LoadGeneratorSourcePurificationError> for PlanError {
998 fn from(e: LoadGeneratorSourcePurificationError) -> Self {
999 PlanError::LoadGeneratorSourcePurification(e)
1000 }
1001}
1002
1003impl From<MySqlSourcePurificationError> for PlanError {
1004 fn from(e: MySqlSourcePurificationError) -> Self {
1005 PlanError::MySqlSourcePurification(e)
1006 }
1007}
1008
1009impl From<SqlServerSourcePurificationError> for PlanError {
1010 fn from(e: SqlServerSourcePurificationError) -> Self {
1011 PlanError::SqlServerSourcePurificationError(e)
1012 }
1013}
1014
1015impl From<IdentError> for PlanError {
1016 fn from(e: IdentError) -> Self {
1017 PlanError::InvalidIdent(e)
1018 }
1019}
1020
1021impl From<ExternalReferenceResolutionError> for PlanError {
1022 fn from(e: ExternalReferenceResolutionError) -> Self {
1023 PlanError::SubsourceResolutionError(e)
1024 }
1025}
1026
1027struct ColumnDisplay<'a> {
1028 table: &'a Option<PartialItemName>,
1029 column: &'a ColumnName,
1030}
1031
1032impl<'a> fmt::Display for ColumnDisplay<'a> {
1033 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1034 if let Some(table) = &self.table {
1035 format!("{}.{}", table.item, self.column).quoted().fmt(f)
1036 } else {
1037 self.column.quoted().fmt(f)
1038 }
1039 }
1040}