mz_adapter/coord/command_handler.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic for processing client [`Command`]s. Each [`Command`] is initiated by a
11//! client via some external Materialize API (ex: HTTP and psql).
12
13use base64::prelude::*;
14use differential_dataflow::lattice::Lattice;
15use mz_adapter_types::dyncfgs::ALLOW_USER_SESSIONS;
16use mz_auth::password::Password;
17use mz_repr::namespaces::MZ_INTERNAL_SCHEMA;
18use mz_sql::session::metadata::SessionMetadata;
19use std::collections::{BTreeMap, BTreeSet};
20use std::net::IpAddr;
21use std::sync::Arc;
22
23use futures::FutureExt;
24use futures::future::LocalBoxFuture;
25use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
26use mz_catalog::SYSTEM_CONN_ID;
27use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source, Table, TableDataSource};
28use mz_ore::task;
29use mz_ore::tracing::OpenTelemetryContext;
30use mz_ore::{instrument, soft_panic_or_log};
31use mz_repr::role_id::RoleId;
32use mz_repr::{Diff, GlobalId, SqlScalarType, Timestamp};
33use mz_sql::ast::{
34 AlterConnectionAction, AlterConnectionStatement, AlterSinkAction, AlterSourceAction, AstInfo,
35 ConstantVisitor, CopyRelation, CopyStatement, CreateSourceOptionName, Raw, Statement,
36 SubscribeStatement,
37};
38use mz_sql::catalog::RoleAttributesRaw;
39use mz_sql::names::{Aug, PartialItemName, ResolvedIds};
40use mz_sql::plan::{
41 AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, Params, Plan,
42 StatementClassification, TransactionType,
43};
44use mz_sql::pure::{
45 materialized_view_option_contains_temporal, purify_create_materialized_view_options,
46};
47use mz_sql::rbac;
48use mz_sql::rbac::CREATE_ITEM_USAGE;
49use mz_sql::session::user::User;
50use mz_sql::session::vars::{
51 EndTransactionAction, NETWORK_POLICY, OwnedVarInput, STATEMENT_LOGGING_SAMPLE_RATE, Value, Var,
52};
53use mz_sql_parser::ast::display::AstDisplay;
54use mz_sql_parser::ast::{
55 CreateMaterializedViewStatement, ExplainPlanStatement, Explainee, InsertStatement,
56 WithOptionValue,
57};
58use mz_storage_types::sources::Timeline;
59use opentelemetry::trace::TraceContextExt;
60use tokio::sync::{mpsc, oneshot};
61use tracing::{Instrument, debug_span, info, warn};
62use tracing_opentelemetry::OpenTelemetrySpanExt;
63use uuid::Uuid;
64
65use crate::command::{
66 CatalogSnapshot, Command, ExecuteResponse, Response, SASLChallengeResponse,
67 SASLVerifyProofResponse, StartupResponse, SuperuserAttribute,
68};
69use crate::coord::appends::PendingWriteTxn;
70use crate::coord::peek::PendingPeek;
71use crate::coord::{
72 ConnMeta, Coordinator, DeferredPlanStatement, Message, PendingTxn, PlanStatement, PlanValidity,
73 PurifiedStatementReady, validate_ip_with_policy_rules,
74};
75use crate::error::{AdapterError, AuthenticationError};
76use crate::notice::AdapterNotice;
77use crate::session::{Session, TransactionOps, TransactionStatus};
78use crate::statement_logging::WatchSetCreation;
79use crate::util::{ClientTransmitter, ResultExt};
80use crate::webhook::{
81 AppendWebhookResponse, AppendWebhookValidator, WebhookAppender, WebhookAppenderInvalidator,
82};
83use crate::{AppendWebhookError, ExecuteContext, catalog, metrics};
84
85use super::ExecuteContextGuard;
86
87impl Coordinator {
88 /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 58KB. This would
89 /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
90 /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
91 pub(crate) fn handle_command(&mut self, mut cmd: Command) -> LocalBoxFuture<'_, ()> {
92 async move {
93 if let Some(session) = cmd.session_mut() {
94 session.apply_external_metadata_updates();
95 }
96 match cmd {
97 Command::Startup {
98 tx,
99 user,
100 conn_id,
101 secret_key,
102 uuid,
103 client_ip,
104 application_name,
105 notice_tx,
106 } => {
107 // Note: We purposefully do not use a ClientTransmitter here because startup
108 // handles errors and cleanup of sessions itself.
109 self.handle_startup(
110 tx,
111 user,
112 conn_id,
113 secret_key,
114 uuid,
115 client_ip,
116 application_name,
117 notice_tx,
118 )
119 .await;
120 }
121
122 Command::AuthenticatePassword {
123 tx,
124 role_name,
125 password,
126 } => {
127 self.handle_authenticate_password(tx, role_name, password)
128 .await;
129 }
130
131 Command::AuthenticateGetSASLChallenge {
132 tx,
133 role_name,
134 nonce,
135 } => {
136 self.handle_generate_sasl_challenge(tx, role_name, nonce)
137 .await;
138 }
139
140 Command::AuthenticateVerifySASLProof {
141 tx,
142 role_name,
143 proof,
144 mock_hash,
145 auth_message,
146 } => {
147 self.handle_authenticate_verify_sasl_proof(
148 tx,
149 role_name,
150 proof,
151 auth_message,
152 mock_hash,
153 );
154 }
155
156 Command::Execute {
157 portal_name,
158 session,
159 tx,
160 outer_ctx_extra,
161 } => {
162 let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
163
164 self.handle_execute(portal_name, session, tx, outer_ctx_extra)
165 .await;
166 }
167
168 Command::StartCopyFromStdin {
169 target_id,
170 target_name,
171 columns,
172 row_desc,
173 params,
174 session,
175 tx,
176 } => {
177 let otel_ctx = OpenTelemetryContext::obtain();
178 let result = self.setup_copy_from_stdin(
179 &session,
180 target_id,
181 target_name,
182 columns,
183 row_desc,
184 params,
185 );
186 let _ = tx.send(Response {
187 result,
188 session,
189 otel_ctx,
190 });
191 }
192
193 Command::RetireExecute { data, reason } => self.retire_execution(reason, data),
194
195 Command::CancelRequest {
196 conn_id,
197 secret_key,
198 } => {
199 self.handle_cancel(conn_id, secret_key).await;
200 }
201
202 Command::PrivilegedCancelRequest { conn_id } => {
203 self.handle_privileged_cancel(conn_id).await;
204 }
205
206 Command::GetWebhook {
207 database,
208 schema,
209 name,
210 tx,
211 } => {
212 self.handle_get_webhook(database, schema, name, tx);
213 }
214
215 Command::GetSystemVars { tx } => {
216 let _ = tx.send(self.catalog.system_config().clone());
217 }
218
219 Command::SetSystemVars { vars, conn_id, tx } => {
220 let mut ops = Vec::with_capacity(vars.len());
221 let conn = &self.active_conns[&conn_id];
222
223 for (name, value) in vars {
224 if let Err(e) =
225 self.catalog().system_config().get(&name).and_then(|var| {
226 var.visible(conn.user(), self.catalog.system_config())
227 })
228 {
229 let _ = tx.send(Err(e.into()));
230 return;
231 }
232
233 ops.push(catalog::Op::UpdateSystemConfiguration {
234 name,
235 value: OwnedVarInput::Flat(value),
236 });
237 }
238
239 let result = self
240 .catalog_transact_with_context(Some(&conn_id), None, ops)
241 .await;
242 let _ = tx.send(result);
243 }
244
245 Command::Terminate { conn_id, tx } => {
246 self.handle_terminate(conn_id).await;
247 // Note: We purposefully do not use a ClientTransmitter here because we're already
248 // terminating the provided session.
249 if let Some(tx) = tx {
250 let _ = tx.send(Ok(()));
251 }
252 }
253
254 Command::Commit {
255 action,
256 session,
257 tx,
258 } => {
259 let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
260 // We reach here not through a statement execution, but from the
261 // "commit" pgwire command. Thus, we just generate a default statement
262 // execution context (once statement logging is implemented, this will cause nothing to be logged
263 // when the execution finishes.)
264 let ctx = ExecuteContext::from_parts(
265 tx,
266 self.internal_cmd_tx.clone(),
267 session,
268 Default::default(),
269 );
270 let plan = match action {
271 EndTransactionAction::Commit => {
272 Plan::CommitTransaction(CommitTransactionPlan {
273 transaction_type: TransactionType::Implicit,
274 })
275 }
276 EndTransactionAction::Rollback => {
277 Plan::AbortTransaction(AbortTransactionPlan {
278 transaction_type: TransactionType::Implicit,
279 })
280 }
281 };
282
283 let conn_id = ctx.session().conn_id().clone();
284 self.sequence_plan(ctx, plan, ResolvedIds::empty()).await;
285 // Part of the Command::Commit contract is that the Coordinator guarantees that
286 // it has cleared its transaction state for the connection.
287 self.clear_connection(&conn_id).await;
288 }
289
290 Command::CatalogSnapshot { tx } => {
291 let _ = tx.send(CatalogSnapshot {
292 catalog: self.owned_catalog(),
293 });
294 }
295
296 Command::CheckConsistency { tx } => {
297 let _ = tx.send(self.check_consistency());
298 }
299
300 Command::Dump { tx } => {
301 let _ = tx.send(self.dump().await);
302 }
303
304 Command::GetComputeInstanceClient { instance_id, tx } => {
305 let _ = tx.send(self.controller.compute.instance_client(instance_id));
306 }
307
308 Command::GetOracle { timeline, tx } => {
309 let oracle = self
310 .global_timelines
311 .get(&timeline)
312 .map(|timeline_state| Arc::clone(&timeline_state.oracle))
313 .ok_or(AdapterError::ChangedPlan(
314 "timeline has disappeared during planning".to_string(),
315 ));
316 let _ = tx.send(oracle);
317 }
318
319 Command::DetermineRealTimeRecentTimestamp {
320 source_ids,
321 real_time_recency_timeout,
322 tx,
323 } => {
324 let result = self
325 .determine_real_time_recent_timestamp(
326 source_ids.iter().copied(),
327 real_time_recency_timeout,
328 )
329 .await;
330
331 match result {
332 Ok(Some(fut)) => {
333 task::spawn(|| "determine real time recent timestamp", async move {
334 let result = fut.await.map(Some).map_err(AdapterError::from);
335 let _ = tx.send(result);
336 });
337 }
338 Ok(None) => {
339 let _ = tx.send(Ok(None));
340 }
341 Err(e) => {
342 let _ = tx.send(Err(e));
343 }
344 }
345 }
346
347 Command::GetTransactionReadHoldsBundle { conn_id, tx } => {
348 let read_holds = self.txn_read_holds.get(&conn_id).cloned();
349 let _ = tx.send(read_holds);
350 }
351
352 Command::StoreTransactionReadHolds {
353 conn_id,
354 read_holds,
355 tx,
356 } => {
357 self.store_transaction_read_holds(conn_id, read_holds);
358 let _ = tx.send(());
359 }
360
361 Command::ExecuteSlowPathPeek {
362 dataflow_plan,
363 determination,
364 finishing,
365 compute_instance,
366 target_replica,
367 intermediate_result_type,
368 source_ids,
369 conn_id,
370 max_result_size,
371 max_query_result_size,
372 watch_set,
373 tx,
374 } => {
375 let result = self
376 .implement_slow_path_peek(
377 *dataflow_plan,
378 determination,
379 finishing,
380 compute_instance,
381 target_replica,
382 intermediate_result_type,
383 source_ids,
384 conn_id,
385 max_result_size,
386 max_query_result_size,
387 watch_set,
388 )
389 .await;
390 let _ = tx.send(result);
391 }
392
393 Command::CopyToPreflight {
394 s3_sink_connection,
395 sink_id,
396 tx,
397 } => {
398 // Spawn a background task to perform the slow S3 preflight operations.
399 // This avoids blocking the coordinator's main task.
400 let connection_context = self.connection_context().clone();
401 task::spawn(|| "copy_to_preflight", async move {
402 let result = mz_storage_types::sinks::s3_oneshot_sink::preflight(
403 connection_context,
404 &s3_sink_connection.aws_connection,
405 &s3_sink_connection.upload_info,
406 s3_sink_connection.connection_id,
407 sink_id,
408 )
409 .await
410 .map_err(AdapterError::from);
411 let _ = tx.send(result);
412 });
413 }
414
415 Command::ExecuteCopyTo {
416 df_desc,
417 compute_instance,
418 target_replica,
419 source_ids,
420 conn_id,
421 watch_set,
422 tx,
423 } => {
424 // implement_copy_to spawns a background task that sends the response
425 // through tx when the COPY TO completes (or immediately if setup fails).
426 // We just call it and let it handle all response sending.
427 self.implement_copy_to(
428 *df_desc,
429 compute_instance,
430 target_replica,
431 source_ids,
432 conn_id,
433 watch_set,
434 tx,
435 )
436 .await;
437 }
438
439 Command::ExecuteSideEffectingFunc {
440 plan,
441 conn_id,
442 current_role,
443 tx,
444 } => {
445 let result = self
446 .execute_side_effecting_func(plan, conn_id, current_role)
447 .await;
448 let _ = tx.send(result);
449 }
450 Command::RegisterFrontendPeek {
451 uuid,
452 conn_id,
453 cluster_id,
454 depends_on,
455 is_fast_path,
456 watch_set,
457 tx,
458 } => {
459 self.handle_register_frontend_peek(
460 uuid,
461 conn_id,
462 cluster_id,
463 depends_on,
464 is_fast_path,
465 watch_set,
466 tx,
467 );
468 }
469 Command::UnregisterFrontendPeek { uuid, tx } => {
470 self.handle_unregister_frontend_peek(uuid, tx);
471 }
472 Command::ExplainTimestamp {
473 conn_id,
474 session_wall_time,
475 cluster_id,
476 id_bundle,
477 determination,
478 tx,
479 } => {
480 let explanation = self.explain_timestamp(
481 &conn_id,
482 session_wall_time,
483 cluster_id,
484 &id_bundle,
485 determination,
486 );
487 let _ = tx.send(explanation);
488 }
489 Command::FrontendStatementLogging(event) => {
490 self.handle_frontend_statement_logging_event(event);
491 }
492 }
493 }
494 .instrument(debug_span!("handle_command"))
495 .boxed_local()
496 }
497
498 fn handle_authenticate_verify_sasl_proof(
499 &self,
500 tx: oneshot::Sender<Result<SASLVerifyProofResponse, AdapterError>>,
501 role_name: String,
502 proof: String,
503 auth_message: String,
504 mock_hash: String,
505 ) {
506 let role = self.catalog().try_get_role_by_name(role_name.as_str());
507 let role_auth = role.and_then(|r| self.catalog().try_get_role_auth_by_id(&r.id));
508
509 let login = role
510 .as_ref()
511 .map(|r| r.attributes.login.unwrap_or(false))
512 .unwrap_or(false);
513
514 let real_hash = role_auth
515 .as_ref()
516 .and_then(|auth| auth.password_hash.as_ref());
517 let hash_ref = real_hash.map(|s| s.as_str()).unwrap_or(&mock_hash);
518
519 let role_present = role.is_some();
520 let make_auth_err = |role_present: bool, login: bool| {
521 AdapterError::AuthenticationError(if role_present && !login {
522 AuthenticationError::NonLogin
523 } else {
524 AuthenticationError::InvalidCredentials
525 })
526 };
527
528 match mz_auth::hash::sasl_verify(hash_ref, &proof, &auth_message) {
529 Ok(verifier) => {
530 // Success only if role exists, allows login, and a real password hash was used.
531 if login && real_hash.is_some() {
532 let _ = tx.send(Ok(SASLVerifyProofResponse { verifier }));
533 } else {
534 let _ = tx.send(Err(make_auth_err(role_present, login)));
535 }
536 }
537 Err(_) => {
538 let _ = tx.send(Err(AdapterError::AuthenticationError(
539 AuthenticationError::InvalidCredentials,
540 )));
541 }
542 }
543 }
544
545 #[mz_ore::instrument(level = "debug")]
546 async fn handle_generate_sasl_challenge(
547 &self,
548 tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>,
549 role_name: String,
550 client_nonce: String,
551 ) {
552 let role_auth = self
553 .catalog()
554 .try_get_role_by_name(&role_name)
555 .and_then(|role| self.catalog().try_get_role_auth_by_id(&role.id));
556
557 let nonce = match mz_auth::hash::generate_nonce(&client_nonce) {
558 Ok(n) => n,
559 Err(e) => {
560 let msg = format!(
561 "failed to generate nonce for client nonce {}: {}",
562 client_nonce, e
563 );
564 let _ = tx.send(Err(AdapterError::Internal(msg.clone())));
565 soft_panic_or_log!("{msg}");
566 return;
567 }
568 };
569
570 // It's important that the mock_nonce is deterministic per role, otherwise the purpose of
571 // doing mock authentication is defeated. We use a catalog-wide nonce, and combine that
572 // with the role name to get a per-role mock nonce.
573 let send_mock_challenge =
574 |role_name: String,
575 mock_nonce: String,
576 nonce: String,
577 tx: oneshot::Sender<Result<SASLChallengeResponse, AdapterError>>| {
578 let opts = mz_auth::hash::mock_sasl_challenge(
579 &role_name,
580 &mock_nonce,
581 &self.catalog().system_config().scram_iterations(),
582 );
583 let _ = tx.send(Ok(SASLChallengeResponse {
584 iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
585 salt: BASE64_STANDARD.encode(opts.salt),
586 nonce,
587 }));
588 };
589
590 match role_auth {
591 Some(auth) if auth.password_hash.is_some() => {
592 let hash = auth.password_hash.as_ref().expect("checked above");
593 match mz_auth::hash::scram256_parse_opts(hash) {
594 Ok(opts) => {
595 let _ = tx.send(Ok(SASLChallengeResponse {
596 iteration_count: mz_ore::cast::u32_to_usize(opts.iterations.get()),
597 salt: BASE64_STANDARD.encode(opts.salt),
598 nonce,
599 }));
600 }
601 Err(_) => {
602 send_mock_challenge(
603 role_name,
604 self.catalog().state().mock_authentication_nonce(),
605 nonce,
606 tx,
607 );
608 }
609 }
610 }
611 _ => {
612 send_mock_challenge(
613 role_name,
614 self.catalog().state().mock_authentication_nonce(),
615 nonce,
616 tx,
617 );
618 }
619 }
620 }
621
622 #[mz_ore::instrument(level = "debug")]
623 async fn handle_authenticate_password(
624 &self,
625 tx: oneshot::Sender<Result<(), AdapterError>>,
626 role_name: String,
627 password: Option<Password>,
628 ) {
629 let Some(password) = password else {
630 // The user did not provide a password.
631 let _ = tx.send(Err(AdapterError::AuthenticationError(
632 AuthenticationError::PasswordRequired,
633 )));
634 return;
635 };
636
637 if let Some(role) = self.catalog().try_get_role_by_name(role_name.as_str()) {
638 if !role.attributes.login.unwrap_or(false) {
639 // The user is not allowed to login.
640 let _ = tx.send(Err(AdapterError::AuthenticationError(
641 AuthenticationError::NonLogin,
642 )));
643 return;
644 }
645 if let Some(auth) = self.catalog().try_get_role_auth_by_id(&role.id) {
646 if let Some(hash) = &auth.password_hash {
647 let hash = hash.clone();
648 task::spawn_blocking(
649 || "auth-check-hash",
650 move || {
651 let _ = match mz_auth::hash::scram256_verify(&password, &hash) {
652 Ok(_) => tx.send(Ok(())),
653 Err(_) => tx.send(Err(AdapterError::AuthenticationError(
654 AuthenticationError::InvalidCredentials,
655 ))),
656 };
657 },
658 );
659 return;
660 }
661 }
662 // Authentication failed due to incorrect password or missing password hash.
663 let _ = tx.send(Err(AdapterError::AuthenticationError(
664 AuthenticationError::InvalidCredentials,
665 )));
666 } else {
667 // The user does not exist.
668 let _ = tx.send(Err(AdapterError::AuthenticationError(
669 AuthenticationError::RoleNotFound,
670 )));
671 }
672 }
673
674 #[mz_ore::instrument(level = "debug")]
675 async fn handle_startup(
676 &mut self,
677 tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
678 user: User,
679 conn_id: ConnectionId,
680 secret_key: u32,
681 uuid: uuid::Uuid,
682 client_ip: Option<IpAddr>,
683 application_name: String,
684 notice_tx: mpsc::UnboundedSender<AdapterNotice>,
685 ) {
686 // Early return if successful, otherwise cleanup any possible state.
687 match self.handle_startup_inner(&user, &conn_id, &client_ip).await {
688 Ok((role_id, superuser_attribute, session_defaults)) => {
689 let session_type = metrics::session_type_label_value(&user);
690 self.metrics
691 .active_sessions
692 .with_label_values(&[session_type])
693 .inc();
694 let conn = ConnMeta {
695 secret_key,
696 notice_tx,
697 drop_sinks: BTreeSet::new(),
698 pending_cluster_alters: BTreeSet::new(),
699 connected_at: self.now(),
700 user,
701 application_name,
702 uuid,
703 client_ip,
704 conn_id: conn_id.clone(),
705 authenticated_role: role_id,
706 deferred_lock: None,
707 };
708 let update = self.catalog().state().pack_session_update(&conn, Diff::ONE);
709 let update = self.catalog().state().resolve_builtin_table_update(update);
710 self.begin_session_for_statement_logging(&conn);
711 self.active_conns.insert(conn_id.clone(), conn);
712
713 // Note: Do NOT await the notify here, we pass this back to
714 // whatever requested the startup to prevent blocking startup
715 // and the Coordinator on a builtin table update.
716 let updates = vec![update];
717 // It's not a hard error if our list is missing a builtin table, but we want to
718 // make sure these two things stay in-sync.
719 if mz_ore::assert::soft_assertions_enabled() {
720 let required_tables: BTreeSet<_> = super::appends::REQUIRED_BUILTIN_TABLES
721 .iter()
722 .map(|table| self.catalog().resolve_builtin_table(*table))
723 .collect();
724 let updates_tracked = updates
725 .iter()
726 .all(|update| required_tables.contains(&update.id));
727 let all_mz_internal = super::appends::REQUIRED_BUILTIN_TABLES
728 .iter()
729 .all(|table| table.schema == MZ_INTERNAL_SCHEMA);
730 mz_ore::soft_assert_or_log!(
731 updates_tracked,
732 "not tracking all required builtin table updates!"
733 );
734 // TODO(parkmycar): When checking if a query depends on these builtin table
735 // writes we do not check the transitive dependencies of the query, because
736 // we don't support creating views on mz_internal objects. If one of these
737 // tables is promoted out of mz_internal then we'll need to add this check.
738 mz_ore::soft_assert_or_log!(
739 all_mz_internal,
740 "not all builtin tables are in mz_internal! need to check transitive depends",
741 )
742 }
743 let notify = self.builtin_table_update().background(updates);
744
745 let catalog = self.owned_catalog();
746 let build_info_human_version =
747 catalog.state().config().build_info.human_version(None);
748
749 let statement_logging_frontend = self
750 .statement_logging
751 .create_frontend(build_info_human_version);
752
753 let resp = Ok(StartupResponse {
754 role_id,
755 write_notify: notify,
756 session_defaults,
757 catalog,
758 storage_collections: Arc::clone(&self.controller.storage_collections),
759 transient_id_gen: Arc::clone(&self.transient_id_gen),
760 optimizer_metrics: self.optimizer_metrics.clone(),
761 persist_client: self.persist_client.clone(),
762 statement_logging_frontend,
763 superuser_attribute,
764 });
765 if tx.send(resp).is_err() {
766 // Failed to send to adapter, but everything is setup so we can terminate
767 // normally.
768 self.handle_terminate(conn_id).await;
769 }
770 }
771 Err(e) => {
772 // Error during startup or sending to adapter. A user may have been created and
773 // it can stay; no need to delete it.
774 // Note: Temporary schemas are created lazily, so there's nothing to clean up here.
775
776 // Communicate the error back to the client. No need to
777 // handle failures to send the error back; we've already
778 // cleaned up all necessary state.
779 let _ = tx.send(Err(e));
780 }
781 }
782 }
783
784 // Failible startup work that needs to be cleaned up on error.
785 async fn handle_startup_inner(
786 &mut self,
787 user: &User,
788 _conn_id: &ConnectionId,
789 client_ip: &Option<IpAddr>,
790 ) -> Result<(RoleId, SuperuserAttribute, BTreeMap<String, OwnedVarInput>), AdapterError> {
791 if self.catalog().try_get_role_by_name(&user.name).is_none() {
792 // If the user has made it to this point, that means they have been fully authenticated.
793 // This includes preventing any user, except a pre-defined set of system users, from
794 // connecting to an internal port. Therefore it's ok to always create a new role for the
795 // user.
796 let attributes = RoleAttributesRaw::new();
797 let plan = CreateRolePlan {
798 name: user.name.to_string(),
799 attributes,
800 };
801 self.sequence_create_role_for_startup(plan).await?;
802 }
803 let role = self
804 .catalog()
805 .try_get_role_by_name(&user.name)
806 .expect("created above");
807 let role_id = role.id;
808
809 let superuser_attribute = role.attributes.superuser;
810
811 if role_id.is_user() && !ALLOW_USER_SESSIONS.get(self.catalog().system_config().dyncfgs()) {
812 return Err(AdapterError::UserSessionsDisallowed);
813 }
814
815 // Initialize the default session variables for this role.
816 let mut session_defaults = BTreeMap::new();
817 let system_config = self.catalog().state().system_config();
818
819 // Override the session with any system defaults.
820 session_defaults.extend(
821 system_config
822 .iter_session()
823 .map(|v| (v.name().to_string(), OwnedVarInput::Flat(v.value()))),
824 );
825 // Special case.
826 let statement_logging_default = system_config
827 .statement_logging_default_sample_rate()
828 .format();
829 session_defaults.insert(
830 STATEMENT_LOGGING_SAMPLE_RATE.name().to_string(),
831 OwnedVarInput::Flat(statement_logging_default),
832 );
833 // Override system defaults with role defaults.
834 session_defaults.extend(
835 self.catalog()
836 .get_role(&role_id)
837 .vars()
838 .map(|(name, val)| (name.to_string(), val.clone())),
839 );
840
841 // Validate network policies for external users. Internal users can only connect on the
842 // internal interfaces (internal HTTP/ pgwire). It is up to the person deploying the system
843 // to ensure these internal interfaces are well secured.
844 //
845 // HACKY(parkmycar): We don't have a fully formed session yet for this role, but we want
846 // the default network policy for this role, so we read directly out of what the session
847 // will get initialized with.
848 if !user.is_internal() {
849 let network_policy_name = session_defaults
850 .get(NETWORK_POLICY.name())
851 .and_then(|value| match value {
852 OwnedVarInput::Flat(name) => Some(name.clone()),
853 OwnedVarInput::SqlSet(names) => {
854 tracing::error!(?names, "found multiple network policies");
855 None
856 }
857 })
858 .unwrap_or_else(|| system_config.default_network_policy_name());
859 let maybe_network_policy = self
860 .catalog()
861 .get_network_policy_by_name(&network_policy_name);
862
863 let Some(network_policy) = maybe_network_policy else {
864 // We should prevent dropping the default network policy, or setting the policy
865 // to something that doesn't exist, so complain loudly if this occurs.
866 tracing::error!(
867 network_policy_name,
868 "default network policy does not exist. All user traffic will be blocked"
869 );
870 let reason = match client_ip {
871 Some(ip) => super::NetworkPolicyError::AddressDenied(ip.clone()),
872 None => super::NetworkPolicyError::MissingIp,
873 };
874 return Err(AdapterError::NetworkPolicyDenied(reason));
875 };
876
877 if let Some(ip) = client_ip {
878 match validate_ip_with_policy_rules(ip, &network_policy.rules) {
879 Ok(_) => {}
880 Err(e) => return Err(AdapterError::NetworkPolicyDenied(e)),
881 }
882 } else {
883 // Only temporary and internal representation of a session
884 // should be missing a client_ip. These sessions should not be
885 // making requests or going through handle_startup.
886 return Err(AdapterError::NetworkPolicyDenied(
887 super::NetworkPolicyError::MissingIp,
888 ));
889 }
890 }
891
892 // Temporary schemas are now created lazily when the first temporary object is created,
893 // rather than eagerly on connection startup. This avoids expensive catalog_mut() calls
894 // for the common case where connections never create temporary objects.
895
896 Ok((
897 role_id,
898 SuperuserAttribute(superuser_attribute),
899 session_defaults,
900 ))
901 }
902
903 /// Handles an execute command.
904 #[instrument(name = "coord::handle_execute", fields(session = session.uuid().to_string()))]
905 pub(crate) async fn handle_execute(
906 &mut self,
907 portal_name: String,
908 mut session: Session,
909 tx: ClientTransmitter<ExecuteResponse>,
910 // If this command was part of another execute command
911 // (for example, executing a `FETCH` statement causes an execute to be
912 // issued for the cursor it references),
913 // then `outer_context` should be `Some`.
914 // This instructs the coordinator that the
915 // outer execute should be considered finished once the inner one is.
916 outer_context: Option<ExecuteContextGuard>,
917 ) {
918 if session.vars().emit_trace_id_notice() {
919 let span_context = tracing::Span::current()
920 .context()
921 .span()
922 .span_context()
923 .clone();
924 if span_context.is_valid() {
925 session.add_notice(AdapterNotice::QueryTrace {
926 trace_id: span_context.trace_id(),
927 });
928 }
929 }
930
931 if let Err(err) = Self::verify_portal(self.catalog(), &mut session, &portal_name) {
932 // If statement logging hasn't started yet, we don't need
933 // to add any "end" event, so just make up a no-op
934 // `ExecuteContextExtra` here, via `Default::default`.
935 //
936 // It's a bit unfortunate because the edge case of failed
937 // portal verifications won't show up in statement
938 // logging, but there seems to be nothing else we can do,
939 // because we need access to the portal to begin logging.
940 //
941 // Another option would be to log a begin and end event, but just fill in NULLs
942 // for everything we get from the portal (prepared statement id, params).
943 let extra = outer_context.unwrap_or_else(Default::default);
944 let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
945 return ctx.retire(Err(err));
946 }
947
948 // The reference to `portal` can't outlive `session`, which we
949 // use to construct the context, so scope the reference to this block where we
950 // get everything we need from the portal for later.
951 let (stmt, ctx, params) = {
952 let portal = session
953 .get_portal_unverified(&portal_name)
954 .expect("known to exist");
955 let params = portal.parameters.clone();
956 let stmt = portal.stmt.clone();
957 let logging = Arc::clone(&portal.logging);
958 let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
959
960 let extra = if let Some(extra) = outer_context {
961 // We are executing in the context of another SQL statement, so we don't
962 // want to begin statement logging anew. The context of the actual statement
963 // being executed is the one that should be retired once this finishes.
964 extra
965 } else {
966 // This is a new statement, log it and return the context
967 let maybe_uuid = self.begin_statement_execution(
968 &mut session,
969 ¶ms,
970 &logging,
971 lifecycle_timestamps,
972 );
973
974 ExecuteContextGuard::new(maybe_uuid, self.internal_cmd_tx.clone())
975 };
976 let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
977 (stmt, ctx, params)
978 };
979
980 let stmt = match stmt {
981 Some(stmt) => stmt,
982 None => return ctx.retire(Ok(ExecuteResponse::EmptyQuery)),
983 };
984
985 let session_type = metrics::session_type_label_value(ctx.session().user());
986 let stmt_type = metrics::statement_type_label_value(&stmt);
987 self.metrics
988 .query_total
989 .with_label_values(&[session_type, stmt_type])
990 .inc();
991 match &*stmt {
992 Statement::Subscribe(SubscribeStatement { output, .. })
993 | Statement::Copy(CopyStatement {
994 relation: CopyRelation::Subscribe(SubscribeStatement { output, .. }),
995 ..
996 }) => {
997 self.metrics
998 .subscribe_outputs
999 .with_label_values(&[
1000 session_type,
1001 metrics::subscribe_output_label_value(output),
1002 ])
1003 .inc();
1004 }
1005 _ => {}
1006 }
1007
1008 self.handle_execute_inner(stmt, params, ctx).await
1009 }
1010
1011 #[instrument(name = "coord::handle_execute_inner", fields(stmt = stmt.to_ast_string_redacted()))]
1012 pub(crate) async fn handle_execute_inner(
1013 &mut self,
1014 stmt: Arc<Statement<Raw>>,
1015 params: Params,
1016 mut ctx: ExecuteContext,
1017 ) {
1018 // This comment describes the various ways DDL can execute (the ordered operations: name
1019 // resolve, purify, plan, sequence), all of which are managed by this function. DDL has
1020 // three notable properties that all partially interact.
1021 //
1022 // 1. Most DDL statements (and a few others) support single-statement transaction delayed
1023 // execution. This occurs when a session executes `BEGIN`, a single DDL, then `COMMIT`.
1024 // We announce success of the single DDL when it is executed, but do not attempt to plan
1025 // or sequence it until `COMMIT`, which is able to error if needed while sequencing the
1026 // DDL (this behavior is Postgres-compatible). The purpose of this is because some
1027 // drivers or tools wrap all statements in `BEGIN` and `COMMIT` and we would like them to
1028 // work. When the single DDL is announced as successful we also put the session's
1029 // transaction ops into `SingleStatement` which will produce an error if any other
1030 // statement is run in the transaction except `COMMIT`. Additionally, this will cause
1031 // `handle_execute_inner` to stop further processing (no planning, etc.) of the
1032 // statement.
1033 // 2. A few other DDL statements (`ALTER .. RENAME/SWAP`) enter the `DDL` ops which allows
1034 // any number of only these DDL statements to be executed in a transaction. At sequencing
1035 // these generate the `Op::TransactionDryRun` catalog op. When applied with
1036 // `catalog_transact`, that op will always produce the `TransactionDryRun` error. The
1037 // `catalog_transact_with_ddl_transaction` function intercepts that error and reports
1038 // success to the user, but nothing is yet committed to the real catalog. At `COMMIT` all
1039 // of the ops but without dry run are applied. The purpose of this is to allow multiple,
1040 // atomic renames in the same transaction.
1041 // 3. Some DDLs do off-thread work during purification or sequencing that is expensive or
1042 // makes network calls (interfacing with secrets, optimization of views/indexes, source
1043 // purification). These must guarantee correctness when they return to the main
1044 // coordinator thread because the catalog state could have changed while they were doing
1045 // the off-thread work. Previously we would use `PlanValidity::Checks` to specify a bunch
1046 // of IDs that we needed to exist. We discovered the way we were doing that was not
1047 // always correct. Instead of attempting to get that completely right, we have opted to
1048 // serialize DDL. Getting this right is difficult because catalog changes can affect name
1049 // resolution, planning, sequencing, and optimization. Correctly writing logic that is
1050 // aware of all possible catalog changes that would affect any of those parts is not
1051 // something our current code has been designed to be helpful at. Even if a DDL statement
1052 // is doing off-thread work, another DDL must not yet execute at all. Executing these
1053 // serially will guarantee that no off-thread work has affected the state of the catalog.
1054 // This is done by adding a VecDeque of deferred statements and a lock to the
1055 // Coordinator. When a DDL is run in `handle_execute_inner` (after applying whatever
1056 // transaction ops are needed to the session as described above), it attempts to own the
1057 // lock (a tokio Mutex). If acquired, it stashes the lock in the connection`s `ConnMeta`
1058 // struct in `active_conns` and proceeds. The lock is dropped at transaction end in
1059 // `clear_transaction` and a message sent to the Coordinator to execute the next queued
1060 // DDL. If the lock could not be acquired, the DDL is put into the VecDeque where it
1061 // awaits dequeuing caused by the lock being released.
1062
1063 // Verify that this statement type can be executed in the current
1064 // transaction state.
1065 match ctx.session().transaction() {
1066 // By this point we should be in a running transaction.
1067 TransactionStatus::Default => unreachable!(),
1068
1069 // Failed transactions have already been checked in pgwire for a safe statement
1070 // (COMMIT, ROLLBACK, etc.) and can proceed.
1071 TransactionStatus::Failed(_) => {}
1072
1073 // Started is a deceptive name, and means different things depending on which
1074 // protocol was used. It's either exactly one statement (known because this
1075 // is the simple protocol and the parser parsed the entire string, and it had
1076 // one statement). Or from the extended protocol, it means *some* query is
1077 // being executed, but there might be others after it before the Sync (commit)
1078 // message. Postgres handles this by teaching Started to eagerly commit certain
1079 // statements that can't be run in a transaction block.
1080 TransactionStatus::Started(_) => {
1081 if let Statement::Declare(_) = &*stmt {
1082 // Declare is an exception. Although it's not against any spec to execute
1083 // it, it will always result in nothing happening, since all portals will be
1084 // immediately closed. Users don't know this detail, so this error helps them
1085 // understand what's going wrong. Postgres does this too.
1086 return ctx.retire(Err(AdapterError::OperationRequiresTransaction(
1087 "DECLARE CURSOR".into(),
1088 )));
1089 }
1090 }
1091
1092 // Implicit or explicit transactions.
1093 //
1094 // Implicit transactions happen when a multi-statement query is executed
1095 // (a "simple query"). However if a "BEGIN" appears somewhere in there,
1096 // then the existing implicit transaction will be upgraded to an explicit
1097 // transaction. Thus, we should not separate what implicit and explicit
1098 // transactions can do unless there's some additional checking to make sure
1099 // something disallowed in explicit transactions did not previously take place
1100 // in the implicit portion.
1101 TransactionStatus::InTransactionImplicit(_) | TransactionStatus::InTransaction(_) => {
1102 match &*stmt {
1103 // Statements that are safe in a transaction. We still need to verify that we
1104 // don't interleave reads and writes since we can't perform those serializably.
1105 Statement::Close(_)
1106 | Statement::Commit(_)
1107 | Statement::Copy(_)
1108 | Statement::Deallocate(_)
1109 | Statement::Declare(_)
1110 | Statement::Discard(_)
1111 | Statement::Execute(_)
1112 | Statement::ExplainPlan(_)
1113 | Statement::ExplainPushdown(_)
1114 | Statement::ExplainAnalyzeObject(_)
1115 | Statement::ExplainAnalyzeCluster(_)
1116 | Statement::ExplainTimestamp(_)
1117 | Statement::ExplainSinkSchema(_)
1118 | Statement::Fetch(_)
1119 | Statement::Prepare(_)
1120 | Statement::Rollback(_)
1121 | Statement::Select(_)
1122 | Statement::SetTransaction(_)
1123 | Statement::Show(_)
1124 | Statement::SetVariable(_)
1125 | Statement::ResetVariable(_)
1126 | Statement::StartTransaction(_)
1127 | Statement::Subscribe(_)
1128 | Statement::Raise(_) => {
1129 // Always safe.
1130 }
1131
1132 Statement::Insert(InsertStatement {
1133 source, returning, ..
1134 }) if returning.is_empty() && ConstantVisitor::insert_source(source) => {
1135 // Inserting from constant values statements that do not need to execute on
1136 // any cluster (no RETURNING) is always safe.
1137 }
1138
1139 // These statements must be kept in-sync with `must_serialize_ddl()`.
1140 Statement::AlterObjectRename(_)
1141 | Statement::AlterObjectSwap(_)
1142 | Statement::CreateTableFromSource(_)
1143 | Statement::CreateSource(_) => {
1144 let state = self.catalog().for_session(ctx.session()).state().clone();
1145 let revision = self.catalog().transient_revision();
1146
1147 // Initialize our transaction with a set of empty ops, or return an error
1148 // if we can't run a DDL transaction
1149 let txn_status = ctx.session_mut().transaction_mut();
1150 if let Err(err) = txn_status.add_ops(TransactionOps::DDL {
1151 ops: vec![],
1152 state,
1153 revision,
1154 side_effects: vec![],
1155 }) {
1156 return ctx.retire(Err(err));
1157 }
1158 }
1159
1160 // Statements below must by run singly (in Started).
1161 Statement::AlterCluster(_)
1162 | Statement::AlterConnection(_)
1163 | Statement::AlterDefaultPrivileges(_)
1164 | Statement::AlterIndex(_)
1165 | Statement::AlterMaterializedViewApplyReplacement(_)
1166 | Statement::AlterSetCluster(_)
1167 | Statement::AlterOwner(_)
1168 | Statement::AlterRetainHistory(_)
1169 | Statement::AlterRole(_)
1170 | Statement::AlterSecret(_)
1171 | Statement::AlterSink(_)
1172 | Statement::AlterSource(_)
1173 | Statement::AlterSystemReset(_)
1174 | Statement::AlterSystemResetAll(_)
1175 | Statement::AlterSystemSet(_)
1176 | Statement::AlterTableAddColumn(_)
1177 | Statement::AlterNetworkPolicy(_)
1178 | Statement::CreateCluster(_)
1179 | Statement::CreateClusterReplica(_)
1180 | Statement::CreateConnection(_)
1181 | Statement::CreateDatabase(_)
1182 | Statement::CreateIndex(_)
1183 | Statement::CreateMaterializedView(_)
1184 | Statement::CreateContinualTask(_)
1185 | Statement::CreateRole(_)
1186 | Statement::CreateSchema(_)
1187 | Statement::CreateSecret(_)
1188 | Statement::CreateSink(_)
1189 | Statement::CreateSubsource(_)
1190 | Statement::CreateTable(_)
1191 | Statement::CreateType(_)
1192 | Statement::CreateView(_)
1193 | Statement::CreateWebhookSource(_)
1194 | Statement::CreateNetworkPolicy(_)
1195 | Statement::Delete(_)
1196 | Statement::DropObjects(_)
1197 | Statement::DropOwned(_)
1198 | Statement::GrantPrivileges(_)
1199 | Statement::GrantRole(_)
1200 | Statement::Insert(_)
1201 | Statement::ReassignOwned(_)
1202 | Statement::RevokePrivileges(_)
1203 | Statement::RevokeRole(_)
1204 | Statement::Update(_)
1205 | Statement::ValidateConnection(_)
1206 | Statement::Comment(_) => {
1207 let txn_status = ctx.session_mut().transaction_mut();
1208
1209 // If we're not in an implicit transaction and we could generate exactly one
1210 // valid ExecuteResponse, we can delay execution until commit.
1211 if !txn_status.is_implicit() {
1212 // Statements whose tag is trivial (known only from an unexecuted statement) can
1213 // be run in a special single-statement explicit mode. In this mode (`BEGIN;
1214 // <stmt>; COMMIT`), we generate the expected tag from a successful <stmt>, but
1215 // delay execution until `COMMIT`.
1216 if let Ok(resp) = ExecuteResponse::try_from(&*stmt) {
1217 if let Err(err) = txn_status
1218 .add_ops(TransactionOps::SingleStatement { stmt, params })
1219 {
1220 ctx.retire(Err(err));
1221 return;
1222 }
1223 ctx.retire(Ok(resp));
1224 return;
1225 }
1226 }
1227
1228 return ctx.retire(Err(AdapterError::OperationProhibitsTransaction(
1229 stmt.to_string(),
1230 )));
1231 }
1232 }
1233 }
1234 }
1235
1236 // DDLs must be planned and sequenced serially. We do not rely on PlanValidity checking
1237 // various IDs because we have incorrectly done that in the past. Attempt to acquire the
1238 // ddl lock. The lock is stashed in the ConnMeta which is dropped at transaction end. If
1239 // acquired, proceed with sequencing. If not, enqueue and return. This logic assumes that
1240 // Coordinator::clear_transaction is correctly called when session transactions are ended
1241 // because that function will release the held lock from active_conns.
1242 if Self::must_serialize_ddl(&stmt, &ctx) {
1243 if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
1244 let prev = self
1245 .active_conns
1246 .get_mut(ctx.session().conn_id())
1247 .expect("connection must exist")
1248 .deferred_lock
1249 .replace(guard);
1250 assert!(
1251 prev.is_none(),
1252 "connections should have at most one lock guard"
1253 );
1254 } else {
1255 if self
1256 .active_conns
1257 .get(ctx.session().conn_id())
1258 .expect("connection must exist")
1259 .deferred_lock
1260 .is_some()
1261 {
1262 // This session *already* has the lock, and incorrectly tried to execute another
1263 // DDL while still holding the lock, violating the assumption documented above.
1264 // This is an internal error, probably in some AdapterClient user (pgwire or
1265 // http). Because the session is now in some unexpected state, return an error
1266 // which should cause the AdapterClient user to fail the transaction.
1267 // (Terminating the connection is maybe what we would prefer to do, but is not
1268 // currently a thing we can do from the coordinator: calling handle_terminate
1269 // cleans up Coordinator state for the session but doesn't inform the
1270 // AdapterClient that the session should terminate.)
1271 soft_panic_or_log!(
1272 "session {} attempted to get ddl lock while already owning it",
1273 ctx.session().conn_id()
1274 );
1275 ctx.retire(Err(AdapterError::Internal(
1276 "session attempted to get ddl lock while already owning it".to_string(),
1277 )));
1278 return;
1279 }
1280 self.serialized_ddl.push_back(DeferredPlanStatement {
1281 ctx,
1282 ps: PlanStatement::Statement { stmt, params },
1283 });
1284 return;
1285 }
1286 }
1287
1288 let catalog = self.catalog();
1289 let catalog = catalog.for_session(ctx.session());
1290 let original_stmt = Arc::clone(&stmt);
1291 // `resolved_ids` should be derivable from `stmt`. If `stmt` is transformed to remove/add
1292 // IDs, then `resolved_ids` should be updated to also remove/add those IDs.
1293 let (stmt, mut resolved_ids) = match mz_sql::names::resolve(&catalog, (*stmt).clone()) {
1294 Ok(resolved) => resolved,
1295 Err(e) => return ctx.retire(Err(e.into())),
1296 };
1297 // N.B. The catalog can change during purification so we must validate that the dependencies still exist after
1298 // purification. This should be done back on the main thread.
1299 // We do the validation:
1300 // - In the handler for `Message::PurifiedStatementReady`, before we handle the purified statement.
1301 // If we add special handling for more types of `Statement`s, we'll need to ensure similar verification
1302 // occurs.
1303 let (stmt, resolved_ids) = match stmt {
1304 // Various statements must be purified off the main coordinator thread of control.
1305 stmt if Self::must_spawn_purification(&stmt) => {
1306 let internal_cmd_tx = self.internal_cmd_tx.clone();
1307 let conn_id = ctx.session().conn_id().clone();
1308 let catalog = self.owned_catalog();
1309 let now = self.now();
1310 let otel_ctx = OpenTelemetryContext::obtain();
1311 let current_storage_configuration = self.controller.storage.config().clone();
1312 task::spawn(|| format!("purify:{conn_id}"), async move {
1313 let transient_revision = catalog.transient_revision();
1314 let catalog = catalog.for_session(ctx.session());
1315
1316 // Checks if the session is authorized to purify a statement. Usually
1317 // authorization is checked after planning, however purification happens before
1318 // planning, which may require the use of some connections and secrets.
1319 if let Err(e) = rbac::check_usage(
1320 &catalog,
1321 ctx.session(),
1322 &resolved_ids,
1323 &CREATE_ITEM_USAGE,
1324 ) {
1325 return ctx.retire(Err(e.into()));
1326 }
1327
1328 let (result, cluster_id) = mz_sql::pure::purify_statement(
1329 catalog,
1330 now,
1331 stmt,
1332 ¤t_storage_configuration,
1333 )
1334 .await;
1335 let result = result.map_err(|e| e.into());
1336 let dependency_ids = resolved_ids.items().copied().collect();
1337 let plan_validity = PlanValidity::new(
1338 transient_revision,
1339 dependency_ids,
1340 cluster_id,
1341 None,
1342 ctx.session().role_metadata().clone(),
1343 );
1344 // It is not an error for purification to complete after `internal_cmd_rx` is dropped.
1345 let result = internal_cmd_tx.send(Message::PurifiedStatementReady(
1346 PurifiedStatementReady {
1347 ctx,
1348 result,
1349 params,
1350 plan_validity,
1351 original_stmt,
1352 otel_ctx,
1353 },
1354 ));
1355 if let Err(e) = result {
1356 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1357 }
1358 });
1359 return;
1360 }
1361
1362 // `CREATE SUBSOURCE` statements are disallowed for users and are only generated
1363 // automatically as part of purification
1364 Statement::CreateSubsource(_) => {
1365 ctx.retire(Err(AdapterError::Unsupported(
1366 "CREATE SUBSOURCE statements",
1367 )));
1368 return;
1369 }
1370
1371 Statement::CreateMaterializedView(mut cmvs) => {
1372 // `CREATE MATERIALIZED VIEW ... AS OF ...` syntax is disallowed for users and is
1373 // only used for storing initial frontiers in the catalog.
1374 if cmvs.as_of.is_some() {
1375 return ctx.retire(Err(AdapterError::Unsupported(
1376 "CREATE MATERIALIZED VIEW ... AS OF statements",
1377 )));
1378 }
1379
1380 let mz_now = match self
1381 .resolve_mz_now_for_create_materialized_view(
1382 &cmvs,
1383 &resolved_ids,
1384 ctx.session_mut(),
1385 true,
1386 )
1387 .await
1388 {
1389 Ok(mz_now) => mz_now,
1390 Err(e) => return ctx.retire(Err(e)),
1391 };
1392
1393 let catalog = self.catalog().for_session(ctx.session());
1394
1395 purify_create_materialized_view_options(
1396 catalog,
1397 mz_now,
1398 &mut cmvs,
1399 &mut resolved_ids,
1400 );
1401
1402 let purified_stmt =
1403 Statement::CreateMaterializedView(CreateMaterializedViewStatement::<Aug> {
1404 if_exists: cmvs.if_exists,
1405 name: cmvs.name,
1406 columns: cmvs.columns,
1407 replacement_for: cmvs.replacement_for,
1408 in_cluster: cmvs.in_cluster,
1409 in_cluster_replica: cmvs.in_cluster_replica,
1410 query: cmvs.query,
1411 with_options: cmvs.with_options,
1412 as_of: None,
1413 });
1414
1415 // (Purifying CreateMaterializedView doesn't happen async, so no need to send
1416 // `Message::PurifiedStatementReady` here.)
1417 (purified_stmt, resolved_ids)
1418 }
1419
1420 Statement::ExplainPlan(ExplainPlanStatement {
1421 stage,
1422 with_options,
1423 format,
1424 explainee: Explainee::CreateMaterializedView(box_cmvs, broken),
1425 }) => {
1426 let mut cmvs = *box_cmvs;
1427 let mz_now = match self
1428 .resolve_mz_now_for_create_materialized_view(
1429 &cmvs,
1430 &resolved_ids,
1431 ctx.session_mut(),
1432 false,
1433 )
1434 .await
1435 {
1436 Ok(mz_now) => mz_now,
1437 Err(e) => return ctx.retire(Err(e)),
1438 };
1439
1440 let catalog = self.catalog().for_session(ctx.session());
1441
1442 purify_create_materialized_view_options(
1443 catalog,
1444 mz_now,
1445 &mut cmvs,
1446 &mut resolved_ids,
1447 );
1448
1449 let purified_stmt = Statement::ExplainPlan(ExplainPlanStatement {
1450 stage,
1451 with_options,
1452 format,
1453 explainee: Explainee::CreateMaterializedView(Box::new(cmvs), broken),
1454 });
1455
1456 (purified_stmt, resolved_ids)
1457 }
1458
1459 // All other statements are handled immediately.
1460 _ => (stmt, resolved_ids),
1461 };
1462
1463 match self.plan_statement(ctx.session(), stmt, ¶ms, &resolved_ids) {
1464 Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await,
1465 Err(e) => ctx.retire(Err(e)),
1466 }
1467 }
1468
1469 /// Whether the statement must be serialized and is DDL.
1470 fn must_serialize_ddl(stmt: &Statement<Raw>, ctx: &ExecuteContext) -> bool {
1471 // Non-DDL is not serialized here.
1472 if !StatementClassification::from(&*stmt).is_ddl() {
1473 return false;
1474 }
1475 // Off-thread, pre-planning purification can perform arbitrarily slow network calls so must
1476 // not be serialized. These all use PlanValidity for their checking, and we must ensure
1477 // those checks are sufficient.
1478 if Self::must_spawn_purification(stmt) {
1479 return false;
1480 }
1481
1482 // Statements that support multiple DDLs in a single transaction aren't serialized here.
1483 // Their operations are serialized when applied to the catalog, guaranteeing that any
1484 // off-thread DDLs concurrent with a multiple DDL transaction will have a serial order.
1485 if ctx.session.transaction().is_ddl() {
1486 return false;
1487 }
1488
1489 // Some DDL is exempt. It is not great that we are matching on Statements here because
1490 // different plans can be produced from the same top-level statement type (i.e., `ALTER
1491 // CONNECTION ROTATE KEYS`). But the whole point of this is to prevent things from being
1492 // planned in the first place, so we accept the abstraction leak.
1493 match stmt {
1494 // Secrets have a small and understood set of dependencies, and their off-thread work
1495 // interacts with k8s.
1496 Statement::AlterSecret(_) => false,
1497 Statement::CreateSecret(_) => false,
1498 Statement::AlterConnection(AlterConnectionStatement { actions, .. })
1499 if actions
1500 .iter()
1501 .all(|action| matches!(action, AlterConnectionAction::RotateKeys)) =>
1502 {
1503 false
1504 }
1505
1506 // The off-thread work that altering a cluster may do (waiting for replicas to spin-up),
1507 // does not affect its catalog names or ids and so is safe to not serialize. This could
1508 // change the set of replicas that exist. For queries that name replicas or use the
1509 // current_replica session var, the `replica_id` field of `PlanValidity` serves to
1510 // ensure that those replicas exist during the query finish stage. Additionally, that
1511 // work can take hours (configured by the user), so would also be a bad experience for
1512 // users.
1513 Statement::AlterCluster(_) => false,
1514
1515 // `ALTER SINK SET FROM` waits for the old relation to make enough progress for a clean
1516 // cutover. If the old collection is stalled, it may block forever. Checks in
1517 // sequencing ensure that the operation fails if any one of these happens concurrently:
1518 // * the sink is dropped
1519 // * the new source relation is dropped
1520 // * another `ALTER SINK` for the same sink is applied first
1521 Statement::AlterSink(stmt)
1522 if matches!(stmt.action, AlterSinkAction::ChangeRelation(_)) =>
1523 {
1524 false
1525 }
1526
1527 // `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT` waits for the target MV to make
1528 // enough progress for a clean cutover. If the target MV is stalled, it may block
1529 // forever. Checks in sequencing ensure the operation fails if any of these happens
1530 // concurrently:
1531 // * the target MV is dropped
1532 // * the replacement MV is dropped
1533 Statement::AlterMaterializedViewApplyReplacement(_) => false,
1534
1535 // Everything else must be serialized.
1536 _ => true,
1537 }
1538 }
1539
1540 /// Whether the statement must be purified off of the Coordinator thread.
1541 fn must_spawn_purification<A: AstInfo>(stmt: &Statement<A>) -> bool {
1542 // `CREATE` and `ALTER` `SOURCE` and `SINK` statements must be purified off the main
1543 // coordinator thread.
1544 if !matches!(
1545 stmt,
1546 Statement::CreateSource(_)
1547 | Statement::AlterSource(_)
1548 | Statement::CreateSink(_)
1549 | Statement::CreateTableFromSource(_)
1550 ) {
1551 return false;
1552 }
1553
1554 // However `ALTER SOURCE RETAIN HISTORY` should be excluded from off-thread purification.
1555 if let Statement::AlterSource(stmt) = stmt {
1556 let names: Vec<CreateSourceOptionName> = match &stmt.action {
1557 AlterSourceAction::SetOptions(options) => {
1558 options.iter().map(|o| o.name.clone()).collect()
1559 }
1560 AlterSourceAction::ResetOptions(names) => names.clone(),
1561 _ => vec![],
1562 };
1563 if !names.is_empty()
1564 && names
1565 .iter()
1566 .all(|n| matches!(n, CreateSourceOptionName::RetainHistory))
1567 {
1568 return false;
1569 }
1570 }
1571
1572 true
1573 }
1574
1575 /// Chooses a timestamp for `mz_now()`, if `mz_now()` occurs in a REFRESH option of the
1576 /// materialized view. Additionally, if `acquire_read_holds` is true and the MV has any REFRESH
1577 /// option, this function grabs read holds at the earliest possible time on input collections
1578 /// that might be involved in the MV.
1579 ///
1580 /// Note that this is NOT what handles `mz_now()` in the query part of the MV. (handles it only
1581 /// in `with_options`).
1582 ///
1583 /// (Note that the chosen timestamp won't be the same timestamp as the system table inserts,
1584 /// unfortunately.)
1585 async fn resolve_mz_now_for_create_materialized_view(
1586 &mut self,
1587 cmvs: &CreateMaterializedViewStatement<Aug>,
1588 resolved_ids: &ResolvedIds,
1589 session: &Session,
1590 acquire_read_holds: bool,
1591 ) -> Result<Option<Timestamp>, AdapterError> {
1592 if cmvs
1593 .with_options
1594 .iter()
1595 .any(|wo| matches!(wo.value, Some(WithOptionValue::Refresh(..))))
1596 {
1597 let catalog = self.catalog().for_session(session);
1598 let cluster = mz_sql::plan::resolve_cluster_for_materialized_view(&catalog, cmvs)?;
1599 let ids = self
1600 .index_oracle(cluster)
1601 .sufficient_collections(resolved_ids.collections().copied());
1602
1603 // If there is any REFRESH option, then acquire read holds. (Strictly speaking, we'd
1604 // need this only if there is a `REFRESH AT`, not for `REFRESH EVERY`, because later
1605 // we want to check the AT times against the read holds that we acquire here. But
1606 // we do it for any REFRESH option, to avoid having so many code paths doing different
1607 // things.)
1608 //
1609 // It's important that we acquire read holds _before_ we determine the least valid read.
1610 // Otherwise, we're not guaranteed that the since frontier doesn't
1611 // advance forward from underneath us.
1612 let read_holds = self.acquire_read_holds(&ids);
1613
1614 // Does `mz_now()` occur?
1615 let mz_now_ts = if cmvs
1616 .with_options
1617 .iter()
1618 .any(materialized_view_option_contains_temporal)
1619 {
1620 let timeline_context = self
1621 .catalog()
1622 .validate_timeline_context(resolved_ids.collections().copied())?;
1623
1624 // We default to EpochMilliseconds, similarly to `determine_timestamp_for`,
1625 // but even in the TimestampIndependent case.
1626 // Note that we didn't accurately decide whether we are TimestampDependent
1627 // or TimestampIndependent, because for this we'd need to also check whether
1628 // `query.contains_temporal()`, similarly to how `peek_stage_validate` does.
1629 // However, this doesn't matter here, as we are just going to default to
1630 // EpochMilliseconds in both cases.
1631 let timeline = timeline_context
1632 .timeline()
1633 .unwrap_or(&Timeline::EpochMilliseconds);
1634
1635 // Let's start with the timestamp oracle read timestamp.
1636 let mut timestamp = self.get_timestamp_oracle(timeline).read_ts().await;
1637
1638 // If `least_valid_read` is later than the oracle, then advance to that time.
1639 // If we didn't do this, then there would be a danger of missing the first refresh,
1640 // which might cause the materialized view to be unreadable for hours. This might
1641 // be what was happening here:
1642 // https://github.com/MaterializeInc/database-issues/issues/7265#issuecomment-1931856361
1643 //
1644 // In the long term, it would be good to actually block the MV creation statement
1645 // until `least_valid_read`. https://github.com/MaterializeInc/database-issues/issues/7504
1646 // Without blocking, we have the problem that a REFRESH AT CREATION is not linearized
1647 // with the CREATE MATERIALIZED VIEW statement, in the sense that a query from the MV
1648 // after its creation might see input changes that happened after the CRATE MATERIALIZED
1649 // VIEW statement returned.
1650 let oracle_timestamp = timestamp;
1651 let least_valid_read = read_holds.least_valid_read();
1652 timestamp.advance_by(least_valid_read.borrow());
1653
1654 if oracle_timestamp != timestamp {
1655 warn!(%cmvs.name, %oracle_timestamp, %timestamp, "REFRESH MV's inputs are not readable at the oracle read ts");
1656 }
1657
1658 info!("Resolved `mz_now()` to {timestamp} for REFRESH MV");
1659 Ok(Some(timestamp))
1660 } else {
1661 Ok(None)
1662 };
1663
1664 // NOTE: The Drop impl of ReadHolds makes sure that the hold is
1665 // released when we don't use it.
1666 if acquire_read_holds {
1667 self.store_transaction_read_holds(session.conn_id().clone(), read_holds);
1668 }
1669
1670 mz_now_ts
1671 } else {
1672 Ok(None)
1673 }
1674 }
1675
1676 /// Instruct the dataflow layer to cancel any ongoing, interactive work for
1677 /// the named `conn_id` if the correct secret key is specified.
1678 ///
1679 /// Note: Here we take a [`ConnectionIdType`] as opposed to an owned
1680 /// `ConnectionId` because this method gets called by external clients when
1681 /// they request to cancel a request.
1682 #[mz_ore::instrument(level = "debug")]
1683 async fn handle_cancel(&mut self, conn_id: ConnectionIdType, secret_key: u32) {
1684 if let Some((id_handle, conn_meta)) = self.active_conns.get_key_value(&conn_id) {
1685 // If the secret key specified by the client doesn't match the
1686 // actual secret key for the target connection, we treat this as a
1687 // rogue cancellation request and ignore it.
1688 if conn_meta.secret_key != secret_key {
1689 return;
1690 }
1691
1692 // Now that we've verified the secret key, this is a privileged
1693 // cancellation request. We can upgrade the raw connection ID to a
1694 // proper `IdHandle`.
1695 self.handle_privileged_cancel(id_handle.clone()).await;
1696 }
1697 }
1698
1699 /// Unconditionally instructs the dataflow layer to cancel any ongoing,
1700 /// interactive work for the named `conn_id`.
1701 #[mz_ore::instrument(level = "debug")]
1702 pub(crate) async fn handle_privileged_cancel(&mut self, conn_id: ConnectionId) {
1703 let mut maybe_ctx = None;
1704
1705 // Cancel pending writes. There is at most one pending write per session.
1706 if let Some(idx) = self.pending_writes.iter().position(|pending_write_txn| {
1707 matches!(pending_write_txn, PendingWriteTxn::User {
1708 pending_txn: PendingTxn { ctx, .. },
1709 ..
1710 } if *ctx.session().conn_id() == conn_id)
1711 }) {
1712 if let PendingWriteTxn::User {
1713 pending_txn: PendingTxn { ctx, .. },
1714 ..
1715 } = self.pending_writes.remove(idx)
1716 {
1717 maybe_ctx = Some(ctx);
1718 }
1719 }
1720
1721 // Cancel deferred writes.
1722 if let Some(write_op) = self.deferred_write_ops.remove(&conn_id) {
1723 maybe_ctx = Some(write_op.into_ctx());
1724 }
1725
1726 // Cancel deferred statements.
1727 if let Some(idx) = self
1728 .serialized_ddl
1729 .iter()
1730 .position(|deferred| *deferred.ctx.session().conn_id() == conn_id)
1731 {
1732 let deferred = self
1733 .serialized_ddl
1734 .remove(idx)
1735 .expect("known to exist from call to `position` above");
1736 maybe_ctx = Some(deferred.ctx);
1737 }
1738
1739 // Cancel reads waiting on being linearized. There is at most one linearized read per
1740 // session.
1741 if let Some(pending_read_txn) = self.pending_linearize_read_txns.remove(&conn_id) {
1742 let ctx = pending_read_txn.take_context();
1743 maybe_ctx = Some(ctx);
1744 }
1745
1746 if let Some(ctx) = maybe_ctx {
1747 ctx.retire(Err(AdapterError::Canceled));
1748 }
1749
1750 self.cancel_pending_peeks(&conn_id);
1751 self.cancel_pending_watchsets(&conn_id);
1752 self.cancel_compute_sinks_for_conn(&conn_id).await;
1753 self.cancel_cluster_reconfigurations_for_conn(&conn_id)
1754 .await;
1755 self.cancel_pending_copy(&conn_id);
1756 if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) {
1757 let _ = tx.send(true);
1758 }
1759 }
1760
1761 /// Handle termination of a client session.
1762 ///
1763 /// This cleans up any state in the coordinator associated with the session.
1764 #[mz_ore::instrument(level = "debug")]
1765 async fn handle_terminate(&mut self, conn_id: ConnectionId) {
1766 if !self.active_conns.contains_key(&conn_id) {
1767 // If the session doesn't exist in `active_conns`, then this method will panic later on.
1768 // Instead we explicitly panic here while dumping the entire Coord to the logs to help
1769 // debug. This panic is very infrequent so we want as much information as possible.
1770 // See https://github.com/MaterializeInc/database-issues/issues/5627.
1771 panic!("unknown connection: {conn_id:?}\n\n{self:?}")
1772 }
1773
1774 // We do not need to call clear_transaction here because there are no side effects to run
1775 // based on any session transaction state.
1776 self.clear_connection(&conn_id).await;
1777
1778 self.drop_temp_items(&conn_id).await;
1779 // Only call catalog_mut() if a temporary schema actually exists for this connection.
1780 // This avoids an expensive Arc::make_mut clone for the common case where the connection
1781 // never created any temporary objects.
1782 if self.catalog().state().has_temporary_schema(&conn_id) {
1783 self.catalog_mut()
1784 .drop_temporary_schema(&conn_id)
1785 .unwrap_or_terminate("unable to drop temporary schema");
1786 }
1787 let conn = self.active_conns.remove(&conn_id).expect("conn must exist");
1788 let session_type = metrics::session_type_label_value(conn.user());
1789 self.metrics
1790 .active_sessions
1791 .with_label_values(&[session_type])
1792 .dec();
1793 self.cancel_pending_peeks(conn.conn_id());
1794 self.cancel_pending_watchsets(&conn_id);
1795 self.cancel_pending_copy(&conn_id);
1796 self.end_session_for_statement_logging(conn.uuid());
1797
1798 // Queue the builtin table update, but do not wait for it to complete. We explicitly do
1799 // this to prevent blocking the Coordinator in the case that a lot of connections are
1800 // closed at once, which occurs regularly in some workflows.
1801 let update = self
1802 .catalog()
1803 .state()
1804 .pack_session_update(&conn, Diff::MINUS_ONE);
1805 let update = self.catalog().state().resolve_builtin_table_update(update);
1806
1807 let _builtin_update_notify = self.builtin_table_update().defer(vec![update]);
1808 }
1809
1810 /// Returns the necessary metadata for appending to a webhook source, and a channel to send
1811 /// rows.
1812 #[mz_ore::instrument(level = "debug")]
1813 fn handle_get_webhook(
1814 &mut self,
1815 database: String,
1816 schema: String,
1817 name: String,
1818 tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
1819 ) {
1820 /// Attempts to resolve a Webhook source from a provided `database.schema.name` path.
1821 ///
1822 /// Returns a struct that can be used to append data to the underlying storate collection, and the
1823 /// types we should cast the request to.
1824 fn resolve(
1825 coord: &mut Coordinator,
1826 database: String,
1827 schema: String,
1828 name: String,
1829 ) -> Result<AppendWebhookResponse, PartialItemName> {
1830 // Resolve our collection.
1831 let name = PartialItemName {
1832 database: Some(database),
1833 schema: Some(schema),
1834 item: name,
1835 };
1836 let Ok(entry) = coord
1837 .catalog()
1838 .resolve_entry(None, &vec![], &name, &SYSTEM_CONN_ID)
1839 else {
1840 return Err(name);
1841 };
1842
1843 // Webhooks can be created with `CREATE SOURCE` or `CREATE TABLE`.
1844 let (data_source, desc, global_id) = match entry.item() {
1845 CatalogItem::Source(Source {
1846 data_source: data_source @ DataSourceDesc::Webhook { .. },
1847 desc,
1848 global_id,
1849 ..
1850 }) => (data_source, desc.clone(), *global_id),
1851 CatalogItem::Table(
1852 table @ Table {
1853 desc,
1854 data_source:
1855 TableDataSource::DataSource {
1856 desc: data_source @ DataSourceDesc::Webhook { .. },
1857 ..
1858 },
1859 ..
1860 },
1861 ) => (data_source, desc.latest(), table.global_id_writes()),
1862 _ => return Err(name),
1863 };
1864
1865 let DataSourceDesc::Webhook {
1866 validate_using,
1867 body_format,
1868 headers,
1869 ..
1870 } = data_source
1871 else {
1872 mz_ore::soft_panic_or_log!("programming error! checked above for webhook");
1873 return Err(name);
1874 };
1875 let body_format = body_format.clone();
1876 let header_tys = headers.clone();
1877
1878 // Assert we have one column for the body, and how ever many are required for
1879 // the headers.
1880 let num_columns = headers.num_columns() + 1;
1881 mz_ore::soft_assert_or_log!(
1882 desc.arity() <= num_columns,
1883 "expected at most {} columns, but got {}",
1884 num_columns,
1885 desc.arity()
1886 );
1887
1888 // Double check that the body column of the webhook source matches the type
1889 // we're about to deserialize as.
1890 let body_column = desc
1891 .get_by_name(&"body".into())
1892 .map(|(_idx, ty)| ty.clone())
1893 .ok_or_else(|| name.clone())?;
1894 assert!(!body_column.nullable, "webhook body column is nullable!?");
1895 assert_eq!(body_column.scalar_type, SqlScalarType::from(body_format));
1896
1897 // Create a validator that can be called to validate a webhook request.
1898 let validator = validate_using.as_ref().map(|v| {
1899 let validation = v.clone();
1900 AppendWebhookValidator::new(validation, coord.caching_secrets_reader.clone())
1901 });
1902
1903 // Get a channel so we can queue updates to be written.
1904 let row_tx = coord
1905 .controller
1906 .storage
1907 .monotonic_appender(global_id)
1908 .map_err(|_| name.clone())?;
1909 let stats = coord
1910 .controller
1911 .storage
1912 .webhook_statistics(global_id)
1913 .map_err(|_| name)?;
1914 let invalidator = coord
1915 .active_webhooks
1916 .entry(entry.id())
1917 .or_insert_with(WebhookAppenderInvalidator::new);
1918 let tx = WebhookAppender::new(row_tx, invalidator.guard(), stats);
1919
1920 Ok(AppendWebhookResponse {
1921 tx,
1922 body_format,
1923 header_tys,
1924 validator,
1925 })
1926 }
1927
1928 let response = resolve(self, database, schema, name).map_err(|name| {
1929 AppendWebhookError::UnknownWebhook {
1930 database: name.database.expect("provided"),
1931 schema: name.schema.expect("provided"),
1932 name: name.item,
1933 }
1934 });
1935 let _ = tx.send(response);
1936 }
1937
1938 /// Handle registration of a frontend peek, for statement logging and query cancellation
1939 /// handling.
1940 fn handle_register_frontend_peek(
1941 &mut self,
1942 uuid: Uuid,
1943 conn_id: ConnectionId,
1944 cluster_id: mz_controller_types::ClusterId,
1945 depends_on: BTreeSet<GlobalId>,
1946 is_fast_path: bool,
1947 watch_set: Option<WatchSetCreation>,
1948 tx: oneshot::Sender<Result<(), AdapterError>>,
1949 ) {
1950 let statement_logging_id = watch_set.as_ref().map(|ws| ws.logging_id);
1951 if let Some(ws) = watch_set {
1952 if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), ws) {
1953 let _ = tx.send(Err(
1954 AdapterError::concurrent_dependency_drop_from_watch_set_install_error(e),
1955 ));
1956 return;
1957 }
1958 }
1959
1960 // Store the peek in pending_peeks for later retrieval when results arrive
1961 self.pending_peeks.insert(
1962 uuid,
1963 PendingPeek {
1964 conn_id: conn_id.clone(),
1965 cluster_id,
1966 depends_on,
1967 ctx_extra: ExecuteContextGuard::new(
1968 statement_logging_id,
1969 self.internal_cmd_tx.clone(),
1970 ),
1971 is_fast_path,
1972 },
1973 );
1974
1975 // Also track it by connection ID for cancellation support
1976 self.client_pending_peeks
1977 .entry(conn_id)
1978 .or_default()
1979 .insert(uuid, cluster_id);
1980
1981 let _ = tx.send(Ok(()));
1982 }
1983
1984 /// Handle unregistration of a frontend peek that was registered but failed to issue.
1985 /// This is used for cleanup when `client.peek()` fails after `RegisterFrontendPeek` succeeds.
1986 fn handle_unregister_frontend_peek(&mut self, uuid: Uuid, tx: oneshot::Sender<()>) {
1987 // Remove from pending_peeks (this also removes from client_pending_peeks)
1988 if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
1989 // Retire `ExecuteContextExtra`, because the frontend will log the peek's error result.
1990 let _ = pending_peek.ctx_extra.defuse();
1991 }
1992 let _ = tx.send(());
1993 }
1994}