Skip to main content

mz_adapter/coord/
catalog_implications.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to deriving and applying implications from [catalog
11//! changes](ParsedStateUpdate).
12//!
13//! The flow from "raw" catalog changes to [CatalogImplication] works like this:
14//!
15//! StateUpdateKind -> ParsedStateUpdate -> CatalogImplication
16//!
17//! [ParsedStateUpdate] adds context to a "raw" catalog change
18//! ([StateUpdateKind](mz_catalog::memory::objects::StateUpdateKind)). It
19//! includes an in-memory representation of the updated object, which can in
20//! theory be derived from the raw change but only when we have access to all
21//! the other raw changes or to an in-memory Catalog, which represents a
22//! "rollup" of all the raw changes.
23//!
24//! [CatalogImplication] is both the state machine that we use for absorbing
25//! multiple state updates for the same object and the final command that has to
//! be applied to in-memory state and/or the controller(s) after absorbing all
27//! the state updates in a given batch of updates.
28
29use std::collections::{BTreeMap, BTreeSet};
30use std::sync::Arc;
31use std::time::{Duration, Instant};
32
33use fail::fail_point;
34use itertools::Itertools;
35use mz_adapter_types::compaction::CompactionWindow;
36use mz_catalog::memory::objects::{
37    CatalogItem, Cluster, ClusterReplica, Connection, ContinualTask, DataSourceDesc, Index,
38    MaterializedView, Secret, Sink, Source, StateDiff, Table, TableDataSource, View,
39};
40use mz_cloud_resources::VpcEndpointConfig;
41use mz_compute_client::protocol::response::PeekResponse;
42use mz_controller_types::{ClusterId, ReplicaId};
43use mz_ore::collections::CollectionExt;
44use mz_ore::error::ErrorExt;
45use mz_ore::future::InTask;
46use mz_ore::instrument;
47use mz_ore::retry::Retry;
48use mz_ore::str::StrExt;
49use mz_ore::task;
50use mz_repr::{CatalogItemId, GlobalId, RelationVersion, RelationVersionSelector, Timestamp};
51use mz_sql::plan::ConnectionDetails;
52use mz_storage_client::controller::{CollectionDescription, DataSource};
53use mz_storage_types::connections::PostgresConnection;
54use mz_storage_types::connections::inline::{InlinedConnection, IntoInlineConnection};
55use mz_storage_types::sinks::StorageSinkConnection;
56use mz_storage_types::sources::{GenericSourceConnection, SourceExport, SourceExportDataConfig};
57use tracing::{Instrument, info_span, warn};
58
59use crate::active_compute_sink::ActiveComputeSinkRetireReason;
60use crate::coord::Coordinator;
61use crate::coord::catalog_implications::parsed_state_updates::{
62    ParsedStateUpdate, ParsedStateUpdateKind,
63};
64use crate::coord::timeline::TimelineState;
65use crate::statement_logging::{StatementEndedExecutionReason, StatementLoggingId};
66use crate::{AdapterError, CollectionIdBundle, ExecuteContext, ResultExt};
67
68pub mod parsed_state_updates;
69
70impl Coordinator {
71    /// Applies implications from the given bucket of [ParsedStateUpdate] to our
72    /// in-memory state and our controllers. This also applies transitive
73    /// implications, for example, peeks and subscribes will be cancelled when
74    /// referenced objects are dropped.
75    ///
76    /// This _requires_ that the given updates are consolidated. There must be
77    /// at most one addition and/or one retraction for a given item, as
78    /// identified by that items ID type.
79    #[instrument(level = "debug")]
80    pub async fn apply_catalog_implications(
81        &mut self,
82        ctx: Option<&mut ExecuteContext>,
83        catalog_updates: Vec<ParsedStateUpdate>,
84    ) -> Result<(), AdapterError> {
85        let start = Instant::now();
86
87        let mut catalog_implications: BTreeMap<CatalogItemId, CatalogImplication> = BTreeMap::new();
88        let mut cluster_commands: BTreeMap<ClusterId, CatalogImplication> = BTreeMap::new();
89        let mut cluster_replica_commands: BTreeMap<(ClusterId, ReplicaId), CatalogImplication> =
90            BTreeMap::new();
91
92        for update in catalog_updates {
93            tracing::trace!(?update, "got parsed state update");
94            match &update.kind {
95                ParsedStateUpdateKind::Item {
96                    durable_item,
97                    parsed_item: _,
98                    connection: _,
99                    parsed_full_name: _,
100                } => {
101                    let entry = catalog_implications
102                        .entry(durable_item.id.clone())
103                        .or_insert_with(|| CatalogImplication::None);
104                    entry.absorb(update);
105                }
106                ParsedStateUpdateKind::TemporaryItem {
107                    durable_item,
108                    parsed_item: _,
109                    connection: _,
110                    parsed_full_name: _,
111                } => {
112                    let entry = catalog_implications
113                        .entry(durable_item.id.clone())
114                        .or_insert_with(|| CatalogImplication::None);
115                    entry.absorb(update);
116                }
117                ParsedStateUpdateKind::Cluster {
118                    durable_cluster,
119                    parsed_cluster: _,
120                } => {
121                    let entry = cluster_commands
122                        .entry(durable_cluster.id)
123                        .or_insert_with(|| CatalogImplication::None);
124                    entry.absorb(update.clone());
125                }
126                ParsedStateUpdateKind::ClusterReplica {
127                    durable_cluster_replica,
128                    parsed_cluster_replica: _,
129                } => {
130                    let entry = cluster_replica_commands
131                        .entry((
132                            durable_cluster_replica.cluster_id,
133                            durable_cluster_replica.replica_id,
134                        ))
135                        .or_insert_with(|| CatalogImplication::None);
136                    entry.absorb(update.clone());
137                }
138            }
139        }
140
141        self.apply_catalog_implications_inner(
142            ctx,
143            catalog_implications.into_iter().collect_vec(),
144            cluster_commands.into_iter().collect_vec(),
145            cluster_replica_commands.into_iter().collect_vec(),
146        )
147        .await?;
148
149        self.metrics
150            .apply_catalog_implications_seconds
151            .observe(start.elapsed().as_secs_f64());
152
153        Ok(())
154    }
155
156    #[instrument(level = "debug")]
157    async fn apply_catalog_implications_inner(
158        &mut self,
159        ctx: Option<&mut ExecuteContext>,
160        implications: Vec<(CatalogItemId, CatalogImplication)>,
161        cluster_commands: Vec<(ClusterId, CatalogImplication)>,
162        cluster_replica_commands: Vec<((ClusterId, ReplicaId), CatalogImplication)>,
163    ) -> Result<(), AdapterError> {
164        let mut tables_to_drop = BTreeSet::new();
165        let mut sources_to_drop = vec![];
166        let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
167        let mut storage_sink_gids_to_drop = vec![];
168        let mut indexes_to_drop = vec![];
169        let mut compute_sinks_to_drop = vec![];
170        let mut view_gids_to_drop = vec![];
171        let mut secrets_to_drop = vec![];
172        let mut vpc_endpoints_to_drop = vec![];
173        let mut clusters_to_drop = vec![];
174        let mut cluster_replicas_to_drop = vec![];
175        let mut active_compute_sinks_to_drop = BTreeMap::new();
176        let mut peeks_to_drop = vec![];
177        let mut copies_to_drop = vec![];
178
179        // Maps for storing names of dropped objects for error messages.
180        let mut dropped_item_names: BTreeMap<GlobalId, String> = BTreeMap::new();
181        let mut dropped_cluster_names: BTreeMap<ClusterId, String> = BTreeMap::new();
182
183        // Separate collections for tables (which need write timestamps) and
184        // sources (which don't).
185        let mut table_collections_to_create = BTreeMap::new();
186        let mut source_collections_to_create = BTreeMap::new();
187        let mut storage_policies_to_initialize = BTreeMap::new();
188        let mut execution_timestamps_to_set = BTreeSet::new();
189        let mut vpc_endpoints_to_create: Vec<(CatalogItemId, VpcEndpointConfig)> = vec![];
190
191        // Sources that shouldn't be dropped, even if we saw a `Dropped` event.
192        // Used for correct handling of ALTER MV.
193        let mut source_gids_to_keep = BTreeSet::new();
194
195        // Collections for batching connection-related alterations.
196        let mut source_connections_to_alter: BTreeMap<
197            GlobalId,
198            GenericSourceConnection<InlinedConnection>,
199        > = BTreeMap::new();
200        let mut sink_connections_to_alter: BTreeMap<GlobalId, StorageSinkConnection> =
201            BTreeMap::new();
202        let mut source_export_data_configs_to_alter: BTreeMap<GlobalId, SourceExportDataConfig> =
203            BTreeMap::new();
204
205        // We're incrementally migrating the code that manipulates the
206        // controller from closures in the sequencer. For some types of catalog
207        // changes we haven't done this migration yet, so there you will see
208        // just a log message. Over the next couple of PRs all of these will go
209        // away.
210
211        for (catalog_id, implication) in implications {
212            tracing::trace!(?implication, "have to apply catalog implication");
213
214            match implication {
215                CatalogImplication::Table(CatalogImplicationKind::Added(table)) => {
216                    self.handle_create_table(
217                        &ctx,
218                        &mut table_collections_to_create,
219                        &mut storage_policies_to_initialize,
220                        &mut execution_timestamps_to_set,
221                        catalog_id,
222                        table.clone(),
223                    )
224                    .await?
225                }
226                CatalogImplication::Table(CatalogImplicationKind::Altered {
227                    prev: prev_table,
228                    new: new_table,
229                }) => self.handle_alter_table(prev_table, new_table).await?,
230
231                CatalogImplication::Table(CatalogImplicationKind::Dropped(table, full_name)) => {
232                    let global_ids = table.global_ids();
233                    for global_id in global_ids {
234                        tables_to_drop.insert((catalog_id, global_id));
235                        dropped_item_names.insert(global_id, full_name.clone());
236                    }
237                }
238                CatalogImplication::Source(CatalogImplicationKind::Added((
239                    source,
240                    _connection,
241                ))) => {
242                    // Get the compaction windows for all sources with this
243                    // catalog_id This replicates the logic from
244                    // sequence_create_source where it collects all item_ids and
245                    // gets their compaction windows
246                    let compaction_windows = self
247                        .catalog()
248                        .state()
249                        .source_compaction_windows(vec![catalog_id]);
250
251                    self.handle_create_source(
252                        &mut source_collections_to_create,
253                        &mut storage_policies_to_initialize,
254                        catalog_id,
255                        source,
256                        compaction_windows,
257                    )
258                    .await?
259                }
260                CatalogImplication::Source(CatalogImplicationKind::Altered {
261                    prev: (prev_source, _prev_connection),
262                    new: (new_source, _new_connection),
263                }) => {
264                    tracing::debug!(
265                        ?prev_source,
266                        ?new_source,
267                        "not handling AlterSource in here yet"
268                    );
269                }
270                CatalogImplication::Source(CatalogImplicationKind::Dropped(
271                    (source, connection),
272                    full_name,
273                )) => {
274                    let global_id = source.global_id();
275                    sources_to_drop.push((catalog_id, global_id));
276                    dropped_item_names.insert(global_id, full_name);
277
278                    if let DataSourceDesc::Ingestion { desc, .. }
279                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } = &source.data_source
280                    {
281                        match &desc.connection {
282                            GenericSourceConnection::Postgres(_referenced_conn) => {
283                                let inline_conn = connection.expect("missing inlined connection");
284
285                                let pg_conn = match inline_conn {
286                                    GenericSourceConnection::Postgres(pg_conn) => pg_conn,
287                                    other => {
288                                        panic!("expected postgres connection, got: {:?}", other)
289                                    }
290                                };
291                                let pending_drop = (
292                                    pg_conn.connection.clone(),
293                                    pg_conn.publication_details.slot.clone(),
294                                );
295                                replication_slots_to_drop.push(pending_drop);
296                            }
297                            _ => {}
298                        }
299                    }
300                }
301                CatalogImplication::Sink(CatalogImplicationKind::Added(sink)) => {
302                    tracing::debug!(?sink, "not handling AddSink in here yet");
303                }
304                CatalogImplication::Sink(CatalogImplicationKind::Altered {
305                    prev: prev_sink,
306                    new: new_sink,
307                }) => {
308                    tracing::debug!(?prev_sink, ?new_sink, "not handling AlterSink in here yet");
309                }
310                CatalogImplication::Sink(CatalogImplicationKind::Dropped(sink, full_name)) => {
311                    storage_sink_gids_to_drop.push(sink.global_id());
312                    dropped_item_names.insert(sink.global_id(), full_name);
313                }
314                CatalogImplication::Index(CatalogImplicationKind::Added(index)) => {
315                    tracing::debug!(?index, "not handling AddIndex in here yet");
316                }
317                CatalogImplication::Index(CatalogImplicationKind::Altered {
318                    prev: prev_index,
319                    new: new_index,
320                }) => {
321                    tracing::debug!(
322                        ?prev_index,
323                        ?new_index,
324                        "not handling AlterIndex in here yet"
325                    );
326                }
327                CatalogImplication::Index(CatalogImplicationKind::Dropped(index, full_name)) => {
328                    indexes_to_drop.push((index.cluster_id, index.global_id()));
329                    dropped_item_names.insert(index.global_id(), full_name);
330                }
331                CatalogImplication::MaterializedView(CatalogImplicationKind::Added(mv)) => {
332                    tracing::debug!(?mv, "not handling AddMaterializedView in here yet");
333                }
334                CatalogImplication::MaterializedView(CatalogImplicationKind::Altered {
335                    prev: prev_mv,
336                    new: new_mv,
337                }) => {
338                    // We get here for two reasons:
339                    //  * Name changes, like those caused by ALTER SCHEMA.
340                    //  * Replacement application.
341                    //
342                    // In the first case, we don't have to do anything here. The second case is
343                    // tricky: Replacement application changes the `CatalogItemId` of the target to
344                    // that of the replacement and simultaneously drops the replacement. Which
345                    // means when we get here `prev_mv` is the replacement that should be dropped,
346                    // and `new_mv` is the target that already exists but under a different ID
347                    // (which will receive a `Dropped` event separately). We can sniff out this
348                    // case by checking for version differences.
349                    if prev_mv.collections != new_mv.collections {
350                        // Sanity check: The replacement's last (and only) version must be the same
351                        // as the new target's last version.
352                        assert_eq!(
353                            prev_mv.global_id_writes(),
354                            new_mv.global_id_writes(),
355                            "unexpected MV Altered implication: prev={prev_mv:?}, new={new_mv:?}",
356                        );
357
358                        let gid = new_mv.global_id_writes();
359                        self.allow_writes(new_mv.cluster_id, gid);
360
361                        // There will be a separate `Dropped` implication for the old definition of
362                        // the target MV. That will drop the old compute collection, as we desire,
363                        // but we need to prevent it from dropping the old storage collection as
364                        // well, since that might still be depended on.
365                        source_gids_to_keep.extend(new_mv.global_ids());
366                    }
367                }
368                CatalogImplication::MaterializedView(CatalogImplicationKind::Dropped(
369                    mv,
370                    full_name,
371                )) => {
372                    compute_sinks_to_drop.push((mv.cluster_id, mv.global_id_writes()));
373                    for gid in mv.global_ids() {
374                        sources_to_drop.push((catalog_id, gid));
375                        dropped_item_names.insert(gid, full_name.clone());
376                    }
377                }
378                CatalogImplication::View(CatalogImplicationKind::Added(view)) => {
379                    tracing::debug!(?view, "not handling AddView in here yet");
380                }
381                CatalogImplication::View(CatalogImplicationKind::Altered {
382                    prev: prev_view,
383                    new: new_view,
384                }) => {
385                    tracing::debug!(?prev_view, ?new_view, "not handling AlterView in here yet");
386                }
387                CatalogImplication::View(CatalogImplicationKind::Dropped(view, full_name)) => {
388                    view_gids_to_drop.push(view.global_id());
389                    dropped_item_names.insert(view.global_id(), full_name);
390                }
391                CatalogImplication::ContinualTask(CatalogImplicationKind::Added(ct)) => {
392                    tracing::debug!(?ct, "not handling AddContinualTask in here yet");
393                }
394                CatalogImplication::ContinualTask(CatalogImplicationKind::Altered {
395                    prev: prev_ct,
396                    new: new_ct,
397                }) => {
398                    tracing::debug!(
399                        ?prev_ct,
400                        ?new_ct,
401                        "not handling AlterContinualTask in here yet"
402                    );
403                }
404                CatalogImplication::ContinualTask(CatalogImplicationKind::Dropped(
405                    ct,
406                    _full_name,
407                )) => {
408                    compute_sinks_to_drop.push((ct.cluster_id, ct.global_id()));
409                    sources_to_drop.push((catalog_id, ct.global_id()));
410                }
411                CatalogImplication::Secret(CatalogImplicationKind::Added(_secret)) => {
412                    // No action needed: the secret payload is stored in
413                    // secrets_controller.ensure() BEFORE the catalog transaction.
414                    // By the time we see this update, the secret is already stored.
415                }
416                CatalogImplication::Secret(CatalogImplicationKind::Altered {
417                    prev: _prev_secret,
418                    new: _new_secret,
419                }) => {
420                    // No action needed: altering a secret updates the payload via
421                    // secrets_controller.ensure() without a catalog transaction.
422                }
423                CatalogImplication::Secret(CatalogImplicationKind::Dropped(
424                    _secret,
425                    _full_name,
426                )) => {
427                    secrets_to_drop.push(catalog_id);
428                }
429                CatalogImplication::Connection(CatalogImplicationKind::Added(connection)) => {
430                    match &connection.details {
431                        // SSH connections: key pair is stored in secrets_controller
432                        // BEFORE the catalog transaction, so no action needed here.
433                        ConnectionDetails::Ssh { .. } => {}
434                        // AWS PrivateLink connections: create the VPC endpoint
435                        ConnectionDetails::AwsPrivatelink(privatelink) => {
436                            let spec = VpcEndpointConfig {
437                                aws_service_name: privatelink.service_name.to_owned(),
438                                availability_zone_ids: privatelink.availability_zones.to_owned(),
439                            };
440                            vpc_endpoints_to_create.push((catalog_id, spec));
441                        }
442                        // Other connection types don't require post-transaction actions
443                        _ => {}
444                    }
445                }
446                CatalogImplication::Connection(CatalogImplicationKind::Altered {
447                    prev: _prev_connection,
448                    new: new_connection,
449                }) => {
450                    self.handle_alter_connection(
451                        catalog_id,
452                        new_connection,
453                        &mut vpc_endpoints_to_create,
454                        &mut source_connections_to_alter,
455                        &mut sink_connections_to_alter,
456                        &mut source_export_data_configs_to_alter,
457                    );
458                }
459                CatalogImplication::Connection(CatalogImplicationKind::Dropped(
460                    connection,
461                    _full_name,
462                )) => {
463                    match &connection.details {
464                        // SSH connections have an associated secret that should be dropped
465                        ConnectionDetails::Ssh { .. } => {
466                            secrets_to_drop.push(catalog_id);
467                        }
468                        // AWS PrivateLink connections have an associated
469                        // VpcEndpoint K8S resource that should be dropped
470                        ConnectionDetails::AwsPrivatelink(_) => {
471                            vpc_endpoints_to_drop.push(catalog_id);
472                        }
473                        _ => (),
474                    }
475                }
476                CatalogImplication::None => {
477                    // Nothing to do for None commands
478                }
479                CatalogImplication::Cluster(_) | CatalogImplication::ClusterReplica(_) => {
480                    unreachable!("clusters and cluster replicas are handled below")
481                }
482                CatalogImplication::Table(CatalogImplicationKind::None)
483                | CatalogImplication::Source(CatalogImplicationKind::None)
484                | CatalogImplication::Sink(CatalogImplicationKind::None)
485                | CatalogImplication::Index(CatalogImplicationKind::None)
486                | CatalogImplication::MaterializedView(CatalogImplicationKind::None)
487                | CatalogImplication::View(CatalogImplicationKind::None)
488                | CatalogImplication::ContinualTask(CatalogImplicationKind::None)
489                | CatalogImplication::Secret(CatalogImplicationKind::None)
490                | CatalogImplication::Connection(CatalogImplicationKind::None) => {
491                    unreachable!("will never leave None in place");
492                }
493            }
494        }
495
496        for (cluster_id, command) in cluster_commands {
497            tracing::trace!(?command, "have cluster command to apply!");
498
499            match command {
500                CatalogImplication::Cluster(CatalogImplicationKind::Added(cluster)) => {
501                    tracing::debug!(?cluster, "not handling AddCluster in here yet");
502                }
503                CatalogImplication::Cluster(CatalogImplicationKind::Altered {
504                    prev: prev_cluster,
505                    new: new_cluster,
506                }) => {
507                    tracing::debug!(
508                        ?prev_cluster,
509                        ?new_cluster,
510                        "not handling AlterCluster in here yet"
511                    );
512                }
513                CatalogImplication::Cluster(CatalogImplicationKind::Dropped(
514                    cluster,
515                    _full_name,
516                )) => {
517                    clusters_to_drop.push(cluster_id);
518                    dropped_cluster_names.insert(cluster_id, cluster.name);
519                }
520                CatalogImplication::Cluster(CatalogImplicationKind::None) => {
521                    unreachable!("will never leave None in place");
522                }
523                command => {
524                    unreachable!(
525                        "we only handle cluster commands in this map, got: {:?}",
526                        command
527                    );
528                }
529            }
530        }
531
532        for ((cluster_id, replica_id), command) in cluster_replica_commands {
533            tracing::trace!(?command, "have cluster replica command to apply!");
534
535            match command {
536                CatalogImplication::ClusterReplica(CatalogImplicationKind::Added(replica)) => {
537                    tracing::debug!(?replica, "not handling AddClusterReplica in here yet");
538                }
539                CatalogImplication::ClusterReplica(CatalogImplicationKind::Altered {
540                    prev: prev_replica,
541                    new: new_replica,
542                }) => {
543                    tracing::debug!(
544                        ?prev_replica,
545                        ?new_replica,
546                        "not handling AlterClusterReplica in here yet"
547                    );
548                }
549                CatalogImplication::ClusterReplica(CatalogImplicationKind::Dropped(
550                    _replica,
551                    _full_name,
552                )) => {
553                    cluster_replicas_to_drop.push((cluster_id, replica_id));
554                }
555                CatalogImplication::ClusterReplica(CatalogImplicationKind::None) => {
556                    unreachable!("will never leave None in place");
557                }
558                command => {
559                    unreachable!(
560                        "we only handle cluster replica commands in this map, got: {:?}",
561                        command
562                    );
563                }
564            }
565        }
566
567        if !source_collections_to_create.is_empty() {
568            self.create_source_collections(source_collections_to_create)
569                .await?;
570        }
571
572        // Have to create sources first and then tables, because tables within
573        // one transaction can depend on sources.
574        if !table_collections_to_create.is_empty() {
575            self.create_table_collections(table_collections_to_create, execution_timestamps_to_set)
576                .await?;
577        }
578        // It is _very_ important that we only initialize read policies after we
579        // have created all the sources/collections. Some of the sources created
580        // in this collection might have dependencies on other sources, so the
581        // controller must get a chance to install read holds before we set a
582        // policy that might make the since advance.
583        self.initialize_storage_collections(storage_policies_to_initialize)
584            .await?;
585
586        // Create VPC endpoints for AWS PrivateLink connections
587        if !vpc_endpoints_to_create.is_empty() {
588            if let Some(cloud_resource_controller) = self.cloud_resource_controller.as_ref() {
589                for (connection_id, spec) in vpc_endpoints_to_create {
590                    if let Err(err) = cloud_resource_controller
591                        .ensure_vpc_endpoint(connection_id, spec)
592                        .await
593                    {
594                        tracing::error!(?err, "failed to ensure vpc endpoint!");
595                    }
596                }
597            } else {
598                tracing::error!(
599                    "AWS PrivateLink connections unsupported without cloud_resource_controller"
600                );
601            }
602        }
603
604        // Apply batched connection alterations to dependent sources/sinks/tables.
605        if !source_connections_to_alter.is_empty() {
606            self.controller
607                .storage
608                .alter_ingestion_connections(source_connections_to_alter)
609                .await
610                .unwrap_or_terminate("cannot fail to alter ingestion connections");
611        }
612
613        if !sink_connections_to_alter.is_empty() {
614            self.controller
615                .storage
616                .alter_export_connections(sink_connections_to_alter)
617                .await
618                .unwrap_or_terminate("altering export connections after txn must succeed");
619        }
620
621        if !source_export_data_configs_to_alter.is_empty() {
622            self.controller
623                .storage
624                .alter_ingestion_export_data_configs(source_export_data_configs_to_alter)
625                .await
626                .unwrap_or_terminate("altering source export data configs after txn must succeed");
627        }
628
629        // Apply source drop overwrites.
630        sources_to_drop.retain(|(_, gid)| !source_gids_to_keep.contains(gid));
631
632        let readable_collections_to_drop: BTreeSet<_> = sources_to_drop
633            .iter()
634            .map(|(_, gid)| *gid)
635            .chain(tables_to_drop.iter().map(|(_, gid)| *gid))
636            .chain(indexes_to_drop.iter().map(|(_, gid)| *gid))
637            .chain(view_gids_to_drop.iter().copied())
638            .collect();
639
640        // Clean up any active compute sinks like subscribes or copy to-s that
641        // rely on dropped relations or clusters.
642        for (sink_id, sink) in &self.active_compute_sinks {
643            let cluster_id = sink.cluster_id();
644            if let Some(id) = sink
645                .depends_on()
646                .iter()
647                .find(|id| readable_collections_to_drop.contains(id))
648            {
649                let name = dropped_item_names
650                    .get(id)
651                    .map(|n| format!("relation {}", n.quoted()))
652                    .expect("missing relation name");
653                active_compute_sinks_to_drop.insert(
654                    *sink_id,
655                    ActiveComputeSinkRetireReason::DependencyDropped(name),
656                );
657            } else if clusters_to_drop.contains(&cluster_id) {
658                let name = dropped_cluster_names
659                    .get(&cluster_id)
660                    .map(|n| format!("cluster {}", n.quoted()))
661                    .expect("missing cluster name");
662                active_compute_sinks_to_drop.insert(
663                    *sink_id,
664                    ActiveComputeSinkRetireReason::DependencyDropped(name),
665                );
666            }
667        }
668
669        // Clean up any pending peeks that rely on dropped relations or clusters.
670        for (uuid, pending_peek) in &self.pending_peeks {
671            if let Some(id) = pending_peek
672                .depends_on
673                .iter()
674                .find(|id| readable_collections_to_drop.contains(id))
675            {
676                let name = dropped_item_names
677                    .get(id)
678                    .map(|n| format!("relation {}", n.quoted()))
679                    .expect("missing relation name");
680                peeks_to_drop.push((name, uuid.clone()));
681            } else if clusters_to_drop.contains(&pending_peek.cluster_id) {
682                let name = dropped_cluster_names
683                    .get(&pending_peek.cluster_id)
684                    .map(|n| format!("cluster {}", n.quoted()))
685                    .expect("missing cluster name");
686                peeks_to_drop.push((name, uuid.clone()));
687            }
688        }
689
690        // Clean up any pending `COPY` statements that rely on dropped relations or clusters.
691        for (conn_id, pending_copy) in &self.active_copies {
692            let dropping_table = tables_to_drop
693                .iter()
694                .any(|(item_id, _gid)| pending_copy.table_id == *item_id);
695            let dropping_cluster = clusters_to_drop.contains(&pending_copy.cluster_id);
696
697            if dropping_table || dropping_cluster {
698                copies_to_drop.push(conn_id.clone());
699            }
700        }
701
702        let storage_gids_to_drop: BTreeSet<_> = sources_to_drop
703            .iter()
704            .map(|(_id, gid)| gid)
705            .chain(storage_sink_gids_to_drop.iter())
706            .chain(tables_to_drop.iter().map(|(_id, gid)| gid))
707            .copied()
708            .collect();
709        let compute_gids_to_drop: Vec<_> = indexes_to_drop
710            .iter()
711            .chain(compute_sinks_to_drop.iter())
712            .copied()
713            .collect();
714
715        // Gather resources that we have to remove from timeline state and
716        // pre-check if any Timelines become empty, when we drop the specified
717        // storage and compute resources.
718        //
719        // Note: We only apply these changes below.
720        let mut timeline_id_bundles = BTreeMap::new();
721
722        for (timeline, TimelineState { read_holds, .. }) in &self.global_timelines {
723            let mut id_bundle = CollectionIdBundle::default();
724
725            for storage_id in read_holds.storage_ids() {
726                if storage_gids_to_drop.contains(&storage_id) {
727                    id_bundle.storage_ids.insert(storage_id);
728                }
729            }
730
731            for (instance_id, id) in read_holds.compute_ids() {
732                if compute_gids_to_drop.contains(&(instance_id, id))
733                    || clusters_to_drop.contains(&instance_id)
734                {
735                    id_bundle
736                        .compute_ids
737                        .entry(instance_id)
738                        .or_default()
739                        .insert(id);
740                }
741            }
742
743            timeline_id_bundles.insert(timeline.clone(), id_bundle);
744        }
745
746        let mut timeline_associations = BTreeMap::new();
747        for (timeline, id_bundle) in timeline_id_bundles.into_iter() {
748            let TimelineState { read_holds, .. } = self
749                .global_timelines
750                .get(&timeline)
751                .expect("all timelines have a timestamp oracle");
752
753            let empty = read_holds.id_bundle().difference(&id_bundle).is_empty();
754            timeline_associations.insert(timeline, (empty, id_bundle));
755        }
756
757        // No error returns are allowed after this point. Enforce this at compile time
758        // by using this odd structure so we don't accidentally add a stray `?`.
759        let _: () = async {
760            if !timeline_associations.is_empty() {
761                for (timeline, (should_be_empty, id_bundle)) in timeline_associations {
762                    let became_empty =
763                        self.remove_resources_associated_with_timeline(timeline, id_bundle);
764                    assert_eq!(should_be_empty, became_empty, "emptiness did not match!");
765                }
766            }
767
768            // Note that we drop tables before sources since there can be a weak
769            // dependency on sources from tables in the storage controller that
770            // will result in error logging that we'd prefer to avoid. This
771            // isn't an actual dependency issue but we'd like to keep that error
772            // logging around to indicate when an actual dependency error might
773            // occur.
774            if !tables_to_drop.is_empty() {
775                let ts = self.get_local_write_ts().await;
776                self.drop_tables(tables_to_drop.into_iter().collect_vec(), ts.timestamp);
777            }
778
779            if !sources_to_drop.is_empty() {
780                self.drop_sources(sources_to_drop);
781            }
782
783            if !storage_sink_gids_to_drop.is_empty() {
784                self.drop_storage_sinks(storage_sink_gids_to_drop);
785            }
786
787            if !active_compute_sinks_to_drop.is_empty() {
788                self.retire_compute_sinks(active_compute_sinks_to_drop)
789                    .await;
790            }
791
792            if !peeks_to_drop.is_empty() {
793                for (dropped_name, uuid) in peeks_to_drop {
794                    if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
795                        let cancel_reason = PeekResponse::Error(format!(
796                            "query could not complete because {dropped_name} was dropped"
797                        ));
798                        self.controller
799                            .compute
800                            .cancel_peek(pending_peek.cluster_id, uuid, cancel_reason)
801                            .unwrap_or_terminate("unable to cancel peek");
802                        self.retire_execution(
803                            StatementEndedExecutionReason::Canceled,
804                            pending_peek.ctx_extra.defuse(),
805                        );
806                    }
807                }
808            }
809
810            if !copies_to_drop.is_empty() {
811                for conn_id in copies_to_drop {
812                    self.cancel_pending_copy(&conn_id);
813                }
814            }
815
816            if !compute_gids_to_drop.is_empty() {
817                self.drop_compute_collections(compute_gids_to_drop);
818            }
819
820            if !vpc_endpoints_to_drop.is_empty() {
821                self.drop_vpc_endpoints_in_background(vpc_endpoints_to_drop)
822            }
823
824            if !cluster_replicas_to_drop.is_empty() {
825                fail::fail_point!("after_catalog_drop_replica");
826
827                for (cluster_id, replica_id) in cluster_replicas_to_drop {
828                    self.drop_replica(cluster_id, replica_id);
829                }
830            }
831            if !clusters_to_drop.is_empty() {
832                for cluster_id in clusters_to_drop {
833                    self.controller.drop_cluster(cluster_id);
834                }
835            }
836
837            // We don't want to block the main coordinator thread on cleaning
838            // up external resources (PostgreSQL replication slots and secrets),
839            // so we perform that cleanup in a background task.
840            //
841            // TODO(14551): This is inherently best effort. An ill-timed crash
842            // means we'll never clean these resources up. Safer cleanup for non-Materialize resources.
843            // See <https://github.com/MaterializeInc/materialize/issues/14551>
844            task::spawn(|| "drop_replication_slots_and_secrets", {
845                let ssh_tunnel_manager = self.connection_context().ssh_tunnel_manager.clone();
846                let secrets_controller = Arc::clone(&self.secrets_controller);
847                let secrets_reader = Arc::clone(self.secrets_reader());
848                let storage_config = self.controller.storage.config().clone();
849
850                async move {
851                    for (connection, replication_slot_name) in replication_slots_to_drop {
852                        tracing::info!(?replication_slot_name, "dropping replication slot");
853
854                        // Try to drop the replication slots, but give up after
855                        // a while. The PostgreSQL server may no longer be
856                        // healthy. Users often drop PostgreSQL sources
857                        // *because* the PostgreSQL server has been
858                        // decomissioned.
859                        let result: Result<(), anyhow::Error> = Retry::default()
860                            .max_duration(Duration::from_secs(60))
861                            .retry_async(|_state| async {
862                                let config = connection
863                                    .config(&secrets_reader, &storage_config, InTask::No)
864                                    .await
865                                    .map_err(|e| {
866                                        anyhow::anyhow!(
867                                            "error creating Postgres client for \
868                                            dropping acquired slots: {}",
869                                            e.display_with_causes()
870                                        )
871                                    })?;
872
873                                mz_postgres_util::drop_replication_slots(
874                                    &ssh_tunnel_manager,
875                                    config.clone(),
876                                    &[(&replication_slot_name, true)],
877                                )
878                                .await?;
879
880                                Ok(())
881                            })
882                            .await;
883
884                        if let Err(err) = result {
885                            tracing::warn!(
886                                ?replication_slot_name,
887                                ?err,
888                                "failed to drop replication slot"
889                            );
890                        }
891                    }
892
893                    // Drop secrets *after* dropping the replication slots,
894                    // because dropping replication slots may rely on those
895                    // secrets still being present.
896                    //
897                    // It's okay if we crash before processing the secret drops,
898                    // as we look for and remove any orphaned secrets during
899                    // startup.
900                    fail_point!("drop_secrets");
901                    for secret in secrets_to_drop {
902                        if let Err(e) = secrets_controller.delete(secret).await {
903                            warn!("Dropping secrets has encountered an error: {}", e);
904                        }
905                    }
906                }
907            });
908        }
909        .instrument(info_span!(
910            "coord::apply_catalog_implications_inner::finalize"
911        ))
912        .await;
913
914        Ok(())
915    }
916
    /// Registers the given table collections with the storage controller.
    ///
    /// Acquires a fresh local write timestamp (`register_ts`) that serves as
    /// the registration timestamp for the whole batch, records that timestamp
    /// for statement logging where requested, creates the collections, and
    /// finally applies the local write at `register_ts`.
    ///
    /// Failures to confirm leadership or to create collections terminate the
    /// process (via `unwrap_or_terminate`) rather than returning an error, so
    /// the returned `Result` is currently always `Ok`.
    #[instrument(level = "debug")]
    async fn create_table_collections(
        &mut self,
        table_collections_to_create: BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
        execution_timestamps_to_set: BTreeSet<StatementLoggingId>,
    ) -> Result<(), AdapterError> {
        // If we have tables, determine the initial validity for the table.
        let register_ts = self.get_local_write_ts().await.timestamp;

        // After acquiring `register_ts` but before using it, we need to
        // be sure we're still the leader. Otherwise a new generation
        // may also be trying to use `register_ts` for a different
        // purpose.
        //
        // See #28216.
        self.catalog
            .confirm_leadership()
            .await
            .unwrap_or_terminate("unable to confirm leadership");

        // Statement logging wants to know at which timestamp the tables were
        // registered; record it for each requesting statement.
        for id in execution_timestamps_to_set {
            self.set_statement_execution_timestamp(id, register_ts);
        }

        let storage_metadata = self.catalog.state().storage_metadata();

        self.controller
            .storage
            .create_collections(
                storage_metadata,
                Some(register_ts),
                table_collections_to_create.into_iter().collect_vec(),
            )
            .await
            .unwrap_or_terminate("cannot fail to create collections");

        self.apply_local_write(register_ts).await;

        Ok(())
    }
957
958    #[instrument(level = "debug")]
959    async fn create_source_collections(
960        &mut self,
961        source_collections_to_create: BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
962    ) -> Result<(), AdapterError> {
963        let storage_metadata = self.catalog.state().storage_metadata();
964
965        self.controller
966            .storage
967            .create_collections(
968                storage_metadata,
969                None, // Sources don't need a write timestamp
970                source_collections_to_create.into_iter().collect_vec(),
971            )
972            .await
973            .unwrap_or_terminate("cannot fail to create collections");
974
975        Ok(())
976    }
977
978    #[instrument(level = "debug")]
979    async fn initialize_storage_collections(
980        &mut self,
981        storage_policies_to_initialize: BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
982    ) -> Result<(), AdapterError> {
983        for (compaction_window, global_ids) in storage_policies_to_initialize {
984            self.initialize_read_policies(
985                &CollectionIdBundle {
986                    storage_ids: global_ids,
987                    compute_ids: BTreeMap::new(),
988                },
989                compaction_window,
990            )
991            .await;
992        }
993
994        Ok(())
995    }
996
997    #[instrument(level = "debug")]
998    async fn handle_create_table(
999        &self,
1000        ctx: &Option<&mut ExecuteContext>,
1001        storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
1002        storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1003        execution_timestamps_to_set: &mut BTreeSet<StatementLoggingId>,
1004        table_id: CatalogItemId,
1005        table: Table,
1006    ) -> Result<(), AdapterError> {
1007        // The table data_source determines whether this table will be written to
1008        // by environmentd (e.g. with INSERT INTO statements) or by the storage layer
1009        // (e.g. a source-fed table).
1010        match &table.data_source {
1011            TableDataSource::TableWrites { defaults: _ } => {
1012                let versions: BTreeMap<_, _> = table
1013                    .collection_descs()
1014                    .map(|(gid, version, desc)| (version, (gid, desc)))
1015                    .collect();
1016                let collection_descs = versions.iter().map(|(_version, (gid, desc))| {
1017                    let collection_desc = CollectionDescription::for_table(desc.clone());
1018
1019                    (*gid, collection_desc)
1020                });
1021
1022                let compaction_window = table
1023                    .custom_logical_compaction_window
1024                    .unwrap_or(CompactionWindow::Default);
1025                let ids_to_initialize = storage_policies_to_initialize
1026                    .entry(compaction_window)
1027                    .or_default();
1028
1029                for (gid, collection_desc) in collection_descs {
1030                    storage_collections_to_create.insert(gid, collection_desc);
1031                    ids_to_initialize.insert(gid);
1032                }
1033
1034                if let Some(id) = ctx.as_ref().and_then(|ctx| ctx.extra().contents()) {
1035                    execution_timestamps_to_set.insert(id);
1036                }
1037            }
1038            TableDataSource::DataSource {
1039                desc: data_source_desc,
1040                timeline,
1041            } => {
1042                match data_source_desc {
1043                    DataSourceDesc::IngestionExport {
1044                        ingestion_id,
1045                        external_reference: _,
1046                        details,
1047                        data_config,
1048                    } => {
1049                        let global_ingestion_id =
1050                            self.catalog().get_entry(ingestion_id).latest_global_id();
1051
1052                        let collection_desc = CollectionDescription::<Timestamp> {
1053                            desc: table.desc.latest(),
1054                            data_source: DataSource::IngestionExport {
1055                                ingestion_id: global_ingestion_id,
1056                                details: details.clone(),
1057                                data_config: data_config
1058                                    .clone()
1059                                    .into_inline_connection(self.catalog.state()),
1060                            },
1061                            since: None,
1062                            timeline: Some(timeline.clone()),
1063                            primary: None,
1064                        };
1065
1066                        let global_id = table
1067                            .global_ids()
1068                            .expect_element(|| "subsources cannot have multiple versions");
1069
1070                        storage_collections_to_create.insert(global_id, collection_desc);
1071
1072                        let read_policies = self
1073                            .catalog()
1074                            .state()
1075                            .source_compaction_windows(vec![table_id]);
1076                        for (compaction_window, catalog_ids) in read_policies {
1077                            let compaction_ids = storage_policies_to_initialize
1078                                .entry(compaction_window)
1079                                .or_default();
1080
1081                            let gids = catalog_ids
1082                                .into_iter()
1083                                .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1084                                .flatten();
1085                            compaction_ids.extend(gids);
1086                        }
1087                    }
1088                    DataSourceDesc::Webhook {
1089                        validate_using: _,
1090                        body_format: _,
1091                        headers: _,
1092                        cluster_id: _,
1093                    } => {
1094                        // Create the underlying collection with the latest schema from the Table.
1095                        assert_eq!(
1096                            table.desc.latest_version(),
1097                            RelationVersion::root(),
1098                            "found webhook with more than 1 relation version, {:?}",
1099                            table.desc
1100                        );
1101                        let desc = table.desc.latest();
1102
1103                        let collection_desc = CollectionDescription::<Timestamp> {
1104                            desc,
1105                            data_source: DataSource::Webhook,
1106                            since: None,
1107                            timeline: Some(timeline.clone()),
1108                            primary: None,
1109                        };
1110
1111                        let global_id = table
1112                            .global_ids()
1113                            .expect_element(|| "webhooks cannot have multiple versions");
1114
1115                        storage_collections_to_create.insert(global_id, collection_desc);
1116
1117                        let read_policies = self
1118                            .catalog()
1119                            .state()
1120                            .source_compaction_windows(vec![table_id]);
1121
1122                        for (compaction_window, catalog_ids) in read_policies {
1123                            let compaction_ids = storage_policies_to_initialize
1124                                .entry(compaction_window)
1125                                .or_default();
1126
1127                            let gids = catalog_ids
1128                                .into_iter()
1129                                .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1130                                .flatten();
1131                            compaction_ids.extend(gids);
1132                        }
1133                    }
1134                    _ => unreachable!("CREATE TABLE data source got {:?}", data_source_desc),
1135                }
1136            }
1137        }
1138
1139        Ok(())
1140    }
1141
1142    #[instrument(level = "debug")]
1143    async fn handle_alter_table(
1144        &mut self,
1145        prev_table: Table,
1146        new_table: Table,
1147    ) -> Result<(), AdapterError> {
1148        let existing_gid = prev_table.global_id_writes();
1149        let new_gid = new_table.global_id_writes();
1150
1151        if existing_gid == new_gid {
1152            // It's not an ALTER TABLE as far as the controller is concerned,
1153            // because we still have the same GlobalId. This is likely a change
1154            // from an ALTER SWAP.
1155            return Ok(());
1156        }
1157
1158        // Acquire a read hold on the original table for the duration of
1159        // the alter to prevent the since of the original table from
1160        // getting advanced, while the ALTER is running.
1161        let existing_table = crate::CollectionIdBundle {
1162            storage_ids: BTreeSet::from([existing_gid]),
1163            compute_ids: BTreeMap::new(),
1164        };
1165        let existing_table_read_hold = self.acquire_read_holds(&existing_table);
1166
1167        let expected_version = prev_table.desc.latest_version();
1168        let new_version = new_table.desc.latest_version();
1169        let new_desc = new_table
1170            .desc
1171            .at_version(RelationVersionSelector::Specific(new_version));
1172
1173        let register_ts = self.get_local_write_ts().await.timestamp;
1174
1175        // Alter the table description, creating a "new" collection.
1176        self.controller
1177            .storage
1178            .alter_table_desc(
1179                existing_gid,
1180                new_gid,
1181                new_desc,
1182                expected_version,
1183                register_ts,
1184            )
1185            .await
1186            .expect("failed to alter desc of table");
1187
1188        // Initialize the ReadPolicy which ensures we have the correct read holds.
1189        let compaction_window = new_table
1190            .custom_logical_compaction_window
1191            .unwrap_or(CompactionWindow::Default);
1192        self.initialize_read_policies(
1193            &crate::CollectionIdBundle {
1194                storage_ids: BTreeSet::from([new_gid]),
1195                compute_ids: BTreeMap::new(),
1196            },
1197            compaction_window,
1198        )
1199        .await;
1200
1201        self.apply_local_write(register_ts).await;
1202
1203        // Alter is complete! We can drop our read hold.
1204        drop(existing_table_read_hold);
1205
1206        Ok(())
1207    }
1208
1209    #[instrument(level = "debug")]
1210    async fn handle_create_source(
1211        &self,
1212        storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
1213        storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1214        item_id: CatalogItemId,
1215        source: Source,
1216        compaction_windows: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>>,
1217    ) -> Result<(), AdapterError> {
1218        let data_source = match source.data_source {
1219            DataSourceDesc::Ingestion { desc, cluster_id } => {
1220                let desc = desc.into_inline_connection(self.catalog().state());
1221                let item_global_id = self.catalog().get_entry(&item_id).latest_global_id();
1222
1223                let ingestion = mz_storage_types::sources::IngestionDescription::new(
1224                    desc,
1225                    cluster_id,
1226                    item_global_id,
1227                );
1228
1229                DataSource::Ingestion(ingestion)
1230            }
1231            DataSourceDesc::OldSyntaxIngestion {
1232                desc,
1233                progress_subsource,
1234                data_config,
1235                details,
1236                cluster_id,
1237            } => {
1238                let desc = desc.into_inline_connection(self.catalog().state());
1239                let data_config = data_config.into_inline_connection(self.catalog().state());
1240
1241                // TODO(parkmycar): We should probably check the type here, but I'm not
1242                // sure if this will always be a Source or a Table.
1243                let progress_subsource = self
1244                    .catalog()
1245                    .get_entry(&progress_subsource)
1246                    .latest_global_id();
1247
1248                let mut ingestion = mz_storage_types::sources::IngestionDescription::new(
1249                    desc,
1250                    cluster_id,
1251                    progress_subsource,
1252                );
1253
1254                let legacy_export = SourceExport {
1255                    storage_metadata: (),
1256                    data_config,
1257                    details,
1258                };
1259
1260                ingestion
1261                    .source_exports
1262                    .insert(source.global_id, legacy_export);
1263
1264                DataSource::Ingestion(ingestion)
1265            }
1266            DataSourceDesc::IngestionExport {
1267                ingestion_id,
1268                external_reference: _,
1269                details,
1270                data_config,
1271            } => {
1272                // TODO(parkmycar): We should probably check the type here, but I'm not sure if
1273                // this will always be a Source or a Table.
1274                let ingestion_id = self.catalog().get_entry(&ingestion_id).latest_global_id();
1275
1276                DataSource::IngestionExport {
1277                    ingestion_id,
1278                    details,
1279                    data_config: data_config.into_inline_connection(self.catalog().state()),
1280                }
1281            }
1282            DataSourceDesc::Progress => DataSource::Progress,
1283            DataSourceDesc::Webhook { .. } => DataSource::Webhook,
1284            DataSourceDesc::Introspection(_) | DataSourceDesc::Catalog => {
1285                unreachable!("cannot create sources with internal data sources")
1286            }
1287        };
1288
1289        storage_collections_to_create.insert(
1290            source.global_id,
1291            CollectionDescription::<Timestamp> {
1292                desc: source.desc.clone(),
1293                data_source,
1294                timeline: Some(source.timeline),
1295                since: None,
1296                primary: None,
1297            },
1298        );
1299
1300        // Initialize read policies for the source
1301        for (compaction_window, catalog_ids) in compaction_windows {
1302            let compaction_ids = storage_policies_to_initialize
1303                .entry(compaction_window)
1304                .or_default();
1305
1306            let gids = catalog_ids
1307                .into_iter()
1308                .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1309                .flatten();
1310            compaction_ids.extend(gids);
1311        }
1312
1313        Ok(())
1314    }
1315
1316    /// Handles altering a connection by collecting all the dependent sources,
1317    /// sinks, and tables that need their connection updated.
1318    ///
1319    /// This mirrors the logic from `sequence_alter_connection_stage_finish` but
1320    /// collects the changes into batched collections for application after all
1321    /// implications are processed.
1322    #[instrument(level = "debug")]
1323    fn handle_alter_connection(
1324        &self,
1325        connection_id: CatalogItemId,
1326        connection: Connection,
1327        vpc_endpoints_to_create: &mut Vec<(CatalogItemId, VpcEndpointConfig)>,
1328        source_connections_to_alter: &mut BTreeMap<
1329            GlobalId,
1330            GenericSourceConnection<InlinedConnection>,
1331        >,
1332        sink_connections_to_alter: &mut BTreeMap<GlobalId, StorageSinkConnection>,
1333        source_export_data_configs_to_alter: &mut BTreeMap<GlobalId, SourceExportDataConfig>,
1334    ) {
1335        use std::collections::VecDeque;
1336
1337        // Handle AWS PrivateLink connections by queueing VPC endpoint creation.
1338        if let ConnectionDetails::AwsPrivatelink(ref privatelink) = connection.details {
1339            let spec = VpcEndpointConfig {
1340                aws_service_name: privatelink.service_name.to_owned(),
1341                availability_zone_ids: privatelink.availability_zones.to_owned(),
1342            };
1343            vpc_endpoints_to_create.push((connection_id, spec));
1344        }
1345
1346        // Walk the dependency graph to find all sources, sinks, and tables
1347        // that depend on this connection (directly or transitively through
1348        // other connections).
1349        let mut connections_to_process = VecDeque::new();
1350        connections_to_process.push_front(connection_id.clone());
1351
1352        while let Some(id) = connections_to_process.pop_front() {
1353            for dependent_id in self.catalog().get_entry(&id).used_by() {
1354                let dependent_entry = self.catalog().get_entry(dependent_id);
1355                match dependent_entry.item() {
1356                    CatalogItem::Connection(_) => {
1357                        // Connections can depend on other connections (e.g., a
1358                        // Kafka connection using an SSH tunnel connection).
1359                        // Process these transitively.
1360                        connections_to_process.push_back(*dependent_id);
1361                    }
1362                    CatalogItem::Source(source) => {
1363                        let desc = match &dependent_entry
1364                            .source()
1365                            .expect("known to be source")
1366                            .data_source
1367                        {
1368                            DataSourceDesc::Ingestion { desc, .. }
1369                            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1370                                desc.clone().into_inline_connection(self.catalog().state())
1371                            }
1372                            _ => {
1373                                // Only ingestions reference connections directly.
1374                                continue;
1375                            }
1376                        };
1377
1378                        source_connections_to_alter.insert(source.global_id, desc.connection);
1379                    }
1380                    CatalogItem::Sink(sink) => {
1381                        let export = dependent_entry.sink().expect("known to be sink");
1382                        sink_connections_to_alter.insert(
1383                            sink.global_id,
1384                            export
1385                                .connection
1386                                .clone()
1387                                .into_inline_connection(self.catalog().state()),
1388                        );
1389                    }
1390                    CatalogItem::Table(table) => {
1391                        // This is a source-fed table that references a schema
1392                        // registry connection as part of its encoding/data
1393                        // config.
1394                        if let Some((_, _, _, export_data_config)) =
1395                            dependent_entry.source_export_details()
1396                        {
1397                            let data_config = export_data_config.clone();
1398                            source_export_data_configs_to_alter.insert(
1399                                table.global_id_writes(),
1400                                data_config.into_inline_connection(self.catalog().state()),
1401                            );
1402                        }
1403                    }
1404                    _ => {
1405                        // Other item types don't have connection dependencies
1406                        // that need updating.
1407                    }
1408                }
1409            }
1410        }
1411    }
1412}
1413
/// A state machine for building catalog implications from catalog updates.
///
/// Once all [ParsedStateUpdate] of a timestamp are ingested this is a command
/// that has to potentially be applied to in-memory state and/or the
/// controller(s).
///
/// Each variant wraps a [CatalogImplicationKind] that tracks whether the
/// object was added, dropped, or altered within the current batch of updates.
#[derive(Debug, Clone)]
enum CatalogImplication {
    /// No updates absorbed yet; the object type is not yet known.
    None,
    Table(CatalogImplicationKind<Table>),
    /// Sources also carry their (optional) source connection as parsed from
    /// the update, which is needed when applying the implication.
    Source(CatalogImplicationKind<(Source, Option<GenericSourceConnection>)>),
    Sink(CatalogImplicationKind<Sink>),
    Index(CatalogImplicationKind<Index>),
    MaterializedView(CatalogImplicationKind<MaterializedView>),
    View(CatalogImplicationKind<View>),
    ContinualTask(CatalogImplicationKind<ContinualTask>),
    Secret(CatalogImplicationKind<Secret>),
    Connection(CatalogImplicationKind<Connection>),
    Cluster(CatalogImplicationKind<Cluster>),
    ClusterReplica(CatalogImplicationKind<ClusterReplica>),
}
1434
/// Per-object state machine tracking what happened to a single catalog object
/// within one batch of updates. `T` is the in-memory representation of the
/// object.
#[derive(Debug, Clone)]
enum CatalogImplicationKind<T> {
    /// No operations seen yet.
    None,
    /// Item was added.
    Added(T),
    /// Item was dropped (with its name retained for error messages).
    Dropped(T, String),
    /// Item is being altered from one state to another.
    ///
    /// Reached by absorbing both an addition and a retraction (in either
    /// order): the retracted value is `prev`, the added value is `new`.
    Altered { prev: T, new: T },
}
1446
1447impl<T: Clone> CatalogImplicationKind<T> {
1448    /// Apply a state transition based on a diff. Returns an error message if
1449    /// the transition is invalid.
1450    fn transition(&mut self, item: T, name: Option<String>, diff: StateDiff) -> Result<(), String> {
1451        use CatalogImplicationKind::*;
1452        use StateDiff::*;
1453
1454        let new_state = match (&*self, diff) {
1455            // Initial state transitions
1456            (None, Addition) => Added(item),
1457            (None, Retraction) => Dropped(item, name.unwrap_or_else(|| "<unknown>".to_string())),
1458
1459            // From Added state
1460            (Added(existing), Retraction) => {
1461                // Add -> Drop means the item is being altered
1462                Altered {
1463                    prev: item,
1464                    new: existing.clone(),
1465                }
1466            }
1467            (Added(_), Addition) => {
1468                return Err("Cannot add an already added object".to_string());
1469            }
1470
1471            // From Dropped state
1472            (Dropped(existing, _), Addition) => {
1473                // Drop -> Add means the item is being altered
1474                Altered {
1475                    prev: existing.clone(),
1476                    new: item,
1477                }
1478            }
1479            (Dropped(_, _), Retraction) => {
1480                return Err("Cannot drop an already dropped object".to_string());
1481            }
1482
1483            // From Altered state
1484            (Altered { .. }, _) => {
1485                return Err(format!(
1486                    "Cannot apply {:?} to an object in Altered state",
1487                    diff
1488                ));
1489            }
1490        };
1491
1492        *self = new_state;
1493        Ok(())
1494    }
1495}
1496
/// Macro to generate absorb methods for each item type.
///
/// Each generated method routes an update for one item type into the matching
/// [CatalogImplication] variant's state machine.
macro_rules! impl_absorb_method {
    (
        $method_name:ident,
        $variant:ident,
        $item_type:ty
    ) => {
        fn $method_name(
            &mut self,
            item: $item_type,
            parsed_full_name: Option<String>,
            diff: StateDiff,
        ) {
            // First update for this object: initialize the implication to the
            // matching variant in its empty state.
            if matches!(*self, CatalogImplication::None) {
                *self = CatalogImplication::$variant(CatalogImplicationKind::None);
            }

            // Any variant other than the expected one (or the `None` we just
            // replaced) means updates for different object types were routed
            // to the same implication, which is a bug.
            let CatalogImplication::$variant(state) = self else {
                panic!(
                    "Unexpected command type for {:?}: {} {:?}",
                    self,
                    stringify!($variant),
                    diff,
                );
            };

            if let Err(e) = state.transition(item, parsed_full_name, diff) {
                panic!(
                    "Invalid state transition for {}: {}",
                    stringify!($variant),
                    e
                );
            }
        }
    };
}
1539
1540impl CatalogImplication {
1541    /// Absorbs the given catalog update into this [CatalogImplication], causing
1542    /// a state transition or error.
1543    fn absorb(&mut self, catalog_update: ParsedStateUpdate) {
1544        match catalog_update.kind {
1545            ParsedStateUpdateKind::Item {
1546                durable_item: _,
1547                parsed_item,
1548                connection,
1549                parsed_full_name,
1550            } => match parsed_item {
1551                CatalogItem::Table(table) => {
1552                    self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
1553                }
1554                CatalogItem::Source(source) => {
1555                    self.absorb_source(
1556                        (source, connection),
1557                        Some(parsed_full_name),
1558                        catalog_update.diff,
1559                    );
1560                }
1561                CatalogItem::Sink(sink) => {
1562                    self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
1563                }
1564                CatalogItem::Index(index) => {
1565                    self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
1566                }
1567                CatalogItem::MaterializedView(mv) => {
1568                    self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
1569                }
1570                CatalogItem::View(view) => {
1571                    self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
1572                }
1573                CatalogItem::ContinualTask(ct) => {
1574                    self.absorb_continual_task(ct, Some(parsed_full_name), catalog_update.diff);
1575                }
1576                CatalogItem::Secret(secret) => {
1577                    self.absorb_secret(secret, None, catalog_update.diff);
1578                }
1579                CatalogItem::Connection(connection) => {
1580                    self.absorb_connection(connection, None, catalog_update.diff);
1581                }
1582                CatalogItem::Log(_) => {}
1583                CatalogItem::Type(_) => {}
1584                CatalogItem::Func(_) => {}
1585            },
1586            ParsedStateUpdateKind::TemporaryItem {
1587                durable_item: _,
1588                parsed_item,
1589                connection,
1590                parsed_full_name,
1591            } => match parsed_item {
1592                CatalogItem::Table(table) => {
1593                    self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
1594                }
1595                CatalogItem::Source(source) => {
1596                    self.absorb_source(
1597                        (source, connection),
1598                        Some(parsed_full_name),
1599                        catalog_update.diff,
1600                    );
1601                }
1602                CatalogItem::Sink(sink) => {
1603                    self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
1604                }
1605                CatalogItem::Index(index) => {
1606                    self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
1607                }
1608                CatalogItem::MaterializedView(mv) => {
1609                    self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
1610                }
1611                CatalogItem::View(view) => {
1612                    self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
1613                }
1614                CatalogItem::ContinualTask(ct) => {
1615                    self.absorb_continual_task(ct, None, catalog_update.diff);
1616                }
1617                CatalogItem::Secret(secret) => {
1618                    self.absorb_secret(secret, None, catalog_update.diff);
1619                }
1620                CatalogItem::Connection(connection) => {
1621                    self.absorb_connection(connection, None, catalog_update.diff);
1622                }
1623                CatalogItem::Log(_) => {}
1624                CatalogItem::Type(_) => {}
1625                CatalogItem::Func(_) => {}
1626            },
1627            ParsedStateUpdateKind::Cluster {
1628                durable_cluster: _,
1629                parsed_cluster,
1630            } => {
1631                self.absorb_cluster(parsed_cluster, catalog_update.diff);
1632            }
1633            ParsedStateUpdateKind::ClusterReplica {
1634                durable_cluster_replica: _,
1635                parsed_cluster_replica,
1636            } => {
1637                self.absorb_cluster_replica(parsed_cluster_replica, catalog_update.diff);
1638            }
1639        }
1640    }
1641
1642    impl_absorb_method!(absorb_table, Table, Table);
1643    impl_absorb_method!(
1644        absorb_source,
1645        Source,
1646        (Source, Option<GenericSourceConnection>)
1647    );
1648    impl_absorb_method!(absorb_sink, Sink, Sink);
1649    impl_absorb_method!(absorb_index, Index, Index);
1650    impl_absorb_method!(absorb_materialized_view, MaterializedView, MaterializedView);
1651    impl_absorb_method!(absorb_view, View, View);
1652
1653    impl_absorb_method!(absorb_continual_task, ContinualTask, ContinualTask);
1654    impl_absorb_method!(absorb_secret, Secret, Secret);
1655    impl_absorb_method!(absorb_connection, Connection, Connection);
1656
1657    // Special case for cluster which uses the cluster's name field.
1658    fn absorb_cluster(&mut self, cluster: Cluster, diff: StateDiff) {
1659        let state = match self {
1660            CatalogImplication::Cluster(state) => state,
1661            CatalogImplication::None => {
1662                *self = CatalogImplication::Cluster(CatalogImplicationKind::None);
1663                match self {
1664                    CatalogImplication::Cluster(state) => state,
1665                    _ => unreachable!(),
1666                }
1667            }
1668            _ => {
1669                panic!("Unexpected command type for {:?}: Cluster {:?}", self, diff);
1670            }
1671        };
1672
1673        if let Err(e) = state.transition(cluster.clone(), Some(cluster.name), diff) {
1674            panic!("invalid state transition for cluster: {}", e);
1675        }
1676    }
1677
1678    // Special case for cluster replica which uses the cluster replica's name field.
1679    fn absorb_cluster_replica(&mut self, cluster_replica: ClusterReplica, diff: StateDiff) {
1680        let state = match self {
1681            CatalogImplication::ClusterReplica(state) => state,
1682            CatalogImplication::None => {
1683                *self = CatalogImplication::ClusterReplica(CatalogImplicationKind::None);
1684                match self {
1685                    CatalogImplication::ClusterReplica(state) => state,
1686                    _ => unreachable!(),
1687                }
1688            }
1689            _ => {
1690                panic!(
1691                    "Unexpected command type for {:?}: ClusterReplica {:?}",
1692                    self, diff
1693                );
1694            }
1695        };
1696
1697        if let Err(e) = state.transition(cluster_replica.clone(), Some(cluster_replica.name), diff)
1698        {
1699            panic!("invalid state transition for cluster replica: {}", e);
1700        }
1701    }
1702}
1703
/// Unit tests for the [CatalogImplicationKind] state machine and the typed
/// absorb methods on [CatalogImplication].
#[cfg(test)]
mod tests {
    use super::*;
    use mz_repr::{GlobalId, RelationDesc, RelationVersion, VersionedRelationDesc};
    use mz_sql::names::ResolvedIds;
    use std::collections::BTreeMap;

    /// Builds a minimal in-memory `Table` with a single non-nullable string
    /// column named `name`. Only the relation description differs between the
    /// tables built here, which is enough to tell them apart via arity/desc
    /// in the assertions below.
    fn create_test_table(name: &str) -> Table {
        Table {
            desc: VersionedRelationDesc::new(
                RelationDesc::builder()
                    .with_column(name, mz_repr::SqlScalarType::String.nullable(false))
                    .finish(),
            ),
            create_sql: None,
            collections: BTreeMap::from([(RelationVersion::root(), GlobalId::System(1))]),
            conn_id: None,
            resolved_ids: ResolvedIds::empty(),
            custom_logical_compaction_window: None,
            is_retained_metrics_object: false,
            data_source: TableDataSource::TableWrites { defaults: vec![] },
        }
    }

    /// Exercises every legal and illegal transition of
    /// `CatalogImplicationKind`, using `String` as the item type.
    #[mz_ore::test]
    fn test_item_state_transitions() {
        // Test None -> Added
        let mut state = CatalogImplicationKind::None;
        assert!(
            state
                .transition("item1".to_string(), None, StateDiff::Addition)
                .is_ok()
        );
        assert!(matches!(state, CatalogImplicationKind::Added(_)));

        // Test Added -> Altered (via retraction)
        let mut state = CatalogImplicationKind::Added("new_item".to_string());
        assert!(
            state
                .transition("old_item".to_string(), None, StateDiff::Retraction)
                .is_ok()
        );
        match &state {
            CatalogImplicationKind::Altered { prev, new } => {
                // The retracted item is the OLD state
                assert_eq!(prev, "old_item");
                // The existing Added item is the NEW state
                assert_eq!(new, "new_item");
            }
            _ => panic!("Expected Altered state"),
        }

        // Test None -> Dropped
        let mut state = CatalogImplicationKind::None;
        assert!(
            state
                .transition(
                    "item1".to_string(),
                    Some("test_name".to_string()),
                    StateDiff::Retraction
                )
                .is_ok()
        );
        assert!(matches!(state, CatalogImplicationKind::Dropped(_, _)));

        // Test Dropped -> Altered (via addition)
        let mut state = CatalogImplicationKind::Dropped("old_item".to_string(), "name".to_string());
        assert!(
            state
                .transition("new_item".to_string(), None, StateDiff::Addition)
                .is_ok()
        );
        match &state {
            CatalogImplicationKind::Altered { prev, new } => {
                // The existing Dropped item is the OLD state
                assert_eq!(prev, "old_item");
                // The added item is the NEW state
                assert_eq!(new, "new_item");
            }
            _ => panic!("Expected Altered state"),
        }

        // Test invalid transitions
        // Adding on top of Added must be rejected ...
        let mut state = CatalogImplicationKind::Added("item".to_string());
        assert!(
            state
                .transition("item2".to_string(), None, StateDiff::Addition)
                .is_err()
        );

        // ... and so must dropping on top of Dropped.
        let mut state = CatalogImplicationKind::Dropped("item".to_string(), "name".to_string());
        assert!(
            state
                .transition("item2".to_string(), None, StateDiff::Retraction)
                .is_err()
        );
    }

    /// Drives the same state machine through the typed `absorb_table` method,
    /// checking the wrapping `CatalogImplication::Table` variant and the
    /// prev/new orientation of `Altered`.
    #[mz_ore::test]
    fn test_table_absorb_state_machine() {
        let table1 = create_test_table("table1");
        let table2 = create_test_table("table2");

        // Test None -> AddTable
        let mut cmd = CatalogImplication::None;
        cmd.absorb_table(
            table1.clone(),
            Some("schema.table1".to_string()),
            StateDiff::Addition,
        );
        // Check that we have an Added state
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Added(t) => {
                    assert_eq!(t.desc.latest().arity(), table1.desc.latest().arity())
                }
                _ => panic!("Expected Added state"),
            },
            _ => panic!("Expected Table command"),
        }

        // Test AddTable -> AlterTable (via retraction)
        // This tests the bug fix: when we have AddTable(table1) and receive Retraction(table2),
        // table2 is the old state being removed, table1 is the new state
        cmd.absorb_table(
            table2.clone(),
            Some("schema.table2".to_string()),
            StateDiff::Retraction,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Altered { prev, new } => {
                    // Verify the fix: prev should be the retracted table, new should be the added table
                    assert_eq!(prev.desc.latest().arity(), table2.desc.latest().arity());
                    assert_eq!(new.desc.latest().arity(), table1.desc.latest().arity());
                }
                _ => panic!("Expected Altered state"),
            },
            _ => panic!("Expected Table command"),
        }

        // Test None -> DropTable
        let mut cmd = CatalogImplication::None;
        cmd.absorb_table(
            table1.clone(),
            Some("schema.table1".to_string()),
            StateDiff::Retraction,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Dropped(t, name) => {
                    assert_eq!(t.desc.latest().arity(), table1.desc.latest().arity());
                    // The parsed full name is retained for error messages.
                    assert_eq!(name, "schema.table1");
                }
                _ => panic!("Expected Dropped state"),
            },
            _ => panic!("Expected Table command"),
        }

        // Test DropTable -> AlterTable (via addition)
        cmd.absorb_table(
            table2.clone(),
            Some("schema.table2".to_string()),
            StateDiff::Addition,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Altered { prev, new } => {
                    // prev should be the dropped table, new should be the added table
                    assert_eq!(prev.desc.latest().arity(), table1.desc.latest().arity());
                    assert_eq!(new.desc.latest().arity(), table2.desc.latest().arity());
                }
                _ => panic!("Expected Altered state"),
            },
            _ => panic!("Expected Table command"),
        }
    }

    /// Double-addition is a broken invariant and must panic with the exact
    /// message produced by `CatalogImplicationKind::transition`.
    #[mz_ore::test]
    #[should_panic(expected = "Cannot add an already added object")]
    fn test_invalid_double_add() {
        let table = create_test_table("table");
        let mut cmd = CatalogImplication::None;

        // First addition
        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Addition,
        );

        // Second addition should panic
        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Addition,
        );
    }

    /// Double-retraction is likewise a broken invariant and must panic.
    #[mz_ore::test]
    #[should_panic(expected = "Cannot drop an already dropped object")]
    fn test_invalid_double_drop() {
        let table = create_test_table("table");
        let mut cmd = CatalogImplication::None;

        // First drop
        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Retraction,
        );

        // Second drop should panic
        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Retraction,
        );
    }
}