mz_adapter/coord/command_handler.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic for processing client [`Command`]s. Each [`Command`] is initiated by a
11//! client via some external Materialize API (e.g., HTTP and psql).
12
13use base64::prelude::*;
14use differential_dataflow::lattice::Lattice;
15use mz_adapter_types::dyncfgs::ALLOW_USER_SESSIONS;
16use mz_auth::password::Password;
17use mz_repr::namespaces::MZ_INTERNAL_SCHEMA;
18use mz_sql::session::metadata::SessionMetadata;
19use std::collections::{BTreeMap, BTreeSet};
20use std::net::IpAddr;
21use std::sync::Arc;
22
23use futures::FutureExt;
24use futures::future::LocalBoxFuture;
25use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
26use mz_catalog::SYSTEM_CONN_ID;
27use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source, Table, TableDataSource};
28use mz_ore::task;
29use mz_ore::tracing::OpenTelemetryContext;
30use mz_ore::{instrument, soft_panic_or_log};
31use mz_repr::role_id::RoleId;
32use mz_repr::{Diff, GlobalId, SqlScalarType, Timestamp};
33use mz_sql::ast::{
34 AlterConnectionAction, AlterConnectionStatement, AlterSinkAction, AlterSourceAction, AstInfo,
35 ConstantVisitor, CopyRelation, CopyStatement, CreateSourceOptionName, Raw, Statement,
36 SubscribeStatement,
37};
38use mz_sql::catalog::RoleAttributesRaw;
39use mz_sql::names::{Aug, PartialItemName, ResolvedIds};
40use mz_sql::plan::{
41 AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, Params, Plan,
42 StatementClassification, TransactionType,
43};
44use mz_sql::pure::{
45 materialized_view_option_contains_temporal, purify_create_materialized_view_options,
46};
47use mz_sql::rbac;
48use mz_sql::rbac::CREATE_ITEM_USAGE;
49use mz_sql::session::user::User;
50use mz_sql::session::vars::{
51 EndTransactionAction, NETWORK_POLICY, OwnedVarInput, STATEMENT_LOGGING_SAMPLE_RATE, Value, Var,
52};
53use mz_sql_parser::ast::display::AstDisplay;
54use mz_sql_parser::ast::{
55 CreateMaterializedViewStatement, ExplainPlanStatement, Explainee, InsertStatement,
56 WithOptionValue,
57};
58use mz_storage_types::sources::Timeline;
59use opentelemetry::trace::TraceContextExt;
60use tokio::sync::{mpsc, oneshot};
61use tracing::{Instrument, debug_span, info, warn};
62use tracing_opentelemetry::OpenTelemetrySpanExt;
63use uuid::Uuid;
64
65use crate::command::{
66 AuthResponse, CatalogSnapshot, Command, ExecuteResponse, SASLChallengeResponse,
67 SASLVerifyProofResponse, StartupResponse,
68};
69use crate::coord::appends::PendingWriteTxn;
70use crate::coord::peek::PendingPeek;
71use crate::coord::{
72 ConnMeta, Coordinator, DeferredPlanStatement, Message, PendingTxn, PlanStatement, PlanValidity,
73 PurifiedStatementReady, validate_ip_with_policy_rules,
74};
75use crate::error::{AdapterError, AuthenticationError};
76use crate::notice::AdapterNotice;
77use crate::session::{Session, TransactionOps, TransactionStatus};
78use crate::statement_logging::WatchSetCreation;
79use crate::util::{ClientTransmitter, ResultExt};
80use crate::webhook::{
81 AppendWebhookResponse, AppendWebhookValidator, WebhookAppender, WebhookAppenderInvalidator,
82};
83use crate::{AppendWebhookError, ExecuteContext, catalog, metrics};
84
85use super::ExecuteContextGuard;
86
87impl Coordinator {
88 /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 58KB. This would
89 /// get stored on the stack, which is bad for runtime performance and blows up our stack usage.
90 /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
91 pub(crate) fn handle_command(&mut self, mut cmd: Command) -> LocalBoxFuture<'_, ()> {
92 async move {
93 if let Some(session) = cmd.session_mut() {
94 session.apply_external_metadata_updates();
95 }
96 match cmd {
97 Command::Startup {
98 tx,
99 user,
100 conn_id,
101 secret_key,
102 uuid,
103 client_ip,
104 application_name,
105 notice_tx,
106 } => {
107 // Note: We purposefully do not use a ClientTransmitter here because startup
108 // handles errors and cleanup of sessions itself.
109 self.handle_startup(
110 tx,
111 user,
112 conn_id,
113 secret_key,
114 uuid,
115 client_ip,
116 application_name,
117 notice_tx,
118 )
119 .await;
120 }
121
122 Command::AuthenticatePassword {
123 tx,
124 role_name,
125 password,
126 } => {
127 self.handle_authenticate_password(tx, role_name, password)
128 .await;
129 }
130
131 Command::AuthenticateGetSASLChallenge {
132 tx,
133 role_name,
134 nonce,
135 } => {
136 self.handle_generate_sasl_challenge(tx, role_name, nonce)
137 .await;
138 }
139
140 Command::AuthenticateVerifySASLProof {
141 tx,
142 role_name,
143 proof,
144 mock_hash,
145 auth_message,
146 } => {
147 self.handle_authenticate_verify_sasl_proof(
148 tx,
149 role_name,
150 proof,
151 auth_message,
152 mock_hash,
153 );
154 }
155
156 Command::Execute {
157 portal_name,
158 session,
159 tx,
160 outer_ctx_extra,
161 } => {
162 let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
163
164 self.handle_execute(portal_name, session, tx, outer_ctx_extra)
165 .await;
166 }
167
168 Command::RetireExecute { data, reason } => self.retire_execution(reason, data),
169
170 Command::CancelRequest {
171 conn_id,
172 secret_key,
173 } => {
174 self.handle_cancel(conn_id, secret_key).await;
175 }
176
177 Command::PrivilegedCancelRequest { conn_id } => {
178 self.handle_privileged_cancel(conn_id).await;
179 }
180
181 Command::GetWebhook {
182 database,
183 schema,
184 name,
185 tx,
186 } => {
187 self.handle_get_webhook(database, schema, name, tx);
188 }
189
190 Command::GetSystemVars { tx } => {
191 let _ = tx.send(self.catalog.system_config().clone());
192 }
193
194 Command::SetSystemVars { vars, conn_id, tx } => {
195 let mut ops = Vec::with_capacity(vars.len());
196 let conn = &self.active_conns[&conn_id];
197
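// Validate that every requested variable exists and is visible to this user before
// building any ops; a single invalid variable rejects the entire batch.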
198 for (name, value) in vars {
199 if let Err(e) =
200 self.catalog().system_config().get(&name).and_then(|var| {
201 var.visible(conn.user(), self.catalog.system_config())
202 })
203 {
204 let _ = tx.send(Err(e.into()));
205 return;
206 }
207
208 ops.push(catalog::Op::UpdateSystemConfiguration {
209 name,
210 value: OwnedVarInput::Flat(value),
211 });
212 }
213
214 let result = self
215 .catalog_transact_with_context(Some(&conn_id), None, ops)
216 .await;
217 let _ = tx.send(result);
218 }
219
220 Command::Terminate { conn_id, tx } => {
221 self.handle_terminate(conn_id).await;
222 // Note: We purposefully do not use a ClientTransmitter here because we're already
223 // terminating the provided session.
224 if let Some(tx) = tx {
225 let _ = tx.send(Ok(()));
226 }
227 }
228
229 Command::Commit {
230 action,
231 session,
232 tx,
233 } => {
234 let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
235 // We reach here not through a statement execution, but from the
236 // "commit" pgwire command. Thus, we just generate a default statement
237 // execution context (once statement logging is implemented, this will cause nothing to be logged
238 // when the execution finishes.)
239 let ctx = ExecuteContext::from_parts(
240 tx,
241 self.internal_cmd_tx.clone(),
242 session,
243 Default::default(),
244 );
245 let plan = match action {
246 EndTransactionAction::Commit => {
247 Plan::CommitTransaction(CommitTransactionPlan {
248 transaction_type: TransactionType::Implicit,
249 })
250 }
251 EndTransactionAction::Rollback => {
252 Plan::AbortTransaction(AbortTransactionPlan {
253 transaction_type: TransactionType::Implicit,
254 })
255 }
256 };
257
258 let conn_id = ctx.session().conn_id().clone();
259 self.sequence_plan(ctx, plan, ResolvedIds::empty()).await;
260 // Part of the Command::Commit contract is that the Coordinator guarantees that
261 // it has cleared its transaction state for the connection.
262 self.clear_connection(&conn_id).await;
263 }
264
265 Command::CatalogSnapshot { tx } => {
266 let _ = tx.send(CatalogSnapshot {
267 catalog: self.owned_catalog(),
268 });
269 }
270
271 Command::CheckConsistency { tx } => {
272 let _ = tx.send(self.check_consistency());
273 }
274
275 Command::Dump { tx } => {
276 let _ = tx.send(self.dump().await);
277 }
278
279 Command::GetComputeInstanceClient { instance_id, tx } => {
280 let _ = tx.send(self.controller.compute.instance_client(instance_id));
281 }
282
283 Command::GetOracle { timeline, tx } => {
284 let oracle = self
285 .global_timelines
286 .get(&timeline)
287 .map(|timeline_state| Arc::clone(&timeline_state.oracle))
288 .ok_or(AdapterError::ChangedPlan(
289 "timeline has disappeared during planning".to_string(),
290 ));
291 let _ = tx.send(oracle);
292 }
293
294 Command::DetermineRealTimeRecentTimestamp {
295 source_ids,
296 real_time_recency_timeout,
297 tx,
298 } => {
299 let result = self
300 .determine_real_time_recent_timestamp(
301 source_ids.iter().copied(),
302 real_time_recency_timeout,
303 )
304 .await;
305
306 match result {
307 Ok(Some(fut)) => {
308 task::spawn(|| "determine real time recent timestamp", async move {
309 let result = fut.await.map(Some).map_err(AdapterError::from);
310 let _ = tx.send(result);
311 });
312 }
313 Ok(None) => {
314 let _ = tx.send(Ok(None));
315 }
316 Err(e) => {
317 let _ = tx.send(Err(e));
318 }
319 }
320 }
321
322 Command::GetTransactionReadHoldsBundle { conn_id, tx } => {
323 let read_holds = self.txn_read_holds.get(&conn_id).cloned();
324 let _ = tx.send(read_holds);
325 }
326
327 Command::StoreTransactionReadHolds {
328 conn_id,
329 read_holds,
330 tx,
331 } => {
332 self.store_transaction_read_holds(conn_id, read_holds);
333 let _ = tx.send(());
334 }
335
336 Command::ExecuteSlowPathPeek {
337 dataflow_plan,
338 determination,
339 finishing,
340 compute_instance,
341 target_replica,
342 intermediate_result_type,
343 source_ids,
344 conn_id,
345 max_result_size,
346 max_query_result_size,
347 watch_set,
348 tx,
349 } => {
350 let result = self
351 .implement_slow_path_peek(
352 *dataflow_plan,
353 determination,
354 finishing,
355 compute_instance,
356 target_replica,
357 intermediate_result_type,
358 source_ids,
359 conn_id,
360 max_result_size,
361 max_query_result_size,
362 watch_set,
363 )
364 .await;
365 let _ = tx.send(result);
366 }
367
368 Command::CopyToPreflight {
369 s3_sink_connection,
370 sink_id,
371 tx,
372 } => {
373 // Spawn a background task to perform the slow S3 preflight operations.
374 // This avoids blocking the coordinator's main task.
375 let connection_context = self.connection_context().clone();
376 task::spawn(|| "copy_to_preflight", async move {
377 let result = mz_storage_types::sinks::s3_oneshot_sink::preflight(
378 connection_context,
379 &s3_sink_connection.aws_connection,
380 &s3_sink_connection.upload_info,
381 s3_sink_connection.connection_id,
382 sink_id,
383 )
384 .await
385 .map_err(AdapterError::from);
386 let _ = tx.send(result);
387 });
388 }
389
390 Command::ExecuteCopyTo {
391 df_desc,
392 compute_instance,
393 target_replica,
394 source_ids,
395 conn_id,
396 watch_set,
397 tx,
398 } => {
399 // implement_copy_to spawns a background task that sends the response
400 // through tx when the COPY TO completes (or immediately if setup fails).
401 // We just call it and let it handle all response sending.
402 self.implement_copy_to(
403 *df_desc,
404 compute_instance,
405 target_replica,
406 source_ids,
407 conn_id,
408 watch_set,
409 tx,
410 )
411 .await;
412 }
413
414 Command::ExecuteSideEffectingFunc {
415 plan,
416 conn_id,
417 current_role,
418 tx,
419 } => {
420 let result = self
421 .execute_side_effecting_func(plan, conn_id, current_role)
422 .await;
423 let _ = tx.send(result);
424 }
425 Command::RegisterFrontendPeek {
426 uuid,
427 conn_id,
428 cluster_id,
429 depends_on,
430 is_fast_path,
431 watch_set,
432 tx,
433 } => {
434 self.handle_register_frontend_peek(
435 uuid,
436 conn_id,
437 cluster_id,
438 depends_on,
439 is_fast_path,
440 watch_set,
441 tx,
442 );
443 }
444 Command::UnregisterFrontendPeek { uuid, tx } => {
445 self.handle_unregister_frontend_peek(uuid, tx);
446 }
447 Command::ExplainTimestamp {
448 conn_id,
449 session_wall_time,
450 cluster_id,
451 id_bundle,
452 determination,
453 tx,
454 } => {
455 let explanation = self.explain_timestamp(
456 &conn_id,
457 session_wall_time,
458 cluster_id,
459 &id_bundle,
460 determination,
461 );
462 let _ = tx.send(explanation);
463 }
464 Command::FrontendStatementLogging(event) => {
465 self.handle_frontend_statement_logging_event(event);
466 }
467 }
468 }
469 .instrument(debug_span!("handle_command"))
470 .boxed_local()
471 }
472
473 fn handle_authenticate_verify_sasl_proof(
474 &self,
475 tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
476 role_name: String,
477 proof: String,
478 auth_message: String,
479 mock_hash: String,
480 ) {
481 let role = self.catalog().try_get_role_by_name(role_name.as_str());
482 let role_auth = role.and_then(|r| self.catalog().try_get_role_auth_by_id(&r.id));
483
484 let login = role
485 .as_ref()
486 .map(|r| r.attributes.login.unwrap_or(false))
487 .unwrap_or(false);
488
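// If there is no stored password hash (e.g., the role does not exist), verify against the
// caller-provided mock hash instead, so the same SASL verification work is performed either
// way; the success path below additionally requires a real hash.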
489 let real_hash = role_auth
490 .as_ref()
491 .and_then(|auth| auth.password_hash.as_ref());
492 let hash_ref = real_hash.map(|s| s.as_str()).unwrap_or(&mock_hash);
493
494 let role_present = role.is_some();
495 let make_auth_err = |role_present: bool, login: bool| {
496 AdapterError::AuthenticationError(if role_present && !login {
497 AuthenticationError::NonLogin
498 } else {
499 AuthenticationError::InvalidCredentials
500 })
501 };
502
503 match mz_auth::hash::sasl_verify(hash_ref, &proof, &auth_message) {
504 Ok(verifier) => {
505 // Success only if role exists, allows login, and a real password hash was used.
506 if login && real_hash.is_some() {
507 let role = role.expect("login implies role exists");
508 let _ = tx.send(Ok(SASLVerifyProofResponse {
509 verifier,
510 auth_resp: AuthResponse {
511 role_id: role.id,
512 superuser: role.attributes.superuser.unwrap_or(false),
513 },
514 }));
515 } else {
516 let _ = tx.send(Err(make_auth_err(role_present, login)));
517 }
518 }
519 Err(_) => {
520 let _ = tx.send(Err(AdapterError::AuthenticationError(
521 AuthenticationError::InvalidCredentials,
522 )));
523 }
524 }
525 }
526
527 #[mz_ore::instrument(level = "debug")]
528 async fn handle_generate_sasl_challenge(
529 &self,
530 tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
531 role_name: String,
532 client_nonce: String,
533 ) {
534 let role_auth = self
535 .catalog()
536 .try_get_role_by_name(&role_name)
537 .and_then(|role| self.catalog().try_get_role_auth_by_id(&role.id));
538
539 let nonce = match mz_auth::hash::generate_nonce(&client_nonce) {
540 Ok(n) => n,
541 Err(e) => {
542 let msg = format!(
543 "failed to generate nonce for client nonce {}: {}",
544 client_nonce, e
545 );
546 let _ = tx.send(Err(AdapterError::Internal(msg.clone())));
547 soft_panic_or_log!("{msg}");
548 return;
549 }
550 };
551
552 // It's important that the mock_nonce is deterministic per role, otherwise the purpose of
553 // doing mock authentication is defeated. We use a catalog-wide nonce, and combine that
554 // with the role name to get a per-role mock nonce.
555 let send_mock_challenge =
556 |role_name: String,
557 mock_nonce: String,
558 nonce: String,
559 tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>| {
560 let opts = mz_auth::hash::mock_sasl_challenge(
561 &role_name,
562 &mock_nonce,
563 &self.catalog().system_config().scram_iterations(),
564 );
565 let _ = tx.send(Ok(SASLChallengeResponse {
566 iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
567 salt: BASE64_STANDARD.encode(opts.salt),
568 nonce,
569 }));
570 };
571
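// If the role has a stored SCRAM hash that parses, derive the challenge from it; otherwise
// fall back to the deterministic per-role mock challenge described above.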
572 match role_auth {
573 Some(auth) if auth.password_hash.is_some() => {
574 let hash = auth.password_hash.as_ref().expect("checked above");
575 match mz_auth::hash::scram256_parse_opts(hash) {
576 Ok(opts) => {
577 let _ = tx.send(Ok(SASLChallengeResponse {
578 iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
579 salt: BASE64_STANDARD.encode(opts.salt),
580 nonce,
581 }));
582 }
583 Err(_) => {
584 send_mock_challenge(
585 role_name,
586 self.catalog().state().mock_authentication_nonce(),
587 nonce,
588 tx,
589 );
590 }
591 }
592 }
593 _ => {
594 send_mock_challenge(
595 role_name,
596 self.catalog().state().mock_authentication_nonce(),
597 nonce,
598 tx,
599 );
600 }
601 }
602 }
603
604 #[mz_ore::instrument(level = "debug")]
605 async fn handle_authenticate_password(
606 &self,
607 tx: oneshot::Sender<Result<AuthResponse, AdapterError>>,
608 role_name: String,
609 password: Option<Password>,
610 ) {
611 let Some(password) = password else {
612 // The user did not provide a password.
613 let _ = tx.send(Err(AdapterError::AuthenticationError(
614 AuthenticationError::PasswordRequired,
615 )));
616 return;
617 };
618
619 if let Some(role) = self.catalog().try_get_role_by_name(role_name.as_str()) {
620 if !role.attributes.login.unwrap_or(false) {
621 // The user is not allowed to login.
622 let _ = tx.send(Err(AdapterError::AuthenticationError(
623 AuthenticationError::NonLogin,
624 )));
625 return;
626 }
627 if let Some(auth) = self.catalog().try_get_role_auth_by_id(&role.id) {
628 if let Some(hash) = &auth.password_hash {
629 let hash = hash.clone();
630 let role_id = role.id;
631 let superuser = role.attributes.superuser.unwrap_or(false);
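// SCRAM password verification is CPU-intensive, so run it on a blocking thread
// rather than on the coordinator's main task.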
632 task::spawn_blocking(
633 || "auth-check-hash",
634 move || {
635 let _ = match mz_auth::hash::scram256_verify(&password, &hash) {
636 Ok(_) => tx.send(Ok(AuthResponse { role_id, superuser })),
637 Err(_) => tx.send(Err(AdapterError::AuthenticationError(
638 AuthenticationError::InvalidCredentials,
639 ))),
640 };
641 },
642 );
643 return;
644 }
645 }
646 // Authentication failed: the role has no password hash configured.
647 let _ = tx.send(Err(AdapterError::AuthenticationError(
648 AuthenticationError::InvalidCredentials,
649 )));
650 } else {
651 // The user does not exist.
652 let _ = tx.send(Err(AdapterError::AuthenticationError(
653 AuthenticationError::RoleNotFound,
654 )));
655 }
656 }
657
658 #[mz_ore::instrument(level = "debug")]
659 async fn handle_startup(
660 &mut self,
661 tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
662 user: User,
663 conn_id: ConnectionId,
664 secret_key: u32,
665 uuid: uuid::Uuid,
666 client_ip: Option<IpAddr>,
667 application_name: String,
668 notice_tx: mpsc::UnboundedSender<AdapterNotice>,
669 ) {
670 // Early return if successful, otherwise clean up any possible state.
671 match self.handle_startup_inner(&user, &conn_id, &client_ip).await {
672 Ok((role_id, session_defaults)) => {
673 let session_type = metrics::session_type_label_value(&user);
674 self.metrics
675 .active_sessions
676 .with_label_values(&[session_type])
677 .inc();
678 let conn = ConnMeta {
679 secret_key,
680 notice_tx,
681 drop_sinks: BTreeSet::new(),
682 pending_cluster_alters: BTreeSet::new(),
683 connected_at: self.now(),
684 user,
685 application_name,
686 uuid,
687 client_ip,
688 conn_id: conn_id.clone(),
689 authenticated_role: role_id,
690 deferred_lock: None,
691 };
692 let update = self.catalog().state().pack_session_update(&conn, Diff::ONE);
693 let update = self.catalog().state().resolve_builtin_table_update(update);
694 self.begin_session_for_statement_logging(&conn);
695 self.active_conns.insert(conn_id.clone(), conn);
696
697 // Note: Do NOT await the notify here, we pass this back to
698 // whatever requested the startup to prevent blocking startup
699 // and the Coordinator on a builtin table update.
700 let updates = vec![update];
701 // It's not a hard error if our list is missing a builtin table, but we want to
702 // make sure these two things stay in-sync.
703 if mz_ore::assert::soft_assertions_enabled() {
704 let required_tables: BTreeSet<_> = super::appends::REQUIRED_BUILTIN_TABLES
705 .iter()
706 .map(|table| self.catalog().resolve_builtin_table(*table))
707 .collect();
708 let updates_tracked = updates
709 .iter()
710 .all(|update| required_tables.contains(&update.id));
711 let all_mz_internal = super::appends::REQUIRED_BUILTIN_TABLES
712 .iter()
713 .all(|table| table.schema == MZ_INTERNAL_SCHEMA);
714 mz_ore::soft_assert_or_log!(
715 updates_tracked,
716 "not tracking all required builtin table updates!"
717 );
718 // TODO(parkmycar): When checking if a query depends on these builtin table
719 // writes we do not check the transitive dependencies of the query, because
720 // we don't support creating views on mz_internal objects. If one of these
721 // tables is promoted out of mz_internal then we'll need to add this check.
722 mz_ore::soft_assert_or_log!(
723 all_mz_internal,
724 "not all builtin tables are in mz_internal! need to check transitive depends",
725 )
726 }
727 let notify = self.builtin_table_update().background(updates);
728
729 let catalog = self.owned_catalog();
730 let build_info_human_version =
731 catalog.state().config().build_info.human_version(None);
732
733 let statement_logging_frontend = self
734 .statement_logging
735 .create_frontend(build_info_human_version);
736
737 let resp = Ok(StartupResponse {
738 role_id,
739 write_notify: notify,
740 session_defaults,
741 catalog,
742 storage_collections: Arc::clone(&self.controller.storage_collections),
743 transient_id_gen: Arc::clone(&self.transient_id_gen),
744 optimizer_metrics: self.optimizer_metrics.clone(),
745 persist_client: self.persist_client.clone(),
746 statement_logging_frontend,
747 });
748 if tx.send(resp).is_err() {
749 // Failed to send to adapter, but everything is setup so we can terminate
750 // normally.
751 self.handle_terminate(conn_id).await;
752 }
753 }
754 Err(e) => {
755 // Error during startup or sending to adapter. A user may have been created and
756 // it can stay; no need to delete it.
757 // Note: Temporary schemas are created lazily, so there's nothing to clean up here.
758
759 // Communicate the error back to the client. No need to
760 // handle failures to send the error back; we've already
761 // cleaned up all necessary state.
762 let _ = tx.send(Err(e));
763 }
764 }
765 }
766
767 // Fallible startup work that needs to be cleaned up on error.
768 async fn handle_startup_inner(
769 &mut self,
770 user: &User,
771 _conn_id: &ConnectionId,
772 client_ip: &Option<IpAddr>,
773 ) -> Result<(RoleId, BTreeMap<String, OwnedVarInput>), AdapterError> {
774 if self.catalog().try_get_role_by_name(&user.name).is_none() {
775 // If the user has made it to this point, that means they have been fully authenticated.
776 // This includes preventing any user, except a pre-defined set of system users, from
777 // connecting to an internal port. Therefore it's ok to always create a new role for the
778 // user.
779 let attributes = RoleAttributesRaw::new();
780 let plan = CreateRolePlan {
781 name: user.name.to_string(),
782 attributes,
783 };
784 self.sequence_create_role_for_startup(plan).await?;
785 }
786 let role_id = self
787 .catalog()
788 .try_get_role_by_name(&user.name)
789 .expect("created above")
790 .id;
791
792 if role_id.is_user() && !ALLOW_USER_SESSIONS.get(self.catalog().system_config().dyncfgs()) {
793 return Err(AdapterError::UserSessionsDisallowed);
794 }
795
796 // Initialize the default session variables for this role.
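// Later additions override earlier ones: system defaults are applied first, then
// role-specific defaults.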
797 let mut session_defaults = BTreeMap::new();
798 let system_config = self.catalog().state().system_config();
799
800 // Override the session with any system defaults.
801 session_defaults.extend(
802 system_config
803 .iter_session()
804 .map(|v| (v.name().to_string(), OwnedVarInput::Flat(v.value()))),
805 );
806 // Special case.
807 let statement_logging_default = system_config
808 .statement_logging_default_sample_rate()
809 .format();
810 session_defaults.insert(
811 STATEMENT_LOGGING_SAMPLE_RATE.name().to_string(),
812 OwnedVarInput::Flat(statement_logging_default),
813 );
814 // Override system defaults with role defaults.
815 session_defaults.extend(
816 self.catalog()
817 .get_role(&role_id)
818 .vars()
819 .map(|(name, val)| (name.to_string(), val.clone())),
820 );
821
822 // Validate network policies for external users. Internal users can only connect on the
823 // internal interfaces (internal HTTP/pgwire). It is up to the person deploying the system
824 // to ensure these internal interfaces are well secured.
825 //
826 // HACKY(parkmycar): We don't have a fully formed session yet for this role, but we want
827 // the default network policy for this role, so we read directly out of what the session
828 // will get initialized with.
829 if !user.is_internal() {
830 let network_policy_name = session_defaults
831 .get(NETWORK_POLICY.name())
832 .and_then(|value| match value {
833 OwnedVarInput::Flat(name) => Some(name.clone()),
834 OwnedVarInput::SqlSet(names) => {
835 tracing::error!(?names, "found multiple network policies");
836 None
837 }
838 })
839 .unwrap_or_else(|| system_config.default_network_policy_name());
840 let maybe_network_policy = self
841 .catalog()
842 .get_network_policy_by_name(&network_policy_name);
843
844 let Some(network_policy) = maybe_network_policy else {
845 // We should prevent dropping the default network policy, or setting the policy
846 // to something that doesn't exist, so complain loudly if this occurs.
847 tracing::error!(
848 network_policy_name,
849 "default network policy does not exist. All user traffic will be blocked"
850 );
851 let reason = match client_ip {
852 Some(ip) => super::NetworkPolicyError::AddressDenied(ip.clone()),
853 None => super::NetworkPolicyError::MissingIp,
854 };
855 return Err(AdapterError::NetworkPolicyDenied(reason));
856 };
857
858 if let Some(ip) = client_ip {
859 match validate_ip_with_policy_rules(ip, &network_policy.rules) {
860 Ok(_) => {}
861 Err(e) => return Err(AdapterError::NetworkPolicyDenied(e)),
862 }
863 } else {
864 // Only temporary and internal representations of a session
865 // should be missing a client_ip. These sessions should not be
866 // making requests or going through handle_startup.
867 return Err(AdapterError::NetworkPolicyDenied(
868 super::NetworkPolicyError::MissingIp,
869 ));
870 }
871 }
872
873 // Temporary schemas are now created lazily when the first temporary object is created,
874 // rather than eagerly on connection startup. This avoids expensive catalog_mut() calls
875 // for the common case where connections never create temporary objects.
876
877 Ok((role_id, session_defaults))
878 }
879
880 /// Handles an execute command.
881 #[instrument(name = "coord::handle_execute", fields(session = session.uuid().to_string()))]
882 pub(crate) async fn handle_execute(
883 &mut self,
884 portal_name: String,
885 mut session: Session,
886 tx: ClientTransmitter<ExecuteResponse>,
887 // If this command was part of another execute command
888 // (for example, executing a `FETCH` statement causes an execute to be
889 // issued for the cursor it references),
890 // then `outer_context` should be `Some`.
891 // This instructs the coordinator that the
892 // outer execute should be considered finished once the inner one is.
893 outer_context: Option<ExecuteContextGuard>,
894 ) {
895 if session.vars().emit_trace_id_notice() {
896 let span_context = tracing::Span::current()
897 .context()
898 .span()
899 .span_context()
900 .clone();
901 if span_context.is_valid() {
902 session.add_notice(AdapterNotice::QueryTrace {
903 trace_id: span_context.trace_id(),
904 });
905 }
906 }
907
908 if let Err(err) = Self::verify_portal(self.catalog(), &mut session, &portal_name) {
909 // If statement logging hasn't started yet, we don't need
910 // to add any "end" event, so just make up a no-op
911 // `ExecuteContextExtra` here, via `Default::default`.
912 //
913 // It's a bit unfortunate because the edge case of failed
914 // portal verifications won't show up in statement
915 // logging, but there seems to be nothing else we can do,
916 // because we need access to the portal to begin logging.
917 //
918 // Another option would be to log a begin and end event, but just fill in NULLs
919 // for everything we get from the portal (prepared statement id, params).
920 let extra = outer_context.unwrap_or_else(Default::default);
921 let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
922 return ctx.retire(Err(err));
923 }
924
925 // The reference to `portal` can't outlive `session`, which we
926 // use to construct the context, so scope the reference to this block where we
927 // get everything we need from the portal for later.
928 let (stmt, ctx, params) = {
929 let portal = session
930 .get_portal_unverified(&portal_name)
931 .expect("known to exist");
932 let params = portal.parameters.clone();
933 let stmt = portal.stmt.clone();
934 let logging = Arc::clone(&portal.logging);
935 let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
936
937 let extra = if let Some(extra) = outer_context {
938 // We are executing in the context of another SQL statement, so we don't
939 // want to begin statement logging anew. The context of the actual statement
940 // being executed is the one that should be retired once this finishes.
941 extra
942 } else {
943 // This is a new statement; log it and return the context.
944 let maybe_uuid = self.begin_statement_execution(
945 &mut session,
946 ¶ms,
947 &logging,
948 lifecycle_timestamps,
949 );
950
951 ExecuteContextGuard::new(maybe_uuid, self.internal_cmd_tx.clone())
952 };
953 let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
954 (stmt, ctx, params)
955 };
956
957 let stmt = match stmt {
958 Some(stmt) => stmt,
959 None => return ctx.retire(Ok(ExecuteResponse::EmptyQuery)),
960 };
961
962 let session_type = metrics::session_type_label_value(ctx.session().user());
963 let stmt_type = metrics::statement_type_label_value(&stmt);
964 self.metrics
965 .query_total
966 .with_label_values(&[session_type, stmt_type])
967 .inc();
968 match &*stmt {
969 Statement::Subscribe(SubscribeStatement { output, .. })
970 | Statement::Copy(CopyStatement {
971 relation: CopyRelation::Subscribe(SubscribeStatement { output, .. }),
972 ..
973 }) => {
974 self.metrics
975 .subscribe_outputs
976 .with_label_values(&[
977 session_type,
978 metrics::subscribe_output_label_value(output),
979 ])
980 .inc();
981 }
982 _ => {}
983 }
984
985 self.handle_execute_inner(stmt, params, ctx).await
986 }
987
988 #[instrument(name = "coord::handle_execute_inner", fields(stmt = stmt.to_ast_string_redacted()))]
989 pub(crate) async fn handle_execute_inner(
990 &mut self,
991 stmt: Arc<Statement<Raw>>,
992 params: Params,
993 mut ctx: ExecuteContext,
994 ) {
995 // This comment describes the various ways DDL can execute (the ordered operations: name
996 // resolve, purify, plan, sequence), all of which are managed by this function. DDL has
997 // three notable properties that all partially interact.
998 //
999 // 1. Most DDL statements (and a few others) support single-statement transaction delayed
1000 // execution. This occurs when a session executes `BEGIN`, a single DDL, then `COMMIT`.
1001 // We announce success of the single DDL when it is executed, but do not attempt to plan
1002 // or sequence it until `COMMIT`, which is able to error if needed while sequencing the
1003 // DDL (this behavior is Postgres-compatible). The purpose of this is because some
1004 // drivers or tools wrap all statements in `BEGIN` and `COMMIT` and we would like them to
1005 // work. When the single DDL is announced as successful we also put the session's
1006 // transaction ops into `SingleStatement` which will produce an error if any other
1007 // statement is run in the transaction except `COMMIT`. Additionally, this will cause
1008 // `handle_execute_inner` to stop further processing (no planning, etc.) of the
1009 // statement.
1010 // 2. A few other DDL statements (`ALTER .. RENAME/SWAP`) enter the `DDL` ops which allows
1011 // any number of only these DDL statements to be executed in a transaction. At sequencing
1012 // these generate the `Op::TransactionDryRun` catalog op. When applied with
1013 // `catalog_transact`, that op will always produce the `TransactionDryRun` error. The
1014 // `catalog_transact_with_ddl_transaction` function intercepts that error and reports
1015 // success to the user, but nothing is yet committed to the real catalog. At `COMMIT`, all
1016 // of the ops (without the dry run op) are applied. This allows multiple, atomic renames in
1017 // the same transaction.
1018 // 3. Some DDLs do off-thread work during purification or sequencing that is expensive or
1019 // makes network calls (interfacing with secrets, optimization of views/indexes, source
1020 // purification). These must guarantee correctness when they return to the main
1021 // coordinator thread because the catalog state could have changed while they were doing
1022 // the off-thread work. Previously we would use `PlanValidity::Checks` to specify a bunch
1023 // of IDs that we needed to exist. We discovered the way we were doing that was not
1024 // always correct. Instead of attempting to get that completely right, we have opted to
1025 // serialize DDL. Getting this right is difficult because catalog changes can affect name
1026 // resolution, planning, sequencing, and optimization. Our current code was not designed to
1027 // make it easy to write logic that is aware of every catalog change that could affect any
1028 // of those parts. Even if a DDL statement
1029 // is doing off-thread work, another DDL must not yet execute at all. Executing these
1030 // serially will guarantee that no off-thread work has affected the state of the catalog.
1031 // This is done by adding a VecDeque of deferred statements and a lock to the
1032 // Coordinator. When a DDL is run in `handle_execute_inner` (after applying whatever
1033 // transaction ops are needed to the session as described above), it attempts to own the
1034 // lock (a tokio Mutex). If acquired, it stashes the lock in the connection's `ConnMeta`
1035 // struct in `active_conns` and proceeds. The lock is dropped at transaction end in
1036 // `clear_transaction` and a message sent to the Coordinator to execute the next queued
1037 // DDL. If the lock could not be acquired, the DDL is put into the VecDeque where it
1038 // awaits dequeuing caused by the lock being released.
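//
// An illustrative example of point 1: in `BEGIN; CREATE VIEW v AS SELECT 1; COMMIT;`, the
// `CREATE VIEW` reports success immediately, but it is only planned and sequenced when the
// `COMMIT` arrives, which is where any error would surface.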
1039
1040 // Verify that this statement type can be executed in the current
1041 // transaction state.
1042 match ctx.session().transaction() {
1043 // By this point we should be in a running transaction.
1044 TransactionStatus::Default => unreachable!(),
1045
1046 // Failed transactions have already been checked in pgwire for a safe statement
1047 // (COMMIT, ROLLBACK, etc.) and can proceed.
1048 TransactionStatus::Failed(_) => {}
1049
1050 // Started is a deceptive name, and means different things depending on which
1051 // protocol was used. It means either exactly one statement (known because this
1052 // is the simple protocol: the parser parsed the entire string and it contained
1053 // one statement), or, in the extended protocol, that *some* query is being
1054 // executed but there might be others after it before the Sync (commit)
1055 // message. Postgres handles this by teaching Started to eagerly commit certain
1056 // statements that can't be run in a transaction block.
1057 TransactionStatus::Started(_) => {
1058 if let Statement::Declare(_) = &*stmt {
1059 // Declare is an exception. Although it's not against any spec to execute
1060 // it, it will always result in nothing happening, since all portals will be
1061 // immediately closed. Users don't know this detail, so this error helps them
1062 // understand what's going wrong. Postgres does this too.
1063 return ctx.retire(Err(AdapterError::OperationRequiresTransaction(
1064 "DECLARE CURSOR".into(),
1065 )));
1066 }
1067 }
1068
1069 // Implicit or explicit transactions.
1070 //
1071 // Implicit transactions happen when a multi-statement query is executed
1072 // (a "simple query"). However if a "BEGIN" appears somewhere in there,
1073 // then the existing implicit transaction will be upgraded to an explicit
1074 // transaction. Thus, we should not separate what implicit and explicit
1075 // transactions can do unless there's some additional checking to make sure
1076 // something disallowed in explicit transactions did not previously take place
1077 // in the implicit portion.
1078 TransactionStatus::InTransactionImplicit(_) | TransactionStatus::InTransaction(_) => {
1079 match &*stmt {
1080 // Statements that are safe in a transaction. We still need to verify that we
1081 // don't interleave reads and writes since we can't perform those serializably.
1082 Statement::Close(_)
1083 | Statement::Commit(_)
1084 | Statement::Copy(_)
1085 | Statement::Deallocate(_)
1086 | Statement::Declare(_)
1087 | Statement::Discard(_)
1088 | Statement::Execute(_)
1089 | Statement::ExplainPlan(_)
1090 | Statement::ExplainPushdown(_)
1091 | Statement::ExplainAnalyzeObject(_)
1092 | Statement::ExplainAnalyzeCluster(_)
1093 | Statement::ExplainTimestamp(_)
1094 | Statement::ExplainSinkSchema(_)
1095 | Statement::Fetch(_)
1096 | Statement::Prepare(_)
1097 | Statement::Rollback(_)
1098 | Statement::Select(_)
1099 | Statement::SetTransaction(_)
1100 | Statement::Show(_)
1101 | Statement::SetVariable(_)
1102 | Statement::ResetVariable(_)
1103 | Statement::StartTransaction(_)
1104 | Statement::Subscribe(_)
1105 | Statement::Raise(_) => {
1106 // Always safe.
1107 }
1108
1109 Statement::Insert(InsertStatement {
1110 source, returning, ..
1111 }) if returning.is_empty() && ConstantVisitor::insert_source(source) => {
1112 // INSERT statements from constant values that do not need to execute on
1113 // any cluster (no RETURNING) are always safe.
1114 }
1115
1116 // These statements must be kept in-sync with `must_serialize_ddl()`.
1117 Statement::AlterObjectRename(_)
1118 | Statement::AlterObjectSwap(_)
1119 | Statement::CreateTableFromSource(_)
1120 | Statement::CreateSource(_) => {
1121 let state = self.catalog().for_session(ctx.session()).state().clone();
1122 let revision = self.catalog().transient_revision();
1123
1124 // Initialize our transaction with a set of empty ops, or return an error
1125 // if we can't run a DDL transaction
1126 let txn_status = ctx.session_mut().transaction_mut();
1127 if let Err(err) = txn_status.add_ops(TransactionOps::DDL {
1128 ops: vec![],
1129 state,
1130 revision,
1131 side_effects: vec![],
1132 }) {
1133 return ctx.retire(Err(err));
1134 }
1135 }
1136
1137 // Statements below must be run singly (in Started).
1138 Statement::AlterCluster(_)
1139 | Statement::AlterConnection(_)
1140 | Statement::AlterDefaultPrivileges(_)
1141 | Statement::AlterIndex(_)
1142 | Statement::AlterMaterializedViewApplyReplacement(_)
1143 | Statement::AlterSetCluster(_)
1144 | Statement::AlterOwner(_)
1145 | Statement::AlterRetainHistory(_)
1146 | Statement::AlterRole(_)
1147 | Statement::AlterSecret(_)
1148 | Statement::AlterSink(_)
1149 | Statement::AlterSource(_)
1150 | Statement::AlterSystemReset(_)
1151 | Statement::AlterSystemResetAll(_)
1152 | Statement::AlterSystemSet(_)
1153 | Statement::AlterTableAddColumn(_)
1154 | Statement::AlterNetworkPolicy(_)
1155 | Statement::CreateCluster(_)
1156 | Statement::CreateClusterReplica(_)
1157 | Statement::CreateConnection(_)
1158 | Statement::CreateDatabase(_)
1159 | Statement::CreateIndex(_)
1160 | Statement::CreateMaterializedView(_)
1161 | Statement::CreateContinualTask(_)
1162 | Statement::CreateRole(_)
1163 | Statement::CreateSchema(_)
1164 | Statement::CreateSecret(_)
1165 | Statement::CreateSink(_)
1166 | Statement::CreateSubsource(_)
1167 | Statement::CreateTable(_)
1168 | Statement::CreateType(_)
1169 | Statement::CreateView(_)
1170 | Statement::CreateWebhookSource(_)
1171 | Statement::CreateNetworkPolicy(_)
1172 | Statement::Delete(_)
1173 | Statement::DropObjects(_)
1174 | Statement::DropOwned(_)
1175 | Statement::GrantPrivileges(_)
1176 | Statement::GrantRole(_)
1177 | Statement::Insert(_)
1178 | Statement::ReassignOwned(_)
1179 | Statement::RevokePrivileges(_)
1180 | Statement::RevokeRole(_)
1181 | Statement::Update(_)
1182 | Statement::ValidateConnection(_)
1183 | Statement::Comment(_) => {
1184 let txn_status = ctx.session_mut().transaction_mut();
1185
1186 // If we're not in an implicit transaction and we could generate exactly one
1187 // valid ExecuteResponse, we can delay execution until commit.
1188 if !txn_status.is_implicit() {
1189 // Statements whose tag is trivial (i.e., known without executing the statement) can
1190 // be run in a special single-statement explicit mode. In this mode (`BEGIN;
1191 // <stmt>; COMMIT`), we generate the expected tag from a successful <stmt>, but
1192 // delay execution until `COMMIT`.
1193 if let Ok(resp) = ExecuteResponse::try_from(&*stmt) {
1194 if let Err(err) = txn_status
1195 .add_ops(TransactionOps::SingleStatement { stmt, params })
1196 {
1197 ctx.retire(Err(err));
1198 return;
1199 }
1200 ctx.retire(Ok(resp));
1201 return;
1202 }
1203 }
1204
1205 return ctx.retire(Err(AdapterError::OperationProhibitsTransaction(
1206 stmt.to_string(),
1207 )));
1208 }
1209 }
1210 }
1211 }
1212
1213 // DDLs must be planned and sequenced serially. We do not rely on PlanValidity checking
1214 // various IDs because we have incorrectly done that in the past. Attempt to acquire the
1215 // ddl lock. The lock is stashed in the ConnMeta which is dropped at transaction end. If
1216 // acquired, proceed with sequencing. If not, enqueue and return. This logic assumes that
1217 // Coordinator::clear_transaction is correctly called when session transactions are ended
1218 // because that function will release the held lock from active_conns.
1219 if Self::must_serialize_ddl(&stmt, &ctx) {
1220 if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
1221 let prev = self
1222 .active_conns
1223 .get_mut(ctx.session().conn_id())
1224 .expect("connection must exist")
1225 .deferred_lock
1226 .replace(guard);
1227 assert!(
1228 prev.is_none(),
1229 "connections should have at most one lock guard"
1230 );
1231 } else {
1232 if self
1233 .active_conns
1234 .get(ctx.session().conn_id())
1235 .expect("connection must exist")
1236 .deferred_lock
1237 .is_some()
1238 {
1239 // This session *already* has the lock, and incorrectly tried to execute another
1240 // DDL while still holding the lock, violating the assumption documented above.
1241 // This is an internal error, probably in some AdapterClient user (pgwire or
1242 // http). Because the session is now in some unexpected state, return an error
1243 // which should cause the AdapterClient user to fail the transaction.
1244 // (Terminating the connection is maybe what we would prefer to do, but is not
1245 // currently a thing we can do from the coordinator: calling handle_terminate
1246 // cleans up Coordinator state for the session but doesn't inform the
1247 // AdapterClient that the session should terminate.)
1248 soft_panic_or_log!(
1249 "session {} attempted to get ddl lock while already owning it",
1250 ctx.session().conn_id()
1251 );
1252 ctx.retire(Err(AdapterError::Internal(
1253 "session attempted to get ddl lock while already owning it".to_string(),
1254 )));
1255 return;
1256 }
1257 self.serialized_ddl.push_back(DeferredPlanStatement {
1258 ctx,
1259 ps: PlanStatement::Statement { stmt, params },
1260 });
1261 return;
1262 }
1263 }
1264
1265 let catalog = self.catalog();
1266 let catalog = catalog.for_session(ctx.session());
1267 let original_stmt = Arc::clone(&stmt);
1268 // `resolved_ids` should be derivable from `stmt`. If `stmt` is transformed to remove/add
1269 // IDs, then `resolved_ids` should be updated to also remove/add those IDs.
1270 let (stmt, mut resolved_ids) = match mz_sql::names::resolve(&catalog, (*stmt).clone()) {
1271 Ok(resolved) => resolved,
1272 Err(e) => return ctx.retire(Err(e.into())),
1273 };
1274 // N.B. The catalog can change during purification so we must validate that the dependencies still exist after
1275 // purification. This should be done back on the main thread.
1276 // We do the validation:
1277 // - In the handler for `Message::PurifiedStatementReady`, before we handle the purified statement.
1278 // If we add special handling for more types of `Statement`s, we'll need to ensure similar verification
1279 // occurs.
1280 let (stmt, resolved_ids) = match stmt {
1281 // Various statements must be purified off the main coordinator thread of control.
1282 stmt if Self::must_spawn_purification(&stmt) => {
1283 let internal_cmd_tx = self.internal_cmd_tx.clone();
1284 let conn_id = ctx.session().conn_id().clone();
1285 let catalog = self.owned_catalog();
1286 let now = self.now();
1287 let otel_ctx = OpenTelemetryContext::obtain();
1288 let current_storage_configuration = self.controller.storage.config().clone();
1289 task::spawn(|| format!("purify:{conn_id}"), async move {
1290 let transient_revision = catalog.transient_revision();
1291 let catalog = catalog.for_session(ctx.session());
1292
1293 // Checks if the session is authorized to purify a statement. Usually
1294 // authorization is checked after planning; however, purification happens before
1295 // planning and may require the use of some connections and secrets.
1296 if let Err(e) = rbac::check_usage(
1297 &catalog,
1298 ctx.session(),
1299 &resolved_ids,
1300 &CREATE_ITEM_USAGE,
1301 ) {
1302 return ctx.retire(Err(e.into()));
1303 }
1304
1305 let (result, cluster_id) = mz_sql::pure::purify_statement(
1306 catalog,
1307 now,
1308 stmt,
1309 ¤t_storage_configuration,
1310 )
1311 .await;
1312 let result = result.map_err(|e| e.into());
1313 let dependency_ids = resolved_ids.items().copied().collect();
1314 let plan_validity = PlanValidity::new(
1315 transient_revision,
1316 dependency_ids,
1317 cluster_id,
1318 None,
1319 ctx.session().role_metadata().clone(),
1320 );
1321 // It is not an error for purification to complete after `internal_cmd_rx` is dropped.
1322 let result = internal_cmd_tx.send(Message::PurifiedStatementReady(
1323 PurifiedStatementReady {
1324 ctx,
1325 result,
1326 params,
1327 plan_validity,
1328 original_stmt,
1329 otel_ctx,
1330 },
1331 ));
1332 if let Err(e) = result {
1333 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1334 }
1335 });
1336 return;
1337 }
1338
1339 // `CREATE SUBSOURCE` statements are disallowed for users and are only generated
1340 // automatically as part of purification
1341 Statement::CreateSubsource(_) => {
1342 ctx.retire(Err(AdapterError::Unsupported(
1343 "CREATE SUBSOURCE statements",
1344 )));
1345 return;
1346 }
1347
1348 Statement::CreateMaterializedView(mut cmvs) => {
1349 // `CREATE MATERIALIZED VIEW ... AS OF ...` syntax is disallowed for users and is
1350 // only used for storing initial frontiers in the catalog.
1351 if cmvs.as_of.is_some() {
1352 return ctx.retire(Err(AdapterError::Unsupported(
1353 "CREATE MATERIALIZED VIEW ... AS OF statements",
1354 )));
1355 }
1356
1357 let mz_now = match self
1358 .resolve_mz_now_for_create_materialized_view(
1359 &cmvs,
1360 &resolved_ids,
1361 ctx.session_mut(),
1362 true,
1363 )
1364 .await
1365 {
1366 Ok(mz_now) => mz_now,
1367 Err(e) => return ctx.retire(Err(e)),
1368 };
1369
1370 let catalog = self.catalog().for_session(ctx.session());
1371
1372 purify_create_materialized_view_options(
1373 catalog,
1374 mz_now,
1375 &mut cmvs,
1376 &mut resolved_ids,
1377 );
1378
1379 let purified_stmt =
1380 Statement::CreateMaterializedView(CreateMaterializedViewStatement::<Aug> {
1381 if_exists: cmvs.if_exists,
1382 name: cmvs.name,
1383 columns: cmvs.columns,
1384 replacement_for: cmvs.replacement_for,
1385 in_cluster: cmvs.in_cluster,
1386 query: cmvs.query,
1387 with_options: cmvs.with_options,
1388 as_of: None,
1389 });
1390
1391 // (Purifying CreateMaterializedView doesn't happen async, so no need to send
1392 // `Message::PurifiedStatementReady` here.)
1393 (purified_stmt, resolved_ids)
1394 }
1395
1396 Statement::ExplainPlan(ExplainPlanStatement {
1397 stage,
1398 with_options,
1399 format,
1400 explainee: Explainee::CreateMaterializedView(box_cmvs, broken),
1401 }) => {
1402 let mut cmvs = *box_cmvs;
1403 let mz_now = match self
1404 .resolve_mz_now_for_create_materialized_view(
1405 &cmvs,
1406 &resolved_ids,
1407 ctx.session_mut(),
1408 false,
1409 )
1410 .await
1411 {
1412 Ok(mz_now) => mz_now,
1413 Err(e) => return ctx.retire(Err(e)),
1414 };
1415
1416 let catalog = self.catalog().for_session(ctx.session());
1417
1418 purify_create_materialized_view_options(
1419 catalog,
1420 mz_now,
1421 &mut cmvs,
1422 &mut resolved_ids,
1423 );
1424
1425 let purified_stmt = Statement::ExplainPlan(ExplainPlanStatement {
1426 stage,
1427 with_options,
1428 format,
1429 explainee: Explainee::CreateMaterializedView(Box::new(cmvs), broken),
1430 });
1431
1432 (purified_stmt, resolved_ids)
1433 }
1434
1435 // All other statements are handled immediately.
1436 _ => (stmt, resolved_ids),
1437 };
1438
1439 match self.plan_statement(ctx.session(), stmt, ¶ms, &resolved_ids) {
1440 Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await,
1441 Err(e) => ctx.retire(Err(e)),
1442 }
1443 }
1444
1445 /// Whether the statement is DDL that must be serialized.
1446 fn must_serialize_ddl(stmt: &Statement<Raw>, ctx: &ExecuteContext) -> bool {
1447 // Non-DDL is not serialized here.
1448 if !StatementClassification::from(&*stmt).is_ddl() {
1449 return false;
1450 }
1451 // Off-thread, pre-planning purification can perform arbitrarily slow network calls so must
1452 // not be serialized. These all use PlanValidity for their checking, and we must ensure
1453 // those checks are sufficient.
1454 if Self::must_spawn_purification(stmt) {
1455 return false;
1456 }
1457
1458 // Statements that support multiple DDLs in a single transaction aren't serialized here.
1459 // Their operations are serialized when applied to the catalog, guaranteeing that any
1460 // off-thread DDLs concurrent with a multiple DDL transaction will have a serial order.
1461 if ctx.session().transaction().is_ddl() {
1462 return false;
1463 }
1464
1465 // Some DDL is exempt. It is not great that we are matching on Statements here because
1466 // different plans can be produced from the same top-level statement type (e.g., `ALTER
1467 // CONNECTION ROTATE KEYS`). But the whole point of this is to prevent things from being
1468 // planned in the first place, so we accept the abstraction leak.
1469 match stmt {
1470 // Secrets have a small and understood set of dependencies, and their off-thread work
1471 // interacts with k8s.
1472 Statement::AlterSecret(_) => false,
1473 Statement::CreateSecret(_) => false,
1474 Statement::AlterConnection(AlterConnectionStatement { actions, .. })
1475 if actions
1476 .iter()
1477 .all(|action| matches!(action, AlterConnectionAction::RotateKeys)) =>
1478 {
1479 false
1480 }
1481
1482 // The off-thread work that altering a cluster may do (waiting for replicas to spin up)
1483 // does not affect its catalog names or ids and so is safe to not serialize. This could
1484 // change the set of replicas that exist. For queries that name replicas or use the
1485 // current_replica session var, the `replica_id` field of `PlanValidity` serves to
1486 // ensure that those replicas exist during the query finish stage. Additionally, that
1487 // work can take hours (configured by the user), so would also be a bad experience for
1488 // users.
1489 Statement::AlterCluster(_) => false,
1490
1491 // `ALTER SINK SET FROM` waits for the old relation to make enough progress for a clean
1492 // cutover. If the old collection is stalled, it may block forever. Checks in
1493 // sequencing ensure that the operation fails if any one of these happens concurrently:
1494 // * the sink is dropped
1495 // * the new source relation is dropped
1496 // * another `ALTER SINK` for the same sink is applied first
1497 Statement::AlterSink(stmt)
1498 if matches!(stmt.action, AlterSinkAction::ChangeRelation(_)) =>
1499 {
1500 false
1501 }
1502
1503 // Everything else must be serialized.
1504 _ => true,
1505 }
1506 }
1507
1508 /// Whether the statement must be purified off of the Coordinator thread.
1509 fn must_spawn_purification<A: AstInfo>(stmt: &Statement<A>) -> bool {
1510 // `CREATE` and `ALTER` `SOURCE` and `SINK` statements must be purified off the main
1511 // coordinator thread.
1512 if !matches!(
1513 stmt,
1514 Statement::CreateSource(_)
1515 | Statement::AlterSource(_)
1516 | Statement::CreateSink(_)
1517 | Statement::CreateTableFromSource(_)
1518 ) {
1519 return false;
1520 }
1521
1522 // However `ALTER SOURCE RETAIN HISTORY` should be excluded from off-thread purification.
1523 if let Statement::AlterSource(stmt) = stmt {
1524 let names: Vec<CreateSourceOptionName> = match &stmt.action {
1525 AlterSourceAction::SetOptions(options) => {
1526 options.iter().map(|o| o.name.clone()).collect()
1527 }
1528 AlterSourceAction::ResetOptions(names) => names.clone(),
1529 _ => vec![],
1530 };
1531 if !names.is_empty()
1532 && names
1533 .iter()
1534 .all(|n| matches!(n, CreateSourceOptionName::RetainHistory))
1535 {
1536 return false;
1537 }
1538 }
1539
1540 true
1541 }
1542
1543 /// Chooses a timestamp for `mz_now()`, if `mz_now()` occurs in a REFRESH option of the
1544 /// materialized view. Additionally, if `acquire_read_holds` is true and the MV has any REFRESH
1545 /// option, this function grabs read holds at the earliest possible time on input collections
1546 /// that might be involved in the MV.
1547 ///
1548 /// Note that this is NOT what handles `mz_now()` in the query part of the MV; this function
1549 /// handles it only in `with_options`.
1550 ///
1551 /// (Note that the chosen timestamp won't be the same timestamp as the system table inserts,
1552 /// unfortunately.)
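///
/// Illustrative example (schematic syntax): for a statement like
/// `CREATE MATERIALIZED VIEW mv WITH (REFRESH AT <expr involving mz_now()>) AS <query>`,
/// the `mz_now()` inside the REFRESH option is resolved here, while any `mz_now()` inside
/// `<query>` is not.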
1553 async fn resolve_mz_now_for_create_materialized_view(
1554 &mut self,
1555 cmvs: &CreateMaterializedViewStatement<Aug>,
1556 resolved_ids: &ResolvedIds,
1557 session: &Session,
1558 acquire_read_holds: bool,
1559 ) -> Result<Option<Timestamp>, AdapterError> {
1560 if cmvs
1561 .with_options
1562 .iter()
1563 .any(|wo| matches!(wo.value, Some(WithOptionValue::Refresh(..))))
1564 {
1565 let catalog = self.catalog().for_session(session);
1566 let cluster = mz_sql::plan::resolve_cluster_for_materialized_view(&catalog, cmvs)?;
1567 let ids = self
1568 .index_oracle(cluster)
1569 .sufficient_collections(resolved_ids.collections().copied());
1570
1571 // If there is any REFRESH option, then acquire read holds. (Strictly speaking, we'd
1572 // need this only if there is a `REFRESH AT`, not for `REFRESH EVERY`, because later
1573 // we want to check the AT times against the read holds that we acquire here. But
1574 // we do it for any REFRESH option, to avoid having so many code paths doing different
1575 // things.)
1576 //
1577 // It's important that we acquire read holds _before_ we determine the least valid read.
1578 // Otherwise, we're not guaranteed that the since frontier doesn't
1579 // advance forward from underneath us.
1580 let read_holds = self.acquire_read_holds(&ids);
1581
1582 // Does `mz_now()` occur?
1583 let mz_now_ts = if cmvs
1584 .with_options
1585 .iter()
1586 .any(materialized_view_option_contains_temporal)
1587 {
1588 let timeline_context = self
1589 .catalog()
1590 .validate_timeline_context(resolved_ids.collections().copied())?;
1591
1592 // We default to EpochMilliseconds, similarly to `determine_timestamp_for`,
1593 // but even in the TimestampIndependent case.
1594 // Note that we didn't accurately decide whether we are TimestampDependent
1595 // or TimestampIndependent, because for this we'd need to also check whether
1596 // `query.contains_temporal()`, similarly to how `peek_stage_validate` does.
1597 // However, this doesn't matter here, as we are just going to default to
1598 // EpochMilliseconds in both cases.
1599 let timeline = timeline_context
1600 .timeline()
1601 .unwrap_or(&Timeline::EpochMilliseconds);
1602
1603 // Let's start with the timestamp oracle read timestamp.
1604 let mut timestamp = self.get_timestamp_oracle(timeline).read_ts().await;
1605
1606 // If `least_valid_read` is later than the oracle, then advance to that time.
1607 // If we didn't do this, then there would be a danger of missing the first refresh,
1608 // which might cause the materialized view to be unreadable for hours. This might
1609 // be what was happening here:
1610 // https://github.com/MaterializeInc/database-issues/issues/7265#issuecomment-1931856361
1611 //
1612 // In the long term, it would be good to actually block the MV creation statement
1613 // until `least_valid_read`. https://github.com/MaterializeInc/database-issues/issues/7504
1614 // Without blocking, we have the problem that a REFRESH AT CREATION is not linearized
1615 // with the CREATE MATERIALIZED VIEW statement, in the sense that a query from the MV
1616                // after its creation might see input changes that happened after the CREATE MATERIALIZED
1617 // VIEW statement returned.
1618 let oracle_timestamp = timestamp;
1619 let least_valid_read = read_holds.least_valid_read();
1620 timestamp.advance_by(least_valid_read.borrow());
1621
1622 if oracle_timestamp != timestamp {
1623 warn!(%cmvs.name, %oracle_timestamp, %timestamp, "REFRESH MV's inputs are not readable at the oracle read ts");
1624 }
1625
1626 info!("Resolved `mz_now()` to {timestamp} for REFRESH MV");
1627 Ok(Some(timestamp))
1628 } else {
1629 Ok(None)
1630 };
1631
1632            // NOTE: The Drop impl of ReadHolds makes sure that the holds are
1633            // released if we don't store them below.
1634 if acquire_read_holds {
1635 self.store_transaction_read_holds(session.conn_id().clone(), read_holds);
1636 }
1637
1638 mz_now_ts
1639 } else {
1640 Ok(None)
1641 }
1642 }
1643
1644 /// Instruct the dataflow layer to cancel any ongoing, interactive work for
1645 /// the named `conn_id` if the correct secret key is specified.
1646 ///
1647    /// Note: Here we take a [`ConnectionIdType`] as opposed to an owned
1648    /// `ConnectionId` because this method gets called by external clients when
1649    /// they issue a cancellation request.
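    ///
    /// A hedged sketch of the check performed below, using a hypothetical, simplified map from
    /// connection id to the secret key issued at startup (not the real types):
    ///
    /// ```
    /// use std::collections::BTreeMap;
    ///
    /// fn may_cancel(active: &BTreeMap<u32, u32>, conn_id: u32, secret_key: u32) -> bool {
    ///     // An unknown connection or a mismatched secret means the request is ignored.
    ///     active.get(&conn_id) == Some(&secret_key)
    /// }
    ///
    /// let mut active = BTreeMap::new();
    /// active.insert(7, 0xdead_beef);
    /// assert!(may_cancel(&active, 7, 0xdead_beef));
    /// assert!(!may_cancel(&active, 7, 42)); // rogue cancellation request
    /// ```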
1650 #[mz_ore::instrument(level = "debug")]
1651 async fn handle_cancel(&mut self, conn_id: ConnectionIdType, secret_key: u32) {
1652 if let Some((id_handle, conn_meta)) = self.active_conns.get_key_value(&conn_id) {
1653 // If the secret key specified by the client doesn't match the
1654 // actual secret key for the target connection, we treat this as a
1655 // rogue cancellation request and ignore it.
1656 if conn_meta.secret_key != secret_key {
1657 return;
1658 }
1659
1660 // Now that we've verified the secret key, this is a privileged
1661 // cancellation request. We can upgrade the raw connection ID to a
1662 // proper `IdHandle`.
1663 self.handle_privileged_cancel(id_handle.clone()).await;
1664 }
1665 }
1666
1667 /// Unconditionally instructs the dataflow layer to cancel any ongoing,
1668 /// interactive work for the named `conn_id`.
1669 #[mz_ore::instrument(level = "debug")]
1670 pub(crate) async fn handle_privileged_cancel(&mut self, conn_id: ConnectionId) {
1671 let mut maybe_ctx = None;
1672
1673 // Cancel pending writes. There is at most one pending write per session.
1674 if let Some(idx) = self.pending_writes.iter().position(|pending_write_txn| {
1675 matches!(pending_write_txn, PendingWriteTxn::User {
1676 pending_txn: PendingTxn { ctx, .. },
1677 ..
1678 } if *ctx.session().conn_id() == conn_id)
1679 }) {
1680 if let PendingWriteTxn::User {
1681 pending_txn: PendingTxn { ctx, .. },
1682 ..
1683 } = self.pending_writes.remove(idx)
1684 {
1685 maybe_ctx = Some(ctx);
1686 }
1687 }
1688
1689 // Cancel deferred writes.
1690 if let Some(write_op) = self.deferred_write_ops.remove(&conn_id) {
1691 maybe_ctx = Some(write_op.into_ctx());
1692 }
1693
1694 // Cancel deferred statements.
1695 if let Some(idx) = self
1696 .serialized_ddl
1697 .iter()
1698 .position(|deferred| *deferred.ctx.session().conn_id() == conn_id)
1699 {
1700 let deferred = self
1701 .serialized_ddl
1702 .remove(idx)
1703 .expect("known to exist from call to `position` above");
1704 maybe_ctx = Some(deferred.ctx);
1705 }
1706
1707 // Cancel reads waiting on being linearized. There is at most one linearized read per
1708 // session.
1709 if let Some(pending_read_txn) = self.pending_linearize_read_txns.remove(&conn_id) {
1710 let ctx = pending_read_txn.take_context();
1711 maybe_ctx = Some(ctx);
1712 }
1713
1714 if let Some(ctx) = maybe_ctx {
1715 ctx.retire(Err(AdapterError::Canceled));
1716 }
1717
1718 self.cancel_pending_peeks(&conn_id);
1719 self.cancel_pending_watchsets(&conn_id);
1720 self.cancel_compute_sinks_for_conn(&conn_id).await;
1721 self.cancel_cluster_reconfigurations_for_conn(&conn_id)
1722 .await;
1723 self.cancel_pending_copy(&conn_id);
1724 if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) {
1725 let _ = tx.send(true);
1726 }
1727 }
1728
1729 /// Handle termination of a client session.
1730 ///
1731 /// This cleans up any state in the coordinator associated with the session.
1732 #[mz_ore::instrument(level = "debug")]
1733 async fn handle_terminate(&mut self, conn_id: ConnectionId) {
1734 if !self.active_conns.contains_key(&conn_id) {
1735 // If the session doesn't exist in `active_conns`, then this method will panic later on.
1736 // Instead we explicitly panic here while dumping the entire Coord to the logs to help
1737 // debug. This panic is very infrequent so we want as much information as possible.
1738 // See https://github.com/MaterializeInc/database-issues/issues/5627.
1739 panic!("unknown connection: {conn_id:?}\n\n{self:?}")
1740 }
1741
1742 // We do not need to call clear_transaction here because there are no side effects to run
1743 // based on any session transaction state.
1744 self.clear_connection(&conn_id).await;
1745
1746 self.drop_temp_items(&conn_id).await;
1747 // Only call catalog_mut() if a temporary schema actually exists for this connection.
1748 // This avoids an expensive Arc::make_mut clone for the common case where the connection
1749 // never created any temporary objects.
1750 if self.catalog().state().has_temporary_schema(&conn_id) {
1751 self.catalog_mut()
1752 .drop_temporary_schema(&conn_id)
1753 .unwrap_or_terminate("unable to drop temporary schema");
1754 }
1755 let conn = self.active_conns.remove(&conn_id).expect("conn must exist");
1756 let session_type = metrics::session_type_label_value(conn.user());
1757 self.metrics
1758 .active_sessions
1759 .with_label_values(&[session_type])
1760 .dec();
1761 self.cancel_pending_peeks(conn.conn_id());
1762 self.cancel_pending_watchsets(&conn_id);
1763 self.cancel_pending_copy(&conn_id);
1764 self.end_session_for_statement_logging(conn.uuid());
1765
1766 // Queue the builtin table update, but do not wait for it to complete. We explicitly do
1767 // this to prevent blocking the Coordinator in the case that a lot of connections are
1768 // closed at once, which occurs regularly in some workflows.
1769 let update = self
1770 .catalog()
1771 .state()
1772 .pack_session_update(&conn, Diff::MINUS_ONE);
1773 let update = self.catalog().state().resolve_builtin_table_update(update);
1774
1775 let _builtin_update_notify = self.builtin_table_update().defer(vec![update]);
1776 }
1777
1778 /// Returns the necessary metadata for appending to a webhook source, and a channel to send
1779 /// rows.
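    ///
    /// A hedged sketch of the request/response pattern this handler participates in, with
    /// simplified stand-in types (the real response is an `AppendWebhookResponse`):
    ///
    /// ```
    /// use tokio::sync::oneshot;
    ///
    /// // The client hands us a oneshot sender along with `database.schema.name`, and we
    /// // reply with either the resolved appender or an error.
    /// let (tx, mut rx) = oneshot::channel::<Result<String, String>>();
    /// tx.send(Ok("appender".to_string())).ok();
    /// assert_eq!(rx.try_recv().unwrap(), Ok("appender".to_string()));
    /// ```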
1780 #[mz_ore::instrument(level = "debug")]
1781 fn handle_get_webhook(
1782 &mut self,
1783 database: String,
1784 schema: String,
1785 name: String,
1786 tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
1787 ) {
1788 /// Attempts to resolve a Webhook source from a provided `database.schema.name` path.
1789 ///
1790        /// Returns a struct that can be used to append data to the underlying storage collection, and
1791        /// the types we should cast the request to.
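        ///
        /// For example, a request against the path `materialize.public.my_webhook` is looked up
        /// by the equivalent of (hypothetical values):
        ///
        /// ```
        /// use mz_sql::names::PartialItemName;
        ///
        /// let name = PartialItemName {
        ///     database: Some("materialize".to_string()),
        ///     schema: Some("public".to_string()),
        ///     item: "my_webhook".to_string(),
        /// };
        /// assert_eq!(name.item, "my_webhook");
        /// ```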
1792 fn resolve(
1793 coord: &mut Coordinator,
1794 database: String,
1795 schema: String,
1796 name: String,
1797 ) -> Result<AppendWebhookResponse, PartialItemName> {
1798 // Resolve our collection.
1799 let name = PartialItemName {
1800 database: Some(database),
1801 schema: Some(schema),
1802 item: name,
1803 };
1804 let Ok(entry) = coord
1805 .catalog()
1806 .resolve_entry(None, &vec![], &name, &SYSTEM_CONN_ID)
1807 else {
1808 return Err(name);
1809 };
1810
1811 // Webhooks can be created with `CREATE SOURCE` or `CREATE TABLE`.
1812 let (data_source, desc, global_id) = match entry.item() {
1813 CatalogItem::Source(Source {
1814 data_source: data_source @ DataSourceDesc::Webhook { .. },
1815 desc,
1816 global_id,
1817 ..
1818 }) => (data_source, desc.clone(), *global_id),
1819 CatalogItem::Table(
1820 table @ Table {
1821 desc,
1822 data_source:
1823 TableDataSource::DataSource {
1824 desc: data_source @ DataSourceDesc::Webhook { .. },
1825 ..
1826 },
1827 ..
1828 },
1829 ) => (data_source, desc.latest(), table.global_id_writes()),
1830 _ => return Err(name),
1831 };
1832
1833 let DataSourceDesc::Webhook {
1834 validate_using,
1835 body_format,
1836 headers,
1837 ..
1838 } = data_source
1839 else {
1840 mz_ore::soft_panic_or_log!("programming error! checked above for webhook");
1841 return Err(name);
1842 };
1843 let body_format = body_format.clone();
1844 let header_tys = headers.clone();
1845
1846            // Assert that we have one column for the body, and however many are required for
1847            // the headers.
1848 let num_columns = headers.num_columns() + 1;
1849 mz_ore::soft_assert_or_log!(
1850 desc.arity() <= num_columns,
1851 "expected at most {} columns, but got {}",
1852 num_columns,
1853 desc.arity()
1854 );
1855
1856 // Double check that the body column of the webhook source matches the type
1857 // we're about to deserialize as.
1858 let body_column = desc
1859 .get_by_name(&"body".into())
1860 .map(|(_idx, ty)| ty.clone())
1861 .ok_or_else(|| name.clone())?;
1862 assert!(!body_column.nullable, "webhook body column is nullable!?");
1863 assert_eq!(body_column.scalar_type, SqlScalarType::from(body_format));
1864
1865 // Create a validator that can be called to validate a webhook request.
1866 let validator = validate_using.as_ref().map(|v| {
1867 let validation = v.clone();
1868 AppendWebhookValidator::new(validation, coord.caching_secrets_reader.clone())
1869 });
1870
1871 // Get a channel so we can queue updates to be written.
1872 let row_tx = coord
1873 .controller
1874 .storage
1875 .monotonic_appender(global_id)
1876 .map_err(|_| name.clone())?;
1877 let stats = coord
1878 .controller
1879 .storage
1880 .webhook_statistics(global_id)
1881 .map_err(|_| name)?;
1882 let invalidator = coord
1883 .active_webhooks
1884 .entry(entry.id())
1885 .or_insert_with(WebhookAppenderInvalidator::new);
1886 let tx = WebhookAppender::new(row_tx, invalidator.guard(), stats);
1887
1888 Ok(AppendWebhookResponse {
1889 tx,
1890 body_format,
1891 header_tys,
1892 validator,
1893 })
1894 }
1895
1896 let response = resolve(self, database, schema, name).map_err(|name| {
1897 AppendWebhookError::UnknownWebhook {
1898 database: name.database.expect("provided"),
1899 schema: name.schema.expect("provided"),
1900 name: name.item,
1901 }
1902 });
1903 let _ = tx.send(response);
1904 }
1905
1906    /// Handle registration of a frontend peek, for statement logging and query
1907    /// cancellation support.
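    ///
    /// A minimal sketch of the two-level bookkeeping performed below, with simplified,
    /// hypothetical types (the real maps hold `PendingPeek` state and cluster ids):
    ///
    /// ```
    /// use std::collections::BTreeMap;
    ///
    /// // Peeks are indexed by uuid for result delivery...
    /// let mut pending_peeks: BTreeMap<&str, &str> = BTreeMap::new();
    /// // ...and by connection id for cancellation.
    /// let mut client_pending_peeks: BTreeMap<u32, BTreeMap<&str, &str>> = BTreeMap::new();
    ///
    /// pending_peeks.insert("uuid-1", "peek state");
    /// client_pending_peeks.entry(7).or_default().insert("uuid-1", "cluster-u1");
    ///
    /// // Cancelling connection 7 finds its peeks without scanning `pending_peeks`.
    /// let uuids: Vec<_> = client_pending_peeks
    ///     .remove(&7)
    ///     .unwrap_or_default()
    ///     .into_keys()
    ///     .collect();
    /// assert_eq!(uuids, vec!["uuid-1"]);
    /// ```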
1908 fn handle_register_frontend_peek(
1909 &mut self,
1910 uuid: Uuid,
1911 conn_id: ConnectionId,
1912 cluster_id: mz_controller_types::ClusterId,
1913 depends_on: BTreeSet<GlobalId>,
1914 is_fast_path: bool,
1915 watch_set: Option<WatchSetCreation>,
1916 tx: oneshot::Sender<Result<(), AdapterError>>,
1917 ) {
1918 let statement_logging_id = watch_set.as_ref().map(|ws| ws.logging_id);
1919 if let Some(ws) = watch_set {
1920 if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), ws) {
1921 let _ = tx.send(Err(
1922 AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
1923 e, cluster_id,
1924 ),
1925 ));
1926 return;
1927 }
1928 }
1929
1930        // Store the peek in pending_peeks for later retrieval when results arrive.
1931 self.pending_peeks.insert(
1932 uuid,
1933 PendingPeek {
1934 conn_id: conn_id.clone(),
1935 cluster_id,
1936 depends_on,
1937 ctx_extra: ExecuteContextGuard::new(
1938 statement_logging_id,
1939 self.internal_cmd_tx.clone(),
1940 ),
1941 is_fast_path,
1942 },
1943 );
1944
1945        // Also track it by connection ID for cancellation support.
1946 self.client_pending_peeks
1947 .entry(conn_id)
1948 .or_default()
1949 .insert(uuid, cluster_id);
1950
1951 let _ = tx.send(Ok(()));
1952 }
1953
1954 /// Handle unregistration of a frontend peek that was registered but failed to issue.
1955 /// This is used for cleanup when `client.peek()` fails after `RegisterFrontendPeek` succeeds.
1956 fn handle_unregister_frontend_peek(&mut self, uuid: Uuid, tx: oneshot::Sender<()>) {
1957 // Remove from pending_peeks (this also removes from client_pending_peeks)
1958 if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
1959            // Defuse the `ExecuteContextExtra`, because the frontend will log the peek's error result.
1960 let _ = pending_peek.ctx_extra.defuse();
1961 }
1962 let _ = tx.send(());
1963 }
1964}