mz_adapter/coord/command_handler.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic for processing client [`Command`]s. Each [`Command`] is initiated by a
//! client via some external Materialize API (e.g., HTTP and psql).

use base64::prelude::*;
use differential_dataflow::lattice::Lattice;
use mz_adapter_types::dyncfgs::ALLOW_USER_SESSIONS;
use mz_auth::password::Password;
use mz_repr::namespaces::MZ_INTERNAL_SCHEMA;
use mz_sql::session::metadata::SessionMetadata;
use std::collections::{BTreeMap, BTreeSet};
use std::net::IpAddr;
use std::sync::Arc;

use futures::FutureExt;
use futures::future::LocalBoxFuture;
use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source, Table, TableDataSource};
use mz_ore::task;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{instrument, soft_panic_or_log};
use mz_repr::role_id::RoleId;
use mz_repr::{Diff, GlobalId, SqlScalarType, Timestamp};
use mz_sql::ast::{
    AlterConnectionAction, AlterConnectionStatement, AlterSinkAction, AlterSourceAction, AstInfo,
    ConstantVisitor, CopyRelation, CopyStatement, CreateSourceOptionName, Raw, Statement,
    SubscribeStatement,
};
use mz_sql::catalog::RoleAttributesRaw;
use mz_sql::names::{Aug, PartialItemName, ResolvedIds};
use mz_sql::plan::{
    AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, Params, Plan,
    StatementClassification, TransactionType,
};
use mz_sql::pure::{
    materialized_view_option_contains_temporal, purify_create_materialized_view_options,
};
use mz_sql::rbac;
use mz_sql::rbac::CREATE_ITEM_USAGE;
use mz_sql::session::user::User;
use mz_sql::session::vars::{
    EndTransactionAction, NETWORK_POLICY, OwnedVarInput, STATEMENT_LOGGING_SAMPLE_RATE, Value, Var,
};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
    CreateMaterializedViewStatement, ExplainPlanStatement, Explainee, InsertStatement,
    WithOptionValue,
};
use mz_storage_types::sources::Timeline;
use opentelemetry::trace::TraceContextExt;
use tokio::sync::{mpsc, oneshot};
use tracing::{Instrument, debug_span, info, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use uuid::Uuid;

use crate::command::{
    CatalogSnapshot, Command, ExecuteResponse, Response, SASLChallengeResponse,
    SASLVerifyProofResponse, StartupResponse, SuperuserAttribute,
};
use crate::coord::appends::PendingWriteTxn;
use crate::coord::peek::PendingPeek;
use crate::coord::{
    ConnMeta, Coordinator, DeferredPlanStatement, Message, PendingTxn, PlanStatement, PlanValidity,
    PurifiedStatementReady, validate_ip_with_policy_rules,
};
use crate::error::{AdapterError, AuthenticationError};
use crate::notice::AdapterNotice;
use crate::session::{Session, TransactionOps, TransactionStatus};
use crate::statement_logging::WatchSetCreation;
use crate::util::{ClientTransmitter, ResultExt};
use crate::webhook::{
    AppendWebhookResponse, AppendWebhookValidator, WebhookAppender, WebhookAppenderInvalidator,
};
use crate::{AppendWebhookError, ExecuteContext, catalog, metrics};

use super::ExecuteContextGuard;

impl Coordinator {
    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 58KB. This would
    /// get stored on the stack, which is bad for runtime performance and would blow up our stack
    /// usage. Because of that we purposefully move this Future onto the heap (i.e. Box it).
    pub(crate) fn handle_command(&mut self, mut cmd: Command) -> LocalBoxFuture<'_, ()> {
        async move {
            if let Some(session) = cmd.session_mut() {
                session.apply_external_metadata_updates();
            }
            match cmd {
                Command::Startup {
                    tx,
                    user,
                    conn_id,
                    secret_key,
                    uuid,
                    client_ip,
                    application_name,
                    notice_tx,
                } => {
                    // Note: We purposefully do not use a ClientTransmitter here because startup
                    // handles errors and cleanup of sessions itself.
                    self.handle_startup(
                        tx,
                        user,
                        conn_id,
                        secret_key,
                        uuid,
                        client_ip,
                        application_name,
                        notice_tx,
                    )
                    .await;
                }

                Command::AuthenticatePassword {
                    tx,
                    role_name,
                    password,
                } => {
                    self.handle_authenticate_password(tx, role_name, password)
                        .await;
                }

                Command::AuthenticateGetSASLChallenge {
                    tx,
                    role_name,
                    nonce,
                } => {
                    self.handle_generate_sasl_challenge(tx, role_name, nonce)
                        .await;
                }

                Command::AuthenticateVerifySASLProof {
                    tx,
                    role_name,
                    proof,
                    mock_hash,
                    auth_message,
                } => {
                    self.handle_authenticate_verify_sasl_proof(
                        tx,
                        role_name,
                        proof,
                        auth_message,
                        mock_hash,
                    );
                }

                Command::Execute {
                    portal_name,
                    session,
                    tx,
                    outer_ctx_extra,
                } => {
                    let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());

                    self.handle_execute(portal_name, session, tx, outer_ctx_extra)
                        .await;
                }

                Command::StartCopyFromStdin {
                    target_id,
                    target_name,
                    columns,
                    row_desc,
                    params,
                    session,
                    tx,
                } => {
                    let otel_ctx = OpenTelemetryContext::obtain();
                    let result = self.setup_copy_from_stdin(
                        &session,
                        target_id,
                        target_name,
                        columns,
                        row_desc,
                        params,
                    );
                    let _ = tx.send(Response {
                        result,
                        session,
                        otel_ctx,
                    });
                }

                Command::RetireExecute { data, reason } => self.retire_execution(reason, data),

                Command::CancelRequest {
                    conn_id,
                    secret_key,
                } => {
                    self.handle_cancel(conn_id, secret_key).await;
                }

                Command::PrivilegedCancelRequest { conn_id } => {
                    self.handle_privileged_cancel(conn_id).await;
                }

                Command::GetWebhook {
                    database,
                    schema,
                    name,
                    tx,
                } => {
                    self.handle_get_webhook(database, schema, name, tx);
                }

                Command::GetSystemVars { tx } => {
                    let _ = tx.send(self.catalog.system_config().clone());
                }

                Command::SetSystemVars { vars, conn_id, tx } => {
                    let mut ops = Vec::with_capacity(vars.len());
                    let conn = &self.active_conns[&conn_id];

                    for (name, value) in vars {
                        if let Err(e) =
                            self.catalog().system_config().get(&name).and_then(|var| {
                                var.visible(conn.user(), self.catalog.system_config())
                            })
                        {
                            let _ = tx.send(Err(e.into()));
                            return;
                        }

                        ops.push(catalog::Op::UpdateSystemConfiguration {
                            name,
                            value: OwnedVarInput::Flat(value),
                        });
                    }

                    let result = self
                        .catalog_transact_with_context(Some(&conn_id), None, ops)
                        .await;
                    let _ = tx.send(result);
                }

                Command::Terminate { conn_id, tx } => {
                    self.handle_terminate(conn_id).await;
                    // Note: We purposefully do not use a ClientTransmitter here because we're
                    // already terminating the provided session.
                    if let Some(tx) = tx {
                        let _ = tx.send(Ok(()));
                    }
                }

                Command::Commit {
                    action,
                    session,
                    tx,
                } => {
                    let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
                    // We reach here not through a statement execution, but from the "commit"
                    // pgwire command. Thus, we just generate a default statement execution
                    // context (once statement logging is implemented, this will cause nothing
                    // to be logged when the execution finishes).
                    let ctx = ExecuteContext::from_parts(
                        tx,
                        self.internal_cmd_tx.clone(),
                        session,
                        Default::default(),
                    );
                    let plan = match action {
                        EndTransactionAction::Commit => {
                            Plan::CommitTransaction(CommitTransactionPlan {
                                transaction_type: TransactionType::Implicit,
                            })
                        }
                        EndTransactionAction::Rollback => {
                            Plan::AbortTransaction(AbortTransactionPlan {
                                transaction_type: TransactionType::Implicit,
                            })
                        }
                    };

                    let conn_id = ctx.session().conn_id().clone();
                    self.sequence_plan(ctx, plan, ResolvedIds::empty()).await;
                    // Part of the Command::Commit contract is that the Coordinator guarantees that
                    // it has cleared its transaction state for the connection.
                    self.clear_connection(&conn_id).await;
                }

                Command::CatalogSnapshot { tx } => {
                    let _ = tx.send(CatalogSnapshot {
                        catalog: self.owned_catalog(),
                    });
                }

                Command::CheckConsistency { tx } => {
                    let _ = tx.send(self.check_consistency());
                }

                Command::Dump { tx } => {
                    let _ = tx.send(self.dump().await);
                }

                Command::GetComputeInstanceClient { instance_id, tx } => {
                    let _ = tx.send(self.controller.compute.instance_client(instance_id));
                }

                Command::GetOracle { timeline, tx } => {
                    let oracle = self
                        .global_timelines
                        .get(&timeline)
                        .map(|timeline_state| Arc::clone(&timeline_state.oracle))
                        .ok_or(AdapterError::ChangedPlan(
                            "timeline has disappeared during planning".to_string(),
                        ));
                    let _ = tx.send(oracle);
                }

                Command::DetermineRealTimeRecentTimestamp {
                    source_ids,
                    real_time_recency_timeout,
                    tx,
                } => {
                    let result = self
                        .determine_real_time_recent_timestamp(
                            source_ids.iter().copied(),
                            real_time_recency_timeout,
                        )
                        .await;

                    match result {
                        Ok(Some(fut)) => {
                            task::spawn(|| "determine real time recent timestamp", async move {
                                let result = fut.await.map(Some).map_err(AdapterError::from);
                                let _ = tx.send(result);
                            });
                        }
                        Ok(None) => {
                            let _ = tx.send(Ok(None));
                        }
                        Err(e) => {
                            let _ = tx.send(Err(e));
                        }
                    }
                }

                Command::GetTransactionReadHoldsBundle { conn_id, tx } => {
                    let read_holds = self.txn_read_holds.get(&conn_id).cloned();
                    let _ = tx.send(read_holds);
                }

                Command::StoreTransactionReadHolds {
                    conn_id,
                    read_holds,
                    tx,
                } => {
                    self.store_transaction_read_holds(conn_id, read_holds);
                    let _ = tx.send(());
                }

                Command::ExecuteSlowPathPeek {
                    dataflow_plan,
                    determination,
                    finishing,
                    compute_instance,
                    target_replica,
                    intermediate_result_type,
                    source_ids,
                    conn_id,
                    max_result_size,
                    max_query_result_size,
                    watch_set,
                    tx,
                } => {
                    let result = self
                        .implement_slow_path_peek(
                            *dataflow_plan,
                            determination,
                            finishing,
                            compute_instance,
                            target_replica,
                            intermediate_result_type,
                            source_ids,
                            conn_id,
                            max_result_size,
                            max_query_result_size,
                            watch_set,
                        )
                        .await;
                    let _ = tx.send(result);
                }

                Command::CopyToPreflight {
                    s3_sink_connection,
                    sink_id,
                    tx,
                } => {
                    // Spawn a background task to perform the slow S3 preflight operations.
                    // This avoids blocking the coordinator's main task.
                    let connection_context = self.connection_context().clone();
                    task::spawn(|| "copy_to_preflight", async move {
                        let result = mz_storage_types::sinks::s3_oneshot_sink::preflight(
                            connection_context,
                            &s3_sink_connection.aws_connection,
                            &s3_sink_connection.upload_info,
                            s3_sink_connection.connection_id,
                            sink_id,
                        )
                        .await
                        .map_err(AdapterError::from);
                        let _ = tx.send(result);
                    });
                }

                Command::ExecuteCopyTo {
                    df_desc,
                    compute_instance,
                    target_replica,
                    source_ids,
                    conn_id,
                    watch_set,
                    tx,
                } => {
                    // implement_copy_to spawns a background task that sends the response
                    // through tx when the COPY TO completes (or immediately if setup fails).
                    // We just call it and let it handle all response sending.
                    self.implement_copy_to(
                        *df_desc,
                        compute_instance,
                        target_replica,
                        source_ids,
                        conn_id,
                        watch_set,
                        tx,
                    )
                    .await;
                }

                Command::ExecuteSideEffectingFunc {
                    plan,
                    conn_id,
                    current_role,
                    tx,
                } => {
                    let result = self
                        .execute_side_effecting_func(plan, conn_id, current_role)
                        .await;
                    let _ = tx.send(result);
                }
                Command::RegisterFrontendPeek {
                    uuid,
                    conn_id,
                    cluster_id,
                    depends_on,
                    is_fast_path,
                    watch_set,
                    tx,
                } => {
                    self.handle_register_frontend_peek(
                        uuid,
                        conn_id,
                        cluster_id,
                        depends_on,
                        is_fast_path,
                        watch_set,
                        tx,
                    );
                }
                Command::UnregisterFrontendPeek { uuid, tx } => {
                    self.handle_unregister_frontend_peek(uuid, tx);
                }
                Command::ExplainTimestamp {
                    conn_id,
                    session_wall_time,
                    cluster_id,
                    id_bundle,
                    determination,
                    tx,
                } => {
                    let explanation = self.explain_timestamp(
                        &conn_id,
                        session_wall_time,
                        cluster_id,
                        &id_bundle,
                        determination,
                    );
                    let _ = tx.send(explanation);
                }
                Command::FrontendStatementLogging(event) => {
                    self.handle_frontend_statement_logging_event(event);
                }
            }
        }
        .instrument(debug_span!("handle_command"))
        .boxed_local()
    }

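    /// Verifies a client's SASL proof for `role_name` against the stored
    /// password hash. If the role does not exist or has no password hash, the
    /// caller-provided `mock_hash` is verified instead, so the work performed
    /// does not reveal whether the role exists; such attempts always fail with
    /// an authentication error.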
    fn handle_authenticate_verify_sasl_proof(
        &self,
        tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
        role_name: String,
        proof: String,
        auth_message: String,
        mock_hash: String,
    ) {
        let role = self.catalog().try_get_role_by_name(role_name.as_str());
        let role_auth = role.and_then(|r| self.catalog().try_get_role_auth_by_id(&r.id));

        let login = role
            .as_ref()
            .map(|r| r.attributes.login.unwrap_or(false))
            .unwrap_or(false);

        let real_hash = role_auth
            .as_ref()
            .and_then(|auth| auth.password_hash.as_ref());
        let hash_ref = real_hash.map(|s| s.as_str()).unwrap_or(&mock_hash);

        let role_present = role.is_some();
        let make_auth_err = |role_present: bool, login: bool| {
            AdapterError::AuthenticationError(if role_present && !login {
                AuthenticationError::NonLogin
            } else {
                AuthenticationError::InvalidCredentials
            })
        };

        match mz_auth::hash::sasl_verify(hash_ref, &proof, &auth_message) {
            Ok(verifier) => {
                // Success only if role exists, allows login, and a real password hash was used.
                if login && real_hash.is_some() {
                    let _ = tx.send(Ok(SASLVerifyProofResponse { verifier }));
                } else {
                    let _ = tx.send(Err(make_auth_err(role_present, login)));
                }
            }
            Err(_) => {
                let _ = tx.send(Err(AdapterError::AuthenticationError(
                    AuthenticationError::InvalidCredentials,
                )));
            }
        }
    }

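    /// Generates the server side of a SASL SCRAM challenge: a combined nonce
    /// plus the salt and iteration count from the role's stored password hash.
    /// If the role is unknown or its hash cannot be parsed, a deterministic
    /// mock challenge is returned instead so that callers cannot probe for
    /// role existence.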
    #[mz_ore::instrument(level = "debug")]
    async fn handle_generate_sasl_challenge(
        &self,
        tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
        role_name: String,
        client_nonce: String,
    ) {
        let role_auth = self
            .catalog()
            .try_get_role_by_name(&role_name)
            .and_then(|role| self.catalog().try_get_role_auth_by_id(&role.id));

        let nonce = match mz_auth::hash::generate_nonce(&client_nonce) {
            Ok(n) => n,
            Err(e) => {
                let msg = format!(
                    "failed to generate nonce for client nonce {}: {}",
                    client_nonce, e
                );
                let _ = tx.send(Err(AdapterError::Internal(msg.clone())));
                soft_panic_or_log!("{msg}");
                return;
            }
        };

        // It's important that the mock_nonce is deterministic per role, otherwise the purpose of
        // doing mock authentication is defeated. We use a catalog-wide nonce, and combine that
        // with the role name to get a per-role mock nonce.
        let send_mock_challenge =
            |role_name: String,
             mock_nonce: String,
             nonce: String,
             tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>| {
                let opts = mz_auth::hash::mock_sasl_challenge(
                    &role_name,
                    &mock_nonce,
                    &self.catalog().system_config().scram_iterations(),
                );
                let _ = tx.send(Ok(SASLChallengeResponse {
                    iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
                    salt: BASE64_STANDARD.encode(opts.salt),
                    nonce,
                }));
            };

        match role_auth {
            Some(auth) if auth.password_hash.is_some() => {
                let hash = auth.password_hash.as_ref().expect("checked above");
                match mz_auth::hash::scram256_parse_opts(hash) {
                    Ok(opts) => {
                        let _ = tx.send(Ok(SASLChallengeResponse {
                            iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
                            salt: BASE64_STANDARD.encode(opts.salt),
                            nonce,
                        }));
                    }
                    Err(_) => {
                        send_mock_challenge(
                            role_name,
                            self.catalog().state().mock_authentication_nonce(),
                            nonce,
                            tx,
                        );
                    }
                }
            }
            _ => {
                send_mock_challenge(
                    role_name,
                    self.catalog().state().mock_authentication_nonce(),
                    nonce,
                    tx,
                );
            }
        }
    }

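    /// Authenticates `role_name` with the given password, failing if the
    /// password is absent, the role does not exist or cannot log in, or the
    /// password does not match the stored hash. The hash verification itself
    /// runs on a blocking task so it does not stall the coordinator.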
    #[mz_ore::instrument(level = "debug")]
    async fn handle_authenticate_password(
        &self,
        tx: oneshot::Sender<Result<(), AdapterError>>,
        role_name: String,
        password: Option<Password>,
    ) {
        let Some(password) = password else {
            // The user did not provide a password.
            let _ = tx.send(Err(AdapterError::AuthenticationError(
                AuthenticationError::PasswordRequired,
            )));
            return;
        };

        if let Some(role) = self.catalog().try_get_role_by_name(role_name.as_str()) {
            if !role.attributes.login.unwrap_or(false) {
                // The user is not allowed to log in.
                let _ = tx.send(Err(AdapterError::AuthenticationError(
                    AuthenticationError::NonLogin,
                )));
                return;
            }
            if let Some(auth) = self.catalog().try_get_role_auth_by_id(&role.id) {
                if let Some(hash) = &auth.password_hash {
                    let hash = hash.clone();
                    task::spawn_blocking(
                        || "auth-check-hash",
                        move || {
                            let _ = match mz_auth::hash::scram256_verify(&password, &hash) {
                                Ok(_) => tx.send(Ok(())),
                                Err(_) => tx.send(Err(AdapterError::AuthenticationError(
                                    AuthenticationError::InvalidCredentials,
                                ))),
                            };
                        },
                    );
                    return;
                }
            }
            // Authentication failed because the role has no stored password hash.
            let _ = tx.send(Err(AdapterError::AuthenticationError(
                AuthenticationError::InvalidCredentials,
            )));
        } else {
            // The user does not exist.
            let _ = tx.send(Err(AdapterError::AuthenticationError(
                AuthenticationError::RoleNotFound,
            )));
        }
    }

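    /// Finishes session startup: registers the connection in `active_conns`,
    /// begins statement logging for the session, issues the corresponding
    /// builtin table update, and replies with a [`StartupResponse`] carrying
    /// the catalog and session defaults. Errors from `handle_startup_inner`
    /// are forwarded to the client.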
    #[mz_ore::instrument(level = "debug")]
    async fn handle_startup(
        &mut self,
        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
        user: User,
        conn_id: ConnectionId,
        secret_key: u32,
        uuid: uuid::Uuid,
        client_ip: Option<IpAddr>,
        application_name: String,
        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
    ) {
        // Early return if successful, otherwise clean up any possible state.
        match self.handle_startup_inner(&user, &conn_id, &client_ip).await {
            Ok((role_id, superuser_attribute, session_defaults)) => {
                let session_type = metrics::session_type_label_value(&user);
                self.metrics
                    .active_sessions
                    .with_label_values(&[session_type])
                    .inc();
                let conn = ConnMeta {
                    secret_key,
                    notice_tx,
                    drop_sinks: BTreeSet::new(),
                    pending_cluster_alters: BTreeSet::new(),
                    connected_at: self.now(),
                    user,
                    application_name,
                    uuid,
                    client_ip,
                    conn_id: conn_id.clone(),
                    authenticated_role: role_id,
                    deferred_lock: None,
                };
                let update = self.catalog().state().pack_session_update(&conn, Diff::ONE);
                let update = self.catalog().state().resolve_builtin_table_update(update);
                self.begin_session_for_statement_logging(&conn);
                self.active_conns.insert(conn_id.clone(), conn);

                // Note: Do NOT await the notify here, we pass this back to
                // whatever requested the startup to prevent blocking startup
                // and the Coordinator on a builtin table update.
                let updates = vec![update];
                // It's not a hard error if our list is missing a builtin table, but we want to
                // make sure these two things stay in sync.
                if mz_ore::assert::soft_assertions_enabled() {
                    let required_tables: BTreeSet<_> = super::appends::REQUIRED_BUILTIN_TABLES
                        .iter()
                        .map(|table| self.catalog().resolve_builtin_table(*table))
                        .collect();
                    let updates_tracked = updates
                        .iter()
                        .all(|update| required_tables.contains(&update.id));
                    let all_mz_internal = super::appends::REQUIRED_BUILTIN_TABLES
                        .iter()
                        .all(|table| table.schema == MZ_INTERNAL_SCHEMA);
                    mz_ore::soft_assert_or_log!(
                        updates_tracked,
                        "not tracking all required builtin table updates!"
                    );
                    // TODO(parkmycar): When checking if a query depends on these builtin table
                    // writes we do not check the transitive dependencies of the query, because
                    // we don't support creating views on mz_internal objects. If one of these
                    // tables is promoted out of mz_internal then we'll need to add this check.
                    mz_ore::soft_assert_or_log!(
                        all_mz_internal,
                        "not all builtin tables are in mz_internal! need to check transitive depends",
                    )
                }
                let notify = self.builtin_table_update().background(updates);

                let catalog = self.owned_catalog();
                let build_info_human_version =
                    catalog.state().config().build_info.human_version(None);

                let statement_logging_frontend = self
                    .statement_logging
                    .create_frontend(build_info_human_version);

                let resp = Ok(StartupResponse {
                    role_id,
                    write_notify: notify,
                    session_defaults,
                    catalog,
                    storage_collections: Arc::clone(&self.controller.storage_collections),
                    transient_id_gen: Arc::clone(&self.transient_id_gen),
                    optimizer_metrics: self.optimizer_metrics.clone(),
                    persist_client: self.persist_client.clone(),
                    statement_logging_frontend,
                    superuser_attribute,
                });
                if tx.send(resp).is_err() {
                    // Failed to send to the adapter, but everything is set up, so we can
                    // terminate normally.
                    self.handle_terminate(conn_id).await;
                }
            }
            Err(e) => {
                // Error during startup or sending to adapter. A user may have been created and
                // it can stay; no need to delete it.
                // Note: Temporary schemas are created lazily, so there's nothing to clean up here.

                // Communicate the error back to the client. No need to
                // handle failures to send the error back; we've already
                // cleaned up all necessary state.
                let _ = tx.send(Err(e));
            }
        }
    }

    // Fallible startup work that needs to be cleaned up on error.
    async fn handle_startup_inner(
        &mut self,
        user: &User,
        _conn_id: &ConnectionId,
        client_ip: &Option<IpAddr>,
    ) -> Result<(RoleId, SuperuserAttribute, BTreeMap<String, OwnedVarInput>), AdapterError> {
        if self.catalog().try_get_role_by_name(&user.name).is_none() {
            // If the user has made it to this point, that means they have been fully authenticated.
            // This includes preventing any user, except a pre-defined set of system users, from
            // connecting to an internal port. Therefore it's ok to always create a new role for the
            // user.
            let attributes = RoleAttributesRaw::new();
            let plan = CreateRolePlan {
                name: user.name.to_string(),
                attributes,
            };
            self.sequence_create_role_for_startup(plan).await?;
        }
        let role = self
            .catalog()
            .try_get_role_by_name(&user.name)
            .expect("created above");
        let role_id = role.id;

        let superuser_attribute = role.attributes.superuser;

        if role_id.is_user() && !ALLOW_USER_SESSIONS.get(self.catalog().system_config().dyncfgs()) {
            return Err(AdapterError::UserSessionsDisallowed);
        }

        // Initialize the default session variables for this role.
        let mut session_defaults = BTreeMap::new();
        let system_config = self.catalog().state().system_config();

        // Override the session with any system defaults.
        session_defaults.extend(
            system_config
                .iter_session()
                .map(|v| (v.name().to_string(), OwnedVarInput::Flat(v.value()))),
        );
        // Special case: the statement logging sample rate default comes from its own
        // system configuration setting rather than from `iter_session`.
        let statement_logging_default = system_config
            .statement_logging_default_sample_rate()
            .format();
        session_defaults.insert(
            STATEMENT_LOGGING_SAMPLE_RATE.name().to_string(),
            OwnedVarInput::Flat(statement_logging_default),
        );
        // Override system defaults with role defaults.
        session_defaults.extend(
            self.catalog()
                .get_role(&role_id)
                .vars()
                .map(|(name, val)| (name.to_string(), val.clone())),
        );

        // Validate network policies for external users. Internal users can only connect on the
        // internal interfaces (internal HTTP/pgwire). It is up to the person deploying the system
        // to ensure these internal interfaces are well secured.
        //
        // HACKY(parkmycar): We don't have a fully formed session yet for this role, but we want
        // the default network policy for this role, so we read directly out of what the session
        // will get initialized with.
        if !user.is_internal() {
            let network_policy_name = session_defaults
                .get(NETWORK_POLICY.name())
                .and_then(|value| match value {
                    OwnedVarInput::Flat(name) => Some(name.clone()),
                    OwnedVarInput::SqlSet(names) => {
                        tracing::error!(?names, "found multiple network policies");
                        None
                    }
                })
                .unwrap_or_else(|| system_config.default_network_policy_name());
            let maybe_network_policy = self
                .catalog()
                .get_network_policy_by_name(&network_policy_name);

            let Some(network_policy) = maybe_network_policy else {
                // We should prevent dropping the default network policy, or setting the policy
                // to something that doesn't exist, so complain loudly if this occurs.
                tracing::error!(
                    network_policy_name,
                    "default network policy does not exist. All user traffic will be blocked"
                );
                let reason = match client_ip {
                    Some(ip) => super::NetworkPolicyError::AddressDenied(ip.clone()),
                    None => super::NetworkPolicyError::MissingIp,
                };
                return Err(AdapterError::NetworkPolicyDenied(reason));
            };

            if let Some(ip) = client_ip {
                match validate_ip_with_policy_rules(ip, &network_policy.rules) {
                    Ok(_) => {}
                    Err(e) => return Err(AdapterError::NetworkPolicyDenied(e)),
                }
            } else {
                // Only temporary and internal representations of a session
                // should be missing a client_ip. These sessions should not be
                // making requests or going through handle_startup.
                return Err(AdapterError::NetworkPolicyDenied(
                    super::NetworkPolicyError::MissingIp,
                ));
            }
        }

        // Temporary schemas are now created lazily when the first temporary object is created,
        // rather than eagerly on connection startup. This avoids expensive catalog_mut() calls
        // for the common case where connections never create temporary objects.

        Ok((
            role_id,
            SuperuserAttribute(superuser_attribute),
            session_defaults,
        ))
    }

    /// Handles an execute command: verifies the named portal, begins
    /// statement logging for new statements, and dispatches the statement to
    /// `handle_execute_inner`.
    #[instrument(name = "coord::handle_execute", fields(session = session.uuid().to_string()))]
    pub(crate) async fn handle_execute(
        &mut self,
        portal_name: String,
        mut session: Session,
        tx: ClientTransmitter<ExecuteResponse>,
        // If this command was part of another execute command
        // (for example, executing a `FETCH` statement causes an execute to be
        //  issued for the cursor it references),
        // then `outer_context` should be `Some`.
        // This instructs the coordinator that the
        // outer execute should be considered finished once the inner one is.
        outer_context: Option<ExecuteContextGuard>,
    ) {
        if session.vars().emit_trace_id_notice() {
            let span_context = tracing::Span::current()
                .context()
                .span()
                .span_context()
                .clone();
            if span_context.is_valid() {
                session.add_notice(AdapterNotice::QueryTrace {
                    trace_id: span_context.trace_id(),
                });
            }
        }

        if let Err(err) = Self::verify_portal(self.catalog(), &mut session, &portal_name) {
            // If statement logging hasn't started yet, we don't need
            // to add any "end" event, so just make up a no-op
            // `ExecuteContextExtra` here, via `Default::default`.
            //
            // It's a bit unfortunate because the edge case of failed
            // portal verifications won't show up in statement
            // logging, but there seems to be nothing else we can do,
            // because we need access to the portal to begin logging.
            //
            // Another option would be to log a begin and end event, but just fill in NULLs
            // for everything we get from the portal (prepared statement id, params).
            let extra = outer_context.unwrap_or_else(Default::default);
            let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
            return ctx.retire(Err(err));
        }

        // The reference to `portal` can't outlive `session`, which we
        // use to construct the context, so scope the reference to this block where we
        // get everything we need from the portal for later.
        let (stmt, ctx, params) = {
            let portal = session
                .get_portal_unverified(&portal_name)
                .expect("known to exist");
            let params = portal.parameters.clone();
            let stmt = portal.stmt.clone();
            let logging = Arc::clone(&portal.logging);
            let lifecycle_timestamps = portal.lifecycle_timestamps.clone();

            let extra = if let Some(extra) = outer_context {
                // We are executing in the context of another SQL statement, so we don't
                // want to begin statement logging anew. The context of the actual statement
                // being executed is the one that should be retired once this finishes.
                extra
            } else {
                // This is a new statement; log it and return the context.
                let maybe_uuid = self.begin_statement_execution(
                    &mut session,
                    &params,
                    &logging,
                    lifecycle_timestamps,
                );

                ExecuteContextGuard::new(maybe_uuid, self.internal_cmd_tx.clone())
            };
            let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
            (stmt, ctx, params)
        };

        let stmt = match stmt {
            Some(stmt) => stmt,
            None => return ctx.retire(Ok(ExecuteResponse::EmptyQuery)),
        };

        let session_type = metrics::session_type_label_value(ctx.session().user());
        let stmt_type = metrics::statement_type_label_value(&stmt);
        self.metrics
            .query_total
            .with_label_values(&[session_type, stmt_type])
            .inc();
        match &*stmt {
            Statement::Subscribe(SubscribeStatement { output, .. })
            | Statement::Copy(CopyStatement {
                relation: CopyRelation::Subscribe(SubscribeStatement { output, .. }),
                ..
            }) => {
                self.metrics
                    .subscribe_outputs
                    .with_label_values(&[
                        session_type,
                        metrics::subscribe_output_label_value(output),
                    ])
                    .inc();
            }
            _ => {}
        }

        self.handle_execute_inner(stmt, params, ctx).await
    }
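    /// Executes a statement, first enforcing the transaction-state rules and
    /// DDL serialization scheme described in the comment at the top of the
    /// function body.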
    #[instrument(name = "coord::handle_execute_inner", fields(stmt = stmt.to_ast_string_redacted()))]
    pub(crate) async fn handle_execute_inner(
        &mut self,
        stmt: Arc<Statement<Raw>>,
        params: Params,
        mut ctx: ExecuteContext,
    ) {
        // This comment describes the various ways DDL can execute (the ordered operations: name
        // resolve, purify, plan, sequence), all of which are managed by this function. DDL has
        // three notable properties that all partially interact.
        //
        // 1. Most DDL statements (and a few others) support single-statement transaction delayed
        //    execution. This occurs when a session executes `BEGIN`, a single DDL, then `COMMIT`.
        //    We announce success of the single DDL when it is executed, but do not attempt to plan
        //    or sequence it until `COMMIT`, which is able to error if needed while sequencing the
        //    DDL (this behavior is Postgres-compatible). We do this because some drivers or tools
        //    wrap all statements in `BEGIN` and `COMMIT` and we would like them to work. When the
        //    single DDL is announced as successful we also put the session's transaction ops into
        //    `SingleStatement` which will produce an error if any other statement is run in the
        //    transaction except `COMMIT`. Additionally, this will cause `handle_execute_inner` to
        //    stop further processing (no planning, etc.) of the statement.
        // 2. A few other DDL statements (`ALTER .. RENAME/SWAP`) enter the `DDL` ops which allows
        //    any number of only these DDL statements to be executed in a transaction. At sequencing
        //    these generate the `Op::TransactionDryRun` catalog op. When applied with
        //    `catalog_transact`, that op will always produce the `TransactionDryRun` error. The
        //    `catalog_transact_with_ddl_transaction` function intercepts that error and reports
        //    success to the user, but nothing is yet committed to the real catalog. At `COMMIT`,
        //    all of the ops are applied again, this time without the dry-run op. The purpose of
        //    this is to allow multiple, atomic renames in the same transaction.
        // 3. Some DDLs do off-thread work during purification or sequencing that is expensive or
        //    makes network calls (interfacing with secrets, optimization of views/indexes, source
        //    purification). These must guarantee correctness when they return to the main
        //    coordinator thread because the catalog state could have changed while they were doing
        //    the off-thread work. Previously we would use `PlanValidity::Checks` to specify a bunch
        //    of IDs that we needed to exist. We discovered the way we were doing that was not
        //    always correct. Instead of attempting to get that completely right, we have opted to
        //    serialize DDL. Getting this right is difficult because catalog changes can affect name
        //    resolution, planning, sequencing, and optimization, and correctly writing logic that
        //    is aware of all possible catalog changes affecting any of those parts is not something
        //    our current code was designed to support. Even if a DDL statement is doing off-thread
        //    work, another DDL must not yet execute at all. Executing these serially will guarantee
        //    that no off-thread work has affected the state of the catalog. This is done by adding
        //    a VecDeque of deferred statements and a lock to the Coordinator. When a DDL is run in
        //    `handle_execute_inner` (after applying whatever transaction ops are needed to the
        //    session as described above), it attempts to own the lock (a tokio Mutex). If acquired,
        //    it stashes the lock in the connection's `ConnMeta` struct in `active_conns` and
        //    proceeds. The lock is dropped at transaction end in `clear_transaction` and a message
        //    sent to the Coordinator to execute the next queued DDL. If the lock could not be
        //    acquired, the DDL is put into the VecDeque where it awaits dequeuing caused by the
        //    lock being released.
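        //
        // For example, property (1) covers the common driver pattern
        // `BEGIN; CREATE VIEW v AS ...; COMMIT;`: the CREATE VIEW reports
        // success immediately, but is only planned and sequenced when the
        // COMMIT arrives, which is where any error would be surfaced.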

        // Verify that this statement type can be executed in the current
        // transaction state.
        match ctx.session().transaction() {
            // By this point we should be in a running transaction.
            TransactionStatus::Default => unreachable!(),

            // Failed transactions have already been checked in pgwire for a safe statement
            // (COMMIT, ROLLBACK, etc.) and can proceed.
            TransactionStatus::Failed(_) => {}

            // Started is a deceptive name, and means different things depending on which
            // protocol was used. It's either exactly one statement (known because this
            // is the simple protocol and the parser parsed the entire string, and it had
            // one statement). Or from the extended protocol, it means *some* query is
            // being executed, but there might be others after it before the Sync (commit)
            // message. Postgres handles this by teaching Started to eagerly commit certain
            // statements that can't be run in a transaction block.
            TransactionStatus::Started(_) => {
                if let Statement::Declare(_) = &*stmt {
                    // Declare is an exception. Although it's not against any spec to execute
                    // it, it will always result in nothing happening, since all portals will be
                    // immediately closed. Users don't know this detail, so this error helps them
                    // understand what's going wrong. Postgres does this too.
                    return ctx.retire(Err(AdapterError::OperationRequiresTransaction(
                        "DECLARE CURSOR".into(),
                    )));
                }
            }

            // Implicit or explicit transactions.
            //
            // Implicit transactions happen when a multi-statement query is executed
            // (a "simple query"). However if a "BEGIN" appears somewhere in there,
            // then the existing implicit transaction will be upgraded to an explicit
            // transaction. Thus, we should not separate what implicit and explicit
            // transactions can do unless there's some additional checking to make sure
            // something disallowed in explicit transactions did not previously take place
            // in the implicit portion.
            TransactionStatus::InTransactionImplicit(_) | TransactionStatus::InTransaction(_) => {
                match &*stmt {
                    // Statements that are safe in a transaction. We still need to verify that we
                    // don't interleave reads and writes since we can't perform those serializably.
                    Statement::Close(_)
                    | Statement::Commit(_)
                    | Statement::Copy(_)
                    | Statement::Deallocate(_)
                    | Statement::Declare(_)
                    | Statement::Discard(_)
                    | Statement::Execute(_)
                    | Statement::ExplainPlan(_)
                    | Statement::ExplainPushdown(_)
                    | Statement::ExplainAnalyzeObject(_)
                    | Statement::ExplainAnalyzeCluster(_)
                    | Statement::ExplainTimestamp(_)
                    | Statement::ExplainSinkSchema(_)
                    | Statement::Fetch(_)
                    | Statement::Prepare(_)
                    | Statement::Rollback(_)
                    | Statement::Select(_)
                    | Statement::SetTransaction(_)
                    | Statement::Show(_)
                    | Statement::SetVariable(_)
                    | Statement::ResetVariable(_)
                    | Statement::StartTransaction(_)
                    | Statement::Subscribe(_)
                    | Statement::Raise(_) => {
                        // Always safe.
                    }

                    Statement::Insert(InsertStatement {
                        source, returning, ..
                    }) if returning.is_empty() && ConstantVisitor::insert_source(source) => {
                        // Inserting from constant values in statements that do not need to
                        // execute on any cluster (no RETURNING) is always safe.
                    }

                    // These statements must be kept in sync with `must_serialize_ddl()`.
                    Statement::AlterObjectRename(_)
                    | Statement::AlterObjectSwap(_)
                    | Statement::CreateTableFromSource(_)
                    | Statement::CreateSource(_) => {
                        let state = self.catalog().for_session(ctx.session()).state().clone();
                        let revision = self.catalog().transient_revision();

                        // Initialize our transaction with a set of empty ops, or return an error
                        // if we can't run a DDL transaction.
                        let txn_status = ctx.session_mut().transaction_mut();
                        if let Err(err) = txn_status.add_ops(TransactionOps::DDL {
                            ops: vec![],
                            state,
                            revision,
                            side_effects: vec![],
                        }) {
                            return ctx.retire(Err(err));
                        }
                    }

                    // Statements below must be run singly (in Started).
1161                    Statement::AlterCluster(_)
1162                    | Statement::AlterConnection(_)
1163                    | Statement::AlterDefaultPrivileges(_)
1164                    | Statement::AlterIndex(_)
1165                    | Statement::AlterMaterializedViewApplyReplacement(_)
1166                    | Statement::AlterSetCluster(_)
1167                    | Statement::AlterOwner(_)
1168                    | Statement::AlterRetainHistory(_)
1169                    | Statement::AlterRole(_)
1170                    | Statement::AlterSecret(_)
1171                    | Statement::AlterSink(_)
1172                    | Statement::AlterSource(_)
1173                    | Statement::AlterSystemReset(_)
1174                    | Statement::AlterSystemResetAll(_)
1175                    | Statement::AlterSystemSet(_)
1176                    | Statement::AlterTableAddColumn(_)
1177                    | Statement::AlterNetworkPolicy(_)
1178                    | Statement::CreateCluster(_)
1179                    | Statement::CreateClusterReplica(_)
1180                    | Statement::CreateConnection(_)
1181                    | Statement::CreateDatabase(_)
1182                    | Statement::CreateIndex(_)
1183                    | Statement::CreateMaterializedView(_)
1184                    | Statement::CreateContinualTask(_)
1185                    | Statement::CreateRole(_)
1186                    | Statement::CreateSchema(_)
1187                    | Statement::CreateSecret(_)
1188                    | Statement::CreateSink(_)
1189                    | Statement::CreateSubsource(_)
1190                    | Statement::CreateTable(_)
1191                    | Statement::CreateType(_)
1192                    | Statement::CreateView(_)
1193                    | Statement::CreateWebhookSource(_)
1194                    | Statement::CreateNetworkPolicy(_)
1195                    | Statement::Delete(_)
1196                    | Statement::DropObjects(_)
1197                    | Statement::DropOwned(_)
1198                    | Statement::GrantPrivileges(_)
1199                    | Statement::GrantRole(_)
1200                    | Statement::Insert(_)
1201                    | Statement::ReassignOwned(_)
1202                    | Statement::RevokePrivileges(_)
1203                    | Statement::RevokeRole(_)
1204                    | Statement::Update(_)
1205                    | Statement::ValidateConnection(_)
1206                    | Statement::Comment(_) => {
1207                        let txn_status = ctx.session_mut().transaction_mut();
1208
1209                        // If we're not in an implicit transaction and we could generate exactly one
1210                        // valid ExecuteResponse, we can delay execution until commit.
1211                        if !txn_status.is_implicit() {
1212                            // Statements with a trivial tag (one derivable from the statement
1213                            // alone, without executing it) can be run in a special single-statement
1214                            // explicit mode. In this mode (`BEGIN; <stmt>; COMMIT`), we generate the
1215                            // expected tag from a successful <stmt>, but delay execution until `COMMIT`.
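                            // For example, in `BEGIN; CREATE TABLE t (a int); COMMIT`, the
                            // CREATE TABLE tag is known without executing the statement, so
                            // we return it immediately and run the DDL itself at COMMIT.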
1216                            if let Ok(resp) = ExecuteResponse::try_from(&*stmt) {
1217                                if let Err(err) = txn_status
1218                                    .add_ops(TransactionOps::SingleStatement { stmt, params })
1219                                {
1220                                    ctx.retire(Err(err));
1221                                    return;
1222                                }
1223                                ctx.retire(Ok(resp));
1224                                return;
1225                            }
1226                        }
1227
1228                        return ctx.retire(Err(AdapterError::OperationProhibitsTransaction(
1229                            stmt.to_string(),
1230                        )));
1231                    }
1232                }
1233            }
1234        }
1235
1236        // DDLs must be planned and sequenced serially. We do not rely on PlanValidity checking
1237        // various IDs, because we have gotten that wrong in the past. Attempt to acquire the
1238        // DDL lock. The lock is stashed in the ConnMeta, which is dropped at transaction end. If
1239        // acquired, proceed with sequencing. If not, enqueue the statement and return. This
1240        // logic assumes that Coordinator::clear_transaction is correctly called when session
1241        // transactions end, because that function releases the held lock from active_conns.
1242        if Self::must_serialize_ddl(&stmt, &ctx) {
1243            if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
1244                let prev = self
1245                    .active_conns
1246                    .get_mut(ctx.session().conn_id())
1247                    .expect("connection must exist")
1248                    .deferred_lock
1249                    .replace(guard);
1250                assert!(
1251                    prev.is_none(),
1252                    "connections should have at most one lock guard"
1253                );
1254            } else {
1255                if self
1256                    .active_conns
1257                    .get(ctx.session().conn_id())
1258                    .expect("connection must exist")
1259                    .deferred_lock
1260                    .is_some()
1261                {
1262                    // This session *already* has the lock, and incorrectly tried to execute another
1263                    // DDL while still holding the lock, violating the assumption documented above.
1264                    // This is an internal error, probably in some AdapterClient user (pgwire or
1265                    // http). Because the session is now in some unexpected state, return an error
1266                    // which should cause the AdapterClient user to fail the transaction.
1267                    // (Terminating the connection is maybe what we would prefer to do, but is not
1268                    // currently a thing we can do from the coordinator: calling handle_terminate
1269                    // cleans up Coordinator state for the session but doesn't inform the
1270                    // AdapterClient that the session should terminate.)
1271                    soft_panic_or_log!(
1272                        "session {} attempted to get ddl lock while already owning it",
1273                        ctx.session().conn_id()
1274                    );
1275                    ctx.retire(Err(AdapterError::Internal(
1276                        "session attempted to get ddl lock while already owning it".to_string(),
1277                    )));
1278                    return;
1279                }
1280                self.serialized_ddl.push_back(DeferredPlanStatement {
1281                    ctx,
1282                    ps: PlanStatement::Statement { stmt, params },
1283                });
1284                return;
1285            }
1286        }
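        // A minimal self-contained sketch of the try-lock-or-defer pattern used above, written
        // against plain tokio (the `DeferredWork` type and `run` function are hypothetical
        // illustrations, not part of this crate):
        //
        //     use std::collections::VecDeque;
        //     use std::sync::Arc;
        //     use tokio::sync::Mutex;
        //
        //     struct DeferredWork(String);
        //
        //     fn run(lock: Arc<Mutex<()>>, queue: &mut VecDeque<DeferredWork>, work: DeferredWork) {
        //         match lock.try_lock_owned() {
        //             // Acquired: stash the guard somewhere whose lifetime ends with the
        //             // transaction, then proceed with sequencing.
        //             Ok(_guard) => println!("sequencing {}", work.0),
        //             // Contended: enqueue and return; the queue is drained when the current
        //             // holder drops its guard.
        //             Err(_) => queue.push_back(work),
        //         }
        //     }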
1287
1288        let catalog = self.catalog();
1289        let catalog = catalog.for_session(ctx.session());
1290        let original_stmt = Arc::clone(&stmt);
1291        // `resolved_ids` should be derivable from `stmt`. If `stmt` is transformed to remove/add
1292        // IDs, then `resolved_ids` should be updated to also remove/add those IDs.
1293        let (stmt, mut resolved_ids) = match mz_sql::names::resolve(&catalog, (*stmt).clone()) {
1294            Ok(resolved) => resolved,
1295            Err(e) => return ctx.retire(Err(e.into())),
1296        };
1297        // N.B. The catalog can change during purification, so we must validate that the
1298        // dependencies still exist after purification. This should be done back on the main
1299        // thread. We do the validation:
1300        //   - In the handler for `Message::PurifiedStatementReady`, before we handle the purified statement.
1301        // If we add special handling for more types of `Statement`s, we'll need to ensure
1302        // similar verification occurs.
1303        let (stmt, resolved_ids) = match stmt {
1304            // Various statements must be purified off the main coordinator thread of control.
1305            stmt if Self::must_spawn_purification(&stmt) => {
1306                let internal_cmd_tx = self.internal_cmd_tx.clone();
1307                let conn_id = ctx.session().conn_id().clone();
1308                let catalog = self.owned_catalog();
1309                let now = self.now();
1310                let otel_ctx = OpenTelemetryContext::obtain();
1311                let current_storage_configuration = self.controller.storage.config().clone();
1312                task::spawn(|| format!("purify:{conn_id}"), async move {
1313                    let transient_revision = catalog.transient_revision();
1314                    let catalog = catalog.for_session(ctx.session());
1315
1316                    // Checks if the session is authorized to purify a statement. Usually,
1317                    // authorization is checked after planning; however, purification happens
1318                    // before planning and may require the use of some connections and secrets.
1319                    if let Err(e) = rbac::check_usage(
1320                        &catalog,
1321                        ctx.session(),
1322                        &resolved_ids,
1323                        &CREATE_ITEM_USAGE,
1324                    ) {
1325                        return ctx.retire(Err(e.into()));
1326                    }
1327
1328                    let (result, cluster_id) = mz_sql::pure::purify_statement(
1329                        catalog,
1330                        now,
1331                        stmt,
1332                        &current_storage_configuration,
1333                    )
1334                    .await;
1335                    let result = result.map_err(|e| e.into());
1336                    let dependency_ids = resolved_ids.items().copied().collect();
1337                    let plan_validity = PlanValidity::new(
1338                        transient_revision,
1339                        dependency_ids,
1340                        cluster_id,
1341                        None,
1342                        ctx.session().role_metadata().clone(),
1343                    );
1344                    // It is not an error for purification to complete after `internal_cmd_rx` is dropped.
1345                    let result = internal_cmd_tx.send(Message::PurifiedStatementReady(
1346                        PurifiedStatementReady {
1347                            ctx,
1348                            result,
1349                            params,
1350                            plan_validity,
1351                            original_stmt,
1352                            otel_ctx,
1353                        },
1354                    ));
1355                    if let Err(e) = result {
1356                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1357                    }
1358                });
1359                return;
1360            }
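            // A minimal sketch of the off-thread hand-back pattern above, using plain tokio
            // (all names are illustrative, not this crate's API): slow work runs on a spawned
            // task and reports back over an unbounded channel; a failed send only means the
            // receiving side has shut down.
            //
            //     use tokio::sync::mpsc;
            //
            //     async fn demo() {
            //         let (tx, mut rx) = mpsc::unbounded_channel::<String>();
            //         tokio::spawn(async move {
            //             let purified = "purified statement".to_string(); // slow work here
            //             if let Err(e) = tx.send(purified) {
            //                 tracing::warn!("receiver dropped before we could send: {e:?}");
            //             }
            //         });
            //         assert_eq!(rx.recv().await.as_deref(), Some("purified statement"));
            //     }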
1361
1362            // `CREATE SUBSOURCE` statements are disallowed for users and are only generated
1363            // automatically as part of purification.
1364            Statement::CreateSubsource(_) => {
1365                ctx.retire(Err(AdapterError::Unsupported(
1366                    "CREATE SUBSOURCE statements",
1367                )));
1368                return;
1369            }
1370
1371            Statement::CreateMaterializedView(mut cmvs) => {
1372                // `CREATE MATERIALIZED VIEW ... AS OF ...` syntax is disallowed for users and is
1373                // only used for storing initial frontiers in the catalog.
1374                if cmvs.as_of.is_some() {
1375                    return ctx.retire(Err(AdapterError::Unsupported(
1376                        "CREATE MATERIALIZED VIEW ... AS OF statements",
1377                    )));
1378                }
1379
1380                let mz_now = match self
1381                    .resolve_mz_now_for_create_materialized_view(
1382                        &cmvs,
1383                        &resolved_ids,
1384                        ctx.session_mut(),
1385                        true,
1386                    )
1387                    .await
1388                {
1389                    Ok(mz_now) => mz_now,
1390                    Err(e) => return ctx.retire(Err(e)),
1391                };
1392
1393                let catalog = self.catalog().for_session(ctx.session());
1394
1395                purify_create_materialized_view_options(
1396                    catalog,
1397                    mz_now,
1398                    &mut cmvs,
1399                    &mut resolved_ids,
1400                );
1401
1402                let purified_stmt =
1403                    Statement::CreateMaterializedView(CreateMaterializedViewStatement::<Aug> {
1404                        if_exists: cmvs.if_exists,
1405                        name: cmvs.name,
1406                        columns: cmvs.columns,
1407                        replacement_for: cmvs.replacement_for,
1408                        in_cluster: cmvs.in_cluster,
1409                        in_cluster_replica: cmvs.in_cluster_replica,
1410                        query: cmvs.query,
1411                        with_options: cmvs.with_options,
1412                        as_of: None,
1413                    });
1414
1415                // (Purifying CreateMaterializedView doesn't happen asynchronously, so there is
1416                // no need to send `Message::PurifiedStatementReady` here.)
1417                (purified_stmt, resolved_ids)
1418            }
1419
1420            Statement::ExplainPlan(ExplainPlanStatement {
1421                stage,
1422                with_options,
1423                format,
1424                explainee: Explainee::CreateMaterializedView(box_cmvs, broken),
1425            }) => {
1426                let mut cmvs = *box_cmvs;
1427                let mz_now = match self
1428                    .resolve_mz_now_for_create_materialized_view(
1429                        &cmvs,
1430                        &resolved_ids,
1431                        ctx.session_mut(),
1432                        false,
1433                    )
1434                    .await
1435                {
1436                    Ok(mz_now) => mz_now,
1437                    Err(e) => return ctx.retire(Err(e)),
1438                };
1439
1440                let catalog = self.catalog().for_session(ctx.session());
1441
1442                purify_create_materialized_view_options(
1443                    catalog,
1444                    mz_now,
1445                    &mut cmvs,
1446                    &mut resolved_ids,
1447                );
1448
1449                let purified_stmt = Statement::ExplainPlan(ExplainPlanStatement {
1450                    stage,
1451                    with_options,
1452                    format,
1453                    explainee: Explainee::CreateMaterializedView(Box::new(cmvs), broken),
1454                });
1455
1456                (purified_stmt, resolved_ids)
1457            }
1458
1459            // All other statements are handled immediately.
1460            _ => (stmt, resolved_ids),
1461        };
1462
1463        match self.plan_statement(ctx.session(), stmt, &params, &resolved_ids) {
1464            Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await,
1465            Err(e) => ctx.retire(Err(e)),
1466        }
1467    }
1468
1469    /// Whether the statement is DDL that must be serialized.
1470    fn must_serialize_ddl(stmt: &Statement<Raw>, ctx: &ExecuteContext) -> bool {
1471        // Non-DDL is not serialized here.
1472        if !StatementClassification::from(&*stmt).is_ddl() {
1473            return false;
1474        }
1475        // Off-thread, pre-planning purification can perform arbitrarily slow network calls, so
1476        // it must not be serialized. These all use PlanValidity for their checking, and we must
1477        // ensure those checks are sufficient.
1478        if Self::must_spawn_purification(stmt) {
1479            return false;
1480        }
1481
1482        // Statements that support multiple DDLs in a single transaction aren't serialized here.
1483        // Their operations are serialized when applied to the catalog, guaranteeing that any
1484        // off-thread DDLs concurrent with a multi-DDL transaction will have a serial order.
1485        if ctx.session.transaction().is_ddl() {
1486            return false;
1487        }
1488
1489        // Some DDL is exempt. It is not great that we are matching on Statements here because
1490        // different plans can be produced from the same top-level statement type (e.g., `ALTER
1491        // CONNECTION ROTATE KEYS`). But the whole point of this is to prevent things from being
1492        // planned in the first place, so we accept the abstraction leak.
1493        match stmt {
1494            // Secrets have a small and understood set of dependencies, and their off-thread work
1495            // interacts with k8s.
1496            Statement::AlterSecret(_) => false,
1497            Statement::CreateSecret(_) => false,
1498            Statement::AlterConnection(AlterConnectionStatement { actions, .. })
1499                if actions
1500                    .iter()
1501                    .all(|action| matches!(action, AlterConnectionAction::RotateKeys)) =>
1502            {
1503                false
1504            }
1505
1506            // The off-thread work that altering a cluster may do (waiting for replicas to
1507            // spin up) does not affect its catalog names or IDs, so it is safe to not serialize.
1508            // This could change the set of replicas that exist. For queries that name replicas or
1509            // use the current_replica session var, the `replica_id` field of `PlanValidity` serves
1510            // to ensure that those replicas exist during the query finish stage. Additionally,
1511            // that work can take hours (configured by the user), so serializing it would also be
1512            // a bad experience for users.
1513            Statement::AlterCluster(_) => false,
1514
1515            // `ALTER SINK SET FROM` waits for the old relation to make enough progress for a clean
1516            // cutover. If the old collection is stalled, it may block forever. Checks in
1517            // sequencing ensure that the operation fails if any one of these happens concurrently:
1518            //   * the sink is dropped
1519            //   * the new source relation is dropped
1520            //   * another `ALTER SINK` for the same sink is applied first
1521            Statement::AlterSink(stmt)
1522                if matches!(stmt.action, AlterSinkAction::ChangeRelation(_)) =>
1523            {
1524                false
1525            }
1526
1527            // `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT` waits for the target MV to make
1528            // enough progress for a clean cutover. If the target MV is stalled, it may block
1529            // forever. Checks in sequencing ensure the operation fails if any of these happens
1530            // concurrently:
1531            //   * the target MV is dropped
1532            //   * the replacement MV is dropped
1533            Statement::AlterMaterializedViewApplyReplacement(_) => false,
1534
1535            // Everything else must be serialized.
1536            _ => true,
1537        }
1538    }
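    // For example, `CREATE TABLE` must take the serialized-DDL path, while `CREATE SECRET`
    // is exempt because its off-thread work (talking to k8s) has a small and understood set
    // of dependencies.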
1539
1540    /// Whether the statement must be purified off the Coordinator thread.
1541    fn must_spawn_purification<A: AstInfo>(stmt: &Statement<A>) -> bool {
1542        // `CREATE` and `ALTER` `SOURCE` and `SINK` statements must be purified off the main
1543        // coordinator thread.
1544        if !matches!(
1545            stmt,
1546            Statement::CreateSource(_)
1547                | Statement::AlterSource(_)
1548                | Statement::CreateSink(_)
1549                | Statement::CreateTableFromSource(_)
1550        ) {
1551            return false;
1552        }
1553
1554        // However, `ALTER SOURCE ... RETAIN HISTORY` should be excluded from off-thread purification.
1555        if let Statement::AlterSource(stmt) = stmt {
1556            let names: Vec<CreateSourceOptionName> = match &stmt.action {
1557                AlterSourceAction::SetOptions(options) => {
1558                    options.iter().map(|o| o.name.clone()).collect()
1559                }
1560                AlterSourceAction::ResetOptions(names) => names.clone(),
1561                _ => vec![],
1562            };
1563            if !names.is_empty()
1564                && names
1565                    .iter()
1566                    .all(|n| matches!(n, CreateSourceOptionName::RetainHistory))
1567            {
1568                return false;
1569            }
1570        }
1571
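        // Everything else that reaches this point (e.g., `CREATE SOURCE`, `CREATE SINK`) may
        // contact external systems during purification, so it must run off the coordinator
        // thread.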
1572        true
1573    }
1574
1575    /// Chooses a timestamp for `mz_now()`, if `mz_now()` occurs in a REFRESH option of the
1576    /// materialized view. Additionally, if `acquire_read_holds` is true and the MV has any REFRESH
1577    /// option, this function grabs read holds at the earliest possible time on input collections
1578    /// that might be involved in the MV.
1579    ///
1580    /// Note that this is NOT what handles `mz_now()` in the query part of the MV; this
1581    /// function handles it only in `with_options`.
1582    ///
1583    /// (Note that the chosen timestamp won't be the same as the timestamp of the system
1584    /// table inserts, unfortunately.)
1585    async fn resolve_mz_now_for_create_materialized_view(
1586        &mut self,
1587        cmvs: &CreateMaterializedViewStatement<Aug>,
1588        resolved_ids: &ResolvedIds,
1589        session: &Session,
1590        acquire_read_holds: bool,
1591    ) -> Result<Option<Timestamp>, AdapterError> {
1592        if cmvs
1593            .with_options
1594            .iter()
1595            .any(|wo| matches!(wo.value, Some(WithOptionValue::Refresh(..))))
1596        {
1597            let catalog = self.catalog().for_session(session);
1598            let cluster = mz_sql::plan::resolve_cluster_for_materialized_view(&catalog, cmvs)?;
1599            let ids = self
1600                .index_oracle(cluster)
1601                .sufficient_collections(resolved_ids.collections().copied());
1602
1603            // If there is any REFRESH option, then acquire read holds. (Strictly speaking, we'd
1604            // need this only if there is a `REFRESH AT`, not for `REFRESH EVERY`, because later
1605            // we want to check the AT times against the read holds that we acquire here. But
1606            // we do it for any REFRESH option, to avoid having so many code paths doing different
1607            // things.)
1608            //
1609            // It's important that we acquire read holds _before_ we determine the least valid read.
1610            // Otherwise, the since frontier could advance out from
1611            // underneath us.
1612            let read_holds = self.acquire_read_holds(&ids);
1613
1614            // Does `mz_now()` occur?
1615            let mz_now_ts = if cmvs
1616                .with_options
1617                .iter()
1618                .any(materialized_view_option_contains_temporal)
1619            {
1620                let timeline_context = self
1621                    .catalog()
1622                    .validate_timeline_context(resolved_ids.collections().copied())?;
1623
1624                // We default to EpochMilliseconds, similarly to `determine_timestamp_for`,
1625                // but even in the TimestampIndependent case.
1626                // Note that we didn't accurately decide whether we are TimestampDependent
1627                // or TimestampIndependent, because for this we'd need to also check whether
1628                // `query.contains_temporal()`, similarly to how `peek_stage_validate` does.
1629                // However, this doesn't matter here, as we are just going to default to
1630                // EpochMilliseconds in both cases.
1631                let timeline = timeline_context
1632                    .timeline()
1633                    .unwrap_or(&Timeline::EpochMilliseconds);
1634
1635                // Let's start with the timestamp oracle read timestamp.
1636                let mut timestamp = self.get_timestamp_oracle(timeline).read_ts().await;
1637
1638                // If `least_valid_read` is later than the oracle, then advance to that time.
1639                // If we didn't do this, then there would be a danger of missing the first refresh,
1640                // which might cause the materialized view to be unreadable for hours. This might
1641                // be what was happening here:
1642                // https://github.com/MaterializeInc/database-issues/issues/7265#issuecomment-1931856361
1643                //
1644                // In the long term, it would be good to actually block the MV creation statement
1645                // until `least_valid_read`. https://github.com/MaterializeInc/database-issues/issues/7504
1646                // Without blocking, we have the problem that a REFRESH AT CREATION is not linearized
1647                // with the CREATE MATERIALIZED VIEW statement, in the sense that a query from the MV
1648                // after its creation might see input changes that happened after the CREATE MATERIALIZED
1649                // VIEW statement returned.
1650                let oracle_timestamp = timestamp;
1651                let least_valid_read = read_holds.least_valid_read();
1652                timestamp.advance_by(least_valid_read.borrow());
1653
1654                if oracle_timestamp != timestamp {
1655                    warn!(%cmvs.name, %oracle_timestamp, %timestamp, "REFRESH MV's inputs are not readable at the oracle read ts");
1656                }
1657
1658                info!("Resolved `mz_now()` to {timestamp} for REFRESH MV");
1659                Ok(Some(timestamp))
1660            } else {
1661                Ok(None)
1662            };
1663
1664            // NOTE: The Drop impl of ReadHolds makes sure that the holds are
1665            // released if we don't store them below.
1666            if acquire_read_holds {
1667                self.store_transaction_read_holds(session.conn_id().clone(), read_holds);
1668            }
1669
1670            mz_now_ts
1671        } else {
1672            Ok(None)
1673        }
1674    }
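    // A minimal sketch of the frontier-advancement step performed above, using only `timely`
    // and `differential-dataflow` types (the concrete numbers are illustrative):
    //
    //     use differential_dataflow::lattice::Lattice;
    //     use timely::progress::Antichain;
    //
    //     fn demo() {
    //         let mut timestamp: u64 = 100; // oracle read timestamp
    //         let least_valid_read = Antichain::from_elem(150u64); // inputs' since frontier
    //         timestamp.advance_by(least_valid_read.borrow());
    //         // The timestamp advanced to the frontier, so the first refresh is not missed.
    //         assert_eq!(timestamp, 150);
    //     }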
1675
1676    /// Instruct the dataflow layer to cancel any ongoing, interactive work for
1677    /// the named `conn_id` if the correct secret key is specified.
1678    ///
1679    /// Note: Here we take a [`ConnectionIdType`] as opposed to an owned
1680    /// `ConnectionId` because this method gets called by external clients
1681    /// when they request cancellation.
1682    #[mz_ore::instrument(level = "debug")]
1683    async fn handle_cancel(&mut self, conn_id: ConnectionIdType, secret_key: u32) {
1684        if let Some((id_handle, conn_meta)) = self.active_conns.get_key_value(&conn_id) {
1685            // If the secret key specified by the client doesn't match the
1686            // actual secret key for the target connection, we treat this as a
1687            // rogue cancellation request and ignore it.
1688            if conn_meta.secret_key != secret_key {
1689                return;
1690            }
1691
1692            // Now that we've verified the secret key, this is a privileged
1693            // cancellation request. We can upgrade the raw connection ID to a
1694            // proper `IdHandle`.
1695            self.handle_privileged_cancel(id_handle.clone()).await;
1696        }
1697    }
1698
1699    /// Unconditionally instructs the dataflow layer to cancel any ongoing,
1700    /// interactive work for the named `conn_id`.
1701    #[mz_ore::instrument(level = "debug")]
1702    pub(crate) async fn handle_privileged_cancel(&mut self, conn_id: ConnectionId) {
1703        let mut maybe_ctx = None;
1704
1705        // Cancel pending writes. There is at most one pending write per session.
1706        if let Some(idx) = self.pending_writes.iter().position(|pending_write_txn| {
1707            matches!(pending_write_txn, PendingWriteTxn::User {
1708                pending_txn: PendingTxn { ctx, .. },
1709                ..
1710            } if *ctx.session().conn_id() == conn_id)
1711        }) {
1712            if let PendingWriteTxn::User {
1713                pending_txn: PendingTxn { ctx, .. },
1714                ..
1715            } = self.pending_writes.remove(idx)
1716            {
1717                maybe_ctx = Some(ctx);
1718            }
1719        }
1720
1721        // Cancel deferred writes.
1722        if let Some(write_op) = self.deferred_write_ops.remove(&conn_id) {
1723            maybe_ctx = Some(write_op.into_ctx());
1724        }
1725
1726        // Cancel deferred statements.
1727        if let Some(idx) = self
1728            .serialized_ddl
1729            .iter()
1730            .position(|deferred| *deferred.ctx.session().conn_id() == conn_id)
1731        {
1732            let deferred = self
1733                .serialized_ddl
1734                .remove(idx)
1735                .expect("known to exist from call to `position` above");
1736            maybe_ctx = Some(deferred.ctx);
1737        }
1738
1739        // Cancel reads waiting to be linearized. There is at most one linearized read per
1740        // session.
1741        if let Some(pending_read_txn) = self.pending_linearize_read_txns.remove(&conn_id) {
1742            let ctx = pending_read_txn.take_context();
1743            maybe_ctx = Some(ctx);
1744        }
1745
1746        if let Some(ctx) = maybe_ctx {
1747            ctx.retire(Err(AdapterError::Canceled));
1748        }
1749
1750        self.cancel_pending_peeks(&conn_id);
1751        self.cancel_pending_watchsets(&conn_id);
1752        self.cancel_compute_sinks_for_conn(&conn_id).await;
1753        self.cancel_cluster_reconfigurations_for_conn(&conn_id)
1754            .await;
1755        self.cancel_pending_copy(&conn_id);
1756        if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) {
1757            let _ = tx.send(true);
1758        }
1759    }
1760
1761    /// Handle termination of a client session.
1762    ///
1763    /// This cleans up any state in the coordinator associated with the session.
1764    #[mz_ore::instrument(level = "debug")]
1765    async fn handle_terminate(&mut self, conn_id: ConnectionId) {
1766        if !self.active_conns.contains_key(&conn_id) {
1767            // If the session doesn't exist in `active_conns`, then this method will panic later
1768            // on. Instead, we explicitly panic here while dumping the entire Coord to the logs to
1769            // aid debugging. This panic is very infrequent, so we want as much information as possible.
1770            // See https://github.com/MaterializeInc/database-issues/issues/5627.
1771            panic!("unknown connection: {conn_id:?}\n\n{self:?}")
1772        }
1773
1774        // We do not need to call clear_transaction here because there are no side effects to run
1775        // based on any session transaction state.
1776        self.clear_connection(&conn_id).await;
1777
1778        self.drop_temp_items(&conn_id).await;
1779        // Only call catalog_mut() if a temporary schema actually exists for this connection.
1780        // This avoids an expensive Arc::make_mut clone for the common case where the connection
1781        // never created any temporary objects.
1782        if self.catalog().state().has_temporary_schema(&conn_id) {
1783            self.catalog_mut()
1784                .drop_temporary_schema(&conn_id)
1785                .unwrap_or_terminate("unable to drop temporary schema");
1786        }
1787        let conn = self.active_conns.remove(&conn_id).expect("conn must exist");
1788        let session_type = metrics::session_type_label_value(conn.user());
1789        self.metrics
1790            .active_sessions
1791            .with_label_values(&[session_type])
1792            .dec();
1793        self.cancel_pending_peeks(conn.conn_id());
1794        self.cancel_pending_watchsets(&conn_id);
1795        self.cancel_pending_copy(&conn_id);
1796        self.end_session_for_statement_logging(conn.uuid());
1797
1798        // Queue the builtin table update, but do not wait for it to complete. We explicitly do
1799        // this to prevent blocking the Coordinator in the case that a lot of connections are
1800        // closed at once, which occurs regularly in some workflows.
1801        let update = self
1802            .catalog()
1803            .state()
1804            .pack_session_update(&conn, Diff::MINUS_ONE);
1805        let update = self.catalog().state().resolve_builtin_table_update(update);
1806
1807        let _builtin_update_notify = self.builtin_table_update().defer(vec![update]);
1808    }
1809
1810    /// Returns the necessary metadata for appending to a webhook source, and a channel to send
1811    /// rows.
1812    #[mz_ore::instrument(level = "debug")]
1813    fn handle_get_webhook(
1814        &mut self,
1815        database: String,
1816        schema: String,
1817        name: String,
1818        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
1819    ) {
1820        /// Attempts to resolve a Webhook source from a provided `database.schema.name` path.
1821        ///
1822        /// Returns a struct that can be used to append data to the underlying storage
1823        /// collection, and the types we should cast the request to.
1824        fn resolve(
1825            coord: &mut Coordinator,
1826            database: String,
1827            schema: String,
1828            name: String,
1829        ) -> Result<AppendWebhookResponse, PartialItemName> {
1830            // Resolve our collection.
1831            let name = PartialItemName {
1832                database: Some(database),
1833                schema: Some(schema),
1834                item: name,
1835            };
1836            let Ok(entry) = coord
1837                .catalog()
1838                .resolve_entry(None, &vec![], &name, &SYSTEM_CONN_ID)
1839            else {
1840                return Err(name);
1841            };
1842
1843            // Webhooks can be created with `CREATE SOURCE` or `CREATE TABLE`.
1844            let (data_source, desc, global_id) = match entry.item() {
1845                CatalogItem::Source(Source {
1846                    data_source: data_source @ DataSourceDesc::Webhook { .. },
1847                    desc,
1848                    global_id,
1849                    ..
1850                }) => (data_source, desc.clone(), *global_id),
1851                CatalogItem::Table(
1852                    table @ Table {
1853                        desc,
1854                        data_source:
1855                            TableDataSource::DataSource {
1856                                desc: data_source @ DataSourceDesc::Webhook { .. },
1857                                ..
1858                            },
1859                        ..
1860                    },
1861                ) => (data_source, desc.latest(), table.global_id_writes()),
1862                _ => return Err(name),
1863            };
1864
1865            let DataSourceDesc::Webhook {
1866                validate_using,
1867                body_format,
1868                headers,
1869                ..
1870            } = data_source
1871            else {
1872                mz_ore::soft_panic_or_log!("programming error! checked above for webhook");
1873                return Err(name);
1874            };
1875            let body_format = body_format.clone();
1876            let header_tys = headers.clone();
1877
1878            // Assert we have one column for the body, and however many are required for
1879            // the headers.
1880            let num_columns = headers.num_columns() + 1;
1881            mz_ore::soft_assert_or_log!(
1882                desc.arity() <= num_columns,
1883                "expected at most {} columns, but got {}",
1884                num_columns,
1885                desc.arity()
1886            );
1887
1888            // Double-check that the body column of the webhook source matches the type
1889            // we're about to deserialize it as.
1890            let body_column = desc
1891                .get_by_name(&"body".into())
1892                .map(|(_idx, ty)| ty.clone())
1893                .ok_or_else(|| name.clone())?;
1894            assert!(!body_column.nullable, "webhook body column is nullable!?");
1895            assert_eq!(body_column.scalar_type, SqlScalarType::from(body_format));
1896
1897            // Create a validator that can be called to validate a webhook request.
1898            let validator = validate_using.as_ref().map(|v| {
1899                let validation = v.clone();
1900                AppendWebhookValidator::new(validation, coord.caching_secrets_reader.clone())
1901            });
1902
1903            // Get a channel so we can queue updates to be written.
1904            let row_tx = coord
1905                .controller
1906                .storage
1907                .monotonic_appender(global_id)
1908                .map_err(|_| name.clone())?;
1909            let stats = coord
1910                .controller
1911                .storage
1912                .webhook_statistics(global_id)
1913                .map_err(|_| name)?;
1914            let invalidator = coord
1915                .active_webhooks
1916                .entry(entry.id())
1917                .or_insert_with(WebhookAppenderInvalidator::new);
1918            let tx = WebhookAppender::new(row_tx, invalidator.guard(), stats);
1919
1920            Ok(AppendWebhookResponse {
1921                tx,
1922                body_format,
1923                header_tys,
1924                validator,
1925            })
1926        }
1927
1928        let response = resolve(self, database, schema, name).map_err(|name| {
1929            AppendWebhookError::UnknownWebhook {
1930                database: name.database.expect("provided"),
1931                schema: name.schema.expect("provided"),
1932                name: name.item,
1933            }
1934        });
1935        let _ = tx.send(response);
1936    }
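    // A minimal self-contained sketch of the reply pattern used above, with tokio only (the
    // payload types are illustrative): the handler computes a `Result` and answers over a
    // oneshot channel, deliberately ignoring a send error because it only means the
    // requester has gone away.
    //
    //     use tokio::sync::oneshot;
    //
    //     async fn demo() {
    //         let (tx, rx) = oneshot::channel::<Result<u64, String>>();
    //         // Handler side: resolve the request and reply.
    //         let _ = tx.send(Ok(42)); // Err here only means the receiver was dropped.
    //         // Requester side: await the response.
    //         assert_eq!(rx.await.unwrap(), Ok(42));
    //     }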
1937
1938    /// Handle registration of a frontend peek, for statement logging and query
1939    /// cancellation.
1940    fn handle_register_frontend_peek(
1941        &mut self,
1942        uuid: Uuid,
1943        conn_id: ConnectionId,
1944        cluster_id: mz_controller_types::ClusterId,
1945        depends_on: BTreeSet<GlobalId>,
1946        is_fast_path: bool,
1947        watch_set: Option<WatchSetCreation>,
1948        tx: oneshot::Sender<Result<(), AdapterError>>,
1949    ) {
1950        let statement_logging_id = watch_set.as_ref().map(|ws| ws.logging_id);
1951        if let Some(ws) = watch_set {
1952            if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), ws) {
1953                let _ = tx.send(Err(
1954                    AdapterError::concurrent_dependency_drop_from_watch_set_install_error(e),
1955                ));
1956                return;
1957            }
1958        }
1959
1960        // Store the peek in pending_peeks for later retrieval when results arrive
1961        self.pending_peeks.insert(
1962            uuid,
1963            PendingPeek {
1964                conn_id: conn_id.clone(),
1965                cluster_id,
1966                depends_on,
1967                ctx_extra: ExecuteContextGuard::new(
1968                    statement_logging_id,
1969                    self.internal_cmd_tx.clone(),
1970                ),
1971                is_fast_path,
1972            },
1973        );
1974
1975        // Also track it by connection ID for cancellation support
1976        self.client_pending_peeks
1977            .entry(conn_id)
1978            .or_default()
1979            .insert(uuid, cluster_id);
1980
1981        let _ = tx.send(Ok(()));
1982    }
1983
1984    /// Handle unregistration of a frontend peek that was registered but failed to issue.
1985    /// This is used for cleanup when `client.peek()` fails after `RegisterFrontendPeek` succeeds.
1986    fn handle_unregister_frontend_peek(&mut self, uuid: Uuid, tx: oneshot::Sender<()>) {
1987        // Remove from pending_peeks (this also removes from client_pending_peeks)
1988        if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
1989            // Defuse the `ExecuteContextGuard`, because the frontend will log the peek's error result.
1990            let _ = pending_peek.ctx_extra.defuse();
1991        }
1992        let _ = tx.send(());
1993    }
1994}