mz_adapter/coord/ddl.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! This module encapsulates all of the [`Coordinator`]'s logic for creating, dropping,
11//! and altering objects.
12
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use fail::fail_point;
18use futures::Future;
19use maplit::{btreemap, btreeset};
20use mz_adapter_types::compaction::SINCE_GRANULARITY;
21use mz_adapter_types::connection::ConnectionId;
22use mz_audit_log::VersionedEvent;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::memory::objects::{CatalogItem, Connection, DataSourceDesc, Sink};
25use mz_compute_client::protocol::response::PeekResponse;
26use mz_controller::clusters::ReplicaLocation;
27use mz_controller_types::{ClusterId, ReplicaId};
28use mz_ore::error::ErrorExt;
29use mz_ore::future::InTask;
30use mz_ore::instrument;
31use mz_ore::now::to_datetime;
32use mz_ore::retry::Retry;
33use mz_ore::str::StrExt;
34use mz_ore::task;
35use mz_postgres_util::tunnel::PostgresFlavor;
36use mz_repr::adt::numeric::Numeric;
37use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
38use mz_sql::catalog::{CatalogCluster, CatalogClusterReplica, CatalogSchema};
39use mz_sql::names::ResolvedDatabaseSpecifier;
40use mz_sql::plan::ConnectionDetails;
41use mz_sql::session::metadata::SessionMetadata;
42use mz_sql::session::vars::{
43    self, MAX_AWS_PRIVATELINK_CONNECTIONS, MAX_CLUSTERS, MAX_CONTINUAL_TASKS,
44    MAX_CREDIT_CONSUMPTION_RATE, MAX_DATABASES, MAX_KAFKA_CONNECTIONS, MAX_MATERIALIZED_VIEWS,
45    MAX_MYSQL_CONNECTIONS, MAX_NETWORK_POLICIES, MAX_OBJECTS_PER_SCHEMA, MAX_POSTGRES_CONNECTIONS,
46    MAX_REPLICAS_PER_CLUSTER, MAX_ROLES, MAX_SCHEMAS_PER_DATABASE, MAX_SECRETS, MAX_SINKS,
47    MAX_SOURCES, MAX_SQL_SERVER_CONNECTIONS, MAX_TABLES, SystemVars, Var,
48};
49use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
50use mz_storage_types::connections::PostgresConnection;
51use mz_storage_types::connections::inline::IntoInlineConnection;
52use mz_storage_types::read_policy::ReadPolicy;
53use mz_storage_types::sources::GenericSourceConnection;
54use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
55use serde_json::json;
56use tracing::{Instrument, Level, event, info_span, warn};
57
58use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
59use crate::catalog::{DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult};
60use crate::coord::appends::BuiltinTableAppendNotify;
61use crate::coord::timeline::{TimelineContext, TimelineState};
62use crate::coord::{Coordinator, ReplicaMetadata};
63use crate::session::{Session, Transaction, TransactionOps};
64use crate::statement_logging::StatementEndedExecutionReason;
65use crate::telemetry::{EventDetails, SegmentClientExt};
66use crate::util::ResultExt;
67use crate::{AdapterError, TimestampProvider, catalog, flags};
68
69impl Coordinator {
70    /// Same as [`Self::catalog_transact_conn`] but takes a [`Session`].
71    #[instrument(name = "coord::catalog_transact")]
72    pub(crate) async fn catalog_transact(
73        &mut self,
74        session: Option<&Session>,
75        ops: Vec<catalog::Op>,
76    ) -> Result<(), AdapterError> {
77        self.catalog_transact_conn(session.map(|session| session.conn_id()), ops)
78            .await
79    }
80
81    /// Same as [`Self::catalog_transact_conn`] but takes a [`Session`] and runs
82    /// builtin table updates concurrently with any side effects (e.g. creating
83    /// collections).
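    ///
    /// A minimal usage sketch (hypothetical call site, not taken from this file),
    /// showing that the closure's side effects run while the builtin table
    /// appends are awaited:
    ///
    /// ```ignore
    /// coord
    ///     .catalog_transact_with_side_effects(Some(&session), ops, |coord| async move {
    ///         // e.g. create the storage collections backing the new objects
    ///     })
    ///     .await?;
    /// ```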
84    #[instrument(name = "coord::catalog_transact_with_side_effects")]
85    pub(crate) async fn catalog_transact_with_side_effects<'c, F, Fut>(
86        &'c mut self,
87        session: Option<&Session>,
88        ops: Vec<catalog::Op>,
89        side_effect: F,
90    ) -> Result<(), AdapterError>
91    where
92        F: FnOnce(&'c mut Coordinator) -> Fut,
93        Fut: Future<Output = ()>,
94    {
95        let table_updates = self
96            .catalog_transact_inner(session.map(|session| session.conn_id()), ops)
97            .await?;
98        let side_effects_fut = side_effect(self);
99
100        // Run our side effects concurrently with the table updates.
101        let ((), ()) = futures::future::join(
102            side_effects_fut.instrument(info_span!(
103                "coord::catalog_transact_with_side_effects::side_effects_fut"
104            )),
105            table_updates.instrument(info_span!(
106                "coord::catalog_transact_with_side_effects::table_updates"
107            )),
108        )
109        .await;
110
111        Ok(())
112    }
113
114    /// Same as [`Self::catalog_transact_inner`] but awaits the table updates.
115    #[instrument(name = "coord::catalog_transact_conn")]
116    pub(crate) async fn catalog_transact_conn(
117        &mut self,
118        conn_id: Option<&ConnectionId>,
119        ops: Vec<catalog::Op>,
120    ) -> Result<(), AdapterError> {
121        let table_updates = self.catalog_transact_inner(conn_id, ops).await?;
122        table_updates
123            .instrument(info_span!("coord::catalog_transact_conn::table_updates"))
124            .await;
125        Ok(())
126    }
127
128    /// Executes a Catalog transaction, with special handling if the provided [`Session`]
129    /// is in a SQL transaction that is executing DDL.
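    ///
    /// If the session is in such a transaction, the new ops are replayed together
    /// with the ops already staged in the transaction and validated via a dry run;
    /// they are only durably committed when the DDL transaction itself commits.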
130    #[instrument(name = "coord::catalog_transact_with_ddl_transaction")]
131    pub(crate) async fn catalog_transact_with_ddl_transaction(
132        &mut self,
133        session: &mut Session,
134        ops: Vec<catalog::Op>,
135    ) -> Result<(), AdapterError> {
136        let Some(Transaction {
137            ops:
138                TransactionOps::DDL {
139                    ops: txn_ops,
140                    revision: txn_revision,
141                    state: _,
142                },
143            ..
144        }) = session.transaction().inner()
145        else {
146            return self.catalog_transact(Some(session), ops).await;
147        };
148
149        // Make sure our Catalog hasn't changed since opening the transaction.
150        if self.catalog().transient_revision() != *txn_revision {
151            return Err(AdapterError::DDLTransactionRace);
152        }
153
154        // Combine the existing ops with the new ops so we can replay them.
155        let mut all_ops = Vec::with_capacity(ops.len() + txn_ops.len() + 1);
156        all_ops.extend(txn_ops.iter().cloned());
157        all_ops.extend(ops.clone());
158        all_ops.push(Op::TransactionDryRun);
159
160        // Run our Catalog transaction, but abort before committing.
161        let result = self.catalog_transact(Some(session), all_ops).await;
162
163        match result {
164            // We purposefully fail with this error to prevent committing the transaction.
165            Err(AdapterError::TransactionDryRun { new_ops, new_state }) => {
166                // Set these ops on our transaction, bailing if the Catalog has changed since we
167                // ran the transaction.
168                session.transaction_mut().add_ops(TransactionOps::DDL {
169                    ops: new_ops,
170                    state: new_state,
171                    revision: self.catalog().transient_revision(),
172                })?;
173                Ok(())
174            }
175            Ok(_) => unreachable!("unexpected success!"),
176            Err(e) => Err(e),
177        }
178    }
179
180    /// Perform a catalog transaction. [`Coordinator::ship_dataflow`] must be
181    /// called after this function successfully returns on any built
182    /// [`DataflowDesc`](mz_compute_types::dataflows::DataflowDesc).
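    ///
    /// On success, returns a [`BuiltinTableAppendNotify`] that resolves once the
    /// builtin table updates produced by the transaction have been appended.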
183    #[instrument(name = "coord::catalog_transact_inner")]
184    pub(crate) async fn catalog_transact_inner<'a>(
185        &mut self,
186        conn_id: Option<&ConnectionId>,
187        ops: Vec<catalog::Op>,
188    ) -> Result<BuiltinTableAppendNotify, AdapterError> {
189        if self.controller.read_only() {
190            return Err(AdapterError::ReadOnly);
191        }
192
193        event!(Level::TRACE, ops = format!("{:?}", ops));
194
195        let mut sources_to_drop = vec![];
196        let mut webhook_sources_to_restart = BTreeSet::new();
197        let mut table_gids_to_drop = vec![];
198        let mut storage_sink_gids_to_drop = vec![];
199        let mut indexes_to_drop = vec![];
200        let mut materialized_views_to_drop = vec![];
201        let mut continual_tasks_to_drop = vec![];
202        let mut views_to_drop = vec![];
203        let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
204        let mut secrets_to_drop = vec![];
205        let mut vpc_endpoints_to_drop = vec![];
206        let mut clusters_to_drop = vec![];
207        let mut cluster_replicas_to_drop = vec![];
208        let mut compute_sinks_to_drop = BTreeMap::new();
209        let mut peeks_to_drop = vec![];
210        let mut copies_to_drop = vec![];
211        let mut clusters_to_create = vec![];
212        let mut cluster_replicas_to_create = vec![];
213        let mut update_metrics_config = false;
214        let mut update_tracing_config = false;
215        let mut update_compute_config = false;
216        let mut update_storage_config = false;
217        let mut update_pg_timestamp_oracle_config = false;
218        let mut update_metrics_retention = false;
219        let mut update_secrets_caching_config = false;
220        let mut update_cluster_scheduling_config = false;
221        let mut update_arrangement_exert_proportionality = false;
222        let mut update_http_config = false;
223
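        // First pass: inspect the ops to determine which side effects (drops,
        // webhook restarts, cluster creations, config updates) we will need to
        // apply after the catalog transaction commits.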
224        for op in &ops {
225            match op {
226                catalog::Op::DropObjects(drop_object_infos) => {
227                    for drop_object_info in drop_object_infos {
228                        match &drop_object_info {
229                            catalog::DropObjectInfo::Item(id) => {
230                                match self.catalog().get_entry(id).item() {
231                                    CatalogItem::Table(table) => {
232                                        table_gids_to_drop
233                                            .extend(table.global_ids().map(|gid| (*id, gid)));
234                                    }
235                                    CatalogItem::Source(source) => {
236                                        sources_to_drop.push((*id, source.global_id()));
237                                        if let DataSourceDesc::Ingestion {
238                                            ingestion_desc, ..
239                                        } = &source.data_source
240                                        {
241                                            match &ingestion_desc.desc.connection {
242                                                GenericSourceConnection::Postgres(conn) => {
243                                                    let conn = conn.clone().into_inline_connection(
244                                                        self.catalog().state(),
245                                                    );
246                                                    let pending_drop = (
247                                                        conn.connection.clone(),
248                                                        conn.publication_details.slot.clone(),
249                                                    );
250                                                    replication_slots_to_drop.push(pending_drop);
251                                                }
252                                                _ => {}
253                                            }
254                                        }
255                                    }
256                                    CatalogItem::Sink(sink) => {
257                                        storage_sink_gids_to_drop.push(sink.global_id());
258                                    }
259                                    CatalogItem::Index(index) => {
260                                        indexes_to_drop.push((index.cluster_id, index.global_id()));
261                                    }
262                                    CatalogItem::MaterializedView(mv) => {
263                                        materialized_views_to_drop
264                                            .push((mv.cluster_id, mv.global_id()));
265                                    }
266                                    CatalogItem::View(view) => {
267                                        views_to_drop.push((*id, view.clone()))
268                                    }
269                                    CatalogItem::ContinualTask(ct) => {
270                                        continual_tasks_to_drop.push((
271                                            *id,
272                                            ct.cluster_id,
273                                            ct.global_id(),
274                                        ));
275                                    }
276                                    CatalogItem::Secret(_) => {
277                                        secrets_to_drop.push(*id);
278                                    }
279                                    CatalogItem::Connection(Connection { details, .. }) => {
280                                        match details {
281                                            // SSH connections have an associated secret that should be dropped
282                                            ConnectionDetails::Ssh { .. } => {
283                                                secrets_to_drop.push(*id);
284                                            }
285                                            // AWS PrivateLink connections have an associated
286                                            // VpcEndpoint K8S resource that should be dropped
287                                            ConnectionDetails::AwsPrivatelink(_) => {
288                                                vpc_endpoints_to_drop.push(*id);
289                                            }
290                                            _ => (),
291                                        }
292                                    }
293                                    _ => (),
294                                }
295                            }
296                            catalog::DropObjectInfo::Cluster(id) => {
297                                clusters_to_drop.push(*id);
298                            }
299                            catalog::DropObjectInfo::ClusterReplica((
300                                cluster_id,
301                                replica_id,
302                                _reason,
303                            )) => {
304                                // Drop the cluster replica itself.
305                                cluster_replicas_to_drop.push((*cluster_id, *replica_id));
306                            }
307                            _ => (),
308                        }
309                    }
310                }
311                catalog::Op::ResetSystemConfiguration { name }
312                | catalog::Op::UpdateSystemConfiguration { name, .. } => {
313                    update_metrics_config |= self
314                        .catalog
315                        .state()
316                        .system_config()
317                        .is_metrics_config_var(name);
318                    update_tracing_config |= vars::is_tracing_var(name);
319                    update_compute_config |= self
320                        .catalog
321                        .state()
322                        .system_config()
323                        .is_compute_config_var(name);
324                    update_storage_config |= self
325                        .catalog
326                        .state()
327                        .system_config()
328                        .is_storage_config_var(name);
329                    update_pg_timestamp_oracle_config |=
330                        vars::is_pg_timestamp_oracle_config_var(name);
331                    update_metrics_retention |= name == vars::METRICS_RETENTION.name();
332                    update_secrets_caching_config |= vars::is_secrets_caching_var(name);
333                    update_cluster_scheduling_config |= vars::is_cluster_scheduling_var(name);
334                    update_arrangement_exert_proportionality |=
335                        name == vars::ARRANGEMENT_EXERT_PROPORTIONALITY.name();
336                    update_http_config |= vars::is_http_config_var(name);
337                }
338                catalog::Op::ResetAllSystemConfiguration => {
339                    // Assume they all need to be updated.
340                    // We could check whether the configs have actually changed, but
341                    // this is simpler.
342                    update_tracing_config = true;
343                    update_compute_config = true;
344                    update_storage_config = true;
345                    update_pg_timestamp_oracle_config = true;
346                    update_metrics_retention = true;
347                    update_secrets_caching_config = true;
348                    update_cluster_scheduling_config = true;
349                    update_arrangement_exert_proportionality = true;
350                    update_http_config = true;
351                }
352                catalog::Op::RenameItem { id, .. } => {
353                    let item = self.catalog().get_entry(id);
354                    let is_webhook_source = item
355                        .source()
356                        .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
357                        .unwrap_or(false);
358                    if is_webhook_source {
359                        webhook_sources_to_restart.insert(*id);
360                    }
361                }
362                catalog::Op::RenameSchema {
363                    database_spec,
364                    schema_spec,
365                    ..
366                } => {
367                    let schema = self.catalog().get_schema(
368                        database_spec,
369                        schema_spec,
370                        conn_id.unwrap_or(&SYSTEM_CONN_ID),
371                    );
372                    let webhook_sources = schema.item_ids().filter(|id| {
373                        let item = self.catalog().get_entry(id);
374                        item.source()
375                            .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
376                            .unwrap_or(false)
377                    });
378                    webhook_sources_to_restart.extend(webhook_sources);
379                }
380                catalog::Op::CreateCluster { id, .. } => {
381                    clusters_to_create.push(*id);
382                }
383                catalog::Op::CreateClusterReplica {
384                    cluster_id,
385                    name,
386                    config,
387                    ..
388                } => {
389                    cluster_replicas_to_create.push((
390                        *cluster_id,
391                        name.clone(),
392                        config.location.num_processes(),
393                    ));
394                }
395                _ => (),
396            }
397        }
398
399        let collections_to_drop: BTreeSet<GlobalId> = sources_to_drop
400            .iter()
401            .map(|(_, gid)| *gid)
402            .chain(table_gids_to_drop.iter().map(|(_, gid)| *gid))
403            .chain(storage_sink_gids_to_drop.iter().copied())
404            .chain(indexes_to_drop.iter().map(|(_, gid)| *gid))
405            .chain(materialized_views_to_drop.iter().map(|(_, gid)| *gid))
406            .chain(continual_tasks_to_drop.iter().map(|(_, _, gid)| *gid))
407            .chain(views_to_drop.iter().map(|(_id, view)| view.global_id()))
408            .collect();
409
410        // Clean up any active compute sinks (e.g., SUBSCRIBEs or COPY TOs) that rely on dropped relations or clusters.
411        for (sink_id, sink) in &self.active_compute_sinks {
412            let cluster_id = sink.cluster_id();
413            let conn_id = &sink.connection_id();
414            if let Some(id) = sink
415                .depends_on()
416                .iter()
417                .find(|id| collections_to_drop.contains(id))
418            {
419                let entry = self.catalog().get_entry_by_global_id(id);
420                let name = self
421                    .catalog()
422                    .resolve_full_name(entry.name(), Some(conn_id))
423                    .to_string();
424                compute_sinks_to_drop.insert(
425                    *sink_id,
426                    ActiveComputeSinkRetireReason::DependencyDropped(format!(
427                        "relation {}",
428                        name.quoted()
429                    )),
430                );
431            } else if clusters_to_drop.contains(&cluster_id) {
432                let name = self.catalog().get_cluster(cluster_id).name();
433                compute_sinks_to_drop.insert(
434                    *sink_id,
435                    ActiveComputeSinkRetireReason::DependencyDropped(format!(
436                        "cluster {}",
437                        name.quoted()
438                    )),
439                );
440            }
441        }
442
443        // Clean up any pending peeks that rely on dropped relations or clusters.
444        for (uuid, pending_peek) in &self.pending_peeks {
445            if let Some(id) = pending_peek
446                .depends_on
447                .iter()
448                .find(|id| collections_to_drop.contains(id))
449            {
450                let entry = self.catalog().get_entry_by_global_id(id);
451                let name = self
452                    .catalog()
453                    .resolve_full_name(entry.name(), Some(&pending_peek.conn_id));
454                peeks_to_drop.push((
455                    format!("relation {}", name.to_string().quoted()),
456                    uuid.clone(),
457                ));
458            } else if clusters_to_drop.contains(&pending_peek.cluster_id) {
459                let name = self.catalog().get_cluster(pending_peek.cluster_id).name();
460                peeks_to_drop.push((format!("cluster {}", name.quoted()), uuid.clone()));
461            }
462        }
463
464        // Clean up any pending `COPY` statements that rely on dropped relations or clusters.
465        for (conn_id, pending_copy) in &self.active_copies {
466            let dropping_table = table_gids_to_drop
467                .iter()
468                .any(|(item_id, _gid)| pending_copy.table_id == *item_id);
469            let dropping_cluster = clusters_to_drop.contains(&pending_copy.cluster_id);
470
471            if dropping_table || dropping_cluster {
472                copies_to_drop.push(conn_id.clone());
473            }
474        }
475
476        let storage_ids_to_drop = sources_to_drop
477            .iter()
478            .map(|(_, gid)| *gid)
479            .chain(storage_sink_gids_to_drop.iter().copied())
480            .chain(table_gids_to_drop.iter().map(|(_, gid)| *gid))
481            .chain(materialized_views_to_drop.iter().map(|(_, gid)| *gid))
482            .chain(continual_tasks_to_drop.iter().map(|(_, _, gid)| *gid));
483        let compute_ids_to_drop = indexes_to_drop
484            .iter()
485            .copied()
486            .chain(materialized_views_to_drop.iter().copied())
487            .chain(
488                continual_tasks_to_drop
489                    .iter()
490                    .map(|(_, cluster_id, gid)| (*cluster_id, *gid)),
491            );
492
493        // Check whether any timelines would become empty if we dropped the specified storage or
494        // compute resources.
495        //
496        // Note: we only actually drop a timeline after the transaction succeeds.
497        let collection_id_bundle = self.build_collection_id_bundle(
498            storage_ids_to_drop,
499            compute_ids_to_drop,
500            clusters_to_drop.clone(),
501        );
502        let timeline_associations: BTreeMap<_, _> = self
503            .partition_ids_by_timeline_context(&collection_id_bundle)
504            .filter_map(|(context, bundle)| {
505                let TimelineContext::TimelineDependent(timeline) = context else {
506                    return None;
507                };
508                let TimelineState { read_holds, .. } = self
509                    .global_timelines
510                    .get(&timeline)
511                    .expect("all timelines have a timestamp oracle");
512
513                let empty = read_holds.id_bundle().difference(&bundle).is_empty();
514
515                Some((timeline, (empty, bundle)))
516            })
517            .collect();
518
519        self.validate_resource_limits(&ops, conn_id.unwrap_or(&SYSTEM_CONN_ID))?;
520
521        // This will produce timestamps that are guaranteed to increase on each
522        // call, and also never be behind the system clock. If the system clock
523        // hasn't advanced (or has gone backward), it will increment by 1. For
524        // the audit log, we need to balance "close (within 10s or so) to the
525        // system clock" and "always goes up". We've chosen here to prioritize
526        // always going up, and believe we will always be close to the system
527        // clock because it is well configured (chrony) and so may only rarely
528        // regress or pause for 10s.
529        let oracle_write_ts = self.get_local_write_ts().await.timestamp;
530
531        let Coordinator {
532            catalog,
533            active_conns,
534            controller,
535            cluster_replica_statuses,
536            ..
537        } = self;
538        let catalog = Arc::make_mut(catalog);
539        let conn = conn_id.map(|id| active_conns.get(id).expect("connection must exist"));
540
541        let TransactionResult {
542            mut builtin_table_updates,
543            audit_events,
544        } = catalog
545            .transact(
546                Some(&mut controller.storage_collections),
547                oracle_write_ts,
548                conn,
549                ops,
550            )
551            .await?;
552
553        // Update in-memory cluster replica statuses.
554        // TODO(jkosh44) All these builtin table updates should be handled as builtin source
555        // updates elsewhere.
556        for (cluster_id, replica_id) in &cluster_replicas_to_drop {
557            let replica_statuses =
558                cluster_replica_statuses.remove_cluster_replica_statuses(cluster_id, replica_id);
559            for (process_id, status) in replica_statuses {
560                let builtin_table_update = catalog.state().pack_cluster_replica_status_update(
561                    *replica_id,
562                    process_id,
563                    &status,
564                    Diff::MINUS_ONE,
565                );
566                let builtin_table_update = catalog
567                    .state()
568                    .resolve_builtin_table_update(builtin_table_update);
569                builtin_table_updates.push(builtin_table_update);
570            }
571        }
572        for cluster_id in &clusters_to_drop {
573            let cluster_statuses = cluster_replica_statuses.remove_cluster_statuses(cluster_id);
574            for (replica_id, replica_statuses) in cluster_statuses {
575                for (process_id, status) in replica_statuses {
576                    let builtin_table_update = catalog.state().pack_cluster_replica_status_update(
577                        replica_id,
578                        process_id,
579                        &status,
580                        Diff::MINUS_ONE,
581                    );
582                    let builtin_table_update = catalog
583                        .state()
584                        .resolve_builtin_table_update(builtin_table_update);
585                    builtin_table_updates.push(builtin_table_update);
586                }
587            }
588        }
589        for cluster_id in clusters_to_create {
590            cluster_replica_statuses.initialize_cluster_statuses(cluster_id);
591            for (replica_id, replica_statuses) in
592                cluster_replica_statuses.get_cluster_statuses(cluster_id)
593            {
594                for (process_id, status) in replica_statuses {
595                    let builtin_table_update = catalog.state().pack_cluster_replica_status_update(
596                        *replica_id,
597                        *process_id,
598                        status,
599                        Diff::ONE,
600                    );
601                    let builtin_table_update = catalog
602                        .state()
603                        .resolve_builtin_table_update(builtin_table_update);
604                    builtin_table_updates.push(builtin_table_update);
605                }
606            }
607        }
608        let now = to_datetime((catalog.config().now)());
609        for (cluster_id, replica_name, num_processes) in cluster_replicas_to_create {
610            let replica_id = catalog
611                .resolve_replica_in_cluster(&cluster_id, &replica_name)
612                .expect("just created")
613                .replica_id();
614            cluster_replica_statuses.initialize_cluster_replica_statuses(
615                cluster_id,
616                replica_id,
617                num_processes,
618                now,
619            );
620            for (process_id, status) in
621                cluster_replica_statuses.get_cluster_replica_statuses(cluster_id, replica_id)
622            {
623                let builtin_table_update = catalog.state().pack_cluster_replica_status_update(
624                    replica_id,
625                    *process_id,
626                    status,
627                    Diff::ONE,
628                );
629                let builtin_table_update = catalog
630                    .state()
631                    .resolve_builtin_table_update(builtin_table_update);
632                builtin_table_updates.push(builtin_table_update);
633            }
634        }
635
636        // Append our builtin table updates, then return the notify so we can run other tasks in
637        // parallel.
638        let (builtin_update_notify, _) = self
639            .builtin_table_update()
640            .execute(builtin_table_updates)
641            .await;
642
643        // No error returns are allowed after this point. Enforce this at compile time
644        // by using this odd structure so we don't accidentally add a stray `?`.
645        let _: () = async {
646            if !timeline_associations.is_empty() {
647                for (timeline, (should_be_empty, id_bundle)) in timeline_associations {
648                    let became_empty =
649                        self.remove_resources_associated_with_timeline(timeline, id_bundle);
650                    assert_eq!(should_be_empty, became_empty, "emptiness did not match!");
651                }
652            }
653            if !table_gids_to_drop.is_empty() {
654                let ts = self.get_local_write_ts().await;
655                self.drop_tables(table_gids_to_drop, ts.timestamp);
656            }
657            // Note that we drop tables before sources since there can be a weak dependency
658            // on sources from tables in the storage controller that will result in error
659            // logging that we'd prefer to avoid. This isn't an actual dependency issue but
660            // we'd like to keep that error logging around to indicate when an actual
661            // dependency error might occur.
662            if !sources_to_drop.is_empty() {
663                self.drop_sources(sources_to_drop);
664            }
665            if !webhook_sources_to_restart.is_empty() {
666                self.restart_webhook_sources(webhook_sources_to_restart);
667            }
668            if !storage_sink_gids_to_drop.is_empty() {
669                self.drop_storage_sinks(storage_sink_gids_to_drop);
670            }
671            if !compute_sinks_to_drop.is_empty() {
672                self.retire_compute_sinks(compute_sinks_to_drop).await;
673            }
674            if !peeks_to_drop.is_empty() {
675                for (dropped_name, uuid) in peeks_to_drop {
676                    if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
677                        let cancel_reason = PeekResponse::Error(format!(
678                            "query could not complete because {dropped_name} was dropped"
679                        ));
680                        self.controller
681                            .compute
682                            .cancel_peek(pending_peek.cluster_id, uuid, cancel_reason)
683                            .unwrap_or_terminate("unable to cancel peek");
684                        self.retire_execution(
685                            StatementEndedExecutionReason::Canceled,
686                            pending_peek.ctx_extra,
687                        );
688                    }
689                }
690            }
691            if !copies_to_drop.is_empty() {
692                for conn_id in copies_to_drop {
693                    self.cancel_pending_copy(&conn_id);
694                }
695            }
696            if !indexes_to_drop.is_empty() {
697                self.drop_indexes(indexes_to_drop);
698            }
699            if !materialized_views_to_drop.is_empty() {
700                self.drop_materialized_views(materialized_views_to_drop);
701            }
702            if !continual_tasks_to_drop.is_empty() {
703                self.drop_continual_tasks(continual_tasks_to_drop);
704            }
705            if !vpc_endpoints_to_drop.is_empty() {
706                self.drop_vpc_endpoints_in_background(vpc_endpoints_to_drop)
707            }
708            if !cluster_replicas_to_drop.is_empty() {
709                fail::fail_point!("after_catalog_drop_replica");
710                for (cluster_id, replica_id) in cluster_replicas_to_drop {
711                    self.drop_replica(cluster_id, replica_id);
712                }
713            }
714            if !clusters_to_drop.is_empty() {
715                for cluster_id in clusters_to_drop {
716                    self.controller.drop_cluster(cluster_id);
717                }
718            }
719
720            // We don't want to block the main coordinator thread on cleaning
721            // up external resources (PostgreSQL replication slots and secrets),
722            // so we perform that cleanup in a background task.
723            //
724            // TODO(database-issues#4154): This is inherently best effort. An ill-timed crash
725            // means we'll never clean these resources up; we need safer cleanup for non-Materialize resources.
726            // See <https://github.com/MaterializeInc/database-issues/issues/4154>
727            task::spawn(|| "drop_replication_slots_and_secrets", {
728                let ssh_tunnel_manager = self.connection_context().ssh_tunnel_manager.clone();
729                let secrets_controller = Arc::clone(&self.secrets_controller);
730                let secrets_reader = Arc::clone(self.secrets_reader());
731                let storage_config = self.controller.storage.config().clone();
732
733                async move {
734                    for (connection, replication_slot_name) in replication_slots_to_drop {
735                        tracing::info!(?replication_slot_name, "dropping replication slot");
736
737                        // Try to drop the replication slots, but give up after
738                        // a while. The PostgreSQL server may no longer be
739                        // healthy. Users often drop PostgreSQL sources
740                        // *because* the PostgreSQL server has been
741                        // decommissioned.
742                        let result: Result<(), anyhow::Error> = Retry::default()
743                            .max_duration(Duration::from_secs(60))
744                            .retry_async(|_state| async {
745                                let config = connection
746                                    .config(&secrets_reader, &storage_config, InTask::No)
747                                    .await
748                                    .map_err(|e| {
749                                        anyhow::anyhow!(
750                                            "error creating Postgres client for \
751                                            dropping acquired slots: {}",
752                                            e.display_with_causes()
753                                        )
754                                    })?;
755
756                                // Yugabyte does not support waiting for the replication slot to become
757                                // inactive before dropping it. That is fine, though, because in that
758                                // case dropping will fail and we'll go around the retry loop, which
759                                // is effectively the same as waiting 60 seconds.
760                                let should_wait = match connection.flavor {
761                                    PostgresFlavor::Vanilla => true,
762                                    PostgresFlavor::Yugabyte => false,
763                                };
764                                mz_postgres_util::drop_replication_slots(
765                                    &ssh_tunnel_manager,
766                                    config.clone(),
767                                    &[(&replication_slot_name, should_wait)],
768                                )
769                                .await?;
770
771                                Ok(())
772                            })
773                            .await;
774
775                        if let Err(err) = result {
776                            tracing::warn!(
777                                ?replication_slot_name,
778                                ?err,
779                                "failed to drop replication slot"
780                            );
781                        }
782                    }
783
784                    // Drop secrets *after* dropping the replication slots,
785                    // because those replication slots may depend on the secrets.
786                    //
787                    // It's okay if we crash before processing the secret drops,
788                    // as we look for and remove any orphaned secrets during
789                    // startup.
790                    fail_point!("drop_secrets");
791                    for secret in secrets_to_drop {
792                        if let Err(e) = secrets_controller.delete(secret).await {
793                            warn!("Dropping secrets has encountered an error: {}", e);
794                        }
795                    }
796                }
797            });
798
799            if update_metrics_config {
800                mz_metrics::update_dyncfg(&self.catalog().system_config().dyncfg_updates());
801            }
802            if update_compute_config {
803                self.update_compute_config();
804            }
805            if update_storage_config {
806                self.update_storage_config();
807            }
808            if update_pg_timestamp_oracle_config {
809                self.update_pg_timestamp_oracle_config();
810            }
811            if update_metrics_retention {
812                self.update_metrics_retention();
813            }
814            if update_tracing_config {
815                self.update_tracing_config();
816            }
817            if update_secrets_caching_config {
818                self.update_secrets_caching_config();
819            }
820            if update_cluster_scheduling_config {
821                self.update_cluster_scheduling_config();
822            }
823            if update_arrangement_exert_proportionality {
824                self.update_arrangement_exert_proportionality();
825            }
826            if update_http_config {
827                self.update_http_config();
828            }
829        }
830        .instrument(info_span!("coord::catalog_transact_with::finalize"))
831        .await;
832
833        let conn = conn_id.and_then(|id| self.active_conns.get(id));
834        if let Some(segment_client) = &self.segment_client {
835            for VersionedEvent::V1(event) in audit_events {
836                let event_type = format!(
837                    "{} {}",
838                    event.object_type.as_title_case(),
839                    event.event_type.as_title_case()
840                );
841                segment_client.environment_track(
842                    &self.catalog().config().environment_id,
843                    event_type,
844                    json!({ "details": event.details.as_json() }),
845                    EventDetails {
846                        user_id: conn
847                            .and_then(|c| c.user().external_metadata.as_ref())
848                            .map(|m| m.user_id),
849                        application_name: conn.map(|c| c.application_name()),
850                        ..Default::default()
851                    },
852                );
853            }
854        }
855
856        // Note: It's important that we keep the function call inside the macro; this way we only run
857        // the consistency checks if soft assertions are enabled.
858        mz_ore::soft_assert_eq_no_log!(
859            self.check_consistency(),
860            Ok(()),
861            "coordinator inconsistency detected"
862        );
863
864        Ok(builtin_update_notify)
865    }
866
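    /// Drops a cluster replica: retracts any transient metric rows for the
    /// replica from the builtin tables, drops its introspection subscribes, and
    /// removes the replica from the controller.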
867    fn drop_replica(&mut self, cluster_id: ClusterId, replica_id: ReplicaId) {
868        if let Some(Some(ReplicaMetadata { metrics })) =
869            self.transient_replica_metadata.insert(replica_id, None)
870        {
871            let mut updates = vec![];
872            if let Some(metrics) = metrics {
873                let retractions = self.catalog().state().pack_replica_metric_updates(
874                    replica_id,
875                    &metrics,
876                    Diff::MINUS_ONE,
877                );
878                let retractions = self
879                    .catalog()
880                    .state()
881                    .resolve_builtin_table_updates(retractions);
882                updates.extend(retractions);
883            }
884            // We don't care when the write finishes.
885            let _notify = self.builtin_table_update().background(updates);
886        }
887
888        self.drop_introspection_subscribes(replica_id);
889
890        self.controller
891            .drop_replica(cluster_id, replica_id)
892            .expect("dropping replica must not fail");
893    }
894
895    /// A convenience method for dropping sources.
896    fn drop_sources(&mut self, sources: Vec<(CatalogItemId, GlobalId)>) {
897        for (item_id, _gid) in &sources {
898            self.active_webhooks.remove(item_id);
899        }
900        let storage_metadata = self.catalog.state().storage_metadata();
901        let source_gids = sources.into_iter().map(|(_id, gid)| gid).collect();
902        self.controller
903            .storage
904            .drop_sources(storage_metadata, source_gids)
905            .unwrap_or_terminate("cannot fail to drop sources");
906    }
907
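    /// A convenience method for dropping tables at the given timestamp.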
908    fn drop_tables(&mut self, tables: Vec<(CatalogItemId, GlobalId)>, ts: Timestamp) {
909        for (item_id, _gid) in &tables {
910            self.active_webhooks.remove(item_id);
911        }
912        let storage_metadata = self.catalog.state().storage_metadata();
913        let table_gids = tables.into_iter().map(|(_id, gid)| gid).collect();
914        self.controller
915            .storage
916            .drop_tables(storage_metadata, table_gids, ts)
917            .unwrap_or_terminate("cannot fail to drop tables");
918    }
919
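    /// Restarts the given webhook sources by clearing any cached
    /// `active_webhooks` state for them.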
920    fn restart_webhook_sources(&mut self, sources: impl IntoIterator<Item = CatalogItemId>) {
921        for id in sources {
922            self.active_webhooks.remove(&id);
923        }
924    }
925
926    /// Like `drop_compute_sinks`, but for a single compute sink.
927    ///
928    /// Returns the controller's state for the compute sink if the identified
929    /// sink was known to the controller. It is the caller's responsibility to
930    /// retire the returned sink. Consider using `retire_compute_sinks` instead.
931    #[must_use]
932    pub async fn drop_compute_sink(&mut self, sink_id: GlobalId) -> Option<ActiveComputeSink> {
933        self.drop_compute_sinks([sink_id]).await.remove(&sink_id)
934    }
935
936    /// Drops a batch of compute sinks.
937    ///
938    /// For each sink that exists, the coordinator and controller's state
939    /// associated with the sink is removed.
940    ///
941    /// Returns a map containing the controller's state for each sink that was
942    /// removed. It is the caller's responsibility to retire the returned sinks.
943    /// Consider using `retire_compute_sinks` instead.
944    #[must_use]
945    pub async fn drop_compute_sinks(
946        &mut self,
947        sink_ids: impl IntoIterator<Item = GlobalId>,
948    ) -> BTreeMap<GlobalId, ActiveComputeSink> {
949        let mut by_id = BTreeMap::new();
950        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
951        for sink_id in sink_ids {
952            let sink = match self.remove_active_compute_sink(sink_id).await {
953                None => {
954                    tracing::error!(%sink_id, "drop_compute_sinks called on nonexistent sink");
955                    continue;
956                }
957                Some(sink) => sink,
958            };
959
960            by_cluster
961                .entry(sink.cluster_id())
962                .or_default()
963                .push(sink_id);
964            by_id.insert(sink_id, sink);
965        }
966        for (cluster_id, ids) in by_cluster {
967            let compute = &mut self.controller.compute;
968            // A cluster could have been dropped, so verify it exists.
969            if compute.instance_exists(cluster_id) {
970                compute
971                    .drop_collections(cluster_id, ids)
972                    .unwrap_or_terminate("cannot fail to drop collections");
973            }
974        }
975        by_id
976    }
977
978    /// Retires a batch of sinks with disparate reasons for retirement.
979    ///
980    /// Each sink identified in `reasons` is dropped (see `drop_compute_sinks`),
981    /// then retired with its corresponding reason.
982    pub async fn retire_compute_sinks(
983        &mut self,
984        mut reasons: BTreeMap<GlobalId, ActiveComputeSinkRetireReason>,
985    ) {
986        let sink_ids = reasons.keys().cloned();
987        for (id, sink) in self.drop_compute_sinks(sink_ids).await {
988            let reason = reasons
989                .remove(&id)
990                .expect("all returned IDs are in `reasons`");
991            sink.retire(reason);
992        }
993    }
994
995    /// Drops all pending replicas for a set of clusters
996    /// that are undergoing reconfiguration.
997    pub async fn drop_reconfiguration_replicas(
998        &mut self,
999        cluster_ids: BTreeSet<ClusterId>,
1000    ) -> Result<(), AdapterError> {
1001        let pending_cluster_ops: Vec<Op> = cluster_ids
1002            .iter()
1003            .map(|c| {
1004                self.catalog()
1005                    .get_cluster(c.clone())
1006                    .replicas()
1007                    .filter_map(|r| match r.config.location {
1008                        ReplicaLocation::Managed(ref l) if l.pending => {
1009                            Some(DropObjectInfo::ClusterReplica((
1010                                c.clone(),
1011                                r.replica_id,
1012                                ReplicaCreateDropReason::Manual,
1013                            )))
1014                        }
1015                        _ => None,
1016                    })
1017                    .collect::<Vec<DropObjectInfo>>()
1018            })
1019            .filter_map(|pending_replica_drop_ops_by_cluster| {
1020                match pending_replica_drop_ops_by_cluster.len() {
1021                    0 => None,
1022                    _ => Some(Op::DropObjects(pending_replica_drop_ops_by_cluster)),
1023                }
1024            })
1025            .collect();
1026        if !pending_cluster_ops.is_empty() {
1027            self.catalog_transact(None, pending_cluster_ops).await?;
1028        }
1029        Ok(())
1030    }
1031
1032    /// Cancels all active compute sinks for the identified connection.
1033    #[mz_ore::instrument(level = "debug")]
1034    pub(crate) async fn cancel_compute_sinks_for_conn(&mut self, conn_id: &ConnectionId) {
1035        self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Canceled)
1036            .await
1037    }
1038
1039    /// Cancels all active cluster reconfigurations for the identified connection.
1040    #[mz_ore::instrument(level = "debug")]
1041    pub(crate) async fn cancel_cluster_reconfigurations_for_conn(
1042        &mut self,
1043        conn_id: &ConnectionId,
1044    ) {
1045        self.retire_cluster_reconfigurations_for_conn(conn_id).await
1046    }
1047
1048    /// Retires all active compute sinks for the identified connection with the
1049    /// specified reason.
1050    #[mz_ore::instrument(level = "debug")]
1051    pub(crate) async fn retire_compute_sinks_for_conn(
1052        &mut self,
1053        conn_id: &ConnectionId,
1054        reason: ActiveComputeSinkRetireReason,
1055    ) {
1056        let drop_sinks = self
1057            .active_conns
1058            .get_mut(conn_id)
1059            .expect("must exist for active session")
1060            .drop_sinks
1061            .iter()
1062            .map(|sink_id| (*sink_id, reason.clone()))
1063            .collect();
1064        self.retire_compute_sinks(drop_sinks).await;
1065    }
1066
1067    /// Cleans up pending cluster reconfigurations for the identified connection.
1068    #[mz_ore::instrument(level = "debug")]
1069    pub(crate) async fn retire_cluster_reconfigurations_for_conn(
1070        &mut self,
1071        conn_id: &ConnectionId,
1072    ) {
1073        let reconfiguring_clusters = self
1074            .active_conns
1075            .get(conn_id)
1076            .expect("must exist for active session")
1077            .pending_cluster_alters
1078            .clone();
1079        // Try to drop reconfiguration replicas.
1080        self.drop_reconfiguration_replicas(reconfiguring_clusters)
1081            .await
1082            .unwrap_or_terminate("cannot fail to drop reconfiguration replicas");
1083
1084        self.active_conns
1085            .get_mut(conn_id)
1086            .expect("must exist for active session")
1087            .pending_cluster_alters
1088            .clear();
1089    }
1090
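    /// A convenience method for dropping storage sinks.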
1091    pub(crate) fn drop_storage_sinks(&mut self, sink_gids: Vec<GlobalId>) {
1092        let storage_metadata = self.catalog.state().storage_metadata();
1093        self.controller
1094            .storage
1095            .drop_sinks(storage_metadata, sink_gids)
1096            .unwrap_or_terminate("cannot fail to drop sinks");
1097    }
1098
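    /// A convenience method for dropping indexes, grouped by cluster.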
1099    pub(crate) fn drop_indexes(&mut self, indexes: Vec<(ClusterId, GlobalId)>) {
1100        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
1101        for (cluster_id, gid) in indexes {
1102            by_cluster.entry(cluster_id).or_default().push(gid);
1103        }
1104        for (cluster_id, gids) in by_cluster {
1105            let compute = &mut self.controller.compute;
1106            // A cluster could have been dropped, so verify it exists.
1107            if compute.instance_exists(cluster_id) {
1108                compute
1109                    .drop_collections(cluster_id, gids)
1110                    .unwrap_or_terminate("cannot fail to drop collections");
1111            }
1112        }
1113    }
1114
1115    /// A convenience method for dropping materialized views.
1116    fn drop_materialized_views(&mut self, mviews: Vec<(ClusterId, GlobalId)>) {
1117        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
1118        let mut mv_gids = Vec::new();
1119        for (cluster_id, gid) in mviews {
1120            by_cluster.entry(cluster_id).or_default().push(gid);
1121            mv_gids.push(gid);
1122        }
1123
1124        // Drop compute sinks.
1125        for (cluster_id, ids) in by_cluster {
1126            let compute = &mut self.controller.compute;
1127            // A cluster could have been dropped, so verify it exists.
1128            if compute.instance_exists(cluster_id) {
1129                compute
1130                    .drop_collections(cluster_id, ids)
1131                    .unwrap_or_terminate("cannot fail to drop collections");
1132            }
1133        }
1134
1135        // Drop storage resources.
1136        let storage_metadata = self.catalog.state().storage_metadata();
1137        self.controller
1138            .storage
1139            .drop_sources(storage_metadata, mv_gids)
1140            .unwrap_or_terminate("cannot fail to drop sources");
1141    }
1142
1143    /// A convenience method for dropping continual tasks.
1144    fn drop_continual_tasks(&mut self, cts: Vec<(CatalogItemId, ClusterId, GlobalId)>) {
1145        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
1146        let mut source_ids = Vec::new();
1147        for (item_id, cluster_id, gid) in cts {
1148            by_cluster.entry(cluster_id).or_default().push(gid);
1149            source_ids.push((item_id, gid));
1150        }
1151
1152        // Drop compute sinks.
1153        for (cluster_id, ids) in by_cluster {
1154            let compute = &mut self.controller.compute;
1155            // A cluster could have been dropped, so verify it exists.
1156            if compute.instance_exists(cluster_id) {
1157                compute
1158                    .drop_collections(cluster_id, ids)
1159                    .unwrap_or_terminate("cannot fail to drop collections");
1160            }
1161        }
1162
1163        // Drop storage sources.
1164        self.drop_sources(source_ids)
1165    }
1166
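    /// Deletes the VPC endpoints backing the given AWS PrivateLink connections
    /// in a background task, retrying each delete for up to 60 seconds.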
1167    fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>) {
1168        let cloud_resource_controller = Arc::clone(self.cloud_resource_controller
1169            .as_ref()
1170            .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))
1171            .expect("vpc endpoints should only be dropped in CLOUD, where `cloud_resource_controller` is `Some`"));
1172        // We don't want to block the coordinator on external delete API
1173        // calls, so move the VPC endpoint drops to a separate task. This does
1174        // mean that a failed drop won't bubble up to the user as an error
1175        // message. However, even if it did (which is how the code previously
1176        // worked), mz has already dropped it from our catalog, and so we
1177        // wouldn't be able to retry anyway. Any orphaned vpc_endpoints will
1178        // eventually be cleaned up during restart via coord bootstrap.
1179        task::spawn(
1180            || "drop_vpc_endpoints",
1181            async move {
1182                for vpc_endpoint in vpc_endpoints {
1183                    let _ = Retry::default()
1184                        .max_duration(Duration::from_secs(60))
1185                        .retry_async(|_state| async {
1186                            fail_point!("drop_vpc_endpoint", |r| {
1187                                Err(anyhow::anyhow!("Fail point error {:?}", r))
1188                            });
1189                            match cloud_resource_controller
1190                                .delete_vpc_endpoint(vpc_endpoint)
1191                                .await
1192                            {
1193                                Ok(_) => Ok(()),
1194                                Err(e) => {
1195                                    warn!("failed to drop VPC endpoint: {}", e);
1196                                    Err(e)
1197                                }
1198                            }
1199                        })
1200                        .await;
1201                }
1202            }
1203            .instrument(info_span!(
1204                "coord::catalog_transact_inner::drop_vpc_endpoints"
1205            )),
1206        );
1207    }
1208
1209    /// Removes all temporary items created by the specified connection, though
1210    /// not the temporary schema itself.
1211    pub(crate) async fn drop_temp_items(&mut self, conn_id: &ConnectionId) {
1212        let temp_items = self.catalog().state().get_temp_items(conn_id).collect();
1213        let all_items = self.catalog().object_dependents(&temp_items, conn_id);
1214
1215        if all_items.is_empty() {
1216            return;
1217        }
1218        let op = Op::DropObjects(
1219            all_items
1220                .into_iter()
1221                .map(DropObjectInfo::manual_drop_from_object_id)
1222                .collect(),
1223        );
1224
1225        self.catalog_transact_conn(Some(conn_id), vec![op])
1226            .await
1227            .expect("unable to drop temporary items for conn_id");
1228    }
1229
1230    fn update_cluster_scheduling_config(&self) {
1231        let config = flags::orchestrator_scheduling_config(self.catalog.system_config());
1232        self.controller
1233            .update_orchestrator_scheduling_config(config);
1234    }
1235
1236    fn update_secrets_caching_config(&self) {
1237        let config = flags::caching_config(self.catalog.system_config());
1238        self.caching_secrets_reader.set_policy(config);
1239    }
1240
1241    fn update_tracing_config(&self) {
1242        let tracing = flags::tracing_config(self.catalog().system_config());
1243        tracing.apply(&self.tracing_handle);
1244    }
1245
1246    fn update_compute_config(&mut self) {
1247        let config_params = flags::compute_config(self.catalog().system_config());
1248        self.controller.compute.update_configuration(config_params);
1249    }
1250
1251    fn update_storage_config(&mut self) {
1252        let config_params = flags::storage_config(self.catalog().system_config());
1253        self.controller.storage.update_parameters(config_params);
1254    }
1255
1256    fn update_pg_timestamp_oracle_config(&self) {
1257        let config_params = flags::pg_timstamp_oracle_config(self.catalog().system_config());
1258        if let Some(config) = self.pg_timestamp_oracle_config.as_ref() {
1259            config_params.apply(config)
1260        }
1261    }
1262
1263    fn update_metrics_retention(&self) {
1264        let duration = self.catalog().system_config().metrics_retention();
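        // `ReadPolicy::lag_writes_by` keeps each collection's read frontier
        // (its `since`) no more than `duration` behind its write frontier,
        // rounded down to multiples of `SINCE_GRANULARITY`, so retained-metrics
        // collections remain queryable over the configured retention window.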
1265        let policy = ReadPolicy::lag_writes_by(
1266            Timestamp::new(u64::try_from(duration.as_millis()).unwrap_or_else(|_e| {
1267                tracing::error!("Absurd metrics retention duration: {duration:?}.");
1268                u64::MAX
1269            })),
1270            SINCE_GRANULARITY,
1271        );
1272        let storage_policies = self
1273            .catalog()
1274            .entries()
1275            .filter(|entry| {
1276                entry.item().is_retained_metrics_object()
1277                    && entry.item().is_compute_object_on_cluster().is_none()
1278            })
1279            .map(|entry| (entry.id(), policy.clone()))
1280            .collect::<Vec<_>>();
1281        let compute_policies = self
1282            .catalog()
1283            .entries()
1284            .filter_map(|entry| {
1285                if let (true, Some(cluster_id)) = (
1286                    entry.item().is_retained_metrics_object(),
1287                    entry.item().is_compute_object_on_cluster(),
1288                ) {
1289                    Some((cluster_id, entry.id(), policy.clone()))
1290                } else {
1291                    None
1292                }
1293            })
1294            .collect::<Vec<_>>();
1295        self.update_storage_read_policies(storage_policies);
1296        self.update_compute_read_policies(compute_policies);
1297    }
1298
1299    fn update_arrangement_exert_proportionality(&mut self) {
1300        let prop = self
1301            .catalog()
1302            .system_config()
1303            .arrangement_exert_proportionality();
1304        self.controller
1305            .compute
1306            .set_arrangement_exert_proportionality(prop);
1307    }
1308
1309    fn update_http_config(&mut self) {
1310        let webhook_request_limit = self
1311            .catalog()
1312            .system_config()
1313            .webhook_concurrent_request_limit();
1314        self.webhook_concurrency_limit
1315            .set_limit(webhook_request_limit);
1316    }
1317
1318    pub(crate) async fn create_storage_export(
1319        &mut self,
1320        id: GlobalId,
1321        sink: &Sink,
1322    ) -> Result<(), AdapterError> {
1323        // Validate `sink.from` is in fact a storage collection
1324        self.controller.storage.check_exists(sink.from)?;
1325
1326        // The as_of determines the time at which we snapshot the persist
1327        // collection when reading from it. This is primarily relevant when we
1328        // do _not_ want to include the snapshot in the sink.
1329        //
1330        // We choose the smallest as_of that is legal according to the sinked
1331        // collection's `since`.
1332        let id_bundle = crate::CollectionIdBundle {
1333            storage_ids: btreeset! {sink.from},
1334            compute_ids: btreemap! {},
1335        };
1336
1337        // We put read holds in place so that `create_collections`, below, can
1338        // successfully install storage's own read holds at the chosen as_of.
1339        // Otherwise, the since of the sinked collection might advance
1340        // concurrently, pulling the rug out from under us!
1341        //
1342        // TODO: Maybe in the future, pass these holds on to storage, to hold
1343        // on to them and downgrade when possible?
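        //
        // The sequence below is: acquire holds, compute the as_of from them,
        // create the export, then release the holds; the explicit
        // `drop(read_holds)` at the end of this method marks the point where
        // storage's own holds have taken over.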
1344        let read_holds = self.acquire_read_holds(&id_bundle);
1345        let as_of = self.least_valid_read(&read_holds);
1346
1347        let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
1348        let storage_sink_desc = mz_storage_types::sinks::StorageSinkDesc {
1349            from: sink.from,
1350            from_desc: storage_sink_from_entry
1351                .desc(&self.catalog().resolve_full_name(
1352                    storage_sink_from_entry.name(),
1353                    storage_sink_from_entry.conn_id(),
1354                ))
1355                .expect("sinks can only be built on items with descs")
1356                .into_owned(),
1357            connection: sink
1358                .connection
1359                .clone()
1360                .into_inline_connection(self.catalog().state()),
1361            envelope: sink.envelope,
1362            as_of,
1363            with_snapshot: sink.with_snapshot,
1364            version: sink.version,
1365            from_storage_metadata: (),
1366            to_storage_metadata: (),
1367        };
1368
1369        let collection_desc = CollectionDescription {
1370            // TODO(sinks): make generic once we have more than one sink type.
1371            desc: KAFKA_PROGRESS_DESC.clone(),
1372            data_source: DataSource::Sink {
1373                desc: ExportDescription {
1374                    sink: storage_sink_desc,
1375                    instance_id: sink.cluster_id,
1376                },
1377            },
1378            since: None,
1379            status_collection_id: None,
1380            timeline: None,
1381        };
1382        let collections = vec![(id, collection_desc)];
1383
1384        // Create the collections.
1385        let storage_metadata = self.catalog.state().storage_metadata();
1386        let res = self
1387            .controller
1388            .storage
1389            .create_collections(storage_metadata, None, collections)
1390            .await;
1391
1392        // Drop read holds after the export has been created, at which point
1393        // storage will have put in its own read holds.
1394        drop(read_holds);
1395
1396        Ok(res?)
1397    }
1398
1399    /// Validate all resource limits in a catalog transaction and return an error if any limit
1400    /// would be exceeded.
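    ///
    /// The check works by tallying the net change each op in `ops` makes to a
    /// resource count and then comparing `current + delta` against the
    /// corresponding `SystemVars` limit, so a transaction that drops one table
    /// and creates another does not trip `max_tables`.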
1401    fn validate_resource_limits(
1402        &self,
1403        ops: &[catalog::Op],
1404        conn_id: &ConnectionId,
1405    ) -> Result<(), AdapterError> {
1406        let mut new_kafka_connections = 0;
1407        let mut new_postgres_connections = 0;
1408        let mut new_mysql_connections = 0;
1409        let mut new_sql_server_connections = 0;
1410        let mut new_aws_privatelink_connections = 0;
1411        let mut new_tables = 0;
1412        let mut new_sources = 0;
1413        let mut new_sinks = 0;
1414        let mut new_materialized_views = 0;
1415        let mut new_clusters = 0;
1416        let mut new_replicas_per_cluster = BTreeMap::new();
1417        let mut new_credit_consumption_rate = Numeric::zero();
1418        let mut new_databases = 0;
1419        let mut new_schemas_per_database = BTreeMap::new();
1420        let mut new_objects_per_schema = BTreeMap::new();
1421        let mut new_secrets = 0;
1422        let mut new_roles = 0;
1423        let mut new_continual_tasks = 0;
1424        let mut new_network_policies = 0;
1425        for op in ops {
1426            match op {
1427                Op::CreateDatabase { .. } => {
1428                    new_databases += 1;
1429                }
1430                Op::CreateSchema { database_id, .. } => {
1431                    if let ResolvedDatabaseSpecifier::Id(database_id) = database_id {
1432                        *new_schemas_per_database.entry(database_id).or_insert(0) += 1;
1433                    }
1434                }
1435                Op::CreateRole { .. } => {
1436                    new_roles += 1;
1437                }
1438                Op::CreateNetworkPolicy { .. } => {
1439                    new_network_policies += 1;
1440                }
1441                Op::CreateCluster { .. } => {
1442                    // TODO(benesch): having deprecated linked clusters, remove
1443                    // the `max_sources` and `max_sinks` limit, and set a higher
1444                    // max cluster limit?
1445                    new_clusters += 1;
1446                }
1447                Op::CreateClusterReplica {
1448                    cluster_id, config, ..
1449                } => {
1450                    if cluster_id.is_user() {
1451                        *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) += 1;
1452                        if let ReplicaLocation::Managed(location) = &config.location {
1453                            let replica_allocation = self
1454                                .catalog()
1455                                .cluster_replica_sizes()
1456                                .0
1457                                .get(location.size_for_billing())
1458                                .expect(
1459                                    "location size is validated against the cluster replica sizes",
1460                                );
1461                            new_credit_consumption_rate += replica_allocation.credits_per_hour
1462                        }
1463                    }
1464                }
1465                Op::CreateItem { name, item, .. } => {
1466                    *new_objects_per_schema
1467                        .entry((
1468                            name.qualifiers.database_spec.clone(),
1469                            name.qualifiers.schema_spec.clone(),
1470                        ))
1471                        .or_insert(0) += 1;
1472                    match item {
1473                        CatalogItem::Connection(connection) => match connection.details {
1474                            ConnectionDetails::Kafka(_) => new_kafka_connections += 1,
1475                            ConnectionDetails::Postgres(_) => new_postgres_connections += 1,
1476                            ConnectionDetails::MySql(_) => new_mysql_connections += 1,
1477                            ConnectionDetails::SqlServer(_) => new_sql_server_connections += 1,
1478                            ConnectionDetails::AwsPrivatelink(_) => {
1479                                new_aws_privatelink_connections += 1
1480                            }
1481                            ConnectionDetails::Csr(_)
1482                            | ConnectionDetails::Ssh { .. }
1483                            | ConnectionDetails::Aws(_) => {}
1484                        },
1485                        CatalogItem::Table(_) => {
1486                            new_tables += 1;
1487                        }
1488                        CatalogItem::Source(source) => {
1489                            new_sources += source.user_controllable_persist_shard_count()
1490                        }
1491                        CatalogItem::Sink(_) => new_sinks += 1,
1492                        CatalogItem::MaterializedView(_) => {
1493                            new_materialized_views += 1;
1494                        }
1495                        CatalogItem::Secret(_) => {
1496                            new_secrets += 1;
1497                        }
1498                        CatalogItem::ContinualTask(_) => {
1499                            new_continual_tasks += 1;
1500                        }
1501                        CatalogItem::Log(_)
1502                        | CatalogItem::View(_)
1503                        | CatalogItem::Index(_)
1504                        | CatalogItem::Type(_)
1505                        | CatalogItem::Func(_) => {}
1506                    }
1507                }
1508                Op::DropObjects(drop_object_infos) => {
1509                    for drop_object_info in drop_object_infos {
1510                        match drop_object_info {
1511                            DropObjectInfo::Cluster(_) => {
1512                                new_clusters -= 1;
1513                            }
1514                            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
1515                                if cluster_id.is_user() {
1516                                    *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) -= 1;
1517                                    let cluster = self
1518                                        .catalog()
1519                                        .get_cluster_replica(*cluster_id, *replica_id);
1520                                    if let ReplicaLocation::Managed(location) =
1521                                        &cluster.config.location
1522                                    {
1523                                        let replica_allocation = self
1524                                            .catalog()
1525                                            .cluster_replica_sizes()
1526                                            .0
1527                                            .get(location.size_for_billing())
1528                                            .expect(
1529                                                "location size is validated against the cluster replica sizes",
1530                                            );
1531                                        new_credit_consumption_rate -=
1532                                            replica_allocation.credits_per_hour
1533                                    }
1534                                }
1535                            }
1536                            DropObjectInfo::Database(_) => {
1537                                new_databases -= 1;
1538                            }
1539                            DropObjectInfo::Schema((database_spec, _)) => {
1540                                if let ResolvedDatabaseSpecifier::Id(database_id) = database_spec {
1541                                    *new_schemas_per_database.entry(database_id).or_insert(0) -= 1;
1542                                }
1543                            }
1544                            DropObjectInfo::Role(_) => {
1545                                new_roles -= 1;
1546                            }
1547                            DropObjectInfo::NetworkPolicy(_) => {
1548                                new_network_policies -= 1;
1549                            }
1550                            DropObjectInfo::Item(id) => {
1551                                let entry = self.catalog().get_entry(id);
1552                                *new_objects_per_schema
1553                                    .entry((
1554                                        entry.name().qualifiers.database_spec.clone(),
1555                                        entry.name().qualifiers.schema_spec.clone(),
1556                                    ))
1557                                    .or_insert(0) -= 1;
1558                                match entry.item() {
1559                                    CatalogItem::Connection(connection) => match connection.details
1560                                    {
1561                                        ConnectionDetails::AwsPrivatelink(_) => {
1562                                            new_aws_privatelink_connections -= 1;
1563                                        }
1564                                        _ => (),
1565                                    },
1566                                    CatalogItem::Table(_) => {
1567                                        new_tables -= 1;
1568                                    }
1569                                    CatalogItem::Source(source) => {
1570                                        new_sources -=
1571                                            source.user_controllable_persist_shard_count()
1572                                    }
1573                                    CatalogItem::Sink(_) => new_sinks -= 1,
1574                                    CatalogItem::MaterializedView(_) => {
1575                                        new_materialized_views -= 1;
1576                                    }
1577                                    CatalogItem::Secret(_) => {
1578                                        new_secrets -= 1;
1579                                    }
1580                                    CatalogItem::ContinualTask(_) => {
1581                                        new_continual_tasks -= 1;
1582                                    }
1583                                    CatalogItem::Log(_)
1584                                    | CatalogItem::View(_)
1585                                    | CatalogItem::Index(_)
1586                                    | CatalogItem::Type(_)
1587                                    | CatalogItem::Func(_) => {}
1588                                }
1589                            }
1590                        }
1591                    }
1592                }
1593                Op::UpdateItem {
1594                    name: _,
1595                    id,
1596                    to_item,
1597                } => match to_item {
1598                    CatalogItem::Source(source) => {
1599                        let current_source = self
1600                            .catalog()
1601                            .get_entry(id)
1602                            .source()
1603                            .expect("source update is for source item");
1604
1605                        new_sources += source.user_controllable_persist_shard_count()
1606                            - current_source.user_controllable_persist_shard_count();
1607                    }
1608                    CatalogItem::Connection(_)
1609                    | CatalogItem::Table(_)
1610                    | CatalogItem::Sink(_)
1611                    | CatalogItem::MaterializedView(_)
1612                    | CatalogItem::Secret(_)
1613                    | CatalogItem::Log(_)
1614                    | CatalogItem::View(_)
1615                    | CatalogItem::Index(_)
1616                    | CatalogItem::Type(_)
1617                    | CatalogItem::Func(_)
1618                    | CatalogItem::ContinualTask(_) => {}
1619                },
1620                Op::AlterRole { .. }
1621                | Op::AlterRetainHistory { .. }
1622                | Op::AlterNetworkPolicy { .. }
1623                | Op::AlterAddColumn { .. }
1624                | Op::UpdatePrivilege { .. }
1625                | Op::UpdateDefaultPrivilege { .. }
1626                | Op::GrantRole { .. }
1627                | Op::RenameCluster { .. }
1628                | Op::RenameClusterReplica { .. }
1629                | Op::RenameItem { .. }
1630                | Op::RenameSchema { .. }
1631                | Op::UpdateOwner { .. }
1632                | Op::RevokeRole { .. }
1633                | Op::UpdateClusterConfig { .. }
1634                | Op::UpdateClusterReplicaConfig { .. }
1635                | Op::UpdateSourceReferences { .. }
1636                | Op::UpdateSystemConfiguration { .. }
1637                | Op::ResetSystemConfiguration { .. }
1638                | Op::ResetAllSystemConfiguration { .. }
1639                | Op::Comment { .. }
1640                | Op::WeirdStorageUsageUpdates { .. }
1641                | Op::TransactionDryRun => {}
1642            }
1643        }
1644
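        // With the per-op deltas tallied above, compute the current counts from
        // the catalog and check each `current + delta` against its limit.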
1645        let mut current_aws_privatelink_connections = 0;
1646        let mut current_postgres_connections = 0;
1647        let mut current_mysql_connections = 0;
1648        let mut current_sql_server_connections = 0;
1649        let mut current_kafka_connections = 0;
1650        for c in self.catalog().user_connections() {
1651            let connection = c
1652                .connection()
1653                .expect("`user_connections()` only returns connection objects");
1654
1655            match connection.details {
1656                ConnectionDetails::AwsPrivatelink(_) => current_aws_privatelink_connections += 1,
1657                ConnectionDetails::Postgres(_) => current_postgres_connections += 1,
1658                ConnectionDetails::MySql(_) => current_mysql_connections += 1,
1659                ConnectionDetails::SqlServer(_) => current_sql_server_connections += 1,
1660                ConnectionDetails::Kafka(_) => current_kafka_connections += 1,
1661                ConnectionDetails::Csr(_)
1662                | ConnectionDetails::Ssh { .. }
1663                | ConnectionDetails::Aws(_) => {}
1664            }
1665        }
1666        self.validate_resource_limit(
1667            current_kafka_connections,
1668            new_kafka_connections,
1669            SystemVars::max_kafka_connections,
1670            "Kafka Connection",
1671            MAX_KAFKA_CONNECTIONS.name(),
1672        )?;
1673        self.validate_resource_limit(
1674            current_postgres_connections,
1675            new_postgres_connections,
1676            SystemVars::max_postgres_connections,
1677            "PostgreSQL Connection",
1678            MAX_POSTGRES_CONNECTIONS.name(),
1679        )?;
1680        self.validate_resource_limit(
1681            current_mysql_connections,
1682            new_mysql_connections,
1683            SystemVars::max_mysql_connections,
1684            "MySQL Connection",
1685            MAX_MYSQL_CONNECTIONS.name(),
1686        )?;
1687        self.validate_resource_limit(
1688            current_sql_server_connections,
1689            new_sql_server_connections,
1690            SystemVars::max_sql_server_connections,
1691            "SQL Server Connection",
1692            MAX_SQL_SERVER_CONNECTIONS.name(),
1693        )?;
1694        self.validate_resource_limit(
1695            current_aws_privatelink_connections,
1696            new_aws_privatelink_connections,
1697            SystemVars::max_aws_privatelink_connections,
1698            "AWS PrivateLink Connection",
1699            MAX_AWS_PRIVATELINK_CONNECTIONS.name(),
1700        )?;
1701        self.validate_resource_limit(
1702            self.catalog().user_tables().count(),
1703            new_tables,
1704            SystemVars::max_tables,
1705            "table",
1706            MAX_TABLES.name(),
1707        )?;
1708
1709        let current_sources: usize = self
1710            .catalog()
1711            .user_sources()
1712            .filter_map(|source| source.source())
1713            .map(|source| source.user_controllable_persist_shard_count())
1714            .sum::<i64>()
1715            .try_into()
1716            .expect("non-negative sum of sources");
1717
1718        self.validate_resource_limit(
1719            current_sources,
1720            new_sources,
1721            SystemVars::max_sources,
1722            "source",
1723            MAX_SOURCES.name(),
1724        )?;
1725        self.validate_resource_limit(
1726            self.catalog().user_sinks().count(),
1727            new_sinks,
1728            SystemVars::max_sinks,
1729            "sink",
1730            MAX_SINKS.name(),
1731        )?;
1732        self.validate_resource_limit(
1733            self.catalog().user_materialized_views().count(),
1734            new_materialized_views,
1735            SystemVars::max_materialized_views,
1736            "materialized view",
1737            MAX_MATERIALIZED_VIEWS.name(),
1738        )?;
1739        self.validate_resource_limit(
1740            // Linked compute clusters don't count against the limit, since
1741            // we have a separate sources and sinks limit.
1742            //
1743            // TODO(benesch): remove the `max_sources` and `max_sinks` limit,
1744            // and set a higher max cluster limit?
1745            self.catalog().user_clusters().count(),
1746            new_clusters,
1747            SystemVars::max_clusters,
1748            "cluster",
1749            MAX_CLUSTERS.name(),
1750        )?;
1751        for (cluster_id, new_replicas) in new_replicas_per_cluster {
1752            // It's possible that the cluster hasn't been created yet.
1753            let current_amount = self
1754                .catalog()
1755                .try_get_cluster(cluster_id)
1756                .map(|instance| instance.user_replicas().count())
1757                .unwrap_or(0);
1758            self.validate_resource_limit(
1759                current_amount,
1760                new_replicas,
1761                SystemVars::max_replicas_per_cluster,
1762                "cluster replica",
1763                MAX_REPLICAS_PER_CLUSTER.name(),
1764            )?;
1765        }
1766        let current_credit_consumption_rate = self
1767            .catalog()
1768            .user_cluster_replicas()
1769            .filter_map(|replica| match &replica.config.location {
1770                ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
1771                ReplicaLocation::Unmanaged(_) => None,
1772            })
1773            .map(|size| {
1774                self.catalog()
1775                    .cluster_replica_sizes()
1776                    .0
1777                    .get(size)
1778                    .expect("location size is validated against the cluster replica sizes")
1779                    .credits_per_hour
1780            })
1781            .sum();
1782        self.validate_resource_limit_numeric(
1783            current_credit_consumption_rate,
1784            new_credit_consumption_rate,
1785            |system_vars| {
1786                self.license_key
1787                    .max_credit_consumption_rate()
1788                    .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
1789            },
1790            "cluster replica",
1791            MAX_CREDIT_CONSUMPTION_RATE.name(),
1792        )?;
1793        self.validate_resource_limit(
1794            self.catalog().databases().count(),
1795            new_databases,
1796            SystemVars::max_databases,
1797            "database",
1798            MAX_DATABASES.name(),
1799        )?;
1800        for (database_id, new_schemas) in new_schemas_per_database {
1801            self.validate_resource_limit(
1802                self.catalog().get_database(database_id).schemas_by_id.len(),
1803                new_schemas,
1804                SystemVars::max_schemas_per_database,
1805                "schema",
1806                MAX_SCHEMAS_PER_DATABASE.name(),
1807            )?;
1808        }
1809        for ((database_spec, schema_spec), new_objects) in new_objects_per_schema {
1810            self.validate_resource_limit(
1811                self.catalog()
1812                    .get_schema(&database_spec, &schema_spec, conn_id)
1813                    .items
1814                    .len(),
1815                new_objects,
1816                SystemVars::max_objects_per_schema,
1817                "object",
1818                MAX_OBJECTS_PER_SCHEMA.name(),
1819            )?;
1820        }
1821        self.validate_resource_limit(
1822            self.catalog().user_secrets().count(),
1823            new_secrets,
1824            SystemVars::max_secrets,
1825            "secret",
1826            MAX_SECRETS.name(),
1827        )?;
1828        self.validate_resource_limit(
1829            self.catalog().user_roles().count(),
1830            new_roles,
1831            SystemVars::max_roles,
1832            "role",
1833            MAX_ROLES.name(),
1834        )?;
1835        self.validate_resource_limit(
1836            self.catalog().user_continual_tasks().count(),
1837            new_continual_tasks,
1838            SystemVars::max_continual_tasks,
1839            "continual_task",
1840            MAX_CONTINUAL_TASKS.name(),
1841        )?;
1842        self.validate_resource_limit(
            // Count current network policies here, not continual tasks (this
            // assumes the catalog exposes a `user_network_policies()` iterator).
1843            self.catalog().user_network_policies().count(),
1844            new_network_policies,
1845            SystemVars::max_network_policies,
1846            "network_policy",
1847            MAX_NETWORK_POLICIES.name(),
1848        )?;
1849        Ok(())
1850    }
1851
1852    /// Validate a specific type of resource limit and return an error if that limit is exceeded.
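    ///
    /// `new_instances` is the net change from the transaction being validated;
    /// non-positive values always pass. A typical call site, mirroring the ones
    /// in [`Self::validate_resource_limits`], looks like:
    ///
    /// ```ignore
    /// self.validate_resource_limit(
    ///     self.catalog().user_tables().count(), // current count
    ///     new_tables,                           // net change from this transaction
    ///     SystemVars::max_tables,               // limit accessor
    ///     "table",
    ///     MAX_TABLES.name(),
    /// )?;
    /// ```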
1853    pub(crate) fn validate_resource_limit<F>(
1854        &self,
1855        current_amount: usize,
1856        new_instances: i64,
1857        resource_limit: F,
1858        resource_type: &str,
1859        limit_name: &str,
1860    ) -> Result<(), AdapterError>
1861    where
1862        F: Fn(&SystemVars) -> u32,
1863    {
1864        if new_instances <= 0 {
1865            return Ok(());
1866        }
1867
1868        let limit: i64 = resource_limit(self.catalog().system_config()).into();
1869        let current_amount: Option<i64> = current_amount.try_into().ok();
1870        let desired =
1871            current_amount.and_then(|current_amount| current_amount.checked_add(new_instances));
1872
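        // If converting `current_amount` to `i64` or adding `new_instances`
        // overflows, `desired` is `None`; conservatively treat that as exceeding
        // the limit and report the amounts as "more than i64::MAX" below.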
1873        let exceeds_limit = if let Some(desired) = desired {
1874            desired > limit
1875        } else {
1876            true
1877        };
1878
1879        let desired = desired
1880            .map(|desired| desired.to_string())
1881            .unwrap_or_else(|| format!("more than {}", i64::MAX));
1882        let current = current_amount
1883            .map(|current| current.to_string())
1884            .unwrap_or_else(|| format!("more than {}", i64::MAX));
1885        if exceeds_limit {
1886            Err(AdapterError::ResourceExhaustion {
1887                resource_type: resource_type.to_string(),
1888                limit_name: limit_name.to_string(),
1889                desired,
1890                limit: limit.to_string(),
1891                current,
1892            })
1893        } else {
1894            Ok(())
1895        }
1896    }
1897
1898    /// Validate a specific type of numeric resource limit and return an error if that limit is exceeded.
1899    ///
1900    /// This is very similar to [`Self::validate_resource_limit`] but for [`Numeric`] values.
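    ///
    /// For example, the credit-consumption check in
    /// [`Self::validate_resource_limits`] calls it roughly like this (eliding
    /// the license-key override used there):
    ///
    /// ```ignore
    /// self.validate_resource_limit_numeric(
    ///     current_credit_consumption_rate,
    ///     new_credit_consumption_rate,
    ///     |system_vars| system_vars.max_credit_consumption_rate(),
    ///     "cluster replica",
    ///     MAX_CREDIT_CONSUMPTION_RATE.name(),
    /// )?;
    /// ```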
1901    fn validate_resource_limit_numeric<F>(
1902        &self,
1903        current_amount: Numeric,
1904        new_amount: Numeric,
1905        resource_limit: F,
1906        resource_type: &str,
1907        limit_name: &str,
1908    ) -> Result<(), AdapterError>
1909    where
1910        F: Fn(&SystemVars) -> Numeric,
1911    {
1912        if new_amount <= Numeric::zero() {
1913            return Ok(());
1914        }
1915
1916        let limit = resource_limit(self.catalog().system_config());
1917        // Like floats, `Numeric` values overflow to infinity instead of panicking,
1918        // which has the correct comparison semantics.
1919        // NaN should be impossible here since both values are positive.
1920        let desired = current_amount + new_amount;
1921        if desired > limit {
1922            Err(AdapterError::ResourceExhaustion {
1923                resource_type: resource_type.to_string(),
1924                limit_name: limit_name.to_string(),
1925                desired: desired.to_string(),
1926                limit: limit.to_string(),
1927                current: current_amount.to_string(),
1928            })
1929        } else {
1930            Ok(())
1931        }
1932    }
1933}