1use itertools::Itertools;
14use mz_adapter_types::connection::ConnectionId;
15use mz_ore::now::EpochMillis;
16use mz_repr::{Diff, GlobalId, ScalarType};
17use mz_sql::names::{Aug, ResolvedIds};
18use mz_sql::plan::{Params, StatementDesc};
19use mz_sql::session::metadata::SessionMetadata;
20use mz_sql_parser::ast::{Raw, Statement};
21
22use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
23use crate::catalog::Catalog;
24use crate::coord::appends::BuiltinTableAppendNotify;
25use crate::coord::{Coordinator, Message};
26use crate::session::{Session, StateRevision, TransactionStatus};
27use crate::util::describe;
28use crate::{AdapterError, ExecuteContext, ExecuteResponse, metrics};
29
30impl Coordinator {
31 pub(crate) fn plan_statement(
32 &self,
33 session: &Session,
34 stmt: mz_sql::ast::Statement<Aug>,
35 params: &mz_sql::plan::Params,
36 resolved_ids: &ResolvedIds,
37 ) -> Result<mz_sql::plan::Plan, AdapterError> {
38 let pcx = session.pcx();
39 let catalog = self.catalog().for_session(session);
40 let plan = mz_sql::plan::plan(Some(pcx), &catalog, stmt, params, resolved_ids)?;
41 Ok(plan)
42 }
43
44 pub(crate) fn declare(
45 &self,
46 mut ctx: ExecuteContext,
47 name: String,
48 stmt: Statement<Raw>,
49 sql: String,
50 params: Params,
51 ) {
52 let catalog = self.owned_catalog();
53 let now = self.now();
54 mz_ore::task::spawn(|| "coord::declare", async move {
55 let result =
56 Self::declare_inner(ctx.session_mut(), &catalog, name, stmt, sql, params, now)
57 .map(|()| ExecuteResponse::DeclaredCursor);
58 ctx.retire(result);
59 });
60 }
61
62 fn declare_inner(
63 session: &mut Session,
64 catalog: &Catalog,
65 name: String,
66 stmt: Statement<Raw>,
67 sql: String,
68 params: Params,
69 now: EpochMillis,
70 ) -> Result<(), AdapterError> {
71 let param_types = params
72 .execute_types
73 .iter()
74 .map(|ty| Some(ty.clone()))
75 .collect::<Vec<_>>();
76 let desc = describe(catalog, stmt.clone(), ¶m_types, session)?;
77 let params = params
78 .datums
79 .into_iter()
80 .zip_eq(params.execute_types)
81 .collect();
82 let result_formats = vec![mz_pgwire_common::Format::Text; desc.arity()];
83 let logging = session.mint_logging(sql, Some(&stmt), now);
84 let state_revision = StateRevision {
85 catalog_revision: catalog.transient_revision(),
86 session_state_revision: session.state_revision(),
87 };
88 session.set_portal(
89 name,
90 desc,
91 Some(stmt),
92 logging,
93 params,
94 result_formats,
95 state_revision,
96 )?;
97 Ok(())
98 }
99
100 #[mz_ore::instrument(level = "debug")]
101 pub(crate) fn describe(
102 catalog: &Catalog,
103 session: &Session,
104 stmt: Option<Statement<Raw>>,
105 param_types: Vec<Option<ScalarType>>,
106 ) -> Result<StatementDesc, AdapterError> {
107 if let Some(stmt) = stmt {
108 describe(catalog, stmt, ¶m_types, session)
109 } else {
110 Ok(StatementDesc::new(None))
111 }
112 }
113
114 pub(crate) fn verify_prepared_statement(
118 catalog: &Catalog,
119 session: &mut Session,
120 name: &str,
121 ) -> Result<(), AdapterError> {
122 let ps = match session.get_prepared_statement_unverified(name) {
123 Some(ps) => ps,
124 None => return Err(AdapterError::UnknownPreparedStatement(name.to_string())),
125 };
126 if let Some(new_revision) = Self::verify_statement_revision(
127 catalog,
128 session,
129 ps.stmt(),
130 ps.desc(),
131 ps.state_revision,
132 )? {
133 let ps = session
134 .get_prepared_statement_mut_unverified(name)
135 .expect("known to exist");
136 ps.state_revision = new_revision;
137 }
138
139 Ok(())
140 }
141
142 pub(crate) fn verify_portal(
144 &self,
145 session: &mut Session,
146 name: &str,
147 ) -> Result<(), AdapterError> {
148 let portal = match session.get_portal_unverified(name) {
149 Some(portal) => portal,
150 None => return Err(AdapterError::UnknownCursor(name.to_string())),
151 };
152 if let Some(new_revision) = Self::verify_statement_revision(
153 self.catalog(),
154 session,
155 portal.stmt.as_deref(),
156 &portal.desc,
157 portal.state_revision,
158 )? {
159 let portal = session
160 .get_portal_unverified_mut(name)
161 .expect("known to exist");
162 *portal.state_revision = new_revision;
163 }
164 Ok(())
165 }
166
167 fn verify_statement_revision(
172 catalog: &Catalog,
173 session: &Session,
174 stmt: Option<&Statement<Raw>>,
175 desc: &StatementDesc,
176 old_state_revision: StateRevision,
177 ) -> Result<Option<StateRevision>, AdapterError> {
178 let current_state_revision = StateRevision {
179 catalog_revision: catalog.transient_revision(),
180 session_state_revision: session.state_revision(),
181 };
182 if old_state_revision != current_state_revision {
183 let current_desc = Self::describe(
184 catalog,
185 session,
186 stmt.cloned(),
187 desc.param_types.iter().map(|ty| Some(ty.clone())).collect(),
188 )?;
189 if ¤t_desc != desc {
190 Err(AdapterError::ChangedPlan(
191 "cached plan must not change result type".to_string(),
192 ))
193 } else {
194 Ok(Some(current_state_revision))
195 }
196 } else {
197 Ok(None)
198 }
199 }
200
201 pub(crate) async fn clear_transaction(
204 &mut self,
205 session: &mut Session,
206 ) -> TransactionStatus<mz_repr::Timestamp> {
207 self.clear_connection(session.conn_id()).await;
214 session.clear_transaction()
215 }
216
217 pub(crate) async fn clear_connection(&mut self, conn_id: &ConnectionId) {
219 self.staged_cancellation.remove(conn_id);
220 self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Finished)
221 .await;
222 self.retire_cluster_reconfigurations_for_conn(conn_id).await;
223
224 if let Some(txn_reads) = self.txn_read_holds.remove(conn_id) {
226 tracing::debug!(?txn_reads, "releasing txn read holds");
227
228 drop(txn_reads);
231 }
232
233 if let Some(_guard) = self
234 .active_conns
235 .get_mut(conn_id)
236 .expect("must exist for active session")
237 .deferred_lock
238 .take()
239 {
240 if !self.serialized_ddl.is_empty() {
242 let _ = self.internal_cmd_tx.send(Message::DeferredStatementReady);
243 }
244 }
245 }
246
247 pub(crate) async fn add_active_compute_sink(
252 &mut self,
253 id: GlobalId,
254 active_sink: ActiveComputeSink,
255 ) -> BuiltinTableAppendNotify {
256 let user = self.active_conns()[active_sink.connection_id()].user();
257 let session_type = metrics::session_type_label_value(user);
258
259 self.active_conns
260 .get_mut(active_sink.connection_id())
261 .expect("must exist for active sessions")
262 .drop_sinks
263 .insert(id);
264
265 let ret_fut = match &active_sink {
266 ActiveComputeSink::Subscribe(active_subscribe) => {
267 let update =
268 self.catalog()
269 .state()
270 .pack_subscribe_update(id, active_subscribe, Diff::ONE);
271 let update = self.catalog().state().resolve_builtin_table_update(update);
272
273 self.metrics
274 .active_subscribes
275 .with_label_values(&[session_type])
276 .inc();
277
278 self.builtin_table_update().execute(vec![update]).await.0
279 }
280 ActiveComputeSink::CopyTo(_) => {
281 self.metrics
282 .active_copy_tos
283 .with_label_values(&[session_type])
284 .inc();
285 Box::pin(std::future::ready(()))
286 }
287 };
288 self.active_compute_sinks.insert(id, active_sink);
289 ret_fut
290 }
291
292 #[mz_ore::instrument(level = "debug")]
298 pub(crate) async fn remove_active_compute_sink(
299 &mut self,
300 id: GlobalId,
301 ) -> Option<ActiveComputeSink> {
302 if let Some(sink) = self.active_compute_sinks.remove(&id) {
303 let user = self.active_conns()[sink.connection_id()].user();
304 let session_type = metrics::session_type_label_value(user);
305
306 self.active_conns
307 .get_mut(sink.connection_id())
308 .expect("must exist for active compute sink")
309 .drop_sinks
310 .remove(&id);
311
312 match &sink {
313 ActiveComputeSink::Subscribe(active_subscribe) => {
314 let update = self.catalog().state().pack_subscribe_update(
315 id,
316 active_subscribe,
317 Diff::MINUS_ONE,
318 );
319 let update = self.catalog().state().resolve_builtin_table_update(update);
320 self.builtin_table_update().blocking(vec![update]).await;
321
322 self.metrics
323 .active_subscribes
324 .with_label_values(&[session_type])
325 .dec();
326 }
327 ActiveComputeSink::CopyTo(_) => {
328 self.metrics
329 .active_copy_tos
330 .with_label_values(&[session_type])
331 .dec();
332 }
333 }
334 Some(sink)
335 } else {
336 None
337 }
338 }
339}