Skip to main content

mz_adapter/coord/
command_handler.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! Logic for processing client [`Command`]s. Each [`Command`] is initiated by a
//! client via some external Materialize API (e.g. HTTP or psql).
12
13use base64::prelude::*;
14use differential_dataflow::lattice::Lattice;
15use mz_adapter_types::dyncfgs::ALLOW_USER_SESSIONS;
16use mz_auth::AuthenticatorKind;
17use mz_auth::password::Password;
18use mz_repr::namespaces::MZ_INTERNAL_SCHEMA;
19use mz_sql::catalog::AutoProvisionSource;
20use mz_sql::session::metadata::SessionMetadata;
21use std::collections::{BTreeMap, BTreeSet};
22use std::net::IpAddr;
23use std::sync::Arc;
24
25use futures::FutureExt;
26use futures::future::LocalBoxFuture;
27use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::memory::objects::{
30    CatalogItem, DataSourceDesc, Role, Source, Table, TableDataSource,
31};
32use mz_ore::task;
33use mz_ore::tracing::OpenTelemetryContext;
34use mz_ore::{instrument, soft_panic_or_log};
35use mz_repr::role_id::RoleId;
36use mz_repr::{Diff, GlobalId, SqlScalarType, Timestamp};
37use mz_sql::ast::{
38    AlterConnectionAction, AlterConnectionStatement, AlterSinkAction, AlterSourceAction, AstInfo,
39    ConstantVisitor, CopyRelation, CopyStatement, CreateSourceOptionName, Raw, Statement,
40    SubscribeStatement,
41};
42use mz_sql::catalog::RoleAttributesRaw;
43use mz_sql::names::{Aug, PartialItemName, ResolvedIds};
44use mz_sql::plan::{
45    AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, Params, Plan,
46    StatementClassification, TransactionType,
47};
48use mz_sql::pure::{
49    materialized_view_option_contains_temporal, purify_create_materialized_view_options,
50};
51use mz_sql::rbac;
52use mz_sql::rbac::CREATE_ITEM_USAGE;
53use mz_sql::session::user::User;
54use mz_sql::session::vars::{
55    EndTransactionAction, NETWORK_POLICY, OwnedVarInput, STATEMENT_LOGGING_SAMPLE_RATE,
56    TRANSACTION_ISOLATION_VAR_NAME, Value, Var, check_transaction_isolation_feature_flag,
57};
58use mz_sql_parser::ast::display::AstDisplay;
59use mz_sql_parser::ast::{
60    CreateMaterializedViewStatement, ExplainPlanStatement, Explainee, InsertStatement,
61    WithOptionValue,
62};
63use mz_storage_types::sources::Timeline;
64use opentelemetry::trace::TraceContextExt;
65use tokio::sync::{mpsc, oneshot};
66use tracing::{Instrument, debug_span, info, warn};
67use tracing_opentelemetry::OpenTelemetrySpanExt;
68use uuid::Uuid;
69
70use crate::command::{
71    CatalogSnapshot, Command, ExecuteResponse, Response, SASLChallengeResponse,
72    SASLVerifyProofResponse, StartupResponse, SuperuserAttribute,
73};
74use crate::coord::appends::{PendingWriteTxn, UserWriteResponder};
75use crate::coord::peek::PendingPeek;
76use crate::coord::{
77    ConnMeta, Coordinator, DeferredPlanStatement, Message, PendingTxn, PlanStatement, PlanValidity,
78    PurifiedStatementReady, validate_ip_with_policy_rules,
79};
80use crate::error::{AdapterError, AuthenticationError};
81use crate::notice::AdapterNotice;
82use crate::session::{Session, TransactionOps, TransactionStatus};
83use crate::statement_logging::WatchSetCreation;
84use crate::util::{ClientTransmitter, ResultExt};
85use crate::webhook::{
86    AppendWebhookResponse, AppendWebhookValidator, WebhookAppender, WebhookAppenderInvalidator,
87};
88use crate::{AppendWebhookError, ExecuteContext, catalog, metrics};
89
90use super::ExecuteContextGuard;
91
/// The login status of a role, used by authentication handlers to check role
/// existence and login permission before proceeding to credential verification.
///
/// This is a plain fieldless tag, so it is `Copy` and comparable; handlers can
/// match on it, compare it with `==`, or log it with `{:?}`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RoleLoginStatus {
    /// The role does not exist in the catalog.
    NotFound,
    /// The role exists and has the LOGIN attribute.
    CanLogin,
    /// The role exists but does not have the LOGIN attribute.
    NonLogin,
}
102
103fn role_login_status(role: Option<&Role>) -> RoleLoginStatus {
104    match role {
105        None => RoleLoginStatus::NotFound,
106        Some(role) => match role.attributes.login {
107            Some(login) if login => RoleLoginStatus::CanLogin,
108            _ => RoleLoginStatus::NonLogin,
109        },
110    }
111}
112
113impl Coordinator {
114    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 58KB. This would
115    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
116    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
117    pub(crate) fn handle_command(&mut self, mut cmd: Command) -> LocalBoxFuture<'_, ()> {
118        async move {
119            if let Some(session) = cmd.session_mut() {
120                session.apply_external_metadata_updates();
121            }
122            match cmd {
123                Command::Startup {
124                    tx,
125                    user,
126                    conn_id,
127                    secret_key,
128                    uuid,
129                    client_ip,
130                    application_name,
131                    notice_tx,
132                } => {
133                    // Note: We purposefully do not use a ClientTransmitter here because startup
134                    // handles errors and cleanup of sessions itself.
135                    self.handle_startup(
136                        tx,
137                        user,
138                        conn_id,
139                        secret_key,
140                        uuid,
141                        client_ip,
142                        application_name,
143                        notice_tx,
144                    )
145                    .await;
146                }
147
148                Command::AuthenticatePassword {
149                    tx,
150                    role_name,
151                    password,
152                } => {
153                    self.handle_authenticate_password(tx, role_name, password)
154                        .await;
155                }
156
157                Command::AuthenticateGetSASLChallenge {
158                    tx,
159                    role_name,
160                    nonce,
161                } => {
162                    self.handle_generate_sasl_challenge(tx, role_name, nonce)
163                        .await;
164                }
165
166                Command::AuthenticateVerifySASLProof {
167                    tx,
168                    role_name,
169                    proof,
170                    mock_hash,
171                    auth_message,
172                } => {
173                    self.handle_authenticate_verify_sasl_proof(
174                        tx,
175                        role_name,
176                        proof,
177                        auth_message,
178                        mock_hash,
179                    );
180                }
181
182                Command::CheckRoleCanLogin { tx, role_name } => {
183                    self.handle_role_can_login(tx, role_name);
184                }
185
186                Command::Execute {
187                    portal_name,
188                    session,
189                    tx,
190                    outer_ctx_extra,
191                } => {
192                    let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
193
194                    self.handle_execute(portal_name, session, tx, outer_ctx_extra)
195                        .await;
196                }
197
198                Command::StartCopyFromStdin {
199                    target_id,
200                    target_name,
201                    columns,
202                    row_desc,
203                    params,
204                    session,
205                    tx,
206                } => {
207                    let otel_ctx = OpenTelemetryContext::obtain();
208                    let result = self.setup_copy_from_stdin(
209                        &session,
210                        target_id,
211                        target_name,
212                        columns,
213                        row_desc,
214                        params,
215                    );
216                    let _ = tx.send(Response {
217                        result,
218                        session,
219                        otel_ctx,
220                    });
221                }
222
223                Command::RetireExecute { data, reason } => self.retire_execution(reason, data),
224
225                Command::CancelRequest {
226                    conn_id,
227                    secret_key,
228                } => {
229                    self.handle_cancel(conn_id, secret_key).await;
230                }
231
232                Command::PrivilegedCancelRequest { conn_id } => {
233                    self.handle_privileged_cancel(conn_id).await;
234                }
235
236                Command::GetWebhook {
237                    database,
238                    schema,
239                    name,
240                    tx,
241                } => {
242                    self.handle_get_webhook(database, schema, name, tx);
243                }
244
245                Command::GetSystemVars { tx } => {
246                    let _ = tx.send(self.catalog.system_config().clone());
247                }
248
249                Command::SetSystemVars { vars, conn_id, tx } => {
250                    let mut ops = Vec::with_capacity(vars.len());
251                    let conn = &self.active_conns[&conn_id];
252
253                    for (name, value) in vars {
254                        if let Err(e) =
255                            self.catalog().system_config().get(&name).and_then(|var| {
256                                var.visible(conn.user(), self.catalog.system_config())
257                            })
258                        {
259                            let _ = tx.send(Err(e.into()));
260                            return;
261                        }
262
263                        ops.push(catalog::Op::UpdateSystemConfiguration {
264                            name,
265                            value: OwnedVarInput::Flat(value),
266                        });
267                    }
268
269                    let result = self
270                        .catalog_transact_with_context(Some(&conn_id), None, ops)
271                        .await;
272                    let _ = tx.send(result);
273                }
274
275                Command::UpdateScopedSystemParameters {
276                    overrides,
277                    prune_scope,
278                    tx,
279                } => {
280                    // Store the new working copy, persist it durably, and
281                    // reconcile it into the per-scope resolution boundaries.
282                    self.reconcile_scoped_system_parameters(overrides, prune_scope)
283                        .await;
284                    let _ = tx.send(());
285                }
286
287                Command::InstallScopedSystemParameterFrontend { frontend } => {
288                    // Keep the shared frontend so create-cluster / create-replica
289                    // can resolve scoped overrides synchronously at create time.
290                    self.scoped_frontend = Some(frontend);
291                }
292
293                Command::InjectAuditEvents {
294                    events,
295                    conn_id,
296                    tx,
297                } => {
298                    let ops = vec![catalog::Op::InjectAuditEvents { events }];
299                    let result = self
300                        .catalog_transact_with_context(Some(&conn_id), None, ops)
301                        .await;
302                    let _ = tx.send(result);
303                }
304
305                Command::Terminate { conn_id, tx } => {
306                    self.handle_terminate(conn_id).await;
307                    // Note: We purposefully do not use a ClientTransmitter here because we're already
308                    // terminating the provided session.
309                    if let Some(tx) = tx {
310                        let _ = tx.send(Ok(()));
311                    }
312                }
313
314                Command::Commit {
315                    action,
316                    session,
317                    tx,
318                } => {
319                    let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
320                    // We reach here not through a statement execution, but from the
321                    // "commit" pgwire command. Thus, we just generate a default statement
322                    // execution context (once statement logging is implemented, this will cause nothing to be logged
323                    // when the execution finishes.)
324                    let ctx = ExecuteContext::from_parts(
325                        tx,
326                        self.internal_cmd_tx.clone(),
327                        session,
328                        Default::default(),
329                    );
330                    let plan = match action {
331                        EndTransactionAction::Commit => {
332                            Plan::CommitTransaction(CommitTransactionPlan {
333                                transaction_type: TransactionType::Implicit,
334                            })
335                        }
336                        EndTransactionAction::Rollback => {
337                            Plan::AbortTransaction(AbortTransactionPlan {
338                                transaction_type: TransactionType::Implicit,
339                            })
340                        }
341                    };
342
343                    let conn_id = ctx.session().conn_id().clone();
344                    self.sequence_plan(ctx, plan, ResolvedIds::empty(), ResolvedIds::empty())
345                        .await;
346                    // Part of the Command::Commit contract is that the Coordinator guarantees that
347                    // it has cleared its transaction state for the connection.
348                    self.clear_connection(&conn_id).await;
349                }
350
351                Command::CatalogSnapshot { tx } => {
352                    let _ = tx.send(CatalogSnapshot {
353                        catalog: self.owned_catalog(),
354                    });
355                }
356
357                Command::CheckConsistency { tx } => {
358                    let _ = tx.send(self.check_consistency());
359                }
360
361                Command::Dump { tx } => {
362                    let _ = tx.send(self.dump().await);
363                }
364
365                Command::GetComputeInstanceClient { instance_id, tx } => {
366                    let _ = tx.send(self.controller.compute.instance_client(instance_id));
367                }
368
369                Command::GetOracle { timeline, tx } => {
370                    let oracle = self
371                        .global_timelines
372                        .get(&timeline)
373                        .map(|timeline_state| Arc::clone(&timeline_state.oracle))
374                        .ok_or(AdapterError::ChangedPlan(
375                            "timeline has disappeared during planning".to_string(),
376                        ));
377                    let _ = tx.send(oracle);
378                }
379
380                Command::DetermineRealTimeRecentTimestamp {
381                    source_ids,
382                    real_time_recency_timeout,
383                    tx,
384                } => {
385                    let result = self
386                        .determine_real_time_recent_timestamp(
387                            source_ids.iter().copied(),
388                            real_time_recency_timeout,
389                        )
390                        .await;
391
392                    match result {
393                        Ok(Some(fut)) => {
394                            let catalog = Arc::clone(&self.catalog);
395                            task::spawn(|| "determine real time recent timestamp", async move {
396                                let result =
397                                    Coordinator::await_real_time_recent_timestamp(catalog, fut)
398                                        .await
399                                        .map(Some);
400                                let _ = tx.send(result);
401                            });
402                        }
403                        Ok(None) => {
404                            let _ = tx.send(Ok(None));
405                        }
406                        Err(e) => {
407                            let _ = tx.send(Err(e));
408                        }
409                    }
410                }
411
412                Command::GetTransactionReadHoldsBundle { conn_id, tx } => {
413                    let read_holds = self.txn_read_holds.get(&conn_id).cloned();
414                    let _ = tx.send(read_holds);
415                }
416
417                Command::StoreTransactionReadHolds {
418                    conn_id,
419                    read_holds,
420                    tx,
421                } => {
422                    self.store_transaction_read_holds(conn_id, read_holds);
423                    let _ = tx.send(());
424                }
425
426                Command::ExecuteSlowPathPeek {
427                    dataflow_plan,
428                    determination,
429                    finishing,
430                    compute_instance,
431                    target_replica,
432                    intermediate_result_type,
433                    source_ids,
434                    conn_id,
435                    max_result_size,
436                    max_query_result_size,
437                    watch_set,
438                    tx,
439                } => {
440                    let result = self
441                        .implement_slow_path_peek(
442                            *dataflow_plan,
443                            determination,
444                            finishing,
445                            compute_instance,
446                            target_replica,
447                            intermediate_result_type,
448                            source_ids,
449                            conn_id,
450                            max_result_size,
451                            max_query_result_size,
452                            watch_set,
453                        )
454                        .await;
455                    let _ = tx.send(result);
456                }
457
458                Command::ExecuteSubscribe {
459                    df_desc,
460                    dependency_ids,
461                    cluster_id,
462                    replica_id,
463                    conn_id,
464                    session_uuid,
465                    read_holds,
466                    plan,
467                    statement_logging_id,
468                    tx,
469                } => {
470                    let mut ctx_extra = ExecuteContextGuard::new(
471                        statement_logging_id,
472                        self.internal_cmd_tx.clone(),
473                    );
474                    let result = self
475                        .implement_subscribe(
476                            &mut ctx_extra,
477                            df_desc,
478                            dependency_ids,
479                            cluster_id,
480                            replica_id,
481                            conn_id,
482                            session_uuid,
483                            read_holds,
484                            plan,
485                        )
486                        .await;
487                    let _ = tx.send(result);
488                }
489
490                Command::CopyToPreflight {
491                    s3_sink_connection,
492                    sink_id,
493                    tx,
494                } => {
495                    // Spawn a background task to perform the slow S3 preflight operations.
496                    // This avoids blocking the coordinator's main task.
497                    let connection_context = self.connection_context().clone();
498                    let enforce_external_addresses =
499                        mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
500                            .get(self.controller.storage.config().config_set());
501                    task::spawn(|| "copy_to_preflight", async move {
502                        let result = mz_storage_types::sinks::s3_oneshot_sink::preflight(
503                            connection_context,
504                            &s3_sink_connection.aws_connection,
505                            &s3_sink_connection.upload_info,
506                            s3_sink_connection.connection_id,
507                            sink_id,
508                            enforce_external_addresses,
509                        )
510                        .await
511                        .map_err(AdapterError::from);
512                        let _ = tx.send(result);
513                    });
514                }
515
516                Command::ExecuteCopyTo {
517                    df_desc,
518                    compute_instance,
519                    target_replica,
520                    source_ids,
521                    conn_id,
522                    watch_set,
523                    tx,
524                } => {
525                    // implement_copy_to spawns a background task that sends the response
526                    // through tx when the COPY TO completes (or immediately if setup fails).
527                    // We just call it and let it handle all response sending.
528                    self.implement_copy_to(
529                        *df_desc,
530                        compute_instance,
531                        target_replica,
532                        source_ids,
533                        conn_id,
534                        watch_set,
535                        tx,
536                    )
537                    .await;
538                }
539
540                Command::ExecuteSideEffectingFunc {
541                    plan,
542                    conn_id,
543                    current_role,
544                    tx,
545                } => {
546                    let result = self
547                        .execute_side_effecting_func(plan, conn_id, current_role)
548                        .await;
549                    let _ = tx.send(result);
550                }
551                Command::RegisterFrontendPeek {
552                    uuid,
553                    conn_id,
554                    cluster_id,
555                    depends_on,
556                    is_fast_path,
557                    watch_set,
558                    tx,
559                } => {
560                    self.handle_register_frontend_peek(
561                        uuid,
562                        conn_id,
563                        cluster_id,
564                        depends_on,
565                        is_fast_path,
566                        watch_set,
567                        tx,
568                    );
569                }
570                Command::UnregisterFrontendPeek { uuid, tx } => {
571                    self.handle_unregister_frontend_peek(uuid, tx);
572                }
573                Command::ExplainTimestamp {
574                    conn_id,
575                    session_wall_time,
576                    cluster_id,
577                    id_bundle,
578                    determination,
579                    tx,
580                } => {
581                    let explanation = self.explain_timestamp(
582                        &conn_id,
583                        session_wall_time,
584                        cluster_id,
585                        &id_bundle,
586                        determination,
587                    );
588                    let _ = tx.send(explanation);
589                }
590                Command::FrontendStatementLogging(event) => {
591                    self.handle_frontend_statement_logging_event(event);
592                }
593            }
594        }
595        .instrument(debug_span!("handle_command"))
596        .boxed_local()
597    }
598
599    fn handle_role_can_login(
600        &self,
601        tx: oneshot::Sender<Result<(), AdapterError>>,
602        role_name: String,
603    ) {
604        let result =
605            match role_login_status(self.catalog().try_get_role_by_name(role_name.as_str())) {
606                RoleLoginStatus::NotFound => Err(AdapterError::AuthenticationError(
607                    AuthenticationError::RoleNotFound,
608                )),
609                RoleLoginStatus::NonLogin => Err(AdapterError::AuthenticationError(
610                    AuthenticationError::NonLogin,
611                )),
612                RoleLoginStatus::CanLogin => Ok(()),
613            };
614        let _ = tx.send(result);
615    }
616
617    fn handle_authenticate_verify_sasl_proof(
618        &self,
619        tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
620        role_name: String,
621        proof: String,
622        auth_message: String,
623        mock_hash: String,
624    ) {
625        let role = self.catalog().try_get_role_by_name(role_name.as_str());
626        let login_status = role_login_status(role);
627        let role_auth = role.and_then(|r| self.catalog().try_get_role_auth_by_id(&r.id));
628        let real_hash = role_auth
629            .as_ref()
630            .and_then(|auth| auth.password_hash.as_ref());
631        let hash_ref = real_hash.map(|s| s.as_str()).unwrap_or(&mock_hash);
632
633        match mz_auth::hash::sasl_verify(hash_ref, &proof, &auth_message) {
634            Ok(verifier) => {
635                // Success only if role exists, allows login, and a real password hash was used.
636                if matches!(login_status, RoleLoginStatus::CanLogin) && real_hash.is_some() {
637                    let _ = tx.send(Ok(SASLVerifyProofResponse { verifier }));
638                } else {
639                    let _ = tx.send(Err(AdapterError::AuthenticationError(match login_status {
640                        RoleLoginStatus::NonLogin => AuthenticationError::NonLogin,
641                        RoleLoginStatus::NotFound => AuthenticationError::RoleNotFound,
642                        RoleLoginStatus::CanLogin => AuthenticationError::InvalidCredentials,
643                    })));
644                }
645            }
646            Err(_) => {
647                let _ = tx.send(Err(AdapterError::AuthenticationError(
648                    AuthenticationError::InvalidCredentials,
649                )));
650            }
651        }
652    }
653
654    #[mz_ore::instrument(level = "debug")]
655    async fn handle_generate_sasl_challenge(
656        &self,
657        tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
658        role_name: String,
659        client_nonce: String,
660    ) {
661        let role_auth = self
662            .catalog()
663            .try_get_role_by_name(&role_name)
664            .and_then(|role| self.catalog().try_get_role_auth_by_id(&role.id));
665
666        let nonce = match mz_auth::hash::generate_nonce(&client_nonce) {
667            Ok(n) => n,
668            Err(e) => {
669                let msg = format!(
670                    "failed to generate nonce for client nonce {}: {}",
671                    client_nonce, e
672                );
673                let _ = tx.send(Err(AdapterError::Internal(msg.clone())));
674                soft_panic_or_log!("{msg}");
675                return;
676            }
677        };
678
679        // It's important that the mock_nonce is deterministic per role, otherwise the purpose of
680        // doing mock authentication is defeated. We use a catalog-wide nonce, and combine that
681        // with the role name to get a per-role mock nonce.
682        let send_mock_challenge =
683            |role_name: String,
684             mock_nonce: String,
685             nonce: String,
686             tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>| {
687                let opts = mz_auth::hash::mock_sasl_challenge(
688                    &role_name,
689                    &mock_nonce,
690                    &self.catalog().system_config().scram_iterations(),
691                );
692                let _ = tx.send(Ok(SASLChallengeResponse {
693                    iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
694                    salt: BASE64_STANDARD.encode(opts.salt),
695                    nonce,
696                }));
697            };
698
699        match role_auth {
700            Some(auth) if auth.password_hash.is_some() => {
701                let hash = auth.password_hash.as_ref().expect("checked above");
702                match mz_auth::hash::scram256_parse_opts(hash) {
703                    Ok(opts) => {
704                        let _ = tx.send(Ok(SASLChallengeResponse {
705                            iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
706                            salt: BASE64_STANDARD.encode(opts.salt),
707                            nonce,
708                        }));
709                    }
710                    Err(_) => {
711                        send_mock_challenge(
712                            role_name,
713                            self.catalog().state().mock_authentication_nonce(),
714                            nonce,
715                            tx,
716                        );
717                    }
718                }
719            }
720            _ => {
721                send_mock_challenge(
722                    role_name,
723                    self.catalog().state().mock_authentication_nonce(),
724                    nonce,
725                    tx,
726                );
727            }
728        }
729    }
730
731    #[mz_ore::instrument(level = "debug")]
732    async fn handle_authenticate_password(
733        &self,
734        tx: oneshot::Sender<Result<(), AdapterError>>,
735        role_name: String,
736        password: Option<Password>,
737    ) {
738        let Some(password) = password else {
739            // The user did not provide a password.
740            let _ = tx.send(Err(AdapterError::AuthenticationError(
741                AuthenticationError::PasswordRequired,
742            )));
743            return;
744        };
745        let role = self.catalog().try_get_role_by_name(role_name.as_str());
746
747        match role_login_status(role) {
748            RoleLoginStatus::NotFound => {
749                let _ = tx.send(Err(AdapterError::AuthenticationError(
750                    AuthenticationError::RoleNotFound,
751                )));
752                return;
753            }
754            RoleLoginStatus::NonLogin => {
755                let _ = tx.send(Err(AdapterError::AuthenticationError(
756                    AuthenticationError::NonLogin,
757                )));
758                return;
759            }
760            RoleLoginStatus::CanLogin => {}
761        }
762
763        let role_auth = role.and_then(|r| self.catalog().try_get_role_auth_by_id(&r.id));
764
765        if let Some(auth) = role_auth {
766            if let Some(hash) = &auth.password_hash {
767                let hash = hash.clone();
768                task::spawn_blocking(
769                    || "auth-check-hash",
770                    move || {
771                        let _ = match mz_auth::hash::scram256_verify(&password, &hash) {
772                            Ok(_) => tx.send(Ok(())),
773                            Err(_) => tx.send(Err(AdapterError::AuthenticationError(
774                                AuthenticationError::InvalidCredentials,
775                            ))),
776                        };
777                    },
778                );
779                return;
780            }
781        }
782        // Authentication failed due to missing password hash.
783        let _ = tx.send(Err(AdapterError::AuthenticationError(
784            AuthenticationError::InvalidCredentials,
785        )));
786    }
787
788    #[mz_ore::instrument(level = "debug")]
789    async fn handle_startup(
790        &mut self,
791        tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
792        user: User,
793        conn_id: ConnectionId,
794        secret_key: u32,
795        uuid: uuid::Uuid,
796        client_ip: Option<IpAddr>,
797        application_name: String,
798        notice_tx: mpsc::UnboundedSender<AdapterNotice>,
799    ) {
800        // Early return if successful, otherwise cleanup any possible state.
801        match self
802            .handle_startup_inner(&user, &conn_id, &client_ip, &notice_tx)
803            .await
804        {
805            Ok((role_id, superuser_attribute, session_defaults)) => {
806                let session_type = metrics::session_type_label_value(&user);
807                self.metrics
808                    .active_sessions
809                    .with_label_values(&[session_type])
810                    .inc();
811                let conn = ConnMeta {
812                    secret_key,
813                    notice_tx,
814                    drop_sinks: BTreeSet::new(),
815                    pending_cluster_alters: BTreeSet::new(),
816                    connected_at: self.now(),
817                    user,
818                    application_name,
819                    uuid,
820                    client_ip,
821                    conn_id: conn_id.clone(),
822                    authenticated_role: role_id,
823                    deferred_lock: None,
824                };
825                let update = self.catalog().state().pack_session_update(&conn, Diff::ONE);
826                let update = self.catalog().state().resolve_builtin_table_update(update);
827                self.begin_session_for_statement_logging(&conn);
828                self.active_conns.insert(conn_id.clone(), conn);
829
830                // Note: Do NOT await the notify here, we pass this back to
831                // whatever requested the startup to prevent blocking startup
832                // and the Coordinator on a builtin table update.
833                let updates = vec![update];
834                // It's not a hard error if our list is missing a builtin table, but we want to
835                // make sure these two things stay in-sync.
836                if mz_ore::assert::soft_assertions_enabled() {
837                    let required_tables: BTreeSet<_> = super::appends::REQUIRED_BUILTIN_TABLES
838                        .iter()
839                        .map(|table| self.catalog().resolve_builtin_table(*table))
840                        .collect();
841                    let updates_tracked = updates
842                        .iter()
843                        .all(|update| required_tables.contains(&update.id));
844                    let all_mz_internal = super::appends::REQUIRED_BUILTIN_TABLES
845                        .iter()
846                        .all(|table| table.schema == MZ_INTERNAL_SCHEMA);
847                    mz_ore::soft_assert_or_log!(
848                        updates_tracked,
849                        "not tracking all required builtin table updates!"
850                    );
851                    // TODO(parkmycar): When checking if a query depends on these builtin table
852                    // writes we do not check the transitive dependencies of the query, because
853                    // we don't support creating views on mz_internal objects. If one of these
854                    // tables is promoted out of mz_internal then we'll need to add this check.
855                    mz_ore::soft_assert_or_log!(
856                        all_mz_internal,
857                        "not all builtin tables are in mz_internal! need to check transitive depends",
858                    )
859                }
860                let notify = self.builtin_table_update().background(updates);
861
862                let catalog = self.owned_catalog();
863                let build_info_human_version =
864                    catalog.state().config().build_info.human_version(None);
865
866                let statement_logging_frontend = self
867                    .statement_logging
868                    .create_frontend(build_info_human_version);
869
870                let resp = Ok(StartupResponse {
871                    role_id,
872                    write_notify: notify,
873                    session_defaults,
874                    catalog,
875                    storage_collections: Arc::clone(&self.controller.storage_collections),
876                    transient_id_gen: Arc::clone(&self.transient_id_gen),
877                    optimizer_metrics: self.optimizer_metrics.clone(),
878                    persist_client: self.persist_client.clone(),
879                    statement_logging_frontend,
880                    superuser_attribute,
881                });
882                if tx.send(resp).is_err() {
883                    // Failed to send to adapter, but everything is setup so we can terminate
884                    // normally.
885                    self.handle_terminate(conn_id).await;
886                }
887            }
888            Err(e) => {
889                // Error during startup or sending to adapter. A user may have been created and
890                // it can stay; no need to delete it.
891                // Note: Temporary schemas are created lazily, so there's nothing to clean up here.
892
893                // Communicate the error back to the client. No need to
894                // handle failures to send the error back; we've already
895                // cleaned up all necessary state.
896                let _ = tx.send(Err(e));
897            }
898        }
899    }
900
901    // Failible startup work that needs to be cleaned up on error.
902    async fn handle_startup_inner(
903        &mut self,
904        user: &User,
905        _conn_id: &ConnectionId,
906        client_ip: &Option<IpAddr>,
907        notice_tx: &mpsc::UnboundedSender<AdapterNotice>,
908    ) -> Result<(RoleId, SuperuserAttribute, BTreeMap<String, OwnedVarInput>), AdapterError> {
909        if self.catalog().try_get_role_by_name(&user.name).is_none() {
910            // If the user has made it to this point, that means they have been fully authenticated.
911            // This includes preventing any user, except a pre-defined set of system users, from
912            // connecting to an internal port. Therefore it's ok to always create a new role for the
913            // user.
914            let mut attributes = RoleAttributesRaw::new();
915            // When auto-provisioning, we store the authenticator that was used to provision the role.
916            attributes.auto_provision_source = match user.authenticator_kind {
917                Some(AuthenticatorKind::Oidc) => Some(AutoProvisionSource::Oidc),
918                Some(AuthenticatorKind::Frontegg) => Some(AutoProvisionSource::Frontegg),
919                Some(AuthenticatorKind::None) => Some(AutoProvisionSource::None),
920                _ => {
921                    warn!(
922                        "auto-provisioning role with unexpected authenticator kind: {:?}",
923                        user.authenticator_kind
924                    );
925                    None
926                }
927            };
928
929            // Auto-provision roles with the LOGIN attribute to distinguish
930            // them as users.
931            attributes.login = Some(true);
932
933            let plan = CreateRolePlan {
934                name: user.name.to_string(),
935                attributes,
936            };
937            self.sequence_create_role_for_startup(plan).await?;
938        }
939        let role = self
940            .catalog()
941            .try_get_role_by_name(&user.name)
942            .expect("created above");
943        let role_id = role.id;
944        let superuser_attribute = role.attributes.superuser;
945
946        // JWT group-to-role sync: reconcile role memberships with JWT group claims.
947        // Missing groups claim (None) → skip sync; empty (Some([])) → revoke all.
948        self.maybe_sync_jwt_groups(role_id, user.groups.as_deref(), notice_tx)
949            .await?;
950
951        if role_id.is_user() && !ALLOW_USER_SESSIONS.get(self.catalog().system_config().dyncfgs()) {
952            return Err(AdapterError::UserSessionsDisallowed);
953        }
954
955        // Initialize the default session variables for this role.
956        let mut session_defaults = BTreeMap::new();
957        let system_config = self.catalog().state().system_config();
958
959        // Override the session with any system defaults.
960        session_defaults.extend(
961            system_config
962                .iter_session()
963                .map(|v| (v.name().to_string(), OwnedVarInput::Flat(v.value()))),
964        );
965        // Special case.
966        let statement_logging_default = system_config
967            .statement_logging_default_sample_rate()
968            .format();
969        session_defaults.insert(
970            STATEMENT_LOGGING_SAMPLE_RATE.name().to_string(),
971            OwnedVarInput::Flat(statement_logging_default),
972        );
973        // Override system defaults with role defaults.
974        session_defaults.extend(
975            self.catalog()
976                .get_role(&role_id)
977                .vars()
978                .map(|(name, val)| (name.to_string(), val.clone())),
979        );
980
981        // If the resolved `transaction_isolation` default names a feature-flagged
982        // isolation level whose flag is now disabled (e.g. a role default set
983        // while `bounded staleness` was enabled, then the flag turned off), drop
984        // it so the session falls back to the built-in default rather than
985        // silently using a gated level.
986        if let Some(value) = session_defaults.get(TRANSACTION_ISOLATION_VAR_NAME) {
987            if check_transaction_isolation_feature_flag(
988                TRANSACTION_ISOLATION_VAR_NAME,
989                value.borrow(),
990                system_config,
991            )
992            .is_err()
993            {
994                session_defaults.remove(TRANSACTION_ISOLATION_VAR_NAME);
995            }
996        }
997
998        // Validate network policies for external users. Internal users can only connect on the
999        // internal interfaces (internal HTTP/ pgwire). It is up to the person deploying the system
1000        // to ensure these internal interfaces are well secured.
1001        //
1002        // HACKY(parkmycar): We don't have a fully formed session yet for this role, but we want
1003        // the default network policy for this role, so we read directly out of what the session
1004        // will get initialized with.
1005        if !user.is_internal() {
1006            let network_policy_name = session_defaults
1007                .get(NETWORK_POLICY.name())
1008                .and_then(|value| match value {
1009                    OwnedVarInput::Flat(name) => Some(name.clone()),
1010                    OwnedVarInput::SqlSet(names) => {
1011                        tracing::error!(?names, "found multiple network policies");
1012                        None
1013                    }
1014                })
1015                .unwrap_or_else(|| system_config.default_network_policy_name());
1016            let maybe_network_policy = self
1017                .catalog()
1018                .get_network_policy_by_name(&network_policy_name);
1019
1020            let Some(network_policy) = maybe_network_policy else {
1021                // We should prevent dropping the default network policy, or setting the policy
1022                // to something that doesn't exist, so complain loudly if this occurs.
1023                tracing::error!(
1024                    network_policy_name,
1025                    "default network policy does not exist. All user traffic will be blocked"
1026                );
1027                let reason = match client_ip {
1028                    Some(ip) => super::NetworkPolicyError::AddressDenied(ip.clone()),
1029                    None => super::NetworkPolicyError::MissingIp,
1030                };
1031                return Err(AdapterError::NetworkPolicyDenied(reason));
1032            };
1033
1034            if let Some(ip) = client_ip {
1035                match validate_ip_with_policy_rules(ip, &network_policy.rules) {
1036                    Ok(_) => {}
1037                    Err(e) => return Err(AdapterError::NetworkPolicyDenied(e)),
1038                }
1039            } else {
1040                // Only temporary and internal representation of a session
1041                // should be missing a client_ip. These sessions should not be
1042                // making requests or going through handle_startup.
1043                return Err(AdapterError::NetworkPolicyDenied(
1044                    super::NetworkPolicyError::MissingIp,
1045                ));
1046            }
1047        }
1048
1049        // Temporary schemas are now created lazily when the first temporary object is created,
1050        // rather than eagerly on connection startup. This avoids expensive catalog_mut() calls
1051        // for the common case where connections never create temporary objects.
1052
1053        Ok((
1054            role_id,
1055            SuperuserAttribute(superuser_attribute),
1056            session_defaults,
1057        ))
1058    }
1059
1060    /// Handles an execute command.
1061    #[instrument(name = "coord::handle_execute", fields(session = session.uuid().to_string()))]
1062    pub(crate) async fn handle_execute(
1063        &mut self,
1064        portal_name: String,
1065        mut session: Session,
1066        tx: ClientTransmitter<ExecuteResponse>,
1067        // If this command was part of another execute command
1068        // (for example, executing a `FETCH` statement causes an execute to be
1069        //  issued for the cursor it references),
1070        // then `outer_context` should be `Some`.
1071        // This instructs the coordinator that the
1072        // outer execute should be considered finished once the inner one is.
1073        outer_context: Option<ExecuteContextGuard>,
1074    ) {
1075        if session.vars().emit_trace_id_notice() {
1076            let span_context = tracing::Span::current()
1077                .context()
1078                .span()
1079                .span_context()
1080                .clone();
1081            if span_context.is_valid() {
1082                session.add_notice(AdapterNotice::QueryTrace {
1083                    trace_id: span_context.trace_id(),
1084                });
1085            }
1086        }
1087
1088        if let Err(err) = Self::verify_portal(self.catalog(), &mut session, &portal_name) {
1089            // If statement logging hasn't started yet, we don't need
1090            // to add any "end" event, so just make up a no-op
1091            // `ExecuteContextExtra` here, via `Default::default`.
1092            //
1093            // It's a bit unfortunate because the edge case of failed
1094            // portal verifications won't show up in statement
1095            // logging, but there seems to be nothing else we can do,
1096            // because we need access to the portal to begin logging.
1097            //
1098            // Another option would be to log a begin and end event, but just fill in NULLs
1099            // for everything we get from the portal (prepared statement id, params).
1100            let extra = outer_context.unwrap_or_else(Default::default);
1101            let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
1102            return ctx.retire(Err(err));
1103        }
1104
1105        // The reference to `portal` can't outlive `session`, which we
1106        // use to construct the context, so scope the reference to this block where we
1107        // get everything we need from the portal for later.
1108        let (stmt, ctx, params) = {
1109            let portal = session
1110                .get_portal_unverified(&portal_name)
1111                .expect("known to exist");
1112            let params = portal.parameters.clone();
1113            let stmt = portal.stmt.clone();
1114            let logging = Arc::clone(&portal.logging);
1115            let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
1116
1117            let extra = if let Some(extra) = outer_context {
1118                // We are executing in the context of another SQL statement, so we don't
1119                // want to begin statement logging anew. The context of the actual statement
1120                // being executed is the one that should be retired once this finishes.
1121                extra
1122            } else {
1123                // This is a new statement, log it and return the context
1124                let maybe_uuid = self.begin_statement_execution(
1125                    &mut session,
1126                    &params,
1127                    &logging,
1128                    lifecycle_timestamps,
1129                );
1130
1131                ExecuteContextGuard::new(maybe_uuid, self.internal_cmd_tx.clone())
1132            };
1133            let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
1134            (stmt, ctx, params)
1135        };
1136
1137        let stmt = match stmt {
1138            Some(stmt) => stmt,
1139            None => return ctx.retire(Ok(ExecuteResponse::EmptyQuery)),
1140        };
1141
1142        let session_type = metrics::session_type_label_value(ctx.session().user());
1143        let stmt_type = metrics::statement_type_label_value(&stmt);
1144        self.metrics
1145            .query_total
1146            .with_label_values(&[session_type, stmt_type])
1147            .inc();
1148        match &*stmt {
1149            Statement::Subscribe(SubscribeStatement { output, .. })
1150            | Statement::Copy(CopyStatement {
1151                relation: CopyRelation::Subscribe(SubscribeStatement { output, .. }),
1152                ..
1153            }) => {
1154                self.metrics
1155                    .subscribe_outputs
1156                    .with_label_values(&[
1157                        session_type,
1158                        metrics::subscribe_output_label_value(output),
1159                    ])
1160                    .inc();
1161            }
1162            _ => {}
1163        }
1164
1165        self.handle_execute_inner(stmt, params, ctx).await
1166    }
1167
1168    #[instrument(name = "coord::handle_execute_inner", fields(stmt = stmt.to_ast_string_redacted()))]
1169    pub(crate) async fn handle_execute_inner(
1170        &mut self,
1171        stmt: Arc<Statement<Raw>>,
1172        params: Params,
1173        mut ctx: ExecuteContext,
1174    ) {
1175        // This comment describes the various ways DDL can execute (the ordered operations: name
1176        // resolve, purify, plan, sequence), all of which are managed by this function. DDL has
1177        // three notable properties that all partially interact.
1178        //
1179        // 1. Most DDL statements (and a few others) support single-statement transaction delayed
1180        //    execution. This occurs when a session executes `BEGIN`, a single DDL, then `COMMIT`.
1181        //    We announce success of the single DDL when it is executed, but do not attempt to plan
1182        //    or sequence it until `COMMIT`, which is able to error if needed while sequencing the
1183        //    DDL (this behavior is Postgres-compatible). The purpose of this is because some
1184        //    drivers or tools wrap all statements in `BEGIN` and `COMMIT` and we would like them to
1185        //    work. When the single DDL is announced as successful we also put the session's
1186        //    transaction ops into `SingleStatement` which will produce an error if any other
1187        //    statement is run in the transaction except `COMMIT`. Additionally, this will cause
1188        //    `handle_execute_inner` to stop further processing (no planning, etc.) of the
1189        //    statement.
1190        // 2. A few other DDL statements (`ALTER .. RENAME/SWAP`) enter the `DDL` ops which allows
1191        //    any number of only these DDL statements to be executed in a transaction. During
1192        //    sequencing we run an incremental catalog dry run against in-memory transaction state
1193        //    and store the resulting `CatalogState` in `TransactionOps::DDL`, but nothing is yet
1194        //    committed to the durable catalog. At `COMMIT`, all accumulated ops are applied in one
1195        //    catalog transaction. The purpose of this is to allow multiple, atomic renames in the
1196        //    same transaction.
1197        // 3. Some DDLs do off-thread work during purification or sequencing that is expensive or
1198        //    makes network calls (interfacing with secrets, optimization of views/indexes, source
1199        //    purification). These must guarantee correctness when they return to the main
1200        //    coordinator thread because the catalog state could have changed while they were doing
1201        //    the off-thread work. Previously we would use `PlanValidity::Checks` to specify a bunch
1202        //    of IDs that we needed to exist. We discovered the way we were doing that was not
1203        //    always correct. Instead of attempting to get that completely right, we have opted to
1204        //    serialize DDL. Getting this right is difficult because catalog changes can affect name
1205        //    resolution, planning, sequencing, and optimization. Correctly writing logic that is
1206        //    aware of all possible catalog changes that would affect any of those parts is not
1207        //    something our current code has been designed to be helpful at. Even if a DDL statement
1208        //    is doing off-thread work, another DDL must not yet execute at all. Executing these
1209        //    serially will guarantee that no off-thread work has affected the state of the catalog.
1210        //    This is done by adding a VecDeque of deferred statements and a lock to the
1211        //    Coordinator. When a DDL is run in `handle_execute_inner` (after applying whatever
1212        //    transaction ops are needed to the session as described above), it attempts to own the
1213        //    lock (a tokio Mutex). If acquired, it stashes the lock in the connection`s `ConnMeta`
1214        //    struct in `active_conns` and proceeds. The lock is dropped at transaction end in
1215        //    `clear_transaction` and a message sent to the Coordinator to execute the next queued
1216        //    DDL. If the lock could not be acquired, the DDL is put into the VecDeque where it
1217        //    awaits dequeuing caused by the lock being released.
1218
1219        // Verify that this statement type can be executed in the current
1220        // transaction state.
1221        match ctx.session().transaction() {
1222            // By this point we should be in a running transaction.
1223            TransactionStatus::Default => unreachable!(),
1224
1225            // Failed transactions have already been checked in pgwire for a safe statement
1226            // (COMMIT, ROLLBACK, etc.) and can proceed.
1227            TransactionStatus::Failed(_) => {}
1228
1229            // Started is a deceptive name, and means different things depending on which
1230            // protocol was used. It's either exactly one statement (known because this
1231            // is the simple protocol and the parser parsed the entire string, and it had
1232            // one statement). Or from the extended protocol, it means *some* query is
1233            // being executed, but there might be others after it before the Sync (commit)
1234            // message. Postgres handles this by teaching Started to eagerly commit certain
1235            // statements that can't be run in a transaction block.
1236            TransactionStatus::Started(_) => {
1237                if let Statement::Declare(_) = &*stmt {
1238                    // Declare is an exception. Although it's not against any spec to execute
1239                    // it, it will always result in nothing happening, since all portals will be
1240                    // immediately closed. Users don't know this detail, so this error helps them
1241                    // understand what's going wrong. Postgres does this too.
1242                    return ctx.retire(Err(AdapterError::OperationRequiresTransaction(
1243                        "DECLARE CURSOR".into(),
1244                    )));
1245                }
1246            }
1247
1248            // Implicit or explicit transactions.
1249            //
1250            // Implicit transactions happen when a multi-statement query is executed
1251            // (a "simple query"). However if a "BEGIN" appears somewhere in there,
1252            // then the existing implicit transaction will be upgraded to an explicit
1253            // transaction. Thus, we should not separate what implicit and explicit
1254            // transactions can do unless there's some additional checking to make sure
1255            // something disallowed in explicit transactions did not previously take place
1256            // in the implicit portion.
1257            TransactionStatus::InTransactionImplicit(_) | TransactionStatus::InTransaction(_) => {
1258                match &*stmt {
1259                    // Statements that are safe in a transaction. We still need to verify that we
1260                    // don't interleave reads and writes since we can't perform those serializably.
1261                    Statement::Close(_)
1262                    | Statement::Commit(_)
1263                    | Statement::Copy(_)
1264                    | Statement::Deallocate(_)
1265                    | Statement::Declare(_)
1266                    | Statement::Discard(_)
1267                    | Statement::Execute(_)
1268                    | Statement::ExplainPlan(_)
1269                    | Statement::ExplainPushdown(_)
1270                    | Statement::ExplainAnalyzeObject(_)
1271                    | Statement::ExplainAnalyzeCluster(_)
1272                    | Statement::ExplainTimestamp(_)
1273                    | Statement::ExplainSinkSchema(_)
1274                    | Statement::Fetch(_)
1275                    | Statement::Prepare(_)
1276                    | Statement::Rollback(_)
1277                    | Statement::Select(_)
1278                    | Statement::SetTransaction(_)
1279                    | Statement::Show(_)
1280                    | Statement::SetVariable(_)
1281                    | Statement::ResetVariable(_)
1282                    | Statement::StartTransaction(_)
1283                    | Statement::Subscribe(_)
1284                    | Statement::Raise(_) => {
1285                        // Always safe.
1286                    }
1287
1288                    Statement::Insert(InsertStatement {
1289                        source, returning, ..
1290                    }) if returning.is_empty() && ConstantVisitor::insert_source(source) => {
1291                        // Inserting from constant values statements that do not need to execute on
1292                        // any cluster (no RETURNING) is always safe.
1293                    }
1294
1295                    // These statements must be kept in-sync with `must_serialize_ddl()`.
1296                    Statement::AlterObjectRename(_)
1297                    | Statement::AlterObjectSwap(_)
1298                    | Statement::CreateTableFromSource(_)
1299                    | Statement::CreateSource(_) => {
1300                        let state = self.catalog().for_session(ctx.session()).state().clone();
1301                        let revision = self.catalog().transient_revision();
1302
1303                        // Initialize our transaction with a set of empty ops, or return an error
1304                        // if we can't run a DDL transaction
1305                        let txn_status = ctx.session_mut().transaction_mut();
1306                        if let Err(err) = txn_status.add_ops(TransactionOps::DDL {
1307                            ops: vec![],
1308                            state,
1309                            revision,
1310                            side_effects: vec![],
1311                            snapshot: None,
1312                        }) {
1313                            return ctx.retire(Err(err));
1314                        }
1315                    }
1316
1317                    // Statements below must by run singly (in Started).
1318                    Statement::AlterCluster(_)
1319                    | Statement::AlterConnection(_)
1320                    | Statement::AlterDefaultPrivileges(_)
1321                    | Statement::AlterIndex(_)
1322                    | Statement::AlterMaterializedViewApplyReplacement(_)
1323                    | Statement::AlterSetCluster(_)
1324                    | Statement::AlterOwner(_)
1325                    | Statement::AlterRetainHistory(_)
1326                    | Statement::AlterRole(_)
1327                    | Statement::AlterSecret(_)
1328                    | Statement::AlterSink(_)
1329                    | Statement::AlterSource(_)
1330                    | Statement::AlterSystemReset(_)
1331                    | Statement::AlterSystemResetAll(_)
1332                    | Statement::AlterSystemSet(_)
1333                    | Statement::AlterTableAddColumn(_)
1334                    | Statement::AlterNetworkPolicy(_)
1335                    | Statement::CreateCluster(_)
1336                    | Statement::CreateClusterReplica(_)
1337                    | Statement::CreateConnection(_)
1338                    | Statement::CreateDatabase(_)
1339                    | Statement::CreateIndex(_)
1340                    | Statement::CreateMaterializedView(_)
1341                    | Statement::CreateRole(_)
1342                    | Statement::CreateSchema(_)
1343                    | Statement::CreateSecret(_)
1344                    | Statement::CreateSink(_)
1345                    | Statement::CreateSubsource(_)
1346                    | Statement::CreateTable(_)
1347                    | Statement::CreateType(_)
1348                    | Statement::CreateView(_)
1349                    | Statement::CreateWebhookSource(_)
1350                    | Statement::CreateNetworkPolicy(_)
1351                    | Statement::Delete(_)
1352                    | Statement::DropObjects(_)
1353                    | Statement::DropOwned(_)
1354                    | Statement::GrantPrivileges(_)
1355                    | Statement::GrantRole(_)
1356                    | Statement::Insert(_)
1357                    | Statement::ReassignOwned(_)
1358                    | Statement::RevokePrivileges(_)
1359                    | Statement::RevokeRole(_)
1360                    | Statement::Update(_)
1361                    | Statement::ValidateConnection(_)
1362                    | Statement::Comment(_)
1363                    | Statement::ExecuteUnitTest(_) => {
1364                        let txn_status = ctx.session_mut().transaction_mut();
1365
1366                        // If we're not in an implicit transaction and we could generate exactly one
1367                        // valid ExecuteResponse, we can delay execution until commit.
1368                        if !txn_status.is_implicit() {
1369                            // Statements whose tag is trivial (known only from an unexecuted statement) can
1370                            // be run in a special single-statement explicit mode. In this mode (`BEGIN;
1371                            // <stmt>; COMMIT`), we generate the expected tag from a successful <stmt>, but
1372                            // delay execution until `COMMIT`.
1373                            if let Ok(resp) = ExecuteResponse::try_from(&*stmt) {
1374                                if let Err(err) = txn_status
1375                                    .add_ops(TransactionOps::SingleStatement { stmt, params })
1376                                {
1377                                    ctx.retire(Err(err));
1378                                    return;
1379                                }
1380                                ctx.retire(Ok(resp));
1381                                return;
1382                            }
1383                        }
1384
1385                        return ctx.retire(Err(AdapterError::OperationProhibitsTransaction(
1386                            stmt.to_string(),
1387                        )));
1388                    }
1389                }
1390            }
1391        }
1392
1393        // DDLs must be planned and sequenced serially. We do not rely on PlanValidity checking
1394        // various IDs because we have incorrectly done that in the past. Attempt to acquire the
1395        // ddl lock. The lock is stashed in the ConnMeta which is dropped at transaction end. If
1396        // acquired, proceed with sequencing. If not, enqueue and return. This logic assumes that
1397        // Coordinator::clear_transaction is correctly called when session transactions are ended
1398        // because that function will release the held lock from active_conns.
1399        if Self::must_serialize_ddl(&stmt, &ctx) {
1400            if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
1401                let prev = self
1402                    .active_conns
1403                    .get_mut(ctx.session().conn_id())
1404                    .expect("connection must exist")
1405                    .deferred_lock
1406                    .replace(guard);
1407                assert!(
1408                    prev.is_none(),
1409                    "connections should have at most one lock guard"
1410                );
1411            } else {
1412                if self
1413                    .active_conns
1414                    .get(ctx.session().conn_id())
1415                    .expect("connection must exist")
1416                    .deferred_lock
1417                    .is_some()
1418                {
1419                    // This session *already* has the lock, and incorrectly tried to execute another
1420                    // DDL while still holding the lock, violating the assumption documented above.
1421                    // This is an internal error, probably in some AdapterClient user (pgwire or
1422                    // http). Because the session is now in some unexpected state, return an error
1423                    // which should cause the AdapterClient user to fail the transaction.
1424                    // (Terminating the connection is maybe what we would prefer to do, but is not
1425                    // currently a thing we can do from the coordinator: calling handle_terminate
1426                    // cleans up Coordinator state for the session but doesn't inform the
1427                    // AdapterClient that the session should terminate.)
1428                    soft_panic_or_log!(
1429                        "session {} attempted to get ddl lock while already owning it",
1430                        ctx.session().conn_id()
1431                    );
1432                    ctx.retire(Err(AdapterError::Internal(
1433                        "session attempted to get ddl lock while already owning it".to_string(),
1434                    )));
1435                    return;
1436                }
1437                self.serialized_ddl.push_back(DeferredPlanStatement {
1438                    ctx,
1439                    ps: PlanStatement::Statement { stmt, params },
1440                });
1441                return;
1442            }
1443        }
1444
1445        let catalog = self.catalog();
1446        let catalog = catalog.for_session(ctx.session());
1447        let original_stmt = Arc::clone(&stmt);
1448        // `resolved_ids` should be derivable from `stmt`. If `stmt` is transformed to remove/add
1449        // IDs, then `resolved_ids` should be updated to also remove/add those IDs.
1450        let (stmt, mut resolved_ids) = match mz_sql::names::resolve(&catalog, (*stmt).clone()) {
1451            Ok(resolved) => resolved,
1452            Err(e) => return ctx.retire(Err(e.into())),
1453        };
1454        // N.B. The catalog can change during purification so we must validate that the dependencies still exist after
1455        // purification.  This should be done back on the main thread.
1456        // We do the validation:
1457        //   - In the handler for `Message::PurifiedStatementReady`, before we handle the purified statement.
1458        // If we add special handling for more types of `Statement`s, we'll need to ensure similar verification
1459        // occurs.
1460        let (stmt, resolved_ids) = match stmt {
1461            // Various statements must be purified off the main coordinator thread of control.
1462            stmt if Self::must_spawn_purification(&stmt) => {
1463                let internal_cmd_tx = self.internal_cmd_tx.clone();
1464                let conn_id = ctx.session().conn_id().clone();
1465                let catalog = self.owned_catalog();
1466                let now = self.now();
1467                let otel_ctx = OpenTelemetryContext::obtain();
1468                let current_storage_configuration = self.controller.storage.config().clone();
1469                task::spawn(|| format!("purify:{conn_id}"), async move {
1470                    let transient_revision = catalog.transient_revision();
1471                    let catalog = catalog.for_session(ctx.session());
1472
1473                    // Checks if the session is authorized to purify a statement. Usually
1474                    // authorization is checked after planning, however purification happens before
1475                    // planning, which may require the use of some connections and secrets.
1476                    if let Err(e) = rbac::check_usage(
1477                        &catalog,
1478                        ctx.session(),
1479                        &resolved_ids,
1480                        &CREATE_ITEM_USAGE,
1481                    ) {
1482                        return ctx.retire(Err(e.into()));
1483                    }
1484
1485                    let (result, cluster_id) = mz_sql::pure::purify_statement(
1486                        catalog,
1487                        now,
1488                        stmt,
1489                        &current_storage_configuration,
1490                    )
1491                    .await;
1492                    let result = result.map_err(|e| e.into());
1493                    let dependency_ids = resolved_ids.items().copied().collect();
1494                    let plan_validity = PlanValidity::new(
1495                        transient_revision,
1496                        dependency_ids,
1497                        cluster_id,
1498                        None,
1499                        ctx.session().role_metadata().clone(),
1500                    );
1501                    // It is not an error for purification to complete after `internal_cmd_rx` is dropped.
1502                    let result = internal_cmd_tx.send(Message::PurifiedStatementReady(
1503                        PurifiedStatementReady {
1504                            ctx,
1505                            result,
1506                            params,
1507                            plan_validity,
1508                            original_stmt,
1509                            otel_ctx,
1510                        },
1511                    ));
1512                    if let Err(e) = result {
1513                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1514                    }
1515                });
1516                return;
1517            }
1518
1519            // `CREATE SUBSOURCE` statements are disallowed for users and are only generated
1520            // automatically as part of purification
1521            Statement::CreateSubsource(_) => {
1522                ctx.retire(Err(AdapterError::Unsupported(
1523                    "CREATE SUBSOURCE statements",
1524                )));
1525                return;
1526            }
1527
1528            Statement::CreateMaterializedView(mut cmvs) => {
1529                // `CREATE MATERIALIZED VIEW ... AS OF ...` syntax is disallowed for users and is
1530                // only used for storing initial frontiers in the catalog.
1531                if cmvs.as_of.is_some() {
1532                    return ctx.retire(Err(AdapterError::Unsupported(
1533                        "CREATE MATERIALIZED VIEW ... AS OF statements",
1534                    )));
1535                }
1536
1537                let mz_now = match self
1538                    .resolve_mz_now_for_create_materialized_view(
1539                        &cmvs,
1540                        &resolved_ids,
1541                        ctx.session_mut(),
1542                        true,
1543                    )
1544                    .await
1545                {
1546                    Ok(mz_now) => mz_now,
1547                    Err(e) => return ctx.retire(Err(e)),
1548                };
1549
1550                let catalog = self.catalog().for_session(ctx.session());
1551
1552                purify_create_materialized_view_options(
1553                    catalog,
1554                    mz_now,
1555                    &mut cmvs,
1556                    &mut resolved_ids,
1557                );
1558
1559                let purified_stmt =
1560                    Statement::CreateMaterializedView(CreateMaterializedViewStatement::<Aug> {
1561                        if_exists: cmvs.if_exists,
1562                        name: cmvs.name,
1563                        columns: cmvs.columns,
1564                        replacement_for: cmvs.replacement_for,
1565                        in_cluster: cmvs.in_cluster,
1566                        in_cluster_replica: cmvs.in_cluster_replica,
1567                        query: cmvs.query,
1568                        with_options: cmvs.with_options,
1569                        as_of: None,
1570                    });
1571
1572                // (Purifying CreateMaterializedView doesn't happen async, so no need to send
1573                // `Message::PurifiedStatementReady` here.)
1574                (purified_stmt, resolved_ids)
1575            }
1576
1577            Statement::ExplainPlan(ExplainPlanStatement {
1578                stage,
1579                with_options,
1580                format,
1581                explainee: Explainee::CreateMaterializedView(box_cmvs, broken),
1582            }) => {
1583                let mut cmvs = *box_cmvs;
1584                let mz_now = match self
1585                    .resolve_mz_now_for_create_materialized_view(
1586                        &cmvs,
1587                        &resolved_ids,
1588                        ctx.session_mut(),
1589                        false,
1590                    )
1591                    .await
1592                {
1593                    Ok(mz_now) => mz_now,
1594                    Err(e) => return ctx.retire(Err(e)),
1595                };
1596
1597                let catalog = self.catalog().for_session(ctx.session());
1598
1599                purify_create_materialized_view_options(
1600                    catalog,
1601                    mz_now,
1602                    &mut cmvs,
1603                    &mut resolved_ids,
1604                );
1605
1606                let purified_stmt = Statement::ExplainPlan(ExplainPlanStatement {
1607                    stage,
1608                    with_options,
1609                    format,
1610                    explainee: Explainee::CreateMaterializedView(Box::new(cmvs), broken),
1611                });
1612
1613                (purified_stmt, resolved_ids)
1614            }
1615
1616            // All other statements are handled immediately.
1617            _ => (stmt, resolved_ids),
1618        };
1619
1620        match self.plan_statement(ctx.session(), stmt, &params, &resolved_ids) {
1621            Ok((plan, sql_impl_ids)) => {
1622                self.sequence_plan(ctx, plan, resolved_ids, sql_impl_ids)
1623                    .await
1624            }
1625            Err(e) => ctx.retire(Err(e)),
1626        }
1627    }
1628
1629    /// Whether the statement must be serialized and is DDL.
1630    fn must_serialize_ddl(stmt: &Statement<Raw>, ctx: &ExecuteContext) -> bool {
1631        // Non-DDL is not serialized here.
1632        if !StatementClassification::from(&*stmt).is_ddl() {
1633            return false;
1634        }
1635        // Off-thread, pre-planning purification can perform arbitrarily slow network calls so must
1636        // not be serialized. These all use PlanValidity for their checking, and we must ensure
1637        // those checks are sufficient.
1638        if Self::must_spawn_purification(stmt) {
1639            return false;
1640        }
1641
1642        // Statements that support multiple DDLs in a single transaction aren't serialized here.
1643        // Their operations are serialized when applied to the catalog, guaranteeing that any
1644        // off-thread DDLs concurrent with a multiple DDL transaction will have a serial order.
1645        if ctx.session.transaction().is_ddl() {
1646            return false;
1647        }
1648
1649        // Some DDL is exempt. It is not great that we are matching on Statements here because
1650        // different plans can be produced from the same top-level statement type (i.e., `ALTER
1651        // CONNECTION ROTATE KEYS`). But the whole point of this is to prevent things from being
1652        // planned in the first place, so we accept the abstraction leak.
1653        match stmt {
1654            // Secrets have a small and understood set of dependencies, and their off-thread work
1655            // interacts with k8s.
1656            Statement::AlterSecret(_) => false,
1657            Statement::CreateSecret(_) => false,
1658            Statement::AlterConnection(AlterConnectionStatement { actions, .. })
1659                if actions
1660                    .iter()
1661                    .all(|action| matches!(action, AlterConnectionAction::RotateKeys)) =>
1662            {
1663                false
1664            }
1665
1666            // The off-thread work that altering a cluster may do (waiting for replicas to spin-up),
1667            // does not affect its catalog names or ids and so is safe to not serialize. This could
1668            // change the set of replicas that exist. For queries that name replicas or use the
1669            // current_replica session var, the `replica_id` field of `PlanValidity` serves to
1670            // ensure that those replicas exist during the query finish stage. Additionally, that
1671            // work can take hours (configured by the user), so would also be a bad experience for
1672            // users.
1673            Statement::AlterCluster(_) => false,
1674
1675            // `ALTER SINK SET FROM` waits for the old relation to make enough progress for a clean
1676            // cutover. If the old collection is stalled, it may block forever. Checks in
1677            // sequencing ensure that the operation fails if any one of these happens concurrently:
1678            //   * the sink is dropped
1679            //   * the new source relation is dropped
1680            //   * another `ALTER SINK` for the same sink is applied first
1681            Statement::AlterSink(stmt)
1682                if matches!(stmt.action, AlterSinkAction::ChangeRelation(_)) =>
1683            {
1684                false
1685            }
1686
1687            // `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT` waits for the target MV to make
1688            // enough progress for a clean cutover. If the target MV is stalled, it may block
1689            // forever. Checks in sequencing ensure the operation fails if any of these happens
1690            // concurrently:
1691            //   * the target MV is dropped
1692            //   * the replacement MV is dropped
1693            Statement::AlterMaterializedViewApplyReplacement(_) => false,
1694
1695            // Everything else must be serialized.
1696            _ => true,
1697        }
1698    }
1699
1700    /// Whether the statement must be purified off of the Coordinator thread.
1701    fn must_spawn_purification<A: AstInfo>(stmt: &Statement<A>) -> bool {
1702        // `CREATE` and `ALTER` `SOURCE` and `SINK` statements must be purified off the main
1703        // coordinator thread.
1704        if !matches!(
1705            stmt,
1706            Statement::CreateSource(_)
1707                | Statement::AlterSource(_)
1708                | Statement::CreateSink(_)
1709                | Statement::CreateTableFromSource(_)
1710        ) {
1711            return false;
1712        }
1713
1714        // However `ALTER SOURCE RETAIN HISTORY` should be excluded from off-thread purification.
1715        if let Statement::AlterSource(stmt) = stmt {
1716            let names: Vec<CreateSourceOptionName> = match &stmt.action {
1717                AlterSourceAction::SetOptions(options) => {
1718                    options.iter().map(|o| o.name.clone()).collect()
1719                }
1720                AlterSourceAction::ResetOptions(names) => names.clone(),
1721                _ => vec![],
1722            };
1723            if !names.is_empty()
1724                && names
1725                    .iter()
1726                    .all(|n| matches!(n, CreateSourceOptionName::RetainHistory))
1727            {
1728                return false;
1729            }
1730        }
1731
1732        true
1733    }
1734
1735    /// Chooses a timestamp for `mz_now()`, if `mz_now()` occurs in a REFRESH option of the
1736    /// materialized view. Additionally, if `acquire_read_holds` is true and the MV has any REFRESH
1737    /// option, this function grabs read holds at the earliest possible time on input collections
1738    /// that might be involved in the MV.
1739    ///
1740    /// Note that this is NOT what handles `mz_now()` in the query part of the MV. (handles it only
1741    /// in `with_options`).
1742    ///
1743    /// (Note that the chosen timestamp won't be the same timestamp as the system table inserts,
1744    /// unfortunately.)
1745    async fn resolve_mz_now_for_create_materialized_view(
1746        &mut self,
1747        cmvs: &CreateMaterializedViewStatement<Aug>,
1748        resolved_ids: &ResolvedIds,
1749        session: &Session,
1750        acquire_read_holds: bool,
1751    ) -> Result<Option<Timestamp>, AdapterError> {
1752        if cmvs
1753            .with_options
1754            .iter()
1755            .any(|wo| matches!(wo.value, Some(WithOptionValue::Refresh(..))))
1756        {
1757            let catalog = self.catalog().for_session(session);
1758            let cluster = mz_sql::plan::resolve_cluster_for_materialized_view(&catalog, cmvs)?;
1759            let ids = self
1760                .index_oracle(cluster)
1761                .sufficient_collections(resolved_ids.collections().copied());
1762
1763            // If there is any REFRESH option, then acquire read holds. (Strictly speaking, we'd
1764            // need this only if there is a `REFRESH AT`, not for `REFRESH EVERY`, because later
1765            // we want to check the AT times against the read holds that we acquire here. But
1766            // we do it for any REFRESH option, to avoid having so many code paths doing different
1767            // things.)
1768            //
1769            // It's important that we acquire read holds _before_ we determine the least valid read.
1770            // Otherwise, we're not guaranteed that the since frontier doesn't
1771            // advance forward from underneath us.
1772            let read_holds = self.acquire_read_holds(&ids);
1773
1774            // Does `mz_now()` occur?
1775            let mz_now_ts = if cmvs
1776                .with_options
1777                .iter()
1778                .any(materialized_view_option_contains_temporal)
1779            {
1780                let timeline_context = self
1781                    .catalog()
1782                    .validate_timeline_context(resolved_ids.collections().copied())?;
1783
1784                // We default to EpochMilliseconds, similarly to `determine_timestamp_for`,
1785                // but even in the TimestampIndependent case.
1786                // Note that we didn't accurately decide whether we are TimestampDependent
1787                // or TimestampIndependent, because for this we'd need to also check whether
1788                // `query.contains_temporal()`, similarly to how `peek_stage_validate` does.
1789                // However, this doesn't matter here, as we are just going to default to
1790                // EpochMilliseconds in both cases.
1791                let timeline = timeline_context
1792                    .timeline()
1793                    .unwrap_or(&Timeline::EpochMilliseconds);
1794
1795                // Let's start with the timestamp oracle read timestamp.
1796                let mut timestamp = self.get_timestamp_oracle(timeline).read_ts().await;
1797
1798                // If `least_valid_read` is later than the oracle, then advance to that time.
1799                // If we didn't do this, then there would be a danger of missing the first refresh,
1800                // which might cause the materialized view to be unreadable for hours. This might
1801                // be what was happening here:
1802                // https://github.com/MaterializeInc/database-issues/issues/7265#issuecomment-1931856361
1803                //
1804                // In the long term, it would be good to actually block the MV creation statement
1805                // until `least_valid_read`. https://github.com/MaterializeInc/database-issues/issues/7504
1806                // Without blocking, we have the problem that a REFRESH AT CREATION is not linearized
1807                // with the CREATE MATERIALIZED VIEW statement, in the sense that a query from the MV
1808                // after its creation might see input changes that happened after the CRATE MATERIALIZED
1809                // VIEW statement returned.
1810                let oracle_timestamp = timestamp;
1811                let least_valid_read = read_holds.least_valid_read();
1812                timestamp.advance_by(least_valid_read.borrow());
1813
1814                if oracle_timestamp != timestamp {
1815                    warn!(%cmvs.name, %oracle_timestamp, %timestamp, "REFRESH MV's inputs are not readable at the oracle read ts");
1816                }
1817
1818                info!("Resolved `mz_now()` to {timestamp} for REFRESH MV");
1819                Ok(Some(timestamp))
1820            } else {
1821                Ok(None)
1822            };
1823
1824            // NOTE: The Drop impl of ReadHolds makes sure that the hold is
1825            // released when we don't use it.
1826            if acquire_read_holds {
1827                self.store_transaction_read_holds(session.conn_id().clone(), read_holds);
1828            }
1829
1830            mz_now_ts
1831        } else {
1832            Ok(None)
1833        }
1834    }
1835
1836    /// Instruct the dataflow layer to cancel any ongoing, interactive work for
1837    /// the named `conn_id` if the correct secret key is specified.
1838    ///
1839    /// Note: Here we take a [`ConnectionIdType`] as opposed to an owned
1840    /// `ConnectionId` because this method gets called by external clients when
1841    /// they request to cancel a request.
1842    #[mz_ore::instrument(level = "debug")]
1843    async fn handle_cancel(&mut self, conn_id: ConnectionIdType, secret_key: u32) {
1844        if let Some((id_handle, conn_meta)) = self.active_conns.get_key_value(&conn_id) {
1845            // If the secret key specified by the client doesn't match the
1846            // actual secret key for the target connection, we treat this as a
1847            // rogue cancellation request and ignore it.
1848            if conn_meta.secret_key != secret_key {
1849                return;
1850            }
1851
1852            // Now that we've verified the secret key, this is a privileged
1853            // cancellation request. We can upgrade the raw connection ID to a
1854            // proper `IdHandle`.
1855            self.handle_privileged_cancel(id_handle.clone()).await;
1856        }
1857    }
1858
1859    /// Unconditionally instructs the dataflow layer to cancel any ongoing,
1860    /// interactive work for the named `conn_id`.
1861    #[mz_ore::instrument(level = "debug")]
1862    pub(crate) async fn handle_privileged_cancel(&mut self, conn_id: ConnectionId) {
1863        let mut maybe_ctx = None;
1864
1865        // Cancel pending writes. There is at most one pending write per session.
1866        let pending_write_idx = self.pending_writes.iter().position(|pending_write_txn| {
1867            matches!(pending_write_txn, PendingWriteTxn::User {
1868                responder: UserWriteResponder::Session(PendingTxn { ctx, .. }),
1869                ..
1870            } if *ctx.session().conn_id() == conn_id)
1871        });
1872        if let Some(idx) = pending_write_idx {
1873            if let PendingWriteTxn::User {
1874                responder: UserWriteResponder::Session(PendingTxn { ctx, .. }),
1875                ..
1876            } = self.pending_writes.remove(idx)
1877            {
1878                maybe_ctx = Some(ctx);
1879            }
1880        }
1881
1882        // Cancel deferred writes.
1883        if let Some(write_op) = self.deferred_write_ops.remove(&conn_id) {
1884            maybe_ctx = Some(write_op.into_ctx());
1885        }
1886
1887        // Cancel deferred statements.
1888        let deferred_ddl_idx = self
1889            .serialized_ddl
1890            .iter()
1891            .position(|deferred| *deferred.ctx.session().conn_id() == conn_id);
1892        if let Some(idx) = deferred_ddl_idx {
1893            let deferred = self
1894                .serialized_ddl
1895                .remove(idx)
1896                .expect("known to exist from call to `position` above");
1897            maybe_ctx = Some(deferred.ctx);
1898        }
1899
1900        // Cancel reads waiting on being linearized. There is at most one linearized read per
1901        // session.
1902        if let Some(pending_read_txn) = self.pending_linearize_read_txns.remove(&conn_id) {
1903            let ctx = pending_read_txn.take_context();
1904            maybe_ctx = Some(ctx);
1905        }
1906
1907        if let Some(ctx) = maybe_ctx {
1908            ctx.retire(Err(AdapterError::Canceled));
1909        }
1910
1911        self.cancel_pending_peeks(&conn_id);
1912        self.cancel_pending_watchsets(&conn_id);
1913        self.cancel_compute_sinks_for_conn(&conn_id).await;
1914        self.cancel_cluster_reconfigurations_for_conn(&conn_id)
1915            .await;
1916        self.cancel_pending_copy(&conn_id);
1917        if let Some((tx, _rx)) = self.connection_cancel_watches.get_mut(&conn_id) {
1918            let _ = tx.send(true);
1919        }
1920    }
1921
1922    /// Handle termination of a client session.
1923    ///
1924    /// This cleans up any state in the coordinator associated with the session.
1925    #[mz_ore::instrument(level = "debug")]
1926    async fn handle_terminate(&mut self, conn_id: ConnectionId) {
1927        // If the session doesn't exist in `active_conns`, then this method will panic later on.
1928        // Instead we explicitly panic here while dumping the entire Coord to the logs to help
1929        // debug. This panic is very infrequent so we want as much information as possible.
1930        // See https://github.com/MaterializeInc/database-issues/issues/5627.
1931        assert!(
1932            self.active_conns.contains_key(&conn_id),
1933            "unknown connection: {conn_id:?}\n\n{self:?}"
1934        );
1935
1936        // We do not need to call clear_transaction here because there are no side effects to run
1937        // based on any session transaction state.
1938        self.clear_connection(&conn_id).await;
1939
1940        self.drop_temp_items(&conn_id).await;
1941        // Only call catalog_mut() if a temporary schema actually exists for this connection.
1942        // This avoids an expensive Arc::make_mut clone for the common case where the connection
1943        // never created any temporary objects.
1944        if self.catalog().state().has_temporary_schema(&conn_id) {
1945            self.catalog_mut()
1946                .drop_temporary_schema(&conn_id)
1947                .unwrap_or_terminate("unable to drop temporary schema");
1948        }
1949        let conn = self.active_conns.remove(&conn_id).expect("conn must exist");
1950        let session_type = metrics::session_type_label_value(conn.user());
1951        self.metrics
1952            .active_sessions
1953            .with_label_values(&[session_type])
1954            .dec();
1955        self.cancel_pending_peeks(conn.conn_id());
1956        self.cancel_pending_watchsets(&conn_id);
1957        self.cancel_pending_copy(&conn_id);
1958        self.end_session_for_statement_logging(conn.uuid());
1959
1960        // Queue the builtin table update, but do not wait for it to complete. We explicitly do
1961        // this to prevent blocking the Coordinator in the case that a lot of connections are
1962        // closed at once, which occurs regularly in some workflows.
1963        let update = self
1964            .catalog()
1965            .state()
1966            .pack_session_update(&conn, Diff::MINUS_ONE);
1967        let update = self.catalog().state().resolve_builtin_table_update(update);
1968
1969        let _builtin_update_notify = self.builtin_table_update().defer(vec![update]);
1970    }
1971
1972    /// Returns the necessary metadata for appending to a webhook source, and a channel to send
1973    /// rows.
1974    #[mz_ore::instrument(level = "debug")]
1975    fn handle_get_webhook(
1976        &mut self,
1977        database: String,
1978        schema: String,
1979        name: String,
1980        tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
1981    ) {
1982        /// Attempts to resolve a Webhook source from a provided `database.schema.name` path.
1983        ///
1984        /// Returns a struct that can be used to append data to the underlying storate collection, and the
1985        /// types we should cast the request to.
1986        fn resolve(
1987            coord: &mut Coordinator,
1988            database: String,
1989            schema: String,
1990            name: String,
1991        ) -> Result<AppendWebhookResponse, PartialItemName> {
1992            // Resolve our collection.
1993            let name = PartialItemName {
1994                database: Some(database),
1995                schema: Some(schema),
1996                item: name,
1997            };
1998            let Ok(entry) = coord
1999                .catalog()
2000                .resolve_entry(None, &vec![], &name, &SYSTEM_CONN_ID)
2001            else {
2002                return Err(name);
2003            };
2004
2005            // Webhooks can be created with `CREATE SOURCE` or `CREATE TABLE`.
2006            let (data_source, desc, global_id) = match entry.item() {
2007                CatalogItem::Source(Source {
2008                    data_source: data_source @ DataSourceDesc::Webhook { .. },
2009                    desc,
2010                    global_id,
2011                    ..
2012                }) => (data_source, desc.clone(), *global_id),
2013                CatalogItem::Table(
2014                    table @ Table {
2015                        desc,
2016                        data_source:
2017                            TableDataSource::DataSource {
2018                                desc: data_source @ DataSourceDesc::Webhook { .. },
2019                                ..
2020                            },
2021                        ..
2022                    },
2023                ) => (data_source, desc.latest(), table.global_id_writes()),
2024                _ => return Err(name),
2025            };
2026
2027            let DataSourceDesc::Webhook {
2028                validate_using,
2029                body_format,
2030                headers,
2031                ..
2032            } = data_source
2033            else {
2034                mz_ore::soft_panic_or_log!("programming error! checked above for webhook");
2035                return Err(name);
2036            };
2037            let body_format = body_format.clone();
2038            let header_tys = headers.clone();
2039
2040            // Assert we have one column for the body, and how ever many are required for
2041            // the headers.
2042            let num_columns = headers.num_columns() + 1;
2043            mz_ore::soft_assert_or_log!(
2044                desc.arity() <= num_columns,
2045                "expected at most {} columns, but got {}",
2046                num_columns,
2047                desc.arity()
2048            );
2049
2050            // Double check that the body column of the webhook source matches the type
2051            // we're about to deserialize as.
2052            let body_column = desc
2053                .get_by_name(&"body".into())
2054                .map(|(_idx, ty)| ty.clone())
2055                .ok_or_else(|| name.clone())?;
2056            assert!(!body_column.nullable, "webhook body column is nullable!?");
2057            assert_eq!(body_column.scalar_type, SqlScalarType::from(body_format));
2058
2059            // Create a validator that can be called to validate a webhook request.
2060            let validator = validate_using.as_ref().map(|v| {
2061                let validation = v.clone();
2062                AppendWebhookValidator::new(validation, coord.caching_secrets_reader.clone())
2063            });
2064
2065            // Get a channel so we can queue updates to be written.
2066            let row_tx = coord
2067                .controller
2068                .storage
2069                .monotonic_appender(global_id)
2070                .map_err(|_| name.clone())?;
2071            let stats = coord
2072                .controller
2073                .storage
2074                .webhook_statistics(global_id)
2075                .map_err(|_| name)?;
2076            let invalidator = coord
2077                .active_webhooks
2078                .entry(entry.id())
2079                .or_insert_with(WebhookAppenderInvalidator::new);
2080            let tx = WebhookAppender::new(row_tx, invalidator.guard(), stats);
2081
2082            Ok(AppendWebhookResponse {
2083                tx,
2084                body_format,
2085                header_tys,
2086                validator,
2087            })
2088        }
2089
2090        let response = resolve(self, database, schema, name).map_err(|name| {
2091            AppendWebhookError::UnknownWebhook {
2092                database: name.database.expect("provided"),
2093                schema: name.schema.expect("provided"),
2094                name: name.item,
2095            }
2096        });
2097        let _ = tx.send(response);
2098    }
2099
2100    /// Handle registration of a frontend peek, for statement logging and query cancellation
2101    /// handling.
2102    fn handle_register_frontend_peek(
2103        &mut self,
2104        uuid: Uuid,
2105        conn_id: ConnectionId,
2106        cluster_id: mz_controller_types::ClusterId,
2107        depends_on: BTreeSet<GlobalId>,
2108        is_fast_path: bool,
2109        watch_set: Option<WatchSetCreation>,
2110        tx: oneshot::Sender<Result<(), AdapterError>>,
2111    ) {
2112        let statement_logging_id = watch_set.as_ref().map(|ws| ws.logging_id);
2113        if let Some(ws) = watch_set {
2114            if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), ws) {
2115                let _ = tx.send(Err(
2116                    AdapterError::concurrent_dependency_drop_from_watch_set_install_error(e),
2117                ));
2118                return;
2119            }
2120        }
2121
2122        // Store the peek in pending_peeks for later retrieval when results arrive
2123        self.pending_peeks.insert(
2124            uuid,
2125            PendingPeek {
2126                conn_id: conn_id.clone(),
2127                cluster_id,
2128                depends_on,
2129                ctx_extra: ExecuteContextGuard::new(
2130                    statement_logging_id,
2131                    self.internal_cmd_tx.clone(),
2132                ),
2133                is_fast_path,
2134            },
2135        );
2136
2137        // Also track it by connection ID for cancellation support
2138        self.client_pending_peeks
2139            .entry(conn_id)
2140            .or_default()
2141            .insert(uuid, cluster_id);
2142
2143        let _ = tx.send(Ok(()));
2144    }
2145
2146    /// Handle unregistration of a frontend peek that was registered but failed to issue.
2147    /// This is used for cleanup when `client.peek()` fails after `RegisterFrontendPeek` succeeds.
2148    fn handle_unregister_frontend_peek(&mut self, uuid: Uuid, tx: oneshot::Sender<()>) {
2149        // Remove from pending_peeks (this also removes from client_pending_peeks)
2150        if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
2151            // Retire `ExecuteContextExtra`, because the frontend will log the peek's error result.
2152            let _ = pending_peek.ctx_extra.defuse();
2153        }
2154        let _ = tx.send(());
2155    }
2156}