mz_adapter/coord/command_handler.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic for processing client [`Command`]s. Each [`Command`] is initiated by a
11//! client via some external Materialize API (e.g., HTTP and psql).
12
13use base64::prelude::*;
14use differential_dataflow::lattice::Lattice;
15use mz_adapter_types::dyncfgs::ALLOW_USER_SESSIONS;
16use mz_auth::password::Password;
17use mz_repr::namespaces::MZ_INTERNAL_SCHEMA;
18use mz_sql::session::metadata::SessionMetadata;
19use std::collections::{BTreeMap, BTreeSet};
20use std::net::IpAddr;
21use std::sync::Arc;
22
23use futures::FutureExt;
24use futures::future::LocalBoxFuture;
25use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
26use mz_catalog::SYSTEM_CONN_ID;
27use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source, Table, TableDataSource};
28use mz_ore::task;
29use mz_ore::tracing::OpenTelemetryContext;
30use mz_ore::{instrument, soft_panic_or_log};
31use mz_repr::role_id::RoleId;
32use mz_repr::{Diff, SqlScalarType, Timestamp};
33use mz_sql::ast::{
34 AlterConnectionAction, AlterConnectionStatement, AlterSinkAction, AlterSourceAction, AstInfo,
35 ConstantVisitor, CopyRelation, CopyStatement, CreateSourceOptionName, Raw, Statement,
36 SubscribeStatement,
37};
38use mz_sql::catalog::RoleAttributesRaw;
39use mz_sql::names::{Aug, PartialItemName, ResolvedIds};
40use mz_sql::plan::{
41 AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, Params, Plan,
42 StatementClassification, TransactionType,
43};
44use mz_sql::pure::{
45 materialized_view_option_contains_temporal, purify_create_materialized_view_options,
46};
47use mz_sql::rbac;
48use mz_sql::rbac::CREATE_ITEM_USAGE;
49use mz_sql::session::user::User;
50use mz_sql::session::vars::{
51 EndTransactionAction, NETWORK_POLICY, OwnedVarInput, STATEMENT_LOGGING_SAMPLE_RATE, Value, Var,
52};
53use mz_sql_parser::ast::display::AstDisplay;
54use mz_sql_parser::ast::{
55 CreateMaterializedViewStatement, ExplainPlanStatement, Explainee, InsertStatement,
56 WithOptionValue,
57};
58use mz_storage_types::sources::Timeline;
59use opentelemetry::trace::TraceContextExt;
60use tokio::sync::{mpsc, oneshot};
61use tracing::{Instrument, debug_span, info, warn};
62use tracing_opentelemetry::OpenTelemetrySpanExt;
63
64use crate::command::{
65 AuthResponse, CatalogSnapshot, Command, ExecuteResponse, SASLChallengeResponse,
66 SASLVerifyProofResponse, StartupResponse,
67};
68use crate::coord::appends::PendingWriteTxn;
69use crate::coord::{
70 ConnMeta, Coordinator, DeferredPlanStatement, Message, PendingTxn, PlanStatement, PlanValidity,
71 PurifiedStatementReady, validate_ip_with_policy_rules,
72};
73use crate::error::{AdapterError, AuthenticationError};
74use crate::notice::AdapterNotice;
75use crate::session::{Session, TransactionOps, TransactionStatus};
76use crate::util::{ClientTransmitter, ResultExt};
77use crate::webhook::{
78 AppendWebhookResponse, AppendWebhookValidator, WebhookAppender, WebhookAppenderInvalidator,
79};
80use crate::{AppendWebhookError, ExecuteContext, catalog, metrics};
81
82use super::ExecuteContextExtra;
83
84impl Coordinator {
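/// Processes a single client [`Command`], dispatching it to the appropriate handler.
///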
85 /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 58KB. This would
86 /// get stored on the stack, which is bad for runtime performance and blows up our stack usage.
87 /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
88 pub(crate) fn handle_command(&mut self, mut cmd: Command) -> LocalBoxFuture<'_, ()> {
89 async move {
90 if let Some(session) = cmd.session_mut() {
91 session.apply_external_metadata_updates();
92 }
93 match cmd {
94 Command::Startup {
95 tx,
96 user,
97 conn_id,
98 secret_key,
99 uuid,
100 client_ip,
101 application_name,
102 notice_tx,
103 } => {
104 // Note: We purposefully do not use a ClientTransmitter here because startup
105 // handles errors and cleanup of sessions itself.
106 self.handle_startup(
107 tx,
108 user,
109 conn_id,
110 secret_key,
111 uuid,
112 client_ip,
113 application_name,
114 notice_tx,
115 )
116 .await;
117 }
118
119 Command::AuthenticatePassword {
120 tx,
121 role_name,
122 password,
123 } => {
124 self.handle_authenticate_password(tx, role_name, password)
125 .await;
126 }
127
128 Command::AuthenticateGetSASLChallenge {
129 tx,
130 role_name,
131 nonce,
132 } => {
133 self.handle_generate_sasl_challenge(tx, role_name, nonce)
134 .await;
135 }
136
137 Command::AuthenticateVerifySASLProof {
138 tx,
139 role_name,
140 proof,
141 mock_hash,
142 auth_message,
143 } => {
144 self.handle_authenticate_verify_sasl_proof(
145 tx,
146 role_name,
147 proof,
148 auth_message,
149 mock_hash,
150 );
151 }
152
153 Command::Execute {
154 portal_name,
155 session,
156 tx,
157 outer_ctx_extra,
158 } => {
159 let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
160
161 self.handle_execute(portal_name, session, tx, outer_ctx_extra)
162 .await;
163 }
164
165 Command::RetireExecute { data, reason } => self.retire_execution(reason, data),
166
167 Command::CancelRequest {
168 conn_id,
169 secret_key,
170 } => {
171 self.handle_cancel(conn_id, secret_key).await;
172 }
173
174 Command::PrivilegedCancelRequest { conn_id } => {
175 self.handle_privileged_cancel(conn_id).await;
176 }
177
178 Command::GetWebhook {
179 database,
180 schema,
181 name,
182 tx,
183 } => {
184 self.handle_get_webhook(database, schema, name, tx);
185 }
186
187 Command::GetSystemVars { tx } => {
188 let _ = tx.send(self.catalog.system_config().clone());
189 }
190
191 Command::SetSystemVars { vars, conn_id, tx } => {
192 let mut ops = Vec::with_capacity(vars.len());
193 let conn = &self.active_conns[&conn_id];
194
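// Before staging any updates, confirm that each variable exists and is visible to
// this connection's user; bail out on the first failure.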
195 for (name, value) in vars {
196 if let Err(e) =
197 self.catalog().system_config().get(&name).and_then(|var| {
198 var.visible(conn.user(), self.catalog.system_config())
199 })
200 {
201 let _ = tx.send(Err(e.into()));
202 return;
203 }
204
205 ops.push(catalog::Op::UpdateSystemConfiguration {
206 name,
207 value: OwnedVarInput::Flat(value),
208 });
209 }
210
211 let result = self.catalog_transact_conn(Some(&conn_id), ops).await;
212 let _ = tx.send(result);
213 }
214
215 Command::Terminate { conn_id, tx } => {
216 self.handle_terminate(conn_id).await;
217 // Note: We purposefully do not use a ClientTransmitter here because we're already
218 // terminating the provided session.
219 if let Some(tx) = tx {
220 let _ = tx.send(Ok(()));
221 }
222 }
223
224 Command::Commit {
225 action,
226 session,
227 tx,
228 } => {
229 let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
230 // We reach here not through a statement execution, but from the
231 // "commit" pgwire command. Thus, we just generate a default statement
232 // execution context (once statement logging is implemented, this will cause nothing to be logged
233 // when the execution finishes).
234 let ctx = ExecuteContext::from_parts(
235 tx,
236 self.internal_cmd_tx.clone(),
237 session,
238 Default::default(),
239 );
240 let plan = match action {
241 EndTransactionAction::Commit => {
242 Plan::CommitTransaction(CommitTransactionPlan {
243 transaction_type: TransactionType::Implicit,
244 })
245 }
246 EndTransactionAction::Rollback => {
247 Plan::AbortTransaction(AbortTransactionPlan {
248 transaction_type: TransactionType::Implicit,
249 })
250 }
251 };
252
253 let conn_id = ctx.session().conn_id().clone();
254 self.sequence_plan(ctx, plan, ResolvedIds::empty()).await;
255 // Part of the Command::Commit contract is that the Coordinator guarantees that
256 // it has cleared its transaction state for the connection.
257 self.clear_connection(&conn_id).await;
258 }
259
260 Command::CatalogSnapshot { tx } => {
261 let _ = tx.send(CatalogSnapshot {
262 catalog: self.owned_catalog(),
263 });
264 }
265
266 Command::CheckConsistency { tx } => {
267 let _ = tx.send(self.check_consistency());
268 }
269
270 Command::Dump { tx } => {
271 let _ = tx.send(self.dump().await);
272 }
273 }
274 }
275 .instrument(debug_span!("handle_command"))
276 .boxed_local()
277 }
278
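/// Verifies a SASL (SCRAM) client proof for `role_name` against the role's stored password
/// hash, falling back to the caller-provided `mock_hash` when no real hash exists. On
/// success, sends the server verifier and an [`AuthResponse`] on `tx`; otherwise sends an
/// authentication error.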
279 fn handle_authenticate_verify_sasl_proof(
280 &self,
281 tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
282 role_name: String,
283 proof: String,
284 auth_message: String,
285 mock_hash: String,
286 ) {
287 let role = self.catalog().try_get_role_by_name(role_name.as_str());
288 let role_auth = role.and_then(|r| self.catalog().try_get_role_auth_by_id(&r.id));
289
290 let login = role
291 .as_ref()
292 .map(|r| r.attributes.login.unwrap_or(false))
293 .unwrap_or(false);
294
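// If the role has no stored password hash, verify against the caller-provided mock hash
// instead so that a missing role or hash is not trivially distinguishable from a bad
// password.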
295 let real_hash = role_auth
296 .as_ref()
297 .and_then(|auth| auth.password_hash.as_ref());
298 let hash_ref = real_hash.map(|s| s.as_str()).unwrap_or(&mock_hash);
299
300 let role_present = role.is_some();
301 let make_auth_err = |role_present: bool, login: bool| {
302 AdapterError::AuthenticationError(if role_present && !login {
303 AuthenticationError::NonLogin
304 } else {
305 AuthenticationError::InvalidCredentials
306 })
307 };
308
309 match mz_auth::hash::sasl_verify(hash_ref, &proof, &auth_message) {
310 Ok(verifier) => {
311 // Success only if role exists, allows login, and a real password hash was used.
312 if login && real_hash.is_some() {
313 let role = role.expect("login implies role exists");
314 let _ = tx.send(Ok(SASLVerifyProofResponse {
315 verifier,
316 auth_resp: AuthResponse {
317 role_id: role.id,
318 superuser: role.attributes.superuser.unwrap_or(false),
319 },
320 }));
321 } else {
322 let _ = tx.send(Err(make_auth_err(role_present, login)));
323 }
324 }
325 Err(_) => {
326 let _ = tx.send(Err(AdapterError::AuthenticationError(
327 AuthenticationError::InvalidCredentials,
328 )));
329 }
330 }
331 }
332
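/// Produces the SASL challenge (iteration count, salt, and a nonce derived from the client
/// nonce) for `role_name`. If the role is unknown or has no parseable password hash, a
/// deterministic mock challenge is returned instead so that the response does not reveal
/// whether the role exists.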
333 #[mz_ore::instrument(level = "debug")]
334 async fn handle_generate_sasl_challenge(
335 &mut self,
336 tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
337 role_name: String,
338 client_nonce: String,
339 ) {
340 let role_auth = self
341 .catalog()
342 .try_get_role_by_name(&role_name)
343 .and_then(|role| self.catalog().try_get_role_auth_by_id(&role.id));
344
345 let nonce = match mz_auth::hash::generate_nonce(&client_nonce) {
346 Ok(n) => n,
347 Err(e) => {
348 let msg = format!(
349 "failed to generate nonce for client nonce {}: {}",
350 client_nonce, e
351 );
352 let _ = tx.send(Err(AdapterError::Internal(msg.clone())));
353 soft_panic_or_log!("{msg}");
354 return;
355 }
356 };
357
358 // It's important that the mock_nonce is deterministic per role; otherwise, the purpose of
359 // doing mock authentication is defeated. We use a catalog-wide nonce, and combine that
360 // with the role name to get a per-role mock nonce.
361 let send_mock_challenge =
362 |role_name: String,
363 mock_nonce: String,
364 nonce: String,
365 tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>| {
366 let opts = mz_auth::hash::mock_sasl_challenge(&role_name, &mock_nonce);
367 let _ = tx.send(Ok(SASLChallengeResponse {
368 iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
369 salt: BASE64_STANDARD.encode(opts.salt),
370 nonce,
371 }));
372 };
373
374 match role_auth {
375 Some(auth) if auth.password_hash.is_some() => {
376 let hash = auth.password_hash.as_ref().expect("checked above");
377 match mz_auth::hash::scram256_parse_opts(hash) {
378 Ok(opts) => {
379 let _ = tx.send(Ok(SASLChallengeResponse {
380 iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
381 salt: BASE64_STANDARD.encode(opts.salt),
382 nonce,
383 }));
384 }
385 Err(_) => {
386 send_mock_challenge(
387 role_name,
388 self.catalog().state().mock_authentication_nonce(),
389 nonce,
390 tx,
391 );
392 }
393 }
394 }
395 _ => {
396 send_mock_challenge(
397 role_name,
398 self.catalog().state().mock_authentication_nonce(),
399 nonce,
400 tx,
401 );
402 }
403 }
404 }
405
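/// Authenticates `role_name` with the given password, replying on `tx` with an
/// [`AuthResponse`] on success or an [`AuthenticationError`] describing why the attempt was
/// rejected (missing password, non-login role, bad credentials, or unknown role).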
406 #[mz_ore::instrument(level = "debug")]
407 async fn handle_authenticate_password(
408 &mut self,
409 tx: oneshot::Sender<Result<AuthResponse, AdapterError>>,
410 role_name: String,
411 password: Option<Password>,
412 ) {
413 let Some(password) = password else {
414 // The user did not provide a password.
415 let _ = tx.send(Err(AdapterError::AuthenticationError(
416 AuthenticationError::PasswordRequired,
417 )));
418 return;
419 };
420
421 if let Some(role) = self.catalog().try_get_role_by_name(role_name.as_str()) {
422 if !role.attributes.login.unwrap_or(false) {
423 // The user is not allowed to log in.
424 let _ = tx.send(Err(AdapterError::AuthenticationError(
425 AuthenticationError::NonLogin,
426 )));
427 return;
428 }
429 if let Some(auth) = self.catalog().try_get_role_auth_by_id(&role.id) {
430 if let Some(hash) = &auth.password_hash {
431 let _ = match mz_auth::hash::scram256_verify(&password, hash) {
432 Ok(_) => tx.send(Ok(AuthResponse {
433 role_id: role.id,
434 superuser: role.attributes.superuser.unwrap_or(false),
435 })),
436 Err(_) => tx.send(Err(AdapterError::AuthenticationError(
437 AuthenticationError::InvalidCredentials,
438 ))),
439 };
440 return;
441 }
442 }
443 // Authentication failed due to incorrect password or missing password hash.
444 let _ = tx.send(Err(AdapterError::AuthenticationError(
445 AuthenticationError::InvalidCredentials,
446 )));
447 } else {
448 // The user does not exist.
449 let _ = tx.send(Err(AdapterError::AuthenticationError(
450 AuthenticationError::RoleNotFound,
451 )));
452 }
453 }
454
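/// Registers a new client connection with the coordinator: runs the fallible setup in
/// `handle_startup_inner`, records the connection's [`ConnMeta`] in `active_conns`, begins
/// statement logging for the session, stages a builtin table update for the new session,
/// and replies with a [`StartupResponse`]. On error, any partially created state is cleaned
/// up before the error is returned to the client.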
455 #[mz_ore::instrument(level = "debug")]
456 async fn handle_startup(
457 &mut self,
458 tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
459 user: User,
460 conn_id: ConnectionId,
461 secret_key: u32,
462 uuid: uuid::Uuid,
463 client_ip: Option<IpAddr>,
464 application_name: String,
465 notice_tx: mpsc::UnboundedSender<AdapterNotice>,
466 ) {
467 // Early return if successful, otherwise clean up any possible state.
468 match self.handle_startup_inner(&user, &conn_id, &client_ip).await {
469 Ok((role_id, session_defaults)) => {
470 let session_type = metrics::session_type_label_value(&user);
471 self.metrics
472 .active_sessions
473 .with_label_values(&[session_type])
474 .inc();
475 let conn = ConnMeta {
476 secret_key,
477 notice_tx,
478 drop_sinks: BTreeSet::new(),
479 pending_cluster_alters: BTreeSet::new(),
480 connected_at: self.now(),
481 user,
482 application_name,
483 uuid,
484 client_ip,
485 conn_id: conn_id.clone(),
486 authenticated_role: role_id,
487 deferred_lock: None,
488 };
489 let update = self.catalog().state().pack_session_update(&conn, Diff::ONE);
490 let update = self.catalog().state().resolve_builtin_table_update(update);
491 self.begin_session_for_statement_logging(&conn);
492 self.active_conns.insert(conn_id.clone(), conn);
493
494 // Note: Do NOT await the notify here; we pass this back to
495 // whatever requested the startup to prevent blocking startup
496 // and the Coordinator on a builtin table update.
497 let updates = vec![update];
498 // It's not a hard error if our list is missing a builtin table, but we want to
499 // make sure these two things stay in sync.
500 if mz_ore::assert::soft_assertions_enabled() {
501 let required_tables: BTreeSet<_> = super::appends::REQUIRED_BUILTIN_TABLES
502 .iter()
503 .map(|table| self.catalog().resolve_builtin_table(*table))
504 .collect();
505 let updates_tracked = updates
506 .iter()
507 .all(|update| required_tables.contains(&update.id));
508 let all_mz_internal = super::appends::REQUIRED_BUILTIN_TABLES
509 .iter()
510 .all(|table| table.schema == MZ_INTERNAL_SCHEMA);
511 mz_ore::soft_assert_or_log!(
512 updates_tracked,
513 "not tracking all required builtin table updates!"
514 );
515 // TODO(parkmycar): When checking if a query depends on these builtin table
516 // writes we do not check the transitive dependencies of the query, because
517 // we don't support creating views on mz_internal objects. If one of these
518 // tables is promoted out of mz_internal then we'll need to add this check.
519 mz_ore::soft_assert_or_log!(
520 all_mz_internal,
521 "not all builtin tables are in mz_internal! need to check transitive depends",
522 )
523 }
524 let notify = self.builtin_table_update().background(updates);
525
526 let resp = Ok(StartupResponse {
527 role_id,
528 write_notify: notify,
529 session_defaults,
530 catalog: self.owned_catalog(),
531 });
532 if tx.send(resp).is_err() {
533 // Failed to send to the adapter, but everything is set up so we can terminate
534 // normally.
535 self.handle_terminate(conn_id).await;
536 }
537 }
538 Err(e) => {
539 // Error during startup or sending to the adapter; clean up possible state created by
540 // handle_startup_inner. A user may have been created and it can stay; no need to
541 // delete it.
542 self.catalog_mut()
543 .drop_temporary_schema(&conn_id)
544 .unwrap_or_terminate("unable to drop temporary schema");
545
546 // Communicate the error back to the client. No need to
547 // handle failures to send the error back; we've already
548 // cleaned up all necessary state.
549 let _ = tx.send(Err(e));
550 }
551 }
552 }
553
554 // Fallible startup work that needs to be cleaned up on error.
555 async fn handle_startup_inner(
556 &mut self,
557 user: &User,
558 conn_id: &ConnectionId,
559 client_ip: &Option<IpAddr>,
560 ) -> Result<(RoleId, BTreeMap<String, OwnedVarInput>), AdapterError> {
561 if self.catalog().try_get_role_by_name(&user.name).is_none() {
562 // If the user has made it to this point, that means they have been fully authenticated.
563 // This includes preventing any user, except a pre-defined set of system users, from
564 // connecting to an internal port. Therefore it's ok to always create a new role for the
565 // user.
566 let attributes = RoleAttributesRaw::new();
567 let plan = CreateRolePlan {
568 name: user.name.to_string(),
569 attributes,
570 };
571 self.sequence_create_role_for_startup(plan).await?;
572 }
573 let role_id = self
574 .catalog()
575 .try_get_role_by_name(&user.name)
576 .expect("created above")
577 .id;
578
579 if role_id.is_user() && !ALLOW_USER_SESSIONS.get(self.catalog().system_config().dyncfgs()) {
580 return Err(AdapterError::UserSessionsDisallowed);
581 }
582
583 // Initialize the default session variables for this role.
584 let mut session_defaults = BTreeMap::new();
585 let system_config = self.catalog().state().system_config();
586
587 // Override the session with any system defaults.
588 session_defaults.extend(
589 system_config
590 .iter_session()
591 .map(|v| (v.name().to_string(), OwnedVarInput::Flat(v.value()))),
592 );
593 // Special case.
594 let statement_logging_default = system_config
595 .statement_logging_default_sample_rate()
596 .format();
597 session_defaults.insert(
598 STATEMENT_LOGGING_SAMPLE_RATE.name().to_string(),
599 OwnedVarInput::Flat(statement_logging_default),
600 );
601 // Override system defaults with role defaults.
602 session_defaults.extend(
603 self.catalog()
604 .get_role(&role_id)
605 .vars()
606 .map(|(name, val)| (name.to_string(), val.clone())),
607 );
608
609 // Validate network policies for external users. Internal users can only connect on the
610 // internal interfaces (internal HTTP/pgwire). It is up to the person deploying the system
611 // to ensure these internal interfaces are well secured.
612 //
613 // HACKY(parkmycar): We don't have a fully formed session yet for this role, but we want
614 // the default network policy for this role, so we read directly out of what the session
615 // will get initialized with.
616 if !user.is_internal() {
617 let network_policy_name = session_defaults
618 .get(NETWORK_POLICY.name())
619 .and_then(|value| match value {
620 OwnedVarInput::Flat(name) => Some(name.clone()),
621 OwnedVarInput::SqlSet(names) => {
622 tracing::error!(?names, "found multiple network policies");
623 None
624 }
625 })
626 .unwrap_or_else(|| system_config.default_network_policy_name());
627 let maybe_network_policy = self
628 .catalog()
629 .get_network_policy_by_name(&network_policy_name);
630
631 let Some(network_policy) = maybe_network_policy else {
632 // We should prevent dropping the default network policy, or setting the policy
633 // to something that doesn't exist, so complain loudly if this occurs.
634 tracing::error!(
635 network_policy_name,
636 "default network policy does not exist. All user traffic will be blocked"
637 );
638 let reason = match client_ip {
639 Some(ip) => super::NetworkPolicyError::AddressDenied(ip.clone()),
640 None => super::NetworkPolicyError::MissingIp,
641 };
642 return Err(AdapterError::NetworkPolicyDenied(reason));
643 };
644
645 if let Some(ip) = client_ip {
646 match validate_ip_with_policy_rules(ip, &network_policy.rules) {
647 Ok(_) => {}
648 Err(e) => return Err(AdapterError::NetworkPolicyDenied(e)),
649 }
650 } else {
651 // Only temporary and internal representations of a session
652 // should be missing a client_ip. These sessions should not be
653 // making requests or going through handle_startup.
654 return Err(AdapterError::NetworkPolicyDenied(
655 super::NetworkPolicyError::MissingIp,
656 ));
657 }
658 }
659
660 self.catalog_mut()
661 .create_temporary_schema(conn_id, role_id)?;
662
663 Ok((role_id, session_defaults))
664 }
665
666 /// Handles an execute command.
667 #[instrument(name = "coord::handle_execute", fields(session = session.uuid().to_string()))]
668 pub(crate) async fn handle_execute(
669 &mut self,
670 portal_name: String,
671 mut session: Session,
672 tx: ClientTransmitter<ExecuteResponse>,
673 // If this command was part of another execute command
674 // (for example, executing a `FETCH` statement causes an execute to be
675 // issued for the cursor it references),
676 // then `outer_context` should be `Some`.
677 // This instructs the coordinator that the
678 // outer execute should be considered finished once the inner one is.
679 outer_context: Option<ExecuteContextExtra>,
680 ) {
681 if session.vars().emit_trace_id_notice() {
682 let span_context = tracing::Span::current()
683 .context()
684 .span()
685 .span_context()
686 .clone();
687 if span_context.is_valid() {
688 session.add_notice(AdapterNotice::QueryTrace {
689 trace_id: span_context.trace_id(),
690 });
691 }
692 }
693
694 if let Err(err) = self.verify_portal(&mut session, &portal_name) {
695 // If statement logging hasn't started yet, we don't need
696 // to add any "end" event, so just make up a no-op
697 // `ExecuteContextExtra` here, via `Default::default`.
698 //
699 // It's a bit unfortunate because the edge case of failed
700 // portal verifications won't show up in statement
701 // logging, but there seems to be nothing else we can do,
702 // because we need access to the portal to begin logging.
703 //
704 // Another option would be to log a begin and end event, but just fill in NULLs
705 // for everything we get from the portal (prepared statement id, params).
706 let extra = outer_context.unwrap_or_else(Default::default);
707 let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
708 return ctx.retire(Err(err));
709 }
710
711 // The reference to `portal` can't outlive `session`, which we
712 // use to construct the context, so scope the reference to this block where we
713 // get everything we need from the portal for later.
714 let (stmt, ctx, params) = {
715 let portal = session
716 .get_portal_unverified(&portal_name)
717 .expect("known to exist");
718 let params = portal.parameters.clone();
719 let stmt = portal.stmt.clone();
720 let logging = Arc::clone(&portal.logging);
721 let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
722
723 let extra = if let Some(extra) = outer_context {
724 // We are executing in the context of another SQL statement, so we don't
725 // want to begin statement logging anew. The context of the actual statement
726 // being executed is the one that should be retired once this finishes.
727 extra
728 } else {
729 // This is a new statement; log it and return the context.
730 let maybe_uuid = self.begin_statement_execution(
731 &mut session,
732 ¶ms,
733 &logging,
734 lifecycle_timestamps,
735 );
736
737 ExecuteContextExtra::new(maybe_uuid)
738 };
739 let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
740 (stmt, ctx, params)
741 };
742
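// An empty portal (e.g., an empty query string) has no statement to execute and
// completes immediately.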
743 let stmt = match stmt {
744 Some(stmt) => stmt,
745 None => return ctx.retire(Ok(ExecuteResponse::EmptyQuery)),
746 };
747
748 let session_type = metrics::session_type_label_value(ctx.session().user());
749 let stmt_type = metrics::statement_type_label_value(&stmt);
750 self.metrics
751 .query_total
752 .with_label_values(&[session_type, stmt_type])
753 .inc();
754 match &*stmt {
755 Statement::Subscribe(SubscribeStatement { output, .. })
756 | Statement::Copy(CopyStatement {
757 relation: CopyRelation::Subscribe(SubscribeStatement { output, .. }),
758 ..
759 }) => {
760 self.metrics
761 .subscribe_outputs
762 .with_label_values(&[
763 session_type,
764 metrics::subscribe_output_label_value(output),
765 ])
766 .inc();
767 }
768 _ => {}
769 }
770
771 self.handle_execute_inner(stmt, params, ctx).await
772 }
773
774 #[instrument(name = "coord::handle_execute_inner", fields(stmt = stmt.to_ast_string_redacted()))]
775 pub(crate) async fn handle_execute_inner(
776 &mut self,
777 stmt: Arc<Statement<Raw>>,
778 params: Params,
779 mut ctx: ExecuteContext,
780 ) {
781 // This comment describes the various ways DDL can execute (the ordered operations: name
782 // resolve, purify, plan, sequence), all of which are managed by this function. DDL has
783 // three notable properties that all partially interact.
784 //
785 // 1. Most DDL statements (and a few others) support single-statement transaction delayed
786 // execution. This occurs when a session executes `BEGIN`, a single DDL, then `COMMIT`.
787 // We announce success of the single DDL when it is executed, but do not attempt to plan
788 // or sequence it until `COMMIT`, which is able to error if needed while sequencing the
789 // DDL (this behavior is Postgres-compatible). The purpose of this is because some
790 // drivers or tools wrap all statements in `BEGIN` and `COMMIT` and we would like them to
791 // work. When the single DDL is announced as successful we also put the session's
792 // transaction ops into `SingleStatement` which will produce an error if any other
793 // statement is run in the transaction except `COMMIT`. Additionally, this will cause
794 // `handle_execute_inner` to stop further processing (no planning, etc.) of the
795 // statement.
796 // 2. A few other DDL statements (`ALTER .. RENAME/SWAP`) enter the `DDL` ops which allows
797 // any number of only these DDL statements to be executed in a transaction. At sequencing
798 // these generate the `Op::TransactionDryRun` catalog op. When applied with
799 // `catalog_transact`, that op will always produce the `TransactionDryRun` error. The
800 // `catalog_transact_with_ddl_transaction` function intercepts that error and reports
801 // success to the user, but nothing is yet committed to the real catalog. At `COMMIT` all
802 // of the ops but without dry run are applied. The purpose of this is to allow multiple,
803 // atomic renames in the same transaction.
804 // 3. Some DDLs do off-thread work during purification or sequencing that is expensive or
805 // makes network calls (interfacing with secrets, optimization of views/indexes, source
806 // purification). These must guarantee correctness when they return to the main
807 // coordinator thread because the catalog state could have changed while they were doing
808 // the off-thread work. Previously we would use `PlanValidity::Checks` to specify a bunch
809 // of IDs that we needed to exist. We discovered the way we were doing that was not
810 // always correct. Instead of attempting to get that completely right, we have opted to
811 // serialize DDL. Getting this right is difficult because catalog changes can affect name
812 // resolution, planning, sequencing, and optimization. Correctly writing logic that is
813 // aware of all possible catalog changes that would affect any of those parts is not
814 // something our current code has been designed to be helpful at. Even if a DDL statement
815 // is doing off-thread work, another DDL must not yet execute at all. Executing these
816 // serially will guarantee that no off-thread work has affected the state of the catalog.
817 // This is done by adding a VecDeque of deferred statements and a lock to the
818 // Coordinator. When a DDL is run in `handle_execute_inner` (after applying whatever
819 // transaction ops are needed to the session as described above), it attempts to own the
820 // lock (a tokio Mutex). If acquired, it stashes the lock in the connection's `ConnMeta`
821 // struct in `active_conns` and proceeds. The lock is dropped at transaction end in
822 // `clear_transaction` and a message sent to the Coordinator to execute the next queued
823 // DDL. If the lock could not be acquired, the DDL is put into the VecDeque where it
824 // awaits dequeuing caused by the lock being released.
825
826 // Verify that this statement type can be executed in the current
827 // transaction state.
828 match ctx.session().transaction() {
829 // By this point we should be in a running transaction.
830 TransactionStatus::Default => unreachable!(),
831
832 // Failed transactions have already been checked in pgwire for a safe statement
833 // (COMMIT, ROLLBACK, etc.) and can proceed.
834 TransactionStatus::Failed(_) => {}
835
836 // Started is a deceptive name, and means different things depending on which
837 // protocol was used. It either means exactly one statement (known because this
838 // is the simple protocol, the parser parsed the entire string, and it had
839 // one statement), or, in the extended protocol, that *some* query is
840 // being executed, but there might be others after it before the Sync (commit)
841 // message. Postgres handles this by teaching Started to eagerly commit certain
842 // statements that can't be run in a transaction block.
843 TransactionStatus::Started(_) => {
844 if let Statement::Declare(_) = &*stmt {
845 // Declare is an exception. Although it's not against any spec to execute
846 // it, it will always result in nothing happening, since all portals will be
847 // immediately closed. Users don't know this detail, so this error helps them
848 // understand what's going wrong. Postgres does this too.
849 return ctx.retire(Err(AdapterError::OperationRequiresTransaction(
850 "DECLARE CURSOR".into(),
851 )));
852 }
853 }
854
855 // Implicit or explicit transactions.
856 //
857 // Implicit transactions happen when a multi-statement query is executed
858 // (a "simple query"). However if a "BEGIN" appears somewhere in there,
859 // then the existing implicit transaction will be upgraded to an explicit
860 // transaction. Thus, we should not separate what implicit and explicit
861 // transactions can do unless there's some additional checking to make sure
862 // something disallowed in explicit transactions did not previously take place
863 // in the implicit portion.
864 TransactionStatus::InTransactionImplicit(_) | TransactionStatus::InTransaction(_) => {
865 match &*stmt {
866 // Statements that are safe in a transaction. We still need to verify that we
867 // don't interleave reads and writes since we can't perform those serializably.
868 Statement::Close(_)
869 | Statement::Commit(_)
870 | Statement::Copy(_)
871 | Statement::Deallocate(_)
872 | Statement::Declare(_)
873 | Statement::Discard(_)
874 | Statement::Execute(_)
875 | Statement::ExplainPlan(_)
876 | Statement::ExplainPushdown(_)
877 | Statement::ExplainAnalyze(_)
878 | Statement::ExplainTimestamp(_)
879 | Statement::ExplainSinkSchema(_)
880 | Statement::Fetch(_)
881 | Statement::Prepare(_)
882 | Statement::Rollback(_)
883 | Statement::Select(_)
884 | Statement::SetTransaction(_)
885 | Statement::Show(_)
886 | Statement::SetVariable(_)
887 | Statement::ResetVariable(_)
888 | Statement::StartTransaction(_)
889 | Statement::Subscribe(_)
890 | Statement::Raise(_) => {
891 // Always safe.
892 }
893
894 Statement::Insert(InsertStatement {
895 source, returning, ..
896 }) if returning.is_empty() && ConstantVisitor::insert_source(source) => {
897 // Inserting constant values with a statement that does not need to execute on
898 // any cluster (no RETURNING) is always safe.
899 }
900
901 // These statements must be kept in sync with `must_serialize_ddl()`.
902 Statement::AlterObjectRename(_)
903 | Statement::AlterObjectSwap(_)
904 | Statement::CreateTableFromSource(_)
905 | Statement::CreateSource(_) => {
906 let state = self.catalog().for_session(ctx.session()).state().clone();
907 let revision = self.catalog().transient_revision();
908
909 // Initialize our transaction with a set of empty ops, or return an error
910 // if we can't run a DDL transaction
911 let txn_status = ctx.session_mut().transaction_mut();
912 if let Err(err) = txn_status.add_ops(TransactionOps::DDL {
913 ops: vec![],
914 state,
915 revision,
916 side_effects: vec![],
917 }) {
918 return ctx.retire(Err(err));
919 }
920 }
921
922 // Statements below must be run singly (in Started).
923 Statement::AlterCluster(_)
924 | Statement::AlterConnection(_)
925 | Statement::AlterDefaultPrivileges(_)
926 | Statement::AlterIndex(_)
927 | Statement::AlterSetCluster(_)
928 | Statement::AlterOwner(_)
929 | Statement::AlterRetainHistory(_)
930 | Statement::AlterRole(_)
931 | Statement::AlterSecret(_)
932 | Statement::AlterSink(_)
933 | Statement::AlterSource(_)
934 | Statement::AlterSystemReset(_)
935 | Statement::AlterSystemResetAll(_)
936 | Statement::AlterSystemSet(_)
937 | Statement::AlterTableAddColumn(_)
938 | Statement::AlterNetworkPolicy(_)
939 | Statement::CreateCluster(_)
940 | Statement::CreateClusterReplica(_)
941 | Statement::CreateConnection(_)
942 | Statement::CreateDatabase(_)
943 | Statement::CreateIndex(_)
944 | Statement::CreateMaterializedView(_)
945 | Statement::CreateContinualTask(_)
946 | Statement::CreateRole(_)
947 | Statement::CreateSchema(_)
948 | Statement::CreateSecret(_)
949 | Statement::CreateSink(_)
950 | Statement::CreateSubsource(_)
951 | Statement::CreateTable(_)
952 | Statement::CreateType(_)
953 | Statement::CreateView(_)
954 | Statement::CreateWebhookSource(_)
955 | Statement::CreateNetworkPolicy(_)
956 | Statement::Delete(_)
957 | Statement::DropObjects(_)
958 | Statement::DropOwned(_)
959 | Statement::GrantPrivileges(_)
960 | Statement::GrantRole(_)
961 | Statement::Insert(_)
962 | Statement::ReassignOwned(_)
963 | Statement::RevokePrivileges(_)
964 | Statement::RevokeRole(_)
965 | Statement::Update(_)
966 | Statement::ValidateConnection(_)
967 | Statement::Comment(_) => {
968 let txn_status = ctx.session_mut().transaction_mut();
969
970 // If we're not in an implicit transaction and we could generate exactly one
971 // valid ExecuteResponse, we can delay execution until commit.
972 if !txn_status.is_implicit() {
973 // Statements whose tag is trivial (known only from an unexecuted statement) can
974 // be run in a special single-statement explicit mode. In this mode (`BEGIN;
975 // <stmt>; COMMIT`), we generate the expected tag from a successful <stmt>, but
976 // delay execution until `COMMIT`.
977 if let Ok(resp) = ExecuteResponse::try_from(&*stmt) {
978 if let Err(err) = txn_status
979 .add_ops(TransactionOps::SingleStatement { stmt, params })
980 {
981 ctx.retire(Err(err));
982 return;
983 }
984 ctx.retire(Ok(resp));
985 return;
986 }
987 }
988
989 return ctx.retire(Err(AdapterError::OperationProhibitsTransaction(
990 stmt.to_string(),
991 )));
992 }
993 }
994 }
995 }
996
997 // DDLs must be planned and sequenced serially. We do not rely on PlanValidity checking
998 // various IDs because we have incorrectly done that in the past. Attempt to acquire the
999 // ddl lock. The lock is stashed in the ConnMeta which is dropped at transaction end. If
1000 // acquired, proceed with sequencing. If not, enqueue and return. This logic assumes that
1001 // Coordinator::clear_transaction is correctly called when session transactions are ended
1002 // because that function will release the held lock from active_conns.
1003 if Self::must_serialize_ddl(&stmt, &ctx) {
1004 if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
1005 let prev = self
1006 .active_conns
1007 .get_mut(ctx.session().conn_id())
1008 .expect("connection must exist")
1009 .deferred_lock
1010 .replace(guard);
1011 assert!(
1012 prev.is_none(),
1013 "connections should have at most one lock guard"
1014 );
1015 } else {
1016 if self
1017 .active_conns
1018 .get(ctx.session().conn_id())
1019 .expect("connection must exist")
1020 .deferred_lock
1021 .is_some()
1022 {
1023 // This session *already* has the lock, and incorrectly tried to execute another
1024 // DDL while still holding the lock, violating the assumption documented above.
1025 // This is an internal error, probably in some AdapterClient user (pgwire or
1026 // http). Because the session is now in some unexpected state, return an error
1027 // which should cause the AdapterClient user to fail the transaction.
1028 // (Terminating the connection is maybe what we would prefer to do, but is not
1029 // currently a thing we can do from the coordinator: calling handle_terminate
1030 // cleans up Coordinator state for the session but doesn't inform the
1031 // AdapterClient that the session should terminate.)
1032 soft_panic_or_log!(
1033 "session {} attempted to get ddl lock while already owning it",
1034 ctx.session().conn_id()
1035 );
1036 ctx.retire(Err(AdapterError::Internal(
1037 "session attempted to get ddl lock while already owning it".to_string(),
1038 )));
1039 return;
1040 }
1041 self.serialized_ddl.push_back(DeferredPlanStatement {
1042 ctx,
1043 ps: PlanStatement::Statement { stmt, params },
1044 });
1045 return;
1046 }
1047 }
1048
1049 let catalog = self.catalog();
1050 let catalog = catalog.for_session(ctx.session());
1051 let original_stmt = Arc::clone(&stmt);
1052 // `resolved_ids` should be derivable from `stmt`. If `stmt` is transformed to remove/add
1053 // IDs, then `resolved_ids` should be updated to also remove/add those IDs.
1054 let (stmt, mut resolved_ids) = match mz_sql::names::resolve(&catalog, (*stmt).clone()) {
1055 Ok(resolved) => resolved,
1056 Err(e) => return ctx.retire(Err(e.into())),
1057 };
1058 // N.B. The catalog can change during purification so we must validate that the dependencies still exist after
1059 // purification. This should be done back on the main thread.
1060 // We do the validation:
1061 // - In the handler for `Message::PurifiedStatementReady`, before we handle the purified statement.
1062 // If we add special handling for more types of `Statement`s, we'll need to ensure similar verification
1063 // occurs.
1064 let (stmt, resolved_ids) = match stmt {
1065 // Various statements must be purified off the main coordinator thread of control.
1066 stmt if Self::must_spawn_purification(&stmt) => {
1067 let internal_cmd_tx = self.internal_cmd_tx.clone();
1068 let conn_id = ctx.session().conn_id().clone();
1069 let catalog = self.owned_catalog();
1070 let now = self.now();
1071 let otel_ctx = OpenTelemetryContext::obtain();
1072 let current_storage_configuration = self.controller.storage.config().clone();
1073 task::spawn(|| format!("purify:{conn_id}"), async move {
1074 let transient_revision = catalog.transient_revision();
1075 let catalog = catalog.for_session(ctx.session());
1076
1077 // Checks if the session is authorized to purify a statement. Usually
1078 // authorization is checked after planning; however, purification happens before
1079 // planning, which may require the use of some connections and secrets.
1080 if let Err(e) = rbac::check_usage(
1081 &catalog,
1082 ctx.session(),
1083 &resolved_ids,
1084 &CREATE_ITEM_USAGE,
1085 ) {
1086 return ctx.retire(Err(e.into()));
1087 }
1088
1089 let (result, cluster_id) = mz_sql::pure::purify_statement(
1090 catalog,
1091 now,
1092 stmt,
1093 ¤t_storage_configuration,
1094 )
1095 .await;
1096 let result = result.map_err(|e| e.into());
1097 let dependency_ids = resolved_ids.items().copied().collect();
1098 let plan_validity = PlanValidity::new(
1099 transient_revision,
1100 dependency_ids,
1101 cluster_id,
1102 None,
1103 ctx.session().role_metadata().clone(),
1104 );
1105 // It is not an error for purification to complete after `internal_cmd_rx` is dropped.
1106 let result = internal_cmd_tx.send(Message::PurifiedStatementReady(
1107 PurifiedStatementReady {
1108 ctx,
1109 result,
1110 params,
1111 plan_validity,
1112 original_stmt,
1113 otel_ctx,
1114 },
1115 ));
1116 if let Err(e) = result {
1117 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1118 }
1119 });
1120 return;
1121 }
1122
1123 // `CREATE SUBSOURCE` statements are disallowed for users and are only generated
1124 // automatically as part of purification
1125 Statement::CreateSubsource(_) => {
1126 ctx.retire(Err(AdapterError::Unsupported(
1127 "CREATE SUBSOURCE statements",
1128 )));
1129 return;
1130 }
1131
1132 Statement::CreateMaterializedView(mut cmvs) => {
1133 // `CREATE MATERIALIZED VIEW ... AS OF ...` syntax is disallowed for users and is
1134 // only used for storing initial frontiers in the catalog.
1135 if cmvs.as_of.is_some() {
1136 return ctx.retire(Err(AdapterError::Unsupported(
1137 "CREATE MATERIALIZED VIEW ... AS OF statements",
1138 )));
1139 }
1140
1141 let mz_now = match self
1142 .resolve_mz_now_for_create_materialized_view(
1143 &cmvs,
1144 &resolved_ids,
1145 ctx.session_mut(),
1146 true,
1147 )
1148 .await
1149 {
1150 Ok(mz_now) => mz_now,
1151 Err(e) => return ctx.retire(Err(e)),
1152 };
1153
1154 let owned_catalog = self.owned_catalog();
1155 let catalog = owned_catalog.for_session(ctx.session());
1156
1157 purify_create_materialized_view_options(
1158 catalog,
1159 mz_now,
1160 &mut cmvs,
1161 &mut resolved_ids,
1162 );
1163
1164 let purified_stmt =
1165 Statement::CreateMaterializedView(CreateMaterializedViewStatement::<Aug> {
1166 if_exists: cmvs.if_exists,
1167 name: cmvs.name,
1168 columns: cmvs.columns,
1169 in_cluster: cmvs.in_cluster,
1170 query: cmvs.query,
1171 with_options: cmvs.with_options,
1172 as_of: None,
1173 });
1174
1175 // (Purifying CreateMaterializedView doesn't happen asynchronously, so no need to send
1176 // `Message::PurifiedStatementReady` here.)
1177 (purified_stmt, resolved_ids)
1178 }
1179
1180 Statement::ExplainPlan(ExplainPlanStatement {
1181 stage,
1182 with_options,
1183 format,
1184 explainee: Explainee::CreateMaterializedView(box_cmvs, broken),
1185 }) => {
1186 let mut cmvs = *box_cmvs;
1187 let mz_now = match self
1188 .resolve_mz_now_for_create_materialized_view(
1189 &cmvs,
1190 &resolved_ids,
1191 ctx.session_mut(),
1192 false,
1193 )
1194 .await
1195 {
1196 Ok(mz_now) => mz_now,
1197 Err(e) => return ctx.retire(Err(e)),
1198 };
1199
1200 let owned_catalog = self.owned_catalog();
1201 let catalog = owned_catalog.for_session(ctx.session());
1202
1203 purify_create_materialized_view_options(
1204 catalog,
1205 mz_now,
1206 &mut cmvs,
1207 &mut resolved_ids,
1208 );
1209
1210 let purified_stmt = Statement::ExplainPlan(ExplainPlanStatement {
1211 stage,
1212 with_options,
1213 format,
1214 explainee: Explainee::CreateMaterializedView(Box::new(cmvs), broken),
1215 });
1216
1217 (purified_stmt, resolved_ids)
1218 }
1219
1220 // All other statements are handled immediately.
1221 _ => (stmt, resolved_ids),
1222 };
1223
1224 match self.plan_statement(ctx.session(), stmt, ¶ms, &resolved_ids) {
1225 Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await,
1226 Err(e) => ctx.retire(Err(e)),
1227 }
1228 }
1229
1230 /// Whether the statement is DDL that must be serialized.
1231 fn must_serialize_ddl(stmt: &Statement<Raw>, ctx: &ExecuteContext) -> bool {
1232 // Non-DDL is not serialized here.
1233 if !StatementClassification::from(&*stmt).is_ddl() {
1234 return false;
1235 }
1236 // Off-thread, pre-planning purification can perform arbitrarily slow network calls so must
1237 // not be serialized. These all use PlanValidity for their checking, and we must ensure
1238 // those checks are sufficient.
1239 if Self::must_spawn_purification(stmt) {
1240 return false;
1241 }
1242
1243 // Statements that support multiple DDLs in a single transaction aren't serialized here.
1244 // Their operations are serialized when applied to the catalog, guaranteeing that any
1245 // off-thread DDLs concurrent with a multiple DDL transaction will have a serial order.
1246 if ctx.session.transaction().is_ddl() {
1247 return false;
1248 }
1249
1250 // Some DDL is exempt. It is not great that we are matching on Statements here because
1251 // different plans can be produced from the same top-level statement type (e.g., `ALTER
1252 // CONNECTION ROTATE KEYS`). But the whole point of this is to prevent things from being
1253 // planned in the first place, so we accept the abstraction leak.
1254 match stmt {
1255 // Secrets have a small and understood set of dependencies, and their off-thread work
1256 // interacts with k8s.
1257 Statement::AlterSecret(_) => false,
1258 Statement::CreateSecret(_) => false,
1259 Statement::AlterConnection(AlterConnectionStatement { actions, .. })
1260 if actions
1261 .iter()
1262 .all(|action| matches!(action, AlterConnectionAction::RotateKeys)) =>
1263 {
1264 false
1265 }
1266
1267 // The off-thread work that altering a cluster may do (waiting for replicas to spin up)
1268 // does not affect its catalog names or IDs, and so is safe to not serialize. This could
1269 // change the set of replicas that exist. For queries that name replicas or use the
1270 // current_replica session var, the `replica_id` field of `PlanValidity` serves to
1271 // ensure that those replicas exist during the query finish stage. Additionally, that
1272 // work can take hours (configured by the user), so would also be a bad experience for
1273 // users.
1274 Statement::AlterCluster(_) => false,
1275
1276 // `ALTER SINK SET FROM` waits for the old relation to make enough progress for a clean
1277 // cutover. If the old collection is stalled, it may block forever. Checks in
1278 // sequencing ensure that the operation fails if any one of these happens concurrently:
1279 // * the sink is dropped
1280 // * the new source relation is dropped
1281 // * another `ALTER SINK` for the same sink is applied first
1282 Statement::AlterSink(stmt)
1283 if matches!(stmt.action, AlterSinkAction::ChangeRelation(_)) =>
1284 {
1285 false
1286 }
1287
1288 // Everything else must be serialized.
1289 _ => true,
1290 }
1291 }
1292
1293 /// Whether the statement must be purified off the Coordinator thread.
1294 fn must_spawn_purification<A: AstInfo>(stmt: &Statement<A>) -> bool {
1295 // `CREATE` and `ALTER` `SOURCE` and `SINK` statements must be purified off the main
1296 // coordinator thread.
1297 if !matches!(
1298 stmt,
1299 Statement::CreateSource(_)
1300 | Statement::AlterSource(_)
1301 | Statement::CreateSink(_)
1302 | Statement::CreateTableFromSource(_)
1303 ) {
1304 return false;
1305 }
1306
1307 // However `ALTER SOURCE RETAIN HISTORY` should be excluded from off-thread purification.
1308 if let Statement::AlterSource(stmt) = stmt {
1309 let names: Vec<CreateSourceOptionName> = match &stmt.action {
1310 AlterSourceAction::SetOptions(options) => {
1311 options.iter().map(|o| o.name.clone()).collect()
1312 }
1313 AlterSourceAction::ResetOptions(names) => names.clone(),
1314 _ => vec![],
1315 };
1316 if !names.is_empty()
1317 && names
1318 .iter()
1319 .all(|n| matches!(n, CreateSourceOptionName::RetainHistory))
1320 {
1321 return false;
1322 }
1323 }
1324
1325 true
1326 }
1327
1328 /// Chooses a timestamp for `mz_now()`, if `mz_now()` occurs in a REFRESH option of the
1329 /// materialized view. Additionally, if `acquire_read_holds` is true and the MV has any REFRESH
1330 /// option, this function grabs read holds at the earliest possible time on input collections
1331 /// that might be involved in the MV.
1332 ///
1333 /// Note that this is NOT what handles `mz_now()` in the query part of the MV; this handles it
1334 /// only in `with_options`.
1335 ///
1336 /// (Note that the chosen timestamp won't be the same timestamp as the system table inserts,
1337 /// unfortunately.)
1338 async fn resolve_mz_now_for_create_materialized_view(
1339 &mut self,
1340 cmvs: &CreateMaterializedViewStatement<Aug>,
1341 resolved_ids: &ResolvedIds,
1342 session: &Session,
1343 acquire_read_holds: bool,
1344 ) -> Result<Option<Timestamp>, AdapterError> {
1345 if cmvs
1346 .with_options
1347 .iter()
1348 .any(|wo| matches!(wo.value, Some(WithOptionValue::Refresh(..))))
1349 {
1350 let catalog = self.catalog().for_session(session);
1351 let cluster = mz_sql::plan::resolve_cluster_for_materialized_view(&catalog, cmvs)?;
1352 let ids = self
1353 .index_oracle(cluster)
1354 .sufficient_collections(resolved_ids.collections().copied());
1355
1356 // If there is any REFRESH option, then acquire read holds. (Strictly speaking, we'd
1357 // need this only if there is a `REFRESH AT`, not for `REFRESH EVERY`, because later
1358 // we want to check the AT times against the read holds that we acquire here. But
1359 // we do it for any REFRESH option, to avoid having so many code paths doing different
1360 // things.)
1361 //
1362 // It's important that we acquire read holds _before_ we determine the least valid read.
1363 // Otherwise, we're not guaranteed that the since frontier doesn't
1364 // advance forward from underneath us.
1365 let read_holds = self.acquire_read_holds(&ids);
1366
1367 // Does `mz_now()` occur?
1368 let mz_now_ts = if cmvs
1369 .with_options
1370 .iter()
1371 .any(materialized_view_option_contains_temporal)
1372 {
1373 let timeline_context = self
1374 .catalog()
1375 .validate_timeline_context(resolved_ids.collections().copied())?;
1376
1377 // We default to EpochMilliseconds, similarly to `determine_timestamp_for`,
1378 // but even in the TimestampIndependent case.
1379 // Note that we didn't accurately decide whether we are TimestampDependent
1380 // or TimestampIndependent, because for this we'd need to also check whether
1381 // `query.contains_temporal()`, similarly to how `peek_stage_validate` does.
1382 // However, this doesn't matter here, as we are just going to default to
1383 // EpochMilliseconds in both cases.
1384 let timeline = timeline_context
1385 .timeline()
1386 .unwrap_or(&Timeline::EpochMilliseconds);
1387
1388 // Let's start with the timestamp oracle read timestamp.
1389 let mut timestamp = self.get_timestamp_oracle(timeline).read_ts().await;
1390
1391 // If `least_valid_read` is later than the oracle, then advance to that time.
1392 // If we didn't do this, then there would be a danger of missing the first refresh,
1393 // which might cause the materialized view to be unreadable for hours. This might
1394 // be what was happening here:
1395 // https://github.com/MaterializeInc/database-issues/issues/7265#issuecomment-1931856361
1396 //
1397 // In the long term, it would be good to actually block the MV creation statement
1398 // until `least_valid_read`. https://github.com/MaterializeInc/database-issues/issues/7504
1399 // Without blocking, we have the problem that a REFRESH AT CREATION is not linearized
1400 // with the CREATE MATERIALIZED VIEW statement, in the sense that a query from the MV
1401 // after its creation might see input changes that happened after the CREATE MATERIALIZED
1402 // VIEW statement returned.
1403 let oracle_timestamp = timestamp;
1404 let least_valid_read = read_holds.least_valid_read();
1405 timestamp.advance_by(least_valid_read.borrow());
1406
1407 if oracle_timestamp != timestamp {
1408 warn!(%cmvs.name, %oracle_timestamp, %timestamp, "REFRESH MV's inputs are not readable at the oracle read ts");
1409 }
1410
1411 info!("Resolved `mz_now()` to {timestamp} for REFRESH MV");
1412 Ok(Some(timestamp))
1413 } else {
1414 Ok(None)
1415 };
1416
1417 // NOTE: The Drop impl of ReadHolds makes sure that the hold is
1418 // released when we don't use it.
1419 if acquire_read_holds {
1420 self.store_transaction_read_holds(session, read_holds);
1421 }
1422
1423 mz_now_ts
1424 } else {
1425 Ok(None)
1426 }
1427 }
1428
1429 /// Instruct the dataflow layer to cancel any ongoing, interactive work for
1430 /// the named `conn_id` if the correct secret key is specified.
1431 ///
1432 /// Note: Here we take a [`ConnectionIdType`] as opposed to an owned
1433 /// `ConnectionId` because this method gets called by external clients when
1434 /// they request to cancel a request.
1435 #[mz_ore::instrument(level = "debug")]
1436 async fn handle_cancel(&mut self, conn_id: ConnectionIdType, secret_key: u32) {
1437 if let Some((id_handle, conn_meta)) = self.active_conns.get_key_value(&conn_id) {
1438 // If the secret key specified by the client doesn't match the
1439 // actual secret key for the target connection, we treat this as a
1440 // rogue cancellation request and ignore it.
1441 if conn_meta.secret_key != secret_key {
1442 return;
1443 }
1444
1445 // Now that we've verified the secret key, this is a privileged
1446 // cancellation request. We can upgrade the raw connection ID to a
1447 // proper `IdHandle`.
1448 self.handle_privileged_cancel(id_handle.clone()).await;
1449 }
1450 }
1451
1452 /// Unconditionally instructs the dataflow layer to cancel any ongoing,
1453 /// interactive work for the named `conn_id`.
1454 #[mz_ore::instrument(level = "debug")]
1455 pub(crate) async fn handle_privileged_cancel(&mut self, conn_id: ConnectionId) {
1456 let mut maybe_ctx = None;
1457
1458 // Cancel pending writes. There is at most one pending write per session.
1459 if let Some(idx) = self.pending_writes.iter().position(|pending_write_txn| {
1460 matches!(pending_write_txn, PendingWriteTxn::User {
1461 pending_txn: PendingTxn { ctx, .. },
1462 ..
1463 } if *ctx.session().conn_id() == conn_id)
1464 }) {
1465 if let PendingWriteTxn::User {
1466 pending_txn: PendingTxn { ctx, .. },
1467 ..
1468 } = self.pending_writes.remove(idx)
1469 {
1470 maybe_ctx = Some(ctx);
1471 }
1472 }
1473
1474 // Cancel deferred writes.
1475 if let Some(write_op) = self.deferred_write_ops.remove(&conn_id) {
1476 maybe_ctx = Some(write_op.into_ctx());
1477 }
1478
1479 // Cancel deferred statements.
1480 if let Some(idx) = self
1481 .serialized_ddl
1482 .iter()
1483 .position(|deferred| *deferred.ctx.session().conn_id() == conn_id)
1484 {
1485 let deferred = self
1486 .serialized_ddl
1487 .remove(idx)
1488 .expect("known to exist from call to `position` above");
1489 maybe_ctx = Some(deferred.ctx);
1490 }
1491
1492 // Cancel reads waiting on being linearized. There is at most one linearized read per
1493 // session.
1494 if let Some(pending_read_txn) = self.pending_linearize_read_txns.remove(&conn_id) {
1495 let ctx = pending_read_txn.take_context();
1496 maybe_ctx = Some(ctx);
1497 }
1498
1499 if let Some(ctx) = maybe_ctx {
1500 ctx.retire(Err(AdapterError::Canceled));
1501 }
1502
1503 self.cancel_pending_peeks(&conn_id);
1504 self.cancel_pending_watchsets(&conn_id);
1505 self.cancel_compute_sinks_for_conn(&conn_id).await;
1506 self.cancel_cluster_reconfigurations_for_conn(&conn_id)
1507 .await;
1508 self.cancel_pending_copy(&conn_id);
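// If a cancellation flag was staged for this connection, set it so whoever holds the
// receiving side observes the cancellation.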
1509 if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) {
1510 let _ = tx.send(true);
1511 }
1512 }
1513
1514 /// Handle termination of a client session.
1515 ///
1516 /// This cleans up any state in the coordinator associated with the session.
1517 #[mz_ore::instrument(level = "debug")]
1518 async fn handle_terminate(&mut self, conn_id: ConnectionId) {
1519 if !self.active_conns.contains_key(&conn_id) {
1520 // If the session doesn't exist in `active_conns`, then this method will panic later on.
1521 // Instead we explicitly panic here while dumping the entire Coord to the logs to help
1522 // debug. This panic is very infrequent so we want as much information as possible.
1523 // See https://github.com/MaterializeInc/database-issues/issues/5627.
1524 panic!("unknown connection: {conn_id:?}\n\n{self:?}")
1525 }
1526
1527 // We do not need to call clear_transaction here because there are no side effects to run
1528 // based on any session transaction state.
1529 self.clear_connection(&conn_id).await;
1530
1531 self.drop_temp_items(&conn_id).await;
1532 self.catalog_mut()
1533 .drop_temporary_schema(&conn_id)
1534 .unwrap_or_terminate("unable to drop temporary schema");
1535 let conn = self.active_conns.remove(&conn_id).expect("conn must exist");
1536 let session_type = metrics::session_type_label_value(conn.user());
1537 self.metrics
1538 .active_sessions
1539 .with_label_values(&[session_type])
1540 .dec();
1541 self.cancel_pending_peeks(conn.conn_id());
1542 self.cancel_pending_watchsets(&conn_id);
1543 self.cancel_pending_copy(&conn_id);
1544 self.end_session_for_statement_logging(conn.uuid());
1545
1546 // Queue the builtin table update, but do not wait for it to complete. We explicitly do
1547 // this to prevent blocking the Coordinator in the case that a lot of connections are
1548 // closed at once, which occurs regularly in some workflows.
1549 let update = self
1550 .catalog()
1551 .state()
1552 .pack_session_update(&conn, Diff::MINUS_ONE);
1553 let update = self.catalog().state().resolve_builtin_table_update(update);
1554
1555 let _builtin_update_notify = self.builtin_table_update().defer(vec![update]);
1556 }
1557
1558 /// Returns the necessary metadata for appending to a webhook source, and a channel to send
1559 /// rows.
1560 #[mz_ore::instrument(level = "debug")]
1561 fn handle_get_webhook(
1562 &mut self,
1563 database: String,
1564 schema: String,
1565 name: String,
1566 tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
1567 ) {
1568 /// Attempts to resolve a Webhook source from a provided `database.schema.name` path.
1569 ///
1570 /// Returns a struct that can be used to append data to the underlying storage collection, and the
1571 /// types we should cast the request to.
1572 fn resolve(
1573 coord: &mut Coordinator,
1574 database: String,
1575 schema: String,
1576 name: String,
1577 ) -> Result<AppendWebhookResponse, PartialItemName> {
1578 // Resolve our collection.
1579 let name = PartialItemName {
1580 database: Some(database),
1581 schema: Some(schema),
1582 item: name,
1583 };
1584 let Ok(entry) = coord
1585 .catalog()
1586 .resolve_entry(None, &vec![], &name, &SYSTEM_CONN_ID)
1587 else {
1588 return Err(name);
1589 };
1590
1591 // Webhooks can be created with `CREATE SOURCE` or `CREATE TABLE`.
1592 let (data_source, desc, global_id) = match entry.item() {
1593 CatalogItem::Source(Source {
1594 data_source: data_source @ DataSourceDesc::Webhook { .. },
1595 desc,
1596 global_id,
1597 ..
1598 }) => (data_source, desc.clone(), *global_id),
1599 CatalogItem::Table(
1600 table @ Table {
1601 desc,
1602 data_source:
1603 TableDataSource::DataSource {
1604 desc: data_source @ DataSourceDesc::Webhook { .. },
1605 ..
1606 },
1607 ..
1608 },
1609 ) => (data_source, desc.latest(), table.global_id_writes()),
1610 _ => return Err(name),
1611 };
1612
1613 let DataSourceDesc::Webhook {
1614 validate_using,
1615 body_format,
1616 headers,
1617 ..
1618 } = data_source
1619 else {
1620 mz_ore::soft_panic_or_log!("programming error! checked above for webhook");
1621 return Err(name);
1622 };
1623 let body_format = body_format.clone();
1624 let header_tys = headers.clone();
1625
1626 // Assert we have one column for the body, and however many are required for
1627 // the headers.
1628 let num_columns = headers.num_columns() + 1;
1629 mz_ore::soft_assert_or_log!(
1630 desc.arity() <= num_columns,
1631 "expected at most {} columns, but got {}",
1632 num_columns,
1633 desc.arity()
1634 );
1635
1636 // Double-check that the body column of the webhook source matches the type
1637 // we're about to deserialize as.
1638 let body_column = desc
1639 .get_by_name(&"body".into())
1640 .map(|(_idx, ty)| ty.clone())
1641 .ok_or_else(|| name.clone())?;
1642 assert!(!body_column.nullable, "webhook body column is nullable!?");
1643 assert_eq!(body_column.scalar_type, SqlScalarType::from(body_format));
1644
1645 // Create a validator that can be called to validate a webhook request.
1646 let validator = validate_using.as_ref().map(|v| {
1647 let validation = v.clone();
1648 AppendWebhookValidator::new(validation, coord.caching_secrets_reader.clone())
1649 });
1650
1651 // Get a channel so we can queue updates to be written.
1652 let row_tx = coord
1653 .controller
1654 .storage
1655 .monotonic_appender(global_id)
1656 .map_err(|_| name.clone())?;
1657 let stats = coord
1658 .controller
1659 .storage
1660 .webhook_statistics(global_id)
1661 .map_err(|_| name)?;
1662 let invalidator = coord
1663 .active_webhooks
1664 .entry(entry.id())
1665 .or_insert_with(WebhookAppenderInvalidator::new);
1666 let tx = WebhookAppender::new(row_tx, invalidator.guard(), stats);
1667
1668 Ok(AppendWebhookResponse {
1669 tx,
1670 body_format,
1671 header_tys,
1672 validator,
1673 })
1674 }
1675
1676 let response = resolve(self, database, schema, name).map_err(|name| {
1677 AppendWebhookError::UnknownWebhook {
1678 database: name.database.expect("provided"),
1679 schema: name.schema.expect("provided"),
1680 name: name.item,
1681 }
1682 });
1683 let _ = tx.send(response);
1684 }
1685}