mz_adapter/coord/command_handler.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic for processing client [`Command`]s. Each [`Command`] is initiated by a
//! client via some external Materialize API (ex: HTTP and psql).

use base64::prelude::*;
use differential_dataflow::lattice::Lattice;
use mz_adapter_types::dyncfgs::ALLOW_USER_SESSIONS;
use mz_auth::password::Password;
use mz_repr::namespaces::MZ_INTERNAL_SCHEMA;
use mz_sql::session::metadata::SessionMetadata;
use std::collections::{BTreeMap, BTreeSet};
use std::net::IpAddr;
use std::sync::Arc;

use futures::FutureExt;
use futures::future::LocalBoxFuture;
use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source, Table, TableDataSource};
use mz_ore::task;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{instrument, soft_panic_or_log};
use mz_repr::role_id::RoleId;
use mz_repr::{Diff, SqlScalarType, Timestamp};
use mz_sql::ast::{
    AlterConnectionAction, AlterConnectionStatement, AlterSinkAction, AlterSourceAction, AstInfo,
    ConstantVisitor, CopyRelation, CopyStatement, CreateSourceOptionName, Raw, Statement,
    SubscribeStatement,
};
use mz_sql::catalog::RoleAttributesRaw;
use mz_sql::names::{Aug, PartialItemName, ResolvedIds};
use mz_sql::plan::{
    AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, Params, Plan,
    StatementClassification, TransactionType,
};
use mz_sql::pure::{
    materialized_view_option_contains_temporal, purify_create_materialized_view_options,
};
use mz_sql::rbac;
use mz_sql::rbac::CREATE_ITEM_USAGE;
use mz_sql::session::user::User;
use mz_sql::session::vars::{
    EndTransactionAction, NETWORK_POLICY, OwnedVarInput, STATEMENT_LOGGING_SAMPLE_RATE, Value, Var,
};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
    CreateMaterializedViewStatement, ExplainPlanStatement, Explainee, InsertStatement,
    WithOptionValue,
};
use mz_storage_types::sources::Timeline;
use opentelemetry::trace::TraceContextExt;
use tokio::sync::{mpsc, oneshot};
use tracing::{Instrument, debug_span, info, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::command::{
    AuthResponse, CatalogSnapshot, Command, ExecuteResponse, SASLChallengeResponse,
    SASLVerifyProofResponse, StartupResponse,
};
use crate::coord::appends::PendingWriteTxn;
use crate::coord::{
    ConnMeta, Coordinator, DeferredPlanStatement, Message, PendingTxn, PlanStatement, PlanValidity,
    PurifiedStatementReady, validate_ip_with_policy_rules,
};
use crate::error::{AdapterError, AuthenticationError};
use crate::notice::AdapterNotice;
use crate::session::{Session, TransactionOps, TransactionStatus};
use crate::util::{ClientTransmitter, ResultExt};
use crate::webhook::{
    AppendWebhookResponse, AppendWebhookValidator, WebhookAppender, WebhookAppenderInvalidator,
};
use crate::{AppendWebhookError, ExecuteContext, catalog, metrics};

use super::ExecuteContextExtra;

impl Coordinator {
    /// BOXED FUTURE: As of Nov 2023 the Future returned by this function was 58KB. Storing a
    /// Future that large on the stack is bad for runtime performance and blows up our stack
    /// usage, so we purposefully move this Future onto the heap (i.e. Box it).
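    ///
    /// A minimal sketch of the pattern (illustrative only, not the real signature):
    ///
    /// ```ignore
    /// fn handle(&mut self) -> LocalBoxFuture<'_, ()> {
    ///     // The large async state machine lives on the heap instead of the stack.
    ///     async move { /* large state machine */ }.boxed_local()
    /// }
    /// ```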
    pub(crate) fn handle_command(&mut self, mut cmd: Command) -> LocalBoxFuture<'_, ()> {
        async move {
            if let Some(session) = cmd.session_mut() {
                session.apply_external_metadata_updates();
            }
            match cmd {
                Command::Startup {
                    tx,
                    user,
                    conn_id,
                    secret_key,
                    uuid,
                    client_ip,
                    application_name,
                    notice_tx,
                } => {
                    // Note: We purposefully do not use a ClientTransmitter here because startup
                    // handles errors and cleanup of sessions itself.
                    self.handle_startup(
                        tx,
                        user,
                        conn_id,
                        secret_key,
                        uuid,
                        client_ip,
                        application_name,
                        notice_tx,
                    )
                    .await;
                }

                Command::AuthenticatePassword {
                    tx,
                    role_name,
                    password,
                } => {
                    self.handle_authenticate_password(tx, role_name, password)
                        .await;
                }

                Command::AuthenticateGetSASLChallenge {
                    tx,
                    role_name,
                    nonce,
                } => {
                    self.handle_generate_sasl_challenge(tx, role_name, nonce)
                        .await;
                }

                Command::AuthenticateVerifySASLProof {
                    tx,
                    role_name,
                    proof,
                    mock_hash,
                    auth_message,
                } => {
                    self.handle_authenticate_verify_sasl_proof(
                        tx,
                        role_name,
                        proof,
                        auth_message,
                        mock_hash,
                    );
                }

                Command::Execute {
                    portal_name,
                    session,
                    tx,
                    outer_ctx_extra,
                } => {
                    let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());

                    self.handle_execute(portal_name, session, tx, outer_ctx_extra)
                        .await;
                }

                Command::RetireExecute { data, reason } => self.retire_execution(reason, data),

                Command::CancelRequest {
                    conn_id,
                    secret_key,
                } => {
                    self.handle_cancel(conn_id, secret_key).await;
                }

                Command::PrivilegedCancelRequest { conn_id } => {
                    self.handle_privileged_cancel(conn_id).await;
                }

                Command::GetWebhook {
                    database,
                    schema,
                    name,
                    tx,
                } => {
                    self.handle_get_webhook(database, schema, name, tx);
                }

                Command::GetSystemVars { tx } => {
                    let _ = tx.send(self.catalog.system_config().clone());
                }

                Command::SetSystemVars { vars, conn_id, tx } => {
                    let mut ops = Vec::with_capacity(vars.len());
                    let conn = &self.active_conns[&conn_id];

                    for (name, value) in vars {
                        if let Err(e) =
                            self.catalog().system_config().get(&name).and_then(|var| {
                                var.visible(conn.user(), self.catalog.system_config())
                            })
                        {
                            let _ = tx.send(Err(e.into()));
                            return;
                        }

                        ops.push(catalog::Op::UpdateSystemConfiguration {
                            name,
                            value: OwnedVarInput::Flat(value),
                        });
                    }

                    let result = self
                        .catalog_transact_with_context(Some(&conn_id), None, ops)
                        .await;
                    let _ = tx.send(result);
                }

                Command::Terminate { conn_id, tx } => {
                    self.handle_terminate(conn_id).await;
                    // Note: We purposefully do not use a ClientTransmitter here because we're already
                    // terminating the provided session.
                    if let Some(tx) = tx {
                        let _ = tx.send(Ok(()));
                    }
                }

                Command::Commit {
                    action,
                    session,
                    tx,
                } => {
                    let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
                    // We reach here not through a statement execution, but from the "commit"
                    // pgwire command. Thus, we just generate a default statement execution
                    // context. (Once statement logging is implemented, this will cause nothing
                    // to be logged when the execution finishes.)
                    let ctx = ExecuteContext::from_parts(
                        tx,
                        self.internal_cmd_tx.clone(),
                        session,
                        Default::default(),
                    );
                    let plan = match action {
                        EndTransactionAction::Commit => {
                            Plan::CommitTransaction(CommitTransactionPlan {
                                transaction_type: TransactionType::Implicit,
                            })
                        }
                        EndTransactionAction::Rollback => {
                            Plan::AbortTransaction(AbortTransactionPlan {
                                transaction_type: TransactionType::Implicit,
                            })
                        }
                    };

                    let conn_id = ctx.session().conn_id().clone();
                    self.sequence_plan(ctx, plan, ResolvedIds::empty()).await;
                    // Part of the Command::Commit contract is that the Coordinator guarantees that
                    // it has cleared its transaction state for the connection.
                    self.clear_connection(&conn_id).await;
                }

                Command::CatalogSnapshot { tx } => {
                    let _ = tx.send(CatalogSnapshot {
                        catalog: self.owned_catalog(),
                    });
                }

                Command::CheckConsistency { tx } => {
                    let _ = tx.send(self.check_consistency());
                }

                Command::Dump { tx } => {
                    let _ = tx.send(self.dump().await);
                }

                Command::GetComputeInstanceClient { instance_id, tx } => {
                    let _ = tx.send(self.controller.compute.instance_client(instance_id));
                }

                Command::GetOracle { timeline, tx } => {
                    let oracle = self
                        .global_timelines
                        .get(&timeline)
                        .map(|timeline_state| Arc::clone(&timeline_state.oracle))
                        .ok_or(AdapterError::ChangedPlan(
                            "timeline has disappeared during planning".to_string(),
                        ));
                    let _ = tx.send(oracle);
                }

                Command::DetermineRealTimeRecentTimestamp {
                    source_ids,
                    real_time_recency_timeout,
                    tx,
                } => {
                    let result = self
                        .determine_real_time_recent_timestamp(
                            source_ids.iter().copied(),
                            real_time_recency_timeout,
                        )
                        .await;

                    match result {
                        Ok(Some(fut)) => {
                            task::spawn(|| "determine real time recent timestamp", async move {
                                let result = fut.await.map(Some).map_err(AdapterError::from);
                                let _ = tx.send(result);
                            });
                        }
                        Ok(None) => {
                            let _ = tx.send(Ok(None));
                        }
                        Err(e) => {
                            let _ = tx.send(Err(e));
                        }
                    }
                }

                Command::GetTransactionReadHoldsBundle { conn_id, tx } => {
                    let read_holds = self.txn_read_holds.get(&conn_id).cloned();
                    let _ = tx.send(read_holds);
                }

                Command::StoreTransactionReadHolds {
                    conn_id,
                    read_holds,
                    tx,
                } => {
                    self.store_transaction_read_holds(conn_id, read_holds);
                    let _ = tx.send(());
                }

                Command::ExecuteSlowPathPeek {
                    dataflow_plan,
                    determination,
                    finishing,
                    compute_instance,
                    target_replica,
                    intermediate_result_type,
                    source_ids,
                    conn_id,
                    max_result_size,
                    max_query_result_size,
                    tx,
                } => {
                    let result = self
                        .implement_slow_path_peek(
                            *dataflow_plan,
                            determination,
                            finishing,
                            compute_instance,
                            target_replica,
                            intermediate_result_type,
                            source_ids,
                            conn_id,
                            max_result_size,
                            max_query_result_size,
                        )
                        .await;

                    let _ = tx.send(result);
                }

                Command::ExecuteCopyTo {
                    df_desc,
                    compute_instance,
                    target_replica,
                    source_ids,
                    conn_id,
                    tx,
                } => {
                    // implement_copy_to spawns a background task that sends the response
                    // through tx when the COPY TO completes (or immediately if setup fails).
                    // We just call it and let it handle all response sending.
                    self.implement_copy_to(
                        *df_desc,
                        compute_instance,
                        target_replica,
                        source_ids,
                        conn_id,
                        tx,
                    )
                    .await;
                }
            }
        }
        .instrument(debug_span!("handle_command"))
        .boxed_local()
    }

    fn handle_authenticate_verify_sasl_proof(
        &self,
        tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
        role_name: String,
        proof: String,
        auth_message: String,
        mock_hash: String,
    ) {
        let role = self.catalog().try_get_role_by_name(role_name.as_str());
        let role_auth = role.and_then(|r| self.catalog().try_get_role_auth_by_id(&r.id));

        let login = role
            .as_ref()
            .map(|r| r.attributes.login.unwrap_or(false))
            .unwrap_or(false);

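        // Even when the role or its password hash is missing, we still verify the proof
        // against the caller-provided mock hash below, so the work performed (and thus the
        // timing) stays uniform and role names cannot be probed.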
        let real_hash = role_auth
            .as_ref()
            .and_then(|auth| auth.password_hash.as_ref());
        let hash_ref = real_hash.map(|s| s.as_str()).unwrap_or(&mock_hash);

        let role_present = role.is_some();
        let make_auth_err = |role_present: bool, login: bool| {
            AdapterError::AuthenticationError(if role_present && !login {
                AuthenticationError::NonLogin
            } else {
                AuthenticationError::InvalidCredentials
            })
        };

        match mz_auth::hash::sasl_verify(hash_ref, &proof, &auth_message) {
            Ok(verifier) => {
                // Success only if role exists, allows login, and a real password hash was used.
                if login && real_hash.is_some() {
                    let role = role.expect("login implies role exists");
                    let _ = tx.send(Ok(SASLVerifyProofResponse {
                        verifier,
                        auth_resp: AuthResponse {
                            role_id: role.id,
                            superuser: role.attributes.superuser.unwrap_or(false),
                        },
                    }));
                } else {
                    let _ = tx.send(Err(make_auth_err(role_present, login)));
                }
            }
            Err(_) => {
                let _ = tx.send(Err(AdapterError::AuthenticationError(
                    AuthenticationError::InvalidCredentials,
                )));
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn handle_generate_sasl_challenge(
        &self,
        tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
        role_name: String,
        client_nonce: String,
    ) {
        let role_auth = self
            .catalog()
            .try_get_role_by_name(&role_name)
            .and_then(|role| self.catalog().try_get_role_auth_by_id(&role.id));

        let nonce = match mz_auth::hash::generate_nonce(&client_nonce) {
            Ok(n) => n,
            Err(e) => {
                let msg = format!(
                    "failed to generate nonce for client nonce {}: {}",
                    client_nonce, e
                );
                let _ = tx.send(Err(AdapterError::Internal(msg.clone())));
                soft_panic_or_log!("{msg}");
                return;
            }
        };

        // It's important that the mock_nonce is deterministic per role, otherwise the purpose of
        // doing mock authentication is defeated. We use a catalog-wide nonce, and combine that
        // with the role name to get a per-role mock nonce.
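        //
        // Illustratively (the real scheme lives in mz_auth::hash::mock_sasl_challenge), the
        // idea is along the lines of:
        //
        //   mock_salt = H(catalog_nonce || role_name)
        //
        // which keeps repeated challenges for a nonexistent role stable, so they are
        // indistinguishable from challenges for a real role.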
        let send_mock_challenge =
            |role_name: String,
             mock_nonce: String,
             nonce: String,
             tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>| {
                let opts = mz_auth::hash::mock_sasl_challenge(
                    &role_name,
                    &mock_nonce,
                    &self.catalog().system_config().scram_iterations(),
                );
                let _ = tx.send(Ok(SASLChallengeResponse {
                    iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
                    salt: BASE64_STANDARD.encode(opts.salt),
                    nonce,
                }));
            };

        match role_auth {
            Some(auth) if auth.password_hash.is_some() => {
                let hash = auth.password_hash.as_ref().expect("checked above");
                match mz_auth::hash::scram256_parse_opts(hash) {
                    Ok(opts) => {
                        let _ = tx.send(Ok(SASLChallengeResponse {
                            iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
                            salt: BASE64_STANDARD.encode(opts.salt),
                            nonce,
                        }));
                    }
                    Err(_) => {
                        send_mock_challenge(
                            role_name,
                            self.catalog().state().mock_authentication_nonce(),
                            nonce,
                            tx,
                        );
                    }
                }
            }
            _ => {
                send_mock_challenge(
                    role_name,
                    self.catalog().state().mock_authentication_nonce(),
                    nonce,
                    tx,
                );
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn handle_authenticate_password(
        &self,
        tx: oneshot::Sender<Result<AuthResponse, AdapterError>>,
        role_name: String,
        password: Option<Password>,
    ) {
        let Some(password) = password else {
            // The user did not provide a password.
            let _ = tx.send(Err(AdapterError::AuthenticationError(
                AuthenticationError::PasswordRequired,
            )));
            return;
        };

        if let Some(role) = self.catalog().try_get_role_by_name(role_name.as_str()) {
            if !role.attributes.login.unwrap_or(false) {
                // The user is not allowed to log in.
                let _ = tx.send(Err(AdapterError::AuthenticationError(
                    AuthenticationError::NonLogin,
                )));
                return;
            }
            if let Some(auth) = self.catalog().try_get_role_auth_by_id(&role.id) {
                if let Some(hash) = &auth.password_hash {
                    let hash = hash.clone();
                    let role_id = role.id;
                    let superuser = role.attributes.superuser.unwrap_or(false);
                    task::spawn_blocking(
                        || "auth-check-hash",
                        move || {
                            let _ = match mz_auth::hash::scram256_verify(&password, &hash) {
                                Ok(_) => tx.send(Ok(AuthResponse { role_id, superuser })),
                                Err(_) => tx.send(Err(AdapterError::AuthenticationError(
                                    AuthenticationError::InvalidCredentials,
                                ))),
                            };
                        },
                    );
                    return;
                }
            }
            // Authentication failed due to incorrect password or missing password hash.
            let _ = tx.send(Err(AdapterError::AuthenticationError(
                AuthenticationError::InvalidCredentials,
            )));
        } else {
            // The user does not exist.
            let _ = tx.send(Err(AdapterError::AuthenticationError(
                AuthenticationError::RoleNotFound,
            )));
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn handle_startup(
        &mut self,
        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
        user: User,
        conn_id: ConnectionId,
        secret_key: u32,
        uuid: uuid::Uuid,
        client_ip: Option<IpAddr>,
        application_name: String,
        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
    ) {
        // Return early if successful; otherwise clean up any state we may have created.
        match self.handle_startup_inner(&user, &conn_id, &client_ip).await {
            Ok((role_id, session_defaults)) => {
                let session_type = metrics::session_type_label_value(&user);
                self.metrics
                    .active_sessions
                    .with_label_values(&[session_type])
                    .inc();
                let conn = ConnMeta {
                    secret_key,
                    notice_tx,
                    drop_sinks: BTreeSet::new(),
                    pending_cluster_alters: BTreeSet::new(),
                    connected_at: self.now(),
                    user,
                    application_name,
                    uuid,
                    client_ip,
                    conn_id: conn_id.clone(),
                    authenticated_role: role_id,
                    deferred_lock: None,
                };
                let update = self.catalog().state().pack_session_update(&conn, Diff::ONE);
                let update = self.catalog().state().resolve_builtin_table_update(update);
                self.begin_session_for_statement_logging(&conn);
                self.active_conns.insert(conn_id.clone(), conn);

                // Note: Do NOT await the notify here; we pass it back to whatever
                // requested the startup, to prevent blocking startup and the
                // Coordinator on a builtin table update.
                let updates = vec![update];
                // It's not a hard error if our list is missing a builtin table, but we want to
                // make sure these two things stay in sync.
                if mz_ore::assert::soft_assertions_enabled() {
                    let required_tables: BTreeSet<_> = super::appends::REQUIRED_BUILTIN_TABLES
                        .iter()
                        .map(|table| self.catalog().resolve_builtin_table(*table))
                        .collect();
                    let updates_tracked = updates
                        .iter()
                        .all(|update| required_tables.contains(&update.id));
                    let all_mz_internal = super::appends::REQUIRED_BUILTIN_TABLES
                        .iter()
                        .all(|table| table.schema == MZ_INTERNAL_SCHEMA);
                    mz_ore::soft_assert_or_log!(
                        updates_tracked,
                        "not tracking all required builtin table updates!"
                    );
                    // TODO(parkmycar): When checking if a query depends on these builtin table
                    // writes we do not check the transitive dependencies of the query, because
                    // we don't support creating views on mz_internal objects. If one of these
                    // tables is promoted out of mz_internal then we'll need to add this check.
                    mz_ore::soft_assert_or_log!(
                        all_mz_internal,
                        "not all builtin tables are in mz_internal! need to check transitive depends",
                    )
                }
                let notify = self.builtin_table_update().background(updates);

                let resp = Ok(StartupResponse {
                    role_id,
                    write_notify: notify,
                    session_defaults,
                    catalog: self.owned_catalog(),
                    storage_collections: Arc::clone(&self.controller.storage_collections),
                    transient_id_gen: Arc::clone(&self.transient_id_gen),
                    optimizer_metrics: self.optimizer_metrics.clone(),
                    persist_client: self.persist_client.clone(),
                });
                if tx.send(resp).is_err() {
                    // Failed to send to the adapter, but everything is set up, so we can
                    // terminate normally.
                    self.handle_terminate(conn_id).await;
                }
            }
            Err(e) => {
                // An error occurred during startup or while sending to the adapter, so clean up
                // any state created by handle_startup_inner. A user may have been created, and
                // it can stay; there is no need to delete it.
                self.catalog_mut()
                    .drop_temporary_schema(&conn_id)
                    .unwrap_or_terminate("unable to drop temporary schema");

                // Communicate the error back to the client. No need to
                // handle failures to send the error back; we've already
                // cleaned up all necessary state.
                let _ = tx.send(Err(e));
            }
        }
    }

    // Fallible startup work that needs to be cleaned up on error.
    async fn handle_startup_inner(
        &mut self,
        user: &User,
        conn_id: &ConnectionId,
        client_ip: &Option<IpAddr>,
    ) -> Result<(RoleId, BTreeMap<String, OwnedVarInput>), AdapterError> {
        if self.catalog().try_get_role_by_name(&user.name).is_none() {
            // If the user has made it to this point, that means they have been fully authenticated.
            // This includes preventing any user, except a pre-defined set of system users, from
            // connecting to an internal port. Therefore it's ok to always create a new role for the
            // user.
            let attributes = RoleAttributesRaw::new();
            let plan = CreateRolePlan {
                name: user.name.to_string(),
                attributes,
            };
            self.sequence_create_role_for_startup(plan).await?;
        }
        let role_id = self
            .catalog()
            .try_get_role_by_name(&user.name)
            .expect("created above")
            .id;

        if role_id.is_user() && !ALLOW_USER_SESSIONS.get(self.catalog().system_config().dyncfgs()) {
            return Err(AdapterError::UserSessionsDisallowed);
        }

        // Initialize the default session variables for this role.
        let mut session_defaults = BTreeMap::new();
        let system_config = self.catalog().state().system_config();

        // Override the session with any system defaults.
        session_defaults.extend(
            system_config
                .iter_session()
                .map(|v| (v.name().to_string(), OwnedVarInput::Flat(v.value()))),
        );
        // Special case.
        let statement_logging_default = system_config
            .statement_logging_default_sample_rate()
            .format();
        session_defaults.insert(
            STATEMENT_LOGGING_SAMPLE_RATE.name().to_string(),
            OwnedVarInput::Flat(statement_logging_default),
        );
        // Override system defaults with role defaults.
        session_defaults.extend(
            self.catalog()
                .get_role(&role_id)
                .vars()
                .map(|(name, val)| (name.to_string(), val.clone())),
        );

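        // Net precedence, lowest to highest: system session defaults, then the statement
        // logging sample rate special case, then role-level defaults (e.g. those set via
        // `ALTER ROLE ... SET`).
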
        // Validate network policies for external users. Internal users can only connect on the
        // internal interfaces (internal HTTP/pgwire). It is up to the person deploying the system
        // to ensure these internal interfaces are well secured.
        //
        // HACKY(parkmycar): We don't have a fully formed session yet for this role, but we want
        // the default network policy for this role, so we read directly out of what the session
        // will get initialized with.
        if !user.is_internal() {
            let network_policy_name = session_defaults
                .get(NETWORK_POLICY.name())
                .and_then(|value| match value {
                    OwnedVarInput::Flat(name) => Some(name.clone()),
                    OwnedVarInput::SqlSet(names) => {
                        tracing::error!(?names, "found multiple network policies");
                        None
                    }
                })
                .unwrap_or_else(|| system_config.default_network_policy_name());
            let maybe_network_policy = self
                .catalog()
                .get_network_policy_by_name(&network_policy_name);

            let Some(network_policy) = maybe_network_policy else {
                // We should prevent dropping the default network policy, or setting the policy
                // to something that doesn't exist, so complain loudly if this occurs.
                tracing::error!(
                    network_policy_name,
                    "default network policy does not exist. All user traffic will be blocked"
                );
                let reason = match client_ip {
                    Some(ip) => super::NetworkPolicyError::AddressDenied(ip.clone()),
                    None => super::NetworkPolicyError::MissingIp,
                };
                return Err(AdapterError::NetworkPolicyDenied(reason));
            };

            if let Some(ip) = client_ip {
                match validate_ip_with_policy_rules(ip, &network_policy.rules) {
                    Ok(_) => {}
                    Err(e) => return Err(AdapterError::NetworkPolicyDenied(e)),
                }
            } else {
                // Only temporary and internal representations of a session
                // should be missing a client_ip. These sessions should not be
                // making requests or going through handle_startup.
                return Err(AdapterError::NetworkPolicyDenied(
                    super::NetworkPolicyError::MissingIp,
                ));
            }
        }

        self.catalog_mut()
            .create_temporary_schema(conn_id, role_id)?;

        Ok((role_id, session_defaults))
    }

    /// Handles an execute command.
    #[instrument(name = "coord::handle_execute", fields(session = session.uuid().to_string()))]
    pub(crate) async fn handle_execute(
        &mut self,
        portal_name: String,
        mut session: Session,
        tx: ClientTransmitter<ExecuteResponse>,
        // If this command was part of another execute command
        // (for example, executing a `FETCH` statement causes an execute to be
        //  issued for the cursor it references),
        // then `outer_context` should be `Some`.
        // This instructs the coordinator that the
        // outer execute should be considered finished once the inner one is.
        outer_context: Option<ExecuteContextExtra>,
    ) {
        if session.vars().emit_trace_id_notice() {
            let span_context = tracing::Span::current()
                .context()
                .span()
                .span_context()
                .clone();
            if span_context.is_valid() {
                session.add_notice(AdapterNotice::QueryTrace {
                    trace_id: span_context.trace_id(),
                });
            }
        }

        if let Err(err) = Self::verify_portal(self.catalog(), &mut session, &portal_name) {
            // If statement logging hasn't started yet, we don't need
            // to add any "end" event, so just make up a no-op
            // `ExecuteContextExtra` here, via `Default::default`.
            //
            // It's a bit unfortunate because the edge case of failed
            // portal verifications won't show up in statement
            // logging, but there seems to be nothing else we can do,
            // because we need access to the portal to begin logging.
            //
            // Another option would be to log a begin and end event, but just fill in NULLs
            // for everything we get from the portal (prepared statement id, params).
            let extra = outer_context.unwrap_or_else(Default::default);
            let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
            return ctx.retire(Err(err));
        }

        // The reference to `portal` can't outlive `session`, which we
        // use to construct the context, so scope the reference to this block where we
        // get everything we need from the portal for later.
        let (stmt, ctx, params) = {
            let portal = session
                .get_portal_unverified(&portal_name)
                .expect("known to exist");
            let params = portal.parameters.clone();
            let stmt = portal.stmt.clone();
            let logging = Arc::clone(&portal.logging);
            let lifecycle_timestamps = portal.lifecycle_timestamps.clone();

            let extra = if let Some(extra) = outer_context {
                // We are executing in the context of another SQL statement, so we don't
                // want to begin statement logging anew. The context of the actual statement
                // being executed is the one that should be retired once this finishes.
                extra
            } else {
                // This is a new statement; log it and return the context.
                let maybe_uuid = self.begin_statement_execution(
                    &mut session,
                    &params,
                    &logging,
                    lifecycle_timestamps,
                );

                ExecuteContextExtra::new(maybe_uuid)
            };
            let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
            (stmt, ctx, params)
        };

        let stmt = match stmt {
            Some(stmt) => stmt,
            None => return ctx.retire(Ok(ExecuteResponse::EmptyQuery)),
        };

        let session_type = metrics::session_type_label_value(ctx.session().user());
        let stmt_type = metrics::statement_type_label_value(&stmt);
        self.metrics
            .query_total
            .with_label_values(&[session_type, stmt_type])
            .inc();
        match &*stmt {
            Statement::Subscribe(SubscribeStatement { output, .. })
            | Statement::Copy(CopyStatement {
                relation: CopyRelation::Subscribe(SubscribeStatement { output, .. }),
                ..
            }) => {
                self.metrics
                    .subscribe_outputs
                    .with_label_values(&[
                        session_type,
                        metrics::subscribe_output_label_value(output),
                    ])
                    .inc();
            }
            _ => {}
        }

        self.handle_execute_inner(stmt, params, ctx).await
    }

    #[instrument(name = "coord::handle_execute_inner", fields(stmt = stmt.to_ast_string_redacted()))]
    pub(crate) async fn handle_execute_inner(
        &mut self,
        stmt: Arc<Statement<Raw>>,
        params: Params,
        mut ctx: ExecuteContext,
    ) {
        // This comment describes the various ways DDL can execute (the ordered operations: name
        // resolution, purification, planning, sequencing), all of which are managed by this
        // function. DDL has three notable properties that all partially interact.
        //
        // 1. Most DDL statements (and a few others) support single-statement transaction delayed
        //    execution. This occurs when a session executes `BEGIN`, a single DDL, then `COMMIT`.
        //    We announce success of the single DDL when it is executed, but do not attempt to plan
        //    or sequence it until `COMMIT`, which is able to error if needed while sequencing the
        //    DDL (this behavior is Postgres-compatible). We do this because some drivers or tools
        //    wrap all statements in `BEGIN` and `COMMIT` and we would like them to work. When the
        //    single DDL is announced as successful, we also put the session's transaction ops into
        //    `SingleStatement`, which will produce an error if any statement other than `COMMIT`
        //    is run in the transaction. Additionally, this will cause `handle_execute_inner` to
        //    stop further processing (no planning, etc.) of the statement.
        // 2. A few other DDL statements (`ALTER .. RENAME/SWAP`) enter the `DDL` ops, which allow
        //    any number of only these DDL statements to be executed in a transaction. At
        //    sequencing, these generate the `Op::TransactionDryRun` catalog op. When applied with
        //    `catalog_transact`, that op will always produce the `TransactionDryRun` error. The
        //    `catalog_transact_with_ddl_transaction` function intercepts that error and reports
        //    success to the user, but nothing is yet committed to the real catalog. At `COMMIT`,
        //    all of the ops are applied, but without the dry run. The purpose of this is to allow
        //    multiple, atomic renames in the same transaction.
        // 3. Some DDLs do off-thread work during purification or sequencing that is expensive or
        //    makes network calls (interfacing with secrets, optimization of views/indexes, source
        //    purification). These must guarantee correctness when they return to the main
        //    coordinator thread because the catalog state could have changed while they were doing
        //    the off-thread work. Previously we would use `PlanValidity::Checks` to specify a bunch
        //    of IDs that we needed to exist. We discovered the way we were doing that was not
        //    always correct. Instead of attempting to get that completely right, we have opted to
        //    serialize DDL. Getting this right is difficult because catalog changes can affect name
        //    resolution, planning, sequencing, and optimization, and our current code was not
        //    designed to make it easy to write logic that is aware of every possible catalog change
        //    that could affect any of those parts. Even if a DDL statement is doing off-thread
        //    work, another DDL must not yet execute at all. Executing these serially guarantees
        //    that no off-thread work has affected the state of the catalog. This is done by adding
        //    a VecDeque of deferred statements and a lock to the Coordinator. When a DDL is run in
        //    `handle_execute_inner` (after applying whatever transaction ops are needed to the
        //    session as described above), it attempts to own the lock (a tokio Mutex). If acquired,
        //    it stashes the lock in the connection's `ConnMeta` struct in `active_conns` and
        //    proceeds. The lock is dropped at transaction end in `clear_transaction`, and a message
        //    is sent to the Coordinator to execute the next queued DDL. If the lock could not be
        //    acquired, the DDL is put into the VecDeque, where it awaits dequeuing caused by the
        //    lock being released.
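        //
        // As a concrete example of property (1), a driver that wraps every statement in a
        // transaction might send:
        //
        //   BEGIN;
        //   CREATE VIEW v AS SELECT 1;  -- success announced, execution delayed
        //   COMMIT;                     -- planned and sequenced here; may error
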
        // Verify that this statement type can be executed in the current
        // transaction state.
        match ctx.session().transaction() {
            // By this point we should be in a running transaction.
            TransactionStatus::Default => unreachable!(),

            // Failed transactions have already been checked in pgwire for a safe statement
            // (COMMIT, ROLLBACK, etc.) and can proceed.
            TransactionStatus::Failed(_) => {}

            // Started is a deceptive name, and means different things depending on which
            // protocol was used. It means either exactly one statement (known because this
            // is the simple protocol and the parser parsed the entire string, and it had
            // one statement), or, from the extended protocol, that *some* query is
            // being executed, but there might be others after it before the Sync (commit)
            // message. Postgres handles this by teaching Started to eagerly commit certain
            // statements that can't be run in a transaction block.
            TransactionStatus::Started(_) => {
                if let Statement::Declare(_) = &*stmt {
                    // Declare is an exception. Although it's not against any spec to execute
                    // it, it will always result in nothing happening, since all portals will be
                    // immediately closed. Users don't know this detail, so this error helps them
                    // understand what's going wrong. Postgres does this too.
                    return ctx.retire(Err(AdapterError::OperationRequiresTransaction(
                        "DECLARE CURSOR".into(),
                    )));
                }
            }

            // Implicit or explicit transactions.
            //
            // Implicit transactions happen when a multi-statement query is executed
            // (a "simple query"). However, if a "BEGIN" appears somewhere in there,
            // then the existing implicit transaction will be upgraded to an explicit
            // transaction. Thus, we should not separate what implicit and explicit
            // transactions can do unless there's some additional checking to make sure
            // something disallowed in explicit transactions did not previously take place
            // in the implicit portion.
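            //
            // For example, in the simple-query batch
            //
            //   INSERT INTO t VALUES (1); BEGIN; INSERT INTO t VALUES (2);
            //
            // the implicit transaction covering the batch is upgraded to an explicit
            // one when `BEGIN` is reached.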
            TransactionStatus::InTransactionImplicit(_) | TransactionStatus::InTransaction(_) => {
                match &*stmt {
                    // Statements that are safe in a transaction. We still need to verify that we
                    // don't interleave reads and writes since we can't perform those serializably.
                    Statement::Close(_)
                    | Statement::Commit(_)
                    | Statement::Copy(_)
                    | Statement::Deallocate(_)
                    | Statement::Declare(_)
                    | Statement::Discard(_)
                    | Statement::Execute(_)
                    | Statement::ExplainPlan(_)
                    | Statement::ExplainPushdown(_)
                    | Statement::ExplainAnalyzeObject(_)
                    | Statement::ExplainAnalyzeCluster(_)
                    | Statement::ExplainTimestamp(_)
                    | Statement::ExplainSinkSchema(_)
                    | Statement::Fetch(_)
                    | Statement::Prepare(_)
                    | Statement::Rollback(_)
                    | Statement::Select(_)
                    | Statement::SetTransaction(_)
                    | Statement::Show(_)
                    | Statement::SetVariable(_)
                    | Statement::ResetVariable(_)
                    | Statement::StartTransaction(_)
                    | Statement::Subscribe(_)
                    | Statement::Raise(_) => {
                        // Always safe.
                    }

                    Statement::Insert(InsertStatement {
                        source, returning, ..
                    }) if returning.is_empty() && ConstantVisitor::insert_source(source) => {
                        // Inserting from constant values, when the statement does not need to
                        // execute on any cluster (no RETURNING), is always safe.
                    }

                    // These statements must be kept in-sync with `must_serialize_ddl()`.
                    Statement::AlterObjectRename(_)
                    | Statement::AlterObjectSwap(_)
                    | Statement::CreateTableFromSource(_)
                    | Statement::CreateSource(_) => {
                        let state = self.catalog().for_session(ctx.session()).state().clone();
                        let revision = self.catalog().transient_revision();

                        // Initialize our transaction with a set of empty ops, or return an error
                        // if we can't run a DDL transaction.
                        let txn_status = ctx.session_mut().transaction_mut();
                        if let Err(err) = txn_status.add_ops(TransactionOps::DDL {
                            ops: vec![],
                            state,
                            revision,
                            side_effects: vec![],
                        }) {
                            return ctx.retire(Err(err));
                        }
                    }

                    // Statements below must be run singly (in Started).
                    Statement::AlterCluster(_)
                    | Statement::AlterConnection(_)
                    | Statement::AlterDefaultPrivileges(_)
                    | Statement::AlterIndex(_)
                    | Statement::AlterMaterializedViewApplyReplacement(_)
                    | Statement::AlterSetCluster(_)
                    | Statement::AlterOwner(_)
                    | Statement::AlterRetainHistory(_)
                    | Statement::AlterRole(_)
                    | Statement::AlterSecret(_)
                    | Statement::AlterSink(_)
                    | Statement::AlterSource(_)
                    | Statement::AlterSystemReset(_)
                    | Statement::AlterSystemResetAll(_)
                    | Statement::AlterSystemSet(_)
                    | Statement::AlterTableAddColumn(_)
                    | Statement::AlterNetworkPolicy(_)
                    | Statement::CreateCluster(_)
                    | Statement::CreateClusterReplica(_)
                    | Statement::CreateConnection(_)
                    | Statement::CreateDatabase(_)
                    | Statement::CreateIndex(_)
                    | Statement::CreateMaterializedView(_)
                    | Statement::CreateContinualTask(_)
                    | Statement::CreateRole(_)
                    | Statement::CreateSchema(_)
                    | Statement::CreateSecret(_)
                    | Statement::CreateSink(_)
                    | Statement::CreateSubsource(_)
                    | Statement::CreateTable(_)
                    | Statement::CreateType(_)
                    | Statement::CreateView(_)
                    | Statement::CreateWebhookSource(_)
                    | Statement::CreateNetworkPolicy(_)
                    | Statement::Delete(_)
                    | Statement::DropObjects(_)
                    | Statement::DropOwned(_)
                    | Statement::GrantPrivileges(_)
                    | Statement::GrantRole(_)
                    | Statement::Insert(_)
                    | Statement::ReassignOwned(_)
                    | Statement::RevokePrivileges(_)
                    | Statement::RevokeRole(_)
                    | Statement::Update(_)
                    | Statement::ValidateConnection(_)
                    | Statement::Comment(_) => {
                        let txn_status = ctx.session_mut().transaction_mut();

                        // If we're not in an implicit transaction and we could generate exactly one
                        // valid ExecuteResponse, we can delay execution until commit.
                        if !txn_status.is_implicit() {
                            // Statements whose tag is trivial (known only from an unexecuted statement) can
                            // be run in a special single-statement explicit mode. In this mode (`BEGIN;
                            // <stmt>; COMMIT`), we generate the expected tag from a successful <stmt>, but
                            // delay execution until `COMMIT`.
                            if let Ok(resp) = ExecuteResponse::try_from(&*stmt) {
                                if let Err(err) = txn_status
                                    .add_ops(TransactionOps::SingleStatement { stmt, params })
                                {
                                    ctx.retire(Err(err));
                                    return;
                                }
                                ctx.retire(Ok(resp));
                                return;
                            }
                        }

                        return ctx.retire(Err(AdapterError::OperationProhibitsTransaction(
                            stmt.to_string(),
                        )));
                    }
                }
            }
        }

        // DDLs must be planned and sequenced serially. We do not rely on PlanValidity checking
        // various IDs because we have incorrectly done that in the past. Attempt to acquire the
        // ddl lock. The lock is stashed in the ConnMeta, which is dropped at transaction end. If
        // acquired, proceed with sequencing. If not, enqueue and return. This logic assumes that
        // Coordinator::clear_transaction is correctly called when session transactions are ended,
        // because that function will release the held lock from active_conns.
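        //
        // Sketch of the intended lifecycle (illustrative only):
        //
        //   conn A: BEGIN; ALTER ... RENAME ...;  -- acquires the serialized_ddl lock
        //   conn B: CREATE SOURCE ...;            -- try_lock fails; statement is queued
        //   conn A: COMMIT;                       -- clear_transaction drops the lock; the
        //                                         -- Coordinator dequeues conn B's DDL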
        if Self::must_serialize_ddl(&stmt, &ctx) {
            if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
                let prev = self
                    .active_conns
                    .get_mut(ctx.session().conn_id())
                    .expect("connection must exist")
                    .deferred_lock
                    .replace(guard);
                assert!(
                    prev.is_none(),
                    "connections should have at most one lock guard"
                );
            } else {
                if self
                    .active_conns
                    .get(ctx.session().conn_id())
                    .expect("connection must exist")
                    .deferred_lock
                    .is_some()
                {
                    // This session *already* has the lock, and incorrectly tried to execute another
                    // DDL while still holding the lock, violating the assumption documented above.
                    // This is an internal error, probably in some AdapterClient user (pgwire or
                    // http). Because the session is now in some unexpected state, return an error
                    // which should cause the AdapterClient user to fail the transaction.
                    // (Terminating the connection is maybe what we would prefer to do, but is not
                    // currently a thing we can do from the coordinator: calling handle_terminate
                    // cleans up Coordinator state for the session but doesn't inform the
                    // AdapterClient that the session should terminate.)
                    soft_panic_or_log!(
                        "session {} attempted to get ddl lock while already owning it",
                        ctx.session().conn_id()
                    );
                    ctx.retire(Err(AdapterError::Internal(
                        "session attempted to get ddl lock while already owning it".to_string(),
                    )));
                    return;
                }
                self.serialized_ddl.push_back(DeferredPlanStatement {
                    ctx,
                    ps: PlanStatement::Statement { stmt, params },
                });
                return;
            }
        }

        let catalog = self.catalog();
        let catalog = catalog.for_session(ctx.session());
        let original_stmt = Arc::clone(&stmt);
        // `resolved_ids` should be derivable from `stmt`. If `stmt` is transformed to remove/add
        // IDs, then `resolved_ids` should be updated to also remove/add those IDs.
        let (stmt, mut resolved_ids) = match mz_sql::names::resolve(&catalog, (*stmt).clone()) {
            Ok(resolved) => resolved,
            Err(e) => return ctx.retire(Err(e.into())),
        };
        // N.B. The catalog can change during purification, so we must validate that the
        // dependencies still exist after purification. This validation is done back on the main
        // thread, in the handler for `Message::PurifiedStatementReady`, before we handle the
        // purified statement. If we add special handling for more types of `Statement`s, we'll
        // need to ensure similar verification occurs.
        let (stmt, resolved_ids) = match stmt {
            // Various statements must be purified off the main coordinator thread of control.
            stmt if Self::must_spawn_purification(&stmt) => {
                let internal_cmd_tx = self.internal_cmd_tx.clone();
                let conn_id = ctx.session().conn_id().clone();
                let catalog = self.owned_catalog();
                let now = self.now();
                let otel_ctx = OpenTelemetryContext::obtain();
                let current_storage_configuration = self.controller.storage.config().clone();
                task::spawn(|| format!("purify:{conn_id}"), async move {
                    let transient_revision = catalog.transient_revision();
                    let catalog = catalog.for_session(ctx.session());

                    // Check if the session is authorized to purify a statement. Usually,
                    // authorization is checked after planning; however, purification happens
                    // before planning and may require the use of some connections and secrets.
                    if let Err(e) = rbac::check_usage(
                        &catalog,
                        ctx.session(),
                        &resolved_ids,
                        &CREATE_ITEM_USAGE,
                    ) {
                        return ctx.retire(Err(e.into()));
                    }

                    let (result, cluster_id) = mz_sql::pure::purify_statement(
                        catalog,
                        now,
                        stmt,
                        &current_storage_configuration,
                    )
                    .await;
                    let result = result.map_err(|e| e.into());
                    let dependency_ids = resolved_ids.items().copied().collect();
                    let plan_validity = PlanValidity::new(
                        transient_revision,
                        dependency_ids,
                        cluster_id,
                        None,
                        ctx.session().role_metadata().clone(),
                    );
                    // It is not an error for purification to complete after `internal_cmd_rx` is dropped.
                    let result = internal_cmd_tx.send(Message::PurifiedStatementReady(
                        PurifiedStatementReady {
                            ctx,
                            result,
                            params,
                            plan_validity,
                            original_stmt,
                            otel_ctx,
                        },
                    ));
                    if let Err(e) = result {
                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                    }
                });
                return;
            }

            // `CREATE SUBSOURCE` statements are disallowed for users and are only generated
            // automatically as part of purification.
            Statement::CreateSubsource(_) => {
                ctx.retire(Err(AdapterError::Unsupported(
                    "CREATE SUBSOURCE statements",
                )));
                return;
            }

            Statement::CreateMaterializedView(mut cmvs) => {
                // `CREATE MATERIALIZED VIEW ... AS OF ...` syntax is disallowed for users and is
                // only used for storing initial frontiers in the catalog.
                if cmvs.as_of.is_some() {
                    return ctx.retire(Err(AdapterError::Unsupported(
                        "CREATE MATERIALIZED VIEW ... AS OF statements",
                    )));
                }

                let mz_now = match self
                    .resolve_mz_now_for_create_materialized_view(
                        &cmvs,
                        &resolved_ids,
                        ctx.session_mut(),
                        true,
                    )
                    .await
                {
                    Ok(mz_now) => mz_now,
                    Err(e) => return ctx.retire(Err(e)),
                };

                let catalog = self.catalog().for_session(ctx.session());

                purify_create_materialized_view_options(
                    catalog,
                    mz_now,
                    &mut cmvs,
                    &mut resolved_ids,
                );

                let purified_stmt =
                    Statement::CreateMaterializedView(CreateMaterializedViewStatement::<Aug> {
                        if_exists: cmvs.if_exists,
                        name: cmvs.name,
                        columns: cmvs.columns,
                        replacing: cmvs.replacing,
                        in_cluster: cmvs.in_cluster,
                        query: cmvs.query,
                        with_options: cmvs.with_options,
                        as_of: None,
                    });

                // (Purifying CreateMaterializedView doesn't happen async, so no need to send
                // `Message::PurifiedStatementReady` here.)
                (purified_stmt, resolved_ids)
            }

            Statement::ExplainPlan(ExplainPlanStatement {
                stage,
                with_options,
                format,
                explainee: Explainee::CreateMaterializedView(box_cmvs, broken),
            }) => {
                let mut cmvs = *box_cmvs;
                let mz_now = match self
                    .resolve_mz_now_for_create_materialized_view(
                        &cmvs,
                        &resolved_ids,
                        ctx.session_mut(),
                        false,
                    )
                    .await
                {
                    Ok(mz_now) => mz_now,
                    Err(e) => return ctx.retire(Err(e)),
                };

                let catalog = self.catalog().for_session(ctx.session());

                purify_create_materialized_view_options(
                    catalog,
                    mz_now,
                    &mut cmvs,
                    &mut resolved_ids,
                );

                let purified_stmt = Statement::ExplainPlan(ExplainPlanStatement {
                    stage,
                    with_options,
                    format,
                    explainee: Explainee::CreateMaterializedView(Box::new(cmvs), broken),
                });

                (purified_stmt, resolved_ids)
            }

            // All other statements are handled immediately.
            _ => (stmt, resolved_ids),
        };

        match self.plan_statement(ctx.session(), stmt, &params, &resolved_ids) {
            Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await,
            Err(e) => ctx.retire(Err(e)),
        }
    }

    /// Whether the statement is DDL that must be planned and sequenced serially.
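    ///
    /// A minimal sketch of the classification, assuming an `ExecuteContext` `ctx` whose session
    /// is not inside a multi-statement DDL transaction, and a hypothetical `parse` helper that
    /// produces a `Statement<Raw>`:
    ///
    /// ```ignore
    /// assert!(Coordinator::must_serialize_ddl(&parse("CREATE VIEW v AS SELECT 1"), &ctx));
    /// assert!(!Coordinator::must_serialize_ddl(&parse("SELECT 1"), &ctx)); // not DDL
    /// assert!(!Coordinator::must_serialize_ddl(&parse("CREATE SOURCE ..."), &ctx)); // purified off-thread
    /// assert!(!Coordinator::must_serialize_ddl(&parse("ALTER SECRET s AS '...'"), &ctx)); // exempt below
    /// ```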
    fn must_serialize_ddl(stmt: &Statement<Raw>, ctx: &ExecuteContext) -> bool {
        // Non-DDL is not serialized here.
        if !StatementClassification::from(&*stmt).is_ddl() {
            return false;
        }
        // Off-thread, pre-planning purification can perform arbitrarily slow network calls, so it
        // must not be serialized. These statements all use PlanValidity for their checking, and we
        // must ensure those checks are sufficient.
        if Self::must_spawn_purification(stmt) {
            return false;
        }

        // Statements that support multiple DDLs in a single transaction aren't serialized here.
        // Their operations are serialized when applied to the catalog, guaranteeing that any
        // off-thread DDLs concurrent with a multiple DDL transaction will have a serial order.
        if ctx.session.transaction().is_ddl() {
            return false;
        }

        // Some DDL is exempt. It is not great that we are matching on Statements here because
        // different plans can be produced from the same top-level statement type (e.g., `ALTER
        // CONNECTION ROTATE KEYS`). But the whole point of this is to prevent things from being
        // planned in the first place, so we accept the abstraction leak.
        match stmt {
            // Secrets have a small and understood set of dependencies, and their off-thread work
            // interacts with k8s.
            Statement::AlterSecret(_) => false,
            Statement::CreateSecret(_) => false,
            Statement::AlterConnection(AlterConnectionStatement { actions, .. })
                if actions
                    .iter()
                    .all(|action| matches!(action, AlterConnectionAction::RotateKeys)) =>
            {
                false
            }

            // The off-thread work that altering a cluster may do (waiting for replicas to spin
            // up) does not affect its catalog names or IDs, so it is safe not to serialize. It
            // could, however, change the set of replicas that exist. For queries that name
            // replicas or use the current_replica session var, the `replica_id` field of
            // `PlanValidity` serves to ensure that those replicas exist during the query finish
            // stage. Additionally, that work can take hours (as configured by the user), so
            // serializing it would also be a bad experience for users.
            Statement::AlterCluster(_) => false,

            // `ALTER SINK SET FROM` waits for the old relation to make enough progress for a clean
            // cutover. If the old collection is stalled, it may block forever. Checks in
            // sequencing ensure that the operation fails if any one of these happens concurrently:
            //   * the sink is dropped
            //   * the new source relation is dropped
            //   * another `ALTER SINK` for the same sink is applied first
            Statement::AlterSink(stmt)
                if matches!(stmt.action, AlterSinkAction::ChangeRelation(_)) =>
            {
                false
            }

            // Everything else must be serialized.
            _ => true,
        }
    }

    /// Whether the statement must be purified off the Coordinator thread.
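    ///
    /// A sketch of the rule implemented below, using the same hypothetical `parse` helper:
    ///
    /// ```ignore
    /// // CREATE/ALTER SOURCE, CREATE SINK, and CREATE TABLE ... FROM SOURCE all spawn:
    /// assert!(Coordinator::must_spawn_purification(&parse("CREATE SOURCE s FROM KAFKA ...")));
    /// // ...except `ALTER SOURCE ... RETAIN HISTORY`, which stays on the coordinator thread:
    /// assert!(!Coordinator::must_spawn_purification(&parse(
    ///     "ALTER SOURCE s SET (RETAIN HISTORY = FOR '1000 hours')"
    /// )));
    /// ```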
    fn must_spawn_purification<A: AstInfo>(stmt: &Statement<A>) -> bool {
        // `CREATE` and `ALTER` `SOURCE` and `SINK` statements must be purified off the main
        // coordinator thread.
        if !matches!(
            stmt,
            Statement::CreateSource(_)
                | Statement::AlterSource(_)
                | Statement::CreateSink(_)
                | Statement::CreateTableFromSource(_)
        ) {
            return false;
        }

        // However, `ALTER SOURCE RETAIN HISTORY` should be excluded from off-thread purification.
        if let Statement::AlterSource(stmt) = stmt {
            let names: Vec<CreateSourceOptionName> = match &stmt.action {
                AlterSourceAction::SetOptions(options) => {
                    options.iter().map(|o| o.name.clone()).collect()
                }
                AlterSourceAction::ResetOptions(names) => names.clone(),
                _ => vec![],
            };
            if !names.is_empty()
                && names
                    .iter()
                    .all(|n| matches!(n, CreateSourceOptionName::RetainHistory))
            {
                return false;
            }
        }

        true
    }

    /// Chooses a timestamp for `mz_now()`, if `mz_now()` occurs in a REFRESH option of the
    /// materialized view. Additionally, if `acquire_read_holds` is true and the MV has any REFRESH
    /// option, this function grabs read holds at the earliest possible time on input collections
    /// that might be involved in the MV.
    ///
    /// Note that this is NOT what handles `mz_now()` in the query part of the MV; this function
    /// handles it only in `with_options`.
    ///
    /// (Note that the chosen timestamp won't be the same timestamp as the system table inserts,
    /// unfortunately.)
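    ///
    /// A sketch of the intended behavior (hypothetical SQL):
    ///
    /// ```ignore
    /// // CREATE MATERIALIZED VIEW mv WITH (REFRESH AT mz_now()) AS SELECT ...
    /// //   --> Ok(Some(ts)), where `ts` is the oracle read timestamp advanced to the
    /// //       inputs' least valid read (and read holds are acquired if requested).
    /// // CREATE MATERIALIZED VIEW mv AS SELECT ...        -- no REFRESH options
    /// //   --> Ok(None), and no read holds are acquired.
    /// ```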
    async fn resolve_mz_now_for_create_materialized_view(
        &mut self,
        cmvs: &CreateMaterializedViewStatement<Aug>,
        resolved_ids: &ResolvedIds,
        session: &Session,
        acquire_read_holds: bool,
    ) -> Result<Option<Timestamp>, AdapterError> {
        if cmvs
            .with_options
            .iter()
            .any(|wo| matches!(wo.value, Some(WithOptionValue::Refresh(..))))
        {
            let catalog = self.catalog().for_session(session);
            let cluster = mz_sql::plan::resolve_cluster_for_materialized_view(&catalog, cmvs)?;
            let ids = self
                .index_oracle(cluster)
                .sufficient_collections(resolved_ids.collections().copied());

            // If there is any REFRESH option, then acquire read holds. (Strictly speaking, we'd
            // need this only if there is a `REFRESH AT`, not for `REFRESH EVERY`, because later
            // we want to check the AT times against the read holds that we acquire here. But
            // we do it for any REFRESH option, to avoid having so many code paths doing different
            // things.)
            //
            // It's important that we acquire read holds _before_ we determine the least valid
            // read. Otherwise, we're not guaranteed that the since frontier doesn't advance
            // forward from underneath us.
            let read_holds = self.acquire_read_holds(&ids);

            // Does `mz_now()` occur?
            let mz_now_ts = if cmvs
                .with_options
                .iter()
                .any(materialized_view_option_contains_temporal)
            {
                let timeline_context = self
                    .catalog()
                    .validate_timeline_context(resolved_ids.collections().copied())?;

                // We default to EpochMilliseconds, similarly to `determine_timestamp_for`,
                // but even in the TimestampIndependent case.
                // Note that we didn't accurately decide whether we are TimestampDependent
                // or TimestampIndependent, because for this we'd need to also check whether
                // `query.contains_temporal()`, similarly to how `peek_stage_validate` does.
                // However, this doesn't matter here, as we are just going to default to
                // EpochMilliseconds in both cases.
                let timeline = timeline_context
                    .timeline()
                    .unwrap_or(&Timeline::EpochMilliseconds);

                // Let's start with the timestamp oracle read timestamp.
                let mut timestamp = self.get_timestamp_oracle(timeline).read_ts().await;

                // If `least_valid_read` is later than the oracle, then advance to that time.
                // If we didn't do this, then there would be a danger of missing the first refresh,
                // which might cause the materialized view to be unreadable for hours. This might
                // be what was happening here:
                // https://github.com/MaterializeInc/database-issues/issues/7265#issuecomment-1931856361
                //
                // In the long term, it would be good to actually block the MV creation statement
                // until `least_valid_read`. https://github.com/MaterializeInc/database-issues/issues/7504
                // Without blocking, we have the problem that a REFRESH AT CREATION is not linearized
                // with the CREATE MATERIALIZED VIEW statement, in the sense that a query from the MV
                // after its creation might see input changes that happened after the CREATE
                // MATERIALIZED VIEW statement returned.
                let oracle_timestamp = timestamp;
                let least_valid_read = read_holds.least_valid_read();
                timestamp.advance_by(least_valid_read.borrow());
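                // For example (hypothetical numbers): if the oracle returned 100 but the
                // inputs' least valid read is the frontier {105}, the `advance_by` above
                // lifts `timestamp` from 100 to 105, so the first refresh isn't scheduled
                // at a time where the inputs are not yet readable.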

                if oracle_timestamp != timestamp {
                    warn!(%cmvs.name, %oracle_timestamp, %timestamp, "REFRESH MV's inputs are not readable at the oracle read ts");
                }

                info!("Resolved `mz_now()` to {timestamp} for REFRESH MV");
                Ok(Some(timestamp))
            } else {
                Ok(None)
            };

            // NOTE: The Drop impl of ReadHolds makes sure that the hold is
            // released when we don't use it.
            if acquire_read_holds {
                self.store_transaction_read_holds(session.conn_id().clone(), read_holds);
            }

            mz_now_ts
        } else {
            Ok(None)
        }
    }

    /// Instruct the dataflow layer to cancel any ongoing, interactive work for
    /// the named `conn_id` if the correct secret key is specified.
    ///
    /// Note: Here we take a [`ConnectionIdType`] as opposed to an owned
    /// `ConnectionId` because this method gets called by external clients when
    /// they issue a cancellation request.
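    ///
    /// A sketch of the exchange (hypothetical values): a client wishing to cancel
    /// connection `42` sends `(conn_id: 42, secret_key: 0xDEADBEEF)`. If the key does
    /// not match the one recorded for that connection, the request is silently
    /// ignored, so a guessing attacker learns nothing from the response.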
    #[mz_ore::instrument(level = "debug")]
    async fn handle_cancel(&mut self, conn_id: ConnectionIdType, secret_key: u32) {
        if let Some((id_handle, conn_meta)) = self.active_conns.get_key_value(&conn_id) {
            // If the secret key specified by the client doesn't match the
            // actual secret key for the target connection, we treat this as a
            // rogue cancellation request and ignore it.
            if conn_meta.secret_key != secret_key {
                return;
            }

            // Now that we've verified the secret key, this is a privileged
            // cancellation request. We can upgrade the raw connection ID to a
            // proper `IdHandle`.
            self.handle_privileged_cancel(id_handle.clone()).await;
        }
    }

    /// Unconditionally instructs the dataflow layer to cancel any ongoing,
    /// interactive work for the named `conn_id`.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn handle_privileged_cancel(&mut self, conn_id: ConnectionId) {
        let mut maybe_ctx = None;

        // Cancel pending writes. There is at most one pending write per session.
        if let Some(idx) = self.pending_writes.iter().position(|pending_write_txn| {
            matches!(pending_write_txn, PendingWriteTxn::User {
                pending_txn: PendingTxn { ctx, .. },
                ..
            } if *ctx.session().conn_id() == conn_id)
        }) {
            if let PendingWriteTxn::User {
                pending_txn: PendingTxn { ctx, .. },
                ..
            } = self.pending_writes.remove(idx)
            {
                maybe_ctx = Some(ctx);
            }
        }

        // Cancel deferred writes.
        if let Some(write_op) = self.deferred_write_ops.remove(&conn_id) {
            maybe_ctx = Some(write_op.into_ctx());
        }

        // Cancel deferred statements.
        if let Some(idx) = self
            .serialized_ddl
            .iter()
            .position(|deferred| *deferred.ctx.session().conn_id() == conn_id)
        {
            let deferred = self
                .serialized_ddl
                .remove(idx)
                .expect("known to exist from call to `position` above");
            maybe_ctx = Some(deferred.ctx);
        }

        // Cancel reads waiting on being linearized. There is at most one linearized read per
        // session.
        if let Some(pending_read_txn) = self.pending_linearize_read_txns.remove(&conn_id) {
            let ctx = pending_read_txn.take_context();
            maybe_ctx = Some(ctx);
        }

        if let Some(ctx) = maybe_ctx {
            ctx.retire(Err(AdapterError::Canceled));
        }

        self.cancel_pending_peeks(&conn_id);
        self.cancel_pending_watchsets(&conn_id);
        self.cancel_compute_sinks_for_conn(&conn_id).await;
        self.cancel_cluster_reconfigurations_for_conn(&conn_id)
            .await;
        self.cancel_pending_copy(&conn_id);
        if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) {
            let _ = tx.send(true);
        }
    }

    /// Handle termination of a client session.
    ///
    /// This cleans up any state in the coordinator associated with the session.
    #[mz_ore::instrument(level = "debug")]
    async fn handle_terminate(&mut self, conn_id: ConnectionId) {
        if !self.active_conns.contains_key(&conn_id) {
            // If the session doesn't exist in `active_conns`, then this method will panic later
            // on. Instead, we explicitly panic here while dumping the entire Coord to the logs to
            // help with debugging. This panic is very infrequent, so we want as much information
            // as possible. See https://github.com/MaterializeInc/database-issues/issues/5627.
            panic!("unknown connection: {conn_id:?}\n\n{self:?}")
        }

        // We do not need to call clear_transaction here because there are no side effects to run
        // based on any session transaction state.
        self.clear_connection(&conn_id).await;

        self.drop_temp_items(&conn_id).await;
        self.catalog_mut()
            .drop_temporary_schema(&conn_id)
            .unwrap_or_terminate("unable to drop temporary schema");
        let conn = self.active_conns.remove(&conn_id).expect("conn must exist");
        let session_type = metrics::session_type_label_value(conn.user());
        self.metrics
            .active_sessions
            .with_label_values(&[session_type])
            .dec();
        self.cancel_pending_peeks(conn.conn_id());
        self.cancel_pending_watchsets(&conn_id);
        self.cancel_pending_copy(&conn_id);
        self.end_session_for_statement_logging(conn.uuid());

        // Queue the builtin table update, but do not wait for it to complete. We explicitly do
        // this to prevent blocking the Coordinator in the case that a lot of connections are
        // closed at once, which occurs regularly in some workflows.
        let update = self
            .catalog()
            .state()
            .pack_session_update(&conn, Diff::MINUS_ONE);
        let update = self.catalog().state().resolve_builtin_table_update(update);

        let _builtin_update_notify = self.builtin_table_update().defer(vec![update]);
    }

    /// Returns the necessary metadata for appending to a webhook source, and a channel to send
    /// rows.
    #[mz_ore::instrument(level = "debug")]
    fn handle_get_webhook(
        &mut self,
        database: String,
        schema: String,
        name: String,
        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
    ) {
        /// Attempts to resolve a webhook source from a provided `database.schema.name` path.
        ///
        /// Returns a struct that can be used to append data to the underlying storage
        /// collection, and the types we should cast the request to.
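        ///
        /// A sketch of the intended use (hypothetical names):
        ///
        /// ```ignore
        /// match resolve(coord, "materialize".into(), "public".into(), "my_webhook".into()) {
        ///     Ok(response) => { /* `response.tx` appends rows to the webhook's collection */ }
        ///     Err(partial_name) => { /* no webhook source or table at that path */ }
        /// }
        /// ```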
        fn resolve(
            coord: &mut Coordinator,
            database: String,
            schema: String,
            name: String,
        ) -> Result<AppendWebhookResponse, PartialItemName> {
            // Resolve our collection.
            let name = PartialItemName {
                database: Some(database),
                schema: Some(schema),
                item: name,
            };
            let Ok(entry) = coord
                .catalog()
                .resolve_entry(None, &vec![], &name, &SYSTEM_CONN_ID)
            else {
                return Err(name);
            };

            // Webhooks can be created with `CREATE SOURCE` or `CREATE TABLE`.
            let (data_source, desc, global_id) = match entry.item() {
                CatalogItem::Source(Source {
                    data_source: data_source @ DataSourceDesc::Webhook { .. },
                    desc,
                    global_id,
                    ..
                }) => (data_source, desc.clone(), *global_id),
                CatalogItem::Table(
                    table @ Table {
                        desc,
                        data_source:
                            TableDataSource::DataSource {
                                desc: data_source @ DataSourceDesc::Webhook { .. },
                                ..
                            },
                        ..
                    },
                ) => (data_source, desc.latest(), table.global_id_writes()),
                _ => return Err(name),
            };

            let DataSourceDesc::Webhook {
                validate_using,
                body_format,
                headers,
                ..
            } = data_source
            else {
                mz_ore::soft_panic_or_log!("programming error! checked above for webhook");
                return Err(name);
            };
            let body_format = body_format.clone();
            let header_tys = headers.clone();

            // Assert that we have one column for the body, plus however many are required
            // for the headers.
            let num_columns = headers.num_columns() + 1;
            mz_ore::soft_assert_or_log!(
                desc.arity() <= num_columns,
                "expected at most {} columns, but got {}",
                num_columns,
                desc.arity()
            );

            // Double check that the body column of the webhook source matches the type
            // we're about to deserialize as.
            let body_column = desc
                .get_by_name(&"body".into())
                .map(|(_idx, ty)| ty.clone())
                .ok_or_else(|| name.clone())?;
            assert!(!body_column.nullable, "webhook body column is nullable!?");
            assert_eq!(body_column.scalar_type, SqlScalarType::from(body_format));

            // Create a validator that can be called to validate a webhook request.
            let validator = validate_using.as_ref().map(|v| {
                let validation = v.clone();
                AppendWebhookValidator::new(validation, coord.caching_secrets_reader.clone())
            });

            // Get a channel so we can queue updates to be written.
            let row_tx = coord
                .controller
                .storage
                .monotonic_appender(global_id)
                .map_err(|_| name.clone())?;
            let stats = coord
                .controller
                .storage
                .webhook_statistics(global_id)
                .map_err(|_| name)?;
            let invalidator = coord
                .active_webhooks
                .entry(entry.id())
                .or_insert_with(WebhookAppenderInvalidator::new);
            let tx = WebhookAppender::new(row_tx, invalidator.guard(), stats);

            Ok(AppendWebhookResponse {
                tx,
                body_format,
                header_tys,
                validator,
            })
        }

        let response = resolve(self, database, schema, name).map_err(|name| {
            AppendWebhookError::UnknownWebhook {
                database: name.database.expect("provided"),
                schema: name.schema.expect("provided"),
                name: name.item,
            }
        });
        let _ = tx.send(response);
    }
}