mz_adapter/coord/command_handler.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic for processing client [`Command`]s. Each [`Command`] is initiated by a
11//! client via some external Materialize API (ex: HTTP and psql).
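//!
//! Commands are dispatched from [`Coordinator::handle_command`], either handled
//! inline or forwarded to a dedicated `handle_*` method on the [`Coordinator`].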
12
13use base64::prelude::*;
14use differential_dataflow::lattice::Lattice;
15use mz_adapter_types::dyncfgs::ALLOW_USER_SESSIONS;
16use mz_auth::AuthenticatorKind;
17use mz_auth::password::Password;
18use mz_repr::namespaces::MZ_INTERNAL_SCHEMA;
19use mz_sql::catalog::AutoProvisionSource;
20use mz_sql::session::metadata::SessionMetadata;
21use std::collections::{BTreeMap, BTreeSet};
22use std::net::IpAddr;
23use std::sync::Arc;
24
25use futures::FutureExt;
26use futures::future::LocalBoxFuture;
27use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::memory::objects::{
30 CatalogItem, DataSourceDesc, Role, Source, Table, TableDataSource,
31};
32use mz_ore::task;
33use mz_ore::tracing::OpenTelemetryContext;
34use mz_ore::{instrument, soft_panic_or_log};
35use mz_repr::role_id::RoleId;
36use mz_repr::{Diff, GlobalId, SqlScalarType, Timestamp};
37use mz_sql::ast::{
38 AlterConnectionAction, AlterConnectionStatement, AlterSinkAction, AlterSourceAction, AstInfo,
39 ConstantVisitor, CopyRelation, CopyStatement, CreateSourceOptionName, Raw, Statement,
40 SubscribeStatement,
41};
42use mz_sql::catalog::RoleAttributesRaw;
43use mz_sql::names::{Aug, PartialItemName, ResolvedIds};
44use mz_sql::plan::{
45 AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, Params, Plan,
46 StatementClassification, TransactionType,
47};
48use mz_sql::pure::{
49 materialized_view_option_contains_temporal, purify_create_materialized_view_options,
50};
51use mz_sql::rbac;
52use mz_sql::rbac::CREATE_ITEM_USAGE;
53use mz_sql::session::user::User;
54use mz_sql::session::vars::{
55 EndTransactionAction, NETWORK_POLICY, OwnedVarInput, STATEMENT_LOGGING_SAMPLE_RATE, Value, Var,
56};
57use mz_sql_parser::ast::display::AstDisplay;
58use mz_sql_parser::ast::{
59 CreateMaterializedViewStatement, ExplainPlanStatement, Explainee, InsertStatement,
60 WithOptionValue,
61};
62use mz_storage_types::sources::Timeline;
63use opentelemetry::trace::TraceContextExt;
64use tokio::sync::{mpsc, oneshot};
65use tracing::{Instrument, debug_span, info, warn};
66use tracing_opentelemetry::OpenTelemetrySpanExt;
67use uuid::Uuid;
68
69use crate::command::{
70 CatalogSnapshot, Command, ExecuteResponse, Response, SASLChallengeResponse,
71 SASLVerifyProofResponse, StartupResponse, SuperuserAttribute,
72};
73use crate::coord::appends::PendingWriteTxn;
74use crate::coord::peek::PendingPeek;
75use crate::coord::{
76 ConnMeta, Coordinator, DeferredPlanStatement, Message, PendingTxn, PlanStatement, PlanValidity,
77 PurifiedStatementReady, validate_ip_with_policy_rules,
78};
79use crate::error::{AdapterError, AuthenticationError};
80use crate::notice::AdapterNotice;
81use crate::session::{Session, TransactionOps, TransactionStatus};
82use crate::statement_logging::WatchSetCreation;
83use crate::util::{ClientTransmitter, ResultExt};
84use crate::webhook::{
85 AppendWebhookResponse, AppendWebhookValidator, WebhookAppender, WebhookAppenderInvalidator,
86};
87use crate::{AppendWebhookError, ExecuteContext, catalog, metrics};
88
89use super::ExecuteContextGuard;
90
91/// The login status of a role, used by authentication handlers to check role
92/// existence and login permission before proceeding to credential verification.
93enum RoleLoginStatus {
94 /// The role does not exist in the catalog.
95 NotFound,
96 /// The role exists and has the LOGIN attribute.
97 CanLogin,
98 /// The role exists but does not have the LOGIN attribute.
99 NonLogin,
100}
101
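/// Classifies the result of a role lookup for authentication purposes,
/// distinguishing a missing role from one that exists but lacks the LOGIN
/// attribute.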
102fn role_login_status(role: Option<&Role>) -> RoleLoginStatus {
103 match role {
104 None => RoleLoginStatus::NotFound,
105 Some(role) => match role.attributes.login {
106 Some(login) if login => RoleLoginStatus::CanLogin,
107 _ => RoleLoginStatus::NonLogin,
108 },
109 }
110}
111
112impl Coordinator {
113 /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 58KB. This would
114 /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
115 /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
116 pub(crate) fn handle_command(&mut self, mut cmd: Command) -> LocalBoxFuture<'_, ()> {
117 async move {
118 if let Some(session) = cmd.session_mut() {
119 session.apply_external_metadata_updates();
120 }
121 match cmd {
122 Command::Startup {
123 tx,
124 user,
125 conn_id,
126 secret_key,
127 uuid,
128 client_ip,
129 application_name,
130 notice_tx,
131 } => {
132 // Note: We purposefully do not use a ClientTransmitter here because startup
133 // handles errors and cleanup of sessions itself.
134 self.handle_startup(
135 tx,
136 user,
137 conn_id,
138 secret_key,
139 uuid,
140 client_ip,
141 application_name,
142 notice_tx,
143 )
144 .await;
145 }
146
147 Command::AuthenticatePassword {
148 tx,
149 role_name,
150 password,
151 } => {
152 self.handle_authenticate_password(tx, role_name, password)
153 .await;
154 }
155
156 Command::AuthenticateGetSASLChallenge {
157 tx,
158 role_name,
159 nonce,
160 } => {
161 self.handle_generate_sasl_challenge(tx, role_name, nonce)
162 .await;
163 }
164
165 Command::AuthenticateVerifySASLProof {
166 tx,
167 role_name,
168 proof,
169 mock_hash,
170 auth_message,
171 } => {
172 self.handle_authenticate_verify_sasl_proof(
173 tx,
174 role_name,
175 proof,
176 auth_message,
177 mock_hash,
178 );
179 }
180
181 Command::CheckRoleCanLogin { tx, role_name } => {
182 self.handle_role_can_login(tx, role_name);
183 }
184
185 Command::Execute {
186 portal_name,
187 session,
188 tx,
189 outer_ctx_extra,
190 } => {
191 let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
192
193 self.handle_execute(portal_name, session, tx, outer_ctx_extra)
194 .await;
195 }
196
197 Command::StartCopyFromStdin {
198 target_id,
199 target_name,
200 columns,
201 row_desc,
202 params,
203 session,
204 tx,
205 } => {
206 let otel_ctx = OpenTelemetryContext::obtain();
207 let result = self.setup_copy_from_stdin(
208 &session,
209 target_id,
210 target_name,
211 columns,
212 row_desc,
213 params,
214 );
215 let _ = tx.send(Response {
216 result,
217 session,
218 otel_ctx,
219 });
220 }
221
222 Command::RetireExecute { data, reason } => self.retire_execution(reason, data),
223
224 Command::CancelRequest {
225 conn_id,
226 secret_key,
227 } => {
228 self.handle_cancel(conn_id, secret_key).await;
229 }
230
231 Command::PrivilegedCancelRequest { conn_id } => {
232 self.handle_privileged_cancel(conn_id).await;
233 }
234
235 Command::GetWebhook {
236 database,
237 schema,
238 name,
239 tx,
240 } => {
241 self.handle_get_webhook(database, schema, name, tx);
242 }
243
244 Command::GetSystemVars { tx } => {
245 let _ = tx.send(self.catalog.system_config().clone());
246 }
247
248 Command::SetSystemVars { vars, conn_id, tx } => {
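                    // Reject the entire request if any variable is unknown or not
                    // visible to this user, before staging any catalog updates.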
249 let mut ops = Vec::with_capacity(vars.len());
250 let conn = &self.active_conns[&conn_id];
251
252 for (name, value) in vars {
253 if let Err(e) =
254 self.catalog().system_config().get(&name).and_then(|var| {
255 var.visible(conn.user(), self.catalog.system_config())
256 })
257 {
258 let _ = tx.send(Err(e.into()));
259 return;
260 }
261
262 ops.push(catalog::Op::UpdateSystemConfiguration {
263 name,
264 value: OwnedVarInput::Flat(value),
265 });
266 }
267
268 let result = self
269 .catalog_transact_with_context(Some(&conn_id), None, ops)
270 .await;
271 let _ = tx.send(result);
272 }
273
274 Command::InjectAuditEvents {
275 events,
276 conn_id,
277 tx,
278 } => {
279 let ops = vec![catalog::Op::InjectAuditEvents { events }];
280 let result = self
281 .catalog_transact_with_context(Some(&conn_id), None, ops)
282 .await;
283 let _ = tx.send(result);
284 }
285
286 Command::Terminate { conn_id, tx } => {
287 self.handle_terminate(conn_id).await;
288 // Note: We purposefully do not use a ClientTransmitter here because we're already
289 // terminating the provided session.
290 if let Some(tx) = tx {
291 let _ = tx.send(Ok(()));
292 }
293 }
294
295 Command::Commit {
296 action,
297 session,
298 tx,
299 } => {
300 let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
301 // We reach here not through a statement execution, but from the
302 // "commit" pgwire command. Thus, we just generate a default statement
303 // execution context, which causes nothing to be logged when the
304 // execution finishes.
305 let ctx = ExecuteContext::from_parts(
306 tx,
307 self.internal_cmd_tx.clone(),
308 session,
309 Default::default(),
310 );
311 let plan = match action {
312 EndTransactionAction::Commit => {
313 Plan::CommitTransaction(CommitTransactionPlan {
314 transaction_type: TransactionType::Implicit,
315 })
316 }
317 EndTransactionAction::Rollback => {
318 Plan::AbortTransaction(AbortTransactionPlan {
319 transaction_type: TransactionType::Implicit,
320 })
321 }
322 };
323
324 let conn_id = ctx.session().conn_id().clone();
325 self.sequence_plan(ctx, plan, ResolvedIds::empty(), ResolvedIds::empty())
326 .await;
327 // Part of the Command::Commit contract is that the Coordinator guarantees that
328 // it has cleared its transaction state for the connection.
329 self.clear_connection(&conn_id).await;
330 }
331
332 Command::CatalogSnapshot { tx } => {
333 let _ = tx.send(CatalogSnapshot {
334 catalog: self.owned_catalog(),
335 });
336 }
337
338 Command::CheckConsistency { tx } => {
339 let _ = tx.send(self.check_consistency());
340 }
341
342 Command::Dump { tx } => {
343 let _ = tx.send(self.dump().await);
344 }
345
346 Command::GetComputeInstanceClient { instance_id, tx } => {
347 let _ = tx.send(self.controller.compute.instance_client(instance_id));
348 }
349
350 Command::GetOracle { timeline, tx } => {
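                    // Look up the timestamp oracle for the requested timeline; it may
                    // have been dropped since the request was planned.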
351 let oracle = self
352 .global_timelines
353 .get(&timeline)
354 .map(|timeline_state| Arc::clone(&timeline_state.oracle))
355 .ok_or(AdapterError::ChangedPlan(
356 "timeline has disappeared during planning".to_string(),
357 ));
358 let _ = tx.send(oracle);
359 }
360
361 Command::DetermineRealTimeRecentTimestamp {
362 source_ids,
363 real_time_recency_timeout,
364 tx,
365 } => {
366 let result = self
367 .determine_real_time_recent_timestamp(
368 source_ids.iter().copied(),
369 real_time_recency_timeout,
370 )
371 .await;
372
373 match result {
374 Ok(Some(fut)) => {
375 let catalog = Arc::clone(&self.catalog);
376 task::spawn(|| "determine real time recent timestamp", async move {
377 let result =
378 Coordinator::await_real_time_recent_timestamp(catalog, fut)
379 .await
380 .map(Some);
381 let _ = tx.send(result);
382 });
383 }
384 Ok(None) => {
385 let _ = tx.send(Ok(None));
386 }
387 Err(e) => {
388 let _ = tx.send(Err(e));
389 }
390 }
391 }
392
393 Command::GetTransactionReadHoldsBundle { conn_id, tx } => {
394 let read_holds = self.txn_read_holds.get(&conn_id).cloned();
395 let _ = tx.send(read_holds);
396 }
397
398 Command::StoreTransactionReadHolds {
399 conn_id,
400 read_holds,
401 tx,
402 } => {
403 self.store_transaction_read_holds(conn_id, read_holds);
404 let _ = tx.send(());
405 }
406
407 Command::ExecuteSlowPathPeek {
408 dataflow_plan,
409 determination,
410 finishing,
411 compute_instance,
412 target_replica,
413 intermediate_result_type,
414 source_ids,
415 conn_id,
416 max_result_size,
417 max_query_result_size,
418 watch_set,
419 tx,
420 } => {
421 let result = self
422 .implement_slow_path_peek(
423 *dataflow_plan,
424 determination,
425 finishing,
426 compute_instance,
427 target_replica,
428 intermediate_result_type,
429 source_ids,
430 conn_id,
431 max_result_size,
432 max_query_result_size,
433 watch_set,
434 )
435 .await;
436 let _ = tx.send(result);
437 }
438
439 Command::ExecuteSubscribe {
440 df_desc,
441 dependency_ids,
442 cluster_id,
443 replica_id,
444 conn_id,
445 session_uuid,
446 read_holds,
447 plan,
448 statement_logging_id,
449 tx,
450 } => {
451 let mut ctx_extra = ExecuteContextGuard::new(
452 statement_logging_id,
453 self.internal_cmd_tx.clone(),
454 );
455 let result = self
456 .implement_subscribe(
457 &mut ctx_extra,
458 df_desc,
459 dependency_ids,
460 cluster_id,
461 replica_id,
462 conn_id,
463 session_uuid,
464 read_holds,
465 plan,
466 )
467 .await;
468 let _ = tx.send(result);
469 }
470
471 Command::CopyToPreflight {
472 s3_sink_connection,
473 sink_id,
474 tx,
475 } => {
476 // Spawn a background task to perform the slow S3 preflight operations.
477 // This avoids blocking the coordinator's main task.
478 let connection_context = self.connection_context().clone();
479 let enforce_external_addresses =
480 mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
481 .get(self.controller.storage.config().config_set());
482 task::spawn(|| "copy_to_preflight", async move {
483 let result = mz_storage_types::sinks::s3_oneshot_sink::preflight(
484 connection_context,
485 &s3_sink_connection.aws_connection,
486 &s3_sink_connection.upload_info,
487 s3_sink_connection.connection_id,
488 sink_id,
489 enforce_external_addresses,
490 )
491 .await
492 .map_err(AdapterError::from);
493 let _ = tx.send(result);
494 });
495 }
496
497 Command::ExecuteCopyTo {
498 df_desc,
499 compute_instance,
500 target_replica,
501 source_ids,
502 conn_id,
503 watch_set,
504 tx,
505 } => {
506 // implement_copy_to spawns a background task that sends the response
507 // through tx when the COPY TO completes (or immediately if setup fails).
508 // We just call it and let it handle all response sending.
509 self.implement_copy_to(
510 *df_desc,
511 compute_instance,
512 target_replica,
513 source_ids,
514 conn_id,
515 watch_set,
516 tx,
517 )
518 .await;
519 }
520
521 Command::ExecuteSideEffectingFunc {
522 plan,
523 conn_id,
524 current_role,
525 tx,
526 } => {
527 let result = self
528 .execute_side_effecting_func(plan, conn_id, current_role)
529 .await;
530 let _ = tx.send(result);
531 }
532 Command::RegisterFrontendPeek {
533 uuid,
534 conn_id,
535 cluster_id,
536 depends_on,
537 is_fast_path,
538 watch_set,
539 tx,
540 } => {
541 self.handle_register_frontend_peek(
542 uuid,
543 conn_id,
544 cluster_id,
545 depends_on,
546 is_fast_path,
547 watch_set,
548 tx,
549 );
550 }
551 Command::UnregisterFrontendPeek { uuid, tx } => {
552 self.handle_unregister_frontend_peek(uuid, tx);
553 }
554 Command::ExplainTimestamp {
555 conn_id,
556 session_wall_time,
557 cluster_id,
558 id_bundle,
559 determination,
560 tx,
561 } => {
562 let explanation = self.explain_timestamp(
563 &conn_id,
564 session_wall_time,
565 cluster_id,
566 &id_bundle,
567 determination,
568 );
569 let _ = tx.send(explanation);
570 }
571 Command::FrontendStatementLogging(event) => {
572 self.handle_frontend_statement_logging_event(event);
573 }
574 }
575 }
576 .instrument(debug_span!("handle_command"))
577 .boxed_local()
578 }
579
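    /// Reports whether `role_name` exists and has the LOGIN attribute, without
    /// verifying any credentials.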
580 fn handle_role_can_login(
581 &self,
582 tx: oneshot::Sender<Result<(), AdapterError>>,
583 role_name: String,
584 ) {
585 let result =
586 match role_login_status(self.catalog().try_get_role_by_name(role_name.as_str())) {
587 RoleLoginStatus::NotFound => Err(AdapterError::AuthenticationError(
588 AuthenticationError::RoleNotFound,
589 )),
590 RoleLoginStatus::NonLogin => Err(AdapterError::AuthenticationError(
591 AuthenticationError::NonLogin,
592 )),
593 RoleLoginStatus::CanLogin => Ok(()),
594 };
595 let _ = tx.send(result);
596 }
597
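    /// Verifies a client's SCRAM proof against the stored password hash for
    /// `role_name`. If the role does not exist or has no password hash, the
    /// verification is performed against the caller-provided `mock_hash` so the
    /// work done does not reveal whether the role exists; such attempts always
    /// fail with an authentication error.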
598 fn handle_authenticate_verify_sasl_proof(
599 &self,
600 tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
601 role_name: String,
602 proof: String,
603 auth_message: String,
604 mock_hash: String,
605 ) {
606 let role = self.catalog().try_get_role_by_name(role_name.as_str());
607 let login_status = role_login_status(role);
608 let role_auth = role.and_then(|r| self.catalog().try_get_role_auth_by_id(&r.id));
609 let real_hash = role_auth
610 .as_ref()
611 .and_then(|auth| auth.password_hash.as_ref());
612 let hash_ref = real_hash.map(|s| s.as_str()).unwrap_or(&mock_hash);
613
614 match mz_auth::hash::sasl_verify(hash_ref, &proof, &auth_message) {
615 Ok(verifier) => {
616 // Success only if role exists, allows login, and a real password hash was used.
617 if matches!(login_status, RoleLoginStatus::CanLogin) && real_hash.is_some() {
618 let _ = tx.send(Ok(SASLVerifyProofResponse { verifier }));
619 } else {
620 let _ = tx.send(Err(AdapterError::AuthenticationError(match login_status {
621 RoleLoginStatus::NonLogin => AuthenticationError::NonLogin,
622 RoleLoginStatus::NotFound => AuthenticationError::RoleNotFound,
623 RoleLoginStatus::CanLogin => AuthenticationError::InvalidCredentials,
624 })));
625 }
626 }
627 Err(_) => {
628 let _ = tx.send(Err(AdapterError::AuthenticationError(
629 AuthenticationError::InvalidCredentials,
630 )));
631 }
632 }
633 }
634
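    /// Produces the server-side SCRAM challenge (salt, iteration count, and
    /// combined nonce) for `role_name`. If the role does not exist or has no
    /// parseable password hash, a deterministic mock challenge is returned
    /// instead so that the response does not reveal whether the role exists.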
635 #[mz_ore::instrument(level = "debug")]
636 async fn handle_generate_sasl_challenge(
637 &self,
638 tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
639 role_name: String,
640 client_nonce: String,
641 ) {
642 let role_auth = self
643 .catalog()
644 .try_get_role_by_name(&role_name)
645 .and_then(|role| self.catalog().try_get_role_auth_by_id(&role.id));
646
647 let nonce = match mz_auth::hash::generate_nonce(&client_nonce) {
648 Ok(n) => n,
649 Err(e) => {
650 let msg = format!(
651 "failed to generate nonce for client nonce {}: {}",
652 client_nonce, e
653 );
654 let _ = tx.send(Err(AdapterError::Internal(msg.clone())));
655 soft_panic_or_log!("{msg}");
656 return;
657 }
658 };
659
660 // It's important that the mock_nonce is deterministic per role, otherwise the purpose of
661 // doing mock authentication is defeated. We use a catalog-wide nonce, and combine that
662 // with the role name to get a per-role mock nonce.
663 let send_mock_challenge =
664 |role_name: String,
665 mock_nonce: String,
666 nonce: String,
667 tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>| {
668 let opts = mz_auth::hash::mock_sasl_challenge(
669 &role_name,
670 &mock_nonce,
671 &self.catalog().system_config().scram_iterations(),
672 );
673 let _ = tx.send(Ok(SASLChallengeResponse {
674 iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
675 salt: BASE64_STANDARD.encode(opts.salt),
676 nonce,
677 }));
678 };
679
680 match role_auth {
681 Some(auth) if auth.password_hash.is_some() => {
682 let hash = auth.password_hash.as_ref().expect("checked above");
683 match mz_auth::hash::scram256_parse_opts(hash) {
684 Ok(opts) => {
685 let _ = tx.send(Ok(SASLChallengeResponse {
686 iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
687 salt: BASE64_STANDARD.encode(opts.salt),
688 nonce,
689 }));
690 }
691 Err(_) => {
692 send_mock_challenge(
693 role_name,
694 self.catalog().state().mock_authentication_nonce(),
695 nonce,
696 tx,
697 );
698 }
699 }
700 }
701 _ => {
702 send_mock_challenge(
703 role_name,
704 self.catalog().state().mock_authentication_nonce(),
705 nonce,
706 tx,
707 );
708 }
709 }
710 }
711
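    /// Verifies a cleartext password login for `role_name`. The role must exist
    /// and have the LOGIN attribute; the actual hash verification is offloaded
    /// to a blocking task so it does not stall the coordinator.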
712 #[mz_ore::instrument(level = "debug")]
713 async fn handle_authenticate_password(
714 &self,
715 tx: oneshot::Sender<Result<(), AdapterError>>,
716 role_name: String,
717 password: Option<Password>,
718 ) {
719 let Some(password) = password else {
720 // The user did not provide a password.
721 let _ = tx.send(Err(AdapterError::AuthenticationError(
722 AuthenticationError::PasswordRequired,
723 )));
724 return;
725 };
726 let role = self.catalog().try_get_role_by_name(role_name.as_str());
727
728 match role_login_status(role) {
729 RoleLoginStatus::NotFound => {
730 let _ = tx.send(Err(AdapterError::AuthenticationError(
731 AuthenticationError::RoleNotFound,
732 )));
733 return;
734 }
735 RoleLoginStatus::NonLogin => {
736 let _ = tx.send(Err(AdapterError::AuthenticationError(
737 AuthenticationError::NonLogin,
738 )));
739 return;
740 }
741 RoleLoginStatus::CanLogin => {}
742 }
743
744 let role_auth = role.and_then(|r| self.catalog().try_get_role_auth_by_id(&r.id));
745
746 if let Some(auth) = role_auth {
747 if let Some(hash) = &auth.password_hash {
748 let hash = hash.clone();
749 task::spawn_blocking(
750 || "auth-check-hash",
751 move || {
752 let _ = match mz_auth::hash::scram256_verify(&password, &hash) {
753 Ok(_) => tx.send(Ok(())),
754 Err(_) => tx.send(Err(AdapterError::AuthenticationError(
755 AuthenticationError::InvalidCredentials,
756 ))),
757 };
758 },
759 );
760 return;
761 }
762 }
763 // Authentication failed due to missing password hash.
764 let _ = tx.send(Err(AdapterError::AuthenticationError(
765 AuthenticationError::InvalidCredentials,
766 )));
767 }
768
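    /// Registers a new connection with the coordinator: records its metadata in
    /// `active_conns`, appends a session row to the builtin tables, and returns
    /// the `StartupResponse` needed to finish constructing the session. On
    /// failure, the error is reported back through `tx`.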
769 #[mz_ore::instrument(level = "debug")]
770 async fn handle_startup(
771 &mut self,
772 tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
773 user: User,
774 conn_id: ConnectionId,
775 secret_key: u32,
776 uuid: uuid::Uuid,
777 client_ip: Option<IpAddr>,
778 application_name: String,
779 notice_tx: mpsc::UnboundedSender<AdapterNotice>,
780 ) {
781 // Return early if successful; otherwise clean up any possible state.
782 match self
783 .handle_startup_inner(&user, &conn_id, &client_ip, ¬ice_tx)
784 .await
785 {
786 Ok((role_id, superuser_attribute, session_defaults)) => {
787 let session_type = metrics::session_type_label_value(&user);
788 self.metrics
789 .active_sessions
790 .with_label_values(&[session_type])
791 .inc();
792 let conn = ConnMeta {
793 secret_key,
794 notice_tx,
795 drop_sinks: BTreeSet::new(),
796 pending_cluster_alters: BTreeSet::new(),
797 connected_at: self.now(),
798 user,
799 application_name,
800 uuid,
801 client_ip,
802 conn_id: conn_id.clone(),
803 authenticated_role: role_id,
804 deferred_lock: None,
805 };
806 let update = self.catalog().state().pack_session_update(&conn, Diff::ONE);
807 let update = self.catalog().state().resolve_builtin_table_update(update);
808 self.begin_session_for_statement_logging(&conn);
809 self.active_conns.insert(conn_id.clone(), conn);
810
811 // Note: Do NOT await the notify here; we pass this back to
812 // whatever requested the startup to prevent blocking startup
813 // and the Coordinator on a builtin table update.
814 let updates = vec![update];
815 // It's not a hard error if our list is missing a builtin table, but we want to
816 // make sure these two things stay in-sync.
817 if mz_ore::assert::soft_assertions_enabled() {
818 let required_tables: BTreeSet<_> = super::appends::REQUIRED_BUILTIN_TABLES
819 .iter()
820 .map(|table| self.catalog().resolve_builtin_table(*table))
821 .collect();
822 let updates_tracked = updates
823 .iter()
824 .all(|update| required_tables.contains(&update.id));
825 let all_mz_internal = super::appends::REQUIRED_BUILTIN_TABLES
826 .iter()
827 .all(|table| table.schema == MZ_INTERNAL_SCHEMA);
828 mz_ore::soft_assert_or_log!(
829 updates_tracked,
830 "not tracking all required builtin table updates!"
831 );
832 // TODO(parkmycar): When checking if a query depends on these builtin table
833 // writes we do not check the transitive dependencies of the query, because
834 // we don't support creating views on mz_internal objects. If one of these
835 // tables is promoted out of mz_internal then we'll need to add this check.
836 mz_ore::soft_assert_or_log!(
837 all_mz_internal,
838 "not all builtin tables are in mz_internal! need to check transitive depends",
839 )
840 }
841 let notify = self.builtin_table_update().background(updates);
842
843 let catalog = self.owned_catalog();
844 let build_info_human_version =
845 catalog.state().config().build_info.human_version(None);
846
847 let statement_logging_frontend = self
848 .statement_logging
849 .create_frontend(build_info_human_version);
850
851 let resp = Ok(StartupResponse {
852 role_id,
853 write_notify: notify,
854 session_defaults,
855 catalog,
856 storage_collections: Arc::clone(&self.controller.storage_collections),
857 transient_id_gen: Arc::clone(&self.transient_id_gen),
858 optimizer_metrics: self.optimizer_metrics.clone(),
859 persist_client: self.persist_client.clone(),
860 statement_logging_frontend,
861 superuser_attribute,
862 });
863 if tx.send(resp).is_err() {
864 // Failed to send to adapter, but everything is setup so we can terminate
865 // normally.
866 self.handle_terminate(conn_id).await;
867 }
868 }
869 Err(e) => {
870 // Error during startup or sending to adapter. A user may have been created and
871 // it can stay; no need to delete it.
872 // Note: Temporary schemas are created lazily, so there's nothing to clean up here.
873
874 // Communicate the error back to the client. No need to
875 // handle failures to send the error back; we've already
876 // cleaned up all necessary state.
877 let _ = tx.send(Err(e));
878 }
879 }
880 }
881
882 // Fallible startup work that needs to be cleaned up on error.
883 async fn handle_startup_inner(
884 &mut self,
885 user: &User,
886 _conn_id: &ConnectionId,
887 client_ip: &Option<IpAddr>,
888 notice_tx: &mpsc::UnboundedSender<AdapterNotice>,
889 ) -> Result<(RoleId, SuperuserAttribute, BTreeMap<String, OwnedVarInput>), AdapterError> {
890 if self.catalog().try_get_role_by_name(&user.name).is_none() {
891 // If the user has made it to this point, that means they have been fully authenticated.
892 // This includes preventing any user, except a pre-defined set of system users, from
893 // connecting to an internal port. Therefore it's ok to always create a new role for the
894 // user.
895 let mut attributes = RoleAttributesRaw::new();
896 // When auto-provisioning, we store the authenticator that was used to provision the role.
897 attributes.auto_provision_source = match user.authenticator_kind {
898 Some(AuthenticatorKind::Oidc) => Some(AutoProvisionSource::Oidc),
899 Some(AuthenticatorKind::Frontegg) => Some(AutoProvisionSource::Frontegg),
900 Some(AuthenticatorKind::None) => Some(AutoProvisionSource::None),
901 _ => {
902 warn!(
903 "auto-provisioning role with unexpected authenticator kind: {:?}",
904 user.authenticator_kind
905 );
906 None
907 }
908 };
909
910 // Auto-provision roles with the LOGIN attribute to distinguish
911 // them as users.
912 attributes.login = Some(true);
913
914 let plan = CreateRolePlan {
915 name: user.name.to_string(),
916 attributes,
917 };
918 self.sequence_create_role_for_startup(plan).await?;
919 }
920 let role = self
921 .catalog()
922 .try_get_role_by_name(&user.name)
923 .expect("created above");
924 let role_id = role.id;
925 let superuser_attribute = role.attributes.superuser;
926
927 // JWT group-to-role sync: reconcile role memberships with JWT group claims.
928 // Missing groups claim (None) → skip sync; empty (Some([])) → revoke all.
929 self.maybe_sync_jwt_groups(role_id, user.groups.as_deref(), notice_tx)
930 .await?;
931
932 if role_id.is_user() && !ALLOW_USER_SESSIONS.get(self.catalog().system_config().dyncfgs()) {
933 return Err(AdapterError::UserSessionsDisallowed);
934 }
935
936 // Initialize the default session variables for this role.
937 let mut session_defaults = BTreeMap::new();
938 let system_config = self.catalog().state().system_config();
939
940 // Override the session with any system defaults.
941 session_defaults.extend(
942 system_config
943 .iter_session()
944 .map(|v| (v.name().to_string(), OwnedVarInput::Flat(v.value()))),
945 );
946 // Special case.
947 let statement_logging_default = system_config
948 .statement_logging_default_sample_rate()
949 .format();
950 session_defaults.insert(
951 STATEMENT_LOGGING_SAMPLE_RATE.name().to_string(),
952 OwnedVarInput::Flat(statement_logging_default),
953 );
954 // Override system defaults with role defaults.
955 session_defaults.extend(
956 self.catalog()
957 .get_role(&role_id)
958 .vars()
959 .map(|(name, val)| (name.to_string(), val.clone())),
960 );
961
962 // Validate network policies for external users. Internal users can only connect on the
963 // internal interfaces (internal HTTP/pgwire). It is up to the person deploying the system
964 // to ensure these internal interfaces are well secured.
965 //
966 // HACKY(parkmycar): We don't have a fully formed session yet for this role, but we want
967 // the default network policy for this role, so we read directly out of what the session
968 // will get initialized with.
969 if !user.is_internal() {
970 let network_policy_name = session_defaults
971 .get(NETWORK_POLICY.name())
972 .and_then(|value| match value {
973 OwnedVarInput::Flat(name) => Some(name.clone()),
974 OwnedVarInput::SqlSet(names) => {
975 tracing::error!(?names, "found multiple network policies");
976 None
977 }
978 })
979 .unwrap_or_else(|| system_config.default_network_policy_name());
980 let maybe_network_policy = self
981 .catalog()
982 .get_network_policy_by_name(&network_policy_name);
983
984 let Some(network_policy) = maybe_network_policy else {
985 // We should prevent dropping the default network policy, or setting the policy
986 // to something that doesn't exist, so complain loudly if this occurs.
987 tracing::error!(
988 network_policy_name,
989 "default network policy does not exist. All user traffic will be blocked"
990 );
991 let reason = match client_ip {
992 Some(ip) => super::NetworkPolicyError::AddressDenied(ip.clone()),
993 None => super::NetworkPolicyError::MissingIp,
994 };
995 return Err(AdapterError::NetworkPolicyDenied(reason));
996 };
997
998 if let Some(ip) = client_ip {
999 match validate_ip_with_policy_rules(ip, &network_policy.rules) {
1000 Ok(_) => {}
1001 Err(e) => return Err(AdapterError::NetworkPolicyDenied(e)),
1002 }
1003 } else {
1004 // Only temporary and internal representations of a session
1005 // should be missing a client_ip. These sessions should not be
1006 // making requests or going through handle_startup.
1007 return Err(AdapterError::NetworkPolicyDenied(
1008 super::NetworkPolicyError::MissingIp,
1009 ));
1010 }
1011 }
1012
1013 // Temporary schemas are now created lazily when the first temporary object is created,
1014 // rather than eagerly on connection startup. This avoids expensive catalog_mut() calls
1015 // for the common case where connections never create temporary objects.
1016
1017 Ok((
1018 role_id,
1019 SuperuserAttribute(superuser_attribute),
1020 session_defaults,
1021 ))
1022 }
1023
1024 /// Handles an execute command.
1025 #[instrument(name = "coord::handle_execute", fields(session = session.uuid().to_string()))]
1026 pub(crate) async fn handle_execute(
1027 &mut self,
1028 portal_name: String,
1029 mut session: Session,
1030 tx: ClientTransmitter<ExecuteResponse>,
1031 // If this command was part of another execute command
1032 // (for example, executing a `FETCH` statement causes an execute to be
1033 // issued for the cursor it references),
1034 // then `outer_context` should be `Some`.
1035 // This instructs the coordinator that the
1036 // outer execute should be considered finished once the inner one is.
1037 outer_context: Option<ExecuteContextGuard>,
1038 ) {
1039 if session.vars().emit_trace_id_notice() {
1040 let span_context = tracing::Span::current()
1041 .context()
1042 .span()
1043 .span_context()
1044 .clone();
1045 if span_context.is_valid() {
1046 session.add_notice(AdapterNotice::QueryTrace {
1047 trace_id: span_context.trace_id(),
1048 });
1049 }
1050 }
1051
1052 if let Err(err) = Self::verify_portal(self.catalog(), &mut session, &portal_name) {
1053 // If statement logging hasn't started yet, we don't need
1054 // to add any "end" event, so just make up a no-op
1055 // `ExecuteContextExtra` here, via `Default::default`.
1056 //
1057 // It's a bit unfortunate because the edge case of failed
1058 // portal verifications won't show up in statement
1059 // logging, but there seems to be nothing else we can do,
1060 // because we need access to the portal to begin logging.
1061 //
1062 // Another option would be to log a begin and end event, but just fill in NULLs
1063 // for everything we get from the portal (prepared statement id, params).
1064 let extra = outer_context.unwrap_or_else(Default::default);
1065 let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
1066 return ctx.retire(Err(err));
1067 }
1068
1069 // The reference to `portal` can't outlive `session`, which we
1070 // use to construct the context, so scope the reference to this block where we
1071 // get everything we need from the portal for later.
1072 let (stmt, ctx, params) = {
1073 let portal = session
1074 .get_portal_unverified(&portal_name)
1075 .expect("known to exist");
1076 let params = portal.parameters.clone();
1077 let stmt = portal.stmt.clone();
1078 let logging = Arc::clone(&portal.logging);
1079 let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
1080
1081 let extra = if let Some(extra) = outer_context {
1082 // We are executing in the context of another SQL statement, so we don't
1083 // want to begin statement logging anew. The context of the actual statement
1084 // being executed is the one that should be retired once this finishes.
1085 extra
1086 } else {
1087 // This is a new statement, log it and return the context
1088 let maybe_uuid = self.begin_statement_execution(
1089 &mut session,
1090 ¶ms,
1091 &logging,
1092 lifecycle_timestamps,
1093 );
1094
1095 ExecuteContextGuard::new(maybe_uuid, self.internal_cmd_tx.clone())
1096 };
1097 let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
1098 (stmt, ctx, params)
1099 };
1100
1101 let stmt = match stmt {
1102 Some(stmt) => stmt,
1103 None => return ctx.retire(Ok(ExecuteResponse::EmptyQuery)),
1104 };
1105
1106 let session_type = metrics::session_type_label_value(ctx.session().user());
1107 let stmt_type = metrics::statement_type_label_value(&stmt);
1108 self.metrics
1109 .query_total
1110 .with_label_values(&[session_type, stmt_type])
1111 .inc();
1112 match &*stmt {
1113 Statement::Subscribe(SubscribeStatement { output, .. })
1114 | Statement::Copy(CopyStatement {
1115 relation: CopyRelation::Subscribe(SubscribeStatement { output, .. }),
1116 ..
1117 }) => {
1118 self.metrics
1119 .subscribe_outputs
1120 .with_label_values(&[
1121 session_type,
1122 metrics::subscribe_output_label_value(output),
1123 ])
1124 .inc();
1125 }
1126 _ => {}
1127 }
1128
1129 self.handle_execute_inner(stmt, params, ctx).await
1130 }
1131
1132 #[instrument(name = "coord::handle_execute_inner", fields(stmt = stmt.to_ast_string_redacted()))]
1133 pub(crate) async fn handle_execute_inner(
1134 &mut self,
1135 stmt: Arc<Statement<Raw>>,
1136 params: Params,
1137 mut ctx: ExecuteContext,
1138 ) {
1139 // This comment describes the various ways DDL can execute (the ordered operations: name
1140 // resolve, purify, plan, sequence), all of which are managed by this function. DDL has
1141 // three notable properties that all partially interact.
1142 //
1143 // 1. Most DDL statements (and a few others) support single-statement transaction delayed
1144 // execution. This occurs when a session executes `BEGIN`, a single DDL, then `COMMIT`.
1145 // We announce success of the single DDL when it is executed, but do not attempt to plan
1146 // or sequence it until `COMMIT`, which is able to error if needed while sequencing the
1147 // DDL (this behavior is Postgres-compatible). The purpose of this is because some
1148 // drivers or tools wrap all statements in `BEGIN` and `COMMIT` and we would like them to
1149 // work. When the single DDL is announced as successful we also put the session's
1150 // transaction ops into `SingleStatement` which will produce an error if any other
1151 // statement is run in the transaction except `COMMIT`. Additionally, this will cause
1152 // `handle_execute_inner` to stop further processing (no planning, etc.) of the
1153 // statement.
1154 // 2. A few other DDL statements (`ALTER .. RENAME/SWAP`) enter the `DDL` ops which allows
1155 // any number of only these DDL statements to be executed in a transaction. During
1156 // sequencing we run an incremental catalog dry run against in-memory transaction state
1157 // and store the resulting `CatalogState` in `TransactionOps::DDL`, but nothing is yet
1158 // committed to the durable catalog. At `COMMIT`, all accumulated ops are applied in one
1159 // catalog transaction. The purpose of this is to allow multiple, atomic renames in the
1160 // same transaction.
1161 // 3. Some DDLs do off-thread work during purification or sequencing that is expensive or
1162 // makes network calls (interfacing with secrets, optimization of views/indexes, source
1163 // purification). These must guarantee correctness when they return to the main
1164 // coordinator thread because the catalog state could have changed while they were doing
1165 // the off-thread work. Previously we would use `PlanValidity::Checks` to specify a bunch
1166 // of IDs that we needed to exist. We discovered the way we were doing that was not
1167 // always correct. Instead of attempting to get that completely right, we have opted to
1168 // serialize DDL. Getting this right is difficult because catalog changes can affect name
1169 // resolution, planning, sequencing, and optimization. Correctly writing logic that is
1170 // aware of all possible catalog changes that would affect any of those parts is not
1171 // something our current code has been designed to be helpful at. Even if a DDL statement
1172 // is doing off-thread work, another DDL must not yet execute at all. Executing these
1173 // serially will guarantee that no off-thread work has affected the state of the catalog.
1174 // This is done by adding a VecDeque of deferred statements and a lock to the
1175 // Coordinator. When a DDL is run in `handle_execute_inner` (after applying whatever
1176 // transaction ops are needed to the session as described above), it attempts to own the
1177 // lock (a tokio Mutex). If acquired, it stashes the lock in the connection's `ConnMeta`
1178 // struct in `active_conns` and proceeds. The lock is dropped at transaction end in
1179 // `clear_transaction` and a message sent to the Coordinator to execute the next queued
1180 // DDL. If the lock could not be acquired, the DDL is put into the VecDeque where it
1181 // awaits dequeuing caused by the lock being released.
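        //
        // As an illustrative sketch of property 1 above, a driver that wraps every statement
        // in a transaction might send:
        //
        //     BEGIN;
        //     CREATE VIEW v AS SELECT 1;  -- announced successful, but not yet planned or sequenced
        //     COMMIT;                     -- the deferred DDL is planned and sequenced here, and
        //                                 -- any error is reported on the COMMIT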
1182
1183 // Verify that this statement type can be executed in the current
1184 // transaction state.
1185 match ctx.session().transaction() {
1186 // By this point we should be in a running transaction.
1187 TransactionStatus::Default => unreachable!(),
1188
1189 // Failed transactions have already been checked in pgwire for a safe statement
1190 // (COMMIT, ROLLBACK, etc.) and can proceed.
1191 TransactionStatus::Failed(_) => {}
1192
1193 // Started is a deceptive name and means different things depending on which
1194 // protocol was used. In the simple protocol it means exactly one statement
1195 // (the parser parsed the entire string and it contained a single statement).
1196 // In the extended protocol it means *some* query is being executed, but there
1197 // might be others after it before the Sync (commit) message. Postgres handles
1198 // this by teaching Started to eagerly commit certain statements that can't be
1199 // run in a transaction block.
1200 TransactionStatus::Started(_) => {
1201 if let Statement::Declare(_) = &*stmt {
1202 // Declare is an exception. Although it's not against any spec to execute
1203 // it, it will always result in nothing happening, since all portals will be
1204 // immediately closed. Users don't know this detail, so this error helps them
1205 // understand what's going wrong. Postgres does this too.
1206 return ctx.retire(Err(AdapterError::OperationRequiresTransaction(
1207 "DECLARE CURSOR".into(),
1208 )));
1209 }
1210 }
1211
1212 // Implicit or explicit transactions.
1213 //
1214 // Implicit transactions happen when a multi-statement query is executed
1215 // (a "simple query"). However if a "BEGIN" appears somewhere in there,
1216 // then the existing implicit transaction will be upgraded to an explicit
1217 // transaction. Thus, we should not separate what implicit and explicit
1218 // transactions can do unless there's some additional checking to make sure
1219 // something disallowed in explicit transactions did not previously take place
1220 // in the implicit portion.
1221 TransactionStatus::InTransactionImplicit(_) | TransactionStatus::InTransaction(_) => {
1222 match &*stmt {
1223 // Statements that are safe in a transaction. We still need to verify that we
1224 // don't interleave reads and writes since we can't perform those serializably.
1225 Statement::Close(_)
1226 | Statement::Commit(_)
1227 | Statement::Copy(_)
1228 | Statement::Deallocate(_)
1229 | Statement::Declare(_)
1230 | Statement::Discard(_)
1231 | Statement::Execute(_)
1232 | Statement::ExplainPlan(_)
1233 | Statement::ExplainPushdown(_)
1234 | Statement::ExplainAnalyzeObject(_)
1235 | Statement::ExplainAnalyzeCluster(_)
1236 | Statement::ExplainTimestamp(_)
1237 | Statement::ExplainSinkSchema(_)
1238 | Statement::Fetch(_)
1239 | Statement::Prepare(_)
1240 | Statement::Rollback(_)
1241 | Statement::Select(_)
1242 | Statement::SetTransaction(_)
1243 | Statement::Show(_)
1244 | Statement::SetVariable(_)
1245 | Statement::ResetVariable(_)
1246 | Statement::StartTransaction(_)
1247 | Statement::Subscribe(_)
1248 | Statement::Raise(_) => {
1249 // Always safe.
1250 }
1251
1252 Statement::Insert(InsertStatement {
1253 source, returning, ..
1254 }) if returning.is_empty() && ConstantVisitor::insert_source(source) => {
1255 // INSERT statements with constant values that do not need to execute on
1256 // any cluster (no RETURNING) are always safe.
1257 }
1258
1259 // These statements must be kept in-sync with `must_serialize_ddl()`.
1260 Statement::AlterObjectRename(_)
1261 | Statement::AlterObjectSwap(_)
1262 | Statement::CreateTableFromSource(_)
1263 | Statement::CreateSource(_) => {
1264 let state = self.catalog().for_session(ctx.session()).state().clone();
1265 let revision = self.catalog().transient_revision();
1266
1267 // Initialize our transaction with a set of empty ops, or return an error
1268 // if we can't run a DDL transaction
1269 let txn_status = ctx.session_mut().transaction_mut();
1270 if let Err(err) = txn_status.add_ops(TransactionOps::DDL {
1271 ops: vec![],
1272 state,
1273 revision,
1274 side_effects: vec![],
1275 snapshot: None,
1276 }) {
1277 return ctx.retire(Err(err));
1278 }
1279 }
1280
1281 // Statements below must be run singly (in Started).
1282 Statement::AlterCluster(_)
1283 | Statement::AlterConnection(_)
1284 | Statement::AlterDefaultPrivileges(_)
1285 | Statement::AlterIndex(_)
1286 | Statement::AlterMaterializedViewApplyReplacement(_)
1287 | Statement::AlterSetCluster(_)
1288 | Statement::AlterOwner(_)
1289 | Statement::AlterRetainHistory(_)
1290 | Statement::AlterRole(_)
1291 | Statement::AlterSecret(_)
1292 | Statement::AlterSink(_)
1293 | Statement::AlterSource(_)
1294 | Statement::AlterSystemReset(_)
1295 | Statement::AlterSystemResetAll(_)
1296 | Statement::AlterSystemSet(_)
1297 | Statement::AlterTableAddColumn(_)
1298 | Statement::AlterNetworkPolicy(_)
1299 | Statement::CreateCluster(_)
1300 | Statement::CreateClusterReplica(_)
1301 | Statement::CreateConnection(_)
1302 | Statement::CreateDatabase(_)
1303 | Statement::CreateIndex(_)
1304 | Statement::CreateMaterializedView(_)
1305 | Statement::CreateRole(_)
1306 | Statement::CreateSchema(_)
1307 | Statement::CreateSecret(_)
1308 | Statement::CreateSink(_)
1309 | Statement::CreateSubsource(_)
1310 | Statement::CreateTable(_)
1311 | Statement::CreateType(_)
1312 | Statement::CreateView(_)
1313 | Statement::CreateWebhookSource(_)
1314 | Statement::CreateNetworkPolicy(_)
1315 | Statement::Delete(_)
1316 | Statement::DropObjects(_)
1317 | Statement::DropOwned(_)
1318 | Statement::GrantPrivileges(_)
1319 | Statement::GrantRole(_)
1320 | Statement::Insert(_)
1321 | Statement::ReassignOwned(_)
1322 | Statement::RevokePrivileges(_)
1323 | Statement::RevokeRole(_)
1324 | Statement::Update(_)
1325 | Statement::ValidateConnection(_)
1326 | Statement::Comment(_)
1327 | Statement::ExecuteUnitTest(_) => {
1328 let txn_status = ctx.session_mut().transaction_mut();
1329
1330 // If we're not in an implicit transaction and we could generate exactly one
1331 // valid ExecuteResponse, we can delay execution until commit.
1332 if !txn_status.is_implicit() {
1333 // Statements whose tag is trivial (known only from an unexecuted statement) can
1334 // be run in a special single-statement explicit mode. In this mode (`BEGIN;
1335 // <stmt>; COMMIT`), we generate the expected tag from a successful <stmt>, but
1336 // delay execution until `COMMIT`.
1337 if let Ok(resp) = ExecuteResponse::try_from(&*stmt) {
1338 if let Err(err) = txn_status
1339 .add_ops(TransactionOps::SingleStatement { stmt, params })
1340 {
1341 ctx.retire(Err(err));
1342 return;
1343 }
1344 ctx.retire(Ok(resp));
1345 return;
1346 }
1347 }
1348
1349 return ctx.retire(Err(AdapterError::OperationProhibitsTransaction(
1350 stmt.to_string(),
1351 )));
1352 }
1353 }
1354 }
1355 }
1356
1357 // DDLs must be planned and sequenced serially. We do not rely on PlanValidity checking
1358 // various IDs because we have incorrectly done that in the past. Attempt to acquire the
1359 // ddl lock. The lock is stashed in the ConnMeta which is dropped at transaction end. If
1360 // acquired, proceed with sequencing. If not, enqueue and return. This logic assumes that
1361 // Coordinator::clear_transaction is correctly called when session transactions are ended
1362 // because that function will release the held lock from active_conns.
1363 if Self::must_serialize_ddl(&stmt, &ctx) {
1364 if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
1365 let prev = self
1366 .active_conns
1367 .get_mut(ctx.session().conn_id())
1368 .expect("connection must exist")
1369 .deferred_lock
1370 .replace(guard);
1371 assert!(
1372 prev.is_none(),
1373 "connections should have at most one lock guard"
1374 );
1375 } else {
1376 if self
1377 .active_conns
1378 .get(ctx.session().conn_id())
1379 .expect("connection must exist")
1380 .deferred_lock
1381 .is_some()
1382 {
1383 // This session *already* has the lock, and incorrectly tried to execute another
1384 // DDL while still holding the lock, violating the assumption documented above.
1385 // This is an internal error, probably in some AdapterClient user (pgwire or
1386 // http). Because the session is now in some unexpected state, return an error
1387 // which should cause the AdapterClient user to fail the transaction.
1388 // (Terminating the connection is maybe what we would prefer to do, but is not
1389 // currently a thing we can do from the coordinator: calling handle_terminate
1390 // cleans up Coordinator state for the session but doesn't inform the
1391 // AdapterClient that the session should terminate.)
1392 soft_panic_or_log!(
1393 "session {} attempted to get ddl lock while already owning it",
1394 ctx.session().conn_id()
1395 );
1396 ctx.retire(Err(AdapterError::Internal(
1397 "session attempted to get ddl lock while already owning it".to_string(),
1398 )));
1399 return;
1400 }
1401 self.serialized_ddl.push_back(DeferredPlanStatement {
1402 ctx,
1403 ps: PlanStatement::Statement { stmt, params },
1404 });
1405 return;
1406 }
1407 }
1408
1409 let catalog = self.catalog();
1410 let catalog = catalog.for_session(ctx.session());
1411 let original_stmt = Arc::clone(&stmt);
1412 // `resolved_ids` should be derivable from `stmt`. If `stmt` is transformed to remove/add
1413 // IDs, then `resolved_ids` should be updated to also remove/add those IDs.
1414 let (stmt, mut resolved_ids) = match mz_sql::names::resolve(&catalog, (*stmt).clone()) {
1415 Ok(resolved) => resolved,
1416 Err(e) => return ctx.retire(Err(e.into())),
1417 };
1418 // N.B. The catalog can change during purification so we must validate that the dependencies still exist after
1419 // purification. This should be done back on the main thread.
1420 // We do the validation:
1421 // - In the handler for `Message::PurifiedStatementReady`, before we handle the purified statement.
1422 // If we add special handling for more types of `Statement`s, we'll need to ensure similar verification
1423 // occurs.
1424 let (stmt, resolved_ids) = match stmt {
1425 // Various statements must be purified off the main coordinator thread of control.
1426 stmt if Self::must_spawn_purification(&stmt) => {
1427 let internal_cmd_tx = self.internal_cmd_tx.clone();
1428 let conn_id = ctx.session().conn_id().clone();
1429 let catalog = self.owned_catalog();
1430 let now = self.now();
1431 let otel_ctx = OpenTelemetryContext::obtain();
1432 let current_storage_configuration = self.controller.storage.config().clone();
1433 task::spawn(|| format!("purify:{conn_id}"), async move {
1434 let transient_revision = catalog.transient_revision();
1435 let catalog = catalog.for_session(ctx.session());
1436
1437 // Checks if the session is authorized to purify a statement. Usually
1438 // authorization is checked after planning; however, purification happens before
1439 // planning, which may require the use of some connections and secrets.
1440 if let Err(e) = rbac::check_usage(
1441 &catalog,
1442 ctx.session(),
1443 &resolved_ids,
1444 &CREATE_ITEM_USAGE,
1445 ) {
1446 return ctx.retire(Err(e.into()));
1447 }
1448
1449 let (result, cluster_id) = mz_sql::pure::purify_statement(
1450 catalog,
1451 now,
1452 stmt,
1453 ¤t_storage_configuration,
1454 )
1455 .await;
1456 let result = result.map_err(|e| e.into());
1457 let dependency_ids = resolved_ids.items().copied().collect();
1458 let plan_validity = PlanValidity::new(
1459 transient_revision,
1460 dependency_ids,
1461 cluster_id,
1462 None,
1463 ctx.session().role_metadata().clone(),
1464 );
1465 // It is not an error for purification to complete after `internal_cmd_rx` is dropped.
1466 let result = internal_cmd_tx.send(Message::PurifiedStatementReady(
1467 PurifiedStatementReady {
1468 ctx,
1469 result,
1470 params,
1471 plan_validity,
1472 original_stmt,
1473 otel_ctx,
1474 },
1475 ));
1476 if let Err(e) = result {
1477 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1478 }
1479 });
1480 return;
1481 }
1482
1483 // `CREATE SUBSOURCE` statements are disallowed for users and are only generated
1484 // automatically as part of purification
1485 Statement::CreateSubsource(_) => {
1486 ctx.retire(Err(AdapterError::Unsupported(
1487 "CREATE SUBSOURCE statements",
1488 )));
1489 return;
1490 }
1491
1492 Statement::CreateMaterializedView(mut cmvs) => {
1493 // `CREATE MATERIALIZED VIEW ... AS OF ...` syntax is disallowed for users and is
1494 // only used for storing initial frontiers in the catalog.
1495 if cmvs.as_of.is_some() {
1496 return ctx.retire(Err(AdapterError::Unsupported(
1497 "CREATE MATERIALIZED VIEW ... AS OF statements",
1498 )));
1499 }
1500
1501 let mz_now = match self
1502 .resolve_mz_now_for_create_materialized_view(
1503 &cmvs,
1504 &resolved_ids,
1505 ctx.session_mut(),
1506 true,
1507 )
1508 .await
1509 {
1510 Ok(mz_now) => mz_now,
1511 Err(e) => return ctx.retire(Err(e)),
1512 };
1513
1514 let catalog = self.catalog().for_session(ctx.session());
1515
1516 purify_create_materialized_view_options(
1517 catalog,
1518 mz_now,
1519 &mut cmvs,
1520 &mut resolved_ids,
1521 );
1522
1523 let purified_stmt =
1524 Statement::CreateMaterializedView(CreateMaterializedViewStatement::<Aug> {
1525 if_exists: cmvs.if_exists,
1526 name: cmvs.name,
1527 columns: cmvs.columns,
1528 replacement_for: cmvs.replacement_for,
1529 in_cluster: cmvs.in_cluster,
1530 in_cluster_replica: cmvs.in_cluster_replica,
1531 query: cmvs.query,
1532 with_options: cmvs.with_options,
1533 as_of: None,
1534 });
1535
1536 // (Purifying CreateMaterializedView doesn't happen async, so no need to send
1537 // `Message::PurifiedStatementReady` here.)
1538 (purified_stmt, resolved_ids)
1539 }
1540
1541 Statement::ExplainPlan(ExplainPlanStatement {
1542 stage,
1543 with_options,
1544 format,
1545 explainee: Explainee::CreateMaterializedView(box_cmvs, broken),
1546 }) => {
1547 let mut cmvs = *box_cmvs;
1548 let mz_now = match self
1549 .resolve_mz_now_for_create_materialized_view(
1550 &cmvs,
1551 &resolved_ids,
1552 ctx.session_mut(),
1553 false,
1554 )
1555 .await
1556 {
1557 Ok(mz_now) => mz_now,
1558 Err(e) => return ctx.retire(Err(e)),
1559 };
1560
1561 let catalog = self.catalog().for_session(ctx.session());
1562
1563 purify_create_materialized_view_options(
1564 catalog,
1565 mz_now,
1566 &mut cmvs,
1567 &mut resolved_ids,
1568 );
1569
1570 let purified_stmt = Statement::ExplainPlan(ExplainPlanStatement {
1571 stage,
1572 with_options,
1573 format,
1574 explainee: Explainee::CreateMaterializedView(Box::new(cmvs), broken),
1575 });
1576
1577 (purified_stmt, resolved_ids)
1578 }
1579
1580 // All other statements are handled immediately.
1581 _ => (stmt, resolved_ids),
1582 };
1583
1584 match self.plan_statement(ctx.session(), stmt, ¶ms, &resolved_ids) {
1585 Ok((plan, sql_impl_ids)) => {
1586 self.sequence_plan(ctx, plan, resolved_ids, sql_impl_ids)
1587 .await
1588 }
1589 Err(e) => ctx.retire(Err(e)),
1590 }
1591 }
1592
1593 /// Whether the statement is DDL that must be serialized.
1594 fn must_serialize_ddl(stmt: &Statement<Raw>, ctx: &ExecuteContext) -> bool {
1595 // Non-DDL is not serialized here.
1596 if !StatementClassification::from(&*stmt).is_ddl() {
1597 return false;
1598 }
1599 // Off-thread, pre-planning purification can perform arbitrarily slow network calls so must
1600 // not be serialized. These all use PlanValidity for their checking, and we must ensure
1601 // those checks are sufficient.
1602 if Self::must_spawn_purification(stmt) {
1603 return false;
1604 }
1605
1606 // Statements that support multiple DDLs in a single transaction aren't serialized here.
1607 // Their operations are serialized when applied to the catalog, guaranteeing that any
1608 // off-thread DDLs concurrent with a multiple DDL transaction will have a serial order.
1609 if ctx.session.transaction().is_ddl() {
1610 return false;
1611 }
1612
1613 // Some DDL is exempt. It is not great that we are matching on Statements here because
1614 // different plans can be produced from the same top-level statement type (e.g., `ALTER
1615 // CONNECTION ROTATE KEYS`). But the whole point of this is to prevent things from being
1616 // planned in the first place, so we accept the abstraction leak.
1617 match stmt {
1618 // Secrets have a small and understood set of dependencies, and their off-thread work
1619 // interacts with k8s.
1620 Statement::AlterSecret(_) => false,
1621 Statement::CreateSecret(_) => false,
1622 Statement::AlterConnection(AlterConnectionStatement { actions, .. })
1623 if actions
1624 .iter()
1625 .all(|action| matches!(action, AlterConnectionAction::RotateKeys)) =>
1626 {
1627 false
1628 }
1629
1630 // The off-thread work that altering a cluster may do (waiting for replicas to spin up)
1631 // does not affect its catalog names or ids and so is safe to not serialize. This could
1632 // change the set of replicas that exist. For queries that name replicas or use the
1633 // current_replica session var, the `replica_id` field of `PlanValidity` serves to
1634 // ensure that those replicas exist during the query finish stage. Additionally, that
1635 // work can take hours (configured by the user), so would also be a bad experience for
1636 // users.
1637 Statement::AlterCluster(_) => false,
1638
1639 // `ALTER SINK SET FROM` waits for the old relation to make enough progress for a clean
1640 // cutover. If the old collection is stalled, it may block forever. Checks in
1641 // sequencing ensure that the operation fails if any one of these happens concurrently:
1642 // * the sink is dropped
1643 // * the new source relation is dropped
1644 // * another `ALTER SINK` for the same sink is applied first
1645 Statement::AlterSink(stmt)
1646 if matches!(stmt.action, AlterSinkAction::ChangeRelation(_)) =>
1647 {
1648 false
1649 }
1650
1651 // `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT` waits for the target MV to make
1652 // enough progress for a clean cutover. If the target MV is stalled, it may block
1653 // forever. Checks in sequencing ensure the operation fails if any of these happens
1654 // concurrently:
1655 // * the target MV is dropped
1656 // * the replacement MV is dropped
1657 Statement::AlterMaterializedViewApplyReplacement(_) => false,
1658
1659 // Everything else must be serialized.
1660 _ => true,
1661 }
1662 }
1663
1664 /// Whether the statement must be purified off of the Coordinator thread.
1665 fn must_spawn_purification<A: AstInfo>(stmt: &Statement<A>) -> bool {
1666 // `CREATE SOURCE`, `ALTER SOURCE`, `CREATE SINK`, and `CREATE TABLE ... FROM SOURCE`
1667 // statements must be purified off the main coordinator thread.
1668 if !matches!(
1669 stmt,
1670 Statement::CreateSource(_)
1671 | Statement::AlterSource(_)
1672 | Statement::CreateSink(_)
1673 | Statement::CreateTableFromSource(_)
1674 ) {
1675 return false;
1676 }
1677
1678 // However, `ALTER SOURCE ... RETAIN HISTORY` is excluded from off-thread purification.
1679 if let Statement::AlterSource(stmt) = stmt {
1680 let names: Vec<CreateSourceOptionName> = match &stmt.action {
1681 AlterSourceAction::SetOptions(options) => {
1682 options.iter().map(|o| o.name.clone()).collect()
1683 }
1684 AlterSourceAction::ResetOptions(names) => names.clone(),
1685 _ => vec![],
1686 };
1687 if !names.is_empty()
1688 && names
1689 .iter()
1690 .all(|n| matches!(n, CreateSourceOptionName::RetainHistory))
1691 {
1692 return false;
1693 }
1694 }
1695
1696 true
1697 }
1698
1699 /// Chooses a timestamp for `mz_now()`, if `mz_now()` occurs in a REFRESH option of the
1700 /// materialized view. Additionally, if `acquire_read_holds` is true and the MV has any REFRESH
1701 /// option, this function grabs read holds at the earliest possible time on input collections
1702 /// that might be involved in the MV.
1703 ///
1704 /// Note that this is NOT what handles `mz_now()` in the query part of the MV; this function
1705 /// handles it only in `with_options`.
1706 ///
1707 /// (Note that the chosen timestamp won't be the same timestamp as the system table inserts,
1708 /// unfortunately.)
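/// For example (illustrative), a statement like
/// `CREATE MATERIALIZED VIEW mv WITH (REFRESH AT mz_now()) AS SELECT ...` requires resolving
/// `mz_now()` in its options to a concrete timestamp before planning can proceed.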
1709 async fn resolve_mz_now_for_create_materialized_view(
1710 &mut self,
1711 cmvs: &CreateMaterializedViewStatement<Aug>,
1712 resolved_ids: &ResolvedIds,
1713 session: &Session,
1714 acquire_read_holds: bool,
1715 ) -> Result<Option<Timestamp>, AdapterError> {
1716 if cmvs
1717 .with_options
1718 .iter()
1719 .any(|wo| matches!(wo.value, Some(WithOptionValue::Refresh(..))))
1720 {
1721 let catalog = self.catalog().for_session(session);
1722 let cluster = mz_sql::plan::resolve_cluster_for_materialized_view(&catalog, cmvs)?;
1723 let ids = self
1724 .index_oracle(cluster)
1725 .sufficient_collections(resolved_ids.collections().copied());
1726
1727 // If there is any REFRESH option, then acquire read holds. (Strictly speaking, we'd
1728 // need this only if there is a `REFRESH AT`, not for `REFRESH EVERY`, because later
1729 // we want to check the AT times against the read holds that we acquire here. But
1730 // we do it for any REFRESH option, to avoid having so many code paths doing different
1731 // things.)
1732 //
1733 // It's important that we acquire read holds _before_ we determine the least valid read.
1734 // Otherwise, we're not guaranteed that the since frontier doesn't
1735 // advance forward from underneath us.
1736 let read_holds = self.acquire_read_holds(&ids);
1737
1738 // Does `mz_now()` occur?
1739 let mz_now_ts = if cmvs
1740 .with_options
1741 .iter()
1742 .any(materialized_view_option_contains_temporal)
1743 {
1744 let timeline_context = self
1745 .catalog()
1746 .validate_timeline_context(resolved_ids.collections().copied())?;
1747
1748 // We default to EpochMilliseconds, similarly to `determine_timestamp_for`,
1749 // but even in the TimestampIndependent case.
1750 // Note that we didn't accurately decide whether we are TimestampDependent
1751 // or TimestampIndependent, because for this we'd need to also check whether
1752 // `query.contains_temporal()`, similarly to how `peek_stage_validate` does.
1753 // However, this doesn't matter here, as we are just going to default to
1754 // EpochMilliseconds in both cases.
1755 let timeline = timeline_context
1756 .timeline()
1757 .unwrap_or(&Timeline::EpochMilliseconds);
1758
1759 // Let's start with the timestamp oracle read timestamp.
1760 let mut timestamp = self.get_timestamp_oracle(timeline).read_ts().await;
1761
1762 // If `least_valid_read` is later than the oracle, then advance to that time.
1763 // If we didn't do this, then there would be a danger of missing the first refresh,
1764 // which might cause the materialized view to be unreadable for hours. This might
1765 // be what was happening here:
1766 // https://github.com/MaterializeInc/database-issues/issues/7265#issuecomment-1931856361
1767 //
1768 // In the long term, it would be good to actually block the MV creation statement
1769 // until `least_valid_read`. https://github.com/MaterializeInc/database-issues/issues/7504
1770 // Without blocking, we have the problem that a REFRESH AT CREATION is not linearized
1771 // with the CREATE MATERIALIZED VIEW statement, in the sense that a query from the MV
1772 // after its creation might see input changes that happened after the CREATE MATERIALIZED
1773 // VIEW statement returned.
1774 let oracle_timestamp = timestamp;
1775 let least_valid_read = read_holds.least_valid_read();
1776 timestamp.advance_by(least_valid_read.borrow());
1777
1778 if oracle_timestamp != timestamp {
1779 warn!(%cmvs.name, %oracle_timestamp, %timestamp, "REFRESH MV's inputs are not readable at the oracle read ts");
1780 }
1781
1782 info!("Resolved `mz_now()` to {timestamp} for REFRESH MV");
1783 Ok(Some(timestamp))
1784 } else {
1785 Ok(None)
1786 };
1787
1788 // NOTE: The Drop impl of ReadHolds makes sure that the hold is
1789 // released when we don't use it.
1790 if acquire_read_holds {
1791 self.store_transaction_read_holds(session.conn_id().clone(), read_holds);
1792 }
1793
1794 mz_now_ts
1795 } else {
1796 Ok(None)
1797 }
1798 }
1799
1800 /// Instruct the dataflow layer to cancel any ongoing, interactive work for
1801 /// the named `conn_id` if the correct secret key is specified.
1802 ///
1803 /// Note: Here we take a [`ConnectionIdType`] as opposed to an owned
1804 /// `ConnectionId` because this method gets called by external clients when
1805 /// they issue a cancellation request.
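/// (For example, pgwire cancellation requests arrive on a separate connection carrying only
/// the target connection's raw ID and secret key.)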
1806 #[mz_ore::instrument(level = "debug")]
1807 async fn handle_cancel(&mut self, conn_id: ConnectionIdType, secret_key: u32) {
1808 if let Some((id_handle, conn_meta)) = self.active_conns.get_key_value(&conn_id) {
1809 // If the secret key specified by the client doesn't match the
1810 // actual secret key for the target connection, we treat this as a
1811 // rogue cancellation request and ignore it.
1812 if conn_meta.secret_key != secret_key {
1813 return;
1814 }
1815
1816 // Now that we've verified the secret key, this is a privileged
1817 // cancellation request. We can upgrade the raw connection ID to a
1818 // proper `IdHandle`.
1819 self.handle_privileged_cancel(id_handle.clone()).await;
1820 }
1821 }
1822
1823 /// Unconditionally instructs the dataflow layer to cancel any ongoing,
1824 /// interactive work for the named `conn_id`.
1825 #[mz_ore::instrument(level = "debug")]
1826 pub(crate) async fn handle_privileged_cancel(&mut self, conn_id: ConnectionId) {
1827 let mut maybe_ctx = None;
1828
1829 // Cancel pending writes. There is at most one pending write per session.
1830 let pending_write_idx = self.pending_writes.iter().position(|pending_write_txn| {
1831 matches!(pending_write_txn, PendingWriteTxn::User {
1832 pending_txn: PendingTxn { ctx, .. },
1833 ..
1834 } if *ctx.session().conn_id() == conn_id)
1835 });
1836 if let Some(idx) = pending_write_idx {
1837 if let PendingWriteTxn::User {
1838 pending_txn: PendingTxn { ctx, .. },
1839 ..
1840 } = self.pending_writes.remove(idx)
1841 {
1842 maybe_ctx = Some(ctx);
1843 }
1844 }
1845
1846 // Cancel deferred writes.
1847 if let Some(write_op) = self.deferred_write_ops.remove(&conn_id) {
1848 maybe_ctx = Some(write_op.into_ctx());
1849 }
1850
1851 // Cancel deferred statements.
1852 let deferred_ddl_idx = self
1853 .serialized_ddl
1854 .iter()
1855 .position(|deferred| *deferred.ctx.session().conn_id() == conn_id);
1856 if let Some(idx) = deferred_ddl_idx {
1857 let deferred = self
1858 .serialized_ddl
1859 .remove(idx)
1860 .expect("known to exist from call to `position` above");
1861 maybe_ctx = Some(deferred.ctx);
1862 }
1863
1864 // Cancel reads waiting on being linearized. There is at most one linearized read per
1865 // session.
1866 if let Some(pending_read_txn) = self.pending_linearize_read_txns.remove(&conn_id) {
1867 let ctx = pending_read_txn.take_context();
1868 maybe_ctx = Some(ctx);
1869 }
1870
1871 if let Some(ctx) = maybe_ctx {
1872 ctx.retire(Err(AdapterError::Canceled));
1873 }
1874
1875 self.cancel_pending_peeks(&conn_id);
1876 self.cancel_pending_watchsets(&conn_id);
1877 self.cancel_compute_sinks_for_conn(&conn_id).await;
1878 self.cancel_cluster_reconfigurations_for_conn(&conn_id)
1879 .await;
1880 self.cancel_pending_copy(&conn_id);
1881 if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) {
1882 let _ = tx.send(true);
1883 }
1884 }
1885
1886 /// Handle termination of a client session.
1887 ///
1888 /// This cleans up any state in the coordinator associated with the session.
1889 #[mz_ore::instrument(level = "debug")]
1890 async fn handle_terminate(&mut self, conn_id: ConnectionId) {
1891 // If the session doesn't exist in `active_conns`, then this method will panic later on.
1892 // Instead, we explicitly panic here while dumping the entire Coord to the logs to help
1893 // debug. This panic is very infrequent, so we want as much information as possible.
1894 // See https://github.com/MaterializeInc/database-issues/issues/5627.
1895 assert!(
1896 self.active_conns.contains_key(&conn_id),
1897 "unknown connection: {conn_id:?}\n\n{self:?}"
1898 );
1899
1900 // We do not need to call clear_transaction here because there are no side effects to run
1901 // based on any session transaction state.
1902 self.clear_connection(&conn_id).await;
1903
1904 self.drop_temp_items(&conn_id).await;
1905 // Only call catalog_mut() if a temporary schema actually exists for this connection.
1906 // This avoids an expensive Arc::make_mut clone for the common case where the connection
1907 // never created any temporary objects.
1908 if self.catalog().state().has_temporary_schema(&conn_id) {
1909 self.catalog_mut()
1910 .drop_temporary_schema(&conn_id)
1911 .unwrap_or_terminate("unable to drop temporary schema");
1912 }
1913 let conn = self.active_conns.remove(&conn_id).expect("conn must exist");
1914 let session_type = metrics::session_type_label_value(conn.user());
1915 self.metrics
1916 .active_sessions
1917 .with_label_values(&[session_type])
1918 .dec();
1919 self.cancel_pending_peeks(conn.conn_id());
1920 self.cancel_pending_watchsets(&conn_id);
1921 self.cancel_pending_copy(&conn_id);
1922 self.end_session_for_statement_logging(conn.uuid());
1923
1924 // Queue the builtin table update, but do not wait for it to complete. We explicitly do
1925 // this to prevent blocking the Coordinator in the case that a lot of connections are
1926 // closed at once, which occurs regularly in some workflows.
1927 let update = self
1928 .catalog()
1929 .state()
1930 .pack_session_update(&conn, Diff::MINUS_ONE);
1931 let update = self.catalog().state().resolve_builtin_table_update(update);
1932
1933 let _builtin_update_notify = self.builtin_table_update().defer(vec![update]);
1934 }
1935
1936 /// Returns the necessary metadata for appending to a webhook source, and a channel to send
1937 /// rows.
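/// (Webhook requests address their target by `database.schema.name`, e.g. via an HTTP path
/// like `/api/webhook/<database>/<schema>/<name>`, which is why those components are passed
/// here.)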
1938 #[mz_ore::instrument(level = "debug")]
1939 fn handle_get_webhook(
1940 &mut self,
1941 database: String,
1942 schema: String,
1943 name: String,
1944 tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
1945 ) {
1946 /// Attempts to resolve a Webhook source from a provided `database.schema.name` path.
1947 ///
1948 /// Returns a struct that can be used to append data to the underlying storage collection, and
1949 /// the types we should cast the request to.
1950 fn resolve(
1951 coord: &mut Coordinator,
1952 database: String,
1953 schema: String,
1954 name: String,
1955 ) -> Result<AppendWebhookResponse, PartialItemName> {
1956 // Resolve our collection.
1957 let name = PartialItemName {
1958 database: Some(database),
1959 schema: Some(schema),
1960 item: name,
1961 };
1962 let Ok(entry) = coord
1963 .catalog()
1964 .resolve_entry(None, &vec![], &name, &SYSTEM_CONN_ID)
1965 else {
1966 return Err(name);
1967 };
1968
1969 // Webhooks can be created with `CREATE SOURCE` or `CREATE TABLE`.
1970 let (data_source, desc, global_id) = match entry.item() {
1971 CatalogItem::Source(Source {
1972 data_source: data_source @ DataSourceDesc::Webhook { .. },
1973 desc,
1974 global_id,
1975 ..
1976 }) => (data_source, desc.clone(), *global_id),
1977 CatalogItem::Table(
1978 table @ Table {
1979 desc,
1980 data_source:
1981 TableDataSource::DataSource {
1982 desc: data_source @ DataSourceDesc::Webhook { .. },
1983 ..
1984 },
1985 ..
1986 },
1987 ) => (data_source, desc.latest(), table.global_id_writes()),
1988 _ => return Err(name),
1989 };
1990
1991 let DataSourceDesc::Webhook {
1992 validate_using,
1993 body_format,
1994 headers,
1995 ..
1996 } = data_source
1997 else {
1998 mz_ore::soft_panic_or_log!("programming error! checked above for webhook");
1999 return Err(name);
2000 };
2001 let body_format = body_format.clone();
2002 let header_tys = headers.clone();
2003
2004 // Assert we have one column for the body, and however many are required for
2005 // the headers.
2006 let num_columns = headers.num_columns() + 1;
2007 mz_ore::soft_assert_or_log!(
2008 desc.arity() <= num_columns,
2009 "expected at most {} columns, but got {}",
2010 num_columns,
2011 desc.arity()
2012 );
2013
2014 // Double check that the body column of the webhook source matches the type
2015 // we're about to deserialize it as.
2016 let body_column = desc
2017 .get_by_name(&"body".into())
2018 .map(|(_idx, ty)| ty.clone())
2019 .ok_or_else(|| name.clone())?;
2020 assert!(!body_column.nullable, "webhook body column is nullable!?");
2021 assert_eq!(body_column.scalar_type, SqlScalarType::from(body_format));
2022
2023 // Create a validator that can be called to validate a webhook request.
2024 let validator = validate_using.as_ref().map(|v| {
2025 let validation = v.clone();
2026 AppendWebhookValidator::new(validation, coord.caching_secrets_reader.clone())
2027 });
2028
2029 // Get a channel so we can queue updates to be written.
2030 let row_tx = coord
2031 .controller
2032 .storage
2033 .monotonic_appender(global_id)
2034 .map_err(|_| name.clone())?;
2035 let stats = coord
2036 .controller
2037 .storage
2038 .webhook_statistics(global_id)
2039 .map_err(|_| name)?;
2040 let invalidator = coord
2041 .active_webhooks
2042 .entry(entry.id())
2043 .or_insert_with(WebhookAppenderInvalidator::new);
2044 let tx = WebhookAppender::new(row_tx, invalidator.guard(), stats);
2045
2046 Ok(AppendWebhookResponse {
2047 tx,
2048 body_format,
2049 header_tys,
2050 validator,
2051 })
2052 }
2053
2054 let response = resolve(self, database, schema, name).map_err(|name| {
2055 AppendWebhookError::UnknownWebhook {
2056 database: name.database.expect("provided"),
2057 schema: name.schema.expect("provided"),
2058 name: name.item,
2059 }
2060 });
2061 let _ = tx.send(response);
2062 }
2063
2064 /// Handle registration of a frontend peek, used for statement logging and query
2065 /// cancellation.
2066 fn handle_register_frontend_peek(
2067 &mut self,
2068 uuid: Uuid,
2069 conn_id: ConnectionId,
2070 cluster_id: mz_controller_types::ClusterId,
2071 depends_on: BTreeSet<GlobalId>,
2072 is_fast_path: bool,
2073 watch_set: Option<WatchSetCreation>,
2074 tx: oneshot::Sender<Result<(), AdapterError>>,
2075 ) {
2076 let statement_logging_id = watch_set.as_ref().map(|ws| ws.logging_id);
2077 if let Some(ws) = watch_set {
2078 if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), ws) {
2079 let _ = tx.send(Err(
2080 AdapterError::concurrent_dependency_drop_from_watch_set_install_error(e),
2081 ));
2082 return;
2083 }
2084 }
2085
2086 // Store the peek in pending_peeks for later retrieval when results arrive
2087 self.pending_peeks.insert(
2088 uuid,
2089 PendingPeek {
2090 conn_id: conn_id.clone(),
2091 cluster_id,
2092 depends_on,
2093 ctx_extra: ExecuteContextGuard::new(
2094 statement_logging_id,
2095 self.internal_cmd_tx.clone(),
2096 ),
2097 is_fast_path,
2098 },
2099 );
2100
2101 // Also track it by connection ID for cancellation support
2102 self.client_pending_peeks
2103 .entry(conn_id)
2104 .or_default()
2105 .insert(uuid, cluster_id);
2106
2107 let _ = tx.send(Ok(()));
2108 }
2109
2110 /// Handle unregistration of a frontend peek that was registered but failed to issue.
2111 /// This is used for cleanup when `client.peek()` fails after `RegisterFrontendPeek` succeeds.
2112 fn handle_unregister_frontend_peek(&mut self, uuid: Uuid, tx: oneshot::Sender<()>) {
2113 // Remove from pending_peeks (this also removes from client_pending_peeks)
2114 if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
2115 // Retire `ExecuteContextExtra`, because the frontend will log the peek's error result.
2116 let _ = pending_peek.ctx_extra.defuse();
2117 }
2118 let _ = tx.send(());
2119 }
2120}