mz_adapter/coord/
ddl.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! This module encapsulates all of the [`Coordinator`]'s logic for creating, dropping,
//! and altering objects.

use std::collections::{BTreeMap, BTreeSet};
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant};

use fail::fail_point;
use maplit::{btreemap, btreeset};
use mz_adapter_types::compaction::SINCE_GRANULARITY;
use mz_adapter_types::connection::ConnectionId;
use mz_audit_log::VersionedEvent;
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Sink};
use mz_cluster_client::ReplicaId;
use mz_controller::clusters::ReplicaLocation;
use mz_controller_types::ClusterId;
use mz_ore::instrument;
use mz_ore::now::to_datetime;
use mz_ore::retry::Retry;
use mz_ore::task;
use mz_repr::adt::numeric::Numeric;
use mz_repr::{CatalogItemId, GlobalId, Timestamp};
use mz_sql::catalog::{CatalogClusterReplica, CatalogSchema};
use mz_sql::names::ResolvedDatabaseSpecifier;
use mz_sql::plan::ConnectionDetails;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::{
    self, MAX_AWS_PRIVATELINK_CONNECTIONS, MAX_CLUSTERS, MAX_CONTINUAL_TASKS,
    MAX_CREDIT_CONSUMPTION_RATE, MAX_DATABASES, MAX_KAFKA_CONNECTIONS, MAX_MATERIALIZED_VIEWS,
    MAX_MYSQL_CONNECTIONS, MAX_NETWORK_POLICIES, MAX_OBJECTS_PER_SCHEMA, MAX_POSTGRES_CONNECTIONS,
    MAX_REPLICAS_PER_CLUSTER, MAX_ROLES, MAX_SCHEMAS_PER_DATABASE, MAX_SECRETS, MAX_SINKS,
    MAX_SOURCES, MAX_SQL_SERVER_CONNECTIONS, MAX_TABLES, SystemVars, Var,
};
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
use serde_json::json;
use tracing::{Instrument, Level, event, info_span, warn};

use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
use crate::catalog::{DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult};
use crate::coord::Coordinator;
use crate::coord::appends::BuiltinTableAppendNotify;
use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
use crate::session::{Session, Transaction, TransactionOps};
use crate::telemetry::{EventDetails, SegmentClientExt};
use crate::util::ResultExt;
use crate::{AdapterError, ExecuteContext, catalog, flags};

impl Coordinator {
    /// Same as [`Self::catalog_transact_with_context`] but takes a [`Session`].
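    ///
    /// A minimal, hypothetical call-site sketch (not compiled; the `coord`
    /// binding and the contents of `ops` are illustrative assumptions):
    ///
    /// ```ignore
    /// // Build catalog ops elsewhere, then apply them in one transaction.
    /// let ops: Vec<catalog::Op> = vec![/* e.g. a CreateSchema or DropObjects op */];
    /// coord.catalog_transact(Some(&session), ops).await?;
    /// ```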
    #[instrument(name = "coord::catalog_transact")]
    pub(crate) async fn catalog_transact(
        &mut self,
        session: Option<&Session>,
        ops: Vec<catalog::Op>,
    ) -> Result<(), AdapterError> {
        let start = Instant::now();
        let result = self
            .catalog_transact_with_context(session.map(|session| session.conn_id()), None, ops)
            .await;
        self.metrics
            .catalog_transact_seconds
            .with_label_values(&["catalog_transact"])
            .observe(start.elapsed().as_secs_f64());
        result
    }

    /// Same as [`Self::catalog_transact_with_context`] but takes a [`Session`]
    /// and runs builtin table updates concurrently with any side effects (e.g.
    /// creating collections).
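    ///
    /// A sketch of how a caller might pass the side-effect closure (not
    /// compiled; the closure body and `ops` are illustrative assumptions):
    ///
    /// ```ignore
    /// coord
    ///     .catalog_transact_with_side_effects(Some(&mut ctx), ops, |coord, _ctx| {
    ///         Box::pin(async move {
    ///             // e.g. create controller collections for the new items
    ///         })
    ///     })
    ///     .await?;
    /// ```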
    // TODO(aljoscha): Remove this method once all call-sites have been migrated
    // to the newer catalog_transact_with_context. The latter is what allows us
    // to apply catalog implications that we derive from catalog changes either
    // when initially applying the ops to the catalog _or_ when following
    // catalog changes from another process.
    #[instrument(name = "coord::catalog_transact_with_side_effects")]
    pub(crate) async fn catalog_transact_with_side_effects<F>(
        &mut self,
        mut ctx: Option<&mut ExecuteContext>,
        ops: Vec<catalog::Op>,
        side_effect: F,
    ) -> Result<(), AdapterError>
    where
        F: for<'a> FnOnce(
                &'a mut Coordinator,
                Option<&'a mut ExecuteContext>,
            ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
            + 'static,
    {
        let start = Instant::now();

        let (table_updates, catalog_updates) = self
            .catalog_transact_inner(ctx.as_ref().map(|ctx| ctx.session().conn_id()), ops)
            .await?;

        // We can't run this concurrently with the explicit side effects,
        // because both want to borrow self mutably.
        let apply_implications_res = self
            .apply_catalog_implications(ctx.as_deref_mut(), catalog_updates)
            .await;

        // We would get into an inconsistent state if we updated the catalog but
        // then failed to apply commands/updates to the controller. Easiest
        // thing to do is panic and let restart/bootstrap handle it.
        apply_implications_res.expect("cannot fail to apply catalog update implications");

        // Note: It's important that we keep the function call inside the macro;
        // this way we only run the consistency checks if soft assertions are enabled.
        mz_ore::soft_assert_eq_no_log!(
            self.check_consistency(),
            Ok(()),
            "coordinator inconsistency detected"
        );

        let side_effects_fut = side_effect(self, ctx);

        // Run our side effects concurrently with the table updates.
        let ((), ()) = futures::future::join(
            side_effects_fut.instrument(info_span!(
                "coord::catalog_transact_with_side_effects::side_effects_fut"
            )),
            table_updates.instrument(info_span!(
                "coord::catalog_transact_with_side_effects::table_updates"
            )),
        )
        .await;

        self.metrics
            .catalog_transact_seconds
            .with_label_values(&["catalog_transact_with_side_effects"])
            .observe(start.elapsed().as_secs_f64());

        Ok(())
    }

    /// Same as [`Self::catalog_transact_inner`] but takes an execution context
    /// or connection ID and runs builtin table updates concurrently with any
    /// catalog implications that are generated as part of applying the given
    /// `ops` (e.g. creating collections).
    ///
    /// This will use a connection ID if provided and otherwise fall back to
    /// getting a connection ID from the execution context.
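    ///
    /// Hypothetical usage sketch (not compiled; `conn_id` and `ops` are
    /// assumptions about the caller's state):
    ///
    /// ```ignore
    /// // Either a connection ID or an execution context can identify the caller.
    /// coord
    ///     .catalog_transact_with_context(Some(&conn_id), None, ops)
    ///     .await?;
    /// ```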
    #[instrument(name = "coord::catalog_transact_with_context")]
    pub(crate) async fn catalog_transact_with_context(
        &mut self,
        conn_id: Option<&ConnectionId>,
        ctx: Option<&mut ExecuteContext>,
        ops: Vec<catalog::Op>,
    ) -> Result<(), AdapterError> {
        let start = Instant::now();

        let conn_id = conn_id.or_else(|| ctx.as_ref().map(|ctx| ctx.session().conn_id()));

        let (table_updates, catalog_updates) = self.catalog_transact_inner(conn_id, ops).await?;

        let apply_catalog_implications_fut = self.apply_catalog_implications(ctx, catalog_updates);

        // Apply catalog implications concurrently with the table updates.
        let (combined_apply_res, ()) = futures::future::join(
            apply_catalog_implications_fut.instrument(info_span!(
                "coord::catalog_transact_with_context::side_effects_fut"
            )),
            table_updates.instrument(info_span!(
                "coord::catalog_transact_with_context::table_updates"
            )),
        )
        .await;

        // We would get into an inconsistent state if we updated the catalog but
        // then failed to apply implications. Easiest thing to do is panic and
        // let restart/bootstrap handle it.
        combined_apply_res.expect("cannot fail to apply catalog implications");

        // Note: It's important that we keep the function call inside the macro;
        // this way we only run the consistency checks if soft assertions are enabled.
        mz_ore::soft_assert_eq_no_log!(
            self.check_consistency(),
            Ok(()),
            "coordinator inconsistency detected"
        );

        self.metrics
            .catalog_transact_seconds
            .with_label_values(&["catalog_transact_with_context"])
            .observe(start.elapsed().as_secs_f64());

        Ok(())
    }

    /// Executes a Catalog transaction with handling if the provided [`Session`]
    /// is in a SQL transaction that is executing DDL.
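    ///
    /// A rough sketch of the flow this method implements (the steps below
    /// describe the code that follows; the names are not new APIs):
    ///
    /// ```ignore
    /// // 1. If the session is not in a DDL transaction, delegate to
    /// //    `catalog_transact_with_side_effects` directly.
    /// // 2. Otherwise, replay the transaction's buffered ops plus the new ops
    /// //    with a trailing `Op::TransactionDryRun`, which forces an abort.
    /// // 3. On `AdapterError::TransactionDryRun { new_ops, new_state }`, stash
    /// //    the combined ops back on the session's transaction for commit time.
    /// ```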
    #[instrument(name = "coord::catalog_transact_with_ddl_transaction")]
    pub(crate) async fn catalog_transact_with_ddl_transaction<F>(
        &mut self,
        ctx: &mut ExecuteContext,
        ops: Vec<catalog::Op>,
        side_effect: F,
    ) -> Result<(), AdapterError>
    where
        F: for<'a> FnOnce(
                &'a mut Coordinator,
                Option<&'a mut ExecuteContext>,
            ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
            + Send
            + Sync
            + 'static,
    {
        let start = Instant::now();

        let Some(Transaction {
            ops:
                TransactionOps::DDL {
                    ops: txn_ops,
                    revision: txn_revision,
                    side_effects: _,
                    state: _,
                },
            ..
        }) = ctx.session().transaction().inner()
        else {
            let result = self
                .catalog_transact_with_side_effects(Some(ctx), ops, side_effect)
                .await;
            self.metrics
                .catalog_transact_seconds
                .with_label_values(&["catalog_transact_with_ddl_transaction"])
                .observe(start.elapsed().as_secs_f64());
            return result;
        };

        // Make sure our Catalog hasn't changed since opening the transaction.
        if self.catalog().transient_revision() != *txn_revision {
            self.metrics
                .catalog_transact_seconds
                .with_label_values(&["catalog_transact_with_ddl_transaction"])
                .observe(start.elapsed().as_secs_f64());
            return Err(AdapterError::DDLTransactionRace);
        }

        // Combine the existing ops with the new ops so we can replay them.
        let mut all_ops = Vec::with_capacity(ops.len() + txn_ops.len() + 1);
        all_ops.extend(txn_ops.iter().cloned());
        all_ops.extend(ops.clone());
        all_ops.push(Op::TransactionDryRun);

        // Run our Catalog transaction, but abort before committing.
        let result = self.catalog_transact(Some(ctx.session()), all_ops).await;

        let result = match result {
            // We purposefully fail with this error to prevent committing the transaction.
            Err(AdapterError::TransactionDryRun { new_ops, new_state }) => {
                // Set these ops on our transaction, bailing if the Catalog has changed since we
                // ran the transaction.
                ctx.session_mut()
                    .transaction_mut()
                    .add_ops(TransactionOps::DDL {
                        ops: new_ops,
                        state: new_state,
                        side_effects: vec![Box::new(side_effect)],
                        revision: self.catalog().transient_revision(),
                    })?;
                Ok(())
            }
            Ok(_) => unreachable!("unexpected success!"),
            Err(e) => Err(e),
        };

        self.metrics
            .catalog_transact_seconds
            .with_label_values(&["catalog_transact_with_ddl_transaction"])
            .observe(start.elapsed().as_secs_f64());

        result
    }

    /// Perform a catalog transaction. [`Coordinator::ship_dataflow`] must be
    /// called after this function successfully returns on any built
    /// [`DataflowDesc`](mz_compute_types::dataflows::DataflowDesc).
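    ///
    /// A minimal sketch of how the two return values are consumed by the
    /// wrappers above (not compiled; `coord`, `conn_id`, and `ops` are
    /// hypothetical bindings):
    ///
    /// ```ignore
    /// let (table_updates, catalog_updates) =
    ///     coord.catalog_transact_inner(conn_id, ops).await?;
    /// // Apply controller-facing implications, then await the builtin table writes.
    /// coord
    ///     .apply_catalog_implications(None, catalog_updates)
    ///     .await
    ///     .expect("cannot fail to apply catalog update implications");
    /// table_updates.await;
    /// ```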
    #[instrument(name = "coord::catalog_transact_inner")]
    pub(crate) async fn catalog_transact_inner<'a>(
        &mut self,
        conn_id: Option<&ConnectionId>,
        ops: Vec<catalog::Op>,
    ) -> Result<(BuiltinTableAppendNotify, Vec<ParsedStateUpdate>), AdapterError> {
        if self.controller.read_only() {
            return Err(AdapterError::ReadOnly);
        }

        event!(Level::TRACE, ops = format!("{:?}", ops));

        let mut webhook_sources_to_restart = BTreeSet::new();
        let mut clusters_to_drop = vec![];
        let mut cluster_replicas_to_drop = vec![];
        let mut clusters_to_create = vec![];
        let mut cluster_replicas_to_create = vec![];
        let mut update_metrics_config = false;
        let mut update_tracing_config = false;
        let mut update_controller_config = false;
        let mut update_compute_config = false;
        let mut update_storage_config = false;
        let mut update_pg_timestamp_oracle_config = false;
        let mut update_metrics_retention = false;
        let mut update_secrets_caching_config = false;
        let mut update_cluster_scheduling_config = false;
        let mut update_http_config = false;

        for op in &ops {
            match op {
                catalog::Op::DropObjects(drop_object_infos) => {
                    for drop_object_info in drop_object_infos {
                        match &drop_object_info {
                            catalog::DropObjectInfo::Item(_) => {
                                // Nothing to do, these will be handled by
                                // applying the side effects that we return.
                            }
                            catalog::DropObjectInfo::Cluster(id) => {
                                clusters_to_drop.push(*id);
                            }
                            catalog::DropObjectInfo::ClusterReplica((
                                cluster_id,
                                replica_id,
                                _reason,
                            )) => {
                                // Drop the cluster replica itself.
                                cluster_replicas_to_drop.push((*cluster_id, *replica_id));
                            }
                            _ => (),
                        }
                    }
                }
                catalog::Op::ResetSystemConfiguration { name }
                | catalog::Op::UpdateSystemConfiguration { name, .. } => {
                    update_metrics_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_metrics_config_var(name);
                    update_tracing_config |= vars::is_tracing_var(name);
                    update_controller_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_controller_config_var(name);
                    update_compute_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_compute_config_var(name);
                    update_storage_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_storage_config_var(name);
                    update_pg_timestamp_oracle_config |=
                        vars::is_pg_timestamp_oracle_config_var(name);
                    update_metrics_retention |= name == vars::METRICS_RETENTION.name();
                    update_secrets_caching_config |= vars::is_secrets_caching_var(name);
                    update_cluster_scheduling_config |= vars::is_cluster_scheduling_var(name);
                    update_http_config |= vars::is_http_config_var(name);
                }
                catalog::Op::ResetAllSystemConfiguration => {
                    // Assume they all need to be updated.
                    // We could check whether the configs have actually changed, but
                    // this is simpler.
                    update_tracing_config = true;
                    update_controller_config = true;
                    update_compute_config = true;
                    update_storage_config = true;
                    update_pg_timestamp_oracle_config = true;
                    update_metrics_retention = true;
                    update_secrets_caching_config = true;
                    update_cluster_scheduling_config = true;
                    update_http_config = true;
                }
                catalog::Op::RenameItem { id, .. } => {
                    let item = self.catalog().get_entry(id);
                    let is_webhook_source = item
                        .source()
                        .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
                        .unwrap_or(false);
                    if is_webhook_source {
                        webhook_sources_to_restart.insert(*id);
                    }
                }
                catalog::Op::RenameSchema {
                    database_spec,
                    schema_spec,
                    ..
                } => {
                    let schema = self.catalog().get_schema(
                        database_spec,
                        schema_spec,
                        conn_id.unwrap_or(&SYSTEM_CONN_ID),
                    );
                    let webhook_sources = schema.item_ids().filter(|id| {
                        let item = self.catalog().get_entry(id);
                        item.source()
                            .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
                            .unwrap_or(false)
                    });
                    webhook_sources_to_restart.extend(webhook_sources);
                }
                catalog::Op::CreateCluster { id, .. } => {
                    clusters_to_create.push(*id);
                }
                catalog::Op::CreateClusterReplica {
                    cluster_id,
                    name,
                    config,
                    ..
                } => {
                    cluster_replicas_to_create.push((
                        *cluster_id,
                        name.clone(),
                        config.location.num_processes(),
                    ));
                }
                _ => (),
            }
        }

        self.validate_resource_limits(&ops, conn_id.unwrap_or(&SYSTEM_CONN_ID))?;

        // This will produce timestamps that are guaranteed to increase on each
        // call, and also never be behind the system clock. If the system clock
        // hasn't advanced (or has gone backward), it will increment by 1. For
        // the audit log, we need to balance "close (within 10s or so) to the
        // system clock" and "always goes up". We've chosen here to prioritize
        // always going up, and believe we will always be close to the system
        // clock because it is well configured (chrony) and so may only rarely
        // regress or pause for 10s.
        let oracle_write_ts = self.get_local_write_ts().await.timestamp;

        let Coordinator {
            catalog,
            active_conns,
            controller,
            cluster_replica_statuses,
            ..
        } = self;
        let catalog = Arc::make_mut(catalog);
        let conn = conn_id.map(|id| active_conns.get(id).expect("connection must exist"));

        let TransactionResult {
            builtin_table_updates,
            catalog_updates,
            audit_events,
        } = catalog
            .transact(
                Some(&mut controller.storage_collections),
                oracle_write_ts,
                conn,
                ops,
            )
            .await?;

        for (cluster_id, replica_id) in &cluster_replicas_to_drop {
            cluster_replica_statuses.remove_cluster_replica_statuses(cluster_id, replica_id);
        }
        for cluster_id in &clusters_to_drop {
            cluster_replica_statuses.remove_cluster_statuses(cluster_id);
        }
        for cluster_id in clusters_to_create {
            cluster_replica_statuses.initialize_cluster_statuses(cluster_id);
        }
        let now = to_datetime((catalog.config().now)());
        for (cluster_id, replica_name, num_processes) in cluster_replicas_to_create {
            let replica_id = catalog
                .resolve_replica_in_cluster(&cluster_id, &replica_name)
                .expect("just created")
                .replica_id();
            cluster_replica_statuses.initialize_cluster_replica_statuses(
                cluster_id,
                replica_id,
                num_processes,
                now,
            );
        }

        // Append our builtin table updates, then return the notify so we can run other tasks in
        // parallel.
        let (builtin_update_notify, _) = self
            .builtin_table_update()
            .execute(builtin_table_updates)
            .await;

        // No error returns are allowed after this point. Enforce this at compile time
        // by using this odd structure so we don't accidentally add a stray `?`.
        let _: () = async {
            if !webhook_sources_to_restart.is_empty() {
                self.restart_webhook_sources(webhook_sources_to_restart);
            }

            if update_metrics_config {
                mz_metrics::update_dyncfg(&self.catalog().system_config().dyncfg_updates());
            }
            if update_controller_config {
                self.update_controller_config();
            }
            if update_compute_config {
                self.update_compute_config();
            }
            if update_storage_config {
                self.update_storage_config();
            }
            if update_pg_timestamp_oracle_config {
                self.update_pg_timestamp_oracle_config();
            }
            if update_metrics_retention {
                self.update_metrics_retention();
            }
            if update_tracing_config {
                self.update_tracing_config();
            }
            if update_secrets_caching_config {
                self.update_secrets_caching_config();
            }
            if update_cluster_scheduling_config {
                self.update_cluster_scheduling_config();
            }
            if update_http_config {
                self.update_http_config();
            }
        }
        .instrument(info_span!("coord::catalog_transact_with::finalize"))
        .await;

        let conn = conn_id.and_then(|id| self.active_conns.get(id));
        if let Some(segment_client) = &self.segment_client {
            for VersionedEvent::V1(event) in audit_events {
                let event_type = format!(
                    "{} {}",
                    event.object_type.as_title_case(),
                    event.event_type.as_title_case()
                );
                segment_client.environment_track(
                    &self.catalog().config().environment_id,
                    event_type,
                    json!({ "details": event.details.as_json() }),
                    EventDetails {
                        user_id: conn
                            .and_then(|c| c.user().external_metadata.as_ref())
                            .map(|m| m.user_id),
                        application_name: conn.map(|c| c.application_name()),
                        ..Default::default()
                    },
                );
            }
        }

        Ok((builtin_update_notify, catalog_updates))
    }

    pub(crate) fn drop_replica(&mut self, cluster_id: ClusterId, replica_id: ReplicaId) {
        self.drop_introspection_subscribes(replica_id);

        self.controller
            .drop_replica(cluster_id, replica_id)
            .expect("dropping replica must not fail");
    }

    /// A convenience method for dropping sources.
    pub(crate) fn drop_sources(&mut self, sources: Vec<(CatalogItemId, GlobalId)>) {
        for (item_id, _gid) in &sources {
            self.active_webhooks.remove(item_id);
        }
        let storage_metadata = self.catalog.state().storage_metadata();
        let source_gids = sources.into_iter().map(|(_id, gid)| gid).collect();
        self.controller
            .storage
            .drop_sources(storage_metadata, source_gids)
            .unwrap_or_terminate("cannot fail to drop sources");
    }

    /// A convenience method for dropping tables.
    pub(crate) fn drop_tables(&mut self, tables: Vec<(CatalogItemId, GlobalId)>, ts: Timestamp) {
        for (item_id, _gid) in &tables {
            self.active_webhooks.remove(item_id);
        }

        let storage_metadata = self.catalog.state().storage_metadata();
        let table_gids = tables.into_iter().map(|(_id, gid)| gid).collect();
        self.controller
            .storage
            .drop_tables(storage_metadata, table_gids, ts)
            .unwrap_or_terminate("cannot fail to drop tables");
    }

    fn restart_webhook_sources(&mut self, sources: impl IntoIterator<Item = CatalogItemId>) {
        for id in sources {
            self.active_webhooks.remove(&id);
        }
    }

    /// Like `drop_compute_sinks`, but for a single compute sink.
    ///
    /// Returns the controller's state for the compute sink if the identified
    /// sink was known to the controller. It is the caller's responsibility to
    /// retire the returned sink. Consider using `retire_compute_sinks` instead.
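    ///
    /// Hypothetical usage sketch (not compiled; `sink_id` is an assumption):
    ///
    /// ```ignore
    /// if let Some(sink) = coord.drop_compute_sink(sink_id).await {
    ///     // The caller must retire the returned sink state.
    ///     sink.retire(ActiveComputeSinkRetireReason::Canceled);
    /// }
    /// ```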
    #[must_use]
    pub async fn drop_compute_sink(&mut self, sink_id: GlobalId) -> Option<ActiveComputeSink> {
        self.drop_compute_sinks([sink_id]).await.remove(&sink_id)
    }

    /// Drops a batch of compute sinks.
    ///
    /// For each sink that exists, the coordinator and controller's state
    /// associated with the sink is removed.
    ///
    /// Returns a map containing the controller's state for each sink that was
    /// removed. It is the caller's responsibility to retire the returned sinks.
    /// Consider using `retire_compute_sinks` instead.
    #[must_use]
    pub async fn drop_compute_sinks(
        &mut self,
        sink_ids: impl IntoIterator<Item = GlobalId>,
    ) -> BTreeMap<GlobalId, ActiveComputeSink> {
        let mut by_id = BTreeMap::new();
        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
        for sink_id in sink_ids {
            let sink = match self.remove_active_compute_sink(sink_id).await {
                None => {
                    tracing::error!(%sink_id, "drop_compute_sinks called on nonexistent sink");
                    continue;
                }
                Some(sink) => sink,
            };

            by_cluster
                .entry(sink.cluster_id())
                .or_default()
                .push(sink_id);
            by_id.insert(sink_id, sink);
        }
        for (cluster_id, ids) in by_cluster {
            let compute = &mut self.controller.compute;
            // A cluster could have been dropped, so verify it exists.
            if compute.instance_exists(cluster_id) {
                compute
                    .drop_collections(cluster_id, ids)
                    .unwrap_or_terminate("cannot fail to drop collections");
            }
        }
        by_id
    }

    /// Retires a batch of sinks with disparate reasons for retirement.
    ///
    /// Each sink identified in `reasons` is dropped (see `drop_compute_sinks`),
    /// then retired with its corresponding reason.
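    ///
    /// Sketch of a call site (not compiled; the `sink_id` binding and the
    /// chosen reason are illustrative assumptions):
    ///
    /// ```ignore
    /// let reasons = btreemap! {
    ///     sink_id => ActiveComputeSinkRetireReason::Canceled,
    /// };
    /// coord.retire_compute_sinks(reasons).await;
    /// ```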
    pub async fn retire_compute_sinks(
        &mut self,
        mut reasons: BTreeMap<GlobalId, ActiveComputeSinkRetireReason>,
    ) {
        let sink_ids = reasons.keys().cloned();
        for (id, sink) in self.drop_compute_sinks(sink_ids).await {
            let reason = reasons
                .remove(&id)
                .expect("all returned IDs are in `reasons`");
            sink.retire(reason);
        }
    }

    /// Drops all pending replicas for a set of clusters
    /// that are undergoing reconfiguration.
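    ///
    /// Hypothetical call, e.g. when a connection that initiated an
    /// `ALTER CLUSTER` goes away (the set contents are an assumption):
    ///
    /// ```ignore
    /// let clusters: BTreeSet<ClusterId> = btreeset! { cluster_id };
    /// coord.drop_reconfiguration_replicas(clusters).await?;
    /// ```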
    pub async fn drop_reconfiguration_replicas(
        &mut self,
        cluster_ids: BTreeSet<ClusterId>,
    ) -> Result<(), AdapterError> {
        let pending_cluster_ops: Vec<Op> = cluster_ids
            .iter()
            .map(|c| {
                self.catalog()
                    .get_cluster(c.clone())
                    .replicas()
                    .filter_map(|r| match r.config.location {
                        ReplicaLocation::Managed(ref l) if l.pending => {
                            Some(DropObjectInfo::ClusterReplica((
                                c.clone(),
                                r.replica_id,
                                ReplicaCreateDropReason::Manual,
                            )))
                        }
                        _ => None,
                    })
                    .collect::<Vec<DropObjectInfo>>()
            })
            .filter_map(|pending_replica_drop_ops_by_cluster| {
                match pending_replica_drop_ops_by_cluster.len() {
                    0 => None,
                    _ => Some(Op::DropObjects(pending_replica_drop_ops_by_cluster)),
                }
            })
            .collect();
        if !pending_cluster_ops.is_empty() {
            self.catalog_transact(None, pending_cluster_ops).await?;
        }
        Ok(())
    }

    /// Cancels all active compute sinks for the identified connection.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn cancel_compute_sinks_for_conn(&mut self, conn_id: &ConnectionId) {
        self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Canceled)
            .await
    }

    /// Cancels all active cluster reconfigurations for the identified connection.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn cancel_cluster_reconfigurations_for_conn(
        &mut self,
        conn_id: &ConnectionId,
    ) {
        self.retire_cluster_reconfigurations_for_conn(conn_id).await
    }

    /// Retires all active compute sinks for the identified connection with the
    /// specified reason.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn retire_compute_sinks_for_conn(
        &mut self,
        conn_id: &ConnectionId,
        reason: ActiveComputeSinkRetireReason,
    ) {
        let drop_sinks = self
            .active_conns
            .get_mut(conn_id)
            .expect("must exist for active session")
            .drop_sinks
            .iter()
            .map(|sink_id| (*sink_id, reason.clone()))
            .collect();
        self.retire_compute_sinks(drop_sinks).await;
    }

    /// Cleans up pending cluster reconfigurations for the identified connection.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn retire_cluster_reconfigurations_for_conn(
        &mut self,
        conn_id: &ConnectionId,
    ) {
        let reconfiguring_clusters = self
            .active_conns
            .get(conn_id)
            .expect("must exist for active session")
            .pending_cluster_alters
            .clone();
        // try to drop reconfig replicas
        self.drop_reconfiguration_replicas(reconfiguring_clusters)
            .await
            .unwrap_or_terminate("cannot fail to drop reconfiguration replicas");

        self.active_conns
            .get_mut(conn_id)
            .expect("must exist for active session")
            .pending_cluster_alters
            .clear();
    }

    pub(crate) fn drop_storage_sinks(&mut self, sink_gids: Vec<GlobalId>) {
        let storage_metadata = self.catalog.state().storage_metadata();
        self.controller
            .storage
            .drop_sinks(storage_metadata, sink_gids)
            .unwrap_or_terminate("cannot fail to drop sinks");
    }

    pub(crate) fn drop_compute_collections(&mut self, collections: Vec<(ClusterId, GlobalId)>) {
        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
        for (cluster_id, gid) in collections {
            by_cluster.entry(cluster_id).or_default().push(gid);
        }
        for (cluster_id, gids) in by_cluster {
            let compute = &mut self.controller.compute;
            // A cluster could have been dropped, so verify it exists.
            if compute.instance_exists(cluster_id) {
                compute
                    .drop_collections(cluster_id, gids)
                    .unwrap_or_terminate("cannot fail to drop collections");
            }
        }
    }

    pub(crate) fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>) {
        let cloud_resource_controller = Arc::clone(self.cloud_resource_controller
            .as_ref()
            .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))
            .expect("vpc endpoints should only be dropped in CLOUD, where `cloud_resource_controller` is `Some`"));
        // We don't want to block the coordinator on external delete API calls,
        // so we move the vpc_endpoint drops to a separate task. This does mean
        // that a failed drop won't bubble up to the user as an error message.
        // However, even if it did (which is how the code previously worked), mz
        // has already dropped the endpoint from our catalog, and so we wouldn't
        // be able to retry anyway. Any orphaned vpc_endpoints will eventually
        // be cleaned up during restart via coord bootstrap.
        task::spawn(
            || "drop_vpc_endpoints",
            async move {
                for vpc_endpoint in vpc_endpoints {
                    let _ = Retry::default()
                        .max_duration(Duration::from_secs(60))
                        .retry_async(|_state| async {
                            fail_point!("drop_vpc_endpoint", |r| {
                                Err(anyhow::anyhow!("Fail point error {:?}", r))
                            });
                            match cloud_resource_controller
                                .delete_vpc_endpoint(vpc_endpoint)
                                .await
                            {
                                Ok(_) => Ok(()),
                                Err(e) => {
                                    warn!("Dropping VPC Endpoints has encountered an error: {}", e);
                                    Err(e)
                                }
                            }
                        })
                        .await;
                }
            }
            .instrument(info_span!(
                "coord::catalog_transact_inner::drop_vpc_endpoints"
            )),
        );
    }

    /// Removes all temporary items created by the specified connection, though
    /// not the temporary schema itself.
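    ///
    /// Typically invoked during connection termination; a hedged sketch (not
    /// compiled; `coord` and `session` are hypothetical bindings):
    ///
    /// ```ignore
    /// // Drop everything the connection created in its temporary schema.
    /// coord.drop_temp_items(session.conn_id()).await;
    /// ```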
    pub(crate) async fn drop_temp_items(&mut self, conn_id: &ConnectionId) {
        let temp_items = self.catalog().state().get_temp_items(conn_id).collect();
        let all_items = self.catalog().object_dependents(&temp_items, conn_id);

        if all_items.is_empty() {
            return;
        }
        let op = Op::DropObjects(
            all_items
                .into_iter()
                .map(DropObjectInfo::manual_drop_from_object_id)
                .collect(),
        );

        self.catalog_transact_with_context(Some(conn_id), None, vec![op])
            .await
            .expect("unable to drop temporary items for conn_id");
    }

    fn update_cluster_scheduling_config(&self) {
        let config = flags::orchestrator_scheduling_config(self.catalog.system_config());
        self.controller
            .update_orchestrator_scheduling_config(config);
    }

    fn update_secrets_caching_config(&self) {
        let config = flags::caching_config(self.catalog.system_config());
        self.caching_secrets_reader.set_policy(config);
    }

    fn update_tracing_config(&self) {
        let tracing = flags::tracing_config(self.catalog().system_config());
        tracing.apply(&self.tracing_handle);
    }

    fn update_compute_config(&mut self) {
        let config_params = flags::compute_config(self.catalog().system_config());
        self.controller.compute.update_configuration(config_params);
    }

    fn update_storage_config(&mut self) {
        let config_params = flags::storage_config(self.catalog().system_config());
        self.controller.storage.update_parameters(config_params);
    }

    fn update_pg_timestamp_oracle_config(&self) {
        let config_params = flags::pg_timstamp_oracle_config(self.catalog().system_config());
        if let Some(config) = self.pg_timestamp_oracle_config.as_ref() {
            config_params.apply(config)
        }
    }

    fn update_metrics_retention(&self) {
        let duration = self.catalog().system_config().metrics_retention();
        let policy = ReadPolicy::lag_writes_by(
            Timestamp::new(u64::try_from(duration.as_millis()).unwrap_or_else(|_e| {
                tracing::error!("Absurd metrics retention duration: {duration:?}.");
                u64::MAX
            })),
            SINCE_GRANULARITY,
        );
        let storage_policies = self
            .catalog()
            .entries()
            .filter(|entry| {
                entry.item().is_retained_metrics_object()
                    && entry.item().is_compute_object_on_cluster().is_none()
            })
            .map(|entry| (entry.id(), policy.clone()))
            .collect::<Vec<_>>();
        let compute_policies = self
            .catalog()
            .entries()
            .filter_map(|entry| {
                if let (true, Some(cluster_id)) = (
                    entry.item().is_retained_metrics_object(),
                    entry.item().is_compute_object_on_cluster(),
                ) {
                    Some((cluster_id, entry.id(), policy.clone()))
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        self.update_storage_read_policies(storage_policies);
        self.update_compute_read_policies(compute_policies);
    }

    fn update_controller_config(&mut self) {
        let sys_config = self.catalog().system_config();
        self.controller
            .update_configuration(sys_config.dyncfg_updates());
    }

    fn update_http_config(&mut self) {
        let webhook_request_limit = self
            .catalog()
            .system_config()
            .webhook_concurrent_request_limit();
        self.webhook_concurrency_limit
            .set_limit(webhook_request_limit);
    }

    pub(crate) async fn create_storage_export(
        &mut self,
        id: GlobalId,
        sink: &Sink,
    ) -> Result<(), AdapterError> {
        // Validate `sink.from` is in fact a storage collection
        self.controller.storage.check_exists(sink.from)?;

        // The AsOf is used to determine at what time to snapshot reading from
        // the persist collection.  This is primarily relevant when we do _not_
        // want to include the snapshot in the sink.
        //
        // We choose the smallest as_of that is legal, according to the sinked
        // collection's since.
        let id_bundle = crate::CollectionIdBundle {
            storage_ids: btreeset! {sink.from},
            compute_ids: btreemap! {},
        };

        // We're putting in place read holds, such that create_exports, below,
        // which calls update_read_capabilities, can successfully do so.
        // Otherwise, the since of dependencies might move along concurrently,
        // pulling the rug from under us!
        //
        // TODO: Maybe in the future, pass those holds on to storage, to hold on
        // to them and downgrade when possible?
        let read_holds = self.acquire_read_holds(&id_bundle);
        let as_of = read_holds.least_valid_read();

        let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
        let storage_sink_desc = mz_storage_types::sinks::StorageSinkDesc {
            from: sink.from,
            from_desc: storage_sink_from_entry
                .desc(&self.catalog().resolve_full_name(
                    storage_sink_from_entry.name(),
                    storage_sink_from_entry.conn_id(),
                ))
                .expect("sinks can only be built on items with descs")
                .into_owned(),
            connection: sink
                .connection
                .clone()
                .into_inline_connection(self.catalog().state()),
            envelope: sink.envelope,
            as_of,
            with_snapshot: sink.with_snapshot,
            version: sink.version,
            from_storage_metadata: (),
            to_storage_metadata: (),
        };

        let collection_desc = CollectionDescription {
            // TODO(sinks): make generic once we have more than one sink type.
            desc: KAFKA_PROGRESS_DESC.clone(),
            data_source: DataSource::Sink {
                desc: ExportDescription {
                    sink: storage_sink_desc,
                    instance_id: sink.cluster_id,
                },
            },
            since: None,
            status_collection_id: None,
            timeline: None,
            primary: None,
        };
        let collections = vec![(id, collection_desc)];

        // Create the collections.
        let storage_metadata = self.catalog.state().storage_metadata();
        let res = self
            .controller
            .storage
            .create_collections(storage_metadata, None, collections)
            .await;

        // Drop read holds after the export has been created, at which point
        // storage will have put in its own read holds.
        drop(read_holds);

        Ok(res?)
    }

    /// Validate all resource limits in a catalog transaction and return an error if any limit is
    /// exceeded.
1028    fn validate_resource_limits(
1029        &self,
1030        ops: &Vec<catalog::Op>,
1031        conn_id: &ConnectionId,
1032    ) -> Result<(), AdapterError> {
1033        let mut new_kafka_connections = 0;
1034        let mut new_postgres_connections = 0;
1035        let mut new_mysql_connections = 0;
1036        let mut new_sql_server_connections = 0;
1037        let mut new_aws_privatelink_connections = 0;
1038        let mut new_tables = 0;
1039        let mut new_sources = 0;
1040        let mut new_sinks = 0;
1041        let mut new_materialized_views = 0;
1042        let mut new_clusters = 0;
1043        let mut new_replicas_per_cluster = BTreeMap::new();
1044        let mut new_credit_consumption_rate = Numeric::zero();
1045        let mut new_databases = 0;
1046        let mut new_schemas_per_database = BTreeMap::new();
1047        let mut new_objects_per_schema = BTreeMap::new();
1048        let mut new_secrets = 0;
1049        let mut new_roles = 0;
1050        let mut new_continual_tasks = 0;
1051        let mut new_network_policies = 0;
1052        for op in ops {
1053            match op {
1054                Op::CreateDatabase { .. } => {
1055                    new_databases += 1;
1056                }
1057                Op::CreateSchema { database_id, .. } => {
1058                    if let ResolvedDatabaseSpecifier::Id(database_id) = database_id {
1059                        *new_schemas_per_database.entry(database_id).or_insert(0) += 1;
1060                    }
1061                }
1062                Op::CreateRole { .. } => {
1063                    new_roles += 1;
1064                }
1065                Op::CreateNetworkPolicy { .. } => {
1066                    new_network_policies += 1;
1067                }
1068                Op::CreateCluster { .. } => {
1069                    // TODO(benesch): having deprecated linked clusters, remove
1070                    // the `max_sources` and `max_sinks` limit, and set a higher
1071                    // max cluster limit?
1072                    new_clusters += 1;
1073                }
1074                Op::CreateClusterReplica {
1075                    cluster_id, config, ..
1076                } => {
1077                    if cluster_id.is_user() {
1078                        *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) += 1;
1079                        if let ReplicaLocation::Managed(location) = &config.location {
1080                            let replica_allocation = self
1081                                .catalog()
1082                                .cluster_replica_sizes()
1083                                .0
1084                                .get(location.size_for_billing())
1085                                .expect(
1086                                    "location size is validated against the cluster replica sizes",
1087                                );
1088                            new_credit_consumption_rate += replica_allocation.credits_per_hour
1089                        }
1090                    }
1091                }
1092                Op::CreateItem { name, item, .. } => {
1093                    *new_objects_per_schema
1094                        .entry((
1095                            name.qualifiers.database_spec.clone(),
1096                            name.qualifiers.schema_spec.clone(),
1097                        ))
1098                        .or_insert(0) += 1;
1099                    match item {
1100                        CatalogItem::Connection(connection) => match connection.details {
1101                            ConnectionDetails::Kafka(_) => new_kafka_connections += 1,
1102                            ConnectionDetails::Postgres(_) => new_postgres_connections += 1,
1103                            ConnectionDetails::MySql(_) => new_mysql_connections += 1,
1104                            ConnectionDetails::SqlServer(_) => new_sql_server_connections += 1,
1105                            ConnectionDetails::AwsPrivatelink(_) => {
1106                                new_aws_privatelink_connections += 1
1107                            }
1108                            ConnectionDetails::Csr(_)
1109                            | ConnectionDetails::Ssh { .. }
1110                            | ConnectionDetails::Aws(_)
1111                            | ConnectionDetails::IcebergCatalog(_) => {}
1112                        },
1113                        CatalogItem::Table(_) => {
1114                            new_tables += 1;
1115                        }
1116                        CatalogItem::Source(source) => {
1117                            new_sources += source.user_controllable_persist_shard_count()
1118                        }
1119                        CatalogItem::Sink(_) => new_sinks += 1,
1120                        CatalogItem::MaterializedView(_) => {
1121                            new_materialized_views += 1;
1122                        }
1123                        CatalogItem::Secret(_) => {
1124                            new_secrets += 1;
1125                        }
1126                        CatalogItem::ContinualTask(_) => {
1127                            new_continual_tasks += 1;
1128                        }
1129                        CatalogItem::Log(_)
1130                        | CatalogItem::View(_)
1131                        | CatalogItem::Index(_)
1132                        | CatalogItem::Type(_)
1133                        | CatalogItem::Func(_) => {}
1134                    }
1135                }
1136                Op::DropObjects(drop_object_infos) => {
1137                    for drop_object_info in drop_object_infos {
1138                        match drop_object_info {
1139                            DropObjectInfo::Cluster(_) => {
1140                                new_clusters -= 1;
1141                            }
1142                            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
1143                                if cluster_id.is_user() {
1144                                    *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) -= 1;
1145                                    let cluster = self
1146                                        .catalog()
1147                                        .get_cluster_replica(*cluster_id, *replica_id);
1148                                    if let ReplicaLocation::Managed(location) =
1149                                        &cluster.config.location
1150                                    {
1151                                        let replica_allocation = self
1152                                            .catalog()
1153                                            .cluster_replica_sizes()
1154                                            .0
1155                                            .get(location.size_for_billing())
1156                                            .expect(
1157                                                "location size is validated against the cluster replica sizes",
1158                                            );
1159                                        new_credit_consumption_rate -=
1160                                            replica_allocation.credits_per_hour
1161                                    }
1162                                }
1163                            }
1164                            DropObjectInfo::Database(_) => {
1165                                new_databases -= 1;
1166                            }
1167                            DropObjectInfo::Schema((database_spec, _)) => {
1168                                if let ResolvedDatabaseSpecifier::Id(database_id) = database_spec {
1169                                    *new_schemas_per_database.entry(database_id).or_insert(0) -= 1;
1170                                }
1171                            }
1172                            DropObjectInfo::Role(_) => {
1173                                new_roles -= 1;
1174                            }
1175                            DropObjectInfo::NetworkPolicy(_) => {
1176                                new_network_policies -= 1;
1177                            }
1178                            DropObjectInfo::Item(id) => {
1179                                let entry = self.catalog().get_entry(id);
1180                                *new_objects_per_schema
1181                                    .entry((
1182                                        entry.name().qualifiers.database_spec.clone(),
1183                                        entry.name().qualifiers.schema_spec.clone(),
1184                                    ))
1185                                    .or_insert(0) -= 1;
1186                                match entry.item() {
1187                                    CatalogItem::Connection(connection) => match connection.details
1188                                    {
1189                                        ConnectionDetails::AwsPrivatelink(_) => {
1190                                            new_aws_privatelink_connections -= 1;
1191                                        }
1192                                        _ => (),
1193                                    },
1194                                    CatalogItem::Table(_) => {
1195                                        new_tables -= 1;
1196                                    }
1197                                    CatalogItem::Source(source) => {
1198                                        new_sources -=
1199                                            source.user_controllable_persist_shard_count()
1200                                    }
1201                                    CatalogItem::Sink(_) => new_sinks -= 1,
1202                                    CatalogItem::MaterializedView(_) => {
1203                                        new_materialized_views -= 1;
1204                                    }
1205                                    CatalogItem::Secret(_) => {
1206                                        new_secrets -= 1;
1207                                    }
1208                                    CatalogItem::ContinualTask(_) => {
1209                                        new_continual_tasks -= 1;
1210                                    }
1211                                    CatalogItem::Log(_)
1212                                    | CatalogItem::View(_)
1213                                    | CatalogItem::Index(_)
1214                                    | CatalogItem::Type(_)
1215                                    | CatalogItem::Func(_) => {}
1216                                }
1217                            }
1218                        }
1219                    }
1220                }
                Op::UpdateItem {
                    name: _,
                    id,
                    to_item,
                } => match to_item {
                    CatalogItem::Source(source) => {
                        let current_source = self
                            .catalog()
                            .get_entry(id)
                            .source()
                            .expect("source update is for source item");

                        new_sources += source.user_controllable_persist_shard_count()
                            - current_source.user_controllable_persist_shard_count();
                    }
                    CatalogItem::Connection(_)
                    | CatalogItem::Table(_)
                    | CatalogItem::Sink(_)
                    | CatalogItem::MaterializedView(_)
                    | CatalogItem::Secret(_)
                    | CatalogItem::Log(_)
                    | CatalogItem::View(_)
                    | CatalogItem::Index(_)
                    | CatalogItem::Type(_)
                    | CatalogItem::Func(_)
                    | CatalogItem::ContinualTask(_) => {}
                },
                Op::AlterRole { .. }
                | Op::AlterRetainHistory { .. }
                | Op::AlterNetworkPolicy { .. }
                | Op::AlterAddColumn { .. }
                | Op::AlterMaterializedViewApplyReplacement { .. }
                | Op::UpdatePrivilege { .. }
                | Op::UpdateDefaultPrivilege { .. }
                | Op::GrantRole { .. }
                | Op::RenameCluster { .. }
                | Op::RenameClusterReplica { .. }
                | Op::RenameItem { .. }
                | Op::RenameSchema { .. }
                | Op::UpdateOwner { .. }
                | Op::RevokeRole { .. }
                | Op::UpdateClusterConfig { .. }
                | Op::UpdateClusterReplicaConfig { .. }
                | Op::UpdateSourceReferences { .. }
                | Op::UpdateSystemConfiguration { .. }
                | Op::ResetSystemConfiguration { .. }
                | Op::ResetAllSystemConfiguration { .. }
                | Op::Comment { .. }
                | Op::WeirdStorageUsageUpdates { .. }
                | Op::TransactionDryRun => {}
            }
        }

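        // Tally the existing user connections by type so that each per-type
        // connection limit can be checked against `current + delta` below.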
        let mut current_aws_privatelink_connections = 0;
        let mut current_postgres_connections = 0;
        let mut current_mysql_connections = 0;
        let mut current_sql_server_connections = 0;
        let mut current_kafka_connections = 0;
        for c in self.catalog().user_connections() {
            let connection = c
                .connection()
                .expect("`user_connections()` only returns connection objects");

            match connection.details {
                ConnectionDetails::AwsPrivatelink(_) => current_aws_privatelink_connections += 1,
                ConnectionDetails::Postgres(_) => current_postgres_connections += 1,
                ConnectionDetails::MySql(_) => current_mysql_connections += 1,
                ConnectionDetails::SqlServer(_) => current_sql_server_connections += 1,
                ConnectionDetails::Kafka(_) => current_kafka_connections += 1,
                ConnectionDetails::Csr(_)
                | ConnectionDetails::Ssh { .. }
                | ConnectionDetails::Aws(_)
                | ConnectionDetails::IcebergCatalog(_) => {}
            }
        }
        self.validate_resource_limit(
            current_kafka_connections,
            new_kafka_connections,
            SystemVars::max_kafka_connections,
            "Kafka Connection",
            MAX_KAFKA_CONNECTIONS.name(),
        )?;
        self.validate_resource_limit(
            current_postgres_connections,
            new_postgres_connections,
            SystemVars::max_postgres_connections,
            "PostgreSQL Connection",
            MAX_POSTGRES_CONNECTIONS.name(),
        )?;
        self.validate_resource_limit(
            current_mysql_connections,
            new_mysql_connections,
            SystemVars::max_mysql_connections,
            "MySQL Connection",
            MAX_MYSQL_CONNECTIONS.name(),
        )?;
        self.validate_resource_limit(
            current_sql_server_connections,
            new_sql_server_connections,
            SystemVars::max_sql_server_connections,
            "SQL Server Connection",
            MAX_SQL_SERVER_CONNECTIONS.name(),
        )?;
        self.validate_resource_limit(
            current_aws_privatelink_connections,
            new_aws_privatelink_connections,
            SystemVars::max_aws_privatelink_connections,
            "AWS PrivateLink Connection",
            MAX_AWS_PRIVATELINK_CONNECTIONS.name(),
        )?;
        self.validate_resource_limit(
            self.catalog().user_tables().count(),
            new_tables,
            SystemVars::max_tables,
            "table",
            MAX_TABLES.name(),
        )?;

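        // Sources are counted by the number of user-controllable persist shards
        // they create rather than by catalog entries, matching how the
        // `new_sources` delta is accumulated above.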
        let current_sources: usize = self
            .catalog()
            .user_sources()
            .filter_map(|source| source.source())
            .map(|source| source.user_controllable_persist_shard_count())
            .sum::<i64>()
            .try_into()
            .expect("non-negative sum of sources");

        self.validate_resource_limit(
            current_sources,
            new_sources,
            SystemVars::max_sources,
            "source",
            MAX_SOURCES.name(),
        )?;
        self.validate_resource_limit(
            self.catalog().user_sinks().count(),
            new_sinks,
            SystemVars::max_sinks,
            "sink",
            MAX_SINKS.name(),
        )?;
        self.validate_resource_limit(
            self.catalog().user_materialized_views().count(),
            new_materialized_views,
            SystemVars::max_materialized_views,
            "materialized view",
            MAX_MATERIALIZED_VIEWS.name(),
        )?;
        self.validate_resource_limit(
            // Linked compute clusters don't count against the limit, since
            // we have a separate sources and sinks limit.
            //
            // TODO(benesch): remove the `max_sources` and `max_sinks` limit,
            // and set a higher max cluster limit?
            self.catalog().user_clusters().count(),
            new_clusters,
            SystemVars::max_clusters,
            "cluster",
            MAX_CLUSTERS.name(),
        )?;
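        // Replica limits are enforced per cluster, using the per-cluster deltas
        // accumulated from the ops above.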
        for (cluster_id, new_replicas) in new_replicas_per_cluster {
            // It's possible that the cluster hasn't been created yet.
            let current_amount = self
                .catalog()
                .try_get_cluster(cluster_id)
                .map(|instance| instance.user_replicas().count())
                .unwrap_or(0);
            self.validate_resource_limit(
                current_amount,
                new_replicas,
                SystemVars::max_replicas_per_cluster,
                "cluster replica",
                MAX_REPLICAS_PER_CLUSTER.name(),
            )?;
        }
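        // The credit consumption limit prefers the value from the license key, if
        // one is set, and otherwise falls back to the system variable.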
        self.validate_resource_limit_numeric(
            self.current_credit_consumption_rate(),
            new_credit_consumption_rate,
            |system_vars| {
                self.license_key
                    .max_credit_consumption_rate()
                    .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
            },
            "cluster replica",
            MAX_CREDIT_CONSUMPTION_RATE.name(),
        )?;
        self.validate_resource_limit(
            self.catalog().databases().count(),
            new_databases,
            SystemVars::max_databases,
            "database",
            MAX_DATABASES.name(),
        )?;
        for (database_id, new_schemas) in new_schemas_per_database {
            self.validate_resource_limit(
                self.catalog().get_database(database_id).schemas_by_id.len(),
                new_schemas,
                SystemVars::max_schemas_per_database,
                "schema",
                MAX_SCHEMAS_PER_DATABASE.name(),
            )?;
        }
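        // Object limits are enforced per schema, keyed by the (database, schema)
        // pairs accumulated above.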
        for ((database_spec, schema_spec), new_objects) in new_objects_per_schema {
            self.validate_resource_limit(
                self.catalog()
                    .get_schema(&database_spec, &schema_spec, conn_id)
                    .items
                    .len(),
                new_objects,
                SystemVars::max_objects_per_schema,
                "object",
                MAX_OBJECTS_PER_SCHEMA.name(),
            )?;
        }
        self.validate_resource_limit(
            self.catalog().user_secrets().count(),
            new_secrets,
            SystemVars::max_secrets,
            "secret",
            MAX_SECRETS.name(),
        )?;
        self.validate_resource_limit(
            self.catalog().user_roles().count(),
            new_roles,
            SystemVars::max_roles,
            "role",
            MAX_ROLES.name(),
        )?;
        self.validate_resource_limit(
            self.catalog().user_continual_tasks().count(),
            new_continual_tasks,
            SystemVars::max_continual_tasks,
            "continual_task",
            MAX_CONTINUAL_TASKS.name(),
        )?;
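        // The original code passed `user_continual_tasks().count()` as the current
        // amount for the network policy limit, which appears to be a copy-paste
        // slip. The check below assumes a `user_network_policies()` catalog
        // accessor analogous to the other `user_*` helpers; if no such accessor
        // exists, the current network policy count must be obtained another way.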
        self.validate_resource_limit(
            self.catalog().user_network_policies().count(),
            new_network_policies,
            SystemVars::max_network_policies,
            "network_policy",
            MAX_NETWORK_POLICIES.name(),
        )?;
        Ok(())
    }

    /// Validate a specific type of resource limit and return an error if that limit is exceeded.
    pub(crate) fn validate_resource_limit<F>(
        &self,
        current_amount: usize,
        new_instances: i64,
        resource_limit: F,
        resource_type: &str,
        limit_name: &str,
    ) -> Result<(), AdapterError>
    where
        F: Fn(&SystemVars) -> u32,
    {
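        // Only additions can push a resource over its limit; a non-positive delta
        // (e.g. from drops) always passes.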
        if new_instances <= 0 {
            return Ok(());
        }

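        // Compute `current + new` in i64, treating a failed conversion or an
        // overflowing addition as exceeding the limit, since either implies a
        // value larger than i64::MAX.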
        let limit: i64 = resource_limit(self.catalog().system_config()).into();
        let current_amount: Option<i64> = current_amount.try_into().ok();
        let desired =
            current_amount.and_then(|current_amount| current_amount.checked_add(new_instances));

        let exceeds_limit = if let Some(desired) = desired {
            desired > limit
        } else {
            true
        };

        let desired = desired
            .map(|desired| desired.to_string())
            .unwrap_or_else(|| format!("more than {}", i64::MAX));
        let current = current_amount
            .map(|current| current.to_string())
            .unwrap_or_else(|| format!("more than {}", i64::MAX));
        if exceeds_limit {
            Err(AdapterError::ResourceExhaustion {
                resource_type: resource_type.to_string(),
                limit_name: limit_name.to_string(),
                desired,
                limit: limit.to_string(),
                current,
            })
        } else {
            Ok(())
        }
    }

    /// Validate a specific type of numeric resource limit and return an error if that limit is exceeded.
    ///
    /// This is very similar to [`Self::validate_resource_limit`] but for numerics.
    pub(crate) fn validate_resource_limit_numeric<F>(
        &self,
        current_amount: Numeric,
        new_amount: Numeric,
        resource_limit: F,
        resource_type: &str,
        limit_name: &str,
    ) -> Result<(), AdapterError>
    where
        F: Fn(&SystemVars) -> Numeric,
    {
        if new_amount <= Numeric::zero() {
            return Ok(());
        }

        let limit = resource_limit(self.catalog().system_config());
        // Numeric values overflow to infinity instead of panicking, which has the
        // correct comparison semantics.
        // NaN should be impossible here since both values are positive.
        let desired = current_amount + new_amount;
        if desired > limit {
            Err(AdapterError::ResourceExhaustion {
                resource_type: resource_type.to_string(),
                limit_name: limit_name.to_string(),
                desired: desired.to_string(),
                limit: limit.to_string(),
                current: current_amount.to_string(),
            })
        } else {
            Ok(())
        }
    }
}