use std::collections::{BTreeMap, BTreeSet};
use std::future::Future;
use std::net::IpAddr;
use std::pin::Pin;
use std::sync::Arc;
use derivative::Derivative;
use enum_kinds::EnumKind;
use futures::future::BoxFuture;
use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
use mz_compute_types::ComputeInstanceId;
use mz_ore::collections::CollectionExt;
use mz_ore::soft_assert_no_log;
use mz_ore::tracing::OpenTelemetryContext;
use mz_pgcopy::CopyFormatParams;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, RowIterator};
use mz_sql::ast::{FetchDirection, Raw, Statement};
use mz_sql::catalog::ObjectType;
use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind};
use mz_sql::session::user::User;
use mz_sql::session::vars::{OwnedVarInput, Var};
use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;
use crate::catalog::Catalog;
use crate::coord::consistency::CoordinatorInconsistencies;
use crate::coord::peek::PeekResponseUnary;
use crate::coord::ExecuteContextExtra;
use crate::error::AdapterError;
use crate::session::{EndTransactionAction, RowBatchStream, Session};
use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
use crate::util::Transmittable;
use crate::webhook::AppendWebhookResponse;
use crate::{AdapterNotice, AppendWebhookError};
/// Reply payload for [`Command::CatalogSnapshot`]: a shared handle to a [`Catalog`].
#[derive(Debug)]
pub struct CatalogSnapshot {
/// Reference-counted pointer to the catalog; cloning the `Arc` shares the same
/// `Catalog` value rather than copying it.
pub catalog: Arc<Catalog>,
}
/// The set of requests defined by this module.
///
/// Most variants carry a `tx` [`oneshot::Sender`] typed with the reply the
/// requester expects back; variants without a `tx` have no reply channel.
/// NOTE(review): presumably these are sent to the coordinator (`crate::coord`)
/// for processing — confirm against the receiving side.
#[derive(Debug)]
pub enum Command {
/// Asks for a [`CatalogSnapshot`], delivered over `tx`.
CatalogSnapshot {
/// Receives the snapshot.
tx: oneshot::Sender<CatalogSnapshot>,
},
/// Carries the identifying details of a connection and asks for a
/// [`StartupResponse`].
Startup {
/// Receives the [`StartupResponse`], or an [`AdapterError`] on failure.
tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
/// The user associated with the connection.
user: User,
/// Identifier of the connection.
conn_id: ConnectionId,
/// IP address of the client, when known.
client_ip: Option<IpAddr>,
/// Secret paired with `conn_id`.
/// NOTE(review): presumably the value later checked by
/// [`Command::CancelRequest`] — confirm.
secret_key: u32,
/// A UUID for the connection/session.
uuid: Uuid,
/// Application name reported for the connection.
application_name: String,
/// Unbounded channel on which [`AdapterNotice`]s can be sent.
notice_tx: mpsc::UnboundedSender<AdapterNotice>,
},
/// Asks for a portal to be executed. The [`Session`] is moved into the
/// command and is handed back inside the [`Response`].
Execute {
/// Name of the portal to execute.
portal_name: String,
/// The session, passed by value.
session: Session,
/// Receives the [`Response`] (result, session, and tracing context).
tx: oneshot::Sender<Response<ExecuteResponse>>,
/// Optional [`ExecuteContextExtra`] supplied by the caller.
/// NOTE(review): presumably statement-logging state from an enclosing
/// execution — confirm.
outer_ctx_extra: Option<ExecuteContextExtra>,
},
/// Asks for the current transaction to be ended with the given `action`.
/// The [`Session`] is moved into the command and is handed back inside the
/// [`Response`].
Commit {
/// How the transaction should be ended.
action: EndTransactionAction,
/// The session, passed by value.
session: Session,
/// Receives the [`Response`] (result, session, and tracing context).
tx: oneshot::Sender<Response<ExecuteResponse>>,
},
/// A cancellation request identified by a raw connection id plus a secret
/// key. There is no reply channel.
CancelRequest {
/// Raw connection id of the target connection.
conn_id: ConnectionIdType,
/// Secret accompanying the request.
/// NOTE(review): presumably must match the key given at startup — confirm.
secret_key: u32,
},
/// A cancellation request that carries only a [`ConnectionId`] and no
/// secret key. There is no reply channel.
PrivilegedCancelRequest {
/// Connection to cancel.
conn_id: ConnectionId,
},
/// Looks up a webhook by its database, schema, and item name.
GetWebhook {
/// Database name.
database: String,
/// Schema name.
schema: String,
/// Item name.
name: String,
/// Receives the [`AppendWebhookResponse`], or an [`AppendWebhookError`].
tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
},
/// Asks for system variables on behalf of a connection.
GetSystemVars {
/// Connection making the request.
conn_id: ConnectionId,
/// Receives the variables, or an [`AdapterError`].
tx: oneshot::Sender<Result<GetVariablesResponse, AdapterError>>,
},
/// Asks for system variables to be set on behalf of a connection.
SetSystemVars {
/// Variable name → new value, both as strings.
vars: BTreeMap<String, String>,
/// Connection making the request.
conn_id: ConnectionId,
/// Receives `Ok(())` on success, or an [`AdapterError`].
tx: oneshot::Sender<Result<(), AdapterError>>,
},
/// Terminates the given connection.
Terminate {
/// Connection to terminate.
conn_id: ConnectionId,
/// Optional reply channel; when `None` no reply can be delivered.
tx: Option<oneshot::Sender<Result<(), AdapterError>>>,
},
/// Hands back an [`ExecuteContextExtra`] together with the reason the
/// statement's execution ended. There is no reply channel.
/// NOTE(review): presumably used to finalize statement logging — confirm.
RetireExecute {
/// Execution context being retired.
data: ExecuteContextExtra,
/// Why execution ended.
reason: StatementEndedExecutionReason,
},
/// Asks for a consistency check; the reply is `Ok(())` or the
/// [`CoordinatorInconsistencies`] that were found.
CheckConsistency {
/// Receives the outcome of the check.
tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
},
/// Asks for a dump rendered as a JSON value.
Dump {
/// Receives the JSON value, or an [`anyhow::Error`].
tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
},
}
impl Command {
pub fn session(&self) -> Option<&Session> {
match self {
Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
Command::CancelRequest { .. }
| Command::Startup { .. }
| Command::CatalogSnapshot { .. }
| Command::PrivilegedCancelRequest { .. }
| Command::GetWebhook { .. }
| Command::Terminate { .. }
| Command::GetSystemVars { .. }
| Command::SetSystemVars { .. }
| Command::RetireExecute { .. }
| Command::CheckConsistency { .. }
| Command::Dump { .. } => None,
}
}
pub fn session_mut(&mut self) -> Option<&mut Session> {
match self {
Command::Execute { session, .. } | Command::Commit { session, .. } => Some(session),
Command::CancelRequest { .. }
| Command::Startup { .. }
| Command::CatalogSnapshot { .. }
| Command::PrivilegedCancelRequest { .. }
| Command::GetWebhook { .. }
| Command::Terminate { .. }
| Command::GetSystemVars { .. }
| Command::SetSystemVars { .. }
| Command::RetireExecute { .. }
| Command::CheckConsistency { .. }
| Command::Dump { .. } => None,
}
}
}
/// Reply envelope used by [`Command::Execute`] and [`Command::Commit`].
///
/// Those commands take the [`Session`] by value; this struct carries it back
/// alongside the outcome.
#[derive(Debug)]
pub struct Response<T> {
/// The outcome: a `T` on success, an [`AdapterError`] otherwise.
pub result: Result<T, AdapterError>,
/// The session being handed back to the requester.
pub session: Session,
/// OpenTelemetry context accompanying the reply.
/// NOTE(review): presumably lets the receiver continue the sender's trace — confirm.
pub otel_ctx: OpenTelemetryContext,
}
/// A pinned, heap-allocated, `Send` future that resolves to a [`PeekResponseUnary`].
///
/// This is the type of the `future` field of [`ExecuteResponse::SendingRows`].
pub type RowsFuture = Pin<Box<dyn Future<Output = PeekResponseUnary> + Send>>;
/// Success payload of the reply to [`Command::Startup`].
///
/// `Debug` is derived through `derivative` so that the non-`Debug` future
/// field can be skipped.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct StartupResponse {
/// Role id returned for the connection.
/// NOTE(review): presumably the role of the `user` given at startup — confirm.
pub role_id: RoleId,
/// A boxed `'static` future yielding `()`; omitted from `Debug` output.
/// NOTE(review): presumably resolves once writes triggered by startup have
/// completed — confirm against the producer.
#[derivative(Debug = "ignore")]
pub write_notify: BoxFuture<'static, ()>,
/// Session default values keyed by variable name.
pub session_defaults: BTreeMap<String, OwnedVarInput>,
/// Shared handle to a [`Catalog`].
pub catalog: Arc<Catalog>,
}
/// Every `StartupResponse` maps to the same `Allowed` value, `true`.
///
/// NOTE(review): `Transmittable` is declared in `crate::util`; how its
/// `Allowed` value is consumed is not visible from this file.
impl Transmittable for StartupResponse {
    type Allowed = bool;

    fn to_allowed(&self) -> bool {
        true
    }
}
/// Newtype wrapper around an owned `String`.
///
/// NOTE(review): going by the name, the string presumably holds a serialized
/// catalog — this type itself places no constraint on the contents.
#[derive(Debug, Clone)]
pub struct CatalogDump(String);

impl CatalogDump {
    /// Wraps `raw` without inspecting or modifying it.
    pub fn new(raw: String) -> Self {
        Self(raw)
    }

    /// Consumes the wrapper and returns the string it was built from.
    pub fn into_string(self) -> String {
        let Self(raw) = self;
        raw
    }
}
/// Every `CatalogDump` maps to the same `Allowed` value, `true`.
///
/// NOTE(review): `Transmittable` is declared in `crate::util`; how its
/// `Allowed` value is consumed is not visible from this file.
impl Transmittable for CatalogDump {
    type Allowed = bool;

    fn to_allowed(&self) -> bool {
        true
    }
}
/// Variable names mapped to their values, both held as strings and ordered by name.
///
/// This is the success payload of the reply to [`Command::GetSystemVars`].
#[derive(Debug, Clone)]
pub struct GetVariablesResponse(BTreeMap<String, String>);

impl GetVariablesResponse {
    /// Builds the response by recording `name()` → `value()` for each variable
    /// yielded by `vars`.
    pub fn new<'a>(vars: impl Iterator<Item = &'a dyn Var>) -> Self {
        let by_name: BTreeMap<String, String> = vars
            .map(|var| (var.name().to_string(), var.value()))
            .collect();
        Self(by_name)
    }

    /// Looks up the value recorded for `name`, or `None` if no such variable
    /// was recorded.
    pub fn get(&self, name: &str) -> Option<&str> {
        let Self(by_name) = self;
        by_name.get(name).map(String::as_str)
    }
}
/// Every `GetVariablesResponse` maps to the same `Allowed` value, `true`.
///
/// NOTE(review): `Transmittable` is declared in `crate::util`; how its
/// `Allowed` value is consumed is not visible from this file.
impl Transmittable for GetVariablesResponse {
    type Allowed = bool;

    fn to_allowed(&self) -> bool {
        true
    }
}
impl IntoIterator for GetVariablesResponse {
type Item = (String, String);
type IntoIter = std::collections::btree_map::IntoIter<String, String>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
/// The success payload of the reply to [`Command::Execute`] and
/// [`Command::Commit`].
///
/// The `EnumKind` derive generates a companion enum, `ExecuteResponseKind`,
/// on which `PartialOrd` and `Ord` are additionally derived.
/// NOTE(review): the generated enum presumably mirrors the variant order
/// below, so reordering variants may change how `ExecuteResponseKind` values
/// sort (e.g. inside a `BTreeSet`) — confirm before reordering.
///
/// The "Tag" in each variant's description is the string returned by
/// [`ExecuteResponse::tag`].
#[derive(EnumKind, Derivative)]
#[derivative(Debug)]
#[enum_kind(ExecuteResponseKind, derive(PartialOrd, Ord))]
pub enum ExecuteResponse {
/// Tag: `ALTER DEFAULT PRIVILEGES`.
AlteredDefaultPrivileges,
/// Tag: `ALTER <object type>`, where the object type is the payload.
AlteredObject(ObjectType),
/// Tag: `ALTER ROLE`.
AlteredRole,
/// Tag: `ALTER SYSTEM`.
AlteredSystemConfiguration,
/// Tag: `CLOSE CURSOR`.
ClosedCursor,
/// Tag: `COMMENT`.
Comment,
/// Tag: `COPY <n>`, where `n` is the payload.
Copied(usize),
/// Wraps another response together with a copy format. No tag.
CopyTo {
/// The copy format.
format: mz_sql::plan::CopyFormat,
/// The wrapped response.
resp: Box<ExecuteResponse>,
},
/// No tag.
/// NOTE(review): presumably tells the caller to read COPY data destined for
/// the catalog item `id` — confirm against the consumer.
CopyFrom {
/// Catalog item the copy refers to.
id: CatalogItemId,
/// Column indexes.
/// NOTE(review): presumably positions within the target's columns — confirm.
columns: Vec<usize>,
/// Format parameters for the copy.
params: CopyFormatParams<'static>,
/// Execution context carried along with the response.
ctx_extra: ExecuteContextExtra,
},
/// Tag: `CREATE CONNECTION`.
CreatedConnection,
/// Tag: `CREATE DATABASE`.
CreatedDatabase,
/// Tag: `CREATE SCHEMA`.
CreatedSchema,
/// Tag: `CREATE ROLE`.
CreatedRole,
/// Tag: `CREATE CLUSTER`.
CreatedCluster,
/// Tag: `CREATE CLUSTER REPLICA`.
CreatedClusterReplica,
/// Tag: `CREATE INDEX`.
CreatedIndex,
/// Tag: `CREATE INTROSPECTION SUBSCRIBE`.
CreatedIntrospectionSubscribe,
/// Tag: `CREATE SECRET`.
CreatedSecret,
/// Tag: `CREATE SINK`.
CreatedSink,
/// Tag: `CREATE SOURCE`.
CreatedSource,
/// Tag: `CREATE TABLE`.
CreatedTable,
/// Tag: `CREATE VIEW`.
CreatedView,
/// Tag: `CREATE VIEWS`.
CreatedViews,
/// Tag: `CREATE MATERIALIZED VIEW`.
CreatedMaterializedView,
/// Tag: `CREATE CONTINUAL TASK`.
CreatedContinualTask,
/// Tag: `CREATE TYPE`.
CreatedType,
/// Tag: `CREATE NETWORKPOLICY`.
CreatedNetworkPolicy,
/// Tag: `DEALLOCATE ALL` when `all` is true, otherwise `DEALLOCATE`.
Deallocate { all: bool },
/// Tag: `DECLARE CURSOR`.
DeclaredCursor,
/// Tag: `DELETE <n>`, where `n` is the payload.
Deleted(usize),
/// Tag: `DISCARD TEMP`.
DiscardedTemp,
/// Tag: `DISCARD ALL`.
DiscardedAll,
/// Tag: `DROP <object type>`, where the object type is the payload.
DroppedObject(ObjectType),
/// Tag: `DROP OWNED`.
DroppedOwned,
/// No tag.
EmptyQuery,
/// No tag.
/// NOTE(review): presumably asks the caller to fetch rows from the named
/// cursor — confirm against the consumer.
Fetch {
/// Name the fetch refers to.
name: String,
/// Optional fetch direction/count.
count: Option<FetchDirection>,
/// Timeout setting for the fetch.
timeout: ExecuteTimeout,
/// Execution context carried along with the response.
ctx_extra: ExecuteContextExtra,
},
/// Tag: `GRANT`.
GrantedPrivilege,
/// Tag: `GRANT ROLE`.
GrantedRole,
/// Tag: `INSERT 0 <n>`, where `n` is the payload.
Inserted(usize),
/// Tag: `PREPARE`.
Prepare,
/// Tag: `RAISE`.
Raised,
/// Tag: `REASSIGN OWNED`.
ReassignOwned,
/// Tag: `REVOKE`.
RevokedPrivilege,
/// Tag: `REVOKE ROLE`.
RevokedRole,
/// Rows will be produced by awaiting `future`. No tag.
SendingRows {
/// Future resolving to a `PeekResponseUnary`; omitted from `Debug` output.
#[derivative(Debug = "ignore")]
future: RowsFuture,
/// Compute instance associated with the response.
instance_id: ComputeInstanceId,
/// Execution strategy recorded for the statement.
strategy: StatementExecutionStrategy,
},
/// Rows are already available through an iterator. No tag.
SendingRowsImmediate {
/// Boxed row iterator; omitted from `Debug` output.
#[derivative(Debug = "ignore")]
rows: Box<dyn RowIterator + Send + Sync>,
},
/// Tag: `RESET` when `reset` is true, otherwise `SET`.
SetVariable {
/// Name of the variable.
name: String,
/// Whether this was a reset rather than a set.
reset: bool,
},
/// Tag: `BEGIN`.
StartedTransaction,
/// No tag.
Subscribing {
/// Stream of row batches.
rx: RowBatchStream,
/// Execution context carried along with the response.
ctx_extra: ExecuteContextExtra,
/// Compute instance associated with the response.
instance_id: ComputeInstanceId,
},
/// Tag: `COMMIT`.
TransactionCommitted {
/// String values keyed by static names.
/// NOTE(review): presumably session parameters to report back to the
/// client — confirm.
params: BTreeMap<&'static str, String>,
},
/// Tag: `ROLLBACK`.
TransactionRolledBack {
/// String values keyed by static names.
/// NOTE(review): presumably session parameters to report back to the
/// client — confirm.
params: BTreeMap<&'static str, String>,
},
/// Tag: `UPDATE <n>`, where `n` is the payload.
Updated(usize),
/// Tag: `VALIDATE CONNECTION`.
ValidatedConnection,
}
impl TryFrom<&Statement<Raw>> for ExecuteResponse {
    type Error = ();

    /// Returns the single response a statement can produce, when that response
    /// can be determined from the statement alone.
    ///
    /// The statement is mapped to its possible plan kinds, and those to their
    /// possible response kinds. The conversion succeeds when either:
    ///
    /// * every possible kind converts to a data-free `ExecuteResponse` and
    ///   there is exactly one of them, or
    /// * the statement is a `DROP` of objects, an object rename, or an owner
    ///   change, in which case the response is built from the statement's
    ///   object type.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` for every other statement.
    fn try_from(stmt: &Statement<Raw>) -> Result<Self, Self::Error> {
        // Deduplicated, ordered set of every response kind any plan for this
        // statement may produce.
        let resp_kinds = Plan::generated_from(&stmt.into())
            .iter()
            .flat_map(ExecuteResponse::generated_from)
            .copied()
            .collect::<BTreeSet<ExecuteResponseKind>>();

        // `Err` as soon as one kind needs data that the kind alone cannot supply.
        let resps = resp_kinds
            .iter()
            .map(|kind| (*kind).try_into())
            .collect::<Result<Vec<ExecuteResponse>, _>>();

        // Exactly one possible, data-free response: that is the answer.
        if let Ok(resps) = resps {
            if resps.len() == 1 {
                return Ok(resps.into_element());
            }
        }

        // Otherwise only statements whose response payload can be read
        // straight off the statement are supported.
        let resp = match stmt {
            Statement::DropObjects(DropObjectsStatement { object_type, .. }) => {
                ExecuteResponse::DroppedObject((*object_type).into())
            }
            Statement::AlterObjectRename(AlterObjectRenameStatement { object_type, .. })
            | Statement::AlterOwner(AlterOwnerStatement { object_type, .. }) => {
                ExecuteResponse::AlteredObject((*object_type).into())
            }
            _ => return Err(()),
        };

        // The hand-built response must be the only kind the planner mapping
        // allows for this statement; otherwise the two have drifted apart.
        soft_assert_no_log!(
            resp_kinds.len() == 1
                && resp_kinds.first().expect("must exist") == &ExecuteResponseKind::from(&resp),
            "ExecuteResponses out of sync with planner"
        );
        Ok(resp)
    }
}
impl TryInto<ExecuteResponse> for ExecuteResponseKind {
type Error = ();
fn try_into(self) -> Result<ExecuteResponse, Self::Error> {
match self {
ExecuteResponseKind::AlteredDefaultPrivileges => {
Ok(ExecuteResponse::AlteredDefaultPrivileges)
}
ExecuteResponseKind::AlteredObject => Err(()),
ExecuteResponseKind::AlteredRole => Ok(ExecuteResponse::AlteredRole),
ExecuteResponseKind::AlteredSystemConfiguration => {
Ok(ExecuteResponse::AlteredSystemConfiguration)
}
ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
ExecuteResponseKind::Copied => Err(()),
ExecuteResponseKind::CopyTo => Err(()),
ExecuteResponseKind::CopyFrom => Err(()),
ExecuteResponseKind::CreatedConnection => Ok(ExecuteResponse::CreatedConnection),
ExecuteResponseKind::CreatedDatabase => Ok(ExecuteResponse::CreatedDatabase),
ExecuteResponseKind::CreatedSchema => Ok(ExecuteResponse::CreatedSchema),
ExecuteResponseKind::CreatedRole => Ok(ExecuteResponse::CreatedRole),
ExecuteResponseKind::CreatedCluster => Ok(ExecuteResponse::CreatedCluster),
ExecuteResponseKind::CreatedClusterReplica => {
Ok(ExecuteResponse::CreatedClusterReplica)
}
ExecuteResponseKind::CreatedIndex => Ok(ExecuteResponse::CreatedIndex),
ExecuteResponseKind::CreatedSecret => Ok(ExecuteResponse::CreatedSecret),
ExecuteResponseKind::CreatedSink => Ok(ExecuteResponse::CreatedSink),
ExecuteResponseKind::CreatedSource => Ok(ExecuteResponse::CreatedSource),
ExecuteResponseKind::CreatedTable => Ok(ExecuteResponse::CreatedTable),
ExecuteResponseKind::CreatedView => Ok(ExecuteResponse::CreatedView),
ExecuteResponseKind::CreatedViews => Ok(ExecuteResponse::CreatedViews),
ExecuteResponseKind::CreatedMaterializedView => {
Ok(ExecuteResponse::CreatedMaterializedView)
}
ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask),
ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
ExecuteResponseKind::Deallocate => Err(()),
ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
ExecuteResponseKind::Deleted => Err(()),
ExecuteResponseKind::DiscardedTemp => Ok(ExecuteResponse::DiscardedTemp),
ExecuteResponseKind::DiscardedAll => Ok(ExecuteResponse::DiscardedAll),
ExecuteResponseKind::DroppedObject => Err(()),
ExecuteResponseKind::DroppedOwned => Ok(ExecuteResponse::DroppedOwned),
ExecuteResponseKind::EmptyQuery => Ok(ExecuteResponse::EmptyQuery),
ExecuteResponseKind::Fetch => Err(()),
ExecuteResponseKind::GrantedPrivilege => Ok(ExecuteResponse::GrantedPrivilege),
ExecuteResponseKind::GrantedRole => Ok(ExecuteResponse::GrantedRole),
ExecuteResponseKind::Inserted => Err(()),
ExecuteResponseKind::Prepare => Ok(ExecuteResponse::Prepare),
ExecuteResponseKind::Raised => Ok(ExecuteResponse::Raised),
ExecuteResponseKind::ReassignOwned => Ok(ExecuteResponse::ReassignOwned),
ExecuteResponseKind::RevokedPrivilege => Ok(ExecuteResponse::RevokedPrivilege),
ExecuteResponseKind::RevokedRole => Ok(ExecuteResponse::RevokedRole),
ExecuteResponseKind::SendingRows => Err(()),
ExecuteResponseKind::SetVariable => Err(()),
ExecuteResponseKind::StartedTransaction => Ok(ExecuteResponse::StartedTransaction),
ExecuteResponseKind::Subscribing => Err(()),
ExecuteResponseKind::TransactionCommitted => Err(()),
ExecuteResponseKind::TransactionRolledBack => Err(()),
ExecuteResponseKind::Updated => Err(()),
ExecuteResponseKind::ValidatedConnection => Ok(ExecuteResponse::ValidatedConnection),
ExecuteResponseKind::SendingRowsImmediate => Err(()),
ExecuteResponseKind::CreatedIntrospectionSubscribe => {
Ok(ExecuteResponse::CreatedIntrospectionSubscribe)
}
}
}
}
impl ExecuteResponse {
pub fn tag(&self) -> Option<String> {
use ExecuteResponse::*;
match self {
AlteredDefaultPrivileges => Some("ALTER DEFAULT PRIVILEGES".into()),
AlteredObject(o) => Some(format!("ALTER {}", o)),
AlteredRole => Some("ALTER ROLE".into()),
AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
ClosedCursor => Some("CLOSE CURSOR".into()),
Comment => Some("COMMENT".into()),
Copied(n) => Some(format!("COPY {}", n)),
CopyTo { .. } => None,
CopyFrom { .. } => None,
CreatedConnection { .. } => Some("CREATE CONNECTION".into()),
CreatedDatabase { .. } => Some("CREATE DATABASE".into()),
CreatedSchema { .. } => Some("CREATE SCHEMA".into()),
CreatedRole => Some("CREATE ROLE".into()),
CreatedCluster { .. } => Some("CREATE CLUSTER".into()),
CreatedClusterReplica { .. } => Some("CREATE CLUSTER REPLICA".into()),
CreatedIndex { .. } => Some("CREATE INDEX".into()),
CreatedSecret { .. } => Some("CREATE SECRET".into()),
CreatedSink { .. } => Some("CREATE SINK".into()),
CreatedSource { .. } => Some("CREATE SOURCE".into()),
CreatedTable { .. } => Some("CREATE TABLE".into()),
CreatedView { .. } => Some("CREATE VIEW".into()),
CreatedViews { .. } => Some("CREATE VIEWS".into()),
CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()),
CreatedType => Some("CREATE TYPE".into()),
CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
DeclaredCursor => Some("DECLARE CURSOR".into()),
Deleted(n) => Some(format!("DELETE {}", n)),
DiscardedTemp => Some("DISCARD TEMP".into()),
DiscardedAll => Some("DISCARD ALL".into()),
DroppedObject(o) => Some(format!("DROP {o}")),
DroppedOwned => Some("DROP OWNED".into()),
EmptyQuery => None,
Fetch { .. } => None,
GrantedPrivilege => Some("GRANT".into()),
GrantedRole => Some("GRANT ROLE".into()),
Inserted(n) => {
Some(format!("INSERT 0 {}", n))
}
Prepare => Some("PREPARE".into()),
Raised => Some("RAISE".into()),
ReassignOwned => Some("REASSIGN OWNED".into()),
RevokedPrivilege => Some("REVOKE".into()),
RevokedRole => Some("REVOKE ROLE".into()),
SendingRows { .. } | SendingRowsImmediate { .. } => None,
SetVariable { reset: true, .. } => Some("RESET".into()),
SetVariable { reset: false, .. } => Some("SET".into()),
StartedTransaction { .. } => Some("BEGIN".into()),
Subscribing { .. } => None,
TransactionCommitted { .. } => Some("COMMIT".into()),
TransactionRolledBack { .. } => Some("ROLLBACK".into()),
Updated(n) => Some(format!("UPDATE {}", n)),
ValidatedConnection => Some("VALIDATE CONNECTION".into()),
CreatedIntrospectionSubscribe => Some("CREATE INTROSPECTION SUBSCRIBE".into()),
}
}
pub fn generated_from(plan: &PlanKind) -> &'static [ExecuteResponseKind] {
use ExecuteResponseKind::*;
use PlanKind::*;
match plan {
AbortTransaction => &[TransactionRolledBack],
AlterClusterRename
| AlterClusterSwap
| AlterCluster
| AlterClusterReplicaRename
| AlterOwner
| AlterItemRename
| AlterRetainHistory
| AlterNoop
| AlterSchemaRename
| AlterSchemaSwap
| AlterSecret
| AlterConnection
| AlterSource
| AlterSink
| AlterTableAddColumn
| AlterNetworkPolicy => &[AlteredObject],
AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
AlterSetCluster => &[AlteredObject],
AlterRole => &[AlteredRole],
AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
&[AlteredSystemConfiguration]
}
Close => &[ClosedCursor],
PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom],
PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
PlanKind::Comment => &[ExecuteResponseKind::Comment],
CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
CreateConnection => &[CreatedConnection],
CreateDatabase => &[CreatedDatabase],
CreateSchema => &[CreatedSchema],
CreateRole => &[CreatedRole],
CreateCluster => &[CreatedCluster],
CreateClusterReplica => &[CreatedClusterReplica],
CreateSource | CreateSources => &[CreatedSource],
CreateSecret => &[CreatedSecret],
CreateSink => &[CreatedSink],
CreateTable => &[CreatedTable],
CreateView => &[CreatedView],
CreateMaterializedView => &[CreatedMaterializedView],
CreateContinualTask => &[CreatedContinualTask],
CreateIndex => &[CreatedIndex],
CreateType => &[CreatedType],
PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
CreateNetworkPolicy => &[CreatedNetworkPolicy],
Declare => &[DeclaredCursor],
DiscardTemp => &[DiscardedTemp],
DiscardAll => &[DiscardedAll],
DropObjects => &[DroppedObject],
DropOwned => &[DroppedOwned],
PlanKind::EmptyQuery => &[ExecuteResponseKind::EmptyQuery],
ExplainPlan | ExplainPushdown | ExplainTimestamp | Select | ShowAllVariables
| ShowCreate | ShowColumns | ShowVariable | InspectShard | ExplainSinkSchema => &[
ExecuteResponseKind::CopyTo,
SendingRows,
SendingRowsImmediate,
],
Execute | ReadThenWrite => &[
Deleted,
Inserted,
SendingRows,
SendingRowsImmediate,
Updated,
],
PlanKind::Fetch => &[ExecuteResponseKind::Fetch],
GrantPrivileges => &[GrantedPrivilege],
GrantRole => &[GrantedRole],
Insert => &[Inserted, SendingRowsImmediate],
PlanKind::Prepare => &[ExecuteResponseKind::Prepare],
PlanKind::Raise => &[ExecuteResponseKind::Raised],
PlanKind::ReassignOwned => &[ExecuteResponseKind::ReassignOwned],
RevokePrivileges => &[RevokedPrivilege],
RevokeRole => &[RevokedRole],
PlanKind::SetVariable | ResetVariable | PlanKind::SetTransaction => {
&[ExecuteResponseKind::SetVariable]
}
PlanKind::Subscribe => &[Subscribing, ExecuteResponseKind::CopyTo],
StartTransaction => &[StartedTransaction],
SideEffectingFunc => &[SendingRows, SendingRowsImmediate],
ValidateConnection => &[ExecuteResponseKind::ValidatedConnection],
}
}
}
/// An `ExecuteResponse` maps to its data-free `ExecuteResponseKind` as the
/// `Allowed` value.
///
/// NOTE(review): `Transmittable` is declared in `crate::util`; how its
/// `Allowed` value is consumed is not visible from this file.
impl Transmittable for ExecuteResponse {
    type Allowed = ExecuteResponseKind;

    fn to_allowed(&self) -> ExecuteResponseKind {
        self.into()
    }
}