use mz_adapter_types::connection::ConnectionId;
use mz_ore::now::EpochMillis;
use mz_repr::{GlobalId, ScalarType};
use mz_sql::names::{Aug, ResolvedIds};
use mz_sql::plan::{Params, StatementDesc};
use mz_sql::session::metadata::SessionMetadata;
use mz_sql_parser::ast::{Raw, Statement};
use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
use crate::catalog::Catalog;
use crate::coord::appends::BuiltinTableAppendNotify;
use crate::coord::{Coordinator, Message};
use crate::session::{Session, TransactionStatus};
use crate::util::describe;
use crate::{metrics, AdapterError, ExecuteContext, ExecuteResponse};
impl Coordinator {
pub(crate) fn plan_statement(
&self,
session: &Session,
stmt: mz_sql::ast::Statement<Aug>,
params: &mz_sql::plan::Params,
resolved_ids: &ResolvedIds,
) -> Result<mz_sql::plan::Plan, AdapterError> {
let pcx = session.pcx();
let catalog = self.catalog().for_session(session);
let plan = mz_sql::plan::plan(Some(pcx), &catalog, stmt, params, resolved_ids)?;
Ok(plan)
}
/// Declares a cursor called `name` for `stmt`.
///
/// The work is performed by [`Self::declare_inner`] inside a task spawned
/// with `mz_ore::task::spawn`, using an owned catalog handle and the
/// timestamp captured before spawning. When it finishes, `ctx` is retired
/// with `ExecuteResponse::DeclaredCursor` on success or with the error
/// otherwise.
pub(crate) fn declare(
    &self,
    mut ctx: ExecuteContext,
    name: String,
    stmt: Statement<Raw>,
    sql: String,
    params: Params,
) {
    let catalog = self.owned_catalog();
    let now = self.now();
    mz_ore::task::spawn(|| "coord::declare", async move {
        let declared =
            Self::declare_inner(ctx.session_mut(), &catalog, name, stmt, sql, params, now);
        let response = match declared {
            Ok(()) => Ok(ExecuteResponse::DeclaredCursor),
            Err(err) => Err(err),
        };
        ctx.retire(response);
    });
}
/// Describes `stmt` and stores it in `session` as the portal `name`.
///
/// The portal is registered with:
/// * the statement description computed by `describe`,
/// * the parameter datums paired with their types,
/// * a text result format for every column of the description,
/// * the logging state minted from `sql`, `stmt` and `now`,
/// * the catalog's current transient revision.
///
/// # Errors
///
/// Returns an error if describing the statement fails or if
/// `Session::set_portal` rejects the portal.
fn declare_inner(
    session: &mut Session,
    catalog: &Catalog,
    name: String,
    stmt: Statement<Raw>,
    sql: String,
    params: Params,
    now: EpochMillis,
) -> Result<(), AdapterError> {
    // `describe` expects optional parameter types; every declared type is known here.
    let param_types: Vec<_> = params.types.iter().cloned().map(Some).collect();
    let desc = describe(catalog, stmt.clone(), &param_types, session)?;
    let params = params.datums.into_iter().zip(params.types).collect();
    let result_formats = vec![mz_pgwire_common::Format::Text; desc.arity()];
    let logging = session.mint_logging(sql, Some(&stmt), now);
    session.set_portal(
        name,
        desc,
        Some(stmt),
        logging,
        params,
        result_formats,
        catalog.transient_revision(),
    )?;
    Ok(())
}
/// Produces the [`StatementDesc`] for an optional statement.
///
/// When `stmt` is `Some`, the work is delegated to the free function
/// `describe` imported from `crate::util` (an unqualified call inside an
/// `impl` does not resolve to this associated function). When `stmt` is
/// `None`, the result is `StatementDesc::new(None)`.
///
/// # Errors
///
/// Propagates any error returned while describing the statement.
#[mz_ore::instrument(level = "debug")]
pub(crate) fn describe(
    catalog: &Catalog,
    session: &Session,
    stmt: Option<Statement<Raw>>,
    param_types: Vec<Option<ScalarType>>,
) -> Result<StatementDesc, AdapterError> {
    match stmt {
        Some(stmt) => describe(catalog, stmt, &param_types, session),
        None => Ok(StatementDesc::new(None)),
    }
}
/// Checks that the prepared statement `name` is still valid against the
/// current catalog.
///
/// If [`Self::verify_statement_revision`] reports a new revision, it is
/// recorded on the prepared statement.
///
/// # Errors
///
/// Returns `AdapterError::UnknownPreparedStatement` when `session` has no
/// prepared statement called `name`, and propagates any error from the
/// revision check.
///
/// # Panics
///
/// Panics if the prepared statement cannot be looked up a second time
/// after having been found once.
pub(crate) fn verify_prepared_statement(
    catalog: &Catalog,
    session: &mut Session,
    name: &str,
) -> Result<(), AdapterError> {
    let prepared = session
        .get_prepared_statement_unverified(name)
        .ok_or_else(|| AdapterError::UnknownPreparedStatement(name.to_string()))?;
    let new_revision = Self::verify_statement_revision(
        catalog,
        session,
        prepared.stmt(),
        prepared.desc(),
        prepared.catalog_revision,
    )?;
    if let Some(revision) = new_revision {
        session
            .get_prepared_statement_mut_unverified(name)
            .expect("known to exist")
            .catalog_revision = revision;
    }
    Ok(())
}
/// Checks that the portal `name` is still valid against the current
/// catalog.
///
/// If [`Self::verify_statement_revision`] reports a new revision, it is
/// recorded on the portal.
///
/// # Errors
///
/// Returns `AdapterError::UnknownCursor` when `session` has no portal
/// called `name`, and propagates any error from the revision check.
///
/// # Panics
///
/// Panics if the portal cannot be looked up a second time after having
/// been found once.
pub(crate) fn verify_portal(
    &self,
    session: &mut Session,
    name: &str,
) -> Result<(), AdapterError> {
    let existing = session
        .get_portal_unverified(name)
        .ok_or_else(|| AdapterError::UnknownCursor(name.to_string()))?;
    let new_revision = Self::verify_statement_revision(
        self.catalog(),
        session,
        existing.stmt.as_deref(),
        &existing.desc,
        existing.catalog_revision,
    )?;
    if let Some(revision) = new_revision {
        session
            .get_portal_unverified_mut(name)
            .expect("known to exist")
            .catalog_revision = revision;
    }
    Ok(())
}
/// Re-validates a cached statement description if the catalog has moved on.
///
/// Compares `catalog_revision` (the revision the description was cached at)
/// with the catalog's current transient revision:
///
/// * If they match, nothing is re-described and `Ok(None)` is returned.
/// * Otherwise the statement is described again using the parameter types
///   stored in `desc`. If the fresh description equals `desc`, the current
///   revision is returned as `Ok(Some(revision))` so the caller can store it.
///
/// # Errors
///
/// Returns `AdapterError::ChangedPlan` when the fresh description differs
/// from `desc`, and propagates any error raised while describing.
fn verify_statement_revision(
    catalog: &Catalog,
    session: &Session,
    stmt: Option<&Statement<Raw>>,
    desc: &StatementDesc,
    catalog_revision: u64,
) -> Result<Option<u64>, AdapterError> {
    let current_revision = catalog.transient_revision();
    if catalog_revision == current_revision {
        // Same revision as when the description was cached: nothing to check.
        return Ok(None);
    }

    let current_desc = Self::describe(
        catalog,
        session,
        stmt.cloned(),
        desc.param_types.iter().cloned().map(Some).collect(),
    )?;
    if &current_desc != desc {
        return Err(AdapterError::ChangedPlan(
            "cached plan must not change result type".to_string(),
        ));
    }
    Ok(Some(current_revision))
}
/// Clears the coordinator-side state kept for the session's connection,
/// then clears the session's transaction.
///
/// Returns whatever `Session::clear_transaction` returns.
pub(crate) async fn clear_transaction(
    &mut self,
    session: &mut Session,
) -> TransactionStatus<mz_repr::Timestamp> {
    // Connection state is cleared first; the session's transaction is cleared last.
    let conn_id = session.conn_id();
    self.clear_connection(conn_id).await;
    session.clear_transaction()
}
/// Clears the coordinator-side state associated with `conn_id`.
///
/// In order, this:
/// 1. removes the connection's entry from `staged_cancellation`,
/// 2. retires the connection's compute sinks with reason `Finished`,
/// 3. retires the connection's cluster reconfigurations,
/// 4. removes and drops the connection's transaction read holds, if any,
/// 5. takes the connection's deferred lock, if held, and sends
///    `Message::DeferredStatementReady` when `serialized_ddl` is non-empty.
///
/// # Panics
///
/// Panics if `conn_id` has no entry in `active_conns`.
pub(crate) async fn clear_connection(&mut self, conn_id: &ConnectionId) {
    self.staged_cancellation.remove(conn_id);
    self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Finished)
        .await;
    self.retire_cluster_reconfigurations_for_conn(conn_id).await;

    let removed_holds = self.txn_read_holds.remove(conn_id);
    // The binding name doubles as the tracing field name, so it stays `txn_reads`.
    if let Some(txn_reads) = removed_holds {
        tracing::debug!(?txn_reads, "releasing txn read holds");
        drop(txn_reads);
    }

    let conn = self
        .active_conns
        .get_mut(conn_id)
        .expect("must exist for active session");
    let taken_lock = conn.deferred_lock.take();
    // `_guard` stays alive until the end of this block, i.e. until after the send.
    if let Some(_guard) = taken_lock {
        if !self.serialized_ddl.is_empty() {
            // A failed send is deliberately ignored.
            let _ = self.internal_cmd_tx.send(Message::DeferredStatementReady);
        }
    }
}
/// Registers `active_sink` under `id` as an active compute sink.
///
/// The sink id is added to the owning connection's `drop_sinks`, the
/// metric matching the sink kind is incremented (labelled by the session
/// type of the connection's user), and the sink is stored in
/// `active_compute_sinks`.
///
/// For subscribes, a builtin table update packed with a diff of `1` is
/// also executed, and the returned future is the second element produced
/// by that execution. For copy-tos, the returned future is already ready.
///
/// # Panics
///
/// Panics if the sink's connection has no entry in `active_conns`.
pub(crate) async fn add_active_compute_sink(
    &mut self,
    id: GlobalId,
    active_sink: ActiveComputeSink,
) -> BuiltinTableAppendNotify {
    let session_type = metrics::session_type_label_value(
        self.active_conns()[active_sink.connection_id()].user(),
    );

    let conn = self
        .active_conns
        .get_mut(active_sink.connection_id())
        .expect("must exist for active sessions");
    conn.drop_sinks.insert(id);

    let notify = match &active_sink {
        ActiveComputeSink::Subscribe(active_subscribe) => {
            let catalog_state = self.catalog().state();
            let update = catalog_state.resolve_builtin_table_update(
                catalog_state.pack_subscribe_update(id, active_subscribe, 1),
            );
            // The metric is bumped before the builtin table write is awaited.
            self.metrics
                .active_subscribes
                .with_label_values(&[session_type])
                .inc();
            self.builtin_table_update().execute(vec![update]).await.0
        }
        ActiveComputeSink::CopyTo(_) => {
            self.metrics
                .active_copy_tos
                .with_label_values(&[session_type])
                .inc();
            // No builtin table write here, so hand back an already-ready future.
            Box::pin(std::future::ready(()))
        }
    };

    self.active_compute_sinks.insert(id, active_sink);
    notify
}
/// Removes the active compute sink registered under `id`, if any.
///
/// When a sink is found, its id is removed from the owning connection's
/// `drop_sinks` and the metric matching the sink kind is decremented
/// (labelled by the session type of the connection's user). For
/// subscribes, a builtin table update packed with a diff of `-1` is
/// applied via `blocking` and awaited before the metric is decremented.
///
/// Returns the removed sink, or `None` if `id` was not registered.
///
/// # Panics
///
/// Panics if the sink's connection has no entry in `active_conns`.
#[mz_ore::instrument(level = "debug")]
pub(crate) async fn remove_active_compute_sink(
    &mut self,
    id: GlobalId,
) -> Option<ActiveComputeSink> {
    // Nothing to clean up when the id is unknown.
    let sink = self.active_compute_sinks.remove(&id)?;

    let session_type =
        metrics::session_type_label_value(self.active_conns()[sink.connection_id()].user());

    let conn = self
        .active_conns
        .get_mut(sink.connection_id())
        .expect("must exist for active compute sink");
    conn.drop_sinks.remove(&id);

    match &sink {
        ActiveComputeSink::Subscribe(active_subscribe) => {
            let catalog_state = self.catalog().state();
            let update = catalog_state.resolve_builtin_table_update(
                catalog_state.pack_subscribe_update(id, active_subscribe, -1),
            );
            self.builtin_table_update().blocking(vec![update]).await;
            self.metrics
                .active_subscribes
                .with_label_values(&[session_type])
                .dec();
        }
        ActiveComputeSink::CopyTo(_) => {
            self.metrics
                .active_copy_tos
                .with_label_values(&[session_type])
                .dec();
        }
    }

    Some(sink)
}
}