mz_adapter/coord/
sql.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Various utility methods used by the [`Coordinator`]. Ideally these are all
11//! put in more meaningfully named modules.
12
13use itertools::Itertools;
14use mz_adapter_types::connection::ConnectionId;
15use mz_ore::now::EpochMillis;
16use mz_repr::{Diff, GlobalId, ScalarType};
17use mz_sql::names::{Aug, ResolvedIds};
18use mz_sql::plan::{Params, StatementDesc};
19use mz_sql::session::metadata::SessionMetadata;
20use mz_sql_parser::ast::{Raw, Statement};
21
22use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
23use crate::catalog::Catalog;
24use crate::coord::appends::BuiltinTableAppendNotify;
25use crate::coord::{Coordinator, Message};
26use crate::session::{Session, StateRevision, TransactionStatus};
27use crate::util::describe;
28use crate::{AdapterError, ExecuteContext, ExecuteResponse, metrics};
29
30impl Coordinator {
31    pub(crate) fn plan_statement(
32        &self,
33        session: &Session,
34        stmt: mz_sql::ast::Statement<Aug>,
35        params: &mz_sql::plan::Params,
36        resolved_ids: &ResolvedIds,
37    ) -> Result<mz_sql::plan::Plan, AdapterError> {
38        let pcx = session.pcx();
39        let catalog = self.catalog().for_session(session);
40        let plan = mz_sql::plan::plan(Some(pcx), &catalog, stmt, params, resolved_ids)?;
41        Ok(plan)
42    }
43
44    pub(crate) fn declare(
45        &self,
46        mut ctx: ExecuteContext,
47        name: String,
48        stmt: Statement<Raw>,
49        sql: String,
50        params: Params,
51    ) {
52        let catalog = self.owned_catalog();
53        let now = self.now();
54        mz_ore::task::spawn(|| "coord::declare", async move {
55            let result =
56                Self::declare_inner(ctx.session_mut(), &catalog, name, stmt, sql, params, now)
57                    .map(|()| ExecuteResponse::DeclaredCursor);
58            ctx.retire(result);
59        });
60    }
61
62    fn declare_inner(
63        session: &mut Session,
64        catalog: &Catalog,
65        name: String,
66        stmt: Statement<Raw>,
67        sql: String,
68        params: Params,
69        now: EpochMillis,
70    ) -> Result<(), AdapterError> {
71        let param_types = params
72            .execute_types
73            .iter()
74            .map(|ty| Some(ty.clone()))
75            .collect::<Vec<_>>();
76        let desc = describe(catalog, stmt.clone(), &param_types, session)?;
77        let params = params
78            .datums
79            .into_iter()
80            .zip_eq(params.execute_types)
81            .collect();
82        let result_formats = vec![mz_pgwire_common::Format::Text; desc.arity()];
83        let logging = session.mint_logging(sql, Some(&stmt), now);
84        let state_revision = StateRevision {
85            catalog_revision: catalog.transient_revision(),
86            session_state_revision: session.state_revision(),
87        };
88        session.set_portal(
89            name,
90            desc,
91            Some(stmt),
92            logging,
93            params,
94            result_formats,
95            state_revision,
96        )?;
97        Ok(())
98    }
99
100    #[mz_ore::instrument(level = "debug")]
101    pub(crate) fn describe(
102        catalog: &Catalog,
103        session: &Session,
104        stmt: Option<Statement<Raw>>,
105        param_types: Vec<Option<ScalarType>>,
106    ) -> Result<StatementDesc, AdapterError> {
107        if let Some(stmt) = stmt {
108            describe(catalog, stmt, &param_types, session)
109        } else {
110            Ok(StatementDesc::new(None))
111        }
112    }
113
114    /// Verify a prepared statement is still valid. This will return an error if
115    /// the catalog's revision has changed and the statement now produces a
116    /// different type than its original.
117    pub(crate) fn verify_prepared_statement(
118        catalog: &Catalog,
119        session: &mut Session,
120        name: &str,
121    ) -> Result<(), AdapterError> {
122        let ps = match session.get_prepared_statement_unverified(name) {
123            Some(ps) => ps,
124            None => return Err(AdapterError::UnknownPreparedStatement(name.to_string())),
125        };
126        if let Some(new_revision) = Self::verify_statement_revision(
127            catalog,
128            session,
129            ps.stmt(),
130            ps.desc(),
131            ps.state_revision,
132        )? {
133            let ps = session
134                .get_prepared_statement_mut_unverified(name)
135                .expect("known to exist");
136            ps.state_revision = new_revision;
137        }
138
139        Ok(())
140    }
141
142    /// Verify a portal is still valid.
143    pub(crate) fn verify_portal(
144        &self,
145        session: &mut Session,
146        name: &str,
147    ) -> Result<(), AdapterError> {
148        let portal = match session.get_portal_unverified(name) {
149            Some(portal) => portal,
150            None => return Err(AdapterError::UnknownCursor(name.to_string())),
151        };
152        if let Some(new_revision) = Self::verify_statement_revision(
153            self.catalog(),
154            session,
155            portal.stmt.as_deref(),
156            &portal.desc,
157            portal.state_revision,
158        )? {
159            let portal = session
160                .get_portal_unverified_mut(name)
161                .expect("known to exist");
162            *portal.state_revision = new_revision;
163        }
164        Ok(())
165    }
166
167    /// If the current catalog/session revisions don't match the given revisions, re-describe the
168    /// statement and ensure its result type has not changed. Return `Some((c, s))` with the new
169    /// (valid) catalog and session state revisions if its plan has changed. Return `None` if the
170    /// revisions match. Return an error if the plan has changed.
171    fn verify_statement_revision(
172        catalog: &Catalog,
173        session: &Session,
174        stmt: Option<&Statement<Raw>>,
175        desc: &StatementDesc,
176        old_state_revision: StateRevision,
177    ) -> Result<Option<StateRevision>, AdapterError> {
178        let current_state_revision = StateRevision {
179            catalog_revision: catalog.transient_revision(),
180            session_state_revision: session.state_revision(),
181        };
182        if old_state_revision != current_state_revision {
183            let current_desc = Self::describe(
184                catalog,
185                session,
186                stmt.cloned(),
187                desc.param_types.iter().map(|ty| Some(ty.clone())).collect(),
188            )?;
189            if &current_desc != desc {
190                Err(AdapterError::ChangedPlan(
191                    "cached plan must not change result type".to_string(),
192                ))
193            } else {
194                Ok(Some(current_state_revision))
195            }
196        } else {
197            Ok(None)
198        }
199    }
200
201    /// Handle removing in-progress transaction state regardless of the end action
202    /// of the transaction.
203    pub(crate) async fn clear_transaction(
204        &mut self,
205        session: &mut Session,
206    ) -> TransactionStatus<mz_repr::Timestamp> {
207        // This function is *usually* called when transactions end, but it can fail to be called in
208        // some cases (for example if the session's role id was dropped, then we return early and
209        // don't go through the normal sequence_end_transaction path). The `Command::Commit` handler
210        // and `AdapterClient::end_transaction` protect against this by each executing their parts
211        // of this function. Thus, if this function changes, ensure that the changes are propogated
212        // to either of those components.
213        self.clear_connection(session.conn_id()).await;
214        session.clear_transaction()
215    }
216
217    /// Clears coordinator state for a connection.
218    pub(crate) async fn clear_connection(&mut self, conn_id: &ConnectionId) {
219        self.staged_cancellation.remove(conn_id);
220        self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Finished)
221            .await;
222        self.retire_cluster_reconfigurations_for_conn(conn_id).await;
223
224        // Release this transaction's compaction hold on collections.
225        if let Some(txn_reads) = self.txn_read_holds.remove(conn_id) {
226            tracing::debug!(?txn_reads, "releasing txn read holds");
227
228            // Make it explicit that we're dropping these read holds. Dropping
229            // them will release them at the Coordinator.
230            drop(txn_reads);
231        }
232
233        if let Some(_guard) = self
234            .active_conns
235            .get_mut(conn_id)
236            .expect("must exist for active session")
237            .deferred_lock
238            .take()
239        {
240            // If there are waiting deferred statements, process one.
241            if !self.serialized_ddl.is_empty() {
242                let _ = self.internal_cmd_tx.send(Message::DeferredStatementReady);
243            }
244        }
245    }
246
247    /// Adds coordinator bookkeeping for an active compute sink.
248    ///
249    /// This is a low-level method. The caller is responsible for installing the
250    /// sink in the controller.
251    pub(crate) async fn add_active_compute_sink(
252        &mut self,
253        id: GlobalId,
254        active_sink: ActiveComputeSink,
255    ) -> BuiltinTableAppendNotify {
256        let user = self.active_conns()[active_sink.connection_id()].user();
257        let session_type = metrics::session_type_label_value(user);
258
259        self.active_conns
260            .get_mut(active_sink.connection_id())
261            .expect("must exist for active sessions")
262            .drop_sinks
263            .insert(id);
264
265        let ret_fut = match &active_sink {
266            ActiveComputeSink::Subscribe(active_subscribe) => {
267                let update =
268                    self.catalog()
269                        .state()
270                        .pack_subscribe_update(id, active_subscribe, Diff::ONE);
271                let update = self.catalog().state().resolve_builtin_table_update(update);
272
273                self.metrics
274                    .active_subscribes
275                    .with_label_values(&[session_type])
276                    .inc();
277
278                self.builtin_table_update().execute(vec![update]).await.0
279            }
280            ActiveComputeSink::CopyTo(_) => {
281                self.metrics
282                    .active_copy_tos
283                    .with_label_values(&[session_type])
284                    .inc();
285                Box::pin(std::future::ready(()))
286            }
287        };
288        self.active_compute_sinks.insert(id, active_sink);
289        ret_fut
290    }
291
292    /// Removes coordinator bookkeeping for an active compute sink.
293    ///
294    /// This is a low-level method. The caller is responsible for dropping the
295    /// sink from the controller. Consider calling `drop_compute_sink` or
296    /// `retire_compute_sink` instead.
297    #[mz_ore::instrument(level = "debug")]
298    pub(crate) async fn remove_active_compute_sink(
299        &mut self,
300        id: GlobalId,
301    ) -> Option<ActiveComputeSink> {
302        if let Some(sink) = self.active_compute_sinks.remove(&id) {
303            let user = self.active_conns()[sink.connection_id()].user();
304            let session_type = metrics::session_type_label_value(user);
305
306            self.active_conns
307                .get_mut(sink.connection_id())
308                .expect("must exist for active compute sink")
309                .drop_sinks
310                .remove(&id);
311
312            match &sink {
313                ActiveComputeSink::Subscribe(active_subscribe) => {
314                    let update = self.catalog().state().pack_subscribe_update(
315                        id,
316                        active_subscribe,
317                        Diff::MINUS_ONE,
318                    );
319                    let update = self.catalog().state().resolve_builtin_table_update(update);
320                    self.builtin_table_update().blocking(vec![update]).await;
321
322                    self.metrics
323                        .active_subscribes
324                        .with_label_values(&[session_type])
325                        .dec();
326                }
327                ActiveComputeSink::CopyTo(_) => {
328                    self.metrics
329                        .active_copy_tos
330                        .with_label_values(&[session_type])
331                        .dec();
332                }
333            }
334            Some(sink)
335        } else {
336            None
337        }
338    }
339}