1use std::collections::{BTreeMap, BTreeSet};
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use fail::fail_point;
19use maplit::{btreemap, btreeset};
20use mz_adapter_types::compaction::SINCE_GRANULARITY;
21use mz_adapter_types::connection::ConnectionId;
22use mz_audit_log::VersionedEvent;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Sink};
25use mz_cluster_client::ReplicaId;
26use mz_controller::clusters::ReplicaLocation;
27use mz_controller_types::ClusterId;
28use mz_ore::instrument;
29use mz_ore::now::to_datetime;
30use mz_ore::retry::Retry;
31use mz_ore::task;
32use mz_repr::adt::numeric::Numeric;
33use mz_repr::{CatalogItemId, GlobalId, Timestamp};
34use mz_sql::catalog::{CatalogClusterReplica, CatalogSchema};
35use mz_sql::names::ResolvedDatabaseSpecifier;
36use mz_sql::plan::ConnectionDetails;
37use mz_sql::session::metadata::SessionMetadata;
38use mz_sql::session::vars::{
39 self, DEFAULT_TIMESTAMP_INTERVAL, MAX_AWS_PRIVATELINK_CONNECTIONS, MAX_CLUSTERS,
40 MAX_CONTINUAL_TASKS, MAX_CREDIT_CONSUMPTION_RATE, MAX_DATABASES, MAX_KAFKA_CONNECTIONS,
41 MAX_MATERIALIZED_VIEWS, MAX_MYSQL_CONNECTIONS, MAX_NETWORK_POLICIES, MAX_OBJECTS_PER_SCHEMA,
42 MAX_POSTGRES_CONNECTIONS, MAX_REPLICAS_PER_CLUSTER, MAX_ROLES, MAX_SCHEMAS_PER_DATABASE,
43 MAX_SECRETS, MAX_SINKS, MAX_SOURCES, MAX_SQL_SERVER_CONNECTIONS, MAX_TABLES, SystemVars, Var,
44};
45use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
46use mz_storage_types::connections::inline::IntoInlineConnection;
47use mz_storage_types::read_policy::ReadPolicy;
48use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
49use serde_json::json;
50use tracing::{Instrument, Level, event, info_span, warn};
51
52use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
53use crate::catalog::{DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult};
54use crate::coord::Coordinator;
55use crate::coord::appends::BuiltinTableAppendNotify;
56use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
57use crate::session::{Session, Transaction, TransactionOps};
58use crate::telemetry::{EventDetails, SegmentClientExt};
59use crate::util::ResultExt;
60use crate::{AdapterError, ExecuteContext, catalog, flags};
61
62impl Coordinator {
63 #[instrument(name = "coord::catalog_transact")]
65 pub(crate) async fn catalog_transact(
66 &mut self,
67 session: Option<&Session>,
68 ops: Vec<catalog::Op>,
69 ) -> Result<(), AdapterError> {
70 let start = Instant::now();
71 let result = self
72 .catalog_transact_with_context(session.map(|session| session.conn_id()), None, ops)
73 .await;
74 self.metrics
75 .catalog_transact_seconds
76 .with_label_values(&["catalog_transact"])
77 .observe(start.elapsed().as_secs_f64());
78 result
79 }
80
81 #[instrument(name = "coord::catalog_transact_with_side_effects")]
90 pub(crate) async fn catalog_transact_with_side_effects<F>(
91 &mut self,
92 mut ctx: Option<&mut ExecuteContext>,
93 ops: Vec<catalog::Op>,
94 side_effect: F,
95 ) -> Result<(), AdapterError>
96 where
97 F: for<'a> FnOnce(
98 &'a mut Coordinator,
99 Option<&'a mut ExecuteContext>,
100 ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
101 + 'static,
102 {
103 let start = Instant::now();
104
105 let (table_updates, catalog_updates) = self
106 .catalog_transact_inner(ctx.as_ref().map(|ctx| ctx.session().conn_id()), ops)
107 .await?;
108
109 let apply_implications_res = self
112 .apply_catalog_implications(ctx.as_deref_mut(), catalog_updates)
113 .await;
114
115 apply_implications_res.expect("cannot fail to apply catalog update implications");
119
120 mz_ore::soft_assert_eq_no_log!(
123 self.check_consistency(),
124 Ok(()),
125 "coordinator inconsistency detected"
126 );
127
128 let side_effects_fut = side_effect(self, ctx);
129
130 let ((), ()) = futures::future::join(
132 side_effects_fut.instrument(info_span!(
133 "coord::catalog_transact_with_side_effects::side_effects_fut"
134 )),
135 table_updates.instrument(info_span!(
136 "coord::catalog_transact_with_side_effects::table_updates"
137 )),
138 )
139 .await;
140
141 self.metrics
142 .catalog_transact_seconds
143 .with_label_values(&["catalog_transact_with_side_effects"])
144 .observe(start.elapsed().as_secs_f64());
145
146 Ok(())
147 }
148
149 #[instrument(name = "coord::catalog_transact_with_context")]
157 pub(crate) async fn catalog_transact_with_context(
158 &mut self,
159 conn_id: Option<&ConnectionId>,
160 ctx: Option<&mut ExecuteContext>,
161 ops: Vec<catalog::Op>,
162 ) -> Result<(), AdapterError> {
163 let start = Instant::now();
164
165 let conn_id = conn_id.or_else(|| ctx.as_ref().map(|ctx| ctx.session().conn_id()));
166
167 let (table_updates, catalog_updates) = self.catalog_transact_inner(conn_id, ops).await?;
168
169 let apply_catalog_implications_fut = self.apply_catalog_implications(ctx, catalog_updates);
170
171 let (combined_apply_res, ()) = futures::future::join(
173 apply_catalog_implications_fut.instrument(info_span!(
174 "coord::catalog_transact_with_context::side_effects_fut"
175 )),
176 table_updates.instrument(info_span!(
177 "coord::catalog_transact_with_context::table_updates"
178 )),
179 )
180 .await;
181
182 combined_apply_res.expect("cannot fail to apply catalog implications");
186
187 mz_ore::soft_assert_eq_no_log!(
190 self.check_consistency(),
191 Ok(()),
192 "coordinator inconsistency detected"
193 );
194
195 self.metrics
196 .catalog_transact_seconds
197 .with_label_values(&["catalog_transact_with_context"])
198 .observe(start.elapsed().as_secs_f64());
199
200 Ok(())
201 }
202
203 #[instrument(name = "coord::catalog_transact_with_ddl_transaction")]
206 pub(crate) async fn catalog_transact_with_ddl_transaction<F>(
207 &mut self,
208 ctx: &mut ExecuteContext,
209 ops: Vec<catalog::Op>,
210 side_effect: F,
211 ) -> Result<(), AdapterError>
212 where
213 F: for<'a> FnOnce(
214 &'a mut Coordinator,
215 Option<&'a mut ExecuteContext>,
216 ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
217 + Send
218 + Sync
219 + 'static,
220 {
221 let start = Instant::now();
222
223 let Some(Transaction {
224 ops:
225 TransactionOps::DDL {
226 ops: txn_ops,
227 revision: txn_revision,
228 state: txn_state,
229 snapshot: txn_snapshot,
230 side_effects: _,
231 },
232 ..
233 }) = ctx.session().transaction().inner()
234 else {
235 let result = self
236 .catalog_transact_with_side_effects(Some(ctx), ops, side_effect)
237 .await;
238 self.metrics
239 .catalog_transact_seconds
240 .with_label_values(&["catalog_transact_with_ddl_transaction"])
241 .observe(start.elapsed().as_secs_f64());
242 return result;
243 };
244
245 if self.catalog().transient_revision() != *txn_revision {
247 self.metrics
248 .catalog_transact_seconds
249 .with_label_values(&["catalog_transact_with_ddl_transaction"])
250 .observe(start.elapsed().as_secs_f64());
251 return Err(AdapterError::DDLTransactionRace);
252 }
253
254 let txn_ops_clone = txn_ops.clone();
256 let txn_state_clone = txn_state.clone();
257 let prev_snapshot = txn_snapshot.clone();
258
259 let mut combined_ops = txn_ops_clone;
261 combined_ops.extend(ops.iter().cloned());
262 let conn_id = ctx.session().conn_id().clone();
263 self.validate_resource_limits(&combined_ops, &conn_id)?;
264
265 let oracle_write_ts = self.get_local_write_ts().await.timestamp;
267
268 let conn = self.active_conns.get(ctx.session().conn_id());
270
271 let (new_state, new_snapshot) = self
277 .catalog()
278 .transact_incremental_dry_run(
279 &txn_state_clone,
280 ops.clone(),
281 conn,
282 prev_snapshot,
283 oracle_write_ts,
284 )
285 .await?;
286
287 let result = ctx
289 .session_mut()
290 .transaction_mut()
291 .add_ops(TransactionOps::DDL {
292 ops: combined_ops,
293 state: new_state,
294 side_effects: vec![Box::new(side_effect)],
295 revision: self.catalog().transient_revision(),
296 snapshot: Some(new_snapshot),
297 });
298
299 self.metrics
300 .catalog_transact_seconds
301 .with_label_values(&["catalog_transact_with_ddl_transaction"])
302 .observe(start.elapsed().as_secs_f64());
303
304 result
305 }
306
307 #[instrument(name = "coord::catalog_transact_inner")]
311 pub(crate) async fn catalog_transact_inner(
312 &mut self,
313 conn_id: Option<&ConnectionId>,
314 ops: Vec<catalog::Op>,
315 ) -> Result<(BuiltinTableAppendNotify, Vec<ParsedStateUpdate>), AdapterError> {
316 if self.controller.read_only() {
317 return Err(AdapterError::ReadOnly);
318 }
319
320 event!(Level::TRACE, ops = format!("{:?}", ops));
321
322 let mut webhook_sources_to_restart = BTreeSet::new();
323 let mut clusters_to_drop = vec![];
324 let mut cluster_replicas_to_drop = vec![];
325 let mut clusters_to_create = vec![];
326 let mut cluster_replicas_to_create = vec![];
327 let mut update_metrics_config = false;
328 let mut update_tracing_config = false;
329 let mut update_controller_config = false;
330 let mut update_compute_config = false;
331 let mut update_storage_config = false;
332 let mut update_timestamp_oracle_config = false;
333 let mut update_metrics_retention = false;
334 let mut update_secrets_caching_config = false;
335 let mut update_cluster_scheduling_config = false;
336 let mut update_http_config = false;
337 let mut update_advance_timelines_interval = false;
338
339 for op in &ops {
340 match op {
341 catalog::Op::DropObjects(drop_object_infos) => {
342 for drop_object_info in drop_object_infos {
343 match &drop_object_info {
344 catalog::DropObjectInfo::Item(_) => {
345 }
348 catalog::DropObjectInfo::Cluster(id) => {
349 clusters_to_drop.push(*id);
350 }
351 catalog::DropObjectInfo::ClusterReplica((
352 cluster_id,
353 replica_id,
354 _reason,
355 )) => {
356 cluster_replicas_to_drop.push((*cluster_id, *replica_id));
358 }
359 _ => (),
360 }
361 }
362 }
363 catalog::Op::ResetSystemConfiguration { name }
364 | catalog::Op::UpdateSystemConfiguration { name, .. } => {
365 update_metrics_config |= self
366 .catalog
367 .state()
368 .system_config()
369 .is_metrics_config_var(name);
370 update_tracing_config |= vars::is_tracing_var(name);
371 update_controller_config |= self
372 .catalog
373 .state()
374 .system_config()
375 .is_controller_config_var(name);
376 update_compute_config |= self
377 .catalog
378 .state()
379 .system_config()
380 .is_compute_config_var(name);
381 update_storage_config |= self
382 .catalog
383 .state()
384 .system_config()
385 .is_storage_config_var(name);
386 update_timestamp_oracle_config |= vars::is_timestamp_oracle_config_var(name);
387 update_metrics_retention |= name == vars::METRICS_RETENTION.name();
388 update_secrets_caching_config |= vars::is_secrets_caching_var(name);
389 update_cluster_scheduling_config |= vars::is_cluster_scheduling_var(name);
390 update_http_config |= vars::is_http_config_var(name);
391 update_advance_timelines_interval |= name == DEFAULT_TIMESTAMP_INTERVAL.name();
392 }
393 catalog::Op::ResetAllSystemConfiguration => {
394 update_tracing_config = true;
398 update_controller_config = true;
399 update_compute_config = true;
400 update_storage_config = true;
401 update_timestamp_oracle_config = true;
402 update_metrics_retention = true;
403 update_secrets_caching_config = true;
404 update_cluster_scheduling_config = true;
405 update_metrics_config = true;
406 update_http_config = true;
407 update_advance_timelines_interval = true;
408 }
409 catalog::Op::RenameItem { id, .. } => {
410 let item = self.catalog().get_entry(id);
411 let is_webhook_source = item
412 .source()
413 .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
414 .unwrap_or(false);
415 if is_webhook_source {
416 webhook_sources_to_restart.insert(*id);
417 }
418 }
419 catalog::Op::RenameSchema {
420 database_spec,
421 schema_spec,
422 ..
423 } => {
424 let schema = self.catalog().get_schema(
425 database_spec,
426 schema_spec,
427 conn_id.unwrap_or(&SYSTEM_CONN_ID),
428 );
429 let webhook_sources = schema.item_ids().filter(|id| {
430 let item = self.catalog().get_entry(id);
431 item.source()
432 .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
433 .unwrap_or(false)
434 });
435 webhook_sources_to_restart.extend(webhook_sources);
436 }
437 catalog::Op::CreateCluster { id, .. } => {
438 clusters_to_create.push(*id);
439 }
440 catalog::Op::CreateClusterReplica {
441 cluster_id,
442 name,
443 config,
444 ..
445 } => {
446 cluster_replicas_to_create.push((
447 *cluster_id,
448 name.clone(),
449 config.location.num_processes(),
450 ));
451 }
452 _ => (),
453 }
454 }
455
456 self.validate_resource_limits(&ops, conn_id.unwrap_or(&SYSTEM_CONN_ID))?;
457
458 let oracle_write_ts = self.get_local_write_ts().await.timestamp;
467
468 let Coordinator {
469 catalog,
470 active_conns,
471 controller,
472 cluster_replica_statuses,
473 ..
474 } = self;
475 let catalog = Arc::make_mut(catalog);
476 let conn = conn_id.map(|id| active_conns.get(id).expect("connection must exist"));
477
478 let TransactionResult {
479 builtin_table_updates,
480 catalog_updates,
481 audit_events,
482 } = catalog
483 .transact(
484 Some(&mut controller.storage_collections),
485 oracle_write_ts,
486 conn,
487 ops,
488 )
489 .await?;
490
491 for (cluster_id, replica_id) in &cluster_replicas_to_drop {
492 cluster_replica_statuses.remove_cluster_replica_statuses(cluster_id, replica_id);
493 }
494 for cluster_id in &clusters_to_drop {
495 cluster_replica_statuses.remove_cluster_statuses(cluster_id);
496 }
497 for cluster_id in clusters_to_create {
498 cluster_replica_statuses.initialize_cluster_statuses(cluster_id);
499 }
500 let now = to_datetime((catalog.config().now)());
501 for (cluster_id, replica_name, num_processes) in cluster_replicas_to_create {
502 let replica_id = catalog
503 .resolve_replica_in_cluster(&cluster_id, &replica_name)
504 .expect("just created")
505 .replica_id();
506 cluster_replica_statuses.initialize_cluster_replica_statuses(
507 cluster_id,
508 replica_id,
509 num_processes,
510 now,
511 );
512 }
513
514 let (builtin_update_notify, _) = self
517 .builtin_table_update()
518 .execute(builtin_table_updates)
519 .await;
520
521 let _: () = async {
524 if !webhook_sources_to_restart.is_empty() {
525 self.restart_webhook_sources(webhook_sources_to_restart);
526 }
527
528 if update_metrics_config {
529 mz_metrics::update_dyncfg(&self.catalog().system_config().dyncfg_updates());
530 }
531 if update_controller_config {
532 self.update_controller_config();
533 }
534 if update_compute_config {
535 self.update_compute_config();
536 }
537 if update_storage_config {
538 self.update_storage_config();
539 }
540 if update_timestamp_oracle_config {
541 self.update_timestamp_oracle_config();
542 }
543 if update_metrics_retention {
544 self.update_metrics_retention();
545 }
546 if update_tracing_config {
547 self.update_tracing_config();
548 }
549 if update_secrets_caching_config {
550 self.update_secrets_caching_config();
551 }
552 if update_cluster_scheduling_config {
553 self.update_cluster_scheduling_config();
554 }
555 if update_http_config {
556 self.update_http_config();
557 }
558 if update_advance_timelines_interval {
559 let new_interval = self.catalog().system_config().default_timestamp_interval();
560 if new_interval != self.advance_timelines_interval.period() {
561 self.advance_timelines_interval = tokio::time::interval(new_interval);
562 }
563 }
564 }
565 .instrument(info_span!("coord::catalog_transact_with::finalize"))
566 .await;
567
568 let conn = conn_id.and_then(|id| self.active_conns.get(id));
569 if let Some(segment_client) = &self.segment_client {
570 for VersionedEvent::V1(event) in audit_events {
571 let event_type = format!(
572 "{} {}",
573 event.object_type.as_title_case(),
574 event.event_type.as_title_case()
575 );
576 segment_client.environment_track(
577 &self.catalog().config().environment_id,
578 event_type,
579 json!({ "details": event.details.as_json() }),
580 EventDetails {
581 user_id: conn
582 .and_then(|c| c.user().external_metadata.as_ref())
583 .map(|m| m.user_id),
584 application_name: conn.map(|c| c.application_name()),
585 ..Default::default()
586 },
587 );
588 }
589 }
590
591 Ok((builtin_update_notify, catalog_updates))
592 }
593
594 pub(crate) fn drop_replica(&mut self, cluster_id: ClusterId, replica_id: ReplicaId) {
595 self.drop_introspection_subscribes(replica_id);
596
597 self.controller
598 .drop_replica(cluster_id, replica_id)
599 .expect("dropping replica must not fail");
600 }
601
602 pub(crate) fn drop_sources(&mut self, sources: Vec<(CatalogItemId, GlobalId)>) {
604 for (item_id, _gid) in &sources {
605 self.active_webhooks.remove(item_id);
606 }
607 let storage_metadata = self.catalog.state().storage_metadata();
608 let source_gids = sources.into_iter().map(|(_id, gid)| gid).collect();
609 self.controller
610 .storage
611 .drop_sources(storage_metadata, source_gids)
612 .unwrap_or_terminate("cannot fail to drop sources");
613 }
614
615 pub(crate) fn drop_tables(&mut self, tables: Vec<(CatalogItemId, GlobalId)>, ts: Timestamp) {
617 for (item_id, _gid) in &tables {
618 self.active_webhooks.remove(item_id);
619 }
620
621 let storage_metadata = self.catalog.state().storage_metadata();
622 let table_gids = tables.into_iter().map(|(_id, gid)| gid).collect();
623 self.controller
624 .storage
625 .drop_tables(storage_metadata, table_gids, ts)
626 .unwrap_or_terminate("cannot fail to drop tables");
627 }
628
629 fn restart_webhook_sources(&mut self, sources: impl IntoIterator<Item = CatalogItemId>) {
630 for id in sources {
631 self.active_webhooks.remove(&id);
632 }
633 }
634
635 #[must_use]
641 pub async fn drop_compute_sink(&mut self, sink_id: GlobalId) -> Option<ActiveComputeSink> {
642 self.drop_compute_sinks([sink_id]).await.remove(&sink_id)
643 }
644
645 #[must_use]
654 pub async fn drop_compute_sinks(
655 &mut self,
656 sink_ids: impl IntoIterator<Item = GlobalId>,
657 ) -> BTreeMap<GlobalId, ActiveComputeSink> {
658 let mut by_id = BTreeMap::new();
659 let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
660 for sink_id in sink_ids {
661 let sink = match self.remove_active_compute_sink(sink_id).await {
662 None => {
663 tracing::error!(%sink_id, "drop_compute_sinks called on nonexistent sink");
664 continue;
665 }
666 Some(sink) => sink,
667 };
668
669 by_cluster
670 .entry(sink.cluster_id())
671 .or_default()
672 .push(sink_id);
673 by_id.insert(sink_id, sink);
674 }
675 for (cluster_id, ids) in by_cluster {
676 let compute = &mut self.controller.compute;
677 if compute.instance_exists(cluster_id) {
679 compute
680 .drop_collections(cluster_id, ids)
681 .unwrap_or_terminate("cannot fail to drop collections");
682 }
683 }
684 by_id
685 }
686
687 pub async fn retire_compute_sinks(
692 &mut self,
693 mut reasons: BTreeMap<GlobalId, ActiveComputeSinkRetireReason>,
694 ) {
695 let sink_ids = reasons.keys().cloned();
696 for (id, sink) in self.drop_compute_sinks(sink_ids).await {
697 let reason = reasons
698 .remove(&id)
699 .expect("all returned IDs are in `reasons`");
700 sink.retire(reason);
701 }
702 }
703
704 pub async fn drop_reconfiguration_replicas(
707 &mut self,
708 cluster_ids: BTreeSet<ClusterId>,
709 ) -> Result<(), AdapterError> {
710 let pending_cluster_ops: Vec<Op> = cluster_ids
711 .iter()
712 .map(|c| {
713 self.catalog()
714 .get_cluster(c.clone())
715 .replicas()
716 .filter_map(|r| match r.config.location {
717 ReplicaLocation::Managed(ref l) if l.pending => {
718 Some(DropObjectInfo::ClusterReplica((
719 c.clone(),
720 r.replica_id,
721 ReplicaCreateDropReason::Manual,
722 )))
723 }
724 _ => None,
725 })
726 .collect::<Vec<DropObjectInfo>>()
727 })
728 .filter_map(|pending_replica_drop_ops_by_cluster| {
729 match pending_replica_drop_ops_by_cluster.len() {
730 0 => None,
731 _ => Some(Op::DropObjects(pending_replica_drop_ops_by_cluster)),
732 }
733 })
734 .collect();
735 if !pending_cluster_ops.is_empty() {
736 self.catalog_transact(None, pending_cluster_ops).await?;
737 }
738 Ok(())
739 }
740
741 #[mz_ore::instrument(level = "debug")]
743 pub(crate) async fn cancel_compute_sinks_for_conn(&mut self, conn_id: &ConnectionId) {
744 self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Canceled)
745 .await
746 }
747
748 #[mz_ore::instrument(level = "debug")]
750 pub(crate) async fn cancel_cluster_reconfigurations_for_conn(
751 &mut self,
752 conn_id: &ConnectionId,
753 ) {
754 self.retire_cluster_reconfigurations_for_conn(conn_id).await
755 }
756
757 #[mz_ore::instrument(level = "debug")]
760 pub(crate) async fn retire_compute_sinks_for_conn(
761 &mut self,
762 conn_id: &ConnectionId,
763 reason: ActiveComputeSinkRetireReason,
764 ) {
765 let drop_sinks = self
766 .active_conns
767 .get_mut(conn_id)
768 .expect("must exist for active session")
769 .drop_sinks
770 .iter()
771 .map(|sink_id| (*sink_id, reason.clone()))
772 .collect();
773 self.retire_compute_sinks(drop_sinks).await;
774 }
775
776 #[mz_ore::instrument(level = "debug")]
778 pub(crate) async fn retire_cluster_reconfigurations_for_conn(
779 &mut self,
780 conn_id: &ConnectionId,
781 ) {
782 let reconfiguring_clusters = self
783 .active_conns
784 .get(conn_id)
785 .expect("must exist for active session")
786 .pending_cluster_alters
787 .clone();
788 self.drop_reconfiguration_replicas(reconfiguring_clusters)
790 .await
791 .unwrap_or_terminate("cannot fail to drop reconfiguration replicas");
792
793 self.active_conns
794 .get_mut(conn_id)
795 .expect("must exist for active session")
796 .pending_cluster_alters
797 .clear();
798 }
799
800 pub(crate) fn drop_storage_sinks(&mut self, sink_gids: Vec<GlobalId>) {
801 let storage_metadata = self.catalog.state().storage_metadata();
802 self.controller
803 .storage
804 .drop_sinks(storage_metadata, sink_gids)
805 .unwrap_or_terminate("cannot fail to drop sinks");
806 }
807
808 pub(crate) fn drop_compute_collections(&mut self, collections: Vec<(ClusterId, GlobalId)>) {
809 let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
810 for (cluster_id, gid) in collections {
811 by_cluster.entry(cluster_id).or_default().push(gid);
812 }
813 for (cluster_id, gids) in by_cluster {
814 let compute = &mut self.controller.compute;
815 if compute.instance_exists(cluster_id) {
817 compute
818 .drop_collections(cluster_id, gids)
819 .unwrap_or_terminate("cannot fail to drop collections");
820 }
821 }
822 }
823
824 pub(crate) fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>) {
825 let cloud_resource_controller = Arc::clone(self.cloud_resource_controller
826 .as_ref()
827 .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))
828 .expect("vpc endpoints should only be dropped in CLOUD, where `cloud_resource_controller` is `Some`"));
829 task::spawn(
837 || "drop_vpc_endpoints",
838 async move {
839 for vpc_endpoint in vpc_endpoints {
840 let _ = Retry::default()
841 .max_duration(Duration::from_secs(60))
842 .retry_async(|_state| async {
843 fail_point!("drop_vpc_endpoint", |r| {
844 Err(anyhow::anyhow!("Fail point error {:?}", r))
845 });
846 match cloud_resource_controller
847 .delete_vpc_endpoint(vpc_endpoint)
848 .await
849 {
850 Ok(_) => Ok(()),
851 Err(e) => {
852 warn!("Dropping VPC Endpoints has encountered an error: {}", e);
853 Err(e)
854 }
855 }
856 })
857 .await;
858 }
859 }
860 .instrument(info_span!(
861 "coord::catalog_transact_inner::drop_vpc_endpoints"
862 )),
863 );
864 }
865
866 pub(crate) async fn drop_temp_items(&mut self, conn_id: &ConnectionId) {
869 let temp_items = self.catalog().state().get_temp_items(conn_id).collect();
870 let all_items = self.catalog().object_dependents(&temp_items, conn_id);
871
872 if all_items.is_empty() {
873 return;
874 }
875 let op = Op::DropObjects(
876 all_items
877 .into_iter()
878 .map(DropObjectInfo::manual_drop_from_object_id)
879 .collect(),
880 );
881
882 self.catalog_transact_with_context(Some(conn_id), None, vec![op])
883 .await
884 .expect("unable to drop temporary items for conn_id");
885 }
886
887 fn update_cluster_scheduling_config(&self) {
888 let config = flags::orchestrator_scheduling_config(self.catalog.system_config());
889 self.controller
890 .update_orchestrator_scheduling_config(config);
891 }
892
893 fn update_secrets_caching_config(&self) {
894 let config = flags::caching_config(self.catalog.system_config());
895 self.caching_secrets_reader.set_policy(config);
896 }
897
898 fn update_tracing_config(&self) {
899 let tracing = flags::tracing_config(self.catalog().system_config());
900 tracing.apply(&self.tracing_handle);
901 }
902
903 fn update_compute_config(&mut self) {
904 let config_params = flags::compute_config(self.catalog().system_config());
905 self.controller.compute.update_configuration(config_params);
906 }
907
908 fn update_storage_config(&mut self) {
909 let config_params = flags::storage_config(self.catalog().system_config());
910 self.controller.storage.update_parameters(config_params);
911 }
912
913 fn update_timestamp_oracle_config(&self) {
914 let config_params = flags::timestamp_oracle_config(self.catalog().system_config());
915 if let Some(config) = self.timestamp_oracle_config.as_ref() {
916 config.apply_parameters(config_params)
917 }
918 }
919
920 fn update_metrics_retention(&self) {
921 let duration = self.catalog().system_config().metrics_retention();
922 let policy = ReadPolicy::lag_writes_by(
923 Timestamp::new(u64::try_from(duration.as_millis()).unwrap_or_else(|_e| {
924 tracing::error!("Absurd metrics retention duration: {duration:?}.");
925 u64::MAX
926 })),
927 SINCE_GRANULARITY,
928 );
929 let storage_policies = self
930 .catalog()
931 .entries()
932 .filter(|entry| {
933 entry.item().is_retained_metrics_object()
934 && entry.item().is_compute_object_on_cluster().is_none()
935 })
936 .map(|entry| (entry.id(), policy.clone()))
937 .collect::<Vec<_>>();
938 let compute_policies = self
939 .catalog()
940 .entries()
941 .filter_map(|entry| {
942 if let (true, Some(cluster_id)) = (
943 entry.item().is_retained_metrics_object(),
944 entry.item().is_compute_object_on_cluster(),
945 ) {
946 Some((cluster_id, entry.id(), policy.clone()))
947 } else {
948 None
949 }
950 })
951 .collect::<Vec<_>>();
952 self.update_storage_read_policies(storage_policies);
953 self.update_compute_read_policies(compute_policies);
954 }
955
956 fn update_controller_config(&mut self) {
957 let sys_config = self.catalog().system_config();
958 self.controller
959 .update_configuration(sys_config.dyncfg_updates());
960 }
961
962 fn update_http_config(&mut self) {
963 let webhook_request_limit = self
964 .catalog()
965 .system_config()
966 .webhook_concurrent_request_limit();
967 self.webhook_concurrency_limit
968 .set_limit(webhook_request_limit);
969 }
970
971 pub(crate) async fn create_storage_export(
972 &mut self,
973 id: GlobalId,
974 sink: &Sink,
975 ) -> Result<(), AdapterError> {
976 self.controller.storage.check_exists(sink.from)?;
978
979 let id_bundle = crate::CollectionIdBundle {
986 storage_ids: btreeset! {sink.from},
987 compute_ids: btreemap! {},
988 };
989
990 let read_holds = self.acquire_read_holds(&id_bundle);
998 let as_of = read_holds.least_valid_read();
999
1000 let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
1001 let storage_sink_desc = mz_storage_types::sinks::StorageSinkDesc {
1002 from: sink.from,
1003 from_desc: storage_sink_from_entry
1004 .relation_desc()
1005 .expect("sinks can only be built on items with descs")
1006 .into_owned(),
1007 connection: sink
1008 .connection
1009 .clone()
1010 .into_inline_connection(self.catalog().state()),
1011 envelope: sink.envelope,
1012 as_of,
1013 with_snapshot: sink.with_snapshot,
1014 version: sink.version,
1015 from_storage_metadata: (),
1016 to_storage_metadata: (),
1017 commit_interval: sink.commit_interval,
1018 };
1019
1020 let collection_desc = CollectionDescription {
1021 desc: KAFKA_PROGRESS_DESC.clone(),
1023 data_source: DataSource::Sink {
1024 desc: ExportDescription {
1025 sink: storage_sink_desc,
1026 instance_id: sink.cluster_id,
1027 },
1028 },
1029 since: None,
1030 timeline: None,
1031 primary: None,
1032 };
1033 let collections = vec![(id, collection_desc)];
1034
1035 let storage_metadata = self.catalog.state().storage_metadata();
1037 let res = self
1038 .controller
1039 .storage
1040 .create_collections(storage_metadata, None, collections)
1041 .await;
1042
1043 drop(read_holds);
1046
1047 Ok(res?)
1048 }
1049
1050 fn validate_resource_limits(
1053 &self,
1054 ops: &Vec<catalog::Op>,
1055 conn_id: &ConnectionId,
1056 ) -> Result<(), AdapterError> {
1057 let mut new_kafka_connections = 0;
1058 let mut new_postgres_connections = 0;
1059 let mut new_mysql_connections = 0;
1060 let mut new_sql_server_connections = 0;
1061 let mut new_aws_privatelink_connections = 0;
1062 let mut new_tables = 0;
1063 let mut new_sources = 0;
1064 let mut new_sinks = 0;
1065 let mut new_materialized_views = 0;
1066 let mut new_clusters = 0;
1067 let mut new_replicas_per_cluster = BTreeMap::new();
1068 let mut new_credit_consumption_rate = Numeric::zero();
1069 let mut new_databases = 0;
1070 let mut new_schemas_per_database = BTreeMap::new();
1071 let mut new_objects_per_schema = BTreeMap::new();
1072 let mut new_secrets = 0;
1073 let mut new_roles = 0;
1074 let mut new_continual_tasks = 0;
1075 let mut new_network_policies = 0;
1076 for op in ops {
1077 match op {
1078 Op::CreateDatabase { .. } => {
1079 new_databases += 1;
1080 }
1081 Op::CreateSchema { database_id, .. } => {
1082 if let ResolvedDatabaseSpecifier::Id(database_id) = database_id {
1083 *new_schemas_per_database.entry(database_id).or_insert(0) += 1;
1084 }
1085 }
1086 Op::CreateRole { .. } => {
1087 new_roles += 1;
1088 }
1089 Op::CreateNetworkPolicy { .. } => {
1090 new_network_policies += 1;
1091 }
1092 Op::CreateCluster { .. } => {
1093 new_clusters += 1;
1097 }
1098 Op::CreateClusterReplica {
1099 cluster_id, config, ..
1100 } => {
1101 if cluster_id.is_user() {
1102 *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) += 1;
1103 if let ReplicaLocation::Managed(location) = &config.location {
1104 let replica_allocation = self
1105 .catalog()
1106 .cluster_replica_sizes()
1107 .0
1108 .get(location.size_for_billing())
1109 .expect(
1110 "location size is validated against the cluster replica sizes",
1111 );
1112 new_credit_consumption_rate += replica_allocation.credits_per_hour
1113 }
1114 }
1115 }
1116 Op::CreateItem { name, item, .. } => {
1117 *new_objects_per_schema
1118 .entry((
1119 name.qualifiers.database_spec.clone(),
1120 name.qualifiers.schema_spec.clone(),
1121 ))
1122 .or_insert(0) += 1;
1123 match item {
1124 CatalogItem::Connection(connection) => match connection.details {
1125 ConnectionDetails::Kafka(_) => new_kafka_connections += 1,
1126 ConnectionDetails::Postgres(_) => new_postgres_connections += 1,
1127 ConnectionDetails::MySql(_) => new_mysql_connections += 1,
1128 ConnectionDetails::SqlServer(_) => new_sql_server_connections += 1,
1129 ConnectionDetails::AwsPrivatelink(_) => {
1130 new_aws_privatelink_connections += 1
1131 }
1132 ConnectionDetails::Csr(_)
1133 | ConnectionDetails::Ssh { .. }
1134 | ConnectionDetails::Aws(_)
1135 | ConnectionDetails::IcebergCatalog(_) => {}
1136 },
1137 CatalogItem::Table(_) => {
1138 new_tables += 1;
1139 }
1140 CatalogItem::Source(source) => {
1141 new_sources += source.user_controllable_persist_shard_count()
1142 }
1143 CatalogItem::Sink(_) => new_sinks += 1,
1144 CatalogItem::MaterializedView(_) => {
1145 new_materialized_views += 1;
1146 }
1147 CatalogItem::Secret(_) => {
1148 new_secrets += 1;
1149 }
1150 CatalogItem::ContinualTask(_) => {
1151 new_continual_tasks += 1;
1152 }
1153 CatalogItem::Log(_)
1154 | CatalogItem::View(_)
1155 | CatalogItem::Index(_)
1156 | CatalogItem::Type(_)
1157 | CatalogItem::Func(_) => {}
1158 }
1159 }
1160 Op::DropObjects(drop_object_infos) => {
1161 for drop_object_info in drop_object_infos {
1162 match drop_object_info {
1163 DropObjectInfo::Cluster(_) => {
1164 new_clusters -= 1;
1165 }
1166 DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
1167 if cluster_id.is_user() {
1168 *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) -= 1;
1169 let cluster = self
1170 .catalog()
1171 .get_cluster_replica(*cluster_id, *replica_id);
1172 if let ReplicaLocation::Managed(location) =
1173 &cluster.config.location
1174 {
1175 let replica_allocation = self
1176 .catalog()
1177 .cluster_replica_sizes()
1178 .0
1179 .get(location.size_for_billing())
1180 .expect(
1181 "location size is validated against the cluster replica sizes",
1182 );
1183 new_credit_consumption_rate -=
1184 replica_allocation.credits_per_hour
1185 }
1186 }
1187 }
1188 DropObjectInfo::Database(_) => {
1189 new_databases -= 1;
1190 }
1191 DropObjectInfo::Schema((database_spec, _)) => {
1192 if let ResolvedDatabaseSpecifier::Id(database_id) = database_spec {
1193 *new_schemas_per_database.entry(database_id).or_insert(0) -= 1;
1194 }
1195 }
1196 DropObjectInfo::Role(_) => {
1197 new_roles -= 1;
1198 }
1199 DropObjectInfo::NetworkPolicy(_) => {
1200 new_network_policies -= 1;
1201 }
1202 DropObjectInfo::Item(id) => {
1203 let entry = self.catalog().get_entry(id);
1204 *new_objects_per_schema
1205 .entry((
1206 entry.name().qualifiers.database_spec.clone(),
1207 entry.name().qualifiers.schema_spec.clone(),
1208 ))
1209 .or_insert(0) -= 1;
1210 match entry.item() {
1211 CatalogItem::Connection(connection) => match connection.details
1212 {
1213 ConnectionDetails::AwsPrivatelink(_) => {
1214 new_aws_privatelink_connections -= 1;
1215 }
1216 _ => (),
1217 },
1218 CatalogItem::Table(_) => {
1219 new_tables -= 1;
1220 }
1221 CatalogItem::Source(source) => {
1222 new_sources -=
1223 source.user_controllable_persist_shard_count()
1224 }
1225 CatalogItem::Sink(_) => new_sinks -= 1,
1226 CatalogItem::MaterializedView(_) => {
1227 new_materialized_views -= 1;
1228 }
1229 CatalogItem::Secret(_) => {
1230 new_secrets -= 1;
1231 }
1232 CatalogItem::ContinualTask(_) => {
1233 new_continual_tasks -= 1;
1234 }
1235 CatalogItem::Log(_)
1236 | CatalogItem::View(_)
1237 | CatalogItem::Index(_)
1238 | CatalogItem::Type(_)
1239 | CatalogItem::Func(_) => {}
1240 }
1241 }
1242 }
1243 }
1244 }
1245 Op::UpdateItem {
1246 name: _,
1247 id,
1248 to_item,
1249 } => match to_item {
1250 CatalogItem::Source(source) => {
1251 let current_source = self
1252 .catalog()
1253 .get_entry(id)
1254 .source()
1255 .expect("source update is for source item");
1256
1257 new_sources += source.user_controllable_persist_shard_count()
1258 - current_source.user_controllable_persist_shard_count();
1259 }
1260 CatalogItem::Connection(_)
1261 | CatalogItem::Table(_)
1262 | CatalogItem::Sink(_)
1263 | CatalogItem::MaterializedView(_)
1264 | CatalogItem::Secret(_)
1265 | CatalogItem::Log(_)
1266 | CatalogItem::View(_)
1267 | CatalogItem::Index(_)
1268 | CatalogItem::Type(_)
1269 | CatalogItem::Func(_)
1270 | CatalogItem::ContinualTask(_) => {}
1271 },
1272 Op::AlterRole { .. }
1273 | Op::AlterRetainHistory { .. }
1274 | Op::AlterSourceTimestampInterval { .. }
1275 | Op::AlterNetworkPolicy { .. }
1276 | Op::AlterAddColumn { .. }
1277 | Op::AlterMaterializedViewApplyReplacement { .. }
1278 | Op::UpdatePrivilege { .. }
1279 | Op::UpdateDefaultPrivilege { .. }
1280 | Op::GrantRole { .. }
1281 | Op::RenameCluster { .. }
1282 | Op::RenameClusterReplica { .. }
1283 | Op::RenameItem { .. }
1284 | Op::RenameSchema { .. }
1285 | Op::UpdateOwner { .. }
1286 | Op::RevokeRole { .. }
1287 | Op::UpdateClusterConfig { .. }
1288 | Op::UpdateClusterReplicaConfig { .. }
1289 | Op::UpdateSourceReferences { .. }
1290 | Op::UpdateSystemConfiguration { .. }
1291 | Op::ResetSystemConfiguration { .. }
1292 | Op::ResetAllSystemConfiguration { .. }
1293 | Op::Comment { .. }
1294 | Op::WeirdStorageUsageUpdates { .. }
1295 | Op::InjectAuditEvents { .. } => {}
1296 }
1297 }
1298
1299 let mut current_aws_privatelink_connections = 0;
1300 let mut current_postgres_connections = 0;
1301 let mut current_mysql_connections = 0;
1302 let mut current_sql_server_connections = 0;
1303 let mut current_kafka_connections = 0;
1304 for c in self.catalog().user_connections() {
1305 let connection = c
1306 .connection()
1307 .expect("`user_connections()` only returns connection objects");
1308
1309 match connection.details {
1310 ConnectionDetails::AwsPrivatelink(_) => current_aws_privatelink_connections += 1,
1311 ConnectionDetails::Postgres(_) => current_postgres_connections += 1,
1312 ConnectionDetails::MySql(_) => current_mysql_connections += 1,
1313 ConnectionDetails::SqlServer(_) => current_sql_server_connections += 1,
1314 ConnectionDetails::Kafka(_) => current_kafka_connections += 1,
1315 ConnectionDetails::Csr(_)
1316 | ConnectionDetails::Ssh { .. }
1317 | ConnectionDetails::Aws(_)
1318 | ConnectionDetails::IcebergCatalog(_) => {}
1319 }
1320 }
1321 self.validate_resource_limit(
1322 current_kafka_connections,
1323 new_kafka_connections,
1324 SystemVars::max_kafka_connections,
1325 "Kafka Connection",
1326 MAX_KAFKA_CONNECTIONS.name(),
1327 )?;
1328 self.validate_resource_limit(
1329 current_postgres_connections,
1330 new_postgres_connections,
1331 SystemVars::max_postgres_connections,
1332 "PostgreSQL Connection",
1333 MAX_POSTGRES_CONNECTIONS.name(),
1334 )?;
1335 self.validate_resource_limit(
1336 current_mysql_connections,
1337 new_mysql_connections,
1338 SystemVars::max_mysql_connections,
1339 "MySQL Connection",
1340 MAX_MYSQL_CONNECTIONS.name(),
1341 )?;
1342 self.validate_resource_limit(
1343 current_sql_server_connections,
1344 new_sql_server_connections,
1345 SystemVars::max_sql_server_connections,
1346 "SQL Server Connection",
1347 MAX_SQL_SERVER_CONNECTIONS.name(),
1348 )?;
1349 self.validate_resource_limit(
1350 current_aws_privatelink_connections,
1351 new_aws_privatelink_connections,
1352 SystemVars::max_aws_privatelink_connections,
1353 "AWS PrivateLink Connection",
1354 MAX_AWS_PRIVATELINK_CONNECTIONS.name(),
1355 )?;
1356 self.validate_resource_limit(
1357 self.catalog().user_tables().count(),
1358 new_tables,
1359 SystemVars::max_tables,
1360 "table",
1361 MAX_TABLES.name(),
1362 )?;
1363
1364 let current_sources: usize = self
1365 .catalog()
1366 .user_sources()
1367 .filter_map(|source| source.source())
1368 .map(|source| source.user_controllable_persist_shard_count())
1369 .sum::<i64>()
1370 .try_into()
1371 .expect("non-negative sum of sources");
1372
1373 self.validate_resource_limit(
1374 current_sources,
1375 new_sources,
1376 SystemVars::max_sources,
1377 "source",
1378 MAX_SOURCES.name(),
1379 )?;
1380 self.validate_resource_limit(
1381 self.catalog().user_sinks().count(),
1382 new_sinks,
1383 SystemVars::max_sinks,
1384 "sink",
1385 MAX_SINKS.name(),
1386 )?;
1387 self.validate_resource_limit(
1388 self.catalog().user_materialized_views().count(),
1389 new_materialized_views,
1390 SystemVars::max_materialized_views,
1391 "materialized view",
1392 MAX_MATERIALIZED_VIEWS.name(),
1393 )?;
1394 self.validate_resource_limit(
1395 self.catalog().user_clusters().count(),
1401 new_clusters,
1402 SystemVars::max_clusters,
1403 "cluster",
1404 MAX_CLUSTERS.name(),
1405 )?;
1406 for (cluster_id, new_replicas) in new_replicas_per_cluster {
1407 let current_amount = self
1409 .catalog()
1410 .try_get_cluster(cluster_id)
1411 .map(|instance| instance.user_replicas().count())
1412 .unwrap_or(0);
1413 self.validate_resource_limit(
1414 current_amount,
1415 new_replicas,
1416 SystemVars::max_replicas_per_cluster,
1417 "cluster replica",
1418 MAX_REPLICAS_PER_CLUSTER.name(),
1419 )?;
1420 }
1421 self.validate_resource_limit_numeric(
1422 self.current_credit_consumption_rate(),
1423 new_credit_consumption_rate,
1424 |system_vars| {
1425 self.license_key
1426 .max_credit_consumption_rate()
1427 .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
1428 },
1429 "cluster replica",
1430 MAX_CREDIT_CONSUMPTION_RATE.name(),
1431 )?;
1432 self.validate_resource_limit(
1433 self.catalog().databases().count(),
1434 new_databases,
1435 SystemVars::max_databases,
1436 "database",
1437 MAX_DATABASES.name(),
1438 )?;
1439 for (database_id, new_schemas) in new_schemas_per_database {
1440 self.validate_resource_limit(
1441 self.catalog().get_database(database_id).schemas_by_id.len(),
1442 new_schemas,
1443 SystemVars::max_schemas_per_database,
1444 "schema",
1445 MAX_SCHEMAS_PER_DATABASE.name(),
1446 )?;
1447 }
1448 for ((database_spec, schema_spec), new_objects) in new_objects_per_schema {
1449 let current_items = self
1452 .catalog()
1453 .try_get_schema(&database_spec, &schema_spec, conn_id)
1454 .map(|schema| schema.items.len())
1455 .unwrap_or(0);
1456 self.validate_resource_limit(
1457 current_items,
1458 new_objects,
1459 SystemVars::max_objects_per_schema,
1460 "object",
1461 MAX_OBJECTS_PER_SCHEMA.name(),
1462 )?;
1463 }
1464 self.validate_resource_limit(
1465 self.catalog().user_secrets().count(),
1466 new_secrets,
1467 SystemVars::max_secrets,
1468 "secret",
1469 MAX_SECRETS.name(),
1470 )?;
1471 self.validate_resource_limit(
1472 self.catalog().user_roles().count(),
1473 new_roles,
1474 SystemVars::max_roles,
1475 "role",
1476 MAX_ROLES.name(),
1477 )?;
1478 self.validate_resource_limit(
1479 self.catalog().user_continual_tasks().count(),
1480 new_continual_tasks,
1481 SystemVars::max_continual_tasks,
1482 "continual_task",
1483 MAX_CONTINUAL_TASKS.name(),
1484 )?;
1485 self.validate_resource_limit(
1486 self.catalog().user_network_policies().count(),
1487 new_network_policies,
1488 SystemVars::max_network_policies,
1489 "network_policy",
1490 MAX_NETWORK_POLICIES.name(),
1491 )?;
1492 Ok(())
1493 }
1494
1495 pub(crate) fn validate_resource_limit<F>(
1497 &self,
1498 current_amount: usize,
1499 new_instances: i64,
1500 resource_limit: F,
1501 resource_type: &str,
1502 limit_name: &str,
1503 ) -> Result<(), AdapterError>
1504 where
1505 F: Fn(&SystemVars) -> u32,
1506 {
1507 if new_instances <= 0 {
1508 return Ok(());
1509 }
1510
1511 let limit: i64 = resource_limit(self.catalog().system_config()).into();
1512 let current_amount: Option<i64> = current_amount.try_into().ok();
1513 let desired =
1514 current_amount.and_then(|current_amount| current_amount.checked_add(new_instances));
1515
1516 let exceeds_limit = if let Some(desired) = desired {
1517 desired > limit
1518 } else {
1519 true
1520 };
1521
1522 let desired = desired
1523 .map(|desired| desired.to_string())
1524 .unwrap_or_else(|| format!("more than {}", i64::MAX));
1525 let current = current_amount
1526 .map(|current| current.to_string())
1527 .unwrap_or_else(|| format!("more than {}", i64::MAX));
1528 if exceeds_limit {
1529 Err(AdapterError::ResourceExhaustion {
1530 resource_type: resource_type.to_string(),
1531 limit_name: limit_name.to_string(),
1532 desired,
1533 limit: limit.to_string(),
1534 current,
1535 })
1536 } else {
1537 Ok(())
1538 }
1539 }
1540
1541 pub(crate) fn validate_resource_limit_numeric<F>(
1545 &self,
1546 current_amount: Numeric,
1547 new_amount: Numeric,
1548 resource_limit: F,
1549 resource_type: &str,
1550 limit_name: &str,
1551 ) -> Result<(), AdapterError>
1552 where
1553 F: Fn(&SystemVars) -> Numeric,
1554 {
1555 if new_amount <= Numeric::zero() {
1556 return Ok(());
1557 }
1558
1559 let limit = resource_limit(self.catalog().system_config());
1560 let desired = current_amount + new_amount;
1564 if desired > limit {
1565 Err(AdapterError::ResourceExhaustion {
1566 resource_type: resource_type.to_string(),
1567 limit_name: limit_name.to_string(),
1568 desired: desired.to_string(),
1569 limit: limit.to_string(),
1570 current: current_amount.to_string(),
1571 })
1572 } else {
1573 Ok(())
1574 }
1575 }
1576}