1use std::collections::BTreeSet;
11use std::error::Error;
12use std::num::{ParseIntError, TryFromIntError};
13use std::sync::Arc;
14use std::time::Duration;
15use std::{fmt, io};
16
17use itertools::Itertools;
18use mz_expr::EvalError;
19use mz_mysql_util::MySqlError;
20use mz_ore::error::ErrorExt;
21use mz_ore::stack::RecursionLimitError;
22use mz_ore::str::{StrExt, separated};
23use mz_postgres_util::PostgresError;
24use mz_repr::adt::char::InvalidCharLengthError;
25use mz_repr::adt::mz_acl_item::AclMode;
26use mz_repr::adt::numeric::InvalidNumericMaxScaleError;
27use mz_repr::adt::timestamp::InvalidTimestampPrecisionError;
28use mz_repr::adt::varchar::InvalidVarCharMaxLengthError;
29use mz_repr::{CatalogItemId, ColumnName, strconv};
30use mz_sql_parser::ast::display::AstDisplay;
31use mz_sql_parser::ast::{IdentError, UnresolvedItemName};
32use mz_sql_parser::parser::{ParserError, ParserStatementError};
33use mz_sql_server_util::SqlServerError;
34use mz_storage_types::sources::ExternalReferenceResolutionError;
35
36use crate::catalog::{
37 CatalogError, CatalogItemType, ErrorMessageObjectDescription, SystemObjectType,
38};
39use crate::names::{PartialItemName, ResolvedItemName};
40use crate::plan::ObjectType;
41use crate::plan::plan_utils::JoinSide;
42use crate::plan::scope::ScopeItem;
43use crate::plan::typeconv::CastContext;
44use crate::pure::error::{
45 CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
46 LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
47 SqlServerSourcePurificationError,
48};
49use crate::session::vars::VarError;
50
/// Error type with one variant per distinct failure.
///
/// The `Display` impl renders the primary message for each variant, while
/// `detail()` and `hint()` return optional supplementary text.
#[derive(Clone, Debug)]
pub enum PlanError {
    /// A feature that is not yet supported. When `discussion_no` is set, the
    /// message links to that GitHub discussion.
    Unsupported {
        feature: String,
        discussion_no: Option<usize>,
    },
    /// A feature that is not supported. `documentation_link` is rendered as a
    /// path under `https://materialize.com/docs/`; `details` is returned by
    /// `detail()`.
    NeverSupported {
        feature: String,
        documentation_link: Option<String>,
        details: Option<String>,
    },
    /// A referenced column does not exist. `similar` holds similarly named
    /// columns that do exist; they are offered in the hint.
    UnknownColumn {
        table: Option<PartialItemName>,
        column: ColumnName,
        similar: Box<[ColumnName]>,
    },
    /// A column must appear in the GROUP BY clause or be used in an aggregate
    /// function.
    UngroupedColumn {
        table: Option<PartialItemName>,
        column: ColumnName,
    },
    /// A LATERAL column reference whose combining JOIN type is not INNER or
    /// LEFT.
    WrongJoinTypeForLateralColumn {
        table: Option<PartialItemName>,
        column: ColumnName,
    },
    /// A column reference is ambiguous.
    AmbiguousColumn(ColumnName),
    /// An attempt to create a relation with too many columns.
    TooManyColumns {
        max_num_columns: usize,
        req_num_columns: usize,
    },
    /// The column already exists on the named relation.
    ColumnAlreadyExists {
        column_name: ColumnName,
        object_name: String,
    },
    /// A table reference is ambiguous.
    AmbiguousTable(PartialItemName),
    /// A column named in a USING clause does not exist in the table on
    /// `join_side`.
    UnknownColumnInUsingClause {
        column: ColumnName,
        join_side: JoinSide,
    },
    /// A common column name appears more than once in the table on
    /// `join_side`.
    AmbiguousColumnInUsingClause {
        column: ColumnName,
        join_side: JoinSide,
    },
    /// A qualified name did not have between 1 and 3 components.
    MisqualifiedName(String),
    /// A database name did not have exactly one component.
    OverqualifiedDatabaseName(String),
    /// A schema name had more than two components.
    OverqualifiedSchemaName(String),
    /// A column name lacked a table qualification.
    UnderqualifiedColumnName(String),
    /// `context` does not allow subqueries.
    SubqueriesDisallowed {
        context: String,
    },
    /// There is no parameter with the given number.
    UnknownParameter(usize),
    /// Wraps a [`RecursionLimitError`]; displayed as the inner error.
    RecursionLimit(RecursionLimitError),
    /// Wraps a [`strconv::ParseError`]; displayed as the inner error.
    StrconvParse(strconv::ParseError),
    /// Wraps a [`CatalogError`]; displayed as the inner error, whose hint is
    /// also forwarded.
    Catalog(CatalogError),
    /// An upsert sink did not specify a key.
    UpsertSinkWithoutKey,
    /// An upsert sink's key could not be validated as unique. `valid_keys`
    /// lists the keys known to be unique for the underlying relation.
    UpsertSinkWithInvalidKey {
        name: String,
        desired_key: Vec<String>,
        valid_keys: Vec<Vec<String>>,
    },
    /// Invalid WITH MUTUALLY RECURSIVE recursion limit; the payload is
    /// appended to the message.
    InvalidWmrRecursionLimit(String),
    /// Wraps an [`InvalidNumericMaxScaleError`].
    InvalidNumericMaxScale(InvalidNumericMaxScaleError),
    /// Wraps an [`InvalidCharLengthError`].
    InvalidCharLength(InvalidCharLengthError),
    /// The given id is invalid.
    InvalidId(CatalogItemId),
    /// Wraps an [`IdentError`] describing an invalid identifier.
    InvalidIdent(IdentError),
    /// The named item is not a database object.
    InvalidObject(Box<ResolvedItemName>),
    /// An object has type `actual_type` where `expected_type` was required.
    InvalidObjectType {
        expected_type: SystemObjectType,
        actual_type: SystemObjectType,
        object_name: String,
    },
    /// Privilege types that are invalid for the described object.
    InvalidPrivilegeTypes {
        invalid_privileges: AclMode,
        object_description: ErrorMessageObjectDescription,
    },
    /// Wraps an [`InvalidVarCharMaxLengthError`].
    InvalidVarCharMaxLength(InvalidVarCharMaxLengthError),
    /// Wraps an [`InvalidTimestampPrecisionError`].
    InvalidTimestampPrecision(InvalidTimestampPrecisionError),
    /// The named item is not a secret.
    InvalidSecret(Box<ResolvedItemName>),
    /// A temporary item cannot be created in a non-temporary schema.
    InvalidTemporarySchema,
    /// `name` does not support casting from `from` to `to`; the message says
    /// "implicitly" when `ccx` is `CastContext::Implicit`.
    InvalidCast {
        name: String,
        ccx: CastContext,
        from: String,
        to: String,
    },
    /// Invalid table definition for `name`.
    InvalidTable {
        name: String,
    },
    /// Invalid version `version` for `name`.
    InvalidVersion {
        name: String,
        version: String,
    },
    /// The name is reserved for replicas of managed clusters.
    MangedReplicaName(String),
    /// Wraps a [`ParserStatementError`].
    ParserStatement(ParserStatementError),
    /// Wraps a [`ParserError`].
    Parser(ParserError),
    /// The named object is not a view; the hint suggests
    /// `DROP MATERIALIZED VIEW`.
    DropViewOnMaterializedView(String),
    /// An object cannot be dropped because other objects depend on it.
    /// Each entry of `dependents` is rendered as `<type> <name>`.
    DependentObjectsStillExist {
        object_type: String,
        object_name: String,
        dependents: Vec<(String, String)>,
    },
    /// The named object is not a view; the hint suggests
    /// `ALTER MATERIALIZED VIEW`.
    AlterViewOnMaterializedView(String),
    /// The named object is not a view; the hint suggests
    /// `SHOW CREATE MATERIALIZED VIEW`.
    ShowCreateViewOnMaterializedView(String),
    /// The named object is not a view; the hint suggests
    /// `EXPLAIN [...] MATERIALIZED VIEW`.
    ExplainViewOnMaterializedView(String),
    /// Unacceptable timeline name; the hint notes that the `mz_` prefix is
    /// reserved for system timelines.
    UnacceptableTimelineName(String),
    /// Failed to fetch a schema from the schema registry; `cause` is reported
    /// in the detail.
    FetchingCsrSchemaFailed {
        schema_lookup: String,
        cause: Arc<dyn Error + Send + Sync>,
    },
    /// Failed to connect to a PostgreSQL database; `cause` is reported in the
    /// detail.
    PostgresConnectionErr {
        cause: Arc<mz_postgres_util::PostgresError>,
    },
    /// Failed to connect to a MySQL database; `cause` is included in the
    /// message.
    MySqlConnectionErr {
        cause: Arc<MySqlError>,
    },
    /// Failed to connect to a SQL Server database; `cause` is included in the
    /// message.
    SqlServerConnectionErr {
        cause: Arc<SqlServerError>,
    },
    /// Multiple subsources would be named `name`; `upstream_references` are
    /// listed in the detail as the referenced tables.
    SubsourceNameConflict {
        name: UnresolvedItemName,
        upstream_references: Vec<UnresolvedItemName>,
    },
    /// Multiple subsources refer to table `name`; `target_names` are listed in
    /// the detail as the referencing subsources.
    SubsourceDuplicateReference {
        name: UnresolvedItemName,
        target_names: Vec<UnresolvedItemName>,
    },
    /// No tables were found in the referenced schemas.
    NoTablesFoundForSchemas(Vec<String>),
    /// Invalid protobuf schema; `cause` is reported in the detail.
    InvalidProtobufSchema {
        cause: protobuf_native::OperationFailedError,
    },
    /// An option was given an invalid value. `err` is the underlying error;
    /// its detail and hint are forwarded.
    InvalidOptionValue {
        option_name: String,
        err: Box<PlanError>,
    },
    /// Unexpected multiple references to `name`.
    UnexpectedDuplicateReference {
        name: UnresolvedItemName,
    },
    /// A WITH MUTUALLY RECURSIVE query's declared types differ from the types
    /// it returns. Fields are, in order: query name, declared types, inferred
    /// types.
    RecursiveTypeMismatch(String, Vec<String>, Vec<String>),
    /// No function exists with the given name and argument types.
    UnknownFunction {
        name: String,
        arg_types: Vec<String>,
    },
    /// The function call is not unique for the given argument types.
    IndistinctFunction {
        name: String,
        arg_types: Vec<String>,
    },
    /// No operator exists with the given name and argument types.
    UnknownOperator {
        name: String,
        arg_types: Vec<String>,
    },
    /// The operator is not unique for the given argument types.
    IndistinctOperator {
        name: String,
        arg_types: Vec<String>,
    },
    /// Invalid AWS PrivateLink availability zone; `supported_azs` are listed
    /// in the hint as the known availability zone IDs.
    InvalidPrivatelinkAvailabilityZone {
        name: String,
        supported_azs: BTreeSet<String>,
    },
    /// A connection contained duplicate availability zones; the duplicates
    /// are listed in the hint.
    DuplicatePrivatelinkAvailabilityZone {
        duplicate_azs: BTreeSet<String>,
    },
    /// No schema has been selected to create in.
    InvalidSchemaName,
    /// An item of `item_type` with this name already exists.
    ItemAlreadyExists {
        name: String,
        item_type: CatalogItemType,
    },
    /// A managed cluster cannot be modified.
    ManagedCluster {
        cluster_name: String,
    },
    /// Invalid keys in `SUBSCRIBE ENVELOPE UPSERT (KEY (..))`.
    InvalidKeysInSubscribeEnvelopeUpsert,
    /// Invalid keys in `SUBSCRIBE ENVELOPE DEBEZIUM (KEY (..))`.
    InvalidKeysInSubscribeEnvelopeDebezium,
    /// A PARTITION BY expression referred to a non-key column while using
    /// ENVELOPE DEBEZIUM.
    InvalidPartitionByEnvelopeDebezium {
        column_name: String,
    },
    /// Invalid ORDER BY in `SUBSCRIBE WITHIN TIMESTAMP ORDER BY`.
    InvalidOrderByInSubscribeWithinTimestampOrderBy,
    /// A VALUES expression in a FROM clause must be surrounded by parentheses.
    FromValueRequiresParen,
    /// Wraps a [`VarError`]; its message, detail, and hint are forwarded.
    VarError(VarError),
    /// A polymorphic type could not be determined because the input has type
    /// unknown.
    UnsolvablePolymorphicFunctionInput,
    /// SHOW commands are not allowed in views.
    ShowCommandInView,
    /// The expression provided in CHECK does not reference any columns.
    WebhookValidationDoesNotUseColumns,
    /// The expression provided in CHECK is not deterministic.
    WebhookValidationNonDeterministic,
    /// A function reserved for internal use of the database system was called
    /// directly.
    InternalFunctionCall,
    /// A comment of `length` bytes exceeded `max_size` bytes.
    CommentTooLong {
        length: usize,
        max_size: usize,
    },
    /// The `requested` timestamp interval lies outside `[min, max]`.
    InvalidTimestampInterval {
        min: Duration,
        max: Duration,
        requested: Duration,
    },
    /// EXPECTED GROUP SIZE was provided together with one of the more specific
    /// `... INPUT GROUP SIZE` hints.
    InvalidGroupSizeHints,
    /// POSTGRES source validation failure.
    PgSourcePurification(PgSourcePurificationError),
    /// KAFKA source validation failure.
    KafkaSourcePurification(KafkaSourcePurificationError),
    /// KAFKA sink validation failure.
    KafkaSinkPurification(KafkaSinkPurificationError),
    /// LOAD GENERATOR source validation failure.
    LoadGeneratorSourcePurification(LoadGeneratorSourcePurificationError),
    /// CONFLUENT SCHEMA REGISTRY validation failure.
    CsrPurification(CsrPurificationError),
    /// MYSQL source validation failure.
    MySqlSourcePurification(MySqlSourcePurificationError),
    /// SQL SERVER source validation failure.
    SqlServerSourcePurificationError(SqlServerSourcePurificationError),
    /// The named command is not supported; `CREATE TABLE .. FROM SOURCE`
    /// should be used instead.
    UseTablesForSources(String),
    /// No name was specified for an item of the given type.
    MissingName(CatalogItemType),
    /// The REFRESH AT argument could not be simplified and/or cast to a
    /// constant of type `mz_timestamp`.
    InvalidRefreshAt,
    /// The REFRESH EVERY ... ALIGNED TO argument could not be simplified
    /// and/or cast to a constant of type `mz_timestamp`.
    InvalidRefreshEveryAlignedTo,
    /// More than one replica cannot be created for a cluster containing
    /// sources or sinks. The counts are reported in the detail.
    CreateReplicaFailStorageObjects {
        current_replica_count: usize,
        internal_replica_count: usize,
        hypothetical_replica_count: usize,
    },
    /// `name` is of type `is_type`, not `expected_type`.
    MismatchedObjectType {
        name: PartialItemName,
        is_type: ObjectType,
        expected_type: ObjectType,
    },
    /// Table `name` contains `column` of type `type_`, which cannot currently
    /// be ingested.
    TableContainsUningestableTypes {
        name: String,
        type_: String,
        column: String,
    },
    /// RETAIN HISTORY cannot be set lower than `limit`.
    RetainHistoryLow {
        limit: Duration,
    },
    /// RETAIN HISTORY cannot be disabled or set to 0.
    RetainHistoryRequired,
    /// The `TIMEOUT=<duration>` option is required for
    /// `ALTER CLUSTER ... WITH (WAIT UNTIL READY ( ... ))`.
    UntilReadyTimeoutRequired,
    /// Wraps an [`ExternalReferenceResolutionError`]; displayed as the inner
    /// error.
    SubsourceResolutionError(ExternalReferenceResolutionError),
    /// An internal error occurred while replanning.
    Replan(String),
    /// The network policy would block the current session IP.
    NetworkPolicyLockoutError,
    /// The network policy is currently in use.
    NetworkPolicyInUse,
    /// A free-form message that is displayed as-is.
    Unstructured(String),
}
291
292impl PlanError {
293 pub(crate) fn ungrouped_column(item: &ScopeItem) -> PlanError {
294 PlanError::UngroupedColumn {
295 table: item.table_name.clone(),
296 column: item.column_name.clone(),
297 }
298 }
299
300 pub fn detail(&self) -> Option<String> {
301 match self {
302 Self::NeverSupported { details, .. } => details.clone(),
303 Self::FetchingCsrSchemaFailed { cause, .. } => Some(cause.to_string_with_causes()),
304 Self::PostgresConnectionErr { cause } => Some(cause.to_string_with_causes()),
305 Self::InvalidProtobufSchema { cause } => Some(cause.to_string_with_causes()),
306 Self::InvalidOptionValue { err, .. } => err.detail(),
307 Self::UpsertSinkWithInvalidKey {
308 name,
309 desired_key,
310 valid_keys,
311 } => {
312 let valid_keys = if valid_keys.is_empty() {
313 "There are no known valid unique keys for the underlying relation.".into()
314 } else {
315 format!(
316 "The following keys are known to be unique for the underlying relation:\n{}",
317 valid_keys
318 .iter()
319 .map(|k|
320 format!(" ({})", k.iter().map(|c| c.as_str().quoted()).join(", "))
321 )
322 .join("\n"),
323 )
324 };
325 Some(format!(
326 "Materialize could not prove that the specified upsert envelope key ({}) \
327 was a unique key of the underlying relation {}. {valid_keys}",
328 separated(", ", desired_key.iter().map(|c| c.as_str().quoted())),
329 name.quoted()
330 ))
331 }
332 Self::VarError(e) => e.detail(),
333 Self::InternalFunctionCall => Some("This function is for the internal use of the database system and cannot be called directly.".into()),
334 Self::PgSourcePurification(e) => e.detail(),
335 Self::MySqlSourcePurification(e) => e.detail(),
336 Self::SqlServerSourcePurificationError(e) => e.detail(),
337 Self::KafkaSourcePurification(e) => e.detail(),
338 Self::LoadGeneratorSourcePurification(e) => e.detail(),
339 Self::CsrPurification(e) => e.detail(),
340 Self::KafkaSinkPurification(e) => e.detail(),
341 Self::CreateReplicaFailStorageObjects { current_replica_count: current, internal_replica_count: internal, hypothetical_replica_count: target } => {
342 Some(format!(
343 "Currently have {} replica{}{}; command would result in {}",
344 current,
345 if *current != 1 { "s" } else { "" },
346 if *internal > 0 {
347 format!(" ({} internal)", internal)
348 } else {
349 "".to_string()
350 },
351 target
352 ))
353 },
354 Self::SubsourceNameConflict {
355 name: _,
356 upstream_references,
357 } => Some(format!(
358 "referenced tables with duplicate name: {}",
359 itertools::join(upstream_references, ", ")
360 )),
361 Self::SubsourceDuplicateReference {
362 name: _,
363 target_names,
364 } => Some(format!(
365 "subsources referencing table: {}",
366 itertools::join(target_names, ", ")
367 )),
368 Self::InvalidPartitionByEnvelopeDebezium { .. } => Some(
369 "When using ENVELOPE DEBEZIUM, only columns in the key can be referenced in the PARTITION BY expression.".to_string()
370 ),
371 Self::NoTablesFoundForSchemas(schemas) => Some(format!(
372 "missing schemas: {}",
373 separated(", ", schemas.iter().map(|c| c.quoted()))
374 )),
375 _ => None,
376 }
377 }
378
379 pub fn hint(&self) -> Option<String> {
380 match self {
381 Self::DropViewOnMaterializedView(_) => {
382 Some("Use DROP MATERIALIZED VIEW to remove a materialized view.".into())
383 }
384 Self::DependentObjectsStillExist {..} => Some("Use DROP ... CASCADE to drop the dependent objects too.".into()),
385 Self::AlterViewOnMaterializedView(_) => {
386 Some("Use ALTER MATERIALIZED VIEW to rename a materialized view.".into())
387 }
388 Self::ShowCreateViewOnMaterializedView(_) => {
389 Some("Use SHOW CREATE MATERIALIZED VIEW to show a materialized view.".into())
390 }
391 Self::ExplainViewOnMaterializedView(_) => {
392 Some("Use EXPLAIN [...] MATERIALIZED VIEW to explain a materialized view.".into())
393 }
394 Self::UnacceptableTimelineName(_) => {
395 Some("The prefix \"mz_\" is reserved for system timelines.".into())
396 }
397 Self::PostgresConnectionErr { cause } => {
398 if let Some(cause) = cause.source() {
399 if let Some(cause) = cause.downcast_ref::<io::Error>() {
400 if cause.kind() == io::ErrorKind::TimedOut {
401 return Some(
402 "Do you have a firewall or security group that is \
403 preventing Materialize from connecting to your PostgreSQL server?"
404 .into(),
405 );
406 }
407 }
408 }
409 None
410 }
411 Self::InvalidOptionValue { err, .. } => err.hint(),
412 Self::UnknownFunction { ..} => Some("No function matches the given name and argument types. You might need to add explicit type casts.".into()),
413 Self::IndistinctFunction {..} => {
414 Some("Could not choose a best candidate function. You might need to add explicit type casts.".into())
415 }
416 Self::UnknownOperator {..} => {
417 Some("No operator matches the given name and argument types. You might need to add explicit type casts.".into())
418 }
419 Self::IndistinctOperator {..} => {
420 Some("Could not choose a best candidate operator. You might need to add explicit type casts.".into())
421 },
422 Self::InvalidPrivatelinkAvailabilityZone { supported_azs, ..} => {
423 let supported_azs_str = supported_azs.iter().join("\n ");
424 Some(format!("Did you supply an availability zone name instead of an ID? Known availability zone IDs:\n {}", supported_azs_str))
425 }
426 Self::DuplicatePrivatelinkAvailabilityZone { duplicate_azs, ..} => {
427 let duplicate_azs = duplicate_azs.iter().join("\n ");
428 Some(format!("Duplicated availability zones:\n {}", duplicate_azs))
429 }
430 Self::InvalidKeysInSubscribeEnvelopeUpsert => {
431 Some("All keys must be columns on the underlying relation.".into())
432 }
433 Self::InvalidKeysInSubscribeEnvelopeDebezium => {
434 Some("All keys must be columns on the underlying relation.".into())
435 }
436 Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
437 Some("All order bys must be output columns.".into())
438 }
439 Self::UpsertSinkWithInvalidKey { .. } | Self::UpsertSinkWithoutKey => {
440 Some("See: https://materialize.com/s/sink-key-selection".into())
441 }
442 Self::Catalog(e) => e.hint(),
443 Self::VarError(e) => e.hint(),
444 Self::PgSourcePurification(e) => e.hint(),
445 Self::MySqlSourcePurification(e) => e.hint(),
446 Self::SqlServerSourcePurificationError(e) => e.hint(),
447 Self::KafkaSourcePurification(e) => e.hint(),
448 Self::LoadGeneratorSourcePurification(e) => e.hint(),
449 Self::CsrPurification(e) => e.hint(),
450 Self::KafkaSinkPurification(e) => e.hint(),
451 Self::UnknownColumn { table, similar, .. } => {
452 let suffix = "Make sure to surround case sensitive names in double quotes.";
453 match &similar[..] {
454 [] => None,
455 [column] => Some(format!("The similarly named column {} does exist. {suffix}", ColumnDisplay { table, column })),
456 names => {
457 let similar = names.into_iter().map(|column| ColumnDisplay { table, column }).join(", ");
458 Some(format!("There are similarly named columns that do exist: {similar}. {suffix}"))
459 }
460 }
461 }
462 Self::RecursiveTypeMismatch(..) => {
463 Some("You will need to rewrite or cast the query's expressions.".into())
464 },
465 Self::InvalidRefreshAt
466 | Self::InvalidRefreshEveryAlignedTo => {
467 Some("Calling `mz_now()` is allowed.".into())
468 },
469 Self::TableContainsUningestableTypes { column,.. } => {
470 Some(format!("Remove the table or use TEXT COLUMNS ({column}, ..) to ingest this column as text"))
471 }
472 Self::RetainHistoryLow { .. } | Self::RetainHistoryRequired => {
473 Some("Use ALTER ... RESET (RETAIN HISTORY) to set the retain history to its default and lowest value.".into())
474 }
475 Self::NetworkPolicyInUse => {
476 Some("Use ALTER SYSTEM SET 'network_policy' to change the default network policy.".into())
477 }
478 _ => None,
479 }
480 }
481}
482
483impl fmt::Display for PlanError {
484 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
485 match self {
486 Self::Unsupported { feature, discussion_no } => {
487 write!(f, "{} not yet supported", feature)?;
488 if let Some(discussion_no) = discussion_no {
489 write!(f, ", see https://github.com/MaterializeInc/materialize/discussions/{} for more details", discussion_no)?;
490 }
491 Ok(())
492 }
493 Self::NeverSupported { feature, documentation_link: documentation_path,.. } => {
494 write!(f, "{feature} is not supported",)?;
495 if let Some(documentation_path) = documentation_path {
496 write!(f, ", for more information consult the documentation at https://materialize.com/docs/{documentation_path}")?;
497 }
498 Ok(())
499 }
500 Self::UnknownColumn { table, column, similar: _ } => write!(
501 f,
502 "column {} does not exist",
503 ColumnDisplay { table, column }
504 ),
505 Self::UngroupedColumn { table, column } => write!(
506 f,
507 "column {} must appear in the GROUP BY clause or be used in an aggregate function",
508 ColumnDisplay { table, column },
509 ),
510 Self::WrongJoinTypeForLateralColumn { table, column } => write!(
511 f,
512 "column {} cannot be referenced from this part of the query: \
513 the combining JOIN type must be INNER or LEFT for a LATERAL reference",
514 ColumnDisplay { table, column },
515 ),
516 Self::AmbiguousColumn(column) => write!(
517 f,
518 "column reference {} is ambiguous",
519 column.as_str().quoted()
520 ),
521 Self::TooManyColumns { max_num_columns, req_num_columns } => write!(
522 f,
523 "attempt to create relation with too many columns, {} max: {}",
524 req_num_columns, max_num_columns
525 ),
526 Self::ColumnAlreadyExists { column_name, object_name } => write!(
527 f,
528 "column {} of relation {} already exists",
529 column_name.as_str().quoted(), object_name.quoted(),
530 ),
531 Self::AmbiguousTable(table) => write!(
532 f,
533 "table reference {} is ambiguous",
534 table.item.as_str().quoted()
535 ),
536 Self::UnknownColumnInUsingClause { column, join_side } => write!(
537 f,
538 "column {} specified in USING clause does not exist in {} table",
539 column.as_str().quoted(),
540 join_side,
541 ),
542 Self::AmbiguousColumnInUsingClause { column, join_side } => write!(
543 f,
544 "common column name {} appears more than once in {} table",
545 column.as_str().quoted(),
546 join_side,
547 ),
548 Self::MisqualifiedName(name) => write!(
549 f,
550 "qualified name did not have between 1 and 3 components: {}",
551 name
552 ),
553 Self::OverqualifiedDatabaseName(name) => write!(
554 f,
555 "database name '{}' does not have exactly one component",
556 name
557 ),
558 Self::OverqualifiedSchemaName(name) => write!(
559 f,
560 "schema name '{}' cannot have more than two components",
561 name
562 ),
563 Self::UnderqualifiedColumnName(name) => write!(
564 f,
565 "column name '{}' must have at least a table qualification",
566 name
567 ),
568 Self::UnacceptableTimelineName(name) => {
569 write!(f, "unacceptable timeline name {}", name.quoted(),)
570 }
571 Self::SubqueriesDisallowed { context } => {
572 write!(f, "{} does not allow subqueries", context)
573 }
574 Self::UnknownParameter(n) => write!(f, "there is no parameter ${}", n),
575 Self::RecursionLimit(e) => write!(f, "{}", e),
576 Self::StrconvParse(e) => write!(f, "{}", e),
577 Self::Catalog(e) => write!(f, "{}", e),
578 Self::UpsertSinkWithoutKey => write!(f, "upsert sinks must specify a key"),
579 Self::UpsertSinkWithInvalidKey { .. } => {
580 write!(f, "upsert key could not be validated as unique")
581 }
582 Self::InvalidWmrRecursionLimit(msg) => write!(f, "Invalid WITH MUTUALLY RECURSIVE recursion limit. {}", msg),
583 Self::InvalidNumericMaxScale(e) => e.fmt(f),
584 Self::InvalidCharLength(e) => e.fmt(f),
585 Self::InvalidVarCharMaxLength(e) => e.fmt(f),
586 Self::InvalidTimestampPrecision(e) => e.fmt(f),
587 Self::Parser(e) => e.fmt(f),
588 Self::ParserStatement(e) => e.fmt(f),
589 Self::Unstructured(e) => write!(f, "{}", e),
590 Self::InvalidId(id) => write!(f, "invalid id {}", id),
591 Self::InvalidIdent(err) => write!(f, "invalid identifier, {err}"),
592 Self::InvalidObject(i) => write!(f, "{} is not a database object", i.full_name_str()),
593 Self::InvalidObjectType{expected_type, actual_type, object_name} => write!(f, "{actual_type} {object_name} is not a {expected_type}"),
594 Self::InvalidPrivilegeTypes{ invalid_privileges, object_description, } => {
595 write!(f, "invalid privilege types {} for {}", invalid_privileges.to_error_string(), object_description)
596 },
597 Self::InvalidSecret(i) => write!(f, "{} is not a secret", i.full_name_str()),
598 Self::InvalidTemporarySchema => {
599 write!(f, "cannot create temporary item in non-temporary schema")
600 }
601 Self::InvalidCast { name, ccx, from, to } =>{
602 write!(
603 f,
604 "{name} does not support {ccx}casting from {from} to {to}",
605 ccx = if matches!(ccx, CastContext::Implicit) {
606 "implicitly "
607 } else {
608 ""
609 },
610 )
611 }
612 Self::InvalidTable { name } => {
613 write!(f, "invalid table definition for {}", name.quoted())
614 },
615 Self::InvalidVersion { name, version } => {
616 write!(f, "invalid version {} for {}", version.quoted(), name.quoted())
617 },
618 Self::DropViewOnMaterializedView(name)
619 | Self::AlterViewOnMaterializedView(name)
620 | Self::ShowCreateViewOnMaterializedView(name)
621 | Self::ExplainViewOnMaterializedView(name) => write!(f, "{name} is not a view"),
622 Self::FetchingCsrSchemaFailed { schema_lookup, .. } => {
623 write!(f, "failed to fetch schema {schema_lookup} from schema registry")
624 }
625 Self::PostgresConnectionErr { .. } => {
626 write!(f, "failed to connect to PostgreSQL database")
627 }
628 Self::MySqlConnectionErr { cause } => {
629 write!(f, "failed to connect to MySQL database: {}", cause)
630 }
631 Self::SqlServerConnectionErr { cause } => {
632 write!(f, "failed to connect to SQL Server database: {}", cause)
633 }
634 Self::SubsourceNameConflict {
635 name , upstream_references: _,
636 } => {
637 write!(f, "multiple subsources would be named {}", name)
638 },
639 Self::SubsourceDuplicateReference {
640 name,
641 target_names: _,
642 } => {
643 write!(f, "multiple subsources refer to table {}", name)
644 },
645 Self::NoTablesFoundForSchemas(schemas) => {
646 write!(f, "no tables found in referenced schemas: {}",
647 separated(", ", schemas.iter().map(|c| c.quoted()))
648 )
649 },
650 Self::InvalidProtobufSchema { .. } => {
651 write!(f, "invalid protobuf schema")
652 }
653 Self::DependentObjectsStillExist {object_type, object_name, dependents} => {
654 let reason = match &dependents[..] {
655 [] => " because other objects depend on it".to_string(),
656 dependents => {
657 let dependents = dependents.iter().map(|(dependent_type, dependent_name)| format!("{} {}", dependent_type, dependent_name.quoted())).join(", ");
658 format!(": still depended upon by {dependents}")
659 },
660 };
661 let object_name = object_name.quoted();
662 write!(f, "cannot drop {object_type} {object_name}{reason}")
663 }
664 Self::InvalidOptionValue { option_name, err } => write!(f, "invalid {} option value: {}", option_name, err),
665 Self::UnexpectedDuplicateReference { name } => write!(f, "unexpected multiple references to {}", name.to_ast_string_simple()),
666 Self::RecursiveTypeMismatch(name, declared, inferred) => {
667 let declared = separated(", ", declared);
668 let inferred = separated(", ", inferred);
669 let name = name.quoted();
670 write!(f, "WITH MUTUALLY RECURSIVE query {name} declared types ({declared}), but query returns types ({inferred})")
671 },
672 Self::UnknownFunction {name, arg_types, ..} => {
673 write!(f, "function {}({}) does not exist", name, arg_types.join(", "))
674 },
675 Self::IndistinctFunction {name, arg_types, ..} => {
676 write!(f, "function {}({}) is not unique", name, arg_types.join(", "))
677 },
678 Self::UnknownOperator {name, arg_types, ..} => {
679 write!(f, "operator does not exist: {}", match arg_types.as_slice(){
680 [typ] => format!("{} {}", name, typ),
681 [ltyp, rtyp] => {
682 format!("{} {} {}", ltyp, name, rtyp)
683 }
684 _ => unreachable!("non-unary non-binary operator"),
685 })
686 },
687 Self::IndistinctOperator {name, arg_types, ..} => {
688 write!(f, "operator is not unique: {}", match arg_types.as_slice(){
689 [typ] => format!("{} {}", name, typ),
690 [ltyp, rtyp] => {
691 format!("{} {} {}", ltyp, name, rtyp)
692 }
693 _ => unreachable!("non-unary non-binary operator"),
694 })
695 },
696 Self::InvalidPrivatelinkAvailabilityZone { name, ..} => write!(f, "invalid AWS PrivateLink availability zone {}", name.quoted()),
697 Self::DuplicatePrivatelinkAvailabilityZone {..} => write!(f, "connection cannot contain duplicate availability zones"),
698 Self::InvalidSchemaName => write!(f, "no schema has been selected to create in"),
699 Self::ItemAlreadyExists { name, item_type } => write!(f, "{item_type} {} already exists", name.quoted()),
700 Self::ManagedCluster {cluster_name} => write!(f, "cannot modify managed cluster {cluster_name}"),
701 Self::InvalidKeysInSubscribeEnvelopeUpsert => {
702 write!(f, "invalid keys in SUBSCRIBE ENVELOPE UPSERT (KEY (..))")
703 }
704 Self::InvalidKeysInSubscribeEnvelopeDebezium => {
705 write!(f, "invalid keys in SUBSCRIBE ENVELOPE DEBEZIUM (KEY (..))")
706 }
707 Self::InvalidPartitionByEnvelopeDebezium { column_name } => {
708 write!(
709 f,
710 "PARTITION BY expression cannot refer to non-key column {}",
711 column_name.quoted(),
712 )
713 }
714 Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
715 write!(f, "invalid ORDER BY in SUBSCRIBE WITHIN TIMESTAMP ORDER BY")
716 }
717 Self::FromValueRequiresParen => f.write_str(
718 "VALUES expression in FROM clause must be surrounded by parentheses"
719 ),
720 Self::VarError(e) => e.fmt(f),
721 Self::UnsolvablePolymorphicFunctionInput => f.write_str(
722 "could not determine polymorphic type because input has type unknown"
723 ),
724 Self::ShowCommandInView => f.write_str("SHOW commands are not allowed in views"),
725 Self::WebhookValidationDoesNotUseColumns => f.write_str(
726 "expression provided in CHECK does not reference any columns"
727 ),
728 Self::WebhookValidationNonDeterministic => f.write_str(
729 "expression provided in CHECK is not deterministic"
730 ),
731 Self::InternalFunctionCall => f.write_str("cannot call function with arguments of type internal"),
732 Self::CommentTooLong { length, max_size } => {
733 write!(f, "provided comment was {length} bytes long, max size is {max_size} bytes")
734 }
735 Self::InvalidTimestampInterval { min, max, requested } => {
736 write!(f, "invalid timestamp interval of {}ms, must be in the range [{}ms, {}ms]", requested.as_millis(), min.as_millis(), max.as_millis())
737 }
738 Self::InvalidGroupSizeHints => f.write_str("EXPECTED GROUP SIZE cannot be provided \
739 simultaneously with any of AGGREGATE INPUT GROUP SIZE, DISTINCT ON INPUT GROUP SIZE, \
740 or LIMIT INPUT GROUP SIZE"),
741 Self::PgSourcePurification(e) => write!(f, "POSTGRES source validation: {}", e),
742 Self::KafkaSourcePurification(e) => write!(f, "KAFKA source validation: {}", e),
743 Self::LoadGeneratorSourcePurification(e) => write!(f, "LOAD GENERATOR source validation: {}", e),
744 Self::KafkaSinkPurification(e) => write!(f, "KAFKA sink validation: {}", e),
745 Self::CsrPurification(e) => write!(f, "CONFLUENT SCHEMA REGISTRY validation: {}", e),
746 Self::MySqlSourcePurification(e) => write!(f, "MYSQL source validation: {}", e),
747 Self::SqlServerSourcePurificationError(e) => write!(f, "SQL SERVER source validation: {}", e),
748 Self::UseTablesForSources(command) => write!(f, "{command} not supported; use CREATE TABLE .. FROM SOURCE instead"),
749 Self::MangedReplicaName(name) => {
750 write!(f, "{name} is reserved for replicas of managed clusters")
751 }
752 Self::MissingName(item_type) => {
753 write!(f, "unspecified name for {item_type}")
754 }
755 Self::InvalidRefreshAt => {
756 write!(f, "REFRESH AT argument must be an expression that can be simplified \
757 and/or cast to a constant whose type is mz_timestamp")
758 }
759 Self::InvalidRefreshEveryAlignedTo => {
760 write!(f, "REFRESH EVERY ... ALIGNED TO argument must be an expression that can be simplified \
761 and/or cast to a constant whose type is mz_timestamp")
762 }
763 Self::CreateReplicaFailStorageObjects {..} => {
764 write!(f, "cannot create more than one replica of a cluster containing sources or sinks")
765 },
766 Self::MismatchedObjectType {
767 name,
768 is_type,
769 expected_type,
770 } => {
771 write!(
772 f,
773 "{name} is {} {} not {} {}",
774 if *is_type == ObjectType::Index {
775 "an"
776 } else {
777 "a"
778 },
779 is_type.to_string().to_lowercase(),
780 if *expected_type == ObjectType::Index {
781 "an"
782 } else {
783 "a"
784 },
785 expected_type.to_string().to_lowercase()
786 )
787 }
788 Self::TableContainsUningestableTypes { name, type_, column } => {
789 write!(f, "table {name} contains column {column} of type {type_} which Materialize cannot currently ingest")
790 },
791 Self::RetainHistoryLow { limit } => {
792 write!(f, "RETAIN HISTORY cannot be set lower than {}ms", limit.as_millis())
793 },
794 Self::RetainHistoryRequired => {
795 write!(f, "RETAIN HISTORY cannot be disabled or set to 0")
796 },
797 Self::SubsourceResolutionError(e) => write!(f, "{}", e),
798 Self::Replan(msg) => write!(f, "internal error while replanning, please contact support: {msg}"),
799 Self::NetworkPolicyLockoutError => write!(f, "policy would block current session IP"),
800 Self::NetworkPolicyInUse => write!(f, "network policy is currently in use"),
801 Self::UntilReadyTimeoutRequired => {
802 write!(f, "TIMEOUT=<duration> option is required for ALTER CLUSTER ... WITH (WAIT UNTIL READY ( ... ))")
803 },
804 }
805 }
806}
807
/// Marks `PlanError` as a standard error type. No trait methods are
/// overridden here, so the trait's default implementations apply.
impl Error for PlanError {}
809
810impl From<CatalogError> for PlanError {
811 fn from(e: CatalogError) -> PlanError {
812 PlanError::Catalog(e)
813 }
814}
815
816impl From<strconv::ParseError> for PlanError {
817 fn from(e: strconv::ParseError) -> PlanError {
818 PlanError::StrconvParse(e)
819 }
820}
821
822impl From<RecursionLimitError> for PlanError {
823 fn from(e: RecursionLimitError) -> PlanError {
824 PlanError::RecursionLimit(e)
825 }
826}
827
828impl From<InvalidNumericMaxScaleError> for PlanError {
829 fn from(e: InvalidNumericMaxScaleError) -> PlanError {
830 PlanError::InvalidNumericMaxScale(e)
831 }
832}
833
834impl From<InvalidCharLengthError> for PlanError {
835 fn from(e: InvalidCharLengthError) -> PlanError {
836 PlanError::InvalidCharLength(e)
837 }
838}
839
840impl From<InvalidVarCharMaxLengthError> for PlanError {
841 fn from(e: InvalidVarCharMaxLengthError) -> PlanError {
842 PlanError::InvalidVarCharMaxLength(e)
843 }
844}
845
846impl From<InvalidTimestampPrecisionError> for PlanError {
847 fn from(e: InvalidTimestampPrecisionError) -> PlanError {
848 PlanError::InvalidTimestampPrecision(e)
849 }
850}
851
852impl From<anyhow::Error> for PlanError {
853 fn from(e: anyhow::Error) -> PlanError {
854 sql_err!("{}", e.display_with_causes())
856 }
857}
858
859impl From<TryFromIntError> for PlanError {
860 fn from(e: TryFromIntError) -> PlanError {
861 sql_err!("{}", e.display_with_causes())
862 }
863}
864
865impl From<ParseIntError> for PlanError {
866 fn from(e: ParseIntError) -> PlanError {
867 sql_err!("{}", e.display_with_causes())
868 }
869}
870
871impl From<EvalError> for PlanError {
872 fn from(e: EvalError) -> PlanError {
873 sql_err!("{}", e.display_with_causes())
874 }
875}
876
877impl From<ParserError> for PlanError {
878 fn from(e: ParserError) -> PlanError {
879 PlanError::Parser(e)
880 }
881}
882
883impl From<ParserStatementError> for PlanError {
884 fn from(e: ParserStatementError) -> PlanError {
885 PlanError::ParserStatement(e)
886 }
887}
888
889impl From<PostgresError> for PlanError {
890 fn from(e: PostgresError) -> PlanError {
891 PlanError::PostgresConnectionErr { cause: Arc::new(e) }
892 }
893}
894
895impl From<MySqlError> for PlanError {
896 fn from(e: MySqlError) -> PlanError {
897 PlanError::MySqlConnectionErr { cause: Arc::new(e) }
898 }
899}
900
901impl From<SqlServerError> for PlanError {
902 fn from(e: SqlServerError) -> PlanError {
903 PlanError::SqlServerConnectionErr { cause: Arc::new(e) }
904 }
905}
906
907impl From<VarError> for PlanError {
908 fn from(e: VarError) -> Self {
909 PlanError::VarError(e)
910 }
911}
912
913impl From<PgSourcePurificationError> for PlanError {
914 fn from(e: PgSourcePurificationError) -> Self {
915 PlanError::PgSourcePurification(e)
916 }
917}
918
919impl From<KafkaSourcePurificationError> for PlanError {
920 fn from(e: KafkaSourcePurificationError) -> Self {
921 PlanError::KafkaSourcePurification(e)
922 }
923}
924
925impl From<KafkaSinkPurificationError> for PlanError {
926 fn from(e: KafkaSinkPurificationError) -> Self {
927 PlanError::KafkaSinkPurification(e)
928 }
929}
930
931impl From<CsrPurificationError> for PlanError {
932 fn from(e: CsrPurificationError) -> Self {
933 PlanError::CsrPurification(e)
934 }
935}
936
937impl From<LoadGeneratorSourcePurificationError> for PlanError {
938 fn from(e: LoadGeneratorSourcePurificationError) -> Self {
939 PlanError::LoadGeneratorSourcePurification(e)
940 }
941}
942
943impl From<MySqlSourcePurificationError> for PlanError {
944 fn from(e: MySqlSourcePurificationError) -> Self {
945 PlanError::MySqlSourcePurification(e)
946 }
947}
948
949impl From<SqlServerSourcePurificationError> for PlanError {
950 fn from(e: SqlServerSourcePurificationError) -> Self {
951 PlanError::SqlServerSourcePurificationError(e)
952 }
953}
954
955impl From<IdentError> for PlanError {
956 fn from(e: IdentError) -> Self {
957 PlanError::InvalidIdent(e)
958 }
959}
960
961impl From<ExternalReferenceResolutionError> for PlanError {
962 fn from(e: ExternalReferenceResolutionError) -> Self {
963 PlanError::SubsourceResolutionError(e)
964 }
965}
966
/// Borrowed view of a column name and its optional table, used to render the
/// column in error messages via its `Display` impl.
struct ColumnDisplay<'a> {
    /// Table the column belongs to, if known.
    table: &'a Option<PartialItemName>,
    /// Name of the column.
    column: &'a ColumnName,
}
971
972impl<'a> fmt::Display for ColumnDisplay<'a> {
973 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
974 if let Some(table) = &self.table {
975 format!("{}.{}", table.item, self.column).quoted().fmt(f)
976 } else {
977 self.column.as_str().quoted().fmt(f)
978 }
979 }
980}