mz_adapter/coord/
sequencer.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// Prevents anyone from accidentally exporting a method from the `inner` module.
11#![allow(clippy::pub_use)]
12
13//! Logic for executing a planned SQL query.
14
15use futures::FutureExt;
16use futures::future::LocalBoxFuture;
17use inner::return_if_err;
18use mz_expr::row::RowCollection;
19use mz_expr::{MirRelationExpr, RowSetFinishing};
20use mz_ore::tracing::OpenTelemetryContext;
21use mz_repr::{CatalogItemId, Diff, GlobalId};
22use mz_sql::catalog::CatalogError;
23use mz_sql::names::ResolvedIds;
24use mz_sql::plan::{
25    self, AbortTransactionPlan, CommitTransactionPlan, CopyFromSource, CreateRolePlan,
26    CreateSourcePlanBundle, FetchPlan, MutationKind, Params, Plan, PlanKind, RaisePlan,
27};
28use mz_sql::rbac;
29use mz_sql::session::metadata::SessionMetadata;
30use mz_sql_parser::ast::{Raw, Statement};
31use mz_storage_client::client::TableData;
32use mz_storage_types::connections::inline::IntoInlineConnection;
33use std::collections::BTreeSet;
34use std::sync::Arc;
35use tokio::sync::oneshot;
36use tracing::{Instrument, Level, Span, event};
37
38use crate::ExecuteContext;
39use crate::catalog::Catalog;
40use crate::command::{Command, ExecuteResponse, Response};
41use crate::coord::appends::{DeferredOp, DeferredPlan};
42use crate::coord::validity::PlanValidity;
43use crate::coord::{
44    Coordinator, DeferredPlanStatement, Message, PlanStatement, TargetCluster, catalog_serving,
45};
46use crate::error::AdapterError;
47use crate::notice::AdapterNotice;
48use crate::session::{
49    EndTransactionAction, Session, StateRevision, TransactionOps, TransactionStatus, WriteOp,
50};
51use crate::util::ClientTransmitter;
52
53// DO NOT make this visible in any way, i.e. do not add any version of
54// `pub` to this mod. The inner `sequence_X` methods are hidden in this
55// private module to prevent anyone from calling them directly. All
56// sequencing should be done through the `sequence_plan` method.
57// This allows us to add catch-all logic that should be applied to all
58// plans in `sequence_plan` and guarantee that no caller can circumvent
59// that logic.
60//
61// The two exceptions are:
62//
63// - Creating a role during connection startup. In this scenario, the session has not been properly
64// initialized and we need to skip directly to creating the role. We have a specific method,
65// `sequence_create_role_for_startup`, for this purpose.
66// - Methods that continue the execution of some plan that was being run asynchronously, such as
67// `sequence_peek_stage` and `sequence_create_connection_stage_finish`.
68
69mod inner;
70
71impl Coordinator {
72    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 34KB. This would
73    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
74    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
    ///
    /// Sequences `plan` on behalf of the session carried by `ctx`.
    ///
    /// Before dispatching on the plan variant, the following checks are applied to every plan,
    /// in this order:
    ///
    /// 1. The transmitter in `ctx` is told which responses are generated from this plan's kind.
    /// 2. If the controller is read-only and the plan is not allowed in read-only mode, `ctx` is
    ///    retired with [`AdapterError::ReadOnly`].
    /// 3. If the session is still waiting on builtin table appends from session startup, the
    ///    plan is deferred (via `defer_op`) and this future completes without retiring `ctx`.
    /// 4. The target cluster is resolved (the transaction's cluster if one is set, otherwise via
    ///    `catalog_serving::auto_run_on_catalog_server`). A resolution failure is not an error
    ///    at this point.
    /// 5. `catalog_serving::check_cluster_restrictions` and `rbac::check_plan` are run; a failure
    ///    of either retires `ctx` with that error.
    ///
    /// The future resolves to `()`. The outcome of the plan is never returned to the caller;
    /// every path either retires `ctx`, dismantles it with `into_parts`, moves it into a
    /// `sequence_*` method or spawned task, or parks it in a deferred queue.
    ///
    /// `resolved_ids` is passed to the RBAC check and forwarded to the arms that accept it.
    /// `Plan::CreateSources` asserts that it is empty.
75    pub(crate) fn sequence_plan(
76        &mut self,
77        mut ctx: ExecuteContext,
78        plan: Plan,
79        resolved_ids: ResolvedIds,
80    ) -> LocalBoxFuture<'_, ()> {
81        async move {
            // Register on the transmitter the set of responses generated from this plan's kind.
82            let responses = ExecuteResponse::generated_from(&PlanKind::from(&plan));
83            ctx.tx_mut().set_allowed(responses);
84
            // Reject plans that are not allowed while the controller is read-only.
85            if self.controller.read_only() && !plan.allowed_in_read_only() {
86                ctx.retire(Err(AdapterError::ReadOnly));
87                return;
88            }
89
90            // Check if we're still waiting for any of the builtin table appends from when we
91            // started the Session to complete.
92            if let Some((dependencies, wait_future)) =
93                super::appends::waiting_on_startup_appends(self.catalog(), ctx.session_mut(), &plan)
94            {
95                let conn_id = ctx.session().conn_id();
96                tracing::debug!(%conn_id, "deferring plan for startup appends");
97
98                let role_metadata = ctx.session().role_metadata().clone();
99                let validity = PlanValidity::new(
100                    self.catalog.transient_revision(),
101                    dependencies,
102                    None,
103                    None,
104                    role_metadata,
105                );
106                let deferred_plan = DeferredPlan {
107                    ctx,
108                    plan,
109                    validity,
110                    requires_locks: BTreeSet::default(),
111                };
112                // Defer op accepts an optional write lock, but there aren't any writes occurring
113                // here, hence the map to `None`.
114                let acquire_future = wait_future.map(|()| None);
115
116                self.defer_op(acquire_future, DeferredOp::Plan(deferred_plan));
117
118                // Return early because our op is deferred on waiting for the builtin writes to
119                // complete.
120                return;
121            };
122
123            // Scope the borrow of the Catalog because we need to mutate the Coordinator state below.
124            let target_cluster = match ctx.session().transaction().cluster() {
125                // Use the current transaction's cluster.
126                Some(cluster_id) => TargetCluster::Transaction(cluster_id),
127                // If there isn't a current cluster set for a transaction, then try to auto route.
128                None => {
129                    let session_catalog = self.catalog.for_session(ctx.session());
130                    catalog_serving::auto_run_on_catalog_server(
131                        &session_catalog,
132                        ctx.session(),
133                        &plan,
134                    )
135                }
136            };
            // Failing to resolve the target cluster is not an error here: the id and name are
            // simply left as `None` and the checks below that need them are skipped or receive
            // `None`.
137            let (target_cluster_id, target_cluster_name) = match self
138                .catalog()
139                .resolve_target_cluster(target_cluster, ctx.session())
140            {
141                Ok(cluster) => (Some(cluster.id), Some(cluster.name.clone())),
142                Err(_) => (None, None),
143            };
144
            // Record the resolved cluster for this statement, but only when both a cluster was
            // resolved and `ctx.extra()` has contents to attribute it to.
145            if let (Some(cluster_id), Some(statement_id)) =
146                (target_cluster_id, ctx.extra().contents())
147            {
148                self.set_statement_execution_cluster(statement_id, cluster_id);
149            }
150
151            let session_catalog = self.catalog.for_session(ctx.session());
152
            // Cluster restrictions can only be checked when a target cluster was resolved.
153            if let Some(cluster_name) = &target_cluster_name {
154                if let Err(e) = catalog_serving::check_cluster_restrictions(
155                    cluster_name,
156                    &session_catalog,
157                    &plan,
158                ) {
159                    return ctx.retire(Err(e));
160                }
161            }
162
            // RBAC check. The closure maps a connection id to the authenticated role id of the
            // matching active connection, if any.
163            if let Err(e) = rbac::check_plan(
164                &session_catalog,
165                |id| {
166                    // We use linear search through active connections if needed, which is fine
167                    // because the RBAC check will call the closure at most once.
168                    self.active_conns()
169                        .into_iter()
170                        .find(|(conn_id, _)| conn_id.unhandled() == id)
171                        .map(|(_, conn_meta)| *conn_meta.authenticated_role_id())
172                },
173                ctx.session(),
174                &plan,
175                target_cluster_id,
176                &resolved_ids,
177            ) {
178                return ctx.retire(Err(e.into()));
179            }
180
            // Dispatch on the plan variant. Arms that pass `ctx` by value hand responsibility
            // for `ctx` to the callee; arms that borrow `ctx` retire it here with the result.
181            match plan {
182                Plan::CreateSource(plan) => {
                    // Allocate ids for the single source, then reuse the multi-source path with
                    // a one-element bundle.
183                    let id_ts = self.get_catalog_write_ts().await;
184                    let (item_id, global_id) =
185                        return_if_err!(self.catalog_mut().allocate_user_id(id_ts).await, ctx);
186                    let result = self
187                        .sequence_create_source(
188                            &mut ctx,
189                            vec![CreateSourcePlanBundle {
190                                item_id,
191                                global_id,
192                                plan,
193                                resolved_ids,
194                                available_source_references: None,
195                            }],
196                        )
197                        .await;
198                    ctx.retire(result);
199                }
200                Plan::CreateSources(plans) => {
201                    assert!(
202                        resolved_ids.is_empty(),
203                        "each plan has separate resolved_ids"
204                    );
205                    let result = self.sequence_create_source(&mut ctx, plans).await;
206                    ctx.retire(result);
207                }
208                Plan::CreateConnection(plan) => {
209                    self.sequence_create_connection(ctx, plan, resolved_ids)
210                        .await;
211                }
212                Plan::CreateDatabase(plan) => {
213                    let result = self.sequence_create_database(ctx.session_mut(), plan).await;
214                    ctx.retire(result);
215                }
216                Plan::CreateSchema(plan) => {
217                    let result = self.sequence_create_schema(ctx.session_mut(), plan).await;
218                    ctx.retire(result);
219                }
220                Plan::CreateRole(plan) => {
221                    let result = self
222                        .sequence_create_role(Some(ctx.session().conn_id()), plan)
223                        .await;
                    // The notice, if any, is added whether or not creating the role succeeded.
224                    if let Some(notice) = self.should_emit_rbac_notice(ctx.session()) {
225                        ctx.session().add_notice(notice);
226                    }
227                    ctx.retire(result);
228                }
229                Plan::CreateCluster(plan) => {
230                    let result = self.sequence_create_cluster(ctx.session(), plan).await;
231                    ctx.retire(result);
232                }
233                Plan::CreateClusterReplica(plan) => {
234                    let result = self
235                        .sequence_create_cluster_replica(ctx.session(), plan)
236                        .await;
237                    ctx.retire(result);
238                }
239                Plan::CreateTable(plan) => {
240                    let result = self
241                        .sequence_create_table(&mut ctx, plan, resolved_ids)
242                        .await;
243                    ctx.retire(result);
244                }
245                Plan::CreateSecret(plan) => {
246                    self.sequence_create_secret(ctx, plan).await;
247                }
248                Plan::CreateSink(plan) => {
249                    self.sequence_create_sink(ctx, plan, resolved_ids).await;
250                }
251                Plan::CreateView(plan) => {
252                    self.sequence_create_view(ctx, plan, resolved_ids).await;
253                }
254                Plan::CreateMaterializedView(plan) => {
255                    self.sequence_create_materialized_view(ctx, plan, resolved_ids)
256                        .await;
257                }
258                Plan::CreateContinualTask(plan) => {
259                    let res = self
260                        .sequence_create_continual_task(&mut ctx, plan, resolved_ids)
261                        .await;
262                    ctx.retire(res);
263                }
264                Plan::CreateIndex(plan) => {
265                    self.sequence_create_index(ctx, plan, resolved_ids).await;
266                }
267                Plan::CreateType(plan) => {
268                    let result = self
269                        .sequence_create_type(ctx.session(), plan, resolved_ids)
270                        .await;
271                    ctx.retire(result);
272                }
273                Plan::CreateNetworkPolicy(plan) => {
274                    let res = self
275                        .sequence_create_network_policy(ctx.session(), plan)
276                        .await;
277                    ctx.retire(res);
278                }
279                Plan::Comment(plan) => {
280                    let result = self.sequence_comment_on(ctx.session(), plan).await;
281                    ctx.retire(result);
282                }
283                Plan::CopyTo(plan) => {
284                    self.sequence_copy_to(ctx, plan, target_cluster).await;
285                }
286                Plan::DropObjects(plan) => {
287                    let result = self.sequence_drop_objects(ctx.session_mut(), plan).await;
288                    ctx.retire(result);
289                }
290                Plan::DropOwned(plan) => {
291                    let result = self.sequence_drop_owned(ctx.session_mut(), plan).await;
292                    ctx.retire(result);
293                }
294                Plan::EmptyQuery => {
295                    ctx.retire(Ok(ExecuteResponse::EmptyQuery));
296                }
297                Plan::ShowAllVariables => {
298                    let result = self.sequence_show_all_variables(ctx.session());
299                    ctx.retire(result);
300                }
301                Plan::ShowVariable(plan) => {
302                    let result = self.sequence_show_variable(ctx.session(), plan);
303                    ctx.retire(result);
304                }
305                Plan::InspectShard(plan) => {
306                    // TODO: Ideally, this await would happen off the main thread.
307                    let result = self.sequence_inspect_shard(ctx.session(), plan).await;
308                    ctx.retire(result);
309                }
310                Plan::SetVariable(plan) => {
311                    let result = self.sequence_set_variable(ctx.session_mut(), plan);
312                    ctx.retire(result);
313                }
314                Plan::ResetVariable(plan) => {
315                    let result = self.sequence_reset_variable(ctx.session_mut(), plan);
316                    ctx.retire(result);
317                }
318                Plan::SetTransaction(plan) => {
319                    let result = self.sequence_set_transaction(ctx.session_mut(), plan);
320                    ctx.retire(result);
321                }
322                Plan::StartTransaction(plan) => {
                    // Starting a transaction while one is already in progress only adds a
                    // notice; `start_transaction` is still called.
323                    if matches!(
324                        ctx.session().transaction(),
325                        TransactionStatus::InTransaction(_)
326                    ) {
327                        ctx.session()
328                            .add_notice(AdapterNotice::ExistingTransactionInProgress);
329                    }
330                    let result = ctx.session_mut().start_transaction(
331                        self.now_datetime(),
332                        plan.access,
333                        plan.isolation_level,
334                    );
335                    ctx.retire(result.map(|_| ExecuteResponse::StartedTransaction))
336                }
                // COMMIT and ROLLBACK share one arm. `transaction_type` is bound by reference
                // so that `plan` itself stays intact and can be moved into a deferred statement
                // or inspected again below.
337                Plan::CommitTransaction(CommitTransactionPlan {
338                    ref transaction_type,
339                })
340                | Plan::AbortTransaction(AbortTransactionPlan {
341                    ref transaction_type,
342                }) => {
343                    // Serialize DDL transactions. Statements that use this mode must return false
344                    // in `must_serialize_ddl()`.
345                    if ctx.session().transaction().is_ddl() {
                        // If the lock is free, stash the guard on this connection. Otherwise
                        // queue the plan on `serialized_ddl` and return without retiring `ctx`.
346                        if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
347                            let prev = self
348                                .active_conns
349                                .get_mut(ctx.session().conn_id())
350                                .expect("connection must exist")
351                                .deferred_lock
352                                .replace(guard);
353                            assert!(
354                                prev.is_none(),
355                                "connections should have at most one lock guard"
356                            );
357                        } else {
358                            self.serialized_ddl.push_back(DeferredPlanStatement {
359                                ctx,
360                                ps: PlanStatement::Plan { plan, resolved_ids },
361                            });
362                            return;
363                        }
364                    }
365
366                    let action = match &plan {
367                        Plan::CommitTransaction(_) => EndTransactionAction::Commit,
368                        Plan::AbortTransaction(_) => EndTransactionAction::Rollback,
                        // The enclosing arm only matches the two variants above.
369                        _ => unreachable!(),
370                    };
371                    if ctx.session().transaction().is_implicit() && !transaction_type.is_implicit()
372                    {
373                        // In Postgres, if a user sends a COMMIT or ROLLBACK in an
374                        // implicit transaction, a warning is sent to them.
375                        // (The transaction is still closed and a new implicit
376                        // transaction started, though.)
377                        ctx.session().add_notice(
378                            AdapterNotice::ExplicitTransactionControlInImplicitTransaction,
379                        );
380                    }
381                    self.sequence_end_transaction(ctx, action).await;
382                }
383                Plan::Select(plan) => {
                    // The peek is bounded by the session's `max_query_result_size` variable.
384                    let max = Some(ctx.session().vars().max_query_result_size());
385                    self.sequence_peek(ctx, plan, target_cluster, max).await;
386                }
387                Plan::Subscribe(plan) => {
388                    self.sequence_subscribe(ctx, plan, target_cluster).await;
389                }
390                Plan::SideEffectingFunc(plan) => {
391                    self.sequence_side_effecting_func(ctx, plan).await;
392                }
393                Plan::ShowCreate(plan) => {
394                    ctx.retire(Ok(Self::send_immediate_rows(plan.row)));
395                }
396                Plan::ShowColumns(show_columns_plan) => {
                    // Sequenced as a peek of the plan's embedded select plan, with the same
                    // result size bound as `Plan::Select`.
397                    let max = Some(ctx.session().vars().max_query_result_size());
398                    self.sequence_peek(ctx, show_columns_plan.select_plan, target_cluster, max)
399                        .await;
400                }
401                Plan::CopyFrom(plan) => match plan.source {
402                    CopyFromSource::Stdin => {
                        // Nothing is copied here: the context is dismantled and a `CopyFrom`
                        // response describing the target is sent back on the transmitter,
                        // together with the session and `ctx_extra`.
403                        let (tx, _, session, ctx_extra) = ctx.into_parts();
404                        tx.send(
405                            Ok(ExecuteResponse::CopyFrom {
406                                target_id: plan.target_id,
407                                target_name: plan.target_name,
408                                columns: plan.columns,
409                                params: plan.params,
410                                ctx_extra,
411                            }),
412                            session,
413                        );
414                    }
415                    CopyFromSource::Url(_) | CopyFromSource::AwsS3 { .. } => {
416                        self.sequence_copy_from(ctx, plan, target_cluster).await;
417                    }
418                },
419                Plan::ExplainPlan(plan) => {
420                    self.sequence_explain_plan(ctx, plan, target_cluster).await;
421                }
422                Plan::ExplainPushdown(plan) => {
423                    self.sequence_explain_pushdown(ctx, plan, target_cluster)
424                        .await;
425                }
426                Plan::ExplainSinkSchema(plan) => {
427                    let result = self.sequence_explain_schema(plan);
428                    ctx.retire(result);
429                }
430                Plan::ExplainTimestamp(plan) => {
431                    self.sequence_explain_timestamp(ctx, plan, target_cluster)
432                        .await;
433                }
434                Plan::Insert(plan) => {
435                    self.sequence_insert(ctx, plan).await;
436                }
437                Plan::ReadThenWrite(plan) => {
438                    self.sequence_read_then_write(ctx, plan).await;
439                }
440                Plan::AlterNoop(plan) => {
441                    ctx.retire(Ok(ExecuteResponse::AlteredObject(plan.object_type)));
442                }
443                Plan::AlterCluster(plan) => {
444                    self.sequence_alter_cluster_staged(ctx, plan).await;
445                }
446                Plan::AlterClusterRename(plan) => {
447                    let result = self.sequence_alter_cluster_rename(&mut ctx, plan).await;
448                    ctx.retire(result);
449                }
450                Plan::AlterClusterSwap(plan) => {
451                    let result = self.sequence_alter_cluster_swap(&mut ctx, plan).await;
452                    ctx.retire(result);
453                }
454                Plan::AlterClusterReplicaRename(plan) => {
455                    let result = self
456                        .sequence_alter_cluster_replica_rename(ctx.session(), plan)
457                        .await;
458                    ctx.retire(result);
459                }
460                Plan::AlterConnection(plan) => {
461                    self.sequence_alter_connection(ctx, plan).await;
462                }
463                Plan::AlterSetCluster(plan) => {
464                    let result = self.sequence_alter_set_cluster(ctx.session(), plan).await;
465                    ctx.retire(result);
466                }
467                Plan::AlterRetainHistory(plan) => {
468                    let result = self.sequence_alter_retain_history(&mut ctx, plan).await;
469                    ctx.retire(result);
470                }
471                Plan::AlterItemRename(plan) => {
472                    let result = self.sequence_alter_item_rename(&mut ctx, plan).await;
473                    ctx.retire(result);
474                }
475                Plan::AlterSchemaRename(plan) => {
476                    let result = self.sequence_alter_schema_rename(&mut ctx, plan).await;
477                    ctx.retire(result);
478                }
479                Plan::AlterSchemaSwap(plan) => {
480                    let result = self.sequence_alter_schema_swap(&mut ctx, plan).await;
481                    ctx.retire(result);
482                }
483                Plan::AlterRole(plan) => {
484                    let result = self.sequence_alter_role(ctx.session_mut(), plan).await;
485                    ctx.retire(result);
486                }
487                Plan::AlterSecret(plan) => {
488                    self.sequence_alter_secret(ctx, plan).await;
489                }
490                Plan::AlterSink(plan) => {
491                    self.sequence_alter_sink_prepare(ctx, plan).await;
492                }
493                Plan::AlterSource(plan) => {
494                    let result = self.sequence_alter_source(ctx.session_mut(), plan).await;
495                    ctx.retire(result);
496                }
497                Plan::AlterSystemSet(plan) => {
498                    let result = self.sequence_alter_system_set(ctx.session(), plan).await;
499                    ctx.retire(result);
500                }
501                Plan::AlterSystemReset(plan) => {
502                    let result = self.sequence_alter_system_reset(ctx.session(), plan).await;
503                    ctx.retire(result);
504                }
505                Plan::AlterSystemResetAll(plan) => {
506                    let result = self
507                        .sequence_alter_system_reset_all(ctx.session(), plan)
508                        .await;
509                    ctx.retire(result);
510                }
511                Plan::AlterTableAddColumn(plan) => {
512                    let result = self.sequence_alter_table(&mut ctx, plan).await;
513                    ctx.retire(result);
514                }
515                Plan::AlterNetworkPolicy(plan) => {
516                    let res = self
517                        .sequence_alter_network_policy(ctx.session(), plan)
518                        .await;
519                    ctx.retire(res);
520                }
521                Plan::DiscardTemp => {
522                    self.drop_temp_items(ctx.session().conn_id()).await;
523                    ctx.retire(Ok(ExecuteResponse::DiscardedTemp));
524                }
525                Plan::DiscardAll => {
                    // Only permitted when the transaction status is `Started`; any other status
                    // yields `OperationProhibitsTransaction`.
526                    let ret = if let TransactionStatus::Started(_) = ctx.session().transaction() {
527                        self.clear_transaction(ctx.session_mut()).await;
528                        self.drop_temp_items(ctx.session().conn_id()).await;
529                        ctx.session_mut().reset();
530                        Ok(ExecuteResponse::DiscardedAll)
531                    } else {
532                        Err(AdapterError::OperationProhibitsTransaction(
533                            "DISCARD ALL".into(),
534                        ))
535                    };
536                    ctx.retire(ret);
537                }
538                Plan::Declare(plan) => {
539                    self.declare(ctx, plan.name, plan.stmt, plan.sql, plan.params);
540                }
541                Plan::Fetch(FetchPlan {
542                    name,
543                    count,
544                    timeout,
545                }) => {
                    // Move the extra out of `ctx` (leaving its default in place) so that it
                    // travels with the `Fetch` response.
546                    let ctx_extra = std::mem::take(ctx.extra_mut());
547                    ctx.retire(Ok(ExecuteResponse::Fetch {
548                        name,
549                        count,
550                        timeout,
551                        ctx_extra,
552                    }));
553                }
554                Plan::Close(plan) => {
                    // `remove_portal` reports whether a portal with that name was removed.
555                    if ctx.session_mut().remove_portal(&plan.name) {
556                        ctx.retire(Ok(ExecuteResponse::ClosedCursor));
557                    } else {
558                        ctx.retire(Err(AdapterError::UnknownCursor(plan.name)));
559                    }
560                }
561                Plan::Prepare(plan) => {
                    // Preparing under an already-used name is an error, not an overwrite.
562                    if ctx
563                        .session()
564                        .get_prepared_statement_unverified(&plan.name)
565                        .is_some()
566                    {
567                        ctx.retire(Err(AdapterError::PreparedStatementExists(plan.name)));
568                    } else {
                        // Snapshot the catalog and session revisions at prepare time; they are
                        // stored alongside the prepared statement.
569                        let state_revision = StateRevision {
570                            catalog_revision: self.catalog().transient_revision(),
571                            session_state_revision: ctx.session().state_revision(),
572                        };
573                        ctx.session_mut().set_prepared_statement(
574                            plan.name,
575                            Some(plan.stmt),
576                            plan.sql,
577                            plan.desc,
578                            state_revision,
579                            self.now(),
580                        );
581                        ctx.retire(Ok(ExecuteResponse::Prepare));
582                    }
583                }
584                Plan::Execute(plan) => {
585                    match self.sequence_execute(ctx.session_mut(), plan) {
586                        Ok(portal_name) => {
                            // Re-enqueue as a `Command::Execute` for the returned portal on the
                            // coordinator's internal command channel, handing over the session,
                            // the transmitter and the extra from this context.
587                            let (tx, _, session, extra) = ctx.into_parts();
588                            self.internal_cmd_tx
589                                .send(Message::Command(
590                                    OpenTelemetryContext::obtain(),
591                                    Command::Execute {
592                                        portal_name,
593                                        session,
594                                        tx: tx.take(),
595                                        outer_ctx_extra: Some(extra),
596                                    },
597                                ))
598                                .expect("sending to self.internal_cmd_tx cannot fail");
599                        }
600                        Err(err) => ctx.retire(Err(err)),
601                    };
602                }
                // DEALLOCATE with a name removes that statement (an unknown name is an error);
                // without a name it removes all prepared statements.
603                Plan::Deallocate(plan) => match plan.name {
604                    Some(name) => {
605                        if ctx.session_mut().remove_prepared_statement(&name) {
606                            ctx.retire(Ok(ExecuteResponse::Deallocate { all: false }));
607                        } else {
608                            ctx.retire(Err(AdapterError::UnknownPreparedStatement(name)));
609                        }
610                    }
611                    None => {
612                        ctx.session_mut().remove_all_prepared_statements();
613                        ctx.retire(Ok(ExecuteResponse::Deallocate { all: true }));
614                    }
615                },
616                Plan::Raise(RaisePlan { severity }) => {
617                    ctx.session()
618                        .add_notice(AdapterNotice::UserRequested { severity });
619                    ctx.retire(Ok(ExecuteResponse::Raised));
620                }
621                Plan::GrantPrivileges(plan) => {
622                    let result = self
623                        .sequence_grant_privileges(ctx.session_mut(), plan)
624                        .await;
625                    ctx.retire(result);
626                }
627                Plan::RevokePrivileges(plan) => {
628                    let result = self
629                        .sequence_revoke_privileges(ctx.session_mut(), plan)
630                        .await;
631                    ctx.retire(result);
632                }
633                Plan::AlterDefaultPrivileges(plan) => {
634                    let result = self
635                        .sequence_alter_default_privileges(ctx.session_mut(), plan)
636                        .await;
637                    ctx.retire(result);
638                }
639                Plan::GrantRole(plan) => {
640                    let result = self.sequence_grant_role(ctx.session_mut(), plan).await;
641                    ctx.retire(result);
642                }
643                Plan::RevokeRole(plan) => {
644                    let result = self.sequence_revoke_role(ctx.session_mut(), plan).await;
645                    ctx.retire(result);
646                }
647                Plan::AlterOwner(plan) => {
648                    let result = self.sequence_alter_owner(ctx.session_mut(), plan).await;
649                    ctx.retire(result);
650                }
651                Plan::ReassignOwned(plan) => {
652                    let result = self.sequence_reassign_owned(ctx.session_mut(), plan).await;
653                    ctx.retire(result);
654                }
655                Plan::ValidateConnection(plan) => {
                    // Inline the connection and clone the storage configuration up front so the
                    // spawned task owns everything it needs. Validation is awaited inside that
                    // task rather than in this future, and the task retires `ctx`.
656                    let connection = plan
657                        .connection
658                        .into_inline_connection(self.catalog().state());
659                    let current_storage_configuration = self.controller.storage.config().clone();
660                    mz_ore::task::spawn(|| "coord::validate_connection", async move {
661                        let res = match connection
662                            .validate(plan.id, &current_storage_configuration)
663                            .await
664                        {
665                            Ok(()) => Ok(ExecuteResponse::ValidatedConnection),
666                            Err(err) => Err(err.into()),
667                        };
668                        ctx.retire(res);
669                    });
670                }
671            }
672        }
        // Run the whole future inside a debug-level span, then box it (see the BOXED FUTURE
        // note on this method).
673        .instrument(tracing::debug_span!("coord::sequencer::sequence_plan"))
674        .boxed_local()
675    }
676
677    #[mz_ore::instrument(level = "debug")]
678    pub(crate) async fn sequence_execute_single_statement_transaction(
679        &mut self,
680        ctx: ExecuteContext,
681        stmt: Arc<Statement<Raw>>,
682        params: Params,
683    ) {
684        // Put the session into single statement implicit so anything can execute.
685        let (tx, internal_cmd_tx, mut session, extra) = ctx.into_parts();
686        assert!(matches!(session.transaction(), TransactionStatus::Default));
687        session.start_transaction_single_stmt(self.now_datetime());
688        let conn_id = session.conn_id().unhandled();
689
690        // Execute the saved statement in a temp transmitter so we can run COMMIT.
691        let (sub_tx, sub_rx) = oneshot::channel();
692        let sub_ct = ClientTransmitter::new(sub_tx, self.internal_cmd_tx.clone());
693        let sub_ctx = ExecuteContext::from_parts(sub_ct, internal_cmd_tx, session, extra);
694        self.handle_execute_inner(stmt, params, sub_ctx).await;
695
696        // The response can need off-thread processing. Wait for it elsewhere so the coordinator can
697        // continue processing.
698        let internal_cmd_tx = self.internal_cmd_tx.clone();
699        mz_ore::task::spawn(
700            || format!("execute_single_statement:{conn_id}"),
701            async move {
702                let Ok(Response {
703                    result,
704                    session,
705                    otel_ctx,
706                }) = sub_rx.await
707                else {
708                    // Coordinator went away.
709                    return;
710                };
711                otel_ctx.attach_as_parent();
712                let (sub_tx, sub_rx) = oneshot::channel();
713                let _ = internal_cmd_tx.send(Message::Command(
714                    otel_ctx,
715                    Command::Commit {
716                        action: EndTransactionAction::Commit,
717                        session,
718                        tx: sub_tx,
719                    },
720                ));
721                let Ok(commit_response) = sub_rx.await else {
722                    // Coordinator went away.
723                    return;
724                };
725                assert!(matches!(
726                    commit_response.session.transaction(),
727                    TransactionStatus::Default
728                ));
729                // The fake, generated response was already sent to the user and we don't need to
730                // ever send an `Ok(result)` to the user, because they are expecting a response from
731                // a `COMMIT`. So, always send the `COMMIT`'s result if the original statement
732                // succeeded. If it failed, we can send an error and don't need to wrap it or send a
733                // later COMMIT or ROLLBACK.
734                let result = match (result, commit_response.result) {
735                    (Ok(_), commit) => commit,
736                    (Err(result), _) => Err(result),
737                };
738                // We ignore the resp.result because it's not clear what to do if it failed since we
739                // can only send a single ExecuteResponse to tx.
740                tx.send(result, commit_response.session);
741            }
742            .instrument(Span::current()),
743        );
744    }
745
746    /// Creates a role during connection startup.
747    ///
748    /// This should not be called from anywhere except connection startup.
749    #[mz_ore::instrument(level = "debug")]
750    pub(crate) async fn sequence_create_role_for_startup(
751        &mut self,
752        plan: CreateRolePlan,
753    ) -> Result<ExecuteResponse, AdapterError> {
754        // This does not set conn_id because it's not yet in active_conns. That is because we can't
755        // make a ConnMeta until we have a role id which we don't have until after the catalog txn
756        // is committed. Passing None here means the audit log won't have a user set in the event's
757        // user field. This seems fine because it is indeed the system that is creating this role,
758        // not a user request, and the user name is still recorded in the plan, so we aren't losing
759        // information.
760        self.sequence_create_role(None, plan).await
761    }
762
763    pub(crate) fn allocate_transient_id(&self) -> (CatalogItemId, GlobalId) {
764        self.transient_id_gen.allocate_id()
765    }
766
767    fn should_emit_rbac_notice(&self, session: &Session) -> Option<AdapterNotice> {
768        if !rbac::is_rbac_enabled_for_session(self.catalog.system_config(), session) {
769            Some(AdapterNotice::RbacUserDisabled)
770        } else {
771            None
772        }
773    }
774
775    /// Inserts the rows from `constants` into the table identified by `target_id`.
776    ///
777    /// # Panics
778    ///
779    /// Panics if `constants` is not an `MirRelationExpr::Constant`.
780    pub(crate) fn insert_constant(
781        catalog: &Catalog,
782        session: &mut Session,
783        target_id: CatalogItemId,
784        constants: MirRelationExpr,
785    ) -> Result<ExecuteResponse, AdapterError> {
786        // Insert can be queued, so we need to re-verify the id exists.
787        let desc = match catalog.try_get_entry(&target_id) {
788            Some(table) => {
789                let full_name = catalog.resolve_full_name(table.name(), Some(session.conn_id()));
790                // Inserts always happen at the latest version of a table.
791                table.desc_latest(&full_name)?
792            }
793            None => {
794                return Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
795                    kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
796                        target_id.to_string(),
797                    )),
798                }));
799            }
800        };
801
802        match constants.as_const() {
803            Some((rows, ..)) => {
804                let rows = rows.clone()?;
805                for (row, _) in &rows {
806                    for (i, datum) in row.iter().enumerate() {
807                        desc.constraints_met(i, &datum)?;
808                    }
809                }
810                let diffs_plan = plan::SendDiffsPlan {
811                    id: target_id,
812                    updates: rows,
813                    kind: MutationKind::Insert,
814                    returning: Vec::new(),
815                    max_result_size: catalog.system_config().max_result_size(),
816                };
817                Self::send_diffs(session, diffs_plan)
818            }
819            None => panic!(
820                "tried using sequence_insert_constant on non-constant MirRelationExpr\n{}",
821                constants.pretty(),
822            ),
823        }
824    }
825
826    #[mz_ore::instrument(level = "debug")]
827    pub(crate) fn send_diffs(
828        session: &mut Session,
829        mut plan: plan::SendDiffsPlan,
830    ) -> Result<ExecuteResponse, AdapterError> {
831        let affected_rows = {
832            let mut affected_rows = Diff::from(0);
833            let mut all_positive_diffs = true;
834            // If all diffs are positive, the number of affected rows is just the
835            // sum of all unconsolidated diffs.
836            for (_, diff) in plan.updates.iter() {
837                if diff.is_negative() {
838                    all_positive_diffs = false;
839                    break;
840                }
841
842                affected_rows += diff;
843            }
844
845            if !all_positive_diffs {
846                // Consolidate rows. This is useful e.g. for an UPDATE where the row
847                // doesn't change, and we need to reflect that in the number of
848                // affected rows.
849                differential_dataflow::consolidation::consolidate(&mut plan.updates);
850
851                affected_rows = Diff::ZERO;
852                // With retractions, the number of affected rows is not the number
853                // of rows we see, but the sum of the absolute value of their diffs,
854                // e.g. if one row is retracted and another is added, the total
855                // number of rows affected is 2.
856                for (_, diff) in plan.updates.iter() {
857                    affected_rows += diff.abs();
858                }
859            }
860
861            usize::try_from(affected_rows.into_inner()).expect("positive Diff must fit")
862        };
863        event!(
864            Level::TRACE,
865            affected_rows,
866            id = format!("{:?}", plan.id),
867            kind = format!("{:?}", plan.kind),
868            updates = plan.updates.len(),
869            returning = plan.returning.len(),
870        );
871
872        session.add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
873            id: plan.id,
874            rows: TableData::Rows(plan.updates),
875        }]))?;
876        if !plan.returning.is_empty() {
877            let finishing = RowSetFinishing {
878                order_by: Vec::new(),
879                limit: None,
880                offset: 0,
881                project: (0..plan.returning[0].0.iter().count()).collect(),
882            };
883            let max_returned_query_size = session.vars().max_query_result_size();
884            let duration_histogram = session.metrics().row_set_finishing_seconds();
885
886            return match finishing.finish(
887                RowCollection::new(plan.returning, &finishing.order_by),
888                plan.max_result_size,
889                Some(max_returned_query_size),
890                duration_histogram,
891            ) {
892                Ok((rows, _size_bytes)) => Ok(Self::send_immediate_rows(rows)),
893                Err(e) => Err(AdapterError::ResultSize(e)),
894            };
895        }
896        Ok(match plan.kind {
897            MutationKind::Delete => ExecuteResponse::Deleted(affected_rows),
898            MutationKind::Insert => ExecuteResponse::Inserted(affected_rows),
899            MutationKind::Update => ExecuteResponse::Updated(affected_rows / 2),
900        })
901    }
902}