mz_adapter/coord/command_handler.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic for processing client [`Command`]s. Each [`Command`] is initiated by a
//! client via some external Materialize API (e.g., HTTP and psql).

use base64::prelude::*;
use differential_dataflow::lattice::Lattice;
use mz_adapter_types::dyncfgs::ALLOW_USER_SESSIONS;
use mz_auth::password::Password;
use mz_repr::namespaces::MZ_INTERNAL_SCHEMA;
use mz_sql::session::metadata::SessionMetadata;
use std::collections::{BTreeMap, BTreeSet};
use std::net::IpAddr;
use std::sync::Arc;

use futures::FutureExt;
use futures::future::LocalBoxFuture;
use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source, Table, TableDataSource};
use mz_ore::task;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{instrument, soft_panic_or_log};
use mz_repr::role_id::RoleId;
use mz_repr::{Diff, SqlScalarType, Timestamp};
use mz_sql::ast::{
    AlterConnectionAction, AlterConnectionStatement, AlterSourceAction, AstInfo, ConstantVisitor,
    CopyRelation, CopyStatement, CreateSourceOptionName, Raw, Statement, SubscribeStatement,
};
use mz_sql::catalog::RoleAttributesRaw;
use mz_sql::names::{Aug, PartialItemName, ResolvedIds};
use mz_sql::plan::{
    AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, Params, Plan,
    StatementClassification, TransactionType,
};
use mz_sql::pure::{
    materialized_view_option_contains_temporal, purify_create_materialized_view_options,
};
use mz_sql::rbac;
use mz_sql::rbac::CREATE_ITEM_USAGE;
use mz_sql::session::user::User;
use mz_sql::session::vars::{
    EndTransactionAction, NETWORK_POLICY, OwnedVarInput, STATEMENT_LOGGING_SAMPLE_RATE, Value, Var,
};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
    CreateMaterializedViewStatement, ExplainPlanStatement, Explainee, InsertStatement,
    WithOptionValue,
};
use mz_storage_types::sources::Timeline;
use opentelemetry::trace::TraceContextExt;
use tokio::sync::{mpsc, oneshot};
use tracing::{Instrument, debug_span, info, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::command::{
    AuthResponse, CatalogSnapshot, Command, ExecuteResponse, SASLChallengeResponse,
    SASLVerifyProofResponse, StartupResponse,
};
use crate::coord::appends::PendingWriteTxn;
use crate::coord::{
    ConnMeta, Coordinator, DeferredPlanStatement, Message, PendingTxn, PlanStatement, PlanValidity,
    PurifiedStatementReady, validate_ip_with_policy_rules,
};
use crate::error::{AdapterError, AuthenticationError};
use crate::notice::AdapterNotice;
use crate::session::{Session, TransactionOps, TransactionStatus};
use crate::util::{ClientTransmitter, ResultExt};
use crate::webhook::{
    AppendWebhookResponse, AppendWebhookValidator, WebhookAppender, WebhookAppenderInvalidator,
};
use crate::{AppendWebhookError, ExecuteContext, catalog, metrics};

use super::ExecuteContextExtra;

impl Coordinator {
    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 58KB. This would
    /// get stored on the stack, which is bad for runtime performance and blows up our stack usage.
    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
    pub(crate) fn handle_command(&mut self, mut cmd: Command) -> LocalBoxFuture<'_, ()> {
        async move {
            if let Some(session) = cmd.session_mut() {
                session.apply_external_metadata_updates();
            }
            match cmd {
                Command::Startup {
                    tx,
                    user,
                    conn_id,
                    secret_key,
                    uuid,
                    client_ip,
                    application_name,
                    notice_tx,
                } => {
                    // Note: We purposefully do not use a ClientTransmitter here because startup
                    // handles errors and cleanup of sessions itself.
                    self.handle_startup(
                        tx,
                        user,
                        conn_id,
                        secret_key,
                        uuid,
                        client_ip,
                        application_name,
                        notice_tx,
                    )
                    .await;
                }

                Command::AuthenticatePassword {
                    tx,
                    role_name,
                    password,
                } => {
                    self.handle_authenticate_password(tx, role_name, password)
                        .await;
                }

                Command::AuthenticateGetSASLChallenge {
                    tx,
                    role_name,
                    nonce,
                } => {
                    self.handle_generate_sasl_challenge(tx, role_name, nonce)
                        .await;
                }

                Command::AuthenticateVerifySASLProof {
                    tx,
                    role_name,
                    proof,
                    mock_hash,
                    auth_message,
                } => {
                    self.handle_authenticate_verify_sasl_proof(
                        tx,
                        role_name,
                        proof,
                        auth_message,
                        mock_hash,
                    );
                }

                Command::Execute {
                    portal_name,
                    session,
                    tx,
                    outer_ctx_extra,
                } => {
                    let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());

                    self.handle_execute(portal_name, session, tx, outer_ctx_extra)
                        .await;
                }

                Command::RetireExecute { data, reason } => self.retire_execution(reason, data),

                Command::CancelRequest {
                    conn_id,
                    secret_key,
                } => {
                    self.handle_cancel(conn_id, secret_key).await;
                }

                Command::PrivilegedCancelRequest { conn_id } => {
                    self.handle_privileged_cancel(conn_id).await;
                }

                Command::GetWebhook {
                    database,
                    schema,
                    name,
                    tx,
                } => {
                    self.handle_get_webhook(database, schema, name, tx);
                }

                Command::GetSystemVars { tx } => {
                    let _ = tx.send(self.catalog.system_config().clone());
                }

                Command::SetSystemVars { vars, conn_id, tx } => {
                    let mut ops = Vec::with_capacity(vars.len());
                    let conn = &self.active_conns[&conn_id];

                    for (name, value) in vars {
                        if let Err(e) =
                            self.catalog().system_config().get(&name).and_then(|var| {
                                var.visible(conn.user(), self.catalog.system_config())
                            })
                        {
                            let _ = tx.send(Err(e.into()));
                            return;
                        }

                        ops.push(catalog::Op::UpdateSystemConfiguration {
                            name,
                            value: OwnedVarInput::Flat(value),
                        });
                    }

                    let result = self.catalog_transact_conn(Some(&conn_id), ops).await;
                    let _ = tx.send(result);
                }

                Command::Terminate { conn_id, tx } => {
                    self.handle_terminate(conn_id).await;
                    // Note: We purposefully do not use a ClientTransmitter here because we're already
                    // terminating the provided session.
                    if let Some(tx) = tx {
                        let _ = tx.send(Ok(()));
                    }
                }

                Command::Commit {
                    action,
                    session,
                    tx,
                } => {
                    let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
                    // We reach here not through a statement execution, but from the
                    // "commit" pgwire command. Thus, we just generate a default statement
                    // execution context (once statement logging is implemented, this will cause nothing to be logged
                    // when the execution finishes.)
                    let ctx = ExecuteContext::from_parts(
                        tx,
                        self.internal_cmd_tx.clone(),
                        session,
                        Default::default(),
                    );
                    let plan = match action {
                        EndTransactionAction::Commit => {
                            Plan::CommitTransaction(CommitTransactionPlan {
                                transaction_type: TransactionType::Implicit,
                            })
                        }
                        EndTransactionAction::Rollback => {
                            Plan::AbortTransaction(AbortTransactionPlan {
                                transaction_type: TransactionType::Implicit,
                            })
                        }
                    };

                    let conn_id = ctx.session().conn_id().clone();
                    self.sequence_plan(ctx, plan, ResolvedIds::empty()).await;
                    // Part of the Command::Commit contract is that the Coordinator guarantees that
                    // it has cleared its transaction state for the connection.
                    self.clear_connection(&conn_id).await;
                }

                Command::CatalogSnapshot { tx } => {
                    let _ = tx.send(CatalogSnapshot {
                        catalog: self.owned_catalog(),
                    });
                }

                Command::CheckConsistency { tx } => {
                    let _ = tx.send(self.check_consistency());
                }

                Command::Dump { tx } => {
                    let _ = tx.send(self.dump().await);
                }
            }
        }
        .instrument(debug_span!("handle_command"))
        .boxed_local()
    }

    fn handle_authenticate_verify_sasl_proof(
        &self,
        tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
        role_name: String,
        proof: String,
        auth_message: String,
        mock_hash: String,
    ) {
        let role = self.catalog().try_get_role_by_name(role_name.as_str());
        let role_auth = role.and_then(|r| self.catalog().try_get_role_auth_by_id(&r.id));

        let login = role
            .as_ref()
            .map(|r| r.attributes.login.unwrap_or(false))
            .unwrap_or(false);

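        // Use the role's stored password hash when one exists; otherwise fall back to
        // the caller-supplied mock hash. Verification below still runs in that case,
        // but can never succeed, since success additionally requires a real hash.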
        let real_hash = role_auth
            .as_ref()
            .and_then(|auth| auth.password_hash.as_ref());
        let hash_ref = real_hash.map(|s| s.as_str()).unwrap_or(&mock_hash);

        let role_present = role.is_some();
        let make_auth_err = |role_present: bool, login: bool| {
            AdapterError::AuthenticationError(if role_present && !login {
                AuthenticationError::NonLogin
            } else {
                AuthenticationError::InvalidCredentials
            })
        };

        match mz_auth::hash::sasl_verify(hash_ref, &proof, &auth_message) {
            Ok(verifier) => {
                // Success only if role exists, allows login, and a real password hash was used.
                if login && real_hash.is_some() {
                    let role = role.expect("login implies role exists");
                    let _ = tx.send(Ok(SASLVerifyProofResponse {
                        verifier,
                        auth_resp: AuthResponse {
                            role_id: role.id,
                            superuser: role.attributes.superuser.unwrap_or(false),
                        },
                    }));
                } else {
                    let _ = tx.send(Err(make_auth_err(role_present, login)));
                }
            }
            Err(_) => {
                let _ = tx.send(Err(AdapterError::AuthenticationError(
                    AuthenticationError::InvalidCredentials,
                )));
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn handle_generate_sasl_challenge(
        &mut self,
        tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
        role_name: String,
        client_nonce: String,
    ) {
        let role_auth = self
            .catalog()
            .try_get_role_by_name(&role_name)
            .and_then(|role| self.catalog().try_get_role_auth_by_id(&role.id));

        let nonce = match mz_auth::hash::generate_nonce(&client_nonce) {
            Ok(n) => n,
            Err(e) => {
                let msg = format!(
                    "failed to generate nonce for client nonce {}: {}",
                    client_nonce, e
                );
                let _ = tx.send(Err(AdapterError::Internal(msg.clone())));
                soft_panic_or_log!("{msg}");
                return;
            }
        };

        // It's important that the mock_nonce is deterministic per role, otherwise the purpose of
        // doing mock authentication is defeated. We use a catalog-wide nonce, and combine that
        // with the role name to get a per-role mock nonce.
        let send_mock_challenge =
            |role_name: String,
             mock_nonce: String,
             nonce: String,
             tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>| {
                let opts = mz_auth::hash::mock_sasl_challenge(&role_name, &mock_nonce);
                let _ = tx.send(Ok(SASLChallengeResponse {
                    iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
                    salt: BASE64_STANDARD.encode(opts.salt),
                    nonce,
                }));
            };

        match role_auth {
            Some(auth) if auth.password_hash.is_some() => {
                let hash = auth.password_hash.as_ref().expect("checked above");
                match mz_auth::hash::scram256_parse_opts(hash) {
                    Ok(opts) => {
                        let _ = tx.send(Ok(SASLChallengeResponse {
                            iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
                            salt: BASE64_STANDARD.encode(opts.salt),
                            nonce,
                        }));
                    }
                    Err(_) => {
                        send_mock_challenge(
                            role_name,
                            self.catalog().state().mock_authentication_nonce(),
                            nonce,
                            tx,
                        );
                    }
                }
            }
            _ => {
                send_mock_challenge(
                    role_name,
                    self.catalog().state().mock_authentication_nonce(),
                    nonce,
                    tx,
                );
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn handle_authenticate_password(
        &mut self,
        tx: oneshot::Sender<Result<AuthResponse, AdapterError>>,
        role_name: String,
        password: Option<Password>,
    ) {
        let Some(password) = password else {
            // The user did not provide a password.
            let _ = tx.send(Err(AdapterError::AuthenticationError(
                AuthenticationError::PasswordRequired,
            )));
            return;
        };

        if let Some(role) = self.catalog().try_get_role_by_name(role_name.as_str()) {
            if !role.attributes.login.unwrap_or(false) {
                // The user is not allowed to login.
                let _ = tx.send(Err(AdapterError::AuthenticationError(
                    AuthenticationError::NonLogin,
                )));
                return;
            }
            if let Some(auth) = self.catalog().try_get_role_auth_by_id(&role.id) {
                if let Some(hash) = &auth.password_hash {
                    let _ = match mz_auth::hash::scram256_verify(&password, hash) {
                        Ok(_) => tx.send(Ok(AuthResponse {
                            role_id: role.id,
                            superuser: role.attributes.superuser.unwrap_or(false),
                        })),
                        Err(_) => tx.send(Err(AdapterError::AuthenticationError(
                            AuthenticationError::InvalidCredentials,
                        ))),
                    };
                    return;
                }
            }
            // Authentication failed due to incorrect password or missing password hash.
            let _ = tx.send(Err(AdapterError::AuthenticationError(
                AuthenticationError::InvalidCredentials,
            )));
        } else {
            // The user does not exist.
            let _ = tx.send(Err(AdapterError::AuthenticationError(
                AuthenticationError::RoleNotFound,
            )));
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn handle_startup(
        &mut self,
        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
        user: User,
        conn_id: ConnectionId,
        secret_key: u32,
        uuid: uuid::Uuid,
        client_ip: Option<IpAddr>,
        application_name: String,
        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
    ) {
        // Early return if successful, otherwise clean up any possible state.
        match self.handle_startup_inner(&user, &conn_id, &client_ip).await {
            Ok((role_id, session_defaults)) => {
                let session_type = metrics::session_type_label_value(&user);
                self.metrics
                    .active_sessions
                    .with_label_values(&[session_type])
                    .inc();
                let conn = ConnMeta {
                    secret_key,
                    notice_tx,
                    drop_sinks: BTreeSet::new(),
                    pending_cluster_alters: BTreeSet::new(),
                    connected_at: self.now(),
                    user,
                    application_name,
                    uuid,
                    client_ip,
                    conn_id: conn_id.clone(),
                    authenticated_role: role_id,
                    deferred_lock: None,
                };
                let update = self.catalog().state().pack_session_update(&conn, Diff::ONE);
                let update = self.catalog().state().resolve_builtin_table_update(update);
                self.begin_session_for_statement_logging(&conn);
                self.active_conns.insert(conn_id.clone(), conn);

                // Note: Do NOT await the notify here, we pass this back to
                // whatever requested the startup to prevent blocking startup
                // and the Coordinator on a builtin table update.
                let updates = vec![update];
                // It's not a hard error if our list is missing a builtin table, but we want to
                // make sure these two things stay in-sync.
                if mz_ore::assert::soft_assertions_enabled() {
                    let required_tables: BTreeSet<_> = super::appends::REQUIRED_BUILTIN_TABLES
                        .iter()
                        .map(|table| self.catalog().resolve_builtin_table(*table))
                        .collect();
                    let updates_tracked = updates
                        .iter()
                        .all(|update| required_tables.contains(&update.id));
                    let all_mz_internal = super::appends::REQUIRED_BUILTIN_TABLES
                        .iter()
                        .all(|table| table.schema == MZ_INTERNAL_SCHEMA);
                    mz_ore::soft_assert_or_log!(
                        updates_tracked,
                        "not tracking all required builtin table updates!"
                    );
                    // TODO(parkmycar): When checking if a query depends on these builtin table
                    // writes we do not check the transitive dependencies of the query, because
                    // we don't support creating views on mz_internal objects. If one of these
                    // tables is promoted out of mz_internal then we'll need to add this check.
                    mz_ore::soft_assert_or_log!(
                        all_mz_internal,
                        "not all builtin tables are in mz_internal! need to check transitive depends",
                    )
                }
                let notify = self.builtin_table_update().background(updates);

                let resp = Ok(StartupResponse {
                    role_id,
                    write_notify: notify,
                    session_defaults,
                    catalog: self.owned_catalog(),
                });
                if tx.send(resp).is_err() {
                    // Failed to send to adapter, but everything is setup so we can terminate
                    // normally.
                    self.handle_terminate(conn_id).await;
                }
            }
            Err(e) => {
                // Error during startup or sending to adapter; clean up possible state created by
                // handle_startup_inner. A user may have been created and it can stay; no need to
                // delete it.
                self.catalog_mut()
                    .drop_temporary_schema(&conn_id)
                    .unwrap_or_terminate("unable to drop temporary schema");

                // Communicate the error back to the client. No need to
                // handle failures to send the error back; we've already
                // cleaned up all necessary state.
                let _ = tx.send(Err(e));
            }
        }
    }

    // Fallible startup work that needs to be cleaned up on error.
    async fn handle_startup_inner(
        &mut self,
        user: &User,
        conn_id: &ConnectionId,
        client_ip: &Option<IpAddr>,
    ) -> Result<(RoleId, BTreeMap<String, OwnedVarInput>), AdapterError> {
        if self.catalog().try_get_role_by_name(&user.name).is_none() {
            // If the user has made it to this point, that means they have been fully authenticated.
            // This includes preventing any user, except a pre-defined set of system users, from
            // connecting to an internal port. Therefore it's ok to always create a new role for the
            // user.
            let attributes = RoleAttributesRaw::new();
            let plan = CreateRolePlan {
                name: user.name.to_string(),
                attributes,
            };
            self.sequence_create_role_for_startup(plan).await?;
        }
        let role_id = self
            .catalog()
            .try_get_role_by_name(&user.name)
            .expect("created above")
            .id;

        if role_id.is_user() && !ALLOW_USER_SESSIONS.get(self.catalog().system_config().dyncfgs()) {
            return Err(AdapterError::UserSessionsDisallowed);
        }

        // Initialize the default session variables for this role.
        let mut session_defaults = BTreeMap::new();
        let system_config = self.catalog().state().system_config();

        // Override the session with any system defaults.
        session_defaults.extend(
            system_config
                .iter_session()
                .map(|v| (v.name().to_string(), OwnedVarInput::Flat(v.value()))),
        );
        // Special case: the statement logging sample rate has its own system default.
        let statement_logging_default = system_config
            .statement_logging_default_sample_rate()
            .format();
        session_defaults.insert(
            STATEMENT_LOGGING_SAMPLE_RATE.name().to_string(),
            OwnedVarInput::Flat(statement_logging_default),
        );
        // Override system defaults with role defaults.
        session_defaults.extend(
            self.catalog()
                .get_role(&role_id)
                .vars()
                .map(|(name, val)| (name.to_string(), val.clone())),
        );
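        // Net effect: role defaults take precedence over the statement logging default,
        // which takes precedence over the general system session defaults, because later
        // inserts overwrite earlier entries in `session_defaults`.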

        // Validate network policies for external users. Internal users can only connect on the
        // internal interfaces (internal HTTP/pgwire). It is up to the person deploying the system
        // to ensure these internal interfaces are well secured.
        //
        // HACKY(parkmycar): We don't have a fully formed session yet for this role, but we want
        // the default network policy for this role, so we read directly out of what the session
        // will get initialized with.
        if !user.is_internal() {
            let network_policy_name = session_defaults
                .get(NETWORK_POLICY.name())
                .and_then(|value| match value {
                    OwnedVarInput::Flat(name) => Some(name.clone()),
                    OwnedVarInput::SqlSet(names) => {
                        tracing::error!(?names, "found multiple network policies");
                        None
                    }
                })
                .unwrap_or_else(|| system_config.default_network_policy_name());
            let maybe_network_policy = self
                .catalog()
                .get_network_policy_by_name(&network_policy_name);

            let Some(network_policy) = maybe_network_policy else {
                // We should prevent dropping the default network policy, or setting the policy
                // to something that doesn't exist, so complain loudly if this occurs.
                tracing::error!(
                    network_policy_name,
                    "default network policy does not exist. All user traffic will be blocked"
                );
                let reason = match client_ip {
                    Some(ip) => super::NetworkPolicyError::AddressDenied(ip.clone()),
                    None => super::NetworkPolicyError::MissingIp,
                };
                return Err(AdapterError::NetworkPolicyDenied(reason));
            };

            if let Some(ip) = client_ip {
                match validate_ip_with_policy_rules(ip, &network_policy.rules) {
                    Ok(_) => {}
                    Err(e) => return Err(AdapterError::NetworkPolicyDenied(e)),
                }
            } else {
                // Only temporary and internal representations of a session
                // should be missing a client_ip. These sessions should not be
                // making requests or going through handle_startup.
                return Err(AdapterError::NetworkPolicyDenied(
                    super::NetworkPolicyError::MissingIp,
                ));
            }
        }

        self.catalog_mut()
            .create_temporary_schema(conn_id, role_id)?;

        Ok((role_id, session_defaults))
    }

    /// Handles an execute command.
    #[instrument(name = "coord::handle_execute", fields(session = session.uuid().to_string()))]
    pub(crate) async fn handle_execute(
        &mut self,
        portal_name: String,
        mut session: Session,
        tx: ClientTransmitter<ExecuteResponse>,
        // If this command was part of another execute command
        // (for example, executing a `FETCH` statement causes an execute to be
        //  issued for the cursor it references),
        // then `outer_context` should be `Some`.
        // This instructs the coordinator that the
        // outer execute should be considered finished once the inner one is.
        outer_context: Option<ExecuteContextExtra>,
    ) {
        if session.vars().emit_trace_id_notice() {
            let span_context = tracing::Span::current()
                .context()
                .span()
                .span_context()
                .clone();
            if span_context.is_valid() {
                session.add_notice(AdapterNotice::QueryTrace {
                    trace_id: span_context.trace_id(),
                });
            }
        }

        if let Err(err) = self.verify_portal(&mut session, &portal_name) {
            // If statement logging hasn't started yet, we don't need
            // to add any "end" event, so just make up a no-op
            // `ExecuteContextExtra` here, via `Default::default`.
            //
            // It's a bit unfortunate because the edge case of failed
            // portal verifications won't show up in statement
            // logging, but there seems to be nothing else we can do,
            // because we need access to the portal to begin logging.
            //
            // Another option would be to log a begin and end event, but just fill in NULLs
            // for everything we get from the portal (prepared statement id, params).
            let extra = outer_context.unwrap_or_else(Default::default);
            let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
            return ctx.retire(Err(err));
        }

        // The reference to `portal` can't outlive `session`, which we
        // use to construct the context, so scope the reference to this block where we
        // get everything we need from the portal for later.
        let (stmt, ctx, params) = {
            let portal = session
                .get_portal_unverified(&portal_name)
                .expect("known to exist");
            let params = portal.parameters.clone();
            let stmt = portal.stmt.clone();
            let logging = Arc::clone(&portal.logging);
            let lifecycle_timestamps = portal.lifecycle_timestamps.clone();

            let extra = if let Some(extra) = outer_context {
                // We are executing in the context of another SQL statement, so we don't
                // want to begin statement logging anew. The context of the actual statement
                // being executed is the one that should be retired once this finishes.
                extra
            } else {
                // This is a new statement, log it and return the context
                let maybe_uuid = self.begin_statement_execution(
                    &mut session,
                    &params,
                    &logging,
                    lifecycle_timestamps,
                );

                ExecuteContextExtra::new(maybe_uuid)
            };
            let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
            (stmt, ctx, params)
        };

        let stmt = match stmt {
            Some(stmt) => stmt,
            None => return ctx.retire(Ok(ExecuteResponse::EmptyQuery)),
        };

        let session_type = metrics::session_type_label_value(ctx.session().user());
        let stmt_type = metrics::statement_type_label_value(&stmt);
        self.metrics
            .query_total
            .with_label_values(&[session_type, stmt_type])
            .inc();
        match &*stmt {
            Statement::Subscribe(SubscribeStatement { output, .. })
            | Statement::Copy(CopyStatement {
                relation: CopyRelation::Subscribe(SubscribeStatement { output, .. }),
                ..
            }) => {
                self.metrics
                    .subscribe_outputs
                    .with_label_values(&[
                        session_type,
                        metrics::subscribe_output_label_value(output),
                    ])
                    .inc();
            }
            _ => {}
        }

        self.handle_execute_inner(stmt, params, ctx).await
    }

    #[instrument(name = "coord::handle_execute_inner", fields(stmt = stmt.to_ast_string_redacted()))]
    pub(crate) async fn handle_execute_inner(
        &mut self,
        stmt: Arc<Statement<Raw>>,
        params: Params,
        mut ctx: ExecuteContext,
    ) {
        // This comment describes the various ways DDL can execute (the ordered operations: name
        // resolve, purify, plan, sequence), all of which are managed by this function. DDL has
        // three notable properties that all partially interact.
        //
        // 1. Most DDL statements (and a few others) support single-statement transaction delayed
        //    execution. This occurs when a session executes `BEGIN`, a single DDL, then `COMMIT`.
        //    We announce success of the single DDL when it is executed, but do not attempt to plan
        //    or sequence it until `COMMIT`, which is able to error if needed while sequencing the
        //    DDL (this behavior is Postgres-compatible). This exists because some
        //    drivers or tools wrap all statements in `BEGIN` and `COMMIT` and we would like them to
        //    work. When the single DDL is announced as successful we also put the session's
        //    transaction ops into `SingleStatement` which will produce an error if any other
        //    statement is run in the transaction except `COMMIT`. Additionally, this will cause
        //    `handle_execute_inner` to stop further processing (no planning, etc.) of the
        //    statement.
        // 2. A few other DDL statements (`ALTER .. RENAME/SWAP`) enter the `DDL` ops which allows
        //    any number of only these DDL statements to be executed in a transaction. At sequencing
        //    these generate the `Op::TransactionDryRun` catalog op. When applied with
        //    `catalog_transact`, that op will always produce the `TransactionDryRun` error. The
        //    `catalog_transact_with_ddl_transaction` function intercepts that error and reports
        //    success to the user, but nothing is yet committed to the real catalog. At `COMMIT` all
        //    of the ops but without dry run are applied. The purpose of this is to allow multiple,
        //    atomic renames in the same transaction.
        // 3. Some DDLs do off-thread work during purification or sequencing that is expensive or
        //    makes network calls (interfacing with secrets, optimization of views/indexes, source
        //    purification). These must guarantee correctness when they return to the main
        //    coordinator thread because the catalog state could have changed while they were doing
        //    the off-thread work. Previously we would use `PlanValidity::Checks` to specify a bunch
        //    of IDs that we needed to exist. We discovered the way we were doing that was not
        //    always correct. Instead of attempting to get that completely right, we have opted to
        //    serialize DDL. Getting this right is difficult because catalog changes can affect name
        //    resolution, planning, sequencing, and optimization. Correctly writing logic that is
        //    aware of all possible catalog changes that would affect any of those parts is not
        //    something our current code is designed to make easy. Even if a DDL statement
        //    is doing off-thread work, another DDL must not yet execute at all. Executing these
        //    serially will guarantee that no off-thread work has affected the state of the catalog.
        //    This is done by adding a VecDeque of deferred statements and a lock to the
        //    Coordinator. When a DDL is run in `handle_execute_inner` (after applying whatever
        //    transaction ops are needed to the session as described above), it attempts to own the
        //    lock (a tokio Mutex). If acquired, it stashes the lock in the connection's `ConnMeta`
        //    struct in `active_conns` and proceeds. The lock is dropped at transaction end in
        //    `clear_transaction` and a message sent to the Coordinator to execute the next queued
        //    DDL. If the lock could not be acquired, the DDL is put into the VecDeque where it
        //    awaits dequeuing caused by the lock being released.
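        //
        //    For example (case 1 above): in `BEGIN; <single DDL>; COMMIT`, the DDL is
        //    reported as successful immediately, but it is only planned and sequenced
        //    when the `COMMIT` arrives, which is where any error surfaces.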

        // Verify that this statement type can be executed in the current
        // transaction state.
        match ctx.session().transaction() {
            // By this point we should be in a running transaction.
            TransactionStatus::Default => unreachable!(),

            // Failed transactions have already been checked in pgwire for a safe statement
            // (COMMIT, ROLLBACK, etc.) and can proceed.
            TransactionStatus::Failed(_) => {}

            // Started is a deceptive name, and means different things depending on which
            // protocol was used. Either it's exactly one statement (known because this
            // is the simple protocol, the parser parsed the entire string, and it had
            // one statement), or, from the extended protocol, it means *some* query is
            // being executed, but there might be others after it before the Sync (commit)
            // message. Postgres handles this by teaching Started to eagerly commit certain
            // statements that can't be run in a transaction block.
            TransactionStatus::Started(_) => {
                if let Statement::Declare(_) = &*stmt {
                    // Declare is an exception. Although it's not against any spec to execute
                    // it, it will always result in nothing happening, since all portals will be
                    // immediately closed. Users don't know this detail, so this error helps them
                    // understand what's going wrong. Postgres does this too.
                    return ctx.retire(Err(AdapterError::OperationRequiresTransaction(
                        "DECLARE CURSOR".into(),
                    )));
                }
            }

            // Implicit or explicit transactions.
            //
            // Implicit transactions happen when a multi-statement query is executed
            // (a "simple query"). However if a "BEGIN" appears somewhere in there,
            // then the existing implicit transaction will be upgraded to an explicit
            // transaction. Thus, we should not separate what implicit and explicit
            // transactions can do unless there's some additional checking to make sure
            // something disallowed in explicit transactions did not previously take place
            // in the implicit portion.
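            //
            // For example, the simple-protocol string `SELECT 1; BEGIN; SELECT 2;` starts
            // out as an implicit transaction and is upgraded to an explicit one when the
            // `BEGIN` is reached.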
            TransactionStatus::InTransactionImplicit(_) | TransactionStatus::InTransaction(_) => {
                match &*stmt {
                    // Statements that are safe in a transaction. We still need to verify that we
                    // don't interleave reads and writes since we can't perform those serializably.
                    Statement::Close(_)
                    | Statement::Commit(_)
                    | Statement::Copy(_)
                    | Statement::Deallocate(_)
                    | Statement::Declare(_)
                    | Statement::Discard(_)
                    | Statement::Execute(_)
                    | Statement::ExplainPlan(_)
                    | Statement::ExplainPushdown(_)
                    | Statement::ExplainAnalyze(_)
                    | Statement::ExplainTimestamp(_)
                    | Statement::ExplainSinkSchema(_)
                    | Statement::Fetch(_)
                    | Statement::Prepare(_)
                    | Statement::Rollback(_)
                    | Statement::Select(_)
                    | Statement::SetTransaction(_)
                    | Statement::Show(_)
                    | Statement::SetVariable(_)
                    | Statement::ResetVariable(_)
                    | Statement::StartTransaction(_)
                    | Statement::Subscribe(_)
                    | Statement::Raise(_) => {
                        // Always safe.
                    }

                    Statement::Insert(InsertStatement {
                        source, returning, ..
                    }) if returning.is_empty() && ConstantVisitor::insert_source(source) => {
                        // Inserts of constant values that do not need to execute on
                        // any cluster (no RETURNING) are always safe.
                    }

                    // These statements must be kept in-sync with `must_serialize_ddl()`.
                    Statement::AlterObjectRename(_)
                    | Statement::AlterObjectSwap(_)
                    | Statement::CreateTableFromSource(_)
                    | Statement::CreateSource(_) => {
                        let state = self.catalog().for_session(ctx.session()).state().clone();
                        let revision = self.catalog().transient_revision();

                        // Initialize our transaction with a set of empty ops, or return an error
                        // if we can't run a DDL transaction
                        let txn_status = ctx.session_mut().transaction_mut();
                        if let Err(err) = txn_status.add_ops(TransactionOps::DDL {
                            ops: vec![],
                            state,
                            revision,
                            side_effects: vec![],
                        }) {
                            return ctx.retire(Err(err));
                        }
                    }

                    // Statements below must be run singly (in Started).
                    Statement::AlterCluster(_)
                    | Statement::AlterConnection(_)
                    | Statement::AlterDefaultPrivileges(_)
                    | Statement::AlterIndex(_)
                    | Statement::AlterSetCluster(_)
                    | Statement::AlterOwner(_)
                    | Statement::AlterRetainHistory(_)
                    | Statement::AlterRole(_)
                    | Statement::AlterSecret(_)
                    | Statement::AlterSink(_)
                    | Statement::AlterSource(_)
                    | Statement::AlterSystemReset(_)
                    | Statement::AlterSystemResetAll(_)
                    | Statement::AlterSystemSet(_)
                    | Statement::AlterTableAddColumn(_)
                    | Statement::AlterNetworkPolicy(_)
                    | Statement::CreateCluster(_)
                    | Statement::CreateClusterReplica(_)
                    | Statement::CreateConnection(_)
                    | Statement::CreateDatabase(_)
                    | Statement::CreateIndex(_)
                    | Statement::CreateMaterializedView(_)
                    | Statement::CreateContinualTask(_)
                    | Statement::CreateRole(_)
                    | Statement::CreateSchema(_)
                    | Statement::CreateSecret(_)
                    | Statement::CreateSink(_)
                    | Statement::CreateSubsource(_)
                    | Statement::CreateTable(_)
                    | Statement::CreateType(_)
                    | Statement::CreateView(_)
                    | Statement::CreateWebhookSource(_)
                    | Statement::CreateNetworkPolicy(_)
                    | Statement::Delete(_)
                    | Statement::DropObjects(_)
                    | Statement::DropOwned(_)
                    | Statement::GrantPrivileges(_)
                    | Statement::GrantRole(_)
                    | Statement::Insert(_)
                    | Statement::ReassignOwned(_)
                    | Statement::RevokePrivileges(_)
                    | Statement::RevokeRole(_)
                    | Statement::Update(_)
                    | Statement::ValidateConnection(_)
                    | Statement::Comment(_) => {
                        let txn_status = ctx.session_mut().transaction_mut();

                        // If we're not in an implicit transaction and we could generate exactly one
                        // valid ExecuteResponse, we can delay execution until commit.
                        if !txn_status.is_implicit() {
                            // Statements whose tag is trivial (known only from an unexecuted statement) can
                            // be run in a special single-statement explicit mode. In this mode (`BEGIN;
                            // <stmt>; COMMIT`), we generate the expected tag from a successful <stmt>, but
                            // delay execution until `COMMIT`.
                            if let Ok(resp) = ExecuteResponse::try_from(&*stmt) {
                                if let Err(err) = txn_status
                                    .add_ops(TransactionOps::SingleStatement { stmt, params })
                                {
                                    ctx.retire(Err(err));
                                    return;
                                }
                                ctx.retire(Ok(resp));
                                return;
                            }
                        }

                        return ctx.retire(Err(AdapterError::OperationProhibitsTransaction(
                            stmt.to_string(),
                        )));
                    }
                }
            }
        }

        // DDLs must be planned and sequenced serially. We do not rely on PlanValidity checking
        // various IDs because we have incorrectly done that in the past. Attempt to acquire the
        // ddl lock. The lock is stashed in the ConnMeta which is dropped at transaction end. If
        // acquired, proceed with sequencing. If not, enqueue and return. This logic assumes that
        // Coordinator::clear_transaction is correctly called when session transactions are ended
        // because that function will release the held lock from active_conns.
        if Self::must_serialize_ddl(&stmt, &ctx) {
            if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
                let prev = self
                    .active_conns
                    .get_mut(ctx.session().conn_id())
                    .expect("connection must exist")
                    .deferred_lock
                    .replace(guard);
                assert!(
                    prev.is_none(),
                    "connections should have at most one lock guard"
                );
            } else {
                if self
                    .active_conns
                    .get(ctx.session().conn_id())
                    .expect("connection must exist")
                    .deferred_lock
                    .is_some()
                {
                    // This session *already* has the lock, and incorrectly tried to execute another
                    // DDL while still holding the lock, violating the assumption documented above.
                    // This is an internal error, probably in some AdapterClient user (pgwire or
                    // http). Because the session is now in some unexpected state, return an error
                    // which should cause the AdapterClient user to fail the transaction.
                    // (Terminating the connection is maybe what we would prefer to do, but is not
                    // currently a thing we can do from the coordinator: calling handle_terminate
                    // cleans up Coordinator state for the session but doesn't inform the
                    // AdapterClient that the session should terminate.)
                    soft_panic_or_log!(
                        "session {} attempted to get ddl lock while already owning it",
                        ctx.session().conn_id()
                    );
                    ctx.retire(Err(AdapterError::Internal(
                        "session attempted to get ddl lock while already owning it".to_string(),
                    )));
                    return;
                }
                self.serialized_ddl.push_back(DeferredPlanStatement {
                    ctx,
                    ps: PlanStatement::Statement { stmt, params },
                });
                return;
            }
        }

        let catalog = self.catalog();
        let catalog = catalog.for_session(ctx.session());
        let original_stmt = Arc::clone(&stmt);
        // `resolved_ids` should be derivable from `stmt`. If `stmt` is transformed to remove/add
        // IDs, then `resolved_ids` should be updated to also remove/add those IDs.
        let (stmt, mut resolved_ids) = match mz_sql::names::resolve(&catalog, (*stmt).clone()) {
            Ok(resolved) => resolved,
            Err(e) => return ctx.retire(Err(e.into())),
        };
        // N.B. The catalog can change during purification so we must validate that the dependencies still exist after
        // purification. This should be done back on the main thread.
        // We do the validation:
        //   - In the handler for `Message::PurifiedStatementReady`, before we handle the purified statement.
        // If we add special handling for more types of `Statement`s, we'll need to ensure similar verification
        // occurs.
        let (stmt, resolved_ids) = match stmt {
            // Various statements must be purified off the main coordinator thread of control.
            stmt if Self::must_spawn_purification(&stmt) => {
                let internal_cmd_tx = self.internal_cmd_tx.clone();
                let conn_id = ctx.session().conn_id().clone();
                let catalog = self.owned_catalog();
                let now = self.now();
                let otel_ctx = OpenTelemetryContext::obtain();
                let current_storage_configuration = self.controller.storage.config().clone();
                task::spawn(|| format!("purify:{conn_id}"), async move {
                    let transient_revision = catalog.transient_revision();
                    let catalog = catalog.for_session(ctx.session());

                    // Checks if the session is authorized to purify a statement. Usually
                    // authorization is checked after planning; however, purification happens before
                    // planning and may require the use of some connections and secrets.
1079                    if let Err(e) = rbac::check_usage(
1080                        &catalog,
1081                        ctx.session(),
1082                        &resolved_ids,
1083                        &CREATE_ITEM_USAGE,
1084                    ) {
1085                        return ctx.retire(Err(e.into()));
1086                    }
1087
1088                    let (result, cluster_id) = mz_sql::pure::purify_statement(
1089                        catalog,
1090                        now,
1091                        stmt,
1092                        &current_storage_configuration,
1093                    )
1094                    .await;
1095                    let result = result.map_err(|e| e.into());
1096                    let dependency_ids = resolved_ids.items().copied().collect();
1097                    let plan_validity = PlanValidity::new(
1098                        transient_revision,
1099                        dependency_ids,
1100                        cluster_id,
1101                        None,
1102                        ctx.session().role_metadata().clone(),
1103                    );
1104                    // It is not an error for purification to complete after `internal_cmd_rx` is dropped.
1105                    let result = internal_cmd_tx.send(Message::PurifiedStatementReady(
1106                        PurifiedStatementReady {
1107                            ctx,
1108                            result,
1109                            params,
1110                            plan_validity,
1111                            original_stmt,
1112                            otel_ctx,
1113                        },
1114                    ));
1115                    if let Err(e) = result {
1116                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1117                    }
1118                });
1119                return;
1120            }
1121
1122            // `CREATE SUBSOURCE` statements are disallowed for users and are only generated
1123            // automatically as part of purification
1124            Statement::CreateSubsource(_) => {
1125                ctx.retire(Err(AdapterError::Unsupported(
1126                    "CREATE SUBSOURCE statements",
1127                )));
1128                return;
1129            }
1130
1131            Statement::CreateMaterializedView(mut cmvs) => {
1132                // `CREATE MATERIALIZED VIEW ... AS OF ...` syntax is disallowed for users and is
1133                // only used for storing initial frontiers in the catalog.
1134                if cmvs.as_of.is_some() {
1135                    return ctx.retire(Err(AdapterError::Unsupported(
1136                        "CREATE MATERIALIZED VIEW ... AS OF statements",
1137                    )));
1138                }
1139
1140                let mz_now = match self
1141                    .resolve_mz_now_for_create_materialized_view(
1142                        &cmvs,
1143                        &resolved_ids,
1144                        ctx.session_mut(),
1145                        true,
1146                    )
1147                    .await
1148                {
1149                    Ok(mz_now) => mz_now,
1150                    Err(e) => return ctx.retire(Err(e)),
1151                };
1152
1153                let owned_catalog = self.owned_catalog();
1154                let catalog = owned_catalog.for_session(ctx.session());
1155
1156                purify_create_materialized_view_options(
1157                    catalog,
1158                    mz_now,
1159                    &mut cmvs,
1160                    &mut resolved_ids,
1161                );
1162
1163                let purified_stmt =
1164                    Statement::CreateMaterializedView(CreateMaterializedViewStatement::<Aug> {
1165                        if_exists: cmvs.if_exists,
1166                        name: cmvs.name,
1167                        columns: cmvs.columns,
1168                        in_cluster: cmvs.in_cluster,
1169                        query: cmvs.query,
1170                        with_options: cmvs.with_options,
1171                        as_of: None,
1172                    });
1173
1174                // (Purifying CreateMaterializedView doesn't happen asynchronously, so there is
1175                // no need to send `Message::PurifiedStatementReady` here.)
1176                (purified_stmt, resolved_ids)
1177            }
1178
1179            Statement::ExplainPlan(ExplainPlanStatement {
1180                stage,
1181                with_options,
1182                format,
1183                explainee: Explainee::CreateMaterializedView(box_cmvs, broken),
1184            }) => {
1185                let mut cmvs = *box_cmvs;
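                    // Mirror the CREATE MATERIALIZED VIEW purification above, but with
                    // `acquire_read_holds = false`: for EXPLAIN, the read holds taken while choosing
                    // the timestamp are dropped rather than stored on the transaction.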
1186                let mz_now = match self
1187                    .resolve_mz_now_for_create_materialized_view(
1188                        &cmvs,
1189                        &resolved_ids,
1190                        ctx.session_mut(),
1191                        false,
1192                    )
1193                    .await
1194                {
1195                    Ok(mz_now) => mz_now,
1196                    Err(e) => return ctx.retire(Err(e)),
1197                };
1198
1199                let owned_catalog = self.owned_catalog();
1200                let catalog = owned_catalog.for_session(ctx.session());
1201
1202                purify_create_materialized_view_options(
1203                    catalog,
1204                    mz_now,
1205                    &mut cmvs,
1206                    &mut resolved_ids,
1207                );
1208
1209                let purified_stmt = Statement::ExplainPlan(ExplainPlanStatement {
1210                    stage,
1211                    with_options,
1212                    format,
1213                    explainee: Explainee::CreateMaterializedView(Box::new(cmvs), broken),
1214                });
1215
1216                (purified_stmt, resolved_ids)
1217            }
1218
1219            // All other statements are handled immediately.
1220            _ => (stmt, resolved_ids),
1221        };
1222
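            // Purification (if any) is done; plan the statement against the current catalog and
            // hand the resulting plan off to sequencing.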
1223        match self.plan_statement(ctx.session(), stmt, &params, &resolved_ids) {
1224            Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await,
1225            Err(e) => ctx.retire(Err(e)),
1226        }
1227    }
1228
1229    /// Whether the statement is DDL that must be serialized.
1230    fn must_serialize_ddl(stmt: &Statement<Raw>, ctx: &ExecuteContext) -> bool {
1231        // Non-DDL is not serialized here.
1232        if !StatementClassification::from(&*stmt).is_ddl() {
1233            return false;
1234        }
1235        // Off-thread, pre-planning purification can perform arbitrarily slow network calls, so it
1236        // must not be serialized. Such statements all rely on `PlanValidity` for their checking, and
1237        // we must ensure those checks are sufficient.
1238        if Self::must_spawn_purification(stmt) {
1239            return false;
1240        }
1241
1242        // Statements that support multiple DDLs in a single transaction aren't serialized here.
1243        // Their operations are serialized when applied to the catalog, guaranteeing that any
1244        // off-thread DDLs concurrent with a multiple DDL transaction will have a serial order.
1245        if ctx.session.transaction().is_ddl() {
1246            return false;
1247        }
1248
1249        // Some DDL is exempt. It is not great that we are matching on Statements here, because
1250        // different plans can be produced from the same top-level statement type (e.g., `ALTER
1251        // CONNECTION ROTATE KEYS`). But the whole point of this is to prevent things from being
1252        // planned in the first place, so we accept the abstraction leak.
1253        match stmt {
1254            // Secrets have a small and understood set of dependencies, and their off-thread work
1255            // interacts with k8s.
1256            Statement::AlterSecret(_) => false,
1257            Statement::CreateSecret(_) => false,
1258            Statement::AlterConnection(AlterConnectionStatement { actions, .. })
1259                if actions
1260                    .iter()
1261                    .all(|action| matches!(action, AlterConnectionAction::RotateKeys)) =>
1262            {
1263                false
1264            }
1265
1266            // The off-thread work that altering a cluster may do (waiting for replicas to spin up)
1267            // does not affect its catalog names or IDs, so it is safe not to serialize it. That work
1268            // could, however, change the set of replicas that exist. For queries that name replicas
1269            // or use the current_replica session var, the `replica_id` field of `PlanValidity` ensures
1270            // that those replicas still exist during the query finish stage. Additionally, the work
1271            // can take hours (configured by the user), so serializing it would also be a bad
1272            // experience for users.
1273            Statement::AlterCluster(_) => false,
1274
1275            // Everything else must be serialized.
1276            _ => true,
1277        }
1278    }
1279
1280    /// Whether the statement must be purified off of the Coordinator thread.
1281    fn must_spawn_purification<A: AstInfo>(stmt: &Statement<A>) -> bool {
1282        // `CREATE` and `ALTER` `SOURCE` and `SINK` statements must be purified off the main
1283        // coordinator thread.
1284        if !matches!(
1285            stmt,
1286            Statement::CreateSource(_)
1287                | Statement::AlterSource(_)
1288                | Statement::CreateSink(_)
1289                | Statement::CreateTableFromSource(_)
1290        ) {
1291            return false;
1292        }
1293
1294        // However, `ALTER SOURCE RETAIN HISTORY` should be excluded from off-thread purification.
1295        if let Statement::AlterSource(stmt) = stmt {
1296            let names: Vec<CreateSourceOptionName> = match &stmt.action {
1297                AlterSourceAction::SetOptions(options) => {
1298                    options.iter().map(|o| o.name.clone()).collect()
1299                }
1300                AlterSourceAction::ResetOptions(names) => names.clone(),
1301                _ => vec![],
1302            };
1303            if !names.is_empty()
1304                && names
1305                    .iter()
1306                    .all(|n| matches!(n, CreateSourceOptionName::RetainHistory))
1307            {
1308                return false;
1309            }
1310        }
1311
1312        true
1313    }
1314
1315    /// Chooses a timestamp for `mz_now()`, if `mz_now()` occurs in a REFRESH option of the
1316    /// materialized view. Additionally, if `acquire_read_holds` is true and the MV has any REFRESH
1317    /// option, this function grabs read holds at the earliest possible time on input collections
1318    /// that might be involved in the MV.
1319    ///
1320    /// Note that this does NOT handle `mz_now()` occurring in the query part of the MV; it
1321    /// handles it only in `with_options`.
1322    ///
1323    /// (Note that the chosen timestamp won't be the same timestamp as the system table inserts,
1324    /// unfortunately.)
1325    async fn resolve_mz_now_for_create_materialized_view(
1326        &mut self,
1327        cmvs: &CreateMaterializedViewStatement<Aug>,
1328        resolved_ids: &ResolvedIds,
1329        session: &Session,
1330        acquire_read_holds: bool,
1331    ) -> Result<Option<Timestamp>, AdapterError> {
1332        if cmvs
1333            .with_options
1334            .iter()
1335            .any(|wo| matches!(wo.value, Some(WithOptionValue::Refresh(..))))
1336        {
1337            let catalog = self.catalog().for_session(session);
1338            let cluster = mz_sql::plan::resolve_cluster_for_materialized_view(&catalog, cmvs)?;
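                // Ask the cluster's index oracle for a set of collections sufficient to compute
                // the MV's inputs; these are the collections we need read holds on below.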
1339            let ids = self
1340                .index_oracle(cluster)
1341                .sufficient_collections(resolved_ids.collections().copied());
1342
1343            // If there is any REFRESH option, then acquire read holds. (Strictly speaking, we'd
1344            // need this only if there is a `REFRESH AT`, not for `REFRESH EVERY`, because later
1345            // we want to check the AT times against the read holds that we acquire here. But
1346            // we do it for any REFRESH option, to avoid having so many code paths doing different
1347            // things.)
1348            //
1349            // It's important that we acquire read holds _before_ we determine the least valid read.
1350            // Otherwise, we're not guaranteed that the since frontier doesn't
1351            // advance forward from underneath us.
1352            let read_holds = self.acquire_read_holds(&ids);
1353
1354            // Does `mz_now()` occur?
1355            let mz_now_ts = if cmvs
1356                .with_options
1357                .iter()
1358                .any(materialized_view_option_contains_temporal)
1359            {
1360                let timeline_context = self
1361                    .catalog()
1362                    .validate_timeline_context(resolved_ids.collections().copied())?;
1363
1364                // We default to EpochMilliseconds, similarly to `determine_timestamp_for`,
1365                // but even in the TimestampIndependent case.
1366                // Note that we didn't accurately decide whether we are TimestampDependent
1367                // or TimestampIndependent, because for this we'd need to also check whether
1368                // `query.contains_temporal()`, similarly to how `peek_stage_validate` does.
1369                // However, this doesn't matter here, as we are just going to default to
1370                // EpochMilliseconds in both cases.
1371                let timeline = timeline_context
1372                    .timeline()
1373                    .unwrap_or(&Timeline::EpochMilliseconds);
1374
1375                // Let's start with the timestamp oracle read timestamp.
1376                let mut timestamp = self.get_timestamp_oracle(timeline).read_ts().await;
1377
1378                // If `least_valid_read` is later than the oracle, then advance to that time.
1379                // If we didn't do this, then there would be a danger of missing the first refresh,
1380                // which might cause the materialized view to be unreadable for hours. This might
1381                // be what was happening here:
1382                // https://github.com/MaterializeInc/database-issues/issues/7265#issuecomment-1931856361
1383                //
1384                // In the long term, it would be good to actually block the MV creation statement
1385                // until `least_valid_read`. https://github.com/MaterializeInc/database-issues/issues/7504
1386                // Without blocking, we have the problem that a REFRESH AT CREATION is not linearized
1387                // with the CREATE MATERIALIZED VIEW statement, in the sense that a query from the MV
1388                // after its creation might not see input changes that happened before the CREATE
1389                // MATERIALIZED VIEW statement returned.
1390                let oracle_timestamp = timestamp;
1391                let least_valid_read = read_holds.least_valid_read();
1392                timestamp.advance_by(least_valid_read.borrow());
1393
1394                if oracle_timestamp != timestamp {
1395                    warn!(%cmvs.name, %oracle_timestamp, %timestamp, "REFRESH MV's inputs are not readable at the oracle read ts");
1396                }
1397
1398                info!("Resolved `mz_now()` to {timestamp} for REFRESH MV");
1399                Ok(Some(timestamp))
1400            } else {
1401                Ok(None)
1402            };
1403
1404            // NOTE: The Drop impl of ReadHolds makes sure that the holds are
1405            // released if we don't end up storing them here.
1406            if acquire_read_holds {
1407                self.store_transaction_read_holds(session, read_holds);
1408            }
1409
1410            mz_now_ts
1411        } else {
1412            Ok(None)
1413        }
1414    }
1415
1416    /// Instruct the dataflow layer to cancel any ongoing, interactive work for
1417    /// the named `conn_id` if the correct secret key is specified.
1418    ///
1419    /// Note: Here we take a [`ConnectionIdType`] as opposed to an owned
1420    /// `ConnectionId` because this method gets called by external clients when
1421    /// they request to cancel a request.
1422    #[mz_ore::instrument(level = "debug")]
1423    async fn handle_cancel(&mut self, conn_id: ConnectionIdType, secret_key: u32) {
1424        if let Some((id_handle, conn_meta)) = self.active_conns.get_key_value(&conn_id) {
1425            // If the secret key specified by the client doesn't match the
1426            // actual secret key for the target connection, we treat this as a
1427            // rogue cancellation request and ignore it.
1428            if conn_meta.secret_key != secret_key {
1429                return;
1430            }
1431
1432            // Now that we've verified the secret key, this is a privileged
1433            // cancellation request. We can upgrade the raw connection ID to a
1434            // proper `IdHandle`.
1435            self.handle_privileged_cancel(id_handle.clone()).await;
1436        }
1437    }
1438
1439    /// Unconditionally instructs the dataflow layer to cancel any ongoing,
1440    /// interactive work for the named `conn_id`.
1441    #[mz_ore::instrument(level = "debug")]
1442    pub(crate) async fn handle_privileged_cancel(&mut self, conn_id: ConnectionId) {
1443        let mut maybe_ctx = None;
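            // The connection's in-flight `ExecuteContext` (if any) may be parked in one of several
            // places; check each of them below and retire whatever we find with
            // `AdapterError::Canceled`.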
1444
1445        // Cancel pending writes. There is at most one pending write per session.
1446        if let Some(idx) = self.pending_writes.iter().position(|pending_write_txn| {
1447            matches!(pending_write_txn, PendingWriteTxn::User {
1448                pending_txn: PendingTxn { ctx, .. },
1449                ..
1450            } if *ctx.session().conn_id() == conn_id)
1451        }) {
1452            if let PendingWriteTxn::User {
1453                pending_txn: PendingTxn { ctx, .. },
1454                ..
1455            } = self.pending_writes.remove(idx)
1456            {
1457                maybe_ctx = Some(ctx);
1458            }
1459        }
1460
1461        // Cancel deferred writes.
1462        if let Some(write_op) = self.deferred_write_ops.remove(&conn_id) {
1463            maybe_ctx = Some(write_op.into_ctx());
1464        }
1465
1466        // Cancel deferred statements.
1467        if let Some(idx) = self
1468            .serialized_ddl
1469            .iter()
1470            .position(|deferred| *deferred.ctx.session().conn_id() == conn_id)
1471        {
1472            let deferred = self
1473                .serialized_ddl
1474                .remove(idx)
1475                .expect("known to exist from call to `position` above");
1476            maybe_ctx = Some(deferred.ctx);
1477        }
1478
1479        // Cancel reads waiting on being linearized. There is at most one linearized read per
1480        // session.
1481        if let Some(pending_read_txn) = self.pending_linearize_read_txns.remove(&conn_id) {
1482            let ctx = pending_read_txn.take_context();
1483            maybe_ctx = Some(ctx);
1484        }
1485
1486        if let Some(ctx) = maybe_ctx {
1487            ctx.retire(Err(AdapterError::Canceled));
1488        }
1489
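            // Tear down any remaining in-flight work for this connection: pending peeks, watch
            // sets, compute sinks (e.g. SUBSCRIBEs), in-progress cluster reconfigurations, and
            // pending COPY operations, and signal any staged cancellation channel.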
1490        self.cancel_pending_peeks(&conn_id);
1491        self.cancel_pending_watchsets(&conn_id);
1492        self.cancel_compute_sinks_for_conn(&conn_id).await;
1493        self.cancel_cluster_reconfigurations_for_conn(&conn_id)
1494            .await;
1495        self.cancel_pending_copy(&conn_id);
1496        if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) {
1497            let _ = tx.send(true);
1498        }
1499    }
1500
1501    /// Handle termination of a client session.
1502    ///
1503    /// This cleans up any state in the coordinator associated with the session.
1504    #[mz_ore::instrument(level = "debug")]
1505    async fn handle_terminate(&mut self, conn_id: ConnectionId) {
1506        if !self.active_conns.contains_key(&conn_id) {
1507            // If the session doesn't exist in `active_conns`, then this method will panic later on.
1508            // Instead, we explicitly panic here and dump the entire Coord to the logs to help with
1509            // debugging. This panic is very infrequent, so we want as much information as possible.
1510            // See https://github.com/MaterializeInc/database-issues/issues/5627.
1511            panic!("unknown connection: {conn_id:?}\n\n{self:?}")
1512        }
1513
1514        // We do not need to call clear_transaction here because there are no side effects to run
1515        // based on any session transaction state.
1516        self.clear_connection(&conn_id).await;
1517
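            // Drop objects scoped to the connection: temporary items and the connection's
            // temporary schema must not outlive the session.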
1518        self.drop_temp_items(&conn_id).await;
1519        self.catalog_mut()
1520            .drop_temporary_schema(&conn_id)
1521            .unwrap_or_terminate("unable to drop temporary schema");
1522        let conn = self.active_conns.remove(&conn_id).expect("conn must exist");
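            // The connection is gone: update the per-session-type metrics and cancel any remaining
            // per-connection work (peeks, watch sets, pending COPYs, statement logging).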
1523        let session_type = metrics::session_type_label_value(conn.user());
1524        self.metrics
1525            .active_sessions
1526            .with_label_values(&[session_type])
1527            .dec();
1528        self.cancel_pending_peeks(conn.conn_id());
1529        self.cancel_pending_watchsets(&conn_id);
1530        self.cancel_pending_copy(&conn_id);
1531        self.end_session_for_statement_logging(conn.uuid());
1532
1533        // Queue the builtin table update, but do not wait for it to complete. We explicitly do
1534        // this to prevent blocking the Coordinator in the case that a lot of connections are
1535        // closed at once, which occurs regularly in some workflows.
1536        let update = self
1537            .catalog()
1538            .state()
1539            .pack_session_update(&conn, Diff::MINUS_ONE);
1540        let update = self.catalog().state().resolve_builtin_table_update(update);
1541
1542        let _builtin_update_notify = self.builtin_table_update().defer(vec![update]);
1543    }
1544
1545    /// Returns the necessary metadata for appending to a webhook source, and a channel to send
1546    /// rows.
1547    #[mz_ore::instrument(level = "debug")]
1548    fn handle_get_webhook(
1549        &mut self,
1550        database: String,
1551        schema: String,
1552        name: String,
1553        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
1554    ) {
1555        /// Attempts to resolve a webhook source from a provided `database.schema.name` path.
1556        ///
1557        /// Returns a struct that can be used to append data to the underlying storage collection, and
1558        /// the types to which we should cast the request.
1559        fn resolve(
1560            coord: &mut Coordinator,
1561            database: String,
1562            schema: String,
1563            name: String,
1564        ) -> Result<AppendWebhookResponse, PartialItemName> {
1565            // Resolve our collection.
1566            let name = PartialItemName {
1567                database: Some(database),
1568                schema: Some(schema),
1569                item: name,
1570            };
1571            let Ok(entry) = coord
1572                .catalog()
1573                .resolve_entry(None, &vec![], &name, &SYSTEM_CONN_ID)
1574            else {
1575                return Err(name);
1576            };
1577
1578            // Webhooks can be created with `CREATE SOURCE` or `CREATE TABLE`.
1579            let (data_source, desc, global_id) = match entry.item() {
1580                CatalogItem::Source(Source {
1581                    data_source: data_source @ DataSourceDesc::Webhook { .. },
1582                    desc,
1583                    global_id,
1584                    ..
1585                }) => (data_source, desc.clone(), *global_id),
1586                CatalogItem::Table(
1587                    table @ Table {
1588                        desc,
1589                        data_source:
1590                            TableDataSource::DataSource {
1591                                desc: data_source @ DataSourceDesc::Webhook { .. },
1592                                ..
1593                            },
1594                        ..
1595                    },
1596                ) => (data_source, desc.latest(), table.global_id_writes()),
1597                _ => return Err(name),
1598            };
1599
1600            let DataSourceDesc::Webhook {
1601                validate_using,
1602                body_format,
1603                headers,
1604                ..
1605            } = data_source
1606            else {
1607                mz_ore::soft_panic_or_log!("programming error! checked above for webhook");
1608                return Err(name);
1609            };
1610            let body_format = body_format.clone();
1611            let header_tys = headers.clone();
1612
1613            // Assert that we have one column for the body, and however many are required for
1614            // the headers.
1615            let num_columns = headers.num_columns() + 1;
1616            mz_ore::soft_assert_or_log!(
1617                desc.arity() <= num_columns,
1618                "expected at most {} columns, but got {}",
1619                num_columns,
1620                desc.arity()
1621            );
1622
1623            // Double check that the body column of the webhook source matches the type
1624            // we're about to deserialize as.
1625            let body_column = desc
1626                .get_by_name(&"body".into())
1627                .map(|(_idx, ty)| ty.clone())
1628                .ok_or_else(|| name.clone())?;
1629            assert!(!body_column.nullable, "webhook body column is nullable!?");
1630            assert_eq!(body_column.scalar_type, SqlScalarType::from(body_format));
1631
1632            // Create a validator that can be called to validate a webhook request.
1633            let validator = validate_using.as_ref().map(|v| {
1634                let validation = v.clone();
1635                AppendWebhookValidator::new(validation, coord.caching_secrets_reader.clone())
1636            });
1637
1638            // Get a channel so we can queue updates to be written.
1639            let row_tx = coord
1640                .controller
1641                .storage
1642                .monotonic_appender(global_id)
1643                .map_err(|_| name.clone())?;
1644            let stats = coord
1645                .controller
1646                .storage
1647                .webhook_statistics(global_id)
1648                .map_err(|_| name)?;
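                // Register (or reuse) an invalidator for this webhook so outstanding appenders can
                // be disabled if the webhook's collection later goes away (e.g. the source is dropped).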
1649            let invalidator = coord
1650                .active_webhooks
1651                .entry(entry.id())
1652                .or_insert_with(WebhookAppenderInvalidator::new);
1653            let tx = WebhookAppender::new(row_tx, invalidator.guard(), stats);
1654
1655            Ok(AppendWebhookResponse {
1656                tx,
1657                body_format,
1658                header_tys,
1659                validator,
1660            })
1661        }
1662
1663        let response = resolve(self, database, schema, name).map_err(|name| {
1664            AppendWebhookError::UnknownWebhook {
1665                database: name.database.expect("provided"),
1666                schema: name.schema.expect("provided"),
1667                name: name.item,
1668            }
1669        });
1670        let _ = tx.send(response);
1671    }
1672}