use std::fmt::Debug;
use mz_catalog::durable::{DurableCatalogError, FenceError};
use mz_compute_client::controller::error::{
CollectionUpdateError, DataflowCreationError, InstanceMissing, PeekError, ReadPolicyError,
};
use mz_controller_types::ClusterId;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{exit, soft_assert_no_log};
use mz_repr::{RelationDesc, RowIterator, ScalarType};
use mz_sql::names::FullItemName;
use mz_sql::plan::StatementDesc;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::Var;
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
CreateIndexStatement, FetchStatement, Ident, Raw, RawClusterName, RawItemName, Statement,
};
use mz_storage_types::controller::StorageError;
use mz_transform::TransformError;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;
use crate::catalog::{Catalog, CatalogState};
use crate::command::{Command, Response};
use crate::coord::{Message, PendingTxnResponse};
use crate::error::AdapterError;
use crate::session::{EndTransactionAction, Session};
use crate::{ExecuteContext, ExecuteResponse};
/// Handle for delivering one `Response<T>` to a client over a oneshot channel.
///
/// The sender is stored in an `Option` so that `send` and `take` can move it out; the `Drop`
/// impl for this type inspects that `Option` to detect a transmitter that was never used.
#[derive(Debug)]
pub struct ClientTransmitter<T>
where
T: Transmittable,
<T as Transmittable>::Allowed: 'static,
{
/// Sender half of the response channel; `None` once `send` or `take` has moved it out.
tx: Option<oneshot::Sender<Response<T>>>,
/// Channel of `Message`s; `send` uses it to enqueue a `Command::Terminate` when the response
/// cannot be delivered.
internal_cmd_tx: UnboundedSender<Message>,
/// Optional allow-list installed by `set_allowed`; `send` soft-asserts that `Ok` values map
/// (via `Transmittable::to_allowed`) to one of its entries.
allowed: Option<&'static [T::Allowed]>,
}
impl<T: Transmittable + std::fmt::Debug> ClientTransmitter<T> {
    /// Builds a transmitter around the response sender `tx`.
    ///
    /// No allow-list is installed initially; see [`ClientTransmitter::set_allowed`].
    pub fn new(
        tx: oneshot::Sender<Response<T>>,
        internal_cmd_tx: UnboundedSender<Message>,
    ) -> ClientTransmitter<T> {
        Self {
            tx: Some(tx),
            internal_cmd_tx,
            allowed: None,
        }
    }

    /// Consumes the transmitter and delivers `result` together with `session`.
    ///
    /// If an allow-list was installed, a soft assertion checks that an `Ok` result maps to one
    /// of its entries. When the oneshot send fails, the undelivered response is handed back and
    /// a `Command::Terminate` for its session's connection is enqueued on the internal command
    /// channel instead.
    ///
    /// # Panics
    ///
    /// Panics if the internal command channel rejects the terminate message.
    #[mz_ore::instrument(level = "debug")]
    pub fn send(mut self, result: Result<T, AdapterError>, session: Session) {
        soft_assert_no_log!(
            match (self.allowed.take(), &result) {
                (Some(allowed), Ok(t)) => allowed.contains(&t.to_allowed()),
                _ => true,
            },
            "tried to send disallowed value {result:?} through ClientTransmitter; \
            see ClientTransmitter::set_allowed"
        );

        let tx = self
            .tx
            .take()
            .expect("tx will always be `Some` unless `self` has been consumed");
        let response = Response {
            result,
            session,
            otel_ctx: OpenTelemetryContext::obtain(),
        };

        if let Err(undelivered) = tx.send(response) {
            // The response could not be delivered, so ask for the connection to be terminated.
            let otel_ctx = OpenTelemetryContext::obtain();
            let terminate = Command::Terminate {
                conn_id: undelivered.session.conn_id().clone(),
                tx: None,
            };
            self.internal_cmd_tx
                .send(Message::Command(otel_ctx, terminate))
                .expect("coordinator unexpectedly gone");
        }
    }

    /// Consumes the transmitter and returns the underlying response sender.
    ///
    /// Because the sender is moved out first, the subsequent drop of `self` finds it absent.
    pub fn take(mut self) -> oneshot::Sender<Response<T>> {
        let sender = self.tx.take();
        sender.expect("tx will always be `Some` unless `self` has been consumed")
    }

    /// Installs the allow-list that `send` checks `Ok` values against.
    ///
    /// `send` takes the list out again when it evaluates its check.
    pub fn set_allowed(&mut self, allowed: &'static [T::Allowed]) {
        self.allowed = Some(allowed);
    }
}
/// A value that can be sent through a `ClientTransmitter`.
pub trait Transmittable {
/// Type of the entries in a transmitter's allow-list; values are compared for equality.
type Allowed: Eq + PartialEq + std::fmt::Debug;
/// Maps this value to the `Allowed` entry that `ClientTransmitter::send` looks up in the
/// allow-list, if one has been set.
fn to_allowed(&self) -> Self::Allowed;
}
/// The unit type is transmittable; its allow-list entries are `bool`s.
impl Transmittable for () {
type Allowed = bool;
/// Always maps `()` to `true`.
fn to_allowed(&self) -> Self::Allowed {
true
}
}
/// Bundles an execution context, a pending transaction response, and the action with which the
/// transaction should be ended; consumed by `finalize`.
#[derive(Debug)]
pub struct CompletedClientTransmitter {
/// Execution context whose session has its transaction ended in `finalize`.
ctx: ExecuteContext,
/// Outcome to report; an `Ok` value is converted to an `ExecuteResponse` in `finalize`.
response: Result<PendingTxnResponse, AdapterError>,
/// Action passed to `end_transaction` on the session's vars.
action: EndTransactionAction,
}
impl CompletedClientTransmitter {
    /// Packages `ctx`, `response` and `action` for a later call to
    /// [`CompletedClientTransmitter::finalize`].
    pub fn new(
        ctx: ExecuteContext,
        response: Result<PendingTxnResponse, AdapterError>,
        action: EndTransactionAction,
    ) -> Self {
        Self {
            ctx,
            response,
            action,
        }
    }

    /// Ends the transaction on the session's vars and produces the final response.
    ///
    /// The value returned by `end_transaction` is passed to `extend_params` on an `Ok`
    /// response before it is converted into an [`ExecuteResponse`]; an `Err` response is
    /// returned untouched. The execution context is handed back alongside the result.
    pub fn finalize(self) -> (ExecuteContext, Result<ExecuteResponse, AdapterError>) {
        let Self {
            mut ctx,
            response,
            action,
        } = self;

        let changed = ctx.session_mut().vars_mut().end_transaction(action);

        let response = match response {
            Ok(mut pending) => {
                pending.extend_params(changed);
                Ok(ExecuteResponse::from(pending))
            }
            Err(err) => Err(err),
        };

        (ctx, response)
    }
}
/// Detects a [`ClientTransmitter`] that is dropped while it still holds its response sender,
/// i.e. without `send` or `take` having been called.
impl<T: Transmittable> Drop for ClientTransmitter<T> {
    /// # Panics
    ///
    /// Panics if the sender is still present, unless the current thread is already unwinding
    /// from another panic.
    fn drop(&mut self) {
        // A panic raised from `drop` while the thread is already unwinding aborts the process
        // and hides the original panic. In that situation the transmitter is being dropped as
        // a consequence of the first panic, so stay quiet and let that one propagate.
        if self.tx.is_some() && !std::thread::panicking() {
            panic!("client transmitter dropped without send")
        }
    }
}
/// Renders a `CREATE INDEX` statement for `view_name` as a stable AST string.
///
/// The index is named `index_name` and placed in the cluster identified by `cluster_id`.
/// Each entry of `keys` is a column index into `view_desc`: a column with an unambiguous name
/// is referenced by that name, any other column by its one-based position.
pub fn index_sql(
    index_name: String,
    cluster_id: ClusterId,
    view_name: FullItemName,
    view_desc: &RelationDesc,
    keys: &[usize],
) -> String {
    use mz_sql::ast::{Expr, Value};

    let name = Ident::new_unchecked(index_name);
    let on_name = RawItemName::Name(mz_sql::normalize::unresolve(view_name));
    let in_cluster = RawClusterName::Resolved(cluster_id.to_string());
    let key_parts: Vec<_> = keys
        .iter()
        .map(|&col| match view_desc.get_unambiguous_name(col) {
            Some(col_name) => Expr::Identifier(vec![Ident::new_unchecked(col_name.to_string())]),
            // No unambiguous name: fall back to the one-based column position.
            None => Expr::Value(Value::Number((col + 1).to_string())),
        })
        .collect();

    let stmt = CreateIndexStatement::<Raw> {
        name: Some(name),
        on_name,
        in_cluster: Some(in_cluster),
        key_parts: Some(key_parts),
        with_options: Vec::new(),
        if_not_exists: false,
    };
    stmt.to_ast_string_stable()
}
/// Produces the [`StatementDesc`] for `stmt`.
///
/// A `FETCH` statement is described by cloning the description of the portal it names and
/// clearing that description's parameter types. Every other statement has its names resolved
/// against the session's view of `catalog` and is then described by `mz_sql::plan::describe`
/// using `param_types`.
///
/// # Errors
///
/// Returns [`AdapterError::UnknownCursor`] when a `FETCH` names a portal the session does not
/// have, and propagates any error from name resolution or planning.
pub fn describe(
    catalog: &Catalog,
    stmt: Statement<Raw>,
    param_types: &[Option<ScalarType>],
    session: &Session,
) -> Result<StatementDesc, AdapterError> {
    if let Statement::Fetch(FetchStatement { name, .. }) = &stmt {
        let portal = session.get_portal_unverified(name.as_str());
        let mut desc = portal
            .map(|p| p.desc.clone())
            .ok_or_else(|| AdapterError::UnknownCursor(name.to_string()))?;
        desc.param_types = Vec::new();
        return Ok(desc);
    }

    let catalog = &catalog.for_session(session);
    let (stmt, _) = mz_sql::names::resolve(catalog, stmt)?;
    let desc = mz_sql::plan::describe(session.pcx(), catalog, stmt, param_types)?;
    Ok(desc)
}
/// Extension methods for results whose errors may call for ending the process instead of
/// continuing.
pub trait ResultExt<T> {
/// Returns the success value, or ends execution with `context` included in the message.
///
/// In the `Result` impl in this file, an error that reports
/// `should_terminate_gracefully()` is handed to `mz_ore::exit!` with code 0; any other error
/// panics.
fn unwrap_or_terminate(self, context: &str) -> T;
/// Returns `self` unchanged unless it holds an error that calls for termination.
///
/// In the `Result` impl in this file, such an error is handed to `mz_ore::exit!` with code 0
/// and a message that includes `context`.
fn maybe_terminate(self, context: &str) -> Self;
}
/// Implements [`ResultExt`] for any `Result` whose error type can report whether it should
/// terminate gracefully.
impl<T, E> ResultExt<T> for Result<T, E>
where
    E: ShouldTerminateGracefully + Debug,
{
    /// Unwraps the `Ok` value. On `Err`, invokes `exit!` with code 0 when the error asks for
    /// graceful termination and panics otherwise; both messages are prefixed with `context`.
    fn unwrap_or_terminate(self, context: &str) -> T {
        match self {
            Ok(value) => value,
            Err(e) => {
                if e.should_terminate_gracefully() {
                    exit!(0, "{context}: {e:?}");
                }
                panic!("{context}: {e:?}")
            }
        }
    }

    /// Invokes `exit!` with code 0 when `self` is an error that asks for graceful termination;
    /// in every other case returns `self` untouched.
    fn maybe_terminate(self, context: &str) -> Self {
        match &self {
            Err(e) if e.should_terminate_gracefully() => {
                exit!(0, "{context}: {e:?}");
            }
            _ => {}
        }
        self
    }
}
/// Classifies errors for the `ResultExt` impl in this file: that impl calls `exit!` with code 0
/// for errors that return `true` here, and `unwrap_or_terminate` panics for the rest.
trait ShouldTerminateGracefully {
/// Returns `true` if encountering this error should end the process via `exit!` rather than
/// a panic.
fn should_terminate_gracefully(&self) -> bool;
}
/// An [`AdapterError`] terminates gracefully only when it wraps a catalog error that does.
impl ShouldTerminateGracefully for AdapterError {
    fn should_terminate_gracefully(&self) -> bool {
        // Every variant other than `Catalog` is never graceful.
        matches!(self, AdapterError::Catalog(e) if e.should_terminate_gracefully())
    }
}
impl ShouldTerminateGracefully for mz_catalog::memory::error::Error {
fn should_terminate_gracefully(&self) -> bool {
match &self.kind {
mz_catalog::memory::error::ErrorKind::Durable(e) => e.should_terminate_gracefully(),
_ => false,
}
}
}
/// A `CatalogError` terminates gracefully only when it is the `Durable` variant and the wrapped
/// error does.
impl ShouldTerminateGracefully for mz_catalog::durable::CatalogError {
    fn should_terminate_gracefully(&self) -> bool {
        // Every variant other than `Durable` is never graceful.
        matches!(self, Self::Durable(e) if e.should_terminate_gracefully())
    }
}
/// A `DurableCatalogError` terminates gracefully only when it is a `Fence` error that does.
impl ShouldTerminateGracefully for DurableCatalogError {
fn should_terminate_gracefully(&self) -> bool {
// The match has no wildcard arm, so it stops compiling when the set of variants changes.
// NOTE(review): this looks deliberate, to force a decision for each new variant — confirm.
match self {
DurableCatalogError::Fence(err) => err.should_terminate_gracefully(),
DurableCatalogError::IncompatibleDataVersion { .. }
| DurableCatalogError::IncompatiblePersistVersion { .. }
| DurableCatalogError::Proto(_)
| DurableCatalogError::Uninitialized
| DurableCatalogError::NotWritable(_)
| DurableCatalogError::DuplicateKey
| DurableCatalogError::UniquenessViolation
| DurableCatalogError::Storage(_)
| DurableCatalogError::Internal(_) => false,
}
}
}
/// Of the fence errors, only `DeployGeneration` terminates gracefully.
impl ShouldTerminateGracefully for FenceError {
fn should_terminate_gracefully(&self) -> bool {
// No wildcard arm: each variant is classified explicitly.
match self {
FenceError::DeployGeneration { .. } => true,
FenceError::Epoch { .. } | FenceError::MigrationUpper { .. } => false,
}
}
}
/// No `StorageError` variant terminates gracefully.
impl<T> ShouldTerminateGracefully for StorageError<T> {
fn should_terminate_gracefully(&self) -> bool {
// Every variant is listed without a wildcard arm, so the match stops compiling when the set
// of variants changes.
// NOTE(review): this looks deliberate, to force a decision for each new variant — confirm.
match self {
StorageError::ResourceExhausted(_)
| StorageError::CollectionMetadataAlreadyExists(_)
| StorageError::PersistShardAlreadyInUse(_)
| StorageError::PersistSchemaEvolveRace { .. }
| StorageError::PersistInvalidSchemaEvolve { .. }
| StorageError::TxnWalShardAlreadyExists
| StorageError::UpdateBeyondUpper(_)
| StorageError::ReadBeforeSince(_)
| StorageError::InvalidUppers(_)
| StorageError::InvalidUsage(_)
| StorageError::SourceIdReused(_)
| StorageError::SinkIdReused(_)
| StorageError::IdentifierMissing(_)
| StorageError::IdentifierInvalid(_)
| StorageError::IngestionInstanceMissing { .. }
| StorageError::ExportInstanceMissing { .. }
| StorageError::Generic(_)
| StorageError::ReadOnly
| StorageError::DataflowError(_)
| StorageError::InvalidAlter { .. }
| StorageError::ShuttingDown(_)
| StorageError::MissingSubsourceReference { .. }
| StorageError::RtrTimeout(_)
| StorageError::RtrDropFailure(_) => false,
}
}
}
/// No `DataflowCreationError` variant terminates gracefully.
impl ShouldTerminateGracefully for DataflowCreationError {
fn should_terminate_gracefully(&self) -> bool {
// Every variant is listed without a wildcard arm.
match self {
DataflowCreationError::SinceViolation(_)
| DataflowCreationError::InstanceMissing(_)
| DataflowCreationError::CollectionMissing(_)
| DataflowCreationError::ReplicaMissing(_)
| DataflowCreationError::MissingAsOf
| DataflowCreationError::EmptyAsOfForSubscribe
| DataflowCreationError::EmptyAsOfForCopyTo => false,
}
}
}
/// No `CollectionUpdateError` variant terminates gracefully.
impl ShouldTerminateGracefully for CollectionUpdateError {
fn should_terminate_gracefully(&self) -> bool {
// Every variant is listed without a wildcard arm.
match self {
CollectionUpdateError::InstanceMissing(_)
| CollectionUpdateError::CollectionMissing(_) => false,
}
}
}
/// No `PeekError` variant terminates gracefully.
impl ShouldTerminateGracefully for PeekError {
fn should_terminate_gracefully(&self) -> bool {
// Every variant is listed without a wildcard arm.
match self {
PeekError::SinceViolation(_)
| PeekError::InstanceMissing(_)
| PeekError::CollectionMissing(_)
| PeekError::ReplicaMissing(_) => false,
}
}
}
/// No `ReadPolicyError` variant terminates gracefully.
impl ShouldTerminateGracefully for ReadPolicyError {
fn should_terminate_gracefully(&self) -> bool {
// Every variant is listed without a wildcard arm.
match self {
ReadPolicyError::InstanceMissing(_)
| ReadPolicyError::CollectionMissing(_)
| ReadPolicyError::WriteOnlyCollection(_) => false,
}
}
}
/// No `TransformError` variant terminates gracefully.
impl ShouldTerminateGracefully for TransformError {
fn should_terminate_gracefully(&self) -> bool {
// Every variant is listed without a wildcard arm.
match self {
TransformError::Internal(_)
| TransformError::IdentifierMissing(_)
| TransformError::CallerShouldPanic(_) => false,
}
}
}
/// An `InstanceMissing` error never terminates gracefully.
impl ShouldTerminateGracefully for InstanceMissing {
fn should_terminate_gracefully(&self) -> bool {
false
}
}
/// Iterates over the variables that the session's user is allowed to see.
///
/// The session's own variables come first, followed by the catalog's system configuration
/// variables. A variable is yielded only if its `visible` check, given the session's user and
/// the system configuration, returns `Ok`.
pub(crate) fn viewable_variables<'a>(
    catalog: &'a CatalogState,
    session: &'a dyn SessionMetadata,
) -> impl Iterator<Item = &'a dyn Var> {
    let session_vars = session.vars().iter();
    let system_vars = catalog.system_config().iter();

    session_vars.chain(system_vars).filter(|var| {
        let user = session.user();
        let system_config = catalog.system_config();
        var.visible(user, Some(system_config)).is_ok()
    })
}
/// Checks that the next row of `rows` matches `desc`.
///
/// Only the row returned by `peek` is inspected; an iterator with no next row passes
/// trivially. The row must have exactly as many datums as `desc` has column types, and each
/// datum must be an instance of the column type at the same position.
///
/// # Errors
///
/// Returns [`AdapterError::Internal`] describing the column-count mismatch or the first column
/// whose datum does not match its type.
pub fn verify_datum_desc(
    desc: &RelationDesc,
    rows: &mut dyn RowIterator,
) -> Result<(), AdapterError> {
    let first = match rows.peek() {
        Some(row) => row,
        None => return Ok(()),
    };

    let datums = first.unpack();
    let col_types = &desc.typ().column_types;

    let (expected, actual) = (col_types.len(), datums.len());
    if actual != expected {
        return Err(AdapterError::Internal(format!(
            "internal error: row descriptor has {} columns but row has {} columns",
            expected, actual,
        )));
    }

    let mismatch = datums
        .iter()
        .zip(col_types)
        .enumerate()
        .find(|&(_, (d, t))| !d.is_instance_of(t));

    match mismatch {
        Some((i, (d, t))) => Err(AdapterError::Internal(format!(
            "internal error: column {} is not of expected type {:?}: {:?}",
            i, t, d
        ))),
        None => Ok(()),
    }
}