1#![allow(clippy::pub_use)]
12
13use futures::FutureExt;
16use futures::future::LocalBoxFuture;
17use inner::return_if_err;
18use mz_expr::row::RowCollection;
19use mz_expr::{MirRelationExpr, RowSetFinishing};
20use mz_ore::tracing::OpenTelemetryContext;
21use mz_repr::{CatalogItemId, Diff, GlobalId};
22use mz_sql::catalog::CatalogError;
23use mz_sql::names::ResolvedIds;
24use mz_sql::plan::{
25 self, AbortTransactionPlan, CommitTransactionPlan, CopyFromSource, CreateRolePlan,
26 CreateSourcePlanBundle, FetchPlan, MutationKind, Params, Plan, PlanKind, RaisePlan,
27};
28use mz_sql::rbac;
29use mz_sql::session::metadata::SessionMetadata;
30use mz_sql_parser::ast::{Raw, Statement};
31use mz_storage_client::client::TableData;
32use mz_storage_types::connections::inline::IntoInlineConnection;
33use std::collections::BTreeSet;
34use std::sync::Arc;
35use tokio::sync::oneshot;
36use tracing::{Instrument, Level, Span, event};
37
38use crate::ExecuteContext;
39use crate::catalog::Catalog;
40use crate::command::{Command, ExecuteResponse, Response};
41use crate::coord::appends::{DeferredOp, DeferredPlan};
42use crate::coord::validity::PlanValidity;
43use crate::coord::{
44 Coordinator, DeferredPlanStatement, Message, PlanStatement, TargetCluster, catalog_serving,
45};
46use crate::error::AdapterError;
47use crate::notice::AdapterNotice;
48use crate::session::{
49 EndTransactionAction, Session, StateRevision, TransactionOps, TransactionStatus, WriteOp,
50};
51use crate::util::ClientTransmitter;
52
53mod inner;
70
71impl Coordinator {
72 pub(crate) fn sequence_plan(
76 &mut self,
77 mut ctx: ExecuteContext,
78 plan: Plan,
79 resolved_ids: ResolvedIds,
80 ) -> LocalBoxFuture<'_, ()> {
81 async move {
82 let responses = ExecuteResponse::generated_from(&PlanKind::from(&plan));
83 ctx.tx_mut().set_allowed(responses);
84
85 if self.controller.read_only() && !plan.allowed_in_read_only() {
86 ctx.retire(Err(AdapterError::ReadOnly));
87 return;
88 }
89
90 if let Some((dependencies, wait_future)) =
93 super::appends::waiting_on_startup_appends(self.catalog(), ctx.session_mut(), &plan)
94 {
95 let conn_id = ctx.session().conn_id();
96 tracing::debug!(%conn_id, "deferring plan for startup appends");
97
98 let role_metadata = ctx.session().role_metadata().clone();
99 let validity = PlanValidity::new(
100 self.catalog.transient_revision(),
101 dependencies,
102 None,
103 None,
104 role_metadata,
105 );
106 let deferred_plan = DeferredPlan {
107 ctx,
108 plan,
109 validity,
110 requires_locks: BTreeSet::default(),
111 };
112 let acquire_future = wait_future.map(|()| None);
115
116 self.defer_op(acquire_future, DeferredOp::Plan(deferred_plan));
117
118 return;
121 };
122
123 let target_cluster = match ctx.session().transaction().cluster() {
125 Some(cluster_id) => TargetCluster::Transaction(cluster_id),
127 None => {
129 let session_catalog = self.catalog.for_session(ctx.session());
130 catalog_serving::auto_run_on_catalog_server(
131 &session_catalog,
132 ctx.session(),
133 &plan,
134 )
135 }
136 };
137 let (target_cluster_id, target_cluster_name) = match self
138 .catalog()
139 .resolve_target_cluster(target_cluster, ctx.session())
140 {
141 Ok(cluster) => (Some(cluster.id), Some(cluster.name.clone())),
142 Err(_) => (None, None),
143 };
144
145 if let (Some(cluster_id), Some(statement_id)) =
146 (target_cluster_id, ctx.extra().contents())
147 {
148 self.set_statement_execution_cluster(statement_id, cluster_id);
149 }
150
151 let session_catalog = self.catalog.for_session(ctx.session());
152
153 if let Some(cluster_name) = &target_cluster_name {
154 if let Err(e) = catalog_serving::check_cluster_restrictions(
155 cluster_name,
156 &session_catalog,
157 &plan,
158 ) {
159 return ctx.retire(Err(e));
160 }
161 }
162
163 if let Err(e) = rbac::check_plan(
164 &session_catalog,
165 |id| {
166 self.active_conns()
169 .into_iter()
170 .find(|(conn_id, _)| conn_id.unhandled() == id)
171 .map(|(_, conn_meta)| *conn_meta.authenticated_role_id())
172 },
173 ctx.session(),
174 &plan,
175 target_cluster_id,
176 &resolved_ids,
177 ) {
178 return ctx.retire(Err(e.into()));
179 }
180
181 match plan {
182 Plan::CreateSource(plan) => {
183 let id_ts = self.get_catalog_write_ts().await;
184 let (item_id, global_id) =
185 return_if_err!(self.catalog_mut().allocate_user_id(id_ts).await, ctx);
186 let result = self
187 .sequence_create_source(
188 &mut ctx,
189 vec![CreateSourcePlanBundle {
190 item_id,
191 global_id,
192 plan,
193 resolved_ids,
194 available_source_references: None,
195 }],
196 )
197 .await;
198 ctx.retire(result);
199 }
200 Plan::CreateSources(plans) => {
201 assert!(
202 resolved_ids.is_empty(),
203 "each plan has separate resolved_ids"
204 );
205 let result = self.sequence_create_source(&mut ctx, plans).await;
206 ctx.retire(result);
207 }
208 Plan::CreateConnection(plan) => {
209 self.sequence_create_connection(ctx, plan, resolved_ids)
210 .await;
211 }
212 Plan::CreateDatabase(plan) => {
213 let result = self.sequence_create_database(ctx.session_mut(), plan).await;
214 ctx.retire(result);
215 }
216 Plan::CreateSchema(plan) => {
217 let result = self.sequence_create_schema(ctx.session_mut(), plan).await;
218 ctx.retire(result);
219 }
220 Plan::CreateRole(plan) => {
221 let result = self
222 .sequence_create_role(Some(ctx.session().conn_id()), plan)
223 .await;
224 if let Some(notice) = self.should_emit_rbac_notice(ctx.session()) {
225 ctx.session().add_notice(notice);
226 }
227 ctx.retire(result);
228 }
229 Plan::CreateCluster(plan) => {
230 let result = self.sequence_create_cluster(ctx.session(), plan).await;
231 ctx.retire(result);
232 }
233 Plan::CreateClusterReplica(plan) => {
234 let result = self
235 .sequence_create_cluster_replica(ctx.session(), plan)
236 .await;
237 ctx.retire(result);
238 }
239 Plan::CreateTable(plan) => {
240 let result = self
241 .sequence_create_table(&mut ctx, plan, resolved_ids)
242 .await;
243 ctx.retire(result);
244 }
245 Plan::CreateSecret(plan) => {
246 self.sequence_create_secret(ctx, plan).await;
247 }
248 Plan::CreateSink(plan) => {
249 self.sequence_create_sink(ctx, plan, resolved_ids).await;
250 }
251 Plan::CreateView(plan) => {
252 self.sequence_create_view(ctx, plan, resolved_ids).await;
253 }
254 Plan::CreateMaterializedView(plan) => {
255 self.sequence_create_materialized_view(ctx, plan, resolved_ids)
256 .await;
257 }
258 Plan::CreateContinualTask(plan) => {
259 let res = self
260 .sequence_create_continual_task(&mut ctx, plan, resolved_ids)
261 .await;
262 ctx.retire(res);
263 }
264 Plan::CreateIndex(plan) => {
265 self.sequence_create_index(ctx, plan, resolved_ids).await;
266 }
267 Plan::CreateType(plan) => {
268 let result = self
269 .sequence_create_type(ctx.session(), plan, resolved_ids)
270 .await;
271 ctx.retire(result);
272 }
273 Plan::CreateNetworkPolicy(plan) => {
274 let res = self
275 .sequence_create_network_policy(ctx.session(), plan)
276 .await;
277 ctx.retire(res);
278 }
279 Plan::Comment(plan) => {
280 let result = self.sequence_comment_on(ctx.session(), plan).await;
281 ctx.retire(result);
282 }
283 Plan::CopyTo(plan) => {
284 self.sequence_copy_to(ctx, plan, target_cluster).await;
285 }
286 Plan::DropObjects(plan) => {
287 let result = self.sequence_drop_objects(ctx.session_mut(), plan).await;
288 ctx.retire(result);
289 }
290 Plan::DropOwned(plan) => {
291 let result = self.sequence_drop_owned(ctx.session_mut(), plan).await;
292 ctx.retire(result);
293 }
294 Plan::EmptyQuery => {
295 ctx.retire(Ok(ExecuteResponse::EmptyQuery));
296 }
297 Plan::ShowAllVariables => {
298 let result = self.sequence_show_all_variables(ctx.session());
299 ctx.retire(result);
300 }
301 Plan::ShowVariable(plan) => {
302 let result = self.sequence_show_variable(ctx.session(), plan);
303 ctx.retire(result);
304 }
305 Plan::InspectShard(plan) => {
306 let result = self.sequence_inspect_shard(ctx.session(), plan).await;
308 ctx.retire(result);
309 }
310 Plan::SetVariable(plan) => {
311 let result = self.sequence_set_variable(ctx.session_mut(), plan);
312 ctx.retire(result);
313 }
314 Plan::ResetVariable(plan) => {
315 let result = self.sequence_reset_variable(ctx.session_mut(), plan);
316 ctx.retire(result);
317 }
318 Plan::SetTransaction(plan) => {
319 let result = self.sequence_set_transaction(ctx.session_mut(), plan);
320 ctx.retire(result);
321 }
322 Plan::StartTransaction(plan) => {
323 if matches!(
324 ctx.session().transaction(),
325 TransactionStatus::InTransaction(_)
326 ) {
327 ctx.session()
328 .add_notice(AdapterNotice::ExistingTransactionInProgress);
329 }
330 let result = ctx.session_mut().start_transaction(
331 self.now_datetime(),
332 plan.access,
333 plan.isolation_level,
334 );
335 ctx.retire(result.map(|_| ExecuteResponse::StartedTransaction))
336 }
337 Plan::CommitTransaction(CommitTransactionPlan {
338 ref transaction_type,
339 })
340 | Plan::AbortTransaction(AbortTransactionPlan {
341 ref transaction_type,
342 }) => {
343 if ctx.session().transaction().is_ddl() {
346 if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
347 let prev = self
348 .active_conns
349 .get_mut(ctx.session().conn_id())
350 .expect("connection must exist")
351 .deferred_lock
352 .replace(guard);
353 assert!(
354 prev.is_none(),
355 "connections should have at most one lock guard"
356 );
357 } else {
358 self.serialized_ddl.push_back(DeferredPlanStatement {
359 ctx,
360 ps: PlanStatement::Plan { plan, resolved_ids },
361 });
362 return;
363 }
364 }
365
366 let action = match &plan {
367 Plan::CommitTransaction(_) => EndTransactionAction::Commit,
368 Plan::AbortTransaction(_) => EndTransactionAction::Rollback,
369 _ => unreachable!(),
370 };
371 if ctx.session().transaction().is_implicit() && !transaction_type.is_implicit()
372 {
373 ctx.session().add_notice(
378 AdapterNotice::ExplicitTransactionControlInImplicitTransaction,
379 );
380 }
381 self.sequence_end_transaction(ctx, action).await;
382 }
383 Plan::Select(plan) => {
384 let max = Some(ctx.session().vars().max_query_result_size());
385 self.sequence_peek(ctx, plan, target_cluster, max).await;
386 }
387 Plan::Subscribe(plan) => {
388 self.sequence_subscribe(ctx, plan, target_cluster).await;
389 }
390 Plan::SideEffectingFunc(plan) => {
391 self.sequence_side_effecting_func(ctx, plan).await;
392 }
393 Plan::ShowCreate(plan) => {
394 ctx.retire(Ok(Self::send_immediate_rows(plan.row)));
395 }
396 Plan::ShowColumns(show_columns_plan) => {
397 let max = Some(ctx.session().vars().max_query_result_size());
398 self.sequence_peek(ctx, show_columns_plan.select_plan, target_cluster, max)
399 .await;
400 }
401 Plan::CopyFrom(plan) => match plan.source {
402 CopyFromSource::Stdin => {
403 let (tx, _, session, ctx_extra) = ctx.into_parts();
404 tx.send(
405 Ok(ExecuteResponse::CopyFrom {
406 target_id: plan.target_id,
407 target_name: plan.target_name,
408 columns: plan.columns,
409 params: plan.params,
410 ctx_extra,
411 }),
412 session,
413 );
414 }
415 CopyFromSource::Url(_) | CopyFromSource::AwsS3 { .. } => {
416 self.sequence_copy_from(ctx, plan, target_cluster).await;
417 }
418 },
419 Plan::ExplainPlan(plan) => {
420 self.sequence_explain_plan(ctx, plan, target_cluster).await;
421 }
422 Plan::ExplainPushdown(plan) => {
423 self.sequence_explain_pushdown(ctx, plan, target_cluster)
424 .await;
425 }
426 Plan::ExplainSinkSchema(plan) => {
427 let result = self.sequence_explain_schema(plan);
428 ctx.retire(result);
429 }
430 Plan::ExplainTimestamp(plan) => {
431 self.sequence_explain_timestamp(ctx, plan, target_cluster)
432 .await;
433 }
434 Plan::Insert(plan) => {
435 self.sequence_insert(ctx, plan).await;
436 }
437 Plan::ReadThenWrite(plan) => {
438 self.sequence_read_then_write(ctx, plan).await;
439 }
440 Plan::AlterNoop(plan) => {
441 ctx.retire(Ok(ExecuteResponse::AlteredObject(plan.object_type)));
442 }
443 Plan::AlterCluster(plan) => {
444 self.sequence_alter_cluster_staged(ctx, plan).await;
445 }
446 Plan::AlterClusterRename(plan) => {
447 let result = self.sequence_alter_cluster_rename(&mut ctx, plan).await;
448 ctx.retire(result);
449 }
450 Plan::AlterClusterSwap(plan) => {
451 let result = self.sequence_alter_cluster_swap(&mut ctx, plan).await;
452 ctx.retire(result);
453 }
454 Plan::AlterClusterReplicaRename(plan) => {
455 let result = self
456 .sequence_alter_cluster_replica_rename(ctx.session(), plan)
457 .await;
458 ctx.retire(result);
459 }
460 Plan::AlterConnection(plan) => {
461 self.sequence_alter_connection(ctx, plan).await;
462 }
463 Plan::AlterSetCluster(plan) => {
464 let result = self.sequence_alter_set_cluster(ctx.session(), plan).await;
465 ctx.retire(result);
466 }
467 Plan::AlterRetainHistory(plan) => {
468 let result = self.sequence_alter_retain_history(&mut ctx, plan).await;
469 ctx.retire(result);
470 }
471 Plan::AlterItemRename(plan) => {
472 let result = self.sequence_alter_item_rename(&mut ctx, plan).await;
473 ctx.retire(result);
474 }
475 Plan::AlterSchemaRename(plan) => {
476 let result = self.sequence_alter_schema_rename(&mut ctx, plan).await;
477 ctx.retire(result);
478 }
479 Plan::AlterSchemaSwap(plan) => {
480 let result = self.sequence_alter_schema_swap(&mut ctx, plan).await;
481 ctx.retire(result);
482 }
483 Plan::AlterRole(plan) => {
484 let result = self.sequence_alter_role(ctx.session_mut(), plan).await;
485 ctx.retire(result);
486 }
487 Plan::AlterSecret(plan) => {
488 self.sequence_alter_secret(ctx, plan).await;
489 }
490 Plan::AlterSink(plan) => {
491 self.sequence_alter_sink_prepare(ctx, plan).await;
492 }
493 Plan::AlterSource(plan) => {
494 let result = self.sequence_alter_source(ctx.session_mut(), plan).await;
495 ctx.retire(result);
496 }
497 Plan::AlterSystemSet(plan) => {
498 let result = self.sequence_alter_system_set(ctx.session(), plan).await;
499 ctx.retire(result);
500 }
501 Plan::AlterSystemReset(plan) => {
502 let result = self.sequence_alter_system_reset(ctx.session(), plan).await;
503 ctx.retire(result);
504 }
505 Plan::AlterSystemResetAll(plan) => {
506 let result = self
507 .sequence_alter_system_reset_all(ctx.session(), plan)
508 .await;
509 ctx.retire(result);
510 }
511 Plan::AlterTableAddColumn(plan) => {
512 let result = self.sequence_alter_table(&mut ctx, plan).await;
513 ctx.retire(result);
514 }
515 Plan::AlterNetworkPolicy(plan) => {
516 let res = self
517 .sequence_alter_network_policy(ctx.session(), plan)
518 .await;
519 ctx.retire(res);
520 }
521 Plan::DiscardTemp => {
522 self.drop_temp_items(ctx.session().conn_id()).await;
523 ctx.retire(Ok(ExecuteResponse::DiscardedTemp));
524 }
525 Plan::DiscardAll => {
526 let ret = if let TransactionStatus::Started(_) = ctx.session().transaction() {
527 self.clear_transaction(ctx.session_mut()).await;
528 self.drop_temp_items(ctx.session().conn_id()).await;
529 ctx.session_mut().reset();
530 Ok(ExecuteResponse::DiscardedAll)
531 } else {
532 Err(AdapterError::OperationProhibitsTransaction(
533 "DISCARD ALL".into(),
534 ))
535 };
536 ctx.retire(ret);
537 }
538 Plan::Declare(plan) => {
539 self.declare(ctx, plan.name, plan.stmt, plan.sql, plan.params);
540 }
541 Plan::Fetch(FetchPlan {
542 name,
543 count,
544 timeout,
545 }) => {
546 let ctx_extra = std::mem::take(ctx.extra_mut());
547 ctx.retire(Ok(ExecuteResponse::Fetch {
548 name,
549 count,
550 timeout,
551 ctx_extra,
552 }));
553 }
554 Plan::Close(plan) => {
555 if ctx.session_mut().remove_portal(&plan.name) {
556 ctx.retire(Ok(ExecuteResponse::ClosedCursor));
557 } else {
558 ctx.retire(Err(AdapterError::UnknownCursor(plan.name)));
559 }
560 }
561 Plan::Prepare(plan) => {
562 if ctx
563 .session()
564 .get_prepared_statement_unverified(&plan.name)
565 .is_some()
566 {
567 ctx.retire(Err(AdapterError::PreparedStatementExists(plan.name)));
568 } else {
569 let state_revision = StateRevision {
570 catalog_revision: self.catalog().transient_revision(),
571 session_state_revision: ctx.session().state_revision(),
572 };
573 ctx.session_mut().set_prepared_statement(
574 plan.name,
575 Some(plan.stmt),
576 plan.sql,
577 plan.desc,
578 state_revision,
579 self.now(),
580 );
581 ctx.retire(Ok(ExecuteResponse::Prepare));
582 }
583 }
584 Plan::Execute(plan) => {
585 match self.sequence_execute(ctx.session_mut(), plan) {
586 Ok(portal_name) => {
587 let (tx, _, session, extra) = ctx.into_parts();
588 self.internal_cmd_tx
589 .send(Message::Command(
590 OpenTelemetryContext::obtain(),
591 Command::Execute {
592 portal_name,
593 session,
594 tx: tx.take(),
595 outer_ctx_extra: Some(extra),
596 },
597 ))
598 .expect("sending to self.internal_cmd_tx cannot fail");
599 }
600 Err(err) => ctx.retire(Err(err)),
601 };
602 }
603 Plan::Deallocate(plan) => match plan.name {
604 Some(name) => {
605 if ctx.session_mut().remove_prepared_statement(&name) {
606 ctx.retire(Ok(ExecuteResponse::Deallocate { all: false }));
607 } else {
608 ctx.retire(Err(AdapterError::UnknownPreparedStatement(name)));
609 }
610 }
611 None => {
612 ctx.session_mut().remove_all_prepared_statements();
613 ctx.retire(Ok(ExecuteResponse::Deallocate { all: true }));
614 }
615 },
616 Plan::Raise(RaisePlan { severity }) => {
617 ctx.session()
618 .add_notice(AdapterNotice::UserRequested { severity });
619 ctx.retire(Ok(ExecuteResponse::Raised));
620 }
621 Plan::GrantPrivileges(plan) => {
622 let result = self
623 .sequence_grant_privileges(ctx.session_mut(), plan)
624 .await;
625 ctx.retire(result);
626 }
627 Plan::RevokePrivileges(plan) => {
628 let result = self
629 .sequence_revoke_privileges(ctx.session_mut(), plan)
630 .await;
631 ctx.retire(result);
632 }
633 Plan::AlterDefaultPrivileges(plan) => {
634 let result = self
635 .sequence_alter_default_privileges(ctx.session_mut(), plan)
636 .await;
637 ctx.retire(result);
638 }
639 Plan::GrantRole(plan) => {
640 let result = self.sequence_grant_role(ctx.session_mut(), plan).await;
641 ctx.retire(result);
642 }
643 Plan::RevokeRole(plan) => {
644 let result = self.sequence_revoke_role(ctx.session_mut(), plan).await;
645 ctx.retire(result);
646 }
647 Plan::AlterOwner(plan) => {
648 let result = self.sequence_alter_owner(ctx.session_mut(), plan).await;
649 ctx.retire(result);
650 }
651 Plan::ReassignOwned(plan) => {
652 let result = self.sequence_reassign_owned(ctx.session_mut(), plan).await;
653 ctx.retire(result);
654 }
655 Plan::ValidateConnection(plan) => {
656 let connection = plan
657 .connection
658 .into_inline_connection(self.catalog().state());
659 let current_storage_configuration = self.controller.storage.config().clone();
660 mz_ore::task::spawn(|| "coord::validate_connection", async move {
661 let res = match connection
662 .validate(plan.id, ¤t_storage_configuration)
663 .await
664 {
665 Ok(()) => Ok(ExecuteResponse::ValidatedConnection),
666 Err(err) => Err(err.into()),
667 };
668 ctx.retire(res);
669 });
670 }
671 }
672 }
673 .instrument(tracing::debug_span!("coord::sequencer::sequence_plan"))
674 .boxed_local()
675 }
676
677 #[mz_ore::instrument(level = "debug")]
678 pub(crate) async fn sequence_execute_single_statement_transaction(
679 &mut self,
680 ctx: ExecuteContext,
681 stmt: Arc<Statement<Raw>>,
682 params: Params,
683 ) {
684 let (tx, internal_cmd_tx, mut session, extra) = ctx.into_parts();
686 assert!(matches!(session.transaction(), TransactionStatus::Default));
687 session.start_transaction_single_stmt(self.now_datetime());
688 let conn_id = session.conn_id().unhandled();
689
690 let (sub_tx, sub_rx) = oneshot::channel();
692 let sub_ct = ClientTransmitter::new(sub_tx, self.internal_cmd_tx.clone());
693 let sub_ctx = ExecuteContext::from_parts(sub_ct, internal_cmd_tx, session, extra);
694 self.handle_execute_inner(stmt, params, sub_ctx).await;
695
696 let internal_cmd_tx = self.internal_cmd_tx.clone();
699 mz_ore::task::spawn(
700 || format!("execute_single_statement:{conn_id}"),
701 async move {
702 let Ok(Response {
703 result,
704 session,
705 otel_ctx,
706 }) = sub_rx.await
707 else {
708 return;
710 };
711 otel_ctx.attach_as_parent();
712 let (sub_tx, sub_rx) = oneshot::channel();
713 let _ = internal_cmd_tx.send(Message::Command(
714 otel_ctx,
715 Command::Commit {
716 action: EndTransactionAction::Commit,
717 session,
718 tx: sub_tx,
719 },
720 ));
721 let Ok(commit_response) = sub_rx.await else {
722 return;
724 };
725 assert!(matches!(
726 commit_response.session.transaction(),
727 TransactionStatus::Default
728 ));
729 let result = match (result, commit_response.result) {
735 (Ok(_), commit) => commit,
736 (Err(result), _) => Err(result),
737 };
738 tx.send(result, commit_response.session);
741 }
742 .instrument(Span::current()),
743 );
744 }
745
746 #[mz_ore::instrument(level = "debug")]
750 pub(crate) async fn sequence_create_role_for_startup(
751 &mut self,
752 plan: CreateRolePlan,
753 ) -> Result<ExecuteResponse, AdapterError> {
754 self.sequence_create_role(None, plan).await
761 }
762
763 pub(crate) fn allocate_transient_id(&self) -> (CatalogItemId, GlobalId) {
764 self.transient_id_gen.allocate_id()
765 }
766
767 fn should_emit_rbac_notice(&self, session: &Session) -> Option<AdapterNotice> {
768 if !rbac::is_rbac_enabled_for_session(self.catalog.system_config(), session) {
769 Some(AdapterNotice::RbacUserDisabled)
770 } else {
771 None
772 }
773 }
774
775 pub(crate) fn insert_constant(
781 catalog: &Catalog,
782 session: &mut Session,
783 target_id: CatalogItemId,
784 constants: MirRelationExpr,
785 ) -> Result<ExecuteResponse, AdapterError> {
786 let desc = match catalog.try_get_entry(&target_id) {
788 Some(table) => {
789 let full_name = catalog.resolve_full_name(table.name(), Some(session.conn_id()));
790 table.desc_latest(&full_name)?
792 }
793 None => {
794 return Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
795 kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
796 target_id.to_string(),
797 )),
798 }));
799 }
800 };
801
802 match constants.as_const() {
803 Some((rows, ..)) => {
804 let rows = rows.clone()?;
805 for (row, _) in &rows {
806 for (i, datum) in row.iter().enumerate() {
807 desc.constraints_met(i, &datum)?;
808 }
809 }
810 let diffs_plan = plan::SendDiffsPlan {
811 id: target_id,
812 updates: rows,
813 kind: MutationKind::Insert,
814 returning: Vec::new(),
815 max_result_size: catalog.system_config().max_result_size(),
816 };
817 Self::send_diffs(session, diffs_plan)
818 }
819 None => panic!(
820 "tried using sequence_insert_constant on non-constant MirRelationExpr\n{}",
821 constants.pretty(),
822 ),
823 }
824 }
825
826 #[mz_ore::instrument(level = "debug")]
827 pub(crate) fn send_diffs(
828 session: &mut Session,
829 mut plan: plan::SendDiffsPlan,
830 ) -> Result<ExecuteResponse, AdapterError> {
831 let affected_rows = {
832 let mut affected_rows = Diff::from(0);
833 let mut all_positive_diffs = true;
834 for (_, diff) in plan.updates.iter() {
837 if diff.is_negative() {
838 all_positive_diffs = false;
839 break;
840 }
841
842 affected_rows += diff;
843 }
844
845 if !all_positive_diffs {
846 differential_dataflow::consolidation::consolidate(&mut plan.updates);
850
851 affected_rows = Diff::ZERO;
852 for (_, diff) in plan.updates.iter() {
857 affected_rows += diff.abs();
858 }
859 }
860
861 usize::try_from(affected_rows.into_inner()).expect("positive Diff must fit")
862 };
863 event!(
864 Level::TRACE,
865 affected_rows,
866 id = format!("{:?}", plan.id),
867 kind = format!("{:?}", plan.kind),
868 updates = plan.updates.len(),
869 returning = plan.returning.len(),
870 );
871
872 session.add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
873 id: plan.id,
874 rows: TableData::Rows(plan.updates),
875 }]))?;
876 if !plan.returning.is_empty() {
877 let finishing = RowSetFinishing {
878 order_by: Vec::new(),
879 limit: None,
880 offset: 0,
881 project: (0..plan.returning[0].0.iter().count()).collect(),
882 };
883 let max_returned_query_size = session.vars().max_query_result_size();
884 let duration_histogram = session.metrics().row_set_finishing_seconds();
885
886 return match finishing.finish(
887 RowCollection::new(plan.returning, &finishing.order_by),
888 plan.max_result_size,
889 Some(max_returned_query_size),
890 duration_histogram,
891 ) {
892 Ok((rows, _size_bytes)) => Ok(Self::send_immediate_rows(rows)),
893 Err(e) => Err(AdapterError::ResultSize(e)),
894 };
895 }
896 Ok(match plan.kind {
897 MutationKind::Delete => ExecuteResponse::Deleted(affected_rows),
898 MutationKind::Insert => ExecuteResponse::Inserted(affected_rows),
899 MutationKind::Update => ExecuteResponse::Updated(affected_rows / 2),
900 })
901 }
902}