1use std::collections::{BTreeMap, BTreeSet};
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use fail::fail_point;
19use maplit::{btreemap, btreeset};
20use mz_adapter_types::compaction::SINCE_GRANULARITY;
21use mz_adapter_types::connection::ConnectionId;
22use mz_audit_log::VersionedEvent;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Sink};
25use mz_cluster_client::ReplicaId;
26use mz_controller::clusters::ReplicaLocation;
27use mz_controller_types::ClusterId;
28use mz_ore::instrument;
29use mz_ore::now::to_datetime;
30use mz_ore::retry::Retry;
31use mz_ore::task;
32use mz_repr::adt::numeric::Numeric;
33use mz_repr::{CatalogItemId, GlobalId, Timestamp};
34use mz_sql::catalog::{CatalogClusterReplica, CatalogSchema};
35use mz_sql::names::ResolvedDatabaseSpecifier;
36use mz_sql::plan::ConnectionDetails;
37use mz_sql::session::metadata::SessionMetadata;
38use mz_sql::session::vars::{
39 self, DEFAULT_TIMESTAMP_INTERVAL, MAX_AWS_PRIVATELINK_CONNECTIONS, MAX_CLUSTERS,
40 MAX_CONTINUAL_TASKS, MAX_CREDIT_CONSUMPTION_RATE, MAX_DATABASES, MAX_KAFKA_CONNECTIONS,
41 MAX_MATERIALIZED_VIEWS, MAX_MYSQL_CONNECTIONS, MAX_NETWORK_POLICIES, MAX_OBJECTS_PER_SCHEMA,
42 MAX_POSTGRES_CONNECTIONS, MAX_REPLICAS_PER_CLUSTER, MAX_ROLES, MAX_SCHEMAS_PER_DATABASE,
43 MAX_SECRETS, MAX_SINKS, MAX_SOURCES, MAX_SQL_SERVER_CONNECTIONS, MAX_TABLES, SystemVars, Var,
44};
45use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
46use mz_storage_types::connections::inline::IntoInlineConnection;
47use mz_storage_types::read_policy::ReadPolicy;
48use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
49use serde_json::json;
50use tracing::{Instrument, Level, event, info_span, warn};
51
52use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
53use crate::catalog::{DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult};
54use crate::coord::Coordinator;
55use crate::coord::appends::BuiltinTableAppendNotify;
56use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
57use crate::session::{Session, Transaction, TransactionOps};
58use crate::telemetry::{EventDetails, SegmentClientExt};
59use crate::util::ResultExt;
60use crate::{AdapterError, ExecuteContext, catalog, flags};
61
62impl Coordinator {
63 #[instrument(name = "coord::catalog_transact")]
65 pub(crate) async fn catalog_transact(
66 &mut self,
67 session: Option<&Session>,
68 ops: Vec<catalog::Op>,
69 ) -> Result<(), AdapterError> {
70 let start = Instant::now();
71 let result = self
72 .catalog_transact_with_context(session.map(|session| session.conn_id()), None, ops)
73 .await;
74 self.metrics
75 .catalog_transact_seconds
76 .with_label_values(&["catalog_transact"])
77 .observe(start.elapsed().as_secs_f64());
78 result
79 }
80
    #[instrument(name = "coord::catalog_transact_with_side_effects")]
    /// Executes a catalog transaction and then runs the caller-provided
    /// `side_effect` closure against the coordinator.
    ///
    /// The sequencing here matters:
    /// 1. the ops are committed to the catalog (`catalog_transact_inner`),
    /// 2. the resulting catalog updates are applied to coordinator/controller
    ///    state (`apply_catalog_implications`), which must not fail because
    ///    the transaction has already committed,
    /// 3. only then do the caller's side effects run, concurrently with
    ///    awaiting the builtin-table append notification.
    pub(crate) async fn catalog_transact_with_side_effects<F>(
        &mut self,
        mut ctx: Option<&mut ExecuteContext>,
        ops: Vec<catalog::Op>,
        side_effect: F,
    ) -> Result<(), AdapterError>
    where
        F: for<'a> FnOnce(
                &'a mut Coordinator,
                Option<&'a mut ExecuteContext>,
            ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
            + 'static,
    {
        let start = Instant::now();

        // Commit the ops; `table_updates` resolves once the builtin-table
        // writes are visible, `catalog_updates` drives step 2 below.
        let (table_updates, catalog_updates) = self
            .catalog_transact_inner(ctx.as_ref().map(|ctx| ctx.session().conn_id()), ops)
            .await?;

        let apply_implications_res = self
            .apply_catalog_implications(ctx.as_deref_mut(), catalog_updates)
            .await;

        // The transaction is durable at this point; failing to apply its
        // implications would leave us inconsistent, so treat it as fatal.
        apply_implications_res.expect("cannot fail to apply catalog update implications");

        // Soft (non-fatal in production builds) coordinator sanity check.
        mz_ore::soft_assert_eq_no_log!(
            self.check_consistency(),
            Ok(()),
            "coordinator inconsistency detected"
        );

        let side_effects_fut = side_effect(self, ctx);

        // Run the side effects and await builtin-table visibility
        // concurrently; both must complete before reporting success.
        let ((), ()) = futures::future::join(
            side_effects_fut.instrument(info_span!(
                "coord::catalog_transact_with_side_effects::side_effects_fut"
            )),
            table_updates.instrument(info_span!(
                "coord::catalog_transact_with_side_effects::table_updates"
            )),
        )
        .await;

        self.metrics
            .catalog_transact_seconds
            .with_label_values(&["catalog_transact_with_side_effects"])
            .observe(start.elapsed().as_secs_f64());

        Ok(())
    }
148
    #[instrument(name = "coord::catalog_transact_with_context")]
    /// Executes a catalog transaction, applying its implications concurrently
    /// with awaiting builtin-table visibility.
    ///
    /// If both are provided, the explicit `conn_id` takes precedence over the
    /// connection of `ctx`.
    pub(crate) async fn catalog_transact_with_context(
        &mut self,
        conn_id: Option<&ConnectionId>,
        ctx: Option<&mut ExecuteContext>,
        ops: Vec<catalog::Op>,
    ) -> Result<(), AdapterError> {
        let start = Instant::now();

        // Prefer the explicit conn_id; fall back to the context's session.
        let conn_id = conn_id.or_else(|| ctx.as_ref().map(|ctx| ctx.session().conn_id()));

        let (table_updates, catalog_updates) = self.catalog_transact_inner(conn_id, ops).await?;

        let apply_catalog_implications_fut = self.apply_catalog_implications(ctx, catalog_updates);

        // Apply implications while also waiting for the builtin-table writes
        // to become visible.
        let (combined_apply_res, ()) = futures::future::join(
            apply_catalog_implications_fut.instrument(info_span!(
                "coord::catalog_transact_with_context::side_effects_fut"
            )),
            table_updates.instrument(info_span!(
                "coord::catalog_transact_with_context::table_updates"
            )),
        )
        .await;

        // The catalog transaction has already committed; a failure to apply
        // its implications would leave us inconsistent, so treat it as fatal.
        combined_apply_res.expect("cannot fail to apply catalog implications");

        // Soft (non-fatal in production builds) coordinator sanity check.
        mz_ore::soft_assert_eq_no_log!(
            self.check_consistency(),
            Ok(()),
            "coordinator inconsistency detected"
        );

        self.metrics
            .catalog_transact_seconds
            .with_label_values(&["catalog_transact_with_context"])
            .observe(start.elapsed().as_secs_f64());

        Ok(())
    }
202
    #[instrument(name = "coord::catalog_transact_with_ddl_transaction")]
    /// Executes catalog ops for a session that may be inside an open DDL
    /// transaction.
    ///
    /// Outside a DDL transaction this simply forwards to
    /// [`Coordinator::catalog_transact_with_side_effects`]. Inside one, the
    /// transaction's accumulated ops plus the new ops are *dry-run* (by
    /// appending `Op::TransactionDryRun`) and, on success, staged back onto
    /// the session's transaction instead of being committed.
    pub(crate) async fn catalog_transact_with_ddl_transaction<F>(
        &mut self,
        ctx: &mut ExecuteContext,
        ops: Vec<catalog::Op>,
        side_effect: F,
    ) -> Result<(), AdapterError>
    where
        F: for<'a> FnOnce(
                &'a mut Coordinator,
                Option<&'a mut ExecuteContext>,
            ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
            + Send
            + Sync
            + 'static,
    {
        let start = Instant::now();

        // Not in a DDL transaction? Execute immediately with side effects.
        let Some(Transaction {
            ops:
                TransactionOps::DDL {
                    ops: txn_ops,
                    revision: txn_revision,
                    side_effects: _,
                    state: _,
                },
            ..
        }) = ctx.session().transaction().inner()
        else {
            let result = self
                .catalog_transact_with_side_effects(Some(ctx), ops, side_effect)
                .await;
            self.metrics
                .catalog_transact_seconds
                .with_label_values(&["catalog_transact_with_ddl_transaction"])
                .observe(start.elapsed().as_secs_f64());
            return result;
        };

        // The catalog changed underneath the transaction: its staged ops were
        // validated against a stale revision, so abort with a race error.
        if self.catalog().transient_revision() != *txn_revision {
            self.metrics
                .catalog_transact_seconds
                .with_label_values(&["catalog_transact_with_ddl_transaction"])
                .observe(start.elapsed().as_secs_f64());
            return Err(AdapterError::DDLTransactionRace);
        }

        // Replay the transaction's ops, append the new ones, and terminate
        // with a dry-run marker so the transact validates but does not commit.
        let mut all_ops = Vec::with_capacity(ops.len() + txn_ops.len() + 1);
        all_ops.extend(txn_ops.iter().cloned());
        all_ops.extend(ops.clone());
        all_ops.push(Op::TransactionDryRun);

        let result = self.catalog_transact(Some(ctx.session()), all_ops).await;

        let result = match result {
            // Expected outcome: the dry-run "error" carries the validated ops
            // and state, which we stage back onto the session transaction.
            Err(AdapterError::TransactionDryRun { new_ops, new_state }) => {
                ctx.session_mut()
                    .transaction_mut()
                    .add_ops(TransactionOps::DDL {
                        ops: new_ops,
                        state: new_state,
                        side_effects: vec![Box::new(side_effect)],
                        revision: self.catalog().transient_revision(),
                    })?;
                Ok(())
            }
            // `Op::TransactionDryRun` guarantees the transact cannot succeed.
            Ok(_) => unreachable!("unexpected success!"),
            Err(e) => Err(e),
        };

        self.metrics
            .catalog_transact_seconds
            .with_label_values(&["catalog_transact_with_ddl_transaction"])
            .observe(start.elapsed().as_secs_f64());

        result
    }
288
    #[instrument(name = "coord::catalog_transact_inner")]
    /// Commits the given ops to the catalog and performs the bookkeeping that
    /// must immediately accompany a catalog transaction.
    ///
    /// Returns a future that resolves once the corresponding builtin-table
    /// writes are visible, plus the parsed catalog updates the caller must
    /// feed through `apply_catalog_implications`.
    ///
    /// Fails in read-only mode, on resource-limit violations, and when the
    /// catalog transaction itself fails.
    pub(crate) async fn catalog_transact_inner(
        &mut self,
        conn_id: Option<&ConnectionId>,
        ops: Vec<catalog::Op>,
    ) -> Result<(BuiltinTableAppendNotify, Vec<ParsedStateUpdate>), AdapterError> {
        if self.controller.read_only() {
            return Err(AdapterError::ReadOnly);
        }

        event!(Level::TRACE, ops = format!("{:?}", ops));

        // Pre-scan the ops to record post-commit work: cluster/replica status
        // bookkeeping, webhook restarts, and which configuration subsystems
        // must be re-synced after the transaction.
        let mut webhook_sources_to_restart = BTreeSet::new();
        let mut clusters_to_drop = vec![];
        let mut cluster_replicas_to_drop = vec![];
        let mut clusters_to_create = vec![];
        let mut cluster_replicas_to_create = vec![];
        let mut update_metrics_config = false;
        let mut update_tracing_config = false;
        let mut update_controller_config = false;
        let mut update_compute_config = false;
        let mut update_storage_config = false;
        let mut update_timestamp_oracle_config = false;
        let mut update_metrics_retention = false;
        let mut update_secrets_caching_config = false;
        let mut update_cluster_scheduling_config = false;
        let mut update_http_config = false;
        let mut update_advance_timelines_interval = false;

        for op in &ops {
            match op {
                catalog::Op::DropObjects(drop_object_infos) => {
                    for drop_object_info in drop_object_infos {
                        match &drop_object_info {
                            catalog::DropObjectInfo::Item(_) => {
                            }
                            catalog::DropObjectInfo::Cluster(id) => {
                                clusters_to_drop.push(*id);
                            }
                            catalog::DropObjectInfo::ClusterReplica((
                                cluster_id,
                                replica_id,
                                _reason,
                            )) => {
                                cluster_replicas_to_drop.push((*cluster_id, *replica_id));
                            }
                            _ => (),
                        }
                    }
                }
                catalog::Op::ResetSystemConfiguration { name }
                | catalog::Op::UpdateSystemConfiguration { name, .. } => {
                    // One var may belong to several subsystems, so OR each
                    // flag rather than matching exclusively.
                    update_metrics_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_metrics_config_var(name);
                    update_tracing_config |= vars::is_tracing_var(name);
                    update_controller_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_controller_config_var(name);
                    update_compute_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_compute_config_var(name);
                    update_storage_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_storage_config_var(name);
                    update_timestamp_oracle_config |= vars::is_timestamp_oracle_config_var(name);
                    update_metrics_retention |= name == vars::METRICS_RETENTION.name();
                    update_secrets_caching_config |= vars::is_secrets_caching_var(name);
                    update_cluster_scheduling_config |= vars::is_cluster_scheduling_var(name);
                    update_http_config |= vars::is_http_config_var(name);
                    update_advance_timelines_interval |= name == DEFAULT_TIMESTAMP_INTERVAL.name();
                }
                catalog::Op::ResetAllSystemConfiguration => {
                    // A blanket reset may touch any var, so re-sync everything.
                    update_tracing_config = true;
                    update_controller_config = true;
                    update_compute_config = true;
                    update_storage_config = true;
                    update_timestamp_oracle_config = true;
                    update_metrics_retention = true;
                    update_secrets_caching_config = true;
                    update_cluster_scheduling_config = true;
                    update_http_config = true;
                    update_advance_timelines_interval = true;
                }
                catalog::Op::RenameItem { id, .. } => {
                    // Renamed webhook sources must have their appenders
                    // recreated so they pick up the new name.
                    let item = self.catalog().get_entry(id);
                    let is_webhook_source = item
                        .source()
                        .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
                        .unwrap_or(false);
                    if is_webhook_source {
                        webhook_sources_to_restart.insert(*id);
                    }
                }
                catalog::Op::RenameSchema {
                    database_spec,
                    schema_spec,
                    ..
                } => {
                    // Renaming a schema implicitly renames every webhook
                    // source inside it.
                    let schema = self.catalog().get_schema(
                        database_spec,
                        schema_spec,
                        conn_id.unwrap_or(&SYSTEM_CONN_ID),
                    );
                    let webhook_sources = schema.item_ids().filter(|id| {
                        let item = self.catalog().get_entry(id);
                        item.source()
                            .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
                            .unwrap_or(false)
                    });
                    webhook_sources_to_restart.extend(webhook_sources);
                }
                catalog::Op::CreateCluster { id, .. } => {
                    clusters_to_create.push(*id);
                }
                catalog::Op::CreateClusterReplica {
                    cluster_id,
                    name,
                    config,
                    ..
                } => {
                    cluster_replicas_to_create.push((
                        *cluster_id,
                        name.clone(),
                        config.location.num_processes(),
                    ));
                }
                _ => (),
            }
        }

        self.validate_resource_limits(&ops, conn_id.unwrap_or(&SYSTEM_CONN_ID))?;

        // Pick a write timestamp from the local timestamp oracle for this
        // transaction's updates.
        let oracle_write_ts = self.get_local_write_ts().await.timestamp;

        // Destructure `self` so we can simultaneously borrow disjoint fields
        // mutably/immutably below.
        let Coordinator {
            catalog,
            active_conns,
            controller,
            cluster_replica_statuses,
            ..
        } = self;
        let catalog = Arc::make_mut(catalog);
        let conn = conn_id.map(|id| active_conns.get(id).expect("connection must exist"));

        let TransactionResult {
            builtin_table_updates,
            catalog_updates,
            audit_events,
        } = catalog
            .transact(
                Some(&mut controller.storage_collections),
                oracle_write_ts,
                conn,
                ops,
            )
            .await?;

        // The transaction committed: bring the in-memory replica status map
        // in line with the clusters/replicas created and dropped above.
        for (cluster_id, replica_id) in &cluster_replicas_to_drop {
            cluster_replica_statuses.remove_cluster_replica_statuses(cluster_id, replica_id);
        }
        for cluster_id in &clusters_to_drop {
            cluster_replica_statuses.remove_cluster_statuses(cluster_id);
        }
        for cluster_id in clusters_to_create {
            cluster_replica_statuses.initialize_cluster_statuses(cluster_id);
        }
        let now = to_datetime((catalog.config().now)());
        for (cluster_id, replica_name, num_processes) in cluster_replicas_to_create {
            // Replica IDs are assigned during the transaction, so resolve the
            // freshly-created replica by name.
            let replica_id = catalog
                .resolve_replica_in_cluster(&cluster_id, &replica_name)
                .expect("just created")
                .replica_id();
            cluster_replica_statuses.initialize_cluster_replica_statuses(
                cluster_id,
                replica_id,
                num_processes,
                now,
            );
        }

        // Kick off the builtin-table writes; the returned notify future is
        // handed back to the caller to await visibility.
        let (builtin_update_notify, _) = self
            .builtin_table_update()
            .execute(builtin_table_updates)
            .await;

        // Re-sync every configuration subsystem flagged during the pre-scan.
        let _: () = async {
            if !webhook_sources_to_restart.is_empty() {
                self.restart_webhook_sources(webhook_sources_to_restart);
            }

            if update_metrics_config {
                mz_metrics::update_dyncfg(&self.catalog().system_config().dyncfg_updates());
            }
            if update_controller_config {
                self.update_controller_config();
            }
            if update_compute_config {
                self.update_compute_config();
            }
            if update_storage_config {
                self.update_storage_config();
            }
            if update_timestamp_oracle_config {
                self.update_timestamp_oracle_config();
            }
            if update_metrics_retention {
                self.update_metrics_retention();
            }
            if update_tracing_config {
                self.update_tracing_config();
            }
            if update_secrets_caching_config {
                self.update_secrets_caching_config();
            }
            if update_cluster_scheduling_config {
                self.update_cluster_scheduling_config();
            }
            if update_http_config {
                self.update_http_config();
            }
            if update_advance_timelines_interval {
                // Only rebuild the ticker when the period actually changed,
                // to avoid resetting its phase unnecessarily.
                let new_interval = self.catalog().system_config().default_timestamp_interval();
                if new_interval != self.advance_timelines_interval.period() {
                    self.advance_timelines_interval = tokio::time::interval(new_interval);
                }
            }
        }
        .instrument(info_span!("coord::catalog_transact_with::finalize"))
        .await;

        // Forward audit events to Segment, if telemetry is configured.
        let conn = conn_id.and_then(|id| self.active_conns.get(id));
        if let Some(segment_client) = &self.segment_client {
            for VersionedEvent::V1(event) in audit_events {
                let event_type = format!(
                    "{} {}",
                    event.object_type.as_title_case(),
                    event.event_type.as_title_case()
                );
                segment_client.environment_track(
                    &self.catalog().config().environment_id,
                    event_type,
                    json!({ "details": event.details.as_json() }),
                    EventDetails {
                        user_id: conn
                            .and_then(|c| c.user().external_metadata.as_ref())
                            .map(|m| m.user_id),
                        application_name: conn.map(|c| c.application_name()),
                        ..Default::default()
                    },
                );
            }
        }

        Ok((builtin_update_notify, catalog_updates))
    }
574
    /// Drops the given cluster replica from the controller, first retiring
    /// any introspection subscribes that target it.
    pub(crate) fn drop_replica(&mut self, cluster_id: ClusterId, replica_id: ReplicaId) {
        self.drop_introspection_subscribes(replica_id);

        self.controller
            .drop_replica(cluster_id, replica_id)
            .expect("dropping replica must not fail");
    }
582
583 pub(crate) fn drop_sources(&mut self, sources: Vec<(CatalogItemId, GlobalId)>) {
585 for (item_id, _gid) in &sources {
586 self.active_webhooks.remove(item_id);
587 }
588 let storage_metadata = self.catalog.state().storage_metadata();
589 let source_gids = sources.into_iter().map(|(_id, gid)| gid).collect();
590 self.controller
591 .storage
592 .drop_sources(storage_metadata, source_gids)
593 .unwrap_or_terminate("cannot fail to drop sources");
594 }
595
596 pub(crate) fn drop_tables(&mut self, tables: Vec<(CatalogItemId, GlobalId)>, ts: Timestamp) {
598 for (item_id, _gid) in &tables {
599 self.active_webhooks.remove(item_id);
600 }
601
602 let storage_metadata = self.catalog.state().storage_metadata();
603 let table_gids = tables.into_iter().map(|(_id, gid)| gid).collect();
604 self.controller
605 .storage
606 .drop_tables(storage_metadata, table_gids, ts)
607 .unwrap_or_terminate("cannot fail to drop tables");
608 }
609
610 fn restart_webhook_sources(&mut self, sources: impl IntoIterator<Item = CatalogItemId>) {
611 for id in sources {
612 self.active_webhooks.remove(&id);
613 }
614 }
615
616 #[must_use]
622 pub async fn drop_compute_sink(&mut self, sink_id: GlobalId) -> Option<ActiveComputeSink> {
623 self.drop_compute_sinks([sink_id]).await.remove(&sink_id)
624 }
625
626 #[must_use]
635 pub async fn drop_compute_sinks(
636 &mut self,
637 sink_ids: impl IntoIterator<Item = GlobalId>,
638 ) -> BTreeMap<GlobalId, ActiveComputeSink> {
639 let mut by_id = BTreeMap::new();
640 let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
641 for sink_id in sink_ids {
642 let sink = match self.remove_active_compute_sink(sink_id).await {
643 None => {
644 tracing::error!(%sink_id, "drop_compute_sinks called on nonexistent sink");
645 continue;
646 }
647 Some(sink) => sink,
648 };
649
650 by_cluster
651 .entry(sink.cluster_id())
652 .or_default()
653 .push(sink_id);
654 by_id.insert(sink_id, sink);
655 }
656 for (cluster_id, ids) in by_cluster {
657 let compute = &mut self.controller.compute;
658 if compute.instance_exists(cluster_id) {
660 compute
661 .drop_collections(cluster_id, ids)
662 .unwrap_or_terminate("cannot fail to drop collections");
663 }
664 }
665 by_id
666 }
667
668 pub async fn retire_compute_sinks(
673 &mut self,
674 mut reasons: BTreeMap<GlobalId, ActiveComputeSinkRetireReason>,
675 ) {
676 let sink_ids = reasons.keys().cloned();
677 for (id, sink) in self.drop_compute_sinks(sink_ids).await {
678 let reason = reasons
679 .remove(&id)
680 .expect("all returned IDs are in `reasons`");
681 sink.retire(reason);
682 }
683 }
684
685 pub async fn drop_reconfiguration_replicas(
688 &mut self,
689 cluster_ids: BTreeSet<ClusterId>,
690 ) -> Result<(), AdapterError> {
691 let pending_cluster_ops: Vec<Op> = cluster_ids
692 .iter()
693 .map(|c| {
694 self.catalog()
695 .get_cluster(c.clone())
696 .replicas()
697 .filter_map(|r| match r.config.location {
698 ReplicaLocation::Managed(ref l) if l.pending => {
699 Some(DropObjectInfo::ClusterReplica((
700 c.clone(),
701 r.replica_id,
702 ReplicaCreateDropReason::Manual,
703 )))
704 }
705 _ => None,
706 })
707 .collect::<Vec<DropObjectInfo>>()
708 })
709 .filter_map(|pending_replica_drop_ops_by_cluster| {
710 match pending_replica_drop_ops_by_cluster.len() {
711 0 => None,
712 _ => Some(Op::DropObjects(pending_replica_drop_ops_by_cluster)),
713 }
714 })
715 .collect();
716 if !pending_cluster_ops.is_empty() {
717 self.catalog_transact(None, pending_cluster_ops).await?;
718 }
719 Ok(())
720 }
721
    #[mz_ore::instrument(level = "debug")]
    /// Retires, with reason `Canceled`, all active compute sinks belonging to
    /// the given connection.
    pub(crate) async fn cancel_compute_sinks_for_conn(&mut self, conn_id: &ConnectionId) {
        self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Canceled)
            .await
    }
728
    #[mz_ore::instrument(level = "debug")]
    /// Cancels all in-flight cluster reconfigurations initiated by the given
    /// connection.
    pub(crate) async fn cancel_cluster_reconfigurations_for_conn(
        &mut self,
        conn_id: &ConnectionId,
    ) {
        self.retire_cluster_reconfigurations_for_conn(conn_id).await
    }
737
738 #[mz_ore::instrument(level = "debug")]
741 pub(crate) async fn retire_compute_sinks_for_conn(
742 &mut self,
743 conn_id: &ConnectionId,
744 reason: ActiveComputeSinkRetireReason,
745 ) {
746 let drop_sinks = self
747 .active_conns
748 .get_mut(conn_id)
749 .expect("must exist for active session")
750 .drop_sinks
751 .iter()
752 .map(|sink_id| (*sink_id, reason.clone()))
753 .collect();
754 self.retire_compute_sinks(drop_sinks).await;
755 }
756
    #[mz_ore::instrument(level = "debug")]
    /// Cancels any in-flight cluster reconfigurations initiated by the given
    /// connection by dropping their pending replicas, then clears the
    /// connection's set of pending cluster alters.
    pub(crate) async fn retire_cluster_reconfigurations_for_conn(
        &mut self,
        conn_id: &ConnectionId,
    ) {
        // Clone so we don't hold a borrow of `active_conns` across the await.
        let reconfiguring_clusters = self
            .active_conns
            .get(conn_id)
            .expect("must exist for active session")
            .pending_cluster_alters
            .clone();
        self.drop_reconfiguration_replicas(reconfiguring_clusters)
            .await
            .unwrap_or_terminate("cannot fail to drop reconfiguration replicas");

        // Only clear the pending set once the drops have succeeded.
        self.active_conns
            .get_mut(conn_id)
            .expect("must exist for active session")
            .pending_cluster_alters
            .clear();
    }
780
    /// Drops the given storage sinks from the storage controller.
    pub(crate) fn drop_storage_sinks(&mut self, sink_gids: Vec<GlobalId>) {
        let storage_metadata = self.catalog.state().storage_metadata();
        self.controller
            .storage
            .drop_sinks(storage_metadata, sink_gids)
            .unwrap_or_terminate("cannot fail to drop sinks");
    }
788
789 pub(crate) fn drop_compute_collections(&mut self, collections: Vec<(ClusterId, GlobalId)>) {
790 let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
791 for (cluster_id, gid) in collections {
792 by_cluster.entry(cluster_id).or_default().push(gid);
793 }
794 for (cluster_id, gids) in by_cluster {
795 let compute = &mut self.controller.compute;
796 if compute.instance_exists(cluster_id) {
798 compute
799 .drop_collections(cluster_id, gids)
800 .unwrap_or_terminate("cannot fail to drop collections");
801 }
802 }
803 }
804
805 pub(crate) fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>) {
806 let cloud_resource_controller = Arc::clone(self.cloud_resource_controller
807 .as_ref()
808 .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))
809 .expect("vpc endpoints should only be dropped in CLOUD, where `cloud_resource_controller` is `Some`"));
810 task::spawn(
818 || "drop_vpc_endpoints",
819 async move {
820 for vpc_endpoint in vpc_endpoints {
821 let _ = Retry::default()
822 .max_duration(Duration::from_secs(60))
823 .retry_async(|_state| async {
824 fail_point!("drop_vpc_endpoint", |r| {
825 Err(anyhow::anyhow!("Fail point error {:?}", r))
826 });
827 match cloud_resource_controller
828 .delete_vpc_endpoint(vpc_endpoint)
829 .await
830 {
831 Ok(_) => Ok(()),
832 Err(e) => {
833 warn!("Dropping VPC Endpoints has encountered an error: {}", e);
834 Err(e)
835 }
836 }
837 })
838 .await;
839 }
840 }
841 .instrument(info_span!(
842 "coord::catalog_transact_inner::drop_vpc_endpoints"
843 )),
844 );
845 }
846
847 pub(crate) async fn drop_temp_items(&mut self, conn_id: &ConnectionId) {
850 let temp_items = self.catalog().state().get_temp_items(conn_id).collect();
851 let all_items = self.catalog().object_dependents(&temp_items, conn_id);
852
853 if all_items.is_empty() {
854 return;
855 }
856 let op = Op::DropObjects(
857 all_items
858 .into_iter()
859 .map(DropObjectInfo::manual_drop_from_object_id)
860 .collect(),
861 );
862
863 self.catalog_transact_with_context(Some(conn_id), None, vec![op])
864 .await
865 .expect("unable to drop temporary items for conn_id");
866 }
867
    /// Pushes the current cluster-scheduling system vars to the orchestrator.
    fn update_cluster_scheduling_config(&self) {
        let config = flags::orchestrator_scheduling_config(self.catalog.system_config());
        self.controller
            .update_orchestrator_scheduling_config(config);
    }
873
    /// Pushes the current secrets-caching system vars to the caching secrets
    /// reader.
    fn update_secrets_caching_config(&self) {
        let config = flags::caching_config(self.catalog.system_config());
        self.caching_secrets_reader.set_policy(config);
    }
878
    /// Applies the current tracing-related system vars to the process's
    /// tracing handle.
    fn update_tracing_config(&self) {
        let tracing = flags::tracing_config(self.catalog().system_config());
        tracing.apply(&self.tracing_handle);
    }
883
    /// Pushes the current compute-related system vars to the compute
    /// controller.
    fn update_compute_config(&mut self) {
        let config_params = flags::compute_config(self.catalog().system_config());
        self.controller.compute.update_configuration(config_params);
    }
888
    /// Pushes the current storage-related system vars to the storage
    /// controller.
    fn update_storage_config(&mut self) {
        let config_params = flags::storage_config(self.catalog().system_config());
        self.controller.storage.update_parameters(config_params);
    }
893
    /// Pushes the current timestamp-oracle system vars to the oracle config,
    /// if one is present.
    fn update_timestamp_oracle_config(&self) {
        let config_params = flags::timestamp_oracle_config(self.catalog().system_config());
        if let Some(config) = self.timestamp_oracle_config.as_ref() {
            config.apply_parameters(config_params)
        }
    }
900
901 fn update_metrics_retention(&self) {
902 let duration = self.catalog().system_config().metrics_retention();
903 let policy = ReadPolicy::lag_writes_by(
904 Timestamp::new(u64::try_from(duration.as_millis()).unwrap_or_else(|_e| {
905 tracing::error!("Absurd metrics retention duration: {duration:?}.");
906 u64::MAX
907 })),
908 SINCE_GRANULARITY,
909 );
910 let storage_policies = self
911 .catalog()
912 .entries()
913 .filter(|entry| {
914 entry.item().is_retained_metrics_object()
915 && entry.item().is_compute_object_on_cluster().is_none()
916 })
917 .map(|entry| (entry.id(), policy.clone()))
918 .collect::<Vec<_>>();
919 let compute_policies = self
920 .catalog()
921 .entries()
922 .filter_map(|entry| {
923 if let (true, Some(cluster_id)) = (
924 entry.item().is_retained_metrics_object(),
925 entry.item().is_compute_object_on_cluster(),
926 ) {
927 Some((cluster_id, entry.id(), policy.clone()))
928 } else {
929 None
930 }
931 })
932 .collect::<Vec<_>>();
933 self.update_storage_read_policies(storage_policies);
934 self.update_compute_read_policies(compute_policies);
935 }
936
    /// Pushes the current dyncfg values from the system config to the
    /// controller.
    fn update_controller_config(&mut self) {
        let sys_config = self.catalog().system_config();
        self.controller
            .update_configuration(sys_config.dyncfg_updates());
    }
942
    /// Applies the configured webhook concurrent-request limit to the shared
    /// webhook concurrency limiter.
    fn update_http_config(&mut self) {
        let webhook_request_limit = self
            .catalog()
            .system_config()
            .webhook_concurrent_request_limit();
        self.webhook_concurrency_limit
            .set_limit(webhook_request_limit);
    }
951
    /// Creates a storage export (sink) collection `id` in the storage
    /// controller for the given catalog `sink`.
    pub(crate) async fn create_storage_export(
        &mut self,
        id: GlobalId,
        sink: &Sink,
    ) -> Result<(), AdapterError> {
        // Fail fast if the input collection is unknown to storage.
        self.controller.storage.check_exists(sink.from)?;

        // Acquire read holds on the input so its since cannot advance past
        // the `as_of` we choose before the export is created. The holds stay
        // alive until after `create_collections` below.
        let id_bundle = crate::CollectionIdBundle {
            storage_ids: btreeset! {sink.from},
            compute_ids: btreemap! {},
        };

        let read_holds = self.acquire_read_holds(&id_bundle);
        let as_of = read_holds.least_valid_read();

        let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
        let storage_sink_desc = mz_storage_types::sinks::StorageSinkDesc {
            from: sink.from,
            from_desc: storage_sink_from_entry
                .relation_desc()
                .expect("sinks can only be built on items with descs")
                .into_owned(),
            connection: sink
                .connection
                .clone()
                .into_inline_connection(self.catalog().state()),
            envelope: sink.envelope,
            as_of,
            with_snapshot: sink.with_snapshot,
            version: sink.version,
            // Storage metadata is filled in by the controller.
            from_storage_metadata: (),
            to_storage_metadata: (),
            commit_interval: sink.commit_interval,
        };

        let collection_desc = CollectionDescription {
            // NOTE(review): the progress collection uses the Kafka progress
            // relation desc unconditionally — presumably only Kafka sinks
            // reach this path; confirm before extending to other sink kinds.
            desc: KAFKA_PROGRESS_DESC.clone(),
            data_source: DataSource::Sink {
                desc: ExportDescription {
                    sink: storage_sink_desc,
                    instance_id: sink.cluster_id,
                },
            },
            since: None,
            timeline: None,
            primary: None,
        };
        let collections = vec![(id, collection_desc)];

        let storage_metadata = self.catalog.state().storage_metadata();
        let res = self
            .controller
            .storage
            .create_collections(storage_metadata, None, collections)
            .await;

        // The controller now maintains its own holds on the inputs, so our
        // temporary read holds can be released.
        drop(read_holds);

        Ok(res?)
    }
1030
1031 fn validate_resource_limits(
1034 &self,
1035 ops: &Vec<catalog::Op>,
1036 conn_id: &ConnectionId,
1037 ) -> Result<(), AdapterError> {
1038 let mut new_kafka_connections = 0;
1039 let mut new_postgres_connections = 0;
1040 let mut new_mysql_connections = 0;
1041 let mut new_sql_server_connections = 0;
1042 let mut new_aws_privatelink_connections = 0;
1043 let mut new_tables = 0;
1044 let mut new_sources = 0;
1045 let mut new_sinks = 0;
1046 let mut new_materialized_views = 0;
1047 let mut new_clusters = 0;
1048 let mut new_replicas_per_cluster = BTreeMap::new();
1049 let mut new_credit_consumption_rate = Numeric::zero();
1050 let mut new_databases = 0;
1051 let mut new_schemas_per_database = BTreeMap::new();
1052 let mut new_objects_per_schema = BTreeMap::new();
1053 let mut new_secrets = 0;
1054 let mut new_roles = 0;
1055 let mut new_continual_tasks = 0;
1056 let mut new_network_policies = 0;
1057 for op in ops {
1058 match op {
1059 Op::CreateDatabase { .. } => {
1060 new_databases += 1;
1061 }
1062 Op::CreateSchema { database_id, .. } => {
1063 if let ResolvedDatabaseSpecifier::Id(database_id) = database_id {
1064 *new_schemas_per_database.entry(database_id).or_insert(0) += 1;
1065 }
1066 }
1067 Op::CreateRole { .. } => {
1068 new_roles += 1;
1069 }
1070 Op::CreateNetworkPolicy { .. } => {
1071 new_network_policies += 1;
1072 }
1073 Op::CreateCluster { .. } => {
1074 new_clusters += 1;
1078 }
1079 Op::CreateClusterReplica {
1080 cluster_id, config, ..
1081 } => {
1082 if cluster_id.is_user() {
1083 *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) += 1;
1084 if let ReplicaLocation::Managed(location) = &config.location {
1085 let replica_allocation = self
1086 .catalog()
1087 .cluster_replica_sizes()
1088 .0
1089 .get(location.size_for_billing())
1090 .expect(
1091 "location size is validated against the cluster replica sizes",
1092 );
1093 new_credit_consumption_rate += replica_allocation.credits_per_hour
1094 }
1095 }
1096 }
1097 Op::CreateItem { name, item, .. } => {
1098 *new_objects_per_schema
1099 .entry((
1100 name.qualifiers.database_spec.clone(),
1101 name.qualifiers.schema_spec.clone(),
1102 ))
1103 .or_insert(0) += 1;
1104 match item {
1105 CatalogItem::Connection(connection) => match connection.details {
1106 ConnectionDetails::Kafka(_) => new_kafka_connections += 1,
1107 ConnectionDetails::Postgres(_) => new_postgres_connections += 1,
1108 ConnectionDetails::MySql(_) => new_mysql_connections += 1,
1109 ConnectionDetails::SqlServer(_) => new_sql_server_connections += 1,
1110 ConnectionDetails::AwsPrivatelink(_) => {
1111 new_aws_privatelink_connections += 1
1112 }
1113 ConnectionDetails::Csr(_)
1114 | ConnectionDetails::Ssh { .. }
1115 | ConnectionDetails::Aws(_)
1116 | ConnectionDetails::IcebergCatalog(_) => {}
1117 },
1118 CatalogItem::Table(_) => {
1119 new_tables += 1;
1120 }
1121 CatalogItem::Source(source) => {
1122 new_sources += source.user_controllable_persist_shard_count()
1123 }
1124 CatalogItem::Sink(_) => new_sinks += 1,
1125 CatalogItem::MaterializedView(_) => {
1126 new_materialized_views += 1;
1127 }
1128 CatalogItem::Secret(_) => {
1129 new_secrets += 1;
1130 }
1131 CatalogItem::ContinualTask(_) => {
1132 new_continual_tasks += 1;
1133 }
1134 CatalogItem::Log(_)
1135 | CatalogItem::View(_)
1136 | CatalogItem::Index(_)
1137 | CatalogItem::Type(_)
1138 | CatalogItem::Func(_) => {}
1139 }
1140 }
1141 Op::DropObjects(drop_object_infos) => {
1142 for drop_object_info in drop_object_infos {
1143 match drop_object_info {
1144 DropObjectInfo::Cluster(_) => {
1145 new_clusters -= 1;
1146 }
1147 DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
1148 if cluster_id.is_user() {
1149 *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) -= 1;
1150 let cluster = self
1151 .catalog()
1152 .get_cluster_replica(*cluster_id, *replica_id);
1153 if let ReplicaLocation::Managed(location) =
1154 &cluster.config.location
1155 {
1156 let replica_allocation = self
1157 .catalog()
1158 .cluster_replica_sizes()
1159 .0
1160 .get(location.size_for_billing())
1161 .expect(
1162 "location size is validated against the cluster replica sizes",
1163 );
1164 new_credit_consumption_rate -=
1165 replica_allocation.credits_per_hour
1166 }
1167 }
1168 }
1169 DropObjectInfo::Database(_) => {
1170 new_databases -= 1;
1171 }
1172 DropObjectInfo::Schema((database_spec, _)) => {
1173 if let ResolvedDatabaseSpecifier::Id(database_id) = database_spec {
1174 *new_schemas_per_database.entry(database_id).or_insert(0) -= 1;
1175 }
1176 }
1177 DropObjectInfo::Role(_) => {
1178 new_roles -= 1;
1179 }
1180 DropObjectInfo::NetworkPolicy(_) => {
1181 new_network_policies -= 1;
1182 }
1183 DropObjectInfo::Item(id) => {
1184 let entry = self.catalog().get_entry(id);
1185 *new_objects_per_schema
1186 .entry((
1187 entry.name().qualifiers.database_spec.clone(),
1188 entry.name().qualifiers.schema_spec.clone(),
1189 ))
1190 .or_insert(0) -= 1;
1191 match entry.item() {
1192 CatalogItem::Connection(connection) => match connection.details
1193 {
1194 ConnectionDetails::AwsPrivatelink(_) => {
1195 new_aws_privatelink_connections -= 1;
1196 }
1197 _ => (),
1198 },
1199 CatalogItem::Table(_) => {
1200 new_tables -= 1;
1201 }
1202 CatalogItem::Source(source) => {
1203 new_sources -=
1204 source.user_controllable_persist_shard_count()
1205 }
1206 CatalogItem::Sink(_) => new_sinks -= 1,
1207 CatalogItem::MaterializedView(_) => {
1208 new_materialized_views -= 1;
1209 }
1210 CatalogItem::Secret(_) => {
1211 new_secrets -= 1;
1212 }
1213 CatalogItem::ContinualTask(_) => {
1214 new_continual_tasks -= 1;
1215 }
1216 CatalogItem::Log(_)
1217 | CatalogItem::View(_)
1218 | CatalogItem::Index(_)
1219 | CatalogItem::Type(_)
1220 | CatalogItem::Func(_) => {}
1221 }
1222 }
1223 }
1224 }
1225 }
1226 Op::UpdateItem {
1227 name: _,
1228 id,
1229 to_item,
1230 } => match to_item {
1231 CatalogItem::Source(source) => {
1232 let current_source = self
1233 .catalog()
1234 .get_entry(id)
1235 .source()
1236 .expect("source update is for source item");
1237
1238 new_sources += source.user_controllable_persist_shard_count()
1239 - current_source.user_controllable_persist_shard_count();
1240 }
1241 CatalogItem::Connection(_)
1242 | CatalogItem::Table(_)
1243 | CatalogItem::Sink(_)
1244 | CatalogItem::MaterializedView(_)
1245 | CatalogItem::Secret(_)
1246 | CatalogItem::Log(_)
1247 | CatalogItem::View(_)
1248 | CatalogItem::Index(_)
1249 | CatalogItem::Type(_)
1250 | CatalogItem::Func(_)
1251 | CatalogItem::ContinualTask(_) => {}
1252 },
1253 Op::AlterRole { .. }
1254 | Op::AlterRetainHistory { .. }
1255 | Op::AlterSourceTimestampInterval { .. }
1256 | Op::AlterNetworkPolicy { .. }
1257 | Op::AlterAddColumn { .. }
1258 | Op::AlterMaterializedViewApplyReplacement { .. }
1259 | Op::UpdatePrivilege { .. }
1260 | Op::UpdateDefaultPrivilege { .. }
1261 | Op::GrantRole { .. }
1262 | Op::RenameCluster { .. }
1263 | Op::RenameClusterReplica { .. }
1264 | Op::RenameItem { .. }
1265 | Op::RenameSchema { .. }
1266 | Op::UpdateOwner { .. }
1267 | Op::RevokeRole { .. }
1268 | Op::UpdateClusterConfig { .. }
1269 | Op::UpdateClusterReplicaConfig { .. }
1270 | Op::UpdateSourceReferences { .. }
1271 | Op::UpdateSystemConfiguration { .. }
1272 | Op::ResetSystemConfiguration { .. }
1273 | Op::ResetAllSystemConfiguration { .. }
1274 | Op::Comment { .. }
1275 | Op::WeirdStorageUsageUpdates { .. }
1276 | Op::TransactionDryRun => {}
1277 }
1278 }
1279
1280 let mut current_aws_privatelink_connections = 0;
1281 let mut current_postgres_connections = 0;
1282 let mut current_mysql_connections = 0;
1283 let mut current_sql_server_connections = 0;
1284 let mut current_kafka_connections = 0;
1285 for c in self.catalog().user_connections() {
1286 let connection = c
1287 .connection()
1288 .expect("`user_connections()` only returns connection objects");
1289
1290 match connection.details {
1291 ConnectionDetails::AwsPrivatelink(_) => current_aws_privatelink_connections += 1,
1292 ConnectionDetails::Postgres(_) => current_postgres_connections += 1,
1293 ConnectionDetails::MySql(_) => current_mysql_connections += 1,
1294 ConnectionDetails::SqlServer(_) => current_sql_server_connections += 1,
1295 ConnectionDetails::Kafka(_) => current_kafka_connections += 1,
1296 ConnectionDetails::Csr(_)
1297 | ConnectionDetails::Ssh { .. }
1298 | ConnectionDetails::Aws(_)
1299 | ConnectionDetails::IcebergCatalog(_) => {}
1300 }
1301 }
1302 self.validate_resource_limit(
1303 current_kafka_connections,
1304 new_kafka_connections,
1305 SystemVars::max_kafka_connections,
1306 "Kafka Connection",
1307 MAX_KAFKA_CONNECTIONS.name(),
1308 )?;
1309 self.validate_resource_limit(
1310 current_postgres_connections,
1311 new_postgres_connections,
1312 SystemVars::max_postgres_connections,
1313 "PostgreSQL Connection",
1314 MAX_POSTGRES_CONNECTIONS.name(),
1315 )?;
1316 self.validate_resource_limit(
1317 current_mysql_connections,
1318 new_mysql_connections,
1319 SystemVars::max_mysql_connections,
1320 "MySQL Connection",
1321 MAX_MYSQL_CONNECTIONS.name(),
1322 )?;
1323 self.validate_resource_limit(
1324 current_sql_server_connections,
1325 new_sql_server_connections,
1326 SystemVars::max_sql_server_connections,
1327 "SQL Server Connection",
1328 MAX_SQL_SERVER_CONNECTIONS.name(),
1329 )?;
1330 self.validate_resource_limit(
1331 current_aws_privatelink_connections,
1332 new_aws_privatelink_connections,
1333 SystemVars::max_aws_privatelink_connections,
1334 "AWS PrivateLink Connection",
1335 MAX_AWS_PRIVATELINK_CONNECTIONS.name(),
1336 )?;
1337 self.validate_resource_limit(
1338 self.catalog().user_tables().count(),
1339 new_tables,
1340 SystemVars::max_tables,
1341 "table",
1342 MAX_TABLES.name(),
1343 )?;
1344
1345 let current_sources: usize = self
1346 .catalog()
1347 .user_sources()
1348 .filter_map(|source| source.source())
1349 .map(|source| source.user_controllable_persist_shard_count())
1350 .sum::<i64>()
1351 .try_into()
1352 .expect("non-negative sum of sources");
1353
1354 self.validate_resource_limit(
1355 current_sources,
1356 new_sources,
1357 SystemVars::max_sources,
1358 "source",
1359 MAX_SOURCES.name(),
1360 )?;
1361 self.validate_resource_limit(
1362 self.catalog().user_sinks().count(),
1363 new_sinks,
1364 SystemVars::max_sinks,
1365 "sink",
1366 MAX_SINKS.name(),
1367 )?;
1368 self.validate_resource_limit(
1369 self.catalog().user_materialized_views().count(),
1370 new_materialized_views,
1371 SystemVars::max_materialized_views,
1372 "materialized view",
1373 MAX_MATERIALIZED_VIEWS.name(),
1374 )?;
1375 self.validate_resource_limit(
1376 self.catalog().user_clusters().count(),
1382 new_clusters,
1383 SystemVars::max_clusters,
1384 "cluster",
1385 MAX_CLUSTERS.name(),
1386 )?;
1387 for (cluster_id, new_replicas) in new_replicas_per_cluster {
1388 let current_amount = self
1390 .catalog()
1391 .try_get_cluster(cluster_id)
1392 .map(|instance| instance.user_replicas().count())
1393 .unwrap_or(0);
1394 self.validate_resource_limit(
1395 current_amount,
1396 new_replicas,
1397 SystemVars::max_replicas_per_cluster,
1398 "cluster replica",
1399 MAX_REPLICAS_PER_CLUSTER.name(),
1400 )?;
1401 }
1402 self.validate_resource_limit_numeric(
1403 self.current_credit_consumption_rate(),
1404 new_credit_consumption_rate,
1405 |system_vars| {
1406 self.license_key
1407 .max_credit_consumption_rate()
1408 .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
1409 },
1410 "cluster replica",
1411 MAX_CREDIT_CONSUMPTION_RATE.name(),
1412 )?;
1413 self.validate_resource_limit(
1414 self.catalog().databases().count(),
1415 new_databases,
1416 SystemVars::max_databases,
1417 "database",
1418 MAX_DATABASES.name(),
1419 )?;
1420 for (database_id, new_schemas) in new_schemas_per_database {
1421 self.validate_resource_limit(
1422 self.catalog().get_database(database_id).schemas_by_id.len(),
1423 new_schemas,
1424 SystemVars::max_schemas_per_database,
1425 "schema",
1426 MAX_SCHEMAS_PER_DATABASE.name(),
1427 )?;
1428 }
1429 for ((database_spec, schema_spec), new_objects) in new_objects_per_schema {
1430 let current_items = self
1433 .catalog()
1434 .try_get_schema(&database_spec, &schema_spec, conn_id)
1435 .map(|schema| schema.items.len())
1436 .unwrap_or(0);
1437 self.validate_resource_limit(
1438 current_items,
1439 new_objects,
1440 SystemVars::max_objects_per_schema,
1441 "object",
1442 MAX_OBJECTS_PER_SCHEMA.name(),
1443 )?;
1444 }
1445 self.validate_resource_limit(
1446 self.catalog().user_secrets().count(),
1447 new_secrets,
1448 SystemVars::max_secrets,
1449 "secret",
1450 MAX_SECRETS.name(),
1451 )?;
1452 self.validate_resource_limit(
1453 self.catalog().user_roles().count(),
1454 new_roles,
1455 SystemVars::max_roles,
1456 "role",
1457 MAX_ROLES.name(),
1458 )?;
1459 self.validate_resource_limit(
1460 self.catalog().user_continual_tasks().count(),
1461 new_continual_tasks,
1462 SystemVars::max_continual_tasks,
1463 "continual_task",
1464 MAX_CONTINUAL_TASKS.name(),
1465 )?;
1466 self.validate_resource_limit(
1467 self.catalog().user_network_policies().count(),
1468 new_network_policies,
1469 SystemVars::max_network_policies,
1470 "network_policy",
1471 MAX_NETWORK_POLICIES.name(),
1472 )?;
1473 Ok(())
1474 }
1475
1476 pub(crate) fn validate_resource_limit<F>(
1478 &self,
1479 current_amount: usize,
1480 new_instances: i64,
1481 resource_limit: F,
1482 resource_type: &str,
1483 limit_name: &str,
1484 ) -> Result<(), AdapterError>
1485 where
1486 F: Fn(&SystemVars) -> u32,
1487 {
1488 if new_instances <= 0 {
1489 return Ok(());
1490 }
1491
1492 let limit: i64 = resource_limit(self.catalog().system_config()).into();
1493 let current_amount: Option<i64> = current_amount.try_into().ok();
1494 let desired =
1495 current_amount.and_then(|current_amount| current_amount.checked_add(new_instances));
1496
1497 let exceeds_limit = if let Some(desired) = desired {
1498 desired > limit
1499 } else {
1500 true
1501 };
1502
1503 let desired = desired
1504 .map(|desired| desired.to_string())
1505 .unwrap_or_else(|| format!("more than {}", i64::MAX));
1506 let current = current_amount
1507 .map(|current| current.to_string())
1508 .unwrap_or_else(|| format!("more than {}", i64::MAX));
1509 if exceeds_limit {
1510 Err(AdapterError::ResourceExhaustion {
1511 resource_type: resource_type.to_string(),
1512 limit_name: limit_name.to_string(),
1513 desired,
1514 limit: limit.to_string(),
1515 current,
1516 })
1517 } else {
1518 Ok(())
1519 }
1520 }
1521
1522 pub(crate) fn validate_resource_limit_numeric<F>(
1526 &self,
1527 current_amount: Numeric,
1528 new_amount: Numeric,
1529 resource_limit: F,
1530 resource_type: &str,
1531 limit_name: &str,
1532 ) -> Result<(), AdapterError>
1533 where
1534 F: Fn(&SystemVars) -> Numeric,
1535 {
1536 if new_amount <= Numeric::zero() {
1537 return Ok(());
1538 }
1539
1540 let limit = resource_limit(self.catalog().system_config());
1541 let desired = current_amount + new_amount;
1545 if desired > limit {
1546 Err(AdapterError::ResourceExhaustion {
1547 resource_type: resource_type.to_string(),
1548 limit_name: limit_name.to_string(),
1549 desired: desired.to_string(),
1550 limit: limit.to_string(),
1551 current: current_amount.to_string(),
1552 })
1553 } else {
1554 Ok(())
1555 }
1556 }
1557}