1use mz_adapter_types::connection::ConnectionId;
14use mz_ore::now::EpochMillis;
15use mz_repr::{Diff, GlobalId, ScalarType};
16use mz_sql::names::{Aug, ResolvedIds};
17use mz_sql::plan::{Params, StatementDesc};
18use mz_sql::session::metadata::SessionMetadata;
19use mz_sql_parser::ast::{Raw, Statement};
20
21use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
22use crate::catalog::Catalog;
23use crate::coord::appends::BuiltinTableAppendNotify;
24use crate::coord::{Coordinator, Message};
25use crate::session::{Session, TransactionStatus};
26use crate::util::describe;
27use crate::{AdapterError, ExecuteContext, ExecuteResponse, metrics};
28
29impl Coordinator {
30 pub(crate) fn plan_statement(
31 &self,
32 session: &Session,
33 stmt: mz_sql::ast::Statement<Aug>,
34 params: &mz_sql::plan::Params,
35 resolved_ids: &ResolvedIds,
36 ) -> Result<mz_sql::plan::Plan, AdapterError> {
37 let pcx = session.pcx();
38 let catalog = self.catalog().for_session(session);
39 let plan = mz_sql::plan::plan(Some(pcx), &catalog, stmt, params, resolved_ids)?;
40 Ok(plan)
41 }
42
43 pub(crate) fn declare(
44 &self,
45 mut ctx: ExecuteContext,
46 name: String,
47 stmt: Statement<Raw>,
48 sql: String,
49 params: Params,
50 ) {
51 let catalog = self.owned_catalog();
52 let now = self.now();
53 mz_ore::task::spawn(|| "coord::declare", async move {
54 let result =
55 Self::declare_inner(ctx.session_mut(), &catalog, name, stmt, sql, params, now)
56 .map(|()| ExecuteResponse::DeclaredCursor);
57 ctx.retire(result);
58 });
59 }
60
61 fn declare_inner(
62 session: &mut Session,
63 catalog: &Catalog,
64 name: String,
65 stmt: Statement<Raw>,
66 sql: String,
67 params: Params,
68 now: EpochMillis,
69 ) -> Result<(), AdapterError> {
70 let param_types = params
71 .types
72 .iter()
73 .map(|ty| Some(ty.clone()))
74 .collect::<Vec<_>>();
75 let desc = describe(catalog, stmt.clone(), ¶m_types, session)?;
76 let params = params.datums.into_iter().zip(params.types).collect();
77 let result_formats = vec![mz_pgwire_common::Format::Text; desc.arity()];
78 let logging = session.mint_logging(sql, Some(&stmt), now);
79 session.set_portal(
80 name,
81 desc,
82 Some(stmt),
83 logging,
84 params,
85 result_formats,
86 catalog.transient_revision(),
87 )?;
88 Ok(())
89 }
90
91 #[mz_ore::instrument(level = "debug")]
92 pub(crate) fn describe(
93 catalog: &Catalog,
94 session: &Session,
95 stmt: Option<Statement<Raw>>,
96 param_types: Vec<Option<ScalarType>>,
97 ) -> Result<StatementDesc, AdapterError> {
98 if let Some(stmt) = stmt {
99 describe(catalog, stmt, ¶m_types, session)
100 } else {
101 Ok(StatementDesc::new(None))
102 }
103 }
104
105 pub(crate) fn verify_prepared_statement(
109 catalog: &Catalog,
110 session: &mut Session,
111 name: &str,
112 ) -> Result<(), AdapterError> {
113 let ps = match session.get_prepared_statement_unverified(name) {
114 Some(ps) => ps,
115 None => return Err(AdapterError::UnknownPreparedStatement(name.to_string())),
116 };
117 if let Some(revision) = Self::verify_statement_revision(
118 catalog,
119 session,
120 ps.stmt(),
121 ps.desc(),
122 ps.catalog_revision,
123 )? {
124 let ps = session
125 .get_prepared_statement_mut_unverified(name)
126 .expect("known to exist");
127 ps.catalog_revision = revision;
128 }
129
130 Ok(())
131 }
132
133 pub(crate) fn verify_portal(
135 &self,
136 session: &mut Session,
137 name: &str,
138 ) -> Result<(), AdapterError> {
139 let portal = match session.get_portal_unverified(name) {
140 Some(portal) => portal,
141 None => return Err(AdapterError::UnknownCursor(name.to_string())),
142 };
143 if let Some(revision) = Self::verify_statement_revision(
144 self.catalog(),
145 session,
146 portal.stmt.as_deref(),
147 &portal.desc,
148 portal.catalog_revision,
149 )? {
150 let portal = session
151 .get_portal_unverified_mut(name)
152 .expect("known to exist");
153 portal.catalog_revision = revision;
154 }
155 Ok(())
156 }
157
158 fn verify_statement_revision(
163 catalog: &Catalog,
164 session: &Session,
165 stmt: Option<&Statement<Raw>>,
166 desc: &StatementDesc,
167 catalog_revision: u64,
168 ) -> Result<Option<u64>, AdapterError> {
169 let current_revision = catalog.transient_revision();
170 if catalog_revision != current_revision {
171 let current_desc = Self::describe(
172 catalog,
173 session,
174 stmt.cloned(),
175 desc.param_types.iter().map(|ty| Some(ty.clone())).collect(),
176 )?;
177 if ¤t_desc != desc {
178 Err(AdapterError::ChangedPlan(
179 "cached plan must not change result type".to_string(),
180 ))
181 } else {
182 Ok(Some(current_revision))
183 }
184 } else {
185 Ok(None)
186 }
187 }
188
189 pub(crate) async fn clear_transaction(
192 &mut self,
193 session: &mut Session,
194 ) -> TransactionStatus<mz_repr::Timestamp> {
195 self.clear_connection(session.conn_id()).await;
202 session.clear_transaction()
203 }
204
    /// Clears the coordinator state tracked for connection `conn_id`.
    ///
    /// In order, this removes the connection's staged-cancellation entry,
    /// retires its compute sinks with
    /// `ActiveComputeSinkRetireReason::Finished`, retires its cluster
    /// reconfigurations, drops its transaction read holds, and takes its
    /// deferred lock guard.
    ///
    /// # Panics
    ///
    /// Panics if `conn_id` has no entry in `active_conns`.
    pub(crate) async fn clear_connection(&mut self, conn_id: &ConnectionId) {
        self.staged_cancellation.remove(conn_id);
        self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Finished)
            .await;
        self.retire_cluster_reconfigurations_for_conn(conn_id).await;

        // Remove the read holds recorded for this connection, if any.
        if let Some(txn_reads) = self.txn_read_holds.remove(conn_id) {
            tracing::debug!(?txn_reads, "releasing txn read holds");

            // Drop the holds explicitly so the intent is visible; presumably
            // dropping is what releases them (confirm against the read hold
            // type's `Drop` impl).
            drop(txn_reads);
        }

        // Take the connection's deferred lock guard, if it holds one. The
        // guard stays bound to `_guard` until the end of the `if let` body.
        if let Some(_guard) = self
            .active_conns
            .get_mut(conn_id)
            .expect("must exist for active session")
            .deferred_lock
            .take()
        {
            // Only signal when serialized DDL is queued. The send result is
            // deliberately ignored.
            if !self.serialized_ddl.is_empty() {
                let _ = self.internal_cmd_tx.send(Message::DeferredStatementReady);
            }
        }
    }
234
235 pub(crate) async fn add_active_compute_sink(
240 &mut self,
241 id: GlobalId,
242 active_sink: ActiveComputeSink,
243 ) -> BuiltinTableAppendNotify {
244 let user = self.active_conns()[active_sink.connection_id()].user();
245 let session_type = metrics::session_type_label_value(user);
246
247 self.active_conns
248 .get_mut(active_sink.connection_id())
249 .expect("must exist for active sessions")
250 .drop_sinks
251 .insert(id);
252
253 let ret_fut = match &active_sink {
254 ActiveComputeSink::Subscribe(active_subscribe) => {
255 let update =
256 self.catalog()
257 .state()
258 .pack_subscribe_update(id, active_subscribe, Diff::ONE);
259 let update = self.catalog().state().resolve_builtin_table_update(update);
260
261 self.metrics
262 .active_subscribes
263 .with_label_values(&[session_type])
264 .inc();
265
266 self.builtin_table_update().execute(vec![update]).await.0
267 }
268 ActiveComputeSink::CopyTo(_) => {
269 self.metrics
270 .active_copy_tos
271 .with_label_values(&[session_type])
272 .inc();
273 Box::pin(std::future::ready(()))
274 }
275 };
276 self.active_compute_sinks.insert(id, active_sink);
277 ret_fut
278 }
279
280 #[mz_ore::instrument(level = "debug")]
286 pub(crate) async fn remove_active_compute_sink(
287 &mut self,
288 id: GlobalId,
289 ) -> Option<ActiveComputeSink> {
290 if let Some(sink) = self.active_compute_sinks.remove(&id) {
291 let user = self.active_conns()[sink.connection_id()].user();
292 let session_type = metrics::session_type_label_value(user);
293
294 self.active_conns
295 .get_mut(sink.connection_id())
296 .expect("must exist for active compute sink")
297 .drop_sinks
298 .remove(&id);
299
300 match &sink {
301 ActiveComputeSink::Subscribe(active_subscribe) => {
302 let update = self.catalog().state().pack_subscribe_update(
303 id,
304 active_subscribe,
305 Diff::MINUS_ONE,
306 );
307 let update = self.catalog().state().resolve_builtin_table_update(update);
308 self.builtin_table_update().blocking(vec![update]).await;
309
310 self.metrics
311 .active_subscribes
312 .with_label_values(&[session_type])
313 .dec();
314 }
315 ActiveComputeSink::CopyTo(_) => {
316 self.metrics
317 .active_copy_tos
318 .with_label_values(&[session_type])
319 .dec();
320 }
321 }
322 Some(sink)
323 } else {
324 None
325 }
326 }
327}