1use std::collections::{BTreeMap, BTreeSet};
11use std::error::Error;
12use std::fmt;
13use std::num::TryFromIntError;
14
15use dec::TryFromDecimalError;
16use itertools::Itertools;
17use mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER;
18use mz_compute_client::controller::error as compute_error;
19use mz_compute_client::controller::error::{CollectionLookupError, InstanceMissing};
20use mz_compute_types::ComputeInstanceId;
21use mz_expr::EvalError;
22use mz_ore::error::ErrorExt;
23use mz_ore::stack::RecursionLimitError;
24use mz_ore::str::StrExt;
25use mz_pgwire_common::{ErrorResponse, Severity};
26use mz_repr::adt::timestamp::TimestampError;
27use mz_repr::explain::ExplainError;
28use mz_repr::{ColumnDiff, ColumnName, KeyDiff, NotNullViolation, RelationDescDiff, Timestamp};
29use mz_sql::plan::PlanError;
30use mz_sql::rbac;
31use mz_sql::session::vars::VarError;
32use mz_storage_types::connections::ConnectionValidationError;
33use mz_storage_types::controller::StorageError;
34use mz_storage_types::errors::CollectionMissing;
35use smallvec::SmallVec;
36use timely::progress::Antichain;
37use tokio::sync::oneshot;
38use tokio_postgres::error::SqlState;
39
40use crate::coord::NetworkPolicyError;
41use crate::optimize::OptimizerError;
42
43#[derive(Debug)]
45pub enum AdapterError {
46 AbsurdSubscribeBounds {
48 as_of: mz_repr::Timestamp,
49 up_to: mz_repr::Timestamp,
50 },
51 AmbiguousSystemColumnReference,
55 Catalog(mz_catalog::memory::error::Error),
57 ChangedPlan(String),
62 DuplicateCursor(String),
64 Eval(EvalError),
66 Explain(ExplainError),
68 IdExhaustionError,
70 Internal(String),
72 IntrospectionDisabled {
74 log_names: Vec<String>,
75 },
76 InvalidLogDependency {
79 object_type: String,
80 log_names: Vec<String>,
81 },
82 InvalidClusterReplicaAz {
84 az: String,
85 expected: Vec<String>,
86 },
87 InvalidSetIsolationLevel,
89 InvalidSetCluster,
91 InvalidStorageClusterSize {
93 size: String,
94 expected: Vec<String>,
95 },
96 SourceOrSinkSizeRequired {
98 expected: Vec<String>,
99 },
100 InvalidTableMutationSelection {
102 object_name: String,
104 object_type: String,
106 },
107 ConstraintViolation(NotNullViolation),
109 ConcurrentClusterDrop,
111 ConcurrentDependencyDrop {
113 dependency_kind: &'static str,
114 dependency_id: String,
115 },
116 CollectionUnreadable {
117 id: String,
118 },
119 NoClusterReplicasAvailable {
121 name: String,
122 is_managed: bool,
123 },
124 OperationProhibitsTransaction(String),
126 OperationRequiresTransaction(String),
128 PlanError(PlanError),
130 PreparedStatementExists(String),
132 ParseError(mz_sql_parser::parser::ParserStatementError),
134 ReadOnlyTransaction,
136 ReadWriteUnavailable,
138 RecursionLimit(RecursionLimitError),
140 RelationOutsideTimeDomain {
143 relations: Vec<String>,
144 names: Vec<String>,
145 },
146 ResourceExhaustion {
148 resource_type: String,
149 limit_name: String,
150 desired: String,
151 limit: String,
152 current: String,
153 },
154 ResultSize(String),
156 SafeModeViolation(String),
158 WrongSetOfLocks,
160 StatementTimeout,
164 Canceled,
166 IdleInTransactionSessionTimeout,
168 SubscribeOnlyTransaction,
170 Optimizer(OptimizerError),
172 UnallowedOnCluster {
174 depends_on: SmallVec<[String; 2]>,
175 cluster: String,
176 },
177 Unauthorized(rbac::UnauthorizedError),
179 UnknownCursor(String),
181 UnknownLoginRole(String),
183 UnknownPreparedStatement(String),
184 UnknownClusterReplica {
186 cluster_name: String,
187 replica_name: String,
188 },
189 UnrecognizedConfigurationParam(String),
191 Unstructured(anyhow::Error),
195 Unsupported(&'static str),
197 UnavailableFeature {
201 feature: String,
202 docs: Option<String>,
203 },
204 UntargetedLogRead {
206 log_names: Vec<String>,
207 },
208 WriteOnlyTransaction,
210 SingleStatementTransaction,
212 DDLOnlyTransaction,
214 DDLTransactionRace,
216 TransactionDryRun {
219 new_ops: Vec<crate::catalog::Op>,
221 new_state: crate::catalog::CatalogState,
223 },
224 Storage(mz_storage_types::controller::StorageError<mz_repr::Timestamp>),
226 Compute(anyhow::Error),
228 Orchestrator(anyhow::Error),
230 DependentObject(BTreeMap<String, Vec<String>>),
234 InvalidAlter(&'static str, PlanError),
237 ConnectionValidation(ConnectionValidationError),
239 MaterializedViewWouldNeverRefresh(Timestamp, Timestamp),
243 InputNotReadableAtRefreshAtTime(Timestamp, Antichain<Timestamp>),
246 RtrTimeout(String),
248 RtrDropFailure(String),
250 UnreadableSinkCollection,
252 UserSessionsDisallowed,
254 NetworkPolicyDenied(NetworkPolicyError),
256 ReadOnly,
259 AlterClusterTimeout,
260 AlterClusterWhilePendingReplicas,
261 AuthenticationError(AuthenticationError),
262 ReplacementSchemaMismatch(RelationDescDiff),
264 ReplaceMaterializedViewSealed {
266 name: String,
267 },
268}
269
/// Reasons an authentication attempt is rejected.
///
/// The `#[error]` attribute on each variant supplies its `Display` text. When
/// wrapped in `AdapterError::AuthenticationError`, `InvalidCredentials` is
/// reported as `INVALID_PASSWORD` and every other variant as
/// `INVALID_AUTHORIZATION_SPECIFICATION`.
#[derive(Debug, thiserror::Error)]
pub enum AuthenticationError {
    /// The supplied credentials are invalid.
    #[error("invalid credentials")]
    InvalidCredentials,
    /// The role is not allowed to log in.
    #[error("role is not allowed to login")]
    NonLogin,
    /// The role does not exist.
    #[error("role does not exist")]
    RoleNotFound,
    /// A password is required.
    #[error("password is required")]
    PasswordRequired,
}
281
282impl AdapterError {
283 pub fn into_response(self, severity: Severity) -> ErrorResponse {
284 ErrorResponse {
285 severity,
286 code: self.code(),
287 message: self.to_string(),
288 detail: self.detail(),
289 hint: self.hint(),
290 position: self.position(),
291 }
292 }
293
294 pub fn position(&self) -> Option<usize> {
295 match self {
296 AdapterError::ParseError(err) => Some(err.error.pos),
297 _ => None,
298 }
299 }
300
301 pub fn detail(&self) -> Option<String> {
303 match self {
304 AdapterError::AmbiguousSystemColumnReference => {
305 Some("This is a current limitation in Materialize".into())
306 },
307 AdapterError::Catalog(c) => c.detail(),
308 AdapterError::Eval(e) => e.detail(),
309 AdapterError::RelationOutsideTimeDomain { relations, names } => Some(format!(
310 "The following relations in the query are outside the transaction's time domain:\n{}\n{}",
311 relations
312 .iter()
313 .map(|r| r.quoted().to_string())
314 .collect::<Vec<_>>()
315 .join("\n"),
316 match names.is_empty() {
317 true => "No relations are available.".to_string(),
318 false => format!(
319 "Only the following relations are available:\n{}",
320 names
321 .iter()
322 .map(|name| name.quoted().to_string())
323 .collect::<Vec<_>>()
324 .join("\n")
325 ),
326 }
327 )),
328 AdapterError::SourceOrSinkSizeRequired { .. } => Some(
329 "Either specify the cluster that will maintain this object via IN CLUSTER or \
330 specify size via SIZE option."
331 .into(),
332 ),
333 AdapterError::InvalidTableMutationSelection {
334 object_name,
335 object_type,
336 } => Some(
337 format!(
338 "{object_type} '{}' may not be used in this operation; \
339 the selection may refer to views and materialized views, but transitive \
340 dependencies must not include sources or source-export tables",
341 object_name.quoted()
342 )
343 ),
344 AdapterError::SafeModeViolation(_) => Some(
345 "The Materialize server you are connected to is running in \
346 safe mode, which limits the features that are available."
347 .into(),
348 ),
349 AdapterError::IntrospectionDisabled { log_names }
350 | AdapterError::UntargetedLogRead { log_names } => Some(format!(
351 "The query references the following log sources:\n {}",
352 log_names.join("\n "),
353 )),
354 AdapterError::InvalidLogDependency { log_names, .. } => Some(format!(
355 "The object depends on the following log sources:\n {}",
356 log_names.join("\n "),
357 )),
358 AdapterError::PlanError(e) => e.detail(),
359 AdapterError::Unauthorized(unauthorized) => unauthorized.detail(),
360 AdapterError::DependentObject(dependent_objects) => {
361 Some(dependent_objects
362 .iter()
363 .map(|(role_name, err_msgs)| err_msgs
364 .iter()
365 .map(|err_msg| format!("{role_name}: {err_msg}"))
366 .join("\n"))
367 .join("\n"))
368 },
369 AdapterError::Storage(storage_error) => {
370 storage_error.source().map(|source_error| source_error.to_string_with_causes())
371 }
372 AdapterError::ReadOnlyTransaction => Some("SELECT queries cannot be combined with other query types, including SUBSCRIBE.".into()),
373 AdapterError::InvalidAlter(_, e) => e.detail(),
374 AdapterError::Optimizer(e) => e.detail(),
375 AdapterError::ConnectionValidation(e) => e.detail(),
376 AdapterError::MaterializedViewWouldNeverRefresh(last_refresh, earliest_possible) => {
377 Some(format!(
378 "The specified last refresh is at {}, while the earliest possible time to compute the materialized \
379 view is {}.",
380 last_refresh,
381 earliest_possible,
382 ))
383 }
384 AdapterError::UnallowedOnCluster { cluster, .. } => (cluster == MZ_CATALOG_SERVER_CLUSTER.name).then(||
385 format!("The transaction is executing on the {cluster} cluster, maybe having been routed there by the first statement in the transaction.")
386 ),
387 AdapterError::InputNotReadableAtRefreshAtTime(oracle_read_ts, least_valid_read) => {
388 Some(format!(
389 "The requested REFRESH AT time is {}, \
390 but not all input collections are readable earlier than [{}].",
391 oracle_read_ts,
392 if least_valid_read.len() == 1 {
393 format!("{}", least_valid_read.as_option().expect("antichain contains exactly 1 timestamp"))
394 } else {
395 format!("{:?}", least_valid_read)
397 }
398 ))
399 }
400 AdapterError::RtrTimeout(name) => Some(format!("{name} failed to ingest data up to the real-time recency point")),
401 AdapterError::RtrDropFailure(name) => Some(format!("{name} dropped before ingesting data to the real-time recency point")),
402 AdapterError::UserSessionsDisallowed => Some("Your organization has been blocked. Please contact support.".to_string()),
403 AdapterError::NetworkPolicyDenied(reason)=> Some(format!("{reason}.")),
404 AdapterError::ReplacementSchemaMismatch(diff) => {
405 let mut lines: Vec<_> = diff.column_diffs.iter().map(|(idx, diff)| {
406 let pos = idx + 1;
407 match diff {
408 ColumnDiff::Missing { name } => {
409 let name = name.as_str().quoted();
410 format!("missing column {name} at position {pos}")
411 }
412 ColumnDiff::Extra { name } => {
413 let name = name.as_str().quoted();
414 format!("extra column {name} at position {pos}")
415 }
416 ColumnDiff::TypeMismatch { name, left, right } => {
417 let name = name.as_str().quoted();
418 format!("column {name} at position {pos}: type mismatch (target: {left:?}, replacement: {right:?})")
419 }
420 ColumnDiff::NullabilityMismatch { name, left, right } => {
421 let name = name.as_str().quoted();
422 let left = if *left { "NULL" } else { "NOT NULL" };
423 let right = if *right { "NULL" } else { "NOT NULL" };
424 format!("column {name} at position {pos}: nullability mismatch (target: {left}, replacement: {right})")
425 }
426 ColumnDiff::NameMismatch { left, right } => {
427 let left = left.as_str().quoted();
428 let right = right.as_str().quoted();
429 format!("column at position {pos}: name mismatch (target: {left}, replacement: {right})")
430 }
431 }
432 }).collect();
433
434 if let Some(KeyDiff { left, right }) = &diff.key_diff {
435 let format_keys = |keys: &BTreeSet<Vec<ColumnName>>| {
436 if keys.is_empty() {
437 "(none)".to_string()
438 } else {
439 keys.iter()
440 .map(|key| {
441 let cols = key.iter().map(|c| c.as_str()).join(", ");
442 format!("{{{cols}}}")
443 })
444 .join(", ")
445 }
446 };
447 lines.push(format!(
448 "keys differ (target: {}, replacement: {})",
449 format_keys(left),
450 format_keys(right)
451 ));
452 }
453 Some(lines.join("\n"))
454 }
455 AdapterError::ReplaceMaterializedViewSealed { .. } => Some(
456 "The materialized view has already computed its output until the end of time, \
457 so replacing its definition would have no effect."
458 .into(),
459 ),
460 _ => None,
461 }
462 }
463
464 pub fn hint(&self) -> Option<String> {
466 match self {
467 AdapterError::AmbiguousSystemColumnReference => Some(
468 "Rewrite the view to refer to all columns by name. Expand all wildcards and \
469 convert all NATURAL JOINs to USING joins."
470 .to_string(),
471 ),
472 AdapterError::Catalog(c) => c.hint(),
473 AdapterError::Eval(e) => e.hint(),
474 AdapterError::InvalidClusterReplicaAz { expected, az: _ } => {
475 Some(if expected.is_empty() {
476 "No availability zones configured; do not specify AVAILABILITY ZONE".into()
477 } else {
478 format!("Valid availability zones are: {}", expected.join(", "))
479 })
480 }
481 AdapterError::InvalidStorageClusterSize { expected, .. } => {
482 Some(format!("Valid sizes are: {}", expected.join(", ")))
483 }
484 AdapterError::SourceOrSinkSizeRequired { expected } => Some(format!(
485 "Try choosing one of the smaller sizes to start. Available sizes: {}",
486 expected.join(", ")
487 )),
488 AdapterError::NoClusterReplicasAvailable { is_managed, .. } => {
489 Some(if *is_managed {
490 "Use ALTER CLUSTER to adjust the replication factor of the cluster. \
491 Example:`ALTER CLUSTER <cluster-name> SET (REPLICATION FACTOR 1)`".into()
492 } else {
493 "Use CREATE CLUSTER REPLICA to attach cluster replicas to the cluster".into()
494 })
495 }
496 AdapterError::UntargetedLogRead { .. } => Some(
497 "Use `SET cluster_replica = <replica-name>` to target a specific replica in the \
498 active cluster. Note that subsequent queries will only be answered by \
499 the selected replica, which might reduce availability. To undo the replica \
500 selection, use `RESET cluster_replica`."
501 .into(),
502 ),
503 AdapterError::ResourceExhaustion { resource_type, .. } => Some(format!(
504 "Drop an existing {resource_type} or contact support to request a limit increase."
505 )),
506 AdapterError::StatementTimeout => Some(
507 "Consider increasing the maximum allowed statement duration for this session by \
508 setting the statement_timeout session variable. For example, `SET \
509 statement_timeout = '120s'`."
510 .into(),
511 ),
512 AdapterError::PlanError(e) => e.hint(),
513 AdapterError::UnallowedOnCluster { cluster, .. } => {
514 (cluster != MZ_CATALOG_SERVER_CLUSTER.name).then(||
515 "Use `SET CLUSTER = <cluster-name>` to change your cluster and re-run the query."
516 .to_string()
517 )
518 }
519 AdapterError::InvalidAlter(_, e) => e.hint(),
520 AdapterError::Optimizer(e) => e.hint(),
521 AdapterError::ConnectionValidation(e) => e.hint(),
522 AdapterError::InputNotReadableAtRefreshAtTime(_, _) => Some(
523 "You can use `REFRESH AT greatest(mz_now(), <explicit timestamp>)` to refresh \
524 either at the explicitly specified timestamp, or now if the given timestamp would \
525 be in the past.".to_string()
526 ),
527 AdapterError::AlterClusterTimeout => Some(
528 "Consider increasing the timeout duration in the alter cluster statement.".into(),
529 ),
530 AdapterError::DDLTransactionRace => Some(
531 "Currently, DDL transactions fail when any other DDL happens concurrently, \
532 even on unrelated schemas/clusters.".into()
533 ),
534 AdapterError::CollectionUnreadable { .. } => Some(
535 "This could be because the collection has recently been dropped.".into()
536 ),
537 _ => None,
538 }
539 }
540
541 pub fn code(&self) -> SqlState {
542 match self {
547 AdapterError::AbsurdSubscribeBounds { .. } => SqlState::DATA_EXCEPTION,
550 AdapterError::AmbiguousSystemColumnReference => SqlState::FEATURE_NOT_SUPPORTED,
551 AdapterError::Catalog(e) => match &e.kind {
552 mz_catalog::memory::error::ErrorKind::VarError(e) => match e {
553 VarError::ConstrainedParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
554 VarError::FixedValueParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
555 VarError::InvalidParameterType { .. } => SqlState::INVALID_PARAMETER_VALUE,
556 VarError::InvalidParameterValue { .. } => SqlState::INVALID_PARAMETER_VALUE,
557 VarError::ReadOnlyParameter(_) => SqlState::CANT_CHANGE_RUNTIME_PARAM,
558 VarError::UnknownParameter(_) => SqlState::UNDEFINED_OBJECT,
559 VarError::RequiresUnsafeMode { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
560 VarError::RequiresFeatureFlag { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
561 },
562 _ => SqlState::INTERNAL_ERROR,
563 },
564 AdapterError::ChangedPlan(_) => SqlState::FEATURE_NOT_SUPPORTED,
565 AdapterError::DuplicateCursor(_) => SqlState::DUPLICATE_CURSOR,
566 AdapterError::Eval(EvalError::CharacterNotValidForEncoding(_)) => {
567 SqlState::PROGRAM_LIMIT_EXCEEDED
568 }
569 AdapterError::Eval(EvalError::CharacterTooLargeForEncoding(_)) => {
570 SqlState::PROGRAM_LIMIT_EXCEEDED
571 }
572 AdapterError::Eval(EvalError::LengthTooLarge) => SqlState::PROGRAM_LIMIT_EXCEEDED,
573 AdapterError::Eval(EvalError::NullCharacterNotPermitted) => {
574 SqlState::PROGRAM_LIMIT_EXCEEDED
575 }
576 AdapterError::Eval(_) => SqlState::INTERNAL_ERROR,
577 AdapterError::Explain(_) => SqlState::INTERNAL_ERROR,
578 AdapterError::IdExhaustionError => SqlState::INTERNAL_ERROR,
579 AdapterError::Internal(_) => SqlState::INTERNAL_ERROR,
580 AdapterError::IntrospectionDisabled { .. } => SqlState::FEATURE_NOT_SUPPORTED,
581 AdapterError::InvalidLogDependency { .. } => SqlState::FEATURE_NOT_SUPPORTED,
582 AdapterError::InvalidClusterReplicaAz { .. } => SqlState::FEATURE_NOT_SUPPORTED,
583 AdapterError::InvalidSetIsolationLevel => SqlState::ACTIVE_SQL_TRANSACTION,
584 AdapterError::InvalidSetCluster => SqlState::ACTIVE_SQL_TRANSACTION,
585 AdapterError::InvalidStorageClusterSize { .. } => SqlState::FEATURE_NOT_SUPPORTED,
586 AdapterError::SourceOrSinkSizeRequired { .. } => SqlState::FEATURE_NOT_SUPPORTED,
587 AdapterError::InvalidTableMutationSelection { .. } => {
588 SqlState::INVALID_TRANSACTION_STATE
589 }
590 AdapterError::ConstraintViolation(NotNullViolation(_)) => SqlState::NOT_NULL_VIOLATION,
591 AdapterError::ConcurrentClusterDrop => SqlState::INVALID_TRANSACTION_STATE,
592 AdapterError::ConcurrentDependencyDrop { .. } => SqlState::UNDEFINED_OBJECT,
593 AdapterError::CollectionUnreadable { .. } => SqlState::NO_DATA_FOUND,
594 AdapterError::NoClusterReplicasAvailable { .. } => SqlState::FEATURE_NOT_SUPPORTED,
595 AdapterError::OperationProhibitsTransaction(_) => SqlState::ACTIVE_SQL_TRANSACTION,
596 AdapterError::OperationRequiresTransaction(_) => SqlState::NO_ACTIVE_SQL_TRANSACTION,
597 AdapterError::ParseError(_) => SqlState::SYNTAX_ERROR,
598 AdapterError::PlanError(PlanError::InvalidSchemaName) => SqlState::INVALID_SCHEMA_NAME,
599 AdapterError::PlanError(PlanError::ColumnAlreadyExists { .. }) => {
600 SqlState::DUPLICATE_COLUMN
601 }
602 AdapterError::PlanError(PlanError::UnknownParameter(_)) => {
603 SqlState::UNDEFINED_PARAMETER
604 }
605 AdapterError::PlanError(PlanError::ParameterNotAllowed(_)) => {
606 SqlState::UNDEFINED_PARAMETER
607 }
608 AdapterError::PlanError(_) => SqlState::INTERNAL_ERROR,
609 AdapterError::PreparedStatementExists(_) => SqlState::DUPLICATE_PSTATEMENT,
610 AdapterError::ReadOnlyTransaction => SqlState::READ_ONLY_SQL_TRANSACTION,
611 AdapterError::ReadWriteUnavailable => SqlState::INVALID_TRANSACTION_STATE,
612 AdapterError::SingleStatementTransaction => SqlState::INVALID_TRANSACTION_STATE,
613 AdapterError::WrongSetOfLocks => SqlState::LOCK_NOT_AVAILABLE,
614 AdapterError::StatementTimeout => SqlState::QUERY_CANCELED,
615 AdapterError::Canceled => SqlState::QUERY_CANCELED,
616 AdapterError::IdleInTransactionSessionTimeout => {
617 SqlState::IDLE_IN_TRANSACTION_SESSION_TIMEOUT
618 }
619 AdapterError::RecursionLimit(_) => SqlState::INTERNAL_ERROR,
620 AdapterError::RelationOutsideTimeDomain { .. } => SqlState::INVALID_TRANSACTION_STATE,
621 AdapterError::ResourceExhaustion { .. } => SqlState::INSUFFICIENT_RESOURCES,
622 AdapterError::ResultSize(_) => SqlState::OUT_OF_MEMORY,
623 AdapterError::SafeModeViolation(_) => SqlState::INTERNAL_ERROR,
624 AdapterError::SubscribeOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
625 AdapterError::Optimizer(e) => match e {
626 OptimizerError::PlanError(PlanError::InvalidSchemaName) => {
627 SqlState::INVALID_SCHEMA_NAME
628 }
629 OptimizerError::PlanError(PlanError::ColumnAlreadyExists { .. }) => {
630 SqlState::DUPLICATE_COLUMN
631 }
632 OptimizerError::PlanError(PlanError::UnknownParameter(_)) => {
633 SqlState::UNDEFINED_PARAMETER
634 }
635 OptimizerError::PlanError(PlanError::ParameterNotAllowed(_)) => {
636 SqlState::UNDEFINED_PARAMETER
637 }
638 OptimizerError::PlanError(_) => SqlState::INTERNAL_ERROR,
639 OptimizerError::RecursionLimitError(e) => {
640 AdapterError::RecursionLimit(e.clone()).code() }
642 OptimizerError::Internal(s) => {
643 AdapterError::Internal(s.clone()).code() }
645 OptimizerError::EvalError(e) => {
646 AdapterError::Eval(e.clone()).code() }
648 OptimizerError::TransformError(_) => SqlState::INTERNAL_ERROR,
649 OptimizerError::UnmaterializableFunction(_) => SqlState::FEATURE_NOT_SUPPORTED,
650 OptimizerError::UncallableFunction { .. } => SqlState::FEATURE_NOT_SUPPORTED,
651 OptimizerError::UnsupportedTemporalExpression(_) => SqlState::FEATURE_NOT_SUPPORTED,
652 OptimizerError::InternalUnsafeMfpPlan(_) => SqlState::INTERNAL_ERROR,
655 },
656 AdapterError::UnallowedOnCluster { .. } => {
657 SqlState::S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
658 }
659 AdapterError::Unauthorized(_) => SqlState::INSUFFICIENT_PRIVILEGE,
660 AdapterError::UnknownCursor(_) => SqlState::INVALID_CURSOR_NAME,
661 AdapterError::UnknownPreparedStatement(_) => SqlState::UNDEFINED_PSTATEMENT,
662 AdapterError::UnknownLoginRole(_) => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
663 AdapterError::UnknownClusterReplica { .. } => SqlState::UNDEFINED_OBJECT,
664 AdapterError::UnrecognizedConfigurationParam(_) => SqlState::UNDEFINED_OBJECT,
665 AdapterError::Unsupported(..) => SqlState::FEATURE_NOT_SUPPORTED,
666 AdapterError::UnavailableFeature { .. } => SqlState::FEATURE_NOT_SUPPORTED,
667 AdapterError::Unstructured(_) => SqlState::INTERNAL_ERROR,
668 AdapterError::UntargetedLogRead { .. } => SqlState::FEATURE_NOT_SUPPORTED,
669 AdapterError::DDLTransactionRace => SqlState::T_R_SERIALIZATION_FAILURE,
670 AdapterError::TransactionDryRun { .. } => SqlState::T_R_SERIALIZATION_FAILURE,
671 AdapterError::WriteOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
676 AdapterError::DDLOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
677 AdapterError::Storage(_) | AdapterError::Compute(_) | AdapterError::Orchestrator(_) => {
678 SqlState::INTERNAL_ERROR
679 }
680 AdapterError::DependentObject(_) => SqlState::DEPENDENT_OBJECTS_STILL_EXIST,
681 AdapterError::InvalidAlter(_, _) => SqlState::FEATURE_NOT_SUPPORTED,
682 AdapterError::ConnectionValidation(_) => SqlState::SYSTEM_ERROR,
683 AdapterError::MaterializedViewWouldNeverRefresh(_, _) => SqlState::DATA_EXCEPTION,
685 AdapterError::InputNotReadableAtRefreshAtTime(_, _) => SqlState::DATA_EXCEPTION,
686 AdapterError::RtrTimeout(_) => SqlState::QUERY_CANCELED,
687 AdapterError::RtrDropFailure(_) => SqlState::UNDEFINED_OBJECT,
688 AdapterError::UnreadableSinkCollection => SqlState::from_code("MZ009"),
689 AdapterError::UserSessionsDisallowed => SqlState::from_code("MZ010"),
690 AdapterError::NetworkPolicyDenied(_) => SqlState::from_code("MZ011"),
691 AdapterError::ReadOnly => SqlState::READ_ONLY_SQL_TRANSACTION,
694 AdapterError::AlterClusterTimeout => SqlState::QUERY_CANCELED,
695 AdapterError::AlterClusterWhilePendingReplicas => SqlState::OBJECT_IN_USE,
696 AdapterError::ReplacementSchemaMismatch(_) => SqlState::FEATURE_NOT_SUPPORTED,
697 AdapterError::AuthenticationError(AuthenticationError::InvalidCredentials) => {
698 SqlState::INVALID_PASSWORD
699 }
700 AdapterError::AuthenticationError(_) => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
701 AdapterError::ReplaceMaterializedViewSealed { .. } => {
702 SqlState::OBJECT_NOT_IN_PREREQUISITE_STATE
703 }
704 }
705 }
706
707 pub fn internal<E: std::fmt::Display>(context: &str, e: E) -> AdapterError {
708 AdapterError::Internal(format!("{context}: {e}"))
709 }
710
711 pub fn concurrent_dependency_drop_from_instance_missing(e: InstanceMissing) -> Self {
717 AdapterError::ConcurrentDependencyDrop {
718 dependency_kind: "cluster",
719 dependency_id: e.0.to_string(),
720 }
721 }
722 pub fn concurrent_dependency_drop_from_collection_missing(e: CollectionMissing) -> Self {
723 AdapterError::ConcurrentDependencyDrop {
724 dependency_kind: "collection",
725 dependency_id: e.0.to_string(),
726 }
727 }
728
729 pub fn concurrent_dependency_drop_from_collection_lookup_error(
730 e: CollectionLookupError,
731 compute_instance: ComputeInstanceId,
732 ) -> Self {
733 match e {
734 CollectionLookupError::InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
735 dependency_kind: "cluster",
736 dependency_id: id.to_string(),
737 },
738 CollectionLookupError::CollectionMissing(id) => {
739 AdapterError::ConcurrentDependencyDrop {
740 dependency_kind: "collection",
741 dependency_id: id.to_string(),
742 }
743 }
744 CollectionLookupError::InstanceShutDown => AdapterError::ConcurrentDependencyDrop {
745 dependency_kind: "cluster",
746 dependency_id: compute_instance.to_string(),
747 },
748 }
749 }
750
751 pub fn concurrent_dependency_drop_from_instance_peek_error(
752 e: mz_compute_client::controller::instance::PeekError,
753 compute_instance: ComputeInstanceId,
754 ) -> AdapterError {
755 use mz_compute_client::controller::instance::PeekError::*;
756 match e {
757 ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
758 dependency_kind: "replica",
759 dependency_id: id.to_string(),
760 },
761 InstanceShutDown => AdapterError::ConcurrentDependencyDrop {
762 dependency_kind: "cluster",
763 dependency_id: compute_instance.to_string(),
764 },
765 e @ ReadHoldIdMismatch(_) => AdapterError::internal("instance peek error", e),
766 e @ ReadHoldInsufficient(_) => AdapterError::internal("instance peek error", e),
767 }
768 }
769
770 pub fn concurrent_dependency_drop_from_peek_error(
771 e: mz_compute_client::controller::error::PeekError,
772 ) -> AdapterError {
773 use mz_compute_client::controller::error::PeekError::*;
774 match e {
775 InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
776 dependency_kind: "cluster",
777 dependency_id: id.to_string(),
778 },
779 CollectionMissing(id) => AdapterError::ConcurrentDependencyDrop {
780 dependency_kind: "collection",
781 dependency_id: id.to_string(),
782 },
783 ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
784 dependency_kind: "replica",
785 dependency_id: id.to_string(),
786 },
787 e @ SinceViolation(_) => AdapterError::internal("peek error", e),
788 }
789 }
790
791 pub fn concurrent_dependency_drop_from_dataflow_creation_error(
792 e: compute_error::DataflowCreationError,
793 ) -> Self {
794 use compute_error::DataflowCreationError::*;
795 match e {
796 InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
797 dependency_kind: "cluster",
798 dependency_id: id.to_string(),
799 },
800 CollectionMissing(id) => AdapterError::ConcurrentDependencyDrop {
801 dependency_kind: "collection",
802 dependency_id: id.to_string(),
803 },
804 ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
805 dependency_kind: "replica",
806 dependency_id: id.to_string(),
807 },
808 MissingAsOf | SinceViolation(..) | EmptyAsOfForSubscribe | EmptyAsOfForCopyTo => {
809 AdapterError::internal("dataflow creation error", e)
810 }
811 }
812 }
813}
814
815impl fmt::Display for AdapterError {
816 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
817 match self {
818 AdapterError::AbsurdSubscribeBounds { as_of, up_to } => {
819 write!(
820 f,
821 "subscription lower bound (`AS OF`) is greater than its upper bound (`UP TO`): \
822 {as_of} > {up_to}",
823 )
824 }
825 AdapterError::AmbiguousSystemColumnReference => {
826 write!(
827 f,
828 "cannot use wildcard expansions or NATURAL JOINs in a view that depends on \
829 system objects"
830 )
831 }
832 AdapterError::ChangedPlan(e) => write!(f, "{}", e),
833 AdapterError::Catalog(e) => e.fmt(f),
834 AdapterError::DuplicateCursor(name) => {
835 write!(f, "cursor {} already exists", name.quoted())
836 }
837 AdapterError::Eval(e) => e.fmt(f),
838 AdapterError::Explain(e) => e.fmt(f),
839 AdapterError::IdExhaustionError => f.write_str("ID allocator exhausted all valid IDs"),
840 AdapterError::Internal(e) => write!(f, "internal error: {}", e),
841 AdapterError::IntrospectionDisabled { .. } => write!(
842 f,
843 "cannot read log sources of replica with disabled introspection"
844 ),
845 AdapterError::InvalidLogDependency { object_type, .. } => {
846 write!(f, "{object_type} objects cannot depend on log sources")
847 }
848 AdapterError::InvalidClusterReplicaAz { az, expected: _ } => {
849 write!(f, "unknown cluster replica availability zone {az}",)
850 }
851 AdapterError::InvalidSetIsolationLevel => write!(
852 f,
853 "SET TRANSACTION ISOLATION LEVEL must be called before any query"
854 ),
855 AdapterError::InvalidSetCluster => {
856 write!(f, "SET cluster cannot be called in an active transaction")
857 }
858 AdapterError::InvalidStorageClusterSize { size, .. } => {
859 write!(f, "unknown source size {size}")
860 }
861 AdapterError::SourceOrSinkSizeRequired { .. } => {
862 write!(f, "must specify either cluster or size option")
863 }
864 AdapterError::InvalidTableMutationSelection { .. } => {
865 write!(
866 f,
867 "invalid selection: operation may only (transitively) refer to non-source, non-system tables"
868 )
869 }
870 AdapterError::ReplaceMaterializedViewSealed { name } => {
871 write!(
872 f,
873 "materialized view {name} is sealed and thus cannot be replaced"
874 )
875 }
876 AdapterError::ConstraintViolation(not_null_violation) => {
877 write!(f, "{}", not_null_violation)
878 }
879 AdapterError::ConcurrentClusterDrop => {
880 write!(f, "the transaction's active cluster has been dropped")
881 }
882 AdapterError::ConcurrentDependencyDrop {
883 dependency_kind,
884 dependency_id,
885 } => {
886 write!(f, "{dependency_kind} '{dependency_id}' was dropped")
887 }
888 AdapterError::CollectionUnreadable { id } => {
889 write!(f, "collection '{id}' is not readable at any timestamp")
890 }
891 AdapterError::NoClusterReplicasAvailable { name, .. } => {
892 write!(
893 f,
894 "CLUSTER {} has no replicas available to service request",
895 name.quoted()
896 )
897 }
898 AdapterError::OperationProhibitsTransaction(op) => {
899 write!(f, "{} cannot be run inside a transaction block", op)
900 }
901 AdapterError::OperationRequiresTransaction(op) => {
902 write!(f, "{} can only be used in transaction blocks", op)
903 }
904 AdapterError::ParseError(e) => e.fmt(f),
905 AdapterError::PlanError(e) => e.fmt(f),
906 AdapterError::PreparedStatementExists(name) => {
907 write!(f, "prepared statement {} already exists", name.quoted())
908 }
909 AdapterError::ReadOnlyTransaction => f.write_str("transaction in read-only mode"),
910 AdapterError::SingleStatementTransaction => {
911 f.write_str("this transaction can only execute a single statement")
912 }
913 AdapterError::ReadWriteUnavailable => {
914 f.write_str("transaction read-write mode must be set before any query")
915 }
916 AdapterError::WrongSetOfLocks => {
917 write!(f, "internal error, wrong set of locks acquired")
918 }
919 AdapterError::StatementTimeout => {
920 write!(f, "canceling statement due to statement timeout")
921 }
922 AdapterError::Canceled => {
923 write!(f, "canceling statement due to user request")
924 }
925 AdapterError::IdleInTransactionSessionTimeout => {
926 write!(
927 f,
928 "terminating connection due to idle-in-transaction timeout"
929 )
930 }
931 AdapterError::RecursionLimit(e) => e.fmt(f),
932 AdapterError::RelationOutsideTimeDomain { .. } => {
933 write!(
934 f,
935 "Transactions can only reference objects in the same timedomain. \
936 See https://materialize.com/docs/sql/begin/#same-timedomain-error",
937 )
938 }
939 AdapterError::ResourceExhaustion {
940 resource_type,
941 limit_name,
942 desired,
943 limit,
944 current,
945 } => {
946 write!(
947 f,
948 "creating {resource_type} would violate {limit_name} limit (desired: {desired}, limit: {limit}, current: {current})"
949 )
950 }
951 AdapterError::ResultSize(e) => write!(f, "{e}"),
952 AdapterError::SafeModeViolation(feature) => {
953 write!(f, "cannot create {} in safe mode", feature)
954 }
955 AdapterError::SubscribeOnlyTransaction => {
956 f.write_str("SUBSCRIBE in transactions must be the only read statement")
957 }
958 AdapterError::Optimizer(e) => e.fmt(f),
959 AdapterError::UnallowedOnCluster {
960 depends_on,
961 cluster,
962 } => {
963 let items = depends_on.into_iter().map(|item| item.quoted()).join(", ");
964 write!(
965 f,
966 "querying the following items {items} is not allowed from the {} cluster",
967 cluster.quoted()
968 )
969 }
970 AdapterError::Unauthorized(unauthorized) => {
971 write!(f, "{unauthorized}")
972 }
973 AdapterError::UnknownCursor(name) => {
974 write!(f, "cursor {} does not exist", name.quoted())
975 }
976 AdapterError::UnknownLoginRole(name) => {
977 write!(f, "role {} does not exist", name.quoted())
978 }
979 AdapterError::Unsupported(features) => write!(f, "{} are not supported", features),
980 AdapterError::Unstructured(e) => write!(f, "{}", e.display_with_causes()),
981 AdapterError::WriteOnlyTransaction => f.write_str("transaction in write-only mode"),
982 AdapterError::UnknownPreparedStatement(name) => {
983 write!(f, "prepared statement {} does not exist", name.quoted())
984 }
985 AdapterError::UnknownClusterReplica {
986 cluster_name,
987 replica_name,
988 } => write!(
989 f,
990 "cluster replica '{cluster_name}.{replica_name}' does not exist"
991 ),
992 AdapterError::UnrecognizedConfigurationParam(setting_name) => write!(
993 f,
994 "unrecognized configuration parameter {}",
995 setting_name.quoted()
996 ),
997 AdapterError::UntargetedLogRead { .. } => {
998 f.write_str("log source reads must target a replica")
999 }
1000 AdapterError::DDLOnlyTransaction => f.write_str(
1001 "transactions which modify objects are restricted to just modifying objects",
1002 ),
1003 AdapterError::DDLTransactionRace => f.write_str(
1004 "another session modified the catalog while this DDL transaction was open",
1005 ),
1006 AdapterError::TransactionDryRun { .. } => f.write_str("transaction dry run"),
1007 AdapterError::Storage(e) => e.fmt(f),
1008 AdapterError::Compute(e) => e.fmt(f),
1009 AdapterError::Orchestrator(e) => e.fmt(f),
1010 AdapterError::DependentObject(dependent_objects) => {
1011 let role_str = if dependent_objects.keys().count() == 1 {
1012 "role"
1013 } else {
1014 "roles"
1015 };
1016 write!(
1017 f,
1018 "{role_str} \"{}\" cannot be dropped because some objects depend on it",
1019 dependent_objects.keys().join(", ")
1020 )
1021 }
1022 AdapterError::InvalidAlter(t, e) => {
1023 write!(f, "invalid ALTER {t}: {e}")
1024 }
1025 AdapterError::ConnectionValidation(e) => e.fmt(f),
1026 AdapterError::MaterializedViewWouldNeverRefresh(_, _) => {
1027 write!(
1028 f,
1029 "all the specified refreshes of the materialized view would be too far in the past, and thus they \
1030 would never happen"
1031 )
1032 }
1033 AdapterError::InputNotReadableAtRefreshAtTime(_, _) => {
1034 write!(
1035 f,
1036 "REFRESH AT requested for a time where not all the inputs are readable"
1037 )
1038 }
1039 AdapterError::RtrTimeout(_) => {
1040 write!(
1041 f,
1042 "timed out before ingesting the source's visible frontier when real-time-recency query issued"
1043 )
1044 }
1045 AdapterError::RtrDropFailure(_) => write!(
1046 f,
1047 "real-time source dropped before ingesting the upstream system's visible frontier"
1048 ),
1049 AdapterError::UnreadableSinkCollection => {
1050 write!(f, "collection is not readable at any time")
1051 }
1052 AdapterError::UserSessionsDisallowed => write!(f, "login blocked"),
1053 AdapterError::NetworkPolicyDenied(_) => write!(f, "session denied"),
1054 AdapterError::ReadOnly => write!(f, "cannot write in read-only mode"),
1055 AdapterError::AlterClusterTimeout => {
1056 write!(f, "canceling statement, provided timeout lapsed")
1057 }
1058 AdapterError::AuthenticationError(e) => {
1059 write!(f, "authentication error {e}")
1060 }
1061 AdapterError::UnavailableFeature { feature, docs } => {
1062 write!(f, "{} is not supported in this environment.", feature)?;
1063 if let Some(docs) = docs {
1064 write!(
1065 f,
1066 " For more information consult the documentation at {docs}"
1067 )?;
1068 }
1069 Ok(())
1070 }
1071 AdapterError::AlterClusterWhilePendingReplicas => {
1072 write!(f, "cannot alter clusters with pending updates")
1073 }
1074 AdapterError::ReplacementSchemaMismatch(_) => {
1075 write!(f, "replacement schema differs from target schema")
1076 }
1077 }
1078 }
1079}
1080
1081impl From<anyhow::Error> for AdapterError {
1082 fn from(e: anyhow::Error) -> AdapterError {
1083 match e.downcast::<PlanError>() {
1084 Ok(plan_error) => AdapterError::PlanError(plan_error),
1085 Err(e) => AdapterError::Unstructured(e),
1086 }
1087 }
1088}
1089
1090impl From<TryFromIntError> for AdapterError {
1091 fn from(e: TryFromIntError) -> AdapterError {
1092 AdapterError::Unstructured(e.into())
1093 }
1094}
1095
1096impl From<TryFromDecimalError> for AdapterError {
1097 fn from(e: TryFromDecimalError) -> AdapterError {
1098 AdapterError::Unstructured(e.into())
1099 }
1100}
1101
1102impl From<mz_catalog::memory::error::Error> for AdapterError {
1103 fn from(e: mz_catalog::memory::error::Error) -> AdapterError {
1104 AdapterError::Catalog(e)
1105 }
1106}
1107
1108impl From<mz_catalog::durable::CatalogError> for AdapterError {
1109 fn from(e: mz_catalog::durable::CatalogError) -> Self {
1110 mz_catalog::memory::error::Error::from(e).into()
1111 }
1112}
1113
1114impl From<mz_catalog::durable::DurableCatalogError> for AdapterError {
1115 fn from(e: mz_catalog::durable::DurableCatalogError) -> Self {
1116 mz_catalog::durable::CatalogError::from(e).into()
1117 }
1118}
1119
1120impl From<EvalError> for AdapterError {
1121 fn from(e: EvalError) -> AdapterError {
1122 AdapterError::Eval(e)
1123 }
1124}
1125
1126impl From<ExplainError> for AdapterError {
1127 fn from(e: ExplainError) -> AdapterError {
1128 match e {
1129 ExplainError::RecursionLimitError(e) => AdapterError::RecursionLimit(e),
1130 e => AdapterError::Explain(e),
1131 }
1132 }
1133}
1134
1135impl From<mz_sql::catalog::CatalogError> for AdapterError {
1136 fn from(e: mz_sql::catalog::CatalogError) -> AdapterError {
1137 AdapterError::Catalog(mz_catalog::memory::error::Error::from(e))
1138 }
1139}
1140
1141impl From<PlanError> for AdapterError {
1142 fn from(e: PlanError) -> AdapterError {
1143 match e {
1144 PlanError::UnknownCursor(name) => AdapterError::UnknownCursor(name),
1145 _ => AdapterError::PlanError(e),
1146 }
1147 }
1148}
1149
1150impl From<OptimizerError> for AdapterError {
1151 fn from(e: OptimizerError) -> AdapterError {
1152 use OptimizerError::*;
1153 match e {
1154 PlanError(e) => Self::PlanError(e),
1155 RecursionLimitError(e) => Self::RecursionLimit(e),
1156 EvalError(e) => Self::Eval(e),
1157 InternalUnsafeMfpPlan(e) => Self::Internal(e),
1158 Internal(e) => Self::Internal(e),
1159 e => Self::Optimizer(e),
1160 }
1161 }
1162}
1163
1164impl From<NotNullViolation> for AdapterError {
1165 fn from(e: NotNullViolation) -> AdapterError {
1166 AdapterError::ConstraintViolation(e)
1167 }
1168}
1169
1170impl From<RecursionLimitError> for AdapterError {
1171 fn from(e: RecursionLimitError) -> AdapterError {
1172 AdapterError::RecursionLimit(e)
1173 }
1174}
1175
1176impl From<oneshot::error::RecvError> for AdapterError {
1177 fn from(e: oneshot::error::RecvError) -> AdapterError {
1178 AdapterError::Unstructured(e.into())
1179 }
1180}
1181
1182impl From<StorageError<mz_repr::Timestamp>> for AdapterError {
1183 fn from(e: StorageError<mz_repr::Timestamp>) -> Self {
1184 AdapterError::Storage(e)
1185 }
1186}
1187
1188impl From<compute_error::InstanceExists> for AdapterError {
1189 fn from(e: compute_error::InstanceExists) -> Self {
1190 AdapterError::Compute(e.into())
1191 }
1192}
1193
1194impl From<TimestampError> for AdapterError {
1195 fn from(e: TimestampError) -> Self {
1196 let e: EvalError = e.into();
1197 e.into()
1198 }
1199}
1200
1201impl From<mz_sql_parser::parser::ParserStatementError> for AdapterError {
1202 fn from(e: mz_sql_parser::parser::ParserStatementError) -> Self {
1203 AdapterError::ParseError(e)
1204 }
1205}
1206
1207impl From<VarError> for AdapterError {
1208 fn from(e: VarError) -> Self {
1209 let e: mz_catalog::memory::error::Error = e.into();
1210 e.into()
1211 }
1212}
1213
1214impl From<rbac::UnauthorizedError> for AdapterError {
1215 fn from(e: rbac::UnauthorizedError) -> Self {
1216 AdapterError::Unauthorized(e)
1217 }
1218}
1219
1220impl From<mz_sql_parser::ast::IdentError> for AdapterError {
1221 fn from(value: mz_sql_parser::ast::IdentError) -> Self {
1222 AdapterError::PlanError(PlanError::InvalidIdent(value))
1223 }
1224}
1225
1226impl From<mz_pgwire_common::ConnectionError> for AdapterError {
1227 fn from(value: mz_pgwire_common::ConnectionError) -> Self {
1228 match value {
1229 mz_pgwire_common::ConnectionError::TooManyConnections { current, limit } => {
1230 AdapterError::ResourceExhaustion {
1231 resource_type: "connection".into(),
1232 limit_name: "max_connections".into(),
1233 desired: (current + 1).to_string(),
1234 limit: limit.to_string(),
1235 current: current.to_string(),
1236 }
1237 }
1238 }
1239 }
1240}
1241
1242impl From<NetworkPolicyError> for AdapterError {
1243 fn from(value: NetworkPolicyError) -> Self {
1244 AdapterError::NetworkPolicyDenied(value)
1245 }
1246}
1247
1248impl From<ConnectionValidationError> for AdapterError {
1249 fn from(e: ConnectionValidationError) -> AdapterError {
1250 AdapterError::ConnectionValidation(e)
1251 }
1252}
1253
// Marks `AdapterError` as a standard error type. No trait methods are
// overridden here, so the trait's default implementations are used.
impl Error for AdapterError {}