mz_adapter/coord/command_handler.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic for processing client [`Command`]s. Each [`Command`] is initiated by a
11//! client via some external Materialize API (ex: HTTP and psql).
12
13use base64::prelude::*;
14use differential_dataflow::lattice::Lattice;
15use mz_adapter_types::dyncfgs::ALLOW_USER_SESSIONS;
16use mz_auth::AuthenticatorKind;
17use mz_auth::password::Password;
18use mz_repr::namespaces::MZ_INTERNAL_SCHEMA;
19use mz_sql::catalog::AutoProvisionSource;
20use mz_sql::session::metadata::SessionMetadata;
21use std::collections::{BTreeMap, BTreeSet};
22use std::net::IpAddr;
23use std::sync::Arc;
24
25use futures::FutureExt;
26use futures::future::LocalBoxFuture;
27use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::memory::objects::{
30 CatalogItem, DataSourceDesc, Role, Source, Table, TableDataSource,
31};
32use mz_ore::task;
33use mz_ore::tracing::OpenTelemetryContext;
34use mz_ore::{instrument, soft_panic_or_log};
35use mz_repr::role_id::RoleId;
36use mz_repr::{Diff, GlobalId, SqlScalarType, Timestamp};
37use mz_sql::ast::{
38 AlterConnectionAction, AlterConnectionStatement, AlterSinkAction, AlterSourceAction, AstInfo,
39 ConstantVisitor, CopyRelation, CopyStatement, CreateSourceOptionName, Raw, Statement,
40 SubscribeStatement,
41};
42use mz_sql::catalog::RoleAttributesRaw;
43use mz_sql::names::{Aug, PartialItemName, ResolvedIds};
44use mz_sql::plan::{
45 AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, Params, Plan,
46 StatementClassification, TransactionType,
47};
48use mz_sql::pure::{
49 materialized_view_option_contains_temporal, purify_create_materialized_view_options,
50};
51use mz_sql::rbac;
52use mz_sql::rbac::CREATE_ITEM_USAGE;
53use mz_sql::session::user::User;
54use mz_sql::session::vars::{
55 EndTransactionAction, NETWORK_POLICY, OwnedVarInput, STATEMENT_LOGGING_SAMPLE_RATE,
56 TRANSACTION_ISOLATION_VAR_NAME, Value, Var, check_transaction_isolation_feature_flag,
57};
58use mz_sql_parser::ast::display::AstDisplay;
59use mz_sql_parser::ast::{
60 CreateMaterializedViewStatement, ExplainPlanStatement, Explainee, InsertStatement,
61 WithOptionValue,
62};
63use mz_storage_types::sources::Timeline;
64use opentelemetry::trace::TraceContextExt;
65use tokio::sync::{mpsc, oneshot};
66use tracing::{Instrument, debug_span, info, warn};
67use tracing_opentelemetry::OpenTelemetrySpanExt;
68use uuid::Uuid;
69
70use crate::command::{
71 CatalogSnapshot, Command, ExecuteResponse, Response, SASLChallengeResponse,
72 SASLVerifyProofResponse, StartupResponse, SuperuserAttribute,
73};
74use crate::coord::appends::{PendingWriteTxn, UserWriteResponder};
75use crate::coord::peek::PendingPeek;
76use crate::coord::{
77 ConnMeta, Coordinator, DeferredPlanStatement, Message, PendingTxn, PlanStatement, PlanValidity,
78 PurifiedStatementReady, validate_ip_with_policy_rules,
79};
80use crate::error::{AdapterError, AuthenticationError};
81use crate::notice::AdapterNotice;
82use crate::session::{Session, TransactionOps, TransactionStatus};
83use crate::statement_logging::WatchSetCreation;
84use crate::util::{ClientTransmitter, ResultExt};
85use crate::webhook::{
86 AppendWebhookResponse, AppendWebhookValidator, WebhookAppender, WebhookAppenderInvalidator,
87};
88use crate::{AppendWebhookError, ExecuteContext, catalog, metrics};
89
90use super::ExecuteContextGuard;
91
/// The login status of a role, used by authentication handlers to check role
/// existence and login permission before proceeding to credential verification.
///
/// The enum is fieldless, so it is cheap to copy and compare; the derives let callers
/// use `==`, copy it freely, and print it in diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RoleLoginStatus {
    /// The role does not exist in the catalog.
    NotFound,
    /// The role exists and has the LOGIN attribute.
    CanLogin,
    /// The role exists but does not have the LOGIN attribute.
    NonLogin,
}
102
103fn role_login_status(role: Option<&Role>) -> RoleLoginStatus {
104 match role {
105 None => RoleLoginStatus::NotFound,
106 Some(role) => match role.attributes.login {
107 Some(login) if login => RoleLoginStatus::CanLogin,
108 _ => RoleLoginStatus::NonLogin,
109 },
110 }
111}
112
113impl Coordinator {
    /// Processes a single client [`Command`] by dispatching on its variant.
    ///
    /// If the command carries a session, that session's pending external metadata updates
    /// are applied before the command is handled. Most arms reply to the caller over the
    /// command's `tx` channel; send failures (e.g. the receiver was dropped) are deliberately
    /// ignored via `let _ = tx.send(...)`.
    ///
    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 58KB. This would
    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
    pub(crate) fn handle_command(&mut self, mut cmd: Command) -> LocalBoxFuture<'_, ()> {
        async move {
            // Bring the session's metadata up to date before any handler inspects it.
            if let Some(session) = cmd.session_mut() {
                session.apply_external_metadata_updates();
            }
            match cmd {
                Command::Startup {
                    tx,
                    user,
                    conn_id,
                    secret_key,
                    uuid,
                    client_ip,
                    application_name,
                    notice_tx,
                } => {
                    // Note: We purposefully do not use a ClientTransmitter here because startup
                    // handles errors and cleanup of sessions itself.
                    self.handle_startup(
                        tx,
                        user,
                        conn_id,
                        secret_key,
                        uuid,
                        client_ip,
                        application_name,
                        notice_tx,
                    )
                    .await;
                }

                Command::AuthenticatePassword {
                    tx,
                    role_name,
                    password,
                } => {
                    self.handle_authenticate_password(tx, role_name, password)
                        .await;
                }

                Command::AuthenticateGetSASLChallenge {
                    tx,
                    role_name,
                    nonce,
                } => {
                    self.handle_generate_sasl_challenge(tx, role_name, nonce)
                        .await;
                }

                Command::AuthenticateVerifySASLProof {
                    tx,
                    role_name,
                    proof,
                    mock_hash,
                    auth_message,
                } => {
                    // Note the handler's parameter order: `auth_message` comes before
                    // `mock_hash`, unlike the field order of the command.
                    self.handle_authenticate_verify_sasl_proof(
                        tx,
                        role_name,
                        proof,
                        auth_message,
                        mock_hash,
                    );
                }

                Command::CheckRoleCanLogin { tx, role_name } => {
                    self.handle_role_can_login(tx, role_name);
                }

                Command::Execute {
                    portal_name,
                    session,
                    tx,
                    outer_ctx_extra,
                } => {
                    let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());

                    self.handle_execute(portal_name, session, tx, outer_ctx_extra)
                        .await;
                }

                Command::StartCopyFromStdin {
                    target_id,
                    target_name,
                    columns,
                    row_desc,
                    params,
                    session,
                    tx,
                } => {
                    // Capture the tracing context so it travels back with the response.
                    let otel_ctx = OpenTelemetryContext::obtain();
                    let result = self.setup_copy_from_stdin(
                        &session,
                        target_id,
                        target_name,
                        columns,
                        row_desc,
                        params,
                    );
                    // The session is handed back to the caller alongside the result.
                    let _ = tx.send(Response {
                        result,
                        session,
                        otel_ctx,
                    });
                }

                Command::RetireExecute { data, reason } => self.retire_execution(reason, data),

                Command::CancelRequest {
                    conn_id,
                    secret_key,
                } => {
                    self.handle_cancel(conn_id, secret_key).await;
                }

                Command::PrivilegedCancelRequest { conn_id } => {
                    self.handle_privileged_cancel(conn_id).await;
                }

                Command::GetWebhook {
                    database,
                    schema,
                    name,
                    tx,
                } => {
                    self.handle_get_webhook(database, schema, name, tx);
                }

                Command::GetSystemVars { tx } => {
                    // Reply with a clone of the current system configuration.
                    let _ = tx.send(self.catalog.system_config().clone());
                }

                Command::SetSystemVars { vars, conn_id, tx } => {
                    let mut ops = Vec::with_capacity(vars.len());
                    // Indexing panics if `conn_id` is not an active connection.
                    // NOTE(review): presumably callers only send this for live
                    // connections — confirm.
                    let conn = &self.active_conns[&conn_id];

                    for (name, value) in vars {
                        // Every variable must exist and be visible to the connection's user.
                        // `return` exits the whole command future, so on the first failure the
                        // error is sent and none of the requested updates are applied.
                        if let Err(e) =
                            self.catalog().system_config().get(&name).and_then(|var| {
                                var.visible(conn.user(), self.catalog.system_config())
                            })
                        {
                            let _ = tx.send(Err(e.into()));
                            return;
                        }

                        ops.push(catalog::Op::UpdateSystemConfiguration {
                            name,
                            value: OwnedVarInput::Flat(value),
                        });
                    }

                    // All updates are applied together in a single catalog transaction.
                    let result = self
                        .catalog_transact_with_context(Some(&conn_id), None, ops)
                        .await;
                    let _ = tx.send(result);
                }

                Command::UpdateScopedSystemParameters {
                    overrides,
                    prune_scope,
                    tx,
                } => {
                    // Store the new working copy, persist it durably, and
                    // reconcile it into the per-scope resolution boundaries.
                    self.reconcile_scoped_system_parameters(overrides, prune_scope)
                        .await;
                    let _ = tx.send(());
                }

                Command::InstallScopedSystemParameterFrontend { frontend } => {
                    // Keep the shared frontend so create-cluster / create-replica
                    // can resolve scoped overrides synchronously at create time.
                    self.scoped_frontend = Some(frontend);
                }

                Command::InjectAuditEvents {
                    events,
                    conn_id,
                    tx,
                } => {
                    let ops = vec![catalog::Op::InjectAuditEvents { events }];
                    let result = self
                        .catalog_transact_with_context(Some(&conn_id), None, ops)
                        .await;
                    let _ = tx.send(result);
                }

                Command::Terminate { conn_id, tx } => {
                    self.handle_terminate(conn_id).await;
                    // Note: We purposefully do not use a ClientTransmitter here because we're already
                    // terminating the provided session.
                    if let Some(tx) = tx {
                        let _ = tx.send(Ok(()));
                    }
                }

                Command::Commit {
                    action,
                    session,
                    tx,
                } => {
                    let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
                    // We reach here not through a statement execution, but from the
                    // "commit" pgwire command. There is no statement being executed, so the
                    // context is built with a default `ExecuteContextGuard`
                    // (`Default::default()`). NOTE(review): presumably this means no
                    // statement-logging event is recorded when it finishes — confirm.
                    let ctx = ExecuteContext::from_parts(
                        tx,
                        self.internal_cmd_tx.clone(),
                        session,
                        Default::default(),
                    );
                    // Translate the pgwire action into an implicit commit/abort plan.
                    let plan = match action {
                        EndTransactionAction::Commit => {
                            Plan::CommitTransaction(CommitTransactionPlan {
                                transaction_type: TransactionType::Implicit,
                            })
                        }
                        EndTransactionAction::Rollback => {
                            Plan::AbortTransaction(AbortTransactionPlan {
                                transaction_type: TransactionType::Implicit,
                            })
                        }
                    };

                    // Grab the id before `ctx` is moved into `sequence_plan`.
                    let conn_id = ctx.session().conn_id().clone();
                    self.sequence_plan(ctx, plan, ResolvedIds::empty(), ResolvedIds::empty())
                        .await;
                    // Part of the Command::Commit contract is that the Coordinator guarantees that
                    // it has cleared its transaction state for the connection.
                    self.clear_connection(&conn_id).await;
                }

                Command::CatalogSnapshot { tx } => {
                    let _ = tx.send(CatalogSnapshot {
                        catalog: self.owned_catalog(),
                    });
                }

                Command::CheckConsistency { tx } => {
                    let _ = tx.send(self.check_consistency());
                }

                Command::Dump { tx } => {
                    let _ = tx.send(self.dump().await);
                }

                Command::GetComputeInstanceClient { instance_id, tx } => {
                    let _ = tx.send(self.controller.compute.instance_client(instance_id));
                }

                Command::GetOracle { timeline, tx } => {
                    // Replies with `ChangedPlan` if the timeline is no longer tracked.
                    let oracle = self
                        .global_timelines
                        .get(&timeline)
                        .map(|timeline_state| Arc::clone(&timeline_state.oracle))
                        .ok_or(AdapterError::ChangedPlan(
                            "timeline has disappeared during planning".to_string(),
                        ));
                    let _ = tx.send(oracle);
                }

                Command::DetermineRealTimeRecentTimestamp {
                    source_ids,
                    real_time_recency_timeout,
                    tx,
                } => {
                    let result = self
                        .determine_real_time_recent_timestamp(
                            source_ids.iter().copied(),
                            real_time_recency_timeout,
                        )
                        .await;

                    match result {
                        Ok(Some(fut)) => {
                            // The returned future is awaited on a spawned task rather than
                            // inline, and the reply is sent from that task.
                            let catalog = Arc::clone(&self.catalog);
                            task::spawn(|| "determine real time recent timestamp", async move {
                                let result =
                                    Coordinator::await_real_time_recent_timestamp(catalog, fut)
                                        .await
                                        .map(Some);
                                let _ = tx.send(result);
                            });
                        }
                        Ok(None) => {
                            let _ = tx.send(Ok(None));
                        }
                        Err(e) => {
                            let _ = tx.send(Err(e));
                        }
                    }
                }

                Command::GetTransactionReadHoldsBundle { conn_id, tx } => {
                    // Replies with a clone of the connection's read holds, if any.
                    let read_holds = self.txn_read_holds.get(&conn_id).cloned();
                    let _ = tx.send(read_holds);
                }

                Command::StoreTransactionReadHolds {
                    conn_id,
                    read_holds,
                    tx,
                } => {
                    self.store_transaction_read_holds(conn_id, read_holds);
                    let _ = tx.send(());
                }

                Command::ExecuteSlowPathPeek {
                    dataflow_plan,
                    determination,
                    finishing,
                    compute_instance,
                    target_replica,
                    intermediate_result_type,
                    source_ids,
                    conn_id,
                    max_result_size,
                    max_query_result_size,
                    watch_set,
                    tx,
                } => {
                    let result = self
                        .implement_slow_path_peek(
                            *dataflow_plan,
                            determination,
                            finishing,
                            compute_instance,
                            target_replica,
                            intermediate_result_type,
                            source_ids,
                            conn_id,
                            max_result_size,
                            max_query_result_size,
                            watch_set,
                        )
                        .await;
                    let _ = tx.send(result);
                }

                Command::ExecuteSubscribe {
                    df_desc,
                    dependency_ids,
                    cluster_id,
                    replica_id,
                    conn_id,
                    session_uuid,
                    read_holds,
                    plan,
                    statement_logging_id,
                    tx,
                } => {
                    // Statement-logging guard for this subscribe, keyed by the id supplied
                    // by the caller.
                    let mut ctx_extra = ExecuteContextGuard::new(
                        statement_logging_id,
                        self.internal_cmd_tx.clone(),
                    );
                    let result = self
                        .implement_subscribe(
                            &mut ctx_extra,
                            df_desc,
                            dependency_ids,
                            cluster_id,
                            replica_id,
                            conn_id,
                            session_uuid,
                            read_holds,
                            plan,
                        )
                        .await;
                    let _ = tx.send(result);
                }

                Command::CopyToPreflight {
                    s3_sink_connection,
                    sink_id,
                    tx,
                } => {
                    // Spawn a background task to perform the slow S3 preflight operations.
                    // This avoids blocking the coordinator's main task.
                    let connection_context = self.connection_context().clone();
                    let enforce_external_addresses =
                        mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
                            .get(self.controller.storage.config().config_set());
                    task::spawn(|| "copy_to_preflight", async move {
                        let result = mz_storage_types::sinks::s3_oneshot_sink::preflight(
                            connection_context,
                            &s3_sink_connection.aws_connection,
                            &s3_sink_connection.upload_info,
                            s3_sink_connection.connection_id,
                            sink_id,
                            enforce_external_addresses,
                        )
                        .await
                        .map_err(AdapterError::from);
                        let _ = tx.send(result);
                    });
                }

                Command::ExecuteCopyTo {
                    df_desc,
                    compute_instance,
                    target_replica,
                    source_ids,
                    conn_id,
                    watch_set,
                    tx,
                } => {
                    // implement_copy_to spawns a background task that sends the response
                    // through tx when the COPY TO completes (or immediately if setup fails).
                    // We just call it and let it handle all response sending.
                    self.implement_copy_to(
                        *df_desc,
                        compute_instance,
                        target_replica,
                        source_ids,
                        conn_id,
                        watch_set,
                        tx,
                    )
                    .await;
                }

                Command::ExecuteSideEffectingFunc {
                    plan,
                    conn_id,
                    current_role,
                    tx,
                } => {
                    let result = self
                        .execute_side_effecting_func(plan, conn_id, current_role)
                        .await;
                    let _ = tx.send(result);
                }
                Command::RegisterFrontendPeek {
                    uuid,
                    conn_id,
                    cluster_id,
                    depends_on,
                    is_fast_path,
                    watch_set,
                    tx,
                } => {
                    self.handle_register_frontend_peek(
                        uuid,
                        conn_id,
                        cluster_id,
                        depends_on,
                        is_fast_path,
                        watch_set,
                        tx,
                    );
                }
                Command::UnregisterFrontendPeek { uuid, tx } => {
                    self.handle_unregister_frontend_peek(uuid, tx);
                }
                Command::ExplainTimestamp {
                    conn_id,
                    session_wall_time,
                    cluster_id,
                    id_bundle,
                    determination,
                    tx,
                } => {
                    let explanation = self.explain_timestamp(
                        &conn_id,
                        session_wall_time,
                        cluster_id,
                        &id_bundle,
                        determination,
                    );
                    let _ = tx.send(explanation);
                }
                Command::FrontendStatementLogging(event) => {
                    self.handle_frontend_statement_logging_event(event);
                }
            }
        }
        .instrument(debug_span!("handle_command"))
        .boxed_local()
    }
598
599 fn handle_role_can_login(
600 &self,
601 tx: oneshot::Sender<Result<(), AdapterError>>,
602 role_name: String,
603 ) {
604 let result =
605 match role_login_status(self.catalog().try_get_role_by_name(role_name.as_str())) {
606 RoleLoginStatus::NotFound => Err(AdapterError::AuthenticationError(
607 AuthenticationError::RoleNotFound,
608 )),
609 RoleLoginStatus::NonLogin => Err(AdapterError::AuthenticationError(
610 AuthenticationError::NonLogin,
611 )),
612 RoleLoginStatus::CanLogin => Ok(()),
613 };
614 let _ = tx.send(result);
615 }
616
617 fn handle_authenticate_verify_sasl_proof(
618 &self,
619 tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
620 role_name: String,
621 proof: String,
622 auth_message: String,
623 mock_hash: String,
624 ) {
625 let role = self.catalog().try_get_role_by_name(role_name.as_str());
626 let login_status = role_login_status(role);
627 let role_auth = role.and_then(|r| self.catalog().try_get_role_auth_by_id(&r.id));
628 let real_hash = role_auth
629 .as_ref()
630 .and_then(|auth| auth.password_hash.as_ref());
631 let hash_ref = real_hash.map(|s| s.as_str()).unwrap_or(&mock_hash);
632
633 match mz_auth::hash::sasl_verify(hash_ref, &proof, &auth_message) {
634 Ok(verifier) => {
635 // Success only if role exists, allows login, and a real password hash was used.
636 if matches!(login_status, RoleLoginStatus::CanLogin) && real_hash.is_some() {
637 let _ = tx.send(Ok(SASLVerifyProofResponse { verifier }));
638 } else {
639 let _ = tx.send(Err(AdapterError::AuthenticationError(match login_status {
640 RoleLoginStatus::NonLogin => AuthenticationError::NonLogin,
641 RoleLoginStatus::NotFound => AuthenticationError::RoleNotFound,
642 RoleLoginStatus::CanLogin => AuthenticationError::InvalidCredentials,
643 })));
644 }
645 }
646 Err(_) => {
647 let _ = tx.send(Err(AdapterError::AuthenticationError(
648 AuthenticationError::InvalidCredentials,
649 )));
650 }
651 }
652 }
653
654 #[mz_ore::instrument(level = "debug")]
655 async fn handle_generate_sasl_challenge(
656 &self,
657 tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
658 role_name: String,
659 client_nonce: String,
660 ) {
661 let role_auth = self
662 .catalog()
663 .try_get_role_by_name(&role_name)
664 .and_then(|role| self.catalog().try_get_role_auth_by_id(&role.id));
665
666 let nonce = match mz_auth::hash::generate_nonce(&client_nonce) {
667 Ok(n) => n,
668 Err(e) => {
669 let msg = format!(
670 "failed to generate nonce for client nonce {}: {}",
671 client_nonce, e
672 );
673 let _ = tx.send(Err(AdapterError::Internal(msg.clone())));
674 soft_panic_or_log!("{msg}");
675 return;
676 }
677 };
678
679 // It's important that the mock_nonce is deterministic per role, otherwise the purpose of
680 // doing mock authentication is defeated. We use a catalog-wide nonce, and combine that
681 // with the role name to get a per-role mock nonce.
682 let send_mock_challenge =
683 |role_name: String,
684 mock_nonce: String,
685 nonce: String,
686 tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>| {
687 let opts = mz_auth::hash::mock_sasl_challenge(
688 &role_name,
689 &mock_nonce,
690 &self.catalog().system_config().scram_iterations(),
691 );
692 let _ = tx.send(Ok(SASLChallengeResponse {
693 iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
694 salt: BASE64_STANDARD.encode(opts.salt),
695 nonce,
696 }));
697 };
698
699 match role_auth {
700 Some(auth) if auth.password_hash.is_some() => {
701 let hash = auth.password_hash.as_ref().expect("checked above");
702 match mz_auth::hash::scram256_parse_opts(hash) {
703 Ok(opts) => {
704 let _ = tx.send(Ok(SASLChallengeResponse {
705 iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
706 salt: BASE64_STANDARD.encode(opts.salt),
707 nonce,
708 }));
709 }
710 Err(_) => {
711 send_mock_challenge(
712 role_name,
713 self.catalog().state().mock_authentication_nonce(),
714 nonce,
715 tx,
716 );
717 }
718 }
719 }
720 _ => {
721 send_mock_challenge(
722 role_name,
723 self.catalog().state().mock_authentication_nonce(),
724 nonce,
725 tx,
726 );
727 }
728 }
729 }
730
731 #[mz_ore::instrument(level = "debug")]
732 async fn handle_authenticate_password(
733 &self,
734 tx: oneshot::Sender<Result<(), AdapterError>>,
735 role_name: String,
736 password: Option<Password>,
737 ) {
738 let Some(password) = password else {
739 // The user did not provide a password.
740 let _ = tx.send(Err(AdapterError::AuthenticationError(
741 AuthenticationError::PasswordRequired,
742 )));
743 return;
744 };
745 let role = self.catalog().try_get_role_by_name(role_name.as_str());
746
747 match role_login_status(role) {
748 RoleLoginStatus::NotFound => {
749 let _ = tx.send(Err(AdapterError::AuthenticationError(
750 AuthenticationError::RoleNotFound,
751 )));
752 return;
753 }
754 RoleLoginStatus::NonLogin => {
755 let _ = tx.send(Err(AdapterError::AuthenticationError(
756 AuthenticationError::NonLogin,
757 )));
758 return;
759 }
760 RoleLoginStatus::CanLogin => {}
761 }
762
763 let role_auth = role.and_then(|r| self.catalog().try_get_role_auth_by_id(&r.id));
764
765 if let Some(auth) = role_auth {
766 if let Some(hash) = &auth.password_hash {
767 let hash = hash.clone();
768 task::spawn_blocking(
769 || "auth-check-hash",
770 move || {
771 let _ = match mz_auth::hash::scram256_verify(&password, &hash) {
772 Ok(_) => tx.send(Ok(())),
773 Err(_) => tx.send(Err(AdapterError::AuthenticationError(
774 AuthenticationError::InvalidCredentials,
775 ))),
776 };
777 },
778 );
779 return;
780 }
781 }
782 // Authentication failed due to missing password hash.
783 let _ = tx.send(Err(AdapterError::AuthenticationError(
784 AuthenticationError::InvalidCredentials,
785 )));
786 }
787
788 #[mz_ore::instrument(level = "debug")]
789 async fn handle_startup(
790 &mut self,
791 tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
792 user: User,
793 conn_id: ConnectionId,
794 secret_key: u32,
795 uuid: uuid::Uuid,
796 client_ip: Option<IpAddr>,
797 application_name: String,
798 notice_tx: mpsc::UnboundedSender<AdapterNotice>,
799 ) {
800 // Early return if successful, otherwise cleanup any possible state.
801 match self
802 .handle_startup_inner(&user, &conn_id, &client_ip, ¬ice_tx)
803 .await
804 {
805 Ok((role_id, superuser_attribute, session_defaults)) => {
806 let session_type = metrics::session_type_label_value(&user);
807 self.metrics
808 .active_sessions
809 .with_label_values(&[session_type])
810 .inc();
811 let conn = ConnMeta {
812 secret_key,
813 notice_tx,
814 drop_sinks: BTreeSet::new(),
815 pending_cluster_alters: BTreeSet::new(),
816 connected_at: self.now(),
817 user,
818 application_name,
819 uuid,
820 client_ip,
821 conn_id: conn_id.clone(),
822 authenticated_role: role_id,
823 deferred_lock: None,
824 };
825 let update = self.catalog().state().pack_session_update(&conn, Diff::ONE);
826 let update = self.catalog().state().resolve_builtin_table_update(update);
827 self.begin_session_for_statement_logging(&conn);
828 self.active_conns.insert(conn_id.clone(), conn);
829
830 // Note: Do NOT await the notify here, we pass this back to
831 // whatever requested the startup to prevent blocking startup
832 // and the Coordinator on a builtin table update.
833 let updates = vec![update];
834 // It's not a hard error if our list is missing a builtin table, but we want to
835 // make sure these two things stay in-sync.
836 if mz_ore::assert::soft_assertions_enabled() {
837 let required_tables: BTreeSet<_> = super::appends::REQUIRED_BUILTIN_TABLES
838 .iter()
839 .map(|table| self.catalog().resolve_builtin_table(*table))
840 .collect();
841 let updates_tracked = updates
842 .iter()
843 .all(|update| required_tables.contains(&update.id));
844 let all_mz_internal = super::appends::REQUIRED_BUILTIN_TABLES
845 .iter()
846 .all(|table| table.schema == MZ_INTERNAL_SCHEMA);
847 mz_ore::soft_assert_or_log!(
848 updates_tracked,
849 "not tracking all required builtin table updates!"
850 );
851 // TODO(parkmycar): When checking if a query depends on these builtin table
852 // writes we do not check the transitive dependencies of the query, because
853 // we don't support creating views on mz_internal objects. If one of these
854 // tables is promoted out of mz_internal then we'll need to add this check.
855 mz_ore::soft_assert_or_log!(
856 all_mz_internal,
857 "not all builtin tables are in mz_internal! need to check transitive depends",
858 )
859 }
860 let notify = self.builtin_table_update().background(updates);
861
862 let catalog = self.owned_catalog();
863 let build_info_human_version =
864 catalog.state().config().build_info.human_version(None);
865
866 let statement_logging_frontend = self
867 .statement_logging
868 .create_frontend(build_info_human_version);
869
870 let resp = Ok(StartupResponse {
871 role_id,
872 write_notify: notify,
873 session_defaults,
874 catalog,
875 storage_collections: Arc::clone(&self.controller.storage_collections),
876 transient_id_gen: Arc::clone(&self.transient_id_gen),
877 optimizer_metrics: self.optimizer_metrics.clone(),
878 persist_client: self.persist_client.clone(),
879 statement_logging_frontend,
880 superuser_attribute,
881 });
882 if tx.send(resp).is_err() {
883 // Failed to send to adapter, but everything is setup so we can terminate
884 // normally.
885 self.handle_terminate(conn_id).await;
886 }
887 }
888 Err(e) => {
889 // Error during startup or sending to adapter. A user may have been created and
890 // it can stay; no need to delete it.
891 // Note: Temporary schemas are created lazily, so there's nothing to clean up here.
892
893 // Communicate the error back to the client. No need to
894 // handle failures to send the error back; we've already
895 // cleaned up all necessary state.
896 let _ = tx.send(Err(e));
897 }
898 }
899 }
900
901 // Failible startup work that needs to be cleaned up on error.
902 async fn handle_startup_inner(
903 &mut self,
904 user: &User,
905 _conn_id: &ConnectionId,
906 client_ip: &Option<IpAddr>,
907 notice_tx: &mpsc::UnboundedSender<AdapterNotice>,
908 ) -> Result<(RoleId, SuperuserAttribute, BTreeMap<String, OwnedVarInput>), AdapterError> {
909 if self.catalog().try_get_role_by_name(&user.name).is_none() {
910 // If the user has made it to this point, that means they have been fully authenticated.
911 // This includes preventing any user, except a pre-defined set of system users, from
912 // connecting to an internal port. Therefore it's ok to always create a new role for the
913 // user.
914 let mut attributes = RoleAttributesRaw::new();
915 // When auto-provisioning, we store the authenticator that was used to provision the role.
916 attributes.auto_provision_source = match user.authenticator_kind {
917 Some(AuthenticatorKind::Oidc) => Some(AutoProvisionSource::Oidc),
918 Some(AuthenticatorKind::Frontegg) => Some(AutoProvisionSource::Frontegg),
919 Some(AuthenticatorKind::None) => Some(AutoProvisionSource::None),
920 _ => {
921 warn!(
922 "auto-provisioning role with unexpected authenticator kind: {:?}",
923 user.authenticator_kind
924 );
925 None
926 }
927 };
928
929 // Auto-provision roles with the LOGIN attribute to distinguish
930 // them as users.
931 attributes.login = Some(true);
932
933 let plan = CreateRolePlan {
934 name: user.name.to_string(),
935 attributes,
936 };
937 self.sequence_create_role_for_startup(plan).await?;
938 }
939 let role = self
940 .catalog()
941 .try_get_role_by_name(&user.name)
942 .expect("created above");
943 let role_id = role.id;
944 let superuser_attribute = role.attributes.superuser;
945
946 // JWT group-to-role sync: reconcile role memberships with JWT group claims.
947 // Missing groups claim (None) → skip sync; empty (Some([])) → revoke all.
948 self.maybe_sync_jwt_groups(role_id, user.groups.as_deref(), notice_tx)
949 .await?;
950
951 if role_id.is_user() && !ALLOW_USER_SESSIONS.get(self.catalog().system_config().dyncfgs()) {
952 return Err(AdapterError::UserSessionsDisallowed);
953 }
954
955 // Initialize the default session variables for this role.
956 let mut session_defaults = BTreeMap::new();
957 let system_config = self.catalog().state().system_config();
958
959 // Override the session with any system defaults.
960 session_defaults.extend(
961 system_config
962 .iter_session()
963 .map(|v| (v.name().to_string(), OwnedVarInput::Flat(v.value()))),
964 );
965 // Special case.
966 let statement_logging_default = system_config
967 .statement_logging_default_sample_rate()
968 .format();
969 session_defaults.insert(
970 STATEMENT_LOGGING_SAMPLE_RATE.name().to_string(),
971 OwnedVarInput::Flat(statement_logging_default),
972 );
973 // Override system defaults with role defaults.
974 session_defaults.extend(
975 self.catalog()
976 .get_role(&role_id)
977 .vars()
978 .map(|(name, val)| (name.to_string(), val.clone())),
979 );
980
981 // If the resolved `transaction_isolation` default names a feature-flagged
982 // isolation level whose flag is now disabled (e.g. a role default set
983 // while `bounded staleness` was enabled, then the flag turned off), drop
984 // it so the session falls back to the built-in default rather than
985 // silently using a gated level.
986 if let Some(value) = session_defaults.get(TRANSACTION_ISOLATION_VAR_NAME) {
987 if check_transaction_isolation_feature_flag(
988 TRANSACTION_ISOLATION_VAR_NAME,
989 value.borrow(),
990 system_config,
991 )
992 .is_err()
993 {
994 session_defaults.remove(TRANSACTION_ISOLATION_VAR_NAME);
995 }
996 }
997
998 // Validate network policies for external users. Internal users can only connect on the
999 // internal interfaces (internal HTTP/ pgwire). It is up to the person deploying the system
1000 // to ensure these internal interfaces are well secured.
1001 //
1002 // HACKY(parkmycar): We don't have a fully formed session yet for this role, but we want
1003 // the default network policy for this role, so we read directly out of what the session
1004 // will get initialized with.
1005 if !user.is_internal() {
1006 let network_policy_name = session_defaults
1007 .get(NETWORK_POLICY.name())
1008 .and_then(|value| match value {
1009 OwnedVarInput::Flat(name) => Some(name.clone()),
1010 OwnedVarInput::SqlSet(names) => {
1011 tracing::error!(?names, "found multiple network policies");
1012 None
1013 }
1014 })
1015 .unwrap_or_else(|| system_config.default_network_policy_name());
1016 let maybe_network_policy = self
1017 .catalog()
1018 .get_network_policy_by_name(&network_policy_name);
1019
1020 let Some(network_policy) = maybe_network_policy else {
1021 // We should prevent dropping the default network policy, or setting the policy
1022 // to something that doesn't exist, so complain loudly if this occurs.
1023 tracing::error!(
1024 network_policy_name,
1025 "default network policy does not exist. All user traffic will be blocked"
1026 );
1027 let reason = match client_ip {
1028 Some(ip) => super::NetworkPolicyError::AddressDenied(ip.clone()),
1029 None => super::NetworkPolicyError::MissingIp,
1030 };
1031 return Err(AdapterError::NetworkPolicyDenied(reason));
1032 };
1033
1034 if let Some(ip) = client_ip {
1035 match validate_ip_with_policy_rules(ip, &network_policy.rules) {
1036 Ok(_) => {}
1037 Err(e) => return Err(AdapterError::NetworkPolicyDenied(e)),
1038 }
1039 } else {
1040 // Only temporary and internal representation of a session
1041 // should be missing a client_ip. These sessions should not be
1042 // making requests or going through handle_startup.
1043 return Err(AdapterError::NetworkPolicyDenied(
1044 super::NetworkPolicyError::MissingIp,
1045 ));
1046 }
1047 }
1048
1049 // Temporary schemas are now created lazily when the first temporary object is created,
1050 // rather than eagerly on connection startup. This avoids expensive catalog_mut() calls
1051 // for the common case where connections never create temporary objects.
1052
1053 Ok((
1054 role_id,
1055 SuperuserAttribute(superuser_attribute),
1056 session_defaults,
1057 ))
1058 }
1059
1060 /// Handles an execute command.
1061 #[instrument(name = "coord::handle_execute", fields(session = session.uuid().to_string()))]
1062 pub(crate) async fn handle_execute(
1063 &mut self,
1064 portal_name: String,
1065 mut session: Session,
1066 tx: ClientTransmitter<ExecuteResponse>,
1067 // If this command was part of another execute command
1068 // (for example, executing a `FETCH` statement causes an execute to be
1069 // issued for the cursor it references),
1070 // then `outer_context` should be `Some`.
1071 // This instructs the coordinator that the
1072 // outer execute should be considered finished once the inner one is.
1073 outer_context: Option<ExecuteContextGuard>,
1074 ) {
1075 if session.vars().emit_trace_id_notice() {
1076 let span_context = tracing::Span::current()
1077 .context()
1078 .span()
1079 .span_context()
1080 .clone();
1081 if span_context.is_valid() {
1082 session.add_notice(AdapterNotice::QueryTrace {
1083 trace_id: span_context.trace_id(),
1084 });
1085 }
1086 }
1087
1088 if let Err(err) = Self::verify_portal(self.catalog(), &mut session, &portal_name) {
1089 // If statement logging hasn't started yet, we don't need
1090 // to add any "end" event, so just make up a no-op
1091 // `ExecuteContextExtra` here, via `Default::default`.
1092 //
1093 // It's a bit unfortunate because the edge case of failed
1094 // portal verifications won't show up in statement
1095 // logging, but there seems to be nothing else we can do,
1096 // because we need access to the portal to begin logging.
1097 //
1098 // Another option would be to log a begin and end event, but just fill in NULLs
1099 // for everything we get from the portal (prepared statement id, params).
1100 let extra = outer_context.unwrap_or_else(Default::default);
1101 let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
1102 return ctx.retire(Err(err));
1103 }
1104
1105 // The reference to `portal` can't outlive `session`, which we
1106 // use to construct the context, so scope the reference to this block where we
1107 // get everything we need from the portal for later.
1108 let (stmt, ctx, params) = {
1109 let portal = session
1110 .get_portal_unverified(&portal_name)
1111 .expect("known to exist");
1112 let params = portal.parameters.clone();
1113 let stmt = portal.stmt.clone();
1114 let logging = Arc::clone(&portal.logging);
1115 let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
1116
1117 let extra = if let Some(extra) = outer_context {
1118 // We are executing in the context of another SQL statement, so we don't
1119 // want to begin statement logging anew. The context of the actual statement
1120 // being executed is the one that should be retired once this finishes.
1121 extra
1122 } else {
1123 // This is a new statement, log it and return the context
1124 let maybe_uuid = self.begin_statement_execution(
1125 &mut session,
1126 ¶ms,
1127 &logging,
1128 lifecycle_timestamps,
1129 );
1130
1131 ExecuteContextGuard::new(maybe_uuid, self.internal_cmd_tx.clone())
1132 };
1133 let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
1134 (stmt, ctx, params)
1135 };
1136
1137 let stmt = match stmt {
1138 Some(stmt) => stmt,
1139 None => return ctx.retire(Ok(ExecuteResponse::EmptyQuery)),
1140 };
1141
1142 let session_type = metrics::session_type_label_value(ctx.session().user());
1143 let stmt_type = metrics::statement_type_label_value(&stmt);
1144 self.metrics
1145 .query_total
1146 .with_label_values(&[session_type, stmt_type])
1147 .inc();
1148 match &*stmt {
1149 Statement::Subscribe(SubscribeStatement { output, .. })
1150 | Statement::Copy(CopyStatement {
1151 relation: CopyRelation::Subscribe(SubscribeStatement { output, .. }),
1152 ..
1153 }) => {
1154 self.metrics
1155 .subscribe_outputs
1156 .with_label_values(&[
1157 session_type,
1158 metrics::subscribe_output_label_value(output),
1159 ])
1160 .inc();
1161 }
1162 _ => {}
1163 }
1164
1165 self.handle_execute_inner(stmt, params, ctx).await
1166 }
1167
1168 #[instrument(name = "coord::handle_execute_inner", fields(stmt = stmt.to_ast_string_redacted()))]
1169 pub(crate) async fn handle_execute_inner(
1170 &mut self,
1171 stmt: Arc<Statement<Raw>>,
1172 params: Params,
1173 mut ctx: ExecuteContext,
1174 ) {
1175 // This comment describes the various ways DDL can execute (the ordered operations: name
1176 // resolve, purify, plan, sequence), all of which are managed by this function. DDL has
1177 // three notable properties that all partially interact.
1178 //
1179 // 1. Most DDL statements (and a few others) support single-statement transaction delayed
1180 // execution. This occurs when a session executes `BEGIN`, a single DDL, then `COMMIT`.
1181 // We announce success of the single DDL when it is executed, but do not attempt to plan
1182 // or sequence it until `COMMIT`, which is able to error if needed while sequencing the
1183 // DDL (this behavior is Postgres-compatible). The purpose of this is because some
1184 // drivers or tools wrap all statements in `BEGIN` and `COMMIT` and we would like them to
1185 // work. When the single DDL is announced as successful we also put the session's
1186 // transaction ops into `SingleStatement` which will produce an error if any other
1187 // statement is run in the transaction except `COMMIT`. Additionally, this will cause
1188 // `handle_execute_inner` to stop further processing (no planning, etc.) of the
1189 // statement.
1190 // 2. A few other DDL statements (`ALTER .. RENAME/SWAP`) enter the `DDL` ops which allows
1191 // any number of only these DDL statements to be executed in a transaction. During
1192 // sequencing we run an incremental catalog dry run against in-memory transaction state
1193 // and store the resulting `CatalogState` in `TransactionOps::DDL`, but nothing is yet
1194 // committed to the durable catalog. At `COMMIT`, all accumulated ops are applied in one
1195 // catalog transaction. The purpose of this is to allow multiple, atomic renames in the
1196 // same transaction.
1197 // 3. Some DDLs do off-thread work during purification or sequencing that is expensive or
1198 // makes network calls (interfacing with secrets, optimization of views/indexes, source
1199 // purification). These must guarantee correctness when they return to the main
1200 // coordinator thread because the catalog state could have changed while they were doing
1201 // the off-thread work. Previously we would use `PlanValidity::Checks` to specify a bunch
1202 // of IDs that we needed to exist. We discovered the way we were doing that was not
1203 // always correct. Instead of attempting to get that completely right, we have opted to
1204 // serialize DDL. Getting this right is difficult because catalog changes can affect name
1205 // resolution, planning, sequencing, and optimization. Correctly writing logic that is
1206 // aware of all possible catalog changes that would affect any of those parts is not
1207 // something our current code has been designed to be helpful at. Even if a DDL statement
1208 // is doing off-thread work, another DDL must not yet execute at all. Executing these
1209 // serially will guarantee that no off-thread work has affected the state of the catalog.
1210 // This is done by adding a VecDeque of deferred statements and a lock to the
1211 // Coordinator. When a DDL is run in `handle_execute_inner` (after applying whatever
1212 // transaction ops are needed to the session as described above), it attempts to own the
1213 // lock (a tokio Mutex). If acquired, it stashes the lock in the connection`s `ConnMeta`
1214 // struct in `active_conns` and proceeds. The lock is dropped at transaction end in
1215 // `clear_transaction` and a message sent to the Coordinator to execute the next queued
1216 // DDL. If the lock could not be acquired, the DDL is put into the VecDeque where it
1217 // awaits dequeuing caused by the lock being released.
1218
1219 // Verify that this statement type can be executed in the current
1220 // transaction state.
1221 match ctx.session().transaction() {
1222 // By this point we should be in a running transaction.
1223 TransactionStatus::Default => unreachable!(),
1224
1225 // Failed transactions have already been checked in pgwire for a safe statement
1226 // (COMMIT, ROLLBACK, etc.) and can proceed.
1227 TransactionStatus::Failed(_) => {}
1228
1229 // Started is a deceptive name, and means different things depending on which
1230 // protocol was used. It's either exactly one statement (known because this
1231 // is the simple protocol and the parser parsed the entire string, and it had
1232 // one statement). Or from the extended protocol, it means *some* query is
1233 // being executed, but there might be others after it before the Sync (commit)
1234 // message. Postgres handles this by teaching Started to eagerly commit certain
1235 // statements that can't be run in a transaction block.
1236 TransactionStatus::Started(_) => {
1237 if let Statement::Declare(_) = &*stmt {
1238 // Declare is an exception. Although it's not against any spec to execute
1239 // it, it will always result in nothing happening, since all portals will be
1240 // immediately closed. Users don't know this detail, so this error helps them
1241 // understand what's going wrong. Postgres does this too.
1242 return ctx.retire(Err(AdapterError::OperationRequiresTransaction(
1243 "DECLARE CURSOR".into(),
1244 )));
1245 }
1246 }
1247
1248 // Implicit or explicit transactions.
1249 //
1250 // Implicit transactions happen when a multi-statement query is executed
1251 // (a "simple query"). However if a "BEGIN" appears somewhere in there,
1252 // then the existing implicit transaction will be upgraded to an explicit
1253 // transaction. Thus, we should not separate what implicit and explicit
1254 // transactions can do unless there's some additional checking to make sure
1255 // something disallowed in explicit transactions did not previously take place
1256 // in the implicit portion.
1257 TransactionStatus::InTransactionImplicit(_) | TransactionStatus::InTransaction(_) => {
1258 match &*stmt {
1259 // Statements that are safe in a transaction. We still need to verify that we
1260 // don't interleave reads and writes since we can't perform those serializably.
1261 Statement::Close(_)
1262 | Statement::Commit(_)
1263 | Statement::Copy(_)
1264 | Statement::Deallocate(_)
1265 | Statement::Declare(_)
1266 | Statement::Discard(_)
1267 | Statement::Execute(_)
1268 | Statement::ExplainPlan(_)
1269 | Statement::ExplainPushdown(_)
1270 | Statement::ExplainAnalyzeObject(_)
1271 | Statement::ExplainAnalyzeCluster(_)
1272 | Statement::ExplainTimestamp(_)
1273 | Statement::ExplainSinkSchema(_)
1274 | Statement::Fetch(_)
1275 | Statement::Prepare(_)
1276 | Statement::Rollback(_)
1277 | Statement::Select(_)
1278 | Statement::SetTransaction(_)
1279 | Statement::Show(_)
1280 | Statement::SetVariable(_)
1281 | Statement::ResetVariable(_)
1282 | Statement::StartTransaction(_)
1283 | Statement::Subscribe(_)
1284 | Statement::Raise(_) => {
1285 // Always safe.
1286 }
1287
1288 Statement::Insert(InsertStatement {
1289 source, returning, ..
1290 }) if returning.is_empty() && ConstantVisitor::insert_source(source) => {
1291 // Inserting from constant values statements that do not need to execute on
1292 // any cluster (no RETURNING) is always safe.
1293 }
1294
1295 // These statements must be kept in-sync with `must_serialize_ddl()`.
1296 Statement::AlterObjectRename(_)
1297 | Statement::AlterObjectSwap(_)
1298 | Statement::CreateTableFromSource(_)
1299 | Statement::CreateSource(_) => {
1300 let state = self.catalog().for_session(ctx.session()).state().clone();
1301 let revision = self.catalog().transient_revision();
1302
1303 // Initialize our transaction with a set of empty ops, or return an error
1304 // if we can't run a DDL transaction
1305 let txn_status = ctx.session_mut().transaction_mut();
1306 if let Err(err) = txn_status.add_ops(TransactionOps::DDL {
1307 ops: vec![],
1308 state,
1309 revision,
1310 side_effects: vec![],
1311 snapshot: None,
1312 }) {
1313 return ctx.retire(Err(err));
1314 }
1315 }
1316
1317 // Statements below must by run singly (in Started).
1318 Statement::AlterCluster(_)
1319 | Statement::AlterConnection(_)
1320 | Statement::AlterDefaultPrivileges(_)
1321 | Statement::AlterIndex(_)
1322 | Statement::AlterMaterializedViewApplyReplacement(_)
1323 | Statement::AlterSetCluster(_)
1324 | Statement::AlterOwner(_)
1325 | Statement::AlterRetainHistory(_)
1326 | Statement::AlterRole(_)
1327 | Statement::AlterSecret(_)
1328 | Statement::AlterSink(_)
1329 | Statement::AlterSource(_)
1330 | Statement::AlterSystemReset(_)
1331 | Statement::AlterSystemResetAll(_)
1332 | Statement::AlterSystemSet(_)
1333 | Statement::AlterTableAddColumn(_)
1334 | Statement::AlterNetworkPolicy(_)
1335 | Statement::CreateCluster(_)
1336 | Statement::CreateClusterReplica(_)
1337 | Statement::CreateConnection(_)
1338 | Statement::CreateDatabase(_)
1339 | Statement::CreateIndex(_)
1340 | Statement::CreateMaterializedView(_)
1341 | Statement::CreateRole(_)
1342 | Statement::CreateSchema(_)
1343 | Statement::CreateSecret(_)
1344 | Statement::CreateSink(_)
1345 | Statement::CreateSubsource(_)
1346 | Statement::CreateTable(_)
1347 | Statement::CreateType(_)
1348 | Statement::CreateView(_)
1349 | Statement::CreateWebhookSource(_)
1350 | Statement::CreateNetworkPolicy(_)
1351 | Statement::Delete(_)
1352 | Statement::DropObjects(_)
1353 | Statement::DropOwned(_)
1354 | Statement::GrantPrivileges(_)
1355 | Statement::GrantRole(_)
1356 | Statement::Insert(_)
1357 | Statement::ReassignOwned(_)
1358 | Statement::RevokePrivileges(_)
1359 | Statement::RevokeRole(_)
1360 | Statement::Update(_)
1361 | Statement::ValidateConnection(_)
1362 | Statement::Comment(_)
1363 | Statement::ExecuteUnitTest(_) => {
1364 let txn_status = ctx.session_mut().transaction_mut();
1365
1366 // If we're not in an implicit transaction and we could generate exactly one
1367 // valid ExecuteResponse, we can delay execution until commit.
1368 if !txn_status.is_implicit() {
1369 // Statements whose tag is trivial (known only from an unexecuted statement) can
1370 // be run in a special single-statement explicit mode. In this mode (`BEGIN;
1371 // <stmt>; COMMIT`), we generate the expected tag from a successful <stmt>, but
1372 // delay execution until `COMMIT`.
1373 if let Ok(resp) = ExecuteResponse::try_from(&*stmt) {
1374 if let Err(err) = txn_status
1375 .add_ops(TransactionOps::SingleStatement { stmt, params })
1376 {
1377 ctx.retire(Err(err));
1378 return;
1379 }
1380 ctx.retire(Ok(resp));
1381 return;
1382 }
1383 }
1384
1385 return ctx.retire(Err(AdapterError::OperationProhibitsTransaction(
1386 stmt.to_string(),
1387 )));
1388 }
1389 }
1390 }
1391 }
1392
1393 // DDLs must be planned and sequenced serially. We do not rely on PlanValidity checking
1394 // various IDs because we have incorrectly done that in the past. Attempt to acquire the
1395 // ddl lock. The lock is stashed in the ConnMeta which is dropped at transaction end. If
1396 // acquired, proceed with sequencing. If not, enqueue and return. This logic assumes that
1397 // Coordinator::clear_transaction is correctly called when session transactions are ended
1398 // because that function will release the held lock from active_conns.
1399 if Self::must_serialize_ddl(&stmt, &ctx) {
1400 if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
1401 let prev = self
1402 .active_conns
1403 .get_mut(ctx.session().conn_id())
1404 .expect("connection must exist")
1405 .deferred_lock
1406 .replace(guard);
1407 assert!(
1408 prev.is_none(),
1409 "connections should have at most one lock guard"
1410 );
1411 } else {
1412 if self
1413 .active_conns
1414 .get(ctx.session().conn_id())
1415 .expect("connection must exist")
1416 .deferred_lock
1417 .is_some()
1418 {
1419 // This session *already* has the lock, and incorrectly tried to execute another
1420 // DDL while still holding the lock, violating the assumption documented above.
1421 // This is an internal error, probably in some AdapterClient user (pgwire or
1422 // http). Because the session is now in some unexpected state, return an error
1423 // which should cause the AdapterClient user to fail the transaction.
1424 // (Terminating the connection is maybe what we would prefer to do, but is not
1425 // currently a thing we can do from the coordinator: calling handle_terminate
1426 // cleans up Coordinator state for the session but doesn't inform the
1427 // AdapterClient that the session should terminate.)
1428 soft_panic_or_log!(
1429 "session {} attempted to get ddl lock while already owning it",
1430 ctx.session().conn_id()
1431 );
1432 ctx.retire(Err(AdapterError::Internal(
1433 "session attempted to get ddl lock while already owning it".to_string(),
1434 )));
1435 return;
1436 }
1437 self.serialized_ddl.push_back(DeferredPlanStatement {
1438 ctx,
1439 ps: PlanStatement::Statement { stmt, params },
1440 });
1441 return;
1442 }
1443 }
1444
1445 let catalog = self.catalog();
1446 let catalog = catalog.for_session(ctx.session());
1447 let original_stmt = Arc::clone(&stmt);
1448 // `resolved_ids` should be derivable from `stmt`. If `stmt` is transformed to remove/add
1449 // IDs, then `resolved_ids` should be updated to also remove/add those IDs.
1450 let (stmt, mut resolved_ids) = match mz_sql::names::resolve(&catalog, (*stmt).clone()) {
1451 Ok(resolved) => resolved,
1452 Err(e) => return ctx.retire(Err(e.into())),
1453 };
1454 // N.B. The catalog can change during purification so we must validate that the dependencies still exist after
1455 // purification. This should be done back on the main thread.
1456 // We do the validation:
1457 // - In the handler for `Message::PurifiedStatementReady`, before we handle the purified statement.
1458 // If we add special handling for more types of `Statement`s, we'll need to ensure similar verification
1459 // occurs.
1460 let (stmt, resolved_ids) = match stmt {
1461 // Various statements must be purified off the main coordinator thread of control.
1462 stmt if Self::must_spawn_purification(&stmt) => {
1463 let internal_cmd_tx = self.internal_cmd_tx.clone();
1464 let conn_id = ctx.session().conn_id().clone();
1465 let catalog = self.owned_catalog();
1466 let now = self.now();
1467 let otel_ctx = OpenTelemetryContext::obtain();
1468 let current_storage_configuration = self.controller.storage.config().clone();
1469 task::spawn(|| format!("purify:{conn_id}"), async move {
1470 let transient_revision = catalog.transient_revision();
1471 let catalog = catalog.for_session(ctx.session());
1472
1473 // Checks if the session is authorized to purify a statement. Usually
1474 // authorization is checked after planning, however purification happens before
1475 // planning, which may require the use of some connections and secrets.
1476 if let Err(e) = rbac::check_usage(
1477 &catalog,
1478 ctx.session(),
1479 &resolved_ids,
1480 &CREATE_ITEM_USAGE,
1481 ) {
1482 return ctx.retire(Err(e.into()));
1483 }
1484
1485 let (result, cluster_id) = mz_sql::pure::purify_statement(
1486 catalog,
1487 now,
1488 stmt,
1489 ¤t_storage_configuration,
1490 )
1491 .await;
1492 let result = result.map_err(|e| e.into());
1493 let dependency_ids = resolved_ids.items().copied().collect();
1494 let plan_validity = PlanValidity::new(
1495 transient_revision,
1496 dependency_ids,
1497 cluster_id,
1498 None,
1499 ctx.session().role_metadata().clone(),
1500 );
1501 // It is not an error for purification to complete after `internal_cmd_rx` is dropped.
1502 let result = internal_cmd_tx.send(Message::PurifiedStatementReady(
1503 PurifiedStatementReady {
1504 ctx,
1505 result,
1506 params,
1507 plan_validity,
1508 original_stmt,
1509 otel_ctx,
1510 },
1511 ));
1512 if let Err(e) = result {
1513 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1514 }
1515 });
1516 return;
1517 }
1518
1519 // `CREATE SUBSOURCE` statements are disallowed for users and are only generated
1520 // automatically as part of purification
1521 Statement::CreateSubsource(_) => {
1522 ctx.retire(Err(AdapterError::Unsupported(
1523 "CREATE SUBSOURCE statements",
1524 )));
1525 return;
1526 }
1527
1528 Statement::CreateMaterializedView(mut cmvs) => {
1529 // `CREATE MATERIALIZED VIEW ... AS OF ...` syntax is disallowed for users and is
1530 // only used for storing initial frontiers in the catalog.
1531 if cmvs.as_of.is_some() {
1532 return ctx.retire(Err(AdapterError::Unsupported(
1533 "CREATE MATERIALIZED VIEW ... AS OF statements",
1534 )));
1535 }
1536
1537 let mz_now = match self
1538 .resolve_mz_now_for_create_materialized_view(
1539 &cmvs,
1540 &resolved_ids,
1541 ctx.session_mut(),
1542 true,
1543 )
1544 .await
1545 {
1546 Ok(mz_now) => mz_now,
1547 Err(e) => return ctx.retire(Err(e)),
1548 };
1549
1550 let catalog = self.catalog().for_session(ctx.session());
1551
1552 purify_create_materialized_view_options(
1553 catalog,
1554 mz_now,
1555 &mut cmvs,
1556 &mut resolved_ids,
1557 );
1558
1559 let purified_stmt =
1560 Statement::CreateMaterializedView(CreateMaterializedViewStatement::<Aug> {
1561 if_exists: cmvs.if_exists,
1562 name: cmvs.name,
1563 columns: cmvs.columns,
1564 replacement_for: cmvs.replacement_for,
1565 in_cluster: cmvs.in_cluster,
1566 in_cluster_replica: cmvs.in_cluster_replica,
1567 query: cmvs.query,
1568 with_options: cmvs.with_options,
1569 as_of: None,
1570 });
1571
1572 // (Purifying CreateMaterializedView doesn't happen async, so no need to send
1573 // `Message::PurifiedStatementReady` here.)
1574 (purified_stmt, resolved_ids)
1575 }
1576
1577 Statement::ExplainPlan(ExplainPlanStatement {
1578 stage,
1579 with_options,
1580 format,
1581 explainee: Explainee::CreateMaterializedView(box_cmvs, broken),
1582 }) => {
1583 let mut cmvs = *box_cmvs;
1584 let mz_now = match self
1585 .resolve_mz_now_for_create_materialized_view(
1586 &cmvs,
1587 &resolved_ids,
1588 ctx.session_mut(),
1589 false,
1590 )
1591 .await
1592 {
1593 Ok(mz_now) => mz_now,
1594 Err(e) => return ctx.retire(Err(e)),
1595 };
1596
1597 let catalog = self.catalog().for_session(ctx.session());
1598
1599 purify_create_materialized_view_options(
1600 catalog,
1601 mz_now,
1602 &mut cmvs,
1603 &mut resolved_ids,
1604 );
1605
1606 let purified_stmt = Statement::ExplainPlan(ExplainPlanStatement {
1607 stage,
1608 with_options,
1609 format,
1610 explainee: Explainee::CreateMaterializedView(Box::new(cmvs), broken),
1611 });
1612
1613 (purified_stmt, resolved_ids)
1614 }
1615
1616 // All other statements are handled immediately.
1617 _ => (stmt, resolved_ids),
1618 };
1619
1620 match self.plan_statement(ctx.session(), stmt, ¶ms, &resolved_ids) {
1621 Ok((plan, sql_impl_ids)) => {
1622 self.sequence_plan(ctx, plan, resolved_ids, sql_impl_ids)
1623 .await
1624 }
1625 Err(e) => ctx.retire(Err(e)),
1626 }
1627 }
1628
1629 /// Whether the statement must be serialized and is DDL.
1630 fn must_serialize_ddl(stmt: &Statement<Raw>, ctx: &ExecuteContext) -> bool {
1631 // Non-DDL is not serialized here.
1632 if !StatementClassification::from(&*stmt).is_ddl() {
1633 return false;
1634 }
1635 // Off-thread, pre-planning purification can perform arbitrarily slow network calls so must
1636 // not be serialized. These all use PlanValidity for their checking, and we must ensure
1637 // those checks are sufficient.
1638 if Self::must_spawn_purification(stmt) {
1639 return false;
1640 }
1641
1642 // Statements that support multiple DDLs in a single transaction aren't serialized here.
1643 // Their operations are serialized when applied to the catalog, guaranteeing that any
1644 // off-thread DDLs concurrent with a multiple DDL transaction will have a serial order.
1645 if ctx.session.transaction().is_ddl() {
1646 return false;
1647 }
1648
1649 // Some DDL is exempt. It is not great that we are matching on Statements here because
1650 // different plans can be produced from the same top-level statement type (i.e., `ALTER
1651 // CONNECTION ROTATE KEYS`). But the whole point of this is to prevent things from being
1652 // planned in the first place, so we accept the abstraction leak.
1653 match stmt {
1654 // Secrets have a small and understood set of dependencies, and their off-thread work
1655 // interacts with k8s.
1656 Statement::AlterSecret(_) => false,
1657 Statement::CreateSecret(_) => false,
1658 Statement::AlterConnection(AlterConnectionStatement { actions, .. })
1659 if actions
1660 .iter()
1661 .all(|action| matches!(action, AlterConnectionAction::RotateKeys)) =>
1662 {
1663 false
1664 }
1665
1666 // The off-thread work that altering a cluster may do (waiting for replicas to spin-up),
1667 // does not affect its catalog names or ids and so is safe to not serialize. This could
1668 // change the set of replicas that exist. For queries that name replicas or use the
1669 // current_replica session var, the `replica_id` field of `PlanValidity` serves to
1670 // ensure that those replicas exist during the query finish stage. Additionally, that
1671 // work can take hours (configured by the user), so would also be a bad experience for
1672 // users.
1673 Statement::AlterCluster(_) => false,
1674
1675 // `ALTER SINK SET FROM` waits for the old relation to make enough progress for a clean
1676 // cutover. If the old collection is stalled, it may block forever. Checks in
1677 // sequencing ensure that the operation fails if any one of these happens concurrently:
1678 // * the sink is dropped
1679 // * the new source relation is dropped
1680 // * another `ALTER SINK` for the same sink is applied first
1681 Statement::AlterSink(stmt)
1682 if matches!(stmt.action, AlterSinkAction::ChangeRelation(_)) =>
1683 {
1684 false
1685 }
1686
1687 // `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT` waits for the target MV to make
1688 // enough progress for a clean cutover. If the target MV is stalled, it may block
1689 // forever. Checks in sequencing ensure the operation fails if any of these happens
1690 // concurrently:
1691 // * the target MV is dropped
1692 // * the replacement MV is dropped
1693 Statement::AlterMaterializedViewApplyReplacement(_) => false,
1694
1695 // Everything else must be serialized.
1696 _ => true,
1697 }
1698 }
1699
1700 /// Whether the statement must be purified off of the Coordinator thread.
1701 fn must_spawn_purification<A: AstInfo>(stmt: &Statement<A>) -> bool {
1702 // `CREATE` and `ALTER` `SOURCE` and `SINK` statements must be purified off the main
1703 // coordinator thread.
1704 if !matches!(
1705 stmt,
1706 Statement::CreateSource(_)
1707 | Statement::AlterSource(_)
1708 | Statement::CreateSink(_)
1709 | Statement::CreateTableFromSource(_)
1710 ) {
1711 return false;
1712 }
1713
1714 // However `ALTER SOURCE RETAIN HISTORY` should be excluded from off-thread purification.
1715 if let Statement::AlterSource(stmt) = stmt {
1716 let names: Vec<CreateSourceOptionName> = match &stmt.action {
1717 AlterSourceAction::SetOptions(options) => {
1718 options.iter().map(|o| o.name.clone()).collect()
1719 }
1720 AlterSourceAction::ResetOptions(names) => names.clone(),
1721 _ => vec![],
1722 };
1723 if !names.is_empty()
1724 && names
1725 .iter()
1726 .all(|n| matches!(n, CreateSourceOptionName::RetainHistory))
1727 {
1728 return false;
1729 }
1730 }
1731
1732 true
1733 }
1734
1735 /// Chooses a timestamp for `mz_now()`, if `mz_now()` occurs in a REFRESH option of the
1736 /// materialized view. Additionally, if `acquire_read_holds` is true and the MV has any REFRESH
1737 /// option, this function grabs read holds at the earliest possible time on input collections
1738 /// that might be involved in the MV.
1739 ///
1740 /// Note that this is NOT what handles `mz_now()` in the query part of the MV. (handles it only
1741 /// in `with_options`).
1742 ///
1743 /// (Note that the chosen timestamp won't be the same timestamp as the system table inserts,
1744 /// unfortunately.)
1745 async fn resolve_mz_now_for_create_materialized_view(
1746 &mut self,
1747 cmvs: &CreateMaterializedViewStatement<Aug>,
1748 resolved_ids: &ResolvedIds,
1749 session: &Session,
1750 acquire_read_holds: bool,
1751 ) -> Result<Option<Timestamp>, AdapterError> {
1752 if cmvs
1753 .with_options
1754 .iter()
1755 .any(|wo| matches!(wo.value, Some(WithOptionValue::Refresh(..))))
1756 {
1757 let catalog = self.catalog().for_session(session);
1758 let cluster = mz_sql::plan::resolve_cluster_for_materialized_view(&catalog, cmvs)?;
1759 let ids = self
1760 .index_oracle(cluster)
1761 .sufficient_collections(resolved_ids.collections().copied());
1762
1763 // If there is any REFRESH option, then acquire read holds. (Strictly speaking, we'd
1764 // need this only if there is a `REFRESH AT`, not for `REFRESH EVERY`, because later
1765 // we want to check the AT times against the read holds that we acquire here. But
1766 // we do it for any REFRESH option, to avoid having so many code paths doing different
1767 // things.)
1768 //
1769 // It's important that we acquire read holds _before_ we determine the least valid read.
1770 // Otherwise, we're not guaranteed that the since frontier doesn't
1771 // advance forward from underneath us.
1772 let read_holds = self.acquire_read_holds(&ids);
1773
1774 // Does `mz_now()` occur?
1775 let mz_now_ts = if cmvs
1776 .with_options
1777 .iter()
1778 .any(materialized_view_option_contains_temporal)
1779 {
1780 let timeline_context = self
1781 .catalog()
1782 .validate_timeline_context(resolved_ids.collections().copied())?;
1783
1784 // We default to EpochMilliseconds, similarly to `determine_timestamp_for`,
1785 // but even in the TimestampIndependent case.
1786 // Note that we didn't accurately decide whether we are TimestampDependent
1787 // or TimestampIndependent, because for this we'd need to also check whether
1788 // `query.contains_temporal()`, similarly to how `peek_stage_validate` does.
1789 // However, this doesn't matter here, as we are just going to default to
1790 // EpochMilliseconds in both cases.
1791 let timeline = timeline_context
1792 .timeline()
1793 .unwrap_or(&Timeline::EpochMilliseconds);
1794
1795 // Let's start with the timestamp oracle read timestamp.
1796 let mut timestamp = self.get_timestamp_oracle(timeline).read_ts().await;
1797
1798 // If `least_valid_read` is later than the oracle, then advance to that time.
1799 // If we didn't do this, then there would be a danger of missing the first refresh,
1800 // which might cause the materialized view to be unreadable for hours. This might
1801 // be what was happening here:
1802 // https://github.com/MaterializeInc/database-issues/issues/7265#issuecomment-1931856361
1803 //
1804 // In the long term, it would be good to actually block the MV creation statement
1805 // until `least_valid_read`. https://github.com/MaterializeInc/database-issues/issues/7504
1806 // Without blocking, we have the problem that a REFRESH AT CREATION is not linearized
1807 // with the CREATE MATERIALIZED VIEW statement, in the sense that a query from the MV
1808 // after its creation might see input changes that happened after the CRATE MATERIALIZED
1809 // VIEW statement returned.
1810 let oracle_timestamp = timestamp;
1811 let least_valid_read = read_holds.least_valid_read();
1812 timestamp.advance_by(least_valid_read.borrow());
1813
1814 if oracle_timestamp != timestamp {
1815 warn!(%cmvs.name, %oracle_timestamp, %timestamp, "REFRESH MV's inputs are not readable at the oracle read ts");
1816 }
1817
1818 info!("Resolved `mz_now()` to {timestamp} for REFRESH MV");
1819 Ok(Some(timestamp))
1820 } else {
1821 Ok(None)
1822 };
1823
1824 // NOTE: The Drop impl of ReadHolds makes sure that the hold is
1825 // released when we don't use it.
1826 if acquire_read_holds {
1827 self.store_transaction_read_holds(session.conn_id().clone(), read_holds);
1828 }
1829
1830 mz_now_ts
1831 } else {
1832 Ok(None)
1833 }
1834 }
1835
1836 /// Instruct the dataflow layer to cancel any ongoing, interactive work for
1837 /// the named `conn_id` if the correct secret key is specified.
1838 ///
1839 /// Note: Here we take a [`ConnectionIdType`] as opposed to an owned
1840 /// `ConnectionId` because this method gets called by external clients when
1841 /// they request to cancel a request.
1842 #[mz_ore::instrument(level = "debug")]
1843 async fn handle_cancel(&mut self, conn_id: ConnectionIdType, secret_key: u32) {
1844 if let Some((id_handle, conn_meta)) = self.active_conns.get_key_value(&conn_id) {
1845 // If the secret key specified by the client doesn't match the
1846 // actual secret key for the target connection, we treat this as a
1847 // rogue cancellation request and ignore it.
1848 if conn_meta.secret_key != secret_key {
1849 return;
1850 }
1851
1852 // Now that we've verified the secret key, this is a privileged
1853 // cancellation request. We can upgrade the raw connection ID to a
1854 // proper `IdHandle`.
1855 self.handle_privileged_cancel(id_handle.clone()).await;
1856 }
1857 }
1858
1859 /// Unconditionally instructs the dataflow layer to cancel any ongoing,
1860 /// interactive work for the named `conn_id`.
1861 #[mz_ore::instrument(level = "debug")]
1862 pub(crate) async fn handle_privileged_cancel(&mut self, conn_id: ConnectionId) {
1863 let mut maybe_ctx = None;
1864
1865 // Cancel pending writes. There is at most one pending write per session.
1866 let pending_write_idx = self.pending_writes.iter().position(|pending_write_txn| {
1867 matches!(pending_write_txn, PendingWriteTxn::User {
1868 responder: UserWriteResponder::Session(PendingTxn { ctx, .. }),
1869 ..
1870 } if *ctx.session().conn_id() == conn_id)
1871 });
1872 if let Some(idx) = pending_write_idx {
1873 if let PendingWriteTxn::User {
1874 responder: UserWriteResponder::Session(PendingTxn { ctx, .. }),
1875 ..
1876 } = self.pending_writes.remove(idx)
1877 {
1878 maybe_ctx = Some(ctx);
1879 }
1880 }
1881
1882 // Cancel deferred writes.
1883 if let Some(write_op) = self.deferred_write_ops.remove(&conn_id) {
1884 maybe_ctx = Some(write_op.into_ctx());
1885 }
1886
1887 // Cancel deferred statements.
1888 let deferred_ddl_idx = self
1889 .serialized_ddl
1890 .iter()
1891 .position(|deferred| *deferred.ctx.session().conn_id() == conn_id);
1892 if let Some(idx) = deferred_ddl_idx {
1893 let deferred = self
1894 .serialized_ddl
1895 .remove(idx)
1896 .expect("known to exist from call to `position` above");
1897 maybe_ctx = Some(deferred.ctx);
1898 }
1899
1900 // Cancel reads waiting on being linearized. There is at most one linearized read per
1901 // session.
1902 if let Some(pending_read_txn) = self.pending_linearize_read_txns.remove(&conn_id) {
1903 let ctx = pending_read_txn.take_context();
1904 maybe_ctx = Some(ctx);
1905 }
1906
1907 if let Some(ctx) = maybe_ctx {
1908 ctx.retire(Err(AdapterError::Canceled));
1909 }
1910
1911 self.cancel_pending_peeks(&conn_id);
1912 self.cancel_pending_watchsets(&conn_id);
1913 self.cancel_compute_sinks_for_conn(&conn_id).await;
1914 self.cancel_cluster_reconfigurations_for_conn(&conn_id)
1915 .await;
1916 self.cancel_pending_copy(&conn_id);
1917 if let Some((tx, _rx)) = self.connection_cancel_watches.get_mut(&conn_id) {
1918 let _ = tx.send(true);
1919 }
1920 }
1921
1922 /// Handle termination of a client session.
1923 ///
1924 /// This cleans up any state in the coordinator associated with the session.
1925 #[mz_ore::instrument(level = "debug")]
1926 async fn handle_terminate(&mut self, conn_id: ConnectionId) {
1927 // If the session doesn't exist in `active_conns`, then this method will panic later on.
1928 // Instead we explicitly panic here while dumping the entire Coord to the logs to help
1929 // debug. This panic is very infrequent so we want as much information as possible.
1930 // See https://github.com/MaterializeInc/database-issues/issues/5627.
1931 assert!(
1932 self.active_conns.contains_key(&conn_id),
1933 "unknown connection: {conn_id:?}\n\n{self:?}"
1934 );
1935
1936 // We do not need to call clear_transaction here because there are no side effects to run
1937 // based on any session transaction state.
1938 self.clear_connection(&conn_id).await;
1939
1940 self.drop_temp_items(&conn_id).await;
1941 // Only call catalog_mut() if a temporary schema actually exists for this connection.
1942 // This avoids an expensive Arc::make_mut clone for the common case where the connection
1943 // never created any temporary objects.
1944 if self.catalog().state().has_temporary_schema(&conn_id) {
1945 self.catalog_mut()
1946 .drop_temporary_schema(&conn_id)
1947 .unwrap_or_terminate("unable to drop temporary schema");
1948 }
1949 let conn = self.active_conns.remove(&conn_id).expect("conn must exist");
1950 let session_type = metrics::session_type_label_value(conn.user());
1951 self.metrics
1952 .active_sessions
1953 .with_label_values(&[session_type])
1954 .dec();
1955 self.cancel_pending_peeks(conn.conn_id());
1956 self.cancel_pending_watchsets(&conn_id);
1957 self.cancel_pending_copy(&conn_id);
1958 self.end_session_for_statement_logging(conn.uuid());
1959
1960 // Queue the builtin table update, but do not wait for it to complete. We explicitly do
1961 // this to prevent blocking the Coordinator in the case that a lot of connections are
1962 // closed at once, which occurs regularly in some workflows.
1963 let update = self
1964 .catalog()
1965 .state()
1966 .pack_session_update(&conn, Diff::MINUS_ONE);
1967 let update = self.catalog().state().resolve_builtin_table_update(update);
1968
1969 let _builtin_update_notify = self.builtin_table_update().defer(vec![update]);
1970 }
1971
/// Returns the necessary metadata for appending to a webhook source, and a channel to send
/// rows.
///
/// Resolves `database.schema.name` to a webhook-backed source or table. The result is sent
/// over `tx`: either an [`AppendWebhookResponse`] or
/// [`AppendWebhookError::UnknownWebhook`] naming the requested path. A failed send on `tx`
/// is ignored.
#[mz_ore::instrument(level = "debug")]
fn handle_get_webhook(
    &mut self,
    database: String,
    schema: String,
    name: String,
    tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
) {
    /// Attempts to resolve a Webhook source from a provided `database.schema.name` path.
    ///
    /// Returns a struct that can be used to append data to the underlying storage collection,
    /// and the types we should cast the request to.
    ///
    /// On failure, returns the fully qualified name that was looked up. Failures include:
    /// - the item does not exist, or is not a webhook source/table;
    /// - the item has no `body` column;
    /// - the storage controller refuses an appender or statistics handle.
    ///
    /// The underlying storage error is discarded.
    fn resolve(
        coord: &mut Coordinator,
        database: String,
        schema: String,
        name: String,
    ) -> Result<AppendWebhookResponse, PartialItemName> {
        // Resolve our collection. `database` and `schema` are always `Some` here, which the
        // caller relies on when building its error.
        let name = PartialItemName {
            database: Some(database),
            schema: Some(schema),
            item: name,
        };
        let Ok(entry) = coord
            .catalog()
            .resolve_entry(None, &vec![], &name, &SYSTEM_CONN_ID)
        else {
            return Err(name);
        };

        // Webhooks can be created with `CREATE SOURCE` or `CREATE TABLE`. Any other item
        // kind, or a source/table with a non-webhook data source, is reported as unknown.
        let (data_source, desc, global_id) = match entry.item() {
            CatalogItem::Source(Source {
                data_source: data_source @ DataSourceDesc::Webhook { .. },
                desc,
                global_id,
                ..
            }) => (data_source, desc.clone(), *global_id),
            // For tables, use `desc.latest()` and the table's `global_id_writes()` id.
            CatalogItem::Table(
                table @ Table {
                    desc,
                    data_source:
                        TableDataSource::DataSource {
                            desc: data_source @ DataSourceDesc::Webhook { .. },
                            ..
                        },
                    ..
                },
            ) => (data_source, desc.latest(), table.global_id_writes()),
            _ => return Err(name),
        };

        // Both match arms above only bind `DataSourceDesc::Webhook`, so the `else` branch
        // is unreachable in practice. It logs a soft panic and reports the webhook as
        // unknown instead of crashing.
        let DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            ..
        } = data_source
        else {
            mz_ore::soft_panic_or_log!("programming error! checked above for webhook");
            return Err(name);
        };
        let body_format = body_format.clone();
        let header_tys = headers.clone();

        // Check that the relation has at most one column for the body plus however many
        // the headers require. This is a soft assertion (`soft_assert_or_log!`); resolution
        // continues even if it fails.
        let num_columns = headers.num_columns() + 1;
        mz_ore::soft_assert_or_log!(
            desc.arity() <= num_columns,
            "expected at most {} columns, but got {}",
            num_columns,
            desc.arity()
        );

        // Double check that the body column of the webhook source matches the type
        // we're about to deserialize as. A missing `body` column is reported as an unknown
        // webhook. A nullable body or a type mismatch trips a hard `assert!`, which panics.
        let body_column = desc
            .get_by_name(&"body".into())
            .map(|(_idx, ty)| ty.clone())
            .ok_or_else(|| name.clone())?;
        assert!(!body_column.nullable, "webhook body column is nullable!?");
        assert_eq!(body_column.scalar_type, SqlScalarType::from(body_format));

        // Create a validator that can be called to validate a webhook request. There is
        // none when the webhook has no `validate_using` configured.
        let validator = validate_using.as_ref().map(|v| {
            let validation = v.clone();
            AppendWebhookValidator::new(validation, coord.caching_secrets_reader.clone())
        });

        // Get a channel so we can queue updates to be written, plus the statistics handle
        // for this collection. Storage-controller errors are mapped to the requested name.
        let row_tx = coord
            .controller
            .storage
            .monotonic_appender(global_id)
            .map_err(|_| name.clone())?;
        let stats = coord
            .controller
            .storage
            .webhook_statistics(global_id)
            .map_err(|_| name)?;
        // Reuse this item's entry in `active_webhooks`, creating it if absent. The returned
        // appender carries a guard obtained from it.
        let invalidator = coord
            .active_webhooks
            .entry(entry.id())
            .or_insert_with(WebhookAppenderInvalidator::new);
        let tx = WebhookAppender::new(row_tx, invalidator.guard(), stats);

        Ok(AppendWebhookResponse {
            tx,
            body_format,
            header_tys,
            validator,
        })
    }

    // Every error path in `resolve` returns the `PartialItemName` it built with
    // `Some(database)` and `Some(schema)`, so these `expect`s cannot fail.
    let response = resolve(self, database, schema, name).map_err(|name| {
        AppendWebhookError::UnknownWebhook {
            database: name.database.expect("provided"),
            schema: name.schema.expect("provided"),
            name: name.item,
        }
    });
    let _ = tx.send(response);
}
2099
2100 /// Handle registration of a frontend peek, for statement logging and query cancellation
2101 /// handling.
2102 fn handle_register_frontend_peek(
2103 &mut self,
2104 uuid: Uuid,
2105 conn_id: ConnectionId,
2106 cluster_id: mz_controller_types::ClusterId,
2107 depends_on: BTreeSet<GlobalId>,
2108 is_fast_path: bool,
2109 watch_set: Option<WatchSetCreation>,
2110 tx: oneshot::Sender<Result<(), AdapterError>>,
2111 ) {
2112 let statement_logging_id = watch_set.as_ref().map(|ws| ws.logging_id);
2113 if let Some(ws) = watch_set {
2114 if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), ws) {
2115 let _ = tx.send(Err(
2116 AdapterError::concurrent_dependency_drop_from_watch_set_install_error(e),
2117 ));
2118 return;
2119 }
2120 }
2121
2122 // Store the peek in pending_peeks for later retrieval when results arrive
2123 self.pending_peeks.insert(
2124 uuid,
2125 PendingPeek {
2126 conn_id: conn_id.clone(),
2127 cluster_id,
2128 depends_on,
2129 ctx_extra: ExecuteContextGuard::new(
2130 statement_logging_id,
2131 self.internal_cmd_tx.clone(),
2132 ),
2133 is_fast_path,
2134 },
2135 );
2136
2137 // Also track it by connection ID for cancellation support
2138 self.client_pending_peeks
2139 .entry(conn_id)
2140 .or_default()
2141 .insert(uuid, cluster_id);
2142
2143 let _ = tx.send(Ok(()));
2144 }
2145
2146 /// Handle unregistration of a frontend peek that was registered but failed to issue.
2147 /// This is used for cleanup when `client.peek()` fails after `RegisterFrontendPeek` succeeds.
2148 fn handle_unregister_frontend_peek(&mut self, uuid: Uuid, tx: oneshot::Sender<()>) {
2149 // Remove from pending_peeks (this also removes from client_pending_peeks)
2150 if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
2151 // Retire `ExecuteContextExtra`, because the frontend will log the peek's error result.
2152 let _ = pending_peek.ctx_extra.defuse();
2153 }
2154 let _ = tx.send(());
2155 }
2156}