1use std::collections::BTreeMap;
11use std::error::Error;
12use std::fmt;
13use std::num::TryFromIntError;
14
15use dec::TryFromDecimalError;
16use itertools::Itertools;
17use mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER;
18use mz_compute_client::controller::error as compute_error;
19use mz_compute_client::controller::error::{CollectionLookupError, InstanceMissing};
20use mz_compute_client::controller::instance::PeekError;
21use mz_compute_types::ComputeInstanceId;
22use mz_expr::EvalError;
23use mz_ore::error::ErrorExt;
24use mz_ore::stack::RecursionLimitError;
25use mz_ore::str::StrExt;
26use mz_pgwire_common::{ErrorResponse, Severity};
27use mz_repr::adt::timestamp::TimestampError;
28use mz_repr::explain::ExplainError;
29use mz_repr::{NotNullViolation, Timestamp};
30use mz_sql::plan::PlanError;
31use mz_sql::rbac;
32use mz_sql::session::vars::VarError;
33use mz_storage_types::connections::ConnectionValidationError;
34use mz_storage_types::controller::StorageError;
35use mz_storage_types::errors::CollectionMissing;
36use smallvec::SmallVec;
37use timely::progress::Antichain;
38use tokio::sync::oneshot;
39use tokio_postgres::error::SqlState;
40
41use crate::coord::NetworkPolicyError;
42use crate::optimize::OptimizerError;
43
44#[derive(Debug)]
46pub enum AdapterError {
47 AbsurdSubscribeBounds {
49 as_of: mz_repr::Timestamp,
50 up_to: mz_repr::Timestamp,
51 },
52 AmbiguousSystemColumnReference,
56 Catalog(mz_catalog::memory::error::Error),
58 ChangedPlan(String),
63 DuplicateCursor(String),
65 Eval(EvalError),
67 Explain(ExplainError),
69 IdExhaustionError,
71 Internal(String),
73 IntrospectionDisabled {
75 log_names: Vec<String>,
76 },
77 InvalidLogDependency {
80 object_type: String,
81 log_names: Vec<String>,
82 },
83 InvalidClusterReplicaAz {
85 az: String,
86 expected: Vec<String>,
87 },
88 InvalidSetIsolationLevel,
90 InvalidSetCluster,
92 InvalidStorageClusterSize {
94 size: String,
95 expected: Vec<String>,
96 },
97 SourceOrSinkSizeRequired {
99 expected: Vec<String>,
100 },
101 InvalidTableMutationSelection,
103 ConstraintViolation(NotNullViolation),
105 ConcurrentClusterDrop,
107 ConcurrentDependencyDrop {
109 dependency_kind: &'static str,
110 dependency_id: String,
111 },
112 CollectionUnreadable {
113 id: String,
114 },
115 NoClusterReplicasAvailable {
117 name: String,
118 is_managed: bool,
119 },
120 OperationProhibitsTransaction(String),
122 OperationRequiresTransaction(String),
124 PlanError(PlanError),
126 PreparedStatementExists(String),
128 ParseError(mz_sql_parser::parser::ParserStatementError),
130 ReadOnlyTransaction,
132 ReadWriteUnavailable,
134 RecursionLimit(RecursionLimitError),
136 RelationOutsideTimeDomain {
139 relations: Vec<String>,
140 names: Vec<String>,
141 },
142 ResourceExhaustion {
144 resource_type: String,
145 limit_name: String,
146 desired: String,
147 limit: String,
148 current: String,
149 },
150 ResultSize(String),
152 SafeModeViolation(String),
154 WrongSetOfLocks,
156 StatementTimeout,
160 Canceled,
162 IdleInTransactionSessionTimeout,
164 SubscribeOnlyTransaction,
166 Optimizer(OptimizerError),
168 UnallowedOnCluster {
170 depends_on: SmallVec<[String; 2]>,
171 cluster: String,
172 },
173 Unauthorized(rbac::UnauthorizedError),
175 UnknownCursor(String),
177 UnknownLoginRole(String),
179 UnknownPreparedStatement(String),
180 UnknownClusterReplica {
182 cluster_name: String,
183 replica_name: String,
184 },
185 UnrecognizedConfigurationParam(String),
187 Unstructured(anyhow::Error),
191 Unsupported(&'static str),
193 UnavailableFeature {
197 feature: String,
198 docs: Option<String>,
199 },
200 UntargetedLogRead {
202 log_names: Vec<String>,
203 },
204 WriteOnlyTransaction,
206 SingleStatementTransaction,
208 DDLOnlyTransaction,
210 DDLTransactionRace,
212 TransactionDryRun {
215 new_ops: Vec<crate::catalog::Op>,
217 new_state: crate::catalog::CatalogState,
219 },
220 Storage(mz_storage_types::controller::StorageError<mz_repr::Timestamp>),
222 Compute(anyhow::Error),
224 Orchestrator(anyhow::Error),
226 DependentObject(BTreeMap<String, Vec<String>>),
230 InvalidAlter(&'static str, PlanError),
233 ConnectionValidation(ConnectionValidationError),
235 MaterializedViewWouldNeverRefresh(Timestamp, Timestamp),
239 InputNotReadableAtRefreshAtTime(Timestamp, Antichain<Timestamp>),
242 RtrTimeout(String),
244 RtrDropFailure(String),
246 UnreadableSinkCollection,
248 UserSessionsDisallowed,
250 NetworkPolicyDenied(NetworkPolicyError),
252 ReadOnly,
255 AlterClusterTimeout,
256 AlterClusterWhilePendingReplicas,
257 AuthenticationError(AuthenticationError),
258}
259
260#[derive(Debug, thiserror::Error)]
261pub enum AuthenticationError {
262 #[error("invalid credentials")]
263 InvalidCredentials,
264 #[error("role is not allowed to login")]
265 NonLogin,
266 #[error("role does not exist")]
267 RoleNotFound,
268 #[error("password is required")]
269 PasswordRequired,
270}
271
272impl AdapterError {
273 pub fn into_response(self, severity: Severity) -> ErrorResponse {
274 ErrorResponse {
275 severity,
276 code: self.code(),
277 message: self.to_string(),
278 detail: self.detail(),
279 hint: self.hint(),
280 position: self.position(),
281 }
282 }
283
284 pub fn position(&self) -> Option<usize> {
285 match self {
286 AdapterError::ParseError(err) => Some(err.error.pos),
287 _ => None,
288 }
289 }
290
291 pub fn detail(&self) -> Option<String> {
293 match self {
294 AdapterError::AmbiguousSystemColumnReference => {
295 Some("This is a current limitation in Materialize".into())
296 },
297 AdapterError::Catalog(c) => c.detail(),
298 AdapterError::Eval(e) => e.detail(),
299 AdapterError::RelationOutsideTimeDomain { relations, names } => Some(format!(
300 "The following relations in the query are outside the transaction's time domain:\n{}\n{}",
301 relations
302 .iter()
303 .map(|r| r.quoted().to_string())
304 .collect::<Vec<_>>()
305 .join("\n"),
306 match names.is_empty() {
307 true => "No relations are available.".to_string(),
308 false => format!(
309 "Only the following relations are available:\n{}",
310 names
311 .iter()
312 .map(|name| name.quoted().to_string())
313 .collect::<Vec<_>>()
314 .join("\n")
315 ),
316 }
317 )),
318 AdapterError::SourceOrSinkSizeRequired { .. } => Some(
319 "Either specify the cluster that will maintain this object via IN CLUSTER or \
320 specify size via SIZE option."
321 .into(),
322 ),
323 AdapterError::SafeModeViolation(_) => Some(
324 "The Materialize server you are connected to is running in \
325 safe mode, which limits the features that are available."
326 .into(),
327 ),
328 AdapterError::IntrospectionDisabled { log_names }
329 | AdapterError::UntargetedLogRead { log_names } => Some(format!(
330 "The query references the following log sources:\n {}",
331 log_names.join("\n "),
332 )),
333 AdapterError::InvalidLogDependency { log_names, .. } => Some(format!(
334 "The object depends on the following log sources:\n {}",
335 log_names.join("\n "),
336 )),
337 AdapterError::PlanError(e) => e.detail(),
338 AdapterError::Unauthorized(unauthorized) => unauthorized.detail(),
339 AdapterError::DependentObject(dependent_objects) => {
340 Some(dependent_objects
341 .iter()
342 .map(|(role_name, err_msgs)| err_msgs
343 .iter()
344 .map(|err_msg| format!("{role_name}: {err_msg}"))
345 .join("\n"))
346 .join("\n"))
347 },
348 AdapterError::Storage(storage_error) => {
349 storage_error.source().map(|source_error| source_error.to_string_with_causes())
350 }
351 AdapterError::ReadOnlyTransaction => Some("SELECT queries cannot be combined with other query types, including SUBSCRIBE.".into()),
352 AdapterError::InvalidAlter(_, e) => e.detail(),
353 AdapterError::Optimizer(e) => e.detail(),
354 AdapterError::ConnectionValidation(e) => e.detail(),
355 AdapterError::MaterializedViewWouldNeverRefresh(last_refresh, earliest_possible) => {
356 Some(format!(
357 "The specified last refresh is at {}, while the earliest possible time to compute the materialized \
358 view is {}.",
359 last_refresh,
360 earliest_possible,
361 ))
362 }
363 AdapterError::UnallowedOnCluster { cluster, .. } => (cluster == MZ_CATALOG_SERVER_CLUSTER.name).then(||
364 format!("The transaction is executing on the {cluster} cluster, maybe having been routed there by the first statement in the transaction.")
365 ),
366 AdapterError::InputNotReadableAtRefreshAtTime(oracle_read_ts, least_valid_read) => {
367 Some(format!(
368 "The requested REFRESH AT time is {}, \
369 but not all input collections are readable earlier than [{}].",
370 oracle_read_ts,
371 if least_valid_read.len() == 1 {
372 format!("{}", least_valid_read.as_option().expect("antichain contains exactly 1 timestamp"))
373 } else {
374 format!("{:?}", least_valid_read)
376 }
377 ))
378 }
379 AdapterError::RtrTimeout(name) => Some(format!("{name} failed to ingest data up to the real-time recency point")),
380 AdapterError::RtrDropFailure(name) => Some(format!("{name} dropped before ingesting data to the real-time recency point")),
381 AdapterError::UserSessionsDisallowed => Some("Your organization has been blocked. Please contact support.".to_string()),
382 AdapterError::NetworkPolicyDenied(reason)=> Some(format!("{reason}.")),
383 _ => None,
384 }
385 }
386
387 pub fn hint(&self) -> Option<String> {
389 match self {
390 AdapterError::AmbiguousSystemColumnReference => Some(
391 "Rewrite the view to refer to all columns by name. Expand all wildcards and \
392 convert all NATURAL JOINs to USING joins."
393 .to_string(),
394 ),
395 AdapterError::Catalog(c) => c.hint(),
396 AdapterError::Eval(e) => e.hint(),
397 AdapterError::InvalidClusterReplicaAz { expected, az: _ } => {
398 Some(if expected.is_empty() {
399 "No availability zones configured; do not specify AVAILABILITY ZONE".into()
400 } else {
401 format!("Valid availability zones are: {}", expected.join(", "))
402 })
403 }
404 AdapterError::InvalidStorageClusterSize { expected, .. } => {
405 Some(format!("Valid sizes are: {}", expected.join(", ")))
406 }
407 AdapterError::SourceOrSinkSizeRequired { expected } => Some(format!(
408 "Try choosing one of the smaller sizes to start. Available sizes: {}",
409 expected.join(", ")
410 )),
411 AdapterError::NoClusterReplicasAvailable { is_managed, .. } => {
412 Some(if *is_managed {
413 "Use ALTER CLUSTER to adjust the replication factor of the cluster. \
414 Example:`ALTER CLUSTER <cluster-name> SET (REPLICATION FACTOR 1)`".into()
415 } else {
416 "Use CREATE CLUSTER REPLICA to attach cluster replicas to the cluster".into()
417 })
418 }
419 AdapterError::UntargetedLogRead { .. } => Some(
420 "Use `SET cluster_replica = <replica-name>` to target a specific replica in the \
421 active cluster. Note that subsequent queries will only be answered by \
422 the selected replica, which might reduce availability. To undo the replica \
423 selection, use `RESET cluster_replica`."
424 .into(),
425 ),
426 AdapterError::ResourceExhaustion { resource_type, .. } => Some(format!(
427 "Drop an existing {resource_type} or contact support to request a limit increase."
428 )),
429 AdapterError::StatementTimeout => Some(
430 "Consider increasing the maximum allowed statement duration for this session by \
431 setting the statement_timeout session variable. For example, `SET \
432 statement_timeout = '120s'`."
433 .into(),
434 ),
435 AdapterError::PlanError(e) => e.hint(),
436 AdapterError::UnallowedOnCluster { cluster, .. } => {
437 (cluster != MZ_CATALOG_SERVER_CLUSTER.name).then(||
438 "Use `SET CLUSTER = <cluster-name>` to change your cluster and re-run the query."
439 .to_string()
440 )
441 }
442 AdapterError::InvalidAlter(_, e) => e.hint(),
443 AdapterError::Optimizer(e) => e.hint(),
444 AdapterError::ConnectionValidation(e) => e.hint(),
445 AdapterError::InputNotReadableAtRefreshAtTime(_, _) => Some(
446 "You can use `REFRESH AT greatest(mz_now(), <explicit timestamp>)` to refresh \
447 either at the explicitly specified timestamp, or now if the given timestamp would \
448 be in the past.".to_string()
449 ),
450 AdapterError::AlterClusterTimeout => Some(
451 "Consider increasing the timeout duration in the alter cluster statement.".into(),
452 ),
453 AdapterError::DDLTransactionRace => Some(
454 "Currently, DDL transactions fail when any other DDL happens concurrently, \
455 even on unrelated schemas/clusters.".into()
456 ),
457 AdapterError::CollectionUnreadable { .. } => Some(
458 "This could be because the collection has recently been dropped.".into()
459 ),
460 _ => None,
461 }
462 }
463
464 pub fn code(&self) -> SqlState {
465 match self {
470 AdapterError::AbsurdSubscribeBounds { .. } => SqlState::DATA_EXCEPTION,
473 AdapterError::AmbiguousSystemColumnReference => SqlState::FEATURE_NOT_SUPPORTED,
474 AdapterError::Catalog(e) => match &e.kind {
475 mz_catalog::memory::error::ErrorKind::VarError(e) => match e {
476 VarError::ConstrainedParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
477 VarError::FixedValueParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
478 VarError::InvalidParameterType { .. } => SqlState::INVALID_PARAMETER_VALUE,
479 VarError::InvalidParameterValue { .. } => SqlState::INVALID_PARAMETER_VALUE,
480 VarError::ReadOnlyParameter(_) => SqlState::CANT_CHANGE_RUNTIME_PARAM,
481 VarError::UnknownParameter(_) => SqlState::UNDEFINED_OBJECT,
482 VarError::RequiresUnsafeMode { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
483 VarError::RequiresFeatureFlag { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
484 },
485 _ => SqlState::INTERNAL_ERROR,
486 },
487 AdapterError::ChangedPlan(_) => SqlState::FEATURE_NOT_SUPPORTED,
488 AdapterError::DuplicateCursor(_) => SqlState::DUPLICATE_CURSOR,
489 AdapterError::Eval(EvalError::CharacterNotValidForEncoding(_)) => {
490 SqlState::PROGRAM_LIMIT_EXCEEDED
491 }
492 AdapterError::Eval(EvalError::CharacterTooLargeForEncoding(_)) => {
493 SqlState::PROGRAM_LIMIT_EXCEEDED
494 }
495 AdapterError::Eval(EvalError::LengthTooLarge) => SqlState::PROGRAM_LIMIT_EXCEEDED,
496 AdapterError::Eval(EvalError::NullCharacterNotPermitted) => {
497 SqlState::PROGRAM_LIMIT_EXCEEDED
498 }
499 AdapterError::Eval(_) => SqlState::INTERNAL_ERROR,
500 AdapterError::Explain(_) => SqlState::INTERNAL_ERROR,
501 AdapterError::IdExhaustionError => SqlState::INTERNAL_ERROR,
502 AdapterError::Internal(_) => SqlState::INTERNAL_ERROR,
503 AdapterError::IntrospectionDisabled { .. } => SqlState::FEATURE_NOT_SUPPORTED,
504 AdapterError::InvalidLogDependency { .. } => SqlState::FEATURE_NOT_SUPPORTED,
505 AdapterError::InvalidClusterReplicaAz { .. } => SqlState::FEATURE_NOT_SUPPORTED,
506 AdapterError::InvalidSetIsolationLevel => SqlState::ACTIVE_SQL_TRANSACTION,
507 AdapterError::InvalidSetCluster => SqlState::ACTIVE_SQL_TRANSACTION,
508 AdapterError::InvalidStorageClusterSize { .. } => SqlState::FEATURE_NOT_SUPPORTED,
509 AdapterError::SourceOrSinkSizeRequired { .. } => SqlState::FEATURE_NOT_SUPPORTED,
510 AdapterError::InvalidTableMutationSelection => SqlState::INVALID_TRANSACTION_STATE,
511 AdapterError::ConstraintViolation(NotNullViolation(_)) => SqlState::NOT_NULL_VIOLATION,
512 AdapterError::ConcurrentClusterDrop => SqlState::INVALID_TRANSACTION_STATE,
513 AdapterError::ConcurrentDependencyDrop { .. } => SqlState::UNDEFINED_OBJECT,
514 AdapterError::CollectionUnreadable { .. } => SqlState::NO_DATA_FOUND,
515 AdapterError::NoClusterReplicasAvailable { .. } => SqlState::FEATURE_NOT_SUPPORTED,
516 AdapterError::OperationProhibitsTransaction(_) => SqlState::ACTIVE_SQL_TRANSACTION,
517 AdapterError::OperationRequiresTransaction(_) => SqlState::NO_ACTIVE_SQL_TRANSACTION,
518 AdapterError::ParseError(_) => SqlState::SYNTAX_ERROR,
519 AdapterError::PlanError(PlanError::InvalidSchemaName) => SqlState::INVALID_SCHEMA_NAME,
520 AdapterError::PlanError(PlanError::ColumnAlreadyExists { .. }) => {
521 SqlState::DUPLICATE_COLUMN
522 }
523 AdapterError::PlanError(PlanError::UnknownParameter(_)) => {
524 SqlState::UNDEFINED_PARAMETER
525 }
526 AdapterError::PlanError(PlanError::ParameterNotAllowed(_)) => {
527 SqlState::UNDEFINED_PARAMETER
528 }
529 AdapterError::PlanError(_) => SqlState::INTERNAL_ERROR,
530 AdapterError::PreparedStatementExists(_) => SqlState::DUPLICATE_PSTATEMENT,
531 AdapterError::ReadOnlyTransaction => SqlState::READ_ONLY_SQL_TRANSACTION,
532 AdapterError::ReadWriteUnavailable => SqlState::INVALID_TRANSACTION_STATE,
533 AdapterError::SingleStatementTransaction => SqlState::INVALID_TRANSACTION_STATE,
534 AdapterError::WrongSetOfLocks => SqlState::LOCK_NOT_AVAILABLE,
535 AdapterError::StatementTimeout => SqlState::QUERY_CANCELED,
536 AdapterError::Canceled => SqlState::QUERY_CANCELED,
537 AdapterError::IdleInTransactionSessionTimeout => {
538 SqlState::IDLE_IN_TRANSACTION_SESSION_TIMEOUT
539 }
540 AdapterError::RecursionLimit(_) => SqlState::INTERNAL_ERROR,
541 AdapterError::RelationOutsideTimeDomain { .. } => SqlState::INVALID_TRANSACTION_STATE,
542 AdapterError::ResourceExhaustion { .. } => SqlState::INSUFFICIENT_RESOURCES,
543 AdapterError::ResultSize(_) => SqlState::OUT_OF_MEMORY,
544 AdapterError::SafeModeViolation(_) => SqlState::INTERNAL_ERROR,
545 AdapterError::SubscribeOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
546 AdapterError::Optimizer(e) => match e {
547 OptimizerError::PlanError(PlanError::InvalidSchemaName) => {
548 SqlState::INVALID_SCHEMA_NAME
549 }
550 OptimizerError::PlanError(PlanError::ColumnAlreadyExists { .. }) => {
551 SqlState::DUPLICATE_COLUMN
552 }
553 OptimizerError::PlanError(PlanError::UnknownParameter(_)) => {
554 SqlState::UNDEFINED_PARAMETER
555 }
556 OptimizerError::PlanError(PlanError::ParameterNotAllowed(_)) => {
557 SqlState::UNDEFINED_PARAMETER
558 }
559 OptimizerError::PlanError(_) => SqlState::INTERNAL_ERROR,
560 OptimizerError::RecursionLimitError(e) => {
561 AdapterError::RecursionLimit(e.clone()).code() }
563 OptimizerError::Internal(s) => {
564 AdapterError::Internal(s.clone()).code() }
566 OptimizerError::EvalError(e) => {
567 AdapterError::Eval(e.clone()).code() }
569 OptimizerError::TransformError(_) => SqlState::INTERNAL_ERROR,
570 OptimizerError::UnmaterializableFunction(_) => SqlState::FEATURE_NOT_SUPPORTED,
571 OptimizerError::UncallableFunction { .. } => SqlState::FEATURE_NOT_SUPPORTED,
572 OptimizerError::UnsupportedTemporalExpression(_) => SqlState::FEATURE_NOT_SUPPORTED,
573 OptimizerError::InternalUnsafeMfpPlan(_) => SqlState::INTERNAL_ERROR,
576 },
577 AdapterError::UnallowedOnCluster { .. } => {
578 SqlState::S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
579 }
580 AdapterError::Unauthorized(_) => SqlState::INSUFFICIENT_PRIVILEGE,
581 AdapterError::UnknownCursor(_) => SqlState::INVALID_CURSOR_NAME,
582 AdapterError::UnknownPreparedStatement(_) => SqlState::UNDEFINED_PSTATEMENT,
583 AdapterError::UnknownLoginRole(_) => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
584 AdapterError::UnknownClusterReplica { .. } => SqlState::UNDEFINED_OBJECT,
585 AdapterError::UnrecognizedConfigurationParam(_) => SqlState::UNDEFINED_OBJECT,
586 AdapterError::Unsupported(..) => SqlState::FEATURE_NOT_SUPPORTED,
587 AdapterError::UnavailableFeature { .. } => SqlState::FEATURE_NOT_SUPPORTED,
588 AdapterError::Unstructured(_) => SqlState::INTERNAL_ERROR,
589 AdapterError::UntargetedLogRead { .. } => SqlState::FEATURE_NOT_SUPPORTED,
590 AdapterError::DDLTransactionRace => SqlState::T_R_SERIALIZATION_FAILURE,
591 AdapterError::TransactionDryRun { .. } => SqlState::T_R_SERIALIZATION_FAILURE,
592 AdapterError::WriteOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
597 AdapterError::DDLOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
598 AdapterError::Storage(_) | AdapterError::Compute(_) | AdapterError::Orchestrator(_) => {
599 SqlState::INTERNAL_ERROR
600 }
601 AdapterError::DependentObject(_) => SqlState::DEPENDENT_OBJECTS_STILL_EXIST,
602 AdapterError::InvalidAlter(_, _) => SqlState::FEATURE_NOT_SUPPORTED,
603 AdapterError::ConnectionValidation(_) => SqlState::SYSTEM_ERROR,
604 AdapterError::MaterializedViewWouldNeverRefresh(_, _) => SqlState::DATA_EXCEPTION,
606 AdapterError::InputNotReadableAtRefreshAtTime(_, _) => SqlState::DATA_EXCEPTION,
607 AdapterError::RtrTimeout(_) => SqlState::QUERY_CANCELED,
608 AdapterError::RtrDropFailure(_) => SqlState::UNDEFINED_OBJECT,
609 AdapterError::UnreadableSinkCollection => SqlState::from_code("MZ009"),
610 AdapterError::UserSessionsDisallowed => SqlState::from_code("MZ010"),
611 AdapterError::NetworkPolicyDenied(_) => SqlState::from_code("MZ011"),
612 AdapterError::ReadOnly => SqlState::READ_ONLY_SQL_TRANSACTION,
615 AdapterError::AlterClusterTimeout => SqlState::QUERY_CANCELED,
616 AdapterError::AlterClusterWhilePendingReplicas => SqlState::OBJECT_IN_USE,
617 AdapterError::AuthenticationError(AuthenticationError::InvalidCredentials) => {
618 SqlState::INVALID_PASSWORD
619 }
620 AdapterError::AuthenticationError(_) => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
621 }
622 }
623
624 pub fn internal<E: std::fmt::Display>(context: &str, e: E) -> AdapterError {
625 AdapterError::Internal(format!("{context}: {e}"))
626 }
627
628 pub fn concurrent_dependency_drop_from_instance_missing(e: InstanceMissing) -> Self {
634 AdapterError::ConcurrentDependencyDrop {
635 dependency_kind: "cluster",
636 dependency_id: e.0.to_string(),
637 }
638 }
639 pub fn concurrent_dependency_drop_from_collection_missing(e: CollectionMissing) -> Self {
640 AdapterError::ConcurrentDependencyDrop {
641 dependency_kind: "collection",
642 dependency_id: e.0.to_string(),
643 }
644 }
645
646 pub fn concurrent_dependency_drop_from_collection_lookup_error(
647 e: CollectionLookupError,
648 compute_instance: ComputeInstanceId,
649 ) -> Self {
650 match e {
651 CollectionLookupError::InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
652 dependency_kind: "cluster",
653 dependency_id: id.to_string(),
654 },
655 CollectionLookupError::CollectionMissing(id) => {
656 AdapterError::ConcurrentDependencyDrop {
657 dependency_kind: "collection",
658 dependency_id: id.to_string(),
659 }
660 }
661 CollectionLookupError::InstanceShutDown => AdapterError::ConcurrentDependencyDrop {
662 dependency_kind: "cluster",
663 dependency_id: compute_instance.to_string(),
664 },
665 }
666 }
667
668 pub fn concurrent_dependency_drop_from_peek_error(
669 e: PeekError,
670 compute_instance: ComputeInstanceId,
671 ) -> AdapterError {
672 match e {
673 PeekError::ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
674 dependency_kind: "replica",
675 dependency_id: id.to_string(),
676 },
677 PeekError::InstanceShutDown => AdapterError::ConcurrentDependencyDrop {
678 dependency_kind: "cluster",
679 dependency_id: compute_instance.to_string(),
680 },
681 e @ PeekError::ReadHoldIdMismatch(_) => AdapterError::internal("peek error", e),
682 e @ PeekError::ReadHoldInsufficient(_) => AdapterError::internal("peek error", e),
683 }
684 }
685
686 pub fn concurrent_dependency_drop_from_dataflow_creation_error(
687 e: compute_error::DataflowCreationError,
688 ) -> Self {
689 use compute_error::DataflowCreationError::*;
690 match e {
691 InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
692 dependency_kind: "cluster",
693 dependency_id: id.to_string(),
694 },
695 CollectionMissing(id) => AdapterError::ConcurrentDependencyDrop {
696 dependency_kind: "collection",
697 dependency_id: id.to_string(),
698 },
699 ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
700 dependency_kind: "replica",
701 dependency_id: id.to_string(),
702 },
703 MissingAsOf | SinceViolation(..) | EmptyAsOfForSubscribe | EmptyAsOfForCopyTo => {
704 AdapterError::internal("dataflow creation error", e)
705 }
706 }
707 }
708}
709
710impl fmt::Display for AdapterError {
711 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
712 match self {
713 AdapterError::AbsurdSubscribeBounds { as_of, up_to } => {
714 assert!(up_to < as_of);
715 write!(
716 f,
717 r#"subscription lower ("as of") bound is beyond its upper ("up to") bound: {} < {}"#,
718 up_to, as_of
719 )
720 }
721 AdapterError::AmbiguousSystemColumnReference => {
722 write!(
723 f,
724 "cannot use wildcard expansions or NATURAL JOINs in a view that depends on \
725 system objects"
726 )
727 }
728 AdapterError::ChangedPlan(e) => write!(f, "{}", e),
729 AdapterError::Catalog(e) => e.fmt(f),
730 AdapterError::DuplicateCursor(name) => {
731 write!(f, "cursor {} already exists", name.quoted())
732 }
733 AdapterError::Eval(e) => e.fmt(f),
734 AdapterError::Explain(e) => e.fmt(f),
735 AdapterError::IdExhaustionError => f.write_str("ID allocator exhausted all valid IDs"),
736 AdapterError::Internal(e) => write!(f, "internal error: {}", e),
737 AdapterError::IntrospectionDisabled { .. } => write!(
738 f,
739 "cannot read log sources of replica with disabled introspection"
740 ),
741 AdapterError::InvalidLogDependency { object_type, .. } => {
742 write!(f, "{object_type} objects cannot depend on log sources")
743 }
744 AdapterError::InvalidClusterReplicaAz { az, expected: _ } => {
745 write!(f, "unknown cluster replica availability zone {az}",)
746 }
747 AdapterError::InvalidSetIsolationLevel => write!(
748 f,
749 "SET TRANSACTION ISOLATION LEVEL must be called before any query"
750 ),
751 AdapterError::InvalidSetCluster => {
752 write!(f, "SET cluster cannot be called in an active transaction")
753 }
754 AdapterError::InvalidStorageClusterSize { size, .. } => {
755 write!(f, "unknown source size {size}")
756 }
757 AdapterError::SourceOrSinkSizeRequired { .. } => {
758 write!(f, "must specify either cluster or size option")
759 }
760 AdapterError::InvalidTableMutationSelection => {
761 f.write_str("invalid selection: operation may only refer to user-defined tables")
762 }
763 AdapterError::ConstraintViolation(not_null_violation) => {
764 write!(f, "{}", not_null_violation)
765 }
766 AdapterError::ConcurrentClusterDrop => {
767 write!(f, "the transaction's active cluster has been dropped")
768 }
769 AdapterError::ConcurrentDependencyDrop {
770 dependency_kind,
771 dependency_id,
772 } => {
773 write!(f, "{dependency_kind} '{dependency_id}' was dropped")
774 }
775 AdapterError::CollectionUnreadable { id } => {
776 write!(f, "collection '{id}' is not readable at any timestamp")
777 }
778 AdapterError::NoClusterReplicasAvailable { name, .. } => {
779 write!(
780 f,
781 "CLUSTER {} has no replicas available to service request",
782 name.quoted()
783 )
784 }
785 AdapterError::OperationProhibitsTransaction(op) => {
786 write!(f, "{} cannot be run inside a transaction block", op)
787 }
788 AdapterError::OperationRequiresTransaction(op) => {
789 write!(f, "{} can only be used in transaction blocks", op)
790 }
791 AdapterError::ParseError(e) => e.fmt(f),
792 AdapterError::PlanError(e) => e.fmt(f),
793 AdapterError::PreparedStatementExists(name) => {
794 write!(f, "prepared statement {} already exists", name.quoted())
795 }
796 AdapterError::ReadOnlyTransaction => f.write_str("transaction in read-only mode"),
797 AdapterError::SingleStatementTransaction => {
798 f.write_str("this transaction can only execute a single statement")
799 }
800 AdapterError::ReadWriteUnavailable => {
801 f.write_str("transaction read-write mode must be set before any query")
802 }
803 AdapterError::WrongSetOfLocks => {
804 write!(f, "internal error, wrong set of locks acquired")
805 }
806 AdapterError::StatementTimeout => {
807 write!(f, "canceling statement due to statement timeout")
808 }
809 AdapterError::Canceled => {
810 write!(f, "canceling statement due to user request")
811 }
812 AdapterError::IdleInTransactionSessionTimeout => {
813 write!(
814 f,
815 "terminating connection due to idle-in-transaction timeout"
816 )
817 }
818 AdapterError::RecursionLimit(e) => e.fmt(f),
819 AdapterError::RelationOutsideTimeDomain { .. } => {
820 write!(
821 f,
822 "Transactions can only reference objects in the same timedomain. \
823 See https://materialize.com/docs/sql/begin/#same-timedomain-error",
824 )
825 }
826 AdapterError::ResourceExhaustion {
827 resource_type,
828 limit_name,
829 desired,
830 limit,
831 current,
832 } => {
833 write!(
834 f,
835 "creating {resource_type} would violate {limit_name} limit (desired: {desired}, limit: {limit}, current: {current})"
836 )
837 }
838 AdapterError::ResultSize(e) => write!(f, "{e}"),
839 AdapterError::SafeModeViolation(feature) => {
840 write!(f, "cannot create {} in safe mode", feature)
841 }
842 AdapterError::SubscribeOnlyTransaction => {
843 f.write_str("SUBSCRIBE in transactions must be the only read statement")
844 }
845 AdapterError::Optimizer(e) => e.fmt(f),
846 AdapterError::UnallowedOnCluster {
847 depends_on,
848 cluster,
849 } => {
850 let items = depends_on.into_iter().map(|item| item.quoted()).join(", ");
851 write!(
852 f,
853 "querying the following items {items} is not allowed from the {} cluster",
854 cluster.quoted()
855 )
856 }
857 AdapterError::Unauthorized(unauthorized) => {
858 write!(f, "{unauthorized}")
859 }
860 AdapterError::UnknownCursor(name) => {
861 write!(f, "cursor {} does not exist", name.quoted())
862 }
863 AdapterError::UnknownLoginRole(name) => {
864 write!(f, "role {} does not exist", name.quoted())
865 }
866 AdapterError::Unsupported(features) => write!(f, "{} are not supported", features),
867 AdapterError::Unstructured(e) => write!(f, "{}", e.display_with_causes()),
868 AdapterError::WriteOnlyTransaction => f.write_str("transaction in write-only mode"),
869 AdapterError::UnknownPreparedStatement(name) => {
870 write!(f, "prepared statement {} does not exist", name.quoted())
871 }
872 AdapterError::UnknownClusterReplica {
873 cluster_name,
874 replica_name,
875 } => write!(
876 f,
877 "cluster replica '{cluster_name}.{replica_name}' does not exist"
878 ),
879 AdapterError::UnrecognizedConfigurationParam(setting_name) => write!(
880 f,
881 "unrecognized configuration parameter {}",
882 setting_name.quoted()
883 ),
884 AdapterError::UntargetedLogRead { .. } => {
885 f.write_str("log source reads must target a replica")
886 }
887 AdapterError::DDLOnlyTransaction => f.write_str(
888 "transactions which modify objects are restricted to just modifying objects",
889 ),
890 AdapterError::DDLTransactionRace => f.write_str(
891 "another session modified the catalog while this DDL transaction was open",
892 ),
893 AdapterError::TransactionDryRun { .. } => f.write_str("transaction dry run"),
894 AdapterError::Storage(e) => e.fmt(f),
895 AdapterError::Compute(e) => e.fmt(f),
896 AdapterError::Orchestrator(e) => e.fmt(f),
897 AdapterError::DependentObject(dependent_objects) => {
898 let role_str = if dependent_objects.keys().count() == 1 {
899 "role"
900 } else {
901 "roles"
902 };
903 write!(
904 f,
905 "{role_str} \"{}\" cannot be dropped because some objects depend on it",
906 dependent_objects.keys().join(", ")
907 )
908 }
909 AdapterError::InvalidAlter(t, e) => {
910 write!(f, "invalid ALTER {t}: {e}")
911 }
912 AdapterError::ConnectionValidation(e) => e.fmt(f),
913 AdapterError::MaterializedViewWouldNeverRefresh(_, _) => {
914 write!(
915 f,
916 "all the specified refreshes of the materialized view would be too far in the past, and thus they \
917 would never happen"
918 )
919 }
920 AdapterError::InputNotReadableAtRefreshAtTime(_, _) => {
921 write!(
922 f,
923 "REFRESH AT requested for a time where not all the inputs are readable"
924 )
925 }
926 AdapterError::RtrTimeout(_) => {
927 write!(
928 f,
929 "timed out before ingesting the source's visible frontier when real-time-recency query issued"
930 )
931 }
932 AdapterError::RtrDropFailure(_) => write!(
933 f,
934 "real-time source dropped before ingesting the upstream system's visible frontier"
935 ),
936 AdapterError::UnreadableSinkCollection => {
937 write!(f, "collection is not readable at any time")
938 }
939 AdapterError::UserSessionsDisallowed => write!(f, "login blocked"),
940 AdapterError::NetworkPolicyDenied(_) => write!(f, "session denied"),
941 AdapterError::ReadOnly => write!(f, "cannot write in read-only mode"),
942 AdapterError::AlterClusterTimeout => {
943 write!(f, "canceling statement, provided timeout lapsed")
944 }
945 AdapterError::AuthenticationError(e) => {
946 write!(f, "authentication error {e}")
947 }
948 AdapterError::UnavailableFeature { feature, docs } => {
949 write!(f, "{} is not supported in this environment.", feature)?;
950 if let Some(docs) = docs {
951 write!(
952 f,
953 " For more information consult the documentation at {docs}"
954 )?;
955 }
956 Ok(())
957 }
958 AdapterError::AlterClusterWhilePendingReplicas => {
959 write!(f, "cannot alter clusters with pending updates")
960 }
961 }
962 }
963}
964
965impl From<anyhow::Error> for AdapterError {
966 fn from(e: anyhow::Error) -> AdapterError {
967 match e.downcast::<PlanError>() {
968 Ok(plan_error) => AdapterError::PlanError(plan_error),
969 Err(e) => AdapterError::Unstructured(e),
970 }
971 }
972}
973
974impl From<TryFromIntError> for AdapterError {
975 fn from(e: TryFromIntError) -> AdapterError {
976 AdapterError::Unstructured(e.into())
977 }
978}
979
980impl From<TryFromDecimalError> for AdapterError {
981 fn from(e: TryFromDecimalError) -> AdapterError {
982 AdapterError::Unstructured(e.into())
983 }
984}
985
986impl From<mz_catalog::memory::error::Error> for AdapterError {
987 fn from(e: mz_catalog::memory::error::Error) -> AdapterError {
988 AdapterError::Catalog(e)
989 }
990}
991
992impl From<mz_catalog::durable::CatalogError> for AdapterError {
993 fn from(e: mz_catalog::durable::CatalogError) -> Self {
994 mz_catalog::memory::error::Error::from(e).into()
995 }
996}
997
998impl From<mz_catalog::durable::DurableCatalogError> for AdapterError {
999 fn from(e: mz_catalog::durable::DurableCatalogError) -> Self {
1000 mz_catalog::durable::CatalogError::from(e).into()
1001 }
1002}
1003
1004impl From<EvalError> for AdapterError {
1005 fn from(e: EvalError) -> AdapterError {
1006 AdapterError::Eval(e)
1007 }
1008}
1009
1010impl From<ExplainError> for AdapterError {
1011 fn from(e: ExplainError) -> AdapterError {
1012 match e {
1013 ExplainError::RecursionLimitError(e) => AdapterError::RecursionLimit(e),
1014 e => AdapterError::Explain(e),
1015 }
1016 }
1017}
1018
1019impl From<mz_sql::catalog::CatalogError> for AdapterError {
1020 fn from(e: mz_sql::catalog::CatalogError) -> AdapterError {
1021 AdapterError::Catalog(mz_catalog::memory::error::Error::from(e))
1022 }
1023}
1024
1025impl From<PlanError> for AdapterError {
1026 fn from(e: PlanError) -> AdapterError {
1027 match e {
1028 PlanError::UnknownCursor(name) => AdapterError::UnknownCursor(name),
1029 _ => AdapterError::PlanError(e),
1030 }
1031 }
1032}
1033
1034impl From<OptimizerError> for AdapterError {
1035 fn from(e: OptimizerError) -> AdapterError {
1036 use OptimizerError::*;
1037 match e {
1038 PlanError(e) => Self::PlanError(e),
1039 RecursionLimitError(e) => Self::RecursionLimit(e),
1040 EvalError(e) => Self::Eval(e),
1041 InternalUnsafeMfpPlan(e) => Self::Internal(e),
1042 Internal(e) => Self::Internal(e),
1043 e => Self::Optimizer(e),
1044 }
1045 }
1046}
1047
1048impl From<NotNullViolation> for AdapterError {
1049 fn from(e: NotNullViolation) -> AdapterError {
1050 AdapterError::ConstraintViolation(e)
1051 }
1052}
1053
1054impl From<RecursionLimitError> for AdapterError {
1055 fn from(e: RecursionLimitError) -> AdapterError {
1056 AdapterError::RecursionLimit(e)
1057 }
1058}
1059
1060impl From<oneshot::error::RecvError> for AdapterError {
1061 fn from(e: oneshot::error::RecvError) -> AdapterError {
1062 AdapterError::Unstructured(e.into())
1063 }
1064}
1065
1066impl From<StorageError<mz_repr::Timestamp>> for AdapterError {
1067 fn from(e: StorageError<mz_repr::Timestamp>) -> Self {
1068 AdapterError::Storage(e)
1069 }
1070}
1071
1072impl From<compute_error::InstanceExists> for AdapterError {
1073 fn from(e: compute_error::InstanceExists) -> Self {
1074 AdapterError::Compute(e.into())
1075 }
1076}
1077
1078impl From<TimestampError> for AdapterError {
1079 fn from(e: TimestampError) -> Self {
1080 let e: EvalError = e.into();
1081 e.into()
1082 }
1083}
1084
1085impl From<mz_sql_parser::parser::ParserStatementError> for AdapterError {
1086 fn from(e: mz_sql_parser::parser::ParserStatementError) -> Self {
1087 AdapterError::ParseError(e)
1088 }
1089}
1090
1091impl From<VarError> for AdapterError {
1092 fn from(e: VarError) -> Self {
1093 let e: mz_catalog::memory::error::Error = e.into();
1094 e.into()
1095 }
1096}
1097
1098impl From<rbac::UnauthorizedError> for AdapterError {
1099 fn from(e: rbac::UnauthorizedError) -> Self {
1100 AdapterError::Unauthorized(e)
1101 }
1102}
1103
1104impl From<mz_sql_parser::ast::IdentError> for AdapterError {
1105 fn from(value: mz_sql_parser::ast::IdentError) -> Self {
1106 AdapterError::PlanError(PlanError::InvalidIdent(value))
1107 }
1108}
1109
1110impl From<mz_pgwire_common::ConnectionError> for AdapterError {
1111 fn from(value: mz_pgwire_common::ConnectionError) -> Self {
1112 match value {
1113 mz_pgwire_common::ConnectionError::TooManyConnections { current, limit } => {
1114 AdapterError::ResourceExhaustion {
1115 resource_type: "connection".into(),
1116 limit_name: "max_connections".into(),
1117 desired: (current + 1).to_string(),
1118 limit: limit.to_string(),
1119 current: current.to_string(),
1120 }
1121 }
1122 }
1123 }
1124}
1125
1126impl From<NetworkPolicyError> for AdapterError {
1127 fn from(value: NetworkPolicyError) -> Self {
1128 AdapterError::NetworkPolicyDenied(value)
1129 }
1130}
1131
1132impl From<ConnectionValidationError> for AdapterError {
1133 fn from(e: ConnectionValidationError) -> AdapterError {
1134 AdapterError::ConnectionValidation(e)
1135 }
1136}
1137
1138impl Error for AdapterError {}