1use itertools::Itertools;
14use mz_adapter_types::connection::ConnectionId;
15use mz_ore::now::EpochMillis;
16use mz_repr::{Diff, GlobalId, SqlScalarType};
17use mz_sql::names::{Aug, ResolvedIds};
18use mz_sql::plan::{Params, StatementDesc};
19use mz_sql::session::metadata::SessionMetadata;
20use mz_sql_parser::ast::{Raw, Statement};
21
22use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
23use crate::catalog::Catalog;
24use crate::coord::appends::BuiltinTableAppendNotify;
25use crate::coord::{Coordinator, Message};
26use crate::session::{Session, StateRevision, TransactionStatus};
27use crate::util::describe;
28use crate::{AdapterError, ExecuteContext, ExecuteResponse, metrics};
29
30impl Coordinator {
31 pub(crate) fn plan_statement(
35 &self,
36 session: &Session,
37 stmt: mz_sql::ast::Statement<Aug>,
38 params: &mz_sql::plan::Params,
39 resolved_ids: &ResolvedIds,
40 ) -> Result<(mz_sql::plan::Plan, ResolvedIds), AdapterError> {
41 let pcx = session.pcx();
42 let catalog = self.catalog().for_session(session);
43 let (plan, sql_impl_ids) =
44 mz_sql::plan::plan(Some(pcx), &catalog, stmt, params, resolved_ids)?;
45 Ok((plan, sql_impl_ids))
46 }
47
48 pub(crate) fn declare(
49 &self,
50 mut ctx: ExecuteContext,
51 name: String,
52 stmt: Statement<Raw>,
53 sql: String,
54 params: Params,
55 ) {
56 let catalog = self.owned_catalog();
57 let now = self.now();
58 mz_ore::task::spawn(|| "coord::declare", async move {
59 let result =
60 Self::declare_inner(ctx.session_mut(), &catalog, name, stmt, sql, params, now)
61 .map(|()| ExecuteResponse::DeclaredCursor);
62 ctx.retire(result);
63 });
64 }
65
66 fn declare_inner(
67 session: &mut Session,
68 catalog: &Catalog,
69 name: String,
70 stmt: Statement<Raw>,
71 sql: String,
72 params: Params,
73 now: EpochMillis,
74 ) -> Result<(), AdapterError> {
75 let param_types = params
76 .execute_types
77 .iter()
78 .map(|ty| Some(ty.clone()))
79 .collect::<Vec<_>>();
80 let desc = describe(catalog, stmt.clone(), ¶m_types, session)?;
81 let params = params
82 .datums
83 .into_iter()
84 .zip_eq(params.execute_types)
85 .collect();
86 let result_formats = vec![mz_pgwire_common::Format::Text; desc.arity()];
87 let logging = session.mint_logging(sql, Some(&stmt), now);
88 let state_revision = StateRevision {
89 catalog_revision: catalog.transient_revision(),
90 session_state_revision: session.state_revision(),
91 };
92 session.set_portal(
93 name,
94 desc,
95 Some(stmt),
96 logging,
97 params,
98 result_formats,
99 state_revision,
100 )?;
101 Ok(())
102 }
103
104 #[mz_ore::instrument(level = "debug")]
105 pub(crate) fn describe(
106 catalog: &Catalog,
107 session: &Session,
108 stmt: Option<Statement<Raw>>,
109 param_types: Vec<Option<SqlScalarType>>,
110 ) -> Result<StatementDesc, AdapterError> {
111 if let Some(stmt) = stmt {
112 describe(catalog, stmt, ¶m_types, session)
113 } else {
114 Ok(StatementDesc::new(None))
115 }
116 }
117
118 pub(crate) fn verify_prepared_statement(
122 catalog: &Catalog,
123 session: &mut Session,
124 name: &str,
125 ) -> Result<(), AdapterError> {
126 let ps = match session.get_prepared_statement_unverified(name) {
127 Some(ps) => ps,
128 None => return Err(AdapterError::UnknownPreparedStatement(name.to_string())),
129 };
130 if let Some(new_revision) = Self::verify_statement_revision(
131 catalog,
132 session,
133 ps.stmt(),
134 ps.desc(),
135 ps.state_revision,
136 )? {
137 let ps = session
138 .get_prepared_statement_mut_unverified(name)
139 .expect("known to exist");
140 ps.state_revision = new_revision;
141 }
142
143 Ok(())
144 }
145
146 pub(crate) fn verify_portal(
148 catalog: &Catalog,
149 session: &mut Session,
150 name: &str,
151 ) -> Result<(), AdapterError> {
152 let portal = match session.get_portal_unverified(name) {
153 Some(portal) => portal,
154 None => return Err(AdapterError::UnknownCursor(name.to_string())),
155 };
156 if let Some(new_revision) = Self::verify_statement_revision(
157 catalog,
158 session,
159 portal.stmt.as_deref(),
160 &portal.desc,
161 portal.state_revision,
162 )? {
163 let portal = session
164 .get_portal_unverified_mut(name)
165 .expect("known to exist");
166 *portal.state_revision = new_revision;
167 }
168 Ok(())
169 }
170
171 fn verify_statement_revision(
176 catalog: &Catalog,
177 session: &Session,
178 stmt: Option<&Statement<Raw>>,
179 desc: &StatementDesc,
180 old_state_revision: StateRevision,
181 ) -> Result<Option<StateRevision>, AdapterError> {
182 let current_state_revision = StateRevision {
183 catalog_revision: catalog.transient_revision(),
184 session_state_revision: session.state_revision(),
185 };
186 if old_state_revision != current_state_revision {
187 let current_desc = Self::describe(
188 catalog,
189 session,
190 stmt.cloned(),
191 desc.param_types.iter().map(|ty| Some(ty.clone())).collect(),
192 )?;
193 if ¤t_desc != desc {
194 Err(AdapterError::ChangedPlan(
195 "cached plan must not change result type".to_string(),
196 ))
197 } else {
198 Ok(Some(current_state_revision))
199 }
200 } else {
201 Ok(None)
202 }
203 }
204
205 pub(crate) async fn clear_transaction(&mut self, session: &mut Session) -> TransactionStatus {
208 self.clear_connection(session.conn_id()).await;
215 session.clear_transaction()
216 }
217
218 pub(crate) async fn clear_connection(&mut self, conn_id: &ConnectionId) {
220 self.staged_cancellation.remove(conn_id);
221 self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Finished)
222 .await;
223 self.retire_cluster_reconfigurations_for_conn(conn_id).await;
224
225 if let Some(txn_reads) = self.txn_read_holds.remove(conn_id) {
227 tracing::debug!(?txn_reads, "releasing txn read holds");
228
229 drop(txn_reads);
232 }
233
234 if let Some(_guard) = self
235 .active_conns
236 .get_mut(conn_id)
237 .expect("must exist for active session")
238 .deferred_lock
239 .take()
240 {
241 if !self.serialized_ddl.is_empty() {
243 let _ = self.internal_cmd_tx.send(Message::DeferredStatementReady);
244 }
245 }
246 }
247
248 pub(crate) async fn add_active_compute_sink(
253 &mut self,
254 id: GlobalId,
255 active_sink: ActiveComputeSink,
256 ) -> BuiltinTableAppendNotify {
257 let user = self.active_conns()[active_sink.connection_id()].user();
258 let session_type = metrics::session_type_label_value(user);
259
260 self.active_conns
261 .get_mut(active_sink.connection_id())
262 .expect("must exist for active sessions")
263 .drop_sinks
264 .insert(id);
265
266 let ret_fut = match &active_sink {
267 ActiveComputeSink::Subscribe(active_subscribe) => {
268 let update =
269 self.catalog()
270 .state()
271 .pack_subscribe_update(id, active_subscribe, Diff::ONE);
272 let update = self.catalog().state().resolve_builtin_table_update(update);
273
274 self.metrics
275 .active_subscribes
276 .with_label_values(&[session_type])
277 .inc();
278
279 self.builtin_table_update().execute(vec![update]).await.0
280 }
281 ActiveComputeSink::CopyTo(_) => {
282 self.metrics
283 .active_copy_tos
284 .with_label_values(&[session_type])
285 .inc();
286 Box::pin(std::future::ready(()))
287 }
288 };
289 self.active_compute_sinks.insert(id, active_sink);
290 ret_fut
291 }
292
293 #[mz_ore::instrument(level = "debug")]
299 pub(crate) async fn remove_active_compute_sink(
300 &mut self,
301 id: GlobalId,
302 ) -> Option<ActiveComputeSink> {
303 if let Some(sink) = self.active_compute_sinks.remove(&id) {
304 let user = self.active_conns()[sink.connection_id()].user();
305 let session_type = metrics::session_type_label_value(user);
306
307 self.active_conns
308 .get_mut(sink.connection_id())
309 .expect("must exist for active compute sink")
310 .drop_sinks
311 .remove(&id);
312
313 match &sink {
314 ActiveComputeSink::Subscribe(active_subscribe) => {
315 let update = self.catalog().state().pack_subscribe_update(
316 id,
317 active_subscribe,
318 Diff::MINUS_ONE,
319 );
320 let update = self.catalog().state().resolve_builtin_table_update(update);
321 self.builtin_table_update().blocking(vec![update]).await;
322
323 self.metrics
324 .active_subscribes
325 .with_label_values(&[session_type])
326 .dec();
327 }
328 ActiveComputeSink::CopyTo(_) => {
329 self.metrics
330 .active_copy_tos
331 .with_label_values(&[session_type])
332 .dec();
333 }
334 }
335 Some(sink)
336 } else {
337 None
338 }
339 }
340}