mz_adapter/coord/ddl.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! This module encapsulates all of the [`Coordinator`]'s logic for creating, dropping,
//! and altering objects.

use std::collections::{BTreeMap, BTreeSet};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use fail::fail_point;
use maplit::{btreemap, btreeset};
use mz_adapter_types::compaction::SINCE_GRANULARITY;
use mz_adapter_types::connection::ConnectionId;
use mz_audit_log::VersionedEvent;
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::memory::objects::{CatalogItem, Connection, DataSourceDesc, Sink};
use mz_compute_client::protocol::response::PeekResponse;
use mz_controller::clusters::ReplicaLocation;
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::instrument;
use mz_ore::now::to_datetime;
use mz_ore::retry::Retry;
use mz_ore::str::StrExt;
use mz_ore::task;
use mz_repr::adt::numeric::Numeric;
use mz_repr::{CatalogItemId, GlobalId, Timestamp};
use mz_sql::catalog::{CatalogCluster, CatalogClusterReplica, CatalogSchema};
use mz_sql::names::ResolvedDatabaseSpecifier;
use mz_sql::plan::ConnectionDetails;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::{
    self, MAX_AWS_PRIVATELINK_CONNECTIONS, MAX_CLUSTERS, MAX_CONTINUAL_TASKS,
    MAX_CREDIT_CONSUMPTION_RATE, MAX_DATABASES, MAX_KAFKA_CONNECTIONS, MAX_MATERIALIZED_VIEWS,
    MAX_MYSQL_CONNECTIONS, MAX_NETWORK_POLICIES, MAX_OBJECTS_PER_SCHEMA, MAX_POSTGRES_CONNECTIONS,
    MAX_REPLICAS_PER_CLUSTER, MAX_ROLES, MAX_SCHEMAS_PER_DATABASE, MAX_SECRETS, MAX_SINKS,
    MAX_SOURCES, MAX_SQL_SERVER_CONNECTIONS, MAX_TABLES, SystemVars, Var,
};
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::connections::PostgresConnection;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sources::GenericSourceConnection;
use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
use serde_json::json;
use tracing::{Instrument, Level, event, info_span, warn};

use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
use crate::catalog::{DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult};
use crate::coord::Coordinator;
use crate::coord::appends::BuiltinTableAppendNotify;
use crate::coord::timeline::{TimelineContext, TimelineState};
use crate::session::{Session, Transaction, TransactionOps};
use crate::statement_logging::StatementEndedExecutionReason;
use crate::telemetry::{EventDetails, SegmentClientExt};
use crate::util::ResultExt;
use crate::{AdapterError, ExecuteContext, catalog, flags};

impl Coordinator {
    /// Same as [`Self::catalog_transact_conn`] but takes a [`Session`].
    #[instrument(name = "coord::catalog_transact")]
    pub(crate) async fn catalog_transact(
        &mut self,
        session: Option<&Session>,
        ops: Vec<catalog::Op>,
    ) -> Result<(), AdapterError> {
        self.catalog_transact_conn(session.map(|session| session.conn_id()), ops)
            .await
    }

    /// Same as [`Self::catalog_transact_conn`] but takes an optional
    /// [`ExecuteContext`] and runs builtin table updates concurrently with any
    /// side effects (e.g. creating collections).
    #[instrument(name = "coord::catalog_transact_with_side_effects")]
    pub(crate) async fn catalog_transact_with_side_effects<F>(
        &mut self,
        ctx: Option<&mut ExecuteContext>,
        ops: Vec<catalog::Op>,
        side_effect: F,
    ) -> Result<(), AdapterError>
    where
        F: for<'a> FnOnce(
                &'a mut Coordinator,
                Option<&'a mut ExecuteContext>,
            ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
            + 'static,
    {
        let table_updates = self
            .catalog_transact_inner(ctx.as_ref().map(|ctx| ctx.session().conn_id()), ops)
            .await?;
        let side_effects_fut = side_effect(self, ctx);

        // Run our side effects concurrently with the table updates.
        let ((), ()) = futures::future::join(
            side_effects_fut.instrument(info_span!(
                "coord::catalog_transact_with_side_effects::side_effects_fut"
            )),
            table_updates.instrument(info_span!(
                "coord::catalog_transact_with_side_effects::table_updates"
            )),
        )
        .await;

        Ok(())
    }
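
    // An illustrative sketch of how callers provide the `side_effect` closure: it
    // must return a boxed future that may borrow the coordinator, so call sites
    // typically wrap an async block in `Box::pin`. The `ctx`, `ops`, and the helper
    // invoked inside the async block are hypothetical placeholders, not real APIs.
    //
    //     coord
    //         .catalog_transact_with_side_effects(Some(ctx), ops, |coord, _ctx| {
    //             Box::pin(async move {
    //                 // e.g. create storage collections for the newly added items
    //                 coord.create_collections_for_new_items().await;
    //             })
    //         })
    //         .await?;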

    /// Same as [`Self::catalog_transact_inner`] but awaits the table updates.
    #[instrument(name = "coord::catalog_transact_conn")]
    pub(crate) async fn catalog_transact_conn(
        &mut self,
        conn_id: Option<&ConnectionId>,
        ops: Vec<catalog::Op>,
    ) -> Result<(), AdapterError> {
        let table_updates = self.catalog_transact_inner(conn_id, ops).await?;
        table_updates
            .instrument(info_span!("coord::catalog_transact_conn::table_updates"))
            .await;
        Ok(())
    }

    /// Executes a catalog transaction, with special handling if the provided
    /// [`Session`] is in a SQL transaction that is executing DDL.
    #[instrument(name = "coord::catalog_transact_with_ddl_transaction")]
    pub(crate) async fn catalog_transact_with_ddl_transaction<F>(
        &mut self,
        ctx: &mut ExecuteContext,
        ops: Vec<catalog::Op>,
        side_effect: F,
    ) -> Result<(), AdapterError>
    where
        F: for<'a> FnOnce(
                &'a mut Coordinator,
                Option<&'a mut ExecuteContext>,
            ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
            + Send
            + Sync
            + 'static,
    {
        let Some(Transaction {
            ops:
                TransactionOps::DDL {
                    ops: txn_ops,
                    revision: txn_revision,
                    side_effects: _,
                    state: _,
                },
            ..
        }) = ctx.session().transaction().inner()
        else {
            return self
                .catalog_transact_with_side_effects(Some(ctx), ops, side_effect)
                .await;
        };

        // Make sure our Catalog hasn't changed since opening the transaction.
        if self.catalog().transient_revision() != *txn_revision {
            return Err(AdapterError::DDLTransactionRace);
        }

        // Combine the existing ops with the new ops so we can replay them.
        let mut all_ops = Vec::with_capacity(ops.len() + txn_ops.len() + 1);
        all_ops.extend(txn_ops.iter().cloned());
        all_ops.extend(ops.clone());
        all_ops.push(Op::TransactionDryRun);

        // Run our Catalog transaction, but abort before committing.
        let result = self.catalog_transact(Some(ctx.session()), all_ops).await;

        match result {
            // We purposefully fail with this error to prevent committing the transaction.
            Err(AdapterError::TransactionDryRun { new_ops, new_state }) => {
                // Set these ops on our transaction, bailing if the Catalog has changed
                // since we ran the transaction.
                ctx.session_mut()
                    .transaction_mut()
                    .add_ops(TransactionOps::DDL {
                        ops: new_ops,
                        state: new_state,
                        side_effects: vec![Box::new(side_effect)],
                        revision: self.catalog().transient_revision(),
                    })?;
                Ok(())
            }
            Ok(_) => unreachable!("unexpected success!"),
            Err(e) => Err(e),
        }
    }
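
    // A rough sketch of the dry-run handshake above, with hypothetical `staged_ops`
    // and `new_op` for illustration: the ops already staged on the SQL transaction
    // are replayed, the new ops are appended, and `Op::TransactionDryRun` makes the
    // catalog report the would-be result instead of committing it.
    //
    //     let mut all_ops = staged_ops.clone();
    //     all_ops.push(new_op);
    //     all_ops.push(Op::TransactionDryRun);
    //     match coord.catalog_transact(Some(session), all_ops).await {
    //         // Expected: capture the combined ops and resulting state on the session.
    //         Err(AdapterError::TransactionDryRun { new_ops, new_state }) => { /* stash them */ }
    //         // Any other error means the DDL would have failed; surface it.
    //         Err(other) => return Err(other),
    //         Ok(_) => unreachable!("dry runs never commit"),
    //     }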

    /// Perform a catalog transaction. [`Coordinator::ship_dataflow`] must be
    /// called after this function successfully returns on any built
    /// [`DataflowDesc`](mz_compute_types::dataflows::DataflowDesc).
    #[instrument(name = "coord::catalog_transact_inner")]
    pub(crate) async fn catalog_transact_inner<'a>(
        &mut self,
        conn_id: Option<&ConnectionId>,
        ops: Vec<catalog::Op>,
    ) -> Result<BuiltinTableAppendNotify, AdapterError> {
        if self.controller.read_only() {
            return Err(AdapterError::ReadOnly);
        }

        event!(Level::TRACE, ops = format!("{:?}", ops));

        let mut sources_to_drop = vec![];
        let mut webhook_sources_to_restart = BTreeSet::new();
        let mut table_gids_to_drop = vec![];
        let mut storage_sink_gids_to_drop = vec![];
        let mut indexes_to_drop = vec![];
        let mut materialized_views_to_drop = vec![];
        let mut continual_tasks_to_drop = vec![];
        let mut views_to_drop = vec![];
        let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
        let mut secrets_to_drop = vec![];
        let mut vpc_endpoints_to_drop = vec![];
        let mut clusters_to_drop = vec![];
        let mut cluster_replicas_to_drop = vec![];
        let mut compute_sinks_to_drop = BTreeMap::new();
        let mut peeks_to_drop = vec![];
        let mut copies_to_drop = vec![];
        let mut clusters_to_create = vec![];
        let mut cluster_replicas_to_create = vec![];
        let mut update_metrics_config = false;
        let mut update_tracing_config = false;
        let mut update_controller_config = false;
        let mut update_compute_config = false;
        let mut update_storage_config = false;
        let mut update_pg_timestamp_oracle_config = false;
        let mut update_metrics_retention = false;
        let mut update_secrets_caching_config = false;
        let mut update_cluster_scheduling_config = false;
        let mut update_http_config = false;

        for op in &ops {
            match op {
                catalog::Op::DropObjects(drop_object_infos) => {
                    for drop_object_info in drop_object_infos {
                        match &drop_object_info {
                            catalog::DropObjectInfo::Item(id) => {
                                match self.catalog().get_entry(id).item() {
                                    CatalogItem::Table(table) => {
                                        table_gids_to_drop
                                            .extend(table.global_ids().map(|gid| (*id, gid)));
                                    }
                                    CatalogItem::Source(source) => {
                                        sources_to_drop.push((*id, source.global_id()));
                                        if let DataSourceDesc::Ingestion {
                                            ingestion_desc, ..
                                        } = &source.data_source
                                        {
                                            match &ingestion_desc.desc.connection {
                                                GenericSourceConnection::Postgres(conn) => {
                                                    let conn = conn.clone().into_inline_connection(
                                                        self.catalog().state(),
                                                    );
                                                    let pending_drop = (
                                                        conn.connection.clone(),
                                                        conn.publication_details.slot.clone(),
                                                    );
                                                    replication_slots_to_drop.push(pending_drop);
                                                }
                                                _ => {}
                                            }
                                        }
                                    }
                                    CatalogItem::Sink(sink) => {
                                        storage_sink_gids_to_drop.push(sink.global_id());
                                    }
                                    CatalogItem::Index(index) => {
                                        indexes_to_drop.push((index.cluster_id, index.global_id()));
                                    }
                                    CatalogItem::MaterializedView(mv) => {
                                        materialized_views_to_drop
                                            .push((mv.cluster_id, mv.global_id()));
                                    }
                                    CatalogItem::View(view) => {
                                        views_to_drop.push((*id, view.clone()))
                                    }
                                    CatalogItem::ContinualTask(ct) => {
                                        continual_tasks_to_drop.push((
                                            *id,
                                            ct.cluster_id,
                                            ct.global_id(),
                                        ));
                                    }
                                    CatalogItem::Secret(_) => {
                                        secrets_to_drop.push(*id);
                                    }
                                    CatalogItem::Connection(Connection { details, .. }) => {
                                        match details {
                                            // SSH connections have an associated secret that should be dropped
                                            ConnectionDetails::Ssh { .. } => {
                                                secrets_to_drop.push(*id);
                                            }
                                            // AWS PrivateLink connections have an associated
                                            // VpcEndpoint K8S resource that should be dropped
                                            ConnectionDetails::AwsPrivatelink(_) => {
                                                vpc_endpoints_to_drop.push(*id);
                                            }
                                            _ => (),
                                        }
                                    }
                                    _ => (),
                                }
                            }
                            catalog::DropObjectInfo::Cluster(id) => {
                                clusters_to_drop.push(*id);
                            }
                            catalog::DropObjectInfo::ClusterReplica((
                                cluster_id,
                                replica_id,
                                _reason,
                            )) => {
                                // Drop the cluster replica itself.
                                cluster_replicas_to_drop.push((*cluster_id, *replica_id));
                            }
                            _ => (),
                        }
                    }
                }
                catalog::Op::ResetSystemConfiguration { name }
                | catalog::Op::UpdateSystemConfiguration { name, .. } => {
                    update_metrics_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_metrics_config_var(name);
                    update_tracing_config |= vars::is_tracing_var(name);
                    update_controller_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_controller_config_var(name);
                    update_compute_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_compute_config_var(name);
                    update_storage_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_storage_config_var(name);
                    update_pg_timestamp_oracle_config |=
                        vars::is_pg_timestamp_oracle_config_var(name);
                    update_metrics_retention |= name == vars::METRICS_RETENTION.name();
                    update_secrets_caching_config |= vars::is_secrets_caching_var(name);
                    update_cluster_scheduling_config |= vars::is_cluster_scheduling_var(name);
                    update_http_config |= vars::is_http_config_var(name);
                }
                catalog::Op::ResetAllSystemConfiguration => {
                    // Assume they all need to be updated.
                    // We could check whether the configs have actually changed, but
                    // this is simpler.
                    update_tracing_config = true;
                    update_controller_config = true;
                    update_compute_config = true;
                    update_storage_config = true;
                    update_pg_timestamp_oracle_config = true;
                    update_metrics_retention = true;
                    update_secrets_caching_config = true;
                    update_cluster_scheduling_config = true;
                    update_http_config = true;
                }
                catalog::Op::RenameItem { id, .. } => {
                    let item = self.catalog().get_entry(id);
                    let is_webhook_source = item
                        .source()
                        .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
                        .unwrap_or(false);
                    if is_webhook_source {
                        webhook_sources_to_restart.insert(*id);
                    }
                }
                catalog::Op::RenameSchema {
                    database_spec,
                    schema_spec,
                    ..
                } => {
                    let schema = self.catalog().get_schema(
                        database_spec,
                        schema_spec,
                        conn_id.unwrap_or(&SYSTEM_CONN_ID),
                    );
                    let webhook_sources = schema.item_ids().filter(|id| {
                        let item = self.catalog().get_entry(id);
                        item.source()
                            .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
                            .unwrap_or(false)
                    });
                    webhook_sources_to_restart.extend(webhook_sources);
                }
                catalog::Op::CreateCluster { id, .. } => {
                    clusters_to_create.push(*id);
                }
                catalog::Op::CreateClusterReplica {
                    cluster_id,
                    name,
                    config,
                    ..
                } => {
                    cluster_replicas_to_create.push((
                        *cluster_id,
                        name.clone(),
                        config.location.num_processes(),
                    ));
                }
                _ => (),
            }
        }

        let collections_to_drop: BTreeSet<GlobalId> = sources_to_drop
            .iter()
            .map(|(_, gid)| *gid)
            .chain(table_gids_to_drop.iter().map(|(_, gid)| *gid))
            .chain(storage_sink_gids_to_drop.iter().copied())
            .chain(indexes_to_drop.iter().map(|(_, gid)| *gid))
            .chain(materialized_views_to_drop.iter().map(|(_, gid)| *gid))
            .chain(continual_tasks_to_drop.iter().map(|(_, _, gid)| *gid))
            .chain(views_to_drop.iter().map(|(_id, view)| view.global_id()))
            .collect();

        // Clean up any active compute sinks, such as subscribes or `COPY TO`s, that rely on dropped relations or clusters.
        for (sink_id, sink) in &self.active_compute_sinks {
            let cluster_id = sink.cluster_id();
            let conn_id = &sink.connection_id();
            if let Some(id) = sink
                .depends_on()
                .iter()
                .find(|id| collections_to_drop.contains(id))
            {
                let entry = self.catalog().get_entry_by_global_id(id);
                let name = self
                    .catalog()
                    .resolve_full_name(entry.name(), Some(conn_id))
                    .to_string();
                compute_sinks_to_drop.insert(
                    *sink_id,
                    ActiveComputeSinkRetireReason::DependencyDropped(format!(
                        "relation {}",
                        name.quoted()
                    )),
                );
            } else if clusters_to_drop.contains(&cluster_id) {
                let name = self.catalog().get_cluster(cluster_id).name();
                compute_sinks_to_drop.insert(
                    *sink_id,
                    ActiveComputeSinkRetireReason::DependencyDropped(format!(
                        "cluster {}",
                        name.quoted()
                    )),
                );
            }
        }

        // Clean up any pending peeks that rely on dropped relations or clusters.
        for (uuid, pending_peek) in &self.pending_peeks {
            if let Some(id) = pending_peek
                .depends_on
                .iter()
                .find(|id| collections_to_drop.contains(id))
            {
                let entry = self.catalog().get_entry_by_global_id(id);
                let name = self
                    .catalog()
                    .resolve_full_name(entry.name(), Some(&pending_peek.conn_id));
                peeks_to_drop.push((
                    format!("relation {}", name.to_string().quoted()),
                    uuid.clone(),
                ));
            } else if clusters_to_drop.contains(&pending_peek.cluster_id) {
                let name = self.catalog().get_cluster(pending_peek.cluster_id).name();
                peeks_to_drop.push((format!("cluster {}", name.quoted()), uuid.clone()));
            }
        }

        // Clean up any pending `COPY` statements that rely on dropped relations or clusters.
        for (conn_id, pending_copy) in &self.active_copies {
            let dropping_table = table_gids_to_drop
                .iter()
                .any(|(item_id, _gid)| pending_copy.table_id == *item_id);
            let dropping_cluster = clusters_to_drop.contains(&pending_copy.cluster_id);

            if dropping_table || dropping_cluster {
                copies_to_drop.push(conn_id.clone());
            }
        }

        let storage_ids_to_drop = sources_to_drop
            .iter()
            .map(|(_, gid)| *gid)
            .chain(storage_sink_gids_to_drop.iter().copied())
            .chain(table_gids_to_drop.iter().map(|(_, gid)| *gid))
            .chain(materialized_views_to_drop.iter().map(|(_, gid)| *gid))
            .chain(continual_tasks_to_drop.iter().map(|(_, _, gid)| *gid));
        let compute_ids_to_drop = indexes_to_drop
            .iter()
            .copied()
            .chain(materialized_views_to_drop.iter().copied())
            .chain(
                continual_tasks_to_drop
                    .iter()
                    .map(|(_, cluster_id, gid)| (*cluster_id, *gid)),
            );

        // Check if any timelines would become empty if we dropped the specified
        // storage or compute resources.
        //
        // Note: only after a transaction succeeds do we actually drop the timeline.
        let collection_id_bundle = self.build_collection_id_bundle(
            storage_ids_to_drop,
            compute_ids_to_drop,
            clusters_to_drop.clone(),
        );
        let timeline_associations: BTreeMap<_, _> = self
            .catalog()
            .partition_ids_by_timeline_context(&collection_id_bundle)
            .filter_map(|(context, bundle)| {
                let TimelineContext::TimelineDependent(timeline) = context else {
                    return None;
                };
                let TimelineState { read_holds, .. } = self
                    .global_timelines
                    .get(&timeline)
                    .expect("all timelines have a timestamp oracle");

                let empty = read_holds.id_bundle().difference(&bundle).is_empty();

                Some((timeline, (empty, bundle)))
            })
            .collect();

        self.validate_resource_limits(&ops, conn_id.unwrap_or(&SYSTEM_CONN_ID))?;

        // This will produce timestamps that are guaranteed to increase on each
        // call, and also never be behind the system clock. If the system clock
        // hasn't advanced (or has gone backward), it will increment by 1. For
        // the audit log, we need to balance "close (within 10s or so) to the
        // system clock" and "always goes up". We've chosen here to prioritize
        // always going up, and believe we will always be close to the system
        // clock because it is well configured (chrony) and so may only rarely
        // regress or pause for 10s.
        let oracle_write_ts = self.get_local_write_ts().await.timestamp;
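        // For example (hypothetical values): if the previous write timestamp was
        // 1_000_005 but the system clock reads 1_000_000, the oracle hands out
        // 1_000_006 rather than going backwards; once the clock passes that value,
        // timestamps track the clock again.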

        let Coordinator {
            catalog,
            active_conns,
            controller,
            cluster_replica_statuses,
            ..
        } = self;
        let catalog = Arc::make_mut(catalog);
        let conn = conn_id.map(|id| active_conns.get(id).expect("connection must exist"));

        let TransactionResult {
            builtin_table_updates,
            audit_events,
        } = catalog
            .transact(
                Some(&mut controller.storage_collections),
                oracle_write_ts,
                conn,
                ops,
            )
            .await?;

        for (cluster_id, replica_id) in &cluster_replicas_to_drop {
            cluster_replica_statuses.remove_cluster_replica_statuses(cluster_id, replica_id);
        }
        for cluster_id in &clusters_to_drop {
            cluster_replica_statuses.remove_cluster_statuses(cluster_id);
        }
        for cluster_id in clusters_to_create {
            cluster_replica_statuses.initialize_cluster_statuses(cluster_id);
        }
        let now = to_datetime((catalog.config().now)());
        for (cluster_id, replica_name, num_processes) in cluster_replicas_to_create {
            let replica_id = catalog
                .resolve_replica_in_cluster(&cluster_id, &replica_name)
                .expect("just created")
                .replica_id();
            cluster_replica_statuses.initialize_cluster_replica_statuses(
                cluster_id,
                replica_id,
                num_processes,
                now,
            );
        }

        // Append our builtin table updates, then return the notify so we can run other tasks in
        // parallel.
        let (builtin_update_notify, _) = self
            .builtin_table_update()
            .execute(builtin_table_updates)
            .await;

        // No error returns are allowed after this point. Enforce this at compile time
        // by using this odd structure so we don't accidentally add a stray `?`.
        let _: () = async {
            if !timeline_associations.is_empty() {
                for (timeline, (should_be_empty, id_bundle)) in timeline_associations {
                    let became_empty =
                        self.remove_resources_associated_with_timeline(timeline, id_bundle);
                    assert_eq!(should_be_empty, became_empty, "emptiness did not match!");
                }
            }
            if !table_gids_to_drop.is_empty() {
                let ts = self.get_local_write_ts().await;
                self.drop_tables(table_gids_to_drop, ts.timestamp);
            }
            // Note that we drop tables before sources: the storage controller tracks a
            // weak dependency from tables on sources, and dropping in the other order
            // produces error logging that we'd prefer to avoid. This isn't an actual
            // dependency violation, but we'd like to keep that error logging around so
            // it can flag real dependency errors when they do occur.
            if !sources_to_drop.is_empty() {
                self.drop_sources(sources_to_drop);
            }
            if !webhook_sources_to_restart.is_empty() {
                self.restart_webhook_sources(webhook_sources_to_restart);
            }
            if !storage_sink_gids_to_drop.is_empty() {
                self.drop_storage_sinks(storage_sink_gids_to_drop);
            }
            if !compute_sinks_to_drop.is_empty() {
                self.retire_compute_sinks(compute_sinks_to_drop).await;
            }
            if !peeks_to_drop.is_empty() {
                for (dropped_name, uuid) in peeks_to_drop {
                    if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
                        let cancel_reason = PeekResponse::Error(format!(
                            "query could not complete because {dropped_name} was dropped"
                        ));
                        self.controller
                            .compute
                            .cancel_peek(pending_peek.cluster_id, uuid, cancel_reason)
                            .unwrap_or_terminate("unable to cancel peek");
                        self.retire_execution(
                            StatementEndedExecutionReason::Canceled,
                            pending_peek.ctx_extra,
                        );
                    }
                }
            }
            if !copies_to_drop.is_empty() {
                for conn_id in copies_to_drop {
                    self.cancel_pending_copy(&conn_id);
                }
            }
            if !indexes_to_drop.is_empty() {
                self.drop_indexes(indexes_to_drop);
            }
            if !materialized_views_to_drop.is_empty() {
                self.drop_materialized_views(materialized_views_to_drop);
            }
            if !continual_tasks_to_drop.is_empty() {
                self.drop_continual_tasks(continual_tasks_to_drop);
            }
            if !vpc_endpoints_to_drop.is_empty() {
                self.drop_vpc_endpoints_in_background(vpc_endpoints_to_drop)
            }
            if !cluster_replicas_to_drop.is_empty() {
                fail::fail_point!("after_catalog_drop_replica");
                for (cluster_id, replica_id) in cluster_replicas_to_drop {
                    self.drop_replica(cluster_id, replica_id);
                }
            }
            if !clusters_to_drop.is_empty() {
                for cluster_id in clusters_to_drop {
                    self.controller.drop_cluster(cluster_id);
                }
            }

            // We don't want to block the main coordinator thread on cleaning
            // up external resources (PostgreSQL replication slots and secrets),
            // so we perform that cleanup in a background task.
            //
            // TODO(database-issues#4154): This is inherently best effort. An ill-timed crash
            // means we'll never clean these resources up. Safer cleanup for non-Materialize resources.
            // See <https://github.com/MaterializeInc/database-issues/issues/4154>
            task::spawn(|| "drop_replication_slots_and_secrets", {
                let ssh_tunnel_manager = self.connection_context().ssh_tunnel_manager.clone();
                let secrets_controller = Arc::clone(&self.secrets_controller);
                let secrets_reader = Arc::clone(self.secrets_reader());
                let storage_config = self.controller.storage.config().clone();

                async move {
                    for (connection, replication_slot_name) in replication_slots_to_drop {
                        tracing::info!(?replication_slot_name, "dropping replication slot");

                        // Try to drop the replication slots, but give up after
                        // a while. The PostgreSQL server may no longer be
                        // healthy. Users often drop PostgreSQL sources
                        // *because* the PostgreSQL server has been
                        // decommissioned.
                        let result: Result<(), anyhow::Error> = Retry::default()
                            .max_duration(Duration::from_secs(60))
                            .retry_async(|_state| async {
                                let config = connection
                                    .config(&secrets_reader, &storage_config, InTask::No)
                                    .await
                                    .map_err(|e| {
                                        anyhow::anyhow!(
                                            "error creating Postgres client for \
                                            dropping acquired slots: {}",
                                            e.display_with_causes()
                                        )
                                    })?;
                                // TODO (maz): since this is always true now, can we drop it?
                                mz_postgres_util::drop_replication_slots(
                                    &ssh_tunnel_manager,
                                    config.clone(),
                                    &[(&replication_slot_name, true)],
                                )
                                .await?;

                                Ok(())
                            })
                            .await;

                        if let Err(err) = result {
                            tracing::warn!(
                                ?replication_slot_name,
                                ?err,
                                "failed to drop replication slot"
                            );
                        }
                    }

                    // Drop secrets *after* dropping the replication slots,
                    // because dropping those replication slots may still need
                    // the secrets to connect to the upstream PostgreSQL server.
                    //
                    // It's okay if we crash before processing the secret drops,
                    // as we look for and remove any orphaned secrets during
                    // startup.
                    fail_point!("drop_secrets");
                    for secret in secrets_to_drop {
                        if let Err(e) = secrets_controller.delete(secret).await {
                            warn!("Dropping secrets has encountered an error: {}", e);
                        }
                    }
                }
            });

            if update_metrics_config {
                mz_metrics::update_dyncfg(&self.catalog().system_config().dyncfg_updates());
            }
            if update_controller_config {
                self.update_controller_config();
            }
            if update_compute_config {
                self.update_compute_config();
            }
            if update_storage_config {
                self.update_storage_config();
            }
            if update_pg_timestamp_oracle_config {
                self.update_pg_timestamp_oracle_config();
            }
            if update_metrics_retention {
                self.update_metrics_retention();
            }
            if update_tracing_config {
                self.update_tracing_config();
            }
            if update_secrets_caching_config {
                self.update_secrets_caching_config();
            }
            if update_cluster_scheduling_config {
                self.update_cluster_scheduling_config();
            }
            if update_http_config {
                self.update_http_config();
            }
        }
        .instrument(info_span!("coord::catalog_transact_with::finalize"))
        .await;

        let conn = conn_id.and_then(|id| self.active_conns.get(id));
        if let Some(segment_client) = &self.segment_client {
            for VersionedEvent::V1(event) in audit_events {
                let event_type = format!(
                    "{} {}",
                    event.object_type.as_title_case(),
                    event.event_type.as_title_case()
                );
                segment_client.environment_track(
                    &self.catalog().config().environment_id,
                    event_type,
                    json!({ "details": event.details.as_json() }),
                    EventDetails {
                        user_id: conn
                            .and_then(|c| c.user().external_metadata.as_ref())
                            .map(|m| m.user_id),
                        application_name: conn.map(|c| c.application_name()),
                        ..Default::default()
                    },
                );
            }
        }

        // Note: it's important that we keep the function call inside the macro; this
        // way we only run the consistency checks if soft assertions are enabled.
        mz_ore::soft_assert_eq_no_log!(
            self.check_consistency(),
            Ok(()),
            "coordinator inconsistency detected"
        );

        Ok(builtin_update_notify)
    }

    fn drop_replica(&mut self, cluster_id: ClusterId, replica_id: ReplicaId) {
        self.drop_introspection_subscribes(replica_id);

        self.controller
            .drop_replica(cluster_id, replica_id)
            .expect("dropping replica must not fail");
    }

    /// A convenience method for dropping sources.
    fn drop_sources(&mut self, sources: Vec<(CatalogItemId, GlobalId)>) {
        for (item_id, _gid) in &sources {
            self.active_webhooks.remove(item_id);
        }
        let storage_metadata = self.catalog.state().storage_metadata();
        let source_gids = sources.into_iter().map(|(_id, gid)| gid).collect();
        self.controller
            .storage
            .drop_sources(storage_metadata, source_gids)
            .unwrap_or_terminate("cannot fail to drop sources");
    }

    fn drop_tables(&mut self, tables: Vec<(CatalogItemId, GlobalId)>, ts: Timestamp) {
        for (item_id, _gid) in &tables {
            self.active_webhooks.remove(item_id);
        }
        let storage_metadata = self.catalog.state().storage_metadata();
        let table_gids = tables.into_iter().map(|(_id, gid)| gid).collect();
        self.controller
            .storage
            .drop_tables(storage_metadata, table_gids, ts)
            .unwrap_or_terminate("cannot fail to drop tables");
    }

    fn restart_webhook_sources(&mut self, sources: impl IntoIterator<Item = CatalogItemId>) {
        for id in sources {
            self.active_webhooks.remove(&id);
        }
    }

    /// Like `drop_compute_sinks`, but for a single compute sink.
    ///
    /// Returns the controller's state for the compute sink if the identified
    /// sink was known to the controller. It is the caller's responsibility to
    /// retire the returned sink. Consider using `retire_compute_sinks` instead.
    #[must_use]
    pub async fn drop_compute_sink(&mut self, sink_id: GlobalId) -> Option<ActiveComputeSink> {
        self.drop_compute_sinks([sink_id]).await.remove(&sink_id)
    }

    /// Drops a batch of compute sinks.
    ///
    /// For each sink that exists, the coordinator and controller's state
    /// associated with the sink is removed.
    ///
    /// Returns a map containing the controller's state for each sink that was
    /// removed. It is the caller's responsibility to retire the returned sinks.
    /// Consider using `retire_compute_sinks` instead.
    #[must_use]
    pub async fn drop_compute_sinks(
        &mut self,
        sink_ids: impl IntoIterator<Item = GlobalId>,
    ) -> BTreeMap<GlobalId, ActiveComputeSink> {
        let mut by_id = BTreeMap::new();
        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
        for sink_id in sink_ids {
            let sink = match self.remove_active_compute_sink(sink_id).await {
                None => {
                    tracing::error!(%sink_id, "drop_compute_sinks called on nonexistent sink");
                    continue;
                }
                Some(sink) => sink,
            };

            by_cluster
                .entry(sink.cluster_id())
                .or_default()
                .push(sink_id);
            by_id.insert(sink_id, sink);
        }
        for (cluster_id, ids) in by_cluster {
            let compute = &mut self.controller.compute;
            // A cluster could have been dropped, so verify it exists.
            if compute.instance_exists(cluster_id) {
                compute
                    .drop_collections(cluster_id, ids)
                    .unwrap_or_terminate("cannot fail to drop collections");
            }
        }
        by_id
    }

    /// Retires a batch of sinks with disparate reasons for retirement.
    ///
    /// Each sink identified in `reasons` is dropped (see `drop_compute_sinks`),
    /// then retired with its corresponding reason.
    pub async fn retire_compute_sinks(
        &mut self,
        mut reasons: BTreeMap<GlobalId, ActiveComputeSinkRetireReason>,
    ) {
        let sink_ids = reasons.keys().cloned();
        for (id, sink) in self.drop_compute_sinks(sink_ids).await {
            let reason = reasons
                .remove(&id)
                .expect("all returned IDs are in `reasons`");
            sink.retire(reason);
        }
    }
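
    // An illustrative sketch (with hypothetical sink IDs): callers assemble the
    // reason map up front, e.g. with the `btreemap!` macro imported above, and
    // every key must refer to an active compute sink.
    //
    //     coord
    //         .retire_compute_sinks(btreemap! {
    //             subscribe_id => ActiveComputeSinkRetireReason::Canceled,
    //             copy_to_id => ActiveComputeSinkRetireReason::DependencyDropped(
    //                 "relation \"t\"".to_string(),
    //             ),
    //         })
    //         .await;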

    /// Drops all pending replicas for a set of clusters
    /// that are undergoing reconfiguration.
    pub async fn drop_reconfiguration_replicas(
        &mut self,
        cluster_ids: BTreeSet<ClusterId>,
    ) -> Result<(), AdapterError> {
        let pending_cluster_ops: Vec<Op> = cluster_ids
            .iter()
            .map(|c| {
                self.catalog()
                    .get_cluster(c.clone())
                    .replicas()
                    .filter_map(|r| match r.config.location {
                        ReplicaLocation::Managed(ref l) if l.pending => {
                            Some(DropObjectInfo::ClusterReplica((
                                c.clone(),
                                r.replica_id,
                                ReplicaCreateDropReason::Manual,
                            )))
                        }
                        _ => None,
                    })
                    .collect::<Vec<DropObjectInfo>>()
            })
            .filter_map(|pending_replica_drop_ops_by_cluster| {
                match pending_replica_drop_ops_by_cluster.len() {
                    0 => None,
                    _ => Some(Op::DropObjects(pending_replica_drop_ops_by_cluster)),
                }
            })
            .collect();
        if !pending_cluster_ops.is_empty() {
            self.catalog_transact(None, pending_cluster_ops).await?;
        }
        Ok(())
    }

    /// Cancels all active compute sinks for the identified connection.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn cancel_compute_sinks_for_conn(&mut self, conn_id: &ConnectionId) {
        self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Canceled)
            .await
    }

    /// Cancels all active cluster reconfigurations for the identified connection.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn cancel_cluster_reconfigurations_for_conn(
        &mut self,
        conn_id: &ConnectionId,
    ) {
        self.retire_cluster_reconfigurations_for_conn(conn_id).await
    }

    /// Retires all active compute sinks for the identified connection with the
    /// specified reason.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn retire_compute_sinks_for_conn(
        &mut self,
        conn_id: &ConnectionId,
        reason: ActiveComputeSinkRetireReason,
    ) {
        let drop_sinks = self
            .active_conns
            .get_mut(conn_id)
            .expect("must exist for active session")
            .drop_sinks
            .iter()
            .map(|sink_id| (*sink_id, reason.clone()))
            .collect();
        self.retire_compute_sinks(drop_sinks).await;
    }

    /// Cleans up pending cluster reconfigurations for the identified connection.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn retire_cluster_reconfigurations_for_conn(
        &mut self,
        conn_id: &ConnectionId,
    ) {
        let reconfiguring_clusters = self
            .active_conns
            .get(conn_id)
            .expect("must exist for active session")
            .pending_cluster_alters
            .clone();
        // try to drop reconfig replicas
        self.drop_reconfiguration_replicas(reconfiguring_clusters)
            .await
            .unwrap_or_terminate("cannot fail to drop reconfiguration replicas");

        self.active_conns
            .get_mut(conn_id)
            .expect("must exist for active session")
            .pending_cluster_alters
            .clear();
    }

    pub(crate) fn drop_storage_sinks(&mut self, sink_gids: Vec<GlobalId>) {
        let storage_metadata = self.catalog.state().storage_metadata();
        self.controller
            .storage
            .drop_sinks(storage_metadata, sink_gids)
            .unwrap_or_terminate("cannot fail to drop sinks");
    }

    pub(crate) fn drop_indexes(&mut self, indexes: Vec<(ClusterId, GlobalId)>) {
        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
        for (cluster_id, gid) in indexes {
            by_cluster.entry(cluster_id).or_default().push(gid);
        }
        for (cluster_id, gids) in by_cluster {
            let compute = &mut self.controller.compute;
            // A cluster could have been dropped, so verify it exists.
            if compute.instance_exists(cluster_id) {
                compute
                    .drop_collections(cluster_id, gids)
                    .unwrap_or_terminate("cannot fail to drop collections");
            }
        }
    }

    /// A convenience method for dropping materialized views.
    fn drop_materialized_views(&mut self, mviews: Vec<(ClusterId, GlobalId)>) {
        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
        let mut mv_gids = Vec::new();
        for (cluster_id, gid) in mviews {
            by_cluster.entry(cluster_id).or_default().push(gid);
            mv_gids.push(gid);
        }

        // Drop compute sinks.
        for (cluster_id, ids) in by_cluster {
            let compute = &mut self.controller.compute;
            // A cluster could have been dropped, so verify it exists.
            if compute.instance_exists(cluster_id) {
                compute
                    .drop_collections(cluster_id, ids)
                    .unwrap_or_terminate("cannot fail to drop collections");
            }
        }

        // Drop storage resources.
        let storage_metadata = self.catalog.state().storage_metadata();
        self.controller
            .storage
            .drop_sources(storage_metadata, mv_gids)
            .unwrap_or_terminate("cannot fail to drop sources");
    }

    /// A convenience method for dropping continual tasks.
    fn drop_continual_tasks(&mut self, cts: Vec<(CatalogItemId, ClusterId, GlobalId)>) {
        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
        let mut source_ids = Vec::new();
        for (item_id, cluster_id, gid) in cts {
            by_cluster.entry(cluster_id).or_default().push(gid);
            source_ids.push((item_id, gid));
        }

        // Drop compute sinks.
        for (cluster_id, ids) in by_cluster {
            let compute = &mut self.controller.compute;
            // A cluster could have been dropped, so verify it exists.
            if compute.instance_exists(cluster_id) {
                compute
                    .drop_collections(cluster_id, ids)
                    .unwrap_or_terminate("cannot fail to drop collections");
            }
        }

        // Drop storage sources.
        self.drop_sources(source_ids)
    }

    fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>) {
        let cloud_resource_controller = Arc::clone(self.cloud_resource_controller
            .as_ref()
            .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))
            .expect("vpc endpoints should only be dropped in CLOUD, where `cloud_resource_controller` is `Some`"));
        // We don't want to block the coordinator on external delete API calls,
        // so we move the VPC endpoint drops to a separate task. This does mean
        // that a failed drop won't bubble up to the user as an error message.
        // However, even if it did (which is how the code previously worked),
        // mz has already dropped the endpoint from our catalog, so we wouldn't
        // be able to retry anyway. Any orphaned vpc_endpoints will eventually
        // be cleaned up during restart via coord bootstrap.
        task::spawn(
            || "drop_vpc_endpoints",
            async move {
                for vpc_endpoint in vpc_endpoints {
                    let _ = Retry::default()
                        .max_duration(Duration::from_secs(60))
                        .retry_async(|_state| async {
                            fail_point!("drop_vpc_endpoint", |r| {
                                Err(anyhow::anyhow!("Fail point error {:?}", r))
                            });
                            match cloud_resource_controller
                                .delete_vpc_endpoint(vpc_endpoint)
                                .await
                            {
                                Ok(_) => Ok(()),
                                Err(e) => {
                                    warn!("Dropping VPC Endpoints has encountered an error: {}", e);
                                    Err(e)
                                }
                            }
                        })
                        .await;
                }
            }
            .instrument(info_span!(
                "coord::catalog_transact_inner::drop_vpc_endpoints"
            )),
        );
    }

    /// Removes all temporary items created by the specified connection, though
    /// not the temporary schema itself.
    pub(crate) async fn drop_temp_items(&mut self, conn_id: &ConnectionId) {
        let temp_items = self.catalog().state().get_temp_items(conn_id).collect();
        let all_items = self.catalog().object_dependents(&temp_items, conn_id);

        if all_items.is_empty() {
            return;
        }
        let op = Op::DropObjects(
            all_items
                .into_iter()
                .map(DropObjectInfo::manual_drop_from_object_id)
                .collect(),
        );

        self.catalog_transact_conn(Some(conn_id), vec![op])
            .await
            .expect("unable to drop temporary items for conn_id");
    }

    fn update_cluster_scheduling_config(&self) {
        let config = flags::orchestrator_scheduling_config(self.catalog.system_config());
        self.controller
            .update_orchestrator_scheduling_config(config);
    }

    fn update_secrets_caching_config(&self) {
        let config = flags::caching_config(self.catalog.system_config());
        self.caching_secrets_reader.set_policy(config);
    }

    fn update_tracing_config(&self) {
        let tracing = flags::tracing_config(self.catalog().system_config());
        tracing.apply(&self.tracing_handle);
    }

    fn update_compute_config(&mut self) {
        let config_params = flags::compute_config(self.catalog().system_config());
        self.controller.compute.update_configuration(config_params);
    }

    fn update_storage_config(&mut self) {
        let config_params = flags::storage_config(self.catalog().system_config());
        self.controller.storage.update_parameters(config_params);
    }

    fn update_pg_timestamp_oracle_config(&self) {
        let config_params = flags::pg_timstamp_oracle_config(self.catalog().system_config());
        if let Some(config) = self.pg_timestamp_oracle_config.as_ref() {
            config_params.apply(config)
        }
    }

    fn update_metrics_retention(&self) {
        let duration = self.catalog().system_config().metrics_retention();
        let policy = ReadPolicy::lag_writes_by(
            Timestamp::new(u64::try_from(duration.as_millis()).unwrap_or_else(|_e| {
                tracing::error!("Absurd metrics retention duration: {duration:?}.");
                u64::MAX
            })),
            SINCE_GRANULARITY,
        );
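        // Worked example (hypothetical setting): with a metrics retention of 30 days,
        // `duration.as_millis()` is 2_592_000_000, which fits in a `u64`, so reads lag
        // writes by 30 days (rounded to `SINCE_GRANULARITY`). Only absurdly large
        // durations fall back to `u64::MAX`, i.e. the since effectively never advances.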
1206        let storage_policies = self
1207            .catalog()
1208            .entries()
1209            .filter(|entry| {
1210                entry.item().is_retained_metrics_object()
1211                    && entry.item().is_compute_object_on_cluster().is_none()
1212            })
1213            .map(|entry| (entry.id(), policy.clone()))
1214            .collect::<Vec<_>>();
1215        let compute_policies = self
1216            .catalog()
1217            .entries()
1218            .filter_map(|entry| {
1219                if let (true, Some(cluster_id)) = (
1220                    entry.item().is_retained_metrics_object(),
1221                    entry.item().is_compute_object_on_cluster(),
1222                ) {
1223                    Some((cluster_id, entry.id(), policy.clone()))
1224                } else {
1225                    None
1226                }
1227            })
1228            .collect::<Vec<_>>();
1229        self.update_storage_read_policies(storage_policies);
1230        self.update_compute_read_policies(compute_policies);
1231    }
1232
1233    fn update_controller_config(&mut self) {
1234        let sys_config = self.catalog().system_config();
1235        self.controller
1236            .update_configuration(sys_config.dyncfg_updates());
1237    }
1238
1239    fn update_http_config(&mut self) {
1240        let webhook_request_limit = self
1241            .catalog()
1242            .system_config()
1243            .webhook_concurrent_request_limit();
1244        self.webhook_concurrency_limit
1245            .set_limit(webhook_request_limit);
1246    }
1247
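         /// Creates the storage collection backing the given sink export.
         ///
         /// The sinked collection is snapshotted at the smallest legal `as_of`, and
         /// read holds on it are kept alive across the call so that its since cannot
         /// advance before the storage controller installs its own holds.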
1248    pub(crate) async fn create_storage_export(
1249        &mut self,
1250        id: GlobalId,
1251        sink: &Sink,
1252    ) -> Result<(), AdapterError> {
1253        // Validate `sink.from` is in fact a storage collection
1254        self.controller.storage.check_exists(sink.from)?;
1255
1256        // The as_of determines the time at which to snapshot the persist
1257        // collection when we start reading from it. This is primarily relevant
1258        // when we do _not_ want to include the snapshot in the sink.
1259        //
1260        // We choose the smallest as_of that is legal according to the sinked
1261        // collection's since.
1262        let id_bundle = crate::CollectionIdBundle {
1263            storage_ids: btreeset! {sink.from},
1264            compute_ids: btreemap! {},
1265        };
1266
1267        // We're putting in place read holds, so that the create_collections call
1268        // below, which updates read capabilities, can do so successfully.
1269        // Otherwise, the since of dependencies might move along concurrently,
1270        // pulling the rug out from under us!
1271        //
1272        // TODO: Maybe in the future, pass those holds on to storage, to hold on
1273        // to them and downgrade when possible?
1274        let read_holds = self.acquire_read_holds(&id_bundle);
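             // `least_valid_read` is the join of the sinces of everything in
             // `id_bundle`, which here is just the since of `sink.from` at the moment
             // the holds were taken. Illustratively, if that since is `[42]`, the sink
             // snapshots the collection as of time 42.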
1275        let as_of = read_holds.least_valid_read();
1276
1277        let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
1278        let storage_sink_desc = mz_storage_types::sinks::StorageSinkDesc {
1279            from: sink.from,
1280            from_desc: storage_sink_from_entry
1281                .desc(&self.catalog().resolve_full_name(
1282                    storage_sink_from_entry.name(),
1283                    storage_sink_from_entry.conn_id(),
1284                ))
1285                .expect("sinks can only be built on items with descs")
1286                .into_owned(),
1287            connection: sink
1288                .connection
1289                .clone()
1290                .into_inline_connection(self.catalog().state()),
1291            envelope: sink.envelope,
1292            as_of,
1293            with_snapshot: sink.with_snapshot,
1294            version: sink.version,
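                 // Left unfilled (`()`) here; the storage controller fills in the
                 // actual storage metadata when it creates the export.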
1295            from_storage_metadata: (),
1296            to_storage_metadata: (),
1297        };
1298
1299        let collection_desc = CollectionDescription {
1300            // TODO(sinks): make generic once we have more than one sink type.
1301            desc: KAFKA_PROGRESS_DESC.clone(),
1302            data_source: DataSource::Sink {
1303                desc: ExportDescription {
1304                    sink: storage_sink_desc,
1305                    instance_id: sink.cluster_id,
1306                },
1307            },
1308            since: None,
1309            status_collection_id: None,
1310            timeline: None,
1311        };
1312        let collections = vec![(id, collection_desc)];
1313
1314        // Create the collections.
1315        let storage_metadata = self.catalog.state().storage_metadata();
1316        let res = self
1317            .controller
1318            .storage
1319            .create_collections(storage_metadata, None, collections)
1320            .await;
1321
1322        // Drop read holds after the export has been created, at which point
1323        // storage will have put in its own read holds.
1324        drop(read_holds);
1325
1326        Ok(res?)
1327    }
1328
1329    /// Validate all resource limits in a catalog transaction and return an error if any limit
1330    /// would be exceeded.
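         ///
         /// The ops are scanned to compute the net change for each limited resource
         /// (creates add, drops subtract); each change is then added to the current
         /// count and checked against the corresponding configured limit.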
1331    fn validate_resource_limits(
1332        &self,
1333        ops: &[catalog::Op],
1334        conn_id: &ConnectionId,
1335    ) -> Result<(), AdapterError> {
1336        let mut new_kafka_connections = 0;
1337        let mut new_postgres_connections = 0;
1338        let mut new_mysql_connections = 0;
1339        let mut new_sql_server_connections = 0;
1340        let mut new_aws_privatelink_connections = 0;
1341        let mut new_tables = 0;
1342        let mut new_sources = 0;
1343        let mut new_sinks = 0;
1344        let mut new_materialized_views = 0;
1345        let mut new_clusters = 0;
1346        let mut new_replicas_per_cluster = BTreeMap::new();
1347        let mut new_credit_consumption_rate = Numeric::zero();
1348        let mut new_databases = 0;
1349        let mut new_schemas_per_database = BTreeMap::new();
1350        let mut new_objects_per_schema = BTreeMap::new();
1351        let mut new_secrets = 0;
1352        let mut new_roles = 0;
1353        let mut new_continual_tasks = 0;
1354        let mut new_network_policies = 0;
1355        for op in ops {
1356            match op {
1357                Op::CreateDatabase { .. } => {
1358                    new_databases += 1;
1359                }
1360                Op::CreateSchema { database_id, .. } => {
1361                    if let ResolvedDatabaseSpecifier::Id(database_id) = database_id {
1362                        *new_schemas_per_database.entry(database_id).or_insert(0) += 1;
1363                    }
1364                }
1365                Op::CreateRole { .. } => {
1366                    new_roles += 1;
1367                }
1368                Op::CreateNetworkPolicy { .. } => {
1369                    new_network_policies += 1;
1370                }
1371                Op::CreateCluster { .. } => {
1372                    // TODO(benesch): having deprecated linked clusters, remove
1373                    // the `max_sources` and `max_sinks` limit, and set a higher
1374                    // max cluster limit?
1375                    new_clusters += 1;
1376                }
1377                Op::CreateClusterReplica {
1378                    cluster_id, config, ..
1379                } => {
1380                    if cluster_id.is_user() {
1381                        *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) += 1;
1382                        if let ReplicaLocation::Managed(location) = &config.location {
1383                            let replica_allocation = self
1384                                .catalog()
1385                                .cluster_replica_sizes()
1386                                .0
1387                                .get(location.size_for_billing())
1388                                .expect(
1389                                    "location size is validated against the cluster replica sizes",
1390                                );
1391                            new_credit_consumption_rate += replica_allocation.credits_per_hour
1392                        }
1393                    }
1394                }
1395                Op::CreateItem { name, item, .. } => {
1396                    *new_objects_per_schema
1397                        .entry((
1398                            name.qualifiers.database_spec.clone(),
1399                            name.qualifiers.schema_spec.clone(),
1400                        ))
1401                        .or_insert(0) += 1;
1402                    match item {
1403                        CatalogItem::Connection(connection) => match connection.details {
1404                            ConnectionDetails::Kafka(_) => new_kafka_connections += 1,
1405                            ConnectionDetails::Postgres(_) => new_postgres_connections += 1,
1406                            ConnectionDetails::MySql(_) => new_mysql_connections += 1,
1407                            ConnectionDetails::SqlServer(_) => new_sql_server_connections += 1,
1408                            ConnectionDetails::AwsPrivatelink(_) => {
1409                                new_aws_privatelink_connections += 1
1410                            }
1411                            ConnectionDetails::Csr(_)
1412                            | ConnectionDetails::Ssh { .. }
1413                            | ConnectionDetails::Aws(_)
1414                            | ConnectionDetails::IcebergCatalog(_) => {}
1415                        },
1416                        CatalogItem::Table(_) => {
1417                            new_tables += 1;
1418                        }
1419                        CatalogItem::Source(source) => {
1420                            new_sources += source.user_controllable_persist_shard_count()
1421                        }
1422                        CatalogItem::Sink(_) => new_sinks += 1,
1423                        CatalogItem::MaterializedView(_) => {
1424                            new_materialized_views += 1;
1425                        }
1426                        CatalogItem::Secret(_) => {
1427                            new_secrets += 1;
1428                        }
1429                        CatalogItem::ContinualTask(_) => {
1430                            new_continual_tasks += 1;
1431                        }
1432                        CatalogItem::Log(_)
1433                        | CatalogItem::View(_)
1434                        | CatalogItem::Index(_)
1435                        | CatalogItem::Type(_)
1436                        | CatalogItem::Func(_) => {}
1437                    }
1438                }
1439                Op::DropObjects(drop_object_infos) => {
1440                    for drop_object_info in drop_object_infos {
1441                        match drop_object_info {
1442                            DropObjectInfo::Cluster(_) => {
1443                                new_clusters -= 1;
1444                            }
1445                            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
1446                                if cluster_id.is_user() {
1447                                    *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) -= 1;
1448                                    let cluster = self
1449                                        .catalog()
1450                                        .get_cluster_replica(*cluster_id, *replica_id);
1451                                    if let ReplicaLocation::Managed(location) =
1452                                        &cluster.config.location
1453                                    {
1454                                        let replica_allocation = self
1455                                            .catalog()
1456                                            .cluster_replica_sizes()
1457                                            .0
1458                                            .get(location.size_for_billing())
1459                                            .expect(
1460                                                "location size is validated against the cluster replica sizes",
1461                                            );
1462                                        new_credit_consumption_rate -=
1463                                            replica_allocation.credits_per_hour
1464                                    }
1465                                }
1466                            }
1467                            DropObjectInfo::Database(_) => {
1468                                new_databases -= 1;
1469                            }
1470                            DropObjectInfo::Schema((database_spec, _)) => {
1471                                if let ResolvedDatabaseSpecifier::Id(database_id) = database_spec {
1472                                    *new_schemas_per_database.entry(database_id).or_insert(0) -= 1;
1473                                }
1474                            }
1475                            DropObjectInfo::Role(_) => {
1476                                new_roles -= 1;
1477                            }
1478                            DropObjectInfo::NetworkPolicy(_) => {
1479                                new_network_policies -= 1;
1480                            }
1481                            DropObjectInfo::Item(id) => {
1482                                let entry = self.catalog().get_entry(id);
1483                                *new_objects_per_schema
1484                                    .entry((
1485                                        entry.name().qualifiers.database_spec.clone(),
1486                                        entry.name().qualifiers.schema_spec.clone(),
1487                                    ))
1488                                    .or_insert(0) -= 1;
1489                                match entry.item() {
1490                                    CatalogItem::Connection(connection) => match connection.details
1491                                    {
1492                                        ConnectionDetails::AwsPrivatelink(_) => {
1493                                            new_aws_privatelink_connections -= 1;
1494                                        }
1495                                        _ => (),
1496                                    },
1497                                    CatalogItem::Table(_) => {
1498                                        new_tables -= 1;
1499                                    }
1500                                    CatalogItem::Source(source) => {
1501                                        new_sources -=
1502                                            source.user_controllable_persist_shard_count()
1503                                    }
1504                                    CatalogItem::Sink(_) => new_sinks -= 1,
1505                                    CatalogItem::MaterializedView(_) => {
1506                                        new_materialized_views -= 1;
1507                                    }
1508                                    CatalogItem::Secret(_) => {
1509                                        new_secrets -= 1;
1510                                    }
1511                                    CatalogItem::ContinualTask(_) => {
1512                                        new_continual_tasks -= 1;
1513                                    }
1514                                    CatalogItem::Log(_)
1515                                    | CatalogItem::View(_)
1516                                    | CatalogItem::Index(_)
1517                                    | CatalogItem::Type(_)
1518                                    | CatalogItem::Func(_) => {}
1519                                }
1520                            }
1521                        }
1522                    }
1523                }
1524                Op::UpdateItem {
1525                    name: _,
1526                    id,
1527                    to_item,
1528                } => match to_item {
1529                    CatalogItem::Source(source) => {
1530                        let current_source = self
1531                            .catalog()
1532                            .get_entry(id)
1533                            .source()
1534                            .expect("source update is for source item");
1535
1536                        new_sources += source.user_controllable_persist_shard_count()
1537                            - current_source.user_controllable_persist_shard_count();
1538                    }
1539                    CatalogItem::Connection(_)
1540                    | CatalogItem::Table(_)
1541                    | CatalogItem::Sink(_)
1542                    | CatalogItem::MaterializedView(_)
1543                    | CatalogItem::Secret(_)
1544                    | CatalogItem::Log(_)
1545                    | CatalogItem::View(_)
1546                    | CatalogItem::Index(_)
1547                    | CatalogItem::Type(_)
1548                    | CatalogItem::Func(_)
1549                    | CatalogItem::ContinualTask(_) => {}
1550                },
1551                Op::AlterRole { .. }
1552                | Op::AlterRetainHistory { .. }
1553                | Op::AlterNetworkPolicy { .. }
1554                | Op::AlterAddColumn { .. }
1555                | Op::UpdatePrivilege { .. }
1556                | Op::UpdateDefaultPrivilege { .. }
1557                | Op::GrantRole { .. }
1558                | Op::RenameCluster { .. }
1559                | Op::RenameClusterReplica { .. }
1560                | Op::RenameItem { .. }
1561                | Op::RenameSchema { .. }
1562                | Op::UpdateOwner { .. }
1563                | Op::RevokeRole { .. }
1564                | Op::UpdateClusterConfig { .. }
1565                | Op::UpdateClusterReplicaConfig { .. }
1566                | Op::UpdateSourceReferences { .. }
1567                | Op::UpdateSystemConfiguration { .. }
1568                | Op::ResetSystemConfiguration { .. }
1569                | Op::ResetAllSystemConfiguration { .. }
1570                | Op::Comment { .. }
1571                | Op::WeirdStorageUsageUpdates { .. }
1572                | Op::TransactionDryRun => {}
1573            }
1574        }
1575
1576        let mut current_aws_privatelink_connections = 0;
1577        let mut current_postgres_connections = 0;
1578        let mut current_mysql_connections = 0;
1579        let mut current_sql_server_connections = 0;
1580        let mut current_kafka_connections = 0;
1581        for c in self.catalog().user_connections() {
1582            let connection = c
1583                .connection()
1584                .expect("`user_connections()` only returns connection objects");
1585
1586            match connection.details {
1587                ConnectionDetails::AwsPrivatelink(_) => current_aws_privatelink_connections += 1,
1588                ConnectionDetails::Postgres(_) => current_postgres_connections += 1,
1589                ConnectionDetails::MySql(_) => current_mysql_connections += 1,
1590                ConnectionDetails::SqlServer(_) => current_sql_server_connections += 1,
1591                ConnectionDetails::Kafka(_) => current_kafka_connections += 1,
1592                ConnectionDetails::Csr(_)
1593                | ConnectionDetails::Ssh { .. }
1594                | ConnectionDetails::Aws(_)
1595                | ConnectionDetails::IcebergCatalog(_) => {}
1596            }
1597        }
1598        self.validate_resource_limit(
1599            current_kafka_connections,
1600            new_kafka_connections,
1601            SystemVars::max_kafka_connections,
1602            "Kafka Connection",
1603            MAX_KAFKA_CONNECTIONS.name(),
1604        )?;
1605        self.validate_resource_limit(
1606            current_postgres_connections,
1607            new_postgres_connections,
1608            SystemVars::max_postgres_connections,
1609            "PostgreSQL Connection",
1610            MAX_POSTGRES_CONNECTIONS.name(),
1611        )?;
1612        self.validate_resource_limit(
1613            current_mysql_connections,
1614            new_mysql_connections,
1615            SystemVars::max_mysql_connections,
1616            "MySQL Connection",
1617            MAX_MYSQL_CONNECTIONS.name(),
1618        )?;
1619        self.validate_resource_limit(
1620            current_sql_server_connections,
1621            new_sql_server_connections,
1622            SystemVars::max_sql_server_connections,
1623            "SQL Server Connection",
1624            MAX_SQL_SERVER_CONNECTIONS.name(),
1625        )?;
1626        self.validate_resource_limit(
1627            current_aws_privatelink_connections,
1628            new_aws_privatelink_connections,
1629            SystemVars::max_aws_privatelink_connections,
1630            "AWS PrivateLink Connection",
1631            MAX_AWS_PRIVATELINK_CONNECTIONS.name(),
1632        )?;
1633        self.validate_resource_limit(
1634            self.catalog().user_tables().count(),
1635            new_tables,
1636            SystemVars::max_tables,
1637            "table",
1638            MAX_TABLES.name(),
1639        )?;
1640
1641        let current_sources: usize = self
1642            .catalog()
1643            .user_sources()
1644            .filter_map(|source| source.source())
1645            .map(|source| source.user_controllable_persist_shard_count())
1646            .sum::<i64>()
1647            .try_into()
1648            .expect("non-negative sum of sources");
1649
1650        self.validate_resource_limit(
1651            current_sources,
1652            new_sources,
1653            SystemVars::max_sources,
1654            "source",
1655            MAX_SOURCES.name(),
1656        )?;
1657        self.validate_resource_limit(
1658            self.catalog().user_sinks().count(),
1659            new_sinks,
1660            SystemVars::max_sinks,
1661            "sink",
1662            MAX_SINKS.name(),
1663        )?;
1664        self.validate_resource_limit(
1665            self.catalog().user_materialized_views().count(),
1666            new_materialized_views,
1667            SystemVars::max_materialized_views,
1668            "materialized view",
1669            MAX_MATERIALIZED_VIEWS.name(),
1670        )?;
1671        self.validate_resource_limit(
1672            // Linked compute clusters don't count against the limit, since
1673            // we have a separate sources and sinks limit.
1674            //
1675            // TODO(benesch): remove the `max_sources` and `max_sinks` limit,
1676            // and set a higher max cluster limit?
1677            self.catalog().user_clusters().count(),
1678            new_clusters,
1679            SystemVars::max_clusters,
1680            "cluster",
1681            MAX_CLUSTERS.name(),
1682        )?;
1683        for (cluster_id, new_replicas) in new_replicas_per_cluster {
1684            // It's possible that the cluster hasn't been created yet.
1685            let current_amount = self
1686                .catalog()
1687                .try_get_cluster(cluster_id)
1688                .map(|instance| instance.user_replicas().count())
1689                .unwrap_or(0);
1690            self.validate_resource_limit(
1691                current_amount,
1692                new_replicas,
1693                SystemVars::max_replicas_per_cluster,
1694                "cluster replica",
1695                MAX_REPLICAS_PER_CLUSTER.name(),
1696            )?;
1697        }
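             // Only managed replicas have a size and therefore contribute to the
             // current credit consumption rate; unmanaged replicas are skipped.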
1698        let current_credit_consumption_rate = self
1699            .catalog()
1700            .user_cluster_replicas()
1701            .filter_map(|replica| match &replica.config.location {
1702                ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
1703                ReplicaLocation::Unmanaged(_) => None,
1704            })
1705            .map(|size| {
1706                self.catalog()
1707                    .cluster_replica_sizes()
1708                    .0
1709                    .get(size)
1710                    .expect("location size is validated against the cluster replica sizes")
1711                    .credits_per_hour
1712            })
1713            .sum();
1714        self.validate_resource_limit_numeric(
1715            current_credit_consumption_rate,
1716            new_credit_consumption_rate,
1717            |system_vars| {
1718                self.license_key
1719                    .max_credit_consumption_rate()
1720                    .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
1721            },
1722            "cluster replica",
1723            MAX_CREDIT_CONSUMPTION_RATE.name(),
1724        )?;
1725        self.validate_resource_limit(
1726            self.catalog().databases().count(),
1727            new_databases,
1728            SystemVars::max_databases,
1729            "database",
1730            MAX_DATABASES.name(),
1731        )?;
1732        for (database_id, new_schemas) in new_schemas_per_database {
1733            self.validate_resource_limit(
1734                self.catalog().get_database(database_id).schemas_by_id.len(),
1735                new_schemas,
1736                SystemVars::max_schemas_per_database,
1737                "schema",
1738                MAX_SCHEMAS_PER_DATABASE.name(),
1739            )?;
1740        }
1741        for ((database_spec, schema_spec), new_objects) in new_objects_per_schema {
1742            self.validate_resource_limit(
1743                self.catalog()
1744                    .get_schema(&database_spec, &schema_spec, conn_id)
1745                    .items
1746                    .len(),
1747                new_objects,
1748                SystemVars::max_objects_per_schema,
1749                "object",
1750                MAX_OBJECTS_PER_SCHEMA.name(),
1751            )?;
1752        }
1753        self.validate_resource_limit(
1754            self.catalog().user_secrets().count(),
1755            new_secrets,
1756            SystemVars::max_secrets,
1757            "secret",
1758            MAX_SECRETS.name(),
1759        )?;
1760        self.validate_resource_limit(
1761            self.catalog().user_roles().count(),
1762            new_roles,
1763            SystemVars::max_roles,
1764            "role",
1765            MAX_ROLES.name(),
1766        )?;
1767        self.validate_resource_limit(
1768            self.catalog().user_continual_tasks().count(),
1769            new_continual_tasks,
1770            SystemVars::max_continual_tasks,
1771            "continual_task",
1772            MAX_CONTINUAL_TASKS.name(),
1773        )?;
1774        self.validate_resource_limit(
                 // Count existing user network policies. (Assumes the catalog exposes
                 // a `user_network_policies()` accessor mirroring the other `user_*`
                 // helpers used above.)
1775            self.catalog().user_network_policies().count(),
1776            new_network_policies,
1777            SystemVars::max_network_policies,
1778            "network_policy",
1779            MAX_NETWORK_POLICIES.name(),
1780        )?;
1781        Ok(())
1782    }
1783
1784    /// Validate a specific type of resource limit and return an error if that limit is exceeded.
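         ///
         /// # Example
         ///
         /// A sketch of how a limit check plays out, using a hypothetical `coord`
         /// handle and made-up numbers: 9 existing tables, a request for 2 more, and
         /// `max_tables` set to 10 give a desired total of 11, which exceeds the
         /// limit.
         ///
         /// ```ignore
         /// let res = coord.validate_resource_limit(
         ///     9,
         ///     2,
         ///     SystemVars::max_tables,
         ///     "table",
         ///     MAX_TABLES.name(),
         /// );
         /// assert!(matches!(res, Err(AdapterError::ResourceExhaustion { .. })));
         /// ```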
1785    pub(crate) fn validate_resource_limit<F>(
1786        &self,
1787        current_amount: usize,
1788        new_instances: i64,
1789        resource_limit: F,
1790        resource_type: &str,
1791        limit_name: &str,
1792    ) -> Result<(), AdapterError>
1793    where
1794        F: Fn(&SystemVars) -> u32,
1795    {
1796        if new_instances <= 0 {
1797            return Ok(());
1798        }
1799
1800        let limit: i64 = resource_limit(self.catalog().system_config()).into();
1801        let current_amount: Option<i64> = current_amount.try_into().ok();
1802        let desired =
1803            current_amount.and_then(|current_amount| current_amount.checked_add(new_instances));
1804
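             // If the current count doesn't fit in an i64, or adding the new
             // instances overflows, conservatively treat the limit as exceeded.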
1805        let exceeds_limit = if let Some(desired) = desired {
1806            desired > limit
1807        } else {
1808            true
1809        };
1810
1811        let desired = desired
1812            .map(|desired| desired.to_string())
1813            .unwrap_or_else(|| format!("more than {}", i64::MAX));
1814        let current = current_amount
1815            .map(|current| current.to_string())
1816            .unwrap_or_else(|| format!("more than {}", i64::MAX));
1817        if exceeds_limit {
1818            Err(AdapterError::ResourceExhaustion {
1819                resource_type: resource_type.to_string(),
1820                limit_name: limit_name.to_string(),
1821                desired,
1822                limit: limit.to_string(),
1823                current,
1824            })
1825        } else {
1826            Ok(())
1827        }
1828    }
1829
1830    /// Validate a specific type of numeric resource limit and return an error if that limit is exceeded.
1831    ///
1832    /// This is very similar to [`Self::validate_resource_limit`] but for numerics.
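         ///
         /// For example (hypothetical numbers), if existing replicas consume 3.5
         /// credits per hour, the new replicas add another 1 credit per hour, and
         /// `max_credit_consumption_rate` is 4, the desired rate of 4.5 exceeds the
         /// limit and the transaction is rejected.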
1833    fn validate_resource_limit_numeric<F>(
1834        &self,
1835        current_amount: Numeric,
1836        new_amount: Numeric,
1837        resource_limit: F,
1838        resource_type: &str,
1839        limit_name: &str,
1840    ) -> Result<(), AdapterError>
1841    where
1842        F: Fn(&SystemVars) -> Numeric,
1843    {
1844        if new_amount <= Numeric::zero() {
1845            return Ok(());
1846        }
1847
1848        let limit = resource_limit(self.catalog().system_config());
1849        // Numeric addition overflows to infinity instead of panicking, which has the
1850        // correct comparison semantics.
1851        // NaN should be impossible here since both values are positive.
1852        let desired = current_amount + new_amount;
1853        if desired > limit {
1854            Err(AdapterError::ResourceExhaustion {
1855                resource_type: resource_type.to_string(),
1856                limit_name: limit_name.to_string(),
1857                desired: desired.to_string(),
1858                limit: limit.to_string(),
1859                current: current_amount.to_string(),
1860            })
1861        } else {
1862            Ok(())
1863        }
1864    }
1865}