// mz_adapter/coord/ddl.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! This module encapsulates all of the [`Coordinator`]'s logic for creating, dropping,
11//! and altering objects.
12
13use std::collections::{BTreeMap, BTreeSet};
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use fail::fail_point;
19use maplit::{btreemap, btreeset};
20use mz_adapter_types::compaction::SINCE_GRANULARITY;
21use mz_adapter_types::connection::ConnectionId;
22use mz_audit_log::VersionedEvent;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Sink};
25use mz_cluster_client::ReplicaId;
26use mz_controller::clusters::ReplicaLocation;
27use mz_controller_types::ClusterId;
28use mz_ore::instrument;
29use mz_ore::now::to_datetime;
30use mz_ore::retry::Retry;
31use mz_ore::task;
32use mz_repr::adt::numeric::Numeric;
33use mz_repr::{CatalogItemId, GlobalId, Timestamp};
34use mz_sql::catalog::{CatalogClusterReplica, CatalogSchema};
35use mz_sql::names::ResolvedDatabaseSpecifier;
36use mz_sql::plan::ConnectionDetails;
37use mz_sql::session::metadata::SessionMetadata;
38use mz_sql::session::vars::{
39    self, DEFAULT_TIMESTAMP_INTERVAL, MAX_AWS_PRIVATELINK_CONNECTIONS, MAX_CLUSTERS,
40    MAX_CONTINUAL_TASKS, MAX_CREDIT_CONSUMPTION_RATE, MAX_DATABASES, MAX_KAFKA_CONNECTIONS,
41    MAX_MATERIALIZED_VIEWS, MAX_MYSQL_CONNECTIONS, MAX_NETWORK_POLICIES, MAX_OBJECTS_PER_SCHEMA,
42    MAX_POSTGRES_CONNECTIONS, MAX_REPLICAS_PER_CLUSTER, MAX_ROLES, MAX_SCHEMAS_PER_DATABASE,
43    MAX_SECRETS, MAX_SINKS, MAX_SOURCES, MAX_SQL_SERVER_CONNECTIONS, MAX_TABLES, SystemVars, Var,
44};
45use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
46use mz_storage_types::connections::inline::IntoInlineConnection;
47use mz_storage_types::read_policy::ReadPolicy;
48use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
49use serde_json::json;
50use tracing::{Instrument, Level, event, info_span, warn};
51
52use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
53use crate::catalog::{DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult};
54use crate::coord::Coordinator;
55use crate::coord::appends::BuiltinTableAppendNotify;
56use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
57use crate::session::{Session, Transaction, TransactionOps};
58use crate::telemetry::{EventDetails, SegmentClientExt};
59use crate::util::ResultExt;
60use crate::{AdapterError, ExecuteContext, catalog, flags};
61
62impl Coordinator {
63    /// Same as [`Self::catalog_transact_with_context`] but takes a [`Session`].
64    #[instrument(name = "coord::catalog_transact")]
65    pub(crate) async fn catalog_transact(
66        &mut self,
67        session: Option<&Session>,
68        ops: Vec<catalog::Op>,
69    ) -> Result<(), AdapterError> {
70        let start = Instant::now();
71        let result = self
72            .catalog_transact_with_context(session.map(|session| session.conn_id()), None, ops)
73            .await;
74        self.metrics
75            .catalog_transact_seconds
76            .with_label_values(&["catalog_transact"])
77            .observe(start.elapsed().as_secs_f64());
78        result
79    }
80
    /// Same as [`Self::catalog_transact_with_context`] but takes a [`Session`]
    /// and runs builtin table updates concurrently with any side effects (e.g.
    /// creating collections).
    // TODO(aljoscha): Remove this method once all call-sites have been migrated
    // to the newer catalog_transact_with_context. The latter is what allows us
    // to apply catalog implications that we derive from catalog changes either
    // when initially applying the ops to the catalog _or_ when following
    // catalog changes from another process.
    #[instrument(name = "coord::catalog_transact_with_side_effects")]
    pub(crate) async fn catalog_transact_with_side_effects<F>(
        &mut self,
        mut ctx: Option<&mut ExecuteContext>,
        ops: Vec<catalog::Op>,
        side_effect: F,
    ) -> Result<(), AdapterError>
    where
        F: for<'a> FnOnce(
                &'a mut Coordinator,
                Option<&'a mut ExecuteContext>,
            ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
            + 'static,
    {
        let start = Instant::now();

        // Commit the ops to the catalog. This returns the (not yet awaited)
        // builtin table update notify and the parsed catalog updates.
        let (table_updates, catalog_updates) = self
            .catalog_transact_inner(ctx.as_ref().map(|ctx| ctx.session().conn_id()), ops)
            .await?;

        // We can't run this concurrently with the explicit side effects,
        // because both want to borrow self mutably.
        let apply_implications_res = self
            .apply_catalog_implications(ctx.as_deref_mut(), catalog_updates)
            .await;

        // We would get into an inconsistent state if we updated the catalog but
        // then failed to apply commands/updates to the controller. Easiest
        // thing to do is panic and let restart/bootstrap handle it.
        apply_implications_res.expect("cannot fail to apply catalog update implications");

        // Note: It's important that we keep the function call inside macro, this way we only run
        // the consistency checks if soft assertions are enabled.
        mz_ore::soft_assert_eq_no_log!(
            self.check_consistency(),
            Ok(()),
            "coordinator inconsistency detected"
        );

        // Build the caller-supplied side-effect future; it gets exclusive
        // access to `self` for its duration.
        let side_effects_fut = side_effect(self, ctx);

        // Run our side effects concurrently with the table updates.
        let ((), ()) = futures::future::join(
            side_effects_fut.instrument(info_span!(
                "coord::catalog_transact_with_side_effects::side_effects_fut"
            )),
            table_updates.instrument(info_span!(
                "coord::catalog_transact_with_side_effects::table_updates"
            )),
        )
        .await;

        self.metrics
            .catalog_transact_seconds
            .with_label_values(&["catalog_transact_with_side_effects"])
            .observe(start.elapsed().as_secs_f64());

        Ok(())
    }
148
    /// Same as [`Self::catalog_transact_inner`] but takes an execution context
    /// or connection ID and runs builtin table updates concurrently with any
    /// catalog implications that are generated as part of applying the given
    /// `ops` (e.g. creating collections).
    ///
    /// This will use a connection ID if provided and otherwise fall back to
    /// getting a connection ID from the execution context.
    #[instrument(name = "coord::catalog_transact_with_context")]
    pub(crate) async fn catalog_transact_with_context(
        &mut self,
        conn_id: Option<&ConnectionId>,
        ctx: Option<&mut ExecuteContext>,
        ops: Vec<catalog::Op>,
    ) -> Result<(), AdapterError> {
        let start = Instant::now();

        // Prefer the explicitly-passed connection ID; fall back to the one
        // attached to the execution context, if any.
        let conn_id = conn_id.or_else(|| ctx.as_ref().map(|ctx| ctx.session().conn_id()));

        let (table_updates, catalog_updates) = self.catalog_transact_inner(conn_id, ops).await?;

        let apply_catalog_implications_fut = self.apply_catalog_implications(ctx, catalog_updates);

        // Apply catalog implications concurrently with the table updates.
        let (combined_apply_res, ()) = futures::future::join(
            apply_catalog_implications_fut.instrument(info_span!(
                "coord::catalog_transact_with_context::side_effects_fut"
            )),
            table_updates.instrument(info_span!(
                "coord::catalog_transact_with_context::table_updates"
            )),
        )
        .await;

        // We would get into an inconsistent state if we updated the catalog but
        // then failed to apply implications. Easiest thing to do is panic and
        // let restart/bootstrap handle it.
        combined_apply_res.expect("cannot fail to apply catalog implications");

        // Note: It's important that we keep the function call inside macro, this way we only run
        // the consistency checks if soft assertions are enabled.
        mz_ore::soft_assert_eq_no_log!(
            self.check_consistency(),
            Ok(()),
            "coordinator inconsistency detected"
        );

        self.metrics
            .catalog_transact_seconds
            .with_label_values(&["catalog_transact_with_context"])
            .observe(start.elapsed().as_secs_f64());

        Ok(())
    }
202
    /// Executes a Catalog transaction with handling if the provided [`Session`]
    /// is in a SQL transaction that is executing DDL.
    #[instrument(name = "coord::catalog_transact_with_ddl_transaction")]
    pub(crate) async fn catalog_transact_with_ddl_transaction<F>(
        &mut self,
        ctx: &mut ExecuteContext,
        ops: Vec<catalog::Op>,
        side_effect: F,
    ) -> Result<(), AdapterError>
    where
        F: for<'a> FnOnce(
                &'a mut Coordinator,
                Option<&'a mut ExecuteContext>,
            ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
            + Send
            + Sync
            + 'static,
    {
        let start = Instant::now();

        // Fast path: if the session is not in a DDL transaction, run the ops
        // (and side effects) directly and return.
        let Some(Transaction {
            ops:
                TransactionOps::DDL {
                    ops: txn_ops,
                    revision: txn_revision,
                    side_effects: _,
                    state: _,
                },
            ..
        }) = ctx.session().transaction().inner()
        else {
            let result = self
                .catalog_transact_with_side_effects(Some(ctx), ops, side_effect)
                .await;
            self.metrics
                .catalog_transact_seconds
                .with_label_values(&["catalog_transact_with_ddl_transaction"])
                .observe(start.elapsed().as_secs_f64());
            return result;
        };

        // Make sure our Catalog hasn't changed since opening the transaction.
        if self.catalog().transient_revision() != *txn_revision {
            self.metrics
                .catalog_transact_seconds
                .with_label_values(&["catalog_transact_with_ddl_transaction"])
                .observe(start.elapsed().as_secs_f64());
            return Err(AdapterError::DDLTransactionRace);
        }

        // Combine the existing ops with the new ops so we can replay them.
        let mut all_ops = Vec::with_capacity(ops.len() + txn_ops.len() + 1);
        all_ops.extend(txn_ops.iter().cloned());
        all_ops.extend(ops.clone());
        all_ops.push(Op::TransactionDryRun);

        // Run our Catalog transaction, but abort before committing.
        let result = self.catalog_transact(Some(ctx.session()), all_ops).await;

        let result = match result {
            // We purposefully fail with this error to prevent committing the transaction.
            Err(AdapterError::TransactionDryRun { new_ops, new_state }) => {
                // Sets these ops to our transaction, bailing if the Catalog has changed since we
                // ran the transaction.
                ctx.session_mut()
                    .transaction_mut()
                    .add_ops(TransactionOps::DDL {
                        ops: new_ops,
                        state: new_state,
                        side_effects: vec![Box::new(side_effect)],
                        revision: self.catalog().transient_revision(),
                    })?;
                Ok(())
            }
            // The dry run must fail; a plain success means the sentinel
            // `TransactionDryRun` op was somehow not processed.
            Ok(_) => unreachable!("unexpected success!"),
            Err(e) => Err(e),
        };

        self.metrics
            .catalog_transact_seconds
            .with_label_values(&["catalog_transact_with_ddl_transaction"])
            .observe(start.elapsed().as_secs_f64());

        result
    }
288
    /// Perform a catalog transaction. [`Coordinator::ship_dataflow`] must be
    /// called after this function successfully returns on any built
    /// [`DataflowDesc`](mz_compute_types::dataflows::DataflowDesc).
    ///
    /// Returns the builtin table update notify (to be awaited by the caller,
    /// possibly concurrently with other work) and the parsed catalog updates
    /// produced by applying `ops`.
    #[instrument(name = "coord::catalog_transact_inner")]
    pub(crate) async fn catalog_transact_inner(
        &mut self,
        conn_id: Option<&ConnectionId>,
        ops: Vec<catalog::Op>,
    ) -> Result<(BuiltinTableAppendNotify, Vec<ParsedStateUpdate>), AdapterError> {
        // DDL is not allowed while the controller is in read-only mode.
        if self.controller.read_only() {
            return Err(AdapterError::ReadOnly);
        }

        event!(Level::TRACE, ops = format!("{:?}", ops));

        // Pre-scan the ops to collect follow-up work that must happen after
        // the catalog transaction commits (status bookkeeping, webhook
        // restarts, and which config subsystems need re-syncing).
        let mut webhook_sources_to_restart = BTreeSet::new();
        let mut clusters_to_drop = vec![];
        let mut cluster_replicas_to_drop = vec![];
        let mut clusters_to_create = vec![];
        let mut cluster_replicas_to_create = vec![];
        let mut update_metrics_config = false;
        let mut update_tracing_config = false;
        let mut update_controller_config = false;
        let mut update_compute_config = false;
        let mut update_storage_config = false;
        let mut update_timestamp_oracle_config = false;
        let mut update_metrics_retention = false;
        let mut update_secrets_caching_config = false;
        let mut update_cluster_scheduling_config = false;
        let mut update_http_config = false;
        let mut update_advance_timelines_interval = false;

        for op in &ops {
            match op {
                catalog::Op::DropObjects(drop_object_infos) => {
                    for drop_object_info in drop_object_infos {
                        match &drop_object_info {
                            catalog::DropObjectInfo::Item(_) => {
                                // Nothing to do, these will be handled by
                                // applying the side effects that we return.
                            }
                            catalog::DropObjectInfo::Cluster(id) => {
                                clusters_to_drop.push(*id);
                            }
                            catalog::DropObjectInfo::ClusterReplica((
                                cluster_id,
                                replica_id,
                                _reason,
                            )) => {
                                // Drop the cluster replica itself.
                                cluster_replicas_to_drop.push((*cluster_id, *replica_id));
                            }
                            _ => (),
                        }
                    }
                }
                catalog::Op::ResetSystemConfiguration { name }
                | catalog::Op::UpdateSystemConfiguration { name, .. } => {
                    // Figure out which config subsystems are affected by this
                    // particular variable.
                    update_metrics_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_metrics_config_var(name);
                    update_tracing_config |= vars::is_tracing_var(name);
                    update_controller_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_controller_config_var(name);
                    update_compute_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_compute_config_var(name);
                    update_storage_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_storage_config_var(name);
                    update_timestamp_oracle_config |= vars::is_timestamp_oracle_config_var(name);
                    update_metrics_retention |= name == vars::METRICS_RETENTION.name();
                    update_secrets_caching_config |= vars::is_secrets_caching_var(name);
                    update_cluster_scheduling_config |= vars::is_cluster_scheduling_var(name);
                    update_http_config |= vars::is_http_config_var(name);
                    update_advance_timelines_interval |= name == DEFAULT_TIMESTAMP_INTERVAL.name();
                }
                catalog::Op::ResetAllSystemConfiguration => {
                    // Assume they all need to be updated.
                    // We could see if the config's have actually changed, but
                    // this is simpler.
                    update_tracing_config = true;
                    update_controller_config = true;
                    update_compute_config = true;
                    update_storage_config = true;
                    update_timestamp_oracle_config = true;
                    update_metrics_retention = true;
                    update_secrets_caching_config = true;
                    update_cluster_scheduling_config = true;
                    update_http_config = true;
                    update_advance_timelines_interval = true;
                }
                catalog::Op::RenameItem { id, .. } => {
                    // Renaming a webhook source changes its URL, so the
                    // cached appender needs to be restarted.
                    let item = self.catalog().get_entry(id);
                    let is_webhook_source = item
                        .source()
                        .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
                        .unwrap_or(false);
                    if is_webhook_source {
                        webhook_sources_to_restart.insert(*id);
                    }
                }
                catalog::Op::RenameSchema {
                    database_spec,
                    schema_spec,
                    ..
                } => {
                    // Same as above, but for every webhook source in the
                    // renamed schema.
                    let schema = self.catalog().get_schema(
                        database_spec,
                        schema_spec,
                        conn_id.unwrap_or(&SYSTEM_CONN_ID),
                    );
                    let webhook_sources = schema.item_ids().filter(|id| {
                        let item = self.catalog().get_entry(id);
                        item.source()
                            .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
                            .unwrap_or(false)
                    });
                    webhook_sources_to_restart.extend(webhook_sources);
                }
                catalog::Op::CreateCluster { id, .. } => {
                    clusters_to_create.push(*id);
                }
                catalog::Op::CreateClusterReplica {
                    cluster_id,
                    name,
                    config,
                    ..
                } => {
                    cluster_replicas_to_create.push((
                        *cluster_id,
                        name.clone(),
                        config.location.num_processes(),
                    ));
                }
                _ => (),
            }
        }

        self.validate_resource_limits(&ops, conn_id.unwrap_or(&SYSTEM_CONN_ID))?;

        // This will produce timestamps that are guaranteed to increase on each
        // call, and also never be behind the system clock. If the system clock
        // hasn't advanced (or has gone backward), it will increment by 1. For
        // the audit log, we need to balance "close (within 10s or so) to the
        // system clock" and "always goes up". We've chosen here to prioritize
        // always going up, and believe we will always be close to the system
        // clock because it is well configured (chrony) and so may only rarely
        // regress or pause for 10s.
        let oracle_write_ts = self.get_local_write_ts().await.timestamp;

        // Destructure `self` so we can borrow the individual fields
        // independently below.
        let Coordinator {
            catalog,
            active_conns,
            controller,
            cluster_replica_statuses,
            ..
        } = self;
        let catalog = Arc::make_mut(catalog);
        let conn = conn_id.map(|id| active_conns.get(id).expect("connection must exist"));

        let TransactionResult {
            builtin_table_updates,
            catalog_updates,
            audit_events,
        } = catalog
            .transact(
                Some(&mut controller.storage_collections),
                oracle_write_ts,
                conn,
                ops,
            )
            .await?;

        // Keep the in-memory replica status map in sync with the committed
        // catalog changes.
        for (cluster_id, replica_id) in &cluster_replicas_to_drop {
            cluster_replica_statuses.remove_cluster_replica_statuses(cluster_id, replica_id);
        }
        for cluster_id in &clusters_to_drop {
            cluster_replica_statuses.remove_cluster_statuses(cluster_id);
        }
        for cluster_id in clusters_to_create {
            cluster_replica_statuses.initialize_cluster_statuses(cluster_id);
        }
        let now = to_datetime((catalog.config().now)());
        for (cluster_id, replica_name, num_processes) in cluster_replicas_to_create {
            let replica_id = catalog
                .resolve_replica_in_cluster(&cluster_id, &replica_name)
                .expect("just created")
                .replica_id();
            cluster_replica_statuses.initialize_cluster_replica_statuses(
                cluster_id,
                replica_id,
                num_processes,
                now,
            );
        }

        // Append our builtin table updates, then return the notify so we can run other tasks in
        // parallel.
        let (builtin_update_notify, _) = self
            .builtin_table_update()
            .execute(builtin_table_updates)
            .await;

        // No error returns are allowed after this point. Enforce this at compile time
        // by using this odd structure so we don't accidentally add a stray `?`.
        let _: () = async {
            if !webhook_sources_to_restart.is_empty() {
                self.restart_webhook_sources(webhook_sources_to_restart);
            }

            // Re-sync whichever config subsystems the pre-scan flagged.
            if update_metrics_config {
                mz_metrics::update_dyncfg(&self.catalog().system_config().dyncfg_updates());
            }
            if update_controller_config {
                self.update_controller_config();
            }
            if update_compute_config {
                self.update_compute_config();
            }
            if update_storage_config {
                self.update_storage_config();
            }
            if update_timestamp_oracle_config {
                self.update_timestamp_oracle_config();
            }
            if update_metrics_retention {
                self.update_metrics_retention();
            }
            if update_tracing_config {
                self.update_tracing_config();
            }
            if update_secrets_caching_config {
                self.update_secrets_caching_config();
            }
            if update_cluster_scheduling_config {
                self.update_cluster_scheduling_config();
            }
            if update_http_config {
                self.update_http_config();
            }
            if update_advance_timelines_interval {
                // Only re-create the ticker if the interval actually changed,
                // to avoid resetting its phase unnecessarily.
                let new_interval = self.catalog().system_config().default_timestamp_interval();
                if new_interval != self.advance_timelines_interval.period() {
                    self.advance_timelines_interval = tokio::time::interval(new_interval);
                }
            }
        }
        .instrument(info_span!("coord::catalog_transact_with::finalize"))
        .await;

        // Best-effort telemetry: report each audit event to Segment, if
        // configured.
        let conn = conn_id.and_then(|id| self.active_conns.get(id));
        if let Some(segment_client) = &self.segment_client {
            for VersionedEvent::V1(event) in audit_events {
                let event_type = format!(
                    "{} {}",
                    event.object_type.as_title_case(),
                    event.event_type.as_title_case()
                );
                segment_client.environment_track(
                    &self.catalog().config().environment_id,
                    event_type,
                    json!({ "details": event.details.as_json() }),
                    EventDetails {
                        user_id: conn
                            .and_then(|c| c.user().external_metadata.as_ref())
                            .map(|m| m.user_id),
                        application_name: conn.map(|c| c.application_name()),
                        ..Default::default()
                    },
                );
            }
        }

        Ok((builtin_update_notify, catalog_updates))
    }
574
575    pub(crate) fn drop_replica(&mut self, cluster_id: ClusterId, replica_id: ReplicaId) {
576        self.drop_introspection_subscribes(replica_id);
577
578        self.controller
579            .drop_replica(cluster_id, replica_id)
580            .expect("dropping replica must not fail");
581    }
582
583    /// A convenience method for dropping sources.
584    pub(crate) fn drop_sources(&mut self, sources: Vec<(CatalogItemId, GlobalId)>) {
585        for (item_id, _gid) in &sources {
586            self.active_webhooks.remove(item_id);
587        }
588        let storage_metadata = self.catalog.state().storage_metadata();
589        let source_gids = sources.into_iter().map(|(_id, gid)| gid).collect();
590        self.controller
591            .storage
592            .drop_sources(storage_metadata, source_gids)
593            .unwrap_or_terminate("cannot fail to drop sources");
594    }
595
596    /// A convenience method for dropping tables.
597    pub(crate) fn drop_tables(&mut self, tables: Vec<(CatalogItemId, GlobalId)>, ts: Timestamp) {
598        for (item_id, _gid) in &tables {
599            self.active_webhooks.remove(item_id);
600        }
601
602        let storage_metadata = self.catalog.state().storage_metadata();
603        let table_gids = tables.into_iter().map(|(_id, gid)| gid).collect();
604        self.controller
605            .storage
606            .drop_tables(storage_metadata, table_gids, ts)
607            .unwrap_or_terminate("cannot fail to drop tables");
608    }
609
610    fn restart_webhook_sources(&mut self, sources: impl IntoIterator<Item = CatalogItemId>) {
611        for id in sources {
612            self.active_webhooks.remove(&id);
613        }
614    }
615
616    /// Like `drop_compute_sinks`, but for a single compute sink.
617    ///
618    /// Returns the controller's state for the compute sink if the identified
619    /// sink was known to the controller. It is the caller's responsibility to
620    /// retire the returned sink. Consider using `retire_compute_sinks` instead.
621    #[must_use]
622    pub async fn drop_compute_sink(&mut self, sink_id: GlobalId) -> Option<ActiveComputeSink> {
623        self.drop_compute_sinks([sink_id]).await.remove(&sink_id)
624    }
625
626    /// Drops a batch of compute sinks.
627    ///
628    /// For each sink that exists, the coordinator and controller's state
629    /// associated with the sink is removed.
630    ///
631    /// Returns a map containing the controller's state for each sink that was
632    /// removed. It is the caller's responsibility to retire the returned sinks.
633    /// Consider using `retire_compute_sinks` instead.
634    #[must_use]
635    pub async fn drop_compute_sinks(
636        &mut self,
637        sink_ids: impl IntoIterator<Item = GlobalId>,
638    ) -> BTreeMap<GlobalId, ActiveComputeSink> {
639        let mut by_id = BTreeMap::new();
640        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
641        for sink_id in sink_ids {
642            let sink = match self.remove_active_compute_sink(sink_id).await {
643                None => {
644                    tracing::error!(%sink_id, "drop_compute_sinks called on nonexistent sink");
645                    continue;
646                }
647                Some(sink) => sink,
648            };
649
650            by_cluster
651                .entry(sink.cluster_id())
652                .or_default()
653                .push(sink_id);
654            by_id.insert(sink_id, sink);
655        }
656        for (cluster_id, ids) in by_cluster {
657            let compute = &mut self.controller.compute;
658            // A cluster could have been dropped, so verify it exists.
659            if compute.instance_exists(cluster_id) {
660                compute
661                    .drop_collections(cluster_id, ids)
662                    .unwrap_or_terminate("cannot fail to drop collections");
663            }
664        }
665        by_id
666    }
667
668    /// Retires a batch of sinks with disparate reasons for retirement.
669    ///
670    /// Each sink identified in `reasons` is dropped (see `drop_compute_sinks`),
671    /// then retired with its corresponding reason.
672    pub async fn retire_compute_sinks(
673        &mut self,
674        mut reasons: BTreeMap<GlobalId, ActiveComputeSinkRetireReason>,
675    ) {
676        let sink_ids = reasons.keys().cloned();
677        for (id, sink) in self.drop_compute_sinks(sink_ids).await {
678            let reason = reasons
679                .remove(&id)
680                .expect("all returned IDs are in `reasons`");
681            sink.retire(reason);
682        }
683    }
684
685    /// Drops all pending replicas for a set of clusters
686    /// that are undergoing reconfiguration.
687    pub async fn drop_reconfiguration_replicas(
688        &mut self,
689        cluster_ids: BTreeSet<ClusterId>,
690    ) -> Result<(), AdapterError> {
691        let pending_cluster_ops: Vec<Op> = cluster_ids
692            .iter()
693            .map(|c| {
694                self.catalog()
695                    .get_cluster(c.clone())
696                    .replicas()
697                    .filter_map(|r| match r.config.location {
698                        ReplicaLocation::Managed(ref l) if l.pending => {
699                            Some(DropObjectInfo::ClusterReplica((
700                                c.clone(),
701                                r.replica_id,
702                                ReplicaCreateDropReason::Manual,
703                            )))
704                        }
705                        _ => None,
706                    })
707                    .collect::<Vec<DropObjectInfo>>()
708            })
709            .filter_map(|pending_replica_drop_ops_by_cluster| {
710                match pending_replica_drop_ops_by_cluster.len() {
711                    0 => None,
712                    _ => Some(Op::DropObjects(pending_replica_drop_ops_by_cluster)),
713                }
714            })
715            .collect();
716        if !pending_cluster_ops.is_empty() {
717            self.catalog_transact(None, pending_cluster_ops).await?;
718        }
719        Ok(())
720    }
721
722    /// Cancels all active compute sinks for the identified connection.
723    #[mz_ore::instrument(level = "debug")]
724    pub(crate) async fn cancel_compute_sinks_for_conn(&mut self, conn_id: &ConnectionId) {
725        self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Canceled)
726            .await
727    }
728
    /// Cancels all in-progress cluster reconfigurations for the identified
    /// connection.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn cancel_cluster_reconfigurations_for_conn(
        &mut self,
        conn_id: &ConnectionId,
    ) {
        // Delegates to the retire path, which drops the pending replicas and
        // clears the connection's pending cluster alters.
        self.retire_cluster_reconfigurations_for_conn(conn_id).await
    }
737
738    /// Retires all active compute sinks for the identified connection with the
739    /// specified reason.
740    #[mz_ore::instrument(level = "debug")]
741    pub(crate) async fn retire_compute_sinks_for_conn(
742        &mut self,
743        conn_id: &ConnectionId,
744        reason: ActiveComputeSinkRetireReason,
745    ) {
746        let drop_sinks = self
747            .active_conns
748            .get_mut(conn_id)
749            .expect("must exist for active session")
750            .drop_sinks
751            .iter()
752            .map(|sink_id| (*sink_id, reason.clone()))
753            .collect();
754        self.retire_compute_sinks(drop_sinks).await;
755    }
756
757    /// Cleans pending cluster reconfiguraiotns for the identified connection
758    #[mz_ore::instrument(level = "debug")]
759    pub(crate) async fn retire_cluster_reconfigurations_for_conn(
760        &mut self,
761        conn_id: &ConnectionId,
762    ) {
763        let reconfiguring_clusters = self
764            .active_conns
765            .get(conn_id)
766            .expect("must exist for active session")
767            .pending_cluster_alters
768            .clone();
769        // try to drop reconfig replicas
770        self.drop_reconfiguration_replicas(reconfiguring_clusters)
771            .await
772            .unwrap_or_terminate("cannot fail to drop reconfiguration replicas");
773
774        self.active_conns
775            .get_mut(conn_id)
776            .expect("must exist for active session")
777            .pending_cluster_alters
778            .clear();
779    }
780
781    pub(crate) fn drop_storage_sinks(&mut self, sink_gids: Vec<GlobalId>) {
782        let storage_metadata = self.catalog.state().storage_metadata();
783        self.controller
784            .storage
785            .drop_sinks(storage_metadata, sink_gids)
786            .unwrap_or_terminate("cannot fail to drop sinks");
787    }
788
789    pub(crate) fn drop_compute_collections(&mut self, collections: Vec<(ClusterId, GlobalId)>) {
790        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
791        for (cluster_id, gid) in collections {
792            by_cluster.entry(cluster_id).or_default().push(gid);
793        }
794        for (cluster_id, gids) in by_cluster {
795            let compute = &mut self.controller.compute;
796            // A cluster could have been dropped, so verify it exists.
797            if compute.instance_exists(cluster_id) {
798                compute
799                    .drop_collections(cluster_id, gids)
800                    .unwrap_or_terminate("cannot fail to drop collections");
801            }
802        }
803    }
804
805    pub(crate) fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>) {
806        let cloud_resource_controller = Arc::clone(self.cloud_resource_controller
807            .as_ref()
808            .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))
809            .expect("vpc endpoints should only be dropped in CLOUD, where `cloud_resource_controller` is `Some`"));
810        // We don't want to block the coordinator on an external delete api
811        // calls, so move the drop vpc_endpoint to a separate task. This does
812        // mean that a failed drop won't bubble up to the user as an error
813        // message. However, even if it did (and how the code previously
814        // worked), mz has already dropped it from our catalog, and so we
815        // wouldn't be able to retry anyway. Any orphaned vpc_endpoints will
816        // eventually be cleaned during restart via coord bootstrap.
817        task::spawn(
818            || "drop_vpc_endpoints",
819            async move {
820                for vpc_endpoint in vpc_endpoints {
821                    let _ = Retry::default()
822                        .max_duration(Duration::from_secs(60))
823                        .retry_async(|_state| async {
824                            fail_point!("drop_vpc_endpoint", |r| {
825                                Err(anyhow::anyhow!("Fail point error {:?}", r))
826                            });
827                            match cloud_resource_controller
828                                .delete_vpc_endpoint(vpc_endpoint)
829                                .await
830                            {
831                                Ok(_) => Ok(()),
832                                Err(e) => {
833                                    warn!("Dropping VPC Endpoints has encountered an error: {}", e);
834                                    Err(e)
835                                }
836                            }
837                        })
838                        .await;
839                }
840            }
841            .instrument(info_span!(
842                "coord::catalog_transact_inner::drop_vpc_endpoints"
843            )),
844        );
845    }
846
847    /// Removes all temporary items created by the specified connection, though
848    /// not the temporary schema itself.
849    pub(crate) async fn drop_temp_items(&mut self, conn_id: &ConnectionId) {
850        let temp_items = self.catalog().state().get_temp_items(conn_id).collect();
851        let all_items = self.catalog().object_dependents(&temp_items, conn_id);
852
853        if all_items.is_empty() {
854            return;
855        }
856        let op = Op::DropObjects(
857            all_items
858                .into_iter()
859                .map(DropObjectInfo::manual_drop_from_object_id)
860                .collect(),
861        );
862
863        self.catalog_transact_with_context(Some(conn_id), None, vec![op])
864            .await
865            .expect("unable to drop temporary items for conn_id");
866    }
867
868    fn update_cluster_scheduling_config(&self) {
869        let config = flags::orchestrator_scheduling_config(self.catalog.system_config());
870        self.controller
871            .update_orchestrator_scheduling_config(config);
872    }
873
874    fn update_secrets_caching_config(&self) {
875        let config = flags::caching_config(self.catalog.system_config());
876        self.caching_secrets_reader.set_policy(config);
877    }
878
879    fn update_tracing_config(&self) {
880        let tracing = flags::tracing_config(self.catalog().system_config());
881        tracing.apply(&self.tracing_handle);
882    }
883
884    fn update_compute_config(&mut self) {
885        let config_params = flags::compute_config(self.catalog().system_config());
886        self.controller.compute.update_configuration(config_params);
887    }
888
889    fn update_storage_config(&mut self) {
890        let config_params = flags::storage_config(self.catalog().system_config());
891        self.controller.storage.update_parameters(config_params);
892    }
893
894    fn update_timestamp_oracle_config(&self) {
895        let config_params = flags::timestamp_oracle_config(self.catalog().system_config());
896        if let Some(config) = self.timestamp_oracle_config.as_ref() {
897            config.apply_parameters(config_params)
898        }
899    }
900
901    fn update_metrics_retention(&self) {
902        let duration = self.catalog().system_config().metrics_retention();
903        let policy = ReadPolicy::lag_writes_by(
904            Timestamp::new(u64::try_from(duration.as_millis()).unwrap_or_else(|_e| {
905                tracing::error!("Absurd metrics retention duration: {duration:?}.");
906                u64::MAX
907            })),
908            SINCE_GRANULARITY,
909        );
910        let storage_policies = self
911            .catalog()
912            .entries()
913            .filter(|entry| {
914                entry.item().is_retained_metrics_object()
915                    && entry.item().is_compute_object_on_cluster().is_none()
916            })
917            .map(|entry| (entry.id(), policy.clone()))
918            .collect::<Vec<_>>();
919        let compute_policies = self
920            .catalog()
921            .entries()
922            .filter_map(|entry| {
923                if let (true, Some(cluster_id)) = (
924                    entry.item().is_retained_metrics_object(),
925                    entry.item().is_compute_object_on_cluster(),
926                ) {
927                    Some((cluster_id, entry.id(), policy.clone()))
928                } else {
929                    None
930                }
931            })
932            .collect::<Vec<_>>();
933        self.update_storage_read_policies(storage_policies);
934        self.update_compute_read_policies(compute_policies);
935    }
936
937    fn update_controller_config(&mut self) {
938        let sys_config = self.catalog().system_config();
939        self.controller
940            .update_configuration(sys_config.dyncfg_updates());
941    }
942
943    fn update_http_config(&mut self) {
944        let webhook_request_limit = self
945            .catalog()
946            .system_config()
947            .webhook_concurrent_request_limit();
948        self.webhook_concurrency_limit
949            .set_limit(webhook_request_limit);
950    }
951
    /// Creates a storage export (sink) under `id`, reading from the storage
    /// collection `sink.from` and writing via the sink's connection.
    ///
    /// # Errors
    ///
    /// Returns an error if `sink.from` is not a storage collection, or if the
    /// storage controller fails to create the collection backing the export.
    pub(crate) async fn create_storage_export(
        &mut self,
        id: GlobalId,
        sink: &Sink,
    ) -> Result<(), AdapterError> {
        // Validate `sink.from` is in fact a storage collection
        self.controller.storage.check_exists(sink.from)?;

        // The AsOf is used to determine at what time to snapshot reading from
        // the persist collection.  This is primarily relevant when we do _not_
        // want to include the snapshot in the sink.
        //
        // We choose the smallest as_of that is legal, according to the sinked
        // collection's since.
        let id_bundle = crate::CollectionIdBundle {
            storage_ids: btreeset! {sink.from},
            compute_ids: btreemap! {},
        };

        // We're putting in place read holds, such that create_exports, below,
        // which calls update_read_capabilities, can successfully do so.
        // Otherwise, the since of dependencies might move along concurrently,
        // pulling the rug from under us!
        //
        // TODO: Maybe in the future, pass those holds on to storage, to hold on
        // to them and downgrade when possible?
        let read_holds = self.acquire_read_holds(&id_bundle);
        let as_of = read_holds.least_valid_read();

        // Assemble the sink description from the catalog entry of the source
        // collection and the sink's own (inlined) connection details.
        let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
        let storage_sink_desc = mz_storage_types::sinks::StorageSinkDesc {
            from: sink.from,
            from_desc: storage_sink_from_entry
                .relation_desc()
                .expect("sinks can only be built on items with descs")
                .into_owned(),
            connection: sink
                .connection
                .clone()
                .into_inline_connection(self.catalog().state()),
            envelope: sink.envelope,
            as_of,
            with_snapshot: sink.with_snapshot,
            version: sink.version,
            // NOTE(review): unit placeholders — presumably the storage layer
            // substitutes real metadata downstream; confirm.
            from_storage_metadata: (),
            to_storage_metadata: (),
            commit_interval: sink.commit_interval,
        };

        let collection_desc = CollectionDescription {
            // TODO(sinks): make generic once we have more than one sink type.
            desc: KAFKA_PROGRESS_DESC.clone(),
            data_source: DataSource::Sink {
                desc: ExportDescription {
                    sink: storage_sink_desc,
                    instance_id: sink.cluster_id,
                },
            },
            since: None,
            timeline: None,
            primary: None,
        };
        let collections = vec![(id, collection_desc)];

        // Create the collections.
        let storage_metadata = self.catalog.state().storage_metadata();
        let res = self
            .controller
            .storage
            .create_collections(storage_metadata, None, collections)
            .await;

        // Drop read holds after the export has been created, at which point
        // storage will have put in its own read holds.
        drop(read_holds);

        Ok(res?)
    }
1030
1031    /// Validate all resource limits in a catalog transaction and return an error if that limit is
1032    /// exceeded.
1033    fn validate_resource_limits(
1034        &self,
1035        ops: &Vec<catalog::Op>,
1036        conn_id: &ConnectionId,
1037    ) -> Result<(), AdapterError> {
1038        let mut new_kafka_connections = 0;
1039        let mut new_postgres_connections = 0;
1040        let mut new_mysql_connections = 0;
1041        let mut new_sql_server_connections = 0;
1042        let mut new_aws_privatelink_connections = 0;
1043        let mut new_tables = 0;
1044        let mut new_sources = 0;
1045        let mut new_sinks = 0;
1046        let mut new_materialized_views = 0;
1047        let mut new_clusters = 0;
1048        let mut new_replicas_per_cluster = BTreeMap::new();
1049        let mut new_credit_consumption_rate = Numeric::zero();
1050        let mut new_databases = 0;
1051        let mut new_schemas_per_database = BTreeMap::new();
1052        let mut new_objects_per_schema = BTreeMap::new();
1053        let mut new_secrets = 0;
1054        let mut new_roles = 0;
1055        let mut new_continual_tasks = 0;
1056        let mut new_network_policies = 0;
1057        for op in ops {
1058            match op {
1059                Op::CreateDatabase { .. } => {
1060                    new_databases += 1;
1061                }
1062                Op::CreateSchema { database_id, .. } => {
1063                    if let ResolvedDatabaseSpecifier::Id(database_id) = database_id {
1064                        *new_schemas_per_database.entry(database_id).or_insert(0) += 1;
1065                    }
1066                }
1067                Op::CreateRole { .. } => {
1068                    new_roles += 1;
1069                }
1070                Op::CreateNetworkPolicy { .. } => {
1071                    new_network_policies += 1;
1072                }
1073                Op::CreateCluster { .. } => {
1074                    // TODO(benesch): having deprecated linked clusters, remove
1075                    // the `max_sources` and `max_sinks` limit, and set a higher
1076                    // max cluster limit?
1077                    new_clusters += 1;
1078                }
1079                Op::CreateClusterReplica {
1080                    cluster_id, config, ..
1081                } => {
1082                    if cluster_id.is_user() {
1083                        *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) += 1;
1084                        if let ReplicaLocation::Managed(location) = &config.location {
1085                            let replica_allocation = self
1086                                .catalog()
1087                                .cluster_replica_sizes()
1088                                .0
1089                                .get(location.size_for_billing())
1090                                .expect(
1091                                    "location size is validated against the cluster replica sizes",
1092                                );
1093                            new_credit_consumption_rate += replica_allocation.credits_per_hour
1094                        }
1095                    }
1096                }
1097                Op::CreateItem { name, item, .. } => {
1098                    *new_objects_per_schema
1099                        .entry((
1100                            name.qualifiers.database_spec.clone(),
1101                            name.qualifiers.schema_spec.clone(),
1102                        ))
1103                        .or_insert(0) += 1;
1104                    match item {
1105                        CatalogItem::Connection(connection) => match connection.details {
1106                            ConnectionDetails::Kafka(_) => new_kafka_connections += 1,
1107                            ConnectionDetails::Postgres(_) => new_postgres_connections += 1,
1108                            ConnectionDetails::MySql(_) => new_mysql_connections += 1,
1109                            ConnectionDetails::SqlServer(_) => new_sql_server_connections += 1,
1110                            ConnectionDetails::AwsPrivatelink(_) => {
1111                                new_aws_privatelink_connections += 1
1112                            }
1113                            ConnectionDetails::Csr(_)
1114                            | ConnectionDetails::Ssh { .. }
1115                            | ConnectionDetails::Aws(_)
1116                            | ConnectionDetails::IcebergCatalog(_) => {}
1117                        },
1118                        CatalogItem::Table(_) => {
1119                            new_tables += 1;
1120                        }
1121                        CatalogItem::Source(source) => {
1122                            new_sources += source.user_controllable_persist_shard_count()
1123                        }
1124                        CatalogItem::Sink(_) => new_sinks += 1,
1125                        CatalogItem::MaterializedView(_) => {
1126                            new_materialized_views += 1;
1127                        }
1128                        CatalogItem::Secret(_) => {
1129                            new_secrets += 1;
1130                        }
1131                        CatalogItem::ContinualTask(_) => {
1132                            new_continual_tasks += 1;
1133                        }
1134                        CatalogItem::Log(_)
1135                        | CatalogItem::View(_)
1136                        | CatalogItem::Index(_)
1137                        | CatalogItem::Type(_)
1138                        | CatalogItem::Func(_) => {}
1139                    }
1140                }
1141                Op::DropObjects(drop_object_infos) => {
1142                    for drop_object_info in drop_object_infos {
1143                        match drop_object_info {
1144                            DropObjectInfo::Cluster(_) => {
1145                                new_clusters -= 1;
1146                            }
1147                            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
1148                                if cluster_id.is_user() {
1149                                    *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) -= 1;
1150                                    let cluster = self
1151                                        .catalog()
1152                                        .get_cluster_replica(*cluster_id, *replica_id);
1153                                    if let ReplicaLocation::Managed(location) =
1154                                        &cluster.config.location
1155                                    {
1156                                        let replica_allocation = self
1157                                            .catalog()
1158                                            .cluster_replica_sizes()
1159                                            .0
1160                                            .get(location.size_for_billing())
1161                                            .expect(
1162                                                "location size is validated against the cluster replica sizes",
1163                                            );
1164                                        new_credit_consumption_rate -=
1165                                            replica_allocation.credits_per_hour
1166                                    }
1167                                }
1168                            }
1169                            DropObjectInfo::Database(_) => {
1170                                new_databases -= 1;
1171                            }
1172                            DropObjectInfo::Schema((database_spec, _)) => {
1173                                if let ResolvedDatabaseSpecifier::Id(database_id) = database_spec {
1174                                    *new_schemas_per_database.entry(database_id).or_insert(0) -= 1;
1175                                }
1176                            }
1177                            DropObjectInfo::Role(_) => {
1178                                new_roles -= 1;
1179                            }
1180                            DropObjectInfo::NetworkPolicy(_) => {
1181                                new_network_policies -= 1;
1182                            }
1183                            DropObjectInfo::Item(id) => {
1184                                let entry = self.catalog().get_entry(id);
1185                                *new_objects_per_schema
1186                                    .entry((
1187                                        entry.name().qualifiers.database_spec.clone(),
1188                                        entry.name().qualifiers.schema_spec.clone(),
1189                                    ))
1190                                    .or_insert(0) -= 1;
1191                                match entry.item() {
1192                                    CatalogItem::Connection(connection) => match connection.details
1193                                    {
1194                                        ConnectionDetails::AwsPrivatelink(_) => {
1195                                            new_aws_privatelink_connections -= 1;
1196                                        }
1197                                        _ => (),
1198                                    },
1199                                    CatalogItem::Table(_) => {
1200                                        new_tables -= 1;
1201                                    }
1202                                    CatalogItem::Source(source) => {
1203                                        new_sources -=
1204                                            source.user_controllable_persist_shard_count()
1205                                    }
1206                                    CatalogItem::Sink(_) => new_sinks -= 1,
1207                                    CatalogItem::MaterializedView(_) => {
1208                                        new_materialized_views -= 1;
1209                                    }
1210                                    CatalogItem::Secret(_) => {
1211                                        new_secrets -= 1;
1212                                    }
1213                                    CatalogItem::ContinualTask(_) => {
1214                                        new_continual_tasks -= 1;
1215                                    }
1216                                    CatalogItem::Log(_)
1217                                    | CatalogItem::View(_)
1218                                    | CatalogItem::Index(_)
1219                                    | CatalogItem::Type(_)
1220                                    | CatalogItem::Func(_) => {}
1221                                }
1222                            }
1223                        }
1224                    }
1225                }
1226                Op::UpdateItem {
1227                    name: _,
1228                    id,
1229                    to_item,
1230                } => match to_item {
1231                    CatalogItem::Source(source) => {
1232                        let current_source = self
1233                            .catalog()
1234                            .get_entry(id)
1235                            .source()
1236                            .expect("source update is for source item");
1237
1238                        new_sources += source.user_controllable_persist_shard_count()
1239                            - current_source.user_controllable_persist_shard_count();
1240                    }
1241                    CatalogItem::Connection(_)
1242                    | CatalogItem::Table(_)
1243                    | CatalogItem::Sink(_)
1244                    | CatalogItem::MaterializedView(_)
1245                    | CatalogItem::Secret(_)
1246                    | CatalogItem::Log(_)
1247                    | CatalogItem::View(_)
1248                    | CatalogItem::Index(_)
1249                    | CatalogItem::Type(_)
1250                    | CatalogItem::Func(_)
1251                    | CatalogItem::ContinualTask(_) => {}
1252                },
1253                Op::AlterRole { .. }
1254                | Op::AlterRetainHistory { .. }
1255                | Op::AlterSourceTimestampInterval { .. }
1256                | Op::AlterNetworkPolicy { .. }
1257                | Op::AlterAddColumn { .. }
1258                | Op::AlterMaterializedViewApplyReplacement { .. }
1259                | Op::UpdatePrivilege { .. }
1260                | Op::UpdateDefaultPrivilege { .. }
1261                | Op::GrantRole { .. }
1262                | Op::RenameCluster { .. }
1263                | Op::RenameClusterReplica { .. }
1264                | Op::RenameItem { .. }
1265                | Op::RenameSchema { .. }
1266                | Op::UpdateOwner { .. }
1267                | Op::RevokeRole { .. }
1268                | Op::UpdateClusterConfig { .. }
1269                | Op::UpdateClusterReplicaConfig { .. }
1270                | Op::UpdateSourceReferences { .. }
1271                | Op::UpdateSystemConfiguration { .. }
1272                | Op::ResetSystemConfiguration { .. }
1273                | Op::ResetAllSystemConfiguration { .. }
1274                | Op::Comment { .. }
1275                | Op::WeirdStorageUsageUpdates { .. }
1276                | Op::TransactionDryRun => {}
1277            }
1278        }
1279
1280        let mut current_aws_privatelink_connections = 0;
1281        let mut current_postgres_connections = 0;
1282        let mut current_mysql_connections = 0;
1283        let mut current_sql_server_connections = 0;
1284        let mut current_kafka_connections = 0;
1285        for c in self.catalog().user_connections() {
1286            let connection = c
1287                .connection()
1288                .expect("`user_connections()` only returns connection objects");
1289
1290            match connection.details {
1291                ConnectionDetails::AwsPrivatelink(_) => current_aws_privatelink_connections += 1,
1292                ConnectionDetails::Postgres(_) => current_postgres_connections += 1,
1293                ConnectionDetails::MySql(_) => current_mysql_connections += 1,
1294                ConnectionDetails::SqlServer(_) => current_sql_server_connections += 1,
1295                ConnectionDetails::Kafka(_) => current_kafka_connections += 1,
1296                ConnectionDetails::Csr(_)
1297                | ConnectionDetails::Ssh { .. }
1298                | ConnectionDetails::Aws(_)
1299                | ConnectionDetails::IcebergCatalog(_) => {}
1300            }
1301        }
1302        self.validate_resource_limit(
1303            current_kafka_connections,
1304            new_kafka_connections,
1305            SystemVars::max_kafka_connections,
1306            "Kafka Connection",
1307            MAX_KAFKA_CONNECTIONS.name(),
1308        )?;
1309        self.validate_resource_limit(
1310            current_postgres_connections,
1311            new_postgres_connections,
1312            SystemVars::max_postgres_connections,
1313            "PostgreSQL Connection",
1314            MAX_POSTGRES_CONNECTIONS.name(),
1315        )?;
1316        self.validate_resource_limit(
1317            current_mysql_connections,
1318            new_mysql_connections,
1319            SystemVars::max_mysql_connections,
1320            "MySQL Connection",
1321            MAX_MYSQL_CONNECTIONS.name(),
1322        )?;
1323        self.validate_resource_limit(
1324            current_sql_server_connections,
1325            new_sql_server_connections,
1326            SystemVars::max_sql_server_connections,
1327            "SQL Server Connection",
1328            MAX_SQL_SERVER_CONNECTIONS.name(),
1329        )?;
1330        self.validate_resource_limit(
1331            current_aws_privatelink_connections,
1332            new_aws_privatelink_connections,
1333            SystemVars::max_aws_privatelink_connections,
1334            "AWS PrivateLink Connection",
1335            MAX_AWS_PRIVATELINK_CONNECTIONS.name(),
1336        )?;
1337        self.validate_resource_limit(
1338            self.catalog().user_tables().count(),
1339            new_tables,
1340            SystemVars::max_tables,
1341            "table",
1342            MAX_TABLES.name(),
1343        )?;
1344
1345        let current_sources: usize = self
1346            .catalog()
1347            .user_sources()
1348            .filter_map(|source| source.source())
1349            .map(|source| source.user_controllable_persist_shard_count())
1350            .sum::<i64>()
1351            .try_into()
1352            .expect("non-negative sum of sources");
1353
1354        self.validate_resource_limit(
1355            current_sources,
1356            new_sources,
1357            SystemVars::max_sources,
1358            "source",
1359            MAX_SOURCES.name(),
1360        )?;
1361        self.validate_resource_limit(
1362            self.catalog().user_sinks().count(),
1363            new_sinks,
1364            SystemVars::max_sinks,
1365            "sink",
1366            MAX_SINKS.name(),
1367        )?;
1368        self.validate_resource_limit(
1369            self.catalog().user_materialized_views().count(),
1370            new_materialized_views,
1371            SystemVars::max_materialized_views,
1372            "materialized view",
1373            MAX_MATERIALIZED_VIEWS.name(),
1374        )?;
1375        self.validate_resource_limit(
1376            // Linked compute clusters don't count against the limit, since
1377            // we have a separate sources and sinks limit.
1378            //
1379            // TODO(benesch): remove the `max_sources` and `max_sinks` limit,
1380            // and set a higher max cluster limit?
1381            self.catalog().user_clusters().count(),
1382            new_clusters,
1383            SystemVars::max_clusters,
1384            "cluster",
1385            MAX_CLUSTERS.name(),
1386        )?;
1387        for (cluster_id, new_replicas) in new_replicas_per_cluster {
1388            // It's possible that the cluster hasn't been created yet.
1389            let current_amount = self
1390                .catalog()
1391                .try_get_cluster(cluster_id)
1392                .map(|instance| instance.user_replicas().count())
1393                .unwrap_or(0);
1394            self.validate_resource_limit(
1395                current_amount,
1396                new_replicas,
1397                SystemVars::max_replicas_per_cluster,
1398                "cluster replica",
1399                MAX_REPLICAS_PER_CLUSTER.name(),
1400            )?;
1401        }
1402        self.validate_resource_limit_numeric(
1403            self.current_credit_consumption_rate(),
1404            new_credit_consumption_rate,
1405            |system_vars| {
1406                self.license_key
1407                    .max_credit_consumption_rate()
1408                    .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
1409            },
1410            "cluster replica",
1411            MAX_CREDIT_CONSUMPTION_RATE.name(),
1412        )?;
1413        self.validate_resource_limit(
1414            self.catalog().databases().count(),
1415            new_databases,
1416            SystemVars::max_databases,
1417            "database",
1418            MAX_DATABASES.name(),
1419        )?;
1420        for (database_id, new_schemas) in new_schemas_per_database {
1421            self.validate_resource_limit(
1422                self.catalog().get_database(database_id).schemas_by_id.len(),
1423                new_schemas,
1424                SystemVars::max_schemas_per_database,
1425                "schema",
1426                MAX_SCHEMAS_PER_DATABASE.name(),
1427            )?;
1428        }
1429        for ((database_spec, schema_spec), new_objects) in new_objects_per_schema {
1430            // For temporary schemas that don't exist yet (lazy creation),
1431            // treat them as having 0 items.
1432            let current_items = self
1433                .catalog()
1434                .try_get_schema(&database_spec, &schema_spec, conn_id)
1435                .map(|schema| schema.items.len())
1436                .unwrap_or(0);
1437            self.validate_resource_limit(
1438                current_items,
1439                new_objects,
1440                SystemVars::max_objects_per_schema,
1441                "object",
1442                MAX_OBJECTS_PER_SCHEMA.name(),
1443            )?;
1444        }
1445        self.validate_resource_limit(
1446            self.catalog().user_secrets().count(),
1447            new_secrets,
1448            SystemVars::max_secrets,
1449            "secret",
1450            MAX_SECRETS.name(),
1451        )?;
1452        self.validate_resource_limit(
1453            self.catalog().user_roles().count(),
1454            new_roles,
1455            SystemVars::max_roles,
1456            "role",
1457            MAX_ROLES.name(),
1458        )?;
1459        self.validate_resource_limit(
1460            self.catalog().user_continual_tasks().count(),
1461            new_continual_tasks,
1462            SystemVars::max_continual_tasks,
1463            "continual_task",
1464            MAX_CONTINUAL_TASKS.name(),
1465        )?;
1466        self.validate_resource_limit(
1467            self.catalog().user_network_policies().count(),
1468            new_network_policies,
1469            SystemVars::max_network_policies,
1470            "network_policy",
1471            MAX_NETWORK_POLICIES.name(),
1472        )?;
1473        Ok(())
1474    }
1475
1476    /// Validate a specific type of resource limit and return an error if that limit is exceeded.
1477    pub(crate) fn validate_resource_limit<F>(
1478        &self,
1479        current_amount: usize,
1480        new_instances: i64,
1481        resource_limit: F,
1482        resource_type: &str,
1483        limit_name: &str,
1484    ) -> Result<(), AdapterError>
1485    where
1486        F: Fn(&SystemVars) -> u32,
1487    {
1488        if new_instances <= 0 {
1489            return Ok(());
1490        }
1491
1492        let limit: i64 = resource_limit(self.catalog().system_config()).into();
1493        let current_amount: Option<i64> = current_amount.try_into().ok();
1494        let desired =
1495            current_amount.and_then(|current_amount| current_amount.checked_add(new_instances));
1496
1497        let exceeds_limit = if let Some(desired) = desired {
1498            desired > limit
1499        } else {
1500            true
1501        };
1502
1503        let desired = desired
1504            .map(|desired| desired.to_string())
1505            .unwrap_or_else(|| format!("more than {}", i64::MAX));
1506        let current = current_amount
1507            .map(|current| current.to_string())
1508            .unwrap_or_else(|| format!("more than {}", i64::MAX));
1509        if exceeds_limit {
1510            Err(AdapterError::ResourceExhaustion {
1511                resource_type: resource_type.to_string(),
1512                limit_name: limit_name.to_string(),
1513                desired,
1514                limit: limit.to_string(),
1515                current,
1516            })
1517        } else {
1518            Ok(())
1519        }
1520    }
1521
1522    /// Validate a specific type of float resource limit and return an error if that limit is exceeded.
1523    ///
1524    /// This is very similar to [`Self::validate_resource_limit`] but for numerics.
1525    pub(crate) fn validate_resource_limit_numeric<F>(
1526        &self,
1527        current_amount: Numeric,
1528        new_amount: Numeric,
1529        resource_limit: F,
1530        resource_type: &str,
1531        limit_name: &str,
1532    ) -> Result<(), AdapterError>
1533    where
1534        F: Fn(&SystemVars) -> Numeric,
1535    {
1536        if new_amount <= Numeric::zero() {
1537            return Ok(());
1538        }
1539
1540        let limit = resource_limit(self.catalog().system_config());
1541        // Floats will overflow to infinity instead of panicking, which has the correct comparison
1542        // semantics.
1543        // NaN should be impossible here since both values are positive.
1544        let desired = current_amount + new_amount;
1545        if desired > limit {
1546            Err(AdapterError::ResourceExhaustion {
1547                resource_type: resource_type.to_string(),
1548                limit_name: limit_name.to_string(),
1549                desired: desired.to_string(),
1550                limit: limit.to_string(),
1551                current: current_amount.to_string(),
1552            })
1553        } else {
1554            Ok(())
1555        }
1556    }
1557}