Skip to main content

mz_adapter/coord/
ddl.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! This module encapsulates all of the [`Coordinator`]'s logic for creating, dropping,
11//! and altering objects.
12
13use std::collections::{BTreeMap, BTreeSet};
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use fail::fail_point;
19use maplit::{btreemap, btreeset};
20use mz_adapter_types::compaction::SINCE_GRANULARITY;
21use mz_adapter_types::connection::ConnectionId;
22use mz_audit_log::VersionedEvent;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Sink};
25use mz_cluster_client::ReplicaId;
26use mz_controller::clusters::ReplicaLocation;
27use mz_controller_types::ClusterId;
28use mz_ore::instrument;
29use mz_ore::now::to_datetime;
30use mz_ore::retry::Retry;
31use mz_ore::task;
32use mz_repr::adt::numeric::Numeric;
33use mz_repr::{CatalogItemId, GlobalId, Timestamp};
34use mz_sql::catalog::{CatalogClusterReplica, CatalogSchema};
35use mz_sql::names::ResolvedDatabaseSpecifier;
36use mz_sql::plan::ConnectionDetails;
37use mz_sql::session::metadata::SessionMetadata;
38use mz_sql::session::vars::{
39    self, DEFAULT_TIMESTAMP_INTERVAL, MAX_AWS_PRIVATELINK_CONNECTIONS, MAX_CLUSTERS,
40    MAX_CREDIT_CONSUMPTION_RATE, MAX_DATABASES, MAX_KAFKA_CONNECTIONS, MAX_MATERIALIZED_VIEWS,
41    MAX_MYSQL_CONNECTIONS, MAX_NETWORK_POLICIES, MAX_OBJECTS_PER_SCHEMA, MAX_POSTGRES_CONNECTIONS,
42    MAX_REPLICAS_PER_CLUSTER, MAX_ROLES, MAX_SCHEMAS_PER_DATABASE, MAX_SECRETS, MAX_SINKS,
43    MAX_SOURCES, MAX_SQL_SERVER_CONNECTIONS, MAX_TABLES, SystemVars, Var,
44};
45use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
46use mz_storage_types::connections::inline::IntoInlineConnection;
47use mz_storage_types::read_policy::ReadPolicy;
48use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
49use serde_json::json;
50use tracing::{Instrument, Level, event, info_span, warn};
51
52use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
53use crate::catalog::{DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult};
54use crate::coord::Coordinator;
55use crate::coord::appends::BuiltinTableAppendNotify;
56use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
57use crate::session::{Session, Transaction, TransactionOps};
58use crate::telemetry::{EventDetails, SegmentClientExt};
59use crate::util::ResultExt;
60use crate::{AdapterError, ExecuteContext, catalog, flags};
61
62impl Coordinator {
63    /// Same as [`Self::catalog_transact_with_context`] but takes a [`Session`].
64    #[instrument(name = "coord::catalog_transact")]
65    pub(crate) async fn catalog_transact(
66        &mut self,
67        session: Option<&Session>,
68        ops: Vec<catalog::Op>,
69    ) -> Result<(), AdapterError> {
70        let start = Instant::now();
71        let result = self
72            .catalog_transact_with_context(session.map(|session| session.conn_id()), None, ops)
73            .await;
74        self.metrics
75            .catalog_transact_seconds
76            .with_label_values(&["catalog_transact"])
77            .observe(start.elapsed().as_secs_f64());
78        result
79    }
80
81    /// Same as [`Self::catalog_transact_with_context`] but takes a [`Session`]
82    /// and runs builtin table updates concurrently with any side effects (e.g.
83    /// creating collections).
84    // TODO(aljoscha): Remove this method once all call-sites have been migrated
85    // to the newer catalog_transact_with_context. The latter is what allows us
86    // to apply catalog implications that we derive from catalog chanages either
87    // when initially applying the ops to the catalog _or_ when following
88    // catalog changes from another process.
89    #[instrument(name = "coord::catalog_transact_with_side_effects")]
90    pub(crate) async fn catalog_transact_with_side_effects<F>(
91        &mut self,
92        mut ctx: Option<&mut ExecuteContext>,
93        ops: Vec<catalog::Op>,
94        side_effect: F,
95    ) -> Result<(), AdapterError>
96    where
97        F: for<'a> FnOnce(
98                &'a mut Coordinator,
99                Option<&'a mut ExecuteContext>,
100            ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
101            + 'static,
102    {
103        let start = Instant::now();
104
105        let (table_updates, catalog_updates) = self
106            .catalog_transact_inner(ctx.as_ref().map(|ctx| ctx.session().conn_id()), ops)
107            .await?;
108
109        // We can't run this concurrently with the explicit side effects,
110        // because both want to borrow self mutably.
111        let apply_implications_res = self
112            .apply_catalog_implications(ctx.as_deref_mut(), catalog_updates)
113            .await;
114
115        // We would get into an inconsistent state if we updated the catalog but
116        // then failed to apply commands/updates to the controller. Easiest
117        // thing to do is panic and let restart/bootstrap handle it.
118        apply_implications_res.expect("cannot fail to apply catalog update implications");
119
120        // Note: It's important that we keep the function call inside macro, this way we only run
121        // the consistency checks if soft assertions are enabled.
122        mz_ore::soft_assert_eq_no_log!(
123            self.check_consistency(),
124            Ok(()),
125            "coordinator inconsistency detected"
126        );
127
128        let side_effects_fut = side_effect(self, ctx);
129
130        // Run our side effects concurrently with the table updates.
131        let ((), ()) = futures::future::join(
132            side_effects_fut.instrument(info_span!(
133                "coord::catalog_transact_with_side_effects::side_effects_fut"
134            )),
135            table_updates.instrument(info_span!(
136                "coord::catalog_transact_with_side_effects::table_updates"
137            )),
138        )
139        .await;
140
141        self.metrics
142            .catalog_transact_seconds
143            .with_label_values(&["catalog_transact_with_side_effects"])
144            .observe(start.elapsed().as_secs_f64());
145
146        Ok(())
147    }
148
149    /// Same as [`Self::catalog_transact_inner`] but takes an execution context
150    /// or connection ID and runs builtin table updates concurrently with any
151    /// catalog implications that are generated as part of applying the given
152    /// `ops` (e.g. creating collections).
153    ///
154    /// This will use a connection ID if provided and otherwise fall back to
155    /// getting a connection ID from the execution context.
156    #[instrument(name = "coord::catalog_transact_with_context")]
157    pub(crate) async fn catalog_transact_with_context(
158        &mut self,
159        conn_id: Option<&ConnectionId>,
160        ctx: Option<&mut ExecuteContext>,
161        ops: Vec<catalog::Op>,
162    ) -> Result<(), AdapterError> {
163        let start = Instant::now();
164
165        let conn_id = conn_id.or_else(|| ctx.as_ref().map(|ctx| ctx.session().conn_id()));
166
167        let (table_updates, catalog_updates) = self.catalog_transact_inner(conn_id, ops).await?;
168
169        let apply_catalog_implications_fut = self.apply_catalog_implications(ctx, catalog_updates);
170
171        // Apply catalog implications concurrently with the table updates.
172        let (combined_apply_res, ()) = futures::future::join(
173            apply_catalog_implications_fut.instrument(info_span!(
174                "coord::catalog_transact_with_context::side_effects_fut"
175            )),
176            table_updates.instrument(info_span!(
177                "coord::catalog_transact_with_context::table_updates"
178            )),
179        )
180        .await;
181
182        // We would get into an inconsistent state if we updated the catalog but
183        // then failed to apply implications. Easiest thing to do is panic and
184        // let restart/bootstrap handle it.
185        combined_apply_res.expect("cannot fail to apply catalog implications");
186
187        // Note: It's important that we keep the function call inside macro, this way we only run
188        // the consistency checks if soft assertions are enabled.
189        mz_ore::soft_assert_eq_no_log!(
190            self.check_consistency(),
191            Ok(()),
192            "coordinator inconsistency detected"
193        );
194
195        self.metrics
196            .catalog_transact_seconds
197            .with_label_values(&["catalog_transact_with_context"])
198            .observe(start.elapsed().as_secs_f64());
199
200        Ok(())
201    }
202
203    /// Executes a Catalog transaction with handling if the provided [`Session`]
204    /// is in a SQL transaction that is executing DDL.
205    #[instrument(name = "coord::catalog_transact_with_ddl_transaction")]
206    pub(crate) async fn catalog_transact_with_ddl_transaction<F>(
207        &mut self,
208        ctx: &mut ExecuteContext,
209        ops: Vec<catalog::Op>,
210        side_effect: F,
211    ) -> Result<(), AdapterError>
212    where
213        F: for<'a> FnOnce(
214                &'a mut Coordinator,
215                Option<&'a mut ExecuteContext>,
216            ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
217            + Send
218            + Sync
219            + 'static,
220    {
221        let start = Instant::now();
222
223        let Some(Transaction {
224            ops:
225                TransactionOps::DDL {
226                    ops: txn_ops,
227                    revision: txn_revision,
228                    state: txn_state,
229                    snapshot: txn_snapshot,
230                    side_effects: _,
231                },
232            ..
233        }) = ctx.session().transaction().inner()
234        else {
235            let result = self
236                .catalog_transact_with_side_effects(Some(ctx), ops, side_effect)
237                .await;
238            self.metrics
239                .catalog_transact_seconds
240                .with_label_values(&["catalog_transact_with_ddl_transaction"])
241                .observe(start.elapsed().as_secs_f64());
242            return result;
243        };
244
245        // Make sure our Catalog hasn't changed since openning the transaction.
246        if self.catalog().transient_revision() != *txn_revision {
247            self.metrics
248                .catalog_transact_seconds
249                .with_label_values(&["catalog_transact_with_ddl_transaction"])
250                .observe(start.elapsed().as_secs_f64());
251            return Err(AdapterError::DDLTransactionRace);
252        }
253
254        // Clone what we need from the session before taking &mut below.
255        let txn_ops_clone = txn_ops.clone();
256        let txn_state_clone = txn_state.clone();
257        let prev_snapshot = txn_snapshot.clone();
258
259        // Validate resource limits with all accumulated + new ops (cheap O(N) counting).
260        let mut combined_ops = txn_ops_clone;
261        combined_ops.extend(ops.iter().cloned());
262        let conn_id = ctx.session().conn_id().clone();
263        self.validate_resource_limits(&combined_ops, &conn_id)?;
264
265        // Get oracle timestamp for audit log entries.
266        let oracle_write_ts = self.get_local_write_ts().await.timestamp;
267
268        // Get ConnMeta for the session.
269        let conn = self.active_conns.get(ctx.session().conn_id());
270
271        // Incremental dry run: process only NEW ops against accumulated state.
272        // If we have a saved snapshot from a previous dry run, use it to
273        // initialize the transaction so it starts in sync with the accumulated
274        // state. Otherwise (first statement), the fresh durable transaction is
275        // already in sync with the real catalog state.
276        let (new_state, new_snapshot) = self
277            .catalog()
278            .transact_incremental_dry_run(
279                &txn_state_clone,
280                ops.clone(),
281                conn,
282                prev_snapshot,
283                oracle_write_ts,
284            )
285            .await?;
286
287        // Accumulate ops for eventual COMMIT.
288        let result = ctx
289            .session_mut()
290            .transaction_mut()
291            .add_ops(TransactionOps::DDL {
292                ops: combined_ops,
293                state: new_state,
294                side_effects: vec![Box::new(side_effect)],
295                revision: self.catalog().transient_revision(),
296                snapshot: Some(new_snapshot),
297            });
298
299        self.metrics
300            .catalog_transact_seconds
301            .with_label_values(&["catalog_transact_with_ddl_transaction"])
302            .observe(start.elapsed().as_secs_f64());
303
304        result
305    }
306
/// Perform a catalog transaction. [`Coordinator::ship_dataflow`] must be
/// called after this function successfully returns on any built
/// [`DataflowDesc`](mz_compute_types::dataflows::DataflowDesc).
///
/// Steps, in order:
/// 1. Reject the transaction with [`AdapterError::ReadOnly`] if the
///    controller is in read-only mode.
/// 2. Scan `ops` for follow-up work that the catalog transaction itself
///    does not perform: webhook sources to restart, cluster/replica status
///    entries to add or remove, and which system-configuration consumers to
///    refresh.
/// 3. Validate resource limits and apply `ops` via the catalog's `transact`.
/// 4. Update cluster/replica statuses, start the builtin table appends, and
///    apply the follow-up work collected in step 2.
/// 5. Report the transaction's audit events to Segment, if a Segment client
///    is configured.
///
/// `conn_id` is the connection the ops are executed for. When it is `None`,
/// [`SYSTEM_CONN_ID`] is used for schema resolution and resource-limit
/// validation, and no connection is passed to the catalog.
///
/// Returns the notification future for the builtin table appends, together
/// with the parsed catalog state updates. The callers in this module await
/// the former and pass the latter to `apply_catalog_implications`.
///
/// # Errors
///
/// Returns [`AdapterError::ReadOnly`] in read-only mode. Also propagates
/// errors from resource-limit validation and from the catalog transaction.
/// Once the catalog transaction has succeeded, no further errors are
/// returned.
///
/// # Panics
///
/// Panics if `conn_id` is `Some` but has no entry in `active_conns`, or if a
/// replica created by `ops` cannot be resolved after the transaction.
#[instrument(name = "coord::catalog_transact_inner")]
pub(crate) async fn catalog_transact_inner(
    &mut self,
    conn_id: Option<&ConnectionId>,
    ops: Vec<catalog::Op>,
) -> Result<(BuiltinTableAppendNotify, Vec<ParsedStateUpdate>), AdapterError> {
    if self.controller.read_only() {
        return Err(AdapterError::ReadOnly);
    }

    event!(Level::TRACE, ops = format!("{:?}", ops));

    // Follow-up work derived from `ops`. It is collected up front because
    // `ops` is moved into `catalog.transact` below. It is only acted upon
    // once that transaction has succeeded.
    let mut webhook_sources_to_restart = BTreeSet::new();
    let mut clusters_to_drop = vec![];
    let mut cluster_replicas_to_drop = vec![];
    let mut clusters_to_create = vec![];
    let mut cluster_replicas_to_create = vec![];
    let mut update_metrics_config = false;
    let mut update_tracing_config = false;
    let mut update_controller_config = false;
    let mut update_compute_config = false;
    let mut update_storage_config = false;
    let mut update_timestamp_oracle_config = false;
    let mut update_metrics_retention = false;
    let mut update_secrets_caching_config = false;
    let mut update_cluster_scheduling_config = false;
    let mut update_http_config = false;
    let mut update_advance_timelines_interval = false;

    for op in &ops {
        match op {
            catalog::Op::DropObjects(drop_object_infos) => {
                for drop_object_info in drop_object_infos {
                    match &drop_object_info {
                        catalog::DropObjectInfo::Item(_) => {
                            // Nothing to do, these will be handled by
                            // applying the side effects that we return.
                        }
                        catalog::DropObjectInfo::Cluster(id) => {
                            clusters_to_drop.push(*id);
                        }
                        catalog::DropObjectInfo::ClusterReplica((
                            cluster_id,
                            replica_id,
                            _reason,
                        )) => {
                            // Drop the cluster replica itself.
                            cluster_replicas_to_drop.push((*cluster_id, *replica_id));
                        }
                        _ => (),
                    }
                }
            }
            catalog::Op::ResetSystemConfiguration { name }
            | catalog::Op::UpdateSystemConfiguration { name, .. } => {
                // Record which configuration consumers are affected by
                // this variable; they are refreshed after the transaction.
                update_metrics_config |= self
                    .catalog
                    .state()
                    .system_config()
                    .is_metrics_config_var(name);
                update_tracing_config |= vars::is_tracing_var(name);
                update_controller_config |= self
                    .catalog
                    .state()
                    .system_config()
                    .is_controller_config_var(name);
                update_compute_config |= self
                    .catalog
                    .state()
                    .system_config()
                    .is_compute_config_var(name);
                update_storage_config |= self
                    .catalog
                    .state()
                    .system_config()
                    .is_storage_config_var(name);
                update_timestamp_oracle_config |= vars::is_timestamp_oracle_config_var(name);
                update_metrics_retention |= name == vars::METRICS_RETENTION.name();
                update_secrets_caching_config |= vars::is_secrets_caching_var(name);
                update_cluster_scheduling_config |= vars::is_cluster_scheduling_var(name);
                update_http_config |= vars::is_http_config_var(name);
                update_advance_timelines_interval |= name == DEFAULT_TIMESTAMP_INTERVAL.name();
            }
            catalog::Op::ResetAllSystemConfiguration => {
                // Assume they all need to be updated.
                // We could see if the configs have actually changed, but
                // this is simpler.
                update_tracing_config = true;
                update_controller_config = true;
                update_compute_config = true;
                update_storage_config = true;
                update_timestamp_oracle_config = true;
                update_metrics_retention = true;
                update_secrets_caching_config = true;
                update_cluster_scheduling_config = true;
                update_metrics_config = true;
                update_http_config = true;
                update_advance_timelines_interval = true;
            }
            catalog::Op::RenameItem { id, .. } => {
                // Renamed webhook sources are restarted after the transaction.
                let item = self.catalog().get_entry(id);
                let is_webhook_source = item
                    .source()
                    .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
                    .unwrap_or(false);
                if is_webhook_source {
                    webhook_sources_to_restart.insert(*id);
                }
            }
            catalog::Op::RenameSchema {
                database_spec,
                schema_spec,
                ..
            } => {
                // Renaming a schema restarts every webhook source it contains.
                let schema = self.catalog().get_schema(
                    database_spec,
                    schema_spec,
                    conn_id.unwrap_or(&SYSTEM_CONN_ID),
                );
                let webhook_sources = schema.item_ids().filter(|id| {
                    let item = self.catalog().get_entry(id);
                    item.source()
                        .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
                        .unwrap_or(false)
                });
                webhook_sources_to_restart.extend(webhook_sources);
            }
            catalog::Op::CreateCluster { id, .. } => {
                clusters_to_create.push(*id);
            }
            catalog::Op::CreateClusterReplica {
                cluster_id,
                name,
                config,
                ..
            } => {
                // The replica ID is not known yet; it is resolved by name
                // once the transaction has been applied.
                cluster_replicas_to_create.push((
                    *cluster_id,
                    name.clone(),
                    config.location.num_processes(),
                ));
            }
            _ => (),
        }
    }

    self.validate_resource_limits(&ops, conn_id.unwrap_or(&SYSTEM_CONN_ID))?;

    // This will produce timestamps that are guaranteed to increase on each
    // call, and also never be behind the system clock. If the system clock
    // hasn't advanced (or has gone backward), it will increment by 1. For
    // the audit log, we need to balance "close (within 10s or so) to the
    // system clock" and "always goes up". We've chosen here to prioritize
    // always going up, and believe we will always be close to the system
    // clock because it is well configured (chrony) and so may only rarely
    // regress or pause for 10s.
    let oracle_write_ts = self.get_local_write_ts().await.timestamp;

    // Destructure `self` so that the catalog can be mutated while the
    // connection map, controller, and replica statuses are borrowed alongside
    // it. `Arc::make_mut` clones the catalog first if other `Arc` handles to
    // it exist.
    let Coordinator {
        catalog,
        active_conns,
        controller,
        cluster_replica_statuses,
        ..
    } = self;
    let catalog = Arc::make_mut(catalog);
    let conn = conn_id.map(|id| active_conns.get(id).expect("connection must exist"));

    let TransactionResult {
        builtin_table_updates,
        catalog_updates,
        audit_events,
    } = catalog
        .transact(
            Some(&mut controller.storage_collections),
            oracle_write_ts,
            conn,
            ops,
        )
        .await?;

    // Keep the cluster/replica status tracking in sync with the applied ops.
    for (cluster_id, replica_id) in &cluster_replicas_to_drop {
        cluster_replica_statuses.remove_cluster_replica_statuses(cluster_id, replica_id);
    }
    for cluster_id in &clusters_to_drop {
        cluster_replica_statuses.remove_cluster_statuses(cluster_id);
    }
    for cluster_id in clusters_to_create {
        cluster_replica_statuses.initialize_cluster_statuses(cluster_id);
    }
    let now = to_datetime((catalog.config().now)());
    for (cluster_id, replica_name, num_processes) in cluster_replicas_to_create {
        let replica_id = catalog
            .resolve_replica_in_cluster(&cluster_id, &replica_name)
            .expect("just created")
            .replica_id();
        cluster_replica_statuses.initialize_cluster_replica_statuses(
            cluster_id,
            replica_id,
            num_processes,
            now,
        );
    }

    // Append our builtin table updates, then return the notify so we can run other tasks in
    // parallel.
    let (builtin_update_notify, _) = self
        .builtin_table_update()
        .execute(builtin_table_updates)
        .await;

    // No error returns are allowed after this point. Enforce this at compile time
    // by using this odd structure so we don't accidentally add a stray `?`.
    let _: () = async {
        if !webhook_sources_to_restart.is_empty() {
            self.restart_webhook_sources(webhook_sources_to_restart);
        }

        if update_metrics_config {
            mz_metrics::update_dyncfg(&self.catalog().system_config().dyncfg_updates());
        }
        if update_controller_config {
            self.update_controller_config();
        }
        if update_compute_config {
            self.update_compute_config();
        }
        if update_storage_config {
            self.update_storage_config();
        }
        if update_timestamp_oracle_config {
            self.update_timestamp_oracle_config();
        }
        if update_metrics_retention {
            self.update_metrics_retention();
        }
        if update_tracing_config {
            self.update_tracing_config();
        }
        if update_secrets_caching_config {
            self.update_secrets_caching_config();
        }
        if update_cluster_scheduling_config {
            self.update_cluster_scheduling_config();
        }
        if update_http_config {
            self.update_http_config();
        }
        if update_advance_timelines_interval {
            // Only replace the interval (which restarts its schedule) when
            // the configured period actually changed.
            let new_interval = self.catalog().system_config().default_timestamp_interval();
            if new_interval != self.advance_timelines_interval.period() {
                self.advance_timelines_interval = tokio::time::interval(new_interval);
            }
        }
    }
    // NOTE(review): this span is named `catalog_transact_with::finalize`
    // rather than after this function; confirm whether that is intentional.
    .instrument(info_span!("coord::catalog_transact_with::finalize"))
    .await;

    // Look the connection up again for telemetry attribution. Unlike above,
    // a missing connection is tolerated here.
    let conn = conn_id.and_then(|id| self.active_conns.get(id));
    if let Some(segment_client) = &self.segment_client {
        // Forward each audit event to Segment, attributed to the connection's
        // user and application name when available.
        for VersionedEvent::V1(event) in audit_events {
            let event_type = format!(
                "{} {}",
                event.object_type.as_title_case(),
                event.event_type.as_title_case()
            );
            segment_client.environment_track(
                &self.catalog().config().environment_id,
                event_type,
                json!({ "details": event.details.as_json() }),
                EventDetails {
                    user_id: conn
                        .and_then(|c| c.user().external_metadata.as_ref())
                        .map(|m| m.user_id),
                    application_name: conn.map(|c| c.application_name()),
                    ..Default::default()
                },
            );
        }
    }

    Ok((builtin_update_notify, catalog_updates))
}
593
594    pub(crate) fn drop_replica(&mut self, cluster_id: ClusterId, replica_id: ReplicaId) {
595        self.drop_introspection_subscribes(replica_id);
596
597        self.controller
598            .drop_replica(cluster_id, replica_id)
599            .expect("dropping replica must not fail");
600    }
601
602    /// A convenience method for dropping sources.
603    pub(crate) fn drop_sources(&mut self, sources: Vec<(CatalogItemId, GlobalId)>) {
604        for (item_id, _gid) in &sources {
605            self.active_webhooks.remove(item_id);
606        }
607        let storage_metadata = self.catalog.state().storage_metadata();
608        let source_gids = sources.into_iter().map(|(_id, gid)| gid).collect();
609        self.controller
610            .storage
611            .drop_sources(storage_metadata, source_gids)
612            .unwrap_or_terminate("cannot fail to drop sources");
613    }
614
615    /// A convenience method for dropping tables.
616    pub(crate) fn drop_tables(&mut self, tables: Vec<(CatalogItemId, GlobalId)>, ts: Timestamp) {
617        for (item_id, _gid) in &tables {
618            self.active_webhooks.remove(item_id);
619        }
620
621        let storage_metadata = self.catalog.state().storage_metadata();
622        let table_gids = tables.into_iter().map(|(_id, gid)| gid).collect();
623        self.controller
624            .storage
625            .drop_tables(storage_metadata, table_gids, ts)
626            .unwrap_or_terminate("cannot fail to drop tables");
627    }
628
629    fn restart_webhook_sources(&mut self, sources: impl IntoIterator<Item = CatalogItemId>) {
630        for id in sources {
631            self.active_webhooks.remove(&id);
632        }
633    }
634
635    /// Like `drop_compute_sinks`, but for a single compute sink.
636    ///
637    /// Returns the controller's state for the compute sink if the identified
638    /// sink was known to the controller. It is the caller's responsibility to
639    /// retire the returned sink. Consider using `retire_compute_sinks` instead.
640    #[must_use]
641    pub async fn drop_compute_sink(&mut self, sink_id: GlobalId) -> Option<ActiveComputeSink> {
642        self.drop_compute_sinks([sink_id]).await.remove(&sink_id)
643    }
644
645    /// Drops a batch of compute sinks.
646    ///
647    /// For each sink that exists, the coordinator and controller's state
648    /// associated with the sink is removed.
649    ///
650    /// Returns a map containing the controller's state for each sink that was
651    /// removed. It is the caller's responsibility to retire the returned sinks.
652    /// Consider using `retire_compute_sinks` instead.
653    #[must_use]
654    pub async fn drop_compute_sinks(
655        &mut self,
656        sink_ids: impl IntoIterator<Item = GlobalId>,
657    ) -> BTreeMap<GlobalId, ActiveComputeSink> {
658        let mut by_id = BTreeMap::new();
659        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
660        for sink_id in sink_ids {
661            let sink = match self.remove_active_compute_sink(sink_id).await {
662                None => {
663                    // This can happen due to a race condition: an internal
664                    // subscribe may be cleaned up via its own message while
665                    // session disconnect cleanup is in progress. This is
666                    // benign.
667                    tracing::debug!(%sink_id, "drop_compute_sinks: sink already removed");
668                    continue;
669                }
670                Some(sink) => sink,
671            };
672
673            by_cluster
674                .entry(sink.cluster_id())
675                .or_default()
676                .push(sink_id);
677            by_id.insert(sink_id, sink);
678        }
679        for (cluster_id, ids) in by_cluster {
680            let compute = &mut self.controller.compute;
681            // A cluster could have been dropped, so verify it exists.
682            if compute.instance_exists(cluster_id) {
683                compute
684                    .drop_collections(cluster_id, ids)
685                    .unwrap_or_terminate("cannot fail to drop collections");
686            }
687        }
688        by_id
689    }
690
691    /// Retires a batch of sinks with disparate reasons for retirement.
692    ///
693    /// Each sink identified in `reasons` is dropped (see `drop_compute_sinks`),
694    /// then retired with its corresponding reason.
695    pub async fn retire_compute_sinks(
696        &mut self,
697        mut reasons: BTreeMap<GlobalId, ActiveComputeSinkRetireReason>,
698    ) {
699        let sink_ids = reasons.keys().cloned();
700        for (id, sink) in self.drop_compute_sinks(sink_ids).await {
701            let reason = reasons
702                .remove(&id)
703                .expect("all returned IDs are in `reasons`");
704            sink.retire(reason);
705        }
706    }
707
708    /// Drops all pending replicas for a set of clusters
709    /// that are undergoing reconfiguration.
710    pub async fn drop_reconfiguration_replicas(
711        &mut self,
712        cluster_ids: BTreeSet<ClusterId>,
713    ) -> Result<(), AdapterError> {
714        let pending_cluster_ops: Vec<Op> = cluster_ids
715            .iter()
716            .map(|c| {
717                self.catalog()
718                    .get_cluster(c.clone())
719                    .replicas()
720                    .filter_map(|r| match r.config.location {
721                        ReplicaLocation::Managed(ref l) if l.pending => {
722                            Some(DropObjectInfo::ClusterReplica((
723                                c.clone(),
724                                r.replica_id,
725                                ReplicaCreateDropReason::Manual,
726                            )))
727                        }
728                        _ => None,
729                    })
730                    .collect::<Vec<DropObjectInfo>>()
731            })
732            .filter_map(|pending_replica_drop_ops_by_cluster| {
733                match pending_replica_drop_ops_by_cluster.len() {
734                    0 => None,
735                    _ => Some(Op::DropObjects(pending_replica_drop_ops_by_cluster)),
736                }
737            })
738            .collect();
739        if !pending_cluster_ops.is_empty() {
740            self.catalog_transact(None, pending_cluster_ops).await?;
741        }
742        Ok(())
743    }
744
745    /// Cancels all active compute sinks for the identified connection.
746    #[mz_ore::instrument(level = "debug")]
747    pub(crate) async fn cancel_compute_sinks_for_conn(&mut self, conn_id: &ConnectionId) {
748        self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Canceled)
749            .await
750    }
751
752    /// Cancels all active cluster reconfigurations sinks for the identified connection.
753    #[mz_ore::instrument(level = "debug")]
754    pub(crate) async fn cancel_cluster_reconfigurations_for_conn(
755        &mut self,
756        conn_id: &ConnectionId,
757    ) {
758        self.retire_cluster_reconfigurations_for_conn(conn_id).await
759    }
760
761    /// Retires all active compute sinks for the identified connection with the
762    /// specified reason.
763    #[mz_ore::instrument(level = "debug")]
764    pub(crate) async fn retire_compute_sinks_for_conn(
765        &mut self,
766        conn_id: &ConnectionId,
767        reason: ActiveComputeSinkRetireReason,
768    ) {
769        let drop_sinks = self
770            .active_conns
771            .get_mut(conn_id)
772            .expect("must exist for active session")
773            .drop_sinks
774            .iter()
775            .map(|sink_id| (*sink_id, reason.clone()))
776            .collect();
777        self.retire_compute_sinks(drop_sinks).await;
778    }
779
780    /// Cleans pending cluster reconfiguraiotns for the identified connection
781    #[mz_ore::instrument(level = "debug")]
782    pub(crate) async fn retire_cluster_reconfigurations_for_conn(
783        &mut self,
784        conn_id: &ConnectionId,
785    ) {
786        let reconfiguring_clusters = self
787            .active_conns
788            .get(conn_id)
789            .expect("must exist for active session")
790            .pending_cluster_alters
791            .clone();
792        // try to drop reconfig replicas
793        self.drop_reconfiguration_replicas(reconfiguring_clusters)
794            .await
795            .unwrap_or_terminate("cannot fail to drop reconfiguration replicas");
796
797        self.active_conns
798            .get_mut(conn_id)
799            .expect("must exist for active session")
800            .pending_cluster_alters
801            .clear();
802    }
803
804    pub(crate) fn drop_storage_sinks(&mut self, sink_gids: Vec<GlobalId>) {
805        let storage_metadata = self.catalog.state().storage_metadata();
806        self.controller
807            .storage
808            .drop_sinks(storage_metadata, sink_gids)
809            .unwrap_or_terminate("cannot fail to drop sinks");
810    }
811
812    pub(crate) fn drop_compute_collections(&mut self, collections: Vec<(ClusterId, GlobalId)>) {
813        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
814        for (cluster_id, gid) in collections {
815            by_cluster.entry(cluster_id).or_default().push(gid);
816        }
817        for (cluster_id, gids) in by_cluster {
818            let compute = &mut self.controller.compute;
819            // A cluster could have been dropped, so verify it exists.
820            if compute.instance_exists(cluster_id) {
821                compute
822                    .drop_collections(cluster_id, gids)
823                    .unwrap_or_terminate("cannot fail to drop collections");
824            }
825        }
826    }
827
    /// Deletes the VPC endpoints belonging to the given catalog items in a
    /// spawned background task, so the coordinator does not wait on the
    /// external API.
    ///
    /// If no `cloud_resource_controller` is configured, this logs a warning
    /// and does nothing. Otherwise the endpoints are deleted one after
    /// another. Each deletion is retried for up to 60 seconds, and every
    /// failed attempt is logged with `warn!`. The final outcome of each retry
    /// loop is discarded, so an endpoint that still cannot be deleted
    /// surfaces only through those logs and is not reported to any caller.
    pub(crate) fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>) {
        // Match the create path (catalog_implications.rs) which gracefully
        // logs an error when cloud_resource_controller is None, rather than
        // panicking.
        let Some(cloud_resource_controller) = self.cloud_resource_controller.as_ref() else {
            warn!("dropping VPC endpoints without cloud_resource_controller; skipping cleanup");
            return;
        };
        // Take our own handle so the spawned task does not borrow `self`.
        let cloud_resource_controller = Arc::clone(cloud_resource_controller);
        // We don't want to block the coordinator on external delete API
        // calls, so we move the VPC endpoint drops to a separate task. This
        // does mean that a failed drop won't bubble up to the user as an error
        // message. However, even if it did (which is how the code previously
        // worked), mz has already dropped the item from our catalog, so we
        // wouldn't be able to retry anyway. Any orphaned VPC endpoints will
        // eventually be cleaned up during restart via coord bootstrap.
        task::spawn(
            || "drop_vpc_endpoints",
            async move {
                // Endpoints are processed sequentially; each gets its own
                // retry budget.
                for vpc_endpoint in vpc_endpoints {
                    // The result after retries are exhausted is intentionally
                    // ignored; failures were already logged per attempt below.
                    let _ = Retry::default()
                        .max_duration(Duration::from_secs(60))
                        .retry_async(|_state| async {
                            // Test hook: when the `drop_vpc_endpoint` fail
                            // point is enabled, this attempt returns an error
                            // early instead of calling the cloud API.
                            fail_point!("drop_vpc_endpoint", |r| {
                                Err(anyhow::anyhow!("Fail point error {:?}", r))
                            });
                            match cloud_resource_controller
                                .delete_vpc_endpoint(vpc_endpoint)
                                .await
                            {
                                Ok(_) => Ok(()),
                                Err(e) => {
                                    warn!("Dropping VPC Endpoints has encountered an error: {}", e);
                                    Err(e)
                                }
                            }
                        })
                        .await;
                }
            }
            .instrument(info_span!(
                "coord::catalog_transact_inner::drop_vpc_endpoints"
            )),
        );
    }
873
874    /// Removes all temporary items created by the specified connection, though
875    /// not the temporary schema itself.
876    pub(crate) async fn drop_temp_items(&mut self, conn_id: &ConnectionId) {
877        let temp_items = self.catalog().state().get_temp_items(conn_id).collect();
878        let all_items = self.catalog().object_dependents(&temp_items, conn_id);
879
880        if all_items.is_empty() {
881            return;
882        }
883        let op = Op::DropObjects(
884            all_items
885                .into_iter()
886                .map(DropObjectInfo::manual_drop_from_object_id)
887                .collect(),
888        );
889
890        self.catalog_transact_with_context(Some(conn_id), None, vec![op])
891            .await
892            .expect("unable to drop temporary items for conn_id");
893    }
894
895    fn update_cluster_scheduling_config(&self) {
896        let config = flags::orchestrator_scheduling_config(self.catalog.system_config());
897        self.controller
898            .update_orchestrator_scheduling_config(config);
899    }
900
901    fn update_secrets_caching_config(&self) {
902        let config = flags::caching_config(self.catalog.system_config());
903        self.caching_secrets_reader.set_policy(config);
904    }
905
906    fn update_tracing_config(&self) {
907        let tracing = flags::tracing_config(self.catalog().system_config());
908        tracing.apply(&self.tracing_handle);
909    }
910
911    fn update_compute_config(&mut self) {
912        let config_params = flags::compute_config(self.catalog().system_config());
913        self.controller.compute.update_configuration(config_params);
914    }
915
916    fn update_storage_config(&mut self) {
917        let config_params = flags::storage_config(self.catalog().system_config());
918        self.controller.storage.update_parameters(config_params);
919    }
920
921    fn update_timestamp_oracle_config(&self) {
922        let config_params = flags::timestamp_oracle_config(self.catalog().system_config());
923        if let Some(config) = self.timestamp_oracle_config.as_ref() {
924            config.apply_parameters(config_params)
925        }
926    }
927
928    fn update_metrics_retention(&self) {
929        let duration = self.catalog().system_config().metrics_retention();
930        let policy = ReadPolicy::lag_writes_by(
931            Timestamp::new(u64::try_from(duration.as_millis()).unwrap_or_else(|_e| {
932                tracing::error!("Absurd metrics retention duration: {duration:?}.");
933                u64::MAX
934            })),
935            SINCE_GRANULARITY,
936        );
937        let storage_policies = self
938            .catalog()
939            .entries()
940            .filter(|entry| {
941                entry.item().is_retained_metrics_object()
942                    && entry.item().is_compute_object_on_cluster().is_none()
943            })
944            .map(|entry| (entry.id(), policy.clone()))
945            .collect::<Vec<_>>();
946        let compute_policies = self
947            .catalog()
948            .entries()
949            .filter_map(|entry| {
950                if let (true, Some(cluster_id)) = (
951                    entry.item().is_retained_metrics_object(),
952                    entry.item().is_compute_object_on_cluster(),
953                ) {
954                    Some((cluster_id, entry.id(), policy.clone()))
955                } else {
956                    None
957                }
958            })
959            .collect::<Vec<_>>();
960        self.update_storage_read_policies(storage_policies);
961        self.update_compute_read_policies(compute_policies);
962    }
963
964    fn update_controller_config(&mut self) {
965        let sys_config = self.catalog().system_config();
966        self.controller
967            .update_configuration(sys_config.dyncfg_updates());
968    }
969
970    fn update_http_config(&mut self) {
971        let webhook_request_limit = self
972            .catalog()
973            .system_config()
974            .webhook_concurrent_request_limit();
975        self.webhook_concurrency_limit
976            .set_limit(webhook_request_limit);
977    }
978
979    pub(crate) async fn create_storage_export(
980        &mut self,
981        id: GlobalId,
982        sink: &Sink,
983    ) -> Result<(), AdapterError> {
984        // Validate `sink.from` is in fact a storage collection
985        self.controller.storage.check_exists(sink.from)?;
986
987        // The AsOf is used to determine at what time to snapshot reading from
988        // the persist collection.  This is primarily relevant when we do _not_
989        // want to include the snapshot in the sink.
990        //
991        // We choose the smallest as_of that is legal, according to the sinked
992        // collection's since.
993        let id_bundle = crate::CollectionIdBundle {
994            storage_ids: btreeset! {sink.from},
995            compute_ids: btreemap! {},
996        };
997
998        // We're putting in place read holds, such that create_exports, below,
999        // which calls update_read_capabilities, can successfully do so.
1000        // Otherwise, the since of dependencies might move along concurrently,
1001        // pulling the rug from under us!
1002        //
1003        // TODO: Maybe in the future, pass those holds on to storage, to hold on
1004        // to them and downgrade when possible?
1005        let read_holds = self.acquire_read_holds(&id_bundle);
1006        let as_of = read_holds.least_valid_read();
1007
1008        let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
1009        let storage_sink_desc = mz_storage_types::sinks::StorageSinkDesc {
1010            from: sink.from,
1011            from_desc: storage_sink_from_entry
1012                .relation_desc()
1013                .expect("sinks can only be built on items with descs")
1014                .into_owned(),
1015            connection: sink
1016                .connection
1017                .clone()
1018                .into_inline_connection(self.catalog().state()),
1019            envelope: sink.envelope,
1020            as_of,
1021            with_snapshot: sink.with_snapshot,
1022            version: sink.version,
1023            from_storage_metadata: (),
1024            to_storage_metadata: (),
1025            commit_interval: sink.commit_interval,
1026        };
1027
1028        let collection_desc = CollectionDescription {
1029            // TODO(sinks): make generic once we have more than one sink type.
1030            desc: KAFKA_PROGRESS_DESC.clone(),
1031            data_source: DataSource::Sink {
1032                desc: ExportDescription {
1033                    sink: storage_sink_desc,
1034                    instance_id: sink.cluster_id,
1035                },
1036            },
1037            since: None,
1038            timeline: None,
1039            primary: None,
1040        };
1041        let collections = vec![(id, collection_desc)];
1042
1043        // Create the collections.
1044        let storage_metadata = self.catalog.state().storage_metadata();
1045        let res = self
1046            .controller
1047            .storage
1048            .create_collections(storage_metadata, None, collections)
1049            .await;
1050
1051        // Drop read holds after the export has been created, at which point
1052        // storage will have put in its own read holds.
1053        drop(read_holds);
1054
1055        Ok(res?)
1056    }
1057
1058    /// Validate all resource limits in a catalog transaction and return an error if that limit is
1059    /// exceeded.
1060    fn validate_resource_limits(
1061        &self,
1062        ops: &Vec<catalog::Op>,
1063        conn_id: &ConnectionId,
1064    ) -> Result<(), AdapterError> {
1065        let mut new_kafka_connections = 0;
1066        let mut new_postgres_connections = 0;
1067        let mut new_mysql_connections = 0;
1068        let mut new_sql_server_connections = 0;
1069        let mut new_aws_privatelink_connections = 0;
1070        let mut new_tables = 0;
1071        let mut new_sources = 0;
1072        let mut new_sinks = 0;
1073        let mut new_materialized_views = 0;
1074        let mut new_clusters = 0;
1075        let mut new_replicas_per_cluster = BTreeMap::new();
1076        let mut new_credit_consumption_rate = Numeric::zero();
1077        let mut new_databases = 0;
1078        let mut new_schemas_per_database = BTreeMap::new();
1079        let mut new_objects_per_schema = BTreeMap::new();
1080        let mut new_secrets = 0;
1081        let mut new_roles = 0;
1082        let mut new_network_policies = 0;
1083        for op in ops {
1084            match op {
1085                Op::CreateDatabase { .. } => {
1086                    new_databases += 1;
1087                }
1088                Op::CreateSchema { database_id, .. } => {
1089                    if let ResolvedDatabaseSpecifier::Id(database_id) = database_id {
1090                        *new_schemas_per_database.entry(database_id).or_insert(0) += 1;
1091                    }
1092                }
1093                Op::CreateRole { .. } => {
1094                    new_roles += 1;
1095                }
1096                Op::CreateNetworkPolicy { .. } => {
1097                    new_network_policies += 1;
1098                }
1099                Op::CreateCluster { .. } => {
1100                    // TODO(benesch): having deprecated linked clusters, remove
1101                    // the `max_sources` and `max_sinks` limit, and set a higher
1102                    // max cluster limit?
1103                    new_clusters += 1;
1104                }
1105                Op::CreateClusterReplica {
1106                    cluster_id, config, ..
1107                } => {
1108                    if cluster_id.is_user() {
1109                        *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) += 1;
1110                        if let ReplicaLocation::Managed(location) = &config.location {
1111                            let replica_allocation = self
1112                                .catalog()
1113                                .cluster_replica_sizes()
1114                                .0
1115                                .get(location.size_for_billing())
1116                                .expect(
1117                                    "location size is validated against the cluster replica sizes",
1118                                );
1119                            new_credit_consumption_rate += replica_allocation.credits_per_hour
1120                        }
1121                    }
1122                }
1123                Op::CreateItem { name, item, .. } => {
1124                    *new_objects_per_schema
1125                        .entry((
1126                            name.qualifiers.database_spec.clone(),
1127                            name.qualifiers.schema_spec.clone(),
1128                        ))
1129                        .or_insert(0) += 1;
1130                    match item {
1131                        CatalogItem::Connection(connection) => match connection.details {
1132                            ConnectionDetails::Kafka(_) => new_kafka_connections += 1,
1133                            ConnectionDetails::Postgres(_) => new_postgres_connections += 1,
1134                            ConnectionDetails::MySql(_) => new_mysql_connections += 1,
1135                            ConnectionDetails::SqlServer(_) => new_sql_server_connections += 1,
1136                            ConnectionDetails::AwsPrivatelink(_) => {
1137                                new_aws_privatelink_connections += 1
1138                            }
1139                            ConnectionDetails::Csr(_)
1140                            | ConnectionDetails::GlueSchemaRegistry(_)
1141                            | ConnectionDetails::Ssh { .. }
1142                            | ConnectionDetails::Aws(_)
1143                            | ConnectionDetails::IcebergCatalog(_) => {}
1144                        },
1145                        CatalogItem::Table(_) => {
1146                            new_tables += 1;
1147                        }
1148                        CatalogItem::Source(source) => {
1149                            new_sources += source.user_controllable_persist_shard_count()
1150                        }
1151                        CatalogItem::Sink(_) => new_sinks += 1,
1152                        CatalogItem::MaterializedView(_) => {
1153                            new_materialized_views += 1;
1154                        }
1155                        CatalogItem::Secret(_) => {
1156                            new_secrets += 1;
1157                        }
1158                        CatalogItem::Log(_)
1159                        | CatalogItem::View(_)
1160                        | CatalogItem::Index(_)
1161                        | CatalogItem::Type(_)
1162                        | CatalogItem::Func(_) => {}
1163                    }
1164                }
1165                Op::DropObjects(drop_object_infos) => {
1166                    for drop_object_info in drop_object_infos {
1167                        match drop_object_info {
1168                            DropObjectInfo::Cluster(_) => {
1169                                new_clusters -= 1;
1170                            }
1171                            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
1172                                if cluster_id.is_user() {
1173                                    *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) -= 1;
1174                                    let cluster = self
1175                                        .catalog()
1176                                        .get_cluster_replica(*cluster_id, *replica_id);
1177                                    if let ReplicaLocation::Managed(location) =
1178                                        &cluster.config.location
1179                                    {
1180                                        let replica_allocation = self
1181                                            .catalog()
1182                                            .cluster_replica_sizes()
1183                                            .0
1184                                            .get(location.size_for_billing())
1185                                            .expect(
1186                                                "location size is validated against the cluster replica sizes",
1187                                            );
1188                                        new_credit_consumption_rate -=
1189                                            replica_allocation.credits_per_hour
1190                                    }
1191                                }
1192                            }
1193                            DropObjectInfo::Database(_) => {
1194                                new_databases -= 1;
1195                            }
1196                            DropObjectInfo::Schema((database_spec, _)) => {
1197                                if let ResolvedDatabaseSpecifier::Id(database_id) = database_spec {
1198                                    *new_schemas_per_database.entry(database_id).or_insert(0) -= 1;
1199                                }
1200                            }
1201                            DropObjectInfo::Role(_) => {
1202                                new_roles -= 1;
1203                            }
1204                            DropObjectInfo::NetworkPolicy(_) => {
1205                                new_network_policies -= 1;
1206                            }
1207                            DropObjectInfo::Item(id) => {
1208                                let entry = self.catalog().get_entry(id);
1209                                *new_objects_per_schema
1210                                    .entry((
1211                                        entry.name().qualifiers.database_spec.clone(),
1212                                        entry.name().qualifiers.schema_spec.clone(),
1213                                    ))
1214                                    .or_insert(0) -= 1;
1215                                match entry.item() {
1216                                    CatalogItem::Connection(connection) => match connection.details
1217                                    {
1218                                        ConnectionDetails::AwsPrivatelink(_) => {
1219                                            new_aws_privatelink_connections -= 1;
1220                                        }
1221                                        _ => (),
1222                                    },
1223                                    CatalogItem::Table(_) => {
1224                                        new_tables -= 1;
1225                                    }
1226                                    CatalogItem::Source(source) => {
1227                                        new_sources -=
1228                                            source.user_controllable_persist_shard_count()
1229                                    }
1230                                    CatalogItem::Sink(_) => new_sinks -= 1,
1231                                    CatalogItem::MaterializedView(_) => {
1232                                        new_materialized_views -= 1;
1233                                    }
1234                                    CatalogItem::Secret(_) => {
1235                                        new_secrets -= 1;
1236                                    }
1237                                    CatalogItem::Log(_)
1238                                    | CatalogItem::View(_)
1239                                    | CatalogItem::Index(_)
1240                                    | CatalogItem::Type(_)
1241                                    | CatalogItem::Func(_) => {}
1242                                }
1243                            }
1244                        }
1245                    }
1246                }
1247                Op::UpdateItem {
1248                    name: _,
1249                    id,
1250                    to_item,
1251                } => match to_item {
1252                    CatalogItem::Source(source) => {
1253                        let current_source = self
1254                            .catalog()
1255                            .get_entry(id)
1256                            .source()
1257                            .expect("source update is for source item");
1258
1259                        new_sources += source.user_controllable_persist_shard_count()
1260                            - current_source.user_controllable_persist_shard_count();
1261                    }
1262                    CatalogItem::Connection(_)
1263                    | CatalogItem::Table(_)
1264                    | CatalogItem::Sink(_)
1265                    | CatalogItem::MaterializedView(_)
1266                    | CatalogItem::Secret(_)
1267                    | CatalogItem::Log(_)
1268                    | CatalogItem::View(_)
1269                    | CatalogItem::Index(_)
1270                    | CatalogItem::Type(_)
1271                    | CatalogItem::Func(_) => {}
1272                },
1273                Op::AlterRole { .. }
1274                | Op::AlterRetainHistory { .. }
1275                | Op::AlterSourceTimestampInterval { .. }
1276                | Op::AlterNetworkPolicy { .. }
1277                | Op::AlterAddColumn { .. }
1278                | Op::AlterMaterializedViewApplyReplacement { .. }
1279                | Op::UpdatePrivilege { .. }
1280                | Op::UpdateDefaultPrivilege { .. }
1281                | Op::GrantRole { .. }
1282                | Op::RenameCluster { .. }
1283                | Op::RenameClusterReplica { .. }
1284                | Op::RenameItem { .. }
1285                | Op::RenameSchema { .. }
1286                | Op::UpdateOwner { .. }
1287                | Op::RevokeRole { .. }
1288                | Op::UpdateClusterConfig { .. }
1289                | Op::UpdateClusterReplicaConfig { .. }
1290                | Op::UpdateSourceReferences { .. }
1291                | Op::UpdateSystemConfiguration { .. }
1292                | Op::ResetSystemConfiguration { .. }
1293                | Op::ResetAllSystemConfiguration { .. }
1294                | Op::Comment { .. }
1295                | Op::InjectAuditEvents { .. } => {}
1296            }
1297        }
1298
1299        let mut current_aws_privatelink_connections = 0;
1300        let mut current_postgres_connections = 0;
1301        let mut current_mysql_connections = 0;
1302        let mut current_sql_server_connections = 0;
1303        let mut current_kafka_connections = 0;
1304        for c in self.catalog().user_connections() {
1305            let connection = c
1306                .connection()
1307                .expect("`user_connections()` only returns connection objects");
1308
1309            match connection.details {
1310                ConnectionDetails::AwsPrivatelink(_) => current_aws_privatelink_connections += 1,
1311                ConnectionDetails::Postgres(_) => current_postgres_connections += 1,
1312                ConnectionDetails::MySql(_) => current_mysql_connections += 1,
1313                ConnectionDetails::SqlServer(_) => current_sql_server_connections += 1,
1314                ConnectionDetails::Kafka(_) => current_kafka_connections += 1,
1315                ConnectionDetails::Csr(_)
1316                | ConnectionDetails::GlueSchemaRegistry(_)
1317                | ConnectionDetails::Ssh { .. }
1318                | ConnectionDetails::Aws(_)
1319                | ConnectionDetails::IcebergCatalog(_) => {}
1320            }
1321        }
1322        self.validate_resource_limit(
1323            current_kafka_connections,
1324            new_kafka_connections,
1325            SystemVars::max_kafka_connections,
1326            "Kafka Connection",
1327            MAX_KAFKA_CONNECTIONS.name(),
1328        )?;
1329        self.validate_resource_limit(
1330            current_postgres_connections,
1331            new_postgres_connections,
1332            SystemVars::max_postgres_connections,
1333            "PostgreSQL Connection",
1334            MAX_POSTGRES_CONNECTIONS.name(),
1335        )?;
1336        self.validate_resource_limit(
1337            current_mysql_connections,
1338            new_mysql_connections,
1339            SystemVars::max_mysql_connections,
1340            "MySQL Connection",
1341            MAX_MYSQL_CONNECTIONS.name(),
1342        )?;
1343        self.validate_resource_limit(
1344            current_sql_server_connections,
1345            new_sql_server_connections,
1346            SystemVars::max_sql_server_connections,
1347            "SQL Server Connection",
1348            MAX_SQL_SERVER_CONNECTIONS.name(),
1349        )?;
1350        self.validate_resource_limit(
1351            current_aws_privatelink_connections,
1352            new_aws_privatelink_connections,
1353            SystemVars::max_aws_privatelink_connections,
1354            "AWS PrivateLink Connection",
1355            MAX_AWS_PRIVATELINK_CONNECTIONS.name(),
1356        )?;
1357        self.validate_resource_limit(
1358            self.catalog().user_tables().count(),
1359            new_tables,
1360            SystemVars::max_tables,
1361            "table",
1362            MAX_TABLES.name(),
1363        )?;
1364
1365        let current_sources: usize = self
1366            .catalog()
1367            .user_sources()
1368            .filter_map(|source| source.source())
1369            .map(|source| source.user_controllable_persist_shard_count())
1370            .sum::<i64>()
1371            .try_into()
1372            .expect("non-negative sum of sources");
1373
1374        self.validate_resource_limit(
1375            current_sources,
1376            new_sources,
1377            SystemVars::max_sources,
1378            "source",
1379            MAX_SOURCES.name(),
1380        )?;
1381        self.validate_resource_limit(
1382            self.catalog().user_sinks().count(),
1383            new_sinks,
1384            SystemVars::max_sinks,
1385            "sink",
1386            MAX_SINKS.name(),
1387        )?;
1388        self.validate_resource_limit(
1389            self.catalog().user_materialized_views().count(),
1390            new_materialized_views,
1391            SystemVars::max_materialized_views,
1392            "materialized view",
1393            MAX_MATERIALIZED_VIEWS.name(),
1394        )?;
1395        self.validate_resource_limit(
1396            // Linked compute clusters don't count against the limit, since
1397            // we have a separate sources and sinks limit.
1398            //
1399            // TODO(benesch): remove the `max_sources` and `max_sinks` limit,
1400            // and set a higher max cluster limit?
1401            self.catalog().user_clusters().count(),
1402            new_clusters,
1403            SystemVars::max_clusters,
1404            "cluster",
1405            MAX_CLUSTERS.name(),
1406        )?;
1407        for (cluster_id, new_replicas) in new_replicas_per_cluster {
1408            // It's possible that the cluster hasn't been created yet.
1409            let current_amount = self
1410                .catalog()
1411                .try_get_cluster(cluster_id)
1412                .map(|instance| instance.user_replicas().count())
1413                .unwrap_or(0);
1414            self.validate_resource_limit(
1415                current_amount,
1416                new_replicas,
1417                SystemVars::max_replicas_per_cluster,
1418                "cluster replica",
1419                MAX_REPLICAS_PER_CLUSTER.name(),
1420            )?;
1421        }
1422        self.validate_resource_limit_numeric(
1423            self.current_credit_consumption_rate(),
1424            new_credit_consumption_rate,
1425            |system_vars| {
1426                self.license_key
1427                    .max_credit_consumption_rate()
1428                    .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
1429            },
1430            "cluster replica",
1431            MAX_CREDIT_CONSUMPTION_RATE.name(),
1432        )?;
1433        self.validate_resource_limit(
1434            self.catalog().databases().count(),
1435            new_databases,
1436            SystemVars::max_databases,
1437            "database",
1438            MAX_DATABASES.name(),
1439        )?;
1440        for (database_id, new_schemas) in new_schemas_per_database {
1441            self.validate_resource_limit(
1442                self.catalog().get_database(database_id).schemas_by_id.len(),
1443                new_schemas,
1444                SystemVars::max_schemas_per_database,
1445                "schema",
1446                MAX_SCHEMAS_PER_DATABASE.name(),
1447            )?;
1448        }
1449        for ((database_spec, schema_spec), new_objects) in new_objects_per_schema {
1450            // For temporary schemas that don't exist yet (lazy creation),
1451            // treat them as having 0 items.
1452            let current_items = self
1453                .catalog()
1454                .try_get_schema(&database_spec, &schema_spec, conn_id)
1455                .map(|schema| schema.items.len())
1456                .unwrap_or(0);
1457            self.validate_resource_limit(
1458                current_items,
1459                new_objects,
1460                SystemVars::max_objects_per_schema,
1461                "object",
1462                MAX_OBJECTS_PER_SCHEMA.name(),
1463            )?;
1464        }
1465        self.validate_resource_limit(
1466            self.catalog().user_secrets().count(),
1467            new_secrets,
1468            SystemVars::max_secrets,
1469            "secret",
1470            MAX_SECRETS.name(),
1471        )?;
1472        self.validate_resource_limit(
1473            self.catalog().user_roles().count(),
1474            new_roles,
1475            SystemVars::max_roles,
1476            "role",
1477            MAX_ROLES.name(),
1478        )?;
1479        self.validate_resource_limit(
1480            self.catalog().user_network_policies().count(),
1481            new_network_policies,
1482            SystemVars::max_network_policies,
1483            "network_policy",
1484            MAX_NETWORK_POLICIES.name(),
1485        )?;
1486        Ok(())
1487    }
1488
1489    /// Validate a specific type of resource limit and return an error if that limit is exceeded.
1490    pub(crate) fn validate_resource_limit<F>(
1491        &self,
1492        current_amount: usize,
1493        new_instances: i64,
1494        resource_limit: F,
1495        resource_type: &str,
1496        limit_name: &str,
1497    ) -> Result<(), AdapterError>
1498    where
1499        F: Fn(&SystemVars) -> u32,
1500    {
1501        if new_instances <= 0 {
1502            return Ok(());
1503        }
1504
1505        let limit: i64 = resource_limit(self.catalog().system_config()).into();
1506        let current_amount: Option<i64> = current_amount.try_into().ok();
1507        let desired =
1508            current_amount.and_then(|current_amount| current_amount.checked_add(new_instances));
1509
1510        let exceeds_limit = if let Some(desired) = desired {
1511            desired > limit
1512        } else {
1513            true
1514        };
1515
1516        let desired = desired
1517            .map(|desired| desired.to_string())
1518            .unwrap_or_else(|| format!("more than {}", i64::MAX));
1519        let current = current_amount
1520            .map(|current| current.to_string())
1521            .unwrap_or_else(|| format!("more than {}", i64::MAX));
1522        if exceeds_limit {
1523            Err(AdapterError::ResourceExhaustion {
1524                resource_type: resource_type.to_string(),
1525                limit_name: limit_name.to_string(),
1526                desired,
1527                limit: limit.to_string(),
1528                current,
1529            })
1530        } else {
1531            Ok(())
1532        }
1533    }
1534
1535    /// Validate a specific type of float resource limit and return an error if that limit is exceeded.
1536    ///
1537    /// This is very similar to [`Self::validate_resource_limit`] but for numerics.
1538    pub(crate) fn validate_resource_limit_numeric<F>(
1539        &self,
1540        current_amount: Numeric,
1541        new_amount: Numeric,
1542        resource_limit: F,
1543        resource_type: &str,
1544        limit_name: &str,
1545    ) -> Result<(), AdapterError>
1546    where
1547        F: Fn(&SystemVars) -> Numeric,
1548    {
1549        if new_amount <= Numeric::zero() {
1550            return Ok(());
1551        }
1552
1553        let limit = resource_limit(self.catalog().system_config());
1554        // Floats will overflow to infinity instead of panicking, which has the correct comparison
1555        // semantics.
1556        // NaN should be impossible here since both values are positive.
1557        let desired = current_amount + new_amount;
1558        if desired > limit {
1559            Err(AdapterError::ResourceExhaustion {
1560                resource_type: resource_type.to_string(),
1561                limit_name: limit_name.to_string(),
1562                desired: desired.to_string(),
1563                limit: limit.to_string(),
1564                current: current_amount.to_string(),
1565            })
1566        } else {
1567            Ok(())
1568        }
1569    }
1570}