mz_adapter/coord/ddl.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! This module encapsulates all of the [`Coordinator`]'s logic for creating, dropping,
11//! and altering objects.
12
13use std::collections::{BTreeMap, BTreeSet};
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use fail::fail_point;
19use maplit::{btreemap, btreeset};
20use mz_adapter_types::compaction::SINCE_GRANULARITY;
21use mz_adapter_types::connection::ConnectionId;
22use mz_audit_log::VersionedEvent;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Sink};
25use mz_cluster_client::ReplicaId;
26use mz_controller::clusters::ReplicaLocation;
27use mz_controller_types::ClusterId;
28use mz_ore::instrument;
29use mz_ore::now::to_datetime;
30use mz_ore::retry::Retry;
31use mz_ore::task;
32use mz_repr::adt::numeric::Numeric;
33use mz_repr::{CatalogItemId, GlobalId, Timestamp};
34use mz_sql::catalog::{CatalogClusterReplica, CatalogSchema};
35use mz_sql::names::ResolvedDatabaseSpecifier;
36use mz_sql::plan::ConnectionDetails;
37use mz_sql::session::metadata::SessionMetadata;
38use mz_sql::session::vars::{
39    self, MAX_AWS_PRIVATELINK_CONNECTIONS, MAX_CLUSTERS, MAX_CONTINUAL_TASKS,
40    MAX_CREDIT_CONSUMPTION_RATE, MAX_DATABASES, MAX_KAFKA_CONNECTIONS, MAX_MATERIALIZED_VIEWS,
41    MAX_MYSQL_CONNECTIONS, MAX_NETWORK_POLICIES, MAX_OBJECTS_PER_SCHEMA, MAX_POSTGRES_CONNECTIONS,
42    MAX_REPLICAS_PER_CLUSTER, MAX_ROLES, MAX_SCHEMAS_PER_DATABASE, MAX_SECRETS, MAX_SINKS,
43    MAX_SOURCES, MAX_SQL_SERVER_CONNECTIONS, MAX_TABLES, SystemVars, Var,
44};
45use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
46use mz_storage_types::connections::inline::IntoInlineConnection;
47use mz_storage_types::read_policy::ReadPolicy;
48use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
49use serde_json::json;
50use tracing::{Instrument, Level, event, info_span, warn};
51
52use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
53use crate::catalog::{DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult};
54use crate::coord::Coordinator;
55use crate::coord::appends::BuiltinTableAppendNotify;
56use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
57use crate::session::{Session, Transaction, TransactionOps};
58use crate::telemetry::{EventDetails, SegmentClientExt};
59use crate::util::ResultExt;
60use crate::{AdapterError, ExecuteContext, catalog, flags};
61
62impl Coordinator {
63    /// Same as [`Self::catalog_transact_with_context`] but takes a [`Session`].
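    ///
    /// A minimal usage sketch (hypothetical call site; `drop_infos` is assumed to
    /// already hold valid [`DropObjectInfo`]s):
    ///
    /// ```ignore
    /// // Drop a set of objects in a single catalog transaction.
    /// let ops = vec![catalog::Op::DropObjects(drop_infos)];
    /// coord.catalog_transact(Some(&session), ops).await?;
    /// ```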
64    #[instrument(name = "coord::catalog_transact")]
65    pub(crate) async fn catalog_transact(
66        &mut self,
67        session: Option<&Session>,
68        ops: Vec<catalog::Op>,
69    ) -> Result<(), AdapterError> {
70        let start = Instant::now();
71        let result = self
72            .catalog_transact_with_context(session.map(|session| session.conn_id()), None, ops)
73            .await;
74        self.metrics
75            .catalog_transact_seconds
76            .with_label_values(&["catalog_transact"])
77            .observe(start.elapsed().as_secs_f64());
78        result
79    }
80
    /// Same as [`Self::catalog_transact_with_context`] but takes an optional
    /// [`ExecuteContext`] and runs builtin table updates concurrently with any
    /// side effects (e.g. creating collections).
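    ///
    /// A hedged sketch of a call site (the closure body is hypothetical; the real
    /// side effects depend on the ops being applied):
    ///
    /// ```ignore
    /// coord
    ///     .catalog_transact_with_side_effects(Some(&mut ctx), ops, |coord, _ctx| {
    ///         Box::pin(async move {
    ///             // e.g. ship dataflows or start ingestions for newly created objects
    ///             coord.some_follow_up_work().await;
    ///         })
    ///     })
    ///     .await?;
    /// ```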
    // TODO(aljoscha): Remove this method once all call-sites have been migrated
    // to the newer catalog_transact_with_context. The latter is what allows us
    // to apply catalog implications that we derive from catalog changes either
    // when initially applying the ops to the catalog _or_ when following
    // catalog changes from another process.
89    #[instrument(name = "coord::catalog_transact_with_side_effects")]
90    pub(crate) async fn catalog_transact_with_side_effects<F>(
91        &mut self,
92        mut ctx: Option<&mut ExecuteContext>,
93        ops: Vec<catalog::Op>,
94        side_effect: F,
95    ) -> Result<(), AdapterError>
96    where
97        F: for<'a> FnOnce(
98                &'a mut Coordinator,
99                Option<&'a mut ExecuteContext>,
100            ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
101            + 'static,
102    {
103        let start = Instant::now();
104
105        let (table_updates, catalog_updates) = self
106            .catalog_transact_inner(ctx.as_ref().map(|ctx| ctx.session().conn_id()), ops)
107            .await?;
108
109        // We can't run this concurrently with the explicit side effects,
110        // because both want to borrow self mutably.
111        let apply_implications_res = self
112            .apply_catalog_implications(ctx.as_deref_mut(), catalog_updates)
113            .await;
114
115        // We would get into an inconsistent state if we updated the catalog but
116        // then failed to apply commands/updates to the controller. Easiest
117        // thing to do is panic and let restart/bootstrap handle it.
118        apply_implications_res.expect("cannot fail to apply catalog update implications");
119
        // Note: It's important that we keep the function call inside the macro; this
        // way we only run the consistency checks if soft assertions are enabled.
122        mz_ore::soft_assert_eq_no_log!(
123            self.check_consistency(),
124            Ok(()),
125            "coordinator inconsistency detected"
126        );
127
128        let side_effects_fut = side_effect(self, ctx);
129
130        // Run our side effects concurrently with the table updates.
131        let ((), ()) = futures::future::join(
132            side_effects_fut.instrument(info_span!(
133                "coord::catalog_transact_with_side_effects::side_effects_fut"
134            )),
135            table_updates.instrument(info_span!(
136                "coord::catalog_transact_with_side_effects::table_updates"
137            )),
138        )
139        .await;
140
141        self.metrics
142            .catalog_transact_seconds
143            .with_label_values(&["catalog_transact_with_side_effects"])
144            .observe(start.elapsed().as_secs_f64());
145
146        Ok(())
147    }
148
149    /// Same as [`Self::catalog_transact_inner`] but takes an execution context
150    /// or connection ID and runs builtin table updates concurrently with any
151    /// catalog implications that are generated as part of applying the given
152    /// `ops` (e.g. creating collections).
153    ///
154    /// This will use a connection ID if provided and otherwise fall back to
155    /// getting a connection ID from the execution context.
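    ///
    /// A minimal sketch (passing an explicit connection ID and no execution
    /// context, mirroring how `drop_temp_items` below calls this):
    ///
    /// ```ignore
    /// coord
    ///     .catalog_transact_with_context(Some(conn_id), None, vec![op])
    ///     .await?;
    /// ```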
156    #[instrument(name = "coord::catalog_transact_with_context")]
157    pub(crate) async fn catalog_transact_with_context(
158        &mut self,
159        conn_id: Option<&ConnectionId>,
160        ctx: Option<&mut ExecuteContext>,
161        ops: Vec<catalog::Op>,
162    ) -> Result<(), AdapterError> {
163        let start = Instant::now();
164
165        let conn_id = conn_id.or_else(|| ctx.as_ref().map(|ctx| ctx.session().conn_id()));
166
167        let (table_updates, catalog_updates) = self.catalog_transact_inner(conn_id, ops).await?;
168
169        let apply_catalog_implications_fut = self.apply_catalog_implications(ctx, catalog_updates);
170
171        // Apply catalog implications concurrently with the table updates.
172        let (combined_apply_res, ()) = futures::future::join(
173            apply_catalog_implications_fut.instrument(info_span!(
174                "coord::catalog_transact_with_context::side_effects_fut"
175            )),
176            table_updates.instrument(info_span!(
177                "coord::catalog_transact_with_context::table_updates"
178            )),
179        )
180        .await;
181
182        // We would get into an inconsistent state if we updated the catalog but
183        // then failed to apply implications. Easiest thing to do is panic and
184        // let restart/bootstrap handle it.
185        combined_apply_res.expect("cannot fail to apply catalog implications");
186
        // Note: It's important that we keep the function call inside the macro; this
        // way we only run the consistency checks if soft assertions are enabled.
189        mz_ore::soft_assert_eq_no_log!(
190            self.check_consistency(),
191            Ok(()),
192            "coordinator inconsistency detected"
193        );
194
195        self.metrics
196            .catalog_transact_seconds
197            .with_label_values(&["catalog_transact_with_context"])
198            .observe(start.elapsed().as_secs_f64());
199
200        Ok(())
201    }
202
    /// Executes a Catalog transaction, with special handling for when the [`Session`]
    /// in the provided [`ExecuteContext`] is in a SQL transaction that is executing DDL.
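    ///
    /// A hedged sketch (hypothetical call site; when the session has an open DDL
    /// transaction the ops are only dry-run and stashed on that transaction, see
    /// the body below):
    ///
    /// ```ignore
    /// coord
    ///     .catalog_transact_with_ddl_transaction(&mut ctx, ops, |_coord, _ctx| {
    ///         Box::pin(async move {
    ///             // side effects run only when the ops actually commit
    ///         })
    ///     })
    ///     .await?;
    /// ```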
205    #[instrument(name = "coord::catalog_transact_with_ddl_transaction")]
206    pub(crate) async fn catalog_transact_with_ddl_transaction<F>(
207        &mut self,
208        ctx: &mut ExecuteContext,
209        ops: Vec<catalog::Op>,
210        side_effect: F,
211    ) -> Result<(), AdapterError>
212    where
213        F: for<'a> FnOnce(
214                &'a mut Coordinator,
215                Option<&'a mut ExecuteContext>,
216            ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
217            + Send
218            + Sync
219            + 'static,
220    {
221        let start = Instant::now();
222
223        let Some(Transaction {
224            ops:
225                TransactionOps::DDL {
226                    ops: txn_ops,
227                    revision: txn_revision,
228                    side_effects: _,
229                    state: _,
230                },
231            ..
232        }) = ctx.session().transaction().inner()
233        else {
234            let result = self
235                .catalog_transact_with_side_effects(Some(ctx), ops, side_effect)
236                .await;
237            self.metrics
238                .catalog_transact_seconds
239                .with_label_values(&["catalog_transact_with_ddl_transaction"])
240                .observe(start.elapsed().as_secs_f64());
241            return result;
242        };
243
        // Make sure our Catalog hasn't changed since opening the transaction.
245        if self.catalog().transient_revision() != *txn_revision {
246            self.metrics
247                .catalog_transact_seconds
248                .with_label_values(&["catalog_transact_with_ddl_transaction"])
249                .observe(start.elapsed().as_secs_f64());
250            return Err(AdapterError::DDLTransactionRace);
251        }
252
253        // Combine the existing ops with the new ops so we can replay them.
254        let mut all_ops = Vec::with_capacity(ops.len() + txn_ops.len() + 1);
255        all_ops.extend(txn_ops.iter().cloned());
256        all_ops.extend(ops.clone());
257        all_ops.push(Op::TransactionDryRun);
258
259        // Run our Catalog transaction, but abort before committing.
260        let result = self.catalog_transact(Some(ctx.session()), all_ops).await;
261
262        let result = match result {
263            // We purposefully fail with this error to prevent committing the transaction.
264            Err(AdapterError::TransactionDryRun { new_ops, new_state }) => {
                // Set these ops on our transaction, bailing if the Catalog has changed
                // since we ran the transaction.
267                ctx.session_mut()
268                    .transaction_mut()
269                    .add_ops(TransactionOps::DDL {
270                        ops: new_ops,
271                        state: new_state,
272                        side_effects: vec![Box::new(side_effect)],
273                        revision: self.catalog().transient_revision(),
274                    })?;
275                Ok(())
276            }
277            Ok(_) => unreachable!("unexpected success!"),
278            Err(e) => Err(e),
279        };
280
281        self.metrics
282            .catalog_transact_seconds
283            .with_label_values(&["catalog_transact_with_ddl_transaction"])
284            .observe(start.elapsed().as_secs_f64());
285
286        result
287    }
288
    /// Perform a catalog transaction. [`Coordinator::ship_dataflow`] must be
    /// called after this function successfully returns for any
    /// [`DataflowDesc`](mz_compute_types::dataflows::DataflowDesc) that was built.
292    #[instrument(name = "coord::catalog_transact_inner")]
293    pub(crate) async fn catalog_transact_inner(
294        &mut self,
295        conn_id: Option<&ConnectionId>,
296        ops: Vec<catalog::Op>,
297    ) -> Result<(BuiltinTableAppendNotify, Vec<ParsedStateUpdate>), AdapterError> {
298        if self.controller.read_only() {
299            return Err(AdapterError::ReadOnly);
300        }
301
302        event!(Level::TRACE, ops = format!("{:?}", ops));
303
304        let mut webhook_sources_to_restart = BTreeSet::new();
305        let mut clusters_to_drop = vec![];
306        let mut cluster_replicas_to_drop = vec![];
307        let mut clusters_to_create = vec![];
308        let mut cluster_replicas_to_create = vec![];
309        let mut update_metrics_config = false;
310        let mut update_tracing_config = false;
311        let mut update_controller_config = false;
312        let mut update_compute_config = false;
313        let mut update_storage_config = false;
314        let mut update_timestamp_oracle_config = false;
315        let mut update_metrics_retention = false;
316        let mut update_secrets_caching_config = false;
317        let mut update_cluster_scheduling_config = false;
318        let mut update_http_config = false;
319
320        for op in &ops {
321            match op {
322                catalog::Op::DropObjects(drop_object_infos) => {
323                    for drop_object_info in drop_object_infos {
324                        match &drop_object_info {
325                            catalog::DropObjectInfo::Item(_) => {
326                                // Nothing to do, these will be handled by
327                                // applying the side effects that we return.
328                            }
329                            catalog::DropObjectInfo::Cluster(id) => {
330                                clusters_to_drop.push(*id);
331                            }
332                            catalog::DropObjectInfo::ClusterReplica((
333                                cluster_id,
334                                replica_id,
335                                _reason,
336                            )) => {
337                                // Drop the cluster replica itself.
338                                cluster_replicas_to_drop.push((*cluster_id, *replica_id));
339                            }
340                            _ => (),
341                        }
342                    }
343                }
344                catalog::Op::ResetSystemConfiguration { name }
345                | catalog::Op::UpdateSystemConfiguration { name, .. } => {
346                    update_metrics_config |= self
347                        .catalog
348                        .state()
349                        .system_config()
350                        .is_metrics_config_var(name);
351                    update_tracing_config |= vars::is_tracing_var(name);
352                    update_controller_config |= self
353                        .catalog
354                        .state()
355                        .system_config()
356                        .is_controller_config_var(name);
357                    update_compute_config |= self
358                        .catalog
359                        .state()
360                        .system_config()
361                        .is_compute_config_var(name);
362                    update_storage_config |= self
363                        .catalog
364                        .state()
365                        .system_config()
366                        .is_storage_config_var(name);
367                    update_timestamp_oracle_config |= vars::is_timestamp_oracle_config_var(name);
368                    update_metrics_retention |= name == vars::METRICS_RETENTION.name();
369                    update_secrets_caching_config |= vars::is_secrets_caching_var(name);
370                    update_cluster_scheduling_config |= vars::is_cluster_scheduling_var(name);
371                    update_http_config |= vars::is_http_config_var(name);
372                }
373                catalog::Op::ResetAllSystemConfiguration => {
                    // Assume they all need to be updated.
                    // We could check whether the configs have actually changed,
                    // but this is simpler.
377                    update_tracing_config = true;
378                    update_controller_config = true;
379                    update_compute_config = true;
380                    update_storage_config = true;
381                    update_timestamp_oracle_config = true;
382                    update_metrics_retention = true;
383                    update_secrets_caching_config = true;
384                    update_cluster_scheduling_config = true;
385                    update_http_config = true;
386                }
387                catalog::Op::RenameItem { id, .. } => {
388                    let item = self.catalog().get_entry(id);
389                    let is_webhook_source = item
390                        .source()
391                        .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
392                        .unwrap_or(false);
393                    if is_webhook_source {
394                        webhook_sources_to_restart.insert(*id);
395                    }
396                }
397                catalog::Op::RenameSchema {
398                    database_spec,
399                    schema_spec,
400                    ..
401                } => {
402                    let schema = self.catalog().get_schema(
403                        database_spec,
404                        schema_spec,
405                        conn_id.unwrap_or(&SYSTEM_CONN_ID),
406                    );
407                    let webhook_sources = schema.item_ids().filter(|id| {
408                        let item = self.catalog().get_entry(id);
409                        item.source()
410                            .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
411                            .unwrap_or(false)
412                    });
413                    webhook_sources_to_restart.extend(webhook_sources);
414                }
415                catalog::Op::CreateCluster { id, .. } => {
416                    clusters_to_create.push(*id);
417                }
418                catalog::Op::CreateClusterReplica {
419                    cluster_id,
420                    name,
421                    config,
422                    ..
423                } => {
424                    cluster_replicas_to_create.push((
425                        *cluster_id,
426                        name.clone(),
427                        config.location.num_processes(),
428                    ));
429                }
430                _ => (),
431            }
432        }
433
434        self.validate_resource_limits(&ops, conn_id.unwrap_or(&SYSTEM_CONN_ID))?;
435
436        // This will produce timestamps that are guaranteed to increase on each
437        // call, and also never be behind the system clock. If the system clock
438        // hasn't advanced (or has gone backward), it will increment by 1. For
439        // the audit log, we need to balance "close (within 10s or so) to the
440        // system clock" and "always goes up". We've chosen here to prioritize
        // always going up, and believe we will stay close to the system
        // clock because it is well configured (chrony) and so should only rarely
        // regress or pause for 10s.
444        let oracle_write_ts = self.get_local_write_ts().await.timestamp;
445
446        let Coordinator {
447            catalog,
448            active_conns,
449            controller,
450            cluster_replica_statuses,
451            ..
452        } = self;
453        let catalog = Arc::make_mut(catalog);
454        let conn = conn_id.map(|id| active_conns.get(id).expect("connection must exist"));
455
456        let TransactionResult {
457            builtin_table_updates,
458            catalog_updates,
459            audit_events,
460        } = catalog
461            .transact(
462                Some(&mut controller.storage_collections),
463                oracle_write_ts,
464                conn,
465                ops,
466            )
467            .await?;
468
469        for (cluster_id, replica_id) in &cluster_replicas_to_drop {
470            cluster_replica_statuses.remove_cluster_replica_statuses(cluster_id, replica_id);
471        }
472        for cluster_id in &clusters_to_drop {
473            cluster_replica_statuses.remove_cluster_statuses(cluster_id);
474        }
475        for cluster_id in clusters_to_create {
476            cluster_replica_statuses.initialize_cluster_statuses(cluster_id);
477        }
478        let now = to_datetime((catalog.config().now)());
479        for (cluster_id, replica_name, num_processes) in cluster_replicas_to_create {
480            let replica_id = catalog
481                .resolve_replica_in_cluster(&cluster_id, &replica_name)
482                .expect("just created")
483                .replica_id();
484            cluster_replica_statuses.initialize_cluster_replica_statuses(
485                cluster_id,
486                replica_id,
487                num_processes,
488                now,
489            );
490        }
491
492        // Append our builtin table updates, then return the notify so we can run other tasks in
493        // parallel.
494        let (builtin_update_notify, _) = self
495            .builtin_table_update()
496            .execute(builtin_table_updates)
497            .await;
498
499        // No error returns are allowed after this point. Enforce this at compile time
500        // by using this odd structure so we don't accidentally add a stray `?`.
501        let _: () = async {
502            if !webhook_sources_to_restart.is_empty() {
503                self.restart_webhook_sources(webhook_sources_to_restart);
504            }
505
506            if update_metrics_config {
507                mz_metrics::update_dyncfg(&self.catalog().system_config().dyncfg_updates());
508            }
509            if update_controller_config {
510                self.update_controller_config();
511            }
512            if update_compute_config {
513                self.update_compute_config();
514            }
515            if update_storage_config {
516                self.update_storage_config();
517            }
518            if update_timestamp_oracle_config {
519                self.update_timestamp_oracle_config();
520            }
521            if update_metrics_retention {
522                self.update_metrics_retention();
523            }
524            if update_tracing_config {
525                self.update_tracing_config();
526            }
527            if update_secrets_caching_config {
528                self.update_secrets_caching_config();
529            }
530            if update_cluster_scheduling_config {
531                self.update_cluster_scheduling_config();
532            }
533            if update_http_config {
534                self.update_http_config();
535            }
536        }
537        .instrument(info_span!("coord::catalog_transact_with::finalize"))
538        .await;
539
540        let conn = conn_id.and_then(|id| self.active_conns.get(id));
541        if let Some(segment_client) = &self.segment_client {
542            for VersionedEvent::V1(event) in audit_events {
543                let event_type = format!(
544                    "{} {}",
545                    event.object_type.as_title_case(),
546                    event.event_type.as_title_case()
547                );
548                segment_client.environment_track(
549                    &self.catalog().config().environment_id,
550                    event_type,
551                    json!({ "details": event.details.as_json() }),
552                    EventDetails {
553                        user_id: conn
554                            .and_then(|c| c.user().external_metadata.as_ref())
555                            .map(|m| m.user_id),
556                        application_name: conn.map(|c| c.application_name()),
557                        ..Default::default()
558                    },
559                );
560            }
561        }
562
563        Ok((builtin_update_notify, catalog_updates))
564    }
565
566    pub(crate) fn drop_replica(&mut self, cluster_id: ClusterId, replica_id: ReplicaId) {
567        self.drop_introspection_subscribes(replica_id);
568
569        self.controller
570            .drop_replica(cluster_id, replica_id)
571            .expect("dropping replica must not fail");
572    }
573
574    /// A convenience method for dropping sources.
575    pub(crate) fn drop_sources(&mut self, sources: Vec<(CatalogItemId, GlobalId)>) {
576        for (item_id, _gid) in &sources {
577            self.active_webhooks.remove(item_id);
578        }
579        let storage_metadata = self.catalog.state().storage_metadata();
580        let source_gids = sources.into_iter().map(|(_id, gid)| gid).collect();
581        self.controller
582            .storage
583            .drop_sources(storage_metadata, source_gids)
584            .unwrap_or_terminate("cannot fail to drop sources");
585    }
586
587    /// A convenience method for dropping tables.
588    pub(crate) fn drop_tables(&mut self, tables: Vec<(CatalogItemId, GlobalId)>, ts: Timestamp) {
589        for (item_id, _gid) in &tables {
590            self.active_webhooks.remove(item_id);
591        }
592
593        let storage_metadata = self.catalog.state().storage_metadata();
594        let table_gids = tables.into_iter().map(|(_id, gid)| gid).collect();
595        self.controller
596            .storage
597            .drop_tables(storage_metadata, table_gids, ts)
598            .unwrap_or_terminate("cannot fail to drop tables");
599    }
600
601    fn restart_webhook_sources(&mut self, sources: impl IntoIterator<Item = CatalogItemId>) {
602        for id in sources {
603            self.active_webhooks.remove(&id);
604        }
605    }
606
607    /// Like `drop_compute_sinks`, but for a single compute sink.
608    ///
609    /// Returns the controller's state for the compute sink if the identified
610    /// sink was known to the controller. It is the caller's responsibility to
611    /// retire the returned sink. Consider using `retire_compute_sinks` instead.
612    #[must_use]
613    pub async fn drop_compute_sink(&mut self, sink_id: GlobalId) -> Option<ActiveComputeSink> {
614        self.drop_compute_sinks([sink_id]).await.remove(&sink_id)
615    }
616
617    /// Drops a batch of compute sinks.
618    ///
619    /// For each sink that exists, the coordinator and controller's state
620    /// associated with the sink is removed.
621    ///
622    /// Returns a map containing the controller's state for each sink that was
623    /// removed. It is the caller's responsibility to retire the returned sinks.
624    /// Consider using `retire_compute_sinks` instead.
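    ///
    /// A hedged usage sketch (hypothetical sink IDs; the returned sinks still have
    /// to be retired by the caller):
    ///
    /// ```ignore
    /// let dropped = coord.drop_compute_sinks([sink_a, sink_b]).await;
    /// for (_id, sink) in dropped {
    ///     sink.retire(ActiveComputeSinkRetireReason::Canceled);
    /// }
    /// ```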
625    #[must_use]
626    pub async fn drop_compute_sinks(
627        &mut self,
628        sink_ids: impl IntoIterator<Item = GlobalId>,
629    ) -> BTreeMap<GlobalId, ActiveComputeSink> {
630        let mut by_id = BTreeMap::new();
631        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
632        for sink_id in sink_ids {
633            let sink = match self.remove_active_compute_sink(sink_id).await {
634                None => {
635                    tracing::error!(%sink_id, "drop_compute_sinks called on nonexistent sink");
636                    continue;
637                }
638                Some(sink) => sink,
639            };
640
641            by_cluster
642                .entry(sink.cluster_id())
643                .or_default()
644                .push(sink_id);
645            by_id.insert(sink_id, sink);
646        }
647        for (cluster_id, ids) in by_cluster {
648            let compute = &mut self.controller.compute;
649            // A cluster could have been dropped, so verify it exists.
650            if compute.instance_exists(cluster_id) {
651                compute
652                    .drop_collections(cluster_id, ids)
653                    .unwrap_or_terminate("cannot fail to drop collections");
654            }
655        }
656        by_id
657    }
658
659    /// Retires a batch of sinks with disparate reasons for retirement.
660    ///
661    /// Each sink identified in `reasons` is dropped (see `drop_compute_sinks`),
662    /// then retired with its corresponding reason.
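    ///
    /// A minimal sketch (hypothetical sink ID; `btreemap!` is already imported in
    /// this module):
    ///
    /// ```ignore
    /// coord
    ///     .retire_compute_sinks(btreemap! {
    ///         sink_id => ActiveComputeSinkRetireReason::Canceled,
    ///     })
    ///     .await;
    /// ```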
663    pub async fn retire_compute_sinks(
664        &mut self,
665        mut reasons: BTreeMap<GlobalId, ActiveComputeSinkRetireReason>,
666    ) {
667        let sink_ids = reasons.keys().cloned();
668        for (id, sink) in self.drop_compute_sinks(sink_ids).await {
669            let reason = reasons
670                .remove(&id)
671                .expect("all returned IDs are in `reasons`");
672            sink.retire(reason);
673        }
674    }
675
676    /// Drops all pending replicas for a set of clusters
677    /// that are undergoing reconfiguration.
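    ///
    /// A hedged sketch (mirrors how `retire_cluster_reconfigurations_for_conn`
    /// below uses this method):
    ///
    /// ```ignore
    /// let clusters: BTreeSet<ClusterId> = conn.pending_cluster_alters.clone();
    /// coord.drop_reconfiguration_replicas(clusters).await?;
    /// ```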
678    pub async fn drop_reconfiguration_replicas(
679        &mut self,
680        cluster_ids: BTreeSet<ClusterId>,
681    ) -> Result<(), AdapterError> {
682        let pending_cluster_ops: Vec<Op> = cluster_ids
683            .iter()
684            .map(|c| {
685                self.catalog()
686                    .get_cluster(c.clone())
687                    .replicas()
688                    .filter_map(|r| match r.config.location {
689                        ReplicaLocation::Managed(ref l) if l.pending => {
690                            Some(DropObjectInfo::ClusterReplica((
691                                c.clone(),
692                                r.replica_id,
693                                ReplicaCreateDropReason::Manual,
694                            )))
695                        }
696                        _ => None,
697                    })
698                    .collect::<Vec<DropObjectInfo>>()
699            })
700            .filter_map(|pending_replica_drop_ops_by_cluster| {
701                match pending_replica_drop_ops_by_cluster.len() {
702                    0 => None,
703                    _ => Some(Op::DropObjects(pending_replica_drop_ops_by_cluster)),
704                }
705            })
706            .collect();
707        if !pending_cluster_ops.is_empty() {
708            self.catalog_transact(None, pending_cluster_ops).await?;
709        }
710        Ok(())
711    }
712
713    /// Cancels all active compute sinks for the identified connection.
714    #[mz_ore::instrument(level = "debug")]
715    pub(crate) async fn cancel_compute_sinks_for_conn(&mut self, conn_id: &ConnectionId) {
716        self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Canceled)
717            .await
718    }
719
    /// Cancels all active cluster reconfigurations for the identified connection.
721    #[mz_ore::instrument(level = "debug")]
722    pub(crate) async fn cancel_cluster_reconfigurations_for_conn(
723        &mut self,
724        conn_id: &ConnectionId,
725    ) {
726        self.retire_cluster_reconfigurations_for_conn(conn_id).await
727    }
728
729    /// Retires all active compute sinks for the identified connection with the
730    /// specified reason.
731    #[mz_ore::instrument(level = "debug")]
732    pub(crate) async fn retire_compute_sinks_for_conn(
733        &mut self,
734        conn_id: &ConnectionId,
735        reason: ActiveComputeSinkRetireReason,
736    ) {
737        let drop_sinks = self
738            .active_conns
739            .get_mut(conn_id)
740            .expect("must exist for active session")
741            .drop_sinks
742            .iter()
743            .map(|sink_id| (*sink_id, reason.clone()))
744            .collect();
745        self.retire_compute_sinks(drop_sinks).await;
746    }
747
    /// Cleans up pending cluster reconfigurations for the identified connection.
749    #[mz_ore::instrument(level = "debug")]
750    pub(crate) async fn retire_cluster_reconfigurations_for_conn(
751        &mut self,
752        conn_id: &ConnectionId,
753    ) {
754        let reconfiguring_clusters = self
755            .active_conns
756            .get(conn_id)
757            .expect("must exist for active session")
758            .pending_cluster_alters
759            .clone();
760        // try to drop reconfig replicas
761        self.drop_reconfiguration_replicas(reconfiguring_clusters)
762            .await
763            .unwrap_or_terminate("cannot fail to drop reconfiguration replicas");
764
765        self.active_conns
766            .get_mut(conn_id)
767            .expect("must exist for active session")
768            .pending_cluster_alters
769            .clear();
770    }
771
772    pub(crate) fn drop_storage_sinks(&mut self, sink_gids: Vec<GlobalId>) {
773        let storage_metadata = self.catalog.state().storage_metadata();
774        self.controller
775            .storage
776            .drop_sinks(storage_metadata, sink_gids)
777            .unwrap_or_terminate("cannot fail to drop sinks");
778    }
779
780    pub(crate) fn drop_compute_collections(&mut self, collections: Vec<(ClusterId, GlobalId)>) {
781        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
782        for (cluster_id, gid) in collections {
783            by_cluster.entry(cluster_id).or_default().push(gid);
784        }
785        for (cluster_id, gids) in by_cluster {
786            let compute = &mut self.controller.compute;
787            // A cluster could have been dropped, so verify it exists.
788            if compute.instance_exists(cluster_id) {
789                compute
790                    .drop_collections(cluster_id, gids)
791                    .unwrap_or_terminate("cannot fail to drop collections");
792            }
793        }
794    }
795
796    pub(crate) fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>) {
797        let cloud_resource_controller = Arc::clone(self.cloud_resource_controller
798            .as_ref()
799            .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))
800            .expect("vpc endpoints should only be dropped in CLOUD, where `cloud_resource_controller` is `Some`"));
        // We don't want to block the coordinator on external delete API
        // calls, so move the vpc_endpoint drops to a separate task. This does
        // mean that a failed drop won't bubble up to the user as an error
        // message. However, even if it did (which is how the code previously
        // worked), mz has already dropped it from our catalog, and so we
        // wouldn't be able to retry anyway. Any orphaned vpc_endpoints will
        // eventually be cleaned up during restart via coord bootstrap.
808        task::spawn(
809            || "drop_vpc_endpoints",
810            async move {
811                for vpc_endpoint in vpc_endpoints {
812                    let _ = Retry::default()
813                        .max_duration(Duration::from_secs(60))
814                        .retry_async(|_state| async {
815                            fail_point!("drop_vpc_endpoint", |r| {
816                                Err(anyhow::anyhow!("Fail point error {:?}", r))
817                            });
818                            match cloud_resource_controller
819                                .delete_vpc_endpoint(vpc_endpoint)
820                                .await
821                            {
822                                Ok(_) => Ok(()),
823                                Err(e) => {
824                                    warn!("Dropping VPC Endpoints has encountered an error: {}", e);
825                                    Err(e)
826                                }
827                            }
828                        })
829                        .await;
830                }
831            }
832            .instrument(info_span!(
833                "coord::catalog_transact_inner::drop_vpc_endpoints"
834            )),
835        );
836    }
837
838    /// Removes all temporary items created by the specified connection, though
839    /// not the temporary schema itself.
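    ///
    /// A minimal sketch (hypothetical call site, e.g. when tearing down a session):
    ///
    /// ```ignore
    /// coord.drop_temp_items(session.conn_id()).await;
    /// ```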
840    pub(crate) async fn drop_temp_items(&mut self, conn_id: &ConnectionId) {
841        let temp_items = self.catalog().state().get_temp_items(conn_id).collect();
842        let all_items = self.catalog().object_dependents(&temp_items, conn_id);
843
844        if all_items.is_empty() {
845            return;
846        }
847        let op = Op::DropObjects(
848            all_items
849                .into_iter()
850                .map(DropObjectInfo::manual_drop_from_object_id)
851                .collect(),
852        );
853
854        self.catalog_transact_with_context(Some(conn_id), None, vec![op])
855            .await
856            .expect("unable to drop temporary items for conn_id");
857    }
858
859    fn update_cluster_scheduling_config(&self) {
860        let config = flags::orchestrator_scheduling_config(self.catalog.system_config());
861        self.controller
862            .update_orchestrator_scheduling_config(config);
863    }
864
865    fn update_secrets_caching_config(&self) {
866        let config = flags::caching_config(self.catalog.system_config());
867        self.caching_secrets_reader.set_policy(config);
868    }
869
870    fn update_tracing_config(&self) {
871        let tracing = flags::tracing_config(self.catalog().system_config());
872        tracing.apply(&self.tracing_handle);
873    }
874
875    fn update_compute_config(&mut self) {
876        let config_params = flags::compute_config(self.catalog().system_config());
877        self.controller.compute.update_configuration(config_params);
878    }
879
880    fn update_storage_config(&mut self) {
881        let config_params = flags::storage_config(self.catalog().system_config());
882        self.controller.storage.update_parameters(config_params);
883    }
884
885    fn update_timestamp_oracle_config(&self) {
886        let config_params = flags::timestamp_oracle_config(self.catalog().system_config());
887        if let Some(config) = self.timestamp_oracle_config.as_ref() {
888            config.apply_parameters(config_params)
889        }
890    }
891
892    fn update_metrics_retention(&self) {
893        let duration = self.catalog().system_config().metrics_retention();
894        let policy = ReadPolicy::lag_writes_by(
895            Timestamp::new(u64::try_from(duration.as_millis()).unwrap_or_else(|_e| {
896                tracing::error!("Absurd metrics retention duration: {duration:?}.");
897                u64::MAX
898            })),
899            SINCE_GRANULARITY,
900        );
901        let storage_policies = self
902            .catalog()
903            .entries()
904            .filter(|entry| {
905                entry.item().is_retained_metrics_object()
906                    && entry.item().is_compute_object_on_cluster().is_none()
907            })
908            .map(|entry| (entry.id(), policy.clone()))
909            .collect::<Vec<_>>();
910        let compute_policies = self
911            .catalog()
912            .entries()
913            .filter_map(|entry| {
914                if let (true, Some(cluster_id)) = (
915                    entry.item().is_retained_metrics_object(),
916                    entry.item().is_compute_object_on_cluster(),
917                ) {
918                    Some((cluster_id, entry.id(), policy.clone()))
919                } else {
920                    None
921                }
922            })
923            .collect::<Vec<_>>();
924        self.update_storage_read_policies(storage_policies);
925        self.update_compute_read_policies(compute_policies);
926    }
927
928    fn update_controller_config(&mut self) {
929        let sys_config = self.catalog().system_config();
930        self.controller
931            .update_configuration(sys_config.dyncfg_updates());
932    }
933
934    fn update_http_config(&mut self) {
935        let webhook_request_limit = self
936            .catalog()
937            .system_config()
938            .webhook_concurrent_request_limit();
939        self.webhook_concurrency_limit
940            .set_limit(webhook_request_limit);
941    }
942
943    pub(crate) async fn create_storage_export(
944        &mut self,
945        id: GlobalId,
946        sink: &Sink,
947    ) -> Result<(), AdapterError> {
948        // Validate `sink.from` is in fact a storage collection
949        self.controller.storage.check_exists(sink.from)?;
950
        // The as-of is used to determine at what time to start reading (i.e.,
        // taking the snapshot) from the persist collection. This is primarily
        // relevant when we do _not_ want to include the snapshot in the sink.
        //
        // We choose the smallest as_of that is legal, according to the sinked
        // collection's since.
957        let id_bundle = crate::CollectionIdBundle {
958            storage_ids: btreeset! {sink.from},
959            compute_ids: btreemap! {},
960        };
961
        // We're putting read holds in place so that create_collections, below,
        // which calls update_read_capabilities, can successfully do so.
        // Otherwise, the since of dependencies might move along concurrently,
        // pulling the rug out from under us!
966        //
967        // TODO: Maybe in the future, pass those holds on to storage, to hold on
968        // to them and downgrade when possible?
969        let read_holds = self.acquire_read_holds(&id_bundle);
970        let as_of = read_holds.least_valid_read();
971
972        let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
973        let storage_sink_desc = mz_storage_types::sinks::StorageSinkDesc {
974            from: sink.from,
975            from_desc: storage_sink_from_entry
976                .relation_desc()
977                .expect("sinks can only be built on items with descs")
978                .into_owned(),
979            connection: sink
980                .connection
981                .clone()
982                .into_inline_connection(self.catalog().state()),
983            envelope: sink.envelope,
984            as_of,
985            with_snapshot: sink.with_snapshot,
986            version: sink.version,
987            from_storage_metadata: (),
988            to_storage_metadata: (),
989            commit_interval: sink.commit_interval,
990        };
991
992        let collection_desc = CollectionDescription {
993            // TODO(sinks): make generic once we have more than one sink type.
994            desc: KAFKA_PROGRESS_DESC.clone(),
995            data_source: DataSource::Sink {
996                desc: ExportDescription {
997                    sink: storage_sink_desc,
998                    instance_id: sink.cluster_id,
999                },
1000            },
1001            since: None,
1002            timeline: None,
1003            primary: None,
1004        };
1005        let collections = vec![(id, collection_desc)];
1006
1007        // Create the collections.
1008        let storage_metadata = self.catalog.state().storage_metadata();
1009        let res = self
1010            .controller
1011            .storage
1012            .create_collections(storage_metadata, None, collections)
1013            .await;
1014
1015        // Drop read holds after the export has been created, at which point
1016        // storage will have put in its own read holds.
1017        drop(read_holds);
1018
1019        Ok(res?)
1020    }
1021
    /// Validate all resource limits in a catalog transaction and return an error if any
    /// limit is exceeded.
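    ///
    /// Conceptually, the check performed for each resource kind is roughly the
    /// following (a sketch; the real comparison lives in
    /// `Self::validate_resource_limit`):
    ///
    /// ```ignore
    /// if current_count + newly_requested > limit_from_system_vars {
    ///     return Err(/* resource exhaustion error naming the offending limit */);
    /// }
    /// ```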
1024    fn validate_resource_limits(
1025        &self,
1026        ops: &Vec<catalog::Op>,
1027        conn_id: &ConnectionId,
1028    ) -> Result<(), AdapterError> {
1029        let mut new_kafka_connections = 0;
1030        let mut new_postgres_connections = 0;
1031        let mut new_mysql_connections = 0;
1032        let mut new_sql_server_connections = 0;
1033        let mut new_aws_privatelink_connections = 0;
1034        let mut new_tables = 0;
1035        let mut new_sources = 0;
1036        let mut new_sinks = 0;
1037        let mut new_materialized_views = 0;
1038        let mut new_clusters = 0;
1039        let mut new_replicas_per_cluster = BTreeMap::new();
1040        let mut new_credit_consumption_rate = Numeric::zero();
1041        let mut new_databases = 0;
1042        let mut new_schemas_per_database = BTreeMap::new();
1043        let mut new_objects_per_schema = BTreeMap::new();
1044        let mut new_secrets = 0;
1045        let mut new_roles = 0;
1046        let mut new_continual_tasks = 0;
1047        let mut new_network_policies = 0;
1048        for op in ops {
1049            match op {
1050                Op::CreateDatabase { .. } => {
1051                    new_databases += 1;
1052                }
1053                Op::CreateSchema { database_id, .. } => {
1054                    if let ResolvedDatabaseSpecifier::Id(database_id) = database_id {
1055                        *new_schemas_per_database.entry(database_id).or_insert(0) += 1;
1056                    }
1057                }
1058                Op::CreateRole { .. } => {
1059                    new_roles += 1;
1060                }
1061                Op::CreateNetworkPolicy { .. } => {
1062                    new_network_policies += 1;
1063                }
1064                Op::CreateCluster { .. } => {
1065                    // TODO(benesch): having deprecated linked clusters, remove
1066                    // the `max_sources` and `max_sinks` limit, and set a higher
1067                    // max cluster limit?
1068                    new_clusters += 1;
1069                }
1070                Op::CreateClusterReplica {
1071                    cluster_id, config, ..
1072                } => {
1073                    if cluster_id.is_user() {
1074                        *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) += 1;
1075                        if let ReplicaLocation::Managed(location) = &config.location {
1076                            let replica_allocation = self
1077                                .catalog()
1078                                .cluster_replica_sizes()
1079                                .0
1080                                .get(location.size_for_billing())
1081                                .expect(
1082                                    "location size is validated against the cluster replica sizes",
1083                                );
1084                            new_credit_consumption_rate += replica_allocation.credits_per_hour
1085                        }
1086                    }
1087                }
1088                Op::CreateItem { name, item, .. } => {
1089                    *new_objects_per_schema
1090                        .entry((
1091                            name.qualifiers.database_spec.clone(),
1092                            name.qualifiers.schema_spec.clone(),
1093                        ))
1094                        .or_insert(0) += 1;
1095                    match item {
1096                        CatalogItem::Connection(connection) => match connection.details {
1097                            ConnectionDetails::Kafka(_) => new_kafka_connections += 1,
1098                            ConnectionDetails::Postgres(_) => new_postgres_connections += 1,
1099                            ConnectionDetails::MySql(_) => new_mysql_connections += 1,
1100                            ConnectionDetails::SqlServer(_) => new_sql_server_connections += 1,
1101                            ConnectionDetails::AwsPrivatelink(_) => {
1102                                new_aws_privatelink_connections += 1
1103                            }
1104                            ConnectionDetails::Csr(_)
1105                            | ConnectionDetails::Ssh { .. }
1106                            | ConnectionDetails::Aws(_)
1107                            | ConnectionDetails::IcebergCatalog(_) => {}
1108                        },
1109                        CatalogItem::Table(_) => {
1110                            new_tables += 1;
1111                        }
1112                        CatalogItem::Source(source) => {
1113                            new_sources += source.user_controllable_persist_shard_count()
1114                        }
1115                        CatalogItem::Sink(_) => new_sinks += 1,
1116                        CatalogItem::MaterializedView(_) => {
1117                            new_materialized_views += 1;
1118                        }
1119                        CatalogItem::Secret(_) => {
1120                            new_secrets += 1;
1121                        }
1122                        CatalogItem::ContinualTask(_) => {
1123                            new_continual_tasks += 1;
1124                        }
1125                        CatalogItem::Log(_)
1126                        | CatalogItem::View(_)
1127                        | CatalogItem::Index(_)
1128                        | CatalogItem::Type(_)
1129                        | CatalogItem::Func(_) => {}
1130                    }
1131                }
1132                Op::DropObjects(drop_object_infos) => {
1133                    for drop_object_info in drop_object_infos {
1134                        match drop_object_info {
1135                            DropObjectInfo::Cluster(_) => {
1136                                new_clusters -= 1;
1137                            }
1138                            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
1139                                if cluster_id.is_user() {
1140                                    *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) -= 1;
1141                                    let cluster = self
1142                                        .catalog()
1143                                        .get_cluster_replica(*cluster_id, *replica_id);
1144                                    if let ReplicaLocation::Managed(location) =
1145                                        &cluster.config.location
1146                                    {
1147                                        let replica_allocation = self
1148                                            .catalog()
1149                                            .cluster_replica_sizes()
1150                                            .0
1151                                            .get(location.size_for_billing())
1152                                            .expect(
1153                                                "location size is validated against the cluster replica sizes",
1154                                            );
1155                                        new_credit_consumption_rate -=
1156                                            replica_allocation.credits_per_hour
1157                                    }
1158                                }
1159                            }
1160                            DropObjectInfo::Database(_) => {
1161                                new_databases -= 1;
1162                            }
1163                            DropObjectInfo::Schema((database_spec, _)) => {
1164                                if let ResolvedDatabaseSpecifier::Id(database_id) = database_spec {
1165                                    *new_schemas_per_database.entry(database_id).or_insert(0) -= 1;
1166                                }
1167                            }
1168                            DropObjectInfo::Role(_) => {
1169                                new_roles -= 1;
1170                            }
1171                            DropObjectInfo::NetworkPolicy(_) => {
1172                                new_network_policies -= 1;
1173                            }
1174                            DropObjectInfo::Item(id) => {
1175                                let entry = self.catalog().get_entry(id);
1176                                *new_objects_per_schema
1177                                    .entry((
1178                                        entry.name().qualifiers.database_spec.clone(),
1179                                        entry.name().qualifiers.schema_spec.clone(),
1180                                    ))
1181                                    .or_insert(0) -= 1;
1182                                match entry.item() {
1183                                    CatalogItem::Connection(connection) => match connection.details
1184                                    {
1185                                        ConnectionDetails::AwsPrivatelink(_) => {
1186                                            new_aws_privatelink_connections -= 1;
1187                                        }
1188                                        _ => (),
1189                                    },
1190                                    CatalogItem::Table(_) => {
1191                                        new_tables -= 1;
1192                                    }
1193                                    CatalogItem::Source(source) => {
1194                                        new_sources -=
1195                                            source.user_controllable_persist_shard_count()
1196                                    }
1197                                    CatalogItem::Sink(_) => new_sinks -= 1,
1198                                    CatalogItem::MaterializedView(_) => {
1199                                        new_materialized_views -= 1;
1200                                    }
1201                                    CatalogItem::Secret(_) => {
1202                                        new_secrets -= 1;
1203                                    }
1204                                    CatalogItem::ContinualTask(_) => {
1205                                        new_continual_tasks -= 1;
1206                                    }
1207                                    CatalogItem::Log(_)
1208                                    | CatalogItem::View(_)
1209                                    | CatalogItem::Index(_)
1210                                    | CatalogItem::Type(_)
1211                                    | CatalogItem::Func(_) => {}
1212                                }
1213                            }
1214                        }
1215                    }
1216                }
1217                Op::UpdateItem {
1218                    name: _,
1219                    id,
1220                    to_item,
1221                } => match to_item {
1222                    CatalogItem::Source(source) => {
1223                        let current_source = self
1224                            .catalog()
1225                            .get_entry(id)
1226                            .source()
1227                            .expect("source update is for source item");
1228
1229                        new_sources += source.user_controllable_persist_shard_count()
1230                            - current_source.user_controllable_persist_shard_count();
1231                    }
1232                    CatalogItem::Connection(_)
1233                    | CatalogItem::Table(_)
1234                    | CatalogItem::Sink(_)
1235                    | CatalogItem::MaterializedView(_)
1236                    | CatalogItem::Secret(_)
1237                    | CatalogItem::Log(_)
1238                    | CatalogItem::View(_)
1239                    | CatalogItem::Index(_)
1240                    | CatalogItem::Type(_)
1241                    | CatalogItem::Func(_)
1242                    | CatalogItem::ContinualTask(_) => {}
1243                },
1244                Op::AlterRole { .. }
1245                | Op::AlterRetainHistory { .. }
1246                | Op::AlterNetworkPolicy { .. }
1247                | Op::AlterAddColumn { .. }
1248                | Op::AlterMaterializedViewApplyReplacement { .. }
1249                | Op::UpdatePrivilege { .. }
1250                | Op::UpdateDefaultPrivilege { .. }
1251                | Op::GrantRole { .. }
1252                | Op::RenameCluster { .. }
1253                | Op::RenameClusterReplica { .. }
1254                | Op::RenameItem { .. }
1255                | Op::RenameSchema { .. }
1256                | Op::UpdateOwner { .. }
1257                | Op::RevokeRole { .. }
1258                | Op::UpdateClusterConfig { .. }
1259                | Op::UpdateClusterReplicaConfig { .. }
1260                | Op::UpdateSourceReferences { .. }
1261                | Op::UpdateSystemConfiguration { .. }
1262                | Op::ResetSystemConfiguration { .. }
1263                | Op::ResetAllSystemConfiguration { .. }
1264                | Op::Comment { .. }
1265                | Op::WeirdStorageUsageUpdates { .. }
1266                | Op::TransactionDryRun => {}
1267            }
1268        }
1269
1270        let mut current_aws_privatelink_connections = 0;
1271        let mut current_postgres_connections = 0;
1272        let mut current_mysql_connections = 0;
1273        let mut current_sql_server_connections = 0;
1274        let mut current_kafka_connections = 0;
1275        for c in self.catalog().user_connections() {
1276            let connection = c
1277                .connection()
1278                .expect("`user_connections()` only returns connection objects");
1279
1280            match connection.details {
1281                ConnectionDetails::AwsPrivatelink(_) => current_aws_privatelink_connections += 1,
1282                ConnectionDetails::Postgres(_) => current_postgres_connections += 1,
1283                ConnectionDetails::MySql(_) => current_mysql_connections += 1,
1284                ConnectionDetails::SqlServer(_) => current_sql_server_connections += 1,
1285                ConnectionDetails::Kafka(_) => current_kafka_connections += 1,
1286                ConnectionDetails::Csr(_)
1287                | ConnectionDetails::Ssh { .. }
1288                | ConnectionDetails::Aws(_)
1289                | ConnectionDetails::IcebergCatalog(_) => {}
1290            }
1291        }
1292        self.validate_resource_limit(
1293            current_kafka_connections,
1294            new_kafka_connections,
1295            SystemVars::max_kafka_connections,
1296            "Kafka Connection",
1297            MAX_KAFKA_CONNECTIONS.name(),
1298        )?;
1299        self.validate_resource_limit(
1300            current_postgres_connections,
1301            new_postgres_connections,
1302            SystemVars::max_postgres_connections,
1303            "PostgreSQL Connection",
1304            MAX_POSTGRES_CONNECTIONS.name(),
1305        )?;
1306        self.validate_resource_limit(
1307            current_mysql_connections,
1308            new_mysql_connections,
1309            SystemVars::max_mysql_connections,
1310            "MySQL Connection",
1311            MAX_MYSQL_CONNECTIONS.name(),
1312        )?;
1313        self.validate_resource_limit(
1314            current_sql_server_connections,
1315            new_sql_server_connections,
1316            SystemVars::max_sql_server_connections,
1317            "SQL Server Connection",
1318            MAX_SQL_SERVER_CONNECTIONS.name(),
1319        )?;
1320        self.validate_resource_limit(
1321            current_aws_privatelink_connections,
1322            new_aws_privatelink_connections,
1323            SystemVars::max_aws_privatelink_connections,
1324            "AWS PrivateLink Connection",
1325            MAX_AWS_PRIVATELINK_CONNECTIONS.name(),
1326        )?;
1327        self.validate_resource_limit(
1328            self.catalog().user_tables().count(),
1329            new_tables,
1330            SystemVars::max_tables,
1331            "table",
1332            MAX_TABLES.name(),
1333        )?;
1334
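        // Mirror the accounting above: sources are measured by the number of
        // user-controllable persist shards they create, not by catalog entries.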
1335        let current_sources: usize = self
1336            .catalog()
1337            .user_sources()
1338            .filter_map(|source| source.source())
1339            .map(|source| source.user_controllable_persist_shard_count())
1340            .sum::<i64>()
1341            .try_into()
1342            .expect("non-negative sum of sources");
1343
1344        self.validate_resource_limit(
1345            current_sources,
1346            new_sources,
1347            SystemVars::max_sources,
1348            "source",
1349            MAX_SOURCES.name(),
1350        )?;
1351        self.validate_resource_limit(
1352            self.catalog().user_sinks().count(),
1353            new_sinks,
1354            SystemVars::max_sinks,
1355            "sink",
1356            MAX_SINKS.name(),
1357        )?;
1358        self.validate_resource_limit(
1359            self.catalog().user_materialized_views().count(),
1360            new_materialized_views,
1361            SystemVars::max_materialized_views,
1362            "materialized view",
1363            MAX_MATERIALIZED_VIEWS.name(),
1364        )?;
1365        self.validate_resource_limit(
1366            // Linked compute clusters don't count against the limit, since
1367            // we have a separate sources and sinks limit.
1368            //
1369            // TODO(benesch): remove the `max_sources` and `max_sinks` limit,
1370            // and set a higher max cluster limit?
1371            self.catalog().user_clusters().count(),
1372            new_clusters,
1373            SystemVars::max_clusters,
1374            "cluster",
1375            MAX_CLUSTERS.name(),
1376        )?;
1377        for (cluster_id, new_replicas) in new_replicas_per_cluster {
1378            // It's possible that the cluster hasn't been created yet.
1379            let current_amount = self
1380                .catalog()
1381                .try_get_cluster(cluster_id)
1382                .map(|instance| instance.user_replicas().count())
1383                .unwrap_or(0);
1384            self.validate_resource_limit(
1385                current_amount,
1386                new_replicas,
1387                SystemVars::max_replicas_per_cluster,
1388                "cluster replica",
1389                MAX_REPLICAS_PER_CLUSTER.name(),
1390            )?;
1391        }
1392        self.validate_resource_limit_numeric(
1393            self.current_credit_consumption_rate(),
1394            new_credit_consumption_rate,
1395            |system_vars| {
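                // Prefer a limit carried by the license key, if any; otherwise fall
                // back to the `max_credit_consumption_rate` system variable.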
1396                self.license_key
1397                    .max_credit_consumption_rate()
1398                    .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
1399            },
1400            "cluster replica",
1401            MAX_CREDIT_CONSUMPTION_RATE.name(),
1402        )?;
1403        self.validate_resource_limit(
1404            self.catalog().databases().count(),
1405            new_databases,
1406            SystemVars::max_databases,
1407            "database",
1408            MAX_DATABASES.name(),
1409        )?;
1410        for (database_id, new_schemas) in new_schemas_per_database {
1411            self.validate_resource_limit(
1412                self.catalog().get_database(database_id).schemas_by_id.len(),
1413                new_schemas,
1414                SystemVars::max_schemas_per_database,
1415                "schema",
1416                MAX_SCHEMAS_PER_DATABASE.name(),
1417            )?;
1418        }
1419        for ((database_spec, schema_spec), new_objects) in new_objects_per_schema {
1420            // For temporary schemas that don't exist yet (lazy creation),
1421            // treat them as having 0 items.
1422            let current_items = self
1423                .catalog()
1424                .try_get_schema(&database_spec, &schema_spec, conn_id)
1425                .map(|schema| schema.items.len())
1426                .unwrap_or(0);
1427            self.validate_resource_limit(
1428                current_items,
1429                new_objects,
1430                SystemVars::max_objects_per_schema,
1431                "object",
1432                MAX_OBJECTS_PER_SCHEMA.name(),
1433            )?;
1434        }
1435        self.validate_resource_limit(
1436            self.catalog().user_secrets().count(),
1437            new_secrets,
1438            SystemVars::max_secrets,
1439            "secret",
1440            MAX_SECRETS.name(),
1441        )?;
1442        self.validate_resource_limit(
1443            self.catalog().user_roles().count(),
1444            new_roles,
1445            SystemVars::max_roles,
1446            "role",
1447            MAX_ROLES.name(),
1448        )?;
1449        self.validate_resource_limit(
1450            self.catalog().user_continual_tasks().count(),
1451            new_continual_tasks,
1452            SystemVars::max_continual_tasks,
1453            "continual_task",
1454            MAX_CONTINUAL_TASKS.name(),
1455        )?;
1456        self.validate_resource_limit(
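            // Count existing network policies rather than continual tasks. The
            // `user_network_policies()` accessor is assumed here, by analogy with the
            // other `user_*` catalog accessors used above.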
1457            self.catalog().user_network_policies().count(),
1458            new_network_policies,
1459            SystemVars::max_network_policies,
1460            "network_policy",
1461            MAX_NETWORK_POLICIES.name(),
1462        )?;
1463        Ok(())
1464    }
1465
1466    /// Validate a specific type of resource limit and return an error if that limit is exceeded.
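    ///
    /// A minimal usage sketch (not a compiled doctest) mirroring the table check
    /// above; `coord` stands in for the surrounding [`Coordinator`]:
    ///
    /// ```ignore
    /// coord.validate_resource_limit(
    ///     coord.catalog().user_tables().count(), // objects that already exist
    ///     2,                                     // objects the pending ops would add
    ///     SystemVars::max_tables,                // accessor for the configured limit
    ///     "table",                               // resource name used in the error
    ///     MAX_TABLES.name(),                     // limit name used in the error
    /// )?;
    /// ```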
1467    pub(crate) fn validate_resource_limit<F>(
1468        &self,
1469        current_amount: usize,
1470        new_instances: i64,
1471        resource_limit: F,
1472        resource_type: &str,
1473        limit_name: &str,
1474    ) -> Result<(), AdapterError>
1475    where
1476        F: Fn(&SystemVars) -> u32,
1477    {
1478        if new_instances <= 0 {
1479            return Ok(());
1480        }
1481
1482        let limit: i64 = resource_limit(self.catalog().system_config()).into();
1483        let current_amount: Option<i64> = current_amount.try_into().ok();
1484        let desired =
1485            current_amount.and_then(|current_amount| current_amount.checked_add(new_instances));
1486
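        // If the current amount does not fit in an `i64`, or adding the new instances
        // would overflow, conservatively treat the limit as exceeded.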
1487        let exceeds_limit = if let Some(desired) = desired {
1488            desired > limit
1489        } else {
1490            true
1491        };
1492
1493        let desired = desired
1494            .map(|desired| desired.to_string())
1495            .unwrap_or_else(|| format!("more than {}", i64::MAX));
1496        let current = current_amount
1497            .map(|current| current.to_string())
1498            .unwrap_or_else(|| format!("more than {}", i64::MAX));
1499        if exceeds_limit {
1500            Err(AdapterError::ResourceExhaustion {
1501                resource_type: resource_type.to_string(),
1502                limit_name: limit_name.to_string(),
1503                desired,
1504                limit: limit.to_string(),
1505                current,
1506            })
1507        } else {
1508            Ok(())
1509        }
1510    }
1511
1512    /// Validate a specific type of numeric resource limit and return an error if that limit is exceeded.
1513    ///
1514    /// This is very similar to [`Self::validate_resource_limit`] but for numerics.
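    ///
    /// A minimal usage sketch (not a compiled doctest) mirroring the credit
    /// consumption check above; `coord` stands in for the surrounding [`Coordinator`]:
    ///
    /// ```ignore
    /// coord.validate_resource_limit_numeric(
    ///     coord.current_credit_consumption_rate(),   // credits already being consumed
    ///     Numeric::from(2),                          // additional credits the new replicas would consume
    ///     |vars| vars.max_credit_consumption_rate(), // accessor for the configured limit
    ///     "cluster replica",
    ///     MAX_CREDIT_CONSUMPTION_RATE.name(),
    /// )?;
    /// ```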
1515    pub(crate) fn validate_resource_limit_numeric<F>(
1516        &self,
1517        current_amount: Numeric,
1518        new_amount: Numeric,
1519        resource_limit: F,
1520        resource_type: &str,
1521        limit_name: &str,
1522    ) -> Result<(), AdapterError>
1523    where
1524        F: Fn(&SystemVars) -> Numeric,
1525    {
1526        if new_amount <= Numeric::zero() {
1527            return Ok(());
1528        }
1529
1530        let limit = resource_limit(self.catalog().system_config());
1531        // `Numeric` values will overflow to infinity instead of panicking, which has the
1532        // correct comparison semantics.
1533        // NaN should be impossible here since both values are positive.
1534        let desired = current_amount + new_amount;
1535        if desired > limit {
1536            Err(AdapterError::ResourceExhaustion {
1537                resource_type: resource_type.to_string(),
1538                limit_name: limit_name.to_string(),
1539                desired: desired.to_string(),
1540                limit: limit.to_string(),
1541                current: current_amount.to_string(),
1542            })
1543        } else {
1544            Ok(())
1545        }
1546    }
1547}