
mz_adapter/coord/command_handler.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic for processing client [`Command`]s. Each [`Command`] is initiated by a
//! client via some external Materialize API (ex: HTTP and psql).

use base64::prelude::*;
use differential_dataflow::lattice::Lattice;
use mz_adapter_types::dyncfgs::ALLOW_USER_SESSIONS;
use mz_auth::AuthenticatorKind;
use mz_auth::password::Password;
use mz_repr::namespaces::MZ_INTERNAL_SCHEMA;
use mz_sql::catalog::AutoProvisionSource;
use mz_sql::session::metadata::SessionMetadata;
use std::collections::{BTreeMap, BTreeSet};
use std::net::IpAddr;
use std::sync::Arc;

use futures::FutureExt;
use futures::future::LocalBoxFuture;
use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::memory::objects::{
    CatalogItem, DataSourceDesc, Role, Source, Table, TableDataSource,
};
use mz_ore::task;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{instrument, soft_panic_or_log};
use mz_repr::role_id::RoleId;
use mz_repr::{Diff, GlobalId, SqlScalarType, Timestamp};
use mz_sql::ast::{
    AlterConnectionAction, AlterConnectionStatement, AlterSinkAction, AlterSourceAction, AstInfo,
    ConstantVisitor, CopyRelation, CopyStatement, CreateSourceOptionName, Raw, Statement,
    SubscribeStatement,
};
use mz_sql::catalog::RoleAttributesRaw;
use mz_sql::names::{Aug, PartialItemName, ResolvedIds};
use mz_sql::plan::{
    AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, Params, Plan,
    StatementClassification, TransactionType,
};
use mz_sql::pure::{
    materialized_view_option_contains_temporal, purify_create_materialized_view_options,
};
use mz_sql::rbac;
use mz_sql::rbac::CREATE_ITEM_USAGE;
use mz_sql::session::user::User;
use mz_sql::session::vars::{
    EndTransactionAction, NETWORK_POLICY, OwnedVarInput, STATEMENT_LOGGING_SAMPLE_RATE, Value, Var,
};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
    CreateMaterializedViewStatement, ExplainPlanStatement, Explainee, InsertStatement,
    WithOptionValue,
};
use mz_storage_types::sources::Timeline;
use opentelemetry::trace::TraceContextExt;
use tokio::sync::{mpsc, oneshot};
use tracing::{Instrument, debug_span, info, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use uuid::Uuid;

use crate::command::{
    CatalogSnapshot, Command, ExecuteResponse, Response, SASLChallengeResponse,
    SASLVerifyProofResponse, StartupResponse, SuperuserAttribute,
};
use crate::coord::appends::PendingWriteTxn;
use crate::coord::peek::PendingPeek;
use crate::coord::{
    ConnMeta, Coordinator, DeferredPlanStatement, Message, PendingTxn, PlanStatement, PlanValidity,
    PurifiedStatementReady, validate_ip_with_policy_rules,
};
use crate::error::{AdapterError, AuthenticationError};
use crate::notice::AdapterNotice;
use crate::session::{Session, TransactionOps, TransactionStatus};
use crate::statement_logging::WatchSetCreation;
use crate::util::{ClientTransmitter, ResultExt};
use crate::webhook::{
    AppendWebhookResponse, AppendWebhookValidator, WebhookAppender, WebhookAppenderInvalidator,
};
use crate::{AppendWebhookError, ExecuteContext, catalog, metrics};

use super::ExecuteContextGuard;

/// The login status of a role, used by authentication handlers to check role
/// existence and login permission before proceeding to credential verification.
enum RoleLoginStatus {
    /// The role does not exist in the catalog.
    NotFound,
    /// The role exists and has the LOGIN attribute.
    CanLogin,
    /// The role exists but does not have the LOGIN attribute.
    NonLogin,
}

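/// Maps an optional catalog [`Role`] to its [`RoleLoginStatus`].
///
/// Note that an unset `login` attribute (`None`) is treated the same as an
/// explicit `Some(false)`: only roles with LOGIN explicitly enabled can log in.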
fn role_login_status(role: Option<&Role>) -> RoleLoginStatus {
    match role {
        None => RoleLoginStatus::NotFound,
        Some(role) => match role.attributes.login {
            Some(login) if login => RoleLoginStatus::CanLogin,
            _ => RoleLoginStatus::NonLogin,
        },
    }
}

impl Coordinator {
    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 58KB. This would
    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
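    ///
    /// A minimal sketch of the pattern (illustrative only, not this function's
    /// actual body):
    ///
    /// ```ignore
    /// use futures::FutureExt;
    /// use futures::future::LocalBoxFuture;
    ///
    /// fn returns_large_future() -> LocalBoxFuture<'static, ()> {
    ///     // The async block's state machine lives on the heap, so callers
    ///     // only hold a pointer-sized future on their stack.
    ///     async move { /* ... lots of captured state ... */ }.boxed_local()
    /// }
    /// ```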
    pub(crate) fn handle_command(&mut self, mut cmd: Command) -> LocalBoxFuture<'_, ()> {
        async move {
            if let Some(session) = cmd.session_mut() {
                session.apply_external_metadata_updates();
            }
            match cmd {
                Command::Startup {
                    tx,
                    user,
                    conn_id,
                    secret_key,
                    uuid,
                    client_ip,
                    application_name,
                    notice_tx,
                } => {
                    // Note: We purposefully do not use a ClientTransmitter here because startup
                    // handles errors and cleanup of sessions itself.
                    self.handle_startup(
                        tx,
                        user,
                        conn_id,
                        secret_key,
                        uuid,
                        client_ip,
                        application_name,
                        notice_tx,
                    )
                    .await;
                }

                Command::AuthenticatePassword {
                    tx,
                    role_name,
                    password,
                } => {
                    self.handle_authenticate_password(tx, role_name, password)
                        .await;
                }

                Command::AuthenticateGetSASLChallenge {
                    tx,
                    role_name,
                    nonce,
                } => {
                    self.handle_generate_sasl_challenge(tx, role_name, nonce)
                        .await;
                }

                Command::AuthenticateVerifySASLProof {
                    tx,
                    role_name,
                    proof,
                    mock_hash,
                    auth_message,
                } => {
                    self.handle_authenticate_verify_sasl_proof(
                        tx,
                        role_name,
                        proof,
                        auth_message,
                        mock_hash,
                    );
                }

                Command::CheckRoleCanLogin { tx, role_name } => {
                    self.handle_role_can_login(tx, role_name);
                }

                Command::Execute {
                    portal_name,
                    session,
                    tx,
                    outer_ctx_extra,
                } => {
                    let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());

                    self.handle_execute(portal_name, session, tx, outer_ctx_extra)
                        .await;
                }

                Command::StartCopyFromStdin {
                    target_id,
                    target_name,
                    columns,
                    row_desc,
                    params,
                    session,
                    tx,
                } => {
                    let otel_ctx = OpenTelemetryContext::obtain();
                    let result = self.setup_copy_from_stdin(
                        &session,
                        target_id,
                        target_name,
                        columns,
                        row_desc,
                        params,
                    );
                    let _ = tx.send(Response {
                        result,
                        session,
                        otel_ctx,
                    });
                }

                Command::RetireExecute { data, reason } => self.retire_execution(reason, data),

                Command::CancelRequest {
                    conn_id,
                    secret_key,
                } => {
                    self.handle_cancel(conn_id, secret_key).await;
                }

                Command::PrivilegedCancelRequest { conn_id } => {
                    self.handle_privileged_cancel(conn_id).await;
                }

                Command::GetWebhook {
                    database,
                    schema,
                    name,
                    tx,
                } => {
                    self.handle_get_webhook(database, schema, name, tx);
                }

                Command::GetSystemVars { tx } => {
                    let _ = tx.send(self.catalog.system_config().clone());
                }

                Command::SetSystemVars { vars, conn_id, tx } => {
                    let mut ops = Vec::with_capacity(vars.len());
                    let conn = &self.active_conns[&conn_id];

                    for (name, value) in vars {
                        if let Err(e) =
                            self.catalog().system_config().get(&name).and_then(|var| {
                                var.visible(conn.user(), self.catalog.system_config())
                            })
                        {
                            let _ = tx.send(Err(e.into()));
                            return;
                        }

                        ops.push(catalog::Op::UpdateSystemConfiguration {
                            name,
                            value: OwnedVarInput::Flat(value),
                        });
                    }

                    let result = self
                        .catalog_transact_with_context(Some(&conn_id), None, ops)
                        .await;
                    let _ = tx.send(result);
                }

                Command::InjectAuditEvents {
                    events,
                    conn_id,
                    tx,
                } => {
                    let ops = vec![catalog::Op::InjectAuditEvents { events }];
                    let result = self
                        .catalog_transact_with_context(Some(&conn_id), None, ops)
                        .await;
                    let _ = tx.send(result);
                }

                Command::Terminate { conn_id, tx } => {
                    self.handle_terminate(conn_id).await;
                    // Note: We purposefully do not use a ClientTransmitter here because we're already
                    // terminating the provided session.
                    if let Some(tx) = tx {
                        let _ = tx.send(Ok(()));
                    }
                }

                Command::Commit {
                    action,
                    session,
                    tx,
                } => {
                    let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
                    // We reach here not through a statement execution, but from the
                    // "commit" pgwire command. Thus, we just generate a default
                    // statement execution context (once statement logging is
                    // implemented, this will cause nothing to be logged when the
                    // execution finishes).
                    let ctx = ExecuteContext::from_parts(
                        tx,
                        self.internal_cmd_tx.clone(),
                        session,
                        Default::default(),
                    );
                    let plan = match action {
                        EndTransactionAction::Commit => {
                            Plan::CommitTransaction(CommitTransactionPlan {
                                transaction_type: TransactionType::Implicit,
                            })
                        }
                        EndTransactionAction::Rollback => {
                            Plan::AbortTransaction(AbortTransactionPlan {
                                transaction_type: TransactionType::Implicit,
                            })
                        }
                    };

                    let conn_id = ctx.session().conn_id().clone();
                    self.sequence_plan(ctx, plan, ResolvedIds::empty(), ResolvedIds::empty())
                        .await;
                    // Part of the Command::Commit contract is that the Coordinator guarantees that
                    // it has cleared its transaction state for the connection.
                    self.clear_connection(&conn_id).await;
                }

                Command::CatalogSnapshot { tx } => {
                    let _ = tx.send(CatalogSnapshot {
                        catalog: self.owned_catalog(),
                    });
                }

                Command::CheckConsistency { tx } => {
                    let _ = tx.send(self.check_consistency());
                }

                Command::Dump { tx } => {
                    let _ = tx.send(self.dump().await);
                }

                Command::GetComputeInstanceClient { instance_id, tx } => {
                    let _ = tx.send(self.controller.compute.instance_client(instance_id));
                }

                Command::GetOracle { timeline, tx } => {
                    let oracle = self
                        .global_timelines
                        .get(&timeline)
                        .map(|timeline_state| Arc::clone(&timeline_state.oracle))
                        .ok_or(AdapterError::ChangedPlan(
                            "timeline has disappeared during planning".to_string(),
                        ));
                    let _ = tx.send(oracle);
                }

                Command::DetermineRealTimeRecentTimestamp {
                    source_ids,
                    real_time_recency_timeout,
                    tx,
                } => {
                    let result = self
                        .determine_real_time_recent_timestamp(
                            source_ids.iter().copied(),
                            real_time_recency_timeout,
                        )
                        .await;

                    match result {
                        Ok(Some(fut)) => {
                            let catalog = Arc::clone(&self.catalog);
                            task::spawn(|| "determine real time recent timestamp", async move {
                                let result =
                                    Coordinator::await_real_time_recent_timestamp(catalog, fut)
                                        .await
                                        .map(Some);
                                let _ = tx.send(result);
                            });
                        }
                        Ok(None) => {
                            let _ = tx.send(Ok(None));
                        }
                        Err(e) => {
                            let _ = tx.send(Err(e));
                        }
                    }
                }

                Command::GetTransactionReadHoldsBundle { conn_id, tx } => {
                    let read_holds = self.txn_read_holds.get(&conn_id).cloned();
                    let _ = tx.send(read_holds);
                }

                Command::StoreTransactionReadHolds {
                    conn_id,
                    read_holds,
                    tx,
                } => {
                    self.store_transaction_read_holds(conn_id, read_holds);
                    let _ = tx.send(());
                }

                Command::ExecuteSlowPathPeek {
                    dataflow_plan,
                    determination,
                    finishing,
                    compute_instance,
                    target_replica,
                    intermediate_result_type,
                    source_ids,
                    conn_id,
                    max_result_size,
                    max_query_result_size,
                    watch_set,
                    tx,
                } => {
                    let result = self
                        .implement_slow_path_peek(
                            *dataflow_plan,
                            determination,
                            finishing,
                            compute_instance,
                            target_replica,
                            intermediate_result_type,
                            source_ids,
                            conn_id,
                            max_result_size,
                            max_query_result_size,
                            watch_set,
                        )
                        .await;
                    let _ = tx.send(result);
                }

                Command::ExecuteSubscribe {
                    df_desc,
                    dependency_ids,
                    cluster_id,
                    replica_id,
                    conn_id,
                    session_uuid,
                    read_holds,
                    plan,
                    statement_logging_id,
                    tx,
                } => {
                    let mut ctx_extra = ExecuteContextGuard::new(
                        statement_logging_id,
                        self.internal_cmd_tx.clone(),
                    );
                    let result = self
                        .implement_subscribe(
                            &mut ctx_extra,
                            df_desc,
                            dependency_ids,
                            cluster_id,
                            replica_id,
                            conn_id,
                            session_uuid,
                            read_holds,
                            plan,
                        )
                        .await;
                    let _ = tx.send(result);
                }

                Command::CopyToPreflight {
                    s3_sink_connection,
                    sink_id,
                    tx,
                } => {
                    // Spawn a background task to perform the slow S3 preflight operations.
                    // This avoids blocking the coordinator's main task.
                    let connection_context = self.connection_context().clone();
                    let enforce_external_addresses =
                        mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
                            .get(self.controller.storage.config().config_set());
                    task::spawn(|| "copy_to_preflight", async move {
                        let result = mz_storage_types::sinks::s3_oneshot_sink::preflight(
                            connection_context,
                            &s3_sink_connection.aws_connection,
                            &s3_sink_connection.upload_info,
                            s3_sink_connection.connection_id,
                            sink_id,
                            enforce_external_addresses,
                        )
                        .await
                        .map_err(AdapterError::from);
                        let _ = tx.send(result);
                    });
                }

                Command::ExecuteCopyTo {
                    df_desc,
                    compute_instance,
                    target_replica,
                    source_ids,
                    conn_id,
                    watch_set,
                    tx,
                } => {
                    // implement_copy_to spawns a background task that sends the response
                    // through tx when the COPY TO completes (or immediately if setup fails).
                    // We just call it and let it handle all response sending.
                    self.implement_copy_to(
                        *df_desc,
                        compute_instance,
                        target_replica,
                        source_ids,
                        conn_id,
                        watch_set,
                        tx,
                    )
                    .await;
                }

                Command::ExecuteSideEffectingFunc {
                    plan,
                    conn_id,
                    current_role,
                    tx,
                } => {
                    let result = self
                        .execute_side_effecting_func(plan, conn_id, current_role)
                        .await;
                    let _ = tx.send(result);
                }
                Command::RegisterFrontendPeek {
                    uuid,
                    conn_id,
                    cluster_id,
                    depends_on,
                    is_fast_path,
                    watch_set,
                    tx,
                } => {
                    self.handle_register_frontend_peek(
                        uuid,
                        conn_id,
                        cluster_id,
                        depends_on,
                        is_fast_path,
                        watch_set,
                        tx,
                    );
                }
                Command::UnregisterFrontendPeek { uuid, tx } => {
                    self.handle_unregister_frontend_peek(uuid, tx);
                }
                Command::ExplainTimestamp {
                    conn_id,
                    session_wall_time,
                    cluster_id,
                    id_bundle,
                    determination,
                    tx,
                } => {
                    let explanation = self.explain_timestamp(
                        &conn_id,
                        session_wall_time,
                        cluster_id,
                        &id_bundle,
                        determination,
                    );
                    let _ = tx.send(explanation);
                }
                Command::FrontendStatementLogging(event) => {
                    self.handle_frontend_statement_logging_event(event);
                }
            }
        }
        .instrument(debug_span!("handle_command"))
        .boxed_local()
    }

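    /// Checks whether `role_name` names an existing role that is allowed to
    /// log in, reporting the result (or the specific authentication error)
    /// through `tx`.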
    fn handle_role_can_login(
        &self,
        tx: oneshot::Sender<Result<(), AdapterError>>,
        role_name: String,
    ) {
        let result =
            match role_login_status(self.catalog().try_get_role_by_name(role_name.as_str())) {
                RoleLoginStatus::NotFound => Err(AdapterError::AuthenticationError(
                    AuthenticationError::RoleNotFound,
                )),
                RoleLoginStatus::NonLogin => Err(AdapterError::AuthenticationError(
                    AuthenticationError::NonLogin,
                )),
                RoleLoginStatus::CanLogin => Ok(()),
            };
        let _ = tx.send(result);
    }

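    /// Verifies a SCRAM client proof against the role's stored password hash.
    ///
    /// If the role does not exist or has no stored hash, verification still
    /// runs against the caller-supplied `mock_hash`, so the timing of the
    /// response does not reveal whether the role exists.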
    fn handle_authenticate_verify_sasl_proof(
        &self,
        tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
        role_name: String,
        proof: String,
        auth_message: String,
        mock_hash: String,
    ) {
        let role = self.catalog().try_get_role_by_name(role_name.as_str());
        let login_status = role_login_status(role);
        let role_auth = role.and_then(|r| self.catalog().try_get_role_auth_by_id(&r.id));
        let real_hash = role_auth
            .as_ref()
            .and_then(|auth| auth.password_hash.as_ref());
        let hash_ref = real_hash.map(|s| s.as_str()).unwrap_or(&mock_hash);

        match mz_auth::hash::sasl_verify(hash_ref, &proof, &auth_message) {
            Ok(verifier) => {
                // Success only if role exists, allows login, and a real password hash was used.
                if matches!(login_status, RoleLoginStatus::CanLogin) && real_hash.is_some() {
                    let _ = tx.send(Ok(SASLVerifyProofResponse { verifier }));
                } else {
                    let _ = tx.send(Err(AdapterError::AuthenticationError(match login_status {
                        RoleLoginStatus::NonLogin => AuthenticationError::NonLogin,
                        RoleLoginStatus::NotFound => AuthenticationError::RoleNotFound,
                        RoleLoginStatus::CanLogin => AuthenticationError::InvalidCredentials,
                    })));
                }
            }
            Err(_) => {
                let _ = tx.send(Err(AdapterError::AuthenticationError(
                    AuthenticationError::InvalidCredentials,
                )));
            }
        }
    }

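    /// Produces the server half of a SCRAM challenge (iteration count, salt,
    /// and combined nonce) for `role_name`. Roles without a parseable stored
    /// password hash receive a deterministic, per-role mock challenge so that
    /// nonexistent roles are indistinguishable from real ones.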
    #[mz_ore::instrument(level = "debug")]
    async fn handle_generate_sasl_challenge(
        &self,
        tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
        role_name: String,
        client_nonce: String,
    ) {
        let role_auth = self
            .catalog()
            .try_get_role_by_name(&role_name)
            .and_then(|role| self.catalog().try_get_role_auth_by_id(&role.id));

        let nonce = match mz_auth::hash::generate_nonce(&client_nonce) {
            Ok(n) => n,
            Err(e) => {
                let msg = format!(
                    "failed to generate nonce for client nonce {}: {}",
                    client_nonce, e
                );
                let _ = tx.send(Err(AdapterError::Internal(msg.clone())));
                soft_panic_or_log!("{msg}");
                return;
            }
        };

        // It's important that the mock_nonce is deterministic per role, otherwise the purpose of
        // doing mock authentication is defeated. We use a catalog-wide nonce, and combine that
        // with the role name to get a per-role mock nonce.
        let send_mock_challenge =
            |role_name: String,
             mock_nonce: String,
             nonce: String,
             tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>| {
                let opts = mz_auth::hash::mock_sasl_challenge(
                    &role_name,
                    &mock_nonce,
                    &self.catalog().system_config().scram_iterations(),
                );
                let _ = tx.send(Ok(SASLChallengeResponse {
                    iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
                    salt: BASE64_STANDARD.encode(opts.salt),
                    nonce,
                }));
            };

        match role_auth {
            Some(auth) if auth.password_hash.is_some() => {
                let hash = auth.password_hash.as_ref().expect("checked above");
                match mz_auth::hash::scram256_parse_opts(hash) {
                    Ok(opts) => {
                        let _ = tx.send(Ok(SASLChallengeResponse {
                            iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
                            salt: BASE64_STANDARD.encode(opts.salt),
                            nonce,
                        }));
                    }
                    Err(_) => {
                        send_mock_challenge(
                            role_name,
                            self.catalog().state().mock_authentication_nonce(),
                            nonce,
                            tx,
                        );
                    }
                }
            }
            _ => {
                send_mock_challenge(
                    role_name,
                    self.catalog().state().mock_authentication_nonce(),
                    nonce,
                    tx,
                );
            }
        }
    }

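    /// Verifies a plaintext `password` against the role's stored SCRAM hash.
    /// The hash verification itself runs on a blocking task so it does not
    /// stall the coordinator's main loop.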
    #[mz_ore::instrument(level = "debug")]
    async fn handle_authenticate_password(
        &self,
        tx: oneshot::Sender<Result<(), AdapterError>>,
        role_name: String,
        password: Option<Password>,
    ) {
        let Some(password) = password else {
            // The user did not provide a password.
            let _ = tx.send(Err(AdapterError::AuthenticationError(
                AuthenticationError::PasswordRequired,
            )));
            return;
        };
        let role = self.catalog().try_get_role_by_name(role_name.as_str());

        match role_login_status(role) {
            RoleLoginStatus::NotFound => {
                let _ = tx.send(Err(AdapterError::AuthenticationError(
                    AuthenticationError::RoleNotFound,
                )));
                return;
            }
            RoleLoginStatus::NonLogin => {
                let _ = tx.send(Err(AdapterError::AuthenticationError(
                    AuthenticationError::NonLogin,
                )));
                return;
            }
            RoleLoginStatus::CanLogin => {}
        }

        let role_auth = role.and_then(|r| self.catalog().try_get_role_auth_by_id(&r.id));

        if let Some(auth) = role_auth {
            if let Some(hash) = &auth.password_hash {
                let hash = hash.clone();
                task::spawn_blocking(
                    || "auth-check-hash",
                    move || {
                        let _ = match mz_auth::hash::scram256_verify(&password, &hash) {
                            Ok(_) => tx.send(Ok(())),
                            Err(_) => tx.send(Err(AdapterError::AuthenticationError(
                                AuthenticationError::InvalidCredentials,
                            ))),
                        };
                    },
                );
                return;
            }
        }
        // Authentication failed due to missing password hash.
        let _ = tx.send(Err(AdapterError::AuthenticationError(
            AuthenticationError::InvalidCredentials,
        )));
    }

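    /// Completes session startup. On success the connection is registered in
    /// `active_conns`, a builtin table update describing the session is kicked
    /// off in the background, and a [`StartupResponse`] is sent back; on
    /// failure only the error is sent.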
    #[mz_ore::instrument(level = "debug")]
    async fn handle_startup(
        &mut self,
        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
        user: User,
        conn_id: ConnectionId,
        secret_key: u32,
        uuid: uuid::Uuid,
        client_ip: Option<IpAddr>,
        application_name: String,
        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
    ) {
        // Early return if successful, otherwise clean up any possible state.
        match self
            .handle_startup_inner(&user, &conn_id, &client_ip, &notice_tx)
            .await
        {
            Ok((role_id, superuser_attribute, session_defaults)) => {
                let session_type = metrics::session_type_label_value(&user);
                self.metrics
                    .active_sessions
                    .with_label_values(&[session_type])
                    .inc();
                let conn = ConnMeta {
                    secret_key,
                    notice_tx,
                    drop_sinks: BTreeSet::new(),
                    pending_cluster_alters: BTreeSet::new(),
                    connected_at: self.now(),
                    user,
                    application_name,
                    uuid,
                    client_ip,
                    conn_id: conn_id.clone(),
                    authenticated_role: role_id,
                    deferred_lock: None,
                };
                let update = self.catalog().state().pack_session_update(&conn, Diff::ONE);
                let update = self.catalog().state().resolve_builtin_table_update(update);
                self.begin_session_for_statement_logging(&conn);
                self.active_conns.insert(conn_id.clone(), conn);

                // Note: Do NOT await the notify here, we pass this back to
                // whatever requested the startup to prevent blocking startup
                // and the Coordinator on a builtin table update.
                let updates = vec![update];
                // It's not a hard error if our list is missing a builtin table, but we want to
                // make sure these two things stay in sync.
                if mz_ore::assert::soft_assertions_enabled() {
                    let required_tables: BTreeSet<_> = super::appends::REQUIRED_BUILTIN_TABLES
                        .iter()
                        .map(|table| self.catalog().resolve_builtin_table(*table))
                        .collect();
                    let updates_tracked = updates
                        .iter()
                        .all(|update| required_tables.contains(&update.id));
                    let all_mz_internal = super::appends::REQUIRED_BUILTIN_TABLES
                        .iter()
                        .all(|table| table.schema == MZ_INTERNAL_SCHEMA);
                    mz_ore::soft_assert_or_log!(
                        updates_tracked,
                        "not tracking all required builtin table updates!"
                    );
                    // TODO(parkmycar): When checking if a query depends on these builtin table
                    // writes we do not check the transitive dependencies of the query, because
                    // we don't support creating views on mz_internal objects. If one of these
                    // tables is promoted out of mz_internal then we'll need to add this check.
                    mz_ore::soft_assert_or_log!(
                        all_mz_internal,
                        "not all builtin tables are in mz_internal! need to check transitive depends",
                    )
                }
                let notify = self.builtin_table_update().background(updates);

                let catalog = self.owned_catalog();
                let build_info_human_version =
                    catalog.state().config().build_info.human_version(None);

                let statement_logging_frontend = self
                    .statement_logging
                    .create_frontend(build_info_human_version);

                let resp = Ok(StartupResponse {
                    role_id,
                    write_notify: notify,
                    session_defaults,
                    catalog,
                    storage_collections: Arc::clone(&self.controller.storage_collections),
                    transient_id_gen: Arc::clone(&self.transient_id_gen),
                    optimizer_metrics: self.optimizer_metrics.clone(),
                    persist_client: self.persist_client.clone(),
                    statement_logging_frontend,
                    superuser_attribute,
                });
                if tx.send(resp).is_err() {
                    // Failed to send to adapter, but everything is set up so we can terminate
                    // normally.
                    self.handle_terminate(conn_id).await;
                }
            }
            Err(e) => {
                // Error during startup or sending to adapter. A user may have been created and
                // it can stay; no need to delete it.
                // Note: Temporary schemas are created lazily, so there's nothing to clean up here.

                // Communicate the error back to the client. No need to
                // handle failures to send the error back; we've already
                // cleaned up all necessary state.
                let _ = tx.send(Err(e));
            }
        }
    }

    // Fallible startup work that needs to be cleaned up on error.
    async fn handle_startup_inner(
        &mut self,
        user: &User,
        _conn_id: &ConnectionId,
        client_ip: &Option<IpAddr>,
        notice_tx: &mpsc::UnboundedSender<AdapterNotice>,
    ) -> Result<(RoleId, SuperuserAttribute, BTreeMap<String, OwnedVarInput>), AdapterError> {
        if self.catalog().try_get_role_by_name(&user.name).is_none() {
            // If the user has made it to this point, that means they have been fully authenticated.
            // This includes preventing any user, except a pre-defined set of system users, from
            // connecting to an internal port. Therefore it's ok to always create a new role for the
            // user.
            let mut attributes = RoleAttributesRaw::new();
            // When auto-provisioning, we store the authenticator that was used to provision the role.
            attributes.auto_provision_source = match user.authenticator_kind {
                Some(AuthenticatorKind::Oidc) => Some(AutoProvisionSource::Oidc),
                Some(AuthenticatorKind::Frontegg) => Some(AutoProvisionSource::Frontegg),
                Some(AuthenticatorKind::None) => Some(AutoProvisionSource::None),
                _ => {
                    warn!(
                        "auto-provisioning role with unexpected authenticator kind: {:?}",
                        user.authenticator_kind
                    );
                    None
                }
            };

            // Auto-provision roles with the LOGIN attribute to distinguish
            // them as users.
            attributes.login = Some(true);

            let plan = CreateRolePlan {
                name: user.name.to_string(),
                attributes,
            };
            self.sequence_create_role_for_startup(plan).await?;
        }
        let role = self
            .catalog()
            .try_get_role_by_name(&user.name)
            .expect("created above");
        let role_id = role.id;
        let superuser_attribute = role.attributes.superuser;

        // JWT group-to-role sync: reconcile role memberships with JWT group claims.
        // Missing groups claim (None) → skip sync; empty (Some([])) → revoke all.
        self.maybe_sync_jwt_groups(role_id, user.groups.as_deref(), notice_tx)
            .await?;

        if role_id.is_user() && !ALLOW_USER_SESSIONS.get(self.catalog().system_config().dyncfgs()) {
            return Err(AdapterError::UserSessionsDisallowed);
        }

        // Initialize the default session variables for this role.
        let mut session_defaults = BTreeMap::new();
        let system_config = self.catalog().state().system_config();

        // Override the session with any system defaults.
        session_defaults.extend(
            system_config
                .iter_session()
                .map(|v| (v.name().to_string(), OwnedVarInput::Flat(v.value()))),
        );
        // Special case.
        let statement_logging_default = system_config
            .statement_logging_default_sample_rate()
            .format();
        session_defaults.insert(
            STATEMENT_LOGGING_SAMPLE_RATE.name().to_string(),
            OwnedVarInput::Flat(statement_logging_default),
        );
        // Override system defaults with role defaults.
        session_defaults.extend(
            self.catalog()
                .get_role(&role_id)
                .vars()
                .map(|(name, val)| (name.to_string(), val.clone())),
        );

        // Validate network policies for external users. Internal users can only connect on the
        // internal interfaces (internal HTTP/pgwire). It is up to the person deploying the system
        // to ensure these internal interfaces are well secured.
        //
        // HACKY(parkmycar): We don't have a fully formed session yet for this role, but we want
        // the default network policy for this role, so we read directly out of what the session
        // will get initialized with.
        if !user.is_internal() {
            let network_policy_name = session_defaults
                .get(NETWORK_POLICY.name())
                .and_then(|value| match value {
                    OwnedVarInput::Flat(name) => Some(name.clone()),
                    OwnedVarInput::SqlSet(names) => {
                        tracing::error!(?names, "found multiple network policies");
                        None
                    }
                })
                .unwrap_or_else(|| system_config.default_network_policy_name());
            let maybe_network_policy = self
                .catalog()
                .get_network_policy_by_name(&network_policy_name);

            let Some(network_policy) = maybe_network_policy else {
                // We should prevent dropping the default network policy, or setting the policy
                // to something that doesn't exist, so complain loudly if this occurs.
                tracing::error!(
                    network_policy_name,
                    "default network policy does not exist. All user traffic will be blocked"
                );
                let reason = match client_ip {
                    Some(ip) => super::NetworkPolicyError::AddressDenied(ip.clone()),
                    None => super::NetworkPolicyError::MissingIp,
                };
                return Err(AdapterError::NetworkPolicyDenied(reason));
            };

            if let Some(ip) = client_ip {
                match validate_ip_with_policy_rules(ip, &network_policy.rules) {
                    Ok(_) => {}
                    Err(e) => return Err(AdapterError::NetworkPolicyDenied(e)),
                }
            } else {
                // Only temporary and internal representations of a session
                // should be missing a client_ip. These sessions should not be
                // making requests or going through handle_startup.
                return Err(AdapterError::NetworkPolicyDenied(
                    super::NetworkPolicyError::MissingIp,
                ));
            }
        }

        // Temporary schemas are now created lazily when the first temporary object is created,
        // rather than eagerly on connection startup. This avoids expensive catalog_mut() calls
        // for the common case where connections never create temporary objects.

        Ok((
            role_id,
            SuperuserAttribute(superuser_attribute),
            session_defaults,
        ))
    }

    /// Handles an execute command.
    #[instrument(name = "coord::handle_execute", fields(session = session.uuid().to_string()))]
    pub(crate) async fn handle_execute(
        &mut self,
        portal_name: String,
        mut session: Session,
        tx: ClientTransmitter<ExecuteResponse>,
        // If this command was part of another execute command
        // (for example, executing a `FETCH` statement causes an execute to be
        //  issued for the cursor it references),
        // then `outer_context` should be `Some`.
        // This instructs the coordinator that the
        // outer execute should be considered finished once the inner one is.
        outer_context: Option<ExecuteContextGuard>,
    ) {
        if session.vars().emit_trace_id_notice() {
            let span_context = tracing::Span::current()
                .context()
                .span()
                .span_context()
                .clone();
            if span_context.is_valid() {
                session.add_notice(AdapterNotice::QueryTrace {
                    trace_id: span_context.trace_id(),
                });
            }
        }

        if let Err(err) = Self::verify_portal(self.catalog(), &mut session, &portal_name) {
            // If statement logging hasn't started yet, we don't need
            // to add any "end" event, so just make up a no-op
            // `ExecuteContextExtra` here, via `Default::default`.
            //
            // It's a bit unfortunate because the edge case of failed
            // portal verifications won't show up in statement
            // logging, but there seems to be nothing else we can do,
            // because we need access to the portal to begin logging.
            //
            // Another option would be to log a begin and end event, but just fill in NULLs
            // for everything we get from the portal (prepared statement id, params).
            let extra = outer_context.unwrap_or_else(Default::default);
            let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
            return ctx.retire(Err(err));
        }

        // The reference to `portal` can't outlive `session`, which we
        // use to construct the context, so scope the reference to this block where we
        // get everything we need from the portal for later.
        let (stmt, ctx, params) = {
            let portal = session
                .get_portal_unverified(&portal_name)
                .expect("known to exist");
            let params = portal.parameters.clone();
            let stmt = portal.stmt.clone();
            let logging = Arc::clone(&portal.logging);
            let lifecycle_timestamps = portal.lifecycle_timestamps.clone();

            let extra = if let Some(extra) = outer_context {
                // We are executing in the context of another SQL statement, so we don't
                // want to begin statement logging anew. The context of the actual statement
                // being executed is the one that should be retired once this finishes.
                extra
            } else {
                // This is a new statement, log it and return the context
                let maybe_uuid = self.begin_statement_execution(
                    &mut session,
                    &params,
                    &logging,
                    lifecycle_timestamps,
                );

                ExecuteContextGuard::new(maybe_uuid, self.internal_cmd_tx.clone())
            };
            let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
            (stmt, ctx, params)
        };

        let stmt = match stmt {
            Some(stmt) => stmt,
            None => return ctx.retire(Ok(ExecuteResponse::EmptyQuery)),
        };

        let session_type = metrics::session_type_label_value(ctx.session().user());
        let stmt_type = metrics::statement_type_label_value(&stmt);
        self.metrics
            .query_total
            .with_label_values(&[session_type, stmt_type])
            .inc();
        match &*stmt {
            Statement::Subscribe(SubscribeStatement { output, .. })
            | Statement::Copy(CopyStatement {
                relation: CopyRelation::Subscribe(SubscribeStatement { output, .. }),
                ..
            }) => {
                self.metrics
                    .subscribe_outputs
                    .with_label_values(&[
                        session_type,
                        metrics::subscribe_output_label_value(output),
                    ])
                    .inc();
            }
            _ => {}
        }

        self.handle_execute_inner(stmt, params, ctx).await
    }

    #[instrument(name = "coord::handle_execute_inner", fields(stmt = stmt.to_ast_string_redacted()))]
    pub(crate) async fn handle_execute_inner(
        &mut self,
        stmt: Arc<Statement<Raw>>,
        params: Params,
        mut ctx: ExecuteContext,
    ) {
        // This comment describes the various ways DDL can execute (the ordered operations: name
        // resolve, purify, plan, sequence), all of which are managed by this function. DDL has
        // three notable properties that all partially interact.
        //
        // 1. Most DDL statements (and a few others) support single-statement transaction delayed
        //    execution. This occurs when a session executes `BEGIN`, a single DDL, then `COMMIT`.
        //    We announce success of the single DDL when it is executed, but do not attempt to plan
        //    or sequence it until `COMMIT`, which is able to error if needed while sequencing the
        //    DDL (this behavior is Postgres-compatible; see the example at the end of this
        //    comment). This exists because some drivers and tools wrap all statements in `BEGIN`
        //    and `COMMIT`, and we would like them to work. When the single DDL is announced as
        //    successful we also put the session's transaction ops into `SingleStatement`, which
        //    will produce an error if any statement other than `COMMIT` is run in the transaction.
        //    Additionally, this will cause `handle_execute_inner` to stop further processing
        //    (no planning, etc.) of the statement.
        // 2. A few other DDL statements (`ALTER .. RENAME/SWAP`) enter the `DDL` ops, which allow
        //    any number of these (and only these) DDL statements to be executed in a transaction.
        //    During sequencing we run an incremental catalog dry run against in-memory transaction
        //    state and store the resulting `CatalogState` in `TransactionOps::DDL`, but nothing is
        //    yet committed to the durable catalog. At `COMMIT`, all accumulated ops are applied in
        //    one catalog transaction. The purpose of this is to allow multiple, atomic renames in
        //    the same transaction.
        // 3. Some DDLs do off-thread work during purification or sequencing that is expensive or
        //    makes network calls (interfacing with secrets, optimization of views/indexes, source
        //    purification). These must guarantee correctness when they return to the main
        //    coordinator thread because the catalog state could have changed while they were doing
        //    the off-thread work. Previously we would use `PlanValidity::Checks` to specify a bunch
        //    of IDs that we needed to exist. We discovered the way we were doing that was not
        //    always correct. Instead of attempting to get that completely right, we have opted to
        //    serialize DDL. Getting this right is difficult because catalog changes can affect name
        //    resolution, planning, sequencing, and optimization, and correctly writing logic that
        //    is aware of all possible catalog changes that would affect any of those parts is not
        //    something our current code has been designed to support. Even if a DDL statement is
        //    doing off-thread work, another DDL must not yet execute at all. Executing these
        //    serially will guarantee that no off-thread work has affected the state of the catalog.
        //    This is done by adding a VecDeque of deferred statements and a lock to the
        //    Coordinator. When a DDL is run in `handle_execute_inner` (after applying whatever
        //    transaction ops are needed to the session as described above), it attempts to own the
        //    lock (a tokio Mutex). If acquired, it stashes the lock in the connection's `ConnMeta`
        //    struct in `active_conns` and proceeds. The lock is dropped at transaction end in
        //    `clear_transaction`, and a message is sent to the Coordinator to execute the next
        //    queued DDL. If the lock could not be acquired, the DDL is put into the VecDeque,
        //    where it awaits dequeuing when the lock is released.
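        //
        // An illustrative SQL sequence for property 1 above:
        //
        //     BEGIN;
        //     CREATE VIEW v AS SELECT 1;  -- reported successful immediately, not yet planned
        //     COMMIT;                     -- the DDL is planned and sequenced here, and may error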

        // Verify that this statement type can be executed in the current
        // transaction state.
        match ctx.session().transaction() {
            // By this point we should be in a running transaction.
            TransactionStatus::Default => unreachable!(),

            // Failed transactions have already been checked in pgwire for a safe statement
            // (COMMIT, ROLLBACK, etc.) and can proceed.
            TransactionStatus::Failed(_) => {}

            // Started is a deceptive name whose meaning depends on the protocol in use. In the
            // simple protocol it means exactly one statement (the parser parsed the entire
            // string and it contained one statement). In the extended protocol it means *some*
            // query is being executed, but there might be others after it before the Sync
            // (commit) message. Postgres handles this by teaching Started to eagerly commit
            // certain statements that can't be run in a transaction block.
            TransactionStatus::Started(_) => {
                if let Statement::Declare(_) = &*stmt {
                    // Declare is an exception. Although it's not against any spec to execute
                    // it, it will always result in nothing happening, since all portals will be
                    // immediately closed. Users don't know this detail, so this error helps them
                    // understand what's going wrong. Postgres does this too.
                    return ctx.retire(Err(AdapterError::OperationRequiresTransaction(
                        "DECLARE CURSOR".into(),
                    )));
                }
            }

            // Implicit or explicit transactions.
            //
            // Implicit transactions happen when a multi-statement query is executed
            // (a "simple query"). However if a "BEGIN" appears somewhere in there,
            // then the existing implicit transaction will be upgraded to an explicit
            // transaction. Thus, we should not separate what implicit and explicit
            // transactions can do unless there's some additional checking to make sure
            // something disallowed in explicit transactions did not previously take place
            // in the implicit portion.
            TransactionStatus::InTransactionImplicit(_) | TransactionStatus::InTransaction(_) => {
                match &*stmt {
                    // Statements that are safe in a transaction. We still need to verify that we
                    // don't interleave reads and writes since we can't perform those serializably.
                    Statement::Close(_)
                    | Statement::Commit(_)
                    | Statement::Copy(_)
                    | Statement::Deallocate(_)
                    | Statement::Declare(_)
                    | Statement::Discard(_)
                    | Statement::Execute(_)
                    | Statement::ExplainPlan(_)
                    | Statement::ExplainPushdown(_)
                    | Statement::ExplainAnalyzeObject(_)
                    | Statement::ExplainAnalyzeCluster(_)
                    | Statement::ExplainTimestamp(_)
                    | Statement::ExplainSinkSchema(_)
                    | Statement::Fetch(_)
                    | Statement::Prepare(_)
                    | Statement::Rollback(_)
                    | Statement::Select(_)
                    | Statement::SetTransaction(_)
                    | Statement::Show(_)
                    | Statement::SetVariable(_)
                    | Statement::ResetVariable(_)
                    | Statement::StartTransaction(_)
                    | Statement::Subscribe(_)
                    | Statement::Raise(_) => {
                        // Always safe.
                    }

1252                    Statement::Insert(InsertStatement {
1253                        source, returning, ..
1254                    }) if returning.is_empty() && ConstantVisitor::insert_source(source) => {
1255                        // INSERT statements whose source is constant values and that do not need
1256                        // to execute on any cluster (no RETURNING) are always safe.
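                        // (For example, `INSERT INTO t VALUES (1), (2);` matches this
                        // arm, while `INSERT INTO t SELECT * FROM s;` falls through to
                        // the arms below.)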
1257                    }
1258
1259                    // These statements must be kept in-sync with `must_serialize_ddl()`.
1260                    Statement::AlterObjectRename(_)
1261                    | Statement::AlterObjectSwap(_)
1262                    | Statement::CreateTableFromSource(_)
1263                    | Statement::CreateSource(_) => {
1264                        let state = self.catalog().for_session(ctx.session()).state().clone();
1265                        let revision = self.catalog().transient_revision();
1266
1267                        // Initialize our transaction with a set of empty ops, or return an error
1268                        // if we can't run a DDL transaction
1269                        let txn_status = ctx.session_mut().transaction_mut();
1270                        if let Err(err) = txn_status.add_ops(TransactionOps::DDL {
1271                            ops: vec![],
1272                            state,
1273                            revision,
1274                            side_effects: vec![],
1275                            snapshot: None,
1276                        }) {
1277                            return ctx.retire(Err(err));
1278                        }
1279                    }
1280
1281                    // Statements below must be run singly (in Started).
1282                    Statement::AlterCluster(_)
1283                    | Statement::AlterConnection(_)
1284                    | Statement::AlterDefaultPrivileges(_)
1285                    | Statement::AlterIndex(_)
1286                    | Statement::AlterMaterializedViewApplyReplacement(_)
1287                    | Statement::AlterSetCluster(_)
1288                    | Statement::AlterOwner(_)
1289                    | Statement::AlterRetainHistory(_)
1290                    | Statement::AlterRole(_)
1291                    | Statement::AlterSecret(_)
1292                    | Statement::AlterSink(_)
1293                    | Statement::AlterSource(_)
1294                    | Statement::AlterSystemReset(_)
1295                    | Statement::AlterSystemResetAll(_)
1296                    | Statement::AlterSystemSet(_)
1297                    | Statement::AlterTableAddColumn(_)
1298                    | Statement::AlterNetworkPolicy(_)
1299                    | Statement::CreateCluster(_)
1300                    | Statement::CreateClusterReplica(_)
1301                    | Statement::CreateConnection(_)
1302                    | Statement::CreateDatabase(_)
1303                    | Statement::CreateIndex(_)
1304                    | Statement::CreateMaterializedView(_)
1305                    | Statement::CreateRole(_)
1306                    | Statement::CreateSchema(_)
1307                    | Statement::CreateSecret(_)
1308                    | Statement::CreateSink(_)
1309                    | Statement::CreateSubsource(_)
1310                    | Statement::CreateTable(_)
1311                    | Statement::CreateType(_)
1312                    | Statement::CreateView(_)
1313                    | Statement::CreateWebhookSource(_)
1314                    | Statement::CreateNetworkPolicy(_)
1315                    | Statement::Delete(_)
1316                    | Statement::DropObjects(_)
1317                    | Statement::DropOwned(_)
1318                    | Statement::GrantPrivileges(_)
1319                    | Statement::GrantRole(_)
1320                    | Statement::Insert(_)
1321                    | Statement::ReassignOwned(_)
1322                    | Statement::RevokePrivileges(_)
1323                    | Statement::RevokeRole(_)
1324                    | Statement::Update(_)
1325                    | Statement::ValidateConnection(_)
1326                    | Statement::Comment(_)
1327                    | Statement::ExecuteUnitTest(_) => {
1328                        let txn_status = ctx.session_mut().transaction_mut();
1329
1330                        // If we're not in an implicit transaction and we could generate exactly one
1331                        // valid ExecuteResponse, we can delay execution until commit.
1332                        if !txn_status.is_implicit() {
1333                            // Statements whose tag is trivial (derivable from the statement without
1334                            // executing it) can be run in a special single-statement explicit mode. In
1335                            // this mode (`BEGIN; <stmt>; COMMIT`), we generate the expected tag as if
1336                            // <stmt> had succeeded, but delay its execution until `COMMIT`.
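                            //
                            // For example (an illustrative session):
                            //
                            //     BEGIN;
                            //     CREATE TABLE t (a int);  -- returns the "CREATE TABLE"
                            //                              -- tag now, runs at COMMIT
                            //     COMMIT;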
1337                            if let Ok(resp) = ExecuteResponse::try_from(&*stmt) {
1338                                if let Err(err) = txn_status
1339                                    .add_ops(TransactionOps::SingleStatement { stmt, params })
1340                                {
1341                                    ctx.retire(Err(err));
1342                                    return;
1343                                }
1344                                ctx.retire(Ok(resp));
1345                                return;
1346                            }
1347                        }
1348
1349                        return ctx.retire(Err(AdapterError::OperationProhibitsTransaction(
1350                            stmt.to_string(),
1351                        )));
1352                    }
1353                }
1354            }
1355        }
1356
1357        // DDLs must be planned and sequenced serially. We do not rely on PlanValidity checking
1358        // various IDs because we have incorrectly done that in the past. Attempt to acquire the
1359        // DDL lock; it is stashed in the ConnMeta and dropped at transaction end. If the lock is
1360        // acquired, proceed with sequencing. If not, enqueue and return. This logic assumes that
1361        // Coordinator::clear_transaction is correctly called when session transactions are ended,
1362        // because that function will release the held lock from active_conns.
1363        if Self::must_serialize_ddl(&stmt, &ctx) {
1364            if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
1365                let prev = self
1366                    .active_conns
1367                    .get_mut(ctx.session().conn_id())
1368                    .expect("connection must exist")
1369                    .deferred_lock
1370                    .replace(guard);
1371                assert!(
1372                    prev.is_none(),
1373                    "connections should have at most one lock guard"
1374                );
1375            } else {
1376                if self
1377                    .active_conns
1378                    .get(ctx.session().conn_id())
1379                    .expect("connection must exist")
1380                    .deferred_lock
1381                    .is_some()
1382                {
1383                    // This session *already* has the lock, and incorrectly tried to execute another
1384                    // DDL while still holding the lock, violating the assumption documented above.
1385                    // This is an internal error, probably in some AdapterClient user (pgwire or
1386                    // http). Because the session is now in some unexpected state, return an error
1387                    // which should cause the AdapterClient user to fail the transaction.
1388                    // (Terminating the connection is maybe what we would prefer to do, but is not
1389                    // currently a thing we can do from the coordinator: calling handle_terminate
1390                    // cleans up Coordinator state for the session but doesn't inform the
1391                    // AdapterClient that the session should terminate.)
1392                    soft_panic_or_log!(
1393                        "session {} attempted to get ddl lock while already owning it",
1394                        ctx.session().conn_id()
1395                    );
1396                    ctx.retire(Err(AdapterError::Internal(
1397                        "session attempted to get ddl lock while already owning it".to_string(),
1398                    )));
1399                    return;
1400                }
1401                self.serialized_ddl.push_back(DeferredPlanStatement {
1402                    ctx,
1403                    ps: PlanStatement::Statement { stmt, params },
1404                });
1405                return;
1406            }
1407        }
1408
1409        let catalog = self.catalog();
1410        let catalog = catalog.for_session(ctx.session());
1411        let original_stmt = Arc::clone(&stmt);
1412        // `resolved_ids` should be derivable from `stmt`. If `stmt` is transformed to remove/add
1413        // IDs, then `resolved_ids` should be updated to also remove/add those IDs.
1414        let (stmt, mut resolved_ids) = match mz_sql::names::resolve(&catalog, (*stmt).clone()) {
1415            Ok(resolved) => resolved,
1416            Err(e) => return ctx.retire(Err(e.into())),
1417        };
1418        // N.B. The catalog can change during purification, so we must validate that the
1419        // dependencies still exist after purification. This should be done back on the main thread.
1420        // We do the validation:
1421        //   - In the handler for `Message::PurifiedStatementReady`, before we handle the purified statement.
1422        // If we add special handling for more types of `Statement`s, we'll need to ensure similar
1423        // verification occurs.
1424        let (stmt, resolved_ids) = match stmt {
1425            // Various statements must be purified off the main coordinator thread of control.
1426            stmt if Self::must_spawn_purification(&stmt) => {
1427                let internal_cmd_tx = self.internal_cmd_tx.clone();
1428                let conn_id = ctx.session().conn_id().clone();
1429                let catalog = self.owned_catalog();
1430                let now = self.now();
1431                let otel_ctx = OpenTelemetryContext::obtain();
1432                let current_storage_configuration = self.controller.storage.config().clone();
1433                task::spawn(|| format!("purify:{conn_id}"), async move {
1434                    let transient_revision = catalog.transient_revision();
1435                    let catalog = catalog.for_session(ctx.session());
1436
1437                    // Checks if the session is authorized to purify a statement. Usually
1438                    // authorization is checked after planning; however, purification happens before
1439                    // planning, and it may require the use of some connections and secrets.
1440                    if let Err(e) = rbac::check_usage(
1441                        &catalog,
1442                        ctx.session(),
1443                        &resolved_ids,
1444                        &CREATE_ITEM_USAGE,
1445                    ) {
1446                        return ctx.retire(Err(e.into()));
1447                    }
1448
1449                    let (result, cluster_id) = mz_sql::pure::purify_statement(
1450                        catalog,
1451                        now,
1452                        stmt,
1453                        &current_storage_configuration,
1454                    )
1455                    .await;
1456                    let result = result.map_err(|e| e.into());
1457                    let dependency_ids = resolved_ids.items().copied().collect();
1458                    let plan_validity = PlanValidity::new(
1459                        transient_revision,
1460                        dependency_ids,
1461                        cluster_id,
1462                        None,
1463                        ctx.session().role_metadata().clone(),
1464                    );
1465                    // It is not an error for purification to complete after `internal_cmd_rx` is dropped.
1466                    let result = internal_cmd_tx.send(Message::PurifiedStatementReady(
1467                        PurifiedStatementReady {
1468                            ctx,
1469                            result,
1470                            params,
1471                            plan_validity,
1472                            original_stmt,
1473                            otel_ctx,
1474                        },
1475                    ));
1476                    if let Err(e) = result {
1477                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1478                    }
1479                });
1480                return;
1481            }
1482
1483            // `CREATE SUBSOURCE` statements are disallowed for users and are only generated
1484            // automatically as part of purification.
1485            Statement::CreateSubsource(_) => {
1486                ctx.retire(Err(AdapterError::Unsupported(
1487                    "CREATE SUBSOURCE statements",
1488                )));
1489                return;
1490            }
1491
1492            Statement::CreateMaterializedView(mut cmvs) => {
1493                // `CREATE MATERIALIZED VIEW ... AS OF ...` syntax is disallowed for users and is
1494                // only used for storing initial frontiers in the catalog.
1495                if cmvs.as_of.is_some() {
1496                    return ctx.retire(Err(AdapterError::Unsupported(
1497                        "CREATE MATERIALIZED VIEW ... AS OF statements",
1498                    )));
1499                }
1500
1501                let mz_now = match self
1502                    .resolve_mz_now_for_create_materialized_view(
1503                        &cmvs,
1504                        &resolved_ids,
1505                        ctx.session_mut(),
1506                        true,
1507                    )
1508                    .await
1509                {
1510                    Ok(mz_now) => mz_now,
1511                    Err(e) => return ctx.retire(Err(e)),
1512                };
1513
1514                let catalog = self.catalog().for_session(ctx.session());
1515
1516                purify_create_materialized_view_options(
1517                    catalog,
1518                    mz_now,
1519                    &mut cmvs,
1520                    &mut resolved_ids,
1521                );
1522
1523                let purified_stmt =
1524                    Statement::CreateMaterializedView(CreateMaterializedViewStatement::<Aug> {
1525                        if_exists: cmvs.if_exists,
1526                        name: cmvs.name,
1527                        columns: cmvs.columns,
1528                        replacement_for: cmvs.replacement_for,
1529                        in_cluster: cmvs.in_cluster,
1530                        in_cluster_replica: cmvs.in_cluster_replica,
1531                        query: cmvs.query,
1532                        with_options: cmvs.with_options,
1533                        as_of: None,
1534                    });
1535
1536                // (Purifying CreateMaterializedView doesn't happen async, so no need to send
1537                // `Message::PurifiedStatementReady` here.)
1538                (purified_stmt, resolved_ids)
1539            }
1540
1541            Statement::ExplainPlan(ExplainPlanStatement {
1542                stage,
1543                with_options,
1544                format,
1545                explainee: Explainee::CreateMaterializedView(box_cmvs, broken),
1546            }) => {
1547                let mut cmvs = *box_cmvs;
1548                let mz_now = match self
1549                    .resolve_mz_now_for_create_materialized_view(
1550                        &cmvs,
1551                        &resolved_ids,
1552                        ctx.session_mut(),
1553                        false,
1554                    )
1555                    .await
1556                {
1557                    Ok(mz_now) => mz_now,
1558                    Err(e) => return ctx.retire(Err(e)),
1559                };
1560
1561                let catalog = self.catalog().for_session(ctx.session());
1562
1563                purify_create_materialized_view_options(
1564                    catalog,
1565                    mz_now,
1566                    &mut cmvs,
1567                    &mut resolved_ids,
1568                );
1569
1570                let purified_stmt = Statement::ExplainPlan(ExplainPlanStatement {
1571                    stage,
1572                    with_options,
1573                    format,
1574                    explainee: Explainee::CreateMaterializedView(Box::new(cmvs), broken),
1575                });
1576
1577                (purified_stmt, resolved_ids)
1578            }
1579
1580            // All other statements are handled immediately.
1581            _ => (stmt, resolved_ids),
1582        };
1583
1584        match self.plan_statement(ctx.session(), stmt, &params, &resolved_ids) {
1585            Ok((plan, sql_impl_ids)) => {
1586                self.sequence_plan(ctx, plan, resolved_ids, sql_impl_ids)
1587                    .await
1588            }
1589            Err(e) => ctx.retire(Err(e)),
1590        }
1591    }
1592
1593    /// Whether the statement is DDL that must be serialized with other DDL.
1594    fn must_serialize_ddl(stmt: &Statement<Raw>, ctx: &ExecuteContext) -> bool {
1595        // Non-DDL is not serialized here.
1596        if !StatementClassification::from(&*stmt).is_ddl() {
1597            return false;
1598        }
1599        // Off-thread, pre-planning purification can perform arbitrarily slow network calls so must
1600        // not be serialized. These all use PlanValidity for their checking, and we must ensure
1601        // those checks are sufficient.
1602        if Self::must_spawn_purification(stmt) {
1603            return false;
1604        }
1605
1606        // Statements that support multiple DDLs in a single transaction aren't serialized here.
1607        // Their operations are serialized when applied to the catalog, guaranteeing that any
1608        // off-thread DDLs concurrent with a multiple DDL transaction will have a serial order.
1609        if ctx.session.transaction().is_ddl() {
1610            return false;
1611        }
1612
1613        // Some DDL is exempt. It is not great that we are matching on Statements here because
1614        // different plans can be produced from the same top-level statement type (e.g., `ALTER
1615        // CONNECTION ROTATE KEYS`). But the whole point of this is to prevent things from being
1616        // planned in the first place, so we accept the abstraction leak.
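        //
        // For example, both of the following parse as `Statement::AlterConnection`,
        // but only the first is exempted below (illustrative syntax):
        //
        //     ALTER CONNECTION conn ROTATE KEYS;
        //     ALTER CONNECTION conn SET (HOST = 'new-host');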
1617        match stmt {
1618            // Secrets have a small and understood set of dependencies, and their off-thread work
1619            // interacts with k8s.
1620            Statement::AlterSecret(_) => false,
1621            Statement::CreateSecret(_) => false,
1622            Statement::AlterConnection(AlterConnectionStatement { actions, .. })
1623                if actions
1624                    .iter()
1625                    .all(|action| matches!(action, AlterConnectionAction::RotateKeys)) =>
1626            {
1627                false
1628            }
1629
1630            // The off-thread work that altering a cluster may do (waiting for replicas to spin up)
1631            // does not affect its catalog names or ids, and so is safe to not serialize. It could,
1632            // however, change the set of replicas that exist. For queries that name replicas or use
1633            // the current_replica session var, the `replica_id` field of `PlanValidity` serves to
1634            // ensure that those replicas exist during the query finish stage. Additionally, that
1635            // work can take hours (configured by the user), so serializing it would also be a bad
1636            // experience for users.
1637            Statement::AlterCluster(_) => false,
1638
1639            // `ALTER SINK SET FROM` waits for the old relation to make enough progress for a clean
1640            // cutover. If the old collection is stalled, it may block forever. Checks in
1641            // sequencing ensure that the operation fails if any one of these happens concurrently:
1642            //   * the sink is dropped
1643            //   * the new source relation is dropped
1644            //   * another `ALTER SINK` for the same sink is applied first
1645            Statement::AlterSink(stmt)
1646                if matches!(stmt.action, AlterSinkAction::ChangeRelation(_)) =>
1647            {
1648                false
1649            }
1650
1651            // `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT` waits for the target MV to make
1652            // enough progress for a clean cutover. If the target MV is stalled, it may block
1653            // forever. Checks in sequencing ensure the operation fails if any of these happens
1654            // concurrently:
1655            //   * the target MV is dropped
1656            //   * the replacement MV is dropped
1657            Statement::AlterMaterializedViewApplyReplacement(_) => false,
1658
1659            // Everything else must be serialized.
1660            _ => true,
1661        }
1662    }
1663
1664    /// Whether the statement must be purified off the Coordinator thread.
1665    fn must_spawn_purification<A: AstInfo>(stmt: &Statement<A>) -> bool {
1666        // `CREATE` and `ALTER` `SOURCE` and `SINK` statements must be purified off the main
1667        // coordinator thread.
1668        if !matches!(
1669            stmt,
1670            Statement::CreateSource(_)
1671                | Statement::AlterSource(_)
1672                | Statement::CreateSink(_)
1673                | Statement::CreateTableFromSource(_)
1674        ) {
1675            return false;
1676        }
1677
1678        // However, `ALTER SOURCE ... RETAIN HISTORY` should be excluded from off-thread purification.
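        // (Illustrative example: `ALTER SOURCE s SET (RETAIN HISTORY FOR '30d');`
        // stays on the coordinator thread.)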
1679        if let Statement::AlterSource(stmt) = stmt {
1680            let names: Vec<CreateSourceOptionName> = match &stmt.action {
1681                AlterSourceAction::SetOptions(options) => {
1682                    options.iter().map(|o| o.name.clone()).collect()
1683                }
1684                AlterSourceAction::ResetOptions(names) => names.clone(),
1685                _ => vec![],
1686            };
1687            if !names.is_empty()
1688                && names
1689                    .iter()
1690                    .all(|n| matches!(n, CreateSourceOptionName::RetainHistory))
1691            {
1692                return false;
1693            }
1694        }
1695
1696        true
1697    }
1698
1699    /// Chooses a timestamp for `mz_now()`, if `mz_now()` occurs in a REFRESH option of the
1700    /// materialized view. Additionally, if `acquire_read_holds` is true and the MV has any REFRESH
1701    /// option, this function grabs read holds at the earliest possible time on input collections
1702    /// that might be involved in the MV.
1703    ///
1704    /// Note that this is NOT what handles `mz_now()` in the query part of the MV. (This function
1705    /// handles it only in `with_options`.)
1706    ///
1707    /// (Note that the chosen timestamp won't be the same timestamp as the system table inserts,
1708    /// unfortunately.)
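    ///
    /// For example (illustrative syntax), the `mz_now()` in
    ///
    /// ```sql
    /// CREATE MATERIALIZED VIEW mv
    /// WITH (REFRESH AT mz_now(), REFRESH EVERY '1 day')
    /// AS SELECT ...;
    /// ```
    ///
    /// is resolved by this function, while an `mz_now()` inside the `SELECT` is not.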
1709    async fn resolve_mz_now_for_create_materialized_view(
1710        &mut self,
1711        cmvs: &CreateMaterializedViewStatement<Aug>,
1712        resolved_ids: &ResolvedIds,
1713        session: &Session,
1714        acquire_read_holds: bool,
1715    ) -> Result<Option<Timestamp>, AdapterError> {
1716        if cmvs
1717            .with_options
1718            .iter()
1719            .any(|wo| matches!(wo.value, Some(WithOptionValue::Refresh(..))))
1720        {
1721            let catalog = self.catalog().for_session(session);
1722            let cluster = mz_sql::plan::resolve_cluster_for_materialized_view(&catalog, cmvs)?;
1723            let ids = self
1724                .index_oracle(cluster)
1725                .sufficient_collections(resolved_ids.collections().copied());
1726
1727            // If there is any REFRESH option, then acquire read holds. (Strictly speaking, we'd
1728            // need this only if there is a `REFRESH AT`, not for `REFRESH EVERY`, because later
1729            // we want to check the AT times against the read holds that we acquire here. But
1730            // we do it for any REFRESH option, to avoid having so many code paths doing different
1731            // things.)
1732            //
1733            // It's important that we acquire read holds _before_ we determine the least valid read.
1734            // Otherwise, we're not guaranteed that the since frontier doesn't
1735            // advance forward from underneath us.
1736            let read_holds = self.acquire_read_holds(&ids);
1737
1738            // Does `mz_now()` occur?
1739            let mz_now_ts = if cmvs
1740                .with_options
1741                .iter()
1742                .any(materialized_view_option_contains_temporal)
1743            {
1744                let timeline_context = self
1745                    .catalog()
1746                    .validate_timeline_context(resolved_ids.collections().copied())?;
1747
1748                // We default to EpochMilliseconds, similarly to `determine_timestamp_for`,
1749                // but even in the TimestampIndependent case.
1750                // Note that we didn't accurately decide whether we are TimestampDependent
1751                // or TimestampIndependent, because for this we'd need to also check whether
1752                // `query.contains_temporal()`, similarly to how `peek_stage_validate` does.
1753                // However, this doesn't matter here, as we are just going to default to
1754                // EpochMilliseconds in both cases.
1755                let timeline = timeline_context
1756                    .timeline()
1757                    .unwrap_or(&Timeline::EpochMilliseconds);
1758
1759                // Let's start with the timestamp oracle read timestamp.
1760                let mut timestamp = self.get_timestamp_oracle(timeline).read_ts().await;
1761
1762                // If `least_valid_read` is later than the oracle, then advance to that time.
1763                // If we didn't do this, then there would be a danger of missing the first refresh,
1764                // which might cause the materialized view to be unreadable for hours. This might
1765                // be what was happening here:
1766                // https://github.com/MaterializeInc/database-issues/issues/7265#issuecomment-1931856361
1767                //
1768                // In the long term, it would be good to actually block the MV creation statement
1769                // until `least_valid_read`. https://github.com/MaterializeInc/database-issues/issues/7504
1770                // Without blocking, we have the problem that a REFRESH AT CREATION is not linearized
1771                // with the CREATE MATERIALIZED VIEW statement, in the sense that a query from the MV
1772                // after its creation might see input changes that happened after the CREATE MATERIALIZED
1773                // VIEW statement returned.
1774                let oracle_timestamp = timestamp;
1775                let least_valid_read = read_holds.least_valid_read();
1776                timestamp.advance_by(least_valid_read.borrow());
1777
1778                if oracle_timestamp != timestamp {
1779                    warn!(%cmvs.name, %oracle_timestamp, %timestamp, "REFRESH MV's inputs are not readable at the oracle read ts");
1780                }
1781
1782                info!("Resolved `mz_now()` to {timestamp} for REFRESH MV");
1783                Ok(Some(timestamp))
1784            } else {
1785                Ok(None)
1786            };
1787
1788            // NOTE: The Drop impl of ReadHolds makes sure that the hold is
1789            // released when we don't use it.
1790            if acquire_read_holds {
1791                self.store_transaction_read_holds(session.conn_id().clone(), read_holds);
1792            }
1793
1794            mz_now_ts
1795        } else {
1796            Ok(None)
1797        }
1798    }
1799
1800    /// Instruct the dataflow layer to cancel any ongoing, interactive work for
1801    /// the named `conn_id` if the correct secret key is specified.
1802    ///
1803    /// Note: Here we take a [`ConnectionIdType`] as opposed to an owned
1804    /// `ConnectionId` because this method gets called by external clients when
1805    /// they request to cancel an in-flight request.
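    ///
    /// (In the pgwire protocol this corresponds to a `CancelRequest` message,
    /// which carries the target connection's ID and secret key.)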
1806    #[mz_ore::instrument(level = "debug")]
1807    async fn handle_cancel(&mut self, conn_id: ConnectionIdType, secret_key: u32) {
1808        if let Some((id_handle, conn_meta)) = self.active_conns.get_key_value(&conn_id) {
1809            // If the secret key specified by the client doesn't match the
1810            // actual secret key for the target connection, we treat this as a
1811            // rogue cancellation request and ignore it.
1812            if conn_meta.secret_key != secret_key {
1813                return;
1814            }
1815
1816            // Now that we've verified the secret key, this is a privileged
1817            // cancellation request. We can upgrade the raw connection ID to a
1818            // proper `IdHandle`.
1819            self.handle_privileged_cancel(id_handle.clone()).await;
1820        }
1821    }
1822
1823    /// Unconditionally instructs the dataflow layer to cancel any ongoing,
1824    /// interactive work for the named `conn_id`.
1825    #[mz_ore::instrument(level = "debug")]
1826    pub(crate) async fn handle_privileged_cancel(&mut self, conn_id: ConnectionId) {
1827        let mut maybe_ctx = None;
1828
1829        // Cancel pending writes. There is at most one pending write per session.
1830        let pending_write_idx = self.pending_writes.iter().position(|pending_write_txn| {
1831            matches!(pending_write_txn, PendingWriteTxn::User {
1832                pending_txn: PendingTxn { ctx, .. },
1833                ..
1834            } if *ctx.session().conn_id() == conn_id)
1835        });
1836        if let Some(idx) = pending_write_idx {
1837            if let PendingWriteTxn::User {
1838                pending_txn: PendingTxn { ctx, .. },
1839                ..
1840            } = self.pending_writes.remove(idx)
1841            {
1842                maybe_ctx = Some(ctx);
1843            }
1844        }
1845
1846        // Cancel deferred writes.
1847        if let Some(write_op) = self.deferred_write_ops.remove(&conn_id) {
1848            maybe_ctx = Some(write_op.into_ctx());
1849        }
1850
1851        // Cancel deferred statements.
1852        let deferred_ddl_idx = self
1853            .serialized_ddl
1854            .iter()
1855            .position(|deferred| *deferred.ctx.session().conn_id() == conn_id);
1856        if let Some(idx) = deferred_ddl_idx {
1857            let deferred = self
1858                .serialized_ddl
1859                .remove(idx)
1860                .expect("known to exist from call to `position` above");
1861            maybe_ctx = Some(deferred.ctx);
1862        }
1863
1864        // Cancel reads waiting on being linearized. There is at most one linearized read per
1865        // session.
1866        if let Some(pending_read_txn) = self.pending_linearize_read_txns.remove(&conn_id) {
1867            let ctx = pending_read_txn.take_context();
1868            maybe_ctx = Some(ctx);
1869        }
1870
1871        if let Some(ctx) = maybe_ctx {
1872            ctx.retire(Err(AdapterError::Canceled));
1873        }
1874
1875        self.cancel_pending_peeks(&conn_id);
1876        self.cancel_pending_watchsets(&conn_id);
1877        self.cancel_compute_sinks_for_conn(&conn_id).await;
1878        self.cancel_cluster_reconfigurations_for_conn(&conn_id)
1879            .await;
1880        self.cancel_pending_copy(&conn_id);
1881        if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) {
1882            let _ = tx.send(true);
1883        }
1884    }
1885
1886    /// Handle termination of a client session.
1887    ///
1888    /// This cleans up any state in the coordinator associated with the session.
1889    #[mz_ore::instrument(level = "debug")]
1890    async fn handle_terminate(&mut self, conn_id: ConnectionId) {
1891        // If the session doesn't exist in `active_conns`, then this method will panic later on.
1892        // Instead we explicitly panic here while dumping the entire Coord to the logs to help
1893        // debug. This panic is very infrequent so we want as much information as possible.
1894        // See https://github.com/MaterializeInc/database-issues/issues/5627.
1895        assert!(
1896            self.active_conns.contains_key(&conn_id),
1897            "unknown connection: {conn_id:?}\n\n{self:?}"
1898        );
1899
1900        // We do not need to call clear_transaction here because there are no side effects to run
1901        // based on any session transaction state.
1902        self.clear_connection(&conn_id).await;
1903
1904        self.drop_temp_items(&conn_id).await;
1905        // Only call catalog_mut() if a temporary schema actually exists for this connection.
1906        // This avoids an expensive Arc::make_mut clone for the common case where the connection
1907        // never created any temporary objects.
1908        if self.catalog().state().has_temporary_schema(&conn_id) {
1909            self.catalog_mut()
1910                .drop_temporary_schema(&conn_id)
1911                .unwrap_or_terminate("unable to drop temporary schema");
1912        }
1913        let conn = self.active_conns.remove(&conn_id).expect("conn must exist");
1914        let session_type = metrics::session_type_label_value(conn.user());
1915        self.metrics
1916            .active_sessions
1917            .with_label_values(&[session_type])
1918            .dec();
1919        self.cancel_pending_peeks(conn.conn_id());
1920        self.cancel_pending_watchsets(&conn_id);
1921        self.cancel_pending_copy(&conn_id);
1922        self.end_session_for_statement_logging(conn.uuid());
1923
1924        // Queue the builtin table update, but do not wait for it to complete. We explicitly do
1925        // this to prevent blocking the Coordinator in the case that a lot of connections are
1926        // closed at once, which occurs regularly in some workflows.
1927        let update = self
1928            .catalog()
1929            .state()
1930            .pack_session_update(&conn, Diff::MINUS_ONE);
1931        let update = self.catalog().state().resolve_builtin_table_update(update);
1932
1933        let _builtin_update_notify = self.builtin_table_update().defer(vec![update]);
1934    }
1935
1936    /// Returns the necessary metadata for appending to a webhook source, and a channel to send
1937    /// rows.
1938    #[mz_ore::instrument(level = "debug")]
1939    fn handle_get_webhook(
1940        &mut self,
1941        database: String,
1942        schema: String,
1943        name: String,
1944        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
1945    ) {
1946        /// Attempts to resolve a Webhook source from a provided `database.schema.name` path.
1947        ///
1948        /// Returns a struct that can be used to append data to the underlying storage collection, and the
1949        /// types we should cast the request to.
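        ///
        /// (Webhook requests arrive over HTTP at a path of the form
        /// `/api/webhook/<database>/<schema>/<name>`; those path segments become the
        /// arguments here.)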
1950        fn resolve(
1951            coord: &mut Coordinator,
1952            database: String,
1953            schema: String,
1954            name: String,
1955        ) -> Result<AppendWebhookResponse, PartialItemName> {
1956            // Resolve our collection.
1957            let name = PartialItemName {
1958                database: Some(database),
1959                schema: Some(schema),
1960                item: name,
1961            };
1962            let Ok(entry) = coord
1963                .catalog()
1964                .resolve_entry(None, &vec![], &name, &SYSTEM_CONN_ID)
1965            else {
1966                return Err(name);
1967            };
1968
1969            // Webhooks can be created with `CREATE SOURCE` or `CREATE TABLE`.
1970            let (data_source, desc, global_id) = match entry.item() {
1971                CatalogItem::Source(Source {
1972                    data_source: data_source @ DataSourceDesc::Webhook { .. },
1973                    desc,
1974                    global_id,
1975                    ..
1976                }) => (data_source, desc.clone(), *global_id),
1977                CatalogItem::Table(
1978                    table @ Table {
1979                        desc,
1980                        data_source:
1981                            TableDataSource::DataSource {
1982                                desc: data_source @ DataSourceDesc::Webhook { .. },
1983                                ..
1984                            },
1985                        ..
1986                    },
1987                ) => (data_source, desc.latest(), table.global_id_writes()),
1988                _ => return Err(name),
1989            };
1990
1991            let DataSourceDesc::Webhook {
1992                validate_using,
1993                body_format,
1994                headers,
1995                ..
1996            } = data_source
1997            else {
1998                mz_ore::soft_panic_or_log!("programming error! checked above for webhook");
1999                return Err(name);
2000            };
2001            let body_format = body_format.clone();
2002            let header_tys = headers.clone();
2003
2004            // Assert we have one column for the body, and however many are required for
2005            // the headers.
2006            let num_columns = headers.num_columns() + 1;
2007            mz_ore::soft_assert_or_log!(
2008                desc.arity() <= num_columns,
2009                "expected at most {} columns, but got {}",
2010                num_columns,
2011                desc.arity()
2012            );
2013
2014            // Double check that the body column of the webhook source matches the type
2015            // we're about to deserialize as.
2016            let body_column = desc
2017                .get_by_name(&"body".into())
2018                .map(|(_idx, ty)| ty.clone())
2019                .ok_or_else(|| name.clone())?;
2020            assert!(!body_column.nullable, "webhook body column is nullable!?");
2021            assert_eq!(body_column.scalar_type, SqlScalarType::from(body_format));
2022
2023            // Create a validator that can be called to validate a webhook request.
2024            let validator = validate_using.as_ref().map(|v| {
2025                let validation = v.clone();
2026                AppendWebhookValidator::new(validation, coord.caching_secrets_reader.clone())
2027            });
2028
2029            // Get a channel so we can queue updates to be written.
2030            let row_tx = coord
2031                .controller
2032                .storage
2033                .monotonic_appender(global_id)
2034                .map_err(|_| name.clone())?;
2035            let stats = coord
2036                .controller
2037                .storage
2038                .webhook_statistics(global_id)
2039                .map_err(|_| name)?;
2040            let invalidator = coord
2041                .active_webhooks
2042                .entry(entry.id())
2043                .or_insert_with(WebhookAppenderInvalidator::new);
2044            let tx = WebhookAppender::new(row_tx, invalidator.guard(), stats);
2045
2046            Ok(AppendWebhookResponse {
2047                tx,
2048                body_format,
2049                header_tys,
2050                validator,
2051            })
2052        }
2053
2054        let response = resolve(self, database, schema, name).map_err(|name| {
2055            AppendWebhookError::UnknownWebhook {
2056                database: name.database.expect("provided"),
2057                schema: name.schema.expect("provided"),
2058                name: name.item,
2059            }
2060        });
2061        let _ = tx.send(response);
2062    }
2063
2064    /// Handle registration of a frontend peek, for statement logging and query cancellation
2065    /// handling.
2066    fn handle_register_frontend_peek(
2067        &mut self,
2068        uuid: Uuid,
2069        conn_id: ConnectionId,
2070        cluster_id: mz_controller_types::ClusterId,
2071        depends_on: BTreeSet<GlobalId>,
2072        is_fast_path: bool,
2073        watch_set: Option<WatchSetCreation>,
2074        tx: oneshot::Sender<Result<(), AdapterError>>,
2075    ) {
2076        let statement_logging_id = watch_set.as_ref().map(|ws| ws.logging_id);
2077        if let Some(ws) = watch_set {
2078            if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), ws) {
2079                let _ = tx.send(Err(
2080                    AdapterError::concurrent_dependency_drop_from_watch_set_install_error(e),
2081                ));
2082                return;
2083            }
2084        }
2085
2086        // Store the peek in pending_peeks for later retrieval when results arrive
2087        self.pending_peeks.insert(
2088            uuid,
2089            PendingPeek {
2090                conn_id: conn_id.clone(),
2091                cluster_id,
2092                depends_on,
2093                ctx_extra: ExecuteContextGuard::new(
2094                    statement_logging_id,
2095                    self.internal_cmd_tx.clone(),
2096                ),
2097                is_fast_path,
2098            },
2099        );
2100
2101        // Also track it by connection ID for cancellation support
2102        self.client_pending_peeks
2103            .entry(conn_id)
2104            .or_default()
2105            .insert(uuid, cluster_id);
2106
2107        let _ = tx.send(Ok(()));
2108    }
2109
2110    /// Handle unregistration of a frontend peek that was registered but failed to issue.
2111    /// This is used for cleanup when `client.peek()` fails after `RegisterFrontendPeek` succeeds.
2112    fn handle_unregister_frontend_peek(&mut self, uuid: Uuid, tx: oneshot::Sender<()>) {
2113        // Remove from pending_peeks (this also removes from client_pending_peeks)
2114        if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
2115            // Defuse the context guard, because the frontend will log the peek's error result itself.
2116            let _ = pending_peek.ctx_extra.defuse();
2117        }
2118        let _ = tx.send(());
2119    }
2120}