use std::borrow::Cow;
use std::fmt;
/// An operation failure: an [`OpErrorKind`] plus a list of context messages.
///
/// The `Display` implementation in this file prints the kind's message first
/// and then every context entry, each preceded by `": "`.
#[derive(Debug)]
pub struct OpError {
/// The category of failure, including any underlying source error.
kind: OpErrorKind,
/// Context messages in the order they were attached. `Cow` lets static
/// string literals be stored without allocating while still allowing
/// owned, dynamically built messages.
context: Vec<Cow<'static, str>>,
}
impl OpError {
pub fn new(kind: OpErrorKind) -> Self {
OpError {
kind,
context: Vec::new(),
}
}
pub fn kind(&self) -> &OpErrorKind {
&self.kind
}
}
/// Anything convertible into an [`OpErrorKind`] (including an `OpErrorKind`
/// itself) converts into an [`OpError`] with no context attached, which is
/// what lets `?` lift source errors directly into `OpError`.
impl<T> From<T> for OpError
where
    T: Into<OpErrorKind>,
{
    fn from(value: T) -> Self {
        let kind: OpErrorKind = value.into();
        Self::new(kind)
    }
}
/// Renders the kind's message followed by every context entry, in the order
/// the entries were attached, each one prefixed with `": "`.
impl fmt::Display for OpError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { kind, context } = self;
        write!(f, "{kind}")?;
        // Stops at the first formatter error, exactly like `?` inside a loop.
        context.iter().try_for_each(|note| write!(f, ": {note}"))
    }
}
/// The specific category of failure carried by an `OpError`.
///
/// `Display` messages come from the `#[error(...)]` attributes (derived by
/// `thiserror`). Variants whose payload is marked `#[from]` also get a `From`
/// conversion from that payload type.
#[derive(Debug, thiserror::Error)]
pub enum OpErrorKind {
/// A required field was absent; the payload is the name shown in the message.
#[error("the required field '{0}' is missing")]
FieldMissing(&'static str),
/// Creating a temporary resource failed. This variant has no `#[from]`, so
/// it must be constructed explicitly; a bare `tokio_postgres::Error`
/// converts into `MaterializeError` instead.
#[error("failed to create a temporary resource: {0:?}")]
TemporaryResource(tokio_postgres::Error),
/// An internal invariant did not hold; the payload describes the violation.
#[error("internal invariant was violated: {0}")]
InvariantViolated(String),
/// A call to Materialize failed. This is the variant produced by the
/// `From<tokio_postgres::Error>` conversion.
#[error("error when calling Materialize: {0:?}")]
MaterializeError(#[from] tokio_postgres::Error),
/// A Postgres type could not be resolved from its OID.
#[error("postgres type error: {0:?}")]
PgTypeError(#[from] mz_pgrepr::TypeFromOidError),
/// The user does not hold `privilege` on `object`.
#[error("user lacks privilege \"{privilege}\" on object \"{object}\"")]
MissingPrivilege {
privilege: &'static str,
object: String,
},
/// An I/O error, reported as a filesystem error.
#[error("filesystem error: {0:?}")]
Filesystem(#[from] std::io::Error),
/// An error stack reported by OpenSSL.
#[error("cryptography error: {0:?}")]
Crypto(#[from] openssl::error::ErrorStack),
/// A failure reported by `csv_async` while parsing CSV.
#[error("failure when parsing csv: {0:?}")]
CSV(#[from] csv_async::Error),
/// The requested feature is not supported; the payload names it.
#[error("this feature is unsupported: {0}")]
Unsupported(String),
/// An identifier was rejected by the SQL parser's identifier type.
#[error("invalid identifier: {0:?}")]
IdentError(#[from] mz_sql_parser::ast::IdentError),
/// The table `database.schema.table` is not known.
#[error("unknown table: {database}.{schema}.{table}")]
UnknownTable {
database: String,
schema: String,
table: String,
},
/// The request was not recognized; the payload identifies it.
#[error("unknown request: {0}")]
UnknownRequest(String),
}
impl OpErrorKind {
pub fn can_retry(&self) -> bool {
use OpErrorKind::*;
match self {
e @ TemporaryResource(err) | e @ MaterializeError(err) => {
use tokio_postgres::error::SqlState;
let Some(db_error) = err.as_db_error() else {
return false;
};
match *db_error.code() {
SqlState::CONNECTION_EXCEPTION
| SqlState::CONNECTION_FAILURE
| SqlState::CONNECTION_DOES_NOT_EXIST
| SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION
| SqlState::TRANSACTION_RESOLUTION_UNKNOWN => true,
SqlState::DUPLICATE_OBJECT | SqlState::DUPLICATE_TABLE
if matches!(e, TemporaryResource(_)) =>
{
true
}
_ => false,
}
}
e @ Filesystem(_) | e @ CSV(_) => {
use std::io::ErrorKind::*;
let io_error_kind = match e {
Filesystem(err) => err.kind(),
CSV(err) => match err.kind() {
csv_async::ErrorKind::Io(io_err) => io_err.kind(),
_ => return false,
},
e => unreachable!("programming error, {e:?} should have been matched above"),
};
match io_error_kind {
NotFound | ConnectionReset | ConnectionAborted | BrokenPipe | TimedOut
| Interrupted | UnexpectedEof => true,
_ => false,
}
}
FieldMissing(_)
| InvariantViolated(_)
| PgTypeError(_)
| MissingPrivilege { .. }
| Crypto(_)
| Unsupported(_)
| IdentError(_)
| UnknownTable { .. }
| UnknownRequest(_) => false,
}
}
}
/// Extension trait for attaching a human-readable context message to an
/// error value (or to the error inside a `Result`).
pub trait Context {
/// The type produced once context has been attached.
type TransformType;
/// Attaches a static context message.
fn context(self, context: &'static str) -> Self::TransformType;
/// Attaches a context message produced by `f`, for messages that need
/// formatting. The `Result` implementations in this file only invoke `f`
/// on the error path.
fn with_context(self, f: impl FnOnce() -> String) -> Self::TransformType;
}
impl Context for OpErrorKind {
type TransformType = OpError;
fn context(self, context: &'static str) -> Self::TransformType {
OpError {
kind: self,
context: vec![context.into()],
}
}
fn with_context(self, f: impl FnOnce() -> String) -> Self::TransformType {
OpError {
kind: self,
context: vec![f().into()],
}
}
}
impl<T> Context for Result<T, OpError> {
type TransformType = Result<T, OpError>;
fn context(mut self, context: &'static str) -> Self::TransformType {
if let Err(err) = &mut self {
err.context.push(context.into());
}
self
}
fn with_context(mut self, f: impl FnOnce() -> String) -> Self::TransformType {
if let Err(err) = &mut self {
err.context.push(f().into());
}
self
}
}
impl<T, E: Into<OpErrorKind>> Context for Result<T, E> {
type TransformType = Result<T, OpError>;
fn context(self, context: &'static str) -> Self::TransformType {
self.map_err(|err| {
let kind: OpErrorKind = err.into();
OpError {
kind,
context: vec![context.into()],
}
})
}
fn with_context(self, f: impl FnOnce() -> String) -> Self::TransformType {
self.map_err(|err| {
let kind: OpErrorKind = err.into();
OpError {
kind,
context: vec![f().into()],
}
})
}
}