Skip to main content

mz_adapter/coord/
sql.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Various utility methods used by the [`Coordinator`]. Ideally these are all
11//! put in more meaningfully named modules.
12
13use itertools::Itertools;
14use mz_adapter_types::connection::ConnectionId;
15use mz_ore::now::EpochMillis;
16use mz_repr::{Diff, GlobalId, SqlScalarType};
17use mz_sql::names::{Aug, ResolvedIds};
18use mz_sql::plan::{Params, StatementDesc};
19use mz_sql::session::metadata::SessionMetadata;
20use mz_sql_parser::ast::{Raw, Statement};
21
22use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
23use crate::catalog::Catalog;
24use crate::coord::appends::BuiltinTableAppendNotify;
25use crate::coord::{Coordinator, Message};
26use crate::session::{Session, StateRevision, TransactionStatus};
27use crate::util::describe;
28use crate::{AdapterError, ExecuteContext, ExecuteResponse, metrics};
29
30impl Coordinator {
31    /// Plans a statement and returns the plan along with any resolved IDs
32    /// discovered inside SQL-implemented function bodies. The extra IDs should
33    /// only be used for the `restrict_to_user_objects` RBAC check.
34    pub(crate) fn plan_statement(
35        &self,
36        session: &Session,
37        stmt: mz_sql::ast::Statement<Aug>,
38        params: &mz_sql::plan::Params,
39        resolved_ids: &ResolvedIds,
40    ) -> Result<(mz_sql::plan::Plan, ResolvedIds), AdapterError> {
41        let pcx = session.pcx();
42        let catalog = self.catalog().for_session(session);
43        let (plan, sql_impl_ids) =
44            mz_sql::plan::plan(Some(pcx), &catalog, stmt, params, resolved_ids)?;
45        Ok((plan, sql_impl_ids))
46    }
47
48    pub(crate) fn declare(
49        &self,
50        mut ctx: ExecuteContext,
51        name: String,
52        stmt: Statement<Raw>,
53        sql: String,
54        params: Params,
55    ) {
56        let catalog = self.owned_catalog();
57        let now = self.now();
58        mz_ore::task::spawn(|| "coord::declare", async move {
59            let result =
60                Self::declare_inner(ctx.session_mut(), &catalog, name, stmt, sql, params, now)
61                    .map(|()| ExecuteResponse::DeclaredCursor);
62            ctx.retire(result);
63        });
64    }
65
66    fn declare_inner(
67        session: &mut Session,
68        catalog: &Catalog,
69        name: String,
70        stmt: Statement<Raw>,
71        sql: String,
72        params: Params,
73        now: EpochMillis,
74    ) -> Result<(), AdapterError> {
75        let param_types = params
76            .execute_types
77            .iter()
78            .map(|ty| Some(ty.clone()))
79            .collect::<Vec<_>>();
80        let desc = describe(catalog, stmt.clone(), &param_types, session)?;
81        let params = params
82            .datums
83            .into_iter()
84            .zip_eq(params.execute_types)
85            .collect();
86        let result_formats = vec![mz_pgwire_common::Format::Text; desc.arity()];
87        let logging = session.mint_logging(sql, Some(&stmt), now);
88        let state_revision = StateRevision {
89            catalog_revision: catalog.transient_revision(),
90            session_state_revision: session.state_revision(),
91        };
92        session.set_portal(
93            name,
94            desc,
95            Some(stmt),
96            logging,
97            params,
98            result_formats,
99            state_revision,
100        )?;
101        Ok(())
102    }
103
104    #[mz_ore::instrument(level = "debug")]
105    pub(crate) fn describe(
106        catalog: &Catalog,
107        session: &Session,
108        stmt: Option<Statement<Raw>>,
109        param_types: Vec<Option<SqlScalarType>>,
110    ) -> Result<StatementDesc, AdapterError> {
111        if let Some(stmt) = stmt {
112            describe(catalog, stmt, &param_types, session)
113        } else {
114            Ok(StatementDesc::new(None))
115        }
116    }
117
118    /// Verify a prepared statement is still valid. This will return an error if
119    /// the catalog's revision has changed and the statement now produces a
120    /// different type than its original.
121    pub(crate) fn verify_prepared_statement(
122        catalog: &Catalog,
123        session: &mut Session,
124        name: &str,
125    ) -> Result<(), AdapterError> {
126        let ps = match session.get_prepared_statement_unverified(name) {
127            Some(ps) => ps,
128            None => return Err(AdapterError::UnknownPreparedStatement(name.to_string())),
129        };
130        if let Some(new_revision) = Self::verify_statement_revision(
131            catalog,
132            session,
133            ps.stmt(),
134            ps.desc(),
135            ps.state_revision,
136        )? {
137            let ps = session
138                .get_prepared_statement_mut_unverified(name)
139                .expect("known to exist");
140            ps.state_revision = new_revision;
141        }
142
143        Ok(())
144    }
145
146    /// Verify a portal is still valid.
147    pub(crate) fn verify_portal(
148        catalog: &Catalog,
149        session: &mut Session,
150        name: &str,
151    ) -> Result<(), AdapterError> {
152        let portal = match session.get_portal_unverified(name) {
153            Some(portal) => portal,
154            None => return Err(AdapterError::UnknownCursor(name.to_string())),
155        };
156        if let Some(new_revision) = Self::verify_statement_revision(
157            catalog,
158            session,
159            portal.stmt.as_deref(),
160            &portal.desc,
161            portal.state_revision,
162        )? {
163            let portal = session
164                .get_portal_unverified_mut(name)
165                .expect("known to exist");
166            *portal.state_revision = new_revision;
167        }
168        Ok(())
169    }
170
171    /// If the current catalog/session revisions don't match the given revisions, re-describe the
172    /// statement and ensure its result type has not changed. Return `Some((c, s))` with the new
173    /// (valid) catalog and session state revisions if its plan has changed. Return `None` if the
174    /// revisions match. Return an error if the plan has changed.
175    fn verify_statement_revision(
176        catalog: &Catalog,
177        session: &Session,
178        stmt: Option<&Statement<Raw>>,
179        desc: &StatementDesc,
180        old_state_revision: StateRevision,
181    ) -> Result<Option<StateRevision>, AdapterError> {
182        let current_state_revision = StateRevision {
183            catalog_revision: catalog.transient_revision(),
184            session_state_revision: session.state_revision(),
185        };
186        if old_state_revision != current_state_revision {
187            let current_desc = Self::describe(
188                catalog,
189                session,
190                stmt.cloned(),
191                desc.param_types.iter().map(|ty| Some(ty.clone())).collect(),
192            )?;
193            if &current_desc != desc {
194                Err(AdapterError::ChangedPlan(
195                    "cached plan must not change result type".to_string(),
196                ))
197            } else {
198                Ok(Some(current_state_revision))
199            }
200        } else {
201            Ok(None)
202        }
203    }
204
205    /// Handle removing in-progress transaction state regardless of the end action
206    /// of the transaction.
207    pub(crate) async fn clear_transaction(&mut self, session: &mut Session) -> TransactionStatus {
208        // This function is *usually* called when transactions end, but it can fail to be called in
209        // some cases (for example if the session's role id was dropped, then we return early and
210        // don't go through the normal sequence_end_transaction path). The `Command::Commit` handler
211        // and `AdapterClient::end_transaction` protect against this by each executing their parts
212        // of this function. Thus, if this function changes, ensure that the changes are propogated
213        // to either of those components.
214        self.clear_connection(session.conn_id()).await;
215        session.clear_transaction()
216    }
217
218    /// Clears coordinator state for a connection.
219    pub(crate) async fn clear_connection(&mut self, conn_id: &ConnectionId) {
220        self.staged_cancellation.remove(conn_id);
221        self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Finished)
222            .await;
223        self.retire_cluster_reconfigurations_for_conn(conn_id).await;
224
225        // Release this transaction's compaction hold on collections.
226        if let Some(txn_reads) = self.txn_read_holds.remove(conn_id) {
227            tracing::debug!(?txn_reads, "releasing txn read holds");
228
229            // Make it explicit that we're dropping these read holds. Dropping
230            // them will release them at the Coordinator.
231            drop(txn_reads);
232        }
233
234        if let Some(_guard) = self
235            .active_conns
236            .get_mut(conn_id)
237            .expect("must exist for active session")
238            .deferred_lock
239            .take()
240        {
241            // If there are waiting deferred statements, process one.
242            if !self.serialized_ddl.is_empty() {
243                let _ = self.internal_cmd_tx.send(Message::DeferredStatementReady);
244            }
245        }
246    }
247
248    /// Adds coordinator bookkeeping for an active compute sink.
249    ///
250    /// This is a low-level method. The caller is responsible for installing the
251    /// sink in the controller.
252    pub(crate) async fn add_active_compute_sink(
253        &mut self,
254        id: GlobalId,
255        active_sink: ActiveComputeSink,
256    ) -> BuiltinTableAppendNotify {
257        let user = self.active_conns()[active_sink.connection_id()].user();
258        let session_type = metrics::session_type_label_value(user);
259
260        self.active_conns
261            .get_mut(active_sink.connection_id())
262            .expect("must exist for active sessions")
263            .drop_sinks
264            .insert(id);
265
266        let ret_fut = match &active_sink {
267            ActiveComputeSink::Subscribe(active_subscribe) => {
268                let update =
269                    self.catalog()
270                        .state()
271                        .pack_subscribe_update(id, active_subscribe, Diff::ONE);
272                let update = self.catalog().state().resolve_builtin_table_update(update);
273
274                self.metrics
275                    .active_subscribes
276                    .with_label_values(&[session_type])
277                    .inc();
278
279                self.builtin_table_update().execute(vec![update]).await.0
280            }
281            ActiveComputeSink::CopyTo(_) => {
282                self.metrics
283                    .active_copy_tos
284                    .with_label_values(&[session_type])
285                    .inc();
286                Box::pin(std::future::ready(()))
287            }
288        };
289        self.active_compute_sinks.insert(id, active_sink);
290        ret_fut
291    }
292
293    /// Removes coordinator bookkeeping for an active compute sink.
294    ///
295    /// This is a low-level method. The caller is responsible for dropping the
296    /// sink from the controller. Consider calling `drop_compute_sink` or
297    /// `retire_compute_sink` instead.
298    #[mz_ore::instrument(level = "debug")]
299    pub(crate) async fn remove_active_compute_sink(
300        &mut self,
301        id: GlobalId,
302    ) -> Option<ActiveComputeSink> {
303        if let Some(sink) = self.active_compute_sinks.remove(&id) {
304            let user = self.active_conns()[sink.connection_id()].user();
305            let session_type = metrics::session_type_label_value(user);
306
307            self.active_conns
308                .get_mut(sink.connection_id())
309                .expect("must exist for active compute sink")
310                .drop_sinks
311                .remove(&id);
312
313            match &sink {
314                ActiveComputeSink::Subscribe(active_subscribe) => {
315                    let update = self.catalog().state().pack_subscribe_update(
316                        id,
317                        active_subscribe,
318                        Diff::MINUS_ONE,
319                    );
320                    let update = self.catalog().state().resolve_builtin_table_update(update);
321                    self.builtin_table_update().blocking(vec![update]).await;
322
323                    self.metrics
324                        .active_subscribes
325                        .with_label_values(&[session_type])
326                        .dec();
327                }
328                ActiveComputeSink::CopyTo(_) => {
329                    self.metrics
330                        .active_copy_tos
331                        .with_label_values(&[session_type])
332                        .dec();
333                }
334            }
335            Some(sink)
336        } else {
337            None
338        }
339    }
340}