mz_adapter/coord/command_handler.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic for processing client [`Command`]s. Each [`Command`] is initiated by a
11//! client via some external Materialize API (ex: HTTP and psql).
12
13use base64::prelude::*;
14use differential_dataflow::lattice::Lattice;
15use mz_adapter_types::dyncfgs::ALLOW_USER_SESSIONS;
16use mz_auth::password::Password;
17use mz_repr::namespaces::MZ_INTERNAL_SCHEMA;
18use mz_sql::session::metadata::SessionMetadata;
19use std::collections::{BTreeMap, BTreeSet};
20use std::net::IpAddr;
21use std::sync::Arc;
22
23use futures::FutureExt;
24use futures::future::LocalBoxFuture;
25use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
26use mz_catalog::SYSTEM_CONN_ID;
27use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source, Table, TableDataSource};
28use mz_ore::task;
29use mz_ore::tracing::OpenTelemetryContext;
30use mz_ore::{instrument, soft_panic_or_log};
31use mz_repr::role_id::RoleId;
32use mz_repr::{Diff, SqlScalarType, Timestamp};
33use mz_sql::ast::{
34 AlterConnectionAction, AlterConnectionStatement, AlterSinkAction, AlterSourceAction, AstInfo,
35 ConstantVisitor, CopyRelation, CopyStatement, CreateSourceOptionName, Raw, Statement,
36 SubscribeStatement,
37};
38use mz_sql::catalog::RoleAttributesRaw;
39use mz_sql::names::{Aug, PartialItemName, ResolvedIds};
40use mz_sql::plan::{
41 AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, Params, Plan,
42 StatementClassification, TransactionType,
43};
44use mz_sql::pure::{
45 materialized_view_option_contains_temporal, purify_create_materialized_view_options,
46};
47use mz_sql::rbac;
48use mz_sql::rbac::CREATE_ITEM_USAGE;
49use mz_sql::session::user::User;
50use mz_sql::session::vars::{
51 EndTransactionAction, NETWORK_POLICY, OwnedVarInput, STATEMENT_LOGGING_SAMPLE_RATE, Value, Var,
52};
53use mz_sql_parser::ast::display::AstDisplay;
54use mz_sql_parser::ast::{
55 CreateMaterializedViewStatement, ExplainPlanStatement, Explainee, InsertStatement,
56 WithOptionValue,
57};
58use mz_storage_types::sources::Timeline;
59use opentelemetry::trace::TraceContextExt;
60use tokio::sync::{mpsc, oneshot};
61use tracing::{Instrument, debug_span, info, warn};
62use tracing_opentelemetry::OpenTelemetrySpanExt;
63
64use crate::command::{
65 AuthResponse, CatalogSnapshot, Command, ExecuteResponse, SASLChallengeResponse,
66 SASLVerifyProofResponse, StartupResponse,
67};
68use crate::coord::appends::PendingWriteTxn;
69use crate::coord::{
70 ConnMeta, Coordinator, DeferredPlanStatement, Message, PendingTxn, PlanStatement, PlanValidity,
71 PurifiedStatementReady, validate_ip_with_policy_rules,
72};
73use crate::error::{AdapterError, AuthenticationError};
74use crate::notice::AdapterNotice;
75use crate::session::{Session, TransactionOps, TransactionStatus};
76use crate::util::{ClientTransmitter, ResultExt};
77use crate::webhook::{
78 AppendWebhookResponse, AppendWebhookValidator, WebhookAppender, WebhookAppenderInvalidator,
79};
80use crate::{AppendWebhookError, ExecuteContext, catalog, metrics};
81
82use super::ExecuteContextExtra;
83
84impl Coordinator {
85 /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 58KB. This would
86 /// get stored on the stack, which is bad for runtime performance and blows up our stack usage.
87 /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
88 pub(crate) fn handle_command(&mut self, mut cmd: Command) -> LocalBoxFuture<'_, ()> {
89 async move {
90 if let Some(session) = cmd.session_mut() {
91 session.apply_external_metadata_updates();
92 }
93 match cmd {
94 Command::Startup {
95 tx,
96 user,
97 conn_id,
98 secret_key,
99 uuid,
100 client_ip,
101 application_name,
102 notice_tx,
103 } => {
104 // Note: We purposefully do not use a ClientTransmitter here because startup
105 // handles errors and cleanup of sessions itself.
106 self.handle_startup(
107 tx,
108 user,
109 conn_id,
110 secret_key,
111 uuid,
112 client_ip,
113 application_name,
114 notice_tx,
115 )
116 .await;
117 }
118
119 Command::AuthenticatePassword {
120 tx,
121 role_name,
122 password,
123 } => {
124 self.handle_authenticate_password(tx, role_name, password)
125 .await;
126 }
127
128 Command::AuthenticateGetSASLChallenge {
129 tx,
130 role_name,
131 nonce,
132 } => {
133 self.handle_generate_sasl_challenge(tx, role_name, nonce)
134 .await;
135 }
136
137 Command::AuthenticateVerifySASLProof {
138 tx,
139 role_name,
140 proof,
141 mock_hash,
142 auth_message,
143 } => {
144 self.handle_authenticate_verify_sasl_proof(
145 tx,
146 role_name,
147 proof,
148 auth_message,
149 mock_hash,
150 );
151 }
152
153 Command::Execute {
154 portal_name,
155 session,
156 tx,
157 outer_ctx_extra,
158 } => {
159 let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
160
161 self.handle_execute(portal_name, session, tx, outer_ctx_extra)
162 .await;
163 }
164
165 Command::RetireExecute { data, reason } => self.retire_execution(reason, data),
166
167 Command::CancelRequest {
168 conn_id,
169 secret_key,
170 } => {
171 self.handle_cancel(conn_id, secret_key).await;
172 }
173
174 Command::PrivilegedCancelRequest { conn_id } => {
175 self.handle_privileged_cancel(conn_id).await;
176 }
177
178 Command::GetWebhook {
179 database,
180 schema,
181 name,
182 tx,
183 } => {
184 self.handle_get_webhook(database, schema, name, tx);
185 }
186
187 Command::GetSystemVars { tx } => {
188 let _ = tx.send(self.catalog.system_config().clone());
189 }
190
191 Command::SetSystemVars { vars, conn_id, tx } => {
192 let mut ops = Vec::with_capacity(vars.len());
193 let conn = &self.active_conns[&conn_id];
194
195 for (name, value) in vars {
196 if let Err(e) =
197 self.catalog().system_config().get(&name).and_then(|var| {
198 var.visible(conn.user(), self.catalog.system_config())
199 })
200 {
201 let _ = tx.send(Err(e.into()));
202 return;
203 }
204
205 ops.push(catalog::Op::UpdateSystemConfiguration {
206 name,
207 value: OwnedVarInput::Flat(value),
208 });
209 }
210
211 let result = self
212 .catalog_transact_with_context(Some(&conn_id), None, ops)
213 .await;
214 let _ = tx.send(result);
215 }
216
217 Command::Terminate { conn_id, tx } => {
218 self.handle_terminate(conn_id).await;
219 // Note: We purposefully do not use a ClientTransmitter here because we're already
220 // terminating the provided session.
221 if let Some(tx) = tx {
222 let _ = tx.send(Ok(()));
223 }
224 }
225
226 Command::Commit {
227 action,
228 session,
229 tx,
230 } => {
231 let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
232 // We reach here not through a statement execution, but from the
233 // "commit" pgwire command. Thus, we just generate a default statement
234 // execution context (once statement logging is implemented, this will cause nothing to be logged
235 // when the execution finishes).
236 let ctx = ExecuteContext::from_parts(
237 tx,
238 self.internal_cmd_tx.clone(),
239 session,
240 Default::default(),
241 );
242 let plan = match action {
243 EndTransactionAction::Commit => {
244 Plan::CommitTransaction(CommitTransactionPlan {
245 transaction_type: TransactionType::Implicit,
246 })
247 }
248 EndTransactionAction::Rollback => {
249 Plan::AbortTransaction(AbortTransactionPlan {
250 transaction_type: TransactionType::Implicit,
251 })
252 }
253 };
254
255 let conn_id = ctx.session().conn_id().clone();
256 self.sequence_plan(ctx, plan, ResolvedIds::empty()).await;
257 // Part of the Command::Commit contract is that the Coordinator guarantees that
258 // it has cleared its transaction state for the connection.
259 self.clear_connection(&conn_id).await;
260 }
261
262 Command::CatalogSnapshot { tx } => {
263 let _ = tx.send(CatalogSnapshot {
264 catalog: self.owned_catalog(),
265 });
266 }
267
268 Command::CheckConsistency { tx } => {
269 let _ = tx.send(self.check_consistency());
270 }
271
272 Command::Dump { tx } => {
273 let _ = tx.send(self.dump().await);
274 }
275
276 Command::GetComputeInstanceClient { instance_id, tx } => {
277 let _ = tx.send(self.controller.compute.instance_client(instance_id));
278 }
279
280 Command::GetOracle { timeline, tx } => {
281 let oracle = self
282 .global_timelines
283 .get(&timeline)
284 .map(|timeline_state| Arc::clone(&timeline_state.oracle))
285 .ok_or(AdapterError::ChangedPlan(
286 "timeline has disappeared during planning".to_string(),
287 ));
288 let _ = tx.send(oracle);
289 }
290
291 Command::DetermineRealTimeRecentTimestamp {
292 source_ids,
293 real_time_recency_timeout,
294 tx,
295 } => {
296 let result = self
297 .determine_real_time_recent_timestamp(
298 source_ids.iter().copied(),
299 real_time_recency_timeout,
300 )
301 .await;
302
303 match result {
304 Ok(Some(fut)) => {
305 task::spawn(|| "determine real time recent timestamp", async move {
306 let result = fut.await.map(Some).map_err(AdapterError::from);
307 let _ = tx.send(result);
308 });
309 }
310 Ok(None) => {
311 let _ = tx.send(Ok(None));
312 }
313 Err(e) => {
314 let _ = tx.send(Err(e));
315 }
316 }
317 }
318
319 Command::GetTransactionReadHoldsBundle { conn_id, tx } => {
320 let read_holds = self.txn_read_holds.get(&conn_id).cloned();
321 let _ = tx.send(read_holds);
322 }
323
324 Command::StoreTransactionReadHolds {
325 conn_id,
326 read_holds,
327 tx,
328 } => {
329 self.store_transaction_read_holds(conn_id, read_holds);
330 let _ = tx.send(());
331 }
332
333 Command::ExecuteSlowPathPeek {
334 dataflow_plan,
335 determination,
336 finishing,
337 compute_instance,
338 target_replica,
339 intermediate_result_type,
340 source_ids,
341 conn_id,
342 max_result_size,
343 max_query_result_size,
344 tx,
345 } => {
346 let result = self
347 .implement_slow_path_peek(
348 *dataflow_plan,
349 determination,
350 finishing,
351 compute_instance,
352 target_replica,
353 intermediate_result_type,
354 source_ids,
355 conn_id,
356 max_result_size,
357 max_query_result_size,
358 )
359 .await;
360
361 let _ = tx.send(result);
362 }
363
364 Command::ExecuteCopyTo {
365 df_desc,
366 compute_instance,
367 target_replica,
368 source_ids,
369 conn_id,
370 tx,
371 } => {
372 // implement_copy_to spawns a background task that sends the response
373 // through tx when the COPY TO completes (or immediately if setup fails).
374 // We just call it and let it handle all response sending.
375 self.implement_copy_to(
376 *df_desc,
377 compute_instance,
378 target_replica,
379 source_ids,
380 conn_id,
381 tx,
382 )
383 .await;
384 }
385 }
386 }
387 .instrument(debug_span!("handle_command"))
388 .boxed_local()
389 }
390
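/// Verifies a client's SASL proof for `role_name` against the stored password hash,
/// falling back to `mock_hash` when no real hash exists (so the work performed does not
/// reveal whether the role is present), and sends the outcome on `tx`.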
391 fn handle_authenticate_verify_sasl_proof(
392 &self,
393 tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
394 role_name: String,
395 proof: String,
396 auth_message: String,
397 mock_hash: String,
398 ) {
399 let role = self.catalog().try_get_role_by_name(role_name.as_str());
400 let role_auth = role.and_then(|r| self.catalog().try_get_role_auth_by_id(&r.id));
401
402 let login = role
403 .as_ref()
404 .map(|r| r.attributes.login.unwrap_or(false))
405 .unwrap_or(false);
406
407 let real_hash = role_auth
408 .as_ref()
409 .and_then(|auth| auth.password_hash.as_ref());
410 let hash_ref = real_hash.map(|s| s.as_str()).unwrap_or(&mock_hash);
411
412 let role_present = role.is_some();
413 let make_auth_err = |role_present: bool, login: bool| {
414 AdapterError::AuthenticationError(if role_present && !login {
415 AuthenticationError::NonLogin
416 } else {
417 AuthenticationError::InvalidCredentials
418 })
419 };
420
421 match mz_auth::hash::sasl_verify(hash_ref, &proof, &auth_message) {
422 Ok(verifier) => {
423 // Success only if role exists, allows login, and a real password hash was used.
424 if login && real_hash.is_some() {
425 let role = role.expect("login implies role exists");
426 let _ = tx.send(Ok(SASLVerifyProofResponse {
427 verifier,
428 auth_resp: AuthResponse {
429 role_id: role.id,
430 superuser: role.attributes.superuser.unwrap_or(false),
431 },
432 }));
433 } else {
434 let _ = tx.send(Err(make_auth_err(role_present, login)));
435 }
436 }
437 Err(_) => {
438 let _ = tx.send(Err(AdapterError::AuthenticationError(
439 AuthenticationError::InvalidCredentials,
440 )));
441 }
442 }
443 }
444
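/// Builds the SASL challenge (iteration count, salt, and combined nonce) for `role_name`.
/// Roles without a parseable password hash receive a deterministic mock challenge so the
/// response does not reveal whether the role exists.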
445 #[mz_ore::instrument(level = "debug")]
446 async fn handle_generate_sasl_challenge(
447 &self,
448 tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
449 role_name: String,
450 client_nonce: String,
451 ) {
452 let role_auth = self
453 .catalog()
454 .try_get_role_by_name(&role_name)
455 .and_then(|role| self.catalog().try_get_role_auth_by_id(&role.id));
456
457 let nonce = match mz_auth::hash::generate_nonce(&client_nonce) {
458 Ok(n) => n,
459 Err(e) => {
460 let msg = format!(
461 "failed to generate nonce for client nonce {}: {}",
462 client_nonce, e
463 );
464 let _ = tx.send(Err(AdapterError::Internal(msg.clone())));
465 soft_panic_or_log!("{msg}");
466 return;
467 }
468 };
469
470 // It's important that the mock_nonce is deterministic per role; otherwise the purpose of
471 // doing mock authentication is defeated. We use a catalog-wide nonce, and combine that
472 // with the role name to get a per-role mock nonce.
473 let send_mock_challenge =
474 |role_name: String,
475 mock_nonce: String,
476 nonce: String,
477 tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>| {
478 let opts = mz_auth::hash::mock_sasl_challenge(
479 &role_name,
480 &mock_nonce,
481 &self.catalog().system_config().scram_iterations(),
482 );
483 let _ = tx.send(Ok(SASLChallengeResponse {
484 iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
485 salt: BASE64_STANDARD.encode(opts.salt),
486 nonce,
487 }));
488 };
489
490 match role_auth {
491 Some(auth) if auth.password_hash.is_some() => {
492 let hash = auth.password_hash.as_ref().expect("checked above");
493 match mz_auth::hash::scram256_parse_opts(hash) {
494 Ok(opts) => {
495 let _ = tx.send(Ok(SASLChallengeResponse {
496 iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
497 salt: BASE64_STANDARD.encode(opts.salt),
498 nonce,
499 }));
500 }
501 Err(_) => {
502 send_mock_challenge(
503 role_name,
504 self.catalog().state().mock_authentication_nonce(),
505 nonce,
506 tx,
507 );
508 }
509 }
510 }
511 _ => {
512 send_mock_challenge(
513 role_name,
514 self.catalog().state().mock_authentication_nonce(),
515 nonce,
516 tx,
517 );
518 }
519 }
520 }
521
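/// Authenticates `role_name` with a cleartext password by verifying it against the
/// stored password hash on a blocking task, sending the resulting `AuthResponse` (or an
/// authentication error) on `tx`.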
522 #[mz_ore::instrument(level = "debug")]
523 async fn handle_authenticate_password(
524 &self,
525 tx: oneshot::Sender<Result<AuthResponse, AdapterError>>,
526 role_name: String,
527 password: Option<Password>,
528 ) {
529 let Some(password) = password else {
530 // The user did not provide a password.
531 let _ = tx.send(Err(AdapterError::AuthenticationError(
532 AuthenticationError::PasswordRequired,
533 )));
534 return;
535 };
536
537 if let Some(role) = self.catalog().try_get_role_by_name(role_name.as_str()) {
538 if !role.attributes.login.unwrap_or(false) {
539 // The user is not allowed to login.
540 let _ = tx.send(Err(AdapterError::AuthenticationError(
541 AuthenticationError::NonLogin,
542 )));
543 return;
544 }
545 if let Some(auth) = self.catalog().try_get_role_auth_by_id(&role.id) {
546 if let Some(hash) = &auth.password_hash {
547 let hash = hash.clone();
548 let role_id = role.id;
549 let superuser = role.attributes.superuser.unwrap_or(false);
550 task::spawn_blocking(
551 || "auth-check-hash",
552 move || {
553 let _ = match mz_auth::hash::scram256_verify(&password, &hash) {
554 Ok(_) => tx.send(Ok(AuthResponse { role_id, superuser })),
555 Err(_) => tx.send(Err(AdapterError::AuthenticationError(
556 AuthenticationError::InvalidCredentials,
557 ))),
558 };
559 },
560 );
561 return;
562 }
563 }
564 // Authentication failed because the role has no configured password hash.
565 let _ = tx.send(Err(AdapterError::AuthenticationError(
566 AuthenticationError::InvalidCredentials,
567 )));
568 } else {
569 // The user does not exist.
570 let _ = tx.send(Err(AdapterError::AuthenticationError(
571 AuthenticationError::RoleNotFound,
572 )));
573 }
574 }
575
576 #[mz_ore::instrument(level = "debug")]
577 async fn handle_startup(
578 &mut self,
579 tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
580 user: User,
581 conn_id: ConnectionId,
582 secret_key: u32,
583 uuid: uuid::Uuid,
584 client_ip: Option<IpAddr>,
585 application_name: String,
586 notice_tx: mpsc::UnboundedSender<AdapterNotice>,
587 ) {
588 // Early return if successful, otherwise clean up any possible state.
589 match self.handle_startup_inner(&user, &conn_id, &client_ip).await {
590 Ok((role_id, session_defaults)) => {
591 let session_type = metrics::session_type_label_value(&user);
592 self.metrics
593 .active_sessions
594 .with_label_values(&[session_type])
595 .inc();
596 let conn = ConnMeta {
597 secret_key,
598 notice_tx,
599 drop_sinks: BTreeSet::new(),
600 pending_cluster_alters: BTreeSet::new(),
601 connected_at: self.now(),
602 user,
603 application_name,
604 uuid,
605 client_ip,
606 conn_id: conn_id.clone(),
607 authenticated_role: role_id,
608 deferred_lock: None,
609 };
610 let update = self.catalog().state().pack_session_update(&conn, Diff::ONE);
611 let update = self.catalog().state().resolve_builtin_table_update(update);
612 self.begin_session_for_statement_logging(&conn);
613 self.active_conns.insert(conn_id.clone(), conn);
614
615 // Note: Do NOT await the notify here; we pass this back to
616 // whatever requested the startup to prevent blocking startup
617 // and the Coordinator on a builtin table update.
618 let updates = vec![update];
619 // It's not a hard error if our list is missing a builtin table, but we want to
620 // make sure these two things stay in sync.
621 if mz_ore::assert::soft_assertions_enabled() {
622 let required_tables: BTreeSet<_> = super::appends::REQUIRED_BUILTIN_TABLES
623 .iter()
624 .map(|table| self.catalog().resolve_builtin_table(*table))
625 .collect();
626 let updates_tracked = updates
627 .iter()
628 .all(|update| required_tables.contains(&update.id));
629 let all_mz_internal = super::appends::REQUIRED_BUILTIN_TABLES
630 .iter()
631 .all(|table| table.schema == MZ_INTERNAL_SCHEMA);
632 mz_ore::soft_assert_or_log!(
633 updates_tracked,
634 "not tracking all required builtin table updates!"
635 );
636 // TODO(parkmycar): When checking if a query depends on these builtin table
637 // writes we do not check the transitive dependencies of the query, because
638 // we don't support creating views on mz_internal objects. If one of these
639 // tables is promoted out of mz_internal then we'll need to add this check.
640 mz_ore::soft_assert_or_log!(
641 all_mz_internal,
642 "not all builtin tables are in mz_internal! need to check transitive depends",
643 )
644 }
645 let notify = self.builtin_table_update().background(updates);
646
647 let resp = Ok(StartupResponse {
648 role_id,
649 write_notify: notify,
650 session_defaults,
651 catalog: self.owned_catalog(),
652 storage_collections: Arc::clone(&self.controller.storage_collections),
653 transient_id_gen: Arc::clone(&self.transient_id_gen),
654 optimizer_metrics: self.optimizer_metrics.clone(),
655 persist_client: self.persist_client.clone(),
656 });
657 if tx.send(resp).is_err() {
658 // Failed to send to adapter, but everything is set up so we can terminate
659 // normally.
660 self.handle_terminate(conn_id).await;
661 }
662 }
663 Err(e) => {
664 // Error during startup or sending to adapter; clean up possible state created by
665 // handle_startup_inner. A user may have been created and it can stay; no need to
666 // delete it.
667 self.catalog_mut()
668 .drop_temporary_schema(&conn_id)
669 .unwrap_or_terminate("unable to drop temporary schema");
670
671 // Communicate the error back to the client. No need to
672 // handle failures to send the error back; we've already
673 // cleaned up all necessary state.
674 let _ = tx.send(Err(e));
675 }
676 }
677 }
678
679 // Fallible startup work that needs to be cleaned up on error.
680 async fn handle_startup_inner(
681 &mut self,
682 user: &User,
683 conn_id: &ConnectionId,
684 client_ip: &Option<IpAddr>,
685 ) -> Result<(RoleId, BTreeMap<String, OwnedVarInput>), AdapterError> {
686 if self.catalog().try_get_role_by_name(&user.name).is_none() {
687 // If the user has made it to this point, that means they have been fully authenticated.
688 // This includes preventing any user, except a pre-defined set of system users, from
689 // connecting to an internal port. Therefore it's ok to always create a new role for the
690 // user.
691 let attributes = RoleAttributesRaw::new();
692 let plan = CreateRolePlan {
693 name: user.name.to_string(),
694 attributes,
695 };
696 self.sequence_create_role_for_startup(plan).await?;
697 }
698 let role_id = self
699 .catalog()
700 .try_get_role_by_name(&user.name)
701 .expect("created above")
702 .id;
703
704 if role_id.is_user() && !ALLOW_USER_SESSIONS.get(self.catalog().system_config().dyncfgs()) {
705 return Err(AdapterError::UserSessionsDisallowed);
706 }
707
708 // Initialize the default session variables for this role.
709 let mut session_defaults = BTreeMap::new();
710 let system_config = self.catalog().state().system_config();
711
712 // Override the session with any system defaults.
713 session_defaults.extend(
714 system_config
715 .iter_session()
716 .map(|v| (v.name().to_string(), OwnedVarInput::Flat(v.value()))),
717 );
718 // Special case.
719 let statement_logging_default = system_config
720 .statement_logging_default_sample_rate()
721 .format();
722 session_defaults.insert(
723 STATEMENT_LOGGING_SAMPLE_RATE.name().to_string(),
724 OwnedVarInput::Flat(statement_logging_default),
725 );
726 // Override system defaults with role defaults.
727 session_defaults.extend(
728 self.catalog()
729 .get_role(&role_id)
730 .vars()
731 .map(|(name, val)| (name.to_string(), val.clone())),
732 );
733
734 // Validate network policies for external users. Internal users can only connect on the
735 // internal interfaces (internal HTTP/pgwire). It is up to the person deploying the system
736 // to ensure these internal interfaces are well secured.
737 //
738 // HACKY(parkmycar): We don't have a fully formed session yet for this role, but we want
739 // the default network policy for this role, so we read directly out of what the session
740 // will get initialized with.
741 if !user.is_internal() {
742 let network_policy_name = session_defaults
743 .get(NETWORK_POLICY.name())
744 .and_then(|value| match value {
745 OwnedVarInput::Flat(name) => Some(name.clone()),
746 OwnedVarInput::SqlSet(names) => {
747 tracing::error!(?names, "found multiple network policies");
748 None
749 }
750 })
751 .unwrap_or_else(|| system_config.default_network_policy_name());
752 let maybe_network_policy = self
753 .catalog()
754 .get_network_policy_by_name(&network_policy_name);
755
756 let Some(network_policy) = maybe_network_policy else {
757 // We should prevent dropping the default network policy, or setting the policy
758 // to something that doesn't exist, so complain loudly if this occurs.
759 tracing::error!(
760 network_policy_name,
761 "default network policy does not exist. All user traffic will be blocked"
762 );
763 let reason = match client_ip {
764 Some(ip) => super::NetworkPolicyError::AddressDenied(ip.clone()),
765 None => super::NetworkPolicyError::MissingIp,
766 };
767 return Err(AdapterError::NetworkPolicyDenied(reason));
768 };
769
770 if let Some(ip) = client_ip {
771 match validate_ip_with_policy_rules(ip, &network_policy.rules) {
772 Ok(_) => {}
773 Err(e) => return Err(AdapterError::NetworkPolicyDenied(e)),
774 }
775 } else {
776 // Only temporary and internal representations of a session
777 // should be missing a client_ip. These sessions should not be
778 // making requests or going through handle_startup.
779 return Err(AdapterError::NetworkPolicyDenied(
780 super::NetworkPolicyError::MissingIp,
781 ));
782 }
783 }
784
785 self.catalog_mut()
786 .create_temporary_schema(conn_id, role_id)?;
787
788 Ok((role_id, session_defaults))
789 }
790
791 /// Handles an execute command.
792 #[instrument(name = "coord::handle_execute", fields(session = session.uuid().to_string()))]
793 pub(crate) async fn handle_execute(
794 &mut self,
795 portal_name: String,
796 mut session: Session,
797 tx: ClientTransmitter<ExecuteResponse>,
798 // If this command was part of another execute command
799 // (for example, executing a `FETCH` statement causes an execute to be
800 // issued for the cursor it references),
801 // then `outer_context` should be `Some`.
802 // This instructs the coordinator that the
803 // outer execute should be considered finished once the inner one is.
804 outer_context: Option<ExecuteContextExtra>,
805 ) {
806 if session.vars().emit_trace_id_notice() {
807 let span_context = tracing::Span::current()
808 .context()
809 .span()
810 .span_context()
811 .clone();
812 if span_context.is_valid() {
813 session.add_notice(AdapterNotice::QueryTrace {
814 trace_id: span_context.trace_id(),
815 });
816 }
817 }
818
819 if let Err(err) = Self::verify_portal(self.catalog(), &mut session, &portal_name) {
820 // If statement logging hasn't started yet, we don't need
821 // to add any "end" event, so just make up a no-op
822 // `ExecuteContextExtra` here, via `Default::default`.
823 //
824 // It's a bit unfortunate because the edge case of failed
825 // portal verifications won't show up in statement
826 // logging, but there seems to be nothing else we can do,
827 // because we need access to the portal to begin logging.
828 //
829 // Another option would be to log a begin and end event, but just fill in NULLs
830 // for everything we get from the portal (prepared statement id, params).
831 let extra = outer_context.unwrap_or_else(Default::default);
832 let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
833 return ctx.retire(Err(err));
834 }
835
836 // The reference to `portal` can't outlive `session`, which we
837 // use to construct the context, so scope the reference to this block where we
838 // get everything we need from the portal for later.
839 let (stmt, ctx, params) = {
840 let portal = session
841 .get_portal_unverified(&portal_name)
842 .expect("known to exist");
843 let params = portal.parameters.clone();
844 let stmt = portal.stmt.clone();
845 let logging = Arc::clone(&portal.logging);
846 let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
847
848 let extra = if let Some(extra) = outer_context {
849 // We are executing in the context of another SQL statement, so we don't
850 // want to begin statement logging anew. The context of the actual statement
851 // being executed is the one that should be retired once this finishes.
852 extra
853 } else {
854 // This is a new statement; log it and return the context.
855 let maybe_uuid = self.begin_statement_execution(
856 &mut session,
857 ¶ms,
858 &logging,
859 lifecycle_timestamps,
860 );
861
862 ExecuteContextExtra::new(maybe_uuid)
863 };
864 let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
865 (stmt, ctx, params)
866 };
867
868 let stmt = match stmt {
869 Some(stmt) => stmt,
870 None => return ctx.retire(Ok(ExecuteResponse::EmptyQuery)),
871 };
872
873 let session_type = metrics::session_type_label_value(ctx.session().user());
874 let stmt_type = metrics::statement_type_label_value(&stmt);
875 self.metrics
876 .query_total
877 .with_label_values(&[session_type, stmt_type])
878 .inc();
879 match &*stmt {
880 Statement::Subscribe(SubscribeStatement { output, .. })
881 | Statement::Copy(CopyStatement {
882 relation: CopyRelation::Subscribe(SubscribeStatement { output, .. }),
883 ..
884 }) => {
885 self.metrics
886 .subscribe_outputs
887 .with_label_values(&[
888 session_type,
889 metrics::subscribe_output_label_value(output),
890 ])
891 .inc();
892 }
893 _ => {}
894 }
895
896 self.handle_execute_inner(stmt, params, ctx).await
897 }
898
899 #[instrument(name = "coord::handle_execute_inner", fields(stmt = stmt.to_ast_string_redacted()))]
900 pub(crate) async fn handle_execute_inner(
901 &mut self,
902 stmt: Arc<Statement<Raw>>,
903 params: Params,
904 mut ctx: ExecuteContext,
905 ) {
906 // This comment describes the various ways DDL can execute (the ordered operations: name
907 // resolve, purify, plan, sequence), all of which are managed by this function. DDL has
908 // three notable properties that all partially interact.
909 //
910 // 1. Most DDL statements (and a few others) support single-statement transaction delayed
911 // execution. This occurs when a session executes `BEGIN`, a single DDL, then `COMMIT`.
912 // We announce success of the single DDL when it is executed, but do not attempt to plan
913 // or sequence it until `COMMIT`, which is able to error if needed while sequencing the
914 // DDL (this behavior is Postgres-compatible). The purpose of this is because some
915 // drivers or tools wrap all statements in `BEGIN` and `COMMIT` and we would like them to
916 // work. When the single DDL is announced as successful we also put the session's
917 // transaction ops into `SingleStatement` which will produce an error if any other
918 // statement is run in the transaction except `COMMIT`. Additionally, this will cause
919 // `handle_execute_inner` to stop further processing (no planning, etc.) of the
920 // statement.
921 // 2. A few other DDL statements (`ALTER .. RENAME/SWAP`) enter the `DDL` ops which allows
922 // any number of only these DDL statements to be executed in a transaction. At sequencing
923 // these generate the `Op::TransactionDryRun` catalog op. When applied with
924 // `catalog_transact`, that op will always produce the `TransactionDryRun` error. The
925 // `catalog_transact_with_ddl_transaction` function intercepts that error and reports
926 // success to the user, but nothing is yet committed to the real catalog. At `COMMIT` all
927 // of the ops but without dry run are applied. The purpose of this is to allow multiple,
928 // atomic renames in the same transaction.
929 // 3. Some DDLs do off-thread work during purification or sequencing that is expensive or
930 // makes network calls (interfacing with secrets, optimization of views/indexes, source
931 // purification). These must guarantee correctness when they return to the main
932 // coordinator thread because the catalog state could have changed while they were doing
933 // the off-thread work. Previously we would use `PlanValidity::Checks` to specify a bunch
934 // of IDs that we needed to exist. We discovered the way we were doing that was not
935 // always correct. Instead of attempting to get that completely right, we have opted to
936 // serialize DDL. Getting this right is difficult because catalog changes can affect name
937 // resolution, planning, sequencing, and optimization. Correctly writing logic that is
938 // aware of all possible catalog changes that would affect any of those parts is not
939 // something our current code has been designed to be helpful at. Even if a DDL statement
940 // is doing off-thread work, another DDL must not yet execute at all. Executing these
941 // serially will guarantee that no off-thread work has affected the state of the catalog.
942 // This is done by adding a VecDeque of deferred statements and a lock to the
943 // Coordinator. When a DDL is run in `handle_execute_inner` (after applying whatever
944 // transaction ops are needed to the session as described above), it attempts to own the
945 // lock (a tokio Mutex). If acquired, it stashes the lock in the connection's `ConnMeta`
946 // struct in `active_conns` and proceeds. The lock is dropped at transaction end in
947 // `clear_transaction` and a message sent to the Coordinator to execute the next queued
948 // DDL. If the lock could not be acquired, the DDL is put into the VecDeque where it
949 // awaits dequeuing caused by the lock being released.
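//
// As a concrete illustration of (1), a driver-issued sequence like `BEGIN; CREATE VIEW v
// AS SELECT 1; COMMIT` reports the CREATE VIEW as successful immediately, but planning
// and sequencing only happen at COMMIT, which is where any error would surface. (Sketch
// only; `v` is a made-up name.)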
950
951 // Verify that this statement type can be executed in the current
952 // transaction state.
953 match ctx.session().transaction() {
954 // By this point we should be in a running transaction.
955 TransactionStatus::Default => unreachable!(),
956
957 // Failed transactions have already been checked in pgwire for a safe statement
958 // (COMMIT, ROLLBACK, etc.) and can proceed.
959 TransactionStatus::Failed(_) => {}
960
961 // Started is a deceptive name, and means different things depending on which
962 // protocol was used. It's either exactly one statement (known because this
963 // is the simple protocol and the parser parsed the entire string, and it had
964 // one statement). Or from the extended protocol, it means *some* query is
965 // being executed, but there might be others after it before the Sync (commit)
966 // message. Postgres handles this by teaching Started to eagerly commit certain
967 // statements that can't be run in a transaction block.
968 TransactionStatus::Started(_) => {
969 if let Statement::Declare(_) = &*stmt {
970 // Declare is an exception. Although it's not against any spec to execute
971 // it, it will always result in nothing happening, since all portals will be
972 // immediately closed. Users don't know this detail, so this error helps them
973 // understand what's going wrong. Postgres does this too.
974 return ctx.retire(Err(AdapterError::OperationRequiresTransaction(
975 "DECLARE CURSOR".into(),
976 )));
977 }
978 }
979
980 // Implicit or explicit transactions.
981 //
982 // Implicit transactions happen when a multi-statement query is executed
983 // (a "simple query"). However if a "BEGIN" appears somewhere in there,
984 // then the existing implicit transaction will be upgraded to an explicit
985 // transaction. Thus, we should not separate what implicit and explicit
986 // transactions can do unless there's some additional checking to make sure
987 // something disallowed in explicit transactions did not previously take place
988 // in the implicit portion.
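// (For example, a simple-query string like `SELECT 1; BEGIN; SELECT 2; COMMIT` starts as
// an implicit transaction and is upgraded to an explicit one at the `BEGIN`. Illustrative
// sketch only.)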
989 TransactionStatus::InTransactionImplicit(_) | TransactionStatus::InTransaction(_) => {
990 match &*stmt {
991 // Statements that are safe in a transaction. We still need to verify that we
992 // don't interleave reads and writes since we can't perform those serializably.
993 Statement::Close(_)
994 | Statement::Commit(_)
995 | Statement::Copy(_)
996 | Statement::Deallocate(_)
997 | Statement::Declare(_)
998 | Statement::Discard(_)
999 | Statement::Execute(_)
1000 | Statement::ExplainPlan(_)
1001 | Statement::ExplainPushdown(_)
1002 | Statement::ExplainAnalyzeObject(_)
1003 | Statement::ExplainAnalyzeCluster(_)
1004 | Statement::ExplainTimestamp(_)
1005 | Statement::ExplainSinkSchema(_)
1006 | Statement::Fetch(_)
1007 | Statement::Prepare(_)
1008 | Statement::Rollback(_)
1009 | Statement::Select(_)
1010 | Statement::SetTransaction(_)
1011 | Statement::Show(_)
1012 | Statement::SetVariable(_)
1013 | Statement::ResetVariable(_)
1014 | Statement::StartTransaction(_)
1015 | Statement::Subscribe(_)
1016 | Statement::Raise(_) => {
1017 // Always safe.
1018 }
1019
1020 Statement::Insert(InsertStatement {
1021 source, returning, ..
1022 }) if returning.is_empty() && ConstantVisitor::insert_source(source) => {
1023 // Inserting from constant values statements that do not need to execute on
1024 // any cluster (no RETURNING) is always safe.
1025 }
1026
1027 // These statements must be kept in sync with `must_serialize_ddl()`.
1028 Statement::AlterObjectRename(_)
1029 | Statement::AlterObjectSwap(_)
1030 | Statement::CreateTableFromSource(_)
1031 | Statement::CreateSource(_) => {
1032 let state = self.catalog().for_session(ctx.session()).state().clone();
1033 let revision = self.catalog().transient_revision();
1034
1035 // Initialize our transaction with a set of empty ops, or return an error
1036 // if we can't run a DDL transaction
1037 let txn_status = ctx.session_mut().transaction_mut();
1038 if let Err(err) = txn_status.add_ops(TransactionOps::DDL {
1039 ops: vec![],
1040 state,
1041 revision,
1042 side_effects: vec![],
1043 }) {
1044 return ctx.retire(Err(err));
1045 }
1046 }
1047
1048 // Statements below must be run singly (in Started).
1049 Statement::AlterCluster(_)
1050 | Statement::AlterConnection(_)
1051 | Statement::AlterDefaultPrivileges(_)
1052 | Statement::AlterIndex(_)
1053 | Statement::AlterMaterializedViewApplyReplacement(_)
1054 | Statement::AlterSetCluster(_)
1055 | Statement::AlterOwner(_)
1056 | Statement::AlterRetainHistory(_)
1057 | Statement::AlterRole(_)
1058 | Statement::AlterSecret(_)
1059 | Statement::AlterSink(_)
1060 | Statement::AlterSource(_)
1061 | Statement::AlterSystemReset(_)
1062 | Statement::AlterSystemResetAll(_)
1063 | Statement::AlterSystemSet(_)
1064 | Statement::AlterTableAddColumn(_)
1065 | Statement::AlterNetworkPolicy(_)
1066 | Statement::CreateCluster(_)
1067 | Statement::CreateClusterReplica(_)
1068 | Statement::CreateConnection(_)
1069 | Statement::CreateDatabase(_)
1070 | Statement::CreateIndex(_)
1071 | Statement::CreateMaterializedView(_)
1072 | Statement::CreateContinualTask(_)
1073 | Statement::CreateRole(_)
1074 | Statement::CreateSchema(_)
1075 | Statement::CreateSecret(_)
1076 | Statement::CreateSink(_)
1077 | Statement::CreateSubsource(_)
1078 | Statement::CreateTable(_)
1079 | Statement::CreateType(_)
1080 | Statement::CreateView(_)
1081 | Statement::CreateWebhookSource(_)
1082 | Statement::CreateNetworkPolicy(_)
1083 | Statement::Delete(_)
1084 | Statement::DropObjects(_)
1085 | Statement::DropOwned(_)
1086 | Statement::GrantPrivileges(_)
1087 | Statement::GrantRole(_)
1088 | Statement::Insert(_)
1089 | Statement::ReassignOwned(_)
1090 | Statement::RevokePrivileges(_)
1091 | Statement::RevokeRole(_)
1092 | Statement::Update(_)
1093 | Statement::ValidateConnection(_)
1094 | Statement::Comment(_) => {
1095 let txn_status = ctx.session_mut().transaction_mut();
1096
1097 // If we're not in an implicit transaction and we could generate exactly one
1098 // valid ExecuteResponse, we can delay execution until commit.
1099 if !txn_status.is_implicit() {
1100 // Statements whose tag is trivial (i.e., determinable from the unexecuted statement alone) can
1101 // be run in a special single-statement explicit mode. In this mode (`BEGIN;
1102 // <stmt>; COMMIT`), we generate the expected tag from a successful <stmt>, but
1103 // delay execution until `COMMIT`.
1104 if let Ok(resp) = ExecuteResponse::try_from(&*stmt) {
1105 if let Err(err) = txn_status
1106 .add_ops(TransactionOps::SingleStatement { stmt, params })
1107 {
1108 ctx.retire(Err(err));
1109 return;
1110 }
1111 ctx.retire(Ok(resp));
1112 return;
1113 }
1114 }
1115
1116 return ctx.retire(Err(AdapterError::OperationProhibitsTransaction(
1117 stmt.to_string(),
1118 )));
1119 }
1120 }
1121 }
1122 }
1123
1124 // DDLs must be planned and sequenced serially. We do not rely on PlanValidity checking
1125 // various IDs because we have incorrectly done that in the past. Attempt to acquire the
1126 // ddl lock. The lock is stashed in the ConnMeta which is dropped at transaction end. If
1127 // acquired, proceed with sequencing. If not, enqueue and return. This logic assumes that
1128 // Coordinator::clear_transaction is correctly called when session transactions are ended
1129 // because that function will release the held lock from active_conns.
1130 if Self::must_serialize_ddl(&stmt, &ctx) {
1131 if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
1132 let prev = self
1133 .active_conns
1134 .get_mut(ctx.session().conn_id())
1135 .expect("connection must exist")
1136 .deferred_lock
1137 .replace(guard);
1138 assert!(
1139 prev.is_none(),
1140 "connections should have at most one lock guard"
1141 );
1142 } else {
1143 if self
1144 .active_conns
1145 .get(ctx.session().conn_id())
1146 .expect("connection must exist")
1147 .deferred_lock
1148 .is_some()
1149 {
1150 // This session *already* has the lock, and incorrectly tried to execute another
1151 // DDL while still holding the lock, violating the assumption documented above.
1152 // This is an internal error, probably in some AdapterClient user (pgwire or
1153 // http). Because the session is now in some unexpected state, return an error
1154 // which should cause the AdapterClient user to fail the transaction.
1155 // (Terminating the connection is maybe what we would prefer to do, but is not
1156 // currently a thing we can do from the coordinator: calling handle_terminate
1157 // cleans up Coordinator state for the session but doesn't inform the
1158 // AdapterClient that the session should terminate.)
1159 soft_panic_or_log!(
1160 "session {} attempted to get ddl lock while already owning it",
1161 ctx.session().conn_id()
1162 );
1163 ctx.retire(Err(AdapterError::Internal(
1164 "session attempted to get ddl lock while already owning it".to_string(),
1165 )));
1166 return;
1167 }
1168 self.serialized_ddl.push_back(DeferredPlanStatement {
1169 ctx,
1170 ps: PlanStatement::Statement { stmt, params },
1171 });
1172 return;
1173 }
1174 }
1175
1176 let catalog = self.catalog();
1177 let catalog = catalog.for_session(ctx.session());
1178 let original_stmt = Arc::clone(&stmt);
1179 // `resolved_ids` should be derivable from `stmt`. If `stmt` is transformed to remove/add
1180 // IDs, then `resolved_ids` should be updated to also remove/add those IDs.
1181 let (stmt, mut resolved_ids) = match mz_sql::names::resolve(&catalog, (*stmt).clone()) {
1182 Ok(resolved) => resolved,
1183 Err(e) => return ctx.retire(Err(e.into())),
1184 };
1185 // N.B. The catalog can change during purification so we must validate that the dependencies still exist after
1186 // purification. This should be done back on the main thread.
1187 // We do the validation:
1188 // - In the handler for `Message::PurifiedStatementReady`, before we handle the purified statement.
1189 // If we add special handling for more types of `Statement`s, we'll need to ensure similar verification
1190 // occurs.
1191 let (stmt, resolved_ids) = match stmt {
1192 // Various statements must be purified off the main coordinator thread of control.
1193 stmt if Self::must_spawn_purification(&stmt) => {
1194 let internal_cmd_tx = self.internal_cmd_tx.clone();
1195 let conn_id = ctx.session().conn_id().clone();
1196 let catalog = self.owned_catalog();
1197 let now = self.now();
1198 let otel_ctx = OpenTelemetryContext::obtain();
1199 let current_storage_configuration = self.controller.storage.config().clone();
1200 task::spawn(|| format!("purify:{conn_id}"), async move {
1201 let transient_revision = catalog.transient_revision();
1202 let catalog = catalog.for_session(ctx.session());
1203
1204 // Checks if the session is authorized to purify a statement. Usually
1205 // authorization is checked after planning; however, purification happens before
1206 // planning and may require the use of some connections and secrets.
1207 if let Err(e) = rbac::check_usage(
1208 &catalog,
1209 ctx.session(),
1210 &resolved_ids,
1211 &CREATE_ITEM_USAGE,
1212 ) {
1213 return ctx.retire(Err(e.into()));
1214 }
1215
1216 let (result, cluster_id) = mz_sql::pure::purify_statement(
1217 catalog,
1218 now,
1219 stmt,
1220 ¤t_storage_configuration,
1221 )
1222 .await;
1223 let result = result.map_err(|e| e.into());
1224 let dependency_ids = resolved_ids.items().copied().collect();
1225 let plan_validity = PlanValidity::new(
1226 transient_revision,
1227 dependency_ids,
1228 cluster_id,
1229 None,
1230 ctx.session().role_metadata().clone(),
1231 );
1232 // It is not an error for purification to complete after `internal_cmd_rx` is dropped.
1233 let result = internal_cmd_tx.send(Message::PurifiedStatementReady(
1234 PurifiedStatementReady {
1235 ctx,
1236 result,
1237 params,
1238 plan_validity,
1239 original_stmt,
1240 otel_ctx,
1241 },
1242 ));
1243 if let Err(e) = result {
1244 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1245 }
1246 });
1247 return;
1248 }
1249
1250 // `CREATE SUBSOURCE` statements are disallowed for users and are only generated
1251 // automatically as part of purification
1252 Statement::CreateSubsource(_) => {
1253 ctx.retire(Err(AdapterError::Unsupported(
1254 "CREATE SUBSOURCE statements",
1255 )));
1256 return;
1257 }
1258
1259 Statement::CreateMaterializedView(mut cmvs) => {
1260 // `CREATE MATERIALIZED VIEW ... AS OF ...` syntax is disallowed for users and is
1261 // only used for storing initial frontiers in the catalog.
1262 if cmvs.as_of.is_some() {
1263 return ctx.retire(Err(AdapterError::Unsupported(
1264 "CREATE MATERIALIZED VIEW ... AS OF statements",
1265 )));
1266 }
1267
1268 let mz_now = match self
1269 .resolve_mz_now_for_create_materialized_view(
1270 &cmvs,
1271 &resolved_ids,
1272 ctx.session_mut(),
1273 true,
1274 )
1275 .await
1276 {
1277 Ok(mz_now) => mz_now,
1278 Err(e) => return ctx.retire(Err(e)),
1279 };
1280
1281 let catalog = self.catalog().for_session(ctx.session());
1282
1283 purify_create_materialized_view_options(
1284 catalog,
1285 mz_now,
1286 &mut cmvs,
1287 &mut resolved_ids,
1288 );
1289
1290 let purified_stmt =
1291 Statement::CreateMaterializedView(CreateMaterializedViewStatement::<Aug> {
1292 if_exists: cmvs.if_exists,
1293 name: cmvs.name,
1294 columns: cmvs.columns,
1295 replacing: cmvs.replacing,
1296 in_cluster: cmvs.in_cluster,
1297 query: cmvs.query,
1298 with_options: cmvs.with_options,
1299 as_of: None,
1300 });
1301
1302 // (Purifying CreateMaterializedView doesn't happen async, so no need to send
1303 // `Message::PurifiedStatementReady` here.)
1304 (purified_stmt, resolved_ids)
1305 }
1306
1307 Statement::ExplainPlan(ExplainPlanStatement {
1308 stage,
1309 with_options,
1310 format,
1311 explainee: Explainee::CreateMaterializedView(box_cmvs, broken),
1312 }) => {
1313 let mut cmvs = *box_cmvs;
1314 let mz_now = match self
1315 .resolve_mz_now_for_create_materialized_view(
1316 &cmvs,
1317 &resolved_ids,
1318 ctx.session_mut(),
1319 false,
1320 )
1321 .await
1322 {
1323 Ok(mz_now) => mz_now,
1324 Err(e) => return ctx.retire(Err(e)),
1325 };
1326
1327 let catalog = self.catalog().for_session(ctx.session());
1328
1329 purify_create_materialized_view_options(
1330 catalog,
1331 mz_now,
1332 &mut cmvs,
1333 &mut resolved_ids,
1334 );
1335
1336 let purified_stmt = Statement::ExplainPlan(ExplainPlanStatement {
1337 stage,
1338 with_options,
1339 format,
1340 explainee: Explainee::CreateMaterializedView(Box::new(cmvs), broken),
1341 });
1342
1343 (purified_stmt, resolved_ids)
1344 }
1345
1346 // All other statements are handled immediately.
1347 _ => (stmt, resolved_ids),
1348 };
1349
1350 match self.plan_statement(ctx.session(), stmt, ¶ms, &resolved_ids) {
1351 Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await,
1352 Err(e) => ctx.retire(Err(e)),
1353 }
1354 }
1355
1356 /// Whether the statement is DDL that must be serialized.
1357 fn must_serialize_ddl(stmt: &Statement<Raw>, ctx: &ExecuteContext) -> bool {
1358 // Non-DDL is not serialized here.
1359 if !StatementClassification::from(&*stmt).is_ddl() {
1360 return false;
1361 }
1362 // Off-thread, pre-planning purification can perform arbitrarily slow network calls so must
1363 // not be serialized. These all use PlanValidity for their checking, and we must ensure
1364 // those checks are sufficient.
1365 if Self::must_spawn_purification(stmt) {
1366 return false;
1367 }
1368
1369 // Statements that support multiple DDLs in a single transaction aren't serialized here.
1370 // Their operations are serialized when applied to the catalog, guaranteeing that any
1371 // off-thread DDLs concurrent with a multiple DDL transaction will have a serial order.
1372 if ctx.session.transaction().is_ddl() {
1373 return false;
1374 }
1375
1376 // Some DDL is exempt. It is not great that we are matching on Statements here because
1377 // different plans can be produced from the same top-level statement type (e.g., `ALTER
1378 // CONNECTION ROTATE KEYS`). But the whole point of this is to prevent things from being
1379 // planned in the first place, so we accept the abstraction leak.
1380 match stmt {
1381 // Secrets have a small and understood set of dependencies, and their off-thread work
1382 // interacts with k8s.
1383 Statement::AlterSecret(_) => false,
1384 Statement::CreateSecret(_) => false,
1385 Statement::AlterConnection(AlterConnectionStatement { actions, .. })
1386 if actions
1387 .iter()
1388 .all(|action| matches!(action, AlterConnectionAction::RotateKeys)) =>
1389 {
1390 false
1391 }
1392
1393 // The off-thread work that altering a cluster may do (waiting for replicas to spin up)
1394 // does not affect its catalog names or ids and so is safe to not serialize. This could
1395 // change the set of replicas that exist. For queries that name replicas or use the
1396 // current_replica session var, the `replica_id` field of `PlanValidity` serves to
1397 // ensure that those replicas exist during the query finish stage. Additionally, that
1398 // work can take hours (configured by the user), so would also be a bad experience for
1399 // users.
1400 Statement::AlterCluster(_) => false,
1401
1402 // `ALTER SINK SET FROM` waits for the old relation to make enough progress for a clean
1403 // cutover. If the old collection is stalled, it may block forever. Checks in
1404 // sequencing ensure that the operation fails if any one of these happens concurrently:
1405 // * the sink is dropped
1406 // * the new source relation is dropped
1407 // * another `ALTER SINK` for the same sink is applied first
1408 Statement::AlterSink(stmt)
1409 if matches!(stmt.action, AlterSinkAction::ChangeRelation(_)) =>
1410 {
1411 false
1412 }
1413
1414 // Everything else must be serialized.
1415 _ => true,
1416 }
1417 }
1418
1419 /// Whether the statement must be purified off of the Coordinator thread.
1420 fn must_spawn_purification<A: AstInfo>(stmt: &Statement<A>) -> bool {
1421 // `CREATE` and `ALTER` `SOURCE` and `SINK` statements must be purified off the main
1422 // coordinator thread.
1423 if !matches!(
1424 stmt,
1425 Statement::CreateSource(_)
1426 | Statement::AlterSource(_)
1427 | Statement::CreateSink(_)
1428 | Statement::CreateTableFromSource(_)
1429 ) {
1430 return false;
1431 }
1432
1433 // However `ALTER SOURCE RETAIN HISTORY` should be excluded from off-thread purification.
1434 if let Statement::AlterSource(stmt) = stmt {
1435 let names: Vec<CreateSourceOptionName> = match &stmt.action {
1436 AlterSourceAction::SetOptions(options) => {
1437 options.iter().map(|o| o.name.clone()).collect()
1438 }
1439 AlterSourceAction::ResetOptions(names) => names.clone(),
1440 _ => vec![],
1441 };
1442 if !names.is_empty()
1443 && names
1444 .iter()
1445 .all(|n| matches!(n, CreateSourceOptionName::RetainHistory))
1446 {
1447 return false;
1448 }
1449 }
1450
1451 true
1452 }
1453
1454 /// Chooses a timestamp for `mz_now()`, if `mz_now()` occurs in a REFRESH option of the
1455 /// materialized view. Additionally, if `acquire_read_holds` is true and the MV has any REFRESH
1456 /// option, this function grabs read holds at the earliest possible time on input collections
1457 /// that might be involved in the MV.
1458 ///
1459 /// Note that this is NOT what handles `mz_now()` in the query part of the MV; it handles
1460 /// `mz_now()` only in `with_options`.
1461 ///
1462 /// (Note that the chosen timestamp won't be the same timestamp as the system table inserts,
1463 /// unfortunately.)
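/// (Illustrative example with made-up names: for a statement like `CREATE MATERIALIZED
/// VIEW mv WITH (REFRESH AT <expr using mz_now()>) AS SELECT ...`, this resolves the
/// `mz_now()` inside the REFRESH option, not any `mz_now()` in the SELECT.)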
1464 async fn resolve_mz_now_for_create_materialized_view(
1465 &mut self,
1466 cmvs: &CreateMaterializedViewStatement<Aug>,
1467 resolved_ids: &ResolvedIds,
1468 session: &Session,
1469 acquire_read_holds: bool,
1470 ) -> Result<Option<Timestamp>, AdapterError> {
1471 if cmvs
1472 .with_options
1473 .iter()
1474 .any(|wo| matches!(wo.value, Some(WithOptionValue::Refresh(..))))
1475 {
1476 let catalog = self.catalog().for_session(session);
1477 let cluster = mz_sql::plan::resolve_cluster_for_materialized_view(&catalog, cmvs)?;
1478 let ids = self
1479 .index_oracle(cluster)
1480 .sufficient_collections(resolved_ids.collections().copied());
1481
1482 // If there is any REFRESH option, then acquire read holds. (Strictly speaking, we'd
1483 // need this only if there is a `REFRESH AT`, not for `REFRESH EVERY`, because later
1484 // we want to check the AT times against the read holds that we acquire here. But
1485 // we do it for any REFRESH option, to avoid having so many code paths doing different
1486 // things.)
1487 //
1488 // It's important that we acquire read holds _before_ we determine the least valid read.
1489 // Otherwise, we're not guaranteed that the since frontier doesn't
1490 // advance forward from underneath us.
1491 let read_holds = self.acquire_read_holds(&ids);
1492
1493 // Does `mz_now()` occur?
1494 let mz_now_ts = if cmvs
1495 .with_options
1496 .iter()
1497 .any(materialized_view_option_contains_temporal)
1498 {
1499 let timeline_context = self
1500 .catalog()
1501 .validate_timeline_context(resolved_ids.collections().copied())?;
1502
1503 // We default to EpochMilliseconds, similarly to `determine_timestamp_for`,
1504 // but even in the TimestampIndependent case.
1505 // Note that we didn't accurately decide whether we are TimestampDependent
1506 // or TimestampIndependent, because for this we'd need to also check whether
1507 // `query.contains_temporal()`, similarly to how `peek_stage_validate` does.
1508 // However, this doesn't matter here, as we are just going to default to
1509 // EpochMilliseconds in both cases.
1510 let timeline = timeline_context
1511 .timeline()
1512 .unwrap_or(&Timeline::EpochMilliseconds);
1513
1514 // Let's start with the timestamp oracle read timestamp.
1515 let mut timestamp = self.get_timestamp_oracle(timeline).read_ts().await;
1516
1517 // If `least_valid_read` is later than the oracle, then advance to that time.
1518 // If we didn't do this, then there would be a danger of missing the first refresh,
1519 // which might cause the materialized view to be unreadable for hours. This might
1520 // be what was happening here:
1521 // https://github.com/MaterializeInc/database-issues/issues/7265#issuecomment-1931856361
1522 //
1523 // In the long term, it would be good to actually block the MV creation statement
1524 // until `least_valid_read`. https://github.com/MaterializeInc/database-issues/issues/7504
1525 // Without blocking, we have the problem that a REFRESH AT CREATION is not linearized
1526 // with the CREATE MATERIALIZED VIEW statement, in the sense that a query from the MV
1527 // after its creation might see input changes that happened after the CREATE MATERIALIZED
1528 // VIEW statement returned.
1529 let oracle_timestamp = timestamp;
1530 let least_valid_read = read_holds.least_valid_read();
1531 timestamp.advance_by(least_valid_read.borrow());
1532
1533 if oracle_timestamp != timestamp {
1534 warn!(%cmvs.name, %oracle_timestamp, %timestamp, "REFRESH MV's inputs are not readable at the oracle read ts");
1535 }
1536
1537 info!("Resolved `mz_now()` to {timestamp} for REFRESH MV");
1538 Ok(Some(timestamp))
1539 } else {
1540 Ok(None)
1541 };
1542
1543 // NOTE: The Drop impl of ReadHolds makes sure that the hold is
1544 // released when we don't use it.
1545 if acquire_read_holds {
1546 self.store_transaction_read_holds(session.conn_id().clone(), read_holds);
1547 }
1548
1549 mz_now_ts
1550 } else {
1551 Ok(None)
1552 }
1553 }
1554
1555 /// Instruct the dataflow layer to cancel any ongoing, interactive work for
1556 /// the named `conn_id` if the correct secret key is specified.
1557 ///
1558 /// Note: Here we take a [`ConnectionIdType`] as opposed to an owned
1559 /// `ConnectionId` because this method gets called by external clients when
1560 /// they request to cancel a request.
1561 #[mz_ore::instrument(level = "debug")]
1562 async fn handle_cancel(&mut self, conn_id: ConnectionIdType, secret_key: u32) {
1563 if let Some((id_handle, conn_meta)) = self.active_conns.get_key_value(&conn_id) {
1564 // If the secret key specified by the client doesn't match the
1565 // actual secret key for the target connection, we treat this as a
1566 // rogue cancellation request and ignore it.
1567 if conn_meta.secret_key != secret_key {
1568 return;
1569 }
1570
1571 // Now that we've verified the secret key, this is a privileged
1572 // cancellation request. We can upgrade the raw connection ID to a
1573 // proper `IdHandle`.
1574 self.handle_privileged_cancel(id_handle.clone()).await;
1575 }
1576 }
1577
1578 /// Unconditionally instructs the dataflow layer to cancel any ongoing,
1579 /// interactive work for the named `conn_id`.
1580 #[mz_ore::instrument(level = "debug")]
1581 pub(crate) async fn handle_privileged_cancel(&mut self, conn_id: ConnectionId) {
1582 let mut maybe_ctx = None;
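// A connection executes at most one statement at a time, so at most one of the checks below
// is expected to yield an `ExecuteContext`; whichever one does is retired with
// `AdapterError::Canceled` at the end of this function.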
1583
1584 // Cancel pending writes. There is at most one pending write per session.
1585 if let Some(idx) = self.pending_writes.iter().position(|pending_write_txn| {
1586 matches!(pending_write_txn, PendingWriteTxn::User {
1587 pending_txn: PendingTxn { ctx, .. },
1588 ..
1589 } if *ctx.session().conn_id() == conn_id)
1590 }) {
1591 if let PendingWriteTxn::User {
1592 pending_txn: PendingTxn { ctx, .. },
1593 ..
1594 } = self.pending_writes.remove(idx)
1595 {
1596 maybe_ctx = Some(ctx);
1597 }
1598 }
1599
1600 // Cancel deferred writes.
1601 if let Some(write_op) = self.deferred_write_ops.remove(&conn_id) {
1602 maybe_ctx = Some(write_op.into_ctx());
1603 }
1604
1605 // Cancel deferred statements.
1606 if let Some(idx) = self
1607 .serialized_ddl
1608 .iter()
1609 .position(|deferred| *deferred.ctx.session().conn_id() == conn_id)
1610 {
1611 let deferred = self
1612 .serialized_ddl
1613 .remove(idx)
1614 .expect("known to exist from call to `position` above");
1615 maybe_ctx = Some(deferred.ctx);
1616 }
1617
1618 // Cancel reads that are waiting to be linearized. There is at most one linearized read per
1619 // session.
1620 if let Some(pending_read_txn) = self.pending_linearize_read_txns.remove(&conn_id) {
1621 let ctx = pending_read_txn.take_context();
1622 maybe_ctx = Some(ctx);
1623 }
1624
1625 if let Some(ctx) = maybe_ctx {
1626 ctx.retire(Err(AdapterError::Canceled));
1627 }
1628
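// The remaining cleanup is for work that does not carry an `ExecuteContext`: in-flight
// peeks, watch sets, active compute sinks (e.g. SUBSCRIBEs), pending cluster
// reconfigurations, in-progress COPYs, and any staged cancellation signal for this
// connection.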
1629 self.cancel_pending_peeks(&conn_id);
1630 self.cancel_pending_watchsets(&conn_id);
1631 self.cancel_compute_sinks_for_conn(&conn_id).await;
1632 self.cancel_cluster_reconfigurations_for_conn(&conn_id)
1633 .await;
1634 self.cancel_pending_copy(&conn_id);
1635 if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) {
1636 let _ = tx.send(true);
1637 }
1638 }
1639
1640 /// Handle termination of a client session.
1641 ///
1642 /// This cleans up any state in the coordinator associated with the session.
1643 #[mz_ore::instrument(level = "debug")]
1644 async fn handle_terminate(&mut self, conn_id: ConnectionId) {
1645 if !self.active_conns.contains_key(&conn_id) {
1646 // If the session doesn't exist in `active_conns`, then this method will panic later on.
1647 // Instead we explicitly panic here while dumping the entire Coord to the logs to help
1648 // debug. This panic is very infrequent so we want as much information as possible.
1649 // See https://github.com/MaterializeInc/database-issues/issues/5627.
1650 panic!("unknown connection: {conn_id:?}\n\n{self:?}")
1651 }
1652
1653 // We do not need to call clear_transaction here because there are no side effects to run
1654 // based on any session transaction state.
1655 self.clear_connection(&conn_id).await;
1656
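// Drop the session's temporary items before dropping the temporary schema that contains
// them; a failure to drop the schema is treated as unrecoverable.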
1657 self.drop_temp_items(&conn_id).await;
1658 self.catalog_mut()
1659 .drop_temporary_schema(&conn_id)
1660 .unwrap_or_terminate("unable to drop temporary schema");
1661 let conn = self.active_conns.remove(&conn_id).expect("conn must exist");
1662 let session_type = metrics::session_type_label_value(conn.user());
1663 self.metrics
1664 .active_sessions
1665 .with_label_values(&[session_type])
1666 .dec();
1667 self.cancel_pending_peeks(conn.conn_id());
1668 self.cancel_pending_watchsets(&conn_id);
1669 self.cancel_pending_copy(&conn_id);
1670 self.end_session_for_statement_logging(conn.uuid());
1671
1672 // Queue the builtin table update, but do not wait for it to complete. We explicitly do
1673 // this to prevent blocking the Coordinator in the case that a lot of connections are
1674 // closed at once, which occurs regularly in some workflows.
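// `Diff::MINUS_ONE` retracts this session's row from the sessions introspection table; the
// notification future returned by `defer` is intentionally not awaited.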
1675 let update = self
1676 .catalog()
1677 .state()
1678 .pack_session_update(&conn, Diff::MINUS_ONE);
1679 let update = self.catalog().state().resolve_builtin_table_update(update);
1680
1681 let _builtin_update_notify = self.builtin_table_update().defer(vec![update]);
1682 }
1683
1684 /// Returns the necessary metadata for appending to a webhook source, and a channel to send
1685 /// rows.
1686 #[mz_ore::instrument(level = "debug")]
1687 fn handle_get_webhook(
1688 &mut self,
1689 database: String,
1690 schema: String,
1691 name: String,
1692 tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
1693 ) {
1694 /// Attempts to resolve a Webhook source from a provided `database.schema.name` path.
1695 ///
1696 /// Returns a struct that can be used to append data to the underlying storage collection, along
1697 /// with the types we should cast the request to.
1698 fn resolve(
1699 coord: &mut Coordinator,
1700 database: String,
1701 schema: String,
1702 name: String,
1703 ) -> Result<AppendWebhookResponse, PartialItemName> {
1704 // Resolve our collection.
1705 let name = PartialItemName {
1706 database: Some(database),
1707 schema: Some(schema),
1708 item: name,
1709 };
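// These components come from the `<database>/<schema>/<name>` path of the webhook endpoint;
// for illustration, a request to `.../api/webhook/materialize/public/my_webhook` maps to the
// partial name `materialize.public.my_webhook`.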
1710 let Ok(entry) = coord
1711 .catalog()
1712 .resolve_entry(None, &vec![], &name, &SYSTEM_CONN_ID)
1713 else {
1714 return Err(name);
1715 };
1716
1717 // Webhooks can be created with `CREATE SOURCE` or `CREATE TABLE`.
1718 let (data_source, desc, global_id) = match entry.item() {
1719 CatalogItem::Source(Source {
1720 data_source: data_source @ DataSourceDesc::Webhook { .. },
1721 desc,
1722 global_id,
1723 ..
1724 }) => (data_source, desc.clone(), *global_id),
1725 CatalogItem::Table(
1726 table @ Table {
1727 desc,
1728 data_source:
1729 TableDataSource::DataSource {
1730 desc: data_source @ DataSourceDesc::Webhook { .. },
1731 ..
1732 },
1733 ..
1734 },
1735 ) => (data_source, desc.latest(), table.global_id_writes()),
1736 _ => return Err(name),
1737 };
1738
1739 let DataSourceDesc::Webhook {
1740 validate_using,
1741 body_format,
1742 headers,
1743 ..
1744 } = data_source
1745 else {
1746 mz_ore::soft_panic_or_log!("programming error! checked above for webhook");
1747 return Err(name);
1748 };
1749 let body_format = body_format.clone();
1750 let header_tys = headers.clone();
1751
1752 // Assert that we have one column for the body, plus however many are required for
1753 // the headers.
1754 let num_columns = headers.num_columns() + 1;
1755 mz_ore::soft_assert_or_log!(
1756 desc.arity() <= num_columns,
1757 "expected at most {} columns, but got {}",
1758 num_columns,
1759 desc.arity()
1760 );
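// For example, a webhook source created with `INCLUDE HEADERS` has a `headers` map column in
// addition to `body`, making `num_columns` 2 in that case.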
1761
1762 // Double check that the body column of the webhook source matches the type
1763 // we're about to deserialize as.
1764 let body_column = desc
1765 .get_by_name(&"body".into())
1766 .map(|(_idx, ty)| ty.clone())
1767 .ok_or_else(|| name.clone())?;
1768 assert!(!body_column.nullable, "webhook body column is nullable!?");
1769 assert_eq!(body_column.scalar_type, SqlScalarType::from(body_format));
1770
1771 // Create a validator that can be called to validate a webhook request.
1772 let validator = validate_using.as_ref().map(|v| {
1773 let validation = v.clone();
1774 AppendWebhookValidator::new(validation, coord.caching_secrets_reader.clone())
1775 });
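// When present, the validator evaluates the `CHECK` expression from the webhook's definition
// (which may reference secrets, hence the secrets reader) to decide whether a request should
// be admitted.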
1776
1777 // Get a channel so we can queue updates to be written.
1778 let row_tx = coord
1779 .controller
1780 .storage
1781 .monotonic_appender(global_id)
1782 .map_err(|_| name.clone())?;
1783 let stats = coord
1784 .controller
1785 .storage
1786 .webhook_statistics(global_id)
1787 .map_err(|_| name)?;
1788 let invalidator = coord
1789 .active_webhooks
1790 .entry(entry.id())
1791 .or_insert_with(WebhookAppenderInvalidator::new);
1792 let tx = WebhookAppender::new(row_tx, invalidator.guard(), stats);
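// The guard ties the appender to this catalog entry: if the coordinator later invalidates the
// entry (e.g. because the webhook is dropped or altered), appends through the handed-out
// `WebhookAppender` fail rather than write to a stale collection.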
1793
1794 Ok(AppendWebhookResponse {
1795 tx,
1796 body_format,
1797 header_tys,
1798 validator,
1799 })
1800 }
1801
1802 let response = resolve(self, database, schema, name).map_err(|name| {
1803 AppendWebhookError::UnknownWebhook {
1804 database: name.database.expect("provided"),
1805 schema: name.schema.expect("provided"),
1806 name: name.item,
1807 }
1808 });
1809 let _ = tx.send(response);
1810 }
1811}