// mz_adapter/coord/ddl.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! This module encapsulates all of the [`Coordinator`]'s logic for creating, dropping,
11//! and altering objects.
12
13use std::collections::{BTreeMap, BTreeSet};
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use fail::fail_point;
19use maplit::{btreemap, btreeset};
20use mz_adapter_types::compaction::SINCE_GRANULARITY;
21use mz_adapter_types::connection::ConnectionId;
22use mz_audit_log::VersionedEvent;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Sink};
25use mz_cluster_client::ReplicaId;
26use mz_controller::clusters::ReplicaLocation;
27use mz_controller_types::ClusterId;
28use mz_ore::instrument;
29use mz_ore::now::to_datetime;
30use mz_ore::retry::Retry;
31use mz_ore::task;
32use mz_repr::adt::numeric::Numeric;
33use mz_repr::{CatalogItemId, GlobalId, Timestamp};
34use mz_sql::catalog::{CatalogClusterReplica, CatalogSchema};
35use mz_sql::names::ResolvedDatabaseSpecifier;
36use mz_sql::plan::ConnectionDetails;
37use mz_sql::session::metadata::SessionMetadata;
38use mz_sql::session::vars::{
39    self, DEFAULT_TIMESTAMP_INTERVAL, MAX_AWS_PRIVATELINK_CONNECTIONS, MAX_CLUSTERS,
40    MAX_CONTINUAL_TASKS, MAX_CREDIT_CONSUMPTION_RATE, MAX_DATABASES, MAX_KAFKA_CONNECTIONS,
41    MAX_MATERIALIZED_VIEWS, MAX_MYSQL_CONNECTIONS, MAX_NETWORK_POLICIES, MAX_OBJECTS_PER_SCHEMA,
42    MAX_POSTGRES_CONNECTIONS, MAX_REPLICAS_PER_CLUSTER, MAX_ROLES, MAX_SCHEMAS_PER_DATABASE,
43    MAX_SECRETS, MAX_SINKS, MAX_SOURCES, MAX_SQL_SERVER_CONNECTIONS, MAX_TABLES, SystemVars, Var,
44};
45use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
46use mz_storage_types::connections::inline::IntoInlineConnection;
47use mz_storage_types::read_policy::ReadPolicy;
48use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
49use serde_json::json;
50use tracing::{Instrument, Level, event, info_span, warn};
51
52use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
53use crate::catalog::{DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult};
54use crate::coord::Coordinator;
55use crate::coord::appends::BuiltinTableAppendNotify;
56use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
57use crate::session::{Session, Transaction, TransactionOps};
58use crate::telemetry::{EventDetails, SegmentClientExt};
59use crate::util::ResultExt;
60use crate::{AdapterError, ExecuteContext, catalog, flags};
61
62impl Coordinator {
63    /// Same as [`Self::catalog_transact_with_context`] but takes a [`Session`].
64    #[instrument(name = "coord::catalog_transact")]
65    pub(crate) async fn catalog_transact(
66        &mut self,
67        session: Option<&Session>,
68        ops: Vec<catalog::Op>,
69    ) -> Result<(), AdapterError> {
70        let start = Instant::now();
71        let result = self
72            .catalog_transact_with_context(session.map(|session| session.conn_id()), None, ops)
73            .await;
74        self.metrics
75            .catalog_transact_seconds
76            .with_label_values(&["catalog_transact"])
77            .observe(start.elapsed().as_secs_f64());
78        result
79    }
80
    /// Same as [`Self::catalog_transact_with_context`] but takes a [`Session`]
    /// and runs builtin table updates concurrently with any side effects (e.g.
    /// creating collections).
    ///
    /// # Panics
    ///
    /// Panics if applying catalog implications fails after the catalog has
    /// already been durably updated; the process relies on restart/bootstrap
    /// to recover from that state.
    // TODO(aljoscha): Remove this method once all call-sites have been migrated
    // to the newer catalog_transact_with_context. The latter is what allows us
    // to apply catalog implications that we derive from catalog changes either
    // when initially applying the ops to the catalog _or_ when following
    // catalog changes from another process.
    #[instrument(name = "coord::catalog_transact_with_side_effects")]
    pub(crate) async fn catalog_transact_with_side_effects<F>(
        &mut self,
        mut ctx: Option<&mut ExecuteContext>,
        ops: Vec<catalog::Op>,
        side_effect: F,
    ) -> Result<(), AdapterError>
    where
        F: for<'a> FnOnce(
                &'a mut Coordinator,
                Option<&'a mut ExecuteContext>,
            ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
            + 'static,
    {
        let start = Instant::now();

        // Durably apply the ops; returns a notify for the builtin table
        // appends plus the parsed catalog updates to turn into implications.
        let (table_updates, catalog_updates) = self
            .catalog_transact_inner(ctx.as_ref().map(|ctx| ctx.session().conn_id()), ops)
            .await?;

        // We can't run this concurrently with the explicit side effects,
        // because both want to borrow self mutably.
        let apply_implications_res = self
            .apply_catalog_implications(ctx.as_deref_mut(), catalog_updates)
            .await;

        // We would get into an inconsistent state if we updated the catalog but
        // then failed to apply commands/updates to the controller. Easiest
        // thing to do is panic and let restart/bootstrap handle it.
        apply_implications_res.expect("cannot fail to apply catalog update implications");

        // Note: It's important that we keep the function call inside macro, this way we only run
        // the consistency checks if soft assertions are enabled.
        mz_ore::soft_assert_eq_no_log!(
            self.check_consistency(),
            Ok(()),
            "coordinator inconsistency detected"
        );

        // Build the caller-provided side-effect future only after implications
        // have been applied, since it borrows `self` mutably.
        let side_effects_fut = side_effect(self, ctx);

        // Run our side effects concurrently with the table updates.
        let ((), ()) = futures::future::join(
            side_effects_fut.instrument(info_span!(
                "coord::catalog_transact_with_side_effects::side_effects_fut"
            )),
            table_updates.instrument(info_span!(
                "coord::catalog_transact_with_side_effects::table_updates"
            )),
        )
        .await;

        self.metrics
            .catalog_transact_seconds
            .with_label_values(&["catalog_transact_with_side_effects"])
            .observe(start.elapsed().as_secs_f64());

        Ok(())
    }
148
    /// Same as [`Self::catalog_transact_inner`] but takes an execution context
    /// or connection ID and runs builtin table updates concurrently with any
    /// catalog implications that are generated as part of applying the given
    /// `ops` (e.g. creating collections).
    ///
    /// This will use a connection ID if provided and otherwise fall back to
    /// getting a connection ID from the execution context.
    ///
    /// # Panics
    ///
    /// Panics if applying catalog implications fails after the catalog has
    /// already been durably updated; restart/bootstrap handles recovery.
    #[instrument(name = "coord::catalog_transact_with_context")]
    pub(crate) async fn catalog_transact_with_context(
        &mut self,
        conn_id: Option<&ConnectionId>,
        ctx: Option<&mut ExecuteContext>,
        ops: Vec<catalog::Op>,
    ) -> Result<(), AdapterError> {
        let start = Instant::now();

        // Prefer the explicit connection ID; fall back to the context's session.
        let conn_id = conn_id.or_else(|| ctx.as_ref().map(|ctx| ctx.session().conn_id()));

        // Durably apply the ops; returns a notify for the builtin table
        // appends plus the parsed catalog updates to turn into implications.
        let (table_updates, catalog_updates) = self.catalog_transact_inner(conn_id, ops).await?;

        let apply_catalog_implications_fut = self.apply_catalog_implications(ctx, catalog_updates);

        // Apply catalog implications concurrently with the table updates.
        let (combined_apply_res, ()) = futures::future::join(
            apply_catalog_implications_fut.instrument(info_span!(
                "coord::catalog_transact_with_context::side_effects_fut"
            )),
            table_updates.instrument(info_span!(
                "coord::catalog_transact_with_context::table_updates"
            )),
        )
        .await;

        // We would get into an inconsistent state if we updated the catalog but
        // then failed to apply implications. Easiest thing to do is panic and
        // let restart/bootstrap handle it.
        combined_apply_res.expect("cannot fail to apply catalog implications");

        // Note: It's important that we keep the function call inside macro, this way we only run
        // the consistency checks if soft assertions are enabled.
        mz_ore::soft_assert_eq_no_log!(
            self.check_consistency(),
            Ok(()),
            "coordinator inconsistency detected"
        );

        self.metrics
            .catalog_transact_seconds
            .with_label_values(&["catalog_transact_with_context"])
            .observe(start.elapsed().as_secs_f64());

        Ok(())
    }
202
    /// Executes a Catalog transaction with handling if the provided [`Session`]
    /// is in a SQL transaction that is executing DDL.
    ///
    /// When the session is *not* in a DDL transaction, this delegates directly
    /// to [`Self::catalog_transact_with_side_effects`]. Otherwise the ops are
    /// validated against resource limits, dry-run against the accumulated
    /// transaction state, and accumulated in the session's transaction for
    /// eventual COMMIT.
    ///
    /// # Errors
    ///
    /// Returns [`AdapterError::DDLTransactionRace`] if the catalog changed
    /// since the DDL transaction was opened.
    #[instrument(name = "coord::catalog_transact_with_ddl_transaction")]
    pub(crate) async fn catalog_transact_with_ddl_transaction<F>(
        &mut self,
        ctx: &mut ExecuteContext,
        ops: Vec<catalog::Op>,
        side_effect: F,
    ) -> Result<(), AdapterError>
    where
        F: for<'a> FnOnce(
                &'a mut Coordinator,
                Option<&'a mut ExecuteContext>,
            ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
            + Send
            + Sync
            + 'static,
    {
        let start = Instant::now();

        // If the session is not in a DDL transaction, execute immediately.
        let Some(Transaction {
            ops:
                TransactionOps::DDL {
                    ops: txn_ops,
                    revision: txn_revision,
                    state: txn_state,
                    snapshot: txn_snapshot,
                    side_effects: _,
                },
            ..
        }) = ctx.session().transaction().inner()
        else {
            let result = self
                .catalog_transact_with_side_effects(Some(ctx), ops, side_effect)
                .await;
            self.metrics
                .catalog_transact_seconds
                .with_label_values(&["catalog_transact_with_ddl_transaction"])
                .observe(start.elapsed().as_secs_f64());
            return result;
        };

        // Make sure our Catalog hasn't changed since opening the transaction.
        if self.catalog().transient_revision() != *txn_revision {
            self.metrics
                .catalog_transact_seconds
                .with_label_values(&["catalog_transact_with_ddl_transaction"])
                .observe(start.elapsed().as_secs_f64());
            return Err(AdapterError::DDLTransactionRace);
        }

        // Clone what we need from the session before taking &mut below.
        let txn_ops_clone = txn_ops.clone();
        let txn_state_clone = txn_state.clone();
        let prev_snapshot = txn_snapshot.clone();

        // Validate resource limits with all accumulated + new ops (cheap O(N) counting).
        let mut combined_ops = txn_ops_clone;
        combined_ops.extend(ops.iter().cloned());
        let conn_id = ctx.session().conn_id().clone();
        self.validate_resource_limits(&combined_ops, &conn_id)?;

        // Get oracle timestamp for audit log entries.
        let oracle_write_ts = self.get_local_write_ts().await.timestamp;

        // Get ConnMeta for the session.
        let conn = self.active_conns.get(ctx.session().conn_id());

        // Incremental dry run: process only NEW ops against accumulated state.
        // If we have a saved snapshot from a previous dry run, use it to
        // initialize the transaction so it starts in sync with the accumulated
        // state. Otherwise (first statement), the fresh durable transaction is
        // already in sync with the real catalog state.
        let (new_state, new_snapshot) = self
            .catalog()
            .transact_incremental_dry_run(
                &txn_state_clone,
                ops.clone(),
                conn,
                prev_snapshot,
                oracle_write_ts,
            )
            .await?;

        // Accumulate ops for eventual COMMIT.
        let result = ctx
            .session_mut()
            .transaction_mut()
            .add_ops(TransactionOps::DDL {
                ops: combined_ops,
                state: new_state,
                side_effects: vec![Box::new(side_effect)],
                revision: self.catalog().transient_revision(),
                snapshot: Some(new_snapshot),
            });

        self.metrics
            .catalog_transact_seconds
            .with_label_values(&["catalog_transact_with_ddl_transaction"])
            .observe(start.elapsed().as_secs_f64());

        result
    }
306
    /// Perform a catalog transaction. [`Coordinator::ship_dataflow`] must be
    /// called after this function successfully returns on any built
    /// [`DataflowDesc`](mz_compute_types::dataflows::DataflowDesc).
    ///
    /// Returns a notify future for the builtin table appends (so callers can
    /// await it concurrently with other work) and the parsed catalog updates
    /// produced by the transaction.
    ///
    /// # Errors
    ///
    /// Returns [`AdapterError::ReadOnly`] when the controller is read-only,
    /// and propagates resource-limit and catalog transaction errors.
    #[instrument(name = "coord::catalog_transact_inner")]
    pub(crate) async fn catalog_transact_inner(
        &mut self,
        conn_id: Option<&ConnectionId>,
        ops: Vec<catalog::Op>,
    ) -> Result<(BuiltinTableAppendNotify, Vec<ParsedStateUpdate>), AdapterError> {
        if self.controller.read_only() {
            return Err(AdapterError::ReadOnly);
        }

        event!(Level::TRACE, ops = format!("{:?}", ops));

        // Pre-scan the ops to collect work that must happen after the catalog
        // transaction commits: cluster/replica bookkeeping, webhook restarts,
        // and which configuration subsystems need to be refreshed.
        let mut webhook_sources_to_restart = BTreeSet::new();
        let mut clusters_to_drop = vec![];
        let mut cluster_replicas_to_drop = vec![];
        let mut clusters_to_create = vec![];
        let mut cluster_replicas_to_create = vec![];
        let mut update_metrics_config = false;
        let mut update_tracing_config = false;
        let mut update_controller_config = false;
        let mut update_compute_config = false;
        let mut update_storage_config = false;
        let mut update_timestamp_oracle_config = false;
        let mut update_metrics_retention = false;
        let mut update_secrets_caching_config = false;
        let mut update_cluster_scheduling_config = false;
        let mut update_http_config = false;
        let mut update_advance_timelines_interval = false;

        for op in &ops {
            match op {
                catalog::Op::DropObjects(drop_object_infos) => {
                    for drop_object_info in drop_object_infos {
                        match &drop_object_info {
                            catalog::DropObjectInfo::Item(_) => {
                                // Nothing to do, these will be handled by
                                // applying the side effects that we return.
                            }
                            catalog::DropObjectInfo::Cluster(id) => {
                                clusters_to_drop.push(*id);
                            }
                            catalog::DropObjectInfo::ClusterReplica((
                                cluster_id,
                                replica_id,
                                _reason,
                            )) => {
                                // Drop the cluster replica itself.
                                cluster_replicas_to_drop.push((*cluster_id, *replica_id));
                            }
                            _ => (),
                        }
                    }
                }
                catalog::Op::ResetSystemConfiguration { name }
                | catalog::Op::UpdateSystemConfiguration { name, .. } => {
                    // A single system var change may affect multiple
                    // subsystems; accumulate all matching flags.
                    update_metrics_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_metrics_config_var(name);
                    update_tracing_config |= vars::is_tracing_var(name);
                    update_controller_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_controller_config_var(name);
                    update_compute_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_compute_config_var(name);
                    update_storage_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_storage_config_var(name);
                    update_timestamp_oracle_config |= vars::is_timestamp_oracle_config_var(name);
                    update_metrics_retention |= name == vars::METRICS_RETENTION.name();
                    update_secrets_caching_config |= vars::is_secrets_caching_var(name);
                    update_cluster_scheduling_config |= vars::is_cluster_scheduling_var(name);
                    update_http_config |= vars::is_http_config_var(name);
                    update_advance_timelines_interval |= name == DEFAULT_TIMESTAMP_INTERVAL.name();
                }
                catalog::Op::ResetAllSystemConfiguration => {
                    // Assume they all need to be updated.
                    // We could see if the config's have actually changed, but
                    // this is simpler.
                    update_tracing_config = true;
                    update_controller_config = true;
                    update_compute_config = true;
                    update_storage_config = true;
                    update_timestamp_oracle_config = true;
                    update_metrics_retention = true;
                    update_secrets_caching_config = true;
                    update_cluster_scheduling_config = true;
                    update_metrics_config = true;
                    update_http_config = true;
                    update_advance_timelines_interval = true;
                }
                catalog::Op::RenameItem { id, .. } => {
                    // Renamed webhook sources must be restarted so they serve
                    // under the new name.
                    let item = self.catalog().get_entry(id);
                    let is_webhook_source = item
                        .source()
                        .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
                        .unwrap_or(false);
                    if is_webhook_source {
                        webhook_sources_to_restart.insert(*id);
                    }
                }
                catalog::Op::RenameSchema {
                    database_spec,
                    schema_spec,
                    ..
                } => {
                    // Renaming a schema changes the path of every webhook
                    // source inside it; restart them all.
                    let schema = self.catalog().get_schema(
                        database_spec,
                        schema_spec,
                        conn_id.unwrap_or(&SYSTEM_CONN_ID),
                    );
                    let webhook_sources = schema.item_ids().filter(|id| {
                        let item = self.catalog().get_entry(id);
                        item.source()
                            .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
                            .unwrap_or(false)
                    });
                    webhook_sources_to_restart.extend(webhook_sources);
                }
                catalog::Op::CreateCluster { id, .. } => {
                    clusters_to_create.push(*id);
                }
                catalog::Op::CreateClusterReplica {
                    cluster_id,
                    name,
                    config,
                    ..
                } => {
                    cluster_replicas_to_create.push((
                        *cluster_id,
                        name.clone(),
                        config.location.num_processes(),
                    ));
                }
                _ => (),
            }
        }

        self.validate_resource_limits(&ops, conn_id.unwrap_or(&SYSTEM_CONN_ID))?;

        // This will produce timestamps that are guaranteed to increase on each
        // call, and also never be behind the system clock. If the system clock
        // hasn't advanced (or has gone backward), it will increment by 1. For
        // the audit log, we need to balance "close (within 10s or so) to the
        // system clock" and "always goes up". We've chosen here to prioritize
        // always going up, and believe we will always be close to the system
        // clock because it is well configured (chrony) and so may only rarely
        // regress or pause for 10s.
        let oracle_write_ts = self.get_local_write_ts().await.timestamp;

        // Destructure `self` so we can borrow several fields mutably at once.
        let Coordinator {
            catalog,
            active_conns,
            controller,
            cluster_replica_statuses,
            ..
        } = self;
        let catalog = Arc::make_mut(catalog);
        let conn = conn_id.map(|id| active_conns.get(id).expect("connection must exist"));

        // Durably commit the ops to the catalog.
        let TransactionResult {
            builtin_table_updates,
            catalog_updates,
            audit_events,
        } = catalog
            .transact(
                Some(&mut controller.storage_collections),
                oracle_write_ts,
                conn,
                ops,
            )
            .await?;

        // Keep the in-memory replica status tracking in sync with the
        // clusters/replicas that were just created or dropped.
        for (cluster_id, replica_id) in &cluster_replicas_to_drop {
            cluster_replica_statuses.remove_cluster_replica_statuses(cluster_id, replica_id);
        }
        for cluster_id in &clusters_to_drop {
            cluster_replica_statuses.remove_cluster_statuses(cluster_id);
        }
        for cluster_id in clusters_to_create {
            cluster_replica_statuses.initialize_cluster_statuses(cluster_id);
        }
        let now = to_datetime((catalog.config().now)());
        for (cluster_id, replica_name, num_processes) in cluster_replicas_to_create {
            let replica_id = catalog
                .resolve_replica_in_cluster(&cluster_id, &replica_name)
                .expect("just created")
                .replica_id();
            cluster_replica_statuses.initialize_cluster_replica_statuses(
                cluster_id,
                replica_id,
                num_processes,
                now,
            );
        }

        // Append our builtin table updates, then return the notify so we can run other tasks in
        // parallel.
        let (builtin_update_notify, _) = self
            .builtin_table_update()
            .execute(builtin_table_updates)
            .await;

        // No error returns are allowed after this point. Enforce this at compile time
        // by using this odd structure so we don't accidentally add a stray `?`.
        let _: () = async {
            if !webhook_sources_to_restart.is_empty() {
                self.restart_webhook_sources(webhook_sources_to_restart);
            }

            // Refresh each configuration subsystem that the pre-scan flagged.
            if update_metrics_config {
                mz_metrics::update_dyncfg(&self.catalog().system_config().dyncfg_updates());
            }
            if update_controller_config {
                self.update_controller_config();
            }
            if update_compute_config {
                self.update_compute_config();
            }
            if update_storage_config {
                self.update_storage_config();
            }
            if update_timestamp_oracle_config {
                self.update_timestamp_oracle_config();
            }
            if update_metrics_retention {
                self.update_metrics_retention();
            }
            if update_tracing_config {
                self.update_tracing_config();
            }
            if update_secrets_caching_config {
                self.update_secrets_caching_config();
            }
            if update_cluster_scheduling_config {
                self.update_cluster_scheduling_config();
            }
            if update_http_config {
                self.update_http_config();
            }
            if update_advance_timelines_interval {
                // Only rebuild the interval when the period actually changed,
                // so we don't reset an in-flight tick.
                let new_interval = self.catalog().system_config().default_timestamp_interval();
                if new_interval != self.advance_timelines_interval.period() {
                    self.advance_timelines_interval = tokio::time::interval(new_interval);
                }
            }
        }
        .instrument(info_span!("coord::catalog_transact_with::finalize"))
        .await;

        // Report audit events to Segment, if a client is configured.
        let conn = conn_id.and_then(|id| self.active_conns.get(id));
        if let Some(segment_client) = &self.segment_client {
            for VersionedEvent::V1(event) in audit_events {
                let event_type = format!(
                    "{} {}",
                    event.object_type.as_title_case(),
                    event.event_type.as_title_case()
                );
                segment_client.environment_track(
                    &self.catalog().config().environment_id,
                    event_type,
                    json!({ "details": event.details.as_json() }),
                    EventDetails {
                        user_id: conn
                            .and_then(|c| c.user().external_metadata.as_ref())
                            .map(|m| m.user_id),
                        application_name: conn.map(|c| c.application_name()),
                        ..Default::default()
                    },
                );
            }
        }

        Ok((builtin_update_notify, catalog_updates))
    }
593
594    pub(crate) fn drop_replica(&mut self, cluster_id: ClusterId, replica_id: ReplicaId) {
595        self.drop_introspection_subscribes(replica_id);
596
597        self.controller
598            .drop_replica(cluster_id, replica_id)
599            .expect("dropping replica must not fail");
600    }
601
602    /// A convenience method for dropping sources.
603    pub(crate) fn drop_sources(&mut self, sources: Vec<(CatalogItemId, GlobalId)>) {
604        for (item_id, _gid) in &sources {
605            self.active_webhooks.remove(item_id);
606        }
607        let storage_metadata = self.catalog.state().storage_metadata();
608        let source_gids = sources.into_iter().map(|(_id, gid)| gid).collect();
609        self.controller
610            .storage
611            .drop_sources(storage_metadata, source_gids)
612            .unwrap_or_terminate("cannot fail to drop sources");
613    }
614
615    /// A convenience method for dropping tables.
616    pub(crate) fn drop_tables(&mut self, tables: Vec<(CatalogItemId, GlobalId)>, ts: Timestamp) {
617        for (item_id, _gid) in &tables {
618            self.active_webhooks.remove(item_id);
619        }
620
621        let storage_metadata = self.catalog.state().storage_metadata();
622        let table_gids = tables.into_iter().map(|(_id, gid)| gid).collect();
623        self.controller
624            .storage
625            .drop_tables(storage_metadata, table_gids, ts)
626            .unwrap_or_terminate("cannot fail to drop tables");
627    }
628
629    fn restart_webhook_sources(&mut self, sources: impl IntoIterator<Item = CatalogItemId>) {
630        for id in sources {
631            self.active_webhooks.remove(&id);
632        }
633    }
634
635    /// Like `drop_compute_sinks`, but for a single compute sink.
636    ///
637    /// Returns the controller's state for the compute sink if the identified
638    /// sink was known to the controller. It is the caller's responsibility to
639    /// retire the returned sink. Consider using `retire_compute_sinks` instead.
640    #[must_use]
641    pub async fn drop_compute_sink(&mut self, sink_id: GlobalId) -> Option<ActiveComputeSink> {
642        self.drop_compute_sinks([sink_id]).await.remove(&sink_id)
643    }
644
645    /// Drops a batch of compute sinks.
646    ///
647    /// For each sink that exists, the coordinator and controller's state
648    /// associated with the sink is removed.
649    ///
650    /// Returns a map containing the controller's state for each sink that was
651    /// removed. It is the caller's responsibility to retire the returned sinks.
652    /// Consider using `retire_compute_sinks` instead.
653    #[must_use]
654    pub async fn drop_compute_sinks(
655        &mut self,
656        sink_ids: impl IntoIterator<Item = GlobalId>,
657    ) -> BTreeMap<GlobalId, ActiveComputeSink> {
658        let mut by_id = BTreeMap::new();
659        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
660        for sink_id in sink_ids {
661            let sink = match self.remove_active_compute_sink(sink_id).await {
662                None => {
663                    tracing::error!(%sink_id, "drop_compute_sinks called on nonexistent sink");
664                    continue;
665                }
666                Some(sink) => sink,
667            };
668
669            by_cluster
670                .entry(sink.cluster_id())
671                .or_default()
672                .push(sink_id);
673            by_id.insert(sink_id, sink);
674        }
675        for (cluster_id, ids) in by_cluster {
676            let compute = &mut self.controller.compute;
677            // A cluster could have been dropped, so verify it exists.
678            if compute.instance_exists(cluster_id) {
679                compute
680                    .drop_collections(cluster_id, ids)
681                    .unwrap_or_terminate("cannot fail to drop collections");
682            }
683        }
684        by_id
685    }
686
687    /// Retires a batch of sinks with disparate reasons for retirement.
688    ///
689    /// Each sink identified in `reasons` is dropped (see `drop_compute_sinks`),
690    /// then retired with its corresponding reason.
691    pub async fn retire_compute_sinks(
692        &mut self,
693        mut reasons: BTreeMap<GlobalId, ActiveComputeSinkRetireReason>,
694    ) {
695        let sink_ids = reasons.keys().cloned();
696        for (id, sink) in self.drop_compute_sinks(sink_ids).await {
697            let reason = reasons
698                .remove(&id)
699                .expect("all returned IDs are in `reasons`");
700            sink.retire(reason);
701        }
702    }
703
704    /// Drops all pending replicas for a set of clusters
705    /// that are undergoing reconfiguration.
706    pub async fn drop_reconfiguration_replicas(
707        &mut self,
708        cluster_ids: BTreeSet<ClusterId>,
709    ) -> Result<(), AdapterError> {
710        let pending_cluster_ops: Vec<Op> = cluster_ids
711            .iter()
712            .map(|c| {
713                self.catalog()
714                    .get_cluster(c.clone())
715                    .replicas()
716                    .filter_map(|r| match r.config.location {
717                        ReplicaLocation::Managed(ref l) if l.pending => {
718                            Some(DropObjectInfo::ClusterReplica((
719                                c.clone(),
720                                r.replica_id,
721                                ReplicaCreateDropReason::Manual,
722                            )))
723                        }
724                        _ => None,
725                    })
726                    .collect::<Vec<DropObjectInfo>>()
727            })
728            .filter_map(|pending_replica_drop_ops_by_cluster| {
729                match pending_replica_drop_ops_by_cluster.len() {
730                    0 => None,
731                    _ => Some(Op::DropObjects(pending_replica_drop_ops_by_cluster)),
732                }
733            })
734            .collect();
735        if !pending_cluster_ops.is_empty() {
736            self.catalog_transact(None, pending_cluster_ops).await?;
737        }
738        Ok(())
739    }
740
741    /// Cancels all active compute sinks for the identified connection.
742    #[mz_ore::instrument(level = "debug")]
743    pub(crate) async fn cancel_compute_sinks_for_conn(&mut self, conn_id: &ConnectionId) {
744        self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Canceled)
745            .await
746    }
747
    /// Cancels all active cluster reconfigurations for the identified
    /// connection, retiring any replicas that were created as part of the
    /// reconfiguration.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn cancel_cluster_reconfigurations_for_conn(
        &mut self,
        conn_id: &ConnectionId,
    ) {
        self.retire_cluster_reconfigurations_for_conn(conn_id).await
    }
756
757    /// Retires all active compute sinks for the identified connection with the
758    /// specified reason.
759    #[mz_ore::instrument(level = "debug")]
760    pub(crate) async fn retire_compute_sinks_for_conn(
761        &mut self,
762        conn_id: &ConnectionId,
763        reason: ActiveComputeSinkRetireReason,
764    ) {
765        let drop_sinks = self
766            .active_conns
767            .get_mut(conn_id)
768            .expect("must exist for active session")
769            .drop_sinks
770            .iter()
771            .map(|sink_id| (*sink_id, reason.clone()))
772            .collect();
773        self.retire_compute_sinks(drop_sinks).await;
774    }
775
    /// Cleans up pending cluster reconfigurations for the identified
    /// connection: drops any still-pending replicas and clears the
    /// connection's record of in-progress cluster alterations.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn retire_cluster_reconfigurations_for_conn(
        &mut self,
        conn_id: &ConnectionId,
    ) {
        let reconfiguring_clusters = self
            .active_conns
            .get(conn_id)
            .expect("must exist for active session")
            .pending_cluster_alters
            .clone();
        // Drop any replicas that are still marked pending for these clusters;
        // failure here is treated as unrecoverable.
        self.drop_reconfiguration_replicas(reconfiguring_clusters)
            .await
            .unwrap_or_terminate("cannot fail to drop reconfiguration replicas");

        // The reconfigurations are retired; forget them on the connection.
        self.active_conns
            .get_mut(conn_id)
            .expect("must exist for active session")
            .pending_cluster_alters
            .clear();
    }
799
800    pub(crate) fn drop_storage_sinks(&mut self, sink_gids: Vec<GlobalId>) {
801        let storage_metadata = self.catalog.state().storage_metadata();
802        self.controller
803            .storage
804            .drop_sinks(storage_metadata, sink_gids)
805            .unwrap_or_terminate("cannot fail to drop sinks");
806    }
807
808    pub(crate) fn drop_compute_collections(&mut self, collections: Vec<(ClusterId, GlobalId)>) {
809        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
810        for (cluster_id, gid) in collections {
811            by_cluster.entry(cluster_id).or_default().push(gid);
812        }
813        for (cluster_id, gids) in by_cluster {
814            let compute = &mut self.controller.compute;
815            // A cluster could have been dropped, so verify it exists.
816            if compute.instance_exists(cluster_id) {
817                compute
818                    .drop_collections(cluster_id, gids)
819                    .unwrap_or_terminate("cannot fail to drop collections");
820            }
821        }
822    }
823
824    pub(crate) fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>) {
825        let cloud_resource_controller = Arc::clone(self.cloud_resource_controller
826            .as_ref()
827            .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))
828            .expect("vpc endpoints should only be dropped in CLOUD, where `cloud_resource_controller` is `Some`"));
829        // We don't want to block the coordinator on an external delete api
830        // calls, so move the drop vpc_endpoint to a separate task. This does
831        // mean that a failed drop won't bubble up to the user as an error
832        // message. However, even if it did (and how the code previously
833        // worked), mz has already dropped it from our catalog, and so we
834        // wouldn't be able to retry anyway. Any orphaned vpc_endpoints will
835        // eventually be cleaned during restart via coord bootstrap.
836        task::spawn(
837            || "drop_vpc_endpoints",
838            async move {
839                for vpc_endpoint in vpc_endpoints {
840                    let _ = Retry::default()
841                        .max_duration(Duration::from_secs(60))
842                        .retry_async(|_state| async {
843                            fail_point!("drop_vpc_endpoint", |r| {
844                                Err(anyhow::anyhow!("Fail point error {:?}", r))
845                            });
846                            match cloud_resource_controller
847                                .delete_vpc_endpoint(vpc_endpoint)
848                                .await
849                            {
850                                Ok(_) => Ok(()),
851                                Err(e) => {
852                                    warn!("Dropping VPC Endpoints has encountered an error: {}", e);
853                                    Err(e)
854                                }
855                            }
856                        })
857                        .await;
858                }
859            }
860            .instrument(info_span!(
861                "coord::catalog_transact_inner::drop_vpc_endpoints"
862            )),
863        );
864    }
865
866    /// Removes all temporary items created by the specified connection, though
867    /// not the temporary schema itself.
868    pub(crate) async fn drop_temp_items(&mut self, conn_id: &ConnectionId) {
869        let temp_items = self.catalog().state().get_temp_items(conn_id).collect();
870        let all_items = self.catalog().object_dependents(&temp_items, conn_id);
871
872        if all_items.is_empty() {
873            return;
874        }
875        let op = Op::DropObjects(
876            all_items
877                .into_iter()
878                .map(DropObjectInfo::manual_drop_from_object_id)
879                .collect(),
880        );
881
882        self.catalog_transact_with_context(Some(conn_id), None, vec![op])
883            .await
884            .expect("unable to drop temporary items for conn_id");
885    }
886
887    fn update_cluster_scheduling_config(&self) {
888        let config = flags::orchestrator_scheduling_config(self.catalog.system_config());
889        self.controller
890            .update_orchestrator_scheduling_config(config);
891    }
892
893    fn update_secrets_caching_config(&self) {
894        let config = flags::caching_config(self.catalog.system_config());
895        self.caching_secrets_reader.set_policy(config);
896    }
897
898    fn update_tracing_config(&self) {
899        let tracing = flags::tracing_config(self.catalog().system_config());
900        tracing.apply(&self.tracing_handle);
901    }
902
903    fn update_compute_config(&mut self) {
904        let config_params = flags::compute_config(self.catalog().system_config());
905        self.controller.compute.update_configuration(config_params);
906    }
907
908    fn update_storage_config(&mut self) {
909        let config_params = flags::storage_config(self.catalog().system_config());
910        self.controller.storage.update_parameters(config_params);
911    }
912
913    fn update_timestamp_oracle_config(&self) {
914        let config_params = flags::timestamp_oracle_config(self.catalog().system_config());
915        if let Some(config) = self.timestamp_oracle_config.as_ref() {
916            config.apply_parameters(config_params)
917        }
918    }
919
920    fn update_metrics_retention(&self) {
921        let duration = self.catalog().system_config().metrics_retention();
922        let policy = ReadPolicy::lag_writes_by(
923            Timestamp::new(u64::try_from(duration.as_millis()).unwrap_or_else(|_e| {
924                tracing::error!("Absurd metrics retention duration: {duration:?}.");
925                u64::MAX
926            })),
927            SINCE_GRANULARITY,
928        );
929        let storage_policies = self
930            .catalog()
931            .entries()
932            .filter(|entry| {
933                entry.item().is_retained_metrics_object()
934                    && entry.item().is_compute_object_on_cluster().is_none()
935            })
936            .map(|entry| (entry.id(), policy.clone()))
937            .collect::<Vec<_>>();
938        let compute_policies = self
939            .catalog()
940            .entries()
941            .filter_map(|entry| {
942                if let (true, Some(cluster_id)) = (
943                    entry.item().is_retained_metrics_object(),
944                    entry.item().is_compute_object_on_cluster(),
945                ) {
946                    Some((cluster_id, entry.id(), policy.clone()))
947                } else {
948                    None
949                }
950            })
951            .collect::<Vec<_>>();
952        self.update_storage_read_policies(storage_policies);
953        self.update_compute_read_policies(compute_policies);
954    }
955
956    fn update_controller_config(&mut self) {
957        let sys_config = self.catalog().system_config();
958        self.controller
959            .update_configuration(sys_config.dyncfg_updates());
960    }
961
962    fn update_http_config(&mut self) {
963        let webhook_request_limit = self
964            .catalog()
965            .system_config()
966            .webhook_concurrent_request_limit();
967        self.webhook_concurrency_limit
968            .set_limit(webhook_request_limit);
969    }
970
    /// Creates a storage export (sink) for `sink`, registering it with the
    /// storage controller under the collection identified by `id`.
    ///
    /// Returns an error if `sink.from` is not a collection known to the
    /// storage controller or if collection creation fails.
    pub(crate) async fn create_storage_export(
        &mut self,
        id: GlobalId,
        sink: &Sink,
    ) -> Result<(), AdapterError> {
        // Validate `sink.from` is in fact a storage collection
        self.controller.storage.check_exists(sink.from)?;

        // The AsOf is used to determine at what time to snapshot reading from
        // the persist collection.  This is primarily relevant when we do _not_
        // want to include the snapshot in the sink.
        //
        // We choose the smallest as_of that is legal, according to the sinked
        // collection's since.
        let id_bundle = crate::CollectionIdBundle {
            storage_ids: btreeset! {sink.from},
            compute_ids: btreemap! {},
        };

        // We're putting in place read holds, such that create_exports, below,
        // which calls update_read_capabilities, can successfully do so.
        // Otherwise, the since of dependencies might move along concurrently,
        // pulling the rug from under us!
        //
        // TODO: Maybe in the future, pass those holds on to storage, to hold on
        // to them and downgrade when possible?
        let read_holds = self.acquire_read_holds(&id_bundle);
        let as_of = read_holds.least_valid_read();

        // Assemble the storage-level description of the sink.
        let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
        let storage_sink_desc = mz_storage_types::sinks::StorageSinkDesc {
            from: sink.from,
            from_desc: storage_sink_from_entry
                .relation_desc()
                .expect("sinks can only be built on items with descs")
                .into_owned(),
            connection: sink
                .connection
                .clone()
                .into_inline_connection(self.catalog().state()),
            envelope: sink.envelope,
            as_of,
            with_snapshot: sink.with_snapshot,
            version: sink.version,
            // NOTE(review): the metadata fields are unit here; presumably they
            // are filled in by the storage controller — confirm.
            from_storage_metadata: (),
            to_storage_metadata: (),
            commit_interval: sink.commit_interval,
        };

        let collection_desc = CollectionDescription {
            // TODO(sinks): make generic once we have more than one sink type.
            desc: KAFKA_PROGRESS_DESC.clone(),
            data_source: DataSource::Sink {
                desc: ExportDescription {
                    sink: storage_sink_desc,
                    instance_id: sink.cluster_id,
                },
            },
            since: None,
            timeline: None,
            primary: None,
        };
        let collections = vec![(id, collection_desc)];

        // Create the collections.
        let storage_metadata = self.catalog.state().storage_metadata();
        let res = self
            .controller
            .storage
            .create_collections(storage_metadata, None, collections)
            .await;

        // Drop read holds after the export has been created, at which point
        // storage will have put in its own read holds.
        drop(read_holds);

        Ok(res?)
    }
1049
1050    /// Validate all resource limits in a catalog transaction and return an error if that limit is
1051    /// exceeded.
1052    fn validate_resource_limits(
1053        &self,
1054        ops: &Vec<catalog::Op>,
1055        conn_id: &ConnectionId,
1056    ) -> Result<(), AdapterError> {
1057        let mut new_kafka_connections = 0;
1058        let mut new_postgres_connections = 0;
1059        let mut new_mysql_connections = 0;
1060        let mut new_sql_server_connections = 0;
1061        let mut new_aws_privatelink_connections = 0;
1062        let mut new_tables = 0;
1063        let mut new_sources = 0;
1064        let mut new_sinks = 0;
1065        let mut new_materialized_views = 0;
1066        let mut new_clusters = 0;
1067        let mut new_replicas_per_cluster = BTreeMap::new();
1068        let mut new_credit_consumption_rate = Numeric::zero();
1069        let mut new_databases = 0;
1070        let mut new_schemas_per_database = BTreeMap::new();
1071        let mut new_objects_per_schema = BTreeMap::new();
1072        let mut new_secrets = 0;
1073        let mut new_roles = 0;
1074        let mut new_continual_tasks = 0;
1075        let mut new_network_policies = 0;
1076        for op in ops {
1077            match op {
1078                Op::CreateDatabase { .. } => {
1079                    new_databases += 1;
1080                }
1081                Op::CreateSchema { database_id, .. } => {
1082                    if let ResolvedDatabaseSpecifier::Id(database_id) = database_id {
1083                        *new_schemas_per_database.entry(database_id).or_insert(0) += 1;
1084                    }
1085                }
1086                Op::CreateRole { .. } => {
1087                    new_roles += 1;
1088                }
1089                Op::CreateNetworkPolicy { .. } => {
1090                    new_network_policies += 1;
1091                }
1092                Op::CreateCluster { .. } => {
1093                    // TODO(benesch): having deprecated linked clusters, remove
1094                    // the `max_sources` and `max_sinks` limit, and set a higher
1095                    // max cluster limit?
1096                    new_clusters += 1;
1097                }
1098                Op::CreateClusterReplica {
1099                    cluster_id, config, ..
1100                } => {
1101                    if cluster_id.is_user() {
1102                        *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) += 1;
1103                        if let ReplicaLocation::Managed(location) = &config.location {
1104                            let replica_allocation = self
1105                                .catalog()
1106                                .cluster_replica_sizes()
1107                                .0
1108                                .get(location.size_for_billing())
1109                                .expect(
1110                                    "location size is validated against the cluster replica sizes",
1111                                );
1112                            new_credit_consumption_rate += replica_allocation.credits_per_hour
1113                        }
1114                    }
1115                }
1116                Op::CreateItem { name, item, .. } => {
1117                    *new_objects_per_schema
1118                        .entry((
1119                            name.qualifiers.database_spec.clone(),
1120                            name.qualifiers.schema_spec.clone(),
1121                        ))
1122                        .or_insert(0) += 1;
1123                    match item {
1124                        CatalogItem::Connection(connection) => match connection.details {
1125                            ConnectionDetails::Kafka(_) => new_kafka_connections += 1,
1126                            ConnectionDetails::Postgres(_) => new_postgres_connections += 1,
1127                            ConnectionDetails::MySql(_) => new_mysql_connections += 1,
1128                            ConnectionDetails::SqlServer(_) => new_sql_server_connections += 1,
1129                            ConnectionDetails::AwsPrivatelink(_) => {
1130                                new_aws_privatelink_connections += 1
1131                            }
1132                            ConnectionDetails::Csr(_)
1133                            | ConnectionDetails::Ssh { .. }
1134                            | ConnectionDetails::Aws(_)
1135                            | ConnectionDetails::IcebergCatalog(_) => {}
1136                        },
1137                        CatalogItem::Table(_) => {
1138                            new_tables += 1;
1139                        }
1140                        CatalogItem::Source(source) => {
1141                            new_sources += source.user_controllable_persist_shard_count()
1142                        }
1143                        CatalogItem::Sink(_) => new_sinks += 1,
1144                        CatalogItem::MaterializedView(_) => {
1145                            new_materialized_views += 1;
1146                        }
1147                        CatalogItem::Secret(_) => {
1148                            new_secrets += 1;
1149                        }
1150                        CatalogItem::ContinualTask(_) => {
1151                            new_continual_tasks += 1;
1152                        }
1153                        CatalogItem::Log(_)
1154                        | CatalogItem::View(_)
1155                        | CatalogItem::Index(_)
1156                        | CatalogItem::Type(_)
1157                        | CatalogItem::Func(_) => {}
1158                    }
1159                }
1160                Op::DropObjects(drop_object_infos) => {
1161                    for drop_object_info in drop_object_infos {
1162                        match drop_object_info {
1163                            DropObjectInfo::Cluster(_) => {
1164                                new_clusters -= 1;
1165                            }
1166                            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
1167                                if cluster_id.is_user() {
1168                                    *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) -= 1;
1169                                    let cluster = self
1170                                        .catalog()
1171                                        .get_cluster_replica(*cluster_id, *replica_id);
1172                                    if let ReplicaLocation::Managed(location) =
1173                                        &cluster.config.location
1174                                    {
1175                                        let replica_allocation = self
1176                                            .catalog()
1177                                            .cluster_replica_sizes()
1178                                            .0
1179                                            .get(location.size_for_billing())
1180                                            .expect(
1181                                                "location size is validated against the cluster replica sizes",
1182                                            );
1183                                        new_credit_consumption_rate -=
1184                                            replica_allocation.credits_per_hour
1185                                    }
1186                                }
1187                            }
1188                            DropObjectInfo::Database(_) => {
1189                                new_databases -= 1;
1190                            }
1191                            DropObjectInfo::Schema((database_spec, _)) => {
1192                                if let ResolvedDatabaseSpecifier::Id(database_id) = database_spec {
1193                                    *new_schemas_per_database.entry(database_id).or_insert(0) -= 1;
1194                                }
1195                            }
1196                            DropObjectInfo::Role(_) => {
1197                                new_roles -= 1;
1198                            }
1199                            DropObjectInfo::NetworkPolicy(_) => {
1200                                new_network_policies -= 1;
1201                            }
1202                            DropObjectInfo::Item(id) => {
1203                                let entry = self.catalog().get_entry(id);
1204                                *new_objects_per_schema
1205                                    .entry((
1206                                        entry.name().qualifiers.database_spec.clone(),
1207                                        entry.name().qualifiers.schema_spec.clone(),
1208                                    ))
1209                                    .or_insert(0) -= 1;
1210                                match entry.item() {
1211                                    CatalogItem::Connection(connection) => match connection.details
1212                                    {
1213                                        ConnectionDetails::AwsPrivatelink(_) => {
1214                                            new_aws_privatelink_connections -= 1;
1215                                        }
1216                                        _ => (),
1217                                    },
1218                                    CatalogItem::Table(_) => {
1219                                        new_tables -= 1;
1220                                    }
1221                                    CatalogItem::Source(source) => {
1222                                        new_sources -=
1223                                            source.user_controllable_persist_shard_count()
1224                                    }
1225                                    CatalogItem::Sink(_) => new_sinks -= 1,
1226                                    CatalogItem::MaterializedView(_) => {
1227                                        new_materialized_views -= 1;
1228                                    }
1229                                    CatalogItem::Secret(_) => {
1230                                        new_secrets -= 1;
1231                                    }
1232                                    CatalogItem::ContinualTask(_) => {
1233                                        new_continual_tasks -= 1;
1234                                    }
1235                                    CatalogItem::Log(_)
1236                                    | CatalogItem::View(_)
1237                                    | CatalogItem::Index(_)
1238                                    | CatalogItem::Type(_)
1239                                    | CatalogItem::Func(_) => {}
1240                                }
1241                            }
1242                        }
1243                    }
1244                }
1245                Op::UpdateItem {
1246                    name: _,
1247                    id,
1248                    to_item,
1249                } => match to_item {
1250                    CatalogItem::Source(source) => {
1251                        let current_source = self
1252                            .catalog()
1253                            .get_entry(id)
1254                            .source()
1255                            .expect("source update is for source item");
1256
1257                        new_sources += source.user_controllable_persist_shard_count()
1258                            - current_source.user_controllable_persist_shard_count();
1259                    }
1260                    CatalogItem::Connection(_)
1261                    | CatalogItem::Table(_)
1262                    | CatalogItem::Sink(_)
1263                    | CatalogItem::MaterializedView(_)
1264                    | CatalogItem::Secret(_)
1265                    | CatalogItem::Log(_)
1266                    | CatalogItem::View(_)
1267                    | CatalogItem::Index(_)
1268                    | CatalogItem::Type(_)
1269                    | CatalogItem::Func(_)
1270                    | CatalogItem::ContinualTask(_) => {}
1271                },
1272                Op::AlterRole { .. }
1273                | Op::AlterRetainHistory { .. }
1274                | Op::AlterSourceTimestampInterval { .. }
1275                | Op::AlterNetworkPolicy { .. }
1276                | Op::AlterAddColumn { .. }
1277                | Op::AlterMaterializedViewApplyReplacement { .. }
1278                | Op::UpdatePrivilege { .. }
1279                | Op::UpdateDefaultPrivilege { .. }
1280                | Op::GrantRole { .. }
1281                | Op::RenameCluster { .. }
1282                | Op::RenameClusterReplica { .. }
1283                | Op::RenameItem { .. }
1284                | Op::RenameSchema { .. }
1285                | Op::UpdateOwner { .. }
1286                | Op::RevokeRole { .. }
1287                | Op::UpdateClusterConfig { .. }
1288                | Op::UpdateClusterReplicaConfig { .. }
1289                | Op::UpdateSourceReferences { .. }
1290                | Op::UpdateSystemConfiguration { .. }
1291                | Op::ResetSystemConfiguration { .. }
1292                | Op::ResetAllSystemConfiguration { .. }
1293                | Op::Comment { .. }
1294                | Op::WeirdStorageUsageUpdates { .. }
1295                | Op::InjectAuditEvents { .. } => {}
1296            }
1297        }
1298
1299        let mut current_aws_privatelink_connections = 0;
1300        let mut current_postgres_connections = 0;
1301        let mut current_mysql_connections = 0;
1302        let mut current_sql_server_connections = 0;
1303        let mut current_kafka_connections = 0;
1304        for c in self.catalog().user_connections() {
1305            let connection = c
1306                .connection()
1307                .expect("`user_connections()` only returns connection objects");
1308
1309            match connection.details {
1310                ConnectionDetails::AwsPrivatelink(_) => current_aws_privatelink_connections += 1,
1311                ConnectionDetails::Postgres(_) => current_postgres_connections += 1,
1312                ConnectionDetails::MySql(_) => current_mysql_connections += 1,
1313                ConnectionDetails::SqlServer(_) => current_sql_server_connections += 1,
1314                ConnectionDetails::Kafka(_) => current_kafka_connections += 1,
1315                ConnectionDetails::Csr(_)
1316                | ConnectionDetails::Ssh { .. }
1317                | ConnectionDetails::Aws(_)
1318                | ConnectionDetails::IcebergCatalog(_) => {}
1319            }
1320        }
1321        self.validate_resource_limit(
1322            current_kafka_connections,
1323            new_kafka_connections,
1324            SystemVars::max_kafka_connections,
1325            "Kafka Connection",
1326            MAX_KAFKA_CONNECTIONS.name(),
1327        )?;
1328        self.validate_resource_limit(
1329            current_postgres_connections,
1330            new_postgres_connections,
1331            SystemVars::max_postgres_connections,
1332            "PostgreSQL Connection",
1333            MAX_POSTGRES_CONNECTIONS.name(),
1334        )?;
1335        self.validate_resource_limit(
1336            current_mysql_connections,
1337            new_mysql_connections,
1338            SystemVars::max_mysql_connections,
1339            "MySQL Connection",
1340            MAX_MYSQL_CONNECTIONS.name(),
1341        )?;
1342        self.validate_resource_limit(
1343            current_sql_server_connections,
1344            new_sql_server_connections,
1345            SystemVars::max_sql_server_connections,
1346            "SQL Server Connection",
1347            MAX_SQL_SERVER_CONNECTIONS.name(),
1348        )?;
1349        self.validate_resource_limit(
1350            current_aws_privatelink_connections,
1351            new_aws_privatelink_connections,
1352            SystemVars::max_aws_privatelink_connections,
1353            "AWS PrivateLink Connection",
1354            MAX_AWS_PRIVATELINK_CONNECTIONS.name(),
1355        )?;
1356        self.validate_resource_limit(
1357            self.catalog().user_tables().count(),
1358            new_tables,
1359            SystemVars::max_tables,
1360            "table",
1361            MAX_TABLES.name(),
1362        )?;
1363
1364        let current_sources: usize = self
1365            .catalog()
1366            .user_sources()
1367            .filter_map(|source| source.source())
1368            .map(|source| source.user_controllable_persist_shard_count())
1369            .sum::<i64>()
1370            .try_into()
1371            .expect("non-negative sum of sources");
1372
1373        self.validate_resource_limit(
1374            current_sources,
1375            new_sources,
1376            SystemVars::max_sources,
1377            "source",
1378            MAX_SOURCES.name(),
1379        )?;
1380        self.validate_resource_limit(
1381            self.catalog().user_sinks().count(),
1382            new_sinks,
1383            SystemVars::max_sinks,
1384            "sink",
1385            MAX_SINKS.name(),
1386        )?;
1387        self.validate_resource_limit(
1388            self.catalog().user_materialized_views().count(),
1389            new_materialized_views,
1390            SystemVars::max_materialized_views,
1391            "materialized view",
1392            MAX_MATERIALIZED_VIEWS.name(),
1393        )?;
1394        self.validate_resource_limit(
1395            // Linked compute clusters don't count against the limit, since
1396            // we have a separate sources and sinks limit.
1397            //
1398            // TODO(benesch): remove the `max_sources` and `max_sinks` limit,
1399            // and set a higher max cluster limit?
1400            self.catalog().user_clusters().count(),
1401            new_clusters,
1402            SystemVars::max_clusters,
1403            "cluster",
1404            MAX_CLUSTERS.name(),
1405        )?;
1406        for (cluster_id, new_replicas) in new_replicas_per_cluster {
1407            // It's possible that the cluster hasn't been created yet.
1408            let current_amount = self
1409                .catalog()
1410                .try_get_cluster(cluster_id)
1411                .map(|instance| instance.user_replicas().count())
1412                .unwrap_or(0);
1413            self.validate_resource_limit(
1414                current_amount,
1415                new_replicas,
1416                SystemVars::max_replicas_per_cluster,
1417                "cluster replica",
1418                MAX_REPLICAS_PER_CLUSTER.name(),
1419            )?;
1420        }
1421        self.validate_resource_limit_numeric(
1422            self.current_credit_consumption_rate(),
1423            new_credit_consumption_rate,
1424            |system_vars| {
1425                self.license_key
1426                    .max_credit_consumption_rate()
1427                    .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
1428            },
1429            "cluster replica",
1430            MAX_CREDIT_CONSUMPTION_RATE.name(),
1431        )?;
1432        self.validate_resource_limit(
1433            self.catalog().databases().count(),
1434            new_databases,
1435            SystemVars::max_databases,
1436            "database",
1437            MAX_DATABASES.name(),
1438        )?;
1439        for (database_id, new_schemas) in new_schemas_per_database {
1440            self.validate_resource_limit(
1441                self.catalog().get_database(database_id).schemas_by_id.len(),
1442                new_schemas,
1443                SystemVars::max_schemas_per_database,
1444                "schema",
1445                MAX_SCHEMAS_PER_DATABASE.name(),
1446            )?;
1447        }
1448        for ((database_spec, schema_spec), new_objects) in new_objects_per_schema {
1449            // For temporary schemas that don't exist yet (lazy creation),
1450            // treat them as having 0 items.
1451            let current_items = self
1452                .catalog()
1453                .try_get_schema(&database_spec, &schema_spec, conn_id)
1454                .map(|schema| schema.items.len())
1455                .unwrap_or(0);
1456            self.validate_resource_limit(
1457                current_items,
1458                new_objects,
1459                SystemVars::max_objects_per_schema,
1460                "object",
1461                MAX_OBJECTS_PER_SCHEMA.name(),
1462            )?;
1463        }
1464        self.validate_resource_limit(
1465            self.catalog().user_secrets().count(),
1466            new_secrets,
1467            SystemVars::max_secrets,
1468            "secret",
1469            MAX_SECRETS.name(),
1470        )?;
1471        self.validate_resource_limit(
1472            self.catalog().user_roles().count(),
1473            new_roles,
1474            SystemVars::max_roles,
1475            "role",
1476            MAX_ROLES.name(),
1477        )?;
1478        self.validate_resource_limit(
1479            self.catalog().user_continual_tasks().count(),
1480            new_continual_tasks,
1481            SystemVars::max_continual_tasks,
1482            "continual_task",
1483            MAX_CONTINUAL_TASKS.name(),
1484        )?;
1485        self.validate_resource_limit(
1486            self.catalog().user_network_policies().count(),
1487            new_network_policies,
1488            SystemVars::max_network_policies,
1489            "network_policy",
1490            MAX_NETWORK_POLICIES.name(),
1491        )?;
1492        Ok(())
1493    }
1494
1495    /// Validate a specific type of resource limit and return an error if that limit is exceeded.
1496    pub(crate) fn validate_resource_limit<F>(
1497        &self,
1498        current_amount: usize,
1499        new_instances: i64,
1500        resource_limit: F,
1501        resource_type: &str,
1502        limit_name: &str,
1503    ) -> Result<(), AdapterError>
1504    where
1505        F: Fn(&SystemVars) -> u32,
1506    {
1507        if new_instances <= 0 {
1508            return Ok(());
1509        }
1510
1511        let limit: i64 = resource_limit(self.catalog().system_config()).into();
1512        let current_amount: Option<i64> = current_amount.try_into().ok();
1513        let desired =
1514            current_amount.and_then(|current_amount| current_amount.checked_add(new_instances));
1515
1516        let exceeds_limit = if let Some(desired) = desired {
1517            desired > limit
1518        } else {
1519            true
1520        };
1521
1522        let desired = desired
1523            .map(|desired| desired.to_string())
1524            .unwrap_or_else(|| format!("more than {}", i64::MAX));
1525        let current = current_amount
1526            .map(|current| current.to_string())
1527            .unwrap_or_else(|| format!("more than {}", i64::MAX));
1528        if exceeds_limit {
1529            Err(AdapterError::ResourceExhaustion {
1530                resource_type: resource_type.to_string(),
1531                limit_name: limit_name.to_string(),
1532                desired,
1533                limit: limit.to_string(),
1534                current,
1535            })
1536        } else {
1537            Ok(())
1538        }
1539    }
1540
1541    /// Validate a specific type of float resource limit and return an error if that limit is exceeded.
1542    ///
1543    /// This is very similar to [`Self::validate_resource_limit`] but for numerics.
1544    pub(crate) fn validate_resource_limit_numeric<F>(
1545        &self,
1546        current_amount: Numeric,
1547        new_amount: Numeric,
1548        resource_limit: F,
1549        resource_type: &str,
1550        limit_name: &str,
1551    ) -> Result<(), AdapterError>
1552    where
1553        F: Fn(&SystemVars) -> Numeric,
1554    {
1555        if new_amount <= Numeric::zero() {
1556            return Ok(());
1557        }
1558
1559        let limit = resource_limit(self.catalog().system_config());
1560        // Floats will overflow to infinity instead of panicking, which has the correct comparison
1561        // semantics.
1562        // NaN should be impossible here since both values are positive.
1563        let desired = current_amount + new_amount;
1564        if desired > limit {
1565            Err(AdapterError::ResourceExhaustion {
1566                resource_type: resource_type.to_string(),
1567                limit_name: limit_name.to_string(),
1568                desired: desired.to_string(),
1569                limit: limit.to_string(),
1570                current: current_amount.to_string(),
1571            })
1572        } else {
1573            Ok(())
1574        }
1575    }
1576}