mz_adapter/coord/command_handler.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic for processing client [`Command`]s. Each [`Command`] is initiated by a
11//! client via some external Materialize API (ex: HTTP and psql).
12
13use base64::prelude::*;
14use differential_dataflow::lattice::Lattice;
15use mz_adapter_types::dyncfgs::ALLOW_USER_SESSIONS;
16use mz_auth::password::Password;
17use mz_repr::namespaces::MZ_INTERNAL_SCHEMA;
18use mz_sql::session::metadata::SessionMetadata;
19use std::collections::{BTreeMap, BTreeSet};
20use std::net::IpAddr;
21use std::sync::Arc;
22
23use futures::FutureExt;
24use futures::future::LocalBoxFuture;
25use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
26use mz_catalog::SYSTEM_CONN_ID;
27use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source, Table, TableDataSource};
28use mz_ore::task;
29use mz_ore::tracing::OpenTelemetryContext;
30use mz_ore::{instrument, soft_panic_or_log};
31use mz_repr::role_id::RoleId;
32use mz_repr::{Diff, SqlScalarType, Timestamp};
33use mz_sql::ast::{
34 AlterConnectionAction, AlterConnectionStatement, AlterSourceAction, AstInfo, ConstantVisitor,
35 CopyRelation, CopyStatement, CreateSourceOptionName, Raw, Statement, SubscribeStatement,
36};
37use mz_sql::catalog::RoleAttributesRaw;
38use mz_sql::names::{Aug, PartialItemName, ResolvedIds};
39use mz_sql::plan::{
40 AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, Params, Plan,
41 StatementClassification, TransactionType,
42};
43use mz_sql::pure::{
44 materialized_view_option_contains_temporal, purify_create_materialized_view_options,
45};
46use mz_sql::rbac;
47use mz_sql::rbac::CREATE_ITEM_USAGE;
48use mz_sql::session::user::User;
49use mz_sql::session::vars::{
50 EndTransactionAction, NETWORK_POLICY, OwnedVarInput, STATEMENT_LOGGING_SAMPLE_RATE, Value, Var,
51};
52use mz_sql_parser::ast::display::AstDisplay;
53use mz_sql_parser::ast::{
54 CreateMaterializedViewStatement, ExplainPlanStatement, Explainee, InsertStatement,
55 WithOptionValue,
56};
57use mz_storage_types::sources::Timeline;
58use opentelemetry::trace::TraceContextExt;
59use tokio::sync::{mpsc, oneshot};
60use tracing::{Instrument, debug_span, info, warn};
61use tracing_opentelemetry::OpenTelemetrySpanExt;
62
63use crate::command::{
64 AuthResponse, CatalogSnapshot, Command, ExecuteResponse, SASLChallengeResponse,
65 SASLVerifyProofResponse, StartupResponse,
66};
67use crate::coord::appends::PendingWriteTxn;
68use crate::coord::{
69 ConnMeta, Coordinator, DeferredPlanStatement, Message, PendingTxn, PlanStatement, PlanValidity,
70 PurifiedStatementReady, validate_ip_with_policy_rules,
71};
72use crate::error::{AdapterError, AuthenticationError};
73use crate::notice::AdapterNotice;
74use crate::session::{Session, TransactionOps, TransactionStatus};
75use crate::util::{ClientTransmitter, ResultExt};
76use crate::webhook::{
77 AppendWebhookResponse, AppendWebhookValidator, WebhookAppender, WebhookAppenderInvalidator,
78};
79use crate::{AppendWebhookError, ExecuteContext, catalog, metrics};
80
81use super::ExecuteContextExtra;
82
83impl Coordinator {
84 /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 58KB. This would
85 /// get stored on the stack, which is bad for runtime performance and can blow up our stack usage.
86 /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
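    ///
    /// Dispatches on the [`Command`] variant and delegates to the corresponding `handle_*`
    /// method, or responds inline for the simple request/response commands (e.g. `GetSystemVars`,
    /// `CatalogSnapshot`, `CheckConsistency`, `Dump`).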
87 pub(crate) fn handle_command(&mut self, mut cmd: Command) -> LocalBoxFuture<'_, ()> {
88 async move {
89 if let Some(session) = cmd.session_mut() {
90 session.apply_external_metadata_updates();
91 }
92 match cmd {
93 Command::Startup {
94 tx,
95 user,
96 conn_id,
97 secret_key,
98 uuid,
99 client_ip,
100 application_name,
101 notice_tx,
102 } => {
103 // Note: We purposefully do not use a ClientTransmitter here because startup
104 // handles errors and cleanup of sessions itself.
105 self.handle_startup(
106 tx,
107 user,
108 conn_id,
109 secret_key,
110 uuid,
111 client_ip,
112 application_name,
113 notice_tx,
114 )
115 .await;
116 }
117
118 Command::AuthenticatePassword {
119 tx,
120 role_name,
121 password,
122 } => {
123 self.handle_authenticate_password(tx, role_name, password)
124 .await;
125 }
126
127 Command::AuthenticateGetSASLChallenge {
128 tx,
129 role_name,
130 nonce,
131 } => {
132 self.handle_generate_sasl_challenge(tx, role_name, nonce)
133 .await;
134 }
135
136 Command::AuthenticateVerifySASLProof {
137 tx,
138 role_name,
139 proof,
140 mock_hash,
141 auth_message,
142 } => {
143 self.handle_authenticate_verify_sasl_proof(
144 tx,
145 role_name,
146 proof,
147 auth_message,
148 mock_hash,
149 );
150 }
151
152 Command::Execute {
153 portal_name,
154 session,
155 tx,
156 outer_ctx_extra,
157 } => {
158 let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
159
160 self.handle_execute(portal_name, session, tx, outer_ctx_extra)
161 .await;
162 }
163
164 Command::RetireExecute { data, reason } => self.retire_execution(reason, data),
165
166 Command::CancelRequest {
167 conn_id,
168 secret_key,
169 } => {
170 self.handle_cancel(conn_id, secret_key).await;
171 }
172
173 Command::PrivilegedCancelRequest { conn_id } => {
174 self.handle_privileged_cancel(conn_id).await;
175 }
176
177 Command::GetWebhook {
178 database,
179 schema,
180 name,
181 tx,
182 } => {
183 self.handle_get_webhook(database, schema, name, tx);
184 }
185
186 Command::GetSystemVars { tx } => {
187 let _ = tx.send(self.catalog.system_config().clone());
188 }
189
190 Command::SetSystemVars { vars, conn_id, tx } => {
191 let mut ops = Vec::with_capacity(vars.len());
192 let conn = &self.active_conns[&conn_id];
193
194 for (name, value) in vars {
195 if let Err(e) =
196 self.catalog().system_config().get(&name).and_then(|var| {
197 var.visible(conn.user(), self.catalog.system_config())
198 })
199 {
200 let _ = tx.send(Err(e.into()));
201 return;
202 }
203
204 ops.push(catalog::Op::UpdateSystemConfiguration {
205 name,
206 value: OwnedVarInput::Flat(value),
207 });
208 }
209
210 let result = self.catalog_transact_conn(Some(&conn_id), ops).await;
211 let _ = tx.send(result);
212 }
213
214 Command::Terminate { conn_id, tx } => {
215 self.handle_terminate(conn_id).await;
216 // Note: We purposefully do not use a ClientTransmitter here because we're already
217 // terminating the provided session.
218 if let Some(tx) = tx {
219 let _ = tx.send(Ok(()));
220 }
221 }
222
223 Command::Commit {
224 action,
225 session,
226 tx,
227 } => {
228 let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
229 // We reach here not through a statement execution, but from the
230 // "commit" pgwire command. Thus, we just generate a default statement
231 // execution context (once statement logging is implemented, this will cause nothing to be logged
232 // when the execution finishes).
233 let ctx = ExecuteContext::from_parts(
234 tx,
235 self.internal_cmd_tx.clone(),
236 session,
237 Default::default(),
238 );
239 let plan = match action {
240 EndTransactionAction::Commit => {
241 Plan::CommitTransaction(CommitTransactionPlan {
242 transaction_type: TransactionType::Implicit,
243 })
244 }
245 EndTransactionAction::Rollback => {
246 Plan::AbortTransaction(AbortTransactionPlan {
247 transaction_type: TransactionType::Implicit,
248 })
249 }
250 };
251
252 let conn_id = ctx.session().conn_id().clone();
253 self.sequence_plan(ctx, plan, ResolvedIds::empty()).await;
254 // Part of the Command::Commit contract is that the Coordinator guarantees that
255 // it has cleared its transaction state for the connection.
256 self.clear_connection(&conn_id).await;
257 }
258
259 Command::CatalogSnapshot { tx } => {
260 let _ = tx.send(CatalogSnapshot {
261 catalog: self.owned_catalog(),
262 });
263 }
264
265 Command::CheckConsistency { tx } => {
266 let _ = tx.send(self.check_consistency());
267 }
268
269 Command::Dump { tx } => {
270 let _ = tx.send(self.dump().await);
271 }
272 }
273 }
274 .instrument(debug_span!("handle_command"))
275 .boxed_local()
276 }
277
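    /// Verifies a SASL (SCRAM) client `proof` for `role_name` against the role's stored password
    /// hash. If the role does not exist or has no stored hash, the proof is verified against
    /// `mock_hash` instead so that a failed attempt looks the same whether or not the role
    /// exists; such attempts never succeed and are reported as invalid credentials (or as a
    /// non-login role where applicable).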
278 fn handle_authenticate_verify_sasl_proof(
279 &self,
280 tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
281 role_name: String,
282 proof: String,
283 auth_message: String,
284 mock_hash: String,
285 ) {
286 let role = self.catalog().try_get_role_by_name(role_name.as_str());
287 let role_auth = role.and_then(|r| self.catalog().try_get_role_auth_by_id(&r.id));
288
289 let login = role
290 .as_ref()
291 .map(|r| r.attributes.login.unwrap_or(false))
292 .unwrap_or(false);
293
294 let real_hash = role_auth
295 .as_ref()
296 .and_then(|auth| auth.password_hash.as_ref());
297 let hash_ref = real_hash.map(|s| s.as_str()).unwrap_or(&mock_hash);
298
299 let role_present = role.is_some();
300 let make_auth_err = |role_present: bool, login: bool| {
301 AdapterError::AuthenticationError(if role_present && !login {
302 AuthenticationError::NonLogin
303 } else {
304 AuthenticationError::InvalidCredentials
305 })
306 };
307
308 match mz_auth::hash::sasl_verify(hash_ref, &proof, &auth_message) {
309 Ok(verifier) => {
310 // Success only if role exists, allows login, and a real password hash was used.
311 if login && real_hash.is_some() {
312 let role = role.expect("login implies role exists");
313 let _ = tx.send(Ok(SASLVerifyProofResponse {
314 verifier,
315 auth_resp: AuthResponse {
316 role_id: role.id,
317 superuser: role.attributes.superuser.unwrap_or(false),
318 },
319 }));
320 } else {
321 let _ = tx.send(Err(make_auth_err(role_present, login)));
322 }
323 }
324 Err(_) => {
325 let _ = tx.send(Err(AdapterError::AuthenticationError(
326 AuthenticationError::InvalidCredentials,
327 )));
328 }
329 }
330 }
331
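    /// Produces the server's SCRAM challenge for `role_name`: the iteration count, the salt, and
    /// a server nonce derived from `client_nonce`. If the role has no usable password hash, a
    /// deterministic per-role mock challenge is sent instead so the response does not reveal
    /// whether the role exists.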
332 #[mz_ore::instrument(level = "debug")]
333 async fn handle_generate_sasl_challenge(
334 &mut self,
335 tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
336 role_name: String,
337 client_nonce: String,
338 ) {
339 let role_auth = self
340 .catalog()
341 .try_get_role_by_name(&role_name)
342 .and_then(|role| self.catalog().try_get_role_auth_by_id(&role.id));
343
344 let nonce = match mz_auth::hash::generate_nonce(&client_nonce) {
345 Ok(n) => n,
346 Err(e) => {
347 let msg = format!(
348 "failed to generate nonce for client nonce {}: {}",
349 client_nonce, e
350 );
351 let _ = tx.send(Err(AdapterError::Internal(msg.clone())));
352 soft_panic_or_log!("{msg}");
353 return;
354 }
355 };
356
357 // It's important that the mock_nonce is deterministic per role; otherwise the purpose of
358 // doing mock authentication is defeated. We use a catalog-wide nonce, and combine that
359 // with the role name to get a per-role mock nonce.
360 let send_mock_challenge =
361 |role_name: String,
362 mock_nonce: String,
363 nonce: String,
364 tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>| {
365 let opts = mz_auth::hash::mock_sasl_challenge(&role_name, &mock_nonce);
366 let _ = tx.send(Ok(SASLChallengeResponse {
367 iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
368 salt: BASE64_STANDARD.encode(opts.salt),
369 nonce,
370 }));
371 };
372
373 match role_auth {
374 Some(auth) if auth.password_hash.is_some() => {
375 let hash = auth.password_hash.as_ref().expect("checked above");
376 match mz_auth::hash::scram256_parse_opts(hash) {
377 Ok(opts) => {
378 let _ = tx.send(Ok(SASLChallengeResponse {
379 iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
380 salt: BASE64_STANDARD.encode(opts.salt),
381 nonce,
382 }));
383 }
384 Err(_) => {
385 send_mock_challenge(
386 role_name,
387 self.catalog().state().mock_authentication_nonce(),
388 nonce,
389 tx,
390 );
391 }
392 }
393 }
394 _ => {
395 send_mock_challenge(
396 role_name,
397 self.catalog().state().mock_authentication_nonce(),
398 nonce,
399 tx,
400 );
401 }
402 }
403 }
404
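    /// Authenticates `role_name` with a cleartext `password`, verifying it against the role's
    /// stored password hash. Fails if no password was supplied, the role does not exist, the
    /// role is not allowed to log in, or the password does not match.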
405 #[mz_ore::instrument(level = "debug")]
406 async fn handle_authenticate_password(
407 &mut self,
408 tx: oneshot::Sender<Result<AuthResponse, AdapterError>>,
409 role_name: String,
410 password: Option<Password>,
411 ) {
412 let Some(password) = password else {
413 // The user did not provide a password.
414 let _ = tx.send(Err(AdapterError::AuthenticationError(
415 AuthenticationError::PasswordRequired,
416 )));
417 return;
418 };
419
420 if let Some(role) = self.catalog().try_get_role_by_name(role_name.as_str()) {
421 if !role.attributes.login.unwrap_or(false) {
422 // The user is not allowed to login.
423 let _ = tx.send(Err(AdapterError::AuthenticationError(
424 AuthenticationError::NonLogin,
425 )));
426 return;
427 }
428 if let Some(auth) = self.catalog().try_get_role_auth_by_id(&role.id) {
429 if let Some(hash) = &auth.password_hash {
430 let _ = match mz_auth::hash::scram256_verify(&password, hash) {
431 Ok(_) => tx.send(Ok(AuthResponse {
432 role_id: role.id,
433 superuser: role.attributes.superuser.unwrap_or(false),
434 })),
435 Err(_) => tx.send(Err(AdapterError::AuthenticationError(
436 AuthenticationError::InvalidCredentials,
437 ))),
438 };
439 return;
440 }
441 }
442 // Authentication failed due to incorrect password or missing password hash.
443 let _ = tx.send(Err(AdapterError::AuthenticationError(
444 AuthenticationError::InvalidCredentials,
445 )));
446 } else {
447 // The user does not exist.
448 let _ = tx.send(Err(AdapterError::AuthenticationError(
449 AuthenticationError::RoleNotFound,
450 )));
451 }
452 }
453
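    /// Handles session startup: registers the connection in `active_conns`, queues a builtin
    /// table update for the new session, and replies with a [`StartupResponse`] carrying the
    /// role id, session defaults, and a catalog snapshot. On failure, state created by
    /// `handle_startup_inner` (such as the temporary schema) is cleaned up before the error is
    /// returned to the client.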
454 #[mz_ore::instrument(level = "debug")]
455 async fn handle_startup(
456 &mut self,
457 tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
458 user: User,
459 conn_id: ConnectionId,
460 secret_key: u32,
461 uuid: uuid::Uuid,
462 client_ip: Option<IpAddr>,
463 application_name: String,
464 notice_tx: mpsc::UnboundedSender<AdapterNotice>,
465 ) {
466 // Early return if successful, otherwise clean up any possible state.
467 match self.handle_startup_inner(&user, &conn_id, &client_ip).await {
468 Ok((role_id, session_defaults)) => {
469 let session_type = metrics::session_type_label_value(&user);
470 self.metrics
471 .active_sessions
472 .with_label_values(&[session_type])
473 .inc();
474 let conn = ConnMeta {
475 secret_key,
476 notice_tx,
477 drop_sinks: BTreeSet::new(),
478 pending_cluster_alters: BTreeSet::new(),
479 connected_at: self.now(),
480 user,
481 application_name,
482 uuid,
483 client_ip,
484 conn_id: conn_id.clone(),
485 authenticated_role: role_id,
486 deferred_lock: None,
487 };
488 let update = self.catalog().state().pack_session_update(&conn, Diff::ONE);
489 let update = self.catalog().state().resolve_builtin_table_update(update);
490 self.begin_session_for_statement_logging(&conn);
491 self.active_conns.insert(conn_id.clone(), conn);
492
493 // Note: Do NOT await the notify here, we pass this back to
494 // whatever requested the startup to prevent blocking startup
495 // and the Coordinator on a builtin table update.
496 let updates = vec![update];
497 // It's not a hard error if our list is missing a builtin table, but we want to
498 // make sure these two things stay in sync.
499 if mz_ore::assert::soft_assertions_enabled() {
500 let required_tables: BTreeSet<_> = super::appends::REQUIRED_BUILTIN_TABLES
501 .iter()
502 .map(|table| self.catalog().resolve_builtin_table(*table))
503 .collect();
504 let updates_tracked = updates
505 .iter()
506 .all(|update| required_tables.contains(&update.id));
507 let all_mz_internal = super::appends::REQUIRED_BUILTIN_TABLES
508 .iter()
509 .all(|table| table.schema == MZ_INTERNAL_SCHEMA);
510 mz_ore::soft_assert_or_log!(
511 updates_tracked,
512 "not tracking all required builtin table updates!"
513 );
514 // TODO(parkmycar): When checking if a query depends on these builtin table
515 // writes we do not check the transitive dependencies of the query, because
516 // we don't support creating views on mz_internal objects. If one of these
517 // tables is promoted out of mz_internal then we'll need to add this check.
518 mz_ore::soft_assert_or_log!(
519 all_mz_internal,
520 "not all builtin tables are in mz_internal! need to check transitive depends",
521 )
522 }
523 let notify = self.builtin_table_update().background(updates);
524
525 let resp = Ok(StartupResponse {
526 role_id,
527 write_notify: notify,
528 session_defaults,
529 catalog: self.owned_catalog(),
530 });
531 if tx.send(resp).is_err() {
532 // Failed to send to adapter, but everything is setup so we can terminate
533 // normally.
534 self.handle_terminate(conn_id).await;
535 }
536 }
537 Err(e) => {
538 // Error during startup or sending to adapter, clean up any state created by
539 // handle_startup_inner. A role may have been created for the user and it can stay; no
540 // need to delete it.
541 self.catalog_mut()
542 .drop_temporary_schema(&conn_id)
543 .unwrap_or_terminate("unable to drop temporary schema");
544
545 // Communicate the error back to the client. No need to
546 // handle failures to send the error back; we've already
547 // cleaned up all necessary state.
548 let _ = tx.send(Err(e));
549 }
550 }
551 }
552
553 // Fallible startup work that needs to be cleaned up on error.
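    // Returns the authenticated role's id along with the initial values for the session's
    // variables.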
554 async fn handle_startup_inner(
555 &mut self,
556 user: &User,
557 conn_id: &ConnectionId,
558 client_ip: &Option<IpAddr>,
559 ) -> Result<(RoleId, BTreeMap<String, OwnedVarInput>), AdapterError> {
560 if self.catalog().try_get_role_by_name(&user.name).is_none() {
561 // If the user has made it to this point, that means they have been fully authenticated.
562 // This includes preventing any user, except a pre-defined set of system users, from
563 // connecting to an internal port. Therefore it's ok to always create a new role for the
564 // user.
565 let attributes = RoleAttributesRaw::new();
566 let plan = CreateRolePlan {
567 name: user.name.to_string(),
568 attributes,
569 };
570 self.sequence_create_role_for_startup(plan).await?;
571 }
572 let role_id = self
573 .catalog()
574 .try_get_role_by_name(&user.name)
575 .expect("created above")
576 .id;
577
578 if role_id.is_user() && !ALLOW_USER_SESSIONS.get(self.catalog().system_config().dyncfgs()) {
579 return Err(AdapterError::UserSessionsDisallowed);
580 }
581
582 // Initialize the default session variables for this role.
583 let mut session_defaults = BTreeMap::new();
584 let system_config = self.catalog().state().system_config();
585
586 // Override the session with any system defaults.
587 session_defaults.extend(
588 system_config
589 .iter_session()
590 .map(|v| (v.name().to_string(), OwnedVarInput::Flat(v.value()))),
591 );
592 // Special case.
593 let statement_logging_default = system_config
594 .statement_logging_default_sample_rate()
595 .format();
596 session_defaults.insert(
597 STATEMENT_LOGGING_SAMPLE_RATE.name().to_string(),
598 OwnedVarInput::Flat(statement_logging_default),
599 );
600 // Override system defaults with role defaults.
601 session_defaults.extend(
602 self.catalog()
603 .get_role(&role_id)
604 .vars()
605 .map(|(name, val)| (name.to_string(), val.clone())),
606 );
607
608 // Validate network policies for external users. Internal users can only connect on the
609 // internal interfaces (internal HTTP/pgwire). It is up to the person deploying the system
610 // to ensure these internal interfaces are well secured.
611 //
612 // HACKY(parkmycar): We don't have a fully formed session yet for this role, but we want
613 // the default network policy for this role, so we read directly out of what the session
614 // will get initialized with.
615 if !user.is_internal() {
616 let network_policy_name = session_defaults
617 .get(NETWORK_POLICY.name())
618 .and_then(|value| match value {
619 OwnedVarInput::Flat(name) => Some(name.clone()),
620 OwnedVarInput::SqlSet(names) => {
621 tracing::error!(?names, "found multiple network policies");
622 None
623 }
624 })
625 .unwrap_or_else(|| system_config.default_network_policy_name());
626 let maybe_network_policy = self
627 .catalog()
628 .get_network_policy_by_name(&network_policy_name);
629
630 let Some(network_policy) = maybe_network_policy else {
631 // We should prevent dropping the default network policy, or setting the policy
632 // to something that doesn't exist, so complain loudly if this occurs.
633 tracing::error!(
634 network_policy_name,
635 "default network policy does not exist. All user traffic will be blocked"
636 );
637 let reason = match client_ip {
638 Some(ip) => super::NetworkPolicyError::AddressDenied(ip.clone()),
639 None => super::NetworkPolicyError::MissingIp,
640 };
641 return Err(AdapterError::NetworkPolicyDenied(reason));
642 };
643
644 if let Some(ip) = client_ip {
645 match validate_ip_with_policy_rules(ip, &network_policy.rules) {
646 Ok(_) => {}
647 Err(e) => return Err(AdapterError::NetworkPolicyDenied(e)),
648 }
649 } else {
650 // Only temporary and internal representations of a session
651 // should be missing a client_ip. These sessions should not be
652 // making requests or going through handle_startup.
653 return Err(AdapterError::NetworkPolicyDenied(
654 super::NetworkPolicyError::MissingIp,
655 ));
656 }
657 }
658
659 self.catalog_mut()
660 .create_temporary_schema(conn_id, role_id)?;
661
662 Ok((role_id, session_defaults))
663 }
664
665 /// Handles an execute command.
666 #[instrument(name = "coord::handle_execute", fields(session = session.uuid().to_string()))]
667 pub(crate) async fn handle_execute(
668 &mut self,
669 portal_name: String,
670 mut session: Session,
671 tx: ClientTransmitter<ExecuteResponse>,
672 // If this command was part of another execute command
673 // (for example, executing a `FETCH` statement causes an execute to be
674 // issued for the cursor it references),
675 // then `outer_context` should be `Some`.
676 // This instructs the coordinator that the
677 // outer execute should be considered finished once the inner one is.
678 outer_context: Option<ExecuteContextExtra>,
679 ) {
680 if session.vars().emit_trace_id_notice() {
681 let span_context = tracing::Span::current()
682 .context()
683 .span()
684 .span_context()
685 .clone();
686 if span_context.is_valid() {
687 session.add_notice(AdapterNotice::QueryTrace {
688 trace_id: span_context.trace_id(),
689 });
690 }
691 }
692
693 if let Err(err) = self.verify_portal(&mut session, &portal_name) {
694 // If statement logging hasn't started yet, we don't need
695 // to add any "end" event, so just make up a no-op
696 // `ExecuteContextExtra` here, via `Default::default`.
697 //
698 // It's a bit unfortunate because the edge case of failed
699 // portal verifications won't show up in statement
700 // logging, but there seems to be nothing else we can do,
701 // because we need access to the portal to begin logging.
702 //
703 // Another option would be to log a begin and end event, but just fill in NULLs
704 // for everything we get from the portal (prepared statement id, params).
705 let extra = outer_context.unwrap_or_else(Default::default);
706 let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
707 return ctx.retire(Err(err));
708 }
709
710 // The reference to `portal` can't outlive `session`, which we
711 // use to construct the context, so scope the reference to this block where we
712 // get everything we need from the portal for later.
713 let (stmt, ctx, params) = {
714 let portal = session
715 .get_portal_unverified(&portal_name)
716 .expect("known to exist");
717 let params = portal.parameters.clone();
718 let stmt = portal.stmt.clone();
719 let logging = Arc::clone(&portal.logging);
720 let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
721
722 let extra = if let Some(extra) = outer_context {
723 // We are executing in the context of another SQL statement, so we don't
724 // want to begin statement logging anew. The context of the actual statement
725 // being executed is the one that should be retired once this finishes.
726 extra
727 } else {
728 // This is a new statement, log it and return the context
729 let maybe_uuid = self.begin_statement_execution(
730 &mut session,
731 ¶ms,
732 &logging,
733 lifecycle_timestamps,
734 );
735
736 ExecuteContextExtra::new(maybe_uuid)
737 };
738 let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
739 (stmt, ctx, params)
740 };
741
742 let stmt = match stmt {
743 Some(stmt) => stmt,
744 None => return ctx.retire(Ok(ExecuteResponse::EmptyQuery)),
745 };
746
747 let session_type = metrics::session_type_label_value(ctx.session().user());
748 let stmt_type = metrics::statement_type_label_value(&stmt);
749 self.metrics
750 .query_total
751 .with_label_values(&[session_type, stmt_type])
752 .inc();
753 match &*stmt {
754 Statement::Subscribe(SubscribeStatement { output, .. })
755 | Statement::Copy(CopyStatement {
756 relation: CopyRelation::Subscribe(SubscribeStatement { output, .. }),
757 ..
758 }) => {
759 self.metrics
760 .subscribe_outputs
761 .with_label_values(&[
762 session_type,
763 metrics::subscribe_output_label_value(output),
764 ])
765 .inc();
766 }
767 _ => {}
768 }
769
770 self.handle_execute_inner(stmt, params, ctx).await
771 }
772
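    /// Plans and sequences a single statement whose portal has already been verified (and whose
    /// statement logging has begun) in `handle_execute`. This is where transaction rules are
    /// enforced, statements are purified (possibly off-thread), and DDL is serialized; see the
    /// long comment at the top of the function body for details.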
773 #[instrument(name = "coord::handle_execute_inner", fields(stmt = stmt.to_ast_string_redacted()))]
774 pub(crate) async fn handle_execute_inner(
775 &mut self,
776 stmt: Arc<Statement<Raw>>,
777 params: Params,
778 mut ctx: ExecuteContext,
779 ) {
780 // This comment describes the various ways DDL can execute (the ordered operations: name
781 // resolve, purify, plan, sequence), all of which are managed by this function. DDL has
782 // three notable properties that all partially interact.
783 //
784 // 1. Most DDL statements (and a few others) support single-statement transaction delayed
785 // execution. This occurs when a session executes `BEGIN`, a single DDL, then `COMMIT`.
786 // We announce success of the single DDL when it is executed, but do not attempt to plan
787 // or sequence it until `COMMIT`, which is able to error if needed while sequencing the
788 //    DDL (this behavior is Postgres-compatible). We do this because some drivers or tools
789 //    wrap all statements in `BEGIN` and `COMMIT`, and we would like them to work. When the
790 //    single DDL is announced as successful, we also put the session's
791 // transaction ops into `SingleStatement` which will produce an error if any other
792 // statement is run in the transaction except `COMMIT`. Additionally, this will cause
793 // `handle_execute_inner` to stop further processing (no planning, etc.) of the
794 // statement.
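        //    For example, `BEGIN; CREATE VIEW v AS SELECT 1; COMMIT;` reports success for the
        //    `CREATE VIEW` immediately, but the view is only planned and sequenced at `COMMIT`
        //    (the name `v` is illustrative).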
795 // 2. A few other DDL statements (`ALTER .. RENAME/SWAP`) enter the `DDL` ops, which allow
796 //    any number of these (and only these) DDL statements to be executed in a transaction. At
797 //    sequencing, these generate the `Op::TransactionDryRun` catalog op. When applied with
798 //    `catalog_transact`, that op will always produce the `TransactionDryRun` error. The
799 //    `catalog_transact_with_ddl_transaction` function intercepts that error and reports
800 //    success to the user, but nothing is yet committed to the real catalog. At `COMMIT`, all
801 //    of the ops are applied, but without the dry run. The purpose of this is to allow
802 //    multiple, atomic renames in the same transaction.
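        //    For example, `BEGIN; ALTER TABLE a RENAME TO b; ALTER TABLE c RENAME TO d; COMMIT;`
        //    applies both renames atomically at `COMMIT` (the names here are illustrative).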
803 // 3. Some DDLs do off-thread work during purification or sequencing that is expensive or
804 // makes network calls (interfacing with secrets, optimization of views/indexes, source
805 // purification). These must guarantee correctness when they return to the main
806 // coordinator thread because the catalog state could have changed while they were doing
807 // the off-thread work. Previously we would use `PlanValidity::Checks` to specify a bunch
808 // of IDs that we needed to exist. We discovered the way we were doing that was not
809 // always correct. Instead of attempting to get that completely right, we have opted to
810 // serialize DDL. Getting this right is difficult because catalog changes can affect name
811 // resolution, planning, sequencing, and optimization. Correctly writing logic that is
812 // aware of all possible catalog changes that would affect any of those parts is not
813 //    something our current code was designed to support. Even if a DDL statement
814 // is doing off-thread work, another DDL must not yet execute at all. Executing these
815 // serially will guarantee that no off-thread work has affected the state of the catalog.
816 // This is done by adding a VecDeque of deferred statements and a lock to the
817 // Coordinator. When a DDL is run in `handle_execute_inner` (after applying whatever
818 // transaction ops are needed to the session as described above), it attempts to own the
819 //    lock (a tokio Mutex). If acquired, it stashes the lock in the connection's `ConnMeta`
820 //    struct in `active_conns` and proceeds. The lock is dropped at transaction end in
821 //    `clear_transaction`, and a message is sent to the Coordinator to execute the next queued
822 //    DDL. If the lock could not be acquired, the DDL is put into the VecDeque, where it
823 //    awaits dequeuing when the lock is released.
824
825 // Verify that this statement type can be executed in the current
826 // transaction state.
827 match ctx.session().transaction() {
828 // By this point we should be in a running transaction.
829 TransactionStatus::Default => unreachable!(),
830
831 // Failed transactions have already been checked in pgwire for a safe statement
832 // (COMMIT, ROLLBACK, etc.) and can proceed.
833 TransactionStatus::Failed(_) => {}
834
835 // Started is a deceptive name, and means different things depending on which
836 // protocol was used. It's either exactly one statement (known because this
837 // is the simple protocol and the parser parsed the entire string, and it had
838 // one statement), or, in the extended protocol, it means *some* query is
839 // being executed, but there might be others after it before the Sync (commit)
840 // message. Postgres handles this by teaching Started to eagerly commit certain
841 // statements that can't be run in a transaction block.
842 TransactionStatus::Started(_) => {
843 if let Statement::Declare(_) = &*stmt {
844 // Declare is an exception. Although it's not against any spec to execute
845 // it, it will always result in nothing happening, since all portals will be
846 // immediately closed. Users don't know this detail, so this error helps them
847 // understand what's going wrong. Postgres does this too.
848 return ctx.retire(Err(AdapterError::OperationRequiresTransaction(
849 "DECLARE CURSOR".into(),
850 )));
851 }
852 }
853
854 // Implicit or explicit transactions.
855 //
856 // Implicit transactions happen when a multi-statement query is executed
857 // (a "simple query"). However if a "BEGIN" appears somewhere in there,
858 // then the existing implicit transaction will be upgraded to an explicit
859 // transaction. Thus, we should not separate what implicit and explicit
860 // transactions can do unless there's some additional checking to make sure
861 // something disallowed in explicit transactions did not previously take place
862 // in the implicit portion.
863 TransactionStatus::InTransactionImplicit(_) | TransactionStatus::InTransaction(_) => {
864 match &*stmt {
865 // Statements that are safe in a transaction. We still need to verify that we
866 // don't interleave reads and writes since we can't perform those serializably.
867 Statement::Close(_)
868 | Statement::Commit(_)
869 | Statement::Copy(_)
870 | Statement::Deallocate(_)
871 | Statement::Declare(_)
872 | Statement::Discard(_)
873 | Statement::Execute(_)
874 | Statement::ExplainPlan(_)
875 | Statement::ExplainPushdown(_)
876 | Statement::ExplainAnalyze(_)
877 | Statement::ExplainTimestamp(_)
878 | Statement::ExplainSinkSchema(_)
879 | Statement::Fetch(_)
880 | Statement::Prepare(_)
881 | Statement::Rollback(_)
882 | Statement::Select(_)
883 | Statement::SetTransaction(_)
884 | Statement::Show(_)
885 | Statement::SetVariable(_)
886 | Statement::ResetVariable(_)
887 | Statement::StartTransaction(_)
888 | Statement::Subscribe(_)
889 | Statement::Raise(_) => {
890 // Always safe.
891 }
892
893 Statement::Insert(InsertStatement {
894 source, returning, ..
895 }) if returning.is_empty() && ConstantVisitor::insert_source(source) => {
896 // INSERT statements with constant values that do not need to execute on
897 // any cluster (no RETURNING) are always safe.
898 }
899
900 // These statements must be kept in-sync with `must_serialize_ddl()`.
901 Statement::AlterObjectRename(_)
902 | Statement::AlterObjectSwap(_)
903 | Statement::CreateTableFromSource(_)
904 | Statement::CreateSource(_) => {
905 let state = self.catalog().for_session(ctx.session()).state().clone();
906 let revision = self.catalog().transient_revision();
907
908 // Initialize our transaction with a set of empty ops, or return an error
909 // if we can't run a DDL transaction
910 let txn_status = ctx.session_mut().transaction_mut();
911 if let Err(err) = txn_status.add_ops(TransactionOps::DDL {
912 ops: vec![],
913 state,
914 revision,
915 side_effects: vec![],
916 }) {
917 return ctx.retire(Err(err));
918 }
919 }
920
921 // Statements below must be run singly (in Started).
922 Statement::AlterCluster(_)
923 | Statement::AlterConnection(_)
924 | Statement::AlterDefaultPrivileges(_)
925 | Statement::AlterIndex(_)
926 | Statement::AlterSetCluster(_)
927 | Statement::AlterOwner(_)
928 | Statement::AlterRetainHistory(_)
929 | Statement::AlterRole(_)
930 | Statement::AlterSecret(_)
931 | Statement::AlterSink(_)
932 | Statement::AlterSource(_)
933 | Statement::AlterSystemReset(_)
934 | Statement::AlterSystemResetAll(_)
935 | Statement::AlterSystemSet(_)
936 | Statement::AlterTableAddColumn(_)
937 | Statement::AlterNetworkPolicy(_)
938 | Statement::CreateCluster(_)
939 | Statement::CreateClusterReplica(_)
940 | Statement::CreateConnection(_)
941 | Statement::CreateDatabase(_)
942 | Statement::CreateIndex(_)
943 | Statement::CreateMaterializedView(_)
944 | Statement::CreateContinualTask(_)
945 | Statement::CreateRole(_)
946 | Statement::CreateSchema(_)
947 | Statement::CreateSecret(_)
948 | Statement::CreateSink(_)
949 | Statement::CreateSubsource(_)
950 | Statement::CreateTable(_)
951 | Statement::CreateType(_)
952 | Statement::CreateView(_)
953 | Statement::CreateWebhookSource(_)
954 | Statement::CreateNetworkPolicy(_)
955 | Statement::Delete(_)
956 | Statement::DropObjects(_)
957 | Statement::DropOwned(_)
958 | Statement::GrantPrivileges(_)
959 | Statement::GrantRole(_)
960 | Statement::Insert(_)
961 | Statement::ReassignOwned(_)
962 | Statement::RevokePrivileges(_)
963 | Statement::RevokeRole(_)
964 | Statement::Update(_)
965 | Statement::ValidateConnection(_)
966 | Statement::Comment(_) => {
967 let txn_status = ctx.session_mut().transaction_mut();
968
969 // If we're not in an implicit transaction and we could generate exactly one
970 // valid ExecuteResponse, we can delay execution until commit.
971 if !txn_status.is_implicit() {
972 // Statements whose tag is trivial (known only from an unexecuted statement) can
973 // be run in a special single-statement explicit mode. In this mode (`BEGIN;
974 // <stmt>; COMMIT`), we generate the expected tag from a successful <stmt>, but
975 // delay execution until `COMMIT`.
976 if let Ok(resp) = ExecuteResponse::try_from(&*stmt) {
977 if let Err(err) = txn_status
978 .add_ops(TransactionOps::SingleStatement { stmt, params })
979 {
980 ctx.retire(Err(err));
981 return;
982 }
983 ctx.retire(Ok(resp));
984 return;
985 }
986 }
987
988 return ctx.retire(Err(AdapterError::OperationProhibitsTransaction(
989 stmt.to_string(),
990 )));
991 }
992 }
993 }
994 }
995
996 // DDLs must be planned and sequenced serially. We do not rely on PlanValidity checking
997 // various IDs because we have incorrectly done that in the past. Attempt to acquire the
998 // ddl lock. The lock is stashed in the ConnMeta which is dropped at transaction end. If
999 // acquired, proceed with sequencing. If not, enqueue and return. This logic assumes that
1000 // Coordinator::clear_transaction is correctly called when session transactions are ended
1001 // because that function will release the held lock from active_conns.
1002 if Self::must_serialize_ddl(&stmt, &ctx) {
1003 if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
1004 let prev = self
1005 .active_conns
1006 .get_mut(ctx.session().conn_id())
1007 .expect("connection must exist")
1008 .deferred_lock
1009 .replace(guard);
1010 assert!(
1011 prev.is_none(),
1012 "connections should have at most one lock guard"
1013 );
1014 } else {
1015 if self
1016 .active_conns
1017 .get(ctx.session().conn_id())
1018 .expect("connection must exist")
1019 .deferred_lock
1020 .is_some()
1021 {
1022 // This session *already* has the lock, and incorrectly tried to execute another
1023 // DDL while still holding the lock, violating the assumption documented above.
1024 // This is an internal error, probably in some AdapterClient user (pgwire or
1025 // http). Because the session is now in some unexpected state, return an error
1026 // which should cause the AdapterClient user to fail the transaction.
1027 // (Terminating the connection is maybe what we would prefer to do, but is not
1028 // currently a thing we can do from the coordinator: calling handle_terminate
1029 // cleans up Coordinator state for the session but doesn't inform the
1030 // AdapterClient that the session should terminate.)
1031 soft_panic_or_log!(
1032 "session {} attempted to get ddl lock while already owning it",
1033 ctx.session().conn_id()
1034 );
1035 ctx.retire(Err(AdapterError::Internal(
1036 "session attempted to get ddl lock while already owning it".to_string(),
1037 )));
1038 return;
1039 }
1040 self.serialized_ddl.push_back(DeferredPlanStatement {
1041 ctx,
1042 ps: PlanStatement::Statement { stmt, params },
1043 });
1044 return;
1045 }
1046 }
1047
1048 let catalog = self.catalog();
1049 let catalog = catalog.for_session(ctx.session());
1050 let original_stmt = Arc::clone(&stmt);
1051 // `resolved_ids` should be derivable from `stmt`. If `stmt` is transformed to remove/add
1052 // IDs, then `resolved_ids` should be updated to also remove/add those IDs.
1053 let (stmt, mut resolved_ids) = match mz_sql::names::resolve(&catalog, (*stmt).clone()) {
1054 Ok(resolved) => resolved,
1055 Err(e) => return ctx.retire(Err(e.into())),
1056 };
1057 // N.B. The catalog can change during purification so we must validate that the dependencies still exist after
1058 // purification. This should be done back on the main thread.
1059 // We do the validation:
1060 // - In the handler for `Message::PurifiedStatementReady`, before we handle the purified statement.
1061 // If we add special handling for more types of `Statement`s, we'll need to ensure similar verification
1062 // occurs.
1063 let (stmt, resolved_ids) = match stmt {
1064 // Various statements must be purified off the main coordinator thread of control.
1065 stmt if Self::must_spawn_purification(&stmt) => {
1066 let internal_cmd_tx = self.internal_cmd_tx.clone();
1067 let conn_id = ctx.session().conn_id().clone();
1068 let catalog = self.owned_catalog();
1069 let now = self.now();
1070 let otel_ctx = OpenTelemetryContext::obtain();
1071 let current_storage_configuration = self.controller.storage.config().clone();
1072 task::spawn(|| format!("purify:{conn_id}"), async move {
1073 let transient_revision = catalog.transient_revision();
1074 let catalog = catalog.for_session(ctx.session());
1075
1076 // Checks if the session is authorized to purify a statement. Usually
1077 // authorization is checked after planning; however, purification happens before
1078 // planning, which may require the use of some connections and secrets.
1079 if let Err(e) = rbac::check_usage(
1080 &catalog,
1081 ctx.session(),
1082 &resolved_ids,
1083 &CREATE_ITEM_USAGE,
1084 ) {
1085 return ctx.retire(Err(e.into()));
1086 }
1087
1088 let (result, cluster_id) = mz_sql::pure::purify_statement(
1089 catalog,
1090 now,
1091 stmt,
1092 ¤t_storage_configuration,
1093 )
1094 .await;
1095 let result = result.map_err(|e| e.into());
1096 let dependency_ids = resolved_ids.items().copied().collect();
1097 let plan_validity = PlanValidity::new(
1098 transient_revision,
1099 dependency_ids,
1100 cluster_id,
1101 None,
1102 ctx.session().role_metadata().clone(),
1103 );
1104 // It is not an error for purification to complete after `internal_cmd_rx` is dropped.
1105 let result = internal_cmd_tx.send(Message::PurifiedStatementReady(
1106 PurifiedStatementReady {
1107 ctx,
1108 result,
1109 params,
1110 plan_validity,
1111 original_stmt,
1112 otel_ctx,
1113 },
1114 ));
1115 if let Err(e) = result {
1116 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1117 }
1118 });
1119 return;
1120 }
1121
1122 // `CREATE SUBSOURCE` statements are disallowed for users and are only generated
1123 // automatically as part of purification
1124 Statement::CreateSubsource(_) => {
1125 ctx.retire(Err(AdapterError::Unsupported(
1126 "CREATE SUBSOURCE statements",
1127 )));
1128 return;
1129 }
1130
1131 Statement::CreateMaterializedView(mut cmvs) => {
1132 // `CREATE MATERIALIZED VIEW ... AS OF ...` syntax is disallowed for users and is
1133 // only used for storing initial frontiers in the catalog.
1134 if cmvs.as_of.is_some() {
1135 return ctx.retire(Err(AdapterError::Unsupported(
1136 "CREATE MATERIALIZED VIEW ... AS OF statements",
1137 )));
1138 }
1139
1140 let mz_now = match self
1141 .resolve_mz_now_for_create_materialized_view(
1142 &cmvs,
1143 &resolved_ids,
1144 ctx.session_mut(),
1145 true,
1146 )
1147 .await
1148 {
1149 Ok(mz_now) => mz_now,
1150 Err(e) => return ctx.retire(Err(e)),
1151 };
1152
1153 let owned_catalog = self.owned_catalog();
1154 let catalog = owned_catalog.for_session(ctx.session());
1155
1156 purify_create_materialized_view_options(
1157 catalog,
1158 mz_now,
1159 &mut cmvs,
1160 &mut resolved_ids,
1161 );
1162
1163 let purified_stmt =
1164 Statement::CreateMaterializedView(CreateMaterializedViewStatement::<Aug> {
1165 if_exists: cmvs.if_exists,
1166 name: cmvs.name,
1167 columns: cmvs.columns,
1168 in_cluster: cmvs.in_cluster,
1169 query: cmvs.query,
1170 with_options: cmvs.with_options,
1171 as_of: None,
1172 });
1173
1174 // (Purifying CreateMaterializedView doesn't happen async, so no need to send
1175 // `Message::PurifiedStatementReady` here.)
1176 (purified_stmt, resolved_ids)
1177 }
1178
1179 Statement::ExplainPlan(ExplainPlanStatement {
1180 stage,
1181 with_options,
1182 format,
1183 explainee: Explainee::CreateMaterializedView(box_cmvs, broken),
1184 }) => {
1185 let mut cmvs = *box_cmvs;
1186 let mz_now = match self
1187 .resolve_mz_now_for_create_materialized_view(
1188 &cmvs,
1189 &resolved_ids,
1190 ctx.session_mut(),
1191 false,
1192 )
1193 .await
1194 {
1195 Ok(mz_now) => mz_now,
1196 Err(e) => return ctx.retire(Err(e)),
1197 };
1198
1199 let owned_catalog = self.owned_catalog();
1200 let catalog = owned_catalog.for_session(ctx.session());
1201
1202 purify_create_materialized_view_options(
1203 catalog,
1204 mz_now,
1205 &mut cmvs,
1206 &mut resolved_ids,
1207 );
1208
1209 let purified_stmt = Statement::ExplainPlan(ExplainPlanStatement {
1210 stage,
1211 with_options,
1212 format,
1213 explainee: Explainee::CreateMaterializedView(Box::new(cmvs), broken),
1214 });
1215
1216 (purified_stmt, resolved_ids)
1217 }
1218
1219 // All other statements are handled immediately.
1220 _ => (stmt, resolved_ids),
1221 };
1222
1223 match self.plan_statement(ctx.session(), stmt, ¶ms, &resolved_ids) {
1224 Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await,
1225 Err(e) => ctx.retire(Err(e)),
1226 }
1227 }
1228
1229 /// Whether the statement must be serialized and is DDL.
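    ///
    /// Statements for which this returns `true` take the serialized DDL path in
    /// `handle_execute_inner`: they must acquire the Coordinator's `serialized_ddl` lock (or be
    /// queued behind it) before being planned and sequenced.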
1230 fn must_serialize_ddl(stmt: &Statement<Raw>, ctx: &ExecuteContext) -> bool {
1231 // Non-DDL is not serialized here.
1232 if !StatementClassification::from(&*stmt).is_ddl() {
1233 return false;
1234 }
1235 // Off-thread, pre-planning purification can perform arbitrarily slow network calls so must
1236 // not be serialized. These all use PlanValidity for their checking, and we must ensure
1237 // those checks are sufficient.
1238 if Self::must_spawn_purification(stmt) {
1239 return false;
1240 }
1241
1242 // Statements that support multiple DDLs in a single transaction aren't serialized here.
1243 // Their operations are serialized when applied to the catalog, guaranteeing that any
1244 // off-thread DDLs concurrent with a multiple DDL transaction will have a serial order.
1245 if ctx.session.transaction().is_ddl() {
1246 return false;
1247 }
1248
1249 // Some DDL is exempt. It is not great that we are matching on Statements here because
1250 // different plans can be produced from the same top-level statement type (i.e., `ALTER
1251 // CONNECTION ROTATE KEYS`). But the whole point of this is to prevent things from being
1252 // planned in the first place, so we accept the abstraction leak.
1253 match stmt {
1254 // Secrets have a small and understood set of dependencies, and their off-thread work
1255 // interacts with k8s.
1256 Statement::AlterSecret(_) => false,
1257 Statement::CreateSecret(_) => false,
1258 Statement::AlterConnection(AlterConnectionStatement { actions, .. })
1259 if actions
1260 .iter()
1261 .all(|action| matches!(action, AlterConnectionAction::RotateKeys)) =>
1262 {
1263 false
1264 }
1265
1266 // The off-thread work that altering a cluster may do (waiting for replicas to spin up)
1267 // does not affect its catalog names or ids and so is safe to not serialize. This could
1268 // change the set of replicas that exist. For queries that name replicas or use the
1269 // current_replica session var, the `replica_id` field of `PlanValidity` serves to
1270 // ensure that those replicas exist during the query finish stage. Additionally, that
1271 // work can take hours (configured by the user), so serializing it would also be a bad
1272 // experience for users.
1273 Statement::AlterCluster(_) => false,
1274
1275 // Everything else must be serialized.
1276 _ => true,
1277 }
1278 }
1279
1280 /// Whether the statement must be purified off of the Coordinator thread.
1281 fn must_spawn_purification<A: AstInfo>(stmt: &Statement<A>) -> bool {
1282 // `CREATE` and `ALTER` `SOURCE` and `SINK` statements must be purified off the main
1283 // coordinator thread.
1284 if !matches!(
1285 stmt,
1286 Statement::CreateSource(_)
1287 | Statement::AlterSource(_)
1288 | Statement::CreateSink(_)
1289 | Statement::CreateTableFromSource(_)
1290 ) {
1291 return false;
1292 }
1293
1294 // However, `ALTER SOURCE RETAIN HISTORY` should be excluded from off-thread purification.
1295 if let Statement::AlterSource(stmt) = stmt {
1296 let names: Vec<CreateSourceOptionName> = match &stmt.action {
1297 AlterSourceAction::SetOptions(options) => {
1298 options.iter().map(|o| o.name.clone()).collect()
1299 }
1300 AlterSourceAction::ResetOptions(names) => names.clone(),
1301 _ => vec![],
1302 };
1303 if !names.is_empty()
1304 && names
1305 .iter()
1306 .all(|n| matches!(n, CreateSourceOptionName::RetainHistory))
1307 {
1308 return false;
1309 }
1310 }
1311
1312 true
1313 }
1314
1315 /// Chooses a timestamp for `mz_now()`, if `mz_now()` occurs in a REFRESH option of the
1316 /// materialized view. Additionally, if `acquire_read_holds` is true and the MV has any REFRESH
1317 /// option, this function grabs read holds at the earliest possible time on input collections
1318 /// that might be involved in the MV.
1319 ///
1320 /// Note that this is NOT what handles `mz_now()` in the query part of the MV; this handles it
1321 /// only in `with_options`.
1322 ///
1323 /// (Note that the chosen timestamp won't be the same timestamp as the system table inserts,
1324 /// unfortunately.)
1325 async fn resolve_mz_now_for_create_materialized_view(
1326 &mut self,
1327 cmvs: &CreateMaterializedViewStatement<Aug>,
1328 resolved_ids: &ResolvedIds,
1329 session: &Session,
1330 acquire_read_holds: bool,
1331 ) -> Result<Option<Timestamp>, AdapterError> {
1332 if cmvs
1333 .with_options
1334 .iter()
1335 .any(|wo| matches!(wo.value, Some(WithOptionValue::Refresh(..))))
1336 {
1337 let catalog = self.catalog().for_session(session);
1338 let cluster = mz_sql::plan::resolve_cluster_for_materialized_view(&catalog, cmvs)?;
1339 let ids = self
1340 .index_oracle(cluster)
1341 .sufficient_collections(resolved_ids.collections().copied());
1342
1343 // If there is any REFRESH option, then acquire read holds. (Strictly speaking, we'd
1344 // need this only if there is a `REFRESH AT`, not for `REFRESH EVERY`, because later
1345 // we want to check the AT times against the read holds that we acquire here. But
1346 // we do it for any REFRESH option, to avoid having so many code paths doing different
1347 // things.)
1348 //
1349 // It's important that we acquire read holds _before_ we determine the least valid read.
1350 // Otherwise, we're not guaranteed that the since frontier doesn't
1351 // advance forward from underneath us.
1352 let read_holds = self.acquire_read_holds(&ids);
1353
1354 // Does `mz_now()` occur?
1355 let mz_now_ts = if cmvs
1356 .with_options
1357 .iter()
1358 .any(materialized_view_option_contains_temporal)
1359 {
1360 let timeline_context = self
1361 .catalog()
1362 .validate_timeline_context(resolved_ids.collections().copied())?;
1363
1364 // We default to EpochMilliseconds, similarly to `determine_timestamp_for`,
1365 // but even in the TimestampIndependent case.
1366 // Note that we didn't accurately decide whether we are TimestampDependent
1367 // or TimestampIndependent, because for this we'd need to also check whether
1368 // `query.contains_temporal()`, similarly to how `peek_stage_validate` does.
1369 // However, this doesn't matter here, as we are just going to default to
1370 // EpochMilliseconds in both cases.
1371 let timeline = timeline_context
1372 .timeline()
1373 .unwrap_or(&Timeline::EpochMilliseconds);
1374
1375 // Let's start with the timestamp oracle read timestamp.
1376 let mut timestamp = self.get_timestamp_oracle(timeline).read_ts().await;
1377
1378 // If `least_valid_read` is later than the oracle, then advance to that time.
1379 // If we didn't do this, then there would be a danger of missing the first refresh,
1380 // which might cause the materialized view to be unreadable for hours. This might
1381 // be what was happening here:
1382 // https://github.com/MaterializeInc/database-issues/issues/7265#issuecomment-1931856361
1383 //
1384 // In the long term, it would be good to actually block the MV creation statement
1385 // until `least_valid_read`. https://github.com/MaterializeInc/database-issues/issues/7504
1386 // Without blocking, we have the problem that a REFRESH AT CREATION is not linearized
1387 // with the CREATE MATERIALIZED VIEW statement, in the sense that a query from the MV
1388 // after its creation might see input changes that happened after the CREATE MATERIALIZED
1389 // VIEW statement returned.
1390 let oracle_timestamp = timestamp;
1391 let least_valid_read = read_holds.least_valid_read();
1392 timestamp.advance_by(least_valid_read.borrow());
1393
1394 if oracle_timestamp != timestamp {
1395 warn!(%cmvs.name, %oracle_timestamp, %timestamp, "REFRESH MV's inputs are not readable at the oracle read ts");
1396 }
1397
1398 info!("Resolved `mz_now()` to {timestamp} for REFRESH MV");
1399 Ok(Some(timestamp))
1400 } else {
1401 Ok(None)
1402 };
1403
1404 // NOTE: The Drop impl of ReadHolds makes sure that the hold is
1405 // released when we don't use it.
1406 if acquire_read_holds {
1407 self.store_transaction_read_holds(session, read_holds);
1408 }
1409
1410 mz_now_ts
1411 } else {
1412 Ok(None)
1413 }
1414 }
1415
1416 /// Instruct the dataflow layer to cancel any ongoing, interactive work for
1417 /// the named `conn_id` if the correct secret key is specified.
1418 ///
1419 /// Note: Here we take a [`ConnectionIdType`] as opposed to an owned
1420 /// `ConnectionId` because this method gets called by external clients when
1421 /// they request to cancel a request.
1422 #[mz_ore::instrument(level = "debug")]
1423 async fn handle_cancel(&mut self, conn_id: ConnectionIdType, secret_key: u32) {
1424 if let Some((id_handle, conn_meta)) = self.active_conns.get_key_value(&conn_id) {
1425 // If the secret key specified by the client doesn't match the
1426 // actual secret key for the target connection, we treat this as a
1427 // rogue cancellation request and ignore it.
1428 if conn_meta.secret_key != secret_key {
1429 return;
1430 }
1431
1432 // Now that we've verified the secret key, this is a privileged
1433 // cancellation request. We can upgrade the raw connection ID to a
1434 // proper `IdHandle`.
1435 self.handle_privileged_cancel(id_handle.clone()).await;
1436 }
1437 }
1438
1439 /// Unconditionally instructs the dataflow layer to cancel any ongoing,
1440 /// interactive work for the named `conn_id`.
1441 #[mz_ore::instrument(level = "debug")]
1442 pub(crate) async fn handle_privileged_cancel(&mut self, conn_id: ConnectionId) {
1443 let mut maybe_ctx = None;
1444
1445 // Cancel pending writes. There is at most one pending write per session.
1446 if let Some(idx) = self.pending_writes.iter().position(|pending_write_txn| {
1447 matches!(pending_write_txn, PendingWriteTxn::User {
1448 pending_txn: PendingTxn { ctx, .. },
1449 ..
1450 } if *ctx.session().conn_id() == conn_id)
1451 }) {
1452 if let PendingWriteTxn::User {
1453 pending_txn: PendingTxn { ctx, .. },
1454 ..
1455 } = self.pending_writes.remove(idx)
1456 {
1457 maybe_ctx = Some(ctx);
1458 }
1459 }
1460
1461 // Cancel deferred writes.
1462 if let Some(write_op) = self.deferred_write_ops.remove(&conn_id) {
1463 maybe_ctx = Some(write_op.into_ctx());
1464 }
1465
1466 // Cancel deferred statements.
1467 if let Some(idx) = self
1468 .serialized_ddl
1469 .iter()
1470 .position(|deferred| *deferred.ctx.session().conn_id() == conn_id)
1471 {
1472 let deferred = self
1473 .serialized_ddl
1474 .remove(idx)
1475 .expect("known to exist from call to `position` above");
1476 maybe_ctx = Some(deferred.ctx);
1477 }
1478
1479 // Cancel reads waiting on being linearized. There is at most one linearized read per
1480 // session.
1481 if let Some(pending_read_txn) = self.pending_linearize_read_txns.remove(&conn_id) {
1482 let ctx = pending_read_txn.take_context();
1483 maybe_ctx = Some(ctx);
1484 }
1485
1486 if let Some(ctx) = maybe_ctx {
1487 ctx.retire(Err(AdapterError::Canceled));
1488 }
1489
1490 self.cancel_pending_peeks(&conn_id);
1491 self.cancel_pending_watchsets(&conn_id);
1492 self.cancel_compute_sinks_for_conn(&conn_id).await;
1493 self.cancel_cluster_reconfigurations_for_conn(&conn_id)
1494 .await;
1495 self.cancel_pending_copy(&conn_id);
1496 if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) {
1497 let _ = tx.send(true);
1498 }
1499 }
1500
1501 /// Handle termination of a client session.
1502 ///
1503 /// This cleans up any state in the coordinator associated with the session.
1504 #[mz_ore::instrument(level = "debug")]
1505 async fn handle_terminate(&mut self, conn_id: ConnectionId) {
1506 if !self.active_conns.contains_key(&conn_id) {
1507 // If the session doesn't exist in `active_conns`, then this method will panic later on.
1508 // Instead we explicitly panic here while dumping the entire Coord to the logs to help
1509 // debug. This panic is very infrequent so we want as much information as possible.
1510 // See https://github.com/MaterializeInc/database-issues/issues/5627.
1511 panic!("unknown connection: {conn_id:?}\n\n{self:?}")
1512 }
1513
1514 // We do not need to call clear_transaction here because there are no side effects to run
1515 // based on any session transaction state.
1516 self.clear_connection(&conn_id).await;
1517
1518 self.drop_temp_items(&conn_id).await;
1519 self.catalog_mut()
1520 .drop_temporary_schema(&conn_id)
1521 .unwrap_or_terminate("unable to drop temporary schema");
1522 let conn = self.active_conns.remove(&conn_id).expect("conn must exist");
1523 let session_type = metrics::session_type_label_value(conn.user());
1524 self.metrics
1525 .active_sessions
1526 .with_label_values(&[session_type])
1527 .dec();
1528 self.cancel_pending_peeks(conn.conn_id());
1529 self.cancel_pending_watchsets(&conn_id);
1530 self.cancel_pending_copy(&conn_id);
1531 self.end_session_for_statement_logging(conn.uuid());
1532
1533 // Queue the builtin table update, but do not wait for it to complete. We explicitly do
1534 // this to prevent blocking the Coordinator in the case that a lot of connections are
1535 // closed at once, which occurs regularly in some workflows.
1536 let update = self
1537 .catalog()
1538 .state()
1539 .pack_session_update(&conn, Diff::MINUS_ONE);
1540 let update = self.catalog().state().resolve_builtin_table_update(update);
1541
1542 let _builtin_update_notify = self.builtin_table_update().defer(vec![update]);
1543 }
1544
1545 /// Returns the necessary metadata for appending to a webhook source, and a channel to send
1546 /// rows.
1547 #[mz_ore::instrument(level = "debug")]
1548 fn handle_get_webhook(
1549 &mut self,
1550 database: String,
1551 schema: String,
1552 name: String,
1553 tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
1554 ) {
1555 /// Attempts to resolve a Webhook source from a provided `database.schema.name` path.
1556 ///
1557 /// Returns a struct that can be used to append data to the underlying storage collection, and the
1558 /// types we should cast the request to.
1559 fn resolve(
1560 coord: &mut Coordinator,
1561 database: String,
1562 schema: String,
1563 name: String,
1564 ) -> Result<AppendWebhookResponse, PartialItemName> {
1565 // Resolve our collection.
1566 let name = PartialItemName {
1567 database: Some(database),
1568 schema: Some(schema),
1569 item: name,
1570 };
1571 let Ok(entry) = coord
1572 .catalog()
1573 .resolve_entry(None, &vec![], &name, &SYSTEM_CONN_ID)
1574 else {
1575 return Err(name);
1576 };
1577
1578 // Webhooks can be created with `CREATE SOURCE` or `CREATE TABLE`.
1579 let (data_source, desc, global_id) = match entry.item() {
1580 CatalogItem::Source(Source {
1581 data_source: data_source @ DataSourceDesc::Webhook { .. },
1582 desc,
1583 global_id,
1584 ..
1585 }) => (data_source, desc.clone(), *global_id),
1586 CatalogItem::Table(
1587 table @ Table {
1588 desc,
1589 data_source:
1590 TableDataSource::DataSource {
1591 desc: data_source @ DataSourceDesc::Webhook { .. },
1592 ..
1593 },
1594 ..
1595 },
1596 ) => (data_source, desc.latest(), table.global_id_writes()),
1597 _ => return Err(name),
1598 };
1599
1600 let DataSourceDesc::Webhook {
1601 validate_using,
1602 body_format,
1603 headers,
1604 ..
1605 } = data_source
1606 else {
1607 mz_ore::soft_panic_or_log!("programming error! checked above for webhook");
1608 return Err(name);
1609 };
1610 let body_format = body_format.clone();
1611 let header_tys = headers.clone();
1612
1613 // Assert we have one column for the body, and however many are required for
1614 // the headers.
1615 let num_columns = headers.num_columns() + 1;
1616 mz_ore::soft_assert_or_log!(
1617 desc.arity() <= num_columns,
1618 "expected at most {} columns, but got {}",
1619 num_columns,
1620 desc.arity()
1621 );
1622
1623 // Double check that the body column of the webhook source matches the type
1624 // we're about to deserialize as.
1625 let body_column = desc
1626 .get_by_name(&"body".into())
1627 .map(|(_idx, ty)| ty.clone())
1628 .ok_or_else(|| name.clone())?;
1629 assert!(!body_column.nullable, "webhook body column is nullable!?");
1630 assert_eq!(body_column.scalar_type, SqlScalarType::from(body_format));
1631
1632 // Create a validator that can be called to validate a webhook request.
1633 let validator = validate_using.as_ref().map(|v| {
1634 let validation = v.clone();
1635 AppendWebhookValidator::new(validation, coord.caching_secrets_reader.clone())
1636 });
1637
1638 // Get a channel so we can queue updates to be written.
1639 let row_tx = coord
1640 .controller
1641 .storage
1642 .monotonic_appender(global_id)
1643 .map_err(|_| name.clone())?;
1644 let stats = coord
1645 .controller
1646 .storage
1647 .webhook_statistics(global_id)
1648 .map_err(|_| name)?;
1649 let invalidator = coord
1650 .active_webhooks
1651 .entry(entry.id())
1652 .or_insert_with(WebhookAppenderInvalidator::new);
1653 let tx = WebhookAppender::new(row_tx, invalidator.guard(), stats);
1654
1655 Ok(AppendWebhookResponse {
1656 tx,
1657 body_format,
1658 header_tys,
1659 validator,
1660 })
1661 }
1662
1663 let response = resolve(self, database, schema, name).map_err(|name| {
1664 AppendWebhookError::UnknownWebhook {
1665 database: name.database.expect("provided"),
1666 schema: name.schema.expect("provided"),
1667 name: name.item,
1668 }
1669 });
1670 let _ = tx.send(response);
1671 }
1672}