mz_adapter/coord/ddl.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! This module encapsulates all of the [`Coordinator`]'s logic for creating, dropping,
//! and altering objects.

use std::collections::{BTreeMap, BTreeSet};
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant};

use fail::fail_point;
use maplit::{btreemap, btreeset};
use mz_adapter_types::compaction::SINCE_GRANULARITY;
use mz_adapter_types::connection::ConnectionId;
use mz_audit_log::VersionedEvent;
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Sink};
use mz_cluster_client::ReplicaId;
use mz_controller::clusters::ReplicaLocation;
use mz_controller_types::ClusterId;
use mz_ore::instrument;
use mz_ore::now::to_datetime;
use mz_ore::retry::Retry;
use mz_ore::task;
use mz_repr::adt::numeric::Numeric;
use mz_repr::{CatalogItemId, GlobalId, Timestamp};
use mz_sql::catalog::{CatalogClusterReplica, CatalogSchema};
use mz_sql::names::ResolvedDatabaseSpecifier;
use mz_sql::plan::ConnectionDetails;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::{
    self, DEFAULT_TIMESTAMP_INTERVAL, MAX_AWS_PRIVATELINK_CONNECTIONS, MAX_CLUSTERS,
    MAX_CREDIT_CONSUMPTION_RATE, MAX_DATABASES, MAX_KAFKA_CONNECTIONS, MAX_MATERIALIZED_VIEWS,
    MAX_MYSQL_CONNECTIONS, MAX_NETWORK_POLICIES, MAX_OBJECTS_PER_SCHEMA, MAX_POSTGRES_CONNECTIONS,
    MAX_REPLICAS_PER_CLUSTER, MAX_ROLES, MAX_SCHEMAS_PER_DATABASE, MAX_SECRETS, MAX_SINKS,
    MAX_SOURCES, MAX_SQL_SERVER_CONNECTIONS, MAX_TABLES, SystemVars, Var,
};
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
use serde_json::json;
use tracing::{Instrument, Level, event, info_span, warn};

use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
use crate::catalog::{DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult};
use crate::coord::Coordinator;
use crate::coord::appends::BuiltinTableAppendNotify;
use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
use crate::session::{Session, Transaction, TransactionOps};
use crate::telemetry::{EventDetails, SegmentClientExt};
use crate::util::ResultExt;
use crate::{AdapterError, ExecuteContext, catalog, flags};

impl Coordinator {
    /// Same as [`Self::catalog_transact_with_context`] but takes a [`Session`].
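    ///
    /// A minimal usage sketch (hypothetical `ops`; not a compilable doctest,
    /// since `Coordinator` is internal to the adapter):
    ///
    /// ```ignore
    /// let ops = vec![catalog::Op::CreateDatabase { /* ... */ }];
    /// coord.catalog_transact(Some(&session), ops).await?;
    /// ```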
    #[instrument(name = "coord::catalog_transact")]
    pub(crate) async fn catalog_transact(
        &mut self,
        session: Option<&Session>,
        ops: Vec<catalog::Op>,
    ) -> Result<(), AdapterError> {
        let start = Instant::now();
        let result = self
            .catalog_transact_with_context(session.map(|session| session.conn_id()), None, ops)
            .await;
        self.metrics
            .catalog_transact_seconds
            .with_label_values(&["catalog_transact"])
            .observe(start.elapsed().as_secs_f64());
        result
    }

    /// Same as [`Self::catalog_transact_with_context`] but takes a [`Session`]
    /// and runs builtin table updates concurrently with any side effects (e.g.
    /// creating collections).
    // TODO(aljoscha): Remove this method once all call-sites have been migrated
    // to the newer catalog_transact_with_context. The latter is what allows us
    // to apply catalog implications that we derive from catalog changes either
    // when initially applying the ops to the catalog _or_ when following
    // catalog changes from another process.
    #[instrument(name = "coord::catalog_transact_with_side_effects")]
    pub(crate) async fn catalog_transact_with_side_effects<F>(
        &mut self,
        mut ctx: Option<&mut ExecuteContext>,
        ops: Vec<catalog::Op>,
        side_effect: F,
    ) -> Result<(), AdapterError>
    where
        F: for<'a> FnOnce(
                &'a mut Coordinator,
                Option<&'a mut ExecuteContext>,
            ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
            + 'static,
    {
        let start = Instant::now();

        let (table_updates, catalog_updates) = self
            .catalog_transact_inner(ctx.as_ref().map(|ctx| ctx.session().conn_id()), ops)
            .await?;

        // We can't run this concurrently with the explicit side effects,
        // because both want to borrow self mutably.
        let apply_implications_res = self
            .apply_catalog_implications(ctx.as_deref_mut(), catalog_updates)
            .await;

        // We would get into an inconsistent state if we updated the catalog but
        // then failed to apply commands/updates to the controller. Easiest
        // thing to do is panic and let restart/bootstrap handle it.
        apply_implications_res.expect("cannot fail to apply catalog update implications");

        // Note: It's important that we keep the function call inside the macro;
        // this way we only run the consistency checks if soft assertions are
        // enabled.
        mz_ore::soft_assert_eq_no_log!(
            self.check_consistency(),
            Ok(()),
            "coordinator inconsistency detected"
        );

        let side_effects_fut = side_effect(self, ctx);

        // Run our side effects concurrently with the table updates.
        let ((), ()) = futures::future::join(
            side_effects_fut.instrument(info_span!(
                "coord::catalog_transact_with_side_effects::side_effects_fut"
            )),
            table_updates.instrument(info_span!(
                "coord::catalog_transact_with_side_effects::table_updates"
            )),
        )
        .await;

        self.metrics
            .catalog_transact_seconds
            .with_label_values(&["catalog_transact_with_side_effects"])
            .observe(start.elapsed().as_secs_f64());

        Ok(())
    }

    /// Same as [`Self::catalog_transact_inner`] but takes an execution context
    /// or connection ID and runs builtin table updates concurrently with any
    /// catalog implications that are generated as part of applying the given
    /// `ops` (e.g. creating collections).
    ///
    /// This will use a connection ID if provided and otherwise fall back to
    /// getting a connection ID from the execution context.
    #[instrument(name = "coord::catalog_transact_with_context")]
    pub(crate) async fn catalog_transact_with_context(
        &mut self,
        conn_id: Option<&ConnectionId>,
        ctx: Option<&mut ExecuteContext>,
        ops: Vec<catalog::Op>,
    ) -> Result<(), AdapterError> {
        let start = Instant::now();

        let conn_id = conn_id.or_else(|| ctx.as_ref().map(|ctx| ctx.session().conn_id()));

        let (table_updates, catalog_updates) = self.catalog_transact_inner(conn_id, ops).await?;

        let apply_catalog_implications_fut = self.apply_catalog_implications(ctx, catalog_updates);

        // Apply catalog implications concurrently with the table updates.
        let (combined_apply_res, ()) = futures::future::join(
            apply_catalog_implications_fut.instrument(info_span!(
                "coord::catalog_transact_with_context::side_effects_fut"
            )),
            table_updates.instrument(info_span!(
                "coord::catalog_transact_with_context::table_updates"
            )),
        )
        .await;

        // We would get into an inconsistent state if we updated the catalog but
        // then failed to apply implications. Easiest thing to do is panic and
        // let restart/bootstrap handle it.
        combined_apply_res.expect("cannot fail to apply catalog implications");

        // Note: It's important that we keep the function call inside the macro;
        // this way we only run the consistency checks if soft assertions are
        // enabled.
        mz_ore::soft_assert_eq_no_log!(
            self.check_consistency(),
            Ok(()),
            "coordinator inconsistency detected"
        );

        self.metrics
            .catalog_transact_seconds
            .with_label_values(&["catalog_transact_with_context"])
            .observe(start.elapsed().as_secs_f64());

        Ok(())
    }

    /// Executes a catalog transaction, with special handling if the provided
    /// [`Session`] is in a SQL transaction that is executing DDL.
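    ///
    /// In that case, the new ops are validated and dry-run against the
    /// transaction's accumulated catalog state, then appended to the session's
    /// transaction; nothing is durably committed until the SQL transaction
    /// itself commits.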
    #[instrument(name = "coord::catalog_transact_with_ddl_transaction")]
    pub(crate) async fn catalog_transact_with_ddl_transaction<F>(
        &mut self,
        ctx: &mut ExecuteContext,
        ops: Vec<catalog::Op>,
        side_effect: F,
    ) -> Result<(), AdapterError>
    where
        F: for<'a> FnOnce(
                &'a mut Coordinator,
                Option<&'a mut ExecuteContext>,
            ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
            + Send
            + Sync
            + 'static,
    {
        let start = Instant::now();

        let Some(Transaction {
            ops:
                TransactionOps::DDL {
                    ops: txn_ops,
                    revision: txn_revision,
                    state: txn_state,
                    snapshot: txn_snapshot,
                    side_effects: _,
                },
            ..
        }) = ctx.session().transaction().inner()
        else {
            let result = self
                .catalog_transact_with_side_effects(Some(ctx), ops, side_effect)
                .await;
            self.metrics
                .catalog_transact_seconds
                .with_label_values(&["catalog_transact_with_ddl_transaction"])
                .observe(start.elapsed().as_secs_f64());
            return result;
        };

        // Make sure our Catalog hasn't changed since opening the transaction.
        if self.catalog().transient_revision() != *txn_revision {
            self.metrics
                .catalog_transact_seconds
                .with_label_values(&["catalog_transact_with_ddl_transaction"])
                .observe(start.elapsed().as_secs_f64());
            return Err(AdapterError::DDLTransactionRace);
        }

        // Clone what we need from the session before taking &mut below.
        let txn_ops_clone = txn_ops.clone();
        let txn_state_clone = txn_state.clone();
        let prev_snapshot = txn_snapshot.clone();

        // Validate resource limits with all accumulated + new ops (cheap O(N) counting).
        let mut combined_ops = txn_ops_clone;
        combined_ops.extend(ops.iter().cloned());
        let conn_id = ctx.session().conn_id().clone();
        self.validate_resource_limits(&combined_ops, &conn_id)?;

        // Get oracle timestamp for audit log entries.
        let oracle_write_ts = self.get_local_write_ts().await.timestamp;

        // Get ConnMeta for the session.
        let conn = self.active_conns.get(ctx.session().conn_id());

        // Incremental dry run: process only NEW ops against accumulated state.
        // If we have a saved snapshot from a previous dry run, use it to
        // initialize the transaction so it starts in sync with the accumulated
        // state. Otherwise (first statement), the fresh durable transaction is
        // already in sync with the real catalog state.
        let (new_state, new_snapshot) = self
            .catalog()
            .transact_incremental_dry_run(
                &txn_state_clone,
                ops.clone(),
                conn,
                prev_snapshot,
                oracle_write_ts,
            )
            .await?;

        // Accumulate ops for eventual COMMIT.
        let result = ctx
            .session_mut()
            .transaction_mut()
            .add_ops(TransactionOps::DDL {
                ops: combined_ops,
                state: new_state,
                side_effects: vec![Box::new(side_effect)],
                revision: self.catalog().transient_revision(),
                snapshot: Some(new_snapshot),
            });

        self.metrics
            .catalog_transact_seconds
            .with_label_values(&["catalog_transact_with_ddl_transaction"])
            .observe(start.elapsed().as_secs_f64());

        result
    }

    /// Perform a catalog transaction. [`Coordinator::ship_dataflow`] must be
    /// called after this function successfully returns on any built
    /// [`DataflowDesc`](mz_compute_types::dataflows::DataflowDesc).
    #[instrument(name = "coord::catalog_transact_inner")]
    pub(crate) async fn catalog_transact_inner(
        &mut self,
        conn_id: Option<&ConnectionId>,
        ops: Vec<catalog::Op>,
    ) -> Result<(BuiltinTableAppendNotify, Vec<ParsedStateUpdate>), AdapterError> {
        if self.controller.read_only() {
            return Err(AdapterError::ReadOnly);
        }

        event!(Level::TRACE, ops = format!("{:?}", ops));

        let mut webhook_sources_to_restart = BTreeSet::new();
        let mut clusters_to_drop = vec![];
        let mut cluster_replicas_to_drop = vec![];
        let mut clusters_to_create = vec![];
        let mut cluster_replicas_to_create = vec![];
        let mut update_metrics_config = false;
        let mut update_tracing_config = false;
        let mut update_controller_config = false;
        let mut update_compute_config = false;
        let mut update_storage_config = false;
        let mut update_timestamp_oracle_config = false;
        let mut update_metrics_retention = false;
        let mut update_secrets_caching_config = false;
        let mut update_cluster_scheduling_config = false;
        let mut update_http_config = false;
        let mut update_advance_timelines_interval = false;

        for op in &ops {
            match op {
                catalog::Op::DropObjects(drop_object_infos) => {
                    for drop_object_info in drop_object_infos {
                        match &drop_object_info {
                            catalog::DropObjectInfo::Item(_) => {
                                // Nothing to do, these will be handled by
                                // applying the side effects that we return.
                            }
                            catalog::DropObjectInfo::Cluster(id) => {
                                clusters_to_drop.push(*id);
                            }
                            catalog::DropObjectInfo::ClusterReplica((
                                cluster_id,
                                replica_id,
                                _reason,
                            )) => {
                                // Drop the cluster replica itself.
                                cluster_replicas_to_drop.push((*cluster_id, *replica_id));
                            }
                            _ => (),
                        }
                    }
                }
                catalog::Op::ResetSystemConfiguration { name }
                | catalog::Op::UpdateSystemConfiguration { name, .. } => {
                    update_metrics_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_metrics_config_var(name);
                    update_tracing_config |= vars::is_tracing_var(name);
                    update_controller_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_controller_config_var(name);
                    update_compute_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_compute_config_var(name);
                    update_storage_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_storage_config_var(name);
                    update_timestamp_oracle_config |= vars::is_timestamp_oracle_config_var(name);
                    update_metrics_retention |= name == vars::METRICS_RETENTION.name();
                    update_secrets_caching_config |= vars::is_secrets_caching_var(name);
                    update_cluster_scheduling_config |= vars::is_cluster_scheduling_var(name);
                    update_http_config |= vars::is_http_config_var(name);
                    update_advance_timelines_interval |= name == DEFAULT_TIMESTAMP_INTERVAL.name();
                }
                catalog::Op::ResetAllSystemConfiguration => {
                    // Assume they all need to be updated.
                    // We could check whether the configs have actually changed,
                    // but this is simpler.
                    update_tracing_config = true;
                    update_controller_config = true;
                    update_compute_config = true;
                    update_storage_config = true;
                    update_timestamp_oracle_config = true;
                    update_metrics_retention = true;
                    update_secrets_caching_config = true;
                    update_cluster_scheduling_config = true;
                    update_metrics_config = true;
                    update_http_config = true;
                    update_advance_timelines_interval = true;
                }
                catalog::Op::RenameItem { id, .. } => {
                    let item = self.catalog().get_entry(id);
                    let is_webhook_source = item
                        .source()
                        .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
                        .unwrap_or(false);
                    if is_webhook_source {
                        webhook_sources_to_restart.insert(*id);
                    }
                }
                catalog::Op::RenameSchema {
                    database_spec,
                    schema_spec,
                    ..
                } => {
                    let schema = self.catalog().get_schema(
                        database_spec,
                        schema_spec,
                        conn_id.unwrap_or(&SYSTEM_CONN_ID),
                    );
                    let webhook_sources = schema.item_ids().filter(|id| {
                        let item = self.catalog().get_entry(id);
                        item.source()
                            .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
                            .unwrap_or(false)
                    });
                    webhook_sources_to_restart.extend(webhook_sources);
                }
                catalog::Op::CreateCluster { id, .. } => {
                    clusters_to_create.push(*id);
                }
                catalog::Op::CreateClusterReplica {
                    cluster_id,
                    name,
                    config,
                    ..
                } => {
                    cluster_replicas_to_create.push((
                        *cluster_id,
                        name.clone(),
                        config.location.num_processes(),
                    ));
                }
                _ => (),
            }
        }

        self.validate_resource_limits(&ops, conn_id.unwrap_or(&SYSTEM_CONN_ID))?;

        // This will produce timestamps that are guaranteed to increase on each
        // call, and also never be behind the system clock. If the system clock
        // hasn't advanced (or has gone backward), it will increment by 1. For
        // the audit log, we need to balance "close (within 10s or so) to the
        // system clock" and "always goes up". We've chosen here to prioritize
        // always going up, and believe we will always be close to the system
        // clock because it is well configured (chrony) and so may only rarely
        // regress or pause for 10s.
        let oracle_write_ts = self.get_local_write_ts().await.timestamp;

        let Coordinator {
            catalog,
            active_conns,
            controller,
            cluster_replica_statuses,
            ..
        } = self;
        let catalog = Arc::make_mut(catalog);
        let conn = conn_id.map(|id| active_conns.get(id).expect("connection must exist"));

        let TransactionResult {
            builtin_table_updates,
            catalog_updates,
            audit_events,
        } = catalog
            .transact(
                Some(&mut controller.storage_collections),
                oracle_write_ts,
                conn,
                ops,
            )
            .await?;

        for (cluster_id, replica_id) in &cluster_replicas_to_drop {
            cluster_replica_statuses.remove_cluster_replica_statuses(cluster_id, replica_id);
        }
        for cluster_id in &clusters_to_drop {
            cluster_replica_statuses.remove_cluster_statuses(cluster_id);
        }
        for cluster_id in clusters_to_create {
            cluster_replica_statuses.initialize_cluster_statuses(cluster_id);
        }
        let now = to_datetime((catalog.config().now)());
        for (cluster_id, replica_name, num_processes) in cluster_replicas_to_create {
            let replica_id = catalog
                .resolve_replica_in_cluster(&cluster_id, &replica_name)
                .expect("just created")
                .replica_id();
            cluster_replica_statuses.initialize_cluster_replica_statuses(
                cluster_id,
                replica_id,
                num_processes,
                now,
            );
        }

        // Append our builtin table updates, then return the notify so we can run other tasks in
        // parallel.
        let (builtin_update_notify, _) = self
            .builtin_table_update()
            .execute(builtin_table_updates)
            .await;

        // No error returns are allowed after this point. Enforce this at compile time
        // by using this odd structure so we don't accidentally add a stray `?`.
        let _: () = async {
            if !webhook_sources_to_restart.is_empty() {
                self.restart_webhook_sources(webhook_sources_to_restart);
            }

            if update_metrics_config {
                mz_metrics::update_dyncfg(&self.catalog().system_config().dyncfg_updates());
            }
            if update_controller_config {
                self.update_controller_config();
            }
            if update_compute_config {
                self.update_compute_config();
            }
            if update_storage_config {
                self.update_storage_config();
            }
            if update_timestamp_oracle_config {
                self.update_timestamp_oracle_config();
            }
            if update_metrics_retention {
                self.update_metrics_retention();
            }
            if update_tracing_config {
                self.update_tracing_config();
            }
            if update_secrets_caching_config {
                self.update_secrets_caching_config();
            }
            if update_cluster_scheduling_config {
                self.update_cluster_scheduling_config();
            }
            if update_http_config {
                self.update_http_config();
            }
            if update_advance_timelines_interval {
                let new_interval = self.catalog().system_config().default_timestamp_interval();
                if new_interval != self.advance_timelines_interval.period() {
                    self.advance_timelines_interval = tokio::time::interval(new_interval);
                }
            }
        }
        .instrument(info_span!("coord::catalog_transact_with::finalize"))
        .await;

        let conn = conn_id.and_then(|id| self.active_conns.get(id));
        if let Some(segment_client) = &self.segment_client {
            for VersionedEvent::V1(event) in audit_events {
                let event_type = format!(
                    "{} {}",
                    event.object_type.as_title_case(),
                    event.event_type.as_title_case()
                );
                segment_client.environment_track(
                    &self.catalog().config().environment_id,
                    event_type,
                    json!({ "details": event.details.as_json() }),
                    EventDetails {
                        user_id: conn
                            .and_then(|c| c.user().external_metadata.as_ref())
                            .map(|m| m.user_id),
                        application_name: conn.map(|c| c.application_name()),
                        ..Default::default()
                    },
                );
            }
        }

        Ok((builtin_update_notify, catalog_updates))
    }

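    /// Drops the given cluster replica: retires any introspection subscribes
    /// installed on it, then removes it from the controller.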
    pub(crate) fn drop_replica(&mut self, cluster_id: ClusterId, replica_id: ReplicaId) {
        self.drop_introspection_subscribes(replica_id);

        self.controller
            .drop_replica(cluster_id, replica_id)
            .expect("dropping replica must not fail");
    }

    /// A convenience method for dropping sources.
    pub(crate) fn drop_sources(&mut self, sources: Vec<(CatalogItemId, GlobalId)>) {
        for (item_id, _gid) in &sources {
            self.active_webhooks.remove(item_id);
        }
        let storage_metadata = self.catalog.state().storage_metadata();
        let source_gids = sources.into_iter().map(|(_id, gid)| gid).collect();
        self.controller
            .storage
            .drop_sources(storage_metadata, source_gids)
            .unwrap_or_terminate("cannot fail to drop sources");
    }

    /// A convenience method for dropping tables.
    pub(crate) fn drop_tables(&mut self, tables: Vec<(CatalogItemId, GlobalId)>, ts: Timestamp) {
        for (item_id, _gid) in &tables {
            self.active_webhooks.remove(item_id);
        }

        let storage_metadata = self.catalog.state().storage_metadata();
        let table_gids = tables.into_iter().map(|(_id, gid)| gid).collect();
        self.controller
            .storage
            .drop_tables(storage_metadata, table_gids, ts)
            .unwrap_or_terminate("cannot fail to drop tables");
    }

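    /// Restarts the given webhook sources by removing their entries from
    /// `active_webhooks`.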
    fn restart_webhook_sources(&mut self, sources: impl IntoIterator<Item = CatalogItemId>) {
        for id in sources {
            self.active_webhooks.remove(&id);
        }
    }

    /// Like `drop_compute_sinks`, but for a single compute sink.
    ///
    /// Returns the controller's state for the compute sink if the identified
    /// sink was known to the controller. It is the caller's responsibility to
    /// retire the returned sink. Consider using `retire_compute_sinks` instead.
    #[must_use]
    pub async fn drop_compute_sink(&mut self, sink_id: GlobalId) -> Option<ActiveComputeSink> {
        self.drop_compute_sinks([sink_id]).await.remove(&sink_id)
    }

    /// Drops a batch of compute sinks.
    ///
    /// For each sink that exists, the coordinator and controller's state
    /// associated with the sink is removed.
    ///
    /// Returns a map containing the controller's state for each sink that was
    /// removed. It is the caller's responsibility to retire the returned sinks.
    /// Consider using `retire_compute_sinks` instead.
    #[must_use]
    pub async fn drop_compute_sinks(
        &mut self,
        sink_ids: impl IntoIterator<Item = GlobalId>,
    ) -> BTreeMap<GlobalId, ActiveComputeSink> {
        let mut by_id = BTreeMap::new();
        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
        for sink_id in sink_ids {
            let sink = match self.remove_active_compute_sink(sink_id).await {
                None => {
                    tracing::error!(%sink_id, "drop_compute_sinks called on nonexistent sink");
                    continue;
                }
                Some(sink) => sink,
            };

            by_cluster
                .entry(sink.cluster_id())
                .or_default()
                .push(sink_id);
            by_id.insert(sink_id, sink);
        }
        for (cluster_id, ids) in by_cluster {
            let compute = &mut self.controller.compute;
            // A cluster could have been dropped, so verify it exists.
            if compute.instance_exists(cluster_id) {
                compute
                    .drop_collections(cluster_id, ids)
                    .unwrap_or_terminate("cannot fail to drop collections");
            }
        }
        by_id
    }

    /// Retires a batch of sinks with disparate reasons for retirement.
    ///
    /// Each sink identified in `reasons` is dropped (see `drop_compute_sinks`),
    /// then retired with its corresponding reason.
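    ///
    /// A hedged usage sketch (`sink_id` is hypothetical):
    ///
    /// ```ignore
    /// let reasons = btreemap! {
    ///     sink_id => ActiveComputeSinkRetireReason::Canceled,
    /// };
    /// coord.retire_compute_sinks(reasons).await;
    /// ```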
    pub async fn retire_compute_sinks(
        &mut self,
        mut reasons: BTreeMap<GlobalId, ActiveComputeSinkRetireReason>,
    ) {
        let sink_ids = reasons.keys().cloned();
        for (id, sink) in self.drop_compute_sinks(sink_ids).await {
            let reason = reasons
                .remove(&id)
                .expect("all returned IDs are in `reasons`");
            sink.retire(reason);
        }
    }

    /// Drops all pending replicas for a set of clusters
    /// that are undergoing reconfiguration.
    pub async fn drop_reconfiguration_replicas(
        &mut self,
        cluster_ids: BTreeSet<ClusterId>,
    ) -> Result<(), AdapterError> {
        let pending_cluster_ops: Vec<Op> = cluster_ids
            .iter()
            .map(|c| {
                self.catalog()
                    .get_cluster(c.clone())
                    .replicas()
                    .filter_map(|r| match r.config.location {
                        ReplicaLocation::Managed(ref l) if l.pending => {
                            Some(DropObjectInfo::ClusterReplica((
                                c.clone(),
                                r.replica_id,
                                ReplicaCreateDropReason::Manual,
                            )))
                        }
                        _ => None,
                    })
                    .collect::<Vec<DropObjectInfo>>()
            })
            .filter_map(|pending_replica_drop_ops_by_cluster| {
                match pending_replica_drop_ops_by_cluster.len() {
                    0 => None,
                    _ => Some(Op::DropObjects(pending_replica_drop_ops_by_cluster)),
                }
            })
            .collect();
        if !pending_cluster_ops.is_empty() {
            self.catalog_transact(None, pending_cluster_ops).await?;
        }
        Ok(())
    }

    /// Cancels all active compute sinks for the identified connection.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn cancel_compute_sinks_for_conn(&mut self, conn_id: &ConnectionId) {
        self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Canceled)
            .await
    }

    /// Cancels all active cluster reconfigurations for the identified connection.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn cancel_cluster_reconfigurations_for_conn(
        &mut self,
        conn_id: &ConnectionId,
    ) {
        self.retire_cluster_reconfigurations_for_conn(conn_id).await
    }

    /// Retires all active compute sinks for the identified connection with the
    /// specified reason.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn retire_compute_sinks_for_conn(
        &mut self,
        conn_id: &ConnectionId,
        reason: ActiveComputeSinkRetireReason,
    ) {
        let drop_sinks = self
            .active_conns
            .get_mut(conn_id)
            .expect("must exist for active session")
            .drop_sinks
            .iter()
            .map(|sink_id| (*sink_id, reason.clone()))
            .collect();
        self.retire_compute_sinks(drop_sinks).await;
    }

    /// Cleans up pending cluster reconfigurations for the identified connection.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn retire_cluster_reconfigurations_for_conn(
        &mut self,
        conn_id: &ConnectionId,
    ) {
        let reconfiguring_clusters = self
            .active_conns
            .get(conn_id)
            .expect("must exist for active session")
            .pending_cluster_alters
            .clone();
        // Try to drop the reconfiguration replicas.
        self.drop_reconfiguration_replicas(reconfiguring_clusters)
            .await
            .unwrap_or_terminate("cannot fail to drop reconfiguration replicas");

        self.active_conns
            .get_mut(conn_id)
            .expect("must exist for active session")
            .pending_cluster_alters
            .clear();
    }

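    /// A convenience method for dropping storage sinks.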
    pub(crate) fn drop_storage_sinks(&mut self, sink_gids: Vec<GlobalId>) {
        let storage_metadata = self.catalog.state().storage_metadata();
        self.controller
            .storage
            .drop_sinks(storage_metadata, sink_gids)
            .unwrap_or_terminate("cannot fail to drop sinks");
    }

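    /// Drops a batch of compute collections, grouped by cluster; clusters that
    /// have since been dropped are skipped.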
    pub(crate) fn drop_compute_collections(&mut self, collections: Vec<(ClusterId, GlobalId)>) {
        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
        for (cluster_id, gid) in collections {
            by_cluster.entry(cluster_id).or_default().push(gid);
        }
        for (cluster_id, gids) in by_cluster {
            let compute = &mut self.controller.compute;
            // A cluster could have been dropped, so verify it exists.
            if compute.instance_exists(cluster_id) {
                compute
                    .drop_collections(cluster_id, gids)
                    .unwrap_or_terminate("cannot fail to drop collections");
            }
        }
    }

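    /// Deletes the given VPC endpoints in a background task, retrying each
    /// deletion for up to a minute. Failures are logged rather than surfaced
    /// to the user.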
    pub(crate) fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>) {
        // Match the create path (catalog_implications.rs), which gracefully
        // logs an error when cloud_resource_controller is None, rather than
        // panicking.
        let Some(cloud_resource_controller) = self.cloud_resource_controller.as_ref() else {
            warn!("dropping VPC endpoints without cloud_resource_controller; skipping cleanup");
            return;
        };
        let cloud_resource_controller = Arc::clone(cloud_resource_controller);
        // We don't want to block the coordinator on external delete API calls,
        // so move the vpc_endpoint drops to a separate task. This does mean
        // that a failed drop won't bubble up to the user as an error message.
        // However, even if it did (which is how the code previously worked),
        // mz has already dropped it from our catalog, and so we wouldn't be
        // able to retry anyway. Any orphaned vpc_endpoints will eventually be
        // cleaned up during restart via coord bootstrap.
        task::spawn(
            || "drop_vpc_endpoints",
            async move {
                for vpc_endpoint in vpc_endpoints {
                    let _ = Retry::default()
                        .max_duration(Duration::from_secs(60))
                        .retry_async(|_state| async {
                            fail_point!("drop_vpc_endpoint", |r| {
                                Err(anyhow::anyhow!("Fail point error {:?}", r))
                            });
                            match cloud_resource_controller
                                .delete_vpc_endpoint(vpc_endpoint)
                                .await
                            {
                                Ok(_) => Ok(()),
                                Err(e) => {
                                    warn!("Dropping VPC Endpoints has encountered an error: {}", e);
                                    Err(e)
                                }
                            }
                        })
                        .await;
                }
            }
            .instrument(info_span!(
                "coord::catalog_transact_inner::drop_vpc_endpoints"
            )),
        );
    }

    /// Removes all temporary items created by the specified connection, though
    /// not the temporary schema itself.
    pub(crate) async fn drop_temp_items(&mut self, conn_id: &ConnectionId) {
        let temp_items = self.catalog().state().get_temp_items(conn_id).collect();
        let all_items = self.catalog().object_dependents(&temp_items, conn_id);

        if all_items.is_empty() {
            return;
        }
        let op = Op::DropObjects(
            all_items
                .into_iter()
                .map(DropObjectInfo::manual_drop_from_object_id)
                .collect(),
        );

        self.catalog_transact_with_context(Some(conn_id), None, vec![op])
            .await
            .expect("unable to drop temporary items for conn_id");
    }

    fn update_cluster_scheduling_config(&self) {
        let config = flags::orchestrator_scheduling_config(self.catalog.system_config());
        self.controller
            .update_orchestrator_scheduling_config(config);
    }

    fn update_secrets_caching_config(&self) {
        let config = flags::caching_config(self.catalog.system_config());
        self.caching_secrets_reader.set_policy(config);
    }

    fn update_tracing_config(&self) {
        let tracing = flags::tracing_config(self.catalog().system_config());
        tracing.apply(&self.tracing_handle);
    }

    fn update_compute_config(&mut self) {
        let config_params = flags::compute_config(self.catalog().system_config());
        self.controller.compute.update_configuration(config_params);
    }

    fn update_storage_config(&mut self) {
        let config_params = flags::storage_config(self.catalog().system_config());
        self.controller.storage.update_parameters(config_params);
    }

    fn update_timestamp_oracle_config(&self) {
        let config_params = flags::timestamp_oracle_config(self.catalog().system_config());
        if let Some(config) = self.timestamp_oracle_config.as_ref() {
            config.apply_parameters(config_params)
        }
    }

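    /// Applies the `metrics_retention` system variable: installs a read policy
    /// on every retained-metrics object that lags the write frontier by the
    /// configured duration.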
    fn update_metrics_retention(&self) {
        let duration = self.catalog().system_config().metrics_retention();
        let policy = ReadPolicy::lag_writes_by(
            Timestamp::new(u64::try_from(duration.as_millis()).unwrap_or_else(|_e| {
                tracing::error!("Absurd metrics retention duration: {duration:?}.");
                u64::MAX
            })),
            SINCE_GRANULARITY,
        );
        let storage_policies = self
            .catalog()
            .entries()
            .filter(|entry| {
                entry.item().is_retained_metrics_object()
                    && entry.item().is_compute_object_on_cluster().is_none()
            })
            .map(|entry| (entry.id(), policy.clone()))
            .collect::<Vec<_>>();
        let compute_policies = self
            .catalog()
            .entries()
            .filter_map(|entry| {
                if let (true, Some(cluster_id)) = (
                    entry.item().is_retained_metrics_object(),
                    entry.item().is_compute_object_on_cluster(),
                ) {
                    Some((cluster_id, entry.id(), policy.clone()))
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        self.update_storage_read_policies(storage_policies);
        self.update_compute_read_policies(compute_policies);
    }

    fn update_controller_config(&mut self) {
        let sys_config = self.catalog().system_config();
        self.controller
            .update_configuration(sys_config.dyncfg_updates());
    }

    fn update_http_config(&mut self) {
        let webhook_request_limit = self
            .catalog()
            .system_config()
            .webhook_concurrent_request_limit();
        self.webhook_concurrency_limit
            .set_limit(webhook_request_limit);
    }

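    /// Creates a storage export (sink) for `id`, reading from `sink.from` at
    /// the smallest legal `as_of`, which is pinned by read holds for the
    /// duration of the call.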
    pub(crate) async fn create_storage_export(
        &mut self,
        id: GlobalId,
        sink: &Sink,
    ) -> Result<(), AdapterError> {
        // Validate that `sink.from` is in fact a storage collection.
        self.controller.storage.check_exists(sink.from)?;

        // The AsOf is used to determine at what time to snapshot reading from
        // the persist collection. This is primarily relevant when we do _not_
        // want to include the snapshot in the sink.
        //
        // We choose the smallest as_of that is legal, according to the sinked
        // collection's since.
        let id_bundle = crate::CollectionIdBundle {
            storage_ids: btreeset! {sink.from},
            compute_ids: btreemap! {},
        };

        // We're putting in place read holds, such that create_exports, below,
        // which calls update_read_capabilities, can successfully do so.
        // Otherwise, the since of dependencies might move along concurrently,
        // pulling the rug from under us!
        //
        // TODO: Maybe in the future, pass those holds on to storage, to hold on
        // to them and downgrade when possible?
        let read_holds = self.acquire_read_holds(&id_bundle);
        let as_of = read_holds.least_valid_read();

        let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
        let storage_sink_desc = mz_storage_types::sinks::StorageSinkDesc {
            from: sink.from,
            from_desc: storage_sink_from_entry
                .relation_desc()
                .expect("sinks can only be built on items with descs")
                .into_owned(),
            connection: sink
                .connection
                .clone()
                .into_inline_connection(self.catalog().state()),
            envelope: sink.envelope,
            as_of,
            with_snapshot: sink.with_snapshot,
            version: sink.version,
            from_storage_metadata: (),
            to_storage_metadata: (),
            commit_interval: sink.commit_interval,
        };

        let collection_desc = CollectionDescription {
            // TODO(sinks): make generic once we have more than one sink type.
            desc: KAFKA_PROGRESS_DESC.clone(),
            data_source: DataSource::Sink {
                desc: ExportDescription {
                    sink: storage_sink_desc,
                    instance_id: sink.cluster_id,
                },
            },
            since: None,
            timeline: None,
            primary: None,
        };
        let collections = vec![(id, collection_desc)];

        // Create the collections.
        let storage_metadata = self.catalog.state().storage_metadata();
        let res = self
            .controller
            .storage
            .create_collections(storage_metadata, None, collections)
            .await;

        // Drop read holds after the export has been created, at which point
        // storage will have put in its own read holds.
        drop(read_holds);

        Ok(res?)
    }

    /// Validates all resource limits in a catalog transaction and returns an
    /// error if any limit would be exceeded.
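    ///
    /// Works by computing the net change each op implies (creates increment a
    /// counter, drops decrement it), then checking the current counts plus the
    /// deltas against the corresponding `MAX_*` system variables.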
1056    fn validate_resource_limits(
1057        &self,
1058        ops: &Vec<catalog::Op>,
1059        conn_id: &ConnectionId,
1060    ) -> Result<(), AdapterError> {
1061        let mut new_kafka_connections = 0;
1062        let mut new_postgres_connections = 0;
1063        let mut new_mysql_connections = 0;
1064        let mut new_sql_server_connections = 0;
1065        let mut new_aws_privatelink_connections = 0;
1066        let mut new_tables = 0;
1067        let mut new_sources = 0;
1068        let mut new_sinks = 0;
1069        let mut new_materialized_views = 0;
1070        let mut new_clusters = 0;
1071        let mut new_replicas_per_cluster = BTreeMap::new();
1072        let mut new_credit_consumption_rate = Numeric::zero();
1073        let mut new_databases = 0;
1074        let mut new_schemas_per_database = BTreeMap::new();
1075        let mut new_objects_per_schema = BTreeMap::new();
1076        let mut new_secrets = 0;
1077        let mut new_roles = 0;
1078        let mut new_network_policies = 0;
1079        for op in ops {
1080            match op {
1081                Op::CreateDatabase { .. } => {
1082                    new_databases += 1;
1083                }
1084                Op::CreateSchema { database_id, .. } => {
1085                    if let ResolvedDatabaseSpecifier::Id(database_id) = database_id {
1086                        *new_schemas_per_database.entry(database_id).or_insert(0) += 1;
1087                    }
1088                }
1089                Op::CreateRole { .. } => {
1090                    new_roles += 1;
1091                }
1092                Op::CreateNetworkPolicy { .. } => {
1093                    new_network_policies += 1;
1094                }
1095                Op::CreateCluster { .. } => {
1096                    // TODO(benesch): having deprecated linked clusters, remove
1097                    // the `max_sources` and `max_sinks` limit, and set a higher
1098                    // max cluster limit?
1099                    new_clusters += 1;
1100                }
1101                Op::CreateClusterReplica {
1102                    cluster_id, config, ..
1103                } => {
1104                    if cluster_id.is_user() {
1105                        *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) += 1;
1106                        if let ReplicaLocation::Managed(location) = &config.location {
1107                            let replica_allocation = self
1108                                .catalog()
1109                                .cluster_replica_sizes()
1110                                .0
1111                                .get(location.size_for_billing())
1112                                .expect(
1113                                    "location size is validated against the cluster replica sizes",
1114                                );
1115                            new_credit_consumption_rate += replica_allocation.credits_per_hour
1116                        }
1117                    }
1118                }
1119                Op::CreateItem { name, item, .. } => {
1120                    *new_objects_per_schema
1121                        .entry((
1122                            name.qualifiers.database_spec.clone(),
1123                            name.qualifiers.schema_spec.clone(),
1124                        ))
1125                        .or_insert(0) += 1;
1126                    match item {
1127                        CatalogItem::Connection(connection) => match connection.details {
1128                            ConnectionDetails::Kafka(_) => new_kafka_connections += 1,
1129                            ConnectionDetails::Postgres(_) => new_postgres_connections += 1,
1130                            ConnectionDetails::MySql(_) => new_mysql_connections += 1,
1131                            ConnectionDetails::SqlServer(_) => new_sql_server_connections += 1,
1132                            ConnectionDetails::AwsPrivatelink(_) => {
1133                                new_aws_privatelink_connections += 1
1134                            }
1135                            ConnectionDetails::Csr(_)
1136                            | ConnectionDetails::Ssh { .. }
1137                            | ConnectionDetails::Aws(_)
1138                            | ConnectionDetails::IcebergCatalog(_) => {}
1139                        },
1140                        CatalogItem::Table(_) => {
1141                            new_tables += 1;
1142                        }
1143                        CatalogItem::Source(source) => {
1144                            new_sources += source.user_controllable_persist_shard_count()
1145                        }
1146                        CatalogItem::Sink(_) => new_sinks += 1,
1147                        CatalogItem::MaterializedView(_) => {
1148                            new_materialized_views += 1;
1149                        }
1150                        CatalogItem::Secret(_) => {
1151                            new_secrets += 1;
1152                        }
1153                        CatalogItem::Log(_)
1154                        | CatalogItem::View(_)
1155                        | CatalogItem::Index(_)
1156                        | CatalogItem::Type(_)
1157                        | CatalogItem::Func(_) => {}
1158                    }
1159                }
1160                Op::DropObjects(drop_object_infos) => {
1161                    for drop_object_info in drop_object_infos {
1162                        match drop_object_info {
1163                            DropObjectInfo::Cluster(_) => {
1164                                new_clusters -= 1;
1165                            }
1166                            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
1167                                if cluster_id.is_user() {
1168                                    *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) -= 1;
1169                                    let cluster = self
1170                                        .catalog()
1171                                        .get_cluster_replica(*cluster_id, *replica_id);
1172                                    if let ReplicaLocation::Managed(location) =
1173                                        &cluster.config.location
1174                                    {
1175                                        let replica_allocation = self
1176                                            .catalog()
1177                                            .cluster_replica_sizes()
1178                                            .0
1179                                            .get(location.size_for_billing())
1180                                            .expect(
                                                "location size is validated against the cluster replica sizes",
                                            );
                                        new_credit_consumption_rate -=
                                            replica_allocation.credits_per_hour
                                    }
                                }
                            }
                            DropObjectInfo::Database(_) => {
                                new_databases -= 1;
                            }
                            DropObjectInfo::Schema((database_spec, _)) => {
                                if let ResolvedDatabaseSpecifier::Id(database_id) = database_spec {
                                    *new_schemas_per_database.entry(database_id).or_insert(0) -= 1;
                                }
                            }
                            DropObjectInfo::Role(_) => {
                                new_roles -= 1;
                            }
                            DropObjectInfo::NetworkPolicy(_) => {
                                new_network_policies -= 1;
                            }
                            DropObjectInfo::Item(id) => {
                                let entry = self.catalog().get_entry(id);
                                *new_objects_per_schema
                                    .entry((
                                        entry.name().qualifiers.database_spec.clone(),
                                        entry.name().qualifiers.schema_spec.clone(),
                                    ))
                                    .or_insert(0) -= 1;
                                match entry.item() {
                                    CatalogItem::Connection(connection) => match connection.details
                                    {
                                        ConnectionDetails::AwsPrivatelink(_) => {
                                            new_aws_privatelink_connections -= 1;
                                        }
                                        _ => (),
                                    },
                                    CatalogItem::Table(_) => {
                                        new_tables -= 1;
                                    }
                                    CatalogItem::Source(source) => {
                                        new_sources -=
                                            source.user_controllable_persist_shard_count()
                                    }
                                    CatalogItem::Sink(_) => new_sinks -= 1,
                                    CatalogItem::MaterializedView(_) => {
                                        new_materialized_views -= 1;
                                    }
                                    CatalogItem::Secret(_) => {
                                        new_secrets -= 1;
                                    }
                                    CatalogItem::Log(_)
                                    | CatalogItem::View(_)
                                    | CatalogItem::Index(_)
                                    | CatalogItem::Type(_)
                                    | CatalogItem::Func(_) => {}
                                }
                            }
                        }
                    }
                }
                Op::UpdateItem {
                    name: _,
                    id,
                    to_item,
                } => match to_item {
                    CatalogItem::Source(source) => {
                        let current_source = self
                            .catalog()
                            .get_entry(id)
                            .source()
                            .expect("source update is for source item");

                        new_sources += source.user_controllable_persist_shard_count()
                            - current_source.user_controllable_persist_shard_count();
                    }
                    CatalogItem::Connection(_)
                    | CatalogItem::Table(_)
                    | CatalogItem::Sink(_)
                    | CatalogItem::MaterializedView(_)
                    | CatalogItem::Secret(_)
                    | CatalogItem::Log(_)
                    | CatalogItem::View(_)
                    | CatalogItem::Index(_)
                    | CatalogItem::Type(_)
                    | CatalogItem::Func(_) => {}
                },
                Op::AlterRole { .. }
                | Op::AlterRetainHistory { .. }
                | Op::AlterSourceTimestampInterval { .. }
                | Op::AlterNetworkPolicy { .. }
                | Op::AlterAddColumn { .. }
                | Op::AlterMaterializedViewApplyReplacement { .. }
                | Op::UpdatePrivilege { .. }
                | Op::UpdateDefaultPrivilege { .. }
                | Op::GrantRole { .. }
                | Op::RenameCluster { .. }
                | Op::RenameClusterReplica { .. }
                | Op::RenameItem { .. }
                | Op::RenameSchema { .. }
                | Op::UpdateOwner { .. }
                | Op::RevokeRole { .. }
                | Op::UpdateClusterConfig { .. }
                | Op::UpdateClusterReplicaConfig { .. }
                | Op::UpdateSourceReferences { .. }
                | Op::UpdateSystemConfiguration { .. }
                | Op::ResetSystemConfiguration { .. }
                | Op::ResetAllSystemConfiguration { .. }
                | Op::Comment { .. }
                | Op::WeirdStorageUsageUpdates { .. }
                | Op::InjectAuditEvents { .. } => {}
            }
        }

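        // The deltas computed above are net: a transaction containing both
        // `CREATE TABLE t1` and `DROP TABLE t2`, for example, nets out to
        // `new_tables == 0`, and non-positive deltas always pass the limit
        // checks below.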
        let mut current_aws_privatelink_connections = 0;
        let mut current_postgres_connections = 0;
        let mut current_mysql_connections = 0;
        let mut current_sql_server_connections = 0;
        let mut current_kafka_connections = 0;
        for c in self.catalog().user_connections() {
            let connection = c
                .connection()
                .expect("`user_connections()` only returns connection objects");

            match connection.details {
                ConnectionDetails::AwsPrivatelink(_) => current_aws_privatelink_connections += 1,
                ConnectionDetails::Postgres(_) => current_postgres_connections += 1,
                ConnectionDetails::MySql(_) => current_mysql_connections += 1,
                ConnectionDetails::SqlServer(_) => current_sql_server_connections += 1,
                ConnectionDetails::Kafka(_) => current_kafka_connections += 1,
                ConnectionDetails::Csr(_)
                | ConnectionDetails::Ssh { .. }
                | ConnectionDetails::Aws(_)
                | ConnectionDetails::IcebergCatalog(_) => {}
            }
        }
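        // Each check below compares the current catalog usage plus the net change
        // from this transaction against the corresponding system variable limit.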
        self.validate_resource_limit(
            current_kafka_connections,
            new_kafka_connections,
            SystemVars::max_kafka_connections,
            "Kafka Connection",
            MAX_KAFKA_CONNECTIONS.name(),
        )?;
        self.validate_resource_limit(
            current_postgres_connections,
            new_postgres_connections,
            SystemVars::max_postgres_connections,
            "PostgreSQL Connection",
            MAX_POSTGRES_CONNECTIONS.name(),
        )?;
        self.validate_resource_limit(
            current_mysql_connections,
            new_mysql_connections,
            SystemVars::max_mysql_connections,
            "MySQL Connection",
            MAX_MYSQL_CONNECTIONS.name(),
        )?;
        self.validate_resource_limit(
            current_sql_server_connections,
            new_sql_server_connections,
            SystemVars::max_sql_server_connections,
            "SQL Server Connection",
            MAX_SQL_SERVER_CONNECTIONS.name(),
        )?;
        self.validate_resource_limit(
            current_aws_privatelink_connections,
            new_aws_privatelink_connections,
            SystemVars::max_aws_privatelink_connections,
            "AWS PrivateLink Connection",
            MAX_AWS_PRIVATELINK_CONNECTIONS.name(),
        )?;
        self.validate_resource_limit(
            self.catalog().user_tables().count(),
            new_tables,
            SystemVars::max_tables,
            "table",
            MAX_TABLES.name(),
        )?;

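        // A source can account for more than one user-controllable persist shard
        // (for example, through its subsources), so current usage is a sum of
        // per-source shard counts rather than a count of catalog entries.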
        let current_sources: usize = self
            .catalog()
            .user_sources()
            .filter_map(|source| source.source())
            .map(|source| source.user_controllable_persist_shard_count())
            .sum::<i64>()
            .try_into()
            .expect("non-negative sum of sources");

        self.validate_resource_limit(
            current_sources,
            new_sources,
            SystemVars::max_sources,
            "source",
            MAX_SOURCES.name(),
        )?;
        self.validate_resource_limit(
            self.catalog().user_sinks().count(),
            new_sinks,
            SystemVars::max_sinks,
            "sink",
            MAX_SINKS.name(),
        )?;
        self.validate_resource_limit(
            self.catalog().user_materialized_views().count(),
            new_materialized_views,
            SystemVars::max_materialized_views,
            "materialized view",
            MAX_MATERIALIZED_VIEWS.name(),
        )?;
        self.validate_resource_limit(
            // Linked compute clusters don't count against the limit, since
            // we have separate limits for sources and sinks.
            //
            // TODO(benesch): remove the `max_sources` and `max_sinks` limit,
            // and set a higher max cluster limit?
            self.catalog().user_clusters().count(),
            new_clusters,
            SystemVars::max_clusters,
            "cluster",
            MAX_CLUSTERS.name(),
        )?;
        for (cluster_id, new_replicas) in new_replicas_per_cluster {
            // It's possible that the cluster hasn't been created yet.
            let current_amount = self
                .catalog()
                .try_get_cluster(cluster_id)
                .map(|instance| instance.user_replicas().count())
                .unwrap_or(0);
            self.validate_resource_limit(
                current_amount,
                new_replicas,
                SystemVars::max_replicas_per_cluster,
                "cluster replica",
                MAX_REPLICAS_PER_CLUSTER.name(),
            )?;
        }
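        // When the license key specifies its own credit-consumption cap, it takes
        // precedence over the `max_credit_consumption_rate` system variable.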
        self.validate_resource_limit_numeric(
            self.current_credit_consumption_rate(),
            new_credit_consumption_rate,
            |system_vars| {
                self.license_key
                    .max_credit_consumption_rate()
                    .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
            },
            "cluster replica",
            MAX_CREDIT_CONSUMPTION_RATE.name(),
        )?;
        self.validate_resource_limit(
            self.catalog().databases().count(),
            new_databases,
            SystemVars::max_databases,
            "database",
            MAX_DATABASES.name(),
        )?;
        for (database_id, new_schemas) in new_schemas_per_database {
            self.validate_resource_limit(
                self.catalog().get_database(database_id).schemas_by_id.len(),
                new_schemas,
                SystemVars::max_schemas_per_database,
                "schema",
                MAX_SCHEMAS_PER_DATABASE.name(),
            )?;
        }
        for ((database_spec, schema_spec), new_objects) in new_objects_per_schema {
            // For temporary schemas that don't exist yet (lazy creation),
            // treat them as having 0 items.
            let current_items = self
                .catalog()
                .try_get_schema(&database_spec, &schema_spec, conn_id)
                .map(|schema| schema.items.len())
                .unwrap_or(0);
            self.validate_resource_limit(
                current_items,
                new_objects,
                SystemVars::max_objects_per_schema,
                "object",
                MAX_OBJECTS_PER_SCHEMA.name(),
            )?;
        }
        self.validate_resource_limit(
            self.catalog().user_secrets().count(),
            new_secrets,
            SystemVars::max_secrets,
            "secret",
            MAX_SECRETS.name(),
        )?;
        self.validate_resource_limit(
            self.catalog().user_roles().count(),
            new_roles,
            SystemVars::max_roles,
            "role",
            MAX_ROLES.name(),
        )?;
        self.validate_resource_limit(
            self.catalog().user_network_policies().count(),
            new_network_policies,
            SystemVars::max_network_policies,
            "network policy",
            MAX_NETWORK_POLICIES.name(),
        )?;
        Ok(())
    }

    /// Validate a specific type of resource limit and return an error if that limit is exceeded.
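    ///
    /// # Examples
    ///
    /// A minimal sketch of the overflow-checked comparison this method performs,
    /// using plain integers (illustrative values only; the `Coordinator` and
    /// `SystemVars` wiring is omitted):
    ///
    /// ```
    /// let current: i64 = 24;
    /// let new_instances: i64 = 2;
    /// let limit: i64 = 25;
    /// // An addition that overflows (`None`) is treated as exceeding the limit.
    /// let exceeds = current
    ///     .checked_add(new_instances)
    ///     .map_or(true, |desired| desired > limit);
    /// assert!(exceeds);
    /// ```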
    pub(crate) fn validate_resource_limit<F>(
        &self,
        current_amount: usize,
        new_instances: i64,
        resource_limit: F,
        resource_type: &str,
        limit_name: &str,
    ) -> Result<(), AdapterError>
    where
        F: Fn(&SystemVars) -> u32,
    {
        if new_instances <= 0 {
            return Ok(());
        }

        let limit: i64 = resource_limit(self.catalog().system_config()).into();
        let current_amount: Option<i64> = current_amount.try_into().ok();
        let desired =
            current_amount.and_then(|current_amount| current_amount.checked_add(new_instances));

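        // `None` means the current amount did not fit in an `i64` or the addition
        // overflowed; either way the result exceeds any limit derived from a `u32`.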
        let exceeds_limit = if let Some(desired) = desired {
            desired > limit
        } else {
            true
        };

        let desired = desired
            .map(|desired| desired.to_string())
            .unwrap_or_else(|| format!("more than {}", i64::MAX));
        let current = current_amount
            .map(|current| current.to_string())
            .unwrap_or_else(|| format!("more than {}", i64::MAX));
        if exceeds_limit {
            Err(AdapterError::ResourceExhaustion {
                resource_type: resource_type.to_string(),
                limit_name: limit_name.to_string(),
                desired,
                limit: limit.to_string(),
                current,
            })
        } else {
            Ok(())
        }
    }

    /// Validate a specific type of [`Numeric`] resource limit and return an error if that limit
    /// is exceeded.
    ///
    /// This is very similar to [`Self::validate_resource_limit`], but for [`Numeric`] amounts
    /// instead of integer counts.
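    ///
    /// # Examples
    ///
    /// A sketch of the comparison semantics (values are illustrative, and the
    /// example assumes `Numeric`'s `From<i32>` impl):
    ///
    /// ```
    /// use mz_repr::adt::numeric::Numeric;
    ///
    /// let current = Numeric::from(7);
    /// let new_amount = Numeric::from(2);
    /// let limit = Numeric::from(8);
    /// // `Numeric` addition overflows to infinity rather than panicking, so the
    /// // comparison is well-defined even for extreme values.
    /// assert!(current + new_amount > limit);
    /// ```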
    pub(crate) fn validate_resource_limit_numeric<F>(
        &self,
        current_amount: Numeric,
        new_amount: Numeric,
        resource_limit: F,
        resource_type: &str,
        limit_name: &str,
    ) -> Result<(), AdapterError>
    where
        F: Fn(&SystemVars) -> Numeric,
    {
        if new_amount <= Numeric::zero() {
            return Ok(());
        }

        let limit = resource_limit(self.catalog().system_config());
        // `Numeric` addition overflows to infinity instead of panicking, which has the correct
        // comparison semantics here.
        // NaN should be impossible since both values are positive.
        let desired = current_amount + new_amount;
        if desired > limit {
            Err(AdapterError::ResourceExhaustion {
                resource_type: resource_type.to_string(),
                limit_name: limit_name.to_string(),
                desired: desired.to_string(),
                limit: limit.to_string(),
                current: current_amount.to_string(),
            })
        } else {
            Ok(())
        }
    }
}