1use std::collections::BTreeMap;
11use std::error::Error;
12use std::fmt;
13use std::num::TryFromIntError;
14
15use dec::TryFromDecimalError;
16use itertools::Itertools;
17use mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER;
18use mz_compute_client::controller::error as compute_error;
19use mz_compute_client::controller::error::{CollectionLookupError, InstanceMissing};
20use mz_compute_client::controller::instance::PeekError;
21use mz_compute_types::ComputeInstanceId;
22use mz_expr::EvalError;
23use mz_ore::error::ErrorExt;
24use mz_ore::stack::RecursionLimitError;
25use mz_ore::str::StrExt;
26use mz_pgwire_common::{ErrorResponse, Severity};
27use mz_repr::adt::timestamp::TimestampError;
28use mz_repr::explain::ExplainError;
29use mz_repr::{NotNullViolation, Timestamp};
30use mz_sql::plan::PlanError;
31use mz_sql::rbac;
32use mz_sql::session::vars::VarError;
33use mz_storage_types::connections::ConnectionValidationError;
34use mz_storage_types::controller::StorageError;
35use mz_storage_types::errors::CollectionMissing;
36use smallvec::SmallVec;
37use timely::progress::Antichain;
38use tokio::sync::oneshot;
39use tokio_postgres::error::SqlState;
40
41use crate::coord::NetworkPolicyError;
42use crate::optimize::OptimizerError;
43
44#[derive(Debug)]
46pub enum AdapterError {
47 AbsurdSubscribeBounds {
49 as_of: mz_repr::Timestamp,
50 up_to: mz_repr::Timestamp,
51 },
52 AmbiguousSystemColumnReference,
56 Catalog(mz_catalog::memory::error::Error),
58 ChangedPlan(String),
63 DuplicateCursor(String),
65 Eval(EvalError),
67 Explain(ExplainError),
69 IdExhaustionError,
71 Internal(String),
73 IntrospectionDisabled {
75 log_names: Vec<String>,
76 },
77 InvalidLogDependency {
80 object_type: String,
81 log_names: Vec<String>,
82 },
83 InvalidClusterReplicaAz {
85 az: String,
86 expected: Vec<String>,
87 },
88 InvalidSetIsolationLevel,
90 InvalidSetCluster,
92 InvalidStorageClusterSize {
94 size: String,
95 expected: Vec<String>,
96 },
97 SourceOrSinkSizeRequired {
99 expected: Vec<String>,
100 },
101 InvalidTableMutationSelection,
103 ConstraintViolation(NotNullViolation),
105 ConcurrentClusterDrop,
107 ConcurrentDependencyDrop {
109 dependency_kind: &'static str,
110 dependency_id: String,
111 },
112 NoClusterReplicasAvailable {
114 name: String,
115 is_managed: bool,
116 },
117 OperationProhibitsTransaction(String),
119 OperationRequiresTransaction(String),
121 PlanError(PlanError),
123 PreparedStatementExists(String),
125 ParseError(mz_sql_parser::parser::ParserStatementError),
127 ReadOnlyTransaction,
129 ReadWriteUnavailable,
131 RecursionLimit(RecursionLimitError),
133 RelationOutsideTimeDomain {
136 relations: Vec<String>,
137 names: Vec<String>,
138 },
139 ResourceExhaustion {
141 resource_type: String,
142 limit_name: String,
143 desired: String,
144 limit: String,
145 current: String,
146 },
147 ResultSize(String),
149 SafeModeViolation(String),
151 WrongSetOfLocks,
153 StatementTimeout,
157 Canceled,
159 IdleInTransactionSessionTimeout,
161 SubscribeOnlyTransaction,
163 Optimizer(OptimizerError),
165 UnallowedOnCluster {
167 depends_on: SmallVec<[String; 2]>,
168 cluster: String,
169 },
170 Unauthorized(rbac::UnauthorizedError),
172 UnknownCursor(String),
174 UnknownLoginRole(String),
176 UnknownPreparedStatement(String),
177 UnknownClusterReplica {
179 cluster_name: String,
180 replica_name: String,
181 },
182 UnrecognizedConfigurationParam(String),
184 Unstructured(anyhow::Error),
188 Unsupported(&'static str),
190 UnavailableFeature {
194 feature: String,
195 docs: Option<String>,
196 },
197 UntargetedLogRead {
199 log_names: Vec<String>,
200 },
201 WriteOnlyTransaction,
203 SingleStatementTransaction,
205 DDLOnlyTransaction,
207 DDLTransactionRace,
209 TransactionDryRun {
212 new_ops: Vec<crate::catalog::Op>,
214 new_state: crate::catalog::CatalogState,
216 },
217 Storage(mz_storage_types::controller::StorageError<mz_repr::Timestamp>),
219 Compute(anyhow::Error),
221 Orchestrator(anyhow::Error),
223 DependentObject(BTreeMap<String, Vec<String>>),
227 InvalidAlter(&'static str, PlanError),
230 ConnectionValidation(ConnectionValidationError),
232 MaterializedViewWouldNeverRefresh(Timestamp, Timestamp),
236 InputNotReadableAtRefreshAtTime(Timestamp, Antichain<Timestamp>),
239 RtrTimeout(String),
241 RtrDropFailure(String),
243 UnreadableSinkCollection,
245 UserSessionsDisallowed,
247 NetworkPolicyDenied(NetworkPolicyError),
249 ReadOnly,
252 AlterClusterTimeout,
253 AlterClusterWhilePendingReplicas,
254 AuthenticationError(AuthenticationError),
255}
256
257#[derive(Debug, thiserror::Error)]
258pub enum AuthenticationError {
259 #[error("invalid credentials")]
260 InvalidCredentials,
261 #[error("role is not allowed to login")]
262 NonLogin,
263 #[error("role does not exist")]
264 RoleNotFound,
265 #[error("password is required")]
266 PasswordRequired,
267}
268
269impl AdapterError {
270 pub fn into_response(self, severity: Severity) -> ErrorResponse {
271 ErrorResponse {
272 severity,
273 code: self.code(),
274 message: self.to_string(),
275 detail: self.detail(),
276 hint: self.hint(),
277 position: self.position(),
278 }
279 }
280
281 pub fn position(&self) -> Option<usize> {
282 match self {
283 AdapterError::ParseError(err) => Some(err.error.pos),
284 _ => None,
285 }
286 }
287
288 pub fn detail(&self) -> Option<String> {
290 match self {
291 AdapterError::AmbiguousSystemColumnReference => {
292 Some("This is a current limitation in Materialize".into())
293 },
294 AdapterError::Catalog(c) => c.detail(),
295 AdapterError::Eval(e) => e.detail(),
296 AdapterError::RelationOutsideTimeDomain { relations, names } => Some(format!(
297 "The following relations in the query are outside the transaction's time domain:\n{}\n{}",
298 relations
299 .iter()
300 .map(|r| r.quoted().to_string())
301 .collect::<Vec<_>>()
302 .join("\n"),
303 match names.is_empty() {
304 true => "No relations are available.".to_string(),
305 false => format!(
306 "Only the following relations are available:\n{}",
307 names
308 .iter()
309 .map(|name| name.quoted().to_string())
310 .collect::<Vec<_>>()
311 .join("\n")
312 ),
313 }
314 )),
315 AdapterError::SourceOrSinkSizeRequired { .. } => Some(
316 "Either specify the cluster that will maintain this object via IN CLUSTER or \
317 specify size via SIZE option."
318 .into(),
319 ),
320 AdapterError::SafeModeViolation(_) => Some(
321 "The Materialize server you are connected to is running in \
322 safe mode, which limits the features that are available."
323 .into(),
324 ),
325 AdapterError::IntrospectionDisabled { log_names }
326 | AdapterError::UntargetedLogRead { log_names } => Some(format!(
327 "The query references the following log sources:\n {}",
328 log_names.join("\n "),
329 )),
330 AdapterError::InvalidLogDependency { log_names, .. } => Some(format!(
331 "The object depends on the following log sources:\n {}",
332 log_names.join("\n "),
333 )),
334 AdapterError::PlanError(e) => e.detail(),
335 AdapterError::Unauthorized(unauthorized) => unauthorized.detail(),
336 AdapterError::DependentObject(dependent_objects) => {
337 Some(dependent_objects
338 .iter()
339 .map(|(role_name, err_msgs)| err_msgs
340 .iter()
341 .map(|err_msg| format!("{role_name}: {err_msg}"))
342 .join("\n"))
343 .join("\n"))
344 },
345 AdapterError::Storage(storage_error) => {
346 storage_error.source().map(|source_error| source_error.to_string_with_causes())
347 }
348 AdapterError::ReadOnlyTransaction => Some("SELECT queries cannot be combined with other query types, including SUBSCRIBE.".into()),
349 AdapterError::InvalidAlter(_, e) => e.detail(),
350 AdapterError::Optimizer(e) => e.detail(),
351 AdapterError::ConnectionValidation(e) => e.detail(),
352 AdapterError::MaterializedViewWouldNeverRefresh(last_refresh, earliest_possible) => {
353 Some(format!(
354 "The specified last refresh is at {}, while the earliest possible time to compute the materialized \
355 view is {}.",
356 last_refresh,
357 earliest_possible,
358 ))
359 }
360 AdapterError::UnallowedOnCluster { cluster, .. } => (cluster == MZ_CATALOG_SERVER_CLUSTER.name).then(||
361 format!("The transaction is executing on the {cluster} cluster, maybe having been routed there by the first statement in the transaction.")
362 ),
363 AdapterError::InputNotReadableAtRefreshAtTime(oracle_read_ts, least_valid_read) => {
364 Some(format!(
365 "The requested REFRESH AT time is {}, \
366 but not all input collections are readable earlier than [{}].",
367 oracle_read_ts,
368 if least_valid_read.len() == 1 {
369 format!("{}", least_valid_read.as_option().expect("antichain contains exactly 1 timestamp"))
370 } else {
371 format!("{:?}", least_valid_read)
373 }
374 ))
375 }
376 AdapterError::RtrTimeout(name) => Some(format!("{name} failed to ingest data up to the real-time recency point")),
377 AdapterError::RtrDropFailure(name) => Some(format!("{name} dropped before ingesting data to the real-time recency point")),
378 AdapterError::UserSessionsDisallowed => Some("Your organization has been blocked. Please contact support.".to_string()),
379 AdapterError::NetworkPolicyDenied(reason)=> Some(format!("{reason}.")),
380 _ => None,
381 }
382 }
383
384 pub fn hint(&self) -> Option<String> {
386 match self {
387 AdapterError::AmbiguousSystemColumnReference => Some(
388 "Rewrite the view to refer to all columns by name. Expand all wildcards and \
389 convert all NATURAL JOINs to USING joins."
390 .to_string(),
391 ),
392 AdapterError::Catalog(c) => c.hint(),
393 AdapterError::Eval(e) => e.hint(),
394 AdapterError::InvalidClusterReplicaAz { expected, az: _ } => {
395 Some(if expected.is_empty() {
396 "No availability zones configured; do not specify AVAILABILITY ZONE".into()
397 } else {
398 format!("Valid availability zones are: {}", expected.join(", "))
399 })
400 }
401 AdapterError::InvalidStorageClusterSize { expected, .. } => {
402 Some(format!("Valid sizes are: {}", expected.join(", ")))
403 }
404 AdapterError::SourceOrSinkSizeRequired { expected } => Some(format!(
405 "Try choosing one of the smaller sizes to start. Available sizes: {}",
406 expected.join(", ")
407 )),
408 AdapterError::NoClusterReplicasAvailable { is_managed, .. } => {
409 Some(if *is_managed {
410 "Use ALTER CLUSTER to adjust the replication factor of the cluster. \
411 Example:`ALTER CLUSTER <cluster-name> SET (REPLICATION FACTOR 1)`".into()
412 } else {
413 "Use CREATE CLUSTER REPLICA to attach cluster replicas to the cluster".into()
414 })
415 }
416 AdapterError::UntargetedLogRead { .. } => Some(
417 "Use `SET cluster_replica = <replica-name>` to target a specific replica in the \
418 active cluster. Note that subsequent queries will only be answered by \
419 the selected replica, which might reduce availability. To undo the replica \
420 selection, use `RESET cluster_replica`."
421 .into(),
422 ),
423 AdapterError::ResourceExhaustion { resource_type, .. } => Some(format!(
424 "Drop an existing {resource_type} or contact support to request a limit increase."
425 )),
426 AdapterError::StatementTimeout => Some(
427 "Consider increasing the maximum allowed statement duration for this session by \
428 setting the statement_timeout session variable. For example, `SET \
429 statement_timeout = '120s'`."
430 .into(),
431 ),
432 AdapterError::PlanError(e) => e.hint(),
433 AdapterError::UnallowedOnCluster { cluster, .. } => {
434 (cluster != MZ_CATALOG_SERVER_CLUSTER.name).then(||
435 "Use `SET CLUSTER = <cluster-name>` to change your cluster and re-run the query."
436 .to_string()
437 )
438 }
439 AdapterError::InvalidAlter(_, e) => e.hint(),
440 AdapterError::Optimizer(e) => e.hint(),
441 AdapterError::ConnectionValidation(e) => e.hint(),
442 AdapterError::InputNotReadableAtRefreshAtTime(_, _) => Some(
443 "You can use `REFRESH AT greatest(mz_now(), <explicit timestamp>)` to refresh \
444 either at the explicitly specified timestamp, or now if the given timestamp would \
445 be in the past.".to_string()
446 ),
447 AdapterError::AlterClusterTimeout => Some(
448 "Consider increasing the timeout duration in the alter cluster statement.".into(),
449 ),
450 AdapterError::DDLTransactionRace => Some(
451 "Currently, DDL transactions fail when any other DDL happens concurrently, \
452 even on unrelated schemas/clusters.".into()
453 ),
454 _ => None,
455 }
456 }
457
458 pub fn code(&self) -> SqlState {
459 match self {
464 AdapterError::AbsurdSubscribeBounds { .. } => SqlState::DATA_EXCEPTION,
467 AdapterError::AmbiguousSystemColumnReference => SqlState::FEATURE_NOT_SUPPORTED,
468 AdapterError::Catalog(e) => match &e.kind {
469 mz_catalog::memory::error::ErrorKind::VarError(e) => match e {
470 VarError::ConstrainedParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
471 VarError::FixedValueParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
472 VarError::InvalidParameterType { .. } => SqlState::INVALID_PARAMETER_VALUE,
473 VarError::InvalidParameterValue { .. } => SqlState::INVALID_PARAMETER_VALUE,
474 VarError::ReadOnlyParameter(_) => SqlState::CANT_CHANGE_RUNTIME_PARAM,
475 VarError::UnknownParameter(_) => SqlState::UNDEFINED_OBJECT,
476 VarError::RequiresUnsafeMode { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
477 VarError::RequiresFeatureFlag { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
478 },
479 _ => SqlState::INTERNAL_ERROR,
480 },
481 AdapterError::ChangedPlan(_) => SqlState::FEATURE_NOT_SUPPORTED,
482 AdapterError::DuplicateCursor(_) => SqlState::DUPLICATE_CURSOR,
483 AdapterError::Eval(EvalError::CharacterNotValidForEncoding(_)) => {
484 SqlState::PROGRAM_LIMIT_EXCEEDED
485 }
486 AdapterError::Eval(EvalError::CharacterTooLargeForEncoding(_)) => {
487 SqlState::PROGRAM_LIMIT_EXCEEDED
488 }
489 AdapterError::Eval(EvalError::LengthTooLarge) => SqlState::PROGRAM_LIMIT_EXCEEDED,
490 AdapterError::Eval(EvalError::NullCharacterNotPermitted) => {
491 SqlState::PROGRAM_LIMIT_EXCEEDED
492 }
493 AdapterError::Eval(_) => SqlState::INTERNAL_ERROR,
494 AdapterError::Explain(_) => SqlState::INTERNAL_ERROR,
495 AdapterError::IdExhaustionError => SqlState::INTERNAL_ERROR,
496 AdapterError::Internal(_) => SqlState::INTERNAL_ERROR,
497 AdapterError::IntrospectionDisabled { .. } => SqlState::FEATURE_NOT_SUPPORTED,
498 AdapterError::InvalidLogDependency { .. } => SqlState::FEATURE_NOT_SUPPORTED,
499 AdapterError::InvalidClusterReplicaAz { .. } => SqlState::FEATURE_NOT_SUPPORTED,
500 AdapterError::InvalidSetIsolationLevel => SqlState::ACTIVE_SQL_TRANSACTION,
501 AdapterError::InvalidSetCluster => SqlState::ACTIVE_SQL_TRANSACTION,
502 AdapterError::InvalidStorageClusterSize { .. } => SqlState::FEATURE_NOT_SUPPORTED,
503 AdapterError::SourceOrSinkSizeRequired { .. } => SqlState::FEATURE_NOT_SUPPORTED,
504 AdapterError::InvalidTableMutationSelection => SqlState::INVALID_TRANSACTION_STATE,
505 AdapterError::ConstraintViolation(NotNullViolation(_)) => SqlState::NOT_NULL_VIOLATION,
506 AdapterError::ConcurrentClusterDrop => SqlState::INVALID_TRANSACTION_STATE,
507 AdapterError::ConcurrentDependencyDrop { .. } => SqlState::UNDEFINED_OBJECT,
508 AdapterError::NoClusterReplicasAvailable { .. } => SqlState::FEATURE_NOT_SUPPORTED,
509 AdapterError::OperationProhibitsTransaction(_) => SqlState::ACTIVE_SQL_TRANSACTION,
510 AdapterError::OperationRequiresTransaction(_) => SqlState::NO_ACTIVE_SQL_TRANSACTION,
511 AdapterError::ParseError(_) => SqlState::SYNTAX_ERROR,
512 AdapterError::PlanError(PlanError::InvalidSchemaName) => SqlState::INVALID_SCHEMA_NAME,
513 AdapterError::PlanError(PlanError::ColumnAlreadyExists { .. }) => {
514 SqlState::DUPLICATE_COLUMN
515 }
516 AdapterError::PlanError(PlanError::UnknownParameter(_)) => {
517 SqlState::UNDEFINED_PARAMETER
518 }
519 AdapterError::PlanError(PlanError::ParameterNotAllowed(_)) => {
520 SqlState::UNDEFINED_PARAMETER
521 }
522 AdapterError::PlanError(_) => SqlState::INTERNAL_ERROR,
523 AdapterError::PreparedStatementExists(_) => SqlState::DUPLICATE_PSTATEMENT,
524 AdapterError::ReadOnlyTransaction => SqlState::READ_ONLY_SQL_TRANSACTION,
525 AdapterError::ReadWriteUnavailable => SqlState::INVALID_TRANSACTION_STATE,
526 AdapterError::SingleStatementTransaction => SqlState::INVALID_TRANSACTION_STATE,
527 AdapterError::WrongSetOfLocks => SqlState::LOCK_NOT_AVAILABLE,
528 AdapterError::StatementTimeout => SqlState::QUERY_CANCELED,
529 AdapterError::Canceled => SqlState::QUERY_CANCELED,
530 AdapterError::IdleInTransactionSessionTimeout => {
531 SqlState::IDLE_IN_TRANSACTION_SESSION_TIMEOUT
532 }
533 AdapterError::RecursionLimit(_) => SqlState::INTERNAL_ERROR,
534 AdapterError::RelationOutsideTimeDomain { .. } => SqlState::INVALID_TRANSACTION_STATE,
535 AdapterError::ResourceExhaustion { .. } => SqlState::INSUFFICIENT_RESOURCES,
536 AdapterError::ResultSize(_) => SqlState::OUT_OF_MEMORY,
537 AdapterError::SafeModeViolation(_) => SqlState::INTERNAL_ERROR,
538 AdapterError::SubscribeOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
539 AdapterError::Optimizer(e) => match e {
540 OptimizerError::PlanError(PlanError::InvalidSchemaName) => {
541 SqlState::INVALID_SCHEMA_NAME
542 }
543 OptimizerError::PlanError(PlanError::ColumnAlreadyExists { .. }) => {
544 SqlState::DUPLICATE_COLUMN
545 }
546 OptimizerError::PlanError(PlanError::UnknownParameter(_)) => {
547 SqlState::UNDEFINED_PARAMETER
548 }
549 OptimizerError::PlanError(PlanError::ParameterNotAllowed(_)) => {
550 SqlState::UNDEFINED_PARAMETER
551 }
552 OptimizerError::PlanError(_) => SqlState::INTERNAL_ERROR,
553 OptimizerError::RecursionLimitError(e) => {
554 AdapterError::RecursionLimit(e.clone()).code() }
556 OptimizerError::Internal(s) => {
557 AdapterError::Internal(s.clone()).code() }
559 OptimizerError::EvalError(e) => {
560 AdapterError::Eval(e.clone()).code() }
562 OptimizerError::TransformError(_) => SqlState::INTERNAL_ERROR,
563 OptimizerError::UnmaterializableFunction(_) => SqlState::FEATURE_NOT_SUPPORTED,
564 OptimizerError::UncallableFunction { .. } => SqlState::FEATURE_NOT_SUPPORTED,
565 OptimizerError::UnsupportedTemporalExpression(_) => SqlState::FEATURE_NOT_SUPPORTED,
566 OptimizerError::InternalUnsafeMfpPlan(_) => SqlState::INTERNAL_ERROR,
569 },
570 AdapterError::UnallowedOnCluster { .. } => {
571 SqlState::S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
572 }
573 AdapterError::Unauthorized(_) => SqlState::INSUFFICIENT_PRIVILEGE,
574 AdapterError::UnknownCursor(_) => SqlState::INVALID_CURSOR_NAME,
575 AdapterError::UnknownPreparedStatement(_) => SqlState::UNDEFINED_PSTATEMENT,
576 AdapterError::UnknownLoginRole(_) => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
577 AdapterError::UnknownClusterReplica { .. } => SqlState::UNDEFINED_OBJECT,
578 AdapterError::UnrecognizedConfigurationParam(_) => SqlState::UNDEFINED_OBJECT,
579 AdapterError::Unsupported(..) => SqlState::FEATURE_NOT_SUPPORTED,
580 AdapterError::UnavailableFeature { .. } => SqlState::FEATURE_NOT_SUPPORTED,
581 AdapterError::Unstructured(_) => SqlState::INTERNAL_ERROR,
582 AdapterError::UntargetedLogRead { .. } => SqlState::FEATURE_NOT_SUPPORTED,
583 AdapterError::DDLTransactionRace => SqlState::T_R_SERIALIZATION_FAILURE,
584 AdapterError::TransactionDryRun { .. } => SqlState::T_R_SERIALIZATION_FAILURE,
585 AdapterError::WriteOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
590 AdapterError::DDLOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
591 AdapterError::Storage(_) | AdapterError::Compute(_) | AdapterError::Orchestrator(_) => {
592 SqlState::INTERNAL_ERROR
593 }
594 AdapterError::DependentObject(_) => SqlState::DEPENDENT_OBJECTS_STILL_EXIST,
595 AdapterError::InvalidAlter(_, _) => SqlState::FEATURE_NOT_SUPPORTED,
596 AdapterError::ConnectionValidation(_) => SqlState::SYSTEM_ERROR,
597 AdapterError::MaterializedViewWouldNeverRefresh(_, _) => SqlState::DATA_EXCEPTION,
599 AdapterError::InputNotReadableAtRefreshAtTime(_, _) => SqlState::DATA_EXCEPTION,
600 AdapterError::RtrTimeout(_) => SqlState::QUERY_CANCELED,
601 AdapterError::RtrDropFailure(_) => SqlState::UNDEFINED_OBJECT,
602 AdapterError::UnreadableSinkCollection => SqlState::from_code("MZ009"),
603 AdapterError::UserSessionsDisallowed => SqlState::from_code("MZ010"),
604 AdapterError::NetworkPolicyDenied(_) => SqlState::from_code("MZ011"),
605 AdapterError::ReadOnly => SqlState::READ_ONLY_SQL_TRANSACTION,
608 AdapterError::AlterClusterTimeout => SqlState::QUERY_CANCELED,
609 AdapterError::AlterClusterWhilePendingReplicas => SqlState::OBJECT_IN_USE,
610 AdapterError::AuthenticationError(AuthenticationError::InvalidCredentials) => {
611 SqlState::INVALID_PASSWORD
612 }
613 AdapterError::AuthenticationError(_) => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
614 }
615 }
616
617 pub fn internal<E: std::fmt::Display>(context: &str, e: E) -> AdapterError {
618 AdapterError::Internal(format!("{context}: {e}"))
619 }
620
621 pub fn concurrent_dependency_drop_from_instance_missing(e: InstanceMissing) -> Self {
627 AdapterError::ConcurrentDependencyDrop {
628 dependency_kind: "cluster",
629 dependency_id: e.0.to_string(),
630 }
631 }
632 pub fn concurrent_dependency_drop_from_collection_missing(e: CollectionMissing) -> Self {
633 AdapterError::ConcurrentDependencyDrop {
634 dependency_kind: "collection",
635 dependency_id: e.0.to_string(),
636 }
637 }
638
639 pub fn concurrent_dependency_drop_from_collection_lookup_error(
640 e: CollectionLookupError,
641 compute_instance: ComputeInstanceId,
642 ) -> Self {
643 match e {
644 CollectionLookupError::InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
645 dependency_kind: "cluster",
646 dependency_id: id.to_string(),
647 },
648 CollectionLookupError::CollectionMissing(id) => {
649 AdapterError::ConcurrentDependencyDrop {
650 dependency_kind: "collection",
651 dependency_id: id.to_string(),
652 }
653 }
654 CollectionLookupError::InstanceShutDown => AdapterError::ConcurrentDependencyDrop {
655 dependency_kind: "cluster",
656 dependency_id: compute_instance.to_string(),
657 },
658 }
659 }
660
661 pub fn concurrent_dependency_drop_from_peek_error(
662 e: PeekError,
663 compute_instance: ComputeInstanceId,
664 ) -> AdapterError {
665 match e {
666 PeekError::ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
667 dependency_kind: "replica",
668 dependency_id: id.to_string(),
669 },
670 PeekError::InstanceShutDown => AdapterError::ConcurrentDependencyDrop {
671 dependency_kind: "cluster",
672 dependency_id: compute_instance.to_string(),
673 },
674 e @ PeekError::ReadHoldIdMismatch(_) => AdapterError::internal("peek error", e),
675 e @ PeekError::ReadHoldInsufficient(_) => AdapterError::internal("peek error", e),
676 }
677 }
678
679 pub fn concurrent_dependency_drop_from_dataflow_creation_error(
680 e: compute_error::DataflowCreationError,
681 ) -> Self {
682 use compute_error::DataflowCreationError::*;
683 match e {
684 InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
685 dependency_kind: "cluster",
686 dependency_id: id.to_string(),
687 },
688 CollectionMissing(id) => AdapterError::ConcurrentDependencyDrop {
689 dependency_kind: "collection",
690 dependency_id: id.to_string(),
691 },
692 ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
693 dependency_kind: "replica",
694 dependency_id: id.to_string(),
695 },
696 MissingAsOf | SinceViolation(..) | EmptyAsOfForSubscribe | EmptyAsOfForCopyTo => {
697 AdapterError::internal("dataflow creation error", e)
698 }
699 }
700 }
701}
702
703impl fmt::Display for AdapterError {
704 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
705 match self {
706 AdapterError::AbsurdSubscribeBounds { as_of, up_to } => {
707 assert!(up_to < as_of);
708 write!(
709 f,
710 r#"subscription lower ("as of") bound is beyond its upper ("up to") bound: {} < {}"#,
711 up_to, as_of
712 )
713 }
714 AdapterError::AmbiguousSystemColumnReference => {
715 write!(
716 f,
717 "cannot use wildcard expansions or NATURAL JOINs in a view that depends on \
718 system objects"
719 )
720 }
721 AdapterError::ChangedPlan(e) => write!(f, "{}", e),
722 AdapterError::Catalog(e) => e.fmt(f),
723 AdapterError::DuplicateCursor(name) => {
724 write!(f, "cursor {} already exists", name.quoted())
725 }
726 AdapterError::Eval(e) => e.fmt(f),
727 AdapterError::Explain(e) => e.fmt(f),
728 AdapterError::IdExhaustionError => f.write_str("ID allocator exhausted all valid IDs"),
729 AdapterError::Internal(e) => write!(f, "internal error: {}", e),
730 AdapterError::IntrospectionDisabled { .. } => write!(
731 f,
732 "cannot read log sources of replica with disabled introspection"
733 ),
734 AdapterError::InvalidLogDependency { object_type, .. } => {
735 write!(f, "{object_type} objects cannot depend on log sources")
736 }
737 AdapterError::InvalidClusterReplicaAz { az, expected: _ } => {
738 write!(f, "unknown cluster replica availability zone {az}",)
739 }
740 AdapterError::InvalidSetIsolationLevel => write!(
741 f,
742 "SET TRANSACTION ISOLATION LEVEL must be called before any query"
743 ),
744 AdapterError::InvalidSetCluster => {
745 write!(f, "SET cluster cannot be called in an active transaction")
746 }
747 AdapterError::InvalidStorageClusterSize { size, .. } => {
748 write!(f, "unknown source size {size}")
749 }
750 AdapterError::SourceOrSinkSizeRequired { .. } => {
751 write!(f, "must specify either cluster or size option")
752 }
753 AdapterError::InvalidTableMutationSelection => {
754 f.write_str("invalid selection: operation may only refer to user-defined tables")
755 }
756 AdapterError::ConstraintViolation(not_null_violation) => {
757 write!(f, "{}", not_null_violation)
758 }
759 AdapterError::ConcurrentClusterDrop => {
760 write!(f, "the transaction's active cluster has been dropped")
761 }
762 AdapterError::ConcurrentDependencyDrop {
763 dependency_kind,
764 dependency_id,
765 } => {
766 write!(f, "{dependency_kind} '{dependency_id}' was dropped")
767 }
768 AdapterError::NoClusterReplicasAvailable { name, .. } => {
769 write!(
770 f,
771 "CLUSTER {} has no replicas available to service request",
772 name.quoted()
773 )
774 }
775 AdapterError::OperationProhibitsTransaction(op) => {
776 write!(f, "{} cannot be run inside a transaction block", op)
777 }
778 AdapterError::OperationRequiresTransaction(op) => {
779 write!(f, "{} can only be used in transaction blocks", op)
780 }
781 AdapterError::ParseError(e) => e.fmt(f),
782 AdapterError::PlanError(e) => e.fmt(f),
783 AdapterError::PreparedStatementExists(name) => {
784 write!(f, "prepared statement {} already exists", name.quoted())
785 }
786 AdapterError::ReadOnlyTransaction => f.write_str("transaction in read-only mode"),
787 AdapterError::SingleStatementTransaction => {
788 f.write_str("this transaction can only execute a single statement")
789 }
790 AdapterError::ReadWriteUnavailable => {
791 f.write_str("transaction read-write mode must be set before any query")
792 }
793 AdapterError::WrongSetOfLocks => {
794 write!(f, "internal error, wrong set of locks acquired")
795 }
796 AdapterError::StatementTimeout => {
797 write!(f, "canceling statement due to statement timeout")
798 }
799 AdapterError::Canceled => {
800 write!(f, "canceling statement due to user request")
801 }
802 AdapterError::IdleInTransactionSessionTimeout => {
803 write!(
804 f,
805 "terminating connection due to idle-in-transaction timeout"
806 )
807 }
808 AdapterError::RecursionLimit(e) => e.fmt(f),
809 AdapterError::RelationOutsideTimeDomain { .. } => {
810 write!(
811 f,
812 "Transactions can only reference objects in the same timedomain. \
813 See https://materialize.com/docs/sql/begin/#same-timedomain-error",
814 )
815 }
816 AdapterError::ResourceExhaustion {
817 resource_type,
818 limit_name,
819 desired,
820 limit,
821 current,
822 } => {
823 write!(
824 f,
825 "creating {resource_type} would violate {limit_name} limit (desired: {desired}, limit: {limit}, current: {current})"
826 )
827 }
828 AdapterError::ResultSize(e) => write!(f, "{e}"),
829 AdapterError::SafeModeViolation(feature) => {
830 write!(f, "cannot create {} in safe mode", feature)
831 }
832 AdapterError::SubscribeOnlyTransaction => {
833 f.write_str("SUBSCRIBE in transactions must be the only read statement")
834 }
835 AdapterError::Optimizer(e) => e.fmt(f),
836 AdapterError::UnallowedOnCluster {
837 depends_on,
838 cluster,
839 } => {
840 let items = depends_on.into_iter().map(|item| item.quoted()).join(", ");
841 write!(
842 f,
843 "querying the following items {items} is not allowed from the {} cluster",
844 cluster.quoted()
845 )
846 }
847 AdapterError::Unauthorized(unauthorized) => {
848 write!(f, "{unauthorized}")
849 }
850 AdapterError::UnknownCursor(name) => {
851 write!(f, "cursor {} does not exist", name.quoted())
852 }
853 AdapterError::UnknownLoginRole(name) => {
854 write!(f, "role {} does not exist", name.quoted())
855 }
856 AdapterError::Unsupported(features) => write!(f, "{} are not supported", features),
857 AdapterError::Unstructured(e) => write!(f, "{}", e.display_with_causes()),
858 AdapterError::WriteOnlyTransaction => f.write_str("transaction in write-only mode"),
859 AdapterError::UnknownPreparedStatement(name) => {
860 write!(f, "prepared statement {} does not exist", name.quoted())
861 }
862 AdapterError::UnknownClusterReplica {
863 cluster_name,
864 replica_name,
865 } => write!(
866 f,
867 "cluster replica '{cluster_name}.{replica_name}' does not exist"
868 ),
869 AdapterError::UnrecognizedConfigurationParam(setting_name) => write!(
870 f,
871 "unrecognized configuration parameter {}",
872 setting_name.quoted()
873 ),
874 AdapterError::UntargetedLogRead { .. } => {
875 f.write_str("log source reads must target a replica")
876 }
877 AdapterError::DDLOnlyTransaction => f.write_str(
878 "transactions which modify objects are restricted to just modifying objects",
879 ),
880 AdapterError::DDLTransactionRace => f.write_str(
881 "another session modified the catalog while this DDL transaction was open",
882 ),
883 AdapterError::TransactionDryRun { .. } => f.write_str("transaction dry run"),
884 AdapterError::Storage(e) => e.fmt(f),
885 AdapterError::Compute(e) => e.fmt(f),
886 AdapterError::Orchestrator(e) => e.fmt(f),
887 AdapterError::DependentObject(dependent_objects) => {
888 let role_str = if dependent_objects.keys().count() == 1 {
889 "role"
890 } else {
891 "roles"
892 };
893 write!(
894 f,
895 "{role_str} \"{}\" cannot be dropped because some objects depend on it",
896 dependent_objects.keys().join(", ")
897 )
898 }
899 AdapterError::InvalidAlter(t, e) => {
900 write!(f, "invalid ALTER {t}: {e}")
901 }
902 AdapterError::ConnectionValidation(e) => e.fmt(f),
903 AdapterError::MaterializedViewWouldNeverRefresh(_, _) => {
904 write!(
905 f,
906 "all the specified refreshes of the materialized view would be too far in the past, and thus they \
907 would never happen"
908 )
909 }
910 AdapterError::InputNotReadableAtRefreshAtTime(_, _) => {
911 write!(
912 f,
913 "REFRESH AT requested for a time where not all the inputs are readable"
914 )
915 }
916 AdapterError::RtrTimeout(_) => {
917 write!(
918 f,
919 "timed out before ingesting the source's visible frontier when real-time-recency query issued"
920 )
921 }
922 AdapterError::RtrDropFailure(_) => write!(
923 f,
924 "real-time source dropped before ingesting the upstream system's visible frontier"
925 ),
926 AdapterError::UnreadableSinkCollection => {
927 write!(f, "collection is not readable at any time")
928 }
929 AdapterError::UserSessionsDisallowed => write!(f, "login blocked"),
930 AdapterError::NetworkPolicyDenied(_) => write!(f, "session denied"),
931 AdapterError::ReadOnly => write!(f, "cannot write in read-only mode"),
932 AdapterError::AlterClusterTimeout => {
933 write!(f, "canceling statement, provided timeout lapsed")
934 }
935 AdapterError::AuthenticationError(e) => {
936 write!(f, "authentication error {e}")
937 }
938 AdapterError::UnavailableFeature { feature, docs } => {
939 write!(f, "{} is not supported in this environment.", feature)?;
940 if let Some(docs) = docs {
941 write!(
942 f,
943 " For more information consult the documentation at {docs}"
944 )?;
945 }
946 Ok(())
947 }
948 AdapterError::AlterClusterWhilePendingReplicas => {
949 write!(f, "cannot alter clusters with pending updates")
950 }
951 }
952 }
953}
954
955impl From<anyhow::Error> for AdapterError {
956 fn from(e: anyhow::Error) -> AdapterError {
957 match e.downcast::<PlanError>() {
958 Ok(plan_error) => AdapterError::PlanError(plan_error),
959 Err(e) => AdapterError::Unstructured(e),
960 }
961 }
962}
963
964impl From<TryFromIntError> for AdapterError {
965 fn from(e: TryFromIntError) -> AdapterError {
966 AdapterError::Unstructured(e.into())
967 }
968}
969
970impl From<TryFromDecimalError> for AdapterError {
971 fn from(e: TryFromDecimalError) -> AdapterError {
972 AdapterError::Unstructured(e.into())
973 }
974}
975
976impl From<mz_catalog::memory::error::Error> for AdapterError {
977 fn from(e: mz_catalog::memory::error::Error) -> AdapterError {
978 AdapterError::Catalog(e)
979 }
980}
981
982impl From<mz_catalog::durable::CatalogError> for AdapterError {
983 fn from(e: mz_catalog::durable::CatalogError) -> Self {
984 mz_catalog::memory::error::Error::from(e).into()
985 }
986}
987
988impl From<mz_catalog::durable::DurableCatalogError> for AdapterError {
989 fn from(e: mz_catalog::durable::DurableCatalogError) -> Self {
990 mz_catalog::durable::CatalogError::from(e).into()
991 }
992}
993
994impl From<EvalError> for AdapterError {
995 fn from(e: EvalError) -> AdapterError {
996 AdapterError::Eval(e)
997 }
998}
999
1000impl From<ExplainError> for AdapterError {
1001 fn from(e: ExplainError) -> AdapterError {
1002 match e {
1003 ExplainError::RecursionLimitError(e) => AdapterError::RecursionLimit(e),
1004 e => AdapterError::Explain(e),
1005 }
1006 }
1007}
1008
1009impl From<mz_sql::catalog::CatalogError> for AdapterError {
1010 fn from(e: mz_sql::catalog::CatalogError) -> AdapterError {
1011 AdapterError::Catalog(mz_catalog::memory::error::Error::from(e))
1012 }
1013}
1014
1015impl From<PlanError> for AdapterError {
1016 fn from(e: PlanError) -> AdapterError {
1017 match e {
1018 PlanError::UnknownCursor(name) => AdapterError::UnknownCursor(name),
1019 _ => AdapterError::PlanError(e),
1020 }
1021 }
1022}
1023
1024impl From<OptimizerError> for AdapterError {
1025 fn from(e: OptimizerError) -> AdapterError {
1026 use OptimizerError::*;
1027 match e {
1028 PlanError(e) => Self::PlanError(e),
1029 RecursionLimitError(e) => Self::RecursionLimit(e),
1030 EvalError(e) => Self::Eval(e),
1031 InternalUnsafeMfpPlan(e) => Self::Internal(e),
1032 Internal(e) => Self::Internal(e),
1033 e => Self::Optimizer(e),
1034 }
1035 }
1036}
1037
1038impl From<NotNullViolation> for AdapterError {
1039 fn from(e: NotNullViolation) -> AdapterError {
1040 AdapterError::ConstraintViolation(e)
1041 }
1042}
1043
1044impl From<RecursionLimitError> for AdapterError {
1045 fn from(e: RecursionLimitError) -> AdapterError {
1046 AdapterError::RecursionLimit(e)
1047 }
1048}
1049
1050impl From<oneshot::error::RecvError> for AdapterError {
1051 fn from(e: oneshot::error::RecvError) -> AdapterError {
1052 AdapterError::Unstructured(e.into())
1053 }
1054}
1055
1056impl From<StorageError<mz_repr::Timestamp>> for AdapterError {
1057 fn from(e: StorageError<mz_repr::Timestamp>) -> Self {
1058 AdapterError::Storage(e)
1059 }
1060}
1061
1062impl From<compute_error::InstanceExists> for AdapterError {
1063 fn from(e: compute_error::InstanceExists) -> Self {
1064 AdapterError::Compute(e.into())
1065 }
1066}
1067
1068impl From<TimestampError> for AdapterError {
1069 fn from(e: TimestampError) -> Self {
1070 let e: EvalError = e.into();
1071 e.into()
1072 }
1073}
1074
1075impl From<mz_sql_parser::parser::ParserStatementError> for AdapterError {
1076 fn from(e: mz_sql_parser::parser::ParserStatementError) -> Self {
1077 AdapterError::ParseError(e)
1078 }
1079}
1080
1081impl From<VarError> for AdapterError {
1082 fn from(e: VarError) -> Self {
1083 let e: mz_catalog::memory::error::Error = e.into();
1084 e.into()
1085 }
1086}
1087
1088impl From<rbac::UnauthorizedError> for AdapterError {
1089 fn from(e: rbac::UnauthorizedError) -> Self {
1090 AdapterError::Unauthorized(e)
1091 }
1092}
1093
1094impl From<mz_sql_parser::ast::IdentError> for AdapterError {
1095 fn from(value: mz_sql_parser::ast::IdentError) -> Self {
1096 AdapterError::PlanError(PlanError::InvalidIdent(value))
1097 }
1098}
1099
1100impl From<mz_pgwire_common::ConnectionError> for AdapterError {
1101 fn from(value: mz_pgwire_common::ConnectionError) -> Self {
1102 match value {
1103 mz_pgwire_common::ConnectionError::TooManyConnections { current, limit } => {
1104 AdapterError::ResourceExhaustion {
1105 resource_type: "connection".into(),
1106 limit_name: "max_connections".into(),
1107 desired: (current + 1).to_string(),
1108 limit: limit.to_string(),
1109 current: current.to_string(),
1110 }
1111 }
1112 }
1113 }
1114}
1115
1116impl From<NetworkPolicyError> for AdapterError {
1117 fn from(value: NetworkPolicyError) -> Self {
1118 AdapterError::NetworkPolicyDenied(value)
1119 }
1120}
1121
1122impl From<ConnectionValidationError> for AdapterError {
1123 fn from(e: ConnectionValidationError) -> AdapterError {
1124 AdapterError::ConnectionValidation(e)
1125 }
1126}
1127
1128impl Error for AdapterError {}