mz_adapter/coord/
catalog_implications.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to deriving and applying implications from [catalog
11//! changes](ParsedStateUpdate).
12//!
13//! The flow from "raw" catalog changes to [CatalogImplication] works like this:
14//!
15//! StateUpdateKind -> ParsedStateUpdate -> CatalogImplication
16//!
17//! [ParsedStateUpdate] adds context to a "raw" catalog change
18//! ([StateUpdateKind](mz_catalog::memory::objects::StateUpdateKind)). It
19//! includes an in-memory representation of the updated object, which can in
20//! theory be derived from the raw change but only when we have access to all
21//! the other raw changes or to an in-memory Catalog, which represents a
22//! "rollup" of all the raw changes.
23//!
24//! [CatalogImplication] is both the state machine that we use for absorbing
25//! multiple state updates for the same object and the final command that has to
//! be applied to in-memory state and/or the controller(s) after absorbing all
27//! the state updates in a given batch of updates.
28
29use std::collections::{BTreeMap, BTreeSet};
30use std::sync::Arc;
31use std::time::{Duration, Instant};
32
33use fail::fail_point;
34use itertools::Itertools;
35use mz_adapter_types::compaction::CompactionWindow;
36use mz_catalog::memory::objects::{
37    CatalogItem, Cluster, ClusterReplica, Connection, ContinualTask, DataSourceDesc, Index,
38    MaterializedView, Secret, Sink, Source, StateDiff, Table, TableDataSource, View,
39};
40use mz_cloud_resources::VpcEndpointConfig;
41use mz_compute_client::protocol::response::PeekResponse;
42use mz_controller_types::{ClusterId, ReplicaId};
43use mz_ore::collections::CollectionExt;
44use mz_ore::error::ErrorExt;
45use mz_ore::future::InTask;
46use mz_ore::instrument;
47use mz_ore::retry::Retry;
48use mz_ore::str::StrExt;
49use mz_ore::task;
50use mz_repr::{CatalogItemId, GlobalId, RelationVersion, RelationVersionSelector, Timestamp};
51use mz_sql::plan::ConnectionDetails;
52use mz_storage_client::controller::{CollectionDescription, DataSource};
53use mz_storage_types::connections::PostgresConnection;
54use mz_storage_types::connections::inline::{InlinedConnection, IntoInlineConnection};
55use mz_storage_types::sinks::StorageSinkConnection;
56use mz_storage_types::sources::{GenericSourceConnection, SourceExport, SourceExportDataConfig};
57use tracing::{Instrument, info_span, warn};
58
59use crate::active_compute_sink::ActiveComputeSinkRetireReason;
60use crate::coord::Coordinator;
61use crate::coord::catalog_implications::parsed_state_updates::{
62    ParsedStateUpdate, ParsedStateUpdateKind,
63};
64use crate::coord::timeline::TimelineState;
65use crate::statement_logging::{StatementEndedExecutionReason, StatementLoggingId};
66use crate::{AdapterError, CollectionIdBundle, ExecuteContext, ResultExt};
67
68pub mod parsed_state_updates;
69
70impl Coordinator {
71    /// Applies implications from the given bucket of [ParsedStateUpdate] to our
72    /// in-memory state and our controllers. This also applies transitive
73    /// implications, for example, peeks and subscribes will be cancelled when
74    /// referenced objects are dropped.
75    ///
76    /// This _requires_ that the given updates are consolidated. There must be
77    /// at most one addition and/or one retraction for a given item, as
78    /// identified by that items ID type.
79    #[instrument(level = "debug")]
80    pub async fn apply_catalog_implications(
81        &mut self,
82        ctx: Option<&mut ExecuteContext>,
83        catalog_updates: Vec<ParsedStateUpdate>,
84    ) -> Result<(), AdapterError> {
85        let start = Instant::now();
86
87        let mut catalog_implications: BTreeMap<CatalogItemId, CatalogImplication> = BTreeMap::new();
88        let mut cluster_commands: BTreeMap<ClusterId, CatalogImplication> = BTreeMap::new();
89        let mut cluster_replica_commands: BTreeMap<(ClusterId, ReplicaId), CatalogImplication> =
90            BTreeMap::new();
91
92        for update in catalog_updates {
93            tracing::trace!(?update, "got parsed state update");
94            match &update.kind {
95                ParsedStateUpdateKind::Item {
96                    durable_item,
97                    parsed_item: _,
98                    connection: _,
99                    parsed_full_name: _,
100                } => {
101                    let entry = catalog_implications
102                        .entry(durable_item.id.clone())
103                        .or_insert_with(|| CatalogImplication::None);
104                    entry.absorb(update);
105                }
106                ParsedStateUpdateKind::TemporaryItem {
107                    durable_item,
108                    parsed_item: _,
109                    connection: _,
110                    parsed_full_name: _,
111                } => {
112                    let entry = catalog_implications
113                        .entry(durable_item.id.clone())
114                        .or_insert_with(|| CatalogImplication::None);
115                    entry.absorb(update);
116                }
117                ParsedStateUpdateKind::Cluster {
118                    durable_cluster,
119                    parsed_cluster: _,
120                } => {
121                    let entry = cluster_commands
122                        .entry(durable_cluster.id)
123                        .or_insert_with(|| CatalogImplication::None);
124                    entry.absorb(update.clone());
125                }
126                ParsedStateUpdateKind::ClusterReplica {
127                    durable_cluster_replica,
128                    parsed_cluster_replica: _,
129                } => {
130                    let entry = cluster_replica_commands
131                        .entry((
132                            durable_cluster_replica.cluster_id,
133                            durable_cluster_replica.replica_id,
134                        ))
135                        .or_insert_with(|| CatalogImplication::None);
136                    entry.absorb(update.clone());
137                }
138            }
139        }
140
141        self.apply_catalog_implications_inner(
142            ctx,
143            catalog_implications.into_iter().collect_vec(),
144            cluster_commands.into_iter().collect_vec(),
145            cluster_replica_commands.into_iter().collect_vec(),
146        )
147        .await?;
148
149        self.metrics
150            .apply_catalog_implications_seconds
151            .observe(start.elapsed().as_secs_f64());
152
153        Ok(())
154    }
155
156    #[instrument(level = "debug")]
157    async fn apply_catalog_implications_inner(
158        &mut self,
159        ctx: Option<&mut ExecuteContext>,
160        implications: Vec<(CatalogItemId, CatalogImplication)>,
161        cluster_commands: Vec<(ClusterId, CatalogImplication)>,
162        cluster_replica_commands: Vec<((ClusterId, ReplicaId), CatalogImplication)>,
163    ) -> Result<(), AdapterError> {
164        let mut tables_to_drop = BTreeSet::new();
165        let mut sources_to_drop = vec![];
166        let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
167        let mut storage_sink_gids_to_drop = vec![];
168        let mut compute_gids_to_drop = vec![];
169        let mut view_gids_to_drop = vec![];
170        let mut secrets_to_drop = vec![];
171        let mut vpc_endpoints_to_drop = vec![];
172        let mut clusters_to_drop = vec![];
173        let mut cluster_replicas_to_drop = vec![];
174        let mut compute_sinks_to_drop = BTreeMap::new();
175        let mut peeks_to_drop = vec![];
176        let mut copies_to_drop = vec![];
177
178        // Maps for storing names of dropped objects for error messages.
179        let mut dropped_item_names: BTreeMap<GlobalId, String> = BTreeMap::new();
180        let mut dropped_cluster_names: BTreeMap<ClusterId, String> = BTreeMap::new();
181
182        // Separate collections for tables (which need write timestamps) and
183        // sources (which don't).
184        let mut table_collections_to_create = BTreeMap::new();
185        let mut source_collections_to_create = BTreeMap::new();
186        let mut storage_policies_to_initialize = BTreeMap::new();
187        let mut execution_timestamps_to_set = BTreeSet::new();
188        let mut vpc_endpoints_to_create: Vec<(CatalogItemId, VpcEndpointConfig)> = vec![];
189
190        // Replacing a materialized view causes the replacement's catalog entry
191        // to be dropped. Its compute and storage collections are transferred to
192        // the target MV though, so we must make sure not to drop those.
193        let mut replacement_gids = vec![];
194
195        // Collections for batching connection-related alterations.
196        let mut source_connections_to_alter: BTreeMap<
197            GlobalId,
198            GenericSourceConnection<InlinedConnection>,
199        > = BTreeMap::new();
200        let mut sink_connections_to_alter: BTreeMap<GlobalId, StorageSinkConnection> =
201            BTreeMap::new();
202        let mut source_export_data_configs_to_alter: BTreeMap<GlobalId, SourceExportDataConfig> =
203            BTreeMap::new();
204
205        // We're incrementally migrating the code that manipulates the
206        // controller from closures in the sequencer. For some types of catalog
207        // changes we haven't done this migration yet, so there you will see
208        // just a log message. Over the next couple of PRs all of these will go
209        // away.
210
211        for (catalog_id, implication) in implications {
212            tracing::trace!(?implication, "have to apply catalog implication");
213
214            match implication {
215                CatalogImplication::Table(CatalogImplicationKind::Added(table)) => {
216                    self.handle_create_table(
217                        &ctx,
218                        &mut table_collections_to_create,
219                        &mut storage_policies_to_initialize,
220                        &mut execution_timestamps_to_set,
221                        catalog_id,
222                        table.clone(),
223                    )
224                    .await?
225                }
226                CatalogImplication::Table(CatalogImplicationKind::Altered {
227                    prev: prev_table,
228                    new: new_table,
229                }) => self.handle_alter_table(prev_table, new_table).await?,
230
231                CatalogImplication::Table(CatalogImplicationKind::Dropped(table, full_name)) => {
232                    let global_ids = table.global_ids();
233                    for global_id in global_ids {
234                        tables_to_drop.insert((catalog_id, global_id));
235                        dropped_item_names.insert(global_id, full_name.clone());
236                    }
237                }
238                CatalogImplication::Source(CatalogImplicationKind::Added((
239                    source,
240                    _connection,
241                ))) => {
242                    // Get the compaction windows for all sources with this
243                    // catalog_id This replicates the logic from
244                    // sequence_create_source where it collects all item_ids and
245                    // gets their compaction windows
246                    let compaction_windows = self
247                        .catalog()
248                        .state()
249                        .source_compaction_windows(vec![catalog_id]);
250
251                    self.handle_create_source(
252                        &mut source_collections_to_create,
253                        &mut storage_policies_to_initialize,
254                        catalog_id,
255                        source,
256                        compaction_windows,
257                    )
258                    .await?
259                }
260                CatalogImplication::Source(CatalogImplicationKind::Altered {
261                    prev: (prev_source, _prev_connection),
262                    new: (new_source, _new_connection),
263                }) => {
264                    tracing::debug!(
265                        ?prev_source,
266                        ?new_source,
267                        "not handling AlterSource in here yet"
268                    );
269                }
270                CatalogImplication::Source(CatalogImplicationKind::Dropped(
271                    (source, connection),
272                    full_name,
273                )) => {
274                    let global_id = source.global_id();
275                    sources_to_drop.push((catalog_id, global_id));
276                    dropped_item_names.insert(global_id, full_name);
277
278                    if let DataSourceDesc::Ingestion { desc, .. }
279                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } = &source.data_source
280                    {
281                        match &desc.connection {
282                            GenericSourceConnection::Postgres(_referenced_conn) => {
283                                let inline_conn = connection.expect("missing inlined connection");
284
285                                let pg_conn = match inline_conn {
286                                    GenericSourceConnection::Postgres(pg_conn) => pg_conn,
287                                    other => {
288                                        panic!("expected postgres connection, got: {:?}", other)
289                                    }
290                                };
291                                let pending_drop = (
292                                    pg_conn.connection.clone(),
293                                    pg_conn.publication_details.slot.clone(),
294                                );
295                                replication_slots_to_drop.push(pending_drop);
296                            }
297                            _ => {}
298                        }
299                    }
300                }
301                CatalogImplication::Sink(CatalogImplicationKind::Added(sink)) => {
302                    tracing::debug!(?sink, "not handling AddSink in here yet");
303                }
304                CatalogImplication::Sink(CatalogImplicationKind::Altered {
305                    prev: prev_sink,
306                    new: new_sink,
307                }) => {
308                    tracing::debug!(?prev_sink, ?new_sink, "not handling AlterSink in here yet");
309                }
310                CatalogImplication::Sink(CatalogImplicationKind::Dropped(sink, full_name)) => {
311                    storage_sink_gids_to_drop.push(sink.global_id());
312                    dropped_item_names.insert(sink.global_id(), full_name);
313                }
314                CatalogImplication::Index(CatalogImplicationKind::Added(index)) => {
315                    tracing::debug!(?index, "not handling AddIndex in here yet");
316                }
317                CatalogImplication::Index(CatalogImplicationKind::Altered {
318                    prev: prev_index,
319                    new: new_index,
320                }) => {
321                    tracing::debug!(
322                        ?prev_index,
323                        ?new_index,
324                        "not handling AlterIndex in here yet"
325                    );
326                }
327                CatalogImplication::Index(CatalogImplicationKind::Dropped(index, full_name)) => {
328                    compute_gids_to_drop.push((index.cluster_id, index.global_id()));
329                    dropped_item_names.insert(index.global_id(), full_name);
330                }
331                CatalogImplication::MaterializedView(CatalogImplicationKind::Added(mv)) => {
332                    tracing::debug!(?mv, "not handling AddMaterializedView in here yet");
333                }
334                CatalogImplication::MaterializedView(CatalogImplicationKind::Altered {
335                    prev: prev_mv,
336                    new: new_mv,
337                }) => {
338                    let old_gid = prev_mv.global_id_writes();
339                    let new_gid = new_mv.global_id_writes();
340                    if new_gid != old_gid {
341                        replacement_gids.push((new_mv.cluster_id, new_gid));
342                    }
343
344                    self.handle_alter_materialized_view(prev_mv, new_mv)?;
345                }
346                CatalogImplication::MaterializedView(CatalogImplicationKind::Dropped(
347                    mv,
348                    full_name,
349                )) => {
350                    compute_gids_to_drop.push((mv.cluster_id, mv.global_id_writes()));
351                    sources_to_drop.extend(mv.global_ids().map(|gid| (catalog_id, gid)));
352                    dropped_item_names.insert(mv.global_id_writes(), full_name);
353                }
354                CatalogImplication::View(CatalogImplicationKind::Added(view)) => {
355                    tracing::debug!(?view, "not handling AddView in here yet");
356                }
357                CatalogImplication::View(CatalogImplicationKind::Altered {
358                    prev: prev_view,
359                    new: new_view,
360                }) => {
361                    tracing::debug!(?prev_view, ?new_view, "not handling AlterView in here yet");
362                }
363                CatalogImplication::View(CatalogImplicationKind::Dropped(view, full_name)) => {
364                    view_gids_to_drop.push(view.global_id());
365                    dropped_item_names.insert(view.global_id(), full_name);
366                }
367                CatalogImplication::ContinualTask(CatalogImplicationKind::Added(ct)) => {
368                    tracing::debug!(?ct, "not handling AddContinualTask in here yet");
369                }
370                CatalogImplication::ContinualTask(CatalogImplicationKind::Altered {
371                    prev: prev_ct,
372                    new: new_ct,
373                }) => {
374                    tracing::debug!(
375                        ?prev_ct,
376                        ?new_ct,
377                        "not handling AlterContinualTask in here yet"
378                    );
379                }
380                CatalogImplication::ContinualTask(CatalogImplicationKind::Dropped(
381                    ct,
382                    _full_name,
383                )) => {
384                    compute_gids_to_drop.push((ct.cluster_id, ct.global_id()));
385                    sources_to_drop.push((catalog_id, ct.global_id()));
386                }
387                CatalogImplication::Secret(CatalogImplicationKind::Added(_secret)) => {
388                    // No action needed: the secret payload is stored in
389                    // secrets_controller.ensure() BEFORE the catalog transaction.
390                    // By the time we see this update, the secret is already stored.
391                }
392                CatalogImplication::Secret(CatalogImplicationKind::Altered {
393                    prev: _prev_secret,
394                    new: _new_secret,
395                }) => {
396                    // No action needed: altering a secret updates the payload via
397                    // secrets_controller.ensure() without a catalog transaction.
398                }
399                CatalogImplication::Secret(CatalogImplicationKind::Dropped(
400                    _secret,
401                    _full_name,
402                )) => {
403                    secrets_to_drop.push(catalog_id);
404                }
405                CatalogImplication::Connection(CatalogImplicationKind::Added(connection)) => {
406                    match &connection.details {
407                        // SSH connections: key pair is stored in secrets_controller
408                        // BEFORE the catalog transaction, so no action needed here.
409                        ConnectionDetails::Ssh { .. } => {}
410                        // AWS PrivateLink connections: create the VPC endpoint
411                        ConnectionDetails::AwsPrivatelink(privatelink) => {
412                            let spec = VpcEndpointConfig {
413                                aws_service_name: privatelink.service_name.to_owned(),
414                                availability_zone_ids: privatelink.availability_zones.to_owned(),
415                            };
416                            vpc_endpoints_to_create.push((catalog_id, spec));
417                        }
418                        // Other connection types don't require post-transaction actions
419                        _ => {}
420                    }
421                }
422                CatalogImplication::Connection(CatalogImplicationKind::Altered {
423                    prev: _prev_connection,
424                    new: new_connection,
425                }) => {
426                    self.handle_alter_connection(
427                        catalog_id,
428                        new_connection,
429                        &mut vpc_endpoints_to_create,
430                        &mut source_connections_to_alter,
431                        &mut sink_connections_to_alter,
432                        &mut source_export_data_configs_to_alter,
433                    );
434                }
435                CatalogImplication::Connection(CatalogImplicationKind::Dropped(
436                    connection,
437                    _full_name,
438                )) => {
439                    match &connection.details {
440                        // SSH connections have an associated secret that should be dropped
441                        ConnectionDetails::Ssh { .. } => {
442                            secrets_to_drop.push(catalog_id);
443                        }
444                        // AWS PrivateLink connections have an associated
445                        // VpcEndpoint K8S resource that should be dropped
446                        ConnectionDetails::AwsPrivatelink(_) => {
447                            vpc_endpoints_to_drop.push(catalog_id);
448                        }
449                        _ => (),
450                    }
451                }
452                CatalogImplication::None => {
453                    // Nothing to do for None commands
454                }
455                CatalogImplication::Cluster(_) | CatalogImplication::ClusterReplica(_) => {
456                    unreachable!("clusters and cluster replicas are handled below")
457                }
458                CatalogImplication::Table(CatalogImplicationKind::None)
459                | CatalogImplication::Source(CatalogImplicationKind::None)
460                | CatalogImplication::Sink(CatalogImplicationKind::None)
461                | CatalogImplication::Index(CatalogImplicationKind::None)
462                | CatalogImplication::MaterializedView(CatalogImplicationKind::None)
463                | CatalogImplication::View(CatalogImplicationKind::None)
464                | CatalogImplication::ContinualTask(CatalogImplicationKind::None)
465                | CatalogImplication::Secret(CatalogImplicationKind::None)
466                | CatalogImplication::Connection(CatalogImplicationKind::None) => {
467                    unreachable!("will never leave None in place");
468                }
469            }
470        }
471
472        for (cluster_id, command) in cluster_commands {
473            tracing::trace!(?command, "have cluster command to apply!");
474
475            match command {
476                CatalogImplication::Cluster(CatalogImplicationKind::Added(cluster)) => {
477                    tracing::debug!(?cluster, "not handling AddCluster in here yet");
478                }
479                CatalogImplication::Cluster(CatalogImplicationKind::Altered {
480                    prev: prev_cluster,
481                    new: new_cluster,
482                }) => {
483                    tracing::debug!(
484                        ?prev_cluster,
485                        ?new_cluster,
486                        "not handling AlterCluster in here yet"
487                    );
488                }
489                CatalogImplication::Cluster(CatalogImplicationKind::Dropped(
490                    cluster,
491                    _full_name,
492                )) => {
493                    clusters_to_drop.push(cluster_id);
494                    dropped_cluster_names.insert(cluster_id, cluster.name);
495                }
496                CatalogImplication::Cluster(CatalogImplicationKind::None) => {
497                    unreachable!("will never leave None in place");
498                }
499                command => {
500                    unreachable!(
501                        "we only handle cluster commands in this map, got: {:?}",
502                        command
503                    );
504                }
505            }
506        }
507
508        for ((cluster_id, replica_id), command) in cluster_replica_commands {
509            tracing::trace!(?command, "have cluster replica command to apply!");
510
511            match command {
512                CatalogImplication::ClusterReplica(CatalogImplicationKind::Added(replica)) => {
513                    tracing::debug!(?replica, "not handling AddClusterReplica in here yet");
514                }
515                CatalogImplication::ClusterReplica(CatalogImplicationKind::Altered {
516                    prev: prev_replica,
517                    new: new_replica,
518                }) => {
519                    tracing::debug!(
520                        ?prev_replica,
521                        ?new_replica,
522                        "not handling AlterClusterReplica in here yet"
523                    );
524                }
525                CatalogImplication::ClusterReplica(CatalogImplicationKind::Dropped(
526                    _replica,
527                    _full_name,
528                )) => {
529                    cluster_replicas_to_drop.push((cluster_id, replica_id));
530                }
531                CatalogImplication::ClusterReplica(CatalogImplicationKind::None) => {
532                    unreachable!("will never leave None in place");
533                }
534                command => {
535                    unreachable!(
536                        "we only handle cluster replica commands in this map, got: {:?}",
537                        command
538                    );
539                }
540            }
541        }
542
543        // Cancel out drops against replacements.
544        for (cluster_id, gid) in replacement_gids {
545            sources_to_drop.retain(|(_, id)| *id != gid);
546            compute_gids_to_drop.retain(|id| *id != (cluster_id, gid));
547        }
548
549        if !source_collections_to_create.is_empty() {
550            self.create_source_collections(source_collections_to_create)
551                .await?;
552        }
553
554        // Have to create sources first and then tables, because tables within
555        // one transaction can depend on sources.
556        if !table_collections_to_create.is_empty() {
557            self.create_table_collections(table_collections_to_create, execution_timestamps_to_set)
558                .await?;
559        }
560        // It is _very_ important that we only initialize read policies after we
561        // have created all the sources/collections. Some of the sources created
562        // in this collection might have dependencies on other sources, so the
563        // controller must get a chance to install read holds before we set a
564        // policy that might make the since advance.
565        self.initialize_storage_collections(storage_policies_to_initialize)
566            .await?;
567
568        // Create VPC endpoints for AWS PrivateLink connections
569        if !vpc_endpoints_to_create.is_empty() {
570            if let Some(cloud_resource_controller) = self.cloud_resource_controller.as_ref() {
571                for (connection_id, spec) in vpc_endpoints_to_create {
572                    if let Err(err) = cloud_resource_controller
573                        .ensure_vpc_endpoint(connection_id, spec)
574                        .await
575                    {
576                        tracing::error!(?err, "failed to ensure vpc endpoint!");
577                    }
578                }
579            } else {
580                tracing::error!(
581                    "AWS PrivateLink connections unsupported without cloud_resource_controller"
582                );
583            }
584        }
585
586        // Apply batched connection alterations to dependent sources/sinks/tables.
587        if !source_connections_to_alter.is_empty() {
588            self.controller
589                .storage
590                .alter_ingestion_connections(source_connections_to_alter)
591                .await
592                .unwrap_or_terminate("cannot fail to alter ingestion connections");
593        }
594
595        if !sink_connections_to_alter.is_empty() {
596            self.controller
597                .storage
598                .alter_export_connections(sink_connections_to_alter)
599                .await
600                .unwrap_or_terminate("altering export connections after txn must succeed");
601        }
602
603        if !source_export_data_configs_to_alter.is_empty() {
604            self.controller
605                .storage
606                .alter_ingestion_export_data_configs(source_export_data_configs_to_alter)
607                .await
608                .unwrap_or_terminate("altering source export data configs after txn must succeed");
609        }
610
611        let collections_to_drop: BTreeSet<_> = sources_to_drop
612            .iter()
613            .map(|(_, gid)| *gid)
614            .chain(tables_to_drop.iter().map(|(_, gid)| *gid))
615            .chain(storage_sink_gids_to_drop.iter().copied())
616            .chain(compute_gids_to_drop.iter().map(|(_, gid)| *gid))
617            .chain(view_gids_to_drop.iter().copied())
618            .collect();
619
620        // Clean up any active compute sinks like subscribes or copy to-s that
621        // rely on dropped relations or clusters.
622        for (sink_id, sink) in &self.active_compute_sinks {
623            let cluster_id = sink.cluster_id();
624            if let Some(id) = sink
625                .depends_on()
626                .iter()
627                .find(|id| collections_to_drop.contains(id))
628            {
629                let name = dropped_item_names
630                    .get(id)
631                    .map(|n| format!("relation {}", n.quoted()))
632                    .expect("missing relation name");
633                compute_sinks_to_drop.insert(
634                    *sink_id,
635                    ActiveComputeSinkRetireReason::DependencyDropped(name),
636                );
637            } else if clusters_to_drop.contains(&cluster_id) {
638                let name = dropped_cluster_names
639                    .get(&cluster_id)
640                    .map(|n| format!("cluster {}", n.quoted()))
641                    .expect("missing cluster name");
642                compute_sinks_to_drop.insert(
643                    *sink_id,
644                    ActiveComputeSinkRetireReason::DependencyDropped(name),
645                );
646            }
647        }
648
649        // Clean up any pending peeks that rely on dropped relations or clusters.
650        for (uuid, pending_peek) in &self.pending_peeks {
651            if let Some(id) = pending_peek
652                .depends_on
653                .iter()
654                .find(|id| collections_to_drop.contains(id))
655            {
656                let name = dropped_item_names
657                    .get(id)
658                    .map(|n| format!("relation {}", n.quoted()))
659                    .expect("missing relation name");
660                peeks_to_drop.push((name, uuid.clone()));
661            } else if clusters_to_drop.contains(&pending_peek.cluster_id) {
662                let name = dropped_cluster_names
663                    .get(&pending_peek.cluster_id)
664                    .map(|n| format!("cluster {}", n.quoted()))
665                    .expect("missing cluster name");
666                peeks_to_drop.push((name, uuid.clone()));
667            }
668        }
669
670        // Clean up any pending `COPY` statements that rely on dropped relations or clusters.
671        for (conn_id, pending_copy) in &self.active_copies {
672            let dropping_table = tables_to_drop
673                .iter()
674                .any(|(item_id, _gid)| pending_copy.table_id == *item_id);
675            let dropping_cluster = clusters_to_drop.contains(&pending_copy.cluster_id);
676
677            if dropping_table || dropping_cluster {
678                copies_to_drop.push(conn_id.clone());
679            }
680        }
681
682        let storage_gids_to_drop: BTreeSet<_> = sources_to_drop
683            .iter()
684            .map(|(_id, gid)| gid)
685            .chain(storage_sink_gids_to_drop.iter())
686            .chain(tables_to_drop.iter().map(|(_id, gid)| gid))
687            .copied()
688            .collect();
689
690        // Gather resources that we have to remove from timeline state and
691        // pre-check if any Timelines become empty, when we drop the specified
692        // storage and compute resources.
693        //
694        // Note: We only apply these changes below.
695        let mut timeline_id_bundles = BTreeMap::new();
696
697        for (timeline, TimelineState { read_holds, .. }) in &self.global_timelines {
698            let mut id_bundle = CollectionIdBundle::default();
699
700            for storage_id in read_holds.storage_ids() {
701                if storage_gids_to_drop.contains(&storage_id) {
702                    id_bundle.storage_ids.insert(storage_id);
703                }
704            }
705
706            for (instance_id, id) in read_holds.compute_ids() {
707                if compute_gids_to_drop.contains(&(instance_id, id))
708                    || clusters_to_drop.contains(&instance_id)
709                {
710                    id_bundle
711                        .compute_ids
712                        .entry(instance_id)
713                        .or_default()
714                        .insert(id);
715                }
716            }
717
718            timeline_id_bundles.insert(timeline.clone(), id_bundle);
719        }
720
721        let mut timeline_associations = BTreeMap::new();
722        for (timeline, id_bundle) in timeline_id_bundles.into_iter() {
723            let TimelineState { read_holds, .. } = self
724                .global_timelines
725                .get(&timeline)
726                .expect("all timelines have a timestamp oracle");
727
728            let empty = read_holds.id_bundle().difference(&id_bundle).is_empty();
729            timeline_associations.insert(timeline, (empty, id_bundle));
730        }
731
732        // No error returns are allowed after this point. Enforce this at compile time
733        // by using this odd structure so we don't accidentally add a stray `?`.
734        let _: () = async {
735            if !timeline_associations.is_empty() {
736                for (timeline, (should_be_empty, id_bundle)) in timeline_associations {
737                    let became_empty =
738                        self.remove_resources_associated_with_timeline(timeline, id_bundle);
739                    assert_eq!(should_be_empty, became_empty, "emptiness did not match!");
740                }
741            }
742
743            // Note that we drop tables before sources since there can be a weak
744            // dependency on sources from tables in the storage controller that
745            // will result in error logging that we'd prefer to avoid. This
746            // isn't an actual dependency issue but we'd like to keep that error
747            // logging around to indicate when an actual dependency error might
748            // occur.
749            if !tables_to_drop.is_empty() {
750                let ts = self.get_local_write_ts().await;
751                self.drop_tables(tables_to_drop.into_iter().collect_vec(), ts.timestamp);
752            }
753
754            if !sources_to_drop.is_empty() {
755                self.drop_sources(sources_to_drop);
756            }
757
758            if !storage_sink_gids_to_drop.is_empty() {
759                self.drop_storage_sinks(storage_sink_gids_to_drop);
760            }
761
762            if !compute_sinks_to_drop.is_empty() {
763                self.retire_compute_sinks(compute_sinks_to_drop).await;
764            }
765
766            if !peeks_to_drop.is_empty() {
767                for (dropped_name, uuid) in peeks_to_drop {
768                    if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
769                        let cancel_reason = PeekResponse::Error(format!(
770                            "query could not complete because {dropped_name} was dropped"
771                        ));
772                        self.controller
773                            .compute
774                            .cancel_peek(pending_peek.cluster_id, uuid, cancel_reason)
775                            .unwrap_or_terminate("unable to cancel peek");
776                        self.retire_execution(
777                            StatementEndedExecutionReason::Canceled,
778                            pending_peek.ctx_extra.defuse(),
779                        );
780                    }
781                }
782            }
783
784            if !copies_to_drop.is_empty() {
785                for conn_id in copies_to_drop {
786                    self.cancel_pending_copy(&conn_id);
787                }
788            }
789
790            if !compute_gids_to_drop.is_empty() {
791                self.drop_compute_collections(compute_gids_to_drop);
792            }
793
794            if !vpc_endpoints_to_drop.is_empty() {
795                self.drop_vpc_endpoints_in_background(vpc_endpoints_to_drop)
796            }
797
798            if !cluster_replicas_to_drop.is_empty() {
799                fail::fail_point!("after_catalog_drop_replica");
800
801                for (cluster_id, replica_id) in cluster_replicas_to_drop {
802                    self.drop_replica(cluster_id, replica_id);
803                }
804            }
805            if !clusters_to_drop.is_empty() {
806                for cluster_id in clusters_to_drop {
807                    self.controller.drop_cluster(cluster_id);
808                }
809            }
810
811            // We don't want to block the main coordinator thread on cleaning
812            // up external resources (PostgreSQL replication slots and secrets),
813            // so we perform that cleanup in a background task.
814            //
815            // TODO(14551): This is inherently best effort. An ill-timed crash
816            // means we'll never clean these resources up. Safer cleanup for non-Materialize resources.
817            // See <https://github.com/MaterializeInc/materialize/issues/14551>
818            task::spawn(|| "drop_replication_slots_and_secrets", {
819                let ssh_tunnel_manager = self.connection_context().ssh_tunnel_manager.clone();
820                let secrets_controller = Arc::clone(&self.secrets_controller);
821                let secrets_reader = Arc::clone(self.secrets_reader());
822                let storage_config = self.controller.storage.config().clone();
823
824                async move {
825                    for (connection, replication_slot_name) in replication_slots_to_drop {
826                        tracing::info!(?replication_slot_name, "dropping replication slot");
827
828                        // Try to drop the replication slots, but give up after
829                        // a while. The PostgreSQL server may no longer be
830                        // healthy. Users often drop PostgreSQL sources
831                        // *because* the PostgreSQL server has been
832                        // decomissioned.
833                        let result: Result<(), anyhow::Error> = Retry::default()
834                            .max_duration(Duration::from_secs(60))
835                            .retry_async(|_state| async {
836                                let config = connection
837                                    .config(&secrets_reader, &storage_config, InTask::No)
838                                    .await
839                                    .map_err(|e| {
840                                        anyhow::anyhow!(
841                                            "error creating Postgres client for \
842                                            dropping acquired slots: {}",
843                                            e.display_with_causes()
844                                        )
845                                    })?;
846
847                                mz_postgres_util::drop_replication_slots(
848                                    &ssh_tunnel_manager,
849                                    config.clone(),
850                                    &[(&replication_slot_name, true)],
851                                )
852                                .await?;
853
854                                Ok(())
855                            })
856                            .await;
857
858                        if let Err(err) = result {
859                            tracing::warn!(
860                                ?replication_slot_name,
861                                ?err,
862                                "failed to drop replication slot"
863                            );
864                        }
865                    }
866
867                    // Drop secrets *after* dropping the replication slots,
868                    // because dropping replication slots may rely on those
869                    // secrets still being present.
870                    //
871                    // It's okay if we crash before processing the secret drops,
872                    // as we look for and remove any orphaned secrets during
873                    // startup.
874                    fail_point!("drop_secrets");
875                    for secret in secrets_to_drop {
876                        if let Err(e) = secrets_controller.delete(secret).await {
877                            warn!("Dropping secrets has encountered an error: {}", e);
878                        }
879                    }
880                }
881            });
882        }
883        .instrument(info_span!(
884            "coord::apply_catalog_implications_inner::finalize"
885        ))
886        .await;
887
888        Ok(())
889    }
890
891    #[instrument(level = "debug")]
892    async fn create_table_collections(
893        &mut self,
894        table_collections_to_create: BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
895        execution_timestamps_to_set: BTreeSet<StatementLoggingId>,
896    ) -> Result<(), AdapterError> {
897        // If we have tables, determine the initial validity for the table.
898        let register_ts = self.get_local_write_ts().await.timestamp;
899
900        // After acquiring `register_ts` but before using it, we need to
901        // be sure we're still the leader. Otherwise a new generation
902        // may also be trying to use `register_ts` for a different
903        // purpose.
904        //
905        // See #28216.
906        self.catalog
907            .confirm_leadership()
908            .await
909            .unwrap_or_terminate("unable to confirm leadership");
910
911        for id in execution_timestamps_to_set {
912            self.set_statement_execution_timestamp(id, register_ts);
913        }
914
915        let storage_metadata = self.catalog.state().storage_metadata();
916
917        self.controller
918            .storage
919            .create_collections(
920                storage_metadata,
921                Some(register_ts),
922                table_collections_to_create.into_iter().collect_vec(),
923            )
924            .await
925            .unwrap_or_terminate("cannot fail to create collections");
926
927        self.apply_local_write(register_ts).await;
928
929        Ok(())
930    }
931
932    #[instrument(level = "debug")]
933    async fn create_source_collections(
934        &mut self,
935        source_collections_to_create: BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
936    ) -> Result<(), AdapterError> {
937        let storage_metadata = self.catalog.state().storage_metadata();
938
939        self.controller
940            .storage
941            .create_collections(
942                storage_metadata,
943                None, // Sources don't need a write timestamp
944                source_collections_to_create.into_iter().collect_vec(),
945            )
946            .await
947            .unwrap_or_terminate("cannot fail to create collections");
948
949        Ok(())
950    }
951
952    #[instrument(level = "debug")]
953    async fn initialize_storage_collections(
954        &mut self,
955        storage_policies_to_initialize: BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
956    ) -> Result<(), AdapterError> {
957        for (compaction_window, global_ids) in storage_policies_to_initialize {
958            self.initialize_read_policies(
959                &CollectionIdBundle {
960                    storage_ids: global_ids,
961                    compute_ids: BTreeMap::new(),
962                },
963                compaction_window,
964            )
965            .await;
966        }
967
968        Ok(())
969    }
970
971    #[instrument(level = "debug")]
972    async fn handle_create_table(
973        &self,
974        ctx: &Option<&mut ExecuteContext>,
975        storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
976        storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
977        execution_timestamps_to_set: &mut BTreeSet<StatementLoggingId>,
978        table_id: CatalogItemId,
979        table: Table,
980    ) -> Result<(), AdapterError> {
981        // The table data_source determines whether this table will be written to
982        // by environmentd (e.g. with INSERT INTO statements) or by the storage layer
983        // (e.g. a source-fed table).
984        match &table.data_source {
985            TableDataSource::TableWrites { defaults: _ } => {
986                let versions: BTreeMap<_, _> = table
987                    .collection_descs()
988                    .map(|(gid, version, desc)| (version, (gid, desc)))
989                    .collect();
990                let collection_descs = versions.iter().map(|(_version, (gid, desc))| {
991                    let collection_desc = CollectionDescription::for_table(desc.clone());
992
993                    (*gid, collection_desc)
994                });
995
996                let compaction_window = table
997                    .custom_logical_compaction_window
998                    .unwrap_or(CompactionWindow::Default);
999                let ids_to_initialize = storage_policies_to_initialize
1000                    .entry(compaction_window)
1001                    .or_default();
1002
1003                for (gid, collection_desc) in collection_descs {
1004                    storage_collections_to_create.insert(gid, collection_desc);
1005                    ids_to_initialize.insert(gid);
1006                }
1007
1008                if let Some(id) = ctx.as_ref().and_then(|ctx| ctx.extra().contents()) {
1009                    execution_timestamps_to_set.insert(id);
1010                }
1011            }
1012            TableDataSource::DataSource {
1013                desc: data_source_desc,
1014                timeline,
1015            } => {
1016                match data_source_desc {
1017                    DataSourceDesc::IngestionExport {
1018                        ingestion_id,
1019                        external_reference: _,
1020                        details,
1021                        data_config,
1022                    } => {
1023                        let global_ingestion_id =
1024                            self.catalog().get_entry(ingestion_id).latest_global_id();
1025
1026                        let collection_desc = CollectionDescription::<Timestamp> {
1027                            desc: table.desc.latest(),
1028                            data_source: DataSource::IngestionExport {
1029                                ingestion_id: global_ingestion_id,
1030                                details: details.clone(),
1031                                data_config: data_config
1032                                    .clone()
1033                                    .into_inline_connection(self.catalog.state()),
1034                            },
1035                            since: None,
1036                            timeline: Some(timeline.clone()),
1037                            primary: None,
1038                        };
1039
1040                        let global_id = table
1041                            .global_ids()
1042                            .expect_element(|| "subsources cannot have multiple versions");
1043
1044                        storage_collections_to_create.insert(global_id, collection_desc);
1045
1046                        let read_policies = self
1047                            .catalog()
1048                            .state()
1049                            .source_compaction_windows(vec![table_id]);
1050                        for (compaction_window, catalog_ids) in read_policies {
1051                            let compaction_ids = storage_policies_to_initialize
1052                                .entry(compaction_window)
1053                                .or_default();
1054
1055                            let gids = catalog_ids
1056                                .into_iter()
1057                                .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1058                                .flatten();
1059                            compaction_ids.extend(gids);
1060                        }
1061                    }
1062                    DataSourceDesc::Webhook {
1063                        validate_using: _,
1064                        body_format: _,
1065                        headers: _,
1066                        cluster_id: _,
1067                    } => {
1068                        // Create the underlying collection with the latest schema from the Table.
1069                        assert_eq!(
1070                            table.desc.latest_version(),
1071                            RelationVersion::root(),
1072                            "found webhook with more than 1 relation version, {:?}",
1073                            table.desc
1074                        );
1075                        let desc = table.desc.latest();
1076
1077                        let collection_desc = CollectionDescription::<Timestamp> {
1078                            desc,
1079                            data_source: DataSource::Webhook,
1080                            since: None,
1081                            timeline: Some(timeline.clone()),
1082                            primary: None,
1083                        };
1084
1085                        let global_id = table
1086                            .global_ids()
1087                            .expect_element(|| "webhooks cannot have multiple versions");
1088
1089                        storage_collections_to_create.insert(global_id, collection_desc);
1090
1091                        let read_policies = self
1092                            .catalog()
1093                            .state()
1094                            .source_compaction_windows(vec![table_id]);
1095
1096                        for (compaction_window, catalog_ids) in read_policies {
1097                            let compaction_ids = storage_policies_to_initialize
1098                                .entry(compaction_window)
1099                                .or_default();
1100
1101                            let gids = catalog_ids
1102                                .into_iter()
1103                                .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1104                                .flatten();
1105                            compaction_ids.extend(gids);
1106                        }
1107                    }
1108                    _ => unreachable!("CREATE TABLE data source got {:?}", data_source_desc),
1109                }
1110            }
1111        }
1112
1113        Ok(())
1114    }
1115
1116    #[instrument(level = "debug")]
1117    async fn handle_alter_table(
1118        &mut self,
1119        prev_table: Table,
1120        new_table: Table,
1121    ) -> Result<(), AdapterError> {
1122        let existing_gid = prev_table.global_id_writes();
1123        let new_gid = new_table.global_id_writes();
1124
1125        if existing_gid == new_gid {
1126            // It's not an ALTER TABLE as far as the controller is concerned,
1127            // because we still have the same GlobalId. This is likely a change
1128            // from an ALTER SWAP.
1129            return Ok(());
1130        }
1131
1132        // Acquire a read hold on the original table for the duration of
1133        // the alter to prevent the since of the original table from
1134        // getting advanced, while the ALTER is running.
1135        let existing_table = crate::CollectionIdBundle {
1136            storage_ids: BTreeSet::from([existing_gid]),
1137            compute_ids: BTreeMap::new(),
1138        };
1139        let existing_table_read_hold = self.acquire_read_holds(&existing_table);
1140
1141        let expected_version = prev_table.desc.latest_version();
1142        let new_version = new_table.desc.latest_version();
1143        let new_desc = new_table
1144            .desc
1145            .at_version(RelationVersionSelector::Specific(new_version));
1146
1147        let register_ts = self.get_local_write_ts().await.timestamp;
1148
1149        // Alter the table description, creating a "new" collection.
1150        self.controller
1151            .storage
1152            .alter_table_desc(
1153                existing_gid,
1154                new_gid,
1155                new_desc,
1156                expected_version,
1157                register_ts,
1158            )
1159            .await
1160            .expect("failed to alter desc of table");
1161
1162        // Initialize the ReadPolicy which ensures we have the correct read holds.
1163        let compaction_window = new_table
1164            .custom_logical_compaction_window
1165            .unwrap_or(CompactionWindow::Default);
1166        self.initialize_read_policies(
1167            &crate::CollectionIdBundle {
1168                storage_ids: BTreeSet::from([new_gid]),
1169                compute_ids: BTreeMap::new(),
1170            },
1171            compaction_window,
1172        )
1173        .await;
1174
1175        self.apply_local_write(register_ts).await;
1176
1177        // Alter is complete! We can drop our read hold.
1178        drop(existing_table_read_hold);
1179
1180        Ok(())
1181    }
1182
1183    #[instrument(level = "debug")]
1184    fn handle_alter_materialized_view(
1185        &mut self,
1186        prev_mv: MaterializedView,
1187        new_mv: MaterializedView,
1188    ) -> Result<(), AdapterError> {
1189        let old_gid = prev_mv.global_id_writes();
1190        let new_gid = new_mv.global_id_writes();
1191
1192        if old_gid == new_gid {
1193            // It's not an ALTER MATERIALIZED VIEW as far as the controller is
1194            // concerned, because we still have the same GlobalId. This is
1195            // likely a change from an ALTER SWAP.
1196            return Ok(());
1197        }
1198
1199        // Cut over the MV computation, by shutting down the old dataflow and allowing the
1200        // new dataflow to start writing.
1201        //
1202        // Both commands are applied to their respective clusters asynchronously and they
1203        // race, so it is possible that we allow writes by the replacement before having
1204        // dropped the old dataflow. Thus there might be a period where the two dataflows
1205        // are competing in trying to commit conflicting outputs. Eventually the old
1206        // dataflow will be dropped and the replacement takes over.
1207        self.drop_compute_collections(vec![(prev_mv.cluster_id, old_gid)]);
1208        self.allow_writes(new_mv.cluster_id, new_gid);
1209
1210        Ok(())
1211    }
1212
1213    #[instrument(level = "debug")]
1214    async fn handle_create_source(
1215        &self,
1216        storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
1217        storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1218        item_id: CatalogItemId,
1219        source: Source,
1220        compaction_windows: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>>,
1221    ) -> Result<(), AdapterError> {
1222        let data_source = match source.data_source {
1223            DataSourceDesc::Ingestion { desc, cluster_id } => {
1224                let desc = desc.into_inline_connection(self.catalog().state());
1225                let item_global_id = self.catalog().get_entry(&item_id).latest_global_id();
1226
1227                let ingestion = mz_storage_types::sources::IngestionDescription::new(
1228                    desc,
1229                    cluster_id,
1230                    item_global_id,
1231                );
1232
1233                DataSource::Ingestion(ingestion)
1234            }
1235            DataSourceDesc::OldSyntaxIngestion {
1236                desc,
1237                progress_subsource,
1238                data_config,
1239                details,
1240                cluster_id,
1241            } => {
1242                let desc = desc.into_inline_connection(self.catalog().state());
1243                let data_config = data_config.into_inline_connection(self.catalog().state());
1244
1245                // TODO(parkmycar): We should probably check the type here, but I'm not
1246                // sure if this will always be a Source or a Table.
1247                let progress_subsource = self
1248                    .catalog()
1249                    .get_entry(&progress_subsource)
1250                    .latest_global_id();
1251
1252                let mut ingestion = mz_storage_types::sources::IngestionDescription::new(
1253                    desc,
1254                    cluster_id,
1255                    progress_subsource,
1256                );
1257
1258                let legacy_export = SourceExport {
1259                    storage_metadata: (),
1260                    data_config,
1261                    details,
1262                };
1263
1264                ingestion
1265                    .source_exports
1266                    .insert(source.global_id, legacy_export);
1267
1268                DataSource::Ingestion(ingestion)
1269            }
1270            DataSourceDesc::IngestionExport {
1271                ingestion_id,
1272                external_reference: _,
1273                details,
1274                data_config,
1275            } => {
1276                // TODO(parkmycar): We should probably check the type here, but I'm not sure if
1277                // this will always be a Source or a Table.
1278                let ingestion_id = self.catalog().get_entry(&ingestion_id).latest_global_id();
1279
1280                DataSource::IngestionExport {
1281                    ingestion_id,
1282                    details,
1283                    data_config: data_config.into_inline_connection(self.catalog().state()),
1284                }
1285            }
1286            DataSourceDesc::Progress => DataSource::Progress,
1287            DataSourceDesc::Webhook { .. } => DataSource::Webhook,
1288            DataSourceDesc::Introspection(_) => {
1289                unreachable!("cannot create sources with introspection data sources")
1290            }
1291        };
1292
1293        storage_collections_to_create.insert(
1294            source.global_id,
1295            CollectionDescription::<Timestamp> {
1296                desc: source.desc.clone(),
1297                data_source,
1298                timeline: Some(source.timeline),
1299                since: None,
1300                primary: None,
1301            },
1302        );
1303
1304        // Initialize read policies for the source
1305        for (compaction_window, catalog_ids) in compaction_windows {
1306            let compaction_ids = storage_policies_to_initialize
1307                .entry(compaction_window)
1308                .or_default();
1309
1310            let gids = catalog_ids
1311                .into_iter()
1312                .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1313                .flatten();
1314            compaction_ids.extend(gids);
1315        }
1316
1317        Ok(())
1318    }
1319
1320    /// Handles altering a connection by collecting all the dependent sources,
1321    /// sinks, and tables that need their connection updated.
1322    ///
1323    /// This mirrors the logic from `sequence_alter_connection_stage_finish` but
1324    /// collects the changes into batched collections for application after all
1325    /// implications are processed.
1326    #[instrument(level = "debug")]
1327    fn handle_alter_connection(
1328        &self,
1329        connection_id: CatalogItemId,
1330        connection: Connection,
1331        vpc_endpoints_to_create: &mut Vec<(CatalogItemId, VpcEndpointConfig)>,
1332        source_connections_to_alter: &mut BTreeMap<
1333            GlobalId,
1334            GenericSourceConnection<InlinedConnection>,
1335        >,
1336        sink_connections_to_alter: &mut BTreeMap<GlobalId, StorageSinkConnection>,
1337        source_export_data_configs_to_alter: &mut BTreeMap<GlobalId, SourceExportDataConfig>,
1338    ) {
1339        use std::collections::VecDeque;
1340
1341        // Handle AWS PrivateLink connections by queueing VPC endpoint creation.
1342        if let ConnectionDetails::AwsPrivatelink(ref privatelink) = connection.details {
1343            let spec = VpcEndpointConfig {
1344                aws_service_name: privatelink.service_name.to_owned(),
1345                availability_zone_ids: privatelink.availability_zones.to_owned(),
1346            };
1347            vpc_endpoints_to_create.push((connection_id, spec));
1348        }
1349
1350        // Walk the dependency graph to find all sources, sinks, and tables
1351        // that depend on this connection (directly or transitively through
1352        // other connections).
1353        let mut connections_to_process = VecDeque::new();
1354        connections_to_process.push_front(connection_id.clone());
1355
1356        while let Some(id) = connections_to_process.pop_front() {
1357            for dependent_id in self.catalog().get_entry(&id).used_by() {
1358                let dependent_entry = self.catalog().get_entry(dependent_id);
1359                match dependent_entry.item() {
1360                    CatalogItem::Connection(_) => {
1361                        // Connections can depend on other connections (e.g., a
1362                        // Kafka connection using an SSH tunnel connection).
1363                        // Process these transitively.
1364                        connections_to_process.push_back(*dependent_id);
1365                    }
1366                    CatalogItem::Source(source) => {
1367                        let desc = match &dependent_entry
1368                            .source()
1369                            .expect("known to be source")
1370                            .data_source
1371                        {
1372                            DataSourceDesc::Ingestion { desc, .. }
1373                            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1374                                desc.clone().into_inline_connection(self.catalog().state())
1375                            }
1376                            _ => {
1377                                // Only ingestions reference connections directly.
1378                                continue;
1379                            }
1380                        };
1381
1382                        source_connections_to_alter.insert(source.global_id, desc.connection);
1383                    }
1384                    CatalogItem::Sink(sink) => {
1385                        let export = dependent_entry.sink().expect("known to be sink");
1386                        sink_connections_to_alter.insert(
1387                            sink.global_id,
1388                            export
1389                                .connection
1390                                .clone()
1391                                .into_inline_connection(self.catalog().state()),
1392                        );
1393                    }
1394                    CatalogItem::Table(table) => {
1395                        // This is a source-fed table that references a schema
1396                        // registry connection as part of its encoding/data
1397                        // config.
1398                        if let Some((_, _, _, export_data_config)) =
1399                            dependent_entry.source_export_details()
1400                        {
1401                            let data_config = export_data_config.clone();
1402                            source_export_data_configs_to_alter.insert(
1403                                table.global_id_writes(),
1404                                data_config.into_inline_connection(self.catalog().state()),
1405                            );
1406                        }
1407                    }
1408                    _ => {
1409                        // Other item types don't have connection dependencies
1410                        // that need updating.
1411                    }
1412                }
1413            }
1414        }
1415    }
1416}
1417
/// A state machine for building catalog implications from catalog updates.
///
/// A [CatalogImplication] absorbs the state updates for a single catalog
/// object. It starts out as [CatalogImplication::None] and switches to the
/// variant matching the object's type when the first update is absorbed.
/// Absorbing an update for a different object type after that panics.
///
/// Once all [ParsedStateUpdate]s of a timestamp have been absorbed, the
/// resulting value is a command that may have to be applied to in-memory
/// state and/or the controller(s).
#[derive(Debug, Clone)]
enum CatalogImplication {
    /// No update has been absorbed yet.
    None,
    Table(CatalogImplicationKind<Table>),
    /// A source, paired with the (optional) source connection that was carried
    /// alongside it in its parsed state update.
    Source(CatalogImplicationKind<(Source, Option<GenericSourceConnection>)>),
    Sink(CatalogImplicationKind<Sink>),
    Index(CatalogImplicationKind<Index>),
    MaterializedView(CatalogImplicationKind<MaterializedView>),
    View(CatalogImplicationKind<View>),
    ContinualTask(CatalogImplicationKind<ContinualTask>),
    Secret(CatalogImplicationKind<Secret>),
    Connection(CatalogImplicationKind<Connection>),
    Cluster(CatalogImplicationKind<Cluster>),
    ClusterReplica(CatalogImplicationKind<ClusterReplica>),
}
1438
/// The accumulated state of one catalog object of type `T`, advanced by
/// `CatalogImplicationKind::transition`.
///
/// A lone addition yields `Added`, a lone retraction yields `Dropped`, and a
/// retraction/addition pair (in either order) yields `Altered`.
#[derive(Debug, Clone)]
enum CatalogImplicationKind<T> {
    /// No operations seen yet.
    None,
    /// Item was added.
    Added(T),
    /// Item was dropped (with its name retained for error messages). The name
    /// is `"<unknown>"` when none was supplied with the retraction.
    Dropped(T, String),
    /// Item is being altered from one state to another.
    Altered { prev: T, new: T },
}
1450
1451impl<T: Clone> CatalogImplicationKind<T> {
1452    /// Apply a state transition based on a diff. Returns an error message if
1453    /// the transition is invalid.
1454    fn transition(&mut self, item: T, name: Option<String>, diff: StateDiff) -> Result<(), String> {
1455        use CatalogImplicationKind::*;
1456        use StateDiff::*;
1457
1458        let new_state = match (&*self, diff) {
1459            // Initial state transitions
1460            (None, Addition) => Added(item),
1461            (None, Retraction) => Dropped(item, name.unwrap_or_else(|| "<unknown>".to_string())),
1462
1463            // From Added state
1464            (Added(existing), Retraction) => {
1465                // Add -> Drop means the item is being altered
1466                Altered {
1467                    prev: item,
1468                    new: existing.clone(),
1469                }
1470            }
1471            (Added(_), Addition) => {
1472                return Err("Cannot add an already added object".to_string());
1473            }
1474
1475            // From Dropped state
1476            (Dropped(existing, _), Addition) => {
1477                // Drop -> Add means the item is being altered
1478                Altered {
1479                    prev: existing.clone(),
1480                    new: item,
1481                }
1482            }
1483            (Dropped(_, _), Retraction) => {
1484                return Err("Cannot drop an already dropped object".to_string());
1485            }
1486
1487            // From Altered state
1488            (Altered { .. }, _) => {
1489                return Err(format!(
1490                    "Cannot apply {:?} to an object in Altered state",
1491                    diff
1492                ));
1493            }
1494        };
1495
1496        *self = new_state;
1497        Ok(())
1498    }
1499}
1500
/// Generates an `absorb_*` method on [CatalogImplication] for one item type.
///
/// The generated method takes `(item, parsed_full_name, diff)`:
/// * if the implication is still [CatalogImplication::None], it first becomes
///   `$variant` wrapping an empty [CatalogImplicationKind::None];
/// * the update is then fed to the wrapped state's `transition`.
///
/// It panics if the implication already holds a different variant, or if the
/// transition is invalid.
macro_rules! impl_absorb_method {
    (
        $method_name:ident,
        $variant:ident,
        $item_type:ty
    ) => {
        fn $method_name(
            &mut self,
            item: $item_type,
            parsed_full_name: Option<String>,
            diff: StateDiff,
        ) {
            // First update for this object: adopt the variant for this type.
            if matches!(self, CatalogImplication::None) {
                *self = CatalogImplication::$variant(CatalogImplicationKind::None);
            }

            let CatalogImplication::$variant(kind) = self else {
                panic!(
                    "Unexpected command type for {:?}: {} {:?}",
                    self,
                    stringify!($variant),
                    diff,
                );
            };

            kind.transition(item, parsed_full_name, diff)
                .unwrap_or_else(|err| {
                    panic!(
                        "Invalid state transition for {}: {}",
                        stringify!($variant),
                        err
                    )
                });
        }
    };
}
1543
1544impl CatalogImplication {
1545    /// Absorbs the given catalog update into this [CatalogImplication], causing
1546    /// a state transition or error.
1547    fn absorb(&mut self, catalog_update: ParsedStateUpdate) {
1548        match catalog_update.kind {
1549            ParsedStateUpdateKind::Item {
1550                durable_item: _,
1551                parsed_item,
1552                connection,
1553                parsed_full_name,
1554            } => match parsed_item {
1555                CatalogItem::Table(table) => {
1556                    self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
1557                }
1558                CatalogItem::Source(source) => {
1559                    self.absorb_source(
1560                        (source, connection),
1561                        Some(parsed_full_name),
1562                        catalog_update.diff,
1563                    );
1564                }
1565                CatalogItem::Sink(sink) => {
1566                    self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
1567                }
1568                CatalogItem::Index(index) => {
1569                    self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
1570                }
1571                CatalogItem::MaterializedView(mv) => {
1572                    self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
1573                }
1574                CatalogItem::View(view) => {
1575                    self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
1576                }
1577                CatalogItem::ContinualTask(ct) => {
1578                    self.absorb_continual_task(ct, Some(parsed_full_name), catalog_update.diff);
1579                }
1580                CatalogItem::Secret(secret) => {
1581                    self.absorb_secret(secret, None, catalog_update.diff);
1582                }
1583                CatalogItem::Connection(connection) => {
1584                    self.absorb_connection(connection, None, catalog_update.diff);
1585                }
1586                CatalogItem::Log(_) => {}
1587                CatalogItem::Type(_) => {}
1588                CatalogItem::Func(_) => {}
1589            },
1590            ParsedStateUpdateKind::TemporaryItem {
1591                durable_item: _,
1592                parsed_item,
1593                connection,
1594                parsed_full_name,
1595            } => match parsed_item {
1596                CatalogItem::Table(table) => {
1597                    self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
1598                }
1599                CatalogItem::Source(source) => {
1600                    self.absorb_source(
1601                        (source, connection),
1602                        Some(parsed_full_name),
1603                        catalog_update.diff,
1604                    );
1605                }
1606                CatalogItem::Sink(sink) => {
1607                    self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
1608                }
1609                CatalogItem::Index(index) => {
1610                    self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
1611                }
1612                CatalogItem::MaterializedView(mv) => {
1613                    self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
1614                }
1615                CatalogItem::View(view) => {
1616                    self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
1617                }
1618                CatalogItem::ContinualTask(ct) => {
1619                    self.absorb_continual_task(ct, None, catalog_update.diff);
1620                }
1621                CatalogItem::Secret(secret) => {
1622                    self.absorb_secret(secret, None, catalog_update.diff);
1623                }
1624                CatalogItem::Connection(connection) => {
1625                    self.absorb_connection(connection, None, catalog_update.diff);
1626                }
1627                CatalogItem::Log(_) => {}
1628                CatalogItem::Type(_) => {}
1629                CatalogItem::Func(_) => {}
1630            },
1631            ParsedStateUpdateKind::Cluster {
1632                durable_cluster: _,
1633                parsed_cluster,
1634            } => {
1635                self.absorb_cluster(parsed_cluster, catalog_update.diff);
1636            }
1637            ParsedStateUpdateKind::ClusterReplica {
1638                durable_cluster_replica: _,
1639                parsed_cluster_replica,
1640            } => {
1641                self.absorb_cluster_replica(parsed_cluster_replica, catalog_update.diff);
1642            }
1643        }
1644    }
1645
1646    impl_absorb_method!(absorb_table, Table, Table);
1647    impl_absorb_method!(
1648        absorb_source,
1649        Source,
1650        (Source, Option<GenericSourceConnection>)
1651    );
1652    impl_absorb_method!(absorb_sink, Sink, Sink);
1653    impl_absorb_method!(absorb_index, Index, Index);
1654    impl_absorb_method!(absorb_materialized_view, MaterializedView, MaterializedView);
1655    impl_absorb_method!(absorb_view, View, View);
1656
1657    impl_absorb_method!(absorb_continual_task, ContinualTask, ContinualTask);
1658    impl_absorb_method!(absorb_secret, Secret, Secret);
1659    impl_absorb_method!(absorb_connection, Connection, Connection);
1660
1661    // Special case for cluster which uses the cluster's name field.
1662    fn absorb_cluster(&mut self, cluster: Cluster, diff: StateDiff) {
1663        let state = match self {
1664            CatalogImplication::Cluster(state) => state,
1665            CatalogImplication::None => {
1666                *self = CatalogImplication::Cluster(CatalogImplicationKind::None);
1667                match self {
1668                    CatalogImplication::Cluster(state) => state,
1669                    _ => unreachable!(),
1670                }
1671            }
1672            _ => {
1673                panic!("Unexpected command type for {:?}: Cluster {:?}", self, diff);
1674            }
1675        };
1676
1677        if let Err(e) = state.transition(cluster.clone(), Some(cluster.name), diff) {
1678            panic!("invalid state transition for cluster: {}", e);
1679        }
1680    }
1681
1682    // Special case for cluster replica which uses the cluster replica's name field.
1683    fn absorb_cluster_replica(&mut self, cluster_replica: ClusterReplica, diff: StateDiff) {
1684        let state = match self {
1685            CatalogImplication::ClusterReplica(state) => state,
1686            CatalogImplication::None => {
1687                *self = CatalogImplication::ClusterReplica(CatalogImplicationKind::None);
1688                match self {
1689                    CatalogImplication::ClusterReplica(state) => state,
1690                    _ => unreachable!(),
1691                }
1692            }
1693            _ => {
1694                panic!(
1695                    "Unexpected command type for {:?}: ClusterReplica {:?}",
1696                    self, diff
1697                );
1698            }
1699        };
1700
1701        if let Err(e) = state.transition(cluster_replica.clone(), Some(cluster_replica.name), diff)
1702        {
1703            panic!("invalid state transition for cluster replica: {}", e);
1704        }
1705    }
1706}
1707
#[cfg(test)]
mod tests {
    use super::*;
    use mz_repr::{GlobalId, RelationDesc, RelationVersion, VersionedRelationDesc};
    use mz_sql::names::ResolvedIds;
    use std::collections::BTreeMap;

    /// Builds a minimal table whose only column is a non-nullable string column
    /// named `name`. Tables built with different names therefore have different
    /// descs, which lets the tests tell them apart.
    fn create_test_table(name: &str) -> Table {
        Table {
            desc: VersionedRelationDesc::new(
                RelationDesc::builder()
                    .with_column(name, mz_repr::SqlScalarType::String.nullable(false))
                    .finish(),
            ),
            create_sql: None,
            collections: BTreeMap::from([(RelationVersion::root(), GlobalId::System(1))]),
            conn_id: None,
            resolved_ids: ResolvedIds::empty(),
            custom_logical_compaction_window: None,
            is_retained_metrics_object: false,
            data_source: TableDataSource::TableWrites { defaults: vec![] },
        }
    }

    /// Asserts that `actual` is the same test table as `expected` by comparing
    /// their latest relation descs. Comparing arities is not enough, because
    /// every table from `create_test_table` has arity 1.
    fn assert_same_table(actual: &Table, expected: &Table) {
        assert_eq!(actual.desc.latest(), expected.desc.latest());
    }

    #[mz_ore::test]
    fn test_item_state_transitions() {
        // Test None -> Added
        let mut state = CatalogImplicationKind::None;
        assert!(
            state
                .transition("item1".to_string(), None, StateDiff::Addition)
                .is_ok()
        );
        assert!(matches!(state, CatalogImplicationKind::Added(_)));

        // Test Added -> Altered (via retraction)
        let mut state = CatalogImplicationKind::Added("new_item".to_string());
        assert!(
            state
                .transition("old_item".to_string(), None, StateDiff::Retraction)
                .is_ok()
        );
        match &state {
            CatalogImplicationKind::Altered { prev, new } => {
                // The retracted item is the OLD state
                assert_eq!(prev, "old_item");
                // The existing Added item is the NEW state
                assert_eq!(new, "new_item");
            }
            _ => panic!("Expected Altered state"),
        }

        // Test None -> Dropped, retaining the supplied name
        let mut state = CatalogImplicationKind::None;
        assert!(
            state
                .transition(
                    "item1".to_string(),
                    Some("test_name".to_string()),
                    StateDiff::Retraction
                )
                .is_ok()
        );
        match &state {
            CatalogImplicationKind::Dropped(item, name) => {
                assert_eq!(item, "item1");
                assert_eq!(name, "test_name");
            }
            _ => panic!("Expected Dropped state"),
        }

        // Test None -> Dropped without a name falls back to "<unknown>"
        let mut state = CatalogImplicationKind::None;
        assert!(
            state
                .transition("item1".to_string(), None, StateDiff::Retraction)
                .is_ok()
        );
        match &state {
            CatalogImplicationKind::Dropped(_, name) => assert_eq!(name, "<unknown>"),
            _ => panic!("Expected Dropped state"),
        }

        // Test Dropped -> Altered (via addition)
        let mut state = CatalogImplicationKind::Dropped("old_item".to_string(), "name".to_string());
        assert!(
            state
                .transition("new_item".to_string(), None, StateDiff::Addition)
                .is_ok()
        );
        match &state {
            CatalogImplicationKind::Altered { prev, new } => {
                // The existing Dropped item is the OLD state
                assert_eq!(prev, "old_item");
                // The added item is the NEW state
                assert_eq!(new, "new_item");
            }
            _ => panic!("Expected Altered state"),
        }

        // Test invalid transitions; they must leave the state untouched.
        let mut state = CatalogImplicationKind::Added("item".to_string());
        assert!(
            state
                .transition("item2".to_string(), None, StateDiff::Addition)
                .is_err()
        );
        assert!(matches!(&state, CatalogImplicationKind::Added(item) if item == "item"));

        let mut state = CatalogImplicationKind::Dropped("item".to_string(), "name".to_string());
        assert!(
            state
                .transition("item2".to_string(), None, StateDiff::Retraction)
                .is_err()
        );
        assert!(matches!(
            &state,
            CatalogImplicationKind::Dropped(item, name) if item == "item" && name == "name"
        ));

        // An Altered state rejects any further update.
        let mut state = CatalogImplicationKind::Altered {
            prev: "old".to_string(),
            new: "new".to_string(),
        };
        assert!(
            state
                .transition("item2".to_string(), None, StateDiff::Addition)
                .is_err()
        );
        assert!(
            state
                .transition("item2".to_string(), None, StateDiff::Retraction)
                .is_err()
        );
        match &state {
            CatalogImplicationKind::Altered { prev, new } => {
                assert_eq!(prev, "old");
                assert_eq!(new, "new");
            }
            _ => panic!("Expected Altered state"),
        }
    }

    #[mz_ore::test]
    fn test_table_absorb_state_machine() {
        let table1 = create_test_table("table1");
        let table2 = create_test_table("table2");

        // Test None -> AddTable
        let mut cmd = CatalogImplication::None;
        cmd.absorb_table(
            table1.clone(),
            Some("schema.table1".to_string()),
            StateDiff::Addition,
        );
        match &cmd {
            CatalogImplication::Table(CatalogImplicationKind::Added(t)) => {
                assert_same_table(t, &table1)
            }
            CatalogImplication::Table(_) => panic!("Expected Added state"),
            _ => panic!("Expected Table command"),
        }

        // Test AddTable -> AlterTable (via retraction).
        // With Added(table1) followed by Retraction(table2), table2 is the old
        // state being removed and table1 is the new state.
        cmd.absorb_table(
            table2.clone(),
            Some("schema.table2".to_string()),
            StateDiff::Retraction,
        );
        match &cmd {
            CatalogImplication::Table(CatalogImplicationKind::Altered { prev, new }) => {
                assert_same_table(prev, &table2);
                assert_same_table(new, &table1);
            }
            CatalogImplication::Table(_) => panic!("Expected Altered state"),
            _ => panic!("Expected Table command"),
        }

        // Test None -> DropTable
        let mut cmd = CatalogImplication::None;
        cmd.absorb_table(
            table1.clone(),
            Some("schema.table1".to_string()),
            StateDiff::Retraction,
        );
        match &cmd {
            CatalogImplication::Table(CatalogImplicationKind::Dropped(t, name)) => {
                assert_same_table(t, &table1);
                assert_eq!(name, "schema.table1");
            }
            CatalogImplication::Table(_) => panic!("Expected Dropped state"),
            _ => panic!("Expected Table command"),
        }

        // Test DropTable -> AlterTable (via addition)
        cmd.absorb_table(
            table2.clone(),
            Some("schema.table2".to_string()),
            StateDiff::Addition,
        );
        match &cmd {
            CatalogImplication::Table(CatalogImplicationKind::Altered { prev, new }) => {
                // prev should be the dropped table, new should be the added table
                assert_same_table(prev, &table1);
                assert_same_table(new, &table2);
            }
            CatalogImplication::Table(_) => panic!("Expected Altered state"),
            _ => panic!("Expected Table command"),
        }
    }

    #[mz_ore::test]
    #[should_panic(expected = "Cannot add an already added object")]
    fn test_invalid_double_add() {
        let table = create_test_table("table");
        let mut cmd = CatalogImplication::None;

        // First addition
        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Addition,
        );

        // Second addition should panic
        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Addition,
        );
    }

    #[mz_ore::test]
    #[should_panic(expected = "Cannot drop an already dropped object")]
    fn test_invalid_double_drop() {
        let table = create_test_table("table");
        let mut cmd = CatalogImplication::None;

        // First drop
        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Retraction,
        );

        // Second drop should panic
        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Retraction,
        );
    }
}