mz_adapter/coord/
sequencer.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// Prevents anyone from accidentally exporting a method from the `inner` module.
11#![allow(clippy::pub_use)]
12
13//! Logic for executing a planned SQL query.
14
15use futures::FutureExt;
16use futures::future::LocalBoxFuture;
17use inner::return_if_err;
18use mz_expr::row::RowCollection;
19use mz_expr::{MirRelationExpr, RowSetFinishing};
20use mz_ore::tracing::OpenTelemetryContext;
21use mz_repr::{CatalogItemId, Diff, GlobalId};
22use mz_sql::catalog::CatalogError;
23use mz_sql::names::ResolvedIds;
24use mz_sql::plan::{
25    self, AbortTransactionPlan, CommitTransactionPlan, CopyFromSource, CreateRolePlan,
26    CreateSourcePlanBundle, FetchPlan, MutationKind, Params, Plan, PlanKind, RaisePlan,
27};
28use mz_sql::rbac;
29use mz_sql::session::metadata::SessionMetadata;
30use mz_sql_parser::ast::{Raw, Statement};
31use mz_storage_client::client::TableData;
32use mz_storage_types::connections::inline::IntoInlineConnection;
33use std::collections::BTreeSet;
34use std::sync::Arc;
35use tokio::sync::oneshot;
36use tracing::{Instrument, Level, Span, event};
37
38use crate::ExecuteContext;
39use crate::catalog::Catalog;
40use crate::command::{Command, ExecuteResponse, Response};
41use crate::coord::appends::{DeferredOp, DeferredPlan};
42use crate::coord::validity::PlanValidity;
43use crate::coord::{
44    Coordinator, DeferredPlanStatement, Message, PlanStatement, TargetCluster, catalog_serving,
45};
46use crate::error::AdapterError;
47use crate::notice::AdapterNotice;
48use crate::session::{EndTransactionAction, Session, TransactionOps, TransactionStatus, WriteOp};
49use crate::util::ClientTransmitter;
50
51// DO NOT make this visible in any way, i.e. do not add any version of
52// `pub` to this mod. The inner `sequence_X` methods are hidden in this
53// private module to prevent anyone from calling them directly. All
54// sequencing should be done through the `sequence_plan` method.
55// This allows us to add catch-all logic that should be applied to all
56// plans in `sequence_plan` and guarantee that no caller can circumvent
57// that logic.
58//
59// The two exceptions are:
60//
61// - Creating a role during connection startup. In this scenario, the session has not been properly
62// initialized and we need to skip directly to creating role. We have a specific method,
63// `sequence_create_role_for_startup` for this purpose.
64// - Methods that continue the execution of some plan that was being run asynchronously, such as
65// `sequence_peek_stage` and `sequence_create_connection_stage_finish`.
66
67mod inner;
68
69impl Coordinator {
70    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 34KB. This would
71    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
72    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
73    pub(crate) fn sequence_plan(
74        &mut self,
75        mut ctx: ExecuteContext,
76        plan: Plan,
77        resolved_ids: ResolvedIds,
78    ) -> LocalBoxFuture<'_, ()> {
79        async move {
80            let responses = ExecuteResponse::generated_from(&PlanKind::from(&plan));
81            ctx.tx_mut().set_allowed(responses);
82
83            if self.controller.read_only() && !plan.allowed_in_read_only() {
84                ctx.retire(Err(AdapterError::ReadOnly));
85                return;
86            }
87
88            // Check if we're still waiting for any of the builtin table appends from when we
89            // started the Session to complete.
90            if let Some((dependencies, wait_future)) =
91                super::appends::waiting_on_startup_appends(self.catalog(), ctx.session_mut(), &plan)
92            {
93                let conn_id = ctx.session().conn_id();
94                tracing::debug!(%conn_id, "deferring plan for startup appends");
95
96                let role_metadata = ctx.session().role_metadata().clone();
97                let validity = PlanValidity::new(
98                    self.catalog.transient_revision(),
99                    dependencies,
100                    None,
101                    None,
102                    role_metadata,
103                );
104                let deferred_plan = DeferredPlan {
105                    ctx,
106                    plan,
107                    validity,
108                    requires_locks: BTreeSet::default(),
109                };
110                // Defer op accepts an optional write lock, but there aren't any writes occurring
111                // here, since the map to `None`.
112                let acquire_future = wait_future.map(|()| None);
113
114                self.defer_op(acquire_future, DeferredOp::Plan(deferred_plan));
115
116                // Return early because our op is deferred on waiting for the builtin writes to
117                // complete.
118                return;
119            };
120
121            // Scope the borrow of the Catalog because we need to mutate the Coordinator state below.
122            let target_cluster = match ctx.session().transaction().cluster() {
123                // Use the current transaction's cluster.
124                Some(cluster_id) => TargetCluster::Transaction(cluster_id),
125                // If there isn't a current cluster set for a transaction, then try to auto route.
126                None => {
127                    let session_catalog = self.catalog.for_session(ctx.session());
128                    catalog_serving::auto_run_on_catalog_server(
129                        &session_catalog,
130                        ctx.session(),
131                        &plan,
132                    )
133                }
134            };
135            let (target_cluster_id, target_cluster_name) = match self
136                .catalog()
137                .resolve_target_cluster(target_cluster, ctx.session())
138            {
139                Ok(cluster) => (Some(cluster.id), Some(cluster.name.clone())),
140                Err(_) => (None, None),
141            };
142
143            if let (Some(cluster_id), Some(statement_id)) =
144                (target_cluster_id, ctx.extra().contents())
145            {
146                self.set_statement_execution_cluster(statement_id, cluster_id);
147            }
148
149            let session_catalog = self.catalog.for_session(ctx.session());
150
151            if let Some(cluster_name) = &target_cluster_name {
152                if let Err(e) = catalog_serving::check_cluster_restrictions(
153                    cluster_name,
154                    &session_catalog,
155                    &plan,
156                ) {
157                    return ctx.retire(Err(e));
158                }
159            }
160
161            if let Err(e) = rbac::check_plan(
162                &session_catalog,
163                |id| {
164                    // We use linear search through active connections if needed, which is fine
165                    // because the RBAC check will call the closure at most once.
166                    self.active_conns()
167                        .into_iter()
168                        .find(|(conn_id, _)| conn_id.unhandled() == id)
169                        .map(|(_, conn_meta)| *conn_meta.authenticated_role_id())
170                },
171                ctx.session(),
172                &plan,
173                target_cluster_id,
174                &resolved_ids,
175            ) {
176                return ctx.retire(Err(e.into()));
177            }
178
179            match plan {
180                Plan::CreateSource(plan) => {
181                    let id_ts = self.get_catalog_write_ts().await;
182                    let (item_id, global_id) =
183                        return_if_err!(self.catalog_mut().allocate_user_id(id_ts).await, ctx);
184                    let result = self
185                        .sequence_create_source(
186                            ctx.session_mut(),
187                            vec![CreateSourcePlanBundle {
188                                item_id,
189                                global_id,
190                                plan,
191                                resolved_ids,
192                                available_source_references: None,
193                            }],
194                        )
195                        .await;
196                    ctx.retire(result);
197                }
198                Plan::CreateSources(plans) => {
199                    assert!(
200                        resolved_ids.is_empty(),
201                        "each plan has separate resolved_ids"
202                    );
203                    let result = self.sequence_create_source(ctx.session_mut(), plans).await;
204                    ctx.retire(result);
205                }
206                Plan::CreateConnection(plan) => {
207                    self.sequence_create_connection(ctx, plan, resolved_ids)
208                        .await;
209                }
210                Plan::CreateDatabase(plan) => {
211                    let result = self.sequence_create_database(ctx.session_mut(), plan).await;
212                    ctx.retire(result);
213                }
214                Plan::CreateSchema(plan) => {
215                    let result = self.sequence_create_schema(ctx.session_mut(), plan).await;
216                    ctx.retire(result);
217                }
218                Plan::CreateRole(plan) => {
219                    let result = self
220                        .sequence_create_role(Some(ctx.session().conn_id()), plan)
221                        .await;
222                    if let Some(notice) = self.should_emit_rbac_notice(ctx.session()) {
223                        ctx.session().add_notice(notice);
224                    }
225                    ctx.retire(result);
226                }
227                Plan::CreateCluster(plan) => {
228                    let result = self.sequence_create_cluster(ctx.session(), plan).await;
229                    ctx.retire(result);
230                }
231                Plan::CreateClusterReplica(plan) => {
232                    let result = self
233                        .sequence_create_cluster_replica(ctx.session(), plan)
234                        .await;
235                    ctx.retire(result);
236                }
237                Plan::CreateTable(plan) => {
238                    let result = self
239                        .sequence_create_table(&mut ctx, plan, resolved_ids)
240                        .await;
241                    ctx.retire(result);
242                }
243                Plan::CreateSecret(plan) => {
244                    self.sequence_create_secret(ctx, plan).await;
245                }
246                Plan::CreateSink(plan) => {
247                    self.sequence_create_sink(ctx, plan, resolved_ids).await;
248                }
249                Plan::CreateView(plan) => {
250                    self.sequence_create_view(ctx, plan, resolved_ids).await;
251                }
252                Plan::CreateMaterializedView(plan) => {
253                    self.sequence_create_materialized_view(ctx, plan, resolved_ids)
254                        .await;
255                }
256                Plan::CreateContinualTask(plan) => {
257                    let res = self
258                        .sequence_create_continual_task(ctx.session(), plan, resolved_ids)
259                        .await;
260                    ctx.retire(res);
261                }
262                Plan::CreateIndex(plan) => {
263                    self.sequence_create_index(ctx, plan, resolved_ids).await;
264                }
265                Plan::CreateType(plan) => {
266                    let result = self
267                        .sequence_create_type(ctx.session(), plan, resolved_ids)
268                        .await;
269                    ctx.retire(result);
270                }
271                Plan::CreateNetworkPolicy(plan) => {
272                    let res = self
273                        .sequence_create_network_policy(ctx.session(), plan)
274                        .await;
275                    ctx.retire(res);
276                }
277                Plan::Comment(plan) => {
278                    let result = self.sequence_comment_on(ctx.session(), plan).await;
279                    ctx.retire(result);
280                }
281                Plan::CopyTo(plan) => {
282                    self.sequence_copy_to(ctx, plan, target_cluster).await;
283                }
284                Plan::DropObjects(plan) => {
285                    let result = self.sequence_drop_objects(ctx.session_mut(), plan).await;
286                    ctx.retire(result);
287                }
288                Plan::DropOwned(plan) => {
289                    let result = self.sequence_drop_owned(ctx.session_mut(), plan).await;
290                    ctx.retire(result);
291                }
292                Plan::EmptyQuery => {
293                    ctx.retire(Ok(ExecuteResponse::EmptyQuery));
294                }
295                Plan::ShowAllVariables => {
296                    let result = self.sequence_show_all_variables(ctx.session());
297                    ctx.retire(result);
298                }
299                Plan::ShowVariable(plan) => {
300                    let result = self.sequence_show_variable(ctx.session(), plan);
301                    ctx.retire(result);
302                }
303                Plan::InspectShard(plan) => {
304                    // TODO: Ideally, this await would happen off the main thread.
305                    let result = self.sequence_inspect_shard(ctx.session(), plan).await;
306                    ctx.retire(result);
307                }
308                Plan::SetVariable(plan) => {
309                    let result = self.sequence_set_variable(ctx.session_mut(), plan);
310                    ctx.retire(result);
311                }
312                Plan::ResetVariable(plan) => {
313                    let result = self.sequence_reset_variable(ctx.session_mut(), plan);
314                    ctx.retire(result);
315                }
316                Plan::SetTransaction(plan) => {
317                    let result = self.sequence_set_transaction(ctx.session_mut(), plan);
318                    ctx.retire(result);
319                }
320                Plan::StartTransaction(plan) => {
321                    if matches!(
322                        ctx.session().transaction(),
323                        TransactionStatus::InTransaction(_)
324                    ) {
325                        ctx.session()
326                            .add_notice(AdapterNotice::ExistingTransactionInProgress);
327                    }
328                    let result = ctx.session_mut().start_transaction(
329                        self.now_datetime(),
330                        plan.access,
331                        plan.isolation_level,
332                    );
333                    ctx.retire(result.map(|_| ExecuteResponse::StartedTransaction))
334                }
335                Plan::CommitTransaction(CommitTransactionPlan {
336                    ref transaction_type,
337                })
338                | Plan::AbortTransaction(AbortTransactionPlan {
339                    ref transaction_type,
340                }) => {
341                    // Serialize DDL transactions. Statements that use this mode must return false
342                    // in `must_serialize_ddl()`.
343                    if ctx.session().transaction().is_ddl() {
344                        if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
345                            let prev = self
346                                .active_conns
347                                .get_mut(ctx.session().conn_id())
348                                .expect("connection must exist")
349                                .deferred_lock
350                                .replace(guard);
351                            assert!(
352                                prev.is_none(),
353                                "connections should have at most one lock guard"
354                            );
355                        } else {
356                            self.serialized_ddl.push_back(DeferredPlanStatement {
357                                ctx,
358                                ps: PlanStatement::Plan { plan, resolved_ids },
359                            });
360                            return;
361                        }
362                    }
363
364                    let action = match &plan {
365                        Plan::CommitTransaction(_) => EndTransactionAction::Commit,
366                        Plan::AbortTransaction(_) => EndTransactionAction::Rollback,
367                        _ => unreachable!(),
368                    };
369                    if ctx.session().transaction().is_implicit() && !transaction_type.is_implicit()
370                    {
371                        // In Postgres, if a user sends a COMMIT or ROLLBACK in an
372                        // implicit transaction, a warning is sent warning them.
373                        // (The transaction is still closed and a new implicit
374                        // transaction started, though.)
375                        ctx.session().add_notice(
376                            AdapterNotice::ExplicitTransactionControlInImplicitTransaction,
377                        );
378                    }
379                    self.sequence_end_transaction(ctx, action).await;
380                }
381                Plan::Select(plan) => {
382                    let max = Some(ctx.session().vars().max_query_result_size());
383                    self.sequence_peek(ctx, plan, target_cluster, max).await;
384                }
385                Plan::Subscribe(plan) => {
386                    self.sequence_subscribe(ctx, plan, target_cluster).await;
387                }
388                Plan::SideEffectingFunc(plan) => {
389                    self.sequence_side_effecting_func(ctx, plan).await;
390                }
391                Plan::ShowCreate(plan) => {
392                    ctx.retire(Ok(Self::send_immediate_rows(plan.row)));
393                }
394                Plan::ShowColumns(show_columns_plan) => {
395                    let max = Some(ctx.session().vars().max_query_result_size());
396                    self.sequence_peek(ctx, show_columns_plan.select_plan, target_cluster, max)
397                        .await;
398                }
399                Plan::CopyFrom(plan) => match plan.source {
400                    CopyFromSource::Stdin => {
401                        let (tx, _, session, ctx_extra) = ctx.into_parts();
402                        tx.send(
403                            Ok(ExecuteResponse::CopyFrom {
404                                id: plan.id,
405                                columns: plan.columns,
406                                params: plan.params,
407                                ctx_extra,
408                            }),
409                            session,
410                        );
411                    }
412                    CopyFromSource::Url(_) | CopyFromSource::AwsS3 { .. } => {
413                        self.sequence_copy_from(ctx, plan, target_cluster).await;
414                    }
415                },
416                Plan::ExplainPlan(plan) => {
417                    self.sequence_explain_plan(ctx, plan, target_cluster).await;
418                }
419                Plan::ExplainPushdown(plan) => {
420                    self.sequence_explain_pushdown(ctx, plan, target_cluster)
421                        .await;
422                }
423                Plan::ExplainSinkSchema(plan) => {
424                    let result = self.sequence_explain_schema(plan);
425                    ctx.retire(result);
426                }
427                Plan::ExplainTimestamp(plan) => {
428                    self.sequence_explain_timestamp(ctx, plan, target_cluster)
429                        .await;
430                }
431                Plan::Insert(plan) => {
432                    self.sequence_insert(ctx, plan).await;
433                }
434                Plan::ReadThenWrite(plan) => {
435                    self.sequence_read_then_write(ctx, plan).await;
436                }
437                Plan::AlterNoop(plan) => {
438                    ctx.retire(Ok(ExecuteResponse::AlteredObject(plan.object_type)));
439                }
440                Plan::AlterCluster(plan) => {
441                    self.sequence_alter_cluster_staged(ctx, plan).await;
442                }
443                Plan::AlterClusterRename(plan) => {
444                    let result = self
445                        .sequence_alter_cluster_rename(ctx.session_mut(), plan)
446                        .await;
447                    ctx.retire(result);
448                }
449                Plan::AlterClusterSwap(plan) => {
450                    let result = self
451                        .sequence_alter_cluster_swap(ctx.session_mut(), plan)
452                        .await;
453                    ctx.retire(result);
454                }
455                Plan::AlterClusterReplicaRename(plan) => {
456                    let result = self
457                        .sequence_alter_cluster_replica_rename(ctx.session(), plan)
458                        .await;
459                    ctx.retire(result);
460                }
461                Plan::AlterConnection(plan) => {
462                    self.sequence_alter_connection(ctx, plan).await;
463                }
464                Plan::AlterSetCluster(plan) => {
465                    let result = self.sequence_alter_set_cluster(ctx.session(), plan).await;
466                    ctx.retire(result);
467                }
468                Plan::AlterRetainHistory(plan) => {
469                    let result = self
470                        .sequence_alter_retain_history(ctx.session_mut(), plan)
471                        .await;
472                    ctx.retire(result);
473                }
474                Plan::AlterItemRename(plan) => {
475                    let result = self
476                        .sequence_alter_item_rename(ctx.session_mut(), plan)
477                        .await;
478                    ctx.retire(result);
479                }
480                Plan::AlterSchemaRename(plan) => {
481                    let result = self
482                        .sequence_alter_schema_rename(ctx.session_mut(), plan)
483                        .await;
484                    ctx.retire(result);
485                }
486                Plan::AlterSchemaSwap(plan) => {
487                    let result = self
488                        .sequence_alter_schema_swap(ctx.session_mut(), plan)
489                        .await;
490                    ctx.retire(result);
491                }
492                Plan::AlterRole(plan) => {
493                    let result = self.sequence_alter_role(ctx.session_mut(), plan).await;
494                    ctx.retire(result);
495                }
496                Plan::AlterSecret(plan) => {
497                    self.sequence_alter_secret(ctx, plan).await;
498                }
499                Plan::AlterSink(plan) => {
500                    self.sequence_alter_sink_prepare(ctx, plan).await;
501                }
502                Plan::AlterSource(plan) => {
503                    let result = self.sequence_alter_source(ctx.session_mut(), plan).await;
504                    ctx.retire(result);
505                }
506                Plan::AlterSystemSet(plan) => {
507                    let result = self.sequence_alter_system_set(ctx.session(), plan).await;
508                    ctx.retire(result);
509                }
510                Plan::AlterSystemReset(plan) => {
511                    let result = self.sequence_alter_system_reset(ctx.session(), plan).await;
512                    ctx.retire(result);
513                }
514                Plan::AlterSystemResetAll(plan) => {
515                    let result = self
516                        .sequence_alter_system_reset_all(ctx.session(), plan)
517                        .await;
518                    ctx.retire(result);
519                }
520                Plan::AlterTableAddColumn(plan) => {
521                    let result = self.sequence_alter_table(ctx.session(), plan).await;
522                    ctx.retire(result);
523                }
524                Plan::AlterNetworkPolicy(plan) => {
525                    let res = self
526                        .sequence_alter_network_policy(ctx.session(), plan)
527                        .await;
528                    ctx.retire(res);
529                }
530                Plan::DiscardTemp => {
531                    self.drop_temp_items(ctx.session().conn_id()).await;
532                    ctx.retire(Ok(ExecuteResponse::DiscardedTemp));
533                }
534                Plan::DiscardAll => {
535                    let ret = if let TransactionStatus::Started(_) = ctx.session().transaction() {
536                        self.clear_transaction(ctx.session_mut()).await;
537                        self.drop_temp_items(ctx.session().conn_id()).await;
538                        ctx.session_mut().reset();
539                        Ok(ExecuteResponse::DiscardedAll)
540                    } else {
541                        Err(AdapterError::OperationProhibitsTransaction(
542                            "DISCARD ALL".into(),
543                        ))
544                    };
545                    ctx.retire(ret);
546                }
547                Plan::Declare(plan) => {
548                    self.declare(ctx, plan.name, plan.stmt, plan.sql, plan.params);
549                }
550                Plan::Fetch(FetchPlan {
551                    name,
552                    count,
553                    timeout,
554                }) => {
555                    let ctx_extra = std::mem::take(ctx.extra_mut());
556                    ctx.retire(Ok(ExecuteResponse::Fetch {
557                        name,
558                        count,
559                        timeout,
560                        ctx_extra,
561                    }));
562                }
563                Plan::Close(plan) => {
564                    if ctx.session_mut().remove_portal(&plan.name) {
565                        ctx.retire(Ok(ExecuteResponse::ClosedCursor));
566                    } else {
567                        ctx.retire(Err(AdapterError::UnknownCursor(plan.name)));
568                    }
569                }
570                Plan::Prepare(plan) => {
571                    if ctx
572                        .session()
573                        .get_prepared_statement_unverified(&plan.name)
574                        .is_some()
575                    {
576                        ctx.retire(Err(AdapterError::PreparedStatementExists(plan.name)));
577                    } else {
578                        ctx.session_mut().set_prepared_statement(
579                            plan.name,
580                            Some(plan.stmt),
581                            plan.sql,
582                            plan.desc,
583                            self.catalog().transient_revision(),
584                            self.now(),
585                        );
586                        ctx.retire(Ok(ExecuteResponse::Prepare));
587                    }
588                }
589                Plan::Execute(plan) => {
590                    match self.sequence_execute(ctx.session_mut(), plan) {
591                        Ok(portal_name) => {
592                            let (tx, _, session, extra) = ctx.into_parts();
593                            self.internal_cmd_tx
594                                .send(Message::Command(
595                                    OpenTelemetryContext::obtain(),
596                                    Command::Execute {
597                                        portal_name,
598                                        session,
599                                        tx: tx.take(),
600                                        outer_ctx_extra: Some(extra),
601                                    },
602                                ))
603                                .expect("sending to self.internal_cmd_tx cannot fail");
604                        }
605                        Err(err) => ctx.retire(Err(err)),
606                    };
607                }
608                Plan::Deallocate(plan) => match plan.name {
609                    Some(name) => {
610                        if ctx.session_mut().remove_prepared_statement(&name) {
611                            ctx.retire(Ok(ExecuteResponse::Deallocate { all: false }));
612                        } else {
613                            ctx.retire(Err(AdapterError::UnknownPreparedStatement(name)));
614                        }
615                    }
616                    None => {
617                        ctx.session_mut().remove_all_prepared_statements();
618                        ctx.retire(Ok(ExecuteResponse::Deallocate { all: true }));
619                    }
620                },
621                Plan::Raise(RaisePlan { severity }) => {
622                    ctx.session()
623                        .add_notice(AdapterNotice::UserRequested { severity });
624                    ctx.retire(Ok(ExecuteResponse::Raised));
625                }
626                Plan::GrantPrivileges(plan) => {
627                    let result = self
628                        .sequence_grant_privileges(ctx.session_mut(), plan)
629                        .await;
630                    ctx.retire(result);
631                }
632                Plan::RevokePrivileges(plan) => {
633                    let result = self
634                        .sequence_revoke_privileges(ctx.session_mut(), plan)
635                        .await;
636                    ctx.retire(result);
637                }
638                Plan::AlterDefaultPrivileges(plan) => {
639                    let result = self
640                        .sequence_alter_default_privileges(ctx.session_mut(), plan)
641                        .await;
642                    ctx.retire(result);
643                }
644                Plan::GrantRole(plan) => {
645                    let result = self.sequence_grant_role(ctx.session_mut(), plan).await;
646                    ctx.retire(result);
647                }
648                Plan::RevokeRole(plan) => {
649                    let result = self.sequence_revoke_role(ctx.session_mut(), plan).await;
650                    ctx.retire(result);
651                }
652                Plan::AlterOwner(plan) => {
653                    let result = self.sequence_alter_owner(ctx.session_mut(), plan).await;
654                    ctx.retire(result);
655                }
656                Plan::ReassignOwned(plan) => {
657                    let result = self.sequence_reassign_owned(ctx.session_mut(), plan).await;
658                    ctx.retire(result);
659                }
660                Plan::ValidateConnection(plan) => {
661                    let connection = plan
662                        .connection
663                        .into_inline_connection(self.catalog().state());
664                    let current_storage_configuration = self.controller.storage.config().clone();
665                    mz_ore::task::spawn(|| "coord::validate_connection", async move {
666                        let res = match connection
667                            .validate(plan.id, &current_storage_configuration)
668                            .await
669                        {
670                            Ok(()) => Ok(ExecuteResponse::ValidatedConnection),
671                            Err(err) => Err(err.into()),
672                        };
673                        ctx.retire(res);
674                    });
675                }
676            }
677        }
678        .instrument(tracing::debug_span!("coord::sequencer::sequence_plan"))
679        .boxed_local()
680    }
681
682    #[mz_ore::instrument(level = "debug")]
683    pub(crate) async fn sequence_execute_single_statement_transaction(
684        &mut self,
685        ctx: ExecuteContext,
686        stmt: Arc<Statement<Raw>>,
687        params: Params,
688    ) {
689        // Put the session into single statement implicit so anything can execute.
690        let (tx, internal_cmd_tx, mut session, extra) = ctx.into_parts();
691        assert!(matches!(session.transaction(), TransactionStatus::Default));
692        session.start_transaction_single_stmt(self.now_datetime());
693        let conn_id = session.conn_id().unhandled();
694
695        // Execute the saved statement in a temp transmitter so we can run COMMIT.
696        let (sub_tx, sub_rx) = oneshot::channel();
697        let sub_ct = ClientTransmitter::new(sub_tx, self.internal_cmd_tx.clone());
698        let sub_ctx = ExecuteContext::from_parts(sub_ct, internal_cmd_tx, session, extra);
699        self.handle_execute_inner(stmt, params, sub_ctx).await;
700
701        // The response can need off-thread processing. Wait for it elsewhere so the coordinator can
702        // continue processing.
703        let internal_cmd_tx = self.internal_cmd_tx.clone();
704        mz_ore::task::spawn(
705            || format!("execute_single_statement:{conn_id}"),
706            async move {
707                let Ok(Response {
708                    result,
709                    session,
710                    otel_ctx,
711                }) = sub_rx.await
712                else {
713                    // Coordinator went away.
714                    return;
715                };
716                otel_ctx.attach_as_parent();
717                let (sub_tx, sub_rx) = oneshot::channel();
718                let _ = internal_cmd_tx.send(Message::Command(
719                    otel_ctx,
720                    Command::Commit {
721                        action: EndTransactionAction::Commit,
722                        session,
723                        tx: sub_tx,
724                    },
725                ));
726                let Ok(commit_response) = sub_rx.await else {
727                    // Coordinator went away.
728                    return;
729                };
730                assert!(matches!(
731                    commit_response.session.transaction(),
732                    TransactionStatus::Default
733                ));
734                // The fake, generated response was already sent to the user and we don't need to
735                // ever send an `Ok(result)` to the user, because they are expecting a response from
736                // a `COMMIT`. So, always send the `COMMIT`'s result if the original statement
737                // succeeded. If it failed, we can send an error and don't need to wrap it or send a
738                // later COMMIT or ROLLBACK.
739                let result = match (result, commit_response.result) {
740                    (Ok(_), commit) => commit,
741                    (Err(result), _) => Err(result),
742                };
743                // We ignore the resp.result because it's not clear what to do if it failed since we
744                // can only send a single ExecuteResponse to tx.
745                tx.send(result, commit_response.session);
746            }
747            .instrument(Span::current()),
748        );
749    }
750
751    /// Creates a role during connection startup.
752    ///
753    /// This should not be called from anywhere except connection startup.
754    #[mz_ore::instrument(level = "debug")]
755    pub(crate) async fn sequence_create_role_for_startup(
756        &mut self,
757        plan: CreateRolePlan,
758    ) -> Result<ExecuteResponse, AdapterError> {
759        // This does not set conn_id because it's not yet in active_conns. That is because we can't
760        // make a ConnMeta until we have a role id which we don't have until after the catalog txn
761        // is committed. Passing None here means the audit log won't have a user set in the event's
762        // user field. This seems fine because it is indeed the system that is creating this role,
763        // not a user request, and the user name is still recorded in the plan, so we aren't losing
764        // information.
765        self.sequence_create_role(None, plan).await
766    }
767
768    pub(crate) fn allocate_transient_id(&self) -> (CatalogItemId, GlobalId) {
769        self.transient_id_gen.allocate_id()
770    }
771
772    fn should_emit_rbac_notice(&self, session: &Session) -> Option<AdapterNotice> {
773        if !rbac::is_rbac_enabled_for_session(self.catalog.system_config(), session) {
774            Some(AdapterNotice::RbacUserDisabled)
775        } else {
776            None
777        }
778    }
779
780    pub(crate) fn insert_constant(
781        catalog: &Catalog,
782        session: &mut Session,
783        id: CatalogItemId,
784        constants: MirRelationExpr,
785    ) -> Result<ExecuteResponse, AdapterError> {
786        // Insert can be queued, so we need to re-verify the id exists.
787        let desc = match catalog.try_get_entry(&id) {
788            Some(table) => {
789                let full_name = catalog.resolve_full_name(table.name(), Some(session.conn_id()));
790                // Inserts always happen at the latest version of a table.
791                table.desc_latest(&full_name)?
792            }
793            None => {
794                return Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
795                    kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
796                        id.to_string(),
797                    )),
798                }));
799            }
800        };
801
802        match constants.as_const() {
803            Some((rows, ..)) => {
804                let rows = rows.clone()?;
805                for (row, _) in &rows {
806                    for (i, datum) in row.iter().enumerate() {
807                        desc.constraints_met(i, &datum)?;
808                    }
809                }
810                let diffs_plan = plan::SendDiffsPlan {
811                    id,
812                    updates: rows,
813                    kind: MutationKind::Insert,
814                    returning: Vec::new(),
815                    max_result_size: catalog.system_config().max_result_size(),
816                };
817                Self::send_diffs(session, diffs_plan)
818            }
819            None => panic!(
820                "tried using sequence_insert_constant on non-constant MirRelationExpr {:?}",
821                constants
822            ),
823        }
824    }
825
826    #[mz_ore::instrument(level = "debug")]
827    pub(crate) fn send_diffs(
828        session: &mut Session,
829        mut plan: plan::SendDiffsPlan,
830    ) -> Result<ExecuteResponse, AdapterError> {
831        let affected_rows = {
832            let mut affected_rows = Diff::from(0);
833            let mut all_positive_diffs = true;
834            // If all diffs are positive, the number of affected rows is just the
835            // sum of all unconsolidated diffs.
836            for (_, diff) in plan.updates.iter() {
837                if diff.is_negative() {
838                    all_positive_diffs = false;
839                    break;
840                }
841
842                affected_rows += diff;
843            }
844
845            if !all_positive_diffs {
846                // Consolidate rows. This is useful e.g. for an UPDATE where the row
847                // doesn't change, and we need to reflect that in the number of
848                // affected rows.
849                differential_dataflow::consolidation::consolidate(&mut plan.updates);
850
851                affected_rows = Diff::ZERO;
852                // With retractions, the number of affected rows is not the number
853                // of rows we see, but the sum of the absolute value of their diffs,
854                // e.g. if one row is retracted and another is added, the total
855                // number of rows affected is 2.
856                for (_, diff) in plan.updates.iter() {
857                    affected_rows += diff.abs();
858                }
859            }
860
861            usize::try_from(affected_rows.into_inner()).expect("positive Diff must fit")
862        };
863        event!(
864            Level::TRACE,
865            affected_rows,
866            id = format!("{:?}", plan.id),
867            kind = format!("{:?}", plan.kind),
868            updates = plan.updates.len(),
869            returning = plan.returning.len(),
870        );
871
872        session.add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
873            id: plan.id,
874            rows: TableData::Rows(plan.updates),
875        }]))?;
876        if !plan.returning.is_empty() {
877            let finishing = RowSetFinishing {
878                order_by: Vec::new(),
879                limit: None,
880                offset: 0,
881                project: (0..plan.returning[0].0.iter().count()).collect(),
882            };
883            let max_returned_query_size = session.vars().max_query_result_size();
884            let duration_histogram = session.metrics().row_set_finishing_seconds();
885
886            return match finishing.finish(
887                RowCollection::new(plan.returning, &finishing.order_by),
888                plan.max_result_size,
889                Some(max_returned_query_size),
890                duration_histogram,
891            ) {
892                Ok((rows, _size_bytes)) => Ok(Self::send_immediate_rows(rows)),
893                Err(e) => Err(AdapterError::ResultSize(e)),
894            };
895        }
896        Ok(match plan.kind {
897            MutationKind::Delete => ExecuteResponse::Deleted(affected_rows),
898            MutationKind::Insert => ExecuteResponse::Inserted(affected_rows),
899            MutationKind::Update => ExecuteResponse::Updated(affected_rows / 2),
900        })
901    }
902}