1#![allow(clippy::pub_use)]
12
13use futures::FutureExt;
16use futures::future::LocalBoxFuture;
17use inner::return_if_err;
18use mz_expr::row::RowCollection;
19use mz_expr::{MirRelationExpr, RowSetFinishing};
20use mz_ore::tracing::OpenTelemetryContext;
21use mz_repr::{CatalogItemId, Diff, GlobalId};
22use mz_sql::catalog::CatalogError;
23use mz_sql::names::ResolvedIds;
24use mz_sql::plan::{
25 self, AbortTransactionPlan, CommitTransactionPlan, CopyFromSource, CreateRolePlan,
26 CreateSourcePlanBundle, FetchPlan, MutationKind, Params, Plan, PlanKind, RaisePlan,
27};
28use mz_sql::rbac;
29use mz_sql::session::metadata::SessionMetadata;
30use mz_sql_parser::ast::{Raw, Statement};
31use mz_storage_client::client::TableData;
32use mz_storage_types::connections::inline::IntoInlineConnection;
33use std::collections::BTreeSet;
34use std::sync::Arc;
35use tokio::sync::oneshot;
36use tracing::{Instrument, Level, Span, event};
37
38use crate::ExecuteContext;
39use crate::catalog::Catalog;
40use crate::command::{Command, ExecuteResponse, Response};
41use crate::coord::appends::{DeferredOp, DeferredPlan};
42use crate::coord::validity::PlanValidity;
43use crate::coord::{
44 Coordinator, DeferredPlanStatement, Message, PlanStatement, TargetCluster, catalog_serving,
45};
46use crate::error::AdapterError;
47use crate::notice::AdapterNotice;
48use crate::session::{EndTransactionAction, Session, TransactionOps, TransactionStatus, WriteOp};
49use crate::util::ClientTransmitter;
50
51mod inner;
68
69impl Coordinator {
70 pub(crate) fn sequence_plan(
74 &mut self,
75 mut ctx: ExecuteContext,
76 plan: Plan,
77 resolved_ids: ResolvedIds,
78 ) -> LocalBoxFuture<'_, ()> {
79 async move {
80 let responses = ExecuteResponse::generated_from(&PlanKind::from(&plan));
81 ctx.tx_mut().set_allowed(responses);
82
83 if self.controller.read_only() && !plan.allowed_in_read_only() {
84 ctx.retire(Err(AdapterError::ReadOnly));
85 return;
86 }
87
88 if let Some((dependencies, wait_future)) =
91 super::appends::waiting_on_startup_appends(self.catalog(), ctx.session_mut(), &plan)
92 {
93 let conn_id = ctx.session().conn_id();
94 tracing::debug!(%conn_id, "deferring plan for startup appends");
95
96 let role_metadata = ctx.session().role_metadata().clone();
97 let validity = PlanValidity::new(
98 self.catalog.transient_revision(),
99 dependencies,
100 None,
101 None,
102 role_metadata,
103 );
104 let deferred_plan = DeferredPlan {
105 ctx,
106 plan,
107 validity,
108 requires_locks: BTreeSet::default(),
109 };
110 let acquire_future = wait_future.map(|()| None);
113
114 self.defer_op(acquire_future, DeferredOp::Plan(deferred_plan));
115
116 return;
119 };
120
121 let target_cluster = match ctx.session().transaction().cluster() {
123 Some(cluster_id) => TargetCluster::Transaction(cluster_id),
125 None => {
127 let session_catalog = self.catalog.for_session(ctx.session());
128 catalog_serving::auto_run_on_catalog_server(
129 &session_catalog,
130 ctx.session(),
131 &plan,
132 )
133 }
134 };
135 let (target_cluster_id, target_cluster_name) = match self
136 .catalog()
137 .resolve_target_cluster(target_cluster, ctx.session())
138 {
139 Ok(cluster) => (Some(cluster.id), Some(cluster.name.clone())),
140 Err(_) => (None, None),
141 };
142
143 if let (Some(cluster_id), Some(statement_id)) =
144 (target_cluster_id, ctx.extra().contents())
145 {
146 self.set_statement_execution_cluster(statement_id, cluster_id);
147 }
148
149 let session_catalog = self.catalog.for_session(ctx.session());
150
151 if let Some(cluster_name) = &target_cluster_name {
152 if let Err(e) = catalog_serving::check_cluster_restrictions(
153 cluster_name,
154 &session_catalog,
155 &plan,
156 ) {
157 return ctx.retire(Err(e));
158 }
159 }
160
161 if let Err(e) = rbac::check_plan(
162 &session_catalog,
163 |id| {
164 self.active_conns()
167 .into_iter()
168 .find(|(conn_id, _)| conn_id.unhandled() == id)
169 .map(|(_, conn_meta)| *conn_meta.authenticated_role_id())
170 },
171 ctx.session(),
172 &plan,
173 target_cluster_id,
174 &resolved_ids,
175 ) {
176 return ctx.retire(Err(e.into()));
177 }
178
179 match plan {
180 Plan::CreateSource(plan) => {
181 let id_ts = self.get_catalog_write_ts().await;
182 let (item_id, global_id) =
183 return_if_err!(self.catalog_mut().allocate_user_id(id_ts).await, ctx);
184 let result = self
185 .sequence_create_source(
186 ctx.session_mut(),
187 vec![CreateSourcePlanBundle {
188 item_id,
189 global_id,
190 plan,
191 resolved_ids,
192 available_source_references: None,
193 }],
194 )
195 .await;
196 ctx.retire(result);
197 }
198 Plan::CreateSources(plans) => {
199 assert!(
200 resolved_ids.is_empty(),
201 "each plan has separate resolved_ids"
202 );
203 let result = self.sequence_create_source(ctx.session_mut(), plans).await;
204 ctx.retire(result);
205 }
206 Plan::CreateConnection(plan) => {
207 self.sequence_create_connection(ctx, plan, resolved_ids)
208 .await;
209 }
210 Plan::CreateDatabase(plan) => {
211 let result = self.sequence_create_database(ctx.session_mut(), plan).await;
212 ctx.retire(result);
213 }
214 Plan::CreateSchema(plan) => {
215 let result = self.sequence_create_schema(ctx.session_mut(), plan).await;
216 ctx.retire(result);
217 }
218 Plan::CreateRole(plan) => {
219 let result = self
220 .sequence_create_role(Some(ctx.session().conn_id()), plan)
221 .await;
222 if let Some(notice) = self.should_emit_rbac_notice(ctx.session()) {
223 ctx.session().add_notice(notice);
224 }
225 ctx.retire(result);
226 }
227 Plan::CreateCluster(plan) => {
228 let result = self.sequence_create_cluster(ctx.session(), plan).await;
229 ctx.retire(result);
230 }
231 Plan::CreateClusterReplica(plan) => {
232 let result = self
233 .sequence_create_cluster_replica(ctx.session(), plan)
234 .await;
235 ctx.retire(result);
236 }
237 Plan::CreateTable(plan) => {
238 let result = self
239 .sequence_create_table(&mut ctx, plan, resolved_ids)
240 .await;
241 ctx.retire(result);
242 }
243 Plan::CreateSecret(plan) => {
244 self.sequence_create_secret(ctx, plan).await;
245 }
246 Plan::CreateSink(plan) => {
247 self.sequence_create_sink(ctx, plan, resolved_ids).await;
248 }
249 Plan::CreateView(plan) => {
250 self.sequence_create_view(ctx, plan, resolved_ids).await;
251 }
252 Plan::CreateMaterializedView(plan) => {
253 self.sequence_create_materialized_view(ctx, plan, resolved_ids)
254 .await;
255 }
256 Plan::CreateContinualTask(plan) => {
257 let res = self
258 .sequence_create_continual_task(ctx.session(), plan, resolved_ids)
259 .await;
260 ctx.retire(res);
261 }
262 Plan::CreateIndex(plan) => {
263 self.sequence_create_index(ctx, plan, resolved_ids).await;
264 }
265 Plan::CreateType(plan) => {
266 let result = self
267 .sequence_create_type(ctx.session(), plan, resolved_ids)
268 .await;
269 ctx.retire(result);
270 }
271 Plan::CreateNetworkPolicy(plan) => {
272 let res = self
273 .sequence_create_network_policy(ctx.session(), plan)
274 .await;
275 ctx.retire(res);
276 }
277 Plan::Comment(plan) => {
278 let result = self.sequence_comment_on(ctx.session(), plan).await;
279 ctx.retire(result);
280 }
281 Plan::CopyTo(plan) => {
282 self.sequence_copy_to(ctx, plan, target_cluster).await;
283 }
284 Plan::DropObjects(plan) => {
285 let result = self.sequence_drop_objects(ctx.session_mut(), plan).await;
286 ctx.retire(result);
287 }
288 Plan::DropOwned(plan) => {
289 let result = self.sequence_drop_owned(ctx.session_mut(), plan).await;
290 ctx.retire(result);
291 }
292 Plan::EmptyQuery => {
293 ctx.retire(Ok(ExecuteResponse::EmptyQuery));
294 }
295 Plan::ShowAllVariables => {
296 let result = self.sequence_show_all_variables(ctx.session());
297 ctx.retire(result);
298 }
299 Plan::ShowVariable(plan) => {
300 let result = self.sequence_show_variable(ctx.session(), plan);
301 ctx.retire(result);
302 }
303 Plan::InspectShard(plan) => {
304 let result = self.sequence_inspect_shard(ctx.session(), plan).await;
306 ctx.retire(result);
307 }
308 Plan::SetVariable(plan) => {
309 let result = self.sequence_set_variable(ctx.session_mut(), plan);
310 ctx.retire(result);
311 }
312 Plan::ResetVariable(plan) => {
313 let result = self.sequence_reset_variable(ctx.session_mut(), plan);
314 ctx.retire(result);
315 }
316 Plan::SetTransaction(plan) => {
317 let result = self.sequence_set_transaction(ctx.session_mut(), plan);
318 ctx.retire(result);
319 }
320 Plan::StartTransaction(plan) => {
321 if matches!(
322 ctx.session().transaction(),
323 TransactionStatus::InTransaction(_)
324 ) {
325 ctx.session()
326 .add_notice(AdapterNotice::ExistingTransactionInProgress);
327 }
328 let result = ctx.session_mut().start_transaction(
329 self.now_datetime(),
330 plan.access,
331 plan.isolation_level,
332 );
333 ctx.retire(result.map(|_| ExecuteResponse::StartedTransaction))
334 }
335 Plan::CommitTransaction(CommitTransactionPlan {
336 ref transaction_type,
337 })
338 | Plan::AbortTransaction(AbortTransactionPlan {
339 ref transaction_type,
340 }) => {
341 if ctx.session().transaction().is_ddl() {
344 if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
345 let prev = self
346 .active_conns
347 .get_mut(ctx.session().conn_id())
348 .expect("connection must exist")
349 .deferred_lock
350 .replace(guard);
351 assert!(
352 prev.is_none(),
353 "connections should have at most one lock guard"
354 );
355 } else {
356 self.serialized_ddl.push_back(DeferredPlanStatement {
357 ctx,
358 ps: PlanStatement::Plan { plan, resolved_ids },
359 });
360 return;
361 }
362 }
363
364 let action = match &plan {
365 Plan::CommitTransaction(_) => EndTransactionAction::Commit,
366 Plan::AbortTransaction(_) => EndTransactionAction::Rollback,
367 _ => unreachable!(),
368 };
369 if ctx.session().transaction().is_implicit() && !transaction_type.is_implicit()
370 {
371 ctx.session().add_notice(
376 AdapterNotice::ExplicitTransactionControlInImplicitTransaction,
377 );
378 }
379 self.sequence_end_transaction(ctx, action).await;
380 }
381 Plan::Select(plan) => {
382 let max = Some(ctx.session().vars().max_query_result_size());
383 self.sequence_peek(ctx, plan, target_cluster, max).await;
384 }
385 Plan::Subscribe(plan) => {
386 self.sequence_subscribe(ctx, plan, target_cluster).await;
387 }
388 Plan::SideEffectingFunc(plan) => {
389 self.sequence_side_effecting_func(ctx, plan).await;
390 }
391 Plan::ShowCreate(plan) => {
392 ctx.retire(Ok(Self::send_immediate_rows(plan.row)));
393 }
394 Plan::ShowColumns(show_columns_plan) => {
395 let max = Some(ctx.session().vars().max_query_result_size());
396 self.sequence_peek(ctx, show_columns_plan.select_plan, target_cluster, max)
397 .await;
398 }
399 Plan::CopyFrom(plan) => match plan.source {
400 CopyFromSource::Stdin => {
401 let (tx, _, session, ctx_extra) = ctx.into_parts();
402 tx.send(
403 Ok(ExecuteResponse::CopyFrom {
404 id: plan.id,
405 columns: plan.columns,
406 params: plan.params,
407 ctx_extra,
408 }),
409 session,
410 );
411 }
412 CopyFromSource::Url(_) | CopyFromSource::AwsS3 { .. } => {
413 self.sequence_copy_from(ctx, plan, target_cluster).await;
414 }
415 },
416 Plan::ExplainPlan(plan) => {
417 self.sequence_explain_plan(ctx, plan, target_cluster).await;
418 }
419 Plan::ExplainPushdown(plan) => {
420 self.sequence_explain_pushdown(ctx, plan, target_cluster)
421 .await;
422 }
423 Plan::ExplainSinkSchema(plan) => {
424 let result = self.sequence_explain_schema(plan);
425 ctx.retire(result);
426 }
427 Plan::ExplainTimestamp(plan) => {
428 self.sequence_explain_timestamp(ctx, plan, target_cluster)
429 .await;
430 }
431 Plan::Insert(plan) => {
432 self.sequence_insert(ctx, plan).await;
433 }
434 Plan::ReadThenWrite(plan) => {
435 self.sequence_read_then_write(ctx, plan).await;
436 }
437 Plan::AlterNoop(plan) => {
438 ctx.retire(Ok(ExecuteResponse::AlteredObject(plan.object_type)));
439 }
440 Plan::AlterCluster(plan) => {
441 self.sequence_alter_cluster_staged(ctx, plan).await;
442 }
443 Plan::AlterClusterRename(plan) => {
444 let result = self
445 .sequence_alter_cluster_rename(ctx.session_mut(), plan)
446 .await;
447 ctx.retire(result);
448 }
449 Plan::AlterClusterSwap(plan) => {
450 let result = self
451 .sequence_alter_cluster_swap(ctx.session_mut(), plan)
452 .await;
453 ctx.retire(result);
454 }
455 Plan::AlterClusterReplicaRename(plan) => {
456 let result = self
457 .sequence_alter_cluster_replica_rename(ctx.session(), plan)
458 .await;
459 ctx.retire(result);
460 }
461 Plan::AlterConnection(plan) => {
462 self.sequence_alter_connection(ctx, plan).await;
463 }
464 Plan::AlterSetCluster(plan) => {
465 let result = self.sequence_alter_set_cluster(ctx.session(), plan).await;
466 ctx.retire(result);
467 }
468 Plan::AlterRetainHistory(plan) => {
469 let result = self
470 .sequence_alter_retain_history(ctx.session_mut(), plan)
471 .await;
472 ctx.retire(result);
473 }
474 Plan::AlterItemRename(plan) => {
475 let result = self
476 .sequence_alter_item_rename(ctx.session_mut(), plan)
477 .await;
478 ctx.retire(result);
479 }
480 Plan::AlterSchemaRename(plan) => {
481 let result = self
482 .sequence_alter_schema_rename(ctx.session_mut(), plan)
483 .await;
484 ctx.retire(result);
485 }
486 Plan::AlterSchemaSwap(plan) => {
487 let result = self
488 .sequence_alter_schema_swap(ctx.session_mut(), plan)
489 .await;
490 ctx.retire(result);
491 }
492 Plan::AlterRole(plan) => {
493 let result = self.sequence_alter_role(ctx.session_mut(), plan).await;
494 ctx.retire(result);
495 }
496 Plan::AlterSecret(plan) => {
497 self.sequence_alter_secret(ctx, plan).await;
498 }
499 Plan::AlterSink(plan) => {
500 self.sequence_alter_sink_prepare(ctx, plan).await;
501 }
502 Plan::AlterSource(plan) => {
503 let result = self.sequence_alter_source(ctx.session_mut(), plan).await;
504 ctx.retire(result);
505 }
506 Plan::AlterSystemSet(plan) => {
507 let result = self.sequence_alter_system_set(ctx.session(), plan).await;
508 ctx.retire(result);
509 }
510 Plan::AlterSystemReset(plan) => {
511 let result = self.sequence_alter_system_reset(ctx.session(), plan).await;
512 ctx.retire(result);
513 }
514 Plan::AlterSystemResetAll(plan) => {
515 let result = self
516 .sequence_alter_system_reset_all(ctx.session(), plan)
517 .await;
518 ctx.retire(result);
519 }
520 Plan::AlterTableAddColumn(plan) => {
521 let result = self.sequence_alter_table(ctx.session(), plan).await;
522 ctx.retire(result);
523 }
524 Plan::AlterNetworkPolicy(plan) => {
525 let res = self
526 .sequence_alter_network_policy(ctx.session(), plan)
527 .await;
528 ctx.retire(res);
529 }
530 Plan::DiscardTemp => {
531 self.drop_temp_items(ctx.session().conn_id()).await;
532 ctx.retire(Ok(ExecuteResponse::DiscardedTemp));
533 }
534 Plan::DiscardAll => {
535 let ret = if let TransactionStatus::Started(_) = ctx.session().transaction() {
536 self.clear_transaction(ctx.session_mut()).await;
537 self.drop_temp_items(ctx.session().conn_id()).await;
538 ctx.session_mut().reset();
539 Ok(ExecuteResponse::DiscardedAll)
540 } else {
541 Err(AdapterError::OperationProhibitsTransaction(
542 "DISCARD ALL".into(),
543 ))
544 };
545 ctx.retire(ret);
546 }
547 Plan::Declare(plan) => {
548 self.declare(ctx, plan.name, plan.stmt, plan.sql, plan.params);
549 }
550 Plan::Fetch(FetchPlan {
551 name,
552 count,
553 timeout,
554 }) => {
555 let ctx_extra = std::mem::take(ctx.extra_mut());
556 ctx.retire(Ok(ExecuteResponse::Fetch {
557 name,
558 count,
559 timeout,
560 ctx_extra,
561 }));
562 }
563 Plan::Close(plan) => {
564 if ctx.session_mut().remove_portal(&plan.name) {
565 ctx.retire(Ok(ExecuteResponse::ClosedCursor));
566 } else {
567 ctx.retire(Err(AdapterError::UnknownCursor(plan.name)));
568 }
569 }
570 Plan::Prepare(plan) => {
571 if ctx
572 .session()
573 .get_prepared_statement_unverified(&plan.name)
574 .is_some()
575 {
576 ctx.retire(Err(AdapterError::PreparedStatementExists(plan.name)));
577 } else {
578 ctx.session_mut().set_prepared_statement(
579 plan.name,
580 Some(plan.stmt),
581 plan.sql,
582 plan.desc,
583 self.catalog().transient_revision(),
584 self.now(),
585 );
586 ctx.retire(Ok(ExecuteResponse::Prepare));
587 }
588 }
589 Plan::Execute(plan) => {
590 match self.sequence_execute(ctx.session_mut(), plan) {
591 Ok(portal_name) => {
592 let (tx, _, session, extra) = ctx.into_parts();
593 self.internal_cmd_tx
594 .send(Message::Command(
595 OpenTelemetryContext::obtain(),
596 Command::Execute {
597 portal_name,
598 session,
599 tx: tx.take(),
600 outer_ctx_extra: Some(extra),
601 },
602 ))
603 .expect("sending to self.internal_cmd_tx cannot fail");
604 }
605 Err(err) => ctx.retire(Err(err)),
606 };
607 }
608 Plan::Deallocate(plan) => match plan.name {
609 Some(name) => {
610 if ctx.session_mut().remove_prepared_statement(&name) {
611 ctx.retire(Ok(ExecuteResponse::Deallocate { all: false }));
612 } else {
613 ctx.retire(Err(AdapterError::UnknownPreparedStatement(name)));
614 }
615 }
616 None => {
617 ctx.session_mut().remove_all_prepared_statements();
618 ctx.retire(Ok(ExecuteResponse::Deallocate { all: true }));
619 }
620 },
621 Plan::Raise(RaisePlan { severity }) => {
622 ctx.session()
623 .add_notice(AdapterNotice::UserRequested { severity });
624 ctx.retire(Ok(ExecuteResponse::Raised));
625 }
626 Plan::GrantPrivileges(plan) => {
627 let result = self
628 .sequence_grant_privileges(ctx.session_mut(), plan)
629 .await;
630 ctx.retire(result);
631 }
632 Plan::RevokePrivileges(plan) => {
633 let result = self
634 .sequence_revoke_privileges(ctx.session_mut(), plan)
635 .await;
636 ctx.retire(result);
637 }
638 Plan::AlterDefaultPrivileges(plan) => {
639 let result = self
640 .sequence_alter_default_privileges(ctx.session_mut(), plan)
641 .await;
642 ctx.retire(result);
643 }
644 Plan::GrantRole(plan) => {
645 let result = self.sequence_grant_role(ctx.session_mut(), plan).await;
646 ctx.retire(result);
647 }
648 Plan::RevokeRole(plan) => {
649 let result = self.sequence_revoke_role(ctx.session_mut(), plan).await;
650 ctx.retire(result);
651 }
652 Plan::AlterOwner(plan) => {
653 let result = self.sequence_alter_owner(ctx.session_mut(), plan).await;
654 ctx.retire(result);
655 }
656 Plan::ReassignOwned(plan) => {
657 let result = self.sequence_reassign_owned(ctx.session_mut(), plan).await;
658 ctx.retire(result);
659 }
660 Plan::ValidateConnection(plan) => {
661 let connection = plan
662 .connection
663 .into_inline_connection(self.catalog().state());
664 let current_storage_configuration = self.controller.storage.config().clone();
665 mz_ore::task::spawn(|| "coord::validate_connection", async move {
666 let res = match connection
667 .validate(plan.id, ¤t_storage_configuration)
668 .await
669 {
670 Ok(()) => Ok(ExecuteResponse::ValidatedConnection),
671 Err(err) => Err(err.into()),
672 };
673 ctx.retire(res);
674 });
675 }
676 }
677 }
678 .instrument(tracing::debug_span!("coord::sequencer::sequence_plan"))
679 .boxed_local()
680 }
681
682 #[mz_ore::instrument(level = "debug")]
683 pub(crate) async fn sequence_execute_single_statement_transaction(
684 &mut self,
685 ctx: ExecuteContext,
686 stmt: Arc<Statement<Raw>>,
687 params: Params,
688 ) {
689 let (tx, internal_cmd_tx, mut session, extra) = ctx.into_parts();
691 assert!(matches!(session.transaction(), TransactionStatus::Default));
692 session.start_transaction_single_stmt(self.now_datetime());
693 let conn_id = session.conn_id().unhandled();
694
695 let (sub_tx, sub_rx) = oneshot::channel();
697 let sub_ct = ClientTransmitter::new(sub_tx, self.internal_cmd_tx.clone());
698 let sub_ctx = ExecuteContext::from_parts(sub_ct, internal_cmd_tx, session, extra);
699 self.handle_execute_inner(stmt, params, sub_ctx).await;
700
701 let internal_cmd_tx = self.internal_cmd_tx.clone();
704 mz_ore::task::spawn(
705 || format!("execute_single_statement:{conn_id}"),
706 async move {
707 let Ok(Response {
708 result,
709 session,
710 otel_ctx,
711 }) = sub_rx.await
712 else {
713 return;
715 };
716 otel_ctx.attach_as_parent();
717 let (sub_tx, sub_rx) = oneshot::channel();
718 let _ = internal_cmd_tx.send(Message::Command(
719 otel_ctx,
720 Command::Commit {
721 action: EndTransactionAction::Commit,
722 session,
723 tx: sub_tx,
724 },
725 ));
726 let Ok(commit_response) = sub_rx.await else {
727 return;
729 };
730 assert!(matches!(
731 commit_response.session.transaction(),
732 TransactionStatus::Default
733 ));
734 let result = match (result, commit_response.result) {
740 (Ok(_), commit) => commit,
741 (Err(result), _) => Err(result),
742 };
743 tx.send(result, commit_response.session);
746 }
747 .instrument(Span::current()),
748 );
749 }
750
751 #[mz_ore::instrument(level = "debug")]
755 pub(crate) async fn sequence_create_role_for_startup(
756 &mut self,
757 plan: CreateRolePlan,
758 ) -> Result<ExecuteResponse, AdapterError> {
759 self.sequence_create_role(None, plan).await
766 }
767
768 pub(crate) fn allocate_transient_id(&self) -> (CatalogItemId, GlobalId) {
769 self.transient_id_gen.allocate_id()
770 }
771
772 fn should_emit_rbac_notice(&self, session: &Session) -> Option<AdapterNotice> {
773 if !rbac::is_rbac_enabled_for_session(self.catalog.system_config(), session) {
774 Some(AdapterNotice::RbacUserDisabled)
775 } else {
776 None
777 }
778 }
779
780 pub(crate) fn insert_constant(
781 catalog: &Catalog,
782 session: &mut Session,
783 id: CatalogItemId,
784 constants: MirRelationExpr,
785 ) -> Result<ExecuteResponse, AdapterError> {
786 let desc = match catalog.try_get_entry(&id) {
788 Some(table) => {
789 let full_name = catalog.resolve_full_name(table.name(), Some(session.conn_id()));
790 table.desc_latest(&full_name)?
792 }
793 None => {
794 return Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
795 kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
796 id.to_string(),
797 )),
798 }));
799 }
800 };
801
802 match constants.as_const() {
803 Some((rows, ..)) => {
804 let rows = rows.clone()?;
805 for (row, _) in &rows {
806 for (i, datum) in row.iter().enumerate() {
807 desc.constraints_met(i, &datum)?;
808 }
809 }
810 let diffs_plan = plan::SendDiffsPlan {
811 id,
812 updates: rows,
813 kind: MutationKind::Insert,
814 returning: Vec::new(),
815 max_result_size: catalog.system_config().max_result_size(),
816 };
817 Self::send_diffs(session, diffs_plan)
818 }
819 None => panic!(
820 "tried using sequence_insert_constant on non-constant MirRelationExpr {:?}",
821 constants
822 ),
823 }
824 }
825
826 #[mz_ore::instrument(level = "debug")]
827 pub(crate) fn send_diffs(
828 session: &mut Session,
829 mut plan: plan::SendDiffsPlan,
830 ) -> Result<ExecuteResponse, AdapterError> {
831 let affected_rows = {
832 let mut affected_rows = Diff::from(0);
833 let mut all_positive_diffs = true;
834 for (_, diff) in plan.updates.iter() {
837 if diff.is_negative() {
838 all_positive_diffs = false;
839 break;
840 }
841
842 affected_rows += diff;
843 }
844
845 if !all_positive_diffs {
846 differential_dataflow::consolidation::consolidate(&mut plan.updates);
850
851 affected_rows = Diff::ZERO;
852 for (_, diff) in plan.updates.iter() {
857 affected_rows += diff.abs();
858 }
859 }
860
861 usize::try_from(affected_rows.into_inner()).expect("positive Diff must fit")
862 };
863 event!(
864 Level::TRACE,
865 affected_rows,
866 id = format!("{:?}", plan.id),
867 kind = format!("{:?}", plan.kind),
868 updates = plan.updates.len(),
869 returning = plan.returning.len(),
870 );
871
872 session.add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
873 id: plan.id,
874 rows: TableData::Rows(plan.updates),
875 }]))?;
876 if !plan.returning.is_empty() {
877 let finishing = RowSetFinishing {
878 order_by: Vec::new(),
879 limit: None,
880 offset: 0,
881 project: (0..plan.returning[0].0.iter().count()).collect(),
882 };
883 let max_returned_query_size = session.vars().max_query_result_size();
884 let duration_histogram = session.metrics().row_set_finishing_seconds();
885
886 return match finishing.finish(
887 RowCollection::new(plan.returning, &finishing.order_by),
888 plan.max_result_size,
889 Some(max_returned_query_size),
890 duration_histogram,
891 ) {
892 Ok((rows, _size_bytes)) => Ok(Self::send_immediate_rows(rows)),
893 Err(e) => Err(AdapterError::ResultSize(e)),
894 };
895 }
896 Ok(match plan.kind {
897 MutationKind::Delete => ExecuteResponse::Deleted(affected_rows),
898 MutationKind::Insert => ExecuteResponse::Inserted(affected_rows),
899 MutationKind::Update => ExecuteResponse::Updated(affected_rows / 2),
900 })
901 }
902}