// mz_adapter/coord/ddl.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! This module encapsulates all of the [`Coordinator`]'s logic for creating, dropping,
11//! and altering objects.
12
13use std::collections::{BTreeMap, BTreeSet};
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use fail::fail_point;
19use maplit::{btreemap, btreeset};
20use mz_adapter_types::compaction::SINCE_GRANULARITY;
21use mz_adapter_types::connection::ConnectionId;
22use mz_audit_log::VersionedEvent;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Sink};
25use mz_cluster_client::ReplicaId;
26use mz_controller::clusters::ReplicaLocation;
27use mz_controller_types::ClusterId;
28use mz_ore::instrument;
29use mz_ore::now::to_datetime;
30use mz_ore::retry::Retry;
31use mz_ore::task;
32use mz_repr::adt::numeric::Numeric;
33use mz_repr::{CatalogItemId, GlobalId, Timestamp};
34use mz_sql::catalog::{CatalogClusterReplica, CatalogSchema};
35use mz_sql::names::ResolvedDatabaseSpecifier;
36use mz_sql::plan::ConnectionDetails;
37use mz_sql::session::metadata::SessionMetadata;
38use mz_sql::session::vars::{
39    self, DEFAULT_TIMESTAMP_INTERVAL, MAX_AWS_PRIVATELINK_CONNECTIONS, MAX_CLUSTERS,
40    MAX_CONTINUAL_TASKS, MAX_CREDIT_CONSUMPTION_RATE, MAX_DATABASES, MAX_KAFKA_CONNECTIONS,
41    MAX_MATERIALIZED_VIEWS, MAX_MYSQL_CONNECTIONS, MAX_NETWORK_POLICIES, MAX_OBJECTS_PER_SCHEMA,
42    MAX_POSTGRES_CONNECTIONS, MAX_REPLICAS_PER_CLUSTER, MAX_ROLES, MAX_SCHEMAS_PER_DATABASE,
43    MAX_SECRETS, MAX_SINKS, MAX_SOURCES, MAX_SQL_SERVER_CONNECTIONS, MAX_TABLES, SystemVars, Var,
44};
45use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
46use mz_storage_types::connections::inline::IntoInlineConnection;
47use mz_storage_types::read_policy::ReadPolicy;
48use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
49use serde_json::json;
50use tracing::{Instrument, Level, event, info_span, warn};
51
52use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
53use crate::catalog::{DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult};
54use crate::coord::Coordinator;
55use crate::coord::appends::BuiltinTableAppendNotify;
56use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
57use crate::session::{Session, Transaction, TransactionOps};
58use crate::telemetry::{EventDetails, SegmentClientExt};
59use crate::util::ResultExt;
60use crate::{AdapterError, ExecuteContext, catalog, flags};
61
62impl Coordinator {
63    /// Same as [`Self::catalog_transact_with_context`] but takes a [`Session`].
64    #[instrument(name = "coord::catalog_transact")]
65    pub(crate) async fn catalog_transact(
66        &mut self,
67        session: Option<&Session>,
68        ops: Vec<catalog::Op>,
69    ) -> Result<(), AdapterError> {
70        let start = Instant::now();
71        let result = self
72            .catalog_transact_with_context(session.map(|session| session.conn_id()), None, ops)
73            .await;
74        self.metrics
75            .catalog_transact_seconds
76            .with_label_values(&["catalog_transact"])
77            .observe(start.elapsed().as_secs_f64());
78        result
79    }
80
    /// Same as [`Self::catalog_transact_with_context`] but additionally runs
    /// the caller-provided `side_effect` future after the catalog transaction
    /// commits, concurrently with the builtin table updates (e.g. for creating
    /// collections).
    ///
    /// Panics if applying the catalog implications derived from the committed
    /// ops fails, because the catalog would otherwise be out of sync with the
    /// controller; restart/bootstrap recovers from that state.
    // TODO(aljoscha): Remove this method once all call-sites have been migrated
    // to the newer catalog_transact_with_context. The latter is what allows us
    // to apply catalog implications that we derive from catalog changes either
    // when initially applying the ops to the catalog _or_ when following
    // catalog changes from another process.
    #[instrument(name = "coord::catalog_transact_with_side_effects")]
    pub(crate) async fn catalog_transact_with_side_effects<F>(
        &mut self,
        mut ctx: Option<&mut ExecuteContext>,
        ops: Vec<catalog::Op>,
        side_effect: F,
    ) -> Result<(), AdapterError>
    where
        F: for<'a> FnOnce(
                &'a mut Coordinator,
                Option<&'a mut ExecuteContext>,
            ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
            + 'static,
    {
        let start = Instant::now();

        // Commit the ops to the catalog. This also kicks off the builtin table
        // writes; `table_updates` resolves once they have been appended.
        let (table_updates, catalog_updates) = self
            .catalog_transact_inner(ctx.as_ref().map(|ctx| ctx.session().conn_id()), ops)
            .await?;

        // We can't run this concurrently with the explicit side effects,
        // because both want to borrow self mutably.
        let apply_implications_res = self
            .apply_catalog_implications(ctx.as_deref_mut(), catalog_updates)
            .await;

        // We would get into an inconsistent state if we updated the catalog but
        // then failed to apply commands/updates to the controller. Easiest
        // thing to do is panic and let restart/bootstrap handle it.
        apply_implications_res.expect("cannot fail to apply catalog update implications");

        // Note: It's important that we keep the function call inside the macro;
        // this way we only run the consistency checks if soft assertions are
        // enabled.
        mz_ore::soft_assert_eq_no_log!(
            self.check_consistency(),
            Ok(()),
            "coordinator inconsistency detected"
        );

        let side_effects_fut = side_effect(self, ctx);

        // Run our side effects concurrently with the table updates.
        let ((), ()) = futures::future::join(
            side_effects_fut.instrument(info_span!(
                "coord::catalog_transact_with_side_effects::side_effects_fut"
            )),
            table_updates.instrument(info_span!(
                "coord::catalog_transact_with_side_effects::table_updates"
            )),
        )
        .await;

        self.metrics
            .catalog_transact_seconds
            .with_label_values(&["catalog_transact_with_side_effects"])
            .observe(start.elapsed().as_secs_f64());

        Ok(())
    }
148
    /// Same as [`Self::catalog_transact_inner`] but takes an execution context
    /// or connection ID and runs builtin table updates concurrently with any
    /// catalog implications that are generated as part of applying the given
    /// `ops` (e.g. creating collections).
    ///
    /// This will use a connection ID if provided and otherwise fall back to
    /// getting a connection ID from the execution context.
    ///
    /// Panics if applying the catalog implications fails, because the catalog
    /// would otherwise be out of sync with the controller; restart/bootstrap
    /// recovers from that state.
    #[instrument(name = "coord::catalog_transact_with_context")]
    pub(crate) async fn catalog_transact_with_context(
        &mut self,
        conn_id: Option<&ConnectionId>,
        ctx: Option<&mut ExecuteContext>,
        ops: Vec<catalog::Op>,
    ) -> Result<(), AdapterError> {
        let start = Instant::now();

        // Prefer the explicitly provided connection ID; fall back to the
        // execution context's session.
        let conn_id = conn_id.or_else(|| ctx.as_ref().map(|ctx| ctx.session().conn_id()));

        let (table_updates, catalog_updates) = self.catalog_transact_inner(conn_id, ops).await?;

        let apply_catalog_implications_fut = self.apply_catalog_implications(ctx, catalog_updates);

        // Apply catalog implications concurrently with the table updates.
        let (combined_apply_res, ()) = futures::future::join(
            apply_catalog_implications_fut.instrument(info_span!(
                "coord::catalog_transact_with_context::side_effects_fut"
            )),
            table_updates.instrument(info_span!(
                "coord::catalog_transact_with_context::table_updates"
            )),
        )
        .await;

        // We would get into an inconsistent state if we updated the catalog but
        // then failed to apply implications. Easiest thing to do is panic and
        // let restart/bootstrap handle it.
        combined_apply_res.expect("cannot fail to apply catalog implications");

        // Note: It's important that we keep the function call inside the macro;
        // this way we only run the consistency checks if soft assertions are
        // enabled.
        mz_ore::soft_assert_eq_no_log!(
            self.check_consistency(),
            Ok(()),
            "coordinator inconsistency detected"
        );

        self.metrics
            .catalog_transact_seconds
            .with_label_values(&["catalog_transact_with_context"])
            .observe(start.elapsed().as_secs_f64());

        Ok(())
    }
202
    /// Executes a Catalog transaction with handling if the provided [`Session`]
    /// is in a SQL transaction that is executing DDL.
    ///
    /// Outside of a DDL transaction the ops are committed immediately via
    /// [`Self::catalog_transact_with_side_effects`]. Inside one, the new ops
    /// are validated against resource limits, dry-run against the accumulated
    /// transaction state, and then accumulated on the session for eventual
    /// COMMIT.
    ///
    /// Returns [`AdapterError::DDLTransactionRace`] if the catalog's transient
    /// revision has changed since the DDL transaction was opened.
    #[instrument(name = "coord::catalog_transact_with_ddl_transaction")]
    pub(crate) async fn catalog_transact_with_ddl_transaction<F>(
        &mut self,
        ctx: &mut ExecuteContext,
        ops: Vec<catalog::Op>,
        side_effect: F,
    ) -> Result<(), AdapterError>
    where
        F: for<'a> FnOnce(
                &'a mut Coordinator,
                Option<&'a mut ExecuteContext>,
            ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
            + Send
            + Sync
            + 'static,
    {
        let start = Instant::now();

        // Not in a DDL transaction? Commit immediately and record the metric.
        let Some(Transaction {
            ops:
                TransactionOps::DDL {
                    ops: txn_ops,
                    revision: txn_revision,
                    state: txn_state,
                    snapshot: txn_snapshot,
                    side_effects: _,
                },
            ..
        }) = ctx.session().transaction().inner()
        else {
            let result = self
                .catalog_transact_with_side_effects(Some(ctx), ops, side_effect)
                .await;
            self.metrics
                .catalog_transact_seconds
                .with_label_values(&["catalog_transact_with_ddl_transaction"])
                .observe(start.elapsed().as_secs_f64());
            return result;
        };

        // Make sure our Catalog hasn't changed since opening the transaction.
        if self.catalog().transient_revision() != *txn_revision {
            self.metrics
                .catalog_transact_seconds
                .with_label_values(&["catalog_transact_with_ddl_transaction"])
                .observe(start.elapsed().as_secs_f64());
            return Err(AdapterError::DDLTransactionRace);
        }

        // Clone what we need from the session before taking &mut below.
        let txn_ops_clone = txn_ops.clone();
        let txn_state_clone = txn_state.clone();
        let prev_snapshot = txn_snapshot.clone();

        // Validate resource limits with all accumulated + new ops (cheap O(N) counting).
        let mut combined_ops = txn_ops_clone;
        combined_ops.extend(ops.iter().cloned());
        let conn_id = ctx.session().conn_id().clone();
        self.validate_resource_limits(&combined_ops, &conn_id)?;

        // Get oracle timestamp for audit log entries.
        let oracle_write_ts = self.get_local_write_ts().await.timestamp;

        // Get ConnMeta for the session.
        let conn = self.active_conns.get(ctx.session().conn_id());

        // Incremental dry run: process only NEW ops against accumulated state.
        // If we have a saved snapshot from a previous dry run, use it to
        // initialize the transaction so it starts in sync with the accumulated
        // state. Otherwise (first statement), the fresh durable transaction is
        // already in sync with the real catalog state.
        let (new_state, new_snapshot) = self
            .catalog()
            .transact_incremental_dry_run(
                &txn_state_clone,
                ops.clone(),
                conn,
                prev_snapshot,
                oracle_write_ts,
            )
            .await?;

        // Accumulate ops for eventual COMMIT.
        let result = ctx
            .session_mut()
            .transaction_mut()
            .add_ops(TransactionOps::DDL {
                ops: combined_ops,
                state: new_state,
                side_effects: vec![Box::new(side_effect)],
                revision: self.catalog().transient_revision(),
                snapshot: Some(new_snapshot),
            });

        self.metrics
            .catalog_transact_seconds
            .with_label_values(&["catalog_transact_with_ddl_transaction"])
            .observe(start.elapsed().as_secs_f64());

        result
    }
306
    /// Perform a catalog transaction. [`Coordinator::ship_dataflow`] must be
    /// called after this function successfully returns on any built
    /// [`DataflowDesc`](mz_compute_types::dataflows::DataflowDesc).
    ///
    /// On success, returns a future that resolves once the builtin table
    /// updates have been appended, together with the parsed catalog updates
    /// whose implications the caller must still apply.
    ///
    /// Returns [`AdapterError::ReadOnly`] if the controller is in read-only
    /// mode.
    #[instrument(name = "coord::catalog_transact_inner")]
    pub(crate) async fn catalog_transact_inner(
        &mut self,
        conn_id: Option<&ConnectionId>,
        ops: Vec<catalog::Op>,
    ) -> Result<(BuiltinTableAppendNotify, Vec<ParsedStateUpdate>), AdapterError> {
        if self.controller.read_only() {
            return Err(AdapterError::ReadOnly);
        }

        event!(Level::TRACE, ops = format!("{:?}", ops));

        // Scan the ops up front to record which dynamic configs will need
        // refreshing and which clusters/replicas are being created or dropped,
        // so we can act on that after the transaction commits.
        let mut webhook_sources_to_restart = BTreeSet::new();
        let mut clusters_to_drop = vec![];
        let mut cluster_replicas_to_drop = vec![];
        let mut clusters_to_create = vec![];
        let mut cluster_replicas_to_create = vec![];
        let mut update_metrics_config = false;
        let mut update_tracing_config = false;
        let mut update_controller_config = false;
        let mut update_compute_config = false;
        let mut update_storage_config = false;
        let mut update_timestamp_oracle_config = false;
        let mut update_metrics_retention = false;
        let mut update_secrets_caching_config = false;
        let mut update_cluster_scheduling_config = false;
        let mut update_http_config = false;
        let mut update_advance_timelines_interval = false;

        for op in &ops {
            match op {
                catalog::Op::DropObjects(drop_object_infos) => {
                    for drop_object_info in drop_object_infos {
                        match &drop_object_info {
                            catalog::DropObjectInfo::Item(_) => {
                                // Nothing to do, these will be handled by
                                // applying the side effects that we return.
                            }
                            catalog::DropObjectInfo::Cluster(id) => {
                                clusters_to_drop.push(*id);
                            }
                            catalog::DropObjectInfo::ClusterReplica((
                                cluster_id,
                                replica_id,
                                _reason,
                            )) => {
                                // Drop the cluster replica itself.
                                cluster_replicas_to_drop.push((*cluster_id, *replica_id));
                            }
                            _ => (),
                        }
                    }
                }
                catalog::Op::ResetSystemConfiguration { name }
                | catalog::Op::UpdateSystemConfiguration { name, .. } => {
                    // Flag every config family the changed variable belongs to.
                    update_metrics_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_metrics_config_var(name);
                    update_tracing_config |= vars::is_tracing_var(name);
                    update_controller_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_controller_config_var(name);
                    update_compute_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_compute_config_var(name);
                    update_storage_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_storage_config_var(name);
                    update_timestamp_oracle_config |= vars::is_timestamp_oracle_config_var(name);
                    update_metrics_retention |= name == vars::METRICS_RETENTION.name();
                    update_secrets_caching_config |= vars::is_secrets_caching_var(name);
                    update_cluster_scheduling_config |= vars::is_cluster_scheduling_var(name);
                    update_http_config |= vars::is_http_config_var(name);
                    update_advance_timelines_interval |= name == DEFAULT_TIMESTAMP_INTERVAL.name();
                }
                catalog::Op::ResetAllSystemConfiguration => {
                    // Assume they all need to be updated.
                    // We could see if the configs have actually changed, but
                    // this is simpler.
                    update_tracing_config = true;
                    update_controller_config = true;
                    update_compute_config = true;
                    update_storage_config = true;
                    update_timestamp_oracle_config = true;
                    update_metrics_retention = true;
                    update_secrets_caching_config = true;
                    update_cluster_scheduling_config = true;
                    update_http_config = true;
                    update_advance_timelines_interval = true;
                }
                catalog::Op::RenameItem { id, .. } => {
                    // Renamed webhook sources need their cached appender state
                    // dropped so subsequent requests re-resolve them.
                    let item = self.catalog().get_entry(id);
                    let is_webhook_source = item
                        .source()
                        .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
                        .unwrap_or(false);
                    if is_webhook_source {
                        webhook_sources_to_restart.insert(*id);
                    }
                }
                catalog::Op::RenameSchema {
                    database_spec,
                    schema_spec,
                    ..
                } => {
                    // Same as above, but for every webhook source in the
                    // renamed schema.
                    let schema = self.catalog().get_schema(
                        database_spec,
                        schema_spec,
                        conn_id.unwrap_or(&SYSTEM_CONN_ID),
                    );
                    let webhook_sources = schema.item_ids().filter(|id| {
                        let item = self.catalog().get_entry(id);
                        item.source()
                            .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
                            .unwrap_or(false)
                    });
                    webhook_sources_to_restart.extend(webhook_sources);
                }
                catalog::Op::CreateCluster { id, .. } => {
                    clusters_to_create.push(*id);
                }
                catalog::Op::CreateClusterReplica {
                    cluster_id,
                    name,
                    config,
                    ..
                } => {
                    cluster_replicas_to_create.push((
                        *cluster_id,
                        name.clone(),
                        config.location.num_processes(),
                    ));
                }
                _ => (),
            }
        }

        self.validate_resource_limits(&ops, conn_id.unwrap_or(&SYSTEM_CONN_ID))?;

        // This will produce timestamps that are guaranteed to increase on each
        // call, and also never be behind the system clock. If the system clock
        // hasn't advanced (or has gone backward), it will increment by 1. For
        // the audit log, we need to balance "close (within 10s or so) to the
        // system clock" and "always goes up". We've chosen here to prioritize
        // always going up, and believe we will always be close to the system
        // clock because it is well configured (chrony) and so may only rarely
        // regress or pause for 10s.
        let oracle_write_ts = self.get_local_write_ts().await.timestamp;

        // Destructure self so we can hold disjoint mutable borrows across the
        // catalog transact call below.
        let Coordinator {
            catalog,
            active_conns,
            controller,
            cluster_replica_statuses,
            ..
        } = self;
        let catalog = Arc::make_mut(catalog);
        let conn = conn_id.map(|id| active_conns.get(id).expect("connection must exist"));

        let TransactionResult {
            builtin_table_updates,
            catalog_updates,
            audit_events,
        } = catalog
            .transact(
                Some(&mut controller.storage_collections),
                oracle_write_ts,
                conn,
                ops,
            )
            .await?;

        // Keep the in-memory replica status bookkeeping in sync with the
        // cluster/replica churn we recorded above.
        for (cluster_id, replica_id) in &cluster_replicas_to_drop {
            cluster_replica_statuses.remove_cluster_replica_statuses(cluster_id, replica_id);
        }
        for cluster_id in &clusters_to_drop {
            cluster_replica_statuses.remove_cluster_statuses(cluster_id);
        }
        for cluster_id in clusters_to_create {
            cluster_replica_statuses.initialize_cluster_statuses(cluster_id);
        }
        let now = to_datetime((catalog.config().now)());
        for (cluster_id, replica_name, num_processes) in cluster_replicas_to_create {
            let replica_id = catalog
                .resolve_replica_in_cluster(&cluster_id, &replica_name)
                .expect("just created")
                .replica_id();
            cluster_replica_statuses.initialize_cluster_replica_statuses(
                cluster_id,
                replica_id,
                num_processes,
                now,
            );
        }

        // Append our builtin table updates, then return the notify so we can run other tasks in
        // parallel.
        let (builtin_update_notify, _) = self
            .builtin_table_update()
            .execute(builtin_table_updates)
            .await;

        // No error returns are allowed after this point. Enforce this at compile time
        // by using this odd structure so we don't accidentally add a stray `?`.
        let _: () = async {
            if !webhook_sources_to_restart.is_empty() {
                self.restart_webhook_sources(webhook_sources_to_restart);
            }

            // Push any changed config families out to their consumers.
            if update_metrics_config {
                mz_metrics::update_dyncfg(&self.catalog().system_config().dyncfg_updates());
            }
            if update_controller_config {
                self.update_controller_config();
            }
            if update_compute_config {
                self.update_compute_config();
            }
            if update_storage_config {
                self.update_storage_config();
            }
            if update_timestamp_oracle_config {
                self.update_timestamp_oracle_config();
            }
            if update_metrics_retention {
                self.update_metrics_retention();
            }
            if update_tracing_config {
                self.update_tracing_config();
            }
            if update_secrets_caching_config {
                self.update_secrets_caching_config();
            }
            if update_cluster_scheduling_config {
                self.update_cluster_scheduling_config();
            }
            if update_http_config {
                self.update_http_config();
            }
            if update_advance_timelines_interval {
                // Only rebuild the interval if the period actually changed.
                let new_interval = self.catalog().system_config().default_timestamp_interval();
                if new_interval != self.advance_timelines_interval.period() {
                    self.advance_timelines_interval = tokio::time::interval(new_interval);
                }
            }
        }
        .instrument(info_span!("coord::catalog_transact_with::finalize"))
        .await;

        // Report the audit events to Segment, if telemetry is configured.
        let conn = conn_id.and_then(|id| self.active_conns.get(id));
        if let Some(segment_client) = &self.segment_client {
            for VersionedEvent::V1(event) in audit_events {
                let event_type = format!(
                    "{} {}",
                    event.object_type.as_title_case(),
                    event.event_type.as_title_case()
                );
                segment_client.environment_track(
                    &self.catalog().config().environment_id,
                    event_type,
                    json!({ "details": event.details.as_json() }),
                    EventDetails {
                        user_id: conn
                            .and_then(|c| c.user().external_metadata.as_ref())
                            .map(|m| m.user_id),
                        application_name: conn.map(|c| c.application_name()),
                        ..Default::default()
                    },
                );
            }
        }

        Ok((builtin_update_notify, catalog_updates))
    }
592
593    pub(crate) fn drop_replica(&mut self, cluster_id: ClusterId, replica_id: ReplicaId) {
594        self.drop_introspection_subscribes(replica_id);
595
596        self.controller
597            .drop_replica(cluster_id, replica_id)
598            .expect("dropping replica must not fail");
599    }
600
601    /// A convenience method for dropping sources.
602    pub(crate) fn drop_sources(&mut self, sources: Vec<(CatalogItemId, GlobalId)>) {
603        for (item_id, _gid) in &sources {
604            self.active_webhooks.remove(item_id);
605        }
606        let storage_metadata = self.catalog.state().storage_metadata();
607        let source_gids = sources.into_iter().map(|(_id, gid)| gid).collect();
608        self.controller
609            .storage
610            .drop_sources(storage_metadata, source_gids)
611            .unwrap_or_terminate("cannot fail to drop sources");
612    }
613
614    /// A convenience method for dropping tables.
615    pub(crate) fn drop_tables(&mut self, tables: Vec<(CatalogItemId, GlobalId)>, ts: Timestamp) {
616        for (item_id, _gid) in &tables {
617            self.active_webhooks.remove(item_id);
618        }
619
620        let storage_metadata = self.catalog.state().storage_metadata();
621        let table_gids = tables.into_iter().map(|(_id, gid)| gid).collect();
622        self.controller
623            .storage
624            .drop_tables(storage_metadata, table_gids, ts)
625            .unwrap_or_terminate("cannot fail to drop tables");
626    }
627
628    fn restart_webhook_sources(&mut self, sources: impl IntoIterator<Item = CatalogItemId>) {
629        for id in sources {
630            self.active_webhooks.remove(&id);
631        }
632    }
633
634    /// Like `drop_compute_sinks`, but for a single compute sink.
635    ///
636    /// Returns the controller's state for the compute sink if the identified
637    /// sink was known to the controller. It is the caller's responsibility to
638    /// retire the returned sink. Consider using `retire_compute_sinks` instead.
639    #[must_use]
640    pub async fn drop_compute_sink(&mut self, sink_id: GlobalId) -> Option<ActiveComputeSink> {
641        self.drop_compute_sinks([sink_id]).await.remove(&sink_id)
642    }
643
644    /// Drops a batch of compute sinks.
645    ///
646    /// For each sink that exists, the coordinator and controller's state
647    /// associated with the sink is removed.
648    ///
649    /// Returns a map containing the controller's state for each sink that was
650    /// removed. It is the caller's responsibility to retire the returned sinks.
651    /// Consider using `retire_compute_sinks` instead.
652    #[must_use]
653    pub async fn drop_compute_sinks(
654        &mut self,
655        sink_ids: impl IntoIterator<Item = GlobalId>,
656    ) -> BTreeMap<GlobalId, ActiveComputeSink> {
657        let mut by_id = BTreeMap::new();
658        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
659        for sink_id in sink_ids {
660            let sink = match self.remove_active_compute_sink(sink_id).await {
661                None => {
662                    tracing::error!(%sink_id, "drop_compute_sinks called on nonexistent sink");
663                    continue;
664                }
665                Some(sink) => sink,
666            };
667
668            by_cluster
669                .entry(sink.cluster_id())
670                .or_default()
671                .push(sink_id);
672            by_id.insert(sink_id, sink);
673        }
674        for (cluster_id, ids) in by_cluster {
675            let compute = &mut self.controller.compute;
676            // A cluster could have been dropped, so verify it exists.
677            if compute.instance_exists(cluster_id) {
678                compute
679                    .drop_collections(cluster_id, ids)
680                    .unwrap_or_terminate("cannot fail to drop collections");
681            }
682        }
683        by_id
684    }
685
686    /// Retires a batch of sinks with disparate reasons for retirement.
687    ///
688    /// Each sink identified in `reasons` is dropped (see `drop_compute_sinks`),
689    /// then retired with its corresponding reason.
690    pub async fn retire_compute_sinks(
691        &mut self,
692        mut reasons: BTreeMap<GlobalId, ActiveComputeSinkRetireReason>,
693    ) {
694        let sink_ids = reasons.keys().cloned();
695        for (id, sink) in self.drop_compute_sinks(sink_ids).await {
696            let reason = reasons
697                .remove(&id)
698                .expect("all returned IDs are in `reasons`");
699            sink.retire(reason);
700        }
701    }
702
703    /// Drops all pending replicas for a set of clusters
704    /// that are undergoing reconfiguration.
705    pub async fn drop_reconfiguration_replicas(
706        &mut self,
707        cluster_ids: BTreeSet<ClusterId>,
708    ) -> Result<(), AdapterError> {
709        let pending_cluster_ops: Vec<Op> = cluster_ids
710            .iter()
711            .map(|c| {
712                self.catalog()
713                    .get_cluster(c.clone())
714                    .replicas()
715                    .filter_map(|r| match r.config.location {
716                        ReplicaLocation::Managed(ref l) if l.pending => {
717                            Some(DropObjectInfo::ClusterReplica((
718                                c.clone(),
719                                r.replica_id,
720                                ReplicaCreateDropReason::Manual,
721                            )))
722                        }
723                        _ => None,
724                    })
725                    .collect::<Vec<DropObjectInfo>>()
726            })
727            .filter_map(|pending_replica_drop_ops_by_cluster| {
728                match pending_replica_drop_ops_by_cluster.len() {
729                    0 => None,
730                    _ => Some(Op::DropObjects(pending_replica_drop_ops_by_cluster)),
731                }
732            })
733            .collect();
734        if !pending_cluster_ops.is_empty() {
735            self.catalog_transact(None, pending_cluster_ops).await?;
736        }
737        Ok(())
738    }
739
740    /// Cancels all active compute sinks for the identified connection.
741    #[mz_ore::instrument(level = "debug")]
742    pub(crate) async fn cancel_compute_sinks_for_conn(&mut self, conn_id: &ConnectionId) {
743        self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Canceled)
744            .await
745    }
746
747    /// Cancels all active cluster reconfigurations sinks for the identified connection.
748    #[mz_ore::instrument(level = "debug")]
749    pub(crate) async fn cancel_cluster_reconfigurations_for_conn(
750        &mut self,
751        conn_id: &ConnectionId,
752    ) {
753        self.retire_cluster_reconfigurations_for_conn(conn_id).await
754    }
755
756    /// Retires all active compute sinks for the identified connection with the
757    /// specified reason.
758    #[mz_ore::instrument(level = "debug")]
759    pub(crate) async fn retire_compute_sinks_for_conn(
760        &mut self,
761        conn_id: &ConnectionId,
762        reason: ActiveComputeSinkRetireReason,
763    ) {
764        let drop_sinks = self
765            .active_conns
766            .get_mut(conn_id)
767            .expect("must exist for active session")
768            .drop_sinks
769            .iter()
770            .map(|sink_id| (*sink_id, reason.clone()))
771            .collect();
772        self.retire_compute_sinks(drop_sinks).await;
773    }
774
    /// Cleans up pending cluster reconfigurations for the identified
    /// connection.
    ///
    /// Drops any pending (reconfiguration) replicas on the clusters this
    /// connection was altering, then clears the connection's set of pending
    /// cluster alters.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn retire_cluster_reconfigurations_for_conn(
        &mut self,
        conn_id: &ConnectionId,
    ) {
        // Clone the set so we don't hold a borrow of `active_conns` across
        // the `await` below.
        let reconfiguring_clusters = self
            .active_conns
            .get(conn_id)
            .expect("must exist for active session")
            .pending_cluster_alters
            .clone();
        // Try to drop reconfiguration replicas.
        self.drop_reconfiguration_replicas(reconfiguring_clusters)
            .await
            .unwrap_or_terminate("cannot fail to drop reconfiguration replicas");

        self.active_conns
            .get_mut(conn_id)
            .expect("must exist for active session")
            .pending_cluster_alters
            .clear();
    }
798
799    pub(crate) fn drop_storage_sinks(&mut self, sink_gids: Vec<GlobalId>) {
800        let storage_metadata = self.catalog.state().storage_metadata();
801        self.controller
802            .storage
803            .drop_sinks(storage_metadata, sink_gids)
804            .unwrap_or_terminate("cannot fail to drop sinks");
805    }
806
807    pub(crate) fn drop_compute_collections(&mut self, collections: Vec<(ClusterId, GlobalId)>) {
808        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
809        for (cluster_id, gid) in collections {
810            by_cluster.entry(cluster_id).or_default().push(gid);
811        }
812        for (cluster_id, gids) in by_cluster {
813            let compute = &mut self.controller.compute;
814            // A cluster could have been dropped, so verify it exists.
815            if compute.instance_exists(cluster_id) {
816                compute
817                    .drop_collections(cluster_id, gids)
818                    .unwrap_or_terminate("cannot fail to drop collections");
819            }
820        }
821    }
822
823    pub(crate) fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>) {
824        let cloud_resource_controller = Arc::clone(self.cloud_resource_controller
825            .as_ref()
826            .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))
827            .expect("vpc endpoints should only be dropped in CLOUD, where `cloud_resource_controller` is `Some`"));
828        // We don't want to block the coordinator on an external delete api
829        // calls, so move the drop vpc_endpoint to a separate task. This does
830        // mean that a failed drop won't bubble up to the user as an error
831        // message. However, even if it did (and how the code previously
832        // worked), mz has already dropped it from our catalog, and so we
833        // wouldn't be able to retry anyway. Any orphaned vpc_endpoints will
834        // eventually be cleaned during restart via coord bootstrap.
835        task::spawn(
836            || "drop_vpc_endpoints",
837            async move {
838                for vpc_endpoint in vpc_endpoints {
839                    let _ = Retry::default()
840                        .max_duration(Duration::from_secs(60))
841                        .retry_async(|_state| async {
842                            fail_point!("drop_vpc_endpoint", |r| {
843                                Err(anyhow::anyhow!("Fail point error {:?}", r))
844                            });
845                            match cloud_resource_controller
846                                .delete_vpc_endpoint(vpc_endpoint)
847                                .await
848                            {
849                                Ok(_) => Ok(()),
850                                Err(e) => {
851                                    warn!("Dropping VPC Endpoints has encountered an error: {}", e);
852                                    Err(e)
853                                }
854                            }
855                        })
856                        .await;
857                }
858            }
859            .instrument(info_span!(
860                "coord::catalog_transact_inner::drop_vpc_endpoints"
861            )),
862        );
863    }
864
865    /// Removes all temporary items created by the specified connection, though
866    /// not the temporary schema itself.
867    pub(crate) async fn drop_temp_items(&mut self, conn_id: &ConnectionId) {
868        let temp_items = self.catalog().state().get_temp_items(conn_id).collect();
869        let all_items = self.catalog().object_dependents(&temp_items, conn_id);
870
871        if all_items.is_empty() {
872            return;
873        }
874        let op = Op::DropObjects(
875            all_items
876                .into_iter()
877                .map(DropObjectInfo::manual_drop_from_object_id)
878                .collect(),
879        );
880
881        self.catalog_transact_with_context(Some(conn_id), None, vec![op])
882            .await
883            .expect("unable to drop temporary items for conn_id");
884    }
885
886    fn update_cluster_scheduling_config(&self) {
887        let config = flags::orchestrator_scheduling_config(self.catalog.system_config());
888        self.controller
889            .update_orchestrator_scheduling_config(config);
890    }
891
892    fn update_secrets_caching_config(&self) {
893        let config = flags::caching_config(self.catalog.system_config());
894        self.caching_secrets_reader.set_policy(config);
895    }
896
897    fn update_tracing_config(&self) {
898        let tracing = flags::tracing_config(self.catalog().system_config());
899        tracing.apply(&self.tracing_handle);
900    }
901
902    fn update_compute_config(&mut self) {
903        let config_params = flags::compute_config(self.catalog().system_config());
904        self.controller.compute.update_configuration(config_params);
905    }
906
907    fn update_storage_config(&mut self) {
908        let config_params = flags::storage_config(self.catalog().system_config());
909        self.controller.storage.update_parameters(config_params);
910    }
911
912    fn update_timestamp_oracle_config(&self) {
913        let config_params = flags::timestamp_oracle_config(self.catalog().system_config());
914        if let Some(config) = self.timestamp_oracle_config.as_ref() {
915            config.apply_parameters(config_params)
916        }
917    }
918
919    fn update_metrics_retention(&self) {
920        let duration = self.catalog().system_config().metrics_retention();
921        let policy = ReadPolicy::lag_writes_by(
922            Timestamp::new(u64::try_from(duration.as_millis()).unwrap_or_else(|_e| {
923                tracing::error!("Absurd metrics retention duration: {duration:?}.");
924                u64::MAX
925            })),
926            SINCE_GRANULARITY,
927        );
928        let storage_policies = self
929            .catalog()
930            .entries()
931            .filter(|entry| {
932                entry.item().is_retained_metrics_object()
933                    && entry.item().is_compute_object_on_cluster().is_none()
934            })
935            .map(|entry| (entry.id(), policy.clone()))
936            .collect::<Vec<_>>();
937        let compute_policies = self
938            .catalog()
939            .entries()
940            .filter_map(|entry| {
941                if let (true, Some(cluster_id)) = (
942                    entry.item().is_retained_metrics_object(),
943                    entry.item().is_compute_object_on_cluster(),
944                ) {
945                    Some((cluster_id, entry.id(), policy.clone()))
946                } else {
947                    None
948                }
949            })
950            .collect::<Vec<_>>();
951        self.update_storage_read_policies(storage_policies);
952        self.update_compute_read_policies(compute_policies);
953    }
954
955    fn update_controller_config(&mut self) {
956        let sys_config = self.catalog().system_config();
957        self.controller
958            .update_configuration(sys_config.dyncfg_updates());
959    }
960
961    fn update_http_config(&mut self) {
962        let webhook_request_limit = self
963            .catalog()
964            .system_config()
965            .webhook_concurrent_request_limit();
966        self.webhook_concurrency_limit
967            .set_limit(webhook_request_limit);
968    }
969
    /// Registers a storage export (sink) for `sink` under collection `id`
    /// with the storage controller.
    ///
    /// Read holds are acquired on the sink's input collection to choose a
    /// valid `as_of`, and released only after the export's collections have
    /// been created (at which point storage holds its own).
    ///
    /// # Errors
    ///
    /// Returns an error if `sink.from` is not a storage collection, or if the
    /// storage controller fails to create the export's collections.
    pub(crate) async fn create_storage_export(
        &mut self,
        id: GlobalId,
        sink: &Sink,
    ) -> Result<(), AdapterError> {
        // Validate `sink.from` is in fact a storage collection
        self.controller.storage.check_exists(sink.from)?;

        // The AsOf is used to determine at what time to snapshot reading from
        // the persist collection.  This is primarily relevant when we do _not_
        // want to include the snapshot in the sink.
        //
        // We choose the smallest as_of that is legal, according to the sinked
        // collection's since.
        let id_bundle = crate::CollectionIdBundle {
            storage_ids: btreeset! {sink.from},
            compute_ids: btreemap! {},
        };

        // We're putting in place read holds, such that create_exports, below,
        // which calls update_read_capabilities, can successfully do so.
        // Otherwise, the since of dependencies might move along concurrently,
        // pulling the rug from under us!
        //
        // TODO: Maybe in the future, pass those holds on to storage, to hold on
        // to them and downgrade when possible?
        let read_holds = self.acquire_read_holds(&id_bundle);
        let as_of = read_holds.least_valid_read();

        let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
        let storage_sink_desc = mz_storage_types::sinks::StorageSinkDesc {
            from: sink.from,
            from_desc: storage_sink_from_entry
                .relation_desc()
                .expect("sinks can only be built on items with descs")
                .into_owned(),
            connection: sink
                .connection
                .clone()
                .into_inline_connection(self.catalog().state()),
            envelope: sink.envelope,
            as_of,
            with_snapshot: sink.with_snapshot,
            version: sink.version,
            // NOTE(review): the unit placeholders here are presumably filled
            // in with real storage metadata by the storage layer — confirm.
            from_storage_metadata: (),
            to_storage_metadata: (),
            commit_interval: sink.commit_interval,
        };

        let collection_desc = CollectionDescription {
            // TODO(sinks): make generic once we have more than one sink type.
            desc: KAFKA_PROGRESS_DESC.clone(),
            data_source: DataSource::Sink {
                desc: ExportDescription {
                    sink: storage_sink_desc,
                    instance_id: sink.cluster_id,
                },
            },
            since: None,
            timeline: None,
            primary: None,
        };
        let collections = vec![(id, collection_desc)];

        // Create the collections.
        let storage_metadata = self.catalog.state().storage_metadata();
        let res = self
            .controller
            .storage
            .create_collections(storage_metadata, None, collections)
            .await;

        // Drop read holds after the export has been created, at which point
        // storage will have put in its own read holds.
        drop(read_holds);

        Ok(res?)
    }
1048
1049    /// Validate all resource limits in a catalog transaction and return an error if that limit is
1050    /// exceeded.
1051    fn validate_resource_limits(
1052        &self,
1053        ops: &Vec<catalog::Op>,
1054        conn_id: &ConnectionId,
1055    ) -> Result<(), AdapterError> {
1056        let mut new_kafka_connections = 0;
1057        let mut new_postgres_connections = 0;
1058        let mut new_mysql_connections = 0;
1059        let mut new_sql_server_connections = 0;
1060        let mut new_aws_privatelink_connections = 0;
1061        let mut new_tables = 0;
1062        let mut new_sources = 0;
1063        let mut new_sinks = 0;
1064        let mut new_materialized_views = 0;
1065        let mut new_clusters = 0;
1066        let mut new_replicas_per_cluster = BTreeMap::new();
1067        let mut new_credit_consumption_rate = Numeric::zero();
1068        let mut new_databases = 0;
1069        let mut new_schemas_per_database = BTreeMap::new();
1070        let mut new_objects_per_schema = BTreeMap::new();
1071        let mut new_secrets = 0;
1072        let mut new_roles = 0;
1073        let mut new_continual_tasks = 0;
1074        let mut new_network_policies = 0;
1075        for op in ops {
1076            match op {
1077                Op::CreateDatabase { .. } => {
1078                    new_databases += 1;
1079                }
1080                Op::CreateSchema { database_id, .. } => {
1081                    if let ResolvedDatabaseSpecifier::Id(database_id) = database_id {
1082                        *new_schemas_per_database.entry(database_id).or_insert(0) += 1;
1083                    }
1084                }
1085                Op::CreateRole { .. } => {
1086                    new_roles += 1;
1087                }
1088                Op::CreateNetworkPolicy { .. } => {
1089                    new_network_policies += 1;
1090                }
1091                Op::CreateCluster { .. } => {
1092                    // TODO(benesch): having deprecated linked clusters, remove
1093                    // the `max_sources` and `max_sinks` limit, and set a higher
1094                    // max cluster limit?
1095                    new_clusters += 1;
1096                }
1097                Op::CreateClusterReplica {
1098                    cluster_id, config, ..
1099                } => {
1100                    if cluster_id.is_user() {
1101                        *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) += 1;
1102                        if let ReplicaLocation::Managed(location) = &config.location {
1103                            let replica_allocation = self
1104                                .catalog()
1105                                .cluster_replica_sizes()
1106                                .0
1107                                .get(location.size_for_billing())
1108                                .expect(
1109                                    "location size is validated against the cluster replica sizes",
1110                                );
1111                            new_credit_consumption_rate += replica_allocation.credits_per_hour
1112                        }
1113                    }
1114                }
1115                Op::CreateItem { name, item, .. } => {
1116                    *new_objects_per_schema
1117                        .entry((
1118                            name.qualifiers.database_spec.clone(),
1119                            name.qualifiers.schema_spec.clone(),
1120                        ))
1121                        .or_insert(0) += 1;
1122                    match item {
1123                        CatalogItem::Connection(connection) => match connection.details {
1124                            ConnectionDetails::Kafka(_) => new_kafka_connections += 1,
1125                            ConnectionDetails::Postgres(_) => new_postgres_connections += 1,
1126                            ConnectionDetails::MySql(_) => new_mysql_connections += 1,
1127                            ConnectionDetails::SqlServer(_) => new_sql_server_connections += 1,
1128                            ConnectionDetails::AwsPrivatelink(_) => {
1129                                new_aws_privatelink_connections += 1
1130                            }
1131                            ConnectionDetails::Csr(_)
1132                            | ConnectionDetails::Ssh { .. }
1133                            | ConnectionDetails::Aws(_)
1134                            | ConnectionDetails::IcebergCatalog(_) => {}
1135                        },
1136                        CatalogItem::Table(_) => {
1137                            new_tables += 1;
1138                        }
1139                        CatalogItem::Source(source) => {
1140                            new_sources += source.user_controllable_persist_shard_count()
1141                        }
1142                        CatalogItem::Sink(_) => new_sinks += 1,
1143                        CatalogItem::MaterializedView(_) => {
1144                            new_materialized_views += 1;
1145                        }
1146                        CatalogItem::Secret(_) => {
1147                            new_secrets += 1;
1148                        }
1149                        CatalogItem::ContinualTask(_) => {
1150                            new_continual_tasks += 1;
1151                        }
1152                        CatalogItem::Log(_)
1153                        | CatalogItem::View(_)
1154                        | CatalogItem::Index(_)
1155                        | CatalogItem::Type(_)
1156                        | CatalogItem::Func(_) => {}
1157                    }
1158                }
1159                Op::DropObjects(drop_object_infos) => {
1160                    for drop_object_info in drop_object_infos {
1161                        match drop_object_info {
1162                            DropObjectInfo::Cluster(_) => {
1163                                new_clusters -= 1;
1164                            }
1165                            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
1166                                if cluster_id.is_user() {
1167                                    *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) -= 1;
1168                                    let cluster = self
1169                                        .catalog()
1170                                        .get_cluster_replica(*cluster_id, *replica_id);
1171                                    if let ReplicaLocation::Managed(location) =
1172                                        &cluster.config.location
1173                                    {
1174                                        let replica_allocation = self
1175                                            .catalog()
1176                                            .cluster_replica_sizes()
1177                                            .0
1178                                            .get(location.size_for_billing())
1179                                            .expect(
1180                                                "location size is validated against the cluster replica sizes",
1181                                            );
1182                                        new_credit_consumption_rate -=
1183                                            replica_allocation.credits_per_hour
1184                                    }
1185                                }
1186                            }
1187                            DropObjectInfo::Database(_) => {
1188                                new_databases -= 1;
1189                            }
1190                            DropObjectInfo::Schema((database_spec, _)) => {
1191                                if let ResolvedDatabaseSpecifier::Id(database_id) = database_spec {
1192                                    *new_schemas_per_database.entry(database_id).or_insert(0) -= 1;
1193                                }
1194                            }
1195                            DropObjectInfo::Role(_) => {
1196                                new_roles -= 1;
1197                            }
1198                            DropObjectInfo::NetworkPolicy(_) => {
1199                                new_network_policies -= 1;
1200                            }
1201                            DropObjectInfo::Item(id) => {
1202                                let entry = self.catalog().get_entry(id);
1203                                *new_objects_per_schema
1204                                    .entry((
1205                                        entry.name().qualifiers.database_spec.clone(),
1206                                        entry.name().qualifiers.schema_spec.clone(),
1207                                    ))
1208                                    .or_insert(0) -= 1;
1209                                match entry.item() {
1210                                    CatalogItem::Connection(connection) => match connection.details
1211                                    {
1212                                        ConnectionDetails::AwsPrivatelink(_) => {
1213                                            new_aws_privatelink_connections -= 1;
1214                                        }
1215                                        _ => (),
1216                                    },
1217                                    CatalogItem::Table(_) => {
1218                                        new_tables -= 1;
1219                                    }
1220                                    CatalogItem::Source(source) => {
1221                                        new_sources -=
1222                                            source.user_controllable_persist_shard_count()
1223                                    }
1224                                    CatalogItem::Sink(_) => new_sinks -= 1,
1225                                    CatalogItem::MaterializedView(_) => {
1226                                        new_materialized_views -= 1;
1227                                    }
1228                                    CatalogItem::Secret(_) => {
1229                                        new_secrets -= 1;
1230                                    }
1231                                    CatalogItem::ContinualTask(_) => {
1232                                        new_continual_tasks -= 1;
1233                                    }
1234                                    CatalogItem::Log(_)
1235                                    | CatalogItem::View(_)
1236                                    | CatalogItem::Index(_)
1237                                    | CatalogItem::Type(_)
1238                                    | CatalogItem::Func(_) => {}
1239                                }
1240                            }
1241                        }
1242                    }
1243                }
1244                Op::UpdateItem {
1245                    name: _,
1246                    id,
1247                    to_item,
1248                } => match to_item {
1249                    CatalogItem::Source(source) => {
1250                        let current_source = self
1251                            .catalog()
1252                            .get_entry(id)
1253                            .source()
1254                            .expect("source update is for source item");
1255
1256                        new_sources += source.user_controllable_persist_shard_count()
1257                            - current_source.user_controllable_persist_shard_count();
1258                    }
1259                    CatalogItem::Connection(_)
1260                    | CatalogItem::Table(_)
1261                    | CatalogItem::Sink(_)
1262                    | CatalogItem::MaterializedView(_)
1263                    | CatalogItem::Secret(_)
1264                    | CatalogItem::Log(_)
1265                    | CatalogItem::View(_)
1266                    | CatalogItem::Index(_)
1267                    | CatalogItem::Type(_)
1268                    | CatalogItem::Func(_)
1269                    | CatalogItem::ContinualTask(_) => {}
1270                },
1271                Op::AlterRole { .. }
1272                | Op::AlterRetainHistory { .. }
1273                | Op::AlterSourceTimestampInterval { .. }
1274                | Op::AlterNetworkPolicy { .. }
1275                | Op::AlterAddColumn { .. }
1276                | Op::AlterMaterializedViewApplyReplacement { .. }
1277                | Op::UpdatePrivilege { .. }
1278                | Op::UpdateDefaultPrivilege { .. }
1279                | Op::GrantRole { .. }
1280                | Op::RenameCluster { .. }
1281                | Op::RenameClusterReplica { .. }
1282                | Op::RenameItem { .. }
1283                | Op::RenameSchema { .. }
1284                | Op::UpdateOwner { .. }
1285                | Op::RevokeRole { .. }
1286                | Op::UpdateClusterConfig { .. }
1287                | Op::UpdateClusterReplicaConfig { .. }
1288                | Op::UpdateSourceReferences { .. }
1289                | Op::UpdateSystemConfiguration { .. }
1290                | Op::ResetSystemConfiguration { .. }
1291                | Op::ResetAllSystemConfiguration { .. }
1292                | Op::Comment { .. }
1293                | Op::WeirdStorageUsageUpdates { .. } => {}
1294            }
1295        }
1296
1297        let mut current_aws_privatelink_connections = 0;
1298        let mut current_postgres_connections = 0;
1299        let mut current_mysql_connections = 0;
1300        let mut current_sql_server_connections = 0;
1301        let mut current_kafka_connections = 0;
1302        for c in self.catalog().user_connections() {
1303            let connection = c
1304                .connection()
1305                .expect("`user_connections()` only returns connection objects");
1306
1307            match connection.details {
1308                ConnectionDetails::AwsPrivatelink(_) => current_aws_privatelink_connections += 1,
1309                ConnectionDetails::Postgres(_) => current_postgres_connections += 1,
1310                ConnectionDetails::MySql(_) => current_mysql_connections += 1,
1311                ConnectionDetails::SqlServer(_) => current_sql_server_connections += 1,
1312                ConnectionDetails::Kafka(_) => current_kafka_connections += 1,
1313                ConnectionDetails::Csr(_)
1314                | ConnectionDetails::Ssh { .. }
1315                | ConnectionDetails::Aws(_)
1316                | ConnectionDetails::IcebergCatalog(_) => {}
1317            }
1318        }
1319        self.validate_resource_limit(
1320            current_kafka_connections,
1321            new_kafka_connections,
1322            SystemVars::max_kafka_connections,
1323            "Kafka Connection",
1324            MAX_KAFKA_CONNECTIONS.name(),
1325        )?;
1326        self.validate_resource_limit(
1327            current_postgres_connections,
1328            new_postgres_connections,
1329            SystemVars::max_postgres_connections,
1330            "PostgreSQL Connection",
1331            MAX_POSTGRES_CONNECTIONS.name(),
1332        )?;
1333        self.validate_resource_limit(
1334            current_mysql_connections,
1335            new_mysql_connections,
1336            SystemVars::max_mysql_connections,
1337            "MySQL Connection",
1338            MAX_MYSQL_CONNECTIONS.name(),
1339        )?;
1340        self.validate_resource_limit(
1341            current_sql_server_connections,
1342            new_sql_server_connections,
1343            SystemVars::max_sql_server_connections,
1344            "SQL Server Connection",
1345            MAX_SQL_SERVER_CONNECTIONS.name(),
1346        )?;
1347        self.validate_resource_limit(
1348            current_aws_privatelink_connections,
1349            new_aws_privatelink_connections,
1350            SystemVars::max_aws_privatelink_connections,
1351            "AWS PrivateLink Connection",
1352            MAX_AWS_PRIVATELINK_CONNECTIONS.name(),
1353        )?;
1354        self.validate_resource_limit(
1355            self.catalog().user_tables().count(),
1356            new_tables,
1357            SystemVars::max_tables,
1358            "table",
1359            MAX_TABLES.name(),
1360        )?;
1361
1362        let current_sources: usize = self
1363            .catalog()
1364            .user_sources()
1365            .filter_map(|source| source.source())
1366            .map(|source| source.user_controllable_persist_shard_count())
1367            .sum::<i64>()
1368            .try_into()
1369            .expect("non-negative sum of sources");
1370
1371        self.validate_resource_limit(
1372            current_sources,
1373            new_sources,
1374            SystemVars::max_sources,
1375            "source",
1376            MAX_SOURCES.name(),
1377        )?;
1378        self.validate_resource_limit(
1379            self.catalog().user_sinks().count(),
1380            new_sinks,
1381            SystemVars::max_sinks,
1382            "sink",
1383            MAX_SINKS.name(),
1384        )?;
1385        self.validate_resource_limit(
1386            self.catalog().user_materialized_views().count(),
1387            new_materialized_views,
1388            SystemVars::max_materialized_views,
1389            "materialized view",
1390            MAX_MATERIALIZED_VIEWS.name(),
1391        )?;
1392        self.validate_resource_limit(
1393            // Linked compute clusters don't count against the limit, since
1394            // we have a separate sources and sinks limit.
1395            //
1396            // TODO(benesch): remove the `max_sources` and `max_sinks` limit,
1397            // and set a higher max cluster limit?
1398            self.catalog().user_clusters().count(),
1399            new_clusters,
1400            SystemVars::max_clusters,
1401            "cluster",
1402            MAX_CLUSTERS.name(),
1403        )?;
1404        for (cluster_id, new_replicas) in new_replicas_per_cluster {
1405            // It's possible that the cluster hasn't been created yet.
1406            let current_amount = self
1407                .catalog()
1408                .try_get_cluster(cluster_id)
1409                .map(|instance| instance.user_replicas().count())
1410                .unwrap_or(0);
1411            self.validate_resource_limit(
1412                current_amount,
1413                new_replicas,
1414                SystemVars::max_replicas_per_cluster,
1415                "cluster replica",
1416                MAX_REPLICAS_PER_CLUSTER.name(),
1417            )?;
1418        }
1419        self.validate_resource_limit_numeric(
1420            self.current_credit_consumption_rate(),
1421            new_credit_consumption_rate,
1422            |system_vars| {
1423                self.license_key
1424                    .max_credit_consumption_rate()
1425                    .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
1426            },
1427            "cluster replica",
1428            MAX_CREDIT_CONSUMPTION_RATE.name(),
1429        )?;
1430        self.validate_resource_limit(
1431            self.catalog().databases().count(),
1432            new_databases,
1433            SystemVars::max_databases,
1434            "database",
1435            MAX_DATABASES.name(),
1436        )?;
1437        for (database_id, new_schemas) in new_schemas_per_database {
1438            self.validate_resource_limit(
1439                self.catalog().get_database(database_id).schemas_by_id.len(),
1440                new_schemas,
1441                SystemVars::max_schemas_per_database,
1442                "schema",
1443                MAX_SCHEMAS_PER_DATABASE.name(),
1444            )?;
1445        }
1446        for ((database_spec, schema_spec), new_objects) in new_objects_per_schema {
1447            // For temporary schemas that don't exist yet (lazy creation),
1448            // treat them as having 0 items.
1449            let current_items = self
1450                .catalog()
1451                .try_get_schema(&database_spec, &schema_spec, conn_id)
1452                .map(|schema| schema.items.len())
1453                .unwrap_or(0);
1454            self.validate_resource_limit(
1455                current_items,
1456                new_objects,
1457                SystemVars::max_objects_per_schema,
1458                "object",
1459                MAX_OBJECTS_PER_SCHEMA.name(),
1460            )?;
1461        }
1462        self.validate_resource_limit(
1463            self.catalog().user_secrets().count(),
1464            new_secrets,
1465            SystemVars::max_secrets,
1466            "secret",
1467            MAX_SECRETS.name(),
1468        )?;
1469        self.validate_resource_limit(
1470            self.catalog().user_roles().count(),
1471            new_roles,
1472            SystemVars::max_roles,
1473            "role",
1474            MAX_ROLES.name(),
1475        )?;
1476        self.validate_resource_limit(
1477            self.catalog().user_continual_tasks().count(),
1478            new_continual_tasks,
1479            SystemVars::max_continual_tasks,
1480            "continual_task",
1481            MAX_CONTINUAL_TASKS.name(),
1482        )?;
1483        self.validate_resource_limit(
1484            self.catalog().user_network_policies().count(),
1485            new_network_policies,
1486            SystemVars::max_network_policies,
1487            "network_policy",
1488            MAX_NETWORK_POLICIES.name(),
1489        )?;
1490        Ok(())
1491    }
1492
1493    /// Validate a specific type of resource limit and return an error if that limit is exceeded.
1494    pub(crate) fn validate_resource_limit<F>(
1495        &self,
1496        current_amount: usize,
1497        new_instances: i64,
1498        resource_limit: F,
1499        resource_type: &str,
1500        limit_name: &str,
1501    ) -> Result<(), AdapterError>
1502    where
1503        F: Fn(&SystemVars) -> u32,
1504    {
1505        if new_instances <= 0 {
1506            return Ok(());
1507        }
1508
1509        let limit: i64 = resource_limit(self.catalog().system_config()).into();
1510        let current_amount: Option<i64> = current_amount.try_into().ok();
1511        let desired =
1512            current_amount.and_then(|current_amount| current_amount.checked_add(new_instances));
1513
1514        let exceeds_limit = if let Some(desired) = desired {
1515            desired > limit
1516        } else {
1517            true
1518        };
1519
1520        let desired = desired
1521            .map(|desired| desired.to_string())
1522            .unwrap_or_else(|| format!("more than {}", i64::MAX));
1523        let current = current_amount
1524            .map(|current| current.to_string())
1525            .unwrap_or_else(|| format!("more than {}", i64::MAX));
1526        if exceeds_limit {
1527            Err(AdapterError::ResourceExhaustion {
1528                resource_type: resource_type.to_string(),
1529                limit_name: limit_name.to_string(),
1530                desired,
1531                limit: limit.to_string(),
1532                current,
1533            })
1534        } else {
1535            Ok(())
1536        }
1537    }
1538
1539    /// Validate a specific type of float resource limit and return an error if that limit is exceeded.
1540    ///
1541    /// This is very similar to [`Self::validate_resource_limit`] but for numerics.
1542    pub(crate) fn validate_resource_limit_numeric<F>(
1543        &self,
1544        current_amount: Numeric,
1545        new_amount: Numeric,
1546        resource_limit: F,
1547        resource_type: &str,
1548        limit_name: &str,
1549    ) -> Result<(), AdapterError>
1550    where
1551        F: Fn(&SystemVars) -> Numeric,
1552    {
1553        if new_amount <= Numeric::zero() {
1554            return Ok(());
1555        }
1556
1557        let limit = resource_limit(self.catalog().system_config());
1558        // Floats will overflow to infinity instead of panicking, which has the correct comparison
1559        // semantics.
1560        // NaN should be impossible here since both values are positive.
1561        let desired = current_amount + new_amount;
1562        if desired > limit {
1563            Err(AdapterError::ResourceExhaustion {
1564                resource_type: resource_type.to_string(),
1565                limit_name: limit_name.to_string(),
1566                desired: desired.to_string(),
1567                limit: limit.to_string(),
1568                current: current_amount.to_string(),
1569            })
1570        } else {
1571            Ok(())
1572        }
1573    }
1574}