1#![allow(clippy::pub_use)]
12
13use futures::FutureExt;
16use futures::future::LocalBoxFuture;
17use inner::return_if_err;
18use mz_expr::row::RowCollection;
19use mz_expr::{MirRelationExpr, RowSetFinishing};
20use mz_ore::tracing::OpenTelemetryContext;
21use mz_repr::{CatalogItemId, Diff, GlobalId};
22use mz_sql::catalog::CatalogError;
23use mz_sql::names::ResolvedIds;
24use mz_sql::plan::{
25 self, AbortTransactionPlan, CommitTransactionPlan, CopyFromSource, CreateRolePlan,
26 CreateSourcePlanBundle, FetchPlan, MutationKind, Params, Plan, PlanKind, RaisePlan,
27};
28use mz_sql::rbac;
29use mz_sql::session::metadata::SessionMetadata;
30use mz_sql_parser::ast::{Raw, Statement};
31use mz_storage_client::client::TableData;
32use mz_storage_types::connections::inline::IntoInlineConnection;
33use std::collections::BTreeSet;
34use std::sync::Arc;
35use tokio::sync::oneshot;
36use tracing::{Instrument, Level, Span, event};
37
38use crate::ExecuteContext;
39use crate::catalog::Catalog;
40use crate::command::{Command, ExecuteResponse, Response};
41use crate::coord::appends::{DeferredOp, DeferredPlan};
42use crate::coord::validity::PlanValidity;
43use crate::coord::{
44 Coordinator, DeferredPlanStatement, Message, PlanStatement, TargetCluster, catalog_serving,
45};
46use crate::error::AdapterError;
47use crate::notice::AdapterNotice;
48use crate::session::{
49 EndTransactionAction, Session, StateRevision, TransactionOps, TransactionStatus, WriteOp,
50};
51use crate::util::ClientTransmitter;
52
53mod inner;
70
71impl Coordinator {
72 pub(crate) fn sequence_plan(
76 &mut self,
77 mut ctx: ExecuteContext,
78 plan: Plan,
79 resolved_ids: ResolvedIds,
80 ) -> LocalBoxFuture<'_, ()> {
81 async move {
82 let responses = ExecuteResponse::generated_from(&PlanKind::from(&plan));
83 ctx.tx_mut().set_allowed(responses);
84
85 if self.controller.read_only() && !plan.allowed_in_read_only() {
86 ctx.retire(Err(AdapterError::ReadOnly));
87 return;
88 }
89
90 if let Some((dependencies, wait_future)) =
93 super::appends::waiting_on_startup_appends(self.catalog(), ctx.session_mut(), &plan)
94 {
95 let conn_id = ctx.session().conn_id();
96 tracing::debug!(%conn_id, "deferring plan for startup appends");
97
98 let role_metadata = ctx.session().role_metadata().clone();
99 let validity = PlanValidity::new(
100 self.catalog.transient_revision(),
101 dependencies,
102 None,
103 None,
104 role_metadata,
105 );
106 let deferred_plan = DeferredPlan {
107 ctx,
108 plan,
109 validity,
110 requires_locks: BTreeSet::default(),
111 };
112 let acquire_future = wait_future.map(|()| None);
115
116 self.defer_op(acquire_future, DeferredOp::Plan(deferred_plan));
117
118 return;
121 };
122
123 let target_cluster = match ctx.session().transaction().cluster() {
125 Some(cluster_id) => TargetCluster::Transaction(cluster_id),
127 None => {
129 let session_catalog = self.catalog.for_session(ctx.session());
130 catalog_serving::auto_run_on_catalog_server(
131 &session_catalog,
132 ctx.session(),
133 &plan,
134 )
135 }
136 };
137 let (target_cluster_id, target_cluster_name) = match self
138 .catalog()
139 .resolve_target_cluster(target_cluster, ctx.session())
140 {
141 Ok(cluster) => (Some(cluster.id), Some(cluster.name.clone())),
142 Err(_) => (None, None),
143 };
144
145 if let (Some(cluster_id), Some(statement_id)) =
146 (target_cluster_id, ctx.extra().contents())
147 {
148 self.set_statement_execution_cluster(statement_id, cluster_id);
149 }
150
151 let session_catalog = self.catalog.for_session(ctx.session());
152
153 if let Some(cluster_name) = &target_cluster_name {
154 if let Err(e) = catalog_serving::check_cluster_restrictions(
155 cluster_name,
156 &session_catalog,
157 &plan,
158 ) {
159 return ctx.retire(Err(e));
160 }
161 }
162
163 if let Err(e) = rbac::check_plan(
164 &session_catalog,
165 |id| {
166 self.active_conns()
169 .into_iter()
170 .find(|(conn_id, _)| conn_id.unhandled() == id)
171 .map(|(_, conn_meta)| *conn_meta.authenticated_role_id())
172 },
173 ctx.session(),
174 &plan,
175 target_cluster_id,
176 &resolved_ids,
177 ) {
178 return ctx.retire(Err(e.into()));
179 }
180
181 match plan {
182 Plan::CreateSource(plan) => {
183 let id_ts = self.get_catalog_write_ts().await;
184 let (item_id, global_id) =
185 return_if_err!(self.catalog_mut().allocate_user_id(id_ts).await, ctx);
186 let result = self
187 .sequence_create_source(
188 &mut ctx,
189 vec![CreateSourcePlanBundle {
190 item_id,
191 global_id,
192 plan,
193 resolved_ids,
194 available_source_references: None,
195 }],
196 )
197 .await;
198 ctx.retire(result);
199 }
200 Plan::CreateSources(plans) => {
201 assert!(
202 resolved_ids.is_empty(),
203 "each plan has separate resolved_ids"
204 );
205 let result = self.sequence_create_source(&mut ctx, plans).await;
206 ctx.retire(result);
207 }
208 Plan::CreateConnection(plan) => {
209 self.sequence_create_connection(ctx, plan, resolved_ids)
210 .await;
211 }
212 Plan::CreateDatabase(plan) => {
213 let result = self.sequence_create_database(ctx.session_mut(), plan).await;
214 ctx.retire(result);
215 }
216 Plan::CreateSchema(plan) => {
217 let result = self.sequence_create_schema(ctx.session_mut(), plan).await;
218 ctx.retire(result);
219 }
220 Plan::CreateRole(plan) => {
221 let result = self
222 .sequence_create_role(Some(ctx.session().conn_id()), plan)
223 .await;
224 if let Some(notice) = self.should_emit_rbac_notice(ctx.session()) {
225 ctx.session().add_notice(notice);
226 }
227 ctx.retire(result);
228 }
229 Plan::CreateCluster(plan) => {
230 let result = self.sequence_create_cluster(ctx.session(), plan).await;
231 ctx.retire(result);
232 }
233 Plan::CreateClusterReplica(plan) => {
234 let result = self
235 .sequence_create_cluster_replica(ctx.session(), plan)
236 .await;
237 ctx.retire(result);
238 }
239 Plan::CreateTable(plan) => {
240 let result = self
241 .sequence_create_table(&mut ctx, plan, resolved_ids)
242 .await;
243 ctx.retire(result);
244 }
245 Plan::CreateSecret(plan) => {
246 self.sequence_create_secret(ctx, plan).await;
247 }
248 Plan::CreateSink(plan) => {
249 self.sequence_create_sink(ctx, plan, resolved_ids).await;
250 }
251 Plan::CreateView(plan) => {
252 self.sequence_create_view(ctx, plan, resolved_ids).await;
253 }
254 Plan::CreateMaterializedView(plan) => {
255 self.sequence_create_materialized_view(ctx, plan, resolved_ids)
256 .await;
257 }
258 Plan::CreateContinualTask(plan) => {
259 let res = self
260 .sequence_create_continual_task(&mut ctx, plan, resolved_ids)
261 .await;
262 ctx.retire(res);
263 }
264 Plan::CreateIndex(plan) => {
265 self.sequence_create_index(ctx, plan, resolved_ids).await;
266 }
267 Plan::CreateType(plan) => {
268 let result = self
269 .sequence_create_type(ctx.session(), plan, resolved_ids)
270 .await;
271 ctx.retire(result);
272 }
273 Plan::CreateNetworkPolicy(plan) => {
274 let res = self
275 .sequence_create_network_policy(ctx.session(), plan)
276 .await;
277 ctx.retire(res);
278 }
279 Plan::Comment(plan) => {
280 let result = self.sequence_comment_on(ctx.session(), plan).await;
281 ctx.retire(result);
282 }
283 Plan::CopyTo(plan) => {
284 self.sequence_copy_to(ctx, plan, target_cluster).await;
285 }
286 Plan::DropObjects(plan) => {
287 let result = self.sequence_drop_objects(ctx.session_mut(), plan).await;
288 ctx.retire(result);
289 }
290 Plan::DropOwned(plan) => {
291 let result = self.sequence_drop_owned(ctx.session_mut(), plan).await;
292 ctx.retire(result);
293 }
294 Plan::EmptyQuery => {
295 ctx.retire(Ok(ExecuteResponse::EmptyQuery));
296 }
297 Plan::ShowAllVariables => {
298 let result = self.sequence_show_all_variables(ctx.session());
299 ctx.retire(result);
300 }
301 Plan::ShowVariable(plan) => {
302 let result = self.sequence_show_variable(ctx.session(), plan);
303 ctx.retire(result);
304 }
305 Plan::InspectShard(plan) => {
306 let result = self.sequence_inspect_shard(ctx.session(), plan).await;
308 ctx.retire(result);
309 }
310 Plan::SetVariable(plan) => {
311 let result = self.sequence_set_variable(ctx.session_mut(), plan);
312 ctx.retire(result);
313 }
314 Plan::ResetVariable(plan) => {
315 let result = self.sequence_reset_variable(ctx.session_mut(), plan);
316 ctx.retire(result);
317 }
318 Plan::SetTransaction(plan) => {
319 let result = self.sequence_set_transaction(ctx.session_mut(), plan);
320 ctx.retire(result);
321 }
322 Plan::StartTransaction(plan) => {
323 if matches!(
324 ctx.session().transaction(),
325 TransactionStatus::InTransaction(_)
326 ) {
327 ctx.session()
328 .add_notice(AdapterNotice::ExistingTransactionInProgress);
329 }
330 let result = ctx.session_mut().start_transaction(
331 self.now_datetime(),
332 plan.access,
333 plan.isolation_level,
334 );
335 ctx.retire(result.map(|_| ExecuteResponse::StartedTransaction))
336 }
337 Plan::CommitTransaction(CommitTransactionPlan {
338 ref transaction_type,
339 })
340 | Plan::AbortTransaction(AbortTransactionPlan {
341 ref transaction_type,
342 }) => {
343 if ctx.session().transaction().is_ddl() {
346 if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
347 let prev = self
348 .active_conns
349 .get_mut(ctx.session().conn_id())
350 .expect("connection must exist")
351 .deferred_lock
352 .replace(guard);
353 assert!(
354 prev.is_none(),
355 "connections should have at most one lock guard"
356 );
357 } else {
358 self.serialized_ddl.push_back(DeferredPlanStatement {
359 ctx,
360 ps: PlanStatement::Plan { plan, resolved_ids },
361 });
362 return;
363 }
364 }
365
366 let action = match &plan {
367 Plan::CommitTransaction(_) => EndTransactionAction::Commit,
368 Plan::AbortTransaction(_) => EndTransactionAction::Rollback,
369 _ => unreachable!(),
370 };
371 if ctx.session().transaction().is_implicit() && !transaction_type.is_implicit()
372 {
373 ctx.session().add_notice(
378 AdapterNotice::ExplicitTransactionControlInImplicitTransaction,
379 );
380 }
381 self.sequence_end_transaction(ctx, action).await;
382 }
383 Plan::Select(plan) => {
384 let max = Some(ctx.session().vars().max_query_result_size());
385 self.sequence_peek(ctx, plan, target_cluster, max).await;
386 }
387 Plan::Subscribe(plan) => {
388 self.sequence_subscribe(ctx, plan, target_cluster).await;
389 }
390 Plan::SideEffectingFunc(plan) => {
391 self.sequence_side_effecting_func(ctx, plan).await;
392 }
393 Plan::ShowCreate(plan) => {
394 ctx.retire(Ok(Self::send_immediate_rows(plan.row)));
395 }
396 Plan::ShowColumns(show_columns_plan) => {
397 let max = Some(ctx.session().vars().max_query_result_size());
398 self.sequence_peek(ctx, show_columns_plan.select_plan, target_cluster, max)
399 .await;
400 }
401 Plan::CopyFrom(plan) => match plan.source {
402 CopyFromSource::Stdin => {
403 let (tx, _, session, ctx_extra) = ctx.into_parts();
404 tx.send(
405 Ok(ExecuteResponse::CopyFrom {
406 id: plan.id,
407 columns: plan.columns,
408 params: plan.params,
409 ctx_extra,
410 }),
411 session,
412 );
413 }
414 CopyFromSource::Url(_) | CopyFromSource::AwsS3 { .. } => {
415 self.sequence_copy_from(ctx, plan, target_cluster).await;
416 }
417 },
418 Plan::ExplainPlan(plan) => {
419 self.sequence_explain_plan(ctx, plan, target_cluster).await;
420 }
421 Plan::ExplainPushdown(plan) => {
422 self.sequence_explain_pushdown(ctx, plan, target_cluster)
423 .await;
424 }
425 Plan::ExplainSinkSchema(plan) => {
426 let result = self.sequence_explain_schema(plan);
427 ctx.retire(result);
428 }
429 Plan::ExplainTimestamp(plan) => {
430 self.sequence_explain_timestamp(ctx, plan, target_cluster)
431 .await;
432 }
433 Plan::Insert(plan) => {
434 self.sequence_insert(ctx, plan).await;
435 }
436 Plan::ReadThenWrite(plan) => {
437 self.sequence_read_then_write(ctx, plan).await;
438 }
439 Plan::AlterNoop(plan) => {
440 ctx.retire(Ok(ExecuteResponse::AlteredObject(plan.object_type)));
441 }
442 Plan::AlterCluster(plan) => {
443 self.sequence_alter_cluster_staged(ctx, plan).await;
444 }
445 Plan::AlterClusterRename(plan) => {
446 let result = self.sequence_alter_cluster_rename(&mut ctx, plan).await;
447 ctx.retire(result);
448 }
449 Plan::AlterClusterSwap(plan) => {
450 let result = self.sequence_alter_cluster_swap(&mut ctx, plan).await;
451 ctx.retire(result);
452 }
453 Plan::AlterClusterReplicaRename(plan) => {
454 let result = self
455 .sequence_alter_cluster_replica_rename(ctx.session(), plan)
456 .await;
457 ctx.retire(result);
458 }
459 Plan::AlterConnection(plan) => {
460 self.sequence_alter_connection(ctx, plan).await;
461 }
462 Plan::AlterSetCluster(plan) => {
463 let result = self.sequence_alter_set_cluster(ctx.session(), plan).await;
464 ctx.retire(result);
465 }
466 Plan::AlterRetainHistory(plan) => {
467 let result = self.sequence_alter_retain_history(&mut ctx, plan).await;
468 ctx.retire(result);
469 }
470 Plan::AlterItemRename(plan) => {
471 let result = self.sequence_alter_item_rename(&mut ctx, plan).await;
472 ctx.retire(result);
473 }
474 Plan::AlterSchemaRename(plan) => {
475 let result = self.sequence_alter_schema_rename(&mut ctx, plan).await;
476 ctx.retire(result);
477 }
478 Plan::AlterSchemaSwap(plan) => {
479 let result = self.sequence_alter_schema_swap(&mut ctx, plan).await;
480 ctx.retire(result);
481 }
482 Plan::AlterRole(plan) => {
483 let result = self.sequence_alter_role(ctx.session_mut(), plan).await;
484 ctx.retire(result);
485 }
486 Plan::AlterSecret(plan) => {
487 self.sequence_alter_secret(ctx, plan).await;
488 }
489 Plan::AlterSink(plan) => {
490 self.sequence_alter_sink_prepare(ctx, plan).await;
491 }
492 Plan::AlterSource(plan) => {
493 let result = self.sequence_alter_source(ctx.session_mut(), plan).await;
494 ctx.retire(result);
495 }
496 Plan::AlterSystemSet(plan) => {
497 let result = self.sequence_alter_system_set(ctx.session(), plan).await;
498 ctx.retire(result);
499 }
500 Plan::AlterSystemReset(plan) => {
501 let result = self.sequence_alter_system_reset(ctx.session(), plan).await;
502 ctx.retire(result);
503 }
504 Plan::AlterSystemResetAll(plan) => {
505 let result = self
506 .sequence_alter_system_reset_all(ctx.session(), plan)
507 .await;
508 ctx.retire(result);
509 }
510 Plan::AlterTableAddColumn(plan) => {
511 let result = self.sequence_alter_table(&mut ctx, plan).await;
512 ctx.retire(result);
513 }
514 Plan::AlterNetworkPolicy(plan) => {
515 let res = self
516 .sequence_alter_network_policy(ctx.session(), plan)
517 .await;
518 ctx.retire(res);
519 }
520 Plan::DiscardTemp => {
521 self.drop_temp_items(ctx.session().conn_id()).await;
522 ctx.retire(Ok(ExecuteResponse::DiscardedTemp));
523 }
524 Plan::DiscardAll => {
525 let ret = if let TransactionStatus::Started(_) = ctx.session().transaction() {
526 self.clear_transaction(ctx.session_mut()).await;
527 self.drop_temp_items(ctx.session().conn_id()).await;
528 ctx.session_mut().reset();
529 Ok(ExecuteResponse::DiscardedAll)
530 } else {
531 Err(AdapterError::OperationProhibitsTransaction(
532 "DISCARD ALL".into(),
533 ))
534 };
535 ctx.retire(ret);
536 }
537 Plan::Declare(plan) => {
538 self.declare(ctx, plan.name, plan.stmt, plan.sql, plan.params);
539 }
540 Plan::Fetch(FetchPlan {
541 name,
542 count,
543 timeout,
544 }) => {
545 let ctx_extra = std::mem::take(ctx.extra_mut());
546 ctx.retire(Ok(ExecuteResponse::Fetch {
547 name,
548 count,
549 timeout,
550 ctx_extra,
551 }));
552 }
553 Plan::Close(plan) => {
554 if ctx.session_mut().remove_portal(&plan.name) {
555 ctx.retire(Ok(ExecuteResponse::ClosedCursor));
556 } else {
557 ctx.retire(Err(AdapterError::UnknownCursor(plan.name)));
558 }
559 }
560 Plan::Prepare(plan) => {
561 if ctx
562 .session()
563 .get_prepared_statement_unverified(&plan.name)
564 .is_some()
565 {
566 ctx.retire(Err(AdapterError::PreparedStatementExists(plan.name)));
567 } else {
568 let state_revision = StateRevision {
569 catalog_revision: self.catalog().transient_revision(),
570 session_state_revision: ctx.session().state_revision(),
571 };
572 ctx.session_mut().set_prepared_statement(
573 plan.name,
574 Some(plan.stmt),
575 plan.sql,
576 plan.desc,
577 state_revision,
578 self.now(),
579 );
580 ctx.retire(Ok(ExecuteResponse::Prepare));
581 }
582 }
583 Plan::Execute(plan) => {
584 match self.sequence_execute(ctx.session_mut(), plan) {
585 Ok(portal_name) => {
586 let (tx, _, session, extra) = ctx.into_parts();
587 self.internal_cmd_tx
588 .send(Message::Command(
589 OpenTelemetryContext::obtain(),
590 Command::Execute {
591 portal_name,
592 session,
593 tx: tx.take(),
594 outer_ctx_extra: Some(extra),
595 },
596 ))
597 .expect("sending to self.internal_cmd_tx cannot fail");
598 }
599 Err(err) => ctx.retire(Err(err)),
600 };
601 }
602 Plan::Deallocate(plan) => match plan.name {
603 Some(name) => {
604 if ctx.session_mut().remove_prepared_statement(&name) {
605 ctx.retire(Ok(ExecuteResponse::Deallocate { all: false }));
606 } else {
607 ctx.retire(Err(AdapterError::UnknownPreparedStatement(name)));
608 }
609 }
610 None => {
611 ctx.session_mut().remove_all_prepared_statements();
612 ctx.retire(Ok(ExecuteResponse::Deallocate { all: true }));
613 }
614 },
615 Plan::Raise(RaisePlan { severity }) => {
616 ctx.session()
617 .add_notice(AdapterNotice::UserRequested { severity });
618 ctx.retire(Ok(ExecuteResponse::Raised));
619 }
620 Plan::GrantPrivileges(plan) => {
621 let result = self
622 .sequence_grant_privileges(ctx.session_mut(), plan)
623 .await;
624 ctx.retire(result);
625 }
626 Plan::RevokePrivileges(plan) => {
627 let result = self
628 .sequence_revoke_privileges(ctx.session_mut(), plan)
629 .await;
630 ctx.retire(result);
631 }
632 Plan::AlterDefaultPrivileges(plan) => {
633 let result = self
634 .sequence_alter_default_privileges(ctx.session_mut(), plan)
635 .await;
636 ctx.retire(result);
637 }
638 Plan::GrantRole(plan) => {
639 let result = self.sequence_grant_role(ctx.session_mut(), plan).await;
640 ctx.retire(result);
641 }
642 Plan::RevokeRole(plan) => {
643 let result = self.sequence_revoke_role(ctx.session_mut(), plan).await;
644 ctx.retire(result);
645 }
646 Plan::AlterOwner(plan) => {
647 let result = self.sequence_alter_owner(ctx.session_mut(), plan).await;
648 ctx.retire(result);
649 }
650 Plan::ReassignOwned(plan) => {
651 let result = self.sequence_reassign_owned(ctx.session_mut(), plan).await;
652 ctx.retire(result);
653 }
654 Plan::ValidateConnection(plan) => {
655 let connection = plan
656 .connection
657 .into_inline_connection(self.catalog().state());
658 let current_storage_configuration = self.controller.storage.config().clone();
659 mz_ore::task::spawn(|| "coord::validate_connection", async move {
660 let res = match connection
661 .validate(plan.id, ¤t_storage_configuration)
662 .await
663 {
664 Ok(()) => Ok(ExecuteResponse::ValidatedConnection),
665 Err(err) => Err(err.into()),
666 };
667 ctx.retire(res);
668 });
669 }
670 }
671 }
672 .instrument(tracing::debug_span!("coord::sequencer::sequence_plan"))
673 .boxed_local()
674 }
675
676 #[mz_ore::instrument(level = "debug")]
677 pub(crate) async fn sequence_execute_single_statement_transaction(
678 &mut self,
679 ctx: ExecuteContext,
680 stmt: Arc<Statement<Raw>>,
681 params: Params,
682 ) {
683 let (tx, internal_cmd_tx, mut session, extra) = ctx.into_parts();
685 assert!(matches!(session.transaction(), TransactionStatus::Default));
686 session.start_transaction_single_stmt(self.now_datetime());
687 let conn_id = session.conn_id().unhandled();
688
689 let (sub_tx, sub_rx) = oneshot::channel();
691 let sub_ct = ClientTransmitter::new(sub_tx, self.internal_cmd_tx.clone());
692 let sub_ctx = ExecuteContext::from_parts(sub_ct, internal_cmd_tx, session, extra);
693 self.handle_execute_inner(stmt, params, sub_ctx).await;
694
695 let internal_cmd_tx = self.internal_cmd_tx.clone();
698 mz_ore::task::spawn(
699 || format!("execute_single_statement:{conn_id}"),
700 async move {
701 let Ok(Response {
702 result,
703 session,
704 otel_ctx,
705 }) = sub_rx.await
706 else {
707 return;
709 };
710 otel_ctx.attach_as_parent();
711 let (sub_tx, sub_rx) = oneshot::channel();
712 let _ = internal_cmd_tx.send(Message::Command(
713 otel_ctx,
714 Command::Commit {
715 action: EndTransactionAction::Commit,
716 session,
717 tx: sub_tx,
718 },
719 ));
720 let Ok(commit_response) = sub_rx.await else {
721 return;
723 };
724 assert!(matches!(
725 commit_response.session.transaction(),
726 TransactionStatus::Default
727 ));
728 let result = match (result, commit_response.result) {
734 (Ok(_), commit) => commit,
735 (Err(result), _) => Err(result),
736 };
737 tx.send(result, commit_response.session);
740 }
741 .instrument(Span::current()),
742 );
743 }
744
745 #[mz_ore::instrument(level = "debug")]
749 pub(crate) async fn sequence_create_role_for_startup(
750 &mut self,
751 plan: CreateRolePlan,
752 ) -> Result<ExecuteResponse, AdapterError> {
753 self.sequence_create_role(None, plan).await
760 }
761
762 pub(crate) fn allocate_transient_id(&self) -> (CatalogItemId, GlobalId) {
763 self.transient_id_gen.allocate_id()
764 }
765
766 fn should_emit_rbac_notice(&self, session: &Session) -> Option<AdapterNotice> {
767 if !rbac::is_rbac_enabled_for_session(self.catalog.system_config(), session) {
768 Some(AdapterNotice::RbacUserDisabled)
769 } else {
770 None
771 }
772 }
773
774 pub(crate) fn insert_constant(
780 catalog: &Catalog,
781 session: &mut Session,
782 id: CatalogItemId,
783 constants: MirRelationExpr,
784 ) -> Result<ExecuteResponse, AdapterError> {
785 let desc = match catalog.try_get_entry(&id) {
787 Some(table) => {
788 let full_name = catalog.resolve_full_name(table.name(), Some(session.conn_id()));
789 table.desc_latest(&full_name)?
791 }
792 None => {
793 return Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
794 kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
795 id.to_string(),
796 )),
797 }));
798 }
799 };
800
801 match constants.as_const() {
802 Some((rows, ..)) => {
803 let rows = rows.clone()?;
804 for (row, _) in &rows {
805 for (i, datum) in row.iter().enumerate() {
806 desc.constraints_met(i, &datum)?;
807 }
808 }
809 let diffs_plan = plan::SendDiffsPlan {
810 id,
811 updates: rows,
812 kind: MutationKind::Insert,
813 returning: Vec::new(),
814 max_result_size: catalog.system_config().max_result_size(),
815 };
816 Self::send_diffs(session, diffs_plan)
817 }
818 None => panic!(
819 "tried using sequence_insert_constant on non-constant MirRelationExpr\n{}",
820 constants.pretty(),
821 ),
822 }
823 }
824
825 #[mz_ore::instrument(level = "debug")]
826 pub(crate) fn send_diffs(
827 session: &mut Session,
828 mut plan: plan::SendDiffsPlan,
829 ) -> Result<ExecuteResponse, AdapterError> {
830 let affected_rows = {
831 let mut affected_rows = Diff::from(0);
832 let mut all_positive_diffs = true;
833 for (_, diff) in plan.updates.iter() {
836 if diff.is_negative() {
837 all_positive_diffs = false;
838 break;
839 }
840
841 affected_rows += diff;
842 }
843
844 if !all_positive_diffs {
845 differential_dataflow::consolidation::consolidate(&mut plan.updates);
849
850 affected_rows = Diff::ZERO;
851 for (_, diff) in plan.updates.iter() {
856 affected_rows += diff.abs();
857 }
858 }
859
860 usize::try_from(affected_rows.into_inner()).expect("positive Diff must fit")
861 };
862 event!(
863 Level::TRACE,
864 affected_rows,
865 id = format!("{:?}", plan.id),
866 kind = format!("{:?}", plan.kind),
867 updates = plan.updates.len(),
868 returning = plan.returning.len(),
869 );
870
871 session.add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
872 id: plan.id,
873 rows: TableData::Rows(plan.updates),
874 }]))?;
875 if !plan.returning.is_empty() {
876 let finishing = RowSetFinishing {
877 order_by: Vec::new(),
878 limit: None,
879 offset: 0,
880 project: (0..plan.returning[0].0.iter().count()).collect(),
881 };
882 let max_returned_query_size = session.vars().max_query_result_size();
883 let duration_histogram = session.metrics().row_set_finishing_seconds();
884
885 return match finishing.finish(
886 RowCollection::new(plan.returning, &finishing.order_by),
887 plan.max_result_size,
888 Some(max_returned_query_size),
889 duration_histogram,
890 ) {
891 Ok((rows, _size_bytes)) => Ok(Self::send_immediate_rows(rows)),
892 Err(e) => Err(AdapterError::ResultSize(e)),
893 };
894 }
895 Ok(match plan.kind {
896 MutationKind::Delete => ExecuteResponse::Deleted(affected_rows),
897 MutationKind::Insert => ExecuteResponse::Inserted(affected_rows),
898 MutationKind::Update => ExecuteResponse::Updated(affected_rows / 2),
899 })
900 }
901}