mz_adapter/coord/
sql.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Various utility methods used by the [`Coordinator`]. Ideally these are all
11//! put in more meaningfully named modules.
12
13use mz_adapter_types::connection::ConnectionId;
14use mz_ore::now::EpochMillis;
15use mz_repr::{Diff, GlobalId, ScalarType};
16use mz_sql::names::{Aug, ResolvedIds};
17use mz_sql::plan::{Params, StatementDesc};
18use mz_sql::session::metadata::SessionMetadata;
19use mz_sql_parser::ast::{Raw, Statement};
20
21use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
22use crate::catalog::Catalog;
23use crate::coord::appends::BuiltinTableAppendNotify;
24use crate::coord::{Coordinator, Message};
25use crate::session::{Session, TransactionStatus};
26use crate::util::describe;
27use crate::{AdapterError, ExecuteContext, ExecuteResponse, metrics};
28
29impl Coordinator {
30    pub(crate) fn plan_statement(
31        &self,
32        session: &Session,
33        stmt: mz_sql::ast::Statement<Aug>,
34        params: &mz_sql::plan::Params,
35        resolved_ids: &ResolvedIds,
36    ) -> Result<mz_sql::plan::Plan, AdapterError> {
37        let pcx = session.pcx();
38        let catalog = self.catalog().for_session(session);
39        let plan = mz_sql::plan::plan(Some(pcx), &catalog, stmt, params, resolved_ids)?;
40        Ok(plan)
41    }
42
43    pub(crate) fn declare(
44        &self,
45        mut ctx: ExecuteContext,
46        name: String,
47        stmt: Statement<Raw>,
48        sql: String,
49        params: Params,
50    ) {
51        let catalog = self.owned_catalog();
52        let now = self.now();
53        mz_ore::task::spawn(|| "coord::declare", async move {
54            let result =
55                Self::declare_inner(ctx.session_mut(), &catalog, name, stmt, sql, params, now)
56                    .map(|()| ExecuteResponse::DeclaredCursor);
57            ctx.retire(result);
58        });
59    }
60
61    fn declare_inner(
62        session: &mut Session,
63        catalog: &Catalog,
64        name: String,
65        stmt: Statement<Raw>,
66        sql: String,
67        params: Params,
68        now: EpochMillis,
69    ) -> Result<(), AdapterError> {
70        let param_types = params
71            .types
72            .iter()
73            .map(|ty| Some(ty.clone()))
74            .collect::<Vec<_>>();
75        let desc = describe(catalog, stmt.clone(), &param_types, session)?;
76        let params = params.datums.into_iter().zip(params.types).collect();
77        let result_formats = vec![mz_pgwire_common::Format::Text; desc.arity()];
78        let logging = session.mint_logging(sql, Some(&stmt), now);
79        session.set_portal(
80            name,
81            desc,
82            Some(stmt),
83            logging,
84            params,
85            result_formats,
86            catalog.transient_revision(),
87        )?;
88        Ok(())
89    }
90
91    #[mz_ore::instrument(level = "debug")]
92    pub(crate) fn describe(
93        catalog: &Catalog,
94        session: &Session,
95        stmt: Option<Statement<Raw>>,
96        param_types: Vec<Option<ScalarType>>,
97    ) -> Result<StatementDesc, AdapterError> {
98        if let Some(stmt) = stmt {
99            describe(catalog, stmt, &param_types, session)
100        } else {
101            Ok(StatementDesc::new(None))
102        }
103    }
104
105    /// Verify a prepared statement is still valid. This will return an error if
106    /// the catalog's revision has changed and the statement now produces a
107    /// different type than its original.
108    pub(crate) fn verify_prepared_statement(
109        catalog: &Catalog,
110        session: &mut Session,
111        name: &str,
112    ) -> Result<(), AdapterError> {
113        let ps = match session.get_prepared_statement_unverified(name) {
114            Some(ps) => ps,
115            None => return Err(AdapterError::UnknownPreparedStatement(name.to_string())),
116        };
117        if let Some(revision) = Self::verify_statement_revision(
118            catalog,
119            session,
120            ps.stmt(),
121            ps.desc(),
122            ps.catalog_revision,
123        )? {
124            let ps = session
125                .get_prepared_statement_mut_unverified(name)
126                .expect("known to exist");
127            ps.catalog_revision = revision;
128        }
129
130        Ok(())
131    }
132
133    /// Verify a portal is still valid.
134    pub(crate) fn verify_portal(
135        &self,
136        session: &mut Session,
137        name: &str,
138    ) -> Result<(), AdapterError> {
139        let portal = match session.get_portal_unverified(name) {
140            Some(portal) => portal,
141            None => return Err(AdapterError::UnknownCursor(name.to_string())),
142        };
143        if let Some(revision) = Self::verify_statement_revision(
144            self.catalog(),
145            session,
146            portal.stmt.as_deref(),
147            &portal.desc,
148            portal.catalog_revision,
149        )? {
150            let portal = session
151                .get_portal_unverified_mut(name)
152                .expect("known to exist");
153            portal.catalog_revision = revision;
154        }
155        Ok(())
156    }
157
158    /// If the catalog and portal revisions don't match, re-describe the statement
159    /// and ensure its result type has not changed. Return `Some(x)` with the new
160    /// (valid) revision if its plan has changed. Return `None` if the revisions
161    /// match. Return an error if the plan has changed.
162    fn verify_statement_revision(
163        catalog: &Catalog,
164        session: &Session,
165        stmt: Option<&Statement<Raw>>,
166        desc: &StatementDesc,
167        catalog_revision: u64,
168    ) -> Result<Option<u64>, AdapterError> {
169        let current_revision = catalog.transient_revision();
170        if catalog_revision != current_revision {
171            let current_desc = Self::describe(
172                catalog,
173                session,
174                stmt.cloned(),
175                desc.param_types.iter().map(|ty| Some(ty.clone())).collect(),
176            )?;
177            if &current_desc != desc {
178                Err(AdapterError::ChangedPlan(
179                    "cached plan must not change result type".to_string(),
180                ))
181            } else {
182                Ok(Some(current_revision))
183            }
184        } else {
185            Ok(None)
186        }
187    }
188
189    /// Handle removing in-progress transaction state regardless of the end action
190    /// of the transaction.
191    pub(crate) async fn clear_transaction(
192        &mut self,
193        session: &mut Session,
194    ) -> TransactionStatus<mz_repr::Timestamp> {
195        // This function is *usually* called when transactions end, but it can fail to be called in
196        // some cases (for example if the session's role id was dropped, then we return early and
197        // don't go through the normal sequence_end_transaction path). The `Command::Commit` handler
198        // and `AdapterClient::end_transaction` protect against this by each executing their parts
199        // of this function. Thus, if this function changes, ensure that the changes are propogated
200        // to either of those components.
201        self.clear_connection(session.conn_id()).await;
202        session.clear_transaction()
203    }
204
205    /// Clears coordinator state for a connection.
206    pub(crate) async fn clear_connection(&mut self, conn_id: &ConnectionId) {
207        self.staged_cancellation.remove(conn_id);
208        self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Finished)
209            .await;
210        self.retire_cluster_reconfigurations_for_conn(conn_id).await;
211
212        // Release this transaction's compaction hold on collections.
213        if let Some(txn_reads) = self.txn_read_holds.remove(conn_id) {
214            tracing::debug!(?txn_reads, "releasing txn read holds");
215
216            // Make it explicit that we're dropping these read holds. Dropping
217            // them will release them at the Coordinator.
218            drop(txn_reads);
219        }
220
221        if let Some(_guard) = self
222            .active_conns
223            .get_mut(conn_id)
224            .expect("must exist for active session")
225            .deferred_lock
226            .take()
227        {
228            // If there are waiting deferred statements, process one.
229            if !self.serialized_ddl.is_empty() {
230                let _ = self.internal_cmd_tx.send(Message::DeferredStatementReady);
231            }
232        }
233    }
234
235    /// Adds coordinator bookkeeping for an active compute sink.
236    ///
237    /// This is a low-level method. The caller is responsible for installing the
238    /// sink in the controller.
239    pub(crate) async fn add_active_compute_sink(
240        &mut self,
241        id: GlobalId,
242        active_sink: ActiveComputeSink,
243    ) -> BuiltinTableAppendNotify {
244        let user = self.active_conns()[active_sink.connection_id()].user();
245        let session_type = metrics::session_type_label_value(user);
246
247        self.active_conns
248            .get_mut(active_sink.connection_id())
249            .expect("must exist for active sessions")
250            .drop_sinks
251            .insert(id);
252
253        let ret_fut = match &active_sink {
254            ActiveComputeSink::Subscribe(active_subscribe) => {
255                let update =
256                    self.catalog()
257                        .state()
258                        .pack_subscribe_update(id, active_subscribe, Diff::ONE);
259                let update = self.catalog().state().resolve_builtin_table_update(update);
260
261                self.metrics
262                    .active_subscribes
263                    .with_label_values(&[session_type])
264                    .inc();
265
266                self.builtin_table_update().execute(vec![update]).await.0
267            }
268            ActiveComputeSink::CopyTo(_) => {
269                self.metrics
270                    .active_copy_tos
271                    .with_label_values(&[session_type])
272                    .inc();
273                Box::pin(std::future::ready(()))
274            }
275        };
276        self.active_compute_sinks.insert(id, active_sink);
277        ret_fut
278    }
279
280    /// Removes coordinator bookkeeping for an active compute sink.
281    ///
282    /// This is a low-level method. The caller is responsible for dropping the
283    /// sink from the controller. Consider calling `drop_compute_sink` or
284    /// `retire_compute_sink` instead.
285    #[mz_ore::instrument(level = "debug")]
286    pub(crate) async fn remove_active_compute_sink(
287        &mut self,
288        id: GlobalId,
289    ) -> Option<ActiveComputeSink> {
290        if let Some(sink) = self.active_compute_sinks.remove(&id) {
291            let user = self.active_conns()[sink.connection_id()].user();
292            let session_type = metrics::session_type_label_value(user);
293
294            self.active_conns
295                .get_mut(sink.connection_id())
296                .expect("must exist for active compute sink")
297                .drop_sinks
298                .remove(&id);
299
300            match &sink {
301                ActiveComputeSink::Subscribe(active_subscribe) => {
302                    let update = self.catalog().state().pack_subscribe_update(
303                        id,
304                        active_subscribe,
305                        Diff::MINUS_ONE,
306                    );
307                    let update = self.catalog().state().resolve_builtin_table_update(update);
308                    self.builtin_table_update().blocking(vec![update]).await;
309
310                    self.metrics
311                        .active_subscribes
312                        .with_label_values(&[session_type])
313                        .dec();
314                }
315                ActiveComputeSink::CopyTo(_) => {
316                    self.metrics
317                        .active_copy_tos
318                        .with_label_values(&[session_type])
319                        .dec();
320                }
321            }
322            Some(sink)
323        } else {
324            None
325        }
326    }
327}