1use std::collections::BTreeSet;
11use std::error::Error;
12use std::num::{ParseIntError, TryFromIntError};
13use std::sync::Arc;
14use std::time::Duration;
15use std::{fmt, io};
16
17use itertools::Itertools;
18use mz_expr::EvalError;
19use mz_mysql_util::MySqlError;
20use mz_ore::error::ErrorExt;
21use mz_ore::stack::RecursionLimitError;
22use mz_ore::str::{StrExt, separated};
23use mz_postgres_util::PostgresError;
24use mz_repr::adt::char::InvalidCharLengthError;
25use mz_repr::adt::mz_acl_item::AclMode;
26use mz_repr::adt::numeric::InvalidNumericMaxScaleError;
27use mz_repr::adt::timestamp::InvalidTimestampPrecisionError;
28use mz_repr::adt::varchar::InvalidVarCharMaxLengthError;
29use mz_repr::{CatalogItemId, ColumnName, strconv};
30use mz_sql_parser::ast::display::AstDisplay;
31use mz_sql_parser::ast::{IdentError, UnresolvedItemName};
32use mz_sql_parser::parser::{ParserError, ParserStatementError};
33use mz_sql_server_util::SqlServerError;
34use mz_storage_types::sources::ExternalReferenceResolutionError;
35
36use crate::catalog::{
37 CatalogError, CatalogItemType, ErrorMessageObjectDescription, SystemObjectType,
38};
39use crate::names::{PartialItemName, ResolvedItemName};
40use crate::plan::ObjectType;
41use crate::plan::plan_utils::JoinSide;
42use crate::plan::scope::ScopeItem;
43use crate::plan::typeconv::CastContext;
44use crate::pure::error::{
45 CsrPurificationError, IcebergSinkPurificationError, KafkaSinkPurificationError,
46 KafkaSourcePurificationError, LoadGeneratorSourcePurificationError,
47 MySqlSourcePurificationError, PgSourcePurificationError, SqlServerSourcePurificationError,
48};
49use crate::session::vars::VarError;
50
/// Errors produced while planning a SQL statement.
///
/// Every variant renders its primary message through the [`fmt::Display`]
/// impl; some variants additionally supply supplementary text through
/// [`PlanError::detail`] and [`PlanError::hint`].
#[derive(Debug)]
pub enum PlanError {
    /// A feature that is not yet supported.
    Unsupported {
        /// Description of the feature, used as the start of the message.
        feature: String,
        /// When set, the message links to the GitHub discussion with this number.
        discussion_no: Option<usize>,
    },
    /// A feature that is not supported.
    NeverSupported {
        /// Description of the feature, used as the start of the message.
        feature: String,
        /// When set, appended to `https://materialize.com/docs/` in the message.
        documentation_link: Option<String>,
        /// Returned verbatim by [`PlanError::detail`].
        details: Option<String>,
    },
    /// A referenced column does not exist.
    UnknownColumn {
        table: Option<PartialItemName>,
        column: ColumnName,
        /// Similarly named columns that do exist; listed by [`PlanError::hint`].
        similar: Box<[ColumnName]>,
    },
    /// A column must appear in the GROUP BY clause or be used in an aggregate function.
    UngroupedColumn {
        table: Option<PartialItemName>,
        column: ColumnName,
    },
    /// A LATERAL column reference whose combining JOIN type is not INNER or LEFT.
    WrongJoinTypeForLateralColumn {
        table: Option<PartialItemName>,
        column: ColumnName,
    },
    /// A column reference is ambiguous.
    AmbiguousColumn(ColumnName),
    /// An attempt to create a relation with more columns than allowed.
    TooManyColumns {
        max_num_columns: usize,
        req_num_columns: usize,
    },
    /// A column with this name already exists on the named relation.
    ColumnAlreadyExists {
        column_name: ColumnName,
        object_name: String,
    },
    /// A table reference is ambiguous.
    AmbiguousTable(PartialItemName),
    /// A column named in a USING clause does not exist on one side of the join.
    UnknownColumnInUsingClause {
        column: ColumnName,
        join_side: JoinSide,
    },
    /// A column named in a USING clause appears more than once on one side of the join.
    AmbiguousColumnInUsingClause {
        column: ColumnName,
        join_side: JoinSide,
    },
    /// A qualified name that did not have between 1 and 3 components.
    MisqualifiedName(String),
    /// A database name that does not have exactly one component.
    OverqualifiedDatabaseName(String),
    /// A schema name that has more than two components.
    OverqualifiedSchemaName(String),
    /// A column name that lacks at least a table qualification.
    UnderqualifiedColumnName(String),
    /// Subqueries were used in a context that does not allow them.
    SubqueriesDisallowed {
        context: String,
    },
    /// There is no parameter with the given number.
    UnknownParameter(usize),
    /// The named kind of object cannot have parameters.
    ParameterNotAllowed(String),
    /// A parameter could not be cast: (parameter number, expected type, actual type).
    WrongParameterType(usize, String, String),
    RecursionLimit(RecursionLimitError),
    StrconvParse(strconv::ParseError),
    Catalog(CatalogError),
    /// An upsert sink was declared without a key.
    UpsertSinkWithoutKey,
    /// An upsert sink key could not be validated as unique.
    UpsertSinkWithInvalidKey {
        /// Name of the underlying relation.
        name: String,
        /// The key columns that were requested.
        desired_key: Vec<String>,
        /// Keys known to be unique for the underlying relation; may be empty.
        valid_keys: Vec<Vec<String>>,
    },
    /// An invalid WITH MUTUALLY RECURSIVE recursion limit; the payload is appended to the message.
    InvalidWmrRecursionLimit(String),
    InvalidNumericMaxScale(InvalidNumericMaxScaleError),
    InvalidCharLength(InvalidCharLengthError),
    InvalidId(CatalogItemId),
    InvalidIdent(IdentError),
    /// The named item is not a database object.
    InvalidObject(Box<ResolvedItemName>),
    /// The named object has a different type than expected.
    InvalidObjectType {
        expected_type: SystemObjectType,
        actual_type: SystemObjectType,
        object_name: String,
    },
    /// The privilege types are invalid for the described object.
    InvalidPrivilegeTypes {
        invalid_privileges: AclMode,
        object_description: ErrorMessageObjectDescription,
    },
    InvalidVarCharMaxLength(InvalidVarCharMaxLengthError),
    InvalidTimestampPrecision(InvalidTimestampPrecisionError),
    /// The named item is not a secret.
    InvalidSecret(Box<ResolvedItemName>),
    /// A temporary item cannot be created in a non-temporary schema.
    InvalidTemporarySchema,
    /// The named function or operator does not support casting between the given types.
    InvalidCast {
        name: String,
        /// The message says "implicitly casting" when this is `CastContext::Implicit`.
        ccx: CastContext,
        from: String,
        to: String,
    },
    /// Invalid table definition for the named table.
    InvalidTable {
        name: String,
    },
    /// Invalid version for the named object.
    InvalidVersion {
        name: String,
        version: String,
    },
    /// The name is reserved for replicas of managed clusters.
    // NOTE(review): the variant name looks like a misspelling of "Managed";
    // renaming it would change the public API, so it is left as is.
    MangedReplicaName(String),
    ParserStatement(ParserStatementError),
    Parser(ParserError),
    /// DROP VIEW was used on an object that is not a view; the hint suggests
    /// DROP MATERIALIZED VIEW.
    DropViewOnMaterializedView(String),
    /// An object cannot be dropped because other objects depend on it.
    DependentObjectsStillExist {
        object_type: String,
        object_name: String,
        /// `(type, name)` pairs of the dependent objects; may be empty.
        dependents: Vec<(String, String)>,
    },
    /// ALTER VIEW was used on an object that is not a view.
    AlterViewOnMaterializedView(String),
    /// SHOW CREATE VIEW was used on an object that is not a view.
    ShowCreateViewOnMaterializedView(String),
    /// EXPLAIN VIEW was used on an object that is not a view.
    ExplainViewOnMaterializedView(String),
    /// An unacceptable timeline name; the hint notes that the `mz_` prefix is reserved.
    UnacceptableTimelineName(String),
    /// Fetching a schema from the schema registry failed.
    FetchingCsrSchemaFailed {
        schema_lookup: String,
        /// Rendered with its causes by [`PlanError::detail`].
        cause: Arc<dyn Error + Send + Sync>,
    },
    /// Connecting to a PostgreSQL database failed.
    PostgresConnectionErr {
        /// Rendered with its causes by [`PlanError::detail`].
        cause: Arc<mz_postgres_util::PostgresError>,
    },
    /// Connecting to a MySQL database failed; the cause is part of the message.
    MySqlConnectionErr {
        cause: Arc<MySqlError>,
    },
    /// Connecting to a SQL Server database failed; the cause is part of the message.
    SqlServerConnectionErr {
        cause: Arc<SqlServerError>,
    },
    /// Multiple subsources would be given the same name.
    SubsourceNameConflict {
        name: UnresolvedItemName,
        /// The referenced upstream tables; listed by [`PlanError::detail`].
        upstream_references: Vec<UnresolvedItemName>,
    },
    /// Multiple subsources refer to the same table.
    SubsourceDuplicateReference {
        name: UnresolvedItemName,
        /// The subsources referencing the table; listed by [`PlanError::detail`].
        target_names: Vec<UnresolvedItemName>,
    },
    /// No tables were found in the referenced schemas.
    NoTablesFoundForSchemas(Vec<String>),
    /// An invalid protobuf schema.
    InvalidProtobufSchema {
        /// Rendered with its causes by [`PlanError::detail`].
        cause: protobuf_native::OperationFailedError,
    },
    /// An option was given an invalid value.
    InvalidOptionValue {
        option_name: String,
        /// The underlying error; its detail and hint are forwarded.
        err: Box<PlanError>,
    },
    /// Unexpected multiple references to the named item.
    UnexpectedDuplicateReference {
        name: UnresolvedItemName,
    },
    /// A WITH MUTUALLY RECURSIVE query whose declared types differ from the
    /// types it returns: (query name, declared types, inferred types).
    RecursiveTypeMismatch(String, Vec<String>, Vec<String>),
    /// No function with this name and argument types exists.
    UnknownFunction {
        name: String,
        arg_types: Vec<String>,
    },
    /// The function call is not unique.
    IndistinctFunction {
        name: String,
        arg_types: Vec<String>,
    },
    /// No operator with this name and argument types exists.
    UnknownOperator {
        name: String,
        /// Formatting this error panics unless there are exactly one or two entries.
        arg_types: Vec<String>,
    },
    /// The operator is not unique.
    IndistinctOperator {
        name: String,
        /// Formatting this error panics unless there are exactly one or two entries.
        arg_types: Vec<String>,
    },
    /// An invalid AWS PrivateLink availability zone.
    InvalidPrivatelinkAvailabilityZone {
        name: String,
        /// Known availability zone IDs; listed by [`PlanError::hint`].
        supported_azs: BTreeSet<String>,
    },
    /// A connection contained duplicate availability zones.
    DuplicatePrivatelinkAvailabilityZone {
        duplicate_azs: BTreeSet<String>,
    },
    /// No valid schema is selected.
    InvalidSchemaName,
    /// An item of the given type with this name already exists.
    ItemAlreadyExists {
        name: String,
        item_type: CatalogItemType,
    },
    /// The named managed cluster cannot be modified.
    ManagedCluster {
        cluster_name: String,
    },
    /// Invalid keys in SUBSCRIBE ENVELOPE UPSERT (KEY (..)).
    InvalidKeysInSubscribeEnvelopeUpsert,
    /// Invalid keys in SUBSCRIBE ENVELOPE DEBEZIUM (KEY (..)).
    InvalidKeysInSubscribeEnvelopeDebezium,
    /// A PARTITION BY expression referred to a non-key column.
    InvalidPartitionByEnvelopeDebezium {
        column_name: String,
    },
    /// Invalid ORDER BY in SUBSCRIBE WITHIN TIMESTAMP ORDER BY.
    InvalidOrderByInSubscribeWithinTimestampOrderBy,
    /// A VALUES expression in a FROM clause was not surrounded by parentheses.
    FromValueRequiresParen,
    VarError(VarError),
    /// A polymorphic type could not be determined because the input has type unknown.
    UnsolvablePolymorphicFunctionInput,
    /// SHOW commands are not allowed in views.
    ShowCommandInView,
    /// The expression provided in CHECK does not reference any columns.
    WebhookValidationDoesNotUseColumns,
    /// The expression provided in CHECK is not deterministic.
    WebhookValidationNonDeterministic,
    /// A function reserved for internal use of the database system was called directly.
    InternalFunctionCall,
    /// A comment exceeded the maximum size; both values are reported in bytes.
    CommentTooLong {
        length: usize,
        max_size: usize,
    },
    /// A timestamp interval outside the range `[min, max]`.
    InvalidTimestampInterval {
        min: Duration,
        max: Duration,
        requested: Duration,
    },
    /// EXPECTED GROUP SIZE was provided together with one of the more specific
    /// group size hints.
    InvalidGroupSizeHints,
    PgSourcePurification(PgSourcePurificationError),
    KafkaSourcePurification(KafkaSourcePurificationError),
    KafkaSinkPurification(KafkaSinkPurificationError),
    IcebergSinkPurification(IcebergSinkPurificationError),
    LoadGeneratorSourcePurification(LoadGeneratorSourcePurificationError),
    CsrPurification(CsrPurificationError),
    MySqlSourcePurification(MySqlSourcePurificationError),
    SqlServerSourcePurificationError(SqlServerSourcePurificationError),
    /// The named command is not supported; CREATE TABLE .. FROM SOURCE should be used instead.
    UseTablesForSources(String),
    /// No name was specified for an item of the given type.
    MissingName(CatalogItemType),
    /// The REFRESH AT argument is not reducible to a constant of type mz_timestamp.
    InvalidRefreshAt,
    /// The REFRESH EVERY ... ALIGNED TO argument is not reducible to a constant
    /// of type mz_timestamp.
    InvalidRefreshEveryAlignedTo,
    /// More than one replica was requested for a cluster containing sources or sinks.
    CreateReplicaFailStorageObjects {
        /// Number of replicas the cluster currently has.
        current_replica_count: usize,
        /// Number of internal replicas; mentioned in the detail when greater than zero.
        internal_replica_count: usize,
        /// Number of replicas the command would result in.
        hypothetical_replica_count: usize,
    },
    /// The named object has a different object type than expected.
    MismatchedObjectType {
        name: PartialItemName,
        is_type: ObjectType,
        expected_type: ObjectType,
    },
    /// A table contains a column of a type that cannot currently be ingested.
    TableContainsUningestableTypes {
        name: String,
        type_: String,
        column: String,
    },
    /// RETAIN HISTORY was set lower than the limit.
    RetainHistoryLow {
        limit: Duration,
    },
    /// RETAIN HISTORY cannot be disabled or set to 0.
    RetainHistoryRequired,
    /// The TIMEOUT option is required for ALTER CLUSTER ... WITH (WAIT UNTIL READY ( ... )).
    UntilReadyTimeoutRequired,
    SubsourceResolutionError(ExternalReferenceResolutionError),
    /// An internal error while replanning.
    Replan(String),
    /// The network policy would block the current session IP.
    NetworkPolicyLockoutError,
    /// The network policy is currently in use.
    NetworkPolicyInUse,
    /// Rendered verbatim as the error message.
    ConstantExpressionSimplificationFailed(String),
    /// An invalid OFFSET clause; the payload is appended to the message.
    InvalidOffset(String),
    /// The named cursor does not exist.
    UnknownCursor(String),
    /// The target table of a COPY FROM was dropped.
    CopyFromTargetTableDropped {
        target_name: String,
    },
    /// AS OF or UP TO is not castable to a non-null mz_timestamp value.
    InvalidAsOfUpTo,
    /// A free-form message, rendered verbatim.
    Unstructured(String),
}
304
305impl PlanError {
306 pub(crate) fn ungrouped_column(item: &ScopeItem) -> PlanError {
307 PlanError::UngroupedColumn {
308 table: item.table_name.clone(),
309 column: item.column_name.clone(),
310 }
311 }
312
313 pub fn detail(&self) -> Option<String> {
314 match self {
315 Self::NeverSupported { details, .. } => details.clone(),
316 Self::FetchingCsrSchemaFailed { cause, .. } => Some(cause.to_string_with_causes()),
317 Self::PostgresConnectionErr { cause } => Some(cause.to_string_with_causes()),
318 Self::InvalidProtobufSchema { cause } => Some(cause.to_string_with_causes()),
319 Self::InvalidOptionValue { err, .. } => err.detail(),
320 Self::UpsertSinkWithInvalidKey {
321 name,
322 desired_key,
323 valid_keys,
324 } => {
325 let valid_keys = if valid_keys.is_empty() {
326 "There are no known valid unique keys for the underlying relation.".into()
327 } else {
328 format!(
329 "The following keys are known to be unique for the underlying relation:\n{}",
330 valid_keys
331 .iter()
332 .map(|k|
333 format!(" ({})", k.iter().map(|c| c.as_str().quoted()).join(", "))
334 )
335 .join("\n"),
336 )
337 };
338 Some(format!(
339 "Materialize could not prove that the specified upsert envelope key ({}) \
340 was a unique key of the underlying relation {}. {valid_keys}",
341 separated(", ", desired_key.iter().map(|c| c.as_str().quoted())),
342 name.quoted()
343 ))
344 }
345 Self::VarError(e) => e.detail(),
346 Self::InternalFunctionCall => Some("This function is for the internal use of the database system and cannot be called directly.".into()),
347 Self::PgSourcePurification(e) => e.detail(),
348 Self::MySqlSourcePurification(e) => e.detail(),
349 Self::SqlServerSourcePurificationError(e) => e.detail(),
350 Self::KafkaSourcePurification(e) => e.detail(),
351 Self::LoadGeneratorSourcePurification(e) => e.detail(),
352 Self::CsrPurification(e) => e.detail(),
353 Self::KafkaSinkPurification(e) => e.detail(),
354 Self::IcebergSinkPurification(e) => e.detail(),
355 Self::CreateReplicaFailStorageObjects { current_replica_count: current, internal_replica_count: internal, hypothetical_replica_count: target } => {
356 Some(format!(
357 "Currently have {} replica{}{}; command would result in {}",
358 current,
359 if *current != 1 { "s" } else { "" },
360 if *internal > 0 {
361 format!(" ({} internal)", internal)
362 } else {
363 "".to_string()
364 },
365 target
366 ))
367 },
368 Self::SubsourceNameConflict {
369 name: _,
370 upstream_references,
371 } => Some(format!(
372 "referenced tables with duplicate name: {}",
373 itertools::join(upstream_references, ", ")
374 )),
375 Self::SubsourceDuplicateReference {
376 name: _,
377 target_names,
378 } => Some(format!(
379 "subsources referencing table: {}",
380 itertools::join(target_names, ", ")
381 )),
382 Self::InvalidPartitionByEnvelopeDebezium { .. } => Some(
383 "When using ENVELOPE DEBEZIUM, only columns in the key can be referenced in the PARTITION BY expression.".to_string()
384 ),
385 Self::NoTablesFoundForSchemas(schemas) => Some(format!(
386 "missing schemas: {}",
387 separated(", ", schemas.iter().map(|c| c.quoted()))
388 )),
389 _ => None,
390 }
391 }
392
393 pub fn hint(&self) -> Option<String> {
394 match self {
395 Self::DropViewOnMaterializedView(_) => {
396 Some("Use DROP MATERIALIZED VIEW to remove a materialized view.".into())
397 }
398 Self::DependentObjectsStillExist {..} => Some("Use DROP ... CASCADE to drop the dependent objects too.".into()),
399 Self::AlterViewOnMaterializedView(_) => {
400 Some("Use ALTER MATERIALIZED VIEW to rename a materialized view.".into())
401 }
402 Self::ShowCreateViewOnMaterializedView(_) => {
403 Some("Use SHOW CREATE MATERIALIZED VIEW to show a materialized view.".into())
404 }
405 Self::ExplainViewOnMaterializedView(_) => {
406 Some("Use EXPLAIN [...] MATERIALIZED VIEW to explain a materialized view.".into())
407 }
408 Self::UnacceptableTimelineName(_) => {
409 Some("The prefix \"mz_\" is reserved for system timelines.".into())
410 }
411 Self::PostgresConnectionErr { cause } => {
412 if let Some(cause) = cause.source() {
413 if let Some(cause) = cause.downcast_ref::<io::Error>() {
414 if cause.kind() == io::ErrorKind::TimedOut {
415 return Some(
416 "Do you have a firewall or security group that is \
417 preventing Materialize from connecting to your PostgreSQL server?"
418 .into(),
419 );
420 }
421 }
422 }
423 None
424 }
425 Self::InvalidOptionValue { err, .. } => err.hint(),
426 Self::UnknownFunction { ..} => Some("No function matches the given name and argument types. You might need to add explicit type casts.".into()),
427 Self::IndistinctFunction {..} => {
428 Some("Could not choose a best candidate function. You might need to add explicit type casts.".into())
429 }
430 Self::UnknownOperator {..} => {
431 Some("No operator matches the given name and argument types. You might need to add explicit type casts.".into())
432 }
433 Self::IndistinctOperator {..} => {
434 Some("Could not choose a best candidate operator. You might need to add explicit type casts.".into())
435 },
436 Self::InvalidPrivatelinkAvailabilityZone { supported_azs, ..} => {
437 let supported_azs_str = supported_azs.iter().join("\n ");
438 Some(format!("Did you supply an availability zone name instead of an ID? Known availability zone IDs:\n {}", supported_azs_str))
439 }
440 Self::DuplicatePrivatelinkAvailabilityZone { duplicate_azs, ..} => {
441 let duplicate_azs = duplicate_azs.iter().join("\n ");
442 Some(format!("Duplicated availability zones:\n {}", duplicate_azs))
443 }
444 Self::InvalidKeysInSubscribeEnvelopeUpsert => {
445 Some("All keys must be columns on the underlying relation.".into())
446 }
447 Self::InvalidKeysInSubscribeEnvelopeDebezium => {
448 Some("All keys must be columns on the underlying relation.".into())
449 }
450 Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
451 Some("All order bys must be output columns.".into())
452 }
453 Self::UpsertSinkWithInvalidKey { .. } | Self::UpsertSinkWithoutKey => {
454 Some("See: https://materialize.com/s/sink-key-selection".into())
455 }
456 Self::Catalog(e) => e.hint(),
457 Self::VarError(e) => e.hint(),
458 Self::PgSourcePurification(e) => e.hint(),
459 Self::MySqlSourcePurification(e) => e.hint(),
460 Self::SqlServerSourcePurificationError(e) => e.hint(),
461 Self::KafkaSourcePurification(e) => e.hint(),
462 Self::LoadGeneratorSourcePurification(e) => e.hint(),
463 Self::CsrPurification(e) => e.hint(),
464 Self::KafkaSinkPurification(e) => e.hint(),
465 Self::UnknownColumn { table, similar, .. } => {
466 let suffix = "Make sure to surround case sensitive names in double quotes.";
467 match &similar[..] {
468 [] => None,
469 [column] => Some(format!("The similarly named column {} does exist. {suffix}", ColumnDisplay { table, column })),
470 names => {
471 let similar = names.into_iter().map(|column| ColumnDisplay { table, column }).join(", ");
472 Some(format!("There are similarly named columns that do exist: {similar}. {suffix}"))
473 }
474 }
475 }
476 Self::RecursiveTypeMismatch(..) => {
477 Some("You will need to rewrite or cast the query's expressions.".into())
478 },
479 Self::InvalidRefreshAt
480 | Self::InvalidRefreshEveryAlignedTo => {
481 Some("Calling `mz_now()` is allowed.".into())
482 },
483 Self::TableContainsUningestableTypes { column,.. } => {
484 Some(format!("Remove the table or use TEXT COLUMNS ({column}, ..) to ingest this column as text"))
485 }
486 Self::RetainHistoryLow { .. } | Self::RetainHistoryRequired => {
487 Some("Use ALTER ... RESET (RETAIN HISTORY) to set the retain history to its default and lowest value.".into())
488 }
489 Self::NetworkPolicyInUse => {
490 Some("Use ALTER SYSTEM SET 'network_policy' to change the default network policy.".into())
491 }
492 Self::WrongParameterType(_, _, _) => {
493 Some("EXECUTE automatically inserts only such casts that are allowed in an assignment cast context. Try adding an explicit cast.".into())
494 }
495 Self::InvalidSchemaName => {
496 Some("Use SET schema = name to select a schema. Use SHOW SCHEMAS to list available schemas. Use SHOW search_path to show the schema names that we looked for, but none of them existed.".into())
497 }
498 _ => None,
499 }
500 }
501}
502
503impl fmt::Display for PlanError {
504 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
505 match self {
506 Self::Unsupported { feature, discussion_no } => {
507 write!(f, "{} not yet supported", feature)?;
508 if let Some(discussion_no) = discussion_no {
509 write!(f, ", see https://github.com/MaterializeInc/materialize/discussions/{} for more details", discussion_no)?;
510 }
511 Ok(())
512 }
513 Self::NeverSupported { feature, documentation_link: documentation_path,.. } => {
514 write!(f, "{feature} is not supported",)?;
515 if let Some(documentation_path) = documentation_path {
516 write!(f, ", for more information consult the documentation at https://materialize.com/docs/{documentation_path}")?;
517 }
518 Ok(())
519 }
520 Self::UnknownColumn { table, column, similar: _ } => write!(
521 f,
522 "column {} does not exist",
523 ColumnDisplay { table, column }
524 ),
525 Self::UngroupedColumn { table, column } => write!(
526 f,
527 "column {} must appear in the GROUP BY clause or be used in an aggregate function",
528 ColumnDisplay { table, column },
529 ),
530 Self::WrongJoinTypeForLateralColumn { table, column } => write!(
531 f,
532 "column {} cannot be referenced from this part of the query: \
533 the combining JOIN type must be INNER or LEFT for a LATERAL reference",
534 ColumnDisplay { table, column },
535 ),
536 Self::AmbiguousColumn(column) => write!(
537 f,
538 "column reference {} is ambiguous",
539 column.quoted()
540 ),
541 Self::TooManyColumns { max_num_columns, req_num_columns } => write!(
542 f,
543 "attempt to create relation with too many columns, {} max: {}",
544 req_num_columns, max_num_columns
545 ),
546 Self::ColumnAlreadyExists { column_name, object_name } => write!(
547 f,
548 "column {} of relation {} already exists",
549 column_name.quoted(), object_name.quoted(),
550 ),
551 Self::AmbiguousTable(table) => write!(
552 f,
553 "table reference {} is ambiguous",
554 table.item.as_str().quoted()
555 ),
556 Self::UnknownColumnInUsingClause { column, join_side } => write!(
557 f,
558 "column {} specified in USING clause does not exist in {} table",
559 column.quoted(),
560 join_side,
561 ),
562 Self::AmbiguousColumnInUsingClause { column, join_side } => write!(
563 f,
564 "common column name {} appears more than once in {} table",
565 column.quoted(),
566 join_side,
567 ),
568 Self::MisqualifiedName(name) => write!(
569 f,
570 "qualified name did not have between 1 and 3 components: {}",
571 name
572 ),
573 Self::OverqualifiedDatabaseName(name) => write!(
574 f,
575 "database name '{}' does not have exactly one component",
576 name
577 ),
578 Self::OverqualifiedSchemaName(name) => write!(
579 f,
580 "schema name '{}' cannot have more than two components",
581 name
582 ),
583 Self::UnderqualifiedColumnName(name) => write!(
584 f,
585 "column name '{}' must have at least a table qualification",
586 name
587 ),
588 Self::UnacceptableTimelineName(name) => {
589 write!(f, "unacceptable timeline name {}", name.quoted(),)
590 }
591 Self::SubqueriesDisallowed { context } => {
592 write!(f, "{} does not allow subqueries", context)
593 }
594 Self::UnknownParameter(n) => write!(f, "there is no parameter ${}", n),
595 Self::ParameterNotAllowed(object_type) => write!(f, "{} cannot have parameters", object_type),
596 Self::WrongParameterType(i, expected_ty, actual_ty) => write!(f, "unable to cast given parameter ${}: expected {}, got {}", i, expected_ty, actual_ty),
597 Self::RecursionLimit(e) => write!(f, "{}", e),
598 Self::StrconvParse(e) => write!(f, "{}", e),
599 Self::Catalog(e) => write!(f, "{}", e),
600 Self::UpsertSinkWithoutKey => write!(f, "upsert sinks must specify a key"),
601 Self::UpsertSinkWithInvalidKey { .. } => {
602 write!(f, "upsert key could not be validated as unique")
603 }
604 Self::InvalidWmrRecursionLimit(msg) => write!(f, "Invalid WITH MUTUALLY RECURSIVE recursion limit. {}", msg),
605 Self::InvalidNumericMaxScale(e) => e.fmt(f),
606 Self::InvalidCharLength(e) => e.fmt(f),
607 Self::InvalidVarCharMaxLength(e) => e.fmt(f),
608 Self::InvalidTimestampPrecision(e) => e.fmt(f),
609 Self::Parser(e) => e.fmt(f),
610 Self::ParserStatement(e) => e.fmt(f),
611 Self::Unstructured(e) => write!(f, "{}", e),
612 Self::InvalidId(id) => write!(f, "invalid id {}", id),
613 Self::InvalidIdent(err) => write!(f, "invalid identifier, {err}"),
614 Self::InvalidObject(i) => write!(f, "{} is not a database object", i.full_name_str()),
615 Self::InvalidObjectType{expected_type, actual_type, object_name} => write!(f, "{actual_type} {object_name} is not a {expected_type}"),
616 Self::InvalidPrivilegeTypes{ invalid_privileges, object_description, } => {
617 write!(f, "invalid privilege types {} for {}", invalid_privileges.to_error_string(), object_description)
618 },
619 Self::InvalidSecret(i) => write!(f, "{} is not a secret", i.full_name_str()),
620 Self::InvalidTemporarySchema => {
621 write!(f, "cannot create temporary item in non-temporary schema")
622 }
623 Self::InvalidCast { name, ccx, from, to } =>{
624 write!(
625 f,
626 "{name} does not support {ccx}casting from {from} to {to}",
627 ccx = if matches!(ccx, CastContext::Implicit) {
628 "implicitly "
629 } else {
630 ""
631 },
632 )
633 }
634 Self::InvalidTable { name } => {
635 write!(f, "invalid table definition for {}", name.quoted())
636 },
637 Self::InvalidVersion { name, version } => {
638 write!(f, "invalid version {} for {}", version.quoted(), name.quoted())
639 },
640 Self::DropViewOnMaterializedView(name)
641 | Self::AlterViewOnMaterializedView(name)
642 | Self::ShowCreateViewOnMaterializedView(name)
643 | Self::ExplainViewOnMaterializedView(name) => write!(f, "{name} is not a view"),
644 Self::FetchingCsrSchemaFailed { schema_lookup, .. } => {
645 write!(f, "failed to fetch schema {schema_lookup} from schema registry")
646 }
647 Self::PostgresConnectionErr { .. } => {
648 write!(f, "failed to connect to PostgreSQL database")
649 }
650 Self::MySqlConnectionErr { cause } => {
651 write!(f, "failed to connect to MySQL database: {}", cause)
652 }
653 Self::SqlServerConnectionErr { cause } => {
654 write!(f, "failed to connect to SQL Server database: {}", cause)
655 }
656 Self::SubsourceNameConflict {
657 name , upstream_references: _,
658 } => {
659 write!(f, "multiple subsources would be named {}", name)
660 },
661 Self::SubsourceDuplicateReference {
662 name,
663 target_names: _,
664 } => {
665 write!(f, "multiple subsources refer to table {}", name)
666 },
667 Self::NoTablesFoundForSchemas(schemas) => {
668 write!(f, "no tables found in referenced schemas: {}",
669 separated(", ", schemas.iter().map(|c| c.quoted()))
670 )
671 },
672 Self::InvalidProtobufSchema { .. } => {
673 write!(f, "invalid protobuf schema")
674 }
675 Self::DependentObjectsStillExist {object_type, object_name, dependents} => {
676 let reason = match &dependents[..] {
677 [] => " because other objects depend on it".to_string(),
678 dependents => {
679 let dependents = dependents.iter().map(|(dependent_type, dependent_name)| format!("{} {}", dependent_type, dependent_name.quoted())).join(", ");
680 format!(": still depended upon by {dependents}")
681 },
682 };
683 let object_name = object_name.quoted();
684 write!(f, "cannot drop {object_type} {object_name}{reason}")
685 }
686 Self::InvalidOptionValue { option_name, err } => write!(f, "invalid {} option value: {}", option_name, err),
687 Self::UnexpectedDuplicateReference { name } => write!(f, "unexpected multiple references to {}", name.to_ast_string_simple()),
688 Self::RecursiveTypeMismatch(name, declared, inferred) => {
689 let declared = separated(", ", declared);
690 let inferred = separated(", ", inferred);
691 let name = name.quoted();
692 write!(f, "WITH MUTUALLY RECURSIVE query {name} declared types ({declared}), but query returns types ({inferred})")
693 },
694 Self::UnknownFunction {name, arg_types, ..} => {
695 write!(f, "function {}({}) does not exist", name, arg_types.join(", "))
696 },
697 Self::IndistinctFunction {name, arg_types, ..} => {
698 write!(f, "function {}({}) is not unique", name, arg_types.join(", "))
699 },
700 Self::UnknownOperator {name, arg_types, ..} => {
701 write!(f, "operator does not exist: {}", match arg_types.as_slice(){
702 [typ] => format!("{} {}", name, typ),
703 [ltyp, rtyp] => {
704 format!("{} {} {}", ltyp, name, rtyp)
705 }
706 _ => unreachable!("non-unary non-binary operator"),
707 })
708 },
709 Self::IndistinctOperator {name, arg_types, ..} => {
710 write!(f, "operator is not unique: {}", match arg_types.as_slice(){
711 [typ] => format!("{} {}", name, typ),
712 [ltyp, rtyp] => {
713 format!("{} {} {}", ltyp, name, rtyp)
714 }
715 _ => unreachable!("non-unary non-binary operator"),
716 })
717 },
718 Self::InvalidPrivatelinkAvailabilityZone { name, ..} => write!(f, "invalid AWS PrivateLink availability zone {}", name.quoted()),
719 Self::DuplicatePrivatelinkAvailabilityZone {..} => write!(f, "connection cannot contain duplicate availability zones"),
720 Self::InvalidSchemaName => write!(f, "no valid schema selected"),
721 Self::ItemAlreadyExists { name, item_type } => write!(f, "{item_type} {} already exists", name.quoted()),
722 Self::ManagedCluster {cluster_name} => write!(f, "cannot modify managed cluster {cluster_name}"),
723 Self::InvalidKeysInSubscribeEnvelopeUpsert => {
724 write!(f, "invalid keys in SUBSCRIBE ENVELOPE UPSERT (KEY (..))")
725 }
726 Self::InvalidKeysInSubscribeEnvelopeDebezium => {
727 write!(f, "invalid keys in SUBSCRIBE ENVELOPE DEBEZIUM (KEY (..))")
728 }
729 Self::InvalidPartitionByEnvelopeDebezium { column_name } => {
730 write!(
731 f,
732 "PARTITION BY expression cannot refer to non-key column {}",
733 column_name.quoted(),
734 )
735 }
736 Self::InvalidOrderByInSubscribeWithinTimestampOrderBy => {
737 write!(f, "invalid ORDER BY in SUBSCRIBE WITHIN TIMESTAMP ORDER BY")
738 }
739 Self::FromValueRequiresParen => f.write_str(
740 "VALUES expression in FROM clause must be surrounded by parentheses"
741 ),
742 Self::VarError(e) => e.fmt(f),
743 Self::UnsolvablePolymorphicFunctionInput => f.write_str(
744 "could not determine polymorphic type because input has type unknown"
745 ),
746 Self::ShowCommandInView => f.write_str("SHOW commands are not allowed in views"),
747 Self::WebhookValidationDoesNotUseColumns => f.write_str(
748 "expression provided in CHECK does not reference any columns"
749 ),
750 Self::WebhookValidationNonDeterministic => f.write_str(
751 "expression provided in CHECK is not deterministic"
752 ),
753 Self::InternalFunctionCall => f.write_str("cannot call function with arguments of type internal"),
754 Self::CommentTooLong { length, max_size } => {
755 write!(f, "provided comment was {length} bytes long, max size is {max_size} bytes")
756 }
757 Self::InvalidTimestampInterval { min, max, requested } => {
758 write!(f, "invalid timestamp interval of {}ms, must be in the range [{}ms, {}ms]", requested.as_millis(), min.as_millis(), max.as_millis())
759 }
760 Self::InvalidGroupSizeHints => f.write_str("EXPECTED GROUP SIZE cannot be provided \
761 simultaneously with any of AGGREGATE INPUT GROUP SIZE, DISTINCT ON INPUT GROUP SIZE, \
762 or LIMIT INPUT GROUP SIZE"),
763 Self::PgSourcePurification(e) => write!(f, "POSTGRES source validation: {}", e),
764 Self::KafkaSourcePurification(e) => write!(f, "KAFKA source validation: {}", e),
765 Self::LoadGeneratorSourcePurification(e) => write!(f, "LOAD GENERATOR source validation: {}", e),
766 Self::KafkaSinkPurification(e) => write!(f, "KAFKA sink validation: {}", e),
767 Self::IcebergSinkPurification(e) => write!(f, "ICEBERG sink validation: {}", e),
768 Self::CsrPurification(e) => write!(f, "CONFLUENT SCHEMA REGISTRY validation: {}", e),
769 Self::MySqlSourcePurification(e) => write!(f, "MYSQL source validation: {}", e),
770 Self::SqlServerSourcePurificationError(e) => write!(f, "SQL SERVER source validation: {}", e),
771 Self::UseTablesForSources(command) => write!(f, "{command} not supported; use CREATE TABLE .. FROM SOURCE instead"),
772 Self::MangedReplicaName(name) => {
773 write!(f, "{name} is reserved for replicas of managed clusters")
774 }
775 Self::MissingName(item_type) => {
776 write!(f, "unspecified name for {item_type}")
777 }
778 Self::InvalidRefreshAt => {
779 write!(f, "REFRESH AT argument must be an expression that can be simplified \
780 and/or cast to a constant whose type is mz_timestamp")
781 }
782 Self::InvalidRefreshEveryAlignedTo => {
783 write!(f, "REFRESH EVERY ... ALIGNED TO argument must be an expression that can be simplified \
784 and/or cast to a constant whose type is mz_timestamp")
785 }
786 Self::CreateReplicaFailStorageObjects {..} => {
787 write!(f, "cannot create more than one replica of a cluster containing sources or sinks")
788 },
789 Self::MismatchedObjectType {
790 name,
791 is_type,
792 expected_type,
793 } => {
794 write!(
795 f,
796 "{name} is {} {} not {} {}",
797 if *is_type == ObjectType::Index {
798 "an"
799 } else {
800 "a"
801 },
802 is_type.to_string().to_lowercase(),
803 if *expected_type == ObjectType::Index {
804 "an"
805 } else {
806 "a"
807 },
808 expected_type.to_string().to_lowercase()
809 )
810 }
811 Self::TableContainsUningestableTypes { name, type_, column } => {
812 write!(f, "table {name} contains column {column} of type {type_} which Materialize cannot currently ingest")
813 },
814 Self::RetainHistoryLow { limit } => {
815 write!(f, "RETAIN HISTORY cannot be set lower than {}ms", limit.as_millis())
816 },
817 Self::RetainHistoryRequired => {
818 write!(f, "RETAIN HISTORY cannot be disabled or set to 0")
819 },
820 Self::SubsourceResolutionError(e) => write!(f, "{}", e),
821 Self::Replan(msg) => write!(f, "internal error while replanning, please contact support: {msg}"),
822 Self::NetworkPolicyLockoutError => write!(f, "policy would block current session IP"),
823 Self::NetworkPolicyInUse => write!(f, "network policy is currently in use"),
824 Self::UntilReadyTimeoutRequired => {
825 write!(f, "TIMEOUT=<duration> option is required for ALTER CLUSTER ... WITH (WAIT UNTIL READY ( ... ))")
826 },
827 Self::ConstantExpressionSimplificationFailed(e) => write!(f, "{}", e),
828 Self::InvalidOffset(e) => write!(f, "Invalid OFFSET clause: {}", e),
829 Self::UnknownCursor(name) => {
830 write!(f, "cursor {} does not exist", name.quoted())
831 }
832 Self::CopyFromTargetTableDropped { target_name: name } => write!(f, "COPY FROM's target table {} was dropped", name.quoted()),
833 Self::InvalidAsOfUpTo => write!(f, "AS OF or UP TO should be castable to a (non-null) mz_timestamp value")
834 }
835 }
836}
837
// Marks `PlanError` as a standard error type. No trait methods are
// overridden here, so the trait's default implementations apply.
impl Error for PlanError {}
839
840impl From<CatalogError> for PlanError {
841 fn from(e: CatalogError) -> PlanError {
842 PlanError::Catalog(e)
843 }
844}
845
846impl From<strconv::ParseError> for PlanError {
847 fn from(e: strconv::ParseError) -> PlanError {
848 PlanError::StrconvParse(e)
849 }
850}
851
852impl From<RecursionLimitError> for PlanError {
853 fn from(e: RecursionLimitError) -> PlanError {
854 PlanError::RecursionLimit(e)
855 }
856}
857
858impl From<InvalidNumericMaxScaleError> for PlanError {
859 fn from(e: InvalidNumericMaxScaleError) -> PlanError {
860 PlanError::InvalidNumericMaxScale(e)
861 }
862}
863
864impl From<InvalidCharLengthError> for PlanError {
865 fn from(e: InvalidCharLengthError) -> PlanError {
866 PlanError::InvalidCharLength(e)
867 }
868}
869
870impl From<InvalidVarCharMaxLengthError> for PlanError {
871 fn from(e: InvalidVarCharMaxLengthError) -> PlanError {
872 PlanError::InvalidVarCharMaxLength(e)
873 }
874}
875
876impl From<InvalidTimestampPrecisionError> for PlanError {
877 fn from(e: InvalidTimestampPrecisionError) -> PlanError {
878 PlanError::InvalidTimestampPrecision(e)
879 }
880}
881
882impl From<anyhow::Error> for PlanError {
883 fn from(e: anyhow::Error) -> PlanError {
884 sql_err!("{}", e.display_with_causes())
886 }
887}
888
889impl From<TryFromIntError> for PlanError {
890 fn from(e: TryFromIntError) -> PlanError {
891 sql_err!("{}", e.display_with_causes())
892 }
893}
894
895impl From<ParseIntError> for PlanError {
896 fn from(e: ParseIntError) -> PlanError {
897 sql_err!("{}", e.display_with_causes())
898 }
899}
900
901impl From<EvalError> for PlanError {
902 fn from(e: EvalError) -> PlanError {
903 sql_err!("{}", e.display_with_causes())
904 }
905}
906
907impl From<ParserError> for PlanError {
908 fn from(e: ParserError) -> PlanError {
909 PlanError::Parser(e)
910 }
911}
912
913impl From<ParserStatementError> for PlanError {
914 fn from(e: ParserStatementError) -> PlanError {
915 PlanError::ParserStatement(e)
916 }
917}
918
919impl From<PostgresError> for PlanError {
920 fn from(e: PostgresError) -> PlanError {
921 PlanError::PostgresConnectionErr { cause: Arc::new(e) }
922 }
923}
924
925impl From<MySqlError> for PlanError {
926 fn from(e: MySqlError) -> PlanError {
927 PlanError::MySqlConnectionErr { cause: Arc::new(e) }
928 }
929}
930
931impl From<SqlServerError> for PlanError {
932 fn from(e: SqlServerError) -> PlanError {
933 PlanError::SqlServerConnectionErr { cause: Arc::new(e) }
934 }
935}
936
937impl From<VarError> for PlanError {
938 fn from(e: VarError) -> Self {
939 PlanError::VarError(e)
940 }
941}
942
943impl From<PgSourcePurificationError> for PlanError {
944 fn from(e: PgSourcePurificationError) -> Self {
945 PlanError::PgSourcePurification(e)
946 }
947}
948
949impl From<KafkaSourcePurificationError> for PlanError {
950 fn from(e: KafkaSourcePurificationError) -> Self {
951 PlanError::KafkaSourcePurification(e)
952 }
953}
954
955impl From<KafkaSinkPurificationError> for PlanError {
956 fn from(e: KafkaSinkPurificationError) -> Self {
957 PlanError::KafkaSinkPurification(e)
958 }
959}
960
961impl From<IcebergSinkPurificationError> for PlanError {
962 fn from(e: IcebergSinkPurificationError) -> Self {
963 PlanError::IcebergSinkPurification(e)
964 }
965}
966
967impl From<CsrPurificationError> for PlanError {
968 fn from(e: CsrPurificationError) -> Self {
969 PlanError::CsrPurification(e)
970 }
971}
972
973impl From<LoadGeneratorSourcePurificationError> for PlanError {
974 fn from(e: LoadGeneratorSourcePurificationError) -> Self {
975 PlanError::LoadGeneratorSourcePurification(e)
976 }
977}
978
979impl From<MySqlSourcePurificationError> for PlanError {
980 fn from(e: MySqlSourcePurificationError) -> Self {
981 PlanError::MySqlSourcePurification(e)
982 }
983}
984
985impl From<SqlServerSourcePurificationError> for PlanError {
986 fn from(e: SqlServerSourcePurificationError) -> Self {
987 PlanError::SqlServerSourcePurificationError(e)
988 }
989}
990
991impl From<IdentError> for PlanError {
992 fn from(e: IdentError) -> Self {
993 PlanError::InvalidIdent(e)
994 }
995}
996
997impl From<ExternalReferenceResolutionError> for PlanError {
998 fn from(e: ExternalReferenceResolutionError) -> Self {
999 PlanError::SubsourceResolutionError(e)
1000 }
1001}
1002
/// Display helper for a column name with an optional table.
///
/// When `table` is `Some`, the table's item name and the column are joined
/// with a `.` and quoted as a single string; otherwise only the quoted column
/// is written.
struct ColumnDisplay<'a> {
    /// The table the column is attributed to, if any; only its `item` is printed.
    table: &'a Option<PartialItemName>,
    /// The column name to print.
    column: &'a ColumnName,
}
1007
1008impl<'a> fmt::Display for ColumnDisplay<'a> {
1009 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1010 if let Some(table) = &self.table {
1011 format!("{}.{}", table.item, self.column).quoted().fmt(f)
1012 } else {
1013 self.column.quoted().fmt(f)
1014 }
1015 }
1016}