mz_adapter/coord/command_handler.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic for processing client [`Command`]s. Each [`Command`] is initiated by a
11//! client via some external Materialize API (ex: HTTP and psql).
12
13use differential_dataflow::lattice::Lattice;
14use mz_adapter_types::dyncfgs::ALLOW_USER_SESSIONS;
15use mz_auth::password::Password;
16use mz_repr::namespaces::MZ_INTERNAL_SCHEMA;
17use mz_sql::session::metadata::SessionMetadata;
18use std::collections::{BTreeMap, BTreeSet};
19use std::net::IpAddr;
20use std::sync::Arc;
21
22use futures::FutureExt;
23use futures::future::LocalBoxFuture;
24use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
25use mz_catalog::SYSTEM_CONN_ID;
26use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source, Table, TableDataSource};
27use mz_ore::task;
28use mz_ore::tracing::OpenTelemetryContext;
29use mz_ore::{instrument, soft_panic_or_log};
30use mz_repr::role_id::RoleId;
31use mz_repr::{Diff, ScalarType, Timestamp};
32use mz_sql::ast::{
33 AlterConnectionAction, AlterConnectionStatement, AlterSourceAction, AstInfo, ConstantVisitor,
34 CopyRelation, CopyStatement, CreateSourceOptionName, Raw, Statement, SubscribeStatement,
35};
36use mz_sql::catalog::RoleAttributes;
37use mz_sql::names::{Aug, PartialItemName, ResolvedIds};
38use mz_sql::plan::{
39 AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, Params, Plan,
40 StatementClassification, TransactionType,
41};
42use mz_sql::pure::{
43 materialized_view_option_contains_temporal, purify_create_materialized_view_options,
44};
45use mz_sql::rbac;
46use mz_sql::rbac::CREATE_ITEM_USAGE;
47use mz_sql::session::user::User;
48use mz_sql::session::vars::{
49 EndTransactionAction, NETWORK_POLICY, OwnedVarInput, STATEMENT_LOGGING_SAMPLE_RATE, Value, Var,
50};
51use mz_sql_parser::ast::display::AstDisplay;
52use mz_sql_parser::ast::{
53 CreateMaterializedViewStatement, ExplainPlanStatement, Explainee, InsertStatement,
54 WithOptionValue,
55};
56use mz_storage_types::sources::Timeline;
57use opentelemetry::trace::TraceContextExt;
58use tokio::sync::{mpsc, oneshot};
59use tracing::{Instrument, debug_span, info, warn};
60use tracing_opentelemetry::OpenTelemetrySpanExt;
61
62use crate::command::{AuthResponse, CatalogSnapshot, Command, ExecuteResponse, StartupResponse};
63use crate::coord::appends::PendingWriteTxn;
64use crate::coord::{
65 ConnMeta, Coordinator, DeferredPlanStatement, Message, PendingTxn, PlanStatement, PlanValidity,
66 PurifiedStatementReady, validate_ip_with_policy_rules,
67};
68use crate::error::AdapterError;
69use crate::notice::AdapterNotice;
70use crate::session::{Session, TransactionOps, TransactionStatus};
71use crate::util::{ClientTransmitter, ResultExt};
72use crate::webhook::{
73 AppendWebhookResponse, AppendWebhookValidator, WebhookAppender, WebhookAppenderInvalidator,
74};
75use crate::{AppendWebhookError, ExecuteContext, TimestampProvider, catalog, metrics};
76
77use super::ExecuteContextExtra;
78
79impl Coordinator {
80 /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 58KB. This would
81 /// get stored on the stack, which is bad for runtime performance and blows up our stack usage.
82 /// Because of that we purposefully move this Future onto the heap (i.e., Box it).
83 pub(crate) fn handle_command(&mut self, mut cmd: Command) -> LocalBoxFuture<()> {
84 async move {
85 if let Some(session) = cmd.session_mut() {
86 session.apply_external_metadata_updates();
87 }
88 match cmd {
89 Command::Startup {
90 tx,
91 user,
92 conn_id,
93 secret_key,
94 uuid,
95 client_ip,
96 application_name,
97 notice_tx,
98 } => {
99 // Note: We purposefully do not use a ClientTransmitter here because startup
100 // handles errors and cleanup of sessions itself.
101 self.handle_startup(
102 tx,
103 user,
104 conn_id,
105 secret_key,
106 uuid,
107 client_ip,
108 application_name,
109 notice_tx,
110 )
111 .await;
112 }
113
114 Command::AuthenticatePassword {
115 tx,
116 role_name,
117 password,
118 } => {
119 self.handle_authenticate_password(tx, role_name, password)
120 .await;
121 }
122
123 Command::Execute {
124 portal_name,
125 session,
126 tx,
127 outer_ctx_extra,
128 } => {
129 let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
130
131 self.handle_execute(portal_name, session, tx, outer_ctx_extra)
132 .await;
133 }
134
135 Command::RetireExecute { data, reason } => self.retire_execution(reason, data),
136
137 Command::CancelRequest {
138 conn_id,
139 secret_key,
140 } => {
141 self.handle_cancel(conn_id, secret_key).await;
142 }
143
144 Command::PrivilegedCancelRequest { conn_id } => {
145 self.handle_privileged_cancel(conn_id).await;
146 }
147
148 Command::GetWebhook {
149 database,
150 schema,
151 name,
152 tx,
153 } => {
154 self.handle_get_webhook(database, schema, name, tx);
155 }
156
157 Command::GetSystemVars { tx } => {
158 let _ = tx.send(self.catalog.system_config().clone());
159 }
160
161 Command::SetSystemVars { vars, conn_id, tx } => {
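// Validate that every requested variable exists and is visible to this
// connection's user before applying any of the updates.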
162 let mut ops = Vec::with_capacity(vars.len());
163 let conn = &self.active_conns[&conn_id];
164
165 for (name, value) in vars {
166 if let Err(e) =
167 self.catalog().system_config().get(&name).and_then(|var| {
168 var.visible(conn.user(), self.catalog.system_config())
169 })
170 {
171 let _ = tx.send(Err(e.into()));
172 return;
173 }
174
175 ops.push(catalog::Op::UpdateSystemConfiguration {
176 name,
177 value: OwnedVarInput::Flat(value),
178 });
179 }
180
181 let result = self.catalog_transact_conn(Some(&conn_id), ops).await;
182 let _ = tx.send(result);
183 }
184
185 Command::Terminate { conn_id, tx } => {
186 self.handle_terminate(conn_id).await;
187 // Note: We purposefully do not use a ClientTransmitter here because we're already
188 // terminating the provided session.
189 if let Some(tx) = tx {
190 let _ = tx.send(Ok(()));
191 }
192 }
193
194 Command::Commit {
195 action,
196 session,
197 tx,
198 } => {
199 let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
200 // We reach here not through a statement execution, but from the
201 // "commit" pgwire command. Thus, we just generate a default statement
202 // execution context (once statement logging is implemented, this will cause nothing to be logged
203 // when the execution finishes.)
204 let ctx = ExecuteContext::from_parts(
205 tx,
206 self.internal_cmd_tx.clone(),
207 session,
208 Default::default(),
209 );
210 let plan = match action {
211 EndTransactionAction::Commit => {
212 Plan::CommitTransaction(CommitTransactionPlan {
213 transaction_type: TransactionType::Implicit,
214 })
215 }
216 EndTransactionAction::Rollback => {
217 Plan::AbortTransaction(AbortTransactionPlan {
218 transaction_type: TransactionType::Implicit,
219 })
220 }
221 };
222
223 let conn_id = ctx.session().conn_id().clone();
224 self.sequence_plan(ctx, plan, ResolvedIds::empty()).await;
225 // Part of the Command::Commit contract is that the Coordinator guarantees that
226 // it has cleared its transaction state for the connection.
227 self.clear_connection(&conn_id).await;
228 }
229
230 Command::CatalogSnapshot { tx } => {
231 let _ = tx.send(CatalogSnapshot {
232 catalog: self.owned_catalog(),
233 });
234 }
235
236 Command::CheckConsistency { tx } => {
237 let _ = tx.send(self.check_consistency());
238 }
239
240 Command::Dump { tx } => {
241 let _ = tx.send(self.dump().await);
242 }
243 }
244 }
245 .instrument(debug_span!("handle_command"))
246 .boxed_local()
247 }
248
249 #[mz_ore::instrument(level = "debug")]
250 async fn handle_authenticate_password(
251 &mut self,
252 tx: oneshot::Sender<Result<AuthResponse, AdapterError>>,
253 role_name: String,
254 password: Option<Password>,
255 ) {
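// Every failure path below reports the same generic `AuthenticationError`, so a
// caller cannot tell a missing role apart from a bad password.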
256 let Some(password) = password else {
257 // The user did not provide a password.
258 let _ = tx.send(Err(AdapterError::AuthenticationError));
259 return;
260 };
261
262 if let Some(role) = self.catalog().try_get_role_by_name(role_name.as_str()) {
263 if !role.attributes.login.unwrap_or(false) {
264 // The user is not allowed to log in.
265 let _ = tx.send(Err(AdapterError::AuthenticationError));
266 return;
267 }
268 if let Some(auth) = self.catalog().try_get_role_auth_by_id(&role.id) {
269 if let Some(hash) = &auth.password_hash {
270 let _ = match mz_auth::hash::scram256_verify(&password, hash) {
271 Ok(_) => tx.send(Ok(AuthResponse {
272 role_id: role.id,
273 superuser: role.attributes.superuser.unwrap_or(false),
274 })),
275 Err(_) => tx.send(Err(AdapterError::AuthenticationError)),
276 };
277 return;
278 }
279 }
280 // Authentication failed due to incorrect password or missing password hash.
281 let _ = tx.send(Err(AdapterError::AuthenticationError));
282 } else {
283 // The user does not exist.
284 let _ = tx.send(Err(AdapterError::AuthenticationError));
285 }
286 }
287
288 #[mz_ore::instrument(level = "debug")]
289 async fn handle_startup(
290 &mut self,
291 tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
292 user: User,
293 conn_id: ConnectionId,
294 secret_key: u32,
295 uuid: uuid::Uuid,
296 client_ip: Option<IpAddr>,
297 application_name: String,
298 notice_tx: mpsc::UnboundedSender<AdapterNotice>,
299 ) {
300 // Return early if successful; otherwise, clean up any state that may have been created.
301 match self.handle_startup_inner(&user, &conn_id, &client_ip).await {
302 Ok((role_id, session_defaults)) => {
303 let session_type = metrics::session_type_label_value(&user);
304 self.metrics
305 .active_sessions
306 .with_label_values(&[session_type])
307 .inc();
308 let conn = ConnMeta {
309 secret_key,
310 notice_tx,
311 drop_sinks: BTreeSet::new(),
312 pending_cluster_alters: BTreeSet::new(),
313 connected_at: self.now(),
314 user,
315 application_name,
316 uuid,
317 client_ip,
318 conn_id: conn_id.clone(),
319 authenticated_role: role_id,
320 deferred_lock: None,
321 };
322 let update = self.catalog().state().pack_session_update(&conn, Diff::ONE);
323 let update = self.catalog().state().resolve_builtin_table_update(update);
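// Record the new session in the sessions builtin table; the write is applied
// asynchronously below rather than awaited here.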
324 self.begin_session_for_statement_logging(&conn);
325 self.active_conns.insert(conn_id.clone(), conn);
326
327 // Note: Do NOT await the notify here; we pass it back to
328 // whatever requested the startup to prevent blocking startup
329 // and the Coordinator on a builtin table update.
330 let updates = vec![update];
331 // It's not a hard error if our list is missing a builtin table, but we want to
332 // make sure these two things stay in-sync.
333 if mz_ore::assert::soft_assertions_enabled() {
334 let required_tables: BTreeSet<_> = super::appends::REQUIRED_BUILTIN_TABLES
335 .iter()
336 .map(|table| self.catalog().resolve_builtin_table(*table))
337 .collect();
338 let updates_tracked = updates
339 .iter()
340 .all(|update| required_tables.contains(&update.id));
341 let all_mz_internal = super::appends::REQUIRED_BUILTIN_TABLES
342 .iter()
343 .all(|table| table.schema == MZ_INTERNAL_SCHEMA);
344 mz_ore::soft_assert_or_log!(
345 updates_tracked,
346 "not tracking all required builtin table updates!"
347 );
348 // TODO(parkmycar): When checking if a query depends on these builtin table
349 // writes we do not check the transitive dependencies of the query, because
350 // we don't support creating views on mz_internal objects. If one of these
351 // tables is promoted out of mz_internal then we'll need to add this check.
352 mz_ore::soft_assert_or_log!(
353 all_mz_internal,
354 "not all builtin tables are in mz_internal! need to check transitive depends",
355 )
356 }
357 let notify = self.builtin_table_update().background(updates);
358
359 let resp = Ok(StartupResponse {
360 role_id,
361 write_notify: notify,
362 session_defaults,
363 catalog: self.owned_catalog(),
364 });
365 if tx.send(resp).is_err() {
366 // Failed to send to adapter, but everything is setup so we can terminate
367 // normally.
368 self.handle_terminate(conn_id).await;
369 }
370 }
371 Err(e) => {
372 // Error during startup or while sending to the adapter; clean up any state created
373 // by handle_startup_inner. A user may have been created, and it can stay; no need to
374 // delete it.
375 self.catalog_mut()
376 .drop_temporary_schema(&conn_id)
377 .unwrap_or_terminate("unable to drop temporary schema");
378
379 // Communicate the error back to the client. No need to
380 // handle failures to send the error back; we've already
381 // cleaned up all necessary state.
382 let _ = tx.send(Err(e));
383 }
384 }
385 }
386
387 // Fallible startup work that needs to be cleaned up on error.
388 async fn handle_startup_inner(
389 &mut self,
390 user: &User,
391 conn_id: &ConnectionId,
392 client_ip: &Option<IpAddr>,
393 ) -> Result<(RoleId, BTreeMap<String, OwnedVarInput>), AdapterError> {
394 if self.catalog().try_get_role_by_name(&user.name).is_none() {
395 // If the user has made it to this point, that means they have been fully authenticated.
396 // This includes preventing any user, except a pre-defined set of system users, from
397 // connecting to an internal port. Therefore it's ok to always create a new role for the
398 // user.
399 let attributes = RoleAttributes::new();
400 let plan = CreateRolePlan {
401 name: user.name.to_string(),
402 attributes,
403 };
404 self.sequence_create_role_for_startup(plan).await?;
405 }
406 let role_id = self
407 .catalog()
408 .try_get_role_by_name(&user.name)
409 .expect("created above")
410 .id;
411
412 if role_id.is_user() && !ALLOW_USER_SESSIONS.get(self.catalog().system_config().dyncfgs()) {
413 return Err(AdapterError::UserSessionsDisallowed);
414 }
415
416 // Initialize the default session variables for this role.
417 let mut session_defaults = BTreeMap::new();
418 let system_config = self.catalog().state().system_config();
419
420 // Override the session with any system defaults.
421 session_defaults.extend(
422 system_config
423 .iter_session()
424 .map(|v| (v.name().to_string(), OwnedVarInput::Flat(v.value()))),
425 );
426 // Special case: the statement logging sample rate default comes from its own system setting.
427 let statement_logging_default = system_config
428 .statement_logging_default_sample_rate()
429 .format();
430 session_defaults.insert(
431 STATEMENT_LOGGING_SAMPLE_RATE.name().to_string(),
432 OwnedVarInput::Flat(statement_logging_default),
433 );
434 // Override system defaults with role defaults.
435 session_defaults.extend(
436 self.catalog()
437 .get_role(&role_id)
438 .vars()
439 .map(|(name, val)| (name.to_string(), val.clone())),
440 );
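// Net precedence, from lowest to highest: system session defaults, the statement
// logging sample rate special case, then per-role defaults (e.g. as configured via
// `ALTER ROLE ... SET`).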
441
442 // Validate network policies for external users. Internal users can only connect on the
443 // internal interfaces (internal HTTP/pgwire). It is up to the person deploying the system
444 // to ensure these internal interfaces are well secured.
445 //
446 // HACKY(parkmycar): We don't have a fully formed session yet for this role, but we want
447 // the default network policy for this role, so we read directly out of what the session
448 // will get initialized with.
449 if !user.is_internal() {
450 let network_policy_name = session_defaults
451 .get(NETWORK_POLICY.name())
452 .and_then(|value| match value {
453 OwnedVarInput::Flat(name) => Some(name.clone()),
454 OwnedVarInput::SqlSet(names) => {
455 tracing::error!(?names, "found multiple network policies");
456 None
457 }
458 })
459 .unwrap_or_else(|| system_config.default_network_policy_name());
460 let maybe_network_policy = self
461 .catalog()
462 .get_network_policy_by_name(&network_policy_name);
463
464 let Some(network_policy) = maybe_network_policy else {
465 // We should prevent dropping the default network policy, or setting the policy
466 // to something that doesn't exist, so complain loudly if this occurs.
467 tracing::error!(
468 network_policy_name,
469 "default network policy does not exist. All user traffic will be blocked"
470 );
471 let reason = match client_ip {
472 Some(ip) => super::NetworkPolicyError::AddressDenied(ip.clone()),
473 None => super::NetworkPolicyError::MissingIp,
474 };
475 return Err(AdapterError::NetworkPolicyDenied(reason));
476 };
477
478 if let Some(ip) = client_ip {
479 match validate_ip_with_policy_rules(ip, &network_policy.rules) {
480 Ok(_) => {}
481 Err(e) => return Err(AdapterError::NetworkPolicyDenied(e)),
482 }
483 } else {
484 // Only temporary and internal representations of a session
485 // should be missing a client_ip. These sessions should not be
486 // making requests or going through handle_startup.
487 return Err(AdapterError::NetworkPolicyDenied(
488 super::NetworkPolicyError::MissingIp,
489 ));
490 }
491 }
492
493 self.catalog_mut()
494 .create_temporary_schema(conn_id, role_id)?;
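// Every connection gets its own temporary schema; it is dropped again in
// `handle_terminate`, or by `handle_startup`'s error path if startup fails later on.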
495
496 Ok((role_id, session_defaults))
497 }
498
499 /// Handles an execute command.
500 #[instrument(name = "coord::handle_execute", fields(session = session.uuid().to_string()))]
501 pub(crate) async fn handle_execute(
502 &mut self,
503 portal_name: String,
504 mut session: Session,
505 tx: ClientTransmitter<ExecuteResponse>,
506 // If this command was part of another execute command
507 // (for example, executing a `FETCH` statement causes an execute to be
508 // issued for the cursor it references),
509 // then `outer_context` should be `Some`.
510 // This instructs the coordinator that the
511 // outer execute should be considered finished once the inner one is.
512 outer_context: Option<ExecuteContextExtra>,
513 ) {
514 if session.vars().emit_trace_id_notice() {
515 let span_context = tracing::Span::current()
516 .context()
517 .span()
518 .span_context()
519 .clone();
520 if span_context.is_valid() {
521 session.add_notice(AdapterNotice::QueryTrace {
522 trace_id: span_context.trace_id(),
523 });
524 }
525 }
526
527 if let Err(err) = self.verify_portal(&mut session, &portal_name) {
528 // If statement logging hasn't started yet, we don't need
529 // to add any "end" event, so just make up a no-op
530 // `ExecuteContextExtra` here, via `Default::default`.
531 //
532 // It's a bit unfortunate because the edge case of failed
533 // portal verifications won't show up in statement
534 // logging, but there seems to be nothing else we can do,
535 // because we need access to the portal to begin logging.
536 //
537 // Another option would be to log a begin and end event, but just fill in NULLs
538 // for everything we get from the portal (prepared statement id, params).
539 let extra = outer_context.unwrap_or_else(Default::default);
540 let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
541 return ctx.retire(Err(err));
542 }
543
544 // The reference to `portal` can't outlive `session`, which we
545 // use to construct the context, so scope the reference to this block where we
546 // get everything we need from the portal for later.
547 let (stmt, ctx, params) = {
548 let portal = session
549 .get_portal_unverified(&portal_name)
550 .expect("known to exist");
551 let params = portal.parameters.clone();
552 let stmt = portal.stmt.clone();
553 let logging = Arc::clone(&portal.logging);
554
555 let extra = if let Some(extra) = outer_context {
556 // We are executing in the context of another SQL statement, so we don't
557 // want to begin statement logging anew. The context of the actual statement
558 // being executed is the one that should be retired once this finishes.
559 extra
560 } else {
561 // This is a new statement; log it and return the context.
562 let maybe_uuid = self.begin_statement_execution(&mut session, &params, &logging);
563
564 ExecuteContextExtra::new(maybe_uuid)
565 };
566 let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
567 (stmt, ctx, params)
568 };
569
570 let stmt = match stmt {
571 Some(stmt) => stmt,
572 None => return ctx.retire(Ok(ExecuteResponse::EmptyQuery)),
573 };
574
575 let session_type = metrics::session_type_label_value(ctx.session().user());
576 let stmt_type = metrics::statement_type_label_value(&stmt);
577 self.metrics
578 .query_total
579 .with_label_values(&[session_type, stmt_type])
580 .inc();
581 match &*stmt {
582 Statement::Subscribe(SubscribeStatement { output, .. })
583 | Statement::Copy(CopyStatement {
584 relation: CopyRelation::Subscribe(SubscribeStatement { output, .. }),
585 ..
586 }) => {
587 self.metrics
588 .subscribe_outputs
589 .with_label_values(&[
590 session_type,
591 metrics::subscribe_output_label_value(output),
592 ])
593 .inc();
594 }
595 _ => {}
596 }
597
598 self.handle_execute_inner(stmt, params, ctx).await
599 }
600
601 #[instrument(name = "coord::handle_execute_inner", fields(stmt = stmt.to_ast_string_redacted()))]
602 pub(crate) async fn handle_execute_inner(
603 &mut self,
604 stmt: Arc<Statement<Raw>>,
605 params: Params,
606 mut ctx: ExecuteContext,
607 ) {
608 // This comment describes the various ways DDL can execute (the ordered operations: name
609 // resolution, purification, planning, sequencing), all of which are managed by this
610 // function. DDL has three notable properties that all partially interact.
611 //
612 // 1. Most DDL statements (and a few others) support single-statement transaction delayed
613 // execution. This occurs when a session executes `BEGIN`, a single DDL, then `COMMIT`.
614 // We announce success of the single DDL when it is executed, but do not attempt to plan
615 // or sequence it until `COMMIT`, which is able to error if needed while sequencing the
616 // DDL (this behavior is Postgres-compatible). The purpose of this is because some
617 //    DDL (this behavior is Postgres-compatible). We do this because some drivers and
618 //    tools wrap all statements in `BEGIN` and `COMMIT`, and we would like them to
619 //    work. When the single DDL is announced as successful we also put the session's
620 // statement is run in the transaction except `COMMIT`. Additionally, this will cause
621 // `handle_execute_inner` to stop further processing (no planning, etc.) of the
622 // statement.
623 // 2. A few other DDL statements (`ALTER .. RENAME/SWAP`) enter the `DDL` ops which allows
624 // any number of only these DDL statements to be executed in a transaction. At sequencing
625 // these generate the `Op::TransactionDryRun` catalog op. When applied with
626 // `catalog_transact`, that op will always produce the `TransactionDryRun` error. The
627 // `catalog_transact_with_ddl_transaction` function intercepts that error and reports
628 // success to the user, but nothing is yet committed to the real catalog. At `COMMIT` all
629 // of the ops but without dry run are applied. The purpose of this is to allow multiple,
630 // atomic renames in the same transaction.
631 // 3. Some DDLs do off-thread work during purification or sequencing that is expensive or
632 // makes network calls (interfacing with secrets, optimization of views/indexes, source
633 // purification). These must guarantee correctness when they return to the main
634 // coordinator thread because the catalog state could have changed while they were doing
635 // the off-thread work. Previously we would use `PlanValidity::Checks` to specify a bunch
636 // of IDs that we needed to exist. We discovered the way we were doing that was not
637 // always correct. Instead of attempting to get that completely right, we have opted to
638 // serialize DDL. Getting this right is difficult because catalog changes can affect name
639 // resolution, planning, sequencing, and optimization. Correctly writing logic that is
640 // aware of all possible catalog changes that would affect any of those parts is not
641 //    something our current code was designed to support. Even if a DDL statement
642 // is doing off-thread work, another DDL must not yet execute at all. Executing these
643 // serially will guarantee that no off-thread work has affected the state of the catalog.
644 // This is done by adding a VecDeque of deferred statements and a lock to the
645 // Coordinator. When a DDL is run in `handle_execute_inner` (after applying whatever
646 // transaction ops are needed to the session as described above), it attempts to own the
647 //    lock (a tokio Mutex). If acquired, it stashes the lock in the connection's `ConnMeta`
648 // struct in `active_conns` and proceeds. The lock is dropped at transaction end in
649 // `clear_transaction` and a message sent to the Coordinator to execute the next queued
650 // DDL. If the lock could not be acquired, the DDL is put into the VecDeque where it
651 // awaits dequeuing caused by the lock being released.
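//
// For example (illustrative), property (1) covers a flow like
//   BEGIN; CREATE VIEW v AS SELECT 1; COMMIT;
// where the CREATE VIEW is acknowledged immediately but only planned and sequenced at
// COMMIT, while property (2) covers a transaction containing several
// ALTER ... RENAME/SWAP statements that are all applied atomically at COMMIT.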
652
653 // Verify that this statement type can be executed in the current
654 // transaction state.
655 match ctx.session().transaction() {
656 // By this point we should be in a running transaction.
657 TransactionStatus::Default => unreachable!(),
658
659 // Failed transactions have already been checked in pgwire for a safe statement
660 // (COMMIT, ROLLBACK, etc.) and can proceed.
661 TransactionStatus::Failed(_) => {}
662
663 // Started is a deceptive name and means different things depending on which
664 // protocol was used. Under the simple protocol it means exactly one statement
665 // (known because the parser parsed the entire string and it contained one
666 // statement). Under the extended protocol it means *some* query is being
667 // executed, but there might be others after it before the Sync (commit)
668 // message. Postgres handles this by teaching Started to eagerly commit certain
669 // statements that can't be run in a transaction block.
670 TransactionStatus::Started(_) => {
671 if let Statement::Declare(_) = &*stmt {
672 // Declare is an exception. Although it's not against any spec to execute
673 // it, it will always result in nothing happening, since all portals will be
674 // immediately closed. Users don't know this detail, so this error helps them
675 // understand what's going wrong. Postgres does this too.
676 return ctx.retire(Err(AdapterError::OperationRequiresTransaction(
677 "DECLARE CURSOR".into(),
678 )));
679 }
680 }
681
682 // Implicit or explicit transactions.
683 //
684 // Implicit transactions happen when a multi-statement query is executed
685 // (a "simple query"). However if a "BEGIN" appears somewhere in there,
686 // then the existing implicit transaction will be upgraded to an explicit
687 // transaction. Thus, we should not separate what implicit and explicit
688 // transactions can do unless there's some additional checking to make sure
689 // something disallowed in explicit transactions did not previously take place
690 // in the implicit portion.
691 TransactionStatus::InTransactionImplicit(_) | TransactionStatus::InTransaction(_) => {
692 match &*stmt {
693 // Statements that are safe in a transaction. We still need to verify that we
694 // don't interleave reads and writes since we can't perform those serializably.
695 Statement::Close(_)
696 | Statement::Commit(_)
697 | Statement::Copy(_)
698 | Statement::Deallocate(_)
699 | Statement::Declare(_)
700 | Statement::Discard(_)
701 | Statement::Execute(_)
702 | Statement::ExplainPlan(_)
703 | Statement::ExplainPushdown(_)
704 | Statement::ExplainTimestamp(_)
705 | Statement::ExplainSinkSchema(_)
706 | Statement::Fetch(_)
707 | Statement::Prepare(_)
708 | Statement::Rollback(_)
709 | Statement::Select(_)
710 | Statement::SetTransaction(_)
711 | Statement::Show(_)
712 | Statement::SetVariable(_)
713 | Statement::ResetVariable(_)
714 | Statement::StartTransaction(_)
715 | Statement::Subscribe(_)
716 | Statement::Raise(_) => {
717 // Always safe.
718 }
719
720 Statement::Insert(InsertStatement {
721 source, returning, ..
722 }) if returning.is_empty() && ConstantVisitor::insert_source(source) => {
723 // Inserts of constant values that do not need to execute on any
724 // cluster (no RETURNING) are always safe.
725 }
726
727 // These statements must be kept in-sync with `must_serialize_ddl()`.
728 Statement::AlterObjectRename(_) | Statement::AlterObjectSwap(_) => {
729 let state = self.catalog().for_session(ctx.session()).state().clone();
730 let revision = self.catalog().transient_revision();
731
732 // Initialize our transaction with a set of empty ops, or return an error
733 // if we can't run a DDL transaction
734 let txn_status = ctx.session_mut().transaction_mut();
735 if let Err(err) = txn_status.add_ops(TransactionOps::DDL {
736 ops: vec![],
737 state,
738 revision,
739 }) {
740 return ctx.retire(Err(err));
741 }
742 }
743
744 // Statements below must be run singly (in Started).
745 Statement::AlterCluster(_)
746 | Statement::AlterConnection(_)
747 | Statement::AlterDefaultPrivileges(_)
748 | Statement::AlterIndex(_)
749 | Statement::AlterSetCluster(_)
750 | Statement::AlterOwner(_)
751 | Statement::AlterRetainHistory(_)
752 | Statement::AlterRole(_)
753 | Statement::AlterSecret(_)
754 | Statement::AlterSink(_)
755 | Statement::AlterSource(_)
756 | Statement::AlterSystemReset(_)
757 | Statement::AlterSystemResetAll(_)
758 | Statement::AlterSystemSet(_)
759 | Statement::AlterTableAddColumn(_)
760 | Statement::AlterNetworkPolicy(_)
761 | Statement::CreateCluster(_)
762 | Statement::CreateClusterReplica(_)
763 | Statement::CreateConnection(_)
764 | Statement::CreateDatabase(_)
765 | Statement::CreateIndex(_)
766 | Statement::CreateMaterializedView(_)
767 | Statement::CreateContinualTask(_)
768 | Statement::CreateRole(_)
769 | Statement::CreateSchema(_)
770 | Statement::CreateSecret(_)
771 | Statement::CreateSink(_)
772 | Statement::CreateSource(_)
773 | Statement::CreateSubsource(_)
774 | Statement::CreateTable(_)
775 | Statement::CreateTableFromSource(_)
776 | Statement::CreateType(_)
777 | Statement::CreateView(_)
778 | Statement::CreateWebhookSource(_)
779 | Statement::CreateNetworkPolicy(_)
780 | Statement::Delete(_)
781 | Statement::DropObjects(_)
782 | Statement::DropOwned(_)
783 | Statement::GrantPrivileges(_)
784 | Statement::GrantRole(_)
785 | Statement::Insert(_)
786 | Statement::ReassignOwned(_)
787 | Statement::RevokePrivileges(_)
788 | Statement::RevokeRole(_)
789 | Statement::Update(_)
790 | Statement::ValidateConnection(_)
791 | Statement::Comment(_) => {
792 let txn_status = ctx.session_mut().transaction_mut();
793
794 // If we're not in an implicit transaction and we could generate exactly one
795 // valid ExecuteResponse, we can delay execution until commit.
796 if !txn_status.is_implicit() {
797 // Statements whose tag is trivial (known only from an unexecuted statement) can
798 // be run in a special single-statement explicit mode. In this mode (`BEGIN;
799 // <stmt>; COMMIT`), we generate the expected tag from a successful <stmt>, but
800 // delay execution until `COMMIT`.
801 if let Ok(resp) = ExecuteResponse::try_from(&*stmt) {
802 if let Err(err) = txn_status
803 .add_ops(TransactionOps::SingleStatement { stmt, params })
804 {
805 ctx.retire(Err(err));
806 return;
807 }
808 ctx.retire(Ok(resp));
809 return;
810 }
811 }
812
813 return ctx.retire(Err(AdapterError::OperationProhibitsTransaction(
814 stmt.to_string(),
815 )));
816 }
817 }
818 }
819 }
820
821 // DDLs must be planned and sequenced serially. We do not rely on PlanValidity checking
822 // various IDs because we have incorrectly done that in the past. Attempt to acquire the
823 // ddl lock. The lock is stashed in the ConnMeta which is dropped at transaction end. If
824 // acquired, proceed with sequencing. If not, enqueue and return. This logic assumes that
825 // Coordinator::clear_transaction is correctly called when session transactions are ended
826 // because that function will release the held lock from active_conns.
827 if Self::must_serialize_ddl(&stmt, &ctx) {
828 if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
829 let prev = self
830 .active_conns
831 .get_mut(ctx.session().conn_id())
832 .expect("connection must exist")
833 .deferred_lock
834 .replace(guard);
835 assert!(
836 prev.is_none(),
837 "connections should have at most one lock guard"
838 );
839 } else {
840 if self
841 .active_conns
842 .get(ctx.session().conn_id())
843 .expect("connection must exist")
844 .deferred_lock
845 .is_some()
846 {
847 // This session *already* has the lock, and incorrectly tried to execute another
848 // DDL while still holding the lock, violating the assumption documented above.
849 // This is an internal error, probably in some AdapterClient user (pgwire or
850 // http). Because the session is now in some unexpected state, return an error
851 // which should cause the AdapterClient user to fail the transaction.
852 // (Terminating the connection is maybe what we would prefer to do, but is not
853 // currently a thing we can do from the coordinator: calling handle_terminate
854 // cleans up Coordinator state for the session but doesn't inform the
855 // AdapterClient that the session should terminate.)
856 soft_panic_or_log!(
857 "session {} attempted to get ddl lock while already owning it",
858 ctx.session().conn_id()
859 );
860 ctx.retire(Err(AdapterError::Internal(
861 "session attempted to get ddl lock while already owning it".to_string(),
862 )));
863 return;
864 }
865 self.serialized_ddl.push_back(DeferredPlanStatement {
866 ctx,
867 ps: PlanStatement::Statement { stmt, params },
868 });
869 return;
870 }
871 }
872
873 let catalog = self.catalog();
874 let catalog = catalog.for_session(ctx.session());
875 let original_stmt = Arc::clone(&stmt);
876 // `resolved_ids` should be derivable from `stmt`. If `stmt` is transformed to remove/add
877 // IDs, then `resolved_ids` should be updated to also remove/add those IDs.
878 let (stmt, mut resolved_ids) = match mz_sql::names::resolve(&catalog, (*stmt).clone()) {
879 Ok(resolved) => resolved,
880 Err(e) => return ctx.retire(Err(e.into())),
881 };
882 // N.B. The catalog can change during purification so we must validate that the dependencies still exist after
883 // purification. This should be done back on the main thread.
884 // We do the validation:
885 // - In the handler for `Message::PurifiedStatementReady`, before we handle the purified statement.
886 // If we add special handling for more types of `Statement`s, we'll need to ensure similar verification
887 // occurs.
888 let (stmt, resolved_ids) = match stmt {
889 // Various statements must be purified off the main coordinator thread of control.
890 stmt if Self::must_spawn_purification(&stmt) => {
891 let internal_cmd_tx = self.internal_cmd_tx.clone();
892 let conn_id = ctx.session().conn_id().clone();
893 let catalog = self.owned_catalog();
894 let now = self.now();
895 let otel_ctx = OpenTelemetryContext::obtain();
896 let current_storage_configuration = self.controller.storage.config().clone();
897 task::spawn(|| format!("purify:{conn_id}"), async move {
898 let transient_revision = catalog.transient_revision();
899 let catalog = catalog.for_session(ctx.session());
900
901 // Check whether the session is authorized to purify a statement. Usually
902 // authorization is checked after planning; however, purification happens before
903 // planning and may require the use of some connections and secrets.
904 if let Err(e) = rbac::check_usage(
905 &catalog,
906 ctx.session(),
907 &resolved_ids,
908 &CREATE_ITEM_USAGE,
909 ) {
910 return ctx.retire(Err(e.into()));
911 }
912
913 let (result, cluster_id) = mz_sql::pure::purify_statement(
914 catalog,
915 now,
916 stmt,
917 &current_storage_configuration,
918 )
919 .await;
920 let result = result.map_err(|e| e.into());
921 let dependency_ids = resolved_ids.items().copied().collect();
922 let plan_validity = PlanValidity::new(
923 transient_revision,
924 dependency_ids,
925 cluster_id,
926 None,
927 ctx.session().role_metadata().clone(),
928 );
929 // It is not an error for purification to complete after `internal_cmd_rx` is dropped.
930 let result = internal_cmd_tx.send(Message::PurifiedStatementReady(
931 PurifiedStatementReady {
932 ctx,
933 result,
934 params,
935 plan_validity,
936 original_stmt,
937 otel_ctx,
938 },
939 ));
940 if let Err(e) = result {
941 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
942 }
943 });
944 return;
945 }
946
947 // `CREATE SUBSOURCE` statements are disallowed for users and are only generated
948 // automatically as part of purification.
949 Statement::CreateSubsource(_) => {
950 ctx.retire(Err(AdapterError::Unsupported(
951 "CREATE SUBSOURCE statements",
952 )));
953 return;
954 }
955
956 Statement::CreateMaterializedView(mut cmvs) => {
957 // `CREATE MATERIALIZED VIEW ... AS OF ...` syntax is disallowed for users and is
958 // only used for storing initial frontiers in the catalog.
959 if cmvs.as_of.is_some() {
960 return ctx.retire(Err(AdapterError::Unsupported(
961 "CREATE MATERIALIZED VIEW ... AS OF statements",
962 )));
963 }
964
965 let mz_now = match self
966 .resolve_mz_now_for_create_materialized_view(
967 &cmvs,
968 &resolved_ids,
969 ctx.session_mut(),
970 true,
971 )
972 .await
973 {
974 Ok(mz_now) => mz_now,
975 Err(e) => return ctx.retire(Err(e)),
976 };
977
978 let owned_catalog = self.owned_catalog();
979 let catalog = owned_catalog.for_session(ctx.session());
980
981 purify_create_materialized_view_options(
982 catalog,
983 mz_now,
984 &mut cmvs,
985 &mut resolved_ids,
986 );
987
988 let purified_stmt =
989 Statement::CreateMaterializedView(CreateMaterializedViewStatement::<Aug> {
990 if_exists: cmvs.if_exists,
991 name: cmvs.name,
992 columns: cmvs.columns,
993 in_cluster: cmvs.in_cluster,
994 query: cmvs.query,
995 with_options: cmvs.with_options,
996 as_of: None,
997 });
998
999 // (Purifying CreateMaterializedView doesn't happen asynchronously, so there is no need to send
1000 // `Message::PurifiedStatementReady` here.)
1001 (purified_stmt, resolved_ids)
1002 }
1003
1004 Statement::ExplainPlan(ExplainPlanStatement {
1005 stage,
1006 with_options,
1007 format,
1008 explainee: Explainee::CreateMaterializedView(box_cmvs, broken),
1009 }) => {
1010 let mut cmvs = *box_cmvs;
1011 let mz_now = match self
1012 .resolve_mz_now_for_create_materialized_view(
1013 &cmvs,
1014 &resolved_ids,
1015 ctx.session_mut(),
1016 false,
1017 )
1018 .await
1019 {
1020 Ok(mz_now) => mz_now,
1021 Err(e) => return ctx.retire(Err(e)),
1022 };
1023
1024 let owned_catalog = self.owned_catalog();
1025 let catalog = owned_catalog.for_session(ctx.session());
1026
1027 purify_create_materialized_view_options(
1028 catalog,
1029 mz_now,
1030 &mut cmvs,
1031 &mut resolved_ids,
1032 );
1033
1034 let purified_stmt = Statement::ExplainPlan(ExplainPlanStatement {
1035 stage,
1036 with_options,
1037 format,
1038 explainee: Explainee::CreateMaterializedView(Box::new(cmvs), broken),
1039 });
1040
1041 (purified_stmt, resolved_ids)
1042 }
1043
1044 // All other statements are handled immediately.
1045 _ => (stmt, resolved_ids),
1046 };
1047
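// Name resolution and any purification are complete; now plan and sequence the
// statement.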
1048 match self.plan_statement(ctx.session(), stmt, &params, &resolved_ids) {
1049 Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await,
1050 Err(e) => ctx.retire(Err(e)),
1051 }
1052 }
1053
1054 /// Whether the statement is DDL that must be serialized with other DDL.
1055 fn must_serialize_ddl(stmt: &Statement<Raw>, ctx: &ExecuteContext) -> bool {
1056 // Non-DDL is not serialized here.
1057 if !StatementClassification::from(&*stmt).is_ddl() {
1058 return false;
1059 }
1060 // Off-thread, pre-planning purification can perform arbitrarily slow network calls so must
1061 // not be serialized. These all use PlanValidity for their checking, and we must ensure
1062 // those checks are sufficient.
1063 if Self::must_spawn_purification(stmt) {
1064 return false;
1065 }
1066
1067 // Statements that support multiple DDLs in a single transaction aren't serialized here.
1068 // Their operations are serialized when applied to the catalog, guaranteeing that any
1069 // off-thread DDLs concurrent with a multiple DDL transaction will have a serial order.
1070 if ctx.session.transaction().is_ddl() {
1071 return false;
1072 }
1073
1074 // Some DDL is exempt. It is not great that we are matching on Statements here because
1075 // different plans can be produced from the same top-level statement type (e.g., `ALTER
1076 // CONNECTION ROTATE KEYS`). But the whole point of this is to prevent things from being
1077 // planned in the first place, so we accept the abstraction leak.
1078 match stmt {
1079 // Secrets have a small and understood set of dependencies, and their off-thread work
1080 // interacts with k8s.
1081 Statement::AlterSecret(_) => false,
1082 Statement::CreateSecret(_) => false,
1083 Statement::AlterConnection(AlterConnectionStatement { actions, .. })
1084 if actions
1085 .iter()
1086 .all(|action| matches!(action, AlterConnectionAction::RotateKeys)) =>
1087 {
1088 false
1089 }
1090
1091 // The off-thread work that altering a cluster may do (waiting for replicas to spin up)
1092 // does not affect its catalog names or IDs and so is safe to not serialize. This could
1093 // change the set of replicas that exist. For queries that name replicas or use the
1094 // current_replica session var, the `replica_id` field of `PlanValidity` serves to
1095 // ensure that those replicas exist during the query finish stage. Additionally, that
1096 // work can take hours (configured by the user), so would also be a bad experience for
1097 // users.
1098 Statement::AlterCluster(_) => false,
1099
1100 // Everything else must be serialized.
1101 _ => true,
1102 }
1103 }
1104
1105 /// Whether the statement must be purified off of the Coordinator thread.
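///
/// Purification for these statements can be expensive and can involve network calls
/// (e.g., to the external systems backing a source or sink), so it must not block the
/// coordinator's main task.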
1106 fn must_spawn_purification<A: AstInfo>(stmt: &Statement<A>) -> bool {
1107 // `CREATE` and `ALTER` `SOURCE` and `SINK` statements must be purified off the main
1108 // coordinator thread.
1109 if !matches!(
1110 stmt,
1111 Statement::CreateSource(_)
1112 | Statement::AlterSource(_)
1113 | Statement::CreateSink(_)
1114 | Statement::CreateTableFromSource(_)
1115 ) {
1116 return false;
1117 }
1118
1119 // However, `ALTER SOURCE RETAIN HISTORY` should be excluded from off-thread purification.
1120 if let Statement::AlterSource(stmt) = stmt {
1121 let names: Vec<CreateSourceOptionName> = match &stmt.action {
1122 AlterSourceAction::SetOptions(options) => {
1123 options.iter().map(|o| o.name.clone()).collect()
1124 }
1125 AlterSourceAction::ResetOptions(names) => names.clone(),
1126 _ => vec![],
1127 };
1128 if !names.is_empty()
1129 && names
1130 .iter()
1131 .all(|n| matches!(n, CreateSourceOptionName::RetainHistory))
1132 {
1133 return false;
1134 }
1135 }
1136
1137 true
1138 }
1139
1140 /// Chooses a timestamp for `mz_now()`, if `mz_now()` occurs in a REFRESH option of the
1141 /// materialized view. Additionally, if `acquire_read_holds` is true and the MV has any REFRESH
1142 /// option, this function grabs read holds at the earliest possible time on input collections
1143 /// that might be involved in the MV.
1144 ///
1145 /// Note that this is NOT what handles `mz_now()` in the query part of the MV; this function
1146 /// handles it only in `with_options`.
1147 ///
1148 /// (Note that the chosen timestamp won't be the same timestamp as the system table inserts,
1149 /// unfortunately.)
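///
/// For example (illustrative), a `REFRESH AT` option whose expression contains
/// `mz_now()` needs that `mz_now()` resolved to a concrete timestamp, which is the
/// value chosen here.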
1150 async fn resolve_mz_now_for_create_materialized_view(
1151 &mut self,
1152 cmvs: &CreateMaterializedViewStatement<Aug>,
1153 resolved_ids: &ResolvedIds,
1154 session: &Session,
1155 acquire_read_holds: bool,
1156 ) -> Result<Option<Timestamp>, AdapterError> {
1157 if cmvs
1158 .with_options
1159 .iter()
1160 .any(|wo| matches!(wo.value, Some(WithOptionValue::Refresh(..))))
1161 {
1162 let catalog = self.catalog().for_session(session);
1163 let cluster = mz_sql::plan::resolve_cluster_for_materialized_view(&catalog, cmvs)?;
1164 let ids = self
1165 .index_oracle(cluster)
1166 .sufficient_collections(resolved_ids.collections().copied());
1167
1168 // If there is any REFRESH option, then acquire read holds. (Strictly speaking, we'd
1169 // need this only if there is a `REFRESH AT`, not for `REFRESH EVERY`, because later
1170 // we want to check the AT times against the read holds that we acquire here. But
1171 // we do it for any REFRESH option, to avoid having so many code paths doing different
1172 // things.)
1173 //
1174 // It's important that we acquire read holds _before_ we determine the least valid read.
1175 // Otherwise, we're not guaranteed that the since frontier doesn't
1176 // advance forward from underneath us.
1177 let read_holds = self.acquire_read_holds(&ids);
1178
1179 // Does `mz_now()` occur?
1180 let mz_now_ts = if cmvs
1181 .with_options
1182 .iter()
1183 .any(materialized_view_option_contains_temporal)
1184 {
1185 let timeline_context =
1186 self.validate_timeline_context(resolved_ids.collections().copied())?;
1187
1188 // We default to EpochMilliseconds, similarly to `determine_timestamp_for`,
1189 // but even in the TimestampIndependent case.
1190 // Note that we didn't accurately decide whether we are TimestampDependent
1191 // or TimestampIndependent, because for this we'd need to also check whether
1192 // `query.contains_temporal()`, similarly to how `peek_stage_validate` does.
1193 // However, this doesn't matter here, as we are just going to default to
1194 // EpochMilliseconds in both cases.
1195 let timeline = timeline_context
1196 .timeline()
1197 .unwrap_or(&Timeline::EpochMilliseconds);
1198
1199 // Let's start with the timestamp oracle read timestamp.
1200 let mut timestamp = self.get_timestamp_oracle(timeline).read_ts().await;
1201
1202 // If `least_valid_read` is later than the oracle, then advance to that time.
1203 // If we didn't do this, then there would be a danger of missing the first refresh,
1204 // which might cause the materialized view to be unreadable for hours. This might
1205 // be what was happening here:
1206 // https://github.com/MaterializeInc/database-issues/issues/7265#issuecomment-1931856361
1207 //
1208 // In the long term, it would be good to actually block the MV creation statement
1209 // until `least_valid_read`. https://github.com/MaterializeInc/database-issues/issues/7504
1210 // Without blocking, we have the problem that a REFRESH AT CREATION is not linearized
1211 // with the CREATE MATERIALIZED VIEW statement, in the sense that a query from the MV
1212 // after its creation might see input changes that happened after the CREATE MATERIALIZED
1213 // VIEW statement returned.
1214 let oracle_timestamp = timestamp;
1215 let least_valid_read = self.least_valid_read(&read_holds);
1216 timestamp.advance_by(least_valid_read.borrow());
1217
1218 if oracle_timestamp != timestamp {
1219 warn!(%cmvs.name, %oracle_timestamp, %timestamp, "REFRESH MV's inputs are not readable at the oracle read ts");
1220 }
1221
1222 info!("Resolved `mz_now()` to {timestamp} for REFRESH MV");
1223 Ok(Some(timestamp))
1224 } else {
1225 Ok(None)
1226 };
1227
1228 // NOTE: The Drop impl of ReadHolds makes sure that the holds are
1229 // released if we end up not using them.
1230 if acquire_read_holds {
1231 self.store_transaction_read_holds(session, read_holds);
1232 }
1233
1234 mz_now_ts
1235 } else {
1236 Ok(None)
1237 }
1238 }
1239
1240 /// Instruct the dataflow layer to cancel any ongoing, interactive work for
1241 /// the named `conn_id` if the correct secret key is specified.
1242 ///
1243 /// Note: Here we take a [`ConnectionIdType`] as opposed to an owned
1244 /// `ConnectionId` because this method gets called by external clients when
1245 /// they request to cancel a request.
1246 #[mz_ore::instrument(level = "debug")]
1247 async fn handle_cancel(&mut self, conn_id: ConnectionIdType, secret_key: u32) {
1248 if let Some((id_handle, conn_meta)) = self.active_conns.get_key_value(&conn_id) {
1249 // If the secret key specified by the client doesn't match the
1250 // actual secret key for the target connection, we treat this as a
1251 // rogue cancellation request and ignore it.
1252 if conn_meta.secret_key != secret_key {
1253 return;
1254 }
1255
1256 // Now that we've verified the secret key, this is a privileged
1257 // cancellation request. We can upgrade the raw connection ID to a
1258 // proper `IdHandle`.
1259 self.handle_privileged_cancel(id_handle.clone()).await;
1260 }
1261 }
1262
1263 /// Unconditionally instructs the dataflow layer to cancel any ongoing,
1264 /// interactive work for the named `conn_id`.
1265 #[mz_ore::instrument(level = "debug")]
1266 pub(crate) async fn handle_privileged_cancel(&mut self, conn_id: ConnectionId) {
1267 let mut maybe_ctx = None;
1268
1269 // Cancel pending writes. There is at most one pending write per session.
1270 if let Some(idx) = self.pending_writes.iter().position(|pending_write_txn| {
1271 matches!(pending_write_txn, PendingWriteTxn::User {
1272 pending_txn: PendingTxn { ctx, .. },
1273 ..
1274 } if *ctx.session().conn_id() == conn_id)
1275 }) {
1276 if let PendingWriteTxn::User {
1277 pending_txn: PendingTxn { ctx, .. },
1278 ..
1279 } = self.pending_writes.remove(idx)
1280 {
1281 maybe_ctx = Some(ctx);
1282 }
1283 }
1284
1285 // Cancel deferred writes.
1286 if let Some(write_op) = self.deferred_write_ops.remove(&conn_id) {
1287 maybe_ctx = Some(write_op.into_ctx());
1288 }
1289
1290 // Cancel deferred statements.
1291 if let Some(idx) = self
1292 .serialized_ddl
1293 .iter()
1294 .position(|deferred| *deferred.ctx.session().conn_id() == conn_id)
1295 {
1296 let deferred = self
1297 .serialized_ddl
1298 .remove(idx)
1299 .expect("known to exist from call to `position` above");
1300 maybe_ctx = Some(deferred.ctx);
1301 }
1302
1303 // Cancel reads waiting on being linearized. There is at most one linearized read per
1304 // session.
1305 if let Some(pending_read_txn) = self.pending_linearize_read_txns.remove(&conn_id) {
1306 let ctx = pending_read_txn.take_context();
1307 maybe_ctx = Some(ctx);
1308 }
1309
1310 if let Some(ctx) = maybe_ctx {
1311 ctx.retire(Err(AdapterError::Canceled));
1312 }
1313
1314 self.cancel_pending_peeks(&conn_id);
1315 self.cancel_pending_watchsets(&conn_id);
1316 self.cancel_compute_sinks_for_conn(&conn_id).await;
1317 self.cancel_cluster_reconfigurations_for_conn(&conn_id)
1318 .await;
1319 self.cancel_pending_copy(&conn_id);
1320 if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) {
1321 let _ = tx.send(true);
1322 }
1323 }
1324
1325 /// Handle termination of a client session.
1326 ///
1327 /// This cleans up any state in the coordinator associated with the session.
1328 #[mz_ore::instrument(level = "debug")]
1329 async fn handle_terminate(&mut self, conn_id: ConnectionId) {
1330 if !self.active_conns.contains_key(&conn_id) {
1331 // If the session doesn't exist in `active_conns`, then this method will panic later on.
1332 // Instead we explicitly panic here while dumping the entire Coord to the logs to help
1333 // debug. This panic is very infrequent so we want as much information as possible.
1334 // See https://github.com/MaterializeInc/database-issues/issues/5627.
1335 panic!("unknown connection: {conn_id:?}\n\n{self:?}")
1336 }
1337
1338 // We do not need to call clear_transaction here because there are no side effects to run
1339 // based on any session transaction state.
1340 self.clear_connection(&conn_id).await;
1341
1342 self.drop_temp_items(&conn_id).await;
1343 self.catalog_mut()
1344 .drop_temporary_schema(&conn_id)
1345 .unwrap_or_terminate("unable to drop temporary schema");
1346 let conn = self.active_conns.remove(&conn_id).expect("conn must exist");
1347 let session_type = metrics::session_type_label_value(conn.user());
1348 self.metrics
1349 .active_sessions
1350 .with_label_values(&[session_type])
1351 .dec();
1352 self.cancel_pending_peeks(conn.conn_id());
1353 self.cancel_pending_watchsets(&conn_id);
1354 self.cancel_pending_copy(&conn_id);
1355 self.end_session_for_statement_logging(conn.uuid());
1356
1357 // Queue the builtin table update, but do not wait for it to complete. We explicitly do
1358 // this to prevent blocking the Coordinator in the case that a lot of connections are
1359 // closed at once, which occurs regularly in some workflows.
1360 let update = self
1361 .catalog()
1362 .state()
1363 .pack_session_update(&conn, Diff::MINUS_ONE);
1364 let update = self.catalog().state().resolve_builtin_table_update(update);
1365
1366 let _builtin_update_notify = self.builtin_table_update().defer(vec![update]);
1367 }
1368
1369 /// Returns the necessary metadata for appending to a webhook source, and a channel to send
1370 /// rows.
1371 #[mz_ore::instrument(level = "debug")]
1372 fn handle_get_webhook(
1373 &mut self,
1374 database: String,
1375 schema: String,
1376 name: String,
1377 tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
1378 ) {
1379 /// Attempts to resolve a Webhook source from a provided `database.schema.name` path.
1380 ///
1381 /// Returns a struct that can be used to append data to the underlying storage collection, and the
1382 /// types we should cast the request to.
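/// (Illustrative: the `database`, `schema`, and `name` arguments come from the path
/// segments of the incoming webhook request URL.)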
1383 fn resolve(
1384 coord: &mut Coordinator,
1385 database: String,
1386 schema: String,
1387 name: String,
1388 ) -> Result<AppendWebhookResponse, PartialItemName> {
1389 // Resolve our collection.
1390 let name = PartialItemName {
1391 database: Some(database),
1392 schema: Some(schema),
1393 item: name,
1394 };
1395 let Ok(entry) = coord
1396 .catalog()
1397 .resolve_entry(None, &vec![], &name, &SYSTEM_CONN_ID)
1398 else {
1399 return Err(name);
1400 };
1401
1402 // Webhooks can be created with `CREATE SOURCE` or `CREATE TABLE`.
1403 let (data_source, desc, global_id) = match entry.item() {
1404 CatalogItem::Source(Source {
1405 data_source: data_source @ DataSourceDesc::Webhook { .. },
1406 desc,
1407 global_id,
1408 ..
1409 }) => (data_source, desc.clone(), *global_id),
1410 CatalogItem::Table(
1411 table @ Table {
1412 desc,
1413 data_source:
1414 TableDataSource::DataSource {
1415 desc: data_source @ DataSourceDesc::Webhook { .. },
1416 ..
1417 },
1418 ..
1419 },
1420 ) => (data_source, desc.latest(), table.global_id_writes()),
1421 _ => return Err(name),
1422 };
1423
1424 let DataSourceDesc::Webhook {
1425 validate_using,
1426 body_format,
1427 headers,
1428 ..
1429 } = data_source
1430 else {
1431 mz_ore::soft_panic_or_log!("programming error! checked above for webhook");
1432 return Err(name);
1433 };
1434 let body_format = body_format.clone();
1435 let header_tys = headers.clone();
1436
1437 // Assert we have one column for the body, and however many are required for
1438 // the headers.
1439 let num_columns = headers.num_columns() + 1;
1440 mz_ore::soft_assert_or_log!(
1441 desc.arity() <= num_columns,
1442 "expected at most {} columns, but got {}",
1443 num_columns,
1444 desc.arity()
1445 );
1446
1447 // Double check that the body column of the webhook source matches the type
1448 // we're about to deserialize as.
1449 let body_column = desc
1450 .get_by_name(&"body".into())
1451 .map(|(_idx, ty)| ty.clone())
1452 .ok_or_else(|| name.clone())?;
1453 assert!(!body_column.nullable, "webhook body column is nullable!?");
1454 assert_eq!(body_column.scalar_type, ScalarType::from(body_format));
1455
1456 // Create a validator that can be called to validate a webhook request.
1457 let validator = validate_using.as_ref().map(|v| {
1458 let validation = v.clone();
1459 AppendWebhookValidator::new(validation, coord.caching_secrets_reader.clone())
1460 });
1461
1462 // Get a channel so we can queue updates to be written.
1463 let row_tx = coord
1464 .controller
1465 .storage
1466 .monotonic_appender(global_id)
1467 .map_err(|_| name.clone())?;
1468 let stats = coord
1469 .controller
1470 .storage
1471 .webhook_statistics(global_id)
1472 .map_err(|_| name)?;
1473 let invalidator = coord
1474 .active_webhooks
1475 .entry(entry.id())
1476 .or_insert_with(WebhookAppenderInvalidator::new);
1477 let tx = WebhookAppender::new(row_tx, invalidator.guard(), stats);
1478
1479 Ok(AppendWebhookResponse {
1480 tx,
1481 body_format,
1482 header_tys,
1483 validator,
1484 })
1485 }
1486
1487 let response = resolve(self, database, schema, name).map_err(|name| {
1488 AppendWebhookError::UnknownWebhook {
1489 database: name.database.expect("provided"),
1490 schema: name.schema.expect("provided"),
1491 name: name.item,
1492 }
1493 });
1494 let _ = tx.send(response);
1495 }
1496}