1use std::collections::{BTreeMap, BTreeSet};
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use fail::fail_point;
19use maplit::{btreemap, btreeset};
20use mz_adapter_types::compaction::SINCE_GRANULARITY;
21use mz_adapter_types::connection::ConnectionId;
22use mz_audit_log::VersionedEvent;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Sink};
25use mz_cluster_client::ReplicaId;
26use mz_controller::clusters::ReplicaLocation;
27use mz_controller_types::ClusterId;
28use mz_ore::instrument;
29use mz_ore::now::to_datetime;
30use mz_ore::retry::Retry;
31use mz_ore::task;
32use mz_repr::adt::numeric::Numeric;
33use mz_repr::{CatalogItemId, GlobalId, Timestamp};
34use mz_sql::catalog::{CatalogClusterReplica, CatalogSchema};
35use mz_sql::names::ResolvedDatabaseSpecifier;
36use mz_sql::plan::ConnectionDetails;
37use mz_sql::session::metadata::SessionMetadata;
38use mz_sql::session::vars::{
39 self, MAX_AWS_PRIVATELINK_CONNECTIONS, MAX_CLUSTERS, MAX_CONTINUAL_TASKS,
40 MAX_CREDIT_CONSUMPTION_RATE, MAX_DATABASES, MAX_KAFKA_CONNECTIONS, MAX_MATERIALIZED_VIEWS,
41 MAX_MYSQL_CONNECTIONS, MAX_NETWORK_POLICIES, MAX_OBJECTS_PER_SCHEMA, MAX_POSTGRES_CONNECTIONS,
42 MAX_REPLICAS_PER_CLUSTER, MAX_ROLES, MAX_SCHEMAS_PER_DATABASE, MAX_SECRETS, MAX_SINKS,
43 MAX_SOURCES, MAX_SQL_SERVER_CONNECTIONS, MAX_TABLES, SystemVars, Var,
44};
45use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
46use mz_storage_types::connections::inline::IntoInlineConnection;
47use mz_storage_types::read_policy::ReadPolicy;
48use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
49use serde_json::json;
50use tracing::{Instrument, Level, event, info_span, warn};
51
52use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
53use crate::catalog::{DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult};
54use crate::coord::Coordinator;
55use crate::coord::appends::BuiltinTableAppendNotify;
56use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
57use crate::session::{Session, Transaction, TransactionOps};
58use crate::telemetry::{EventDetails, SegmentClientExt};
59use crate::util::ResultExt;
60use crate::{AdapterError, ExecuteContext, catalog, flags};
61
62impl Coordinator {
63 #[instrument(name = "coord::catalog_transact")]
65 pub(crate) async fn catalog_transact(
66 &mut self,
67 session: Option<&Session>,
68 ops: Vec<catalog::Op>,
69 ) -> Result<(), AdapterError> {
70 let start = Instant::now();
71 let result = self
72 .catalog_transact_with_context(session.map(|session| session.conn_id()), None, ops)
73 .await;
74 self.metrics
75 .catalog_transact_seconds
76 .with_label_values(&["catalog_transact"])
77 .observe(start.elapsed().as_secs_f64());
78 result
79 }
80
81 #[instrument(name = "coord::catalog_transact_with_side_effects")]
90 pub(crate) async fn catalog_transact_with_side_effects<F>(
91 &mut self,
92 mut ctx: Option<&mut ExecuteContext>,
93 ops: Vec<catalog::Op>,
94 side_effect: F,
95 ) -> Result<(), AdapterError>
96 where
97 F: for<'a> FnOnce(
98 &'a mut Coordinator,
99 Option<&'a mut ExecuteContext>,
100 ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
101 + 'static,
102 {
103 let start = Instant::now();
104
105 let (table_updates, catalog_updates) = self
106 .catalog_transact_inner(ctx.as_ref().map(|ctx| ctx.session().conn_id()), ops)
107 .await?;
108
109 let apply_implications_res = self
112 .apply_catalog_implications(ctx.as_deref_mut(), catalog_updates)
113 .await;
114
115 apply_implications_res.expect("cannot fail to apply catalog update implications");
119
120 mz_ore::soft_assert_eq_no_log!(
123 self.check_consistency(),
124 Ok(()),
125 "coordinator inconsistency detected"
126 );
127
128 let side_effects_fut = side_effect(self, ctx);
129
130 let ((), ()) = futures::future::join(
132 side_effects_fut.instrument(info_span!(
133 "coord::catalog_transact_with_side_effects::side_effects_fut"
134 )),
135 table_updates.instrument(info_span!(
136 "coord::catalog_transact_with_side_effects::table_updates"
137 )),
138 )
139 .await;
140
141 self.metrics
142 .catalog_transact_seconds
143 .with_label_values(&["catalog_transact_with_side_effects"])
144 .observe(start.elapsed().as_secs_f64());
145
146 Ok(())
147 }
148
149 #[instrument(name = "coord::catalog_transact_with_context")]
157 pub(crate) async fn catalog_transact_with_context(
158 &mut self,
159 conn_id: Option<&ConnectionId>,
160 ctx: Option<&mut ExecuteContext>,
161 ops: Vec<catalog::Op>,
162 ) -> Result<(), AdapterError> {
163 let start = Instant::now();
164
165 let conn_id = conn_id.or_else(|| ctx.as_ref().map(|ctx| ctx.session().conn_id()));
166
167 let (table_updates, catalog_updates) = self.catalog_transact_inner(conn_id, ops).await?;
168
169 let apply_catalog_implications_fut = self.apply_catalog_implications(ctx, catalog_updates);
170
171 let (combined_apply_res, ()) = futures::future::join(
173 apply_catalog_implications_fut.instrument(info_span!(
174 "coord::catalog_transact_with_context::side_effects_fut"
175 )),
176 table_updates.instrument(info_span!(
177 "coord::catalog_transact_with_context::table_updates"
178 )),
179 )
180 .await;
181
182 combined_apply_res.expect("cannot fail to apply catalog implications");
186
187 mz_ore::soft_assert_eq_no_log!(
190 self.check_consistency(),
191 Ok(()),
192 "coordinator inconsistency detected"
193 );
194
195 self.metrics
196 .catalog_transact_seconds
197 .with_label_values(&["catalog_transact_with_context"])
198 .observe(start.elapsed().as_secs_f64());
199
200 Ok(())
201 }
202
203 #[instrument(name = "coord::catalog_transact_with_ddl_transaction")]
206 pub(crate) async fn catalog_transact_with_ddl_transaction<F>(
207 &mut self,
208 ctx: &mut ExecuteContext,
209 ops: Vec<catalog::Op>,
210 side_effect: F,
211 ) -> Result<(), AdapterError>
212 where
213 F: for<'a> FnOnce(
214 &'a mut Coordinator,
215 Option<&'a mut ExecuteContext>,
216 ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
217 + Send
218 + Sync
219 + 'static,
220 {
221 let start = Instant::now();
222
223 let Some(Transaction {
224 ops:
225 TransactionOps::DDL {
226 ops: txn_ops,
227 revision: txn_revision,
228 side_effects: _,
229 state: _,
230 },
231 ..
232 }) = ctx.session().transaction().inner()
233 else {
234 let result = self
235 .catalog_transact_with_side_effects(Some(ctx), ops, side_effect)
236 .await;
237 self.metrics
238 .catalog_transact_seconds
239 .with_label_values(&["catalog_transact_with_ddl_transaction"])
240 .observe(start.elapsed().as_secs_f64());
241 return result;
242 };
243
244 if self.catalog().transient_revision() != *txn_revision {
246 self.metrics
247 .catalog_transact_seconds
248 .with_label_values(&["catalog_transact_with_ddl_transaction"])
249 .observe(start.elapsed().as_secs_f64());
250 return Err(AdapterError::DDLTransactionRace);
251 }
252
253 let mut all_ops = Vec::with_capacity(ops.len() + txn_ops.len() + 1);
255 all_ops.extend(txn_ops.iter().cloned());
256 all_ops.extend(ops.clone());
257 all_ops.push(Op::TransactionDryRun);
258
259 let result = self.catalog_transact(Some(ctx.session()), all_ops).await;
261
262 let result = match result {
263 Err(AdapterError::TransactionDryRun { new_ops, new_state }) => {
265 ctx.session_mut()
268 .transaction_mut()
269 .add_ops(TransactionOps::DDL {
270 ops: new_ops,
271 state: new_state,
272 side_effects: vec![Box::new(side_effect)],
273 revision: self.catalog().transient_revision(),
274 })?;
275 Ok(())
276 }
277 Ok(_) => unreachable!("unexpected success!"),
278 Err(e) => Err(e),
279 };
280
281 self.metrics
282 .catalog_transact_seconds
283 .with_label_values(&["catalog_transact_with_ddl_transaction"])
284 .observe(start.elapsed().as_secs_f64());
285
286 result
287 }
288
289 #[instrument(name = "coord::catalog_transact_inner")]
293 pub(crate) async fn catalog_transact_inner<'a>(
294 &mut self,
295 conn_id: Option<&ConnectionId>,
296 ops: Vec<catalog::Op>,
297 ) -> Result<(BuiltinTableAppendNotify, Vec<ParsedStateUpdate>), AdapterError> {
298 if self.controller.read_only() {
299 return Err(AdapterError::ReadOnly);
300 }
301
302 event!(Level::TRACE, ops = format!("{:?}", ops));
303
304 let mut webhook_sources_to_restart = BTreeSet::new();
305 let mut clusters_to_drop = vec![];
306 let mut cluster_replicas_to_drop = vec![];
307 let mut clusters_to_create = vec![];
308 let mut cluster_replicas_to_create = vec![];
309 let mut update_metrics_config = false;
310 let mut update_tracing_config = false;
311 let mut update_controller_config = false;
312 let mut update_compute_config = false;
313 let mut update_storage_config = false;
314 let mut update_pg_timestamp_oracle_config = false;
315 let mut update_metrics_retention = false;
316 let mut update_secrets_caching_config = false;
317 let mut update_cluster_scheduling_config = false;
318 let mut update_http_config = false;
319
320 for op in &ops {
321 match op {
322 catalog::Op::DropObjects(drop_object_infos) => {
323 for drop_object_info in drop_object_infos {
324 match &drop_object_info {
325 catalog::DropObjectInfo::Item(_) => {
326 }
329 catalog::DropObjectInfo::Cluster(id) => {
330 clusters_to_drop.push(*id);
331 }
332 catalog::DropObjectInfo::ClusterReplica((
333 cluster_id,
334 replica_id,
335 _reason,
336 )) => {
337 cluster_replicas_to_drop.push((*cluster_id, *replica_id));
339 }
340 _ => (),
341 }
342 }
343 }
344 catalog::Op::ResetSystemConfiguration { name }
345 | catalog::Op::UpdateSystemConfiguration { name, .. } => {
346 update_metrics_config |= self
347 .catalog
348 .state()
349 .system_config()
350 .is_metrics_config_var(name);
351 update_tracing_config |= vars::is_tracing_var(name);
352 update_controller_config |= self
353 .catalog
354 .state()
355 .system_config()
356 .is_controller_config_var(name);
357 update_compute_config |= self
358 .catalog
359 .state()
360 .system_config()
361 .is_compute_config_var(name);
362 update_storage_config |= self
363 .catalog
364 .state()
365 .system_config()
366 .is_storage_config_var(name);
367 update_pg_timestamp_oracle_config |=
368 vars::is_pg_timestamp_oracle_config_var(name);
369 update_metrics_retention |= name == vars::METRICS_RETENTION.name();
370 update_secrets_caching_config |= vars::is_secrets_caching_var(name);
371 update_cluster_scheduling_config |= vars::is_cluster_scheduling_var(name);
372 update_http_config |= vars::is_http_config_var(name);
373 }
374 catalog::Op::ResetAllSystemConfiguration => {
375 update_tracing_config = true;
379 update_controller_config = true;
380 update_compute_config = true;
381 update_storage_config = true;
382 update_pg_timestamp_oracle_config = true;
383 update_metrics_retention = true;
384 update_secrets_caching_config = true;
385 update_cluster_scheduling_config = true;
386 update_http_config = true;
387 }
388 catalog::Op::RenameItem { id, .. } => {
389 let item = self.catalog().get_entry(id);
390 let is_webhook_source = item
391 .source()
392 .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
393 .unwrap_or(false);
394 if is_webhook_source {
395 webhook_sources_to_restart.insert(*id);
396 }
397 }
398 catalog::Op::RenameSchema {
399 database_spec,
400 schema_spec,
401 ..
402 } => {
403 let schema = self.catalog().get_schema(
404 database_spec,
405 schema_spec,
406 conn_id.unwrap_or(&SYSTEM_CONN_ID),
407 );
408 let webhook_sources = schema.item_ids().filter(|id| {
409 let item = self.catalog().get_entry(id);
410 item.source()
411 .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
412 .unwrap_or(false)
413 });
414 webhook_sources_to_restart.extend(webhook_sources);
415 }
416 catalog::Op::CreateCluster { id, .. } => {
417 clusters_to_create.push(*id);
418 }
419 catalog::Op::CreateClusterReplica {
420 cluster_id,
421 name,
422 config,
423 ..
424 } => {
425 cluster_replicas_to_create.push((
426 *cluster_id,
427 name.clone(),
428 config.location.num_processes(),
429 ));
430 }
431 _ => (),
432 }
433 }
434
435 self.validate_resource_limits(&ops, conn_id.unwrap_or(&SYSTEM_CONN_ID))?;
436
437 let oracle_write_ts = self.get_local_write_ts().await.timestamp;
446
447 let Coordinator {
448 catalog,
449 active_conns,
450 controller,
451 cluster_replica_statuses,
452 ..
453 } = self;
454 let catalog = Arc::make_mut(catalog);
455 let conn = conn_id.map(|id| active_conns.get(id).expect("connection must exist"));
456
457 let TransactionResult {
458 builtin_table_updates,
459 catalog_updates,
460 audit_events,
461 } = catalog
462 .transact(
463 Some(&mut controller.storage_collections),
464 oracle_write_ts,
465 conn,
466 ops,
467 )
468 .await?;
469
470 for (cluster_id, replica_id) in &cluster_replicas_to_drop {
471 cluster_replica_statuses.remove_cluster_replica_statuses(cluster_id, replica_id);
472 }
473 for cluster_id in &clusters_to_drop {
474 cluster_replica_statuses.remove_cluster_statuses(cluster_id);
475 }
476 for cluster_id in clusters_to_create {
477 cluster_replica_statuses.initialize_cluster_statuses(cluster_id);
478 }
479 let now = to_datetime((catalog.config().now)());
480 for (cluster_id, replica_name, num_processes) in cluster_replicas_to_create {
481 let replica_id = catalog
482 .resolve_replica_in_cluster(&cluster_id, &replica_name)
483 .expect("just created")
484 .replica_id();
485 cluster_replica_statuses.initialize_cluster_replica_statuses(
486 cluster_id,
487 replica_id,
488 num_processes,
489 now,
490 );
491 }
492
493 let (builtin_update_notify, _) = self
496 .builtin_table_update()
497 .execute(builtin_table_updates)
498 .await;
499
500 let _: () = async {
503 if !webhook_sources_to_restart.is_empty() {
504 self.restart_webhook_sources(webhook_sources_to_restart);
505 }
506
507 if update_metrics_config {
508 mz_metrics::update_dyncfg(&self.catalog().system_config().dyncfg_updates());
509 }
510 if update_controller_config {
511 self.update_controller_config();
512 }
513 if update_compute_config {
514 self.update_compute_config();
515 }
516 if update_storage_config {
517 self.update_storage_config();
518 }
519 if update_pg_timestamp_oracle_config {
520 self.update_pg_timestamp_oracle_config();
521 }
522 if update_metrics_retention {
523 self.update_metrics_retention();
524 }
525 if update_tracing_config {
526 self.update_tracing_config();
527 }
528 if update_secrets_caching_config {
529 self.update_secrets_caching_config();
530 }
531 if update_cluster_scheduling_config {
532 self.update_cluster_scheduling_config();
533 }
534 if update_http_config {
535 self.update_http_config();
536 }
537 }
538 .instrument(info_span!("coord::catalog_transact_with::finalize"))
539 .await;
540
541 let conn = conn_id.and_then(|id| self.active_conns.get(id));
542 if let Some(segment_client) = &self.segment_client {
543 for VersionedEvent::V1(event) in audit_events {
544 let event_type = format!(
545 "{} {}",
546 event.object_type.as_title_case(),
547 event.event_type.as_title_case()
548 );
549 segment_client.environment_track(
550 &self.catalog().config().environment_id,
551 event_type,
552 json!({ "details": event.details.as_json() }),
553 EventDetails {
554 user_id: conn
555 .and_then(|c| c.user().external_metadata.as_ref())
556 .map(|m| m.user_id),
557 application_name: conn.map(|c| c.application_name()),
558 ..Default::default()
559 },
560 );
561 }
562 }
563
564 Ok((builtin_update_notify, catalog_updates))
565 }
566
567 pub(crate) fn drop_replica(&mut self, cluster_id: ClusterId, replica_id: ReplicaId) {
568 self.drop_introspection_subscribes(replica_id);
569
570 self.controller
571 .drop_replica(cluster_id, replica_id)
572 .expect("dropping replica must not fail");
573 }
574
575 pub(crate) fn drop_sources(&mut self, sources: Vec<(CatalogItemId, GlobalId)>) {
577 for (item_id, _gid) in &sources {
578 self.active_webhooks.remove(item_id);
579 }
580 let storage_metadata = self.catalog.state().storage_metadata();
581 let source_gids = sources.into_iter().map(|(_id, gid)| gid).collect();
582 self.controller
583 .storage
584 .drop_sources(storage_metadata, source_gids)
585 .unwrap_or_terminate("cannot fail to drop sources");
586 }
587
588 pub(crate) fn drop_tables(&mut self, tables: Vec<(CatalogItemId, GlobalId)>, ts: Timestamp) {
590 for (item_id, _gid) in &tables {
591 self.active_webhooks.remove(item_id);
592 }
593
594 let storage_metadata = self.catalog.state().storage_metadata();
595 let table_gids = tables.into_iter().map(|(_id, gid)| gid).collect();
596 self.controller
597 .storage
598 .drop_tables(storage_metadata, table_gids, ts)
599 .unwrap_or_terminate("cannot fail to drop tables");
600 }
601
602 fn restart_webhook_sources(&mut self, sources: impl IntoIterator<Item = CatalogItemId>) {
603 for id in sources {
604 self.active_webhooks.remove(&id);
605 }
606 }
607
608 #[must_use]
614 pub async fn drop_compute_sink(&mut self, sink_id: GlobalId) -> Option<ActiveComputeSink> {
615 self.drop_compute_sinks([sink_id]).await.remove(&sink_id)
616 }
617
618 #[must_use]
627 pub async fn drop_compute_sinks(
628 &mut self,
629 sink_ids: impl IntoIterator<Item = GlobalId>,
630 ) -> BTreeMap<GlobalId, ActiveComputeSink> {
631 let mut by_id = BTreeMap::new();
632 let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
633 for sink_id in sink_ids {
634 let sink = match self.remove_active_compute_sink(sink_id).await {
635 None => {
636 tracing::error!(%sink_id, "drop_compute_sinks called on nonexistent sink");
637 continue;
638 }
639 Some(sink) => sink,
640 };
641
642 by_cluster
643 .entry(sink.cluster_id())
644 .or_default()
645 .push(sink_id);
646 by_id.insert(sink_id, sink);
647 }
648 for (cluster_id, ids) in by_cluster {
649 let compute = &mut self.controller.compute;
650 if compute.instance_exists(cluster_id) {
652 compute
653 .drop_collections(cluster_id, ids)
654 .unwrap_or_terminate("cannot fail to drop collections");
655 }
656 }
657 by_id
658 }
659
660 pub async fn retire_compute_sinks(
665 &mut self,
666 mut reasons: BTreeMap<GlobalId, ActiveComputeSinkRetireReason>,
667 ) {
668 let sink_ids = reasons.keys().cloned();
669 for (id, sink) in self.drop_compute_sinks(sink_ids).await {
670 let reason = reasons
671 .remove(&id)
672 .expect("all returned IDs are in `reasons`");
673 sink.retire(reason);
674 }
675 }
676
677 pub async fn drop_reconfiguration_replicas(
680 &mut self,
681 cluster_ids: BTreeSet<ClusterId>,
682 ) -> Result<(), AdapterError> {
683 let pending_cluster_ops: Vec<Op> = cluster_ids
684 .iter()
685 .map(|c| {
686 self.catalog()
687 .get_cluster(c.clone())
688 .replicas()
689 .filter_map(|r| match r.config.location {
690 ReplicaLocation::Managed(ref l) if l.pending => {
691 Some(DropObjectInfo::ClusterReplica((
692 c.clone(),
693 r.replica_id,
694 ReplicaCreateDropReason::Manual,
695 )))
696 }
697 _ => None,
698 })
699 .collect::<Vec<DropObjectInfo>>()
700 })
701 .filter_map(|pending_replica_drop_ops_by_cluster| {
702 match pending_replica_drop_ops_by_cluster.len() {
703 0 => None,
704 _ => Some(Op::DropObjects(pending_replica_drop_ops_by_cluster)),
705 }
706 })
707 .collect();
708 if !pending_cluster_ops.is_empty() {
709 self.catalog_transact(None, pending_cluster_ops).await?;
710 }
711 Ok(())
712 }
713
714 #[mz_ore::instrument(level = "debug")]
716 pub(crate) async fn cancel_compute_sinks_for_conn(&mut self, conn_id: &ConnectionId) {
717 self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Canceled)
718 .await
719 }
720
721 #[mz_ore::instrument(level = "debug")]
723 pub(crate) async fn cancel_cluster_reconfigurations_for_conn(
724 &mut self,
725 conn_id: &ConnectionId,
726 ) {
727 self.retire_cluster_reconfigurations_for_conn(conn_id).await
728 }
729
730 #[mz_ore::instrument(level = "debug")]
733 pub(crate) async fn retire_compute_sinks_for_conn(
734 &mut self,
735 conn_id: &ConnectionId,
736 reason: ActiveComputeSinkRetireReason,
737 ) {
738 let drop_sinks = self
739 .active_conns
740 .get_mut(conn_id)
741 .expect("must exist for active session")
742 .drop_sinks
743 .iter()
744 .map(|sink_id| (*sink_id, reason.clone()))
745 .collect();
746 self.retire_compute_sinks(drop_sinks).await;
747 }
748
749 #[mz_ore::instrument(level = "debug")]
751 pub(crate) async fn retire_cluster_reconfigurations_for_conn(
752 &mut self,
753 conn_id: &ConnectionId,
754 ) {
755 let reconfiguring_clusters = self
756 .active_conns
757 .get(conn_id)
758 .expect("must exist for active session")
759 .pending_cluster_alters
760 .clone();
761 self.drop_reconfiguration_replicas(reconfiguring_clusters)
763 .await
764 .unwrap_or_terminate("cannot fail to drop reconfiguration replicas");
765
766 self.active_conns
767 .get_mut(conn_id)
768 .expect("must exist for active session")
769 .pending_cluster_alters
770 .clear();
771 }
772
773 pub(crate) fn drop_storage_sinks(&mut self, sink_gids: Vec<GlobalId>) {
774 let storage_metadata = self.catalog.state().storage_metadata();
775 self.controller
776 .storage
777 .drop_sinks(storage_metadata, sink_gids)
778 .unwrap_or_terminate("cannot fail to drop sinks");
779 }
780
781 pub(crate) fn drop_compute_collections(&mut self, collections: Vec<(ClusterId, GlobalId)>) {
782 let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
783 for (cluster_id, gid) in collections {
784 by_cluster.entry(cluster_id).or_default().push(gid);
785 }
786 for (cluster_id, gids) in by_cluster {
787 let compute = &mut self.controller.compute;
788 if compute.instance_exists(cluster_id) {
790 compute
791 .drop_collections(cluster_id, gids)
792 .unwrap_or_terminate("cannot fail to drop collections");
793 }
794 }
795 }
796
797 pub(crate) fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>) {
798 let cloud_resource_controller = Arc::clone(self.cloud_resource_controller
799 .as_ref()
800 .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))
801 .expect("vpc endpoints should only be dropped in CLOUD, where `cloud_resource_controller` is `Some`"));
802 task::spawn(
810 || "drop_vpc_endpoints",
811 async move {
812 for vpc_endpoint in vpc_endpoints {
813 let _ = Retry::default()
814 .max_duration(Duration::from_secs(60))
815 .retry_async(|_state| async {
816 fail_point!("drop_vpc_endpoint", |r| {
817 Err(anyhow::anyhow!("Fail point error {:?}", r))
818 });
819 match cloud_resource_controller
820 .delete_vpc_endpoint(vpc_endpoint)
821 .await
822 {
823 Ok(_) => Ok(()),
824 Err(e) => {
825 warn!("Dropping VPC Endpoints has encountered an error: {}", e);
826 Err(e)
827 }
828 }
829 })
830 .await;
831 }
832 }
833 .instrument(info_span!(
834 "coord::catalog_transact_inner::drop_vpc_endpoints"
835 )),
836 );
837 }
838
839 pub(crate) async fn drop_temp_items(&mut self, conn_id: &ConnectionId) {
842 let temp_items = self.catalog().state().get_temp_items(conn_id).collect();
843 let all_items = self.catalog().object_dependents(&temp_items, conn_id);
844
845 if all_items.is_empty() {
846 return;
847 }
848 let op = Op::DropObjects(
849 all_items
850 .into_iter()
851 .map(DropObjectInfo::manual_drop_from_object_id)
852 .collect(),
853 );
854
855 self.catalog_transact_with_context(Some(conn_id), None, vec![op])
856 .await
857 .expect("unable to drop temporary items for conn_id");
858 }
859
860 fn update_cluster_scheduling_config(&self) {
861 let config = flags::orchestrator_scheduling_config(self.catalog.system_config());
862 self.controller
863 .update_orchestrator_scheduling_config(config);
864 }
865
866 fn update_secrets_caching_config(&self) {
867 let config = flags::caching_config(self.catalog.system_config());
868 self.caching_secrets_reader.set_policy(config);
869 }
870
871 fn update_tracing_config(&self) {
872 let tracing = flags::tracing_config(self.catalog().system_config());
873 tracing.apply(&self.tracing_handle);
874 }
875
876 fn update_compute_config(&mut self) {
877 let config_params = flags::compute_config(self.catalog().system_config());
878 self.controller.compute.update_configuration(config_params);
879 }
880
881 fn update_storage_config(&mut self) {
882 let config_params = flags::storage_config(self.catalog().system_config());
883 self.controller.storage.update_parameters(config_params);
884 }
885
886 fn update_pg_timestamp_oracle_config(&self) {
887 let config_params = flags::pg_timstamp_oracle_config(self.catalog().system_config());
888 if let Some(config) = self.pg_timestamp_oracle_config.as_ref() {
889 config_params.apply(config)
890 }
891 }
892
893 fn update_metrics_retention(&self) {
894 let duration = self.catalog().system_config().metrics_retention();
895 let policy = ReadPolicy::lag_writes_by(
896 Timestamp::new(u64::try_from(duration.as_millis()).unwrap_or_else(|_e| {
897 tracing::error!("Absurd metrics retention duration: {duration:?}.");
898 u64::MAX
899 })),
900 SINCE_GRANULARITY,
901 );
902 let storage_policies = self
903 .catalog()
904 .entries()
905 .filter(|entry| {
906 entry.item().is_retained_metrics_object()
907 && entry.item().is_compute_object_on_cluster().is_none()
908 })
909 .map(|entry| (entry.id(), policy.clone()))
910 .collect::<Vec<_>>();
911 let compute_policies = self
912 .catalog()
913 .entries()
914 .filter_map(|entry| {
915 if let (true, Some(cluster_id)) = (
916 entry.item().is_retained_metrics_object(),
917 entry.item().is_compute_object_on_cluster(),
918 ) {
919 Some((cluster_id, entry.id(), policy.clone()))
920 } else {
921 None
922 }
923 })
924 .collect::<Vec<_>>();
925 self.update_storage_read_policies(storage_policies);
926 self.update_compute_read_policies(compute_policies);
927 }
928
929 fn update_controller_config(&mut self) {
930 let sys_config = self.catalog().system_config();
931 self.controller
932 .update_configuration(sys_config.dyncfg_updates());
933 }
934
935 fn update_http_config(&mut self) {
936 let webhook_request_limit = self
937 .catalog()
938 .system_config()
939 .webhook_concurrent_request_limit();
940 self.webhook_concurrency_limit
941 .set_limit(webhook_request_limit);
942 }
943
944 pub(crate) async fn create_storage_export(
945 &mut self,
946 id: GlobalId,
947 sink: &Sink,
948 ) -> Result<(), AdapterError> {
949 self.controller.storage.check_exists(sink.from)?;
951
952 let id_bundle = crate::CollectionIdBundle {
959 storage_ids: btreeset! {sink.from},
960 compute_ids: btreemap! {},
961 };
962
963 let read_holds = self.acquire_read_holds(&id_bundle);
971 let as_of = read_holds.least_valid_read();
972
973 let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
974 let storage_sink_desc = mz_storage_types::sinks::StorageSinkDesc {
975 from: sink.from,
976 from_desc: storage_sink_from_entry
977 .desc(&self.catalog().resolve_full_name(
978 storage_sink_from_entry.name(),
979 storage_sink_from_entry.conn_id(),
980 ))
981 .expect("sinks can only be built on items with descs")
982 .into_owned(),
983 connection: sink
984 .connection
985 .clone()
986 .into_inline_connection(self.catalog().state()),
987 envelope: sink.envelope,
988 as_of,
989 with_snapshot: sink.with_snapshot,
990 version: sink.version,
991 from_storage_metadata: (),
992 to_storage_metadata: (),
993 };
994
995 let collection_desc = CollectionDescription {
996 desc: KAFKA_PROGRESS_DESC.clone(),
998 data_source: DataSource::Sink {
999 desc: ExportDescription {
1000 sink: storage_sink_desc,
1001 instance_id: sink.cluster_id,
1002 },
1003 },
1004 since: None,
1005 status_collection_id: None,
1006 timeline: None,
1007 primary: None,
1008 };
1009 let collections = vec![(id, collection_desc)];
1010
1011 let storage_metadata = self.catalog.state().storage_metadata();
1013 let res = self
1014 .controller
1015 .storage
1016 .create_collections(storage_metadata, None, collections)
1017 .await;
1018
1019 drop(read_holds);
1022
1023 Ok(res?)
1024 }
1025
1026 fn validate_resource_limits(
1029 &self,
1030 ops: &Vec<catalog::Op>,
1031 conn_id: &ConnectionId,
1032 ) -> Result<(), AdapterError> {
1033 let mut new_kafka_connections = 0;
1034 let mut new_postgres_connections = 0;
1035 let mut new_mysql_connections = 0;
1036 let mut new_sql_server_connections = 0;
1037 let mut new_aws_privatelink_connections = 0;
1038 let mut new_tables = 0;
1039 let mut new_sources = 0;
1040 let mut new_sinks = 0;
1041 let mut new_materialized_views = 0;
1042 let mut new_clusters = 0;
1043 let mut new_replicas_per_cluster = BTreeMap::new();
1044 let mut new_credit_consumption_rate = Numeric::zero();
1045 let mut new_databases = 0;
1046 let mut new_schemas_per_database = BTreeMap::new();
1047 let mut new_objects_per_schema = BTreeMap::new();
1048 let mut new_secrets = 0;
1049 let mut new_roles = 0;
1050 let mut new_continual_tasks = 0;
1051 let mut new_network_policies = 0;
1052 for op in ops {
1053 match op {
1054 Op::CreateDatabase { .. } => {
1055 new_databases += 1;
1056 }
1057 Op::CreateSchema { database_id, .. } => {
1058 if let ResolvedDatabaseSpecifier::Id(database_id) = database_id {
1059 *new_schemas_per_database.entry(database_id).or_insert(0) += 1;
1060 }
1061 }
1062 Op::CreateRole { .. } => {
1063 new_roles += 1;
1064 }
1065 Op::CreateNetworkPolicy { .. } => {
1066 new_network_policies += 1;
1067 }
1068 Op::CreateCluster { .. } => {
1069 new_clusters += 1;
1073 }
1074 Op::CreateClusterReplica {
1075 cluster_id, config, ..
1076 } => {
1077 if cluster_id.is_user() {
1078 *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) += 1;
1079 if let ReplicaLocation::Managed(location) = &config.location {
1080 let replica_allocation = self
1081 .catalog()
1082 .cluster_replica_sizes()
1083 .0
1084 .get(location.size_for_billing())
1085 .expect(
1086 "location size is validated against the cluster replica sizes",
1087 );
1088 new_credit_consumption_rate += replica_allocation.credits_per_hour
1089 }
1090 }
1091 }
1092 Op::CreateItem { name, item, .. } => {
1093 *new_objects_per_schema
1094 .entry((
1095 name.qualifiers.database_spec.clone(),
1096 name.qualifiers.schema_spec.clone(),
1097 ))
1098 .or_insert(0) += 1;
1099 match item {
1100 CatalogItem::Connection(connection) => match connection.details {
1101 ConnectionDetails::Kafka(_) => new_kafka_connections += 1,
1102 ConnectionDetails::Postgres(_) => new_postgres_connections += 1,
1103 ConnectionDetails::MySql(_) => new_mysql_connections += 1,
1104 ConnectionDetails::SqlServer(_) => new_sql_server_connections += 1,
1105 ConnectionDetails::AwsPrivatelink(_) => {
1106 new_aws_privatelink_connections += 1
1107 }
1108 ConnectionDetails::Csr(_)
1109 | ConnectionDetails::Ssh { .. }
1110 | ConnectionDetails::Aws(_)
1111 | ConnectionDetails::IcebergCatalog(_) => {}
1112 },
1113 CatalogItem::Table(_) => {
1114 new_tables += 1;
1115 }
1116 CatalogItem::Source(source) => {
1117 new_sources += source.user_controllable_persist_shard_count()
1118 }
1119 CatalogItem::Sink(_) => new_sinks += 1,
1120 CatalogItem::MaterializedView(_) => {
1121 new_materialized_views += 1;
1122 }
1123 CatalogItem::Secret(_) => {
1124 new_secrets += 1;
1125 }
1126 CatalogItem::ContinualTask(_) => {
1127 new_continual_tasks += 1;
1128 }
1129 CatalogItem::Log(_)
1130 | CatalogItem::View(_)
1131 | CatalogItem::Index(_)
1132 | CatalogItem::Type(_)
1133 | CatalogItem::Func(_) => {}
1134 }
1135 }
1136 Op::DropObjects(drop_object_infos) => {
1137 for drop_object_info in drop_object_infos {
1138 match drop_object_info {
1139 DropObjectInfo::Cluster(_) => {
1140 new_clusters -= 1;
1141 }
1142 DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
1143 if cluster_id.is_user() {
1144 *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) -= 1;
1145 let cluster = self
1146 .catalog()
1147 .get_cluster_replica(*cluster_id, *replica_id);
1148 if let ReplicaLocation::Managed(location) =
1149 &cluster.config.location
1150 {
1151 let replica_allocation = self
1152 .catalog()
1153 .cluster_replica_sizes()
1154 .0
1155 .get(location.size_for_billing())
1156 .expect(
1157 "location size is validated against the cluster replica sizes",
1158 );
1159 new_credit_consumption_rate -=
1160 replica_allocation.credits_per_hour
1161 }
1162 }
1163 }
1164 DropObjectInfo::Database(_) => {
1165 new_databases -= 1;
1166 }
1167 DropObjectInfo::Schema((database_spec, _)) => {
1168 if let ResolvedDatabaseSpecifier::Id(database_id) = database_spec {
1169 *new_schemas_per_database.entry(database_id).or_insert(0) -= 1;
1170 }
1171 }
1172 DropObjectInfo::Role(_) => {
1173 new_roles -= 1;
1174 }
1175 DropObjectInfo::NetworkPolicy(_) => {
1176 new_network_policies -= 1;
1177 }
1178 DropObjectInfo::Item(id) => {
1179 let entry = self.catalog().get_entry(id);
1180 *new_objects_per_schema
1181 .entry((
1182 entry.name().qualifiers.database_spec.clone(),
1183 entry.name().qualifiers.schema_spec.clone(),
1184 ))
1185 .or_insert(0) -= 1;
1186 match entry.item() {
1187 CatalogItem::Connection(connection) => match connection.details
1188 {
1189 ConnectionDetails::AwsPrivatelink(_) => {
1190 new_aws_privatelink_connections -= 1;
1191 }
1192 _ => (),
1193 },
1194 CatalogItem::Table(_) => {
1195 new_tables -= 1;
1196 }
1197 CatalogItem::Source(source) => {
1198 new_sources -=
1199 source.user_controllable_persist_shard_count()
1200 }
1201 CatalogItem::Sink(_) => new_sinks -= 1,
1202 CatalogItem::MaterializedView(_) => {
1203 new_materialized_views -= 1;
1204 }
1205 CatalogItem::Secret(_) => {
1206 new_secrets -= 1;
1207 }
1208 CatalogItem::ContinualTask(_) => {
1209 new_continual_tasks -= 1;
1210 }
1211 CatalogItem::Log(_)
1212 | CatalogItem::View(_)
1213 | CatalogItem::Index(_)
1214 | CatalogItem::Type(_)
1215 | CatalogItem::Func(_) => {}
1216 }
1217 }
1218 }
1219 }
1220 }
1221 Op::UpdateItem {
1222 name: _,
1223 id,
1224 to_item,
1225 } => match to_item {
1226 CatalogItem::Source(source) => {
1227 let current_source = self
1228 .catalog()
1229 .get_entry(id)
1230 .source()
1231 .expect("source update is for source item");
1232
1233 new_sources += source.user_controllable_persist_shard_count()
1234 - current_source.user_controllable_persist_shard_count();
1235 }
1236 CatalogItem::Connection(_)
1237 | CatalogItem::Table(_)
1238 | CatalogItem::Sink(_)
1239 | CatalogItem::MaterializedView(_)
1240 | CatalogItem::Secret(_)
1241 | CatalogItem::Log(_)
1242 | CatalogItem::View(_)
1243 | CatalogItem::Index(_)
1244 | CatalogItem::Type(_)
1245 | CatalogItem::Func(_)
1246 | CatalogItem::ContinualTask(_) => {}
1247 },
1248 Op::AlterRole { .. }
1249 | Op::AlterRetainHistory { .. }
1250 | Op::AlterNetworkPolicy { .. }
1251 | Op::AlterAddColumn { .. }
1252 | Op::AlterMaterializedViewApplyReplacement { .. }
1253 | Op::UpdatePrivilege { .. }
1254 | Op::UpdateDefaultPrivilege { .. }
1255 | Op::GrantRole { .. }
1256 | Op::RenameCluster { .. }
1257 | Op::RenameClusterReplica { .. }
1258 | Op::RenameItem { .. }
1259 | Op::RenameSchema { .. }
1260 | Op::UpdateOwner { .. }
1261 | Op::RevokeRole { .. }
1262 | Op::UpdateClusterConfig { .. }
1263 | Op::UpdateClusterReplicaConfig { .. }
1264 | Op::UpdateSourceReferences { .. }
1265 | Op::UpdateSystemConfiguration { .. }
1266 | Op::ResetSystemConfiguration { .. }
1267 | Op::ResetAllSystemConfiguration { .. }
1268 | Op::Comment { .. }
1269 | Op::WeirdStorageUsageUpdates { .. }
1270 | Op::TransactionDryRun => {}
1271 }
1272 }
1273
1274 let mut current_aws_privatelink_connections = 0;
1275 let mut current_postgres_connections = 0;
1276 let mut current_mysql_connections = 0;
1277 let mut current_sql_server_connections = 0;
1278 let mut current_kafka_connections = 0;
1279 for c in self.catalog().user_connections() {
1280 let connection = c
1281 .connection()
1282 .expect("`user_connections()` only returns connection objects");
1283
1284 match connection.details {
1285 ConnectionDetails::AwsPrivatelink(_) => current_aws_privatelink_connections += 1,
1286 ConnectionDetails::Postgres(_) => current_postgres_connections += 1,
1287 ConnectionDetails::MySql(_) => current_mysql_connections += 1,
1288 ConnectionDetails::SqlServer(_) => current_sql_server_connections += 1,
1289 ConnectionDetails::Kafka(_) => current_kafka_connections += 1,
1290 ConnectionDetails::Csr(_)
1291 | ConnectionDetails::Ssh { .. }
1292 | ConnectionDetails::Aws(_)
1293 | ConnectionDetails::IcebergCatalog(_) => {}
1294 }
1295 }
1296 self.validate_resource_limit(
1297 current_kafka_connections,
1298 new_kafka_connections,
1299 SystemVars::max_kafka_connections,
1300 "Kafka Connection",
1301 MAX_KAFKA_CONNECTIONS.name(),
1302 )?;
1303 self.validate_resource_limit(
1304 current_postgres_connections,
1305 new_postgres_connections,
1306 SystemVars::max_postgres_connections,
1307 "PostgreSQL Connection",
1308 MAX_POSTGRES_CONNECTIONS.name(),
1309 )?;
1310 self.validate_resource_limit(
1311 current_mysql_connections,
1312 new_mysql_connections,
1313 SystemVars::max_mysql_connections,
1314 "MySQL Connection",
1315 MAX_MYSQL_CONNECTIONS.name(),
1316 )?;
1317 self.validate_resource_limit(
1318 current_sql_server_connections,
1319 new_sql_server_connections,
1320 SystemVars::max_sql_server_connections,
1321 "SQL Server Connection",
1322 MAX_SQL_SERVER_CONNECTIONS.name(),
1323 )?;
1324 self.validate_resource_limit(
1325 current_aws_privatelink_connections,
1326 new_aws_privatelink_connections,
1327 SystemVars::max_aws_privatelink_connections,
1328 "AWS PrivateLink Connection",
1329 MAX_AWS_PRIVATELINK_CONNECTIONS.name(),
1330 )?;
1331 self.validate_resource_limit(
1332 self.catalog().user_tables().count(),
1333 new_tables,
1334 SystemVars::max_tables,
1335 "table",
1336 MAX_TABLES.name(),
1337 )?;
1338
1339 let current_sources: usize = self
1340 .catalog()
1341 .user_sources()
1342 .filter_map(|source| source.source())
1343 .map(|source| source.user_controllable_persist_shard_count())
1344 .sum::<i64>()
1345 .try_into()
1346 .expect("non-negative sum of sources");
1347
1348 self.validate_resource_limit(
1349 current_sources,
1350 new_sources,
1351 SystemVars::max_sources,
1352 "source",
1353 MAX_SOURCES.name(),
1354 )?;
1355 self.validate_resource_limit(
1356 self.catalog().user_sinks().count(),
1357 new_sinks,
1358 SystemVars::max_sinks,
1359 "sink",
1360 MAX_SINKS.name(),
1361 )?;
1362 self.validate_resource_limit(
1363 self.catalog().user_materialized_views().count(),
1364 new_materialized_views,
1365 SystemVars::max_materialized_views,
1366 "materialized view",
1367 MAX_MATERIALIZED_VIEWS.name(),
1368 )?;
1369 self.validate_resource_limit(
1370 self.catalog().user_clusters().count(),
1376 new_clusters,
1377 SystemVars::max_clusters,
1378 "cluster",
1379 MAX_CLUSTERS.name(),
1380 )?;
1381 for (cluster_id, new_replicas) in new_replicas_per_cluster {
1382 let current_amount = self
1384 .catalog()
1385 .try_get_cluster(cluster_id)
1386 .map(|instance| instance.user_replicas().count())
1387 .unwrap_or(0);
1388 self.validate_resource_limit(
1389 current_amount,
1390 new_replicas,
1391 SystemVars::max_replicas_per_cluster,
1392 "cluster replica",
1393 MAX_REPLICAS_PER_CLUSTER.name(),
1394 )?;
1395 }
1396 self.validate_resource_limit_numeric(
1397 self.current_credit_consumption_rate(),
1398 new_credit_consumption_rate,
1399 |system_vars| {
1400 self.license_key
1401 .max_credit_consumption_rate()
1402 .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
1403 },
1404 "cluster replica",
1405 MAX_CREDIT_CONSUMPTION_RATE.name(),
1406 )?;
1407 self.validate_resource_limit(
1408 self.catalog().databases().count(),
1409 new_databases,
1410 SystemVars::max_databases,
1411 "database",
1412 MAX_DATABASES.name(),
1413 )?;
1414 for (database_id, new_schemas) in new_schemas_per_database {
1415 self.validate_resource_limit(
1416 self.catalog().get_database(database_id).schemas_by_id.len(),
1417 new_schemas,
1418 SystemVars::max_schemas_per_database,
1419 "schema",
1420 MAX_SCHEMAS_PER_DATABASE.name(),
1421 )?;
1422 }
1423 for ((database_spec, schema_spec), new_objects) in new_objects_per_schema {
1424 self.validate_resource_limit(
1425 self.catalog()
1426 .get_schema(&database_spec, &schema_spec, conn_id)
1427 .items
1428 .len(),
1429 new_objects,
1430 SystemVars::max_objects_per_schema,
1431 "object",
1432 MAX_OBJECTS_PER_SCHEMA.name(),
1433 )?;
1434 }
1435 self.validate_resource_limit(
1436 self.catalog().user_secrets().count(),
1437 new_secrets,
1438 SystemVars::max_secrets,
1439 "secret",
1440 MAX_SECRETS.name(),
1441 )?;
1442 self.validate_resource_limit(
1443 self.catalog().user_roles().count(),
1444 new_roles,
1445 SystemVars::max_roles,
1446 "role",
1447 MAX_ROLES.name(),
1448 )?;
1449 self.validate_resource_limit(
1450 self.catalog().user_continual_tasks().count(),
1451 new_continual_tasks,
1452 SystemVars::max_continual_tasks,
1453 "continual_task",
1454 MAX_CONTINUAL_TASKS.name(),
1455 )?;
1456 self.validate_resource_limit(
1457 self.catalog().user_continual_tasks().count(),
1458 new_network_policies,
1459 SystemVars::max_network_policies,
1460 "network_policy",
1461 MAX_NETWORK_POLICIES.name(),
1462 )?;
1463 Ok(())
1464 }
1465
1466 pub(crate) fn validate_resource_limit<F>(
1468 &self,
1469 current_amount: usize,
1470 new_instances: i64,
1471 resource_limit: F,
1472 resource_type: &str,
1473 limit_name: &str,
1474 ) -> Result<(), AdapterError>
1475 where
1476 F: Fn(&SystemVars) -> u32,
1477 {
1478 if new_instances <= 0 {
1479 return Ok(());
1480 }
1481
1482 let limit: i64 = resource_limit(self.catalog().system_config()).into();
1483 let current_amount: Option<i64> = current_amount.try_into().ok();
1484 let desired =
1485 current_amount.and_then(|current_amount| current_amount.checked_add(new_instances));
1486
1487 let exceeds_limit = if let Some(desired) = desired {
1488 desired > limit
1489 } else {
1490 true
1491 };
1492
1493 let desired = desired
1494 .map(|desired| desired.to_string())
1495 .unwrap_or_else(|| format!("more than {}", i64::MAX));
1496 let current = current_amount
1497 .map(|current| current.to_string())
1498 .unwrap_or_else(|| format!("more than {}", i64::MAX));
1499 if exceeds_limit {
1500 Err(AdapterError::ResourceExhaustion {
1501 resource_type: resource_type.to_string(),
1502 limit_name: limit_name.to_string(),
1503 desired,
1504 limit: limit.to_string(),
1505 current,
1506 })
1507 } else {
1508 Ok(())
1509 }
1510 }
1511
1512 pub(crate) fn validate_resource_limit_numeric<F>(
1516 &self,
1517 current_amount: Numeric,
1518 new_amount: Numeric,
1519 resource_limit: F,
1520 resource_type: &str,
1521 limit_name: &str,
1522 ) -> Result<(), AdapterError>
1523 where
1524 F: Fn(&SystemVars) -> Numeric,
1525 {
1526 if new_amount <= Numeric::zero() {
1527 return Ok(());
1528 }
1529
1530 let limit = resource_limit(self.catalog().system_config());
1531 let desired = current_amount + new_amount;
1535 if desired > limit {
1536 Err(AdapterError::ResourceExhaustion {
1537 resource_type: resource_type.to_string(),
1538 limit_name: limit_name.to_string(),
1539 desired: desired.to_string(),
1540 limit: limit.to_string(),
1541 current: current_amount.to_string(),
1542 })
1543 } else {
1544 Ok(())
1545 }
1546 }
1547}