mz_adapter/coord/
sequencer.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// Prevents anyone from accidentally exporting a method from the `inner` module.
11#![allow(clippy::pub_use)]
12
13//! Logic for executing a planned SQL query.
14
15use futures::FutureExt;
16use futures::future::LocalBoxFuture;
17use inner::return_if_err;
18use mz_expr::row::RowCollection;
19use mz_expr::{MirRelationExpr, RowSetFinishing};
20use mz_ore::tracing::OpenTelemetryContext;
21use mz_repr::{CatalogItemId, Diff, GlobalId};
22use mz_sql::catalog::CatalogError;
23use mz_sql::names::ResolvedIds;
24use mz_sql::plan::{
25    self, AbortTransactionPlan, CommitTransactionPlan, CopyFromSource, CreateRolePlan,
26    CreateSourcePlanBundle, FetchPlan, MutationKind, Params, Plan, PlanKind, RaisePlan,
27};
28use mz_sql::rbac;
29use mz_sql::session::metadata::SessionMetadata;
30use mz_sql_parser::ast::{Raw, Statement};
31use mz_storage_client::client::TableData;
32use mz_storage_types::connections::inline::IntoInlineConnection;
33use std::collections::BTreeSet;
34use std::sync::Arc;
35use tokio::sync::oneshot;
36use tracing::{Instrument, Level, Span, event};
37
38use crate::ExecuteContext;
39use crate::catalog::Catalog;
40use crate::command::{Command, ExecuteResponse, Response};
41use crate::coord::appends::{DeferredOp, DeferredPlan};
42use crate::coord::validity::PlanValidity;
43use crate::coord::{
44    Coordinator, DeferredPlanStatement, Message, PlanStatement, TargetCluster, catalog_serving,
45};
46use crate::error::AdapterError;
47use crate::notice::AdapterNotice;
48use crate::session::{
49    EndTransactionAction, Session, StateRevision, TransactionOps, TransactionStatus, WriteOp,
50};
51use crate::util::ClientTransmitter;
52
53// DO NOT make this visible in any way, i.e. do not add any version of
54// `pub` to this mod. The inner `sequence_X` methods are hidden in this
55// private module to prevent anyone from calling them directly. All
56// sequencing should be done through the `sequence_plan` method.
57// This allows us to add catch-all logic that should be applied to all
58// plans in `sequence_plan` and guarantee that no caller can circumvent
59// that logic.
60//
61// The two exceptions are:
62//
63// - Creating a role during connection startup. In this scenario, the session has not been properly
64// initialized and we need to skip directly to creating the role. We have a specific method,
65// `sequence_create_role_for_startup`, for this purpose.
66// - Methods that continue the execution of some plan that was being run asynchronously, such as
67// `sequence_peek_stage` and `sequence_create_connection_stage_finish`.
68
69mod inner;
70
71impl Coordinator {
72    /// Sequences `plan` on behalf of the session carried by `ctx`.
73    ///
74    /// Before dispatching on the plan variant, the following checks are applied to every plan:
    ///
    /// - the responses that this kind of plan can generate are registered as the allowed set on
    ///   `ctx`'s transmitter;
    /// - if the controller is read-only and the plan is not allowed in read-only mode, `ctx` is
    ///   retired with [`AdapterError::ReadOnly`];
    /// - if the session is still waiting on its startup builtin table appends, the plan is
    ///   handed to `defer_op` and this call returns without retiring `ctx`;
    /// - the target cluster is determined (the transaction's cluster if one is set, otherwise
    ///   the result of `catalog_serving::auto_run_on_catalog_server`) and, when it resolves,
    ///   the cluster restrictions are checked;
    /// - `rbac::check_plan` is run against the plan, the target cluster and `resolved_ids`.
    ///
    /// A failed cluster-restriction or RBAC check retires `ctx` with the error and returns.
    /// Otherwise each match arm either retires `ctx` with its result or passes `ctx` by value
    /// to the `sequence_*` method it calls.
    ///
    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 34KB. This would
    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
75    pub(crate) fn sequence_plan(
76        &mut self,
77        mut ctx: ExecuteContext,
78        plan: Plan,
79        resolved_ids: ResolvedIds,
80    ) -> LocalBoxFuture<'_, ()> {
81        async move {
            // Register the responses this kind of plan can generate as the allowed set on the
            // transmitter.
82            let responses = ExecuteResponse::generated_from(&PlanKind::from(&plan));
83            ctx.tx_mut().set_allowed(responses);
84
            // Refuse plans that are not allowed while the controller is read-only.
85            if self.controller.read_only() && !plan.allowed_in_read_only() {
86                ctx.retire(Err(AdapterError::ReadOnly));
87                return;
88            }
89
90            // Check if we're still waiting for any of the builtin table appends from when we
91            // started the Session to complete.
92            if let Some((dependencies, wait_future)) =
93                super::appends::waiting_on_startup_appends(self.catalog(), ctx.session_mut(), &plan)
94            {
95                let conn_id = ctx.session().conn_id();
96                tracing::debug!(%conn_id, "deferring plan for startup appends");
97
98                let role_metadata = ctx.session().role_metadata().clone();
99                let validity = PlanValidity::new(
100                    self.catalog.transient_revision(),
101                    dependencies,
102                    None,
103                    None,
104                    role_metadata,
105                );
106                let deferred_plan = DeferredPlan {
107                    ctx,
108                    plan,
109                    validity,
110                    requires_locks: BTreeSet::default(),
111                };
112                // Defer op accepts an optional write lock, but there aren't any writes occurring
113                // here, hence the map to `None`.
114                let acquire_future = wait_future.map(|()| None);
115
116                self.defer_op(acquire_future, DeferredOp::Plan(deferred_plan));
117
118                // Return early because our op is deferred on waiting for the builtin writes to
119                // complete.
120                return;
121            };
122
123            // Scope the borrow of the Catalog because we need to mutate the Coordinator state below.
124            let target_cluster = match ctx.session().transaction().cluster() {
125                // Use the current transaction's cluster.
126                Some(cluster_id) => TargetCluster::Transaction(cluster_id),
127                // If there isn't a current cluster set for a transaction, then try to auto route.
128                None => {
129                    let session_catalog = self.catalog.for_session(ctx.session());
130                    catalog_serving::auto_run_on_catalog_server(
131                        &session_catalog,
132                        ctx.session(),
133                        &plan,
134                    )
135                }
136            };
            // A target cluster that fails to resolve is not an error at this point: the ID and
            // name are simply left as `None`.
137            let (target_cluster_id, target_cluster_name) = match self
138                .catalog()
139                .resolve_target_cluster(target_cluster, ctx.session())
140            {
141                Ok(cluster) => (Some(cluster.id), Some(cluster.name.clone())),
142                Err(_) => (None, None),
143            };
144
            // Record the resolved cluster for the statement, but only when both a cluster ID and
            // the contents of `ctx.extra()` are available.
145            if let (Some(cluster_id), Some(statement_id)) =
146                (target_cluster_id, ctx.extra().contents())
147            {
148                self.set_statement_execution_cluster(statement_id, cluster_id);
149            }
150
151            let session_catalog = self.catalog.for_session(ctx.session());
152
            // Cluster restrictions can only be checked when the target cluster resolved to a name.
153            if let Some(cluster_name) = &target_cluster_name {
154                if let Err(e) = catalog_serving::check_cluster_restrictions(
155                    cluster_name,
156                    &session_catalog,
157                    &plan,
158                ) {
159                    return ctx.retire(Err(e));
160                }
161            }
162
            // RBAC check. The closure maps a connection ID to the authenticated role ID of the
            // matching active connection, if there is one.
163            if let Err(e) = rbac::check_plan(
164                &session_catalog,
165                |id| {
166                    // We use linear search through active connections if needed, which is fine
167                    // because the RBAC check will call the closure at most once.
168                    self.active_conns()
169                        .into_iter()
170                        .find(|(conn_id, _)| conn_id.unhandled() == id)
171                        .map(|(_, conn_meta)| *conn_meta.authenticated_role_id())
172                },
173                ctx.session(),
174                &plan,
175                target_cluster_id,
176                &resolved_ids,
177            ) {
178                return ctx.retire(Err(e.into()));
179            }
180
            // Dispatch on the plan variant. Arms that compute a `result` retire `ctx` here; the
            // remaining arms move `ctx` into the `sequence_*` method they call.
181            match plan {
182                Plan::CreateSource(plan) => {
                    // A single source uses the same path as `CreateSources`: allocate its IDs at
                    // the catalog write timestamp and wrap the plan in a one-element bundle.
183                    let id_ts = self.get_catalog_write_ts().await;
184                    let (item_id, global_id) =
185                        return_if_err!(self.catalog_mut().allocate_user_id(id_ts).await, ctx);
186                    let result = self
187                        .sequence_create_source(
188                            &mut ctx,
189                            vec![CreateSourcePlanBundle {
190                                item_id,
191                                global_id,
192                                plan,
193                                resolved_ids,
194                                available_source_references: None,
195                            }],
196                        )
197                        .await;
198                    ctx.retire(result);
199                }
200                Plan::CreateSources(plans) => {
201                    assert!(
202                        resolved_ids.is_empty(),
203                        "each plan has separate resolved_ids"
204                    );
205                    let result = self.sequence_create_source(&mut ctx, plans).await;
206                    ctx.retire(result);
207                }
208                Plan::CreateConnection(plan) => {
209                    self.sequence_create_connection(ctx, plan, resolved_ids)
210                        .await;
211                }
212                Plan::CreateDatabase(plan) => {
213                    let result = self.sequence_create_database(ctx.session_mut(), plan).await;
214                    ctx.retire(result);
215                }
216                Plan::CreateSchema(plan) => {
217                    let result = self.sequence_create_schema(ctx.session_mut(), plan).await;
218                    ctx.retire(result);
219                }
220                Plan::CreateRole(plan) => {
221                    let result = self
222                        .sequence_create_role(Some(ctx.session().conn_id()), plan)
223                        .await;
                    // The notice, if any, is added regardless of whether creating the role
                    // succeeded.
224                    if let Some(notice) = self.should_emit_rbac_notice(ctx.session()) {
225                        ctx.session().add_notice(notice);
226                    }
227                    ctx.retire(result);
228                }
229                Plan::CreateCluster(plan) => {
230                    let result = self.sequence_create_cluster(ctx.session(), plan).await;
231                    ctx.retire(result);
232                }
233                Plan::CreateClusterReplica(plan) => {
234                    let result = self
235                        .sequence_create_cluster_replica(ctx.session(), plan)
236                        .await;
237                    ctx.retire(result);
238                }
239                Plan::CreateTable(plan) => {
240                    let result = self
241                        .sequence_create_table(&mut ctx, plan, resolved_ids)
242                        .await;
243                    ctx.retire(result);
244                }
245                Plan::CreateSecret(plan) => {
246                    self.sequence_create_secret(ctx, plan).await;
247                }
248                Plan::CreateSink(plan) => {
249                    self.sequence_create_sink(ctx, plan, resolved_ids).await;
250                }
251                Plan::CreateView(plan) => {
252                    self.sequence_create_view(ctx, plan, resolved_ids).await;
253                }
254                Plan::CreateMaterializedView(plan) => {
255                    self.sequence_create_materialized_view(ctx, plan, resolved_ids)
256                        .await;
257                }
258                Plan::CreateContinualTask(plan) => {
259                    let res = self
260                        .sequence_create_continual_task(&mut ctx, plan, resolved_ids)
261                        .await;
262                    ctx.retire(res);
263                }
264                Plan::CreateIndex(plan) => {
265                    self.sequence_create_index(ctx, plan, resolved_ids).await;
266                }
267                Plan::CreateType(plan) => {
268                    let result = self
269                        .sequence_create_type(ctx.session(), plan, resolved_ids)
270                        .await;
271                    ctx.retire(result);
272                }
273                Plan::CreateNetworkPolicy(plan) => {
274                    let res = self
275                        .sequence_create_network_policy(ctx.session(), plan)
276                        .await;
277                    ctx.retire(res);
278                }
279                Plan::Comment(plan) => {
280                    let result = self.sequence_comment_on(ctx.session(), plan).await;
281                    ctx.retire(result);
282                }
283                Plan::CopyTo(plan) => {
284                    self.sequence_copy_to(ctx, plan, target_cluster).await;
285                }
286                Plan::DropObjects(plan) => {
287                    let result = self.sequence_drop_objects(ctx.session_mut(), plan).await;
288                    ctx.retire(result);
289                }
290                Plan::DropOwned(plan) => {
291                    let result = self.sequence_drop_owned(ctx.session_mut(), plan).await;
292                    ctx.retire(result);
293                }
294                Plan::EmptyQuery => {
295                    ctx.retire(Ok(ExecuteResponse::EmptyQuery));
296                }
297                Plan::ShowAllVariables => {
298                    let result = self.sequence_show_all_variables(ctx.session());
299                    ctx.retire(result);
300                }
301                Plan::ShowVariable(plan) => {
302                    let result = self.sequence_show_variable(ctx.session(), plan);
303                    ctx.retire(result);
304                }
305                Plan::InspectShard(plan) => {
306                    // TODO: Ideally, this await would happen off the main thread.
307                    let result = self.sequence_inspect_shard(ctx.session(), plan).await;
308                    ctx.retire(result);
309                }
310                Plan::SetVariable(plan) => {
311                    let result = self.sequence_set_variable(ctx.session_mut(), plan);
312                    ctx.retire(result);
313                }
314                Plan::ResetVariable(plan) => {
315                    let result = self.sequence_reset_variable(ctx.session_mut(), plan);
316                    ctx.retire(result);
317                }
318                Plan::SetTransaction(plan) => {
319                    let result = self.sequence_set_transaction(ctx.session_mut(), plan);
320                    ctx.retire(result);
321                }
322                Plan::StartTransaction(plan) => {
                    // Starting a transaction while already in one only produces a notice; the
                    // call to `start_transaction` below still happens.
323                    if matches!(
324                        ctx.session().transaction(),
325                        TransactionStatus::InTransaction(_)
326                    ) {
327                        ctx.session()
328                            .add_notice(AdapterNotice::ExistingTransactionInProgress);
329                    }
330                    let result = ctx.session_mut().start_transaction(
331                        self.now_datetime(),
332                        plan.access,
333                        plan.isolation_level,
334                    );
335                    ctx.retire(result.map(|_| ExecuteResponse::StartedTransaction))
336                }
337                Plan::CommitTransaction(CommitTransactionPlan {
338                    ref transaction_type,
339                })
340                | Plan::AbortTransaction(AbortTransactionPlan {
341                    ref transaction_type,
342                }) => {
343                    // Serialize DDL transactions. Statements that use this mode must return false
344                    // in `must_serialize_ddl()`.
345                    if ctx.session().transaction().is_ddl() {
346                        if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
                            // Lock acquired: store the guard on this connection's metadata.
347                            let prev = self
348                                .active_conns
349                                .get_mut(ctx.session().conn_id())
350                                .expect("connection must exist")
351                                .deferred_lock
352                                .replace(guard);
353                            assert!(
354                                prev.is_none(),
355                                "connections should have at most one lock guard"
356                            );
357                        } else {
                            // Lock unavailable: queue the statement on `serialized_ddl` and return
                            // without retiring `ctx`, which moves into the queued entry.
358                            self.serialized_ddl.push_back(DeferredPlanStatement {
359                                ctx,
360                                ps: PlanStatement::Plan { plan, resolved_ids },
361                            });
362                            return;
363                        }
364                    }
365
                    // Both variants share this arm, so recover which one we are handling.
366                    let action = match &plan {
367                        Plan::CommitTransaction(_) => EndTransactionAction::Commit,
368                        Plan::AbortTransaction(_) => EndTransactionAction::Rollback,
369                        _ => unreachable!(),
370                    };
371                    if ctx.session().transaction().is_implicit() && !transaction_type.is_implicit()
372                    {
373                        // In Postgres, if a user sends a COMMIT or ROLLBACK in an
374                        // implicit transaction, a warning is sent to them.
375                        // (The transaction is still closed and a new implicit
376                        // transaction started, though.)
377                        ctx.session().add_notice(
378                            AdapterNotice::ExplicitTransactionControlInImplicitTransaction,
379                        );
380                    }
381                    self.sequence_end_transaction(ctx, action).await;
382                }
383                Plan::Select(plan) => {
                    // The session's `max_query_result_size` is passed along to the peek.
384                    let max = Some(ctx.session().vars().max_query_result_size());
385                    self.sequence_peek(ctx, plan, target_cluster, max).await;
386                }
387                Plan::Subscribe(plan) => {
388                    self.sequence_subscribe(ctx, plan, target_cluster).await;
389                }
390                Plan::SideEffectingFunc(plan) => {
391                    self.sequence_side_effecting_func(ctx, plan).await;
392                }
393                Plan::ShowCreate(plan) => {
394                    ctx.retire(Ok(Self::send_immediate_rows(plan.row)));
395                }
396                Plan::ShowColumns(show_columns_plan) => {
                    // Sequenced as a peek of the plan's embedded `select_plan`.
397                    let max = Some(ctx.session().vars().max_query_result_size());
398                    self.sequence_peek(ctx, show_columns_plan.select_plan, target_cluster, max)
399                        .await;
400                }
401                Plan::CopyFrom(plan) => match plan.source {
402                    CopyFromSource::Stdin => {
                        // Respond right away with the copy details; `ctx_extra` is moved into the
                        // response and the session is sent back with it.
403                        let (tx, _, session, ctx_extra) = ctx.into_parts();
404                        tx.send(
405                            Ok(ExecuteResponse::CopyFrom {
406                                id: plan.id,
407                                columns: plan.columns,
408                                params: plan.params,
409                                ctx_extra,
410                            }),
411                            session,
412                        );
413                    }
414                    CopyFromSource::Url(_) | CopyFromSource::AwsS3 { .. } => {
415                        self.sequence_copy_from(ctx, plan, target_cluster).await;
416                    }
417                },
418                Plan::ExplainPlan(plan) => {
419                    self.sequence_explain_plan(ctx, plan, target_cluster).await;
420                }
421                Plan::ExplainPushdown(plan) => {
422                    self.sequence_explain_pushdown(ctx, plan, target_cluster)
423                        .await;
424                }
425                Plan::ExplainSinkSchema(plan) => {
426                    let result = self.sequence_explain_schema(plan);
427                    ctx.retire(result);
428                }
429                Plan::ExplainTimestamp(plan) => {
430                    self.sequence_explain_timestamp(ctx, plan, target_cluster)
431                        .await;
432                }
433                Plan::Insert(plan) => {
434                    self.sequence_insert(ctx, plan).await;
435                }
436                Plan::ReadThenWrite(plan) => {
437                    self.sequence_read_then_write(ctx, plan).await;
438                }
439                Plan::AlterNoop(plan) => {
440                    ctx.retire(Ok(ExecuteResponse::AlteredObject(plan.object_type)));
441                }
442                Plan::AlterCluster(plan) => {
443                    self.sequence_alter_cluster_staged(ctx, plan).await;
444                }
445                Plan::AlterClusterRename(plan) => {
446                    let result = self.sequence_alter_cluster_rename(&mut ctx, plan).await;
447                    ctx.retire(result);
448                }
449                Plan::AlterClusterSwap(plan) => {
450                    let result = self.sequence_alter_cluster_swap(&mut ctx, plan).await;
451                    ctx.retire(result);
452                }
453                Plan::AlterClusterReplicaRename(plan) => {
454                    let result = self
455                        .sequence_alter_cluster_replica_rename(ctx.session(), plan)
456                        .await;
457                    ctx.retire(result);
458                }
459                Plan::AlterConnection(plan) => {
460                    self.sequence_alter_connection(ctx, plan).await;
461                }
462                Plan::AlterSetCluster(plan) => {
463                    let result = self.sequence_alter_set_cluster(ctx.session(), plan).await;
464                    ctx.retire(result);
465                }
466                Plan::AlterRetainHistory(plan) => {
467                    let result = self.sequence_alter_retain_history(&mut ctx, plan).await;
468                    ctx.retire(result);
469                }
470                Plan::AlterItemRename(plan) => {
471                    let result = self.sequence_alter_item_rename(&mut ctx, plan).await;
472                    ctx.retire(result);
473                }
474                Plan::AlterSchemaRename(plan) => {
475                    let result = self.sequence_alter_schema_rename(&mut ctx, plan).await;
476                    ctx.retire(result);
477                }
478                Plan::AlterSchemaSwap(plan) => {
479                    let result = self.sequence_alter_schema_swap(&mut ctx, plan).await;
480                    ctx.retire(result);
481                }
482                Plan::AlterRole(plan) => {
483                    let result = self.sequence_alter_role(ctx.session_mut(), plan).await;
484                    ctx.retire(result);
485                }
486                Plan::AlterSecret(plan) => {
487                    self.sequence_alter_secret(ctx, plan).await;
488                }
489                Plan::AlterSink(plan) => {
490                    self.sequence_alter_sink_prepare(ctx, plan).await;
491                }
492                Plan::AlterSource(plan) => {
493                    let result = self.sequence_alter_source(ctx.session_mut(), plan).await;
494                    ctx.retire(result);
495                }
496                Plan::AlterSystemSet(plan) => {
497                    let result = self.sequence_alter_system_set(ctx.session(), plan).await;
498                    ctx.retire(result);
499                }
500                Plan::AlterSystemReset(plan) => {
501                    let result = self.sequence_alter_system_reset(ctx.session(), plan).await;
502                    ctx.retire(result);
503                }
504                Plan::AlterSystemResetAll(plan) => {
505                    let result = self
506                        .sequence_alter_system_reset_all(ctx.session(), plan)
507                        .await;
508                    ctx.retire(result);
509                }
510                Plan::AlterTableAddColumn(plan) => {
511                    let result = self.sequence_alter_table(&mut ctx, plan).await;
512                    ctx.retire(result);
513                }
514                Plan::AlterNetworkPolicy(plan) => {
515                    let res = self
516                        .sequence_alter_network_policy(ctx.session(), plan)
517                        .await;
518                    ctx.retire(res);
519                }
520                Plan::DiscardTemp => {
521                    self.drop_temp_items(ctx.session().conn_id()).await;
522                    ctx.retire(Ok(ExecuteResponse::DiscardedTemp));
523                }
524                Plan::DiscardAll => {
                    // Only permitted when the transaction status is `Started`; any other status
                    // yields `OperationProhibitsTransaction`.
525                    let ret = if let TransactionStatus::Started(_) = ctx.session().transaction() {
526                        self.clear_transaction(ctx.session_mut()).await;
527                        self.drop_temp_items(ctx.session().conn_id()).await;
528                        ctx.session_mut().reset();
529                        Ok(ExecuteResponse::DiscardedAll)
530                    } else {
531                        Err(AdapterError::OperationProhibitsTransaction(
532                            "DISCARD ALL".into(),
533                        ))
534                    };
535                    ctx.retire(ret);
536                }
537                Plan::Declare(plan) => {
538                    self.declare(ctx, plan.name, plan.stmt, plan.sql, plan.params);
539                }
540                Plan::Fetch(FetchPlan {
541                    name,
542                    count,
543                    timeout,
544                }) => {
                    // Move the extra context out of `ctx` (leaving its default behind) so it can
                    // travel with the response.
545                    let ctx_extra = std::mem::take(ctx.extra_mut());
546                    ctx.retire(Ok(ExecuteResponse::Fetch {
547                        name,
548                        count,
549                        timeout,
550                        ctx_extra,
551                    }));
552                }
553                Plan::Close(plan) => {
                    // `remove_portal` returning false is reported as an unknown cursor.
554                    if ctx.session_mut().remove_portal(&plan.name) {
555                        ctx.retire(Ok(ExecuteResponse::ClosedCursor));
556                    } else {
557                        ctx.retire(Err(AdapterError::UnknownCursor(plan.name)));
558                    }
559                }
560                Plan::Prepare(plan) => {
                    // Preparing under a name that is already in use is an error.
561                    if ctx
562                        .session()
563                        .get_prepared_statement_unverified(&plan.name)
564                        .is_some()
565                    {
566                        ctx.retire(Err(AdapterError::PreparedStatementExists(plan.name)));
567                    } else {
                        // Capture the catalog and session revisions at prepare time; they are
                        // stored alongside the prepared statement.
568                        let state_revision = StateRevision {
569                            catalog_revision: self.catalog().transient_revision(),
570                            session_state_revision: ctx.session().state_revision(),
571                        };
572                        ctx.session_mut().set_prepared_statement(
573                            plan.name,
574                            Some(plan.stmt),
575                            plan.sql,
576                            plan.desc,
577                            state_revision,
578                            self.now(),
579                        );
580                        ctx.retire(Ok(ExecuteResponse::Prepare));
581                    }
582                }
583                Plan::Execute(plan) => {
584                    match self.sequence_execute(ctx.session_mut(), plan) {
585                        Ok(portal_name) => {
                            // Enqueue a `Command::Execute` for the returned portal on the internal
                            // command channel, handing over the session and the client's `tx`.
586                            let (tx, _, session, extra) = ctx.into_parts();
587                            self.internal_cmd_tx
588                                .send(Message::Command(
589                                    OpenTelemetryContext::obtain(),
590                                    Command::Execute {
591                                        portal_name,
592                                        session,
593                                        tx: tx.take(),
594                                        outer_ctx_extra: Some(extra),
595                                    },
596                                ))
597                                .expect("sending to self.internal_cmd_tx cannot fail");
598                        }
599                        Err(err) => ctx.retire(Err(err)),
600                    };
601                }
602                Plan::Deallocate(plan) => match plan.name {
                    // A name deallocates that one statement; no name deallocates all of them.
603                    Some(name) => {
604                        if ctx.session_mut().remove_prepared_statement(&name) {
605                            ctx.retire(Ok(ExecuteResponse::Deallocate { all: false }));
606                        } else {
607                            ctx.retire(Err(AdapterError::UnknownPreparedStatement(name)));
608                        }
609                    }
610                    None => {
611                        ctx.session_mut().remove_all_prepared_statements();
612                        ctx.retire(Ok(ExecuteResponse::Deallocate { all: true }));
613                    }
614                },
615                Plan::Raise(RaisePlan { severity }) => {
616                    ctx.session()
617                        .add_notice(AdapterNotice::UserRequested { severity });
618                    ctx.retire(Ok(ExecuteResponse::Raised));
619                }
620                Plan::GrantPrivileges(plan) => {
621                    let result = self
622                        .sequence_grant_privileges(ctx.session_mut(), plan)
623                        .await;
624                    ctx.retire(result);
625                }
626                Plan::RevokePrivileges(plan) => {
627                    let result = self
628                        .sequence_revoke_privileges(ctx.session_mut(), plan)
629                        .await;
630                    ctx.retire(result);
631                }
632                Plan::AlterDefaultPrivileges(plan) => {
633                    let result = self
634                        .sequence_alter_default_privileges(ctx.session_mut(), plan)
635                        .await;
636                    ctx.retire(result);
637                }
638                Plan::GrantRole(plan) => {
639                    let result = self.sequence_grant_role(ctx.session_mut(), plan).await;
640                    ctx.retire(result);
641                }
642                Plan::RevokeRole(plan) => {
643                    let result = self.sequence_revoke_role(ctx.session_mut(), plan).await;
644                    ctx.retire(result);
645                }
646                Plan::AlterOwner(plan) => {
647                    let result = self.sequence_alter_owner(ctx.session_mut(), plan).await;
648                    ctx.retire(result);
649                }
650                Plan::ReassignOwned(plan) => {
651                    let result = self.sequence_reassign_owned(ctx.session_mut(), plan).await;
652                    ctx.retire(result);
653                }
654                Plan::ValidateConnection(plan) => {
                    // Inline the connection and clone the storage configuration up front so the
                    // spawned task below owns everything it needs.
655                    let connection = plan
656                        .connection
657                        .into_inline_connection(self.catalog().state());
658                    let current_storage_configuration = self.controller.storage.config().clone();
                    // The validation is awaited inside a spawned task; `ctx` moves into the task
                    // and is retired there.
659                    mz_ore::task::spawn(|| "coord::validate_connection", async move {
660                        let res = match connection
661                            .validate(plan.id, &current_storage_configuration)
662                            .await
663                        {
664                            Ok(()) => Ok(ExecuteResponse::ValidatedConnection),
665                            Err(err) => Err(err.into()),
666                        };
667                        ctx.retire(res);
668                    });
669                }
670            }
671        }
672        .instrument(tracing::debug_span!("coord::sequencer::sequence_plan"))
673        .boxed_local()
674    }
675
676    #[mz_ore::instrument(level = "debug")]
677    pub(crate) async fn sequence_execute_single_statement_transaction(
678        &mut self,
679        ctx: ExecuteContext,
680        stmt: Arc<Statement<Raw>>,
681        params: Params,
682    ) {
683        // Put the session into single statement implicit so anything can execute.
684        let (tx, internal_cmd_tx, mut session, extra) = ctx.into_parts();
685        assert!(matches!(session.transaction(), TransactionStatus::Default));
686        session.start_transaction_single_stmt(self.now_datetime());
687        let conn_id = session.conn_id().unhandled();
688
689        // Execute the saved statement in a temp transmitter so we can run COMMIT.
690        let (sub_tx, sub_rx) = oneshot::channel();
691        let sub_ct = ClientTransmitter::new(sub_tx, self.internal_cmd_tx.clone());
692        let sub_ctx = ExecuteContext::from_parts(sub_ct, internal_cmd_tx, session, extra);
693        self.handle_execute_inner(stmt, params, sub_ctx).await;
694
695        // The response can need off-thread processing. Wait for it elsewhere so the coordinator can
696        // continue processing.
697        let internal_cmd_tx = self.internal_cmd_tx.clone();
698        mz_ore::task::spawn(
699            || format!("execute_single_statement:{conn_id}"),
700            async move {
701                let Ok(Response {
702                    result,
703                    session,
704                    otel_ctx,
705                }) = sub_rx.await
706                else {
707                    // Coordinator went away.
708                    return;
709                };
710                otel_ctx.attach_as_parent();
711                let (sub_tx, sub_rx) = oneshot::channel();
712                let _ = internal_cmd_tx.send(Message::Command(
713                    otel_ctx,
714                    Command::Commit {
715                        action: EndTransactionAction::Commit,
716                        session,
717                        tx: sub_tx,
718                    },
719                ));
720                let Ok(commit_response) = sub_rx.await else {
721                    // Coordinator went away.
722                    return;
723                };
724                assert!(matches!(
725                    commit_response.session.transaction(),
726                    TransactionStatus::Default
727                ));
728                // The fake, generated response was already sent to the user and we don't need to
729                // ever send an `Ok(result)` to the user, because they are expecting a response from
730                // a `COMMIT`. So, always send the `COMMIT`'s result if the original statement
731                // succeeded. If it failed, we can send an error and don't need to wrap it or send a
732                // later COMMIT or ROLLBACK.
733                let result = match (result, commit_response.result) {
734                    (Ok(_), commit) => commit,
735                    (Err(result), _) => Err(result),
736                };
737                // We ignore the resp.result because it's not clear what to do if it failed since we
738                // can only send a single ExecuteResponse to tx.
739                tx.send(result, commit_response.session);
740            }
741            .instrument(Span::current()),
742        );
743    }
744
745    /// Creates a role during connection startup.
746    ///
747    /// This should not be called from anywhere except connection startup.
748    #[mz_ore::instrument(level = "debug")]
749    pub(crate) async fn sequence_create_role_for_startup(
750        &mut self,
751        plan: CreateRolePlan,
752    ) -> Result<ExecuteResponse, AdapterError> {
753        // This does not set conn_id because it's not yet in active_conns. That is because we can't
754        // make a ConnMeta until we have a role id which we don't have until after the catalog txn
755        // is committed. Passing None here means the audit log won't have a user set in the event's
756        // user field. This seems fine because it is indeed the system that is creating this role,
757        // not a user request, and the user name is still recorded in the plan, so we aren't losing
758        // information.
759        self.sequence_create_role(None, plan).await
760    }
761
762    pub(crate) fn allocate_transient_id(&self) -> (CatalogItemId, GlobalId) {
763        self.transient_id_gen.allocate_id()
764    }
765
766    fn should_emit_rbac_notice(&self, session: &Session) -> Option<AdapterNotice> {
767        if !rbac::is_rbac_enabled_for_session(self.catalog.system_config(), session) {
768            Some(AdapterNotice::RbacUserDisabled)
769        } else {
770            None
771        }
772    }
773
774    /// Inserts the rows from `constants` into the table identified by `id`.
775    ///
776    /// # Panics
777    ///
778    /// Panics if `constants` is not an `MirRelationExpr::Constant`.
779    pub(crate) fn insert_constant(
780        catalog: &Catalog,
781        session: &mut Session,
782        id: CatalogItemId,
783        constants: MirRelationExpr,
784    ) -> Result<ExecuteResponse, AdapterError> {
785        // Insert can be queued, so we need to re-verify the id exists.
786        let desc = match catalog.try_get_entry(&id) {
787            Some(table) => {
788                let full_name = catalog.resolve_full_name(table.name(), Some(session.conn_id()));
789                // Inserts always happen at the latest version of a table.
790                table.desc_latest(&full_name)?
791            }
792            None => {
793                return Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
794                    kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
795                        id.to_string(),
796                    )),
797                }));
798            }
799        };
800
801        match constants.as_const() {
802            Some((rows, ..)) => {
803                let rows = rows.clone()?;
804                for (row, _) in &rows {
805                    for (i, datum) in row.iter().enumerate() {
806                        desc.constraints_met(i, &datum)?;
807                    }
808                }
809                let diffs_plan = plan::SendDiffsPlan {
810                    id,
811                    updates: rows,
812                    kind: MutationKind::Insert,
813                    returning: Vec::new(),
814                    max_result_size: catalog.system_config().max_result_size(),
815                };
816                Self::send_diffs(session, diffs_plan)
817            }
818            None => panic!(
819                "tried using sequence_insert_constant on non-constant MirRelationExpr\n{}",
820                constants.pretty(),
821            ),
822        }
823    }
824
825    #[mz_ore::instrument(level = "debug")]
826    pub(crate) fn send_diffs(
827        session: &mut Session,
828        mut plan: plan::SendDiffsPlan,
829    ) -> Result<ExecuteResponse, AdapterError> {
830        let affected_rows = {
831            let mut affected_rows = Diff::from(0);
832            let mut all_positive_diffs = true;
833            // If all diffs are positive, the number of affected rows is just the
834            // sum of all unconsolidated diffs.
835            for (_, diff) in plan.updates.iter() {
836                if diff.is_negative() {
837                    all_positive_diffs = false;
838                    break;
839                }
840
841                affected_rows += diff;
842            }
843
844            if !all_positive_diffs {
845                // Consolidate rows. This is useful e.g. for an UPDATE where the row
846                // doesn't change, and we need to reflect that in the number of
847                // affected rows.
848                differential_dataflow::consolidation::consolidate(&mut plan.updates);
849
850                affected_rows = Diff::ZERO;
851                // With retractions, the number of affected rows is not the number
852                // of rows we see, but the sum of the absolute value of their diffs,
853                // e.g. if one row is retracted and another is added, the total
854                // number of rows affected is 2.
855                for (_, diff) in plan.updates.iter() {
856                    affected_rows += diff.abs();
857                }
858            }
859
860            usize::try_from(affected_rows.into_inner()).expect("positive Diff must fit")
861        };
862        event!(
863            Level::TRACE,
864            affected_rows,
865            id = format!("{:?}", plan.id),
866            kind = format!("{:?}", plan.kind),
867            updates = plan.updates.len(),
868            returning = plan.returning.len(),
869        );
870
871        session.add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
872            id: plan.id,
873            rows: TableData::Rows(plan.updates),
874        }]))?;
875        if !plan.returning.is_empty() {
876            let finishing = RowSetFinishing {
877                order_by: Vec::new(),
878                limit: None,
879                offset: 0,
880                project: (0..plan.returning[0].0.iter().count()).collect(),
881            };
882            let max_returned_query_size = session.vars().max_query_result_size();
883            let duration_histogram = session.metrics().row_set_finishing_seconds();
884
885            return match finishing.finish(
886                RowCollection::new(plan.returning, &finishing.order_by),
887                plan.max_result_size,
888                Some(max_returned_query_size),
889                duration_histogram,
890            ) {
891                Ok((rows, _size_bytes)) => Ok(Self::send_immediate_rows(rows)),
892                Err(e) => Err(AdapterError::ResultSize(e)),
893            };
894        }
895        Ok(match plan.kind {
896            MutationKind::Delete => ExecuteResponse::Deleted(affected_rows),
897            MutationKind::Insert => ExecuteResponse::Inserted(affected_rows),
898            MutationKind::Update => ExecuteResponse::Updated(affected_rows / 2),
899        })
900    }
901}