use std::collections::BTreeMap;
use std::error::Error;
use std::fmt;
use std::num::TryFromIntError;
use dec::TryFromDecimalError;
use itertools::Itertools;
use mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER;
use mz_compute_client::controller::error as compute_error;
use mz_expr::EvalError;
use mz_ore::error::ErrorExt;
use mz_ore::stack::RecursionLimitError;
use mz_ore::str::StrExt;
use mz_pgwire_common::{ErrorResponse, Severity};
use mz_repr::adt::timestamp::TimestampError;
use mz_repr::explain::ExplainError;
use mz_repr::{NotNullViolation, Timestamp};
use mz_sql::plan::PlanError;
use mz_sql::rbac;
use mz_sql::session::vars::VarError;
use mz_storage_types::connections::ConnectionValidationError;
use mz_storage_types::controller::StorageError;
use smallvec::SmallVec;
use timely::progress::Antichain;
use tokio::sync::oneshot;
use tokio_postgres::error::SqlState;
use crate::coord::NetworkPolicyError;
use crate::optimize::OptimizerError;
/// Errors that the adapter layer can report to a client.
///
/// Each variant maps to a SQLSTATE via `AdapterError::code`, a primary message via
/// the `Display` impl, and optionally a detail (`AdapterError::detail`) and hint
/// (`AdapterError::hint`).
#[derive(Debug)]
pub enum AdapterError {
/// A subscription was requested whose "up to" bound is below its "as of" bound.
AbsurdSubscribeBounds {
as_of: mz_repr::Timestamp,
up_to: mz_repr::Timestamp,
},
/// A wildcard expansion or NATURAL JOIN was used in a view that depends on
/// system objects.
AmbiguousSystemColumnReference,
/// Wraps an in-memory catalog error.
Catalog(mz_catalog::memory::error::Error),
/// The payload is displayed verbatim as the error message.
// NOTE(review): presumably raised when a cached plan is no longer valid -- confirm.
ChangedPlan(String),
/// A cursor with the given name already exists.
DuplicateCursor(String),
/// Wraps an expression evaluation error.
Eval(EvalError),
/// Wraps an `ExplainError`.
Explain(ExplainError),
/// The ID allocator exhausted all valid IDs.
IdExhaustionError,
/// An internal error; the payload is appended to "internal error: ".
Internal(String),
/// Log sources were read on a replica with disabled introspection.
IntrospectionDisabled {
/// Names of the referenced log sources (listed in the detail).
log_names: Vec<String>,
},
/// An object of a kind that cannot depend on log sources attempted to do so.
InvalidLogDependency {
/// Kind of object, used as the message prefix.
object_type: String,
/// Names of the log sources depended upon (listed in the detail).
log_names: Vec<String>,
},
/// An unknown cluster replica availability zone was specified.
InvalidClusterReplicaAz {
/// The availability zone that was given.
az: String,
/// The valid availability zones (listed in the hint; may be empty).
expected: Vec<String>,
},
/// SET TRANSACTION ISOLATION LEVEL was called after a query was already run.
InvalidSetIsolationLevel,
/// SET cluster was called in an active transaction.
InvalidSetCluster,
/// An unknown source size was specified.
InvalidStorageClusterSize {
/// The size that was given.
size: String,
/// The valid sizes (listed in the hint).
expected: Vec<String>,
},
/// Neither a cluster nor a size option was specified.
SourceOrSinkSizeRequired {
/// The available sizes (listed in the hint).
expected: Vec<String>,
},
/// The operation's selection referred to something other than user-defined tables.
InvalidTableMutationSelection,
/// A not-null constraint was violated.
ConstraintViolation(NotNullViolation),
/// The transaction's active cluster has been dropped.
ConcurrentClusterDrop,
/// The named cluster has no replicas available to service the request.
NoClusterReplicasAvailable {
/// Name of the cluster.
name: String,
/// Selects which hint is shown (ALTER CLUSTER vs. CREATE CLUSTER REPLICA).
is_managed: bool,
},
/// The named operation cannot be run inside a transaction block.
OperationProhibitsTransaction(String),
/// The named operation can only be used in transaction blocks.
OperationRequiresTransaction(String),
/// Wraps a SQL planning error.
PlanError(PlanError),
/// A prepared statement with the given name already exists.
PreparedStatementExists(String),
/// Wraps a parser error; its position is surfaced by `AdapterError::position`.
ParseError(mz_sql_parser::parser::ParserStatementError),
/// The transaction is in read-only mode.
ReadOnlyTransaction,
/// The transaction's read-write mode must be set before any query.
ReadWriteUnavailable,
/// Wraps a recursion limit error.
RecursionLimit(RecursionLimitError),
/// A transaction referenced objects outside of its time domain.
RelationOutsideTimeDomain {
/// Relations in the query that are outside the time domain.
relations: Vec<String>,
/// Relations that are available to the transaction (may be empty).
names: Vec<String>,
},
/// Creating a resource would violate a configured limit.
ResourceExhaustion {
resource_type: String,
limit_name: String,
desired: String,
limit: String,
current: String,
},
/// The payload is displayed verbatim; reported with the OUT_OF_MEMORY SQLSTATE.
ResultSize(String),
/// The named feature cannot be created in safe mode.
SafeModeViolation(String),
/// Internal error: the wrong set of locks was acquired.
WrongSetOfLocks,
/// The statement was canceled due to statement timeout.
StatementTimeout,
/// The statement was canceled due to user request.
Canceled,
/// The connection was terminated due to idle-in-transaction timeout.
IdleInTransactionSessionTimeout,
/// SUBSCRIBE in transactions must be the only read statement.
SubscribeOnlyTransaction,
/// Wraps an optimizer error that has no more specific `AdapterError` variant.
Optimizer(OptimizerError),
/// The listed items may not be queried from the given cluster.
UnallowedOnCluster {
/// Names of the items being queried.
depends_on: SmallVec<[String; 2]>,
/// Name of the cluster the query ran on.
cluster: String,
},
/// Wraps an RBAC authorization error.
Unauthorized(rbac::UnauthorizedError),
/// No cursor with the given name exists.
UnknownCursor(String),
/// No role with the given name exists.
UnknownLoginRole(String),
/// No prepared statement with the given name exists.
UnknownPreparedStatement(String),
/// The named cluster replica does not exist.
UnknownClusterReplica {
cluster_name: String,
replica_name: String,
},
/// The named configuration parameter is not recognized.
UnrecognizedConfigurationParam(String),
/// A generic error, displayed together with its causes.
Unstructured(anyhow::Error),
/// The named features are not supported.
Unsupported(&'static str),
/// Log source reads must target a replica.
UntargetedLogRead {
/// Names of the referenced log sources (listed in the detail).
log_names: Vec<String>,
},
/// The transaction is in write-only mode.
WriteOnlyTransaction,
/// The transaction can only execute a single statement.
SingleStatementTransaction,
/// Transactions which modify objects are restricted to just modifying objects.
DDLOnlyTransaction,
/// Object state changed while the transaction was in progress.
DDLTransactionRace,
/// Displayed as "transaction dry run"; carries catalog ops and a catalog state.
// NOTE(review): presumably used to hand dry-run results back to the caller rather
// than to signal a user-visible failure -- confirm against the sequencer.
TransactionDryRun {
new_ops: Vec<crate::catalog::Op>,
new_state: crate::catalog::CatalogState,
},
/// Wraps a storage controller error.
Storage(mz_storage_types::controller::StorageError<mz_repr::Timestamp>),
/// An error attributed to compute, carried as an `anyhow::Error`.
Compute(anyhow::Error),
/// An error attributed to the orchestrator, carried as an `anyhow::Error`.
Orchestrator(anyhow::Error),
/// One or more roles cannot be dropped because some objects depend on them.
///
/// Maps each role name to the messages listed for it in the detail.
DependentObject(BTreeMap<String, Vec<String>>),
/// An invalid ALTER; holds what was being altered and the underlying plan error.
InvalidAlter(&'static str, PlanError),
/// Wraps a connection validation error.
ConnectionValidation(ConnectionValidationError),
/// All specified refreshes of a materialized view would be too far in the past.
///
/// Fields: (last refresh, earliest possible compute time), per the detail text.
MaterializedViewWouldNeverRefresh(Timestamp, Timestamp),
/// REFRESH AT was requested for a time where not all inputs are readable.
///
/// Fields: (requested REFRESH AT time, frontier reported in the detail text).
InputNotReadableAtRefreshAtTime(Timestamp, Antichain<Timestamp>),
/// Timed out before ingesting up to the real-time recency point; holds a name
/// that is interpolated into the detail.
RtrTimeout(String),
/// A real-time source was dropped before ingesting up to the real-time recency
/// point; holds a name that is interpolated into the detail.
RtrDropFailure(String),
/// The collection is not readable at any time.
UnreadableSinkCollection,
/// Login blocked.
UserSessionsDisallowed,
/// Session denied; the wrapped reason is shown in the detail.
NetworkPolicyDenied(NetworkPolicyError),
/// Cannot write in read-only mode.
ReadOnly,
/// The statement was canceled because the provided timeout lapsed.
AlterClusterTimeout,
}
impl AdapterError {
pub fn into_response(self, severity: Severity) -> ErrorResponse {
ErrorResponse {
severity,
code: self.code(),
message: self.to_string(),
detail: self.detail(),
hint: self.hint(),
position: self.position(),
}
}
pub fn position(&self) -> Option<usize> {
match self {
AdapterError::ParseError(err) => Some(err.error.pos),
_ => None,
}
}
pub fn detail(&self) -> Option<String> {
match self {
AdapterError::AmbiguousSystemColumnReference => {
Some("This is a current limitation in Materialize".into())
},
AdapterError::Catalog(c) => c.detail(),
AdapterError::Eval(e) => e.detail(),
AdapterError::RelationOutsideTimeDomain { relations, names } => Some(format!(
"The following relations in the query are outside the transaction's time domain:\n{}\n{}",
relations
.iter()
.map(|r| r.quoted().to_string())
.collect::<Vec<_>>()
.join("\n"),
match names.is_empty() {
true => "No relations are available.".to_string(),
false => format!(
"Only the following relations are available:\n{}",
names
.iter()
.map(|name| name.quoted().to_string())
.collect::<Vec<_>>()
.join("\n")
),
}
)),
AdapterError::SourceOrSinkSizeRequired { .. } => Some(
"Either specify the cluster that will maintain this object via IN CLUSTER or \
specify size via SIZE option."
.into(),
),
AdapterError::SafeModeViolation(_) => Some(
"The Materialize server you are connected to is running in \
safe mode, which limits the features that are available."
.into(),
),
AdapterError::IntrospectionDisabled { log_names }
| AdapterError::UntargetedLogRead { log_names } => Some(format!(
"The query references the following log sources:\n {}",
log_names.join("\n "),
)),
AdapterError::InvalidLogDependency { log_names, .. } => Some(format!(
"The object depends on the following log sources:\n {}",
log_names.join("\n "),
)),
AdapterError::PlanError(e) => e.detail(),
AdapterError::Unauthorized(unauthorized) => unauthorized.detail(),
AdapterError::DependentObject(dependent_objects) => {
Some(dependent_objects
.iter()
.map(|(role_name, err_msgs)| err_msgs
.iter()
.map(|err_msg| format!("{role_name}: {err_msg}"))
.join("\n"))
.join("\n"))
},
AdapterError::Storage(storage_error) => {
storage_error.source().map(|source_error| source_error.to_string_with_causes())
}
AdapterError::ReadOnlyTransaction => Some("SELECT queries cannot be combined with other query types, including SUBSCRIBE.".into()),
AdapterError::InvalidAlter(_, e) => e.detail(),
AdapterError::Optimizer(e) => e.detail(),
AdapterError::ConnectionValidation(e) => e.detail(),
AdapterError::MaterializedViewWouldNeverRefresh(last_refresh, earliest_possible) => {
Some(format!(
"The specified last refresh is at {}, while the earliest possible time to compute the materialized \
view is {}.",
last_refresh,
earliest_possible,
))
}
AdapterError::UnallowedOnCluster { cluster, .. } => (cluster == MZ_CATALOG_SERVER_CLUSTER.name).then(||
format!("The transaction is executing on the {cluster} cluster, maybe having been routed there by the first statement in the transaction.")
),
AdapterError::InputNotReadableAtRefreshAtTime(oracle_read_ts, least_valid_read) => {
Some(format!(
"The requested REFRESH AT time is {}, \
but not all input collections are readable earlier than [{}].",
oracle_read_ts,
if least_valid_read.len() == 1 {
format!("{}", least_valid_read.as_option().expect("antichain contains exactly 1 timestamp"))
} else {
format!("{:?}", least_valid_read)
}
))
}
AdapterError::RtrTimeout(name) => Some(format!("{name} failed to ingest data up to the real-time recency point")),
AdapterError::RtrDropFailure(name) => Some(format!("{name} dropped before ingesting data to the real-time recency point")),
AdapterError::UserSessionsDisallowed => Some("Your organization has been blocked. Please contact support.".to_string()),
AdapterError::NetworkPolicyDenied(reason)=> Some(format!("{reason}.")),
_ => None,
}
}
pub fn hint(&self) -> Option<String> {
match self {
AdapterError::AmbiguousSystemColumnReference => Some(
"Rewrite the view to refer to all columns by name. Expand all wildcards and \
convert all NATURAL JOINs to USING joins."
.to_string(),
),
AdapterError::Catalog(c) => c.hint(),
AdapterError::Eval(e) => e.hint(),
AdapterError::InvalidClusterReplicaAz { expected, az: _ } => {
Some(if expected.is_empty() {
"No availability zones configured; do not specify AVAILABILITY ZONE".into()
} else {
format!("Valid availability zones are: {}", expected.join(", "))
})
}
AdapterError::InvalidStorageClusterSize { expected, .. } => {
Some(format!("Valid sizes are: {}", expected.join(", ")))
}
AdapterError::SourceOrSinkSizeRequired { expected } => Some(format!(
"Try choosing one of the smaller sizes to start. Available sizes: {}",
expected.join(", ")
)),
AdapterError::NoClusterReplicasAvailable { is_managed, .. } => {
Some(if *is_managed {
"Use ALTER CLUSTER to adjust the replication factor of the cluster. \
Example:`ALTER CLUSTER <cluster-name> SET (REPLICATION FACTOR 1)`".into()
} else {
"Use CREATE CLUSTER REPLICA to attach cluster replicas to the cluster".into()
})
}
AdapterError::UntargetedLogRead { .. } => Some(
"Use `SET cluster_replica = <replica-name>` to target a specific replica in the \
active cluster. Note that subsequent queries will only be answered by \
the selected replica, which might reduce availability. To undo the replica \
selection, use `RESET cluster_replica`."
.into(),
),
AdapterError::ResourceExhaustion { resource_type, .. } => Some(format!(
"Drop an existing {resource_type} or contact support to request a limit increase."
)),
AdapterError::StatementTimeout => Some(
"Consider increasing the maximum allowed statement duration for this session by \
setting the statement_timeout session variable. For example, `SET \
statement_timeout = '120s'`."
.into(),
),
AdapterError::PlanError(e) => e.hint(),
AdapterError::UnallowedOnCluster { cluster, .. } => {
(cluster != MZ_CATALOG_SERVER_CLUSTER.name).then(||
"Use `SET CLUSTER = <cluster-name>` to change your cluster and re-run the query."
.to_string()
)
}
AdapterError::InvalidAlter(_, e) => e.hint(),
AdapterError::Optimizer(e) => e.hint(),
AdapterError::ConnectionValidation(e) => e.hint(),
AdapterError::InputNotReadableAtRefreshAtTime(_, _) => Some(
"You can use `REFRESH AT greatest(mz_now(), <explicit timestamp>)` to refresh \
either at the explicitly specified timestamp, or now if the given timestamp would \
be in the past.".to_string()
),
AdapterError::AlterClusterTimeout => Some(
"Consider increasing the timeout duration in the alter cluster statement.".into(),
),
_ => None,
}
}
pub fn code(&self) -> SqlState {
match self {
AdapterError::AbsurdSubscribeBounds { .. } => SqlState::DATA_EXCEPTION,
AdapterError::AmbiguousSystemColumnReference => SqlState::FEATURE_NOT_SUPPORTED,
AdapterError::Catalog(e) => match &e.kind {
mz_catalog::memory::error::ErrorKind::VarError(e) => match e {
VarError::ConstrainedParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
VarError::FixedValueParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
VarError::InvalidParameterType { .. } => SqlState::INVALID_PARAMETER_VALUE,
VarError::InvalidParameterValue { .. } => SqlState::INVALID_PARAMETER_VALUE,
VarError::ReadOnlyParameter(_) => SqlState::CANT_CHANGE_RUNTIME_PARAM,
VarError::UnknownParameter(_) => SqlState::UNDEFINED_OBJECT,
VarError::RequiresUnsafeMode { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
VarError::RequiresFeatureFlag { .. } => SqlState::CANT_CHANGE_RUNTIME_PARAM,
},
_ => SqlState::INTERNAL_ERROR,
},
AdapterError::ChangedPlan(_) => SqlState::FEATURE_NOT_SUPPORTED,
AdapterError::DuplicateCursor(_) => SqlState::DUPLICATE_CURSOR,
AdapterError::Eval(EvalError::CharacterNotValidForEncoding(_)) => {
SqlState::PROGRAM_LIMIT_EXCEEDED
}
AdapterError::Eval(EvalError::CharacterTooLargeForEncoding(_)) => {
SqlState::PROGRAM_LIMIT_EXCEEDED
}
AdapterError::Eval(EvalError::LengthTooLarge) => SqlState::PROGRAM_LIMIT_EXCEEDED,
AdapterError::Eval(EvalError::NullCharacterNotPermitted) => {
SqlState::PROGRAM_LIMIT_EXCEEDED
}
AdapterError::Eval(_) => SqlState::INTERNAL_ERROR,
AdapterError::Explain(_) => SqlState::INTERNAL_ERROR,
AdapterError::IdExhaustionError => SqlState::INTERNAL_ERROR,
AdapterError::Internal(_) => SqlState::INTERNAL_ERROR,
AdapterError::IntrospectionDisabled { .. } => SqlState::FEATURE_NOT_SUPPORTED,
AdapterError::InvalidLogDependency { .. } => SqlState::FEATURE_NOT_SUPPORTED,
AdapterError::InvalidClusterReplicaAz { .. } => SqlState::FEATURE_NOT_SUPPORTED,
AdapterError::InvalidSetIsolationLevel => SqlState::ACTIVE_SQL_TRANSACTION,
AdapterError::InvalidSetCluster => SqlState::ACTIVE_SQL_TRANSACTION,
AdapterError::InvalidStorageClusterSize { .. } => SqlState::FEATURE_NOT_SUPPORTED,
AdapterError::SourceOrSinkSizeRequired { .. } => SqlState::FEATURE_NOT_SUPPORTED,
AdapterError::InvalidTableMutationSelection => SqlState::INVALID_TRANSACTION_STATE,
AdapterError::ConstraintViolation(NotNullViolation(_)) => SqlState::NOT_NULL_VIOLATION,
AdapterError::ConcurrentClusterDrop => SqlState::INVALID_TRANSACTION_STATE,
AdapterError::NoClusterReplicasAvailable { .. } => SqlState::FEATURE_NOT_SUPPORTED,
AdapterError::OperationProhibitsTransaction(_) => SqlState::ACTIVE_SQL_TRANSACTION,
AdapterError::OperationRequiresTransaction(_) => SqlState::NO_ACTIVE_SQL_TRANSACTION,
AdapterError::ParseError(_) => SqlState::SYNTAX_ERROR,
AdapterError::PlanError(PlanError::InvalidSchemaName) => SqlState::INVALID_SCHEMA_NAME,
AdapterError::PlanError(PlanError::ColumnAlreadyExists { .. }) => {
SqlState::DUPLICATE_COLUMN
}
AdapterError::PlanError(_) => SqlState::INTERNAL_ERROR,
AdapterError::PreparedStatementExists(_) => SqlState::DUPLICATE_PSTATEMENT,
AdapterError::ReadOnlyTransaction => SqlState::READ_ONLY_SQL_TRANSACTION,
AdapterError::ReadWriteUnavailable => SqlState::INVALID_TRANSACTION_STATE,
AdapterError::SingleStatementTransaction => SqlState::INVALID_TRANSACTION_STATE,
AdapterError::WrongSetOfLocks => SqlState::LOCK_NOT_AVAILABLE,
AdapterError::StatementTimeout => SqlState::QUERY_CANCELED,
AdapterError::Canceled => SqlState::QUERY_CANCELED,
AdapterError::IdleInTransactionSessionTimeout => {
SqlState::IDLE_IN_TRANSACTION_SESSION_TIMEOUT
}
AdapterError::RecursionLimit(_) => SqlState::INTERNAL_ERROR,
AdapterError::RelationOutsideTimeDomain { .. } => SqlState::INVALID_TRANSACTION_STATE,
AdapterError::ResourceExhaustion { .. } => SqlState::INSUFFICIENT_RESOURCES,
AdapterError::ResultSize(_) => SqlState::OUT_OF_MEMORY,
AdapterError::SafeModeViolation(_) => SqlState::INTERNAL_ERROR,
AdapterError::SubscribeOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
AdapterError::Optimizer(e) => match e {
OptimizerError::PlanError(e) => {
AdapterError::PlanError(e.clone()).code() }
OptimizerError::RecursionLimitError(e) => {
AdapterError::RecursionLimit(e.clone()).code() }
OptimizerError::Internal(s) => {
AdapterError::Internal(s.clone()).code() }
OptimizerError::EvalError(e) => {
AdapterError::Eval(e.clone()).code() }
OptimizerError::TransformError(_) => SqlState::INTERNAL_ERROR,
OptimizerError::UnmaterializableFunction(_) => SqlState::FEATURE_NOT_SUPPORTED,
OptimizerError::UncallableFunction { .. } => SqlState::FEATURE_NOT_SUPPORTED,
OptimizerError::UnsafeMfpPlan => SqlState::INTERNAL_ERROR,
},
AdapterError::UnallowedOnCluster { .. } => {
SqlState::S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
}
AdapterError::Unauthorized(_) => SqlState::INSUFFICIENT_PRIVILEGE,
AdapterError::UnknownCursor(_) => SqlState::INVALID_CURSOR_NAME,
AdapterError::UnknownPreparedStatement(_) => SqlState::UNDEFINED_PSTATEMENT,
AdapterError::UnknownLoginRole(_) => SqlState::INVALID_AUTHORIZATION_SPECIFICATION,
AdapterError::UnknownClusterReplica { .. } => SqlState::UNDEFINED_OBJECT,
AdapterError::UnrecognizedConfigurationParam(_) => SqlState::UNDEFINED_OBJECT,
AdapterError::Unsupported(..) => SqlState::FEATURE_NOT_SUPPORTED,
AdapterError::Unstructured(_) => SqlState::INTERNAL_ERROR,
AdapterError::UntargetedLogRead { .. } => SqlState::FEATURE_NOT_SUPPORTED,
AdapterError::DDLTransactionRace => SqlState::T_R_SERIALIZATION_FAILURE,
AdapterError::TransactionDryRun { .. } => SqlState::T_R_SERIALIZATION_FAILURE,
AdapterError::WriteOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
AdapterError::DDLOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
AdapterError::Storage(_) | AdapterError::Compute(_) | AdapterError::Orchestrator(_) => {
SqlState::INTERNAL_ERROR
}
AdapterError::DependentObject(_) => SqlState::DEPENDENT_OBJECTS_STILL_EXIST,
AdapterError::InvalidAlter(_, _) => SqlState::FEATURE_NOT_SUPPORTED,
AdapterError::ConnectionValidation(_) => SqlState::SYSTEM_ERROR,
AdapterError::MaterializedViewWouldNeverRefresh(_, _) => SqlState::DATA_EXCEPTION,
AdapterError::InputNotReadableAtRefreshAtTime(_, _) => SqlState::DATA_EXCEPTION,
AdapterError::RtrTimeout(_) => SqlState::QUERY_CANCELED,
AdapterError::RtrDropFailure(_) => SqlState::UNDEFINED_OBJECT,
AdapterError::UnreadableSinkCollection => SqlState::from_code("MZ009"),
AdapterError::UserSessionsDisallowed => SqlState::from_code("MZ010"),
AdapterError::NetworkPolicyDenied(_) => SqlState::from_code("MZ011"),
AdapterError::ReadOnly => SqlState::READ_ONLY_SQL_TRANSACTION,
AdapterError::AlterClusterTimeout => SqlState::QUERY_CANCELED,
}
}
pub fn internal<E: std::fmt::Display>(context: &str, e: E) -> AdapterError {
AdapterError::Internal(format!("{context}: {e}"))
}
}
impl fmt::Display for AdapterError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
AdapterError::AbsurdSubscribeBounds { as_of, up_to } => {
assert!(up_to < as_of);
write!(
f,
r#"subscription lower ("as of") bound is beyond its upper ("up to") bound: {} < {}"#,
up_to, as_of
)
}
AdapterError::AmbiguousSystemColumnReference => {
write!(
f,
"cannot use wildcard expansions or NATURAL JOINs in a view that depends on \
system objects"
)
}
AdapterError::ChangedPlan(e) => write!(f, "{}", e),
AdapterError::Catalog(e) => e.fmt(f),
AdapterError::DuplicateCursor(name) => {
write!(f, "cursor {} already exists", name.quoted())
}
AdapterError::Eval(e) => e.fmt(f),
AdapterError::Explain(e) => e.fmt(f),
AdapterError::IdExhaustionError => f.write_str("ID allocator exhausted all valid IDs"),
AdapterError::Internal(e) => write!(f, "internal error: {}", e),
AdapterError::IntrospectionDisabled { .. } => write!(
f,
"cannot read log sources of replica with disabled introspection"
),
AdapterError::InvalidLogDependency { object_type, .. } => {
write!(f, "{object_type} objects cannot depend on log sources")
}
AdapterError::InvalidClusterReplicaAz { az, expected: _ } => {
write!(f, "unknown cluster replica availability zone {az}",)
}
AdapterError::InvalidSetIsolationLevel => write!(
f,
"SET TRANSACTION ISOLATION LEVEL must be called before any query"
),
AdapterError::InvalidSetCluster => {
write!(f, "SET cluster cannot be called in an active transaction")
}
AdapterError::InvalidStorageClusterSize { size, .. } => {
write!(f, "unknown source size {size}")
}
AdapterError::SourceOrSinkSizeRequired { .. } => {
write!(f, "must specify either cluster or size option")
}
AdapterError::InvalidTableMutationSelection => {
f.write_str("invalid selection: operation may only refer to user-defined tables")
}
AdapterError::ConstraintViolation(not_null_violation) => {
write!(f, "{}", not_null_violation)
}
AdapterError::ConcurrentClusterDrop => {
write!(f, "the transaction's active cluster has been dropped")
}
AdapterError::NoClusterReplicasAvailable { name, .. } => {
write!(
f,
"CLUSTER {} has no replicas available to service request",
name.quoted()
)
}
AdapterError::OperationProhibitsTransaction(op) => {
write!(f, "{} cannot be run inside a transaction block", op)
}
AdapterError::OperationRequiresTransaction(op) => {
write!(f, "{} can only be used in transaction blocks", op)
}
AdapterError::ParseError(e) => e.fmt(f),
AdapterError::PlanError(e) => e.fmt(f),
AdapterError::PreparedStatementExists(name) => {
write!(f, "prepared statement {} already exists", name.quoted())
}
AdapterError::ReadOnlyTransaction => f.write_str("transaction in read-only mode"),
AdapterError::SingleStatementTransaction => {
f.write_str("this transaction can only execute a single statement")
}
AdapterError::ReadWriteUnavailable => {
f.write_str("transaction read-write mode must be set before any query")
}
AdapterError::WrongSetOfLocks => {
write!(f, "internal error, wrong set of locks acquired")
}
AdapterError::StatementTimeout => {
write!(f, "canceling statement due to statement timeout")
}
AdapterError::Canceled => {
write!(f, "canceling statement due to user request")
}
AdapterError::IdleInTransactionSessionTimeout => {
write!(
f,
"terminating connection due to idle-in-transaction timeout"
)
}
AdapterError::RecursionLimit(e) => e.fmt(f),
AdapterError::RelationOutsideTimeDomain { .. } => {
write!(
f,
"Transactions can only reference objects in the same timedomain. \
See https://materialize.com/docs/sql/begin/#same-timedomain-error",
)
}
AdapterError::ResourceExhaustion {
resource_type,
limit_name,
desired,
limit,
current,
} => {
write!(
f,
"creating {resource_type} would violate {limit_name} limit (desired: {desired}, limit: {limit}, current: {current})"
)
}
AdapterError::ResultSize(e) => write!(f, "{e}"),
AdapterError::SafeModeViolation(feature) => {
write!(f, "cannot create {} in safe mode", feature)
}
AdapterError::SubscribeOnlyTransaction => {
f.write_str("SUBSCRIBE in transactions must be the only read statement")
}
AdapterError::Optimizer(e) => e.fmt(f),
AdapterError::UnallowedOnCluster {
depends_on,
cluster,
} => {
let items = depends_on.into_iter().map(|item| item.quoted()).join(", ");
write!(
f,
"querying the following items {items} is not allowed from the {} cluster",
cluster.quoted()
)
}
AdapterError::Unauthorized(unauthorized) => {
write!(f, "{unauthorized}")
}
AdapterError::UnknownCursor(name) => {
write!(f, "cursor {} does not exist", name.quoted())
}
AdapterError::UnknownLoginRole(name) => {
write!(f, "role {} does not exist", name.quoted())
}
AdapterError::Unsupported(features) => write!(f, "{} are not supported", features),
AdapterError::Unstructured(e) => write!(f, "{}", e.display_with_causes()),
AdapterError::WriteOnlyTransaction => f.write_str("transaction in write-only mode"),
AdapterError::UnknownPreparedStatement(name) => {
write!(f, "prepared statement {} does not exist", name.quoted())
}
AdapterError::UnknownClusterReplica {
cluster_name,
replica_name,
} => write!(
f,
"cluster replica '{cluster_name}.{replica_name}' does not exist"
),
AdapterError::UnrecognizedConfigurationParam(setting_name) => write!(
f,
"unrecognized configuration parameter {}",
setting_name.quoted()
),
AdapterError::UntargetedLogRead { .. } => {
f.write_str("log source reads must target a replica")
}
AdapterError::DDLOnlyTransaction => f.write_str(
"transactions which modify objects are restricted to just modifying objects",
),
AdapterError::DDLTransactionRace => {
f.write_str("object state changed while transaction was in progress")
}
AdapterError::TransactionDryRun { .. } => f.write_str("transaction dry run"),
AdapterError::Storage(e) => e.fmt(f),
AdapterError::Compute(e) => e.fmt(f),
AdapterError::Orchestrator(e) => e.fmt(f),
AdapterError::DependentObject(dependent_objects) => {
let role_str = if dependent_objects.keys().count() == 1 {
"role"
} else {
"roles"
};
write!(
f,
"{role_str} \"{}\" cannot be dropped because some objects depend on it",
dependent_objects.keys().join(", ")
)
}
AdapterError::InvalidAlter(t, e) => {
write!(f, "invalid ALTER {t}: {e}")
}
AdapterError::ConnectionValidation(e) => e.fmt(f),
AdapterError::MaterializedViewWouldNeverRefresh(_, _) => {
write!(
f,
"all the specified refreshes of the materialized view would be too far in the past, and thus they \
would never happen"
)
}
AdapterError::InputNotReadableAtRefreshAtTime(_, _) => {
write!(
f,
"REFRESH AT requested for a time where not all the inputs are readable"
)
}
AdapterError::RtrTimeout(_) => {
write!(f, "timed out before ingesting the source's visible frontier when real-time-recency query issued")
}
AdapterError::RtrDropFailure(_) => write!(
f,
"real-time source dropped before ingesting the upstream system's visible frontier"
),
AdapterError::UnreadableSinkCollection => {
write!(f, "collection is not readable at any time")
}
AdapterError::UserSessionsDisallowed => write!(f, "login blocked"),
AdapterError::NetworkPolicyDenied(_) => write!(f, "session denied"),
AdapterError::ReadOnly => write!(f, "cannot write in read-only mode"),
AdapterError::AlterClusterTimeout => {
write!(f, "canceling statement, provided timeout lapsed")
}
}
}
}
/// Converts a generic `anyhow::Error` into an `AdapterError`.
///
/// If the error downcasts to a [`PlanError`] it is surfaced as
/// `AdapterError::PlanError`; anything else is kept as
/// `AdapterError::Unstructured`.
impl From<anyhow::Error> for AdapterError {
    fn from(e: anyhow::Error) -> AdapterError {
        // We own `e`, so take the `PlanError` out by value rather than borrowing
        // it and cloning. On a failed downcast `anyhow` hands the original error
        // back unchanged.
        match e.downcast::<PlanError>() {
            Ok(plan_error) => AdapterError::PlanError(plan_error),
            Err(other) => AdapterError::Unstructured(other),
        }
    }
}
impl From<TryFromIntError> for AdapterError {
fn from(e: TryFromIntError) -> AdapterError {
AdapterError::Unstructured(e.into())
}
}
impl From<TryFromDecimalError> for AdapterError {
fn from(e: TryFromDecimalError) -> AdapterError {
AdapterError::Unstructured(e.into())
}
}
/// Wraps an in-memory catalog error in the `Catalog` variant.
impl From<mz_catalog::memory::error::Error> for AdapterError {
    fn from(e: mz_catalog::memory::error::Error) -> AdapterError {
        Self::Catalog(e)
    }
}
impl From<mz_catalog::durable::CatalogError> for AdapterError {
fn from(e: mz_catalog::durable::CatalogError) -> Self {
mz_catalog::memory::error::Error::from(e).into()
}
}
/// Lifts the error into a durable `CatalogError` and reuses that conversion.
impl From<mz_catalog::durable::DurableCatalogError> for AdapterError {
    fn from(e: mz_catalog::durable::DurableCatalogError) -> Self {
        let catalog_error = mz_catalog::durable::CatalogError::from(e);
        catalog_error.into()
    }
}
impl From<EvalError> for AdapterError {
fn from(e: EvalError) -> AdapterError {
AdapterError::Eval(e)
}
}
/// Recursion limit errors raised during EXPLAIN are unwrapped into the dedicated
/// `RecursionLimit` variant; every other explain error is kept as `Explain`.
impl From<ExplainError> for AdapterError {
    fn from(e: ExplainError) -> AdapterError {
        match e {
            ExplainError::RecursionLimitError(limit_err) => Self::RecursionLimit(limit_err),
            other => Self::Explain(other),
        }
    }
}
impl From<mz_sql::catalog::CatalogError> for AdapterError {
fn from(e: mz_sql::catalog::CatalogError) -> AdapterError {
AdapterError::Catalog(mz_catalog::memory::error::Error::from(e))
}
}
impl From<PlanError> for AdapterError {
fn from(e: PlanError) -> AdapterError {
AdapterError::PlanError(e)
}
}
impl From<OptimizerError> for AdapterError {
fn from(e: OptimizerError) -> AdapterError {
use OptimizerError::*;
match e {
PlanError(e) => Self::PlanError(e),
RecursionLimitError(e) => Self::RecursionLimit(e),
EvalError(e) => Self::Eval(e),
Internal(e) => Self::Internal(e),
e => Self::Optimizer(e),
}
}
}
impl From<NotNullViolation> for AdapterError {
fn from(e: NotNullViolation) -> AdapterError {
AdapterError::ConstraintViolation(e)
}
}
impl From<RecursionLimitError> for AdapterError {
fn from(e: RecursionLimitError) -> AdapterError {
AdapterError::RecursionLimit(e)
}
}
/// A failed oneshot receive has no dedicated variant and is reported as an
/// unstructured error.
impl From<oneshot::error::RecvError> for AdapterError {
    fn from(e: oneshot::error::RecvError) -> AdapterError {
        let unstructured = e.into();
        Self::Unstructured(unstructured)
    }
}
/// Wraps a storage controller error in the `Storage` variant.
impl From<StorageError<mz_repr::Timestamp>> for AdapterError {
    fn from(e: StorageError<mz_repr::Timestamp>) -> Self {
        Self::Storage(e)
    }
}
/// Reports a compute controller `InstanceExists` error through the `Compute`
/// variant.
impl From<compute_error::InstanceExists> for AdapterError {
    fn from(e: compute_error::InstanceExists) -> Self {
        let compute_err = e.into();
        Self::Compute(compute_err)
    }
}
/// Timestamp errors are converted to an [`EvalError`] and reported through the
/// `Eval` variant.
impl From<TimestampError> for AdapterError {
    fn from(e: TimestampError) -> Self {
        let eval_err: EvalError = e.into();
        Self::Eval(eval_err)
    }
}
/// Wraps a SQL parser error in the `ParseError` variant.
impl From<mz_sql_parser::parser::ParserStatementError> for AdapterError {
    fn from(e: mz_sql_parser::parser::ParserStatementError) -> Self {
        Self::ParseError(e)
    }
}
impl From<VarError> for AdapterError {
fn from(e: VarError) -> Self {
let e: mz_catalog::memory::error::Error = e.into();
e.into()
}
}
/// Wraps an RBAC authorization failure in the `Unauthorized` variant.
impl From<rbac::UnauthorizedError> for AdapterError {
    fn from(e: rbac::UnauthorizedError) -> Self {
        Self::Unauthorized(e)
    }
}
/// Identifier errors are reported as a `PlanError::InvalidIdent` planning error.
impl From<mz_sql_parser::ast::IdentError> for AdapterError {
    fn from(value: mz_sql_parser::ast::IdentError) -> Self {
        let plan_err = PlanError::InvalidIdent(value);
        Self::PlanError(plan_err)
    }
}
impl From<mz_sql::session::vars::ConnectionError> for AdapterError {
fn from(value: mz_sql::session::vars::ConnectionError) -> Self {
match value {
mz_sql::session::vars::ConnectionError::TooManyConnections { current, limit } => {
AdapterError::ResourceExhaustion {
resource_type: "connection".into(),
limit_name: "max_connections".into(),
desired: (current + 1).to_string(),
limit: limit.to_string(),
current: current.to_string(),
}
}
}
}
}
impl From<NetworkPolicyError> for AdapterError {
fn from(value: NetworkPolicyError) -> Self {
AdapterError::NetworkPolicyDenied(value)
}
}
impl From<ConnectionValidationError> for AdapterError {
fn from(e: ConnectionValidationError) -> AdapterError {
AdapterError::ConnectionValidation(e)
}
}
// Marks `AdapterError` as a standard error type. The impl body is empty, so every
// `Error` method (including `source`) uses the trait's default implementation.
impl Error for AdapterError {}