mz_adapter/coord/
command_handler.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic for  processing client [`Command`]s. Each [`Command`] is initiated by a
11//! client via some external Materialize API (ex: HTTP and psql).
12
13use differential_dataflow::lattice::Lattice;
14use mz_adapter_types::dyncfgs::ALLOW_USER_SESSIONS;
15use mz_auth::password::Password;
16use mz_repr::namespaces::MZ_INTERNAL_SCHEMA;
17use mz_sql::session::metadata::SessionMetadata;
18use std::collections::{BTreeMap, BTreeSet};
19use std::net::IpAddr;
20use std::sync::Arc;
21
22use futures::FutureExt;
23use futures::future::LocalBoxFuture;
24use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
25use mz_catalog::SYSTEM_CONN_ID;
26use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source, Table, TableDataSource};
27use mz_ore::task;
28use mz_ore::tracing::OpenTelemetryContext;
29use mz_ore::{instrument, soft_panic_or_log};
30use mz_repr::role_id::RoleId;
31use mz_repr::{Diff, ScalarType, Timestamp};
32use mz_sql::ast::{
33    AlterConnectionAction, AlterConnectionStatement, AlterSourceAction, AstInfo, ConstantVisitor,
34    CopyRelation, CopyStatement, CreateSourceOptionName, Raw, Statement, SubscribeStatement,
35};
36use mz_sql::catalog::RoleAttributes;
37use mz_sql::names::{Aug, PartialItemName, ResolvedIds};
38use mz_sql::plan::{
39    AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, Params, Plan,
40    StatementClassification, TransactionType,
41};
42use mz_sql::pure::{
43    materialized_view_option_contains_temporal, purify_create_materialized_view_options,
44};
45use mz_sql::rbac;
46use mz_sql::rbac::CREATE_ITEM_USAGE;
47use mz_sql::session::user::User;
48use mz_sql::session::vars::{
49    EndTransactionAction, NETWORK_POLICY, OwnedVarInput, STATEMENT_LOGGING_SAMPLE_RATE, Value, Var,
50};
51use mz_sql_parser::ast::display::AstDisplay;
52use mz_sql_parser::ast::{
53    CreateMaterializedViewStatement, ExplainPlanStatement, Explainee, InsertStatement,
54    WithOptionValue,
55};
56use mz_storage_types::sources::Timeline;
57use opentelemetry::trace::TraceContextExt;
58use tokio::sync::{mpsc, oneshot};
59use tracing::{Instrument, debug_span, info, warn};
60use tracing_opentelemetry::OpenTelemetrySpanExt;
61
62use crate::command::{AuthResponse, CatalogSnapshot, Command, ExecuteResponse, StartupResponse};
63use crate::coord::appends::PendingWriteTxn;
64use crate::coord::{
65    ConnMeta, Coordinator, DeferredPlanStatement, Message, PendingTxn, PlanStatement, PlanValidity,
66    PurifiedStatementReady, validate_ip_with_policy_rules,
67};
68use crate::error::AdapterError;
69use crate::notice::AdapterNotice;
70use crate::session::{Session, TransactionOps, TransactionStatus};
71use crate::util::{ClientTransmitter, ResultExt};
72use crate::webhook::{
73    AppendWebhookResponse, AppendWebhookValidator, WebhookAppender, WebhookAppenderInvalidator,
74};
75use crate::{AppendWebhookError, ExecuteContext, catalog, metrics};
76
77use super::ExecuteContextExtra;
78
79impl Coordinator {
80    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 58KB. This would
81    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
82    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
83    pub(crate) fn handle_command(&mut self, mut cmd: Command) -> LocalBoxFuture<'_, ()> {
84        async move {
85            if let Some(session) = cmd.session_mut() {
86                session.apply_external_metadata_updates();
87            }
88            match cmd {
89                Command::Startup {
90                    tx,
91                    user,
92                    conn_id,
93                    secret_key,
94                    uuid,
95                    client_ip,
96                    application_name,
97                    notice_tx,
98                } => {
99                    // Note: We purposefully do not use a ClientTransmitter here because startup
100                    // handles errors and cleanup of sessions itself.
101                    self.handle_startup(
102                        tx,
103                        user,
104                        conn_id,
105                        secret_key,
106                        uuid,
107                        client_ip,
108                        application_name,
109                        notice_tx,
110                    )
111                    .await;
112                }
113
114                Command::AuthenticatePassword {
115                    tx,
116                    role_name,
117                    password,
118                } => {
119                    self.handle_authenticate_password(tx, role_name, password)
120                        .await;
121                }
122
123                Command::Execute {
124                    portal_name,
125                    session,
126                    tx,
127                    outer_ctx_extra,
128                } => {
129                    let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
130
131                    self.handle_execute(portal_name, session, tx, outer_ctx_extra)
132                        .await;
133                }
134
135                Command::RetireExecute { data, reason } => self.retire_execution(reason, data),
136
137                Command::CancelRequest {
138                    conn_id,
139                    secret_key,
140                } => {
141                    self.handle_cancel(conn_id, secret_key).await;
142                }
143
144                Command::PrivilegedCancelRequest { conn_id } => {
145                    self.handle_privileged_cancel(conn_id).await;
146                }
147
148                Command::GetWebhook {
149                    database,
150                    schema,
151                    name,
152                    tx,
153                } => {
154                    self.handle_get_webhook(database, schema, name, tx);
155                }
156
157                Command::GetSystemVars { tx } => {
158                    let _ = tx.send(self.catalog.system_config().clone());
159                }
160
161                Command::SetSystemVars { vars, conn_id, tx } => {
162                    let mut ops = Vec::with_capacity(vars.len());
163                    let conn = &self.active_conns[&conn_id];
164
165                    for (name, value) in vars {
166                        if let Err(e) =
167                            self.catalog().system_config().get(&name).and_then(|var| {
168                                var.visible(conn.user(), self.catalog.system_config())
169                            })
170                        {
171                            let _ = tx.send(Err(e.into()));
172                            return;
173                        }
174
175                        ops.push(catalog::Op::UpdateSystemConfiguration {
176                            name,
177                            value: OwnedVarInput::Flat(value),
178                        });
179                    }
180
181                    let result = self.catalog_transact_conn(Some(&conn_id), ops).await;
182                    let _ = tx.send(result);
183                }
184
185                Command::Terminate { conn_id, tx } => {
186                    self.handle_terminate(conn_id).await;
187                    // Note: We purposefully do not use a ClientTransmitter here because we're already
188                    // terminating the provided session.
189                    if let Some(tx) = tx {
190                        let _ = tx.send(Ok(()));
191                    }
192                }
193
194                Command::Commit {
195                    action,
196                    session,
197                    tx,
198                } => {
199                    let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
200                    // We reach here not through a statement execution, but from the
201                    // "commit" pgwire command. Thus, we just generate a default statement
202                    // execution context (once statement logging is implemented, this will cause nothing to be logged
203                    // when the execution finishes.)
204                    let ctx = ExecuteContext::from_parts(
205                        tx,
206                        self.internal_cmd_tx.clone(),
207                        session,
208                        Default::default(),
209                    );
210                    let plan = match action {
211                        EndTransactionAction::Commit => {
212                            Plan::CommitTransaction(CommitTransactionPlan {
213                                transaction_type: TransactionType::Implicit,
214                            })
215                        }
216                        EndTransactionAction::Rollback => {
217                            Plan::AbortTransaction(AbortTransactionPlan {
218                                transaction_type: TransactionType::Implicit,
219                            })
220                        }
221                    };
222
223                    let conn_id = ctx.session().conn_id().clone();
224                    self.sequence_plan(ctx, plan, ResolvedIds::empty()).await;
225                    // Part of the Command::Commit contract is that the Coordinator guarantees that
226                    // it has cleared its transaction state for the connection.
227                    self.clear_connection(&conn_id).await;
228                }
229
230                Command::CatalogSnapshot { tx } => {
231                    let _ = tx.send(CatalogSnapshot {
232                        catalog: self.owned_catalog(),
233                    });
234                }
235
236                Command::CheckConsistency { tx } => {
237                    let _ = tx.send(self.check_consistency());
238                }
239
240                Command::Dump { tx } => {
241                    let _ = tx.send(self.dump().await);
242                }
243            }
244        }
245        .instrument(debug_span!("handle_command"))
246        .boxed_local()
247    }
248
249    #[mz_ore::instrument(level = "debug")]
250    async fn handle_authenticate_password(
251        &mut self,
252        tx: oneshot::Sender<Result<AuthResponse, AdapterError>>,
253        role_name: String,
254        password: Option<Password>,
255    ) {
256        let Some(password) = password else {
257            // The user did not provide a password.
258            let _ = tx.send(Err(AdapterError::AuthenticationError));
259            return;
260        };
261
262        if let Some(role) = self.catalog().try_get_role_by_name(role_name.as_str()) {
263            if !role.attributes.login.unwrap_or(false) {
264                // The user is not allowed to login.
265                let _ = tx.send(Err(AdapterError::AuthenticationError));
266                return;
267            }
268            if let Some(auth) = self.catalog().try_get_role_auth_by_id(&role.id) {
269                if let Some(hash) = &auth.password_hash {
270                    let _ = match mz_auth::hash::scram256_verify(&password, hash) {
271                        Ok(_) => tx.send(Ok(AuthResponse {
272                            role_id: role.id,
273                            superuser: role.attributes.superuser.unwrap_or(false),
274                        })),
275                        Err(_) => tx.send(Err(AdapterError::AuthenticationError)),
276                    };
277                    return;
278                }
279            }
280            // Authentication failed due to incorrect password or missing password hash.
281            let _ = tx.send(Err(AdapterError::AuthenticationError));
282        } else {
283            // The user does not exist.
284            let _ = tx.send(Err(AdapterError::AuthenticationError));
285        }
286    }
287
288    #[mz_ore::instrument(level = "debug")]
289    async fn handle_startup(
290        &mut self,
291        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
292        user: User,
293        conn_id: ConnectionId,
294        secret_key: u32,
295        uuid: uuid::Uuid,
296        client_ip: Option<IpAddr>,
297        application_name: String,
298        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
299    ) {
300        // Early return if successful, otherwise cleanup any possible state.
301        match self.handle_startup_inner(&user, &conn_id, &client_ip).await {
302            Ok((role_id, session_defaults)) => {
303                let session_type = metrics::session_type_label_value(&user);
304                self.metrics
305                    .active_sessions
306                    .with_label_values(&[session_type])
307                    .inc();
308                let conn = ConnMeta {
309                    secret_key,
310                    notice_tx,
311                    drop_sinks: BTreeSet::new(),
312                    pending_cluster_alters: BTreeSet::new(),
313                    connected_at: self.now(),
314                    user,
315                    application_name,
316                    uuid,
317                    client_ip,
318                    conn_id: conn_id.clone(),
319                    authenticated_role: role_id,
320                    deferred_lock: None,
321                };
322                let update = self.catalog().state().pack_session_update(&conn, Diff::ONE);
323                let update = self.catalog().state().resolve_builtin_table_update(update);
324                self.begin_session_for_statement_logging(&conn);
325                self.active_conns.insert(conn_id.clone(), conn);
326
327                // Note: Do NOT await the notify here, we pass this back to
328                // whatever requested the startup to prevent blocking startup
329                // and the Coordinator on a builtin table update.
330                let updates = vec![update];
331                // It's not a hard error if our list is missing a builtin table, but we want to
332                // make sure these two things stay in-sync.
333                if mz_ore::assert::soft_assertions_enabled() {
334                    let required_tables: BTreeSet<_> = super::appends::REQUIRED_BUILTIN_TABLES
335                        .iter()
336                        .map(|table| self.catalog().resolve_builtin_table(*table))
337                        .collect();
338                    let updates_tracked = updates
339                        .iter()
340                        .all(|update| required_tables.contains(&update.id));
341                    let all_mz_internal = super::appends::REQUIRED_BUILTIN_TABLES
342                        .iter()
343                        .all(|table| table.schema == MZ_INTERNAL_SCHEMA);
344                    mz_ore::soft_assert_or_log!(
345                        updates_tracked,
346                        "not tracking all required builtin table updates!"
347                    );
348                    // TODO(parkmycar): When checking if a query depends on these builtin table
349                    // writes we do not check the transitive dependencies of the query, because
350                    // we don't support creating views on mz_internal objects. If one of these
351                    // tables is promoted out of mz_internal then we'll need to add this check.
352                    mz_ore::soft_assert_or_log!(
353                        all_mz_internal,
354                        "not all builtin tables are in mz_internal! need to check transitive depends",
355                    )
356                }
357                let notify = self.builtin_table_update().background(updates);
358
359                let resp = Ok(StartupResponse {
360                    role_id,
361                    write_notify: notify,
362                    session_defaults,
363                    catalog: self.owned_catalog(),
364                });
365                if tx.send(resp).is_err() {
366                    // Failed to send to adapter, but everything is setup so we can terminate
367                    // normally.
368                    self.handle_terminate(conn_id).await;
369                }
370            }
371            Err(e) => {
372                // Error during startup or sending to adapter, cleanup possible state created by
373                // handle_startup_inner. A user may have been created and it can stay; no need to
374                // delete it.
375                self.catalog_mut()
376                    .drop_temporary_schema(&conn_id)
377                    .unwrap_or_terminate("unable to drop temporary schema");
378
379                // Communicate the error back to the client. No need to
380                // handle failures to send the error back; we've already
381                // cleaned up all necessary state.
382                let _ = tx.send(Err(e));
383            }
384        }
385    }
386
387    // Failible startup work that needs to be cleaned up on error.
388    async fn handle_startup_inner(
389        &mut self,
390        user: &User,
391        conn_id: &ConnectionId,
392        client_ip: &Option<IpAddr>,
393    ) -> Result<(RoleId, BTreeMap<String, OwnedVarInput>), AdapterError> {
394        if self.catalog().try_get_role_by_name(&user.name).is_none() {
395            // If the user has made it to this point, that means they have been fully authenticated.
396            // This includes preventing any user, except a pre-defined set of system users, from
397            // connecting to an internal port. Therefore it's ok to always create a new role for the
398            // user.
399            let attributes = RoleAttributes::new();
400            let plan = CreateRolePlan {
401                name: user.name.to_string(),
402                attributes,
403            };
404            self.sequence_create_role_for_startup(plan).await?;
405        }
406        let role_id = self
407            .catalog()
408            .try_get_role_by_name(&user.name)
409            .expect("created above")
410            .id;
411
412        if role_id.is_user() && !ALLOW_USER_SESSIONS.get(self.catalog().system_config().dyncfgs()) {
413            return Err(AdapterError::UserSessionsDisallowed);
414        }
415
416        // Initialize the default session variables for this role.
417        let mut session_defaults = BTreeMap::new();
418        let system_config = self.catalog().state().system_config();
419
420        // Override the session with any system defaults.
421        session_defaults.extend(
422            system_config
423                .iter_session()
424                .map(|v| (v.name().to_string(), OwnedVarInput::Flat(v.value()))),
425        );
426        // Special case.
427        let statement_logging_default = system_config
428            .statement_logging_default_sample_rate()
429            .format();
430        session_defaults.insert(
431            STATEMENT_LOGGING_SAMPLE_RATE.name().to_string(),
432            OwnedVarInput::Flat(statement_logging_default),
433        );
434        // Override system defaults with role defaults.
435        session_defaults.extend(
436            self.catalog()
437                .get_role(&role_id)
438                .vars()
439                .map(|(name, val)| (name.to_string(), val.clone())),
440        );
441
442        // Validate network policies for external users. Internal users can only connect on the
443        // internal interfaces (internal HTTP/ pgwire). It is up to the person deploying the system
444        // to ensure these internal interfaces are well secured.
445        //
446        // HACKY(parkmycar): We don't have a fully formed session yet for this role, but we want
447        // the default network policy for this role, so we read directly out of what the session
448        // will get initialized with.
449        if !user.is_internal() {
450            let network_policy_name = session_defaults
451                .get(NETWORK_POLICY.name())
452                .and_then(|value| match value {
453                    OwnedVarInput::Flat(name) => Some(name.clone()),
454                    OwnedVarInput::SqlSet(names) => {
455                        tracing::error!(?names, "found multiple network policies");
456                        None
457                    }
458                })
459                .unwrap_or_else(|| system_config.default_network_policy_name());
460            let maybe_network_policy = self
461                .catalog()
462                .get_network_policy_by_name(&network_policy_name);
463
464            let Some(network_policy) = maybe_network_policy else {
465                // We should prevent dropping the default network policy, or setting the policy
466                // to something that doesn't exist, so complain loudly if this occurs.
467                tracing::error!(
468                    network_policy_name,
469                    "default network policy does not exist. All user traffic will be blocked"
470                );
471                let reason = match client_ip {
472                    Some(ip) => super::NetworkPolicyError::AddressDenied(ip.clone()),
473                    None => super::NetworkPolicyError::MissingIp,
474                };
475                return Err(AdapterError::NetworkPolicyDenied(reason));
476            };
477
478            if let Some(ip) = client_ip {
479                match validate_ip_with_policy_rules(ip, &network_policy.rules) {
480                    Ok(_) => {}
481                    Err(e) => return Err(AdapterError::NetworkPolicyDenied(e)),
482                }
483            } else {
484                // Only temporary and internal representation of a session
485                // should be missing a client_ip. These sessions should not be
486                // making requests or going through handle_startup.
487                return Err(AdapterError::NetworkPolicyDenied(
488                    super::NetworkPolicyError::MissingIp,
489                ));
490            }
491        }
492
493        self.catalog_mut()
494            .create_temporary_schema(conn_id, role_id)?;
495
496        Ok((role_id, session_defaults))
497    }
498
499    /// Handles an execute command.
500    #[instrument(name = "coord::handle_execute", fields(session = session.uuid().to_string()))]
501    pub(crate) async fn handle_execute(
502        &mut self,
503        portal_name: String,
504        mut session: Session,
505        tx: ClientTransmitter<ExecuteResponse>,
506        // If this command was part of another execute command
507        // (for example, executing a `FETCH` statement causes an execute to be
508        //  issued for the cursor it references),
509        // then `outer_context` should be `Some`.
510        // This instructs the coordinator that the
511        // outer execute should be considered finished once the inner one is.
512        outer_context: Option<ExecuteContextExtra>,
513    ) {
514        if session.vars().emit_trace_id_notice() {
515            let span_context = tracing::Span::current()
516                .context()
517                .span()
518                .span_context()
519                .clone();
520            if span_context.is_valid() {
521                session.add_notice(AdapterNotice::QueryTrace {
522                    trace_id: span_context.trace_id(),
523                });
524            }
525        }
526
527        if let Err(err) = self.verify_portal(&mut session, &portal_name) {
528            // If statement logging hasn't started yet, we don't need
529            // to add any "end" event, so just make up a no-op
530            // `ExecuteContextExtra` here, via `Default::default`.
531            //
532            // It's a bit unfortunate because the edge case of failed
533            // portal verifications won't show up in statement
534            // logging, but there seems to be nothing else we can do,
535            // because we need access to the portal to begin logging.
536            //
537            // Another option would be to log a begin and end event, but just fill in NULLs
538            // for everything we get from the portal (prepared statement id, params).
539            let extra = outer_context.unwrap_or_else(Default::default);
540            let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
541            return ctx.retire(Err(err));
542        }
543
544        // The reference to `portal` can't outlive `session`, which we
545        // use to construct the context, so scope the reference to this block where we
546        // get everything we need from the portal for later.
547        let (stmt, ctx, params) = {
548            let portal = session
549                .get_portal_unverified(&portal_name)
550                .expect("known to exist");
551            let params = portal.parameters.clone();
552            let stmt = portal.stmt.clone();
553            let logging = Arc::clone(&portal.logging);
554            let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
555
556            let extra = if let Some(extra) = outer_context {
557                // We are executing in the context of another SQL statement, so we don't
558                // want to begin statement logging anew. The context of the actual statement
559                // being executed is the one that should be retired once this finishes.
560                extra
561            } else {
562                // This is a new statement, log it and return the context
563                let maybe_uuid = self.begin_statement_execution(
564                    &mut session,
565                    &params,
566                    &logging,
567                    lifecycle_timestamps,
568                );
569
570                ExecuteContextExtra::new(maybe_uuid)
571            };
572            let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
573            (stmt, ctx, params)
574        };
575
576        let stmt = match stmt {
577            Some(stmt) => stmt,
578            None => return ctx.retire(Ok(ExecuteResponse::EmptyQuery)),
579        };
580
581        let session_type = metrics::session_type_label_value(ctx.session().user());
582        let stmt_type = metrics::statement_type_label_value(&stmt);
583        self.metrics
584            .query_total
585            .with_label_values(&[session_type, stmt_type])
586            .inc();
587        match &*stmt {
588            Statement::Subscribe(SubscribeStatement { output, .. })
589            | Statement::Copy(CopyStatement {
590                relation: CopyRelation::Subscribe(SubscribeStatement { output, .. }),
591                ..
592            }) => {
593                self.metrics
594                    .subscribe_outputs
595                    .with_label_values(&[
596                        session_type,
597                        metrics::subscribe_output_label_value(output),
598                    ])
599                    .inc();
600            }
601            _ => {}
602        }
603
604        self.handle_execute_inner(stmt, params, ctx).await
605    }
606
607    #[instrument(name = "coord::handle_execute_inner", fields(stmt = stmt.to_ast_string_redacted()))]
608    pub(crate) async fn handle_execute_inner(
609        &mut self,
610        stmt: Arc<Statement<Raw>>,
611        params: Params,
612        mut ctx: ExecuteContext,
613    ) {
614        // This comment describes the various ways DDL can execute (the ordered operations: name
615        // resolve, purify, plan, sequence), all of which are managed by this function. DDL has
616        // three notable properties that all partially interact.
617        //
618        // 1. Most DDL statements (and a few others) support single-statement transaction delayed
619        //    execution. This occurs when a session executes `BEGIN`, a single DDL, then `COMMIT`.
620        //    We announce success of the single DDL when it is executed, but do not attempt to plan
621        //    or sequence it until `COMMIT`, which is able to error if needed while sequencing the
622        //    DDL (this behavior is Postgres-compatible). The purpose of this is because some
623        //    drivers or tools wrap all statements in `BEGIN` and `COMMIT` and we would like them to
624        //    work. When the single DDL is announced as successful we also put the session's
625        //    transaction ops into `SingleStatement` which will produce an error if any other
626        //    statement is run in the transaction except `COMMIT`. Additionally, this will cause
627        //    `handle_execute_inner` to stop further processing (no planning, etc.) of the
628        //    statement.
629        // 2. A few other DDL statements (`ALTER .. RENAME/SWAP`) enter the `DDL` ops which allows
630        //    any number of only these DDL statements to be executed in a transaction. At sequencing
631        //    these generate the `Op::TransactionDryRun` catalog op. When applied with
632        //    `catalog_transact`, that op will always produce the `TransactionDryRun` error. The
633        //    `catalog_transact_with_ddl_transaction` function intercepts that error and reports
634        //    success to the user, but nothing is yet committed to the real catalog. At `COMMIT` all
635        //    of the ops but without dry run are applied. The purpose of this is to allow multiple,
636        //    atomic renames in the same transaction.
637        // 3. Some DDLs do off-thread work during purification or sequencing that is expensive or
638        //    makes network calls (interfacing with secrets, optimization of views/indexes, source
639        //    purification). These must guarantee correctness when they return to the main
640        //    coordinator thread because the catalog state could have changed while they were doing
641        //    the off-thread work. Previously we would use `PlanValidity::Checks` to specify a bunch
642        //    of IDs that we needed to exist. We discovered the way we were doing that was not
643        //    always correct. Instead of attempting to get that completely right, we have opted to
644        //    serialize DDL. Getting this right is difficult because catalog changes can affect name
645        //    resolution, planning, sequencing, and optimization. Correctly writing logic that is
646        //    aware of all possible catalog changes that would affect any of those parts is not
647        //    something our current code has been designed to be helpful at. Even if a DDL statement
648        //    is doing off-thread work, another DDL must not yet execute at all. Executing these
649        //    serially will guarantee that no off-thread work has affected the state of the catalog.
650        //    This is done by adding a VecDeque of deferred statements and a lock to the
651        //    Coordinator. When a DDL is run in `handle_execute_inner` (after applying whatever
652        //    transaction ops are needed to the session as described above), it attempts to own the
653        //    lock (a tokio Mutex). If acquired, it stashes the lock in the connection`s `ConnMeta`
654        //    struct in `active_conns` and proceeds. The lock is dropped at transaction end in
655        //    `clear_transaction` and a message sent to the Coordinator to execute the next queued
656        //    DDL. If the lock could not be acquired, the DDL is put into the VecDeque where it
657        //    awaits dequeuing caused by the lock being released.
658
659        // Verify that this statement type can be executed in the current
660        // transaction state.
661        match ctx.session().transaction() {
662            // By this point we should be in a running transaction.
663            TransactionStatus::Default => unreachable!(),
664
665            // Failed transactions have already been checked in pgwire for a safe statement
666            // (COMMIT, ROLLBACK, etc.) and can proceed.
667            TransactionStatus::Failed(_) => {}
668
669            // Started is a deceptive name, and means different things depending on which
670            // protocol was used. It's either exactly one statement (known because this
671            // is the simple protocol and the parser parsed the entire string, and it had
672            // one statement). Or from the extended protocol, it means *some* query is
673            // being executed, but there might be others after it before the Sync (commit)
674            // message. Postgres handles this by teaching Started to eagerly commit certain
675            // statements that can't be run in a transaction block.
676            TransactionStatus::Started(_) => {
677                if let Statement::Declare(_) = &*stmt {
678                    // Declare is an exception. Although it's not against any spec to execute
679                    // it, it will always result in nothing happening, since all portals will be
680                    // immediately closed. Users don't know this detail, so this error helps them
681                    // understand what's going wrong. Postgres does this too.
682                    return ctx.retire(Err(AdapterError::OperationRequiresTransaction(
683                        "DECLARE CURSOR".into(),
684                    )));
685                }
686            }
687
688            // Implicit or explicit transactions.
689            //
690            // Implicit transactions happen when a multi-statement query is executed
691            // (a "simple query"). However if a "BEGIN" appears somewhere in there,
692            // then the existing implicit transaction will be upgraded to an explicit
693            // transaction. Thus, we should not separate what implicit and explicit
694            // transactions can do unless there's some additional checking to make sure
695            // something disallowed in explicit transactions did not previously take place
696            // in the implicit portion.
697            TransactionStatus::InTransactionImplicit(_) | TransactionStatus::InTransaction(_) => {
698                match &*stmt {
699                    // Statements that are safe in a transaction. We still need to verify that we
700                    // don't interleave reads and writes since we can't perform those serializably.
701                    Statement::Close(_)
702                    | Statement::Commit(_)
703                    | Statement::Copy(_)
704                    | Statement::Deallocate(_)
705                    | Statement::Declare(_)
706                    | Statement::Discard(_)
707                    | Statement::Execute(_)
708                    | Statement::ExplainPlan(_)
709                    | Statement::ExplainPushdown(_)
710                    | Statement::ExplainAnalyze(_)
711                    | Statement::ExplainTimestamp(_)
712                    | Statement::ExplainSinkSchema(_)
713                    | Statement::Fetch(_)
714                    | Statement::Prepare(_)
715                    | Statement::Rollback(_)
716                    | Statement::Select(_)
717                    | Statement::SetTransaction(_)
718                    | Statement::Show(_)
719                    | Statement::SetVariable(_)
720                    | Statement::ResetVariable(_)
721                    | Statement::StartTransaction(_)
722                    | Statement::Subscribe(_)
723                    | Statement::Raise(_) => {
724                        // Always safe.
725                    }
726
727                    Statement::Insert(InsertStatement {
728                        source, returning, ..
729                    }) if returning.is_empty() && ConstantVisitor::insert_source(source) => {
730                        // Inserting from constant values statements that do not need to execute on
731                        // any cluster (no RETURNING) is always safe.
732                    }
733
734                    // These statements must be kept in-sync with `must_serialize_ddl()`.
735                    Statement::AlterObjectRename(_)
736                    | Statement::AlterObjectSwap(_)
737                    | Statement::CreateTableFromSource(_)
738                    | Statement::CreateSource(_) => {
739                        let state = self.catalog().for_session(ctx.session()).state().clone();
740                        let revision = self.catalog().transient_revision();
741
742                        // Initialize our transaction with a set of empty ops, or return an error
743                        // if we can't run a DDL transaction
744                        let txn_status = ctx.session_mut().transaction_mut();
745                        if let Err(err) = txn_status.add_ops(TransactionOps::DDL {
746                            ops: vec![],
747                            state,
748                            revision,
749                            side_effects: vec![],
750                        }) {
751                            return ctx.retire(Err(err));
752                        }
753                    }
754
755                    // Statements below must by run singly (in Started).
756                    Statement::AlterCluster(_)
757                    | Statement::AlterConnection(_)
758                    | Statement::AlterDefaultPrivileges(_)
759                    | Statement::AlterIndex(_)
760                    | Statement::AlterSetCluster(_)
761                    | Statement::AlterOwner(_)
762                    | Statement::AlterRetainHistory(_)
763                    | Statement::AlterRole(_)
764                    | Statement::AlterSecret(_)
765                    | Statement::AlterSink(_)
766                    | Statement::AlterSource(_)
767                    | Statement::AlterSystemReset(_)
768                    | Statement::AlterSystemResetAll(_)
769                    | Statement::AlterSystemSet(_)
770                    | Statement::AlterTableAddColumn(_)
771                    | Statement::AlterNetworkPolicy(_)
772                    | Statement::CreateCluster(_)
773                    | Statement::CreateClusterReplica(_)
774                    | Statement::CreateConnection(_)
775                    | Statement::CreateDatabase(_)
776                    | Statement::CreateIndex(_)
777                    | Statement::CreateMaterializedView(_)
778                    | Statement::CreateContinualTask(_)
779                    | Statement::CreateRole(_)
780                    | Statement::CreateSchema(_)
781                    | Statement::CreateSecret(_)
782                    | Statement::CreateSink(_)
783                    | Statement::CreateSubsource(_)
784                    | Statement::CreateTable(_)
785                    | Statement::CreateType(_)
786                    | Statement::CreateView(_)
787                    | Statement::CreateWebhookSource(_)
788                    | Statement::CreateNetworkPolicy(_)
789                    | Statement::Delete(_)
790                    | Statement::DropObjects(_)
791                    | Statement::DropOwned(_)
792                    | Statement::GrantPrivileges(_)
793                    | Statement::GrantRole(_)
794                    | Statement::Insert(_)
795                    | Statement::ReassignOwned(_)
796                    | Statement::RevokePrivileges(_)
797                    | Statement::RevokeRole(_)
798                    | Statement::Update(_)
799                    | Statement::ValidateConnection(_)
800                    | Statement::Comment(_) => {
801                        let txn_status = ctx.session_mut().transaction_mut();
802
803                        // If we're not in an implicit transaction and we could generate exactly one
804                        // valid ExecuteResponse, we can delay execution until commit.
805                        if !txn_status.is_implicit() {
806                            // Statements whose tag is trivial (known only from an unexecuted statement) can
807                            // be run in a special single-statement explicit mode. In this mode (`BEGIN;
808                            // <stmt>; COMMIT`), we generate the expected tag from a successful <stmt>, but
809                            // delay execution until `COMMIT`.
810                            if let Ok(resp) = ExecuteResponse::try_from(&*stmt) {
811                                if let Err(err) = txn_status
812                                    .add_ops(TransactionOps::SingleStatement { stmt, params })
813                                {
814                                    ctx.retire(Err(err));
815                                    return;
816                                }
817                                ctx.retire(Ok(resp));
818                                return;
819                            }
820                        }
821
822                        return ctx.retire(Err(AdapterError::OperationProhibitsTransaction(
823                            stmt.to_string(),
824                        )));
825                    }
826                }
827            }
828        }
829
830        // DDLs must be planned and sequenced serially. We do not rely on PlanValidity checking
831        // various IDs because we have incorrectly done that in the past. Attempt to acquire the
832        // ddl lock. The lock is stashed in the ConnMeta which is dropped at transaction end. If
833        // acquired, proceed with sequencing. If not, enqueue and return. This logic assumes that
834        // Coordinator::clear_transaction is correctly called when session transactions are ended
835        // because that function will release the held lock from active_conns.
836        if Self::must_serialize_ddl(&stmt, &ctx) {
837            if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
838                let prev = self
839                    .active_conns
840                    .get_mut(ctx.session().conn_id())
841                    .expect("connection must exist")
842                    .deferred_lock
843                    .replace(guard);
844                assert!(
845                    prev.is_none(),
846                    "connections should have at most one lock guard"
847                );
848            } else {
849                if self
850                    .active_conns
851                    .get(ctx.session().conn_id())
852                    .expect("connection must exist")
853                    .deferred_lock
854                    .is_some()
855                {
856                    // This session *already* has the lock, and incorrectly tried to execute another
857                    // DDL while still holding the lock, violating the assumption documented above.
858                    // This is an internal error, probably in some AdapterClient user (pgwire or
859                    // http). Because the session is now in some unexpected state, return an error
860                    // which should cause the AdapterClient user to fail the transaction.
861                    // (Terminating the connection is maybe what we would prefer to do, but is not
862                    // currently a thing we can do from the coordinator: calling handle_terminate
863                    // cleans up Coordinator state for the session but doesn't inform the
864                    // AdapterClient that the session should terminate.)
865                    soft_panic_or_log!(
866                        "session {} attempted to get ddl lock while already owning it",
867                        ctx.session().conn_id()
868                    );
869                    ctx.retire(Err(AdapterError::Internal(
870                        "session attempted to get ddl lock while already owning it".to_string(),
871                    )));
872                    return;
873                }
874                self.serialized_ddl.push_back(DeferredPlanStatement {
875                    ctx,
876                    ps: PlanStatement::Statement { stmt, params },
877                });
878                return;
879            }
880        }
881
882        let catalog = self.catalog();
883        let catalog = catalog.for_session(ctx.session());
884        let original_stmt = Arc::clone(&stmt);
885        // `resolved_ids` should be derivable from `stmt`. If `stmt` is transformed to remove/add
886        // IDs, then `resolved_ids` should be updated to also remove/add those IDs.
887        let (stmt, mut resolved_ids) = match mz_sql::names::resolve(&catalog, (*stmt).clone()) {
888            Ok(resolved) => resolved,
889            Err(e) => return ctx.retire(Err(e.into())),
890        };
891        // N.B. The catalog can change during purification so we must validate that the dependencies still exist after
892        // purification.  This should be done back on the main thread.
893        // We do the validation:
894        //   - In the handler for `Message::PurifiedStatementReady`, before we handle the purified statement.
895        // If we add special handling for more types of `Statement`s, we'll need to ensure similar verification
896        // occurs.
897        let (stmt, resolved_ids) = match stmt {
898            // Various statements must be purified off the main coordinator thread of control.
899            stmt if Self::must_spawn_purification(&stmt) => {
900                let internal_cmd_tx = self.internal_cmd_tx.clone();
901                let conn_id = ctx.session().conn_id().clone();
902                let catalog = self.owned_catalog();
903                let now = self.now();
904                let otel_ctx = OpenTelemetryContext::obtain();
905                let current_storage_configuration = self.controller.storage.config().clone();
906                task::spawn(|| format!("purify:{conn_id}"), async move {
907                    let transient_revision = catalog.transient_revision();
908                    let catalog = catalog.for_session(ctx.session());
909
910                    // Checks if the session is authorized to purify a statement. Usually
911                    // authorization is checked after planning, however purification happens before
912                    // planning, which may require the use of some connections and secrets.
913                    if let Err(e) = rbac::check_usage(
914                        &catalog,
915                        ctx.session(),
916                        &resolved_ids,
917                        &CREATE_ITEM_USAGE,
918                    ) {
919                        return ctx.retire(Err(e.into()));
920                    }
921
922                    let (result, cluster_id) = mz_sql::pure::purify_statement(
923                        catalog,
924                        now,
925                        stmt,
926                        &current_storage_configuration,
927                    )
928                    .await;
929                    let result = result.map_err(|e| e.into());
930                    let dependency_ids = resolved_ids.items().copied().collect();
931                    let plan_validity = PlanValidity::new(
932                        transient_revision,
933                        dependency_ids,
934                        cluster_id,
935                        None,
936                        ctx.session().role_metadata().clone(),
937                    );
938                    // It is not an error for purification to complete after `internal_cmd_rx` is dropped.
939                    let result = internal_cmd_tx.send(Message::PurifiedStatementReady(
940                        PurifiedStatementReady {
941                            ctx,
942                            result,
943                            params,
944                            plan_validity,
945                            original_stmt,
946                            otel_ctx,
947                        },
948                    ));
949                    if let Err(e) = result {
950                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
951                    }
952                });
953                return;
954            }
955
956            // `CREATE SUBSOURCE` statements are disallowed for users and are only generated
957            // automatically as part of purification
958            Statement::CreateSubsource(_) => {
959                ctx.retire(Err(AdapterError::Unsupported(
960                    "CREATE SUBSOURCE statements",
961                )));
962                return;
963            }
964
965            Statement::CreateMaterializedView(mut cmvs) => {
966                // `CREATE MATERIALIZED VIEW ... AS OF ...` syntax is disallowed for users and is
967                // only used for storing initial frontiers in the catalog.
968                if cmvs.as_of.is_some() {
969                    return ctx.retire(Err(AdapterError::Unsupported(
970                        "CREATE MATERIALIZED VIEW ... AS OF statements",
971                    )));
972                }
973
974                let mz_now = match self
975                    .resolve_mz_now_for_create_materialized_view(
976                        &cmvs,
977                        &resolved_ids,
978                        ctx.session_mut(),
979                        true,
980                    )
981                    .await
982                {
983                    Ok(mz_now) => mz_now,
984                    Err(e) => return ctx.retire(Err(e)),
985                };
986
987                let owned_catalog = self.owned_catalog();
988                let catalog = owned_catalog.for_session(ctx.session());
989
990                purify_create_materialized_view_options(
991                    catalog,
992                    mz_now,
993                    &mut cmvs,
994                    &mut resolved_ids,
995                );
996
997                let purified_stmt =
998                    Statement::CreateMaterializedView(CreateMaterializedViewStatement::<Aug> {
999                        if_exists: cmvs.if_exists,
1000                        name: cmvs.name,
1001                        columns: cmvs.columns,
1002                        in_cluster: cmvs.in_cluster,
1003                        query: cmvs.query,
1004                        with_options: cmvs.with_options,
1005                        as_of: None,
1006                    });
1007
1008                // (Purifying CreateMaterializedView doesn't happen async, so no need to send
1009                // `Message::PurifiedStatementReady` here.)
1010                (purified_stmt, resolved_ids)
1011            }
1012
1013            Statement::ExplainPlan(ExplainPlanStatement {
1014                stage,
1015                with_options,
1016                format,
1017                explainee: Explainee::CreateMaterializedView(box_cmvs, broken),
1018            }) => {
1019                let mut cmvs = *box_cmvs;
1020                let mz_now = match self
1021                    .resolve_mz_now_for_create_materialized_view(
1022                        &cmvs,
1023                        &resolved_ids,
1024                        ctx.session_mut(),
1025                        false,
1026                    )
1027                    .await
1028                {
1029                    Ok(mz_now) => mz_now,
1030                    Err(e) => return ctx.retire(Err(e)),
1031                };
1032
1033                let owned_catalog = self.owned_catalog();
1034                let catalog = owned_catalog.for_session(ctx.session());
1035
1036                purify_create_materialized_view_options(
1037                    catalog,
1038                    mz_now,
1039                    &mut cmvs,
1040                    &mut resolved_ids,
1041                );
1042
1043                let purified_stmt = Statement::ExplainPlan(ExplainPlanStatement {
1044                    stage,
1045                    with_options,
1046                    format,
1047                    explainee: Explainee::CreateMaterializedView(Box::new(cmvs), broken),
1048                });
1049
1050                (purified_stmt, resolved_ids)
1051            }
1052
1053            // All other statements are handled immediately.
1054            _ => (stmt, resolved_ids),
1055        };
1056
1057        match self.plan_statement(ctx.session(), stmt, &params, &resolved_ids) {
1058            Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await,
1059            Err(e) => ctx.retire(Err(e)),
1060        }
1061    }
1062
1063    /// Whether the statement must be serialized and is DDL.
1064    fn must_serialize_ddl(stmt: &Statement<Raw>, ctx: &ExecuteContext) -> bool {
1065        // Non-DDL is not serialized here.
1066        if !StatementClassification::from(&*stmt).is_ddl() {
1067            return false;
1068        }
1069        // Off-thread, pre-planning purification can perform arbitrarily slow network calls so must
1070        // not be serialized. These all use PlanValidity for their checking, and we must ensure
1071        // those checks are sufficient.
1072        if Self::must_spawn_purification(stmt) {
1073            return false;
1074        }
1075
1076        // Statements that support multiple DDLs in a single transaction aren't serialized here.
1077        // Their operations are serialized when applied to the catalog, guaranteeing that any
1078        // off-thread DDLs concurrent with a multiple DDL transaction will have a serial order.
1079        if ctx.session.transaction().is_ddl() {
1080            return false;
1081        }
1082
1083        // Some DDL is exempt. It is not great that we are matching on Statements here because
1084        // different plans can be produced from the same top-level statement type (i.e., `ALTER
1085        // CONNECTION ROTATE KEYS`). But the whole point of this is to prevent things from being
1086        // planned in the first place, so we accept the abstraction leak.
1087        match stmt {
1088            // Secrets have a small and understood set of dependencies, and their off-thread work
1089            // interacts with k8s.
1090            Statement::AlterSecret(_) => false,
1091            Statement::CreateSecret(_) => false,
1092            Statement::AlterConnection(AlterConnectionStatement { actions, .. })
1093                if actions
1094                    .iter()
1095                    .all(|action| matches!(action, AlterConnectionAction::RotateKeys)) =>
1096            {
1097                false
1098            }
1099
1100            // The off-thread work that altering a cluster may do (waiting for replicas to spin-up),
1101            // does not affect its catalog names or ids and so is safe to not serialize. This could
1102            // change the set of replicas that exist. For queries that name replicas or use the
1103            // current_replica session var, the `replica_id` field of `PlanValidity` serves to
1104            // ensure that those replicas exist during the query finish stage. Additionally, that
1105            // work can take hours (configured by the user), so would also be a bad experience for
1106            // users.
1107            Statement::AlterCluster(_) => false,
1108
1109            // Everything else must be serialized.
1110            _ => true,
1111        }
1112    }
1113
1114    /// Whether the statement must be purified off of the Coordinator thread.
1115    fn must_spawn_purification<A: AstInfo>(stmt: &Statement<A>) -> bool {
1116        // `CREATE` and `ALTER` `SOURCE` and `SINK` statements must be purified off the main
1117        // coordinator thread.
1118        if !matches!(
1119            stmt,
1120            Statement::CreateSource(_)
1121                | Statement::AlterSource(_)
1122                | Statement::CreateSink(_)
1123                | Statement::CreateTableFromSource(_)
1124        ) {
1125            return false;
1126        }
1127
1128        // However `ALTER SOURCE RETAIN HISTORY` should be excluded from off-thread purification.
1129        if let Statement::AlterSource(stmt) = stmt {
1130            let names: Vec<CreateSourceOptionName> = match &stmt.action {
1131                AlterSourceAction::SetOptions(options) => {
1132                    options.iter().map(|o| o.name.clone()).collect()
1133                }
1134                AlterSourceAction::ResetOptions(names) => names.clone(),
1135                _ => vec![],
1136            };
1137            if !names.is_empty()
1138                && names
1139                    .iter()
1140                    .all(|n| matches!(n, CreateSourceOptionName::RetainHistory))
1141            {
1142                return false;
1143            }
1144        }
1145
1146        true
1147    }
1148
1149    /// Chooses a timestamp for `mz_now()`, if `mz_now()` occurs in a REFRESH option of the
1150    /// materialized view. Additionally, if `acquire_read_holds` is true and the MV has any REFRESH
1151    /// option, this function grabs read holds at the earliest possible time on input collections
1152    /// that might be involved in the MV.
1153    ///
1154    /// Note that this is NOT what handles `mz_now()` in the query part of the MV. (handles it only
1155    /// in `with_options`).
1156    ///
1157    /// (Note that the chosen timestamp won't be the same timestamp as the system table inserts,
1158    /// unfortunately.)
1159    async fn resolve_mz_now_for_create_materialized_view(
1160        &mut self,
1161        cmvs: &CreateMaterializedViewStatement<Aug>,
1162        resolved_ids: &ResolvedIds,
1163        session: &Session,
1164        acquire_read_holds: bool,
1165    ) -> Result<Option<Timestamp>, AdapterError> {
1166        if cmvs
1167            .with_options
1168            .iter()
1169            .any(|wo| matches!(wo.value, Some(WithOptionValue::Refresh(..))))
1170        {
1171            let catalog = self.catalog().for_session(session);
1172            let cluster = mz_sql::plan::resolve_cluster_for_materialized_view(&catalog, cmvs)?;
1173            let ids = self
1174                .index_oracle(cluster)
1175                .sufficient_collections(resolved_ids.collections().copied());
1176
1177            // If there is any REFRESH option, then acquire read holds. (Strictly speaking, we'd
1178            // need this only if there is a `REFRESH AT`, not for `REFRESH EVERY`, because later
1179            // we want to check the AT times against the read holds that we acquire here. But
1180            // we do it for any REFRESH option, to avoid having so many code paths doing different
1181            // things.)
1182            //
1183            // It's important that we acquire read holds _before_ we determine the least valid read.
1184            // Otherwise, we're not guaranteed that the since frontier doesn't
1185            // advance forward from underneath us.
1186            let read_holds = self.acquire_read_holds(&ids);
1187
1188            // Does `mz_now()` occur?
1189            let mz_now_ts = if cmvs
1190                .with_options
1191                .iter()
1192                .any(materialized_view_option_contains_temporal)
1193            {
1194                let timeline_context = self
1195                    .catalog()
1196                    .validate_timeline_context(resolved_ids.collections().copied())?;
1197
1198                // We default to EpochMilliseconds, similarly to `determine_timestamp_for`,
1199                // but even in the TimestampIndependent case.
1200                // Note that we didn't accurately decide whether we are TimestampDependent
1201                // or TimestampIndependent, because for this we'd need to also check whether
1202                // `query.contains_temporal()`, similarly to how `peek_stage_validate` does.
1203                // However, this doesn't matter here, as we are just going to default to
1204                // EpochMilliseconds in both cases.
1205                let timeline = timeline_context
1206                    .timeline()
1207                    .unwrap_or(&Timeline::EpochMilliseconds);
1208
1209                // Let's start with the timestamp oracle read timestamp.
1210                let mut timestamp = self.get_timestamp_oracle(timeline).read_ts().await;
1211
1212                // If `least_valid_read` is later than the oracle, then advance to that time.
1213                // If we didn't do this, then there would be a danger of missing the first refresh,
1214                // which might cause the materialized view to be unreadable for hours. This might
1215                // be what was happening here:
1216                // https://github.com/MaterializeInc/database-issues/issues/7265#issuecomment-1931856361
1217                //
1218                // In the long term, it would be good to actually block the MV creation statement
1219                // until `least_valid_read`. https://github.com/MaterializeInc/database-issues/issues/7504
1220                // Without blocking, we have the problem that a REFRESH AT CREATION is not linearized
1221                // with the CREATE MATERIALIZED VIEW statement, in the sense that a query from the MV
1222                // after its creation might see input changes that happened after the CRATE MATERIALIZED
1223                // VIEW statement returned.
1224                let oracle_timestamp = timestamp;
1225                let least_valid_read = read_holds.least_valid_read();
1226                timestamp.advance_by(least_valid_read.borrow());
1227
1228                if oracle_timestamp != timestamp {
1229                    warn!(%cmvs.name, %oracle_timestamp, %timestamp, "REFRESH MV's inputs are not readable at the oracle read ts");
1230                }
1231
1232                info!("Resolved `mz_now()` to {timestamp} for REFRESH MV");
1233                Ok(Some(timestamp))
1234            } else {
1235                Ok(None)
1236            };
1237
1238            // NOTE: The Drop impl of ReadHolds makes sure that the hold is
1239            // released when we don't use it.
1240            if acquire_read_holds {
1241                self.store_transaction_read_holds(session, read_holds);
1242            }
1243
1244            mz_now_ts
1245        } else {
1246            Ok(None)
1247        }
1248    }
1249
1250    /// Instruct the dataflow layer to cancel any ongoing, interactive work for
1251    /// the named `conn_id` if the correct secret key is specified.
1252    ///
1253    /// Note: Here we take a [`ConnectionIdType`] as opposed to an owned
1254    /// `ConnectionId` because this method gets called by external clients when
1255    /// they request to cancel a request.
1256    #[mz_ore::instrument(level = "debug")]
1257    async fn handle_cancel(&mut self, conn_id: ConnectionIdType, secret_key: u32) {
1258        if let Some((id_handle, conn_meta)) = self.active_conns.get_key_value(&conn_id) {
1259            // If the secret key specified by the client doesn't match the
1260            // actual secret key for the target connection, we treat this as a
1261            // rogue cancellation request and ignore it.
1262            if conn_meta.secret_key != secret_key {
1263                return;
1264            }
1265
1266            // Now that we've verified the secret key, this is a privileged
1267            // cancellation request. We can upgrade the raw connection ID to a
1268            // proper `IdHandle`.
1269            self.handle_privileged_cancel(id_handle.clone()).await;
1270        }
1271    }
1272
1273    /// Unconditionally instructs the dataflow layer to cancel any ongoing,
1274    /// interactive work for the named `conn_id`.
1275    #[mz_ore::instrument(level = "debug")]
1276    pub(crate) async fn handle_privileged_cancel(&mut self, conn_id: ConnectionId) {
1277        let mut maybe_ctx = None;
1278
1279        // Cancel pending writes. There is at most one pending write per session.
1280        if let Some(idx) = self.pending_writes.iter().position(|pending_write_txn| {
1281            matches!(pending_write_txn, PendingWriteTxn::User {
1282                pending_txn: PendingTxn { ctx, .. },
1283                ..
1284            } if *ctx.session().conn_id() == conn_id)
1285        }) {
1286            if let PendingWriteTxn::User {
1287                pending_txn: PendingTxn { ctx, .. },
1288                ..
1289            } = self.pending_writes.remove(idx)
1290            {
1291                maybe_ctx = Some(ctx);
1292            }
1293        }
1294
1295        // Cancel deferred writes.
1296        if let Some(write_op) = self.deferred_write_ops.remove(&conn_id) {
1297            maybe_ctx = Some(write_op.into_ctx());
1298        }
1299
1300        // Cancel deferred statements.
1301        if let Some(idx) = self
1302            .serialized_ddl
1303            .iter()
1304            .position(|deferred| *deferred.ctx.session().conn_id() == conn_id)
1305        {
1306            let deferred = self
1307                .serialized_ddl
1308                .remove(idx)
1309                .expect("known to exist from call to `position` above");
1310            maybe_ctx = Some(deferred.ctx);
1311        }
1312
1313        // Cancel reads waiting on being linearized. There is at most one linearized read per
1314        // session.
1315        if let Some(pending_read_txn) = self.pending_linearize_read_txns.remove(&conn_id) {
1316            let ctx = pending_read_txn.take_context();
1317            maybe_ctx = Some(ctx);
1318        }
1319
1320        if let Some(ctx) = maybe_ctx {
1321            ctx.retire(Err(AdapterError::Canceled));
1322        }
1323
1324        self.cancel_pending_peeks(&conn_id);
1325        self.cancel_pending_watchsets(&conn_id);
1326        self.cancel_compute_sinks_for_conn(&conn_id).await;
1327        self.cancel_cluster_reconfigurations_for_conn(&conn_id)
1328            .await;
1329        self.cancel_pending_copy(&conn_id);
1330        if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) {
1331            let _ = tx.send(true);
1332        }
1333    }
1334
1335    /// Handle termination of a client session.
1336    ///
1337    /// This cleans up any state in the coordinator associated with the session.
1338    #[mz_ore::instrument(level = "debug")]
1339    async fn handle_terminate(&mut self, conn_id: ConnectionId) {
1340        if !self.active_conns.contains_key(&conn_id) {
1341            // If the session doesn't exist in `active_conns`, then this method will panic later on.
1342            // Instead we explicitly panic here while dumping the entire Coord to the logs to help
1343            // debug. This panic is very infrequent so we want as much information as possible.
1344            // See https://github.com/MaterializeInc/database-issues/issues/5627.
1345            panic!("unknown connection: {conn_id:?}\n\n{self:?}")
1346        }
1347
1348        // We do not need to call clear_transaction here because there are no side effects to run
1349        // based on any session transaction state.
1350        self.clear_connection(&conn_id).await;
1351
1352        self.drop_temp_items(&conn_id).await;
1353        self.catalog_mut()
1354            .drop_temporary_schema(&conn_id)
1355            .unwrap_or_terminate("unable to drop temporary schema");
1356        let conn = self.active_conns.remove(&conn_id).expect("conn must exist");
1357        let session_type = metrics::session_type_label_value(conn.user());
1358        self.metrics
1359            .active_sessions
1360            .with_label_values(&[session_type])
1361            .dec();
1362        self.cancel_pending_peeks(conn.conn_id());
1363        self.cancel_pending_watchsets(&conn_id);
1364        self.cancel_pending_copy(&conn_id);
1365        self.end_session_for_statement_logging(conn.uuid());
1366
1367        // Queue the builtin table update, but do not wait for it to complete. We explicitly do
1368        // this to prevent blocking the Coordinator in the case that a lot of connections are
1369        // closed at once, which occurs regularly in some workflows.
1370        let update = self
1371            .catalog()
1372            .state()
1373            .pack_session_update(&conn, Diff::MINUS_ONE);
1374        let update = self.catalog().state().resolve_builtin_table_update(update);
1375
1376        let _builtin_update_notify = self.builtin_table_update().defer(vec![update]);
1377    }
1378
1379    /// Returns the necessary metadata for appending to a webhook source, and a channel to send
1380    /// rows.
1381    #[mz_ore::instrument(level = "debug")]
1382    fn handle_get_webhook(
1383        &mut self,
1384        database: String,
1385        schema: String,
1386        name: String,
1387        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
1388    ) {
1389        /// Attempts to resolve a Webhook source from a provided `database.schema.name` path.
1390        ///
1391        /// Returns a struct that can be used to append data to the underlying storate collection, and the
1392        /// types we should cast the request to.
1393        fn resolve(
1394            coord: &mut Coordinator,
1395            database: String,
1396            schema: String,
1397            name: String,
1398        ) -> Result<AppendWebhookResponse, PartialItemName> {
1399            // Resolve our collection.
1400            let name = PartialItemName {
1401                database: Some(database),
1402                schema: Some(schema),
1403                item: name,
1404            };
1405            let Ok(entry) = coord
1406                .catalog()
1407                .resolve_entry(None, &vec![], &name, &SYSTEM_CONN_ID)
1408            else {
1409                return Err(name);
1410            };
1411
1412            // Webhooks can be created with `CREATE SOURCE` or `CREATE TABLE`.
1413            let (data_source, desc, global_id) = match entry.item() {
1414                CatalogItem::Source(Source {
1415                    data_source: data_source @ DataSourceDesc::Webhook { .. },
1416                    desc,
1417                    global_id,
1418                    ..
1419                }) => (data_source, desc.clone(), *global_id),
1420                CatalogItem::Table(
1421                    table @ Table {
1422                        desc,
1423                        data_source:
1424                            TableDataSource::DataSource {
1425                                desc: data_source @ DataSourceDesc::Webhook { .. },
1426                                ..
1427                            },
1428                        ..
1429                    },
1430                ) => (data_source, desc.latest(), table.global_id_writes()),
1431                _ => return Err(name),
1432            };
1433
1434            let DataSourceDesc::Webhook {
1435                validate_using,
1436                body_format,
1437                headers,
1438                ..
1439            } = data_source
1440            else {
1441                mz_ore::soft_panic_or_log!("programming error! checked above for webhook");
1442                return Err(name);
1443            };
1444            let body_format = body_format.clone();
1445            let header_tys = headers.clone();
1446
1447            // Assert we have one column for the body, and how ever many are required for
1448            // the headers.
1449            let num_columns = headers.num_columns() + 1;
1450            mz_ore::soft_assert_or_log!(
1451                desc.arity() <= num_columns,
1452                "expected at most {} columns, but got {}",
1453                num_columns,
1454                desc.arity()
1455            );
1456
1457            // Double check that the body column of the webhook source matches the type
1458            // we're about to deserialize as.
1459            let body_column = desc
1460                .get_by_name(&"body".into())
1461                .map(|(_idx, ty)| ty.clone())
1462                .ok_or_else(|| name.clone())?;
1463            assert!(!body_column.nullable, "webhook body column is nullable!?");
1464            assert_eq!(body_column.scalar_type, ScalarType::from(body_format));
1465
1466            // Create a validator that can be called to validate a webhook request.
1467            let validator = validate_using.as_ref().map(|v| {
1468                let validation = v.clone();
1469                AppendWebhookValidator::new(validation, coord.caching_secrets_reader.clone())
1470            });
1471
1472            // Get a channel so we can queue updates to be written.
1473            let row_tx = coord
1474                .controller
1475                .storage
1476                .monotonic_appender(global_id)
1477                .map_err(|_| name.clone())?;
1478            let stats = coord
1479                .controller
1480                .storage
1481                .webhook_statistics(global_id)
1482                .map_err(|_| name)?;
1483            let invalidator = coord
1484                .active_webhooks
1485                .entry(entry.id())
1486                .or_insert_with(WebhookAppenderInvalidator::new);
1487            let tx = WebhookAppender::new(row_tx, invalidator.guard(), stats);
1488
1489            Ok(AppendWebhookResponse {
1490                tx,
1491                body_format,
1492                header_tys,
1493                validator,
1494            })
1495        }
1496
1497        let response = resolve(self, database, schema, name).map_err(|name| {
1498            AppendWebhookError::UnknownWebhook {
1499                database: name.database.expect("provided"),
1500                schema: name.schema.expect("provided"),
1501                name: name.item,
1502            }
1503        });
1504        let _ = tx.send(response);
1505    }
1506}