1use std::collections::BTreeMap;
11use std::error::Error;
12use std::fmt;
13use std::num::TryFromIntError;
14
15use dec::TryFromDecimalError;
16use itertools::Itertools;
17use mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER;
18use mz_compute_client::controller::error as compute_error;
19use mz_expr::EvalError;
20use mz_ore::error::ErrorExt;
21use mz_ore::stack::RecursionLimitError;
22use mz_ore::str::StrExt;
23use mz_pgwire_common::{ErrorResponse, Severity};
24use mz_repr::adt::timestamp::TimestampError;
25use mz_repr::explain::ExplainError;
26use mz_repr::{NotNullViolation, Timestamp};
27use mz_sql::plan::PlanError;
28use mz_sql::rbac;
29use mz_sql::session::vars::VarError;
30use mz_storage_types::connections::ConnectionValidationError;
31use mz_storage_types::controller::StorageError;
32use smallvec::SmallVec;
33use timely::progress::Antichain;
34use tokio::sync::oneshot;
35use tokio_postgres::error::SqlState;
36
37use crate::coord::NetworkPolicyError;
38use crate::optimize::OptimizerError;
39
/// Errors surfaced by the adapter layer.
///
/// Each variant maps to a SQLSTATE code via `AdapterError::code`, a message
/// via the `Display` impl, and optionally extra text via
/// `AdapterError::detail` and `AdapterError::hint`.
#[derive(Debug)]
pub enum AdapterError {
    /// A subscription whose lower ("as of") bound lies beyond its upper
    /// ("up to") bound.
    AbsurdSubscribeBounds {
        /// The lower ("as of") bound of the subscription.
        as_of: mz_repr::Timestamp,
        /// The upper ("up to") bound of the subscription.
        up_to: mz_repr::Timestamp,
    },
    /// A view that depends on system objects used a wildcard expansion or a
    /// `NATURAL JOIN`.
    AmbiguousSystemColumnReference,
    /// An error reported by the in-memory catalog.
    Catalog(mz_catalog::memory::error::Error),
    /// Carries a preformatted message that is displayed verbatim.
    // NOTE(review): presumably raised when a cached plan no longer matches;
    // confirm against callers.
    ChangedPlan(String),
    /// A cursor with the given name already exists.
    DuplicateCursor(String),
    /// An error raised while evaluating an expression.
    Eval(EvalError),
    /// An error from the explain machinery.
    Explain(ExplainError),
    /// The ID allocator exhausted all valid IDs.
    IdExhaustionError,
    /// An internal error, described by the contained message.
    Internal(String),
    /// Log sources were read from a replica with disabled introspection.
    IntrospectionDisabled {
        /// Names of the log sources referenced by the query.
        log_names: Vec<String>,
    },
    /// An object of a type that may not depend on log sources does so.
    InvalidLogDependency {
        /// The type of the offending object, as shown in the message.
        object_type: String,
        /// Names of the log sources the object depends on.
        log_names: Vec<String>,
    },
    /// An unknown availability zone was given for a cluster replica.
    InvalidClusterReplicaAz {
        /// The availability zone that was specified.
        az: String,
        /// The valid availability zones; may be empty if none are configured.
        expected: Vec<String>,
    },
    /// `SET TRANSACTION ISOLATION LEVEL` was not called before any query.
    InvalidSetIsolationLevel,
    /// `SET cluster` was called in an active transaction.
    InvalidSetCluster,
    /// An unknown source size was specified.
    InvalidStorageClusterSize {
        /// The size that was specified.
        size: String,
        /// The valid sizes.
        expected: Vec<String>,
    },
    /// Neither a cluster nor a size option was specified.
    SourceOrSinkSizeRequired {
        /// The available sizes, listed in the hint.
        expected: Vec<String>,
    },
    /// The selection of a mutating operation referred to something other than
    /// user-defined tables.
    InvalidTableMutationSelection,
    /// A `NOT NULL` constraint was violated.
    ConstraintViolation(NotNullViolation),
    /// The transaction's active cluster has been dropped.
    ConcurrentClusterDrop,
    /// The named cluster has no replicas available to service the request.
    NoClusterReplicasAvailable {
        /// Name of the cluster.
        name: String,
        /// Whether the cluster is managed; selects which hint is shown.
        is_managed: bool,
    },
    /// The named operation cannot be run inside a transaction block.
    OperationProhibitsTransaction(String),
    /// The named operation can only be used in transaction blocks.
    OperationRequiresTransaction(String),
    /// An error raised while planning a statement.
    PlanError(PlanError),
    /// A prepared statement with the given name already exists.
    PreparedStatementExists(String),
    /// An error raised while parsing a statement; carries the error position.
    ParseError(mz_sql_parser::parser::ParserStatementError),
    /// The transaction is in read-only mode.
    ReadOnlyTransaction,
    /// The transaction read-write mode was not set before any query.
    ReadWriteUnavailable,
    /// The recursion limit was exceeded.
    RecursionLimit(RecursionLimitError),
    /// A transaction referenced objects outside of its time domain.
    RelationOutsideTimeDomain {
        /// The referenced relations that are outside the time domain.
        relations: Vec<String>,
        /// The relations that are available to the transaction.
        names: Vec<String>,
    },
    /// Creating a resource would violate a limit.
    ResourceExhaustion {
        /// The kind of resource being created.
        resource_type: String,
        /// The name of the limit that would be violated.
        limit_name: String,
        /// The desired amount, preformatted.
        desired: String,
        /// The configured limit, preformatted.
        limit: String,
        /// The current amount, preformatted.
        current: String,
    },
    /// A result-size error; the contained message is displayed verbatim.
    ResultSize(String),
    /// The named feature cannot be created in safe mode.
    SafeModeViolation(String),
    /// Internal error: the wrong set of locks was acquired.
    WrongSetOfLocks,
    /// The statement was canceled due to the statement timeout.
    StatementTimeout,
    /// The statement was canceled due to a user request.
    Canceled,
    /// The connection was terminated due to the idle-in-transaction timeout.
    IdleInTransactionSessionTimeout,
    /// A `SUBSCRIBE` in a transaction was not the only read statement.
    SubscribeOnlyTransaction,
    /// An error raised by the optimizer.
    Optimizer(OptimizerError),
    /// The queried items may not be queried from the given cluster.
    UnallowedOnCluster {
        /// The items whose querying is not allowed.
        depends_on: SmallVec<[String; 2]>,
        /// Name of the cluster the query ran on.
        cluster: String,
    },
    /// The user lacks the privileges required for the operation.
    Unauthorized(rbac::UnauthorizedError),
    /// The named cursor does not exist.
    UnknownCursor(String),
    /// The named login role does not exist.
    UnknownLoginRole(String),
    /// The named prepared statement does not exist.
    UnknownPreparedStatement(String),
    /// The named cluster replica does not exist.
    UnknownClusterReplica {
        /// Name of the cluster.
        cluster_name: String,
        /// Name of the replica.
        replica_name: String,
    },
    /// The named configuration parameter is not recognized.
    UnrecognizedConfigurationParam(String),
    /// An error without a dedicated variant; displayed with its causes.
    Unstructured(anyhow::Error),
    /// The named features are not supported.
    Unsupported(&'static str),
    /// The named feature is not supported in this environment.
    UnavailableFeature {
        /// Name of the feature.
        feature: String,
        /// Optional documentation location appended to the message.
        docs: Option<String>,
    },
    /// A log source read did not target a specific replica.
    UntargetedLogRead {
        /// Names of the log sources referenced by the query.
        log_names: Vec<String>,
    },
    /// The transaction is in write-only mode.
    WriteOnlyTransaction,
    /// The transaction can only execute a single statement.
    SingleStatementTransaction,
    /// A transaction which modifies objects tried to do something other than
    /// modifying objects.
    DDLOnlyTransaction,
    /// Another session modified the catalog while this DDL transaction was
    /// open.
    DDLTransactionRace,
    /// Displayed as "transaction dry run".
    // NOTE(review): the role of the carried ops and state is not visible in
    // this file; confirm against callers.
    TransactionDryRun {
        /// Catalog operations carried by the dry run.
        new_ops: Vec<crate::catalog::Op>,
        /// Catalog state carried by the dry run.
        new_state: crate::catalog::CatalogState,
    },
    /// A storage controller error.
    Storage(mz_storage_types::controller::StorageError<mz_repr::Timestamp>),
    /// A compute error, e.g. converted from `compute_error::InstanceExists`.
    Compute(anyhow::Error),
    /// An orchestrator error; displayed via the wrapped error.
    Orchestrator(anyhow::Error),
    /// One or more roles cannot be dropped because objects depend on them.
    ///
    /// Maps each role name to the messages listed for it in the detail text.
    DependentObject(BTreeMap<String, Vec<String>>),
    /// An invalid `ALTER` of the named kind of thing, with the underlying
    /// planning error.
    InvalidAlter(&'static str, PlanError),
    /// An error raised while validating a connection.
    ConnectionValidation(ConnectionValidationError),
    /// All specified refreshes of a materialized view would be too far in the
    /// past.
    ///
    /// The fields are the last refresh and the earliest possible time to
    /// compute the materialized view, in that order.
    MaterializedViewWouldNeverRefresh(Timestamp, Timestamp),
    /// A `REFRESH AT` was requested for a time where not all inputs are
    /// readable.
    ///
    /// The fields are the requested `REFRESH AT` time and the frontier before
    /// which not all inputs are readable, in that order.
    InputNotReadableAtRefreshAtTime(Timestamp, Antichain<Timestamp>),
    /// The named object failed to ingest data up to the real-time recency
    /// point before timing out.
    RtrTimeout(String),
    /// The named object was dropped before ingesting data to the real-time
    /// recency point.
    RtrDropFailure(String),
    /// The collection is not readable at any time.
    UnreadableSinkCollection,
    /// Login is blocked for the organization.
    UserSessionsDisallowed,
    /// The session was denied by a network policy.
    NetworkPolicyDenied(NetworkPolicyError),
    /// A write was attempted in read-only mode.
    ReadOnly,
    /// An `ALTER CLUSTER` statement was canceled because the provided timeout
    /// lapsed.
    AlterClusterTimeout,
    /// Authentication failed.
    AuthenticationError,
}
249
250impl AdapterError {
251 pub fn into_response(self, severity: Severity) -> ErrorResponse {
252 ErrorResponse {
253 severity,
254 code: self.code(),
255 message: self.to_string(),
256 detail: self.detail(),
257 hint: self.hint(),
258 position: self.position(),
259 }
260 }
261
262 pub fn position(&self) -> Option<usize> {
263 match self {
264 AdapterError::ParseError(err) => Some(err.error.pos),
265 _ => None,
266 }
267 }
268
269 pub fn detail(&self) -> Option<String> {
271 match self {
272 AdapterError::AmbiguousSystemColumnReference => {
273 Some("This is a current limitation in Materialize".into())
274 },
275 AdapterError::Catalog(c) => c.detail(),
276 AdapterError::Eval(e) => e.detail(),
277 AdapterError::RelationOutsideTimeDomain { relations, names } => Some(format!(
278 "The following relations in the query are outside the transaction's time domain:\n{}\n{}",
279 relations
280 .iter()
281 .map(|r| r.quoted().to_string())
282 .collect::<Vec<_>>()
283 .join("\n"),
284 match names.is_empty() {
285 true => "No relations are available.".to_string(),
286 false => format!(
287 "Only the following relations are available:\n{}",
288 names
289 .iter()
290 .map(|name| name.quoted().to_string())
291 .collect::<Vec<_>>()
292 .join("\n")
293 ),
294 }
295 )),
296 AdapterError::SourceOrSinkSizeRequired { .. } => Some(
297 "Either specify the cluster that will maintain this object via IN CLUSTER or \
298 specify size via SIZE option."
299 .into(),
300 ),
301 AdapterError::SafeModeViolation(_) => Some(
302 "The Materialize server you are connected to is running in \
303 safe mode, which limits the features that are available."
304 .into(),
305 ),
306 AdapterError::IntrospectionDisabled { log_names }
307 | AdapterError::UntargetedLogRead { log_names } => Some(format!(
308 "The query references the following log sources:\n {}",
309 log_names.join("\n "),
310 )),
311 AdapterError::InvalidLogDependency { log_names, .. } => Some(format!(
312 "The object depends on the following log sources:\n {}",
313 log_names.join("\n "),
314 )),
315 AdapterError::PlanError(e) => e.detail(),
316 AdapterError::Unauthorized(unauthorized) => unauthorized.detail(),
317 AdapterError::DependentObject(dependent_objects) => {
318 Some(dependent_objects
319 .iter()
320 .map(|(role_name, err_msgs)| err_msgs
321 .iter()
322 .map(|err_msg| format!("{role_name}: {err_msg}"))
323 .join("\n"))
324 .join("\n"))
325 },
326 AdapterError::Storage(storage_error) => {
327 storage_error.source().map(|source_error| source_error.to_string_with_causes())
328 }
329 AdapterError::ReadOnlyTransaction => Some("SELECT queries cannot be combined with other query types, including SUBSCRIBE.".into()),
330 AdapterError::InvalidAlter(_, e) => e.detail(),
331 AdapterError::Optimizer(e) => e.detail(),
332 AdapterError::ConnectionValidation(e) => e.detail(),
333 AdapterError::MaterializedViewWouldNeverRefresh(last_refresh, earliest_possible) => {
334 Some(format!(
335 "The specified last refresh is at {}, while the earliest possible time to compute the materialized \
336 view is {}.",
337 last_refresh,
338 earliest_possible,
339 ))
340 }
341 AdapterError::UnallowedOnCluster { cluster, .. } => (cluster == MZ_CATALOG_SERVER_CLUSTER.name).then(||
342 format!("The transaction is executing on the {cluster} cluster, maybe having been routed there by the first statement in the transaction.")
343 ),
344 AdapterError::InputNotReadableAtRefreshAtTime(oracle_read_ts, least_valid_read) => {
345 Some(format!(
346 "The requested REFRESH AT time is {}, \
347 but not all input collections are readable earlier than [{}].",
348 oracle_read_ts,
349 if least_valid_read.len() == 1 {
350 format!("{}", least_valid_read.as_option().expect("antichain contains exactly 1 timestamp"))
351 } else {
352 format!("{:?}", least_valid_read)
354 }
355 ))
356 }
357 AdapterError::RtrTimeout(name) => Some(format!("{name} failed to ingest data up to the real-time recency point")),
358 AdapterError::RtrDropFailure(name) => Some(format!("{name} dropped before ingesting data to the real-time recency point")),
359 AdapterError::UserSessionsDisallowed => Some("Your organization has been blocked. Please contact support.".to_string()),
360 AdapterError::NetworkPolicyDenied(reason)=> Some(format!("{reason}.")),
361 _ => None,
362 }
363 }
364
365 pub fn hint(&self) -> Option<String> {
367 match self {
368 AdapterError::AmbiguousSystemColumnReference => Some(
369 "Rewrite the view to refer to all columns by name. Expand all wildcards and \
370 convert all NATURAL JOINs to USING joins."
371 .to_string(),
372 ),
373 AdapterError::Catalog(c) => c.hint(),
374 AdapterError::Eval(e) => e.hint(),
375 AdapterError::InvalidClusterReplicaAz { expected, az: _ } => {
376 Some(if expected.is_empty() {
377 "No availability zones configured; do not specify AVAILABILITY ZONE".into()
378 } else {
379 format!("Valid availability zones are: {}", expected.join(", "))
380 })
381 }
382 AdapterError::InvalidStorageClusterSize { expected, .. } => {
383 Some(format!("Valid sizes are: {}", expected.join(", ")))
384 }
385 AdapterError::SourceOrSinkSizeRequired { expected } => Some(format!(
386 "Try choosing one of the smaller sizes to start. Available sizes: {}",
387 expected.join(", ")
388 )),
389 AdapterError::NoClusterReplicasAvailable { is_managed, .. } => {
390 Some(if *is_managed {
391 "Use ALTER CLUSTER to adjust the replication factor of the cluster. \
392 Example:`ALTER CLUSTER <cluster-name> SET (REPLICATION FACTOR 1)`".into()
393 } else {
394 "Use CREATE CLUSTER REPLICA to attach cluster replicas to the cluster".into()
395 })
396 }
397 AdapterError::UntargetedLogRead { .. } => Some(
398 "Use `SET cluster_replica = <replica-name>` to target a specific replica in the \
399 active cluster. Note that subsequent queries will only be answered by \
400 the selected replica, which might reduce availability. To undo the replica \
401 selection, use `RESET cluster_replica`."
402 .into(),
403 ),
404 AdapterError::ResourceExhaustion { resource_type, .. } => Some(format!(
405 "Drop an existing {resource_type} or contact support to request a limit increase."
406 )),
407 AdapterError::StatementTimeout => Some(
408 "Consider increasing the maximum allowed statement duration for this session by \
409 setting the statement_timeout session variable. For example, `SET \
410 statement_timeout = '120s'`."
411 .into(),
412 ),
413 AdapterError::PlanError(e) => e.hint(),
414 AdapterError::UnallowedOnCluster { cluster, .. } => {
415 (cluster != MZ_CATALOG_SERVER_CLUSTER.name).then(||
416 "Use `SET CLUSTER = <cluster-name>` to change your cluster and re-run the query."
417 .to_string()
418 )
419 }
420 AdapterError::InvalidAlter(_, e) => e.hint(),
421 AdapterError::Optimizer(e) => e.hint(),
422 AdapterError::ConnectionValidation(e) => e.hint(),
423 AdapterError::InputNotReadableAtRefreshAtTime(_, _) => Some(
424 "You can use `REFRESH AT greatest(mz_now(), <explicit timestamp>)` to refresh \
425 either at the explicitly specified timestamp, or now if the given timestamp would \
426 be in the past.".to_string()
427 ),
428 AdapterError::AlterClusterTimeout => Some(
429 "Consider increasing the timeout duration in the alter cluster statement.".into(),
430 ),
431 AdapterError::DDLTransactionRace => Some(
432 "Currently, DDL transactions fail when any other DDL happens concurrently, \
433 even on unrelated schemas/clusters.".into()
434 ),
435 _ => None,
436 }
437 }
438
439 pub fn code(&self) -> SqlState {
440 match self {
445 AdapterError::AbsurdSubscribeBounds { .. } => SqlState::DATA_EXCEPTION,
448 AdapterError::AmbiguousSystemColumnReference => SqlState::FEATURE_NOT_SUPPORTED,
449 AdapterError::Catalog(e) => match &e.kind {
450 mz_catalog::memory::error::ErrorKind::VarError(e) => match e {
451 VarError::ConstrainedParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
452 VarError::FixedValueParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
453 VarError::InvalidParameterType { .. } => SqlState::INVALID_PARAMETER_VALUE,
454 VarError::InvalidParameterValue { .. } => SqlState::INVALID_PARAMETER_VALUE,
455 VarError::ReadOnlyParameter(_) => SqlState::CANT_CHANGE_RUNTIME_PARAM,
456 VarError::UnknownParameter(_) => SqlState::UNDEFINED_OBJECT,
457 VarError::RequiresUnsafeMode { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
458 VarError::RequiresFeatureFlag { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
459 },
460 _ => SqlState::INTERNAL_ERROR,
461 },
462 AdapterError::ChangedPlan(_) => SqlState::FEATURE_NOT_SUPPORTED,
463 AdapterError::DuplicateCursor(_) => SqlState::DUPLICATE_CURSOR,
464 AdapterError::Eval(EvalError::CharacterNotValidForEncoding(_)) => {
465 SqlState::PROGRAM_LIMIT_EXCEEDED
466 }
467 AdapterError::Eval(EvalError::CharacterTooLargeForEncoding(_)) => {
468 SqlState::PROGRAM_LIMIT_EXCEEDED
469 }
470 AdapterError::Eval(EvalError::LengthTooLarge) => SqlState::PROGRAM_LIMIT_EXCEEDED,
471 AdapterError::Eval(EvalError::NullCharacterNotPermitted) => {
472 SqlState::PROGRAM_LIMIT_EXCEEDED
473 }
474 AdapterError::Eval(_) => SqlState::INTERNAL_ERROR,
475 AdapterError::Explain(_) => SqlState::INTERNAL_ERROR,
476 AdapterError::IdExhaustionError => SqlState::INTERNAL_ERROR,
477 AdapterError::Internal(_) => SqlState::INTERNAL_ERROR,
478 AdapterError::IntrospectionDisabled { .. } => SqlState::FEATURE_NOT_SUPPORTED,
479 AdapterError::InvalidLogDependency { .. } => SqlState::FEATURE_NOT_SUPPORTED,
480 AdapterError::InvalidClusterReplicaAz { .. } => SqlState::FEATURE_NOT_SUPPORTED,
481 AdapterError::InvalidSetIsolationLevel => SqlState::ACTIVE_SQL_TRANSACTION,
482 AdapterError::InvalidSetCluster => SqlState::ACTIVE_SQL_TRANSACTION,
483 AdapterError::InvalidStorageClusterSize { .. } => SqlState::FEATURE_NOT_SUPPORTED,
484 AdapterError::SourceOrSinkSizeRequired { .. } => SqlState::FEATURE_NOT_SUPPORTED,
485 AdapterError::InvalidTableMutationSelection => SqlState::INVALID_TRANSACTION_STATE,
486 AdapterError::ConstraintViolation(NotNullViolation(_)) => SqlState::NOT_NULL_VIOLATION,
487 AdapterError::ConcurrentClusterDrop => SqlState::INVALID_TRANSACTION_STATE,
488 AdapterError::NoClusterReplicasAvailable { .. } => SqlState::FEATURE_NOT_SUPPORTED,
489 AdapterError::OperationProhibitsTransaction(_) => SqlState::ACTIVE_SQL_TRANSACTION,
490 AdapterError::OperationRequiresTransaction(_) => SqlState::NO_ACTIVE_SQL_TRANSACTION,
491 AdapterError::ParseError(_) => SqlState::SYNTAX_ERROR,
492 AdapterError::PlanError(PlanError::InvalidSchemaName) => SqlState::INVALID_SCHEMA_NAME,
493 AdapterError::PlanError(PlanError::ColumnAlreadyExists { .. }) => {
494 SqlState::DUPLICATE_COLUMN
495 }
496 AdapterError::PlanError(PlanError::UnknownParameter(_)) => {
497 SqlState::UNDEFINED_PARAMETER
498 }
499 AdapterError::PlanError(PlanError::ParameterNotAllowed(_)) => {
500 SqlState::UNDEFINED_PARAMETER
501 }
502 AdapterError::PlanError(_) => SqlState::INTERNAL_ERROR,
503 AdapterError::PreparedStatementExists(_) => SqlState::DUPLICATE_PSTATEMENT,
504 AdapterError::ReadOnlyTransaction => SqlState::READ_ONLY_SQL_TRANSACTION,
505 AdapterError::ReadWriteUnavailable => SqlState::INVALID_TRANSACTION_STATE,
506 AdapterError::SingleStatementTransaction => SqlState::INVALID_TRANSACTION_STATE,
507 AdapterError::WrongSetOfLocks => SqlState::LOCK_NOT_AVAILABLE,
508 AdapterError::StatementTimeout => SqlState::QUERY_CANCELED,
509 AdapterError::Canceled => SqlState::QUERY_CANCELED,
510 AdapterError::IdleInTransactionSessionTimeout => {
511 SqlState::IDLE_IN_TRANSACTION_SESSION_TIMEOUT
512 }
513 AdapterError::RecursionLimit(_) => SqlState::INTERNAL_ERROR,
514 AdapterError::RelationOutsideTimeDomain { .. } => SqlState::INVALID_TRANSACTION_STATE,
515 AdapterError::ResourceExhaustion { .. } => SqlState::INSUFFICIENT_RESOURCES,
516 AdapterError::ResultSize(_) => SqlState::OUT_OF_MEMORY,
517 AdapterError::SafeModeViolation(_) => SqlState::INTERNAL_ERROR,
518 AdapterError::SubscribeOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
519 AdapterError::Optimizer(e) => match e {
520 OptimizerError::PlanError(e) => {
521 AdapterError::PlanError(e.clone()).code() }
523 OptimizerError::RecursionLimitError(e) => {
524 AdapterError::RecursionLimit(e.clone()).code() }
526 OptimizerError::Internal(s) => {
527 AdapterError::Internal(s.clone()).code() }
529 OptimizerError::EvalError(e) => {
530 AdapterError::Eval(e.clone()).code() }
532 OptimizerError::TransformError(_) => SqlState::INTERNAL_ERROR,
533 OptimizerError::UnmaterializableFunction(_) => SqlState::FEATURE_NOT_SUPPORTED,
534 OptimizerError::UncallableFunction { .. } => SqlState::FEATURE_NOT_SUPPORTED,
535 OptimizerError::UnsupportedTemporalExpression(_) => SqlState::FEATURE_NOT_SUPPORTED,
536 OptimizerError::InternalUnsafeMfpPlan(_) => SqlState::INTERNAL_ERROR,
539 },
540 AdapterError::UnallowedOnCluster { .. } => {
541 SqlState::S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
542 }
543 AdapterError::Unauthorized(_) => SqlState::INSUFFICIENT_PRIVILEGE,
544 AdapterError::UnknownCursor(_) => SqlState::INVALID_CURSOR_NAME,
545 AdapterError::UnknownPreparedStatement(_) => SqlState::UNDEFINED_PSTATEMENT,
546 AdapterError::UnknownLoginRole(_) => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
547 AdapterError::UnknownClusterReplica { .. } => SqlState::UNDEFINED_OBJECT,
548 AdapterError::UnrecognizedConfigurationParam(_) => SqlState::UNDEFINED_OBJECT,
549 AdapterError::Unsupported(..) => SqlState::FEATURE_NOT_SUPPORTED,
550 AdapterError::UnavailableFeature { .. } => SqlState::FEATURE_NOT_SUPPORTED,
551 AdapterError::Unstructured(_) => SqlState::INTERNAL_ERROR,
552 AdapterError::UntargetedLogRead { .. } => SqlState::FEATURE_NOT_SUPPORTED,
553 AdapterError::DDLTransactionRace => SqlState::T_R_SERIALIZATION_FAILURE,
554 AdapterError::TransactionDryRun { .. } => SqlState::T_R_SERIALIZATION_FAILURE,
555 AdapterError::WriteOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
560 AdapterError::DDLOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
561 AdapterError::Storage(_) | AdapterError::Compute(_) | AdapterError::Orchestrator(_) => {
562 SqlState::INTERNAL_ERROR
563 }
564 AdapterError::DependentObject(_) => SqlState::DEPENDENT_OBJECTS_STILL_EXIST,
565 AdapterError::InvalidAlter(_, _) => SqlState::FEATURE_NOT_SUPPORTED,
566 AdapterError::ConnectionValidation(_) => SqlState::SYSTEM_ERROR,
567 AdapterError::MaterializedViewWouldNeverRefresh(_, _) => SqlState::DATA_EXCEPTION,
569 AdapterError::InputNotReadableAtRefreshAtTime(_, _) => SqlState::DATA_EXCEPTION,
570 AdapterError::RtrTimeout(_) => SqlState::QUERY_CANCELED,
571 AdapterError::RtrDropFailure(_) => SqlState::UNDEFINED_OBJECT,
572 AdapterError::UnreadableSinkCollection => SqlState::from_code("MZ009"),
573 AdapterError::UserSessionsDisallowed => SqlState::from_code("MZ010"),
574 AdapterError::NetworkPolicyDenied(_) => SqlState::from_code("MZ011"),
575 AdapterError::ReadOnly => SqlState::READ_ONLY_SQL_TRANSACTION,
578 AdapterError::AlterClusterTimeout => SqlState::QUERY_CANCELED,
579 AdapterError::AuthenticationError => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
580 }
581 }
582
583 pub fn internal<E: std::fmt::Display>(context: &str, e: E) -> AdapterError {
584 AdapterError::Internal(format!("{context}: {e}"))
585 }
586}
587
588impl fmt::Display for AdapterError {
589 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
590 match self {
591 AdapterError::AbsurdSubscribeBounds { as_of, up_to } => {
592 assert!(up_to < as_of);
593 write!(
594 f,
595 r#"subscription lower ("as of") bound is beyond its upper ("up to") bound: {} < {}"#,
596 up_to, as_of
597 )
598 }
599 AdapterError::AmbiguousSystemColumnReference => {
600 write!(
601 f,
602 "cannot use wildcard expansions or NATURAL JOINs in a view that depends on \
603 system objects"
604 )
605 }
606 AdapterError::ChangedPlan(e) => write!(f, "{}", e),
607 AdapterError::Catalog(e) => e.fmt(f),
608 AdapterError::DuplicateCursor(name) => {
609 write!(f, "cursor {} already exists", name.quoted())
610 }
611 AdapterError::Eval(e) => e.fmt(f),
612 AdapterError::Explain(e) => e.fmt(f),
613 AdapterError::IdExhaustionError => f.write_str("ID allocator exhausted all valid IDs"),
614 AdapterError::Internal(e) => write!(f, "internal error: {}", e),
615 AdapterError::IntrospectionDisabled { .. } => write!(
616 f,
617 "cannot read log sources of replica with disabled introspection"
618 ),
619 AdapterError::InvalidLogDependency { object_type, .. } => {
620 write!(f, "{object_type} objects cannot depend on log sources")
621 }
622 AdapterError::InvalidClusterReplicaAz { az, expected: _ } => {
623 write!(f, "unknown cluster replica availability zone {az}",)
624 }
625 AdapterError::InvalidSetIsolationLevel => write!(
626 f,
627 "SET TRANSACTION ISOLATION LEVEL must be called before any query"
628 ),
629 AdapterError::InvalidSetCluster => {
630 write!(f, "SET cluster cannot be called in an active transaction")
631 }
632 AdapterError::InvalidStorageClusterSize { size, .. } => {
633 write!(f, "unknown source size {size}")
634 }
635 AdapterError::SourceOrSinkSizeRequired { .. } => {
636 write!(f, "must specify either cluster or size option")
637 }
638 AdapterError::InvalidTableMutationSelection => {
639 f.write_str("invalid selection: operation may only refer to user-defined tables")
640 }
641 AdapterError::ConstraintViolation(not_null_violation) => {
642 write!(f, "{}", not_null_violation)
643 }
644 AdapterError::ConcurrentClusterDrop => {
645 write!(f, "the transaction's active cluster has been dropped")
646 }
647 AdapterError::NoClusterReplicasAvailable { name, .. } => {
648 write!(
649 f,
650 "CLUSTER {} has no replicas available to service request",
651 name.quoted()
652 )
653 }
654 AdapterError::OperationProhibitsTransaction(op) => {
655 write!(f, "{} cannot be run inside a transaction block", op)
656 }
657 AdapterError::OperationRequiresTransaction(op) => {
658 write!(f, "{} can only be used in transaction blocks", op)
659 }
660 AdapterError::ParseError(e) => e.fmt(f),
661 AdapterError::PlanError(e) => e.fmt(f),
662 AdapterError::PreparedStatementExists(name) => {
663 write!(f, "prepared statement {} already exists", name.quoted())
664 }
665 AdapterError::ReadOnlyTransaction => f.write_str("transaction in read-only mode"),
666 AdapterError::SingleStatementTransaction => {
667 f.write_str("this transaction can only execute a single statement")
668 }
669 AdapterError::ReadWriteUnavailable => {
670 f.write_str("transaction read-write mode must be set before any query")
671 }
672 AdapterError::WrongSetOfLocks => {
673 write!(f, "internal error, wrong set of locks acquired")
674 }
675 AdapterError::StatementTimeout => {
676 write!(f, "canceling statement due to statement timeout")
677 }
678 AdapterError::Canceled => {
679 write!(f, "canceling statement due to user request")
680 }
681 AdapterError::IdleInTransactionSessionTimeout => {
682 write!(
683 f,
684 "terminating connection due to idle-in-transaction timeout"
685 )
686 }
687 AdapterError::RecursionLimit(e) => e.fmt(f),
688 AdapterError::RelationOutsideTimeDomain { .. } => {
689 write!(
690 f,
691 "Transactions can only reference objects in the same timedomain. \
692 See https://materialize.com/docs/sql/begin/#same-timedomain-error",
693 )
694 }
695 AdapterError::ResourceExhaustion {
696 resource_type,
697 limit_name,
698 desired,
699 limit,
700 current,
701 } => {
702 write!(
703 f,
704 "creating {resource_type} would violate {limit_name} limit (desired: {desired}, limit: {limit}, current: {current})"
705 )
706 }
707 AdapterError::ResultSize(e) => write!(f, "{e}"),
708 AdapterError::SafeModeViolation(feature) => {
709 write!(f, "cannot create {} in safe mode", feature)
710 }
711 AdapterError::SubscribeOnlyTransaction => {
712 f.write_str("SUBSCRIBE in transactions must be the only read statement")
713 }
714 AdapterError::Optimizer(e) => e.fmt(f),
715 AdapterError::UnallowedOnCluster {
716 depends_on,
717 cluster,
718 } => {
719 let items = depends_on.into_iter().map(|item| item.quoted()).join(", ");
720 write!(
721 f,
722 "querying the following items {items} is not allowed from the {} cluster",
723 cluster.quoted()
724 )
725 }
726 AdapterError::Unauthorized(unauthorized) => {
727 write!(f, "{unauthorized}")
728 }
729 AdapterError::UnknownCursor(name) => {
730 write!(f, "cursor {} does not exist", name.quoted())
731 }
732 AdapterError::UnknownLoginRole(name) => {
733 write!(f, "role {} does not exist", name.quoted())
734 }
735 AdapterError::Unsupported(features) => write!(f, "{} are not supported", features),
736 AdapterError::Unstructured(e) => write!(f, "{}", e.display_with_causes()),
737 AdapterError::WriteOnlyTransaction => f.write_str("transaction in write-only mode"),
738 AdapterError::UnknownPreparedStatement(name) => {
739 write!(f, "prepared statement {} does not exist", name.quoted())
740 }
741 AdapterError::UnknownClusterReplica {
742 cluster_name,
743 replica_name,
744 } => write!(
745 f,
746 "cluster replica '{cluster_name}.{replica_name}' does not exist"
747 ),
748 AdapterError::UnrecognizedConfigurationParam(setting_name) => write!(
749 f,
750 "unrecognized configuration parameter {}",
751 setting_name.quoted()
752 ),
753 AdapterError::UntargetedLogRead { .. } => {
754 f.write_str("log source reads must target a replica")
755 }
756 AdapterError::DDLOnlyTransaction => f.write_str(
757 "transactions which modify objects are restricted to just modifying objects",
758 ),
759 AdapterError::DDLTransactionRace => f.write_str(
760 "another session modified the catalog while this DDL transaction was open",
761 ),
762 AdapterError::TransactionDryRun { .. } => f.write_str("transaction dry run"),
763 AdapterError::Storage(e) => e.fmt(f),
764 AdapterError::Compute(e) => e.fmt(f),
765 AdapterError::Orchestrator(e) => e.fmt(f),
766 AdapterError::DependentObject(dependent_objects) => {
767 let role_str = if dependent_objects.keys().count() == 1 {
768 "role"
769 } else {
770 "roles"
771 };
772 write!(
773 f,
774 "{role_str} \"{}\" cannot be dropped because some objects depend on it",
775 dependent_objects.keys().join(", ")
776 )
777 }
778 AdapterError::InvalidAlter(t, e) => {
779 write!(f, "invalid ALTER {t}: {e}")
780 }
781 AdapterError::ConnectionValidation(e) => e.fmt(f),
782 AdapterError::MaterializedViewWouldNeverRefresh(_, _) => {
783 write!(
784 f,
785 "all the specified refreshes of the materialized view would be too far in the past, and thus they \
786 would never happen"
787 )
788 }
789 AdapterError::InputNotReadableAtRefreshAtTime(_, _) => {
790 write!(
791 f,
792 "REFRESH AT requested for a time where not all the inputs are readable"
793 )
794 }
795 AdapterError::RtrTimeout(_) => {
796 write!(
797 f,
798 "timed out before ingesting the source's visible frontier when real-time-recency query issued"
799 )
800 }
801 AdapterError::RtrDropFailure(_) => write!(
802 f,
803 "real-time source dropped before ingesting the upstream system's visible frontier"
804 ),
805 AdapterError::UnreadableSinkCollection => {
806 write!(f, "collection is not readable at any time")
807 }
808 AdapterError::UserSessionsDisallowed => write!(f, "login blocked"),
809 AdapterError::NetworkPolicyDenied(_) => write!(f, "session denied"),
810 AdapterError::ReadOnly => write!(f, "cannot write in read-only mode"),
811 AdapterError::AlterClusterTimeout => {
812 write!(f, "canceling statement, provided timeout lapsed")
813 }
814 AdapterError::AuthenticationError => {
815 write!(f, "authentication error")
816 }
817 AdapterError::UnavailableFeature { feature, docs } => {
818 write!(f, "{} is not supported in this environment.", feature)?;
819 if let Some(docs) = docs {
820 write!(
821 f,
822 " For more information consult the documentation at {docs}"
823 )?;
824 }
825 Ok(())
826 }
827 }
828 }
829}
830
831impl From<anyhow::Error> for AdapterError {
832 fn from(e: anyhow::Error) -> AdapterError {
833 match e.downcast_ref::<PlanError>() {
834 Some(plan_error) => AdapterError::PlanError(plan_error.clone()),
835 None => AdapterError::Unstructured(e),
836 }
837 }
838}
839
840impl From<TryFromIntError> for AdapterError {
841 fn from(e: TryFromIntError) -> AdapterError {
842 AdapterError::Unstructured(e.into())
843 }
844}
845
846impl From<TryFromDecimalError> for AdapterError {
847 fn from(e: TryFromDecimalError) -> AdapterError {
848 AdapterError::Unstructured(e.into())
849 }
850}
851
852impl From<mz_catalog::memory::error::Error> for AdapterError {
853 fn from(e: mz_catalog::memory::error::Error) -> AdapterError {
854 AdapterError::Catalog(e)
855 }
856}
857
858impl From<mz_catalog::durable::CatalogError> for AdapterError {
859 fn from(e: mz_catalog::durable::CatalogError) -> Self {
860 mz_catalog::memory::error::Error::from(e).into()
861 }
862}
863
864impl From<mz_catalog::durable::DurableCatalogError> for AdapterError {
865 fn from(e: mz_catalog::durable::DurableCatalogError) -> Self {
866 mz_catalog::durable::CatalogError::from(e).into()
867 }
868}
869
870impl From<EvalError> for AdapterError {
871 fn from(e: EvalError) -> AdapterError {
872 AdapterError::Eval(e)
873 }
874}
875
876impl From<ExplainError> for AdapterError {
877 fn from(e: ExplainError) -> AdapterError {
878 match e {
879 ExplainError::RecursionLimitError(e) => AdapterError::RecursionLimit(e),
880 e => AdapterError::Explain(e),
881 }
882 }
883}
884
885impl From<mz_sql::catalog::CatalogError> for AdapterError {
886 fn from(e: mz_sql::catalog::CatalogError) -> AdapterError {
887 AdapterError::Catalog(mz_catalog::memory::error::Error::from(e))
888 }
889}
890
891impl From<PlanError> for AdapterError {
892 fn from(e: PlanError) -> AdapterError {
893 AdapterError::PlanError(e)
894 }
895}
896
897impl From<OptimizerError> for AdapterError {
898 fn from(e: OptimizerError) -> AdapterError {
899 use OptimizerError::*;
900 match e {
901 PlanError(e) => Self::PlanError(e),
902 RecursionLimitError(e) => Self::RecursionLimit(e),
903 EvalError(e) => Self::Eval(e),
904 InternalUnsafeMfpPlan(e) => Self::Internal(e),
905 Internal(e) => Self::Internal(e),
906 e => Self::Optimizer(e),
907 }
908 }
909}
910
911impl From<NotNullViolation> for AdapterError {
912 fn from(e: NotNullViolation) -> AdapterError {
913 AdapterError::ConstraintViolation(e)
914 }
915}
916
917impl From<RecursionLimitError> for AdapterError {
918 fn from(e: RecursionLimitError) -> AdapterError {
919 AdapterError::RecursionLimit(e)
920 }
921}
922
923impl From<oneshot::error::RecvError> for AdapterError {
924 fn from(e: oneshot::error::RecvError) -> AdapterError {
925 AdapterError::Unstructured(e.into())
926 }
927}
928
929impl From<StorageError<mz_repr::Timestamp>> for AdapterError {
930 fn from(e: StorageError<mz_repr::Timestamp>) -> Self {
931 AdapterError::Storage(e)
932 }
933}
934
935impl From<compute_error::InstanceExists> for AdapterError {
936 fn from(e: compute_error::InstanceExists) -> Self {
937 AdapterError::Compute(e.into())
938 }
939}
940
941impl From<TimestampError> for AdapterError {
942 fn from(e: TimestampError) -> Self {
943 let e: EvalError = e.into();
944 e.into()
945 }
946}
947
948impl From<mz_sql_parser::parser::ParserStatementError> for AdapterError {
949 fn from(e: mz_sql_parser::parser::ParserStatementError) -> Self {
950 AdapterError::ParseError(e)
951 }
952}
953
954impl From<VarError> for AdapterError {
955 fn from(e: VarError) -> Self {
956 let e: mz_catalog::memory::error::Error = e.into();
957 e.into()
958 }
959}
960
961impl From<rbac::UnauthorizedError> for AdapterError {
962 fn from(e: rbac::UnauthorizedError) -> Self {
963 AdapterError::Unauthorized(e)
964 }
965}
966
967impl From<mz_sql_parser::ast::IdentError> for AdapterError {
968 fn from(value: mz_sql_parser::ast::IdentError) -> Self {
969 AdapterError::PlanError(PlanError::InvalidIdent(value))
970 }
971}
972
973impl From<mz_pgwire_common::ConnectionError> for AdapterError {
974 fn from(value: mz_pgwire_common::ConnectionError) -> Self {
975 match value {
976 mz_pgwire_common::ConnectionError::TooManyConnections { current, limit } => {
977 AdapterError::ResourceExhaustion {
978 resource_type: "connection".into(),
979 limit_name: "max_connections".into(),
980 desired: (current + 1).to_string(),
981 limit: limit.to_string(),
982 current: current.to_string(),
983 }
984 }
985 }
986 }
987}
988
989impl From<NetworkPolicyError> for AdapterError {
990 fn from(value: NetworkPolicyError) -> Self {
991 AdapterError::NetworkPolicyDenied(value)
992 }
993}
994
995impl From<ConnectionValidationError> for AdapterError {
996 fn from(e: ConnectionValidationError) -> AdapterError {
997 AdapterError::ConnectionValidation(e)
998 }
999}
1000
// Marks `AdapterError` as a standard error type. No trait methods are
// overridden, so the trait's default implementations apply (in particular,
// `source()` is not customized for the wrapped errors).
impl Error for AdapterError {}