mz_adapter/coord/catalog_implications.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic related to deriving and applying implications from [catalog
//! changes](ParsedStateUpdate).
//!
//! The flow from "raw" catalog changes to [CatalogImplication] works like this:
//!
//! StateUpdateKind -> ParsedStateUpdate -> CatalogImplication
//!
//! [ParsedStateUpdate] adds context to a "raw" catalog change
//! ([StateUpdateKind](mz_catalog::memory::objects::StateUpdateKind)). It
//! includes an in-memory representation of the updated object, which can in
//! theory be derived from the raw change, but only when we have access to all
//! the other raw changes or to an in-memory Catalog, which represents a
//! "rollup" of all the raw changes.
//!
//! [CatalogImplication] is both the state machine that we use for absorbing
//! multiple state updates for the same object and the final command that has
//! to be applied to in-memory state and/or the controller(s) after absorbing
//! all the state updates in a given batch of updates.
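//!
//! As an illustrative sketch (simplified; the real variants carry more
//! context than shown here), a retraction and an addition for the same object
//! within one batch absorb into a single `Altered` implication:
//!
//! ```text
//! Table(v1, Retraction) --\
//!                          +-- absorb --> CatalogImplication::Table(Altered { prev: v1, new: v2 })
//! Table(v2, Addition)   --/
//! ```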

use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use std::time::{Duration, Instant};

use fail::fail_point;
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_catalog::memory::objects::{
    CatalogItem, Cluster, ClusterReplica, Connection, DataSourceDesc, Index, MaterializedView,
    Secret, Sink, Source, StateDiff, Table, TableDataSource, View,
};
use mz_cloud_resources::VpcEndpointConfig;
use mz_compute_client::logging::LogVariant;
use mz_compute_client::protocol::response::PeekResponse;
use mz_controller::clusters::{ClusterRole, ReplicaConfig};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::instrument;
use mz_ore::retry::Retry;
use mz_ore::task;
use mz_repr::{CatalogItemId, GlobalId, RelationVersion, RelationVersionSelector};
use mz_sql::plan::ConnectionDetails;
use mz_storage_client::controller::{CollectionDescription, DataSource};
use mz_storage_types::connections::PostgresConnection;
use mz_storage_types::connections::inline::{InlinedConnection, IntoInlineConnection};
use mz_storage_types::sinks::StorageSinkConnection;
use mz_storage_types::sources::{
    GenericSourceConnection, SourceDesc, SourceExport, SourceExportDataConfig,
};
use tracing::{Instrument, info_span, warn};

use crate::active_compute_sink::ActiveComputeSinkRetireReason;
use crate::coord::Coordinator;
use crate::coord::catalog_implications::parsed_state_updates::{
    ParsedStateUpdate, ParsedStateUpdateKind,
};
use crate::coord::peek::DroppedDependency;
use crate::coord::timeline::TimelineState;
use crate::statement_logging::{StatementEndedExecutionReason, StatementLoggingId};
use crate::{AdapterError, CollectionIdBundle, ExecuteContext, ResultExt};

pub mod parsed_state_updates;

impl Coordinator {
    /// Applies implications from the given batch of [ParsedStateUpdate]s to
    /// our in-memory state and our controllers. This also applies transitive
    /// implications: for example, peeks and subscribes are cancelled when
    /// objects they reference are dropped.
    ///
    /// This _requires_ that the given updates are consolidated. There must be
    /// at most one addition and/or one retraction for a given item, as
    /// identified by that item's ID type.
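    ///
    /// For example (illustrative), this is a valid consolidated batch: one
    /// retraction plus one addition for `u1` (absorbed into `Altered`) and a
    /// lone retraction for `u2` (absorbed into `Dropped`):
    ///
    /// ```text
    /// (u1, Table v1, Retraction), (u1, Table v2, Addition)
    /// (u2, Index,    Retraction)
    /// ```
    ///
    /// Two additions for the same ID would violate this requirement.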
    #[instrument(level = "debug")]
    pub async fn apply_catalog_implications(
        &mut self,
        ctx: Option<&mut ExecuteContext>,
        catalog_updates: Vec<ParsedStateUpdate>,
    ) -> Result<(), AdapterError> {
        let start = Instant::now();

        let mut catalog_implications: BTreeMap<CatalogItemId, CatalogImplication> = BTreeMap::new();
        let mut cluster_commands: BTreeMap<ClusterId, CatalogImplication> = BTreeMap::new();
        let mut cluster_replica_commands: BTreeMap<(ClusterId, ReplicaId), CatalogImplication> =
            BTreeMap::new();
        // Introspection source index additions, collected separately and
        // merged into the AddCluster handler. Not routed through the
        // absorb machinery since they are simple additions that don't
        // need Altered support.
        let mut introspection_source_indexes: BTreeMap<ClusterId, BTreeMap<LogVariant, GlobalId>> =
            BTreeMap::new();

        for update in catalog_updates {
            tracing::trace!(?update, "got parsed state update");
            match &update.kind {
                ParsedStateUpdateKind::Item {
                    durable_item,
                    parsed_item: _,
                    connection: _,
                    parsed_full_name: _,
                } => {
                    let entry = catalog_implications
                        .entry(durable_item.id.clone())
                        .or_insert_with(|| CatalogImplication::None);
                    entry.absorb(update);
                }
                ParsedStateUpdateKind::TemporaryItem {
                    durable_item,
                    parsed_item: _,
                    connection: _,
                    parsed_full_name: _,
                } => {
                    let entry = catalog_implications
                        .entry(durable_item.id.clone())
                        .or_insert_with(|| CatalogImplication::None);
                    entry.absorb(update);
                }
                ParsedStateUpdateKind::Cluster {
                    durable_cluster,
                    parsed_cluster: _,
                } => {
                    let entry = cluster_commands
                        .entry(durable_cluster.id)
                        .or_insert_with(|| CatalogImplication::None);
                    entry.absorb(update.clone());
                }
                ParsedStateUpdateKind::ClusterReplica {
                    durable_cluster_replica,
                    parsed_cluster_replica: _,
                } => {
                    let entry = cluster_replica_commands
                        .entry((
                            durable_cluster_replica.cluster_id,
                            durable_cluster_replica.replica_id,
                        ))
                        .or_insert_with(|| CatalogImplication::None);
                    entry.absorb(update.clone());
                }
                ParsedStateUpdateKind::IntrospectionSourceIndex {
                    cluster_id,
                    log,
                    index_id,
                } => {
                    if update.diff == StateDiff::Addition {
                        introspection_source_indexes
                            .entry(*cluster_id)
                            .or_default()
                            .insert(log.clone(), *index_id);
                    }
                    // Retractions don't need handling: introspection
                    // source indexes are dropped with their cluster.
                }
            }
        }

        self.apply_catalog_implications_inner(
            ctx,
            catalog_implications.into_iter().collect_vec(),
            cluster_commands.into_iter().collect_vec(),
            cluster_replica_commands.into_iter().collect_vec(),
            introspection_source_indexes,
        )
        .await?;

        self.metrics
            .apply_catalog_implications_seconds
            .observe(start.elapsed().as_secs_f64());

        Ok(())
    }

    #[instrument(level = "debug")]
    async fn apply_catalog_implications_inner(
        &mut self,
        ctx: Option<&mut ExecuteContext>,
        implications: Vec<(CatalogItemId, CatalogImplication)>,
        cluster_commands: Vec<(ClusterId, CatalogImplication)>,
        cluster_replica_commands: Vec<((ClusterId, ReplicaId), CatalogImplication)>,
        mut introspection_source_indexes: BTreeMap<ClusterId, BTreeMap<LogVariant, GlobalId>>,
    ) -> Result<(), AdapterError> {
        let mut tables_to_drop = BTreeSet::new();
        let mut sources_to_drop = vec![];
        let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
        let mut storage_sink_gids_to_drop = vec![];
        let mut indexes_to_drop = vec![];
        let mut compute_sinks_to_drop = vec![];
        let mut view_gids_to_drop = vec![];
        let mut secrets_to_drop = vec![];
        let mut vpc_endpoints_to_drop = vec![];
        let mut clusters_to_drop = vec![];
        let mut cluster_replicas_to_drop = vec![];
        let mut active_compute_sinks_to_drop = BTreeMap::new();
        let mut peeks_to_drop = vec![];
        let mut copies_to_drop = vec![];

        // Maps for storing names of dropped objects, for error messages.
        let mut dropped_item_names: BTreeMap<GlobalId, String> = BTreeMap::new();
        let mut dropped_cluster_names: BTreeMap<ClusterId, String> = BTreeMap::new();

        // Separate collections for tables (which need write timestamps) and
        // sources (which don't).
        let mut table_collections_to_create = BTreeMap::new();
        let mut source_collections_to_create = BTreeMap::new();
        let mut storage_policies_to_initialize = BTreeMap::new();
        let mut execution_timestamps_to_set = BTreeSet::new();
        let mut vpc_endpoints_to_create: Vec<(CatalogItemId, VpcEndpointConfig)> = vec![];

        // Sources that shouldn't be dropped, even if we saw a `Dropped` event.
        // Used for correct handling of ALTER MATERIALIZED VIEW.
        let mut source_gids_to_keep = BTreeSet::new();

        // Collections for batching connection-related alterations.
        let mut source_connections_to_alter: BTreeMap<
            GlobalId,
            GenericSourceConnection<InlinedConnection>,
        > = BTreeMap::new();
        let mut sink_connections_to_alter: BTreeMap<GlobalId, StorageSinkConnection> =
            BTreeMap::new();
        let mut source_export_data_configs_to_alter: BTreeMap<GlobalId, SourceExportDataConfig> =
            BTreeMap::new();
        let mut source_descs_to_alter: BTreeMap<GlobalId, SourceDesc> = BTreeMap::new();

        // We're incrementally migrating the code that manipulates the
        // controller out of closures in the sequencer. For some types of
        // catalog changes we haven't done this migration yet, so for those
        // you will see just a log message below. Over the next couple of PRs
        // all of these will go away.

        for (catalog_id, implication) in implications {
            tracing::trace!(?implication, "have to apply catalog implication");

            match implication {
                CatalogImplication::Table(CatalogImplicationKind::Added(table)) => {
                    self.handle_create_table(
                        &ctx,
                        &mut table_collections_to_create,
                        &mut storage_policies_to_initialize,
                        &mut execution_timestamps_to_set,
                        catalog_id,
                        table.clone(),
                    )
                    .await?
                }
                CatalogImplication::Table(CatalogImplicationKind::Altered {
                    prev: prev_table,
                    new: new_table,
                }) => {
                    self.handle_alter_table(catalog_id, prev_table, new_table)
                        .await?
                }

                CatalogImplication::Table(CatalogImplicationKind::Dropped(table, full_name)) => {
                    let global_ids = table.global_ids();
                    for global_id in global_ids {
                        tables_to_drop.insert((catalog_id, global_id));
                        dropped_item_names.insert(global_id, full_name.clone());
                    }
                }
                CatalogImplication::Source(CatalogImplicationKind::Added((
                    source,
                    _connection,
                ))) => {
                    // Get the compaction windows for all sources with this
                    // catalog_id. This replicates the logic from
                    // sequence_create_source, which collects all item_ids and
                    // gets their compaction windows.
                    let compaction_windows = self
                        .catalog()
                        .state()
                        .source_compaction_windows(vec![catalog_id]);

                    self.handle_create_source(
                        &mut source_collections_to_create,
                        &mut storage_policies_to_initialize,
                        catalog_id,
                        source,
                        compaction_windows,
                    )
                    .await?
                }
                CatalogImplication::Source(CatalogImplicationKind::Altered {
                    prev: (prev_source, _prev_connection),
                    new: (new_source, new_connection),
                }) => {
                    if prev_source.custom_logical_compaction_window
                        != new_source.custom_logical_compaction_window
                    {
                        let new_window = new_source
                            .custom_logical_compaction_window
                            .unwrap_or(CompactionWindow::Default);
                        self.update_storage_read_policies(vec![(catalog_id, new_window.into())]);
                    }
                    match (&prev_source.data_source, &new_source.data_source) {
                        (
                            DataSourceDesc::Ingestion {
                                desc: prev_desc, ..
                            }
                            | DataSourceDesc::OldSyntaxIngestion {
                                desc: prev_desc, ..
                            },
                            DataSourceDesc::Ingestion { desc: new_desc, .. }
                            | DataSourceDesc::OldSyntaxIngestion { desc: new_desc, .. },
                        ) => {
                            if prev_desc != new_desc {
                                let inlined_connection = new_connection
                                    .expect("ingestion source should have inlined connection");
                                let inlined_desc = SourceDesc {
                                    connection: inlined_connection,
                                    timestamp_interval: new_desc.timestamp_interval,
                                };
                                source_descs_to_alter.insert(new_source.global_id, inlined_desc);
                            }
                        }
                        _ => {}
                    }
                }
                CatalogImplication::Source(CatalogImplicationKind::Dropped(
                    (source, connection),
                    full_name,
                )) => {
                    let global_id = source.global_id();
                    sources_to_drop.push((catalog_id, global_id));
                    dropped_item_names.insert(global_id, full_name);

                    if let DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } = &source.data_source
                    {
                        match &desc.connection {
                            GenericSourceConnection::Postgres(_referenced_conn) => {
                                let inline_conn = connection.expect("missing inlined connection");

                                let pg_conn = match inline_conn {
                                    GenericSourceConnection::Postgres(pg_conn) => pg_conn,
                                    other => {
                                        panic!("expected postgres connection, got: {:?}", other)
                                    }
                                };
                                let pending_drop = (
                                    pg_conn.connection.clone(),
                                    pg_conn.publication_details.slot.clone(),
                                );
                                replication_slots_to_drop.push(pending_drop);
                            }
                            _ => {}
                        }
                    }
                }
                CatalogImplication::Sink(CatalogImplicationKind::Added(sink)) => {
                    tracing::debug!(?sink, "not handling AddSink in here yet");
                }
                CatalogImplication::Sink(CatalogImplicationKind::Altered {
                    prev: prev_sink,
                    new: new_sink,
                }) => {
                    tracing::debug!(?prev_sink, ?new_sink, "not handling AlterSink in here yet");
                }
                CatalogImplication::Sink(CatalogImplicationKind::Dropped(sink, full_name)) => {
                    storage_sink_gids_to_drop.push(sink.global_id());
                    dropped_item_names.insert(sink.global_id(), full_name);
                }
                CatalogImplication::Index(CatalogImplicationKind::Added(index)) => {
                    tracing::debug!(?index, "not handling AddIndex in here yet");
                }
                CatalogImplication::Index(CatalogImplicationKind::Altered {
                    prev: prev_index,
                    new: new_index,
                }) => {
                    if prev_index.custom_logical_compaction_window
                        != new_index.custom_logical_compaction_window
                    {
                        let new_window = new_index
                            .custom_logical_compaction_window
                            .unwrap_or(CompactionWindow::Default);
                        self.update_compute_read_policy(
                            new_index.cluster_id,
                            catalog_id,
                            new_window.into(),
                        );
                    }
                }
                CatalogImplication::Index(CatalogImplicationKind::Dropped(index, full_name)) => {
                    indexes_to_drop.push((index.cluster_id, index.global_id()));
                    dropped_item_names.insert(index.global_id(), full_name);
                }
                CatalogImplication::MaterializedView(CatalogImplicationKind::Added(mv)) => {
                    tracing::debug!(?mv, "not handling AddMaterializedView in here yet");
                }
                CatalogImplication::MaterializedView(CatalogImplicationKind::Altered {
                    prev: prev_mv,
                    new: new_mv,
                }) => {
                    // We get here for three reasons:
                    //  1. Name changes, like those caused by ALTER SCHEMA.
                    //  2. Replacement application.
                    //  3. Compaction window changes (ALTER ... SET (RETAIN HISTORY ...)).
                    //
                    // 1. Name changes: We don't have to do anything here.
                    //
                    // 2. Replacement application: This is tricky: it changes the `CatalogItemId`
                    // of the target to that of the replacement and simultaneously drops the
                    // replacement. That means that when we get here, `prev_mv` is the replacement
                    // that should be dropped, and `new_mv` is the target, which already exists but
                    // under a different ID (and will receive a `Dropped` event separately). We can
                    // sniff out this case by checking for version differences.
                    //
                    // 3. Compaction window changes: We handle this in an `else if`, because if
                    // there is also a replacement application, then the replacement's storage
                    // collections already have the correct read policies from when they were
                    // created, so we don't need to update them here.
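                    //
                    // As an illustrative sketch (hypothetical version numbers): the replacement
                    // `prev_mv` carries only `v3`, while the updated target `new_mv` carries
                    // `v1, v2, v3`. Their `collections` differ, which is how we detect the case,
                    // but both report `v3`'s GlobalId from `global_id_writes()`, which is what
                    // the assert below checks.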
                    if prev_mv.collections != new_mv.collections {
                        // Sanity check: The replacement's last (and only) version must be the same
                        // as the new target's last version.
                        assert_eq!(
                            prev_mv.global_id_writes(),
                            new_mv.global_id_writes(),
                            "unexpected MV Altered implication: prev={prev_mv:?}, new={new_mv:?}",
                        );

                        let gid = new_mv.global_id_writes();
                        self.allow_writes(new_mv.cluster_id, gid);

                        // There will be a separate `Dropped` implication for the old definition of
                        // the target MV. That will drop the old compute collection, as we desire,
                        // but we need to prevent it from dropping the old storage collection as
                        // well, since that might still be depended on.
                        source_gids_to_keep.extend(new_mv.global_ids());
                    } else if prev_mv.custom_logical_compaction_window
                        != new_mv.custom_logical_compaction_window
                    {
                        let new_window = new_mv
                            .custom_logical_compaction_window
                            .unwrap_or(CompactionWindow::Default);
                        self.update_storage_read_policies(vec![(catalog_id, new_window.into())]);
                    }
                }
                CatalogImplication::MaterializedView(CatalogImplicationKind::Dropped(
                    mv,
                    full_name,
                )) => {
                    compute_sinks_to_drop.push((mv.cluster_id, mv.global_id_writes()));
                    for gid in mv.global_ids() {
                        sources_to_drop.push((catalog_id, gid));
                        dropped_item_names.insert(gid, full_name.clone());
                    }
                }
                CatalogImplication::View(CatalogImplicationKind::Added(_view)) => {
                    // No action needed: views are catalog-only objects with no
                    // storage collections or dataflows to create.
                }
                CatalogImplication::View(CatalogImplicationKind::Altered {
                    prev: _prev_view,
                    new: _new_view,
                }) => {
                    // No action needed: view alterations (e.g. renames) are
                    // catalog-only and require no controller changes.
                }
                CatalogImplication::View(CatalogImplicationKind::Dropped(view, full_name)) => {
                    view_gids_to_drop.push(view.global_id());
                    dropped_item_names.insert(view.global_id(), full_name);
                }
                CatalogImplication::Secret(CatalogImplicationKind::Added(_secret)) => {
                    // No action needed: the secret payload is stored via
                    // secrets_controller.ensure() BEFORE the catalog transaction,
                    // so by the time we see this update the secret is already stored.
                }
                CatalogImplication::Secret(CatalogImplicationKind::Altered {
                    prev: _prev_secret,
                    new: _new_secret,
                }) => {
                    // No action needed: altering a secret updates the payload via
                    // secrets_controller.ensure() without a catalog transaction.
                }
                CatalogImplication::Secret(CatalogImplicationKind::Dropped(
                    _secret,
                    _full_name,
                )) => {
                    secrets_to_drop.push(catalog_id);
                }
                CatalogImplication::Connection(CatalogImplicationKind::Added(connection)) => {
                    match &connection.details {
                        // SSH connections: the key pair is stored in the secrets
                        // controller BEFORE the catalog transaction, so no action
                        // is needed here.
                        ConnectionDetails::Ssh { .. } => {}
                        // AWS PrivateLink connections: create the VPC endpoint.
                        ConnectionDetails::AwsPrivatelink(privatelink) => {
                            let spec = VpcEndpointConfig {
                                aws_service_name: privatelink.service_name.to_owned(),
                                availability_zone_ids: privatelink.availability_zones.to_owned(),
                            };
                            vpc_endpoints_to_create.push((catalog_id, spec));
                        }
                        // Other connection types don't require post-transaction actions.
                        _ => {}
                    }
                }
                CatalogImplication::Connection(CatalogImplicationKind::Altered {
                    prev: _prev_connection,
                    new: new_connection,
                }) => {
                    self.handle_alter_connection(
                        catalog_id,
                        new_connection,
                        &mut vpc_endpoints_to_create,
                        &mut source_connections_to_alter,
                        &mut sink_connections_to_alter,
                        &mut source_export_data_configs_to_alter,
                    );
                }
                CatalogImplication::Connection(CatalogImplicationKind::Dropped(
                    connection,
                    _full_name,
                )) => {
                    match &connection.details {
                        // SSH connections have an associated secret that should
                        // be dropped.
                        ConnectionDetails::Ssh { .. } => {
                            secrets_to_drop.push(catalog_id);
                        }
                        // AWS PrivateLink connections have an associated
                        // VpcEndpoint K8s resource that should be dropped.
                        ConnectionDetails::AwsPrivatelink(_) => {
                            vpc_endpoints_to_drop.push(catalog_id);
                        }
                        _ => (),
                    }
                }
                CatalogImplication::None => {
                    // Nothing to do for None commands.
                }
                CatalogImplication::Cluster(_) | CatalogImplication::ClusterReplica(_) => {
                    unreachable!("clusters and cluster replicas are handled below")
                }
                CatalogImplication::Table(CatalogImplicationKind::None)
                | CatalogImplication::Source(CatalogImplicationKind::None)
                | CatalogImplication::Sink(CatalogImplicationKind::None)
                | CatalogImplication::Index(CatalogImplicationKind::None)
                | CatalogImplication::MaterializedView(CatalogImplicationKind::None)
                | CatalogImplication::View(CatalogImplicationKind::None)
                | CatalogImplication::Secret(CatalogImplicationKind::None)
                | CatalogImplication::Connection(CatalogImplicationKind::None) => {
                    unreachable!("will never leave None in place");
                }
            }
        }

        for (cluster_id, command) in cluster_commands {
            tracing::trace!(?command, "have cluster command to apply!");

            match command {
                CatalogImplication::Cluster(CatalogImplicationKind::Added(cluster)) => {
                    // The Cluster's log_indexes is empty at parse time
                    // because IntrospectionSourceIndex updates are applied
                    // after the Cluster update. Use the separately collected
                    // introspection_source_indexes instead.
                    let arranged_logs = introspection_source_indexes
                        .remove(&cluster_id)
                        .unwrap_or_default();
                    let introspection_source_ids: Vec<_> =
                        arranged_logs.values().copied().collect();

                    self.controller
                        .create_cluster(
                            cluster_id,
                            mz_controller::clusters::ClusterConfig {
                                arranged_logs,
                                workload_class: cluster.config.workload_class.clone(),
                            },
                        )
                        .expect("creating cluster must not fail");

                    if !introspection_source_ids.is_empty() {
                        self.initialize_compute_read_policies(
                            introspection_source_ids,
                            cluster_id,
                            CompactionWindow::Default,
                        )
                        .await;
                    }
                }
                CatalogImplication::Cluster(CatalogImplicationKind::Altered {
                    prev: prev_cluster,
                    new: new_cluster,
                }) => {
                    // Replica adds/drops/renames from config changes arrive as
                    // separate AddClusterReplica/DroppedClusterReplica/
                    // AlterClusterReplica events, so the only cluster-level
                    // side effect here is updating the workload class on the
                    // controller when it changes.
                    if prev_cluster.config.workload_class != new_cluster.config.workload_class {
                        self.controller.update_cluster_workload_class(
                            cluster_id,
                            new_cluster.config.workload_class.clone(),
                        );
                    }
                }
                CatalogImplication::Cluster(CatalogImplicationKind::Dropped(
                    cluster,
                    _full_name,
                )) => {
                    clusters_to_drop.push(cluster_id);
                    dropped_cluster_names.insert(cluster_id, cluster.name);
                }
                CatalogImplication::Cluster(CatalogImplicationKind::None) => {
                    unreachable!("will never leave None in place");
                }
                command => {
                    unreachable!(
                        "we only handle cluster commands in this map, got: {:?}",
                        command
                    );
                }
            }
        }

        for ((cluster_id, replica_id), command) in cluster_replica_commands {
            tracing::trace!(?command, "have cluster replica command to apply!");

            match command {
                CatalogImplication::ClusterReplica(CatalogImplicationKind::Added(replica)) => {
                    // Read the cluster name and role from the current catalog
                    // state. This is correct as long as implications are
                    // processed right after each catalog transaction. For a
                    // more future-proof approach that tracks cluster info
                    // locally across transactions, see the last commit of
                    // https://github.com/ggevay/materialize/tree/implications-cluster-name-tracking,
                    // which removes that logic.
                    let cluster = self.catalog().get_cluster(cluster_id);
                    let cluster_name = cluster.name.clone();
                    let cluster_role = cluster.role();
                    self.handle_create_cluster_replica(
                        cluster_id,
                        replica_id,
                        cluster_role,
                        cluster_name,
                        replica.name.clone(),
                        replica.config.clone(),
                    )
                    .await;
                }
                CatalogImplication::ClusterReplica(CatalogImplicationKind::Altered {
                    prev: _prev_replica,
                    new: _new_replica,
                }) => {
                    // No action needed: cluster replica alterations (e.g.
                    // renames, owner changes, pending flag changes) are
                    // catalog-only and require no controller changes.
                }
                CatalogImplication::ClusterReplica(CatalogImplicationKind::Dropped(
                    _replica,
                    _full_name,
                )) => {
                    cluster_replicas_to_drop.push((cluster_id, replica_id));
                }
                CatalogImplication::ClusterReplica(CatalogImplicationKind::None) => {
                    unreachable!("will never leave None in place");
                }
                command => {
                    unreachable!(
                        "we only handle cluster replica commands in this map, got: {:?}",
                        command
                    );
                }
            }
        }

        if !source_collections_to_create.is_empty() {
            self.create_source_collections(source_collections_to_create)
                .await?;
        }

        // We have to create sources first and then tables, because tables
        // within one transaction can depend on sources.
        if !table_collections_to_create.is_empty() {
            self.create_table_collections(table_collections_to_create, execution_timestamps_to_set)
                .await?;
        }
        // It is _very_ important that we only initialize read policies after we
        // have created all the sources/collections. Some of the sources created
        // in this collection might have dependencies on other sources, so the
        // controller must get a chance to install read holds before we set a
        // policy that might make the since advance.
        self.initialize_storage_collections(storage_policies_to_initialize)
            .await?;

        // Create VPC endpoints for AWS PrivateLink connections.
        if !vpc_endpoints_to_create.is_empty() {
            if let Some(cloud_resource_controller) = self.cloud_resource_controller.as_ref() {
                for (connection_id, spec) in vpc_endpoints_to_create {
                    if let Err(err) = cloud_resource_controller
                        .ensure_vpc_endpoint(connection_id, spec)
                        .await
                    {
                        tracing::error!(?err, "failed to ensure vpc endpoint!");
                    }
                }
            } else {
                tracing::error!(
                    "AWS PrivateLink connections unsupported without cloud_resource_controller"
                );
            }
        }

        // Apply batched connection alterations to dependent sources/sinks/tables.
        if !source_connections_to_alter.is_empty() {
            self.controller
                .storage
                .alter_ingestion_connections(source_connections_to_alter)
                .await
                .unwrap_or_terminate("cannot fail to alter ingestion connections");
        }

        if !sink_connections_to_alter.is_empty() {
            self.controller
                .storage
                .alter_export_connections(sink_connections_to_alter)
                .await
                .unwrap_or_terminate("altering export connections after txn must succeed");
        }

        if !source_export_data_configs_to_alter.is_empty() {
            self.controller
                .storage
                .alter_ingestion_export_data_configs(source_export_data_configs_to_alter)
                .await
                .unwrap_or_terminate("altering source export data configs after txn must succeed");
        }

        if !source_descs_to_alter.is_empty() {
            self.controller
                .storage
                .alter_ingestion_source_desc(source_descs_to_alter)
                .await
                .unwrap_or_terminate("cannot fail to alter ingestion source desc");
        }

        // Apply source drop overrides: retain only sources that weren't marked
        // to be kept (see the ALTER MATERIALIZED VIEW handling above).
        sources_to_drop.retain(|(_, gid)| !source_gids_to_keep.contains(gid));

        let readable_collections_to_drop: BTreeSet<_> = sources_to_drop
            .iter()
            .map(|(_, gid)| *gid)
            .chain(tables_to_drop.iter().map(|(_, gid)| *gid))
            .chain(indexes_to_drop.iter().map(|(_, gid)| *gid))
            .chain(view_gids_to_drop.iter().copied())
            .collect();

        // Clean up any active compute sinks, like subscribes or `COPY TO`s,
        // that rely on dropped relations or clusters.
        for (sink_id, sink) in &self.active_compute_sinks {
            let cluster_id = sink.cluster_id();
            if let Some(id) = sink
                .depends_on()
                .iter()
                .find(|id| readable_collections_to_drop.contains(id))
            {
                let name = dropped_item_names
                    .get(id)
                    .cloned()
                    .expect("missing relation name");
                active_compute_sinks_to_drop.insert(
                    *sink_id,
                    ActiveComputeSinkRetireReason::DependencyDropped(DroppedDependency::Relation {
                        name,
                    }),
                );
            } else if clusters_to_drop.contains(&cluster_id) {
                let name = dropped_cluster_names
                    .get(&cluster_id)
                    .cloned()
                    .expect("missing cluster name");
                active_compute_sinks_to_drop.insert(
                    *sink_id,
                    ActiveComputeSinkRetireReason::DependencyDropped(DroppedDependency::Cluster {
                        name,
                    }),
                );
            }
        }

        // Clean up any pending peeks that rely on dropped relations or clusters.
        for (uuid, pending_peek) in &self.pending_peeks {
            if let Some(id) = pending_peek
                .depends_on
                .iter()
                .find(|id| readable_collections_to_drop.contains(id))
            {
                let name = dropped_item_names
                    .get(id)
                    .cloned()
                    .expect("missing relation name");
                peeks_to_drop.push((DroppedDependency::Relation { name }, uuid.clone()));
            } else if clusters_to_drop.contains(&pending_peek.cluster_id) {
                let name = dropped_cluster_names
                    .get(&pending_peek.cluster_id)
                    .cloned()
                    .expect("missing cluster name");
                peeks_to_drop.push((DroppedDependency::Cluster { name }, uuid.clone()));
            }
        }

        // Clean up any pending `COPY` statements that rely on dropped relations or clusters.
        for (conn_id, pending_copy) in &self.active_copies {
            let dropping_table = tables_to_drop
                .iter()
                .any(|(item_id, _gid)| pending_copy.table_id == *item_id);
            let dropping_cluster = clusters_to_drop.contains(&pending_copy.cluster_id);

            if dropping_table || dropping_cluster {
                copies_to_drop.push(conn_id.clone());
            }
        }

        let storage_gids_to_drop: BTreeSet<_> = sources_to_drop
            .iter()
            .map(|(_id, gid)| gid)
            .chain(storage_sink_gids_to_drop.iter())
            .chain(tables_to_drop.iter().map(|(_id, gid)| gid))
            .copied()
            .collect();
        let compute_gids_to_drop: Vec<_> = indexes_to_drop
            .iter()
            .chain(compute_sinks_to_drop.iter())
            .copied()
            .collect();

        // Gather resources that we have to remove from timeline state, and
        // pre-check which timelines become empty when we drop the specified
        // storage and compute resources.
        //
        // Note: we only apply these changes below.
        let mut timeline_id_bundles = BTreeMap::new();

        for (timeline, TimelineState { read_holds, .. }) in &self.global_timelines {
            let mut id_bundle = CollectionIdBundle::default();

            for storage_id in read_holds.storage_ids() {
                if storage_gids_to_drop.contains(&storage_id) {
                    id_bundle.storage_ids.insert(storage_id);
                }
            }

            for (instance_id, id) in read_holds.compute_ids() {
                if compute_gids_to_drop.contains(&(instance_id, id))
                    || clusters_to_drop.contains(&instance_id)
                {
                    id_bundle
                        .compute_ids
                        .entry(instance_id)
                        .or_default()
                        .insert(id);
                }
            }

            timeline_id_bundles.insert(timeline.clone(), id_bundle);
        }

        let mut timeline_associations = BTreeMap::new();
        for (timeline, id_bundle) in timeline_id_bundles.into_iter() {
            let TimelineState { read_holds, .. } = self
                .global_timelines
                .get(&timeline)
                .expect("all timelines have a timestamp oracle");

            let empty = read_holds.id_bundle().difference(&id_bundle).is_empty();
            timeline_associations.insert(timeline, (empty, id_bundle));
        }

        // No error returns are allowed after this point. Enforce this at compile time
        // by using this odd structure so we don't accidentally add a stray `?`.
        let _: () = async {
            if !timeline_associations.is_empty() {
                for (timeline, (should_be_empty, id_bundle)) in timeline_associations {
                    let became_empty =
                        self.remove_resources_associated_with_timeline(timeline, id_bundle);
                    assert_eq!(should_be_empty, became_empty, "emptiness did not match!");
                }
            }

            // Note that we drop tables before sources, since in the storage
            // controller there can be a weak dependency of tables on sources
            // that would result in error logging that we'd prefer to avoid.
            // This isn't an actual dependency issue, but we'd like to keep
            // that error logging around to indicate when an actual dependency
            // error might occur.
            if !tables_to_drop.is_empty() {
                let ts = self.get_local_write_ts().await;
                self.drop_tables(tables_to_drop.into_iter().collect_vec(), ts.timestamp);
            }

            if !sources_to_drop.is_empty() {
                self.drop_sources(sources_to_drop);
            }

            if !storage_sink_gids_to_drop.is_empty() {
                self.drop_storage_sinks(storage_sink_gids_to_drop);
            }

            if !active_compute_sinks_to_drop.is_empty() {
                self.retire_compute_sinks(active_compute_sinks_to_drop)
                    .await;
            }

            if !peeks_to_drop.is_empty() {
                for (dep, uuid) in peeks_to_drop {
                    if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
                        let cancel_reason = PeekResponse::Error(dep.query_terminated_error());
                        self.controller
                            .compute
                            .cancel_peek(pending_peek.cluster_id, uuid, cancel_reason)
                            .unwrap_or_terminate("unable to cancel peek");
                        self.retire_execution(
                            StatementEndedExecutionReason::Canceled,
                            pending_peek.ctx_extra.defuse(),
                        );
                    }
                }
            }

            if !copies_to_drop.is_empty() {
                for conn_id in copies_to_drop {
                    self.cancel_pending_copy(&conn_id);
                }
            }

            if !compute_gids_to_drop.is_empty() {
                self.drop_compute_collections(compute_gids_to_drop);
            }

            if !vpc_endpoints_to_drop.is_empty() {
                self.drop_vpc_endpoints_in_background(vpc_endpoints_to_drop)
            }

            if !cluster_replicas_to_drop.is_empty() {
                fail_point!("after_catalog_drop_replica");

                for (cluster_id, replica_id) in cluster_replicas_to_drop {
                    self.drop_replica(cluster_id, replica_id);
                }
            }
            if !clusters_to_drop.is_empty() {
                for cluster_id in clusters_to_drop {
                    self.controller.drop_cluster(cluster_id);
                }
            }

            // We don't want to block the main coordinator thread on cleaning
            // up external resources (PostgreSQL replication slots and secrets),
            // so we perform that cleanup in a background task.
            //
            // TODO(14551): This is inherently best effort: an ill-timed crash
            // means we'll never clean these resources up. We need safer cleanup
            // for non-Materialize resources. See
            // <https://github.com/MaterializeInc/materialize/issues/14551>.
            task::spawn(|| "drop_replication_slots_and_secrets", {
                let ssh_tunnel_manager = self.connection_context().ssh_tunnel_manager.clone();
                let caching_secrets_reader = self.caching_secrets_reader.clone();
                let secrets_controller = Arc::clone(&self.secrets_controller);
                let secrets_reader = Arc::clone(self.secrets_reader());
                let storage_config = self.controller.storage.config().clone();

                async move {
                    for (connection, replication_slot_name) in replication_slots_to_drop {
                        tracing::info!(?replication_slot_name, "dropping replication slot");

                        // Try to drop the replication slots, but give up after
                        // a while. The PostgreSQL server may no longer be
                        // healthy. Users often drop PostgreSQL sources
                        // *because* the PostgreSQL server has been
                        // decommissioned.
                        let result: Result<(), anyhow::Error> = Retry::default()
                            .max_duration(Duration::from_secs(60))
                            .retry_async(|_state| async {
                                let config = connection
                                    .config(&secrets_reader, &storage_config, InTask::No)
                                    .await
                                    .map_err(|e| {
                                        anyhow::anyhow!(
                                            "error creating Postgres client for \
                                            dropping acquired slots: {}",
                                            e.display_with_causes()
                                        )
                                    })?;

                                mz_postgres_util::drop_replication_slots(
                                    &ssh_tunnel_manager,
                                    config.clone(),
                                    &[(&replication_slot_name, true)],
                                )
                                .await?;

                                Ok(())
                            })
                            .await;

                        if let Err(err) = result {
                            tracing::warn!(
                                ?replication_slot_name,
                                ?err,
                                "failed to drop replication slot"
                            );
                        }
                    }

                    // Drop secrets *after* dropping the replication slots,
                    // because dropping replication slots may rely on those
                    // secrets still being present.
                    //
                    // It's okay if we crash before processing the secret drops,
                    // as we look for and remove any orphaned secrets during
                    // startup.
                    fail_point!("drop_secrets");
                    for secret in secrets_to_drop {
                        if let Err(e) = secrets_controller.delete(secret).await {
                            warn!("dropping secrets has encountered an error: {}", e);
                        } else {
                            caching_secrets_reader.invalidate(secret);
                        }
                    }
                }
            });
        }
        .instrument(info_span!(
            "coord::apply_catalog_implications_inner::finalize"
        ))
        .await;

        Ok(())
    }

    #[instrument(level = "debug")]
    async fn create_table_collections(
        &mut self,
        table_collections_to_create: BTreeMap<GlobalId, CollectionDescription>,
        execution_timestamps_to_set: BTreeSet<StatementLoggingId>,
    ) -> Result<(), AdapterError> {
        // Determine the initial validity for the tables.
        let write_ts = self.get_local_write_ts().await;
        let register_ts = write_ts.timestamp;

        // After acquiring `register_ts` but before using it, we need to
        // be sure we're still the leader. Otherwise a new generation
        // may also be trying to use `register_ts` for a different
        // purpose. See materialize#28216.
        //
        // We also should advance the upper of the catalog shard, to ensure it
        // is readable at the oracle read ts after we bump it to the
        // `register_ts` below. Both of these needs are served by calling
        // `advance_upper`.
        self.catalog
            .advance_upper(write_ts.advance_to)
            .await
            .unwrap_or_terminate("unable to advance catalog upper");

        for id in execution_timestamps_to_set {
            self.set_statement_execution_timestamp(id, register_ts);
        }

        let storage_metadata = self.catalog.state().storage_metadata();

        self.controller
            .storage
            .create_collections(
                storage_metadata,
                Some(register_ts),
                table_collections_to_create.into_iter().collect_vec(),
            )
            .await
            .unwrap_or_terminate("cannot fail to create collections");

        self.apply_local_write(register_ts).await;

        Ok(())
    }

    #[instrument(level = "debug")]
    async fn create_source_collections(
        &mut self,
        source_collections_to_create: BTreeMap<GlobalId, CollectionDescription>,
    ) -> Result<(), AdapterError> {
        let storage_metadata = self.catalog.state().storage_metadata();

        self.controller
            .storage
            .create_collections(
                storage_metadata,
                None, // Sources don't need a write timestamp.
                source_collections_to_create.into_iter().collect_vec(),
            )
            .await
            .unwrap_or_terminate("cannot fail to create collections");

        Ok(())
    }

    #[instrument(level = "debug")]
    async fn initialize_storage_collections(
        &mut self,
        storage_policies_to_initialize: BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
    ) -> Result<(), AdapterError> {
        for (compaction_window, global_ids) in storage_policies_to_initialize {
            self.initialize_read_policies(
                &CollectionIdBundle {
                    storage_ids: global_ids,
                    compute_ids: BTreeMap::new(),
                },
                compaction_window,
            )
            .await;
        }

        Ok(())
    }

1121    #[instrument(level = "debug")]
1122    async fn handle_create_table(
1123        &self,
1124        ctx: &Option<&mut ExecuteContext>,
1125        storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription>,
1126        storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1127        execution_timestamps_to_set: &mut BTreeSet<StatementLoggingId>,
1128        table_id: CatalogItemId,
1129        table: Table,
1130    ) -> Result<(), AdapterError> {
1131        // The table data_source determines whether this table will be written to
1132        // by environmentd (e.g. with INSERT INTO statements) or by the storage layer
1133        // (e.g. a source-fed table).
1134        match &table.data_source {
1135            TableDataSource::TableWrites { defaults: _ } => {
1136                let versions: BTreeMap<_, _> = table
1137                    .collection_descs()
1138                    .map(|(gid, version, desc)| (version, (gid, desc)))
1139                    .collect();
1140                let collection_descs = versions.iter().map(|(_version, (gid, desc))| {
1141                    let collection_desc = CollectionDescription::for_table(desc.clone());
1142
1143                    (*gid, collection_desc)
1144                });
1145
1146                let compaction_window = table
1147                    .custom_logical_compaction_window
1148                    .unwrap_or(CompactionWindow::Default);
1149                let ids_to_initialize = storage_policies_to_initialize
1150                    .entry(compaction_window)
1151                    .or_default();
1152
1153                for (gid, collection_desc) in collection_descs {
1154                    storage_collections_to_create.insert(gid, collection_desc);
1155                    ids_to_initialize.insert(gid);
1156                }
1157
1158                if let Some(id) = ctx.as_ref().and_then(|ctx| ctx.extra().contents()) {
1159                    execution_timestamps_to_set.insert(id);
1160                }
1161            }
1162            TableDataSource::DataSource {
1163                desc: data_source_desc,
1164                timeline,
1165            } => {
1166                match data_source_desc {
1167                    DataSourceDesc::IngestionExport {
1168                        ingestion_id,
1169                        external_reference: _,
1170                        details,
1171                        data_config,
1172                    } => {
1173                        let global_ingestion_id =
1174                            self.catalog().get_entry(ingestion_id).latest_global_id();
1175
1176                        let collection_desc = CollectionDescription {
1177                            desc: table.desc.latest(),
1178                            data_source: DataSource::IngestionExport {
1179                                ingestion_id: global_ingestion_id,
1180                                details: details.clone(),
1181                                data_config: data_config
1182                                    .clone()
1183                                    .into_inline_connection(self.catalog.state()),
1184                            },
1185                            since: None,
1186                            timeline: Some(timeline.clone()),
1187                            primary: None,
1188                        };
1189
1190                        let global_id = table
1191                            .global_ids()
1192                            .expect_element(|| "subsources cannot have multiple versions");
1193
1194                        storage_collections_to_create.insert(global_id, collection_desc);
1195
1196                        let read_policies = self
1197                            .catalog()
1198                            .state()
1199                            .source_compaction_windows(vec![table_id]);
1200                        for (compaction_window, catalog_ids) in read_policies {
1201                            let compaction_ids = storage_policies_to_initialize
1202                                .entry(compaction_window)
1203                                .or_default();
1204
1205                            let gids = catalog_ids.into_iter().flat_map(|item_id| {
1206                                // Collect all GlobalIds across versions of this item.
1207                                self.catalog().get_entry(&item_id).global_ids()
1208                            });
1209                            compaction_ids.extend(gids);
1210                        }
1211                    }
1212                    DataSourceDesc::Webhook {
1213                        validate_using: _,
1214                        body_format: _,
1215                        headers: _,
1216                        cluster_id: _,
1217                    } => {
1218                        // Create the underlying collection with the latest schema from the Table.
1219                        assert_eq!(
1220                            table.desc.latest_version(),
1221                            RelationVersion::root(),
1222                            "found webhook with more than 1 relation version, {:?}",
1223                            table.desc
1224                        );
1225                        let desc = table.desc.latest();
1226
1227                        let collection_desc = CollectionDescription {
1228                            desc,
1229                            data_source: DataSource::Webhook,
1230                            since: None,
1231                            timeline: Some(timeline.clone()),
1232                            primary: None,
1233                        };
1234
1235                        let global_id = table
1236                            .global_ids()
1237                            .expect_element(|| "webhooks cannot have multiple versions");
1238
1239                        storage_collections_to_create.insert(global_id, collection_desc);
1240
1241                        let read_policies = self
1242                            .catalog()
1243                            .state()
1244                            .source_compaction_windows(vec![table_id]);
1245
1246                        for (compaction_window, catalog_ids) in read_policies {
1247                            let compaction_ids = storage_policies_to_initialize
1248                                .entry(compaction_window)
1249                                .or_default();
1250
1251                            let gids = catalog_ids.into_iter().flat_map(|item_id| {
1252                                // Collect all GlobalIds across versions of this item.
1253                                self.catalog().get_entry(&item_id).global_ids()
1254                            });
1255                            compaction_ids.extend(gids);
1256                        }
1257                    }
1258                    _ => unreachable!("CREATE TABLE data source got {:?}", data_source_desc),
1259                }
1260            }
1261        }
1262
1263        Ok(())
1264    }
1265
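    /// Handles `ALTER TABLE`. If the write `GlobalId` is unchanged, the only
    /// change we act on is a new compaction window; otherwise a new collection
    /// version is registered with the storage controller.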
1266    #[instrument(level = "debug")]
1267    async fn handle_alter_table(
1268        &mut self,
1269        catalog_id: CatalogItemId,
1270        prev_table: Table,
1271        new_table: Table,
1272    ) -> Result<(), AdapterError> {
1273        let existing_gid = prev_table.global_id_writes();
1274        let new_gid = new_table.global_id_writes();
1275
1276        if existing_gid == new_gid {
1277            // It's not an ALTER TABLE ADD COLUMN, because we still have the
1278            // same GlobalId. It might be a compaction window change.
1279            if prev_table.custom_logical_compaction_window
1280                != new_table.custom_logical_compaction_window
1281            {
1282                let new_window = new_table
1283                    .custom_logical_compaction_window
1284                    .unwrap_or(CompactionWindow::Default);
1285                self.update_storage_read_policies(vec![(catalog_id, new_window.into())]);
1286            }
1287            return Ok(());
1288        }
1289
1290        // Acquire a read hold on the original table for the duration of
1291        // the alter, to prevent the since of the original table from
1292        // advancing while the ALTER is running.
1293        let existing_table = crate::CollectionIdBundle {
1294            storage_ids: BTreeSet::from([existing_gid]),
1295            compute_ids: BTreeMap::new(),
1296        };
1297        let existing_table_read_hold = self.acquire_read_holds(&existing_table);
1298
1299        let expected_version = prev_table.desc.latest_version();
1300        let new_version = new_table.desc.latest_version();
1301        let new_desc = new_table
1302            .desc
1303            .at_version(RelationVersionSelector::Specific(new_version));
1304
1305        let write_ts = self.get_local_write_ts().await;
1306        let register_ts = write_ts.timestamp;
1307
1308        // Ensure the catalog will be immediately readable at the read ts we're
1309        // about to bump.
1310        self.catalog
1311            .advance_upper(write_ts.advance_to)
1312            .await
1313            .unwrap_or_terminate("unable to advance catalog upper");
1314
1315        // Alter the table description, creating a "new" collection.
1316        self.controller
1317            .storage
1318            .alter_table_desc(
1319                existing_gid,
1320                new_gid,
1321                new_desc,
1322                expected_version,
1323                register_ts,
1324            )
1325            .await
1326            .expect("failed to alter desc of table");
1327
1328        // Initialize the ReadPolicy, which ensures we have the correct read holds.
1329        let compaction_window = new_table
1330            .custom_logical_compaction_window
1331            .unwrap_or(CompactionWindow::Default);
1332        self.initialize_read_policies(
1333            &crate::CollectionIdBundle {
1334                storage_ids: BTreeSet::from([new_gid]),
1335                compute_ids: BTreeMap::new(),
1336            },
1337            compaction_window,
1338        )
1339        .await;
1340
1341        self.apply_local_write(register_ts).await;
1342
1343        // Alter is complete! We can drop our read hold.
1344        drop(existing_table_read_hold);
1345
1346        Ok(())
1347    }
1348
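    /// Translates the given [Source] into a storage [CollectionDescription]
    /// and queues it, along with read policies for the given compaction
    /// windows, for batched creation.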
1349    #[instrument(level = "debug")]
1350    async fn handle_create_source(
1351        &self,
1352        storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription>,
1353        storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1354        item_id: CatalogItemId,
1355        source: Source,
1356        compaction_windows: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>>,
1357    ) -> Result<(), AdapterError> {
1358        let data_source = match source.data_source {
1359            DataSourceDesc::Ingestion { desc, cluster_id } => {
1360                let desc = desc.into_inline_connection(self.catalog().state());
1361                let item_global_id = self.catalog().get_entry(&item_id).latest_global_id();
1362
1363                let ingestion = mz_storage_types::sources::IngestionDescription::new(
1364                    desc,
1365                    cluster_id,
1366                    item_global_id,
1367                );
1368
1369                DataSource::Ingestion(ingestion)
1370            }
1371            DataSourceDesc::OldSyntaxIngestion {
1372                desc,
1373                progress_subsource,
1374                data_config,
1375                details,
1376                cluster_id,
1377            } => {
1378                let desc = desc.into_inline_connection(self.catalog().state());
1379                let data_config = data_config.into_inline_connection(self.catalog().state());
1380
1381                // TODO(parkmycar): We should probably check the type here, but I'm not
1382                // sure if this will always be a Source or a Table.
1383                let progress_subsource = self
1384                    .catalog()
1385                    .get_entry(&progress_subsource)
1386                    .latest_global_id();
1387
1388                let mut ingestion = mz_storage_types::sources::IngestionDescription::new(
1389                    desc,
1390                    cluster_id,
1391                    progress_subsource,
1392                );
1393
1394                let legacy_export = SourceExport {
1395                    storage_metadata: (),
1396                    data_config,
1397                    details,
1398                };
1399
1400                ingestion
1401                    .source_exports
1402                    .insert(source.global_id, legacy_export);
1403
1404                DataSource::Ingestion(ingestion)
1405            }
1406            DataSourceDesc::IngestionExport {
1407                ingestion_id,
1408                external_reference: _,
1409                details,
1410                data_config,
1411            } => {
1412                // TODO(parkmycar): We should probably check the type here, but I'm not sure if
1413                // this will always be a Source or a Table.
1414                let ingestion_id = self.catalog().get_entry(&ingestion_id).latest_global_id();
1415
1416                DataSource::IngestionExport {
1417                    ingestion_id,
1418                    details,
1419                    data_config: data_config.into_inline_connection(self.catalog().state()),
1420                }
1421            }
1422            DataSourceDesc::Progress => DataSource::Progress,
1423            DataSourceDesc::Webhook { .. } => DataSource::Webhook,
1424            DataSourceDesc::Introspection(_) | DataSourceDesc::Catalog => {
1425                unreachable!("cannot create sources with internal data sources")
1426            }
1427        };
1428
1429        storage_collections_to_create.insert(
1430            source.global_id,
1431            CollectionDescription {
1432                desc: source.desc.clone(),
1433                data_source,
1434                timeline: Some(source.timeline),
1435                since: None,
1436                primary: None,
1437            },
1438        );
1439
1440        // Initialize read policies for the source.
1441        for (compaction_window, catalog_ids) in compaction_windows {
1442            let compaction_ids = storage_policies_to_initialize
1443                .entry(compaction_window)
1444                .or_default();
1445
1446            let gids = catalog_ids.into_iter().flat_map(|item_id| {
1447                // Collect all GlobalIds across versions of this item.
1448                self.catalog().get_entry(&item_id).global_ids()
1449            });
1450            compaction_ids.extend(gids);
1451        }
1452
1453        Ok(())
1454    }
1455
1456    /// Handles altering a connection by collecting all the dependent sources,
1457    /// sinks, and tables that need their connection updated.
1458    ///
1459    /// This mirrors the logic from `sequence_alter_connection_stage_finish` but
1460    /// collects the changes into batched collections for application after all
1461    /// implications are processed.
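    ///
    /// For example, altering an SSH tunnel connection also updates any Kafka
    /// connection that tunnels through it and, transitively, every source and
    /// sink that uses that Kafka connection.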
1462    #[instrument(level = "debug")]
1463    fn handle_alter_connection(
1464        &self,
1465        connection_id: CatalogItemId,
1466        connection: Connection,
1467        vpc_endpoints_to_create: &mut Vec<(CatalogItemId, VpcEndpointConfig)>,
1468        source_connections_to_alter: &mut BTreeMap<
1469            GlobalId,
1470            GenericSourceConnection<InlinedConnection>,
1471        >,
1472        sink_connections_to_alter: &mut BTreeMap<GlobalId, StorageSinkConnection>,
1473        source_export_data_configs_to_alter: &mut BTreeMap<GlobalId, SourceExportDataConfig>,
1474    ) {
1475        use std::collections::VecDeque;
1476
1477        // Handle AWS PrivateLink connections by queueing VPC endpoint creation.
1478        if let ConnectionDetails::AwsPrivatelink(ref privatelink) = connection.details {
1479            let spec = VpcEndpointConfig {
1480                aws_service_name: privatelink.service_name.to_owned(),
1481                availability_zone_ids: privatelink.availability_zones.to_owned(),
1482            };
1483            vpc_endpoints_to_create.push((connection_id, spec));
1484        }
1485
1486        // Walk the dependency graph to find all sources, sinks, and tables
1487        // that depend on this connection (directly or transitively through
1488        // other connections).
1489        let mut connections_to_process = VecDeque::new();
1490        connections_to_process.push_front(connection_id);
1491
1492        while let Some(id) = connections_to_process.pop_front() {
1493            for dependent_id in self.catalog().get_entry(&id).used_by() {
1494                let dependent_entry = self.catalog().get_entry(dependent_id);
1495                match dependent_entry.item() {
1496                    CatalogItem::Connection(_) => {
1497                        // Connections can depend on other connections (e.g., a
1498                        // Kafka connection using an SSH tunnel connection).
1499                        // Process these transitively.
1500                        connections_to_process.push_back(*dependent_id);
1501                    }
1502                    CatalogItem::Source(source) => {
1503                        let desc = match &dependent_entry
1504                            .source()
1505                            .expect("known to be source")
1506                            .data_source
1507                        {
1508                            DataSourceDesc::Ingestion { desc, .. }
1509                            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1510                                desc.clone().into_inline_connection(self.catalog().state())
1511                            }
1512                            DataSourceDesc::IngestionExport { .. }
1513                            | DataSourceDesc::Introspection(_)
1514                            | DataSourceDesc::Progress
1515                            | DataSourceDesc::Webhook { .. }
1516                            | DataSourceDesc::Catalog => {
1517                                // Only ingestions reference connections directly.
1518                                continue;
1519                            }
1520                        };
1521
1522                        source_connections_to_alter.insert(source.global_id, desc.connection);
1523                    }
1524                    CatalogItem::Sink(sink) => {
1525                        let export = dependent_entry.sink().expect("known to be sink");
1526                        sink_connections_to_alter.insert(
1527                            sink.global_id,
1528                            export
1529                                .connection
1530                                .clone()
1531                                .into_inline_connection(self.catalog().state()),
1532                        );
1533                    }
1534                    CatalogItem::Table(table) => {
1535                        // This is a source-fed table that references a schema
1536                        // registry connection as part of its encoding/data
1537                        // config.
1538                        if let Some((_, _, _, export_data_config)) =
1539                            dependent_entry.source_export_details()
1540                        {
1541                            let data_config = export_data_config.clone();
1542                            source_export_data_configs_to_alter.insert(
1543                                table.global_id_writes(),
1544                                data_config.into_inline_connection(self.catalog().state()),
1545                            );
1546                        }
1547                    }
1548                    CatalogItem::Log(_)
1549                    | CatalogItem::View(_)
1550                    | CatalogItem::MaterializedView(_)
1551                    | CatalogItem::Index(_)
1552                    | CatalogItem::Type(_)
1553                    | CatalogItem::Func(_)
1554                    | CatalogItem::Secret(_) => {
1555                        // Other item types don't have connection dependencies
1556                        // that need updating.
1557                    }
1558                }
1559            }
1560        }
1561    }
1562
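    /// Creates the given replica in the controller and installs the
    /// introspection subscribes targeting the new replica.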
1563    async fn handle_create_cluster_replica(
1564        &mut self,
1565        cluster_id: ClusterId,
1566        replica_id: ReplicaId,
1567        role: ClusterRole,
1568        cluster_name: String,
1569        replica_name: String,
1570        replica_config: ReplicaConfig,
1571    ) {
1572        let enable_worker_core_affinity =
1573            self.catalog().system_config().enable_worker_core_affinity();
1574        let enable_storage_introspection_logs = self
1575            .catalog()
1576            .system_config()
1577            .enable_storage_introspection_logs();
1578
1579        self.controller
1580            .create_replica(
1581                cluster_id,
1582                replica_id,
1583                cluster_name,
1584                replica_name,
1585                role,
1586                replica_config,
1587                enable_worker_core_affinity,
1588                enable_storage_introspection_logs,
1589            )
1590            .expect("creating replicas must not fail");
1591
1592        self.install_introspection_subscribes(cluster_id, replica_id)
1593            .await;
1594    }
1595}
1596
1597/// A state machine for building catalog implications from catalog updates.
1598///
1599/// Once all [ParsedStateUpdate]s for a given timestamp have been ingested,
1600/// this becomes a command that potentially has to be applied to in-memory
1601/// state and/or the controller(s).
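///
/// Roughly, per object (see `CatalogImplicationKind::transition`):
///
/// ```text
/// None --Addition--->  Added   --Retraction--> Altered
/// None --Retraction--> Dropped --Addition----> Altered
/// ```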
1602#[derive(Debug, Clone)]
1603enum CatalogImplication {
1604    None,
1605    Table(CatalogImplicationKind<Table>),
1606    Source(CatalogImplicationKind<(Source, Option<GenericSourceConnection>)>),
1607    Sink(CatalogImplicationKind<Sink>),
1608    Index(CatalogImplicationKind<Index>),
1609    MaterializedView(CatalogImplicationKind<MaterializedView>),
1610    View(CatalogImplicationKind<View>),
1611    Secret(CatalogImplicationKind<Secret>),
1612    Connection(CatalogImplicationKind<Connection>),
1613    Cluster(CatalogImplicationKind<Cluster>),
1614    ClusterReplica(CatalogImplicationKind<ClusterReplica>),
1615}
1616
1617#[derive(Debug, Clone)]
1618enum CatalogImplicationKind<T> {
1619    /// No operations seen yet.
1620    None,
1621    /// Item was added.
1622    Added(T),
1623    /// Item was dropped (with its name retained for error messages).
1624    Dropped(T, String),
1625    /// Item is being altered from one state to another.
1626    Altered { prev: T, new: T },
1627}
1628
1629impl<T: Clone> CatalogImplicationKind<T> {
1630    /// Applies a state transition based on a diff. Returns an error message
1631    /// if the transition is invalid.
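    ///
    /// A minimal sketch of the expected transitions (item type simplified to
    /// `String` for illustration):
    ///
    /// ```ignore
    /// let mut state: CatalogImplicationKind<String> = CatalogImplicationKind::None;
    /// state
    ///     .transition("item".to_string(), None, StateDiff::Addition)
    ///     .expect("valid transition");
    /// assert!(matches!(state, CatalogImplicationKind::Added(_)));
    /// ```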
1632    fn transition(&mut self, item: T, name: Option<String>, diff: StateDiff) -> Result<(), String> {
1633        use CatalogImplicationKind::*;
1634        use StateDiff::*;
1635
1636        let new_state = match (&*self, diff) {
1637            // Initial state transitions
1638            (None, Addition) => Added(item),
1639            (None, Retraction) => Dropped(item, name.unwrap_or_else(|| "<unknown>".to_string())),
1640
1641            // From Added state
1642            (Added(existing), Retraction) => {
1643                // Add -> Drop means the item is being altered
1644                Altered {
1645                    prev: item,
1646                    new: existing.clone(),
1647                }
1648            }
1649            (Added(_), Addition) => {
1650                return Err("Cannot add an already added object".to_string());
1651            }
1652
1653            // From Dropped state
1654            (Dropped(existing, _), Addition) => {
1655                // Drop -> Add means the item is being altered
1656                Altered {
1657                    prev: existing.clone(),
1658                    new: item,
1659                }
1660            }
1661            (Dropped(_, _), Retraction) => {
1662                return Err("Cannot drop an already dropped object".to_string());
1663            }
1664
1665            // From Altered state
1666            (Altered { .. }, _) => {
1667                return Err(format!(
1668                    "Cannot apply {:?} to an object in Altered state",
1669                    diff
1670                ));
1671            }
1672        };
1673
1674        *self = new_state;
1675        Ok(())
1676    }
1677}
1678
1679/// Macro to generate absorb methods for each item type.
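///
/// For example, `impl_absorb_method!(absorb_table, Table, Table)` generates an
/// `absorb_table(item, parsed_full_name, diff)` method that routes the update
/// into [CatalogImplication::Table] and applies the state transition.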
1680macro_rules! impl_absorb_method {
1681    (
1682        $method_name:ident,
1683        $variant:ident,
1684        $item_type:ty
1685    ) => {
1686        fn $method_name(
1687            &mut self,
1688            item: $item_type,
1689            parsed_full_name: Option<String>,
1690            diff: StateDiff,
1691        ) {
1692            let state = match self {
1693                CatalogImplication::$variant(state) => state,
1694                CatalogImplication::None => {
1695                    *self = CatalogImplication::$variant(CatalogImplicationKind::None);
1696                    match self {
1697                        CatalogImplication::$variant(state) => state,
1698                        _ => unreachable!(),
1699                    }
1700                }
1701                _ => {
1702                    panic!(
1703                        "Unexpected command type for {:?}: {} {:?}",
1704                        self,
1705                        stringify!($variant),
1706                        diff,
1707                    );
1708                }
1709            };
1710
1711            if let Err(e) = state.transition(item, parsed_full_name, diff) {
1712                panic!(
1713                    "Invalid state transition for {}: {}",
1714                    stringify!($variant),
1715                    e
1716                );
1717            }
1718        }
1719    };
1720}
1721
1722impl CatalogImplication {
1723    /// Absorbs the given catalog update into this [CatalogImplication], causing
1724    /// a state transition or error.
1725    fn absorb(&mut self, catalog_update: ParsedStateUpdate) {
1726        match catalog_update.kind {
1727            ParsedStateUpdateKind::Item {
1728                durable_item: _,
1729                parsed_item,
1730                connection,
1731                parsed_full_name,
1732            } => match parsed_item {
1733                CatalogItem::Table(table) => {
1734                    self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
1735                }
1736                CatalogItem::Source(source) => {
1737                    self.absorb_source(
1738                        (source, connection),
1739                        Some(parsed_full_name),
1740                        catalog_update.diff,
1741                    );
1742                }
1743                CatalogItem::Sink(sink) => {
1744                    self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
1745                }
1746                CatalogItem::Index(index) => {
1747                    self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
1748                }
1749                CatalogItem::MaterializedView(mv) => {
1750                    self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
1751                }
1752                CatalogItem::View(view) => {
1753                    self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
1754                }
1755
1756                CatalogItem::Secret(secret) => {
1757                    self.absorb_secret(secret, None, catalog_update.diff);
1758                }
1759                CatalogItem::Connection(connection) => {
1760                    self.absorb_connection(connection, None, catalog_update.diff);
1761                }
1762                CatalogItem::Log(_) => {}
1763                CatalogItem::Type(_) => {}
1764                CatalogItem::Func(_) => {}
1765            },
1766            ParsedStateUpdateKind::TemporaryItem {
1767                durable_item: _,
1768                parsed_item,
1769                connection,
1770                parsed_full_name,
1771            } => match parsed_item {
1772                CatalogItem::Table(table) => {
1773                    self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
1774                }
1775                CatalogItem::Source(source) => {
1776                    self.absorb_source(
1777                        (source, connection),
1778                        Some(parsed_full_name),
1779                        catalog_update.diff,
1780                    );
1781                }
1782                CatalogItem::Sink(sink) => {
1783                    self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
1784                }
1785                CatalogItem::Index(index) => {
1786                    self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
1787                }
1788                CatalogItem::MaterializedView(mv) => {
1789                    self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
1790                }
1791                CatalogItem::View(view) => {
1792                    self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
1793                }
1794
1795                CatalogItem::Secret(secret) => {
1796                    self.absorb_secret(secret, None, catalog_update.diff);
1797                }
1798                CatalogItem::Connection(connection) => {
1799                    self.absorb_connection(connection, None, catalog_update.diff);
1800                }
1801                CatalogItem::Log(_) => {}
1802                CatalogItem::Type(_) => {}
1803                CatalogItem::Func(_) => {}
1804            },
1805            ParsedStateUpdateKind::Cluster {
1806                durable_cluster: _,
1807                parsed_cluster,
1808            } => {
1809                let name = parsed_cluster.name.clone();
1810                self.absorb_cluster(parsed_cluster, Some(name), catalog_update.diff);
1811            }
1812            ParsedStateUpdateKind::ClusterReplica {
1813                durable_cluster_replica: _,
1814                parsed_cluster_replica,
1815            } => {
1816                let name = parsed_cluster_replica.name.clone();
1817                self.absorb_cluster_replica(
1818                    parsed_cluster_replica,
1819                    Some(name),
1820                    catalog_update.diff,
1821                );
1822            }
1823            ParsedStateUpdateKind::IntrospectionSourceIndex { .. } => {
1824                // IntrospectionSourceIndex updates are collected
1825                // separately in apply_catalog_implications and not
1826                // routed through absorb.
1827                unreachable!("IntrospectionSourceIndex should not be passed to absorb");
1828            }
1829        }
1830    }
1831
1832    impl_absorb_method!(absorb_table, Table, Table);
1833    impl_absorb_method!(
1834        absorb_source,
1835        Source,
1836        (Source, Option<GenericSourceConnection>)
1837    );
1838    impl_absorb_method!(absorb_sink, Sink, Sink);
1839    impl_absorb_method!(absorb_index, Index, Index);
1840    impl_absorb_method!(absorb_materialized_view, MaterializedView, MaterializedView);
1841    impl_absorb_method!(absorb_view, View, View);
1842
1843    impl_absorb_method!(absorb_secret, Secret, Secret);
1844    impl_absorb_method!(absorb_connection, Connection, Connection);
1845
1846    impl_absorb_method!(absorb_cluster, Cluster, Cluster);
1847    impl_absorb_method!(absorb_cluster_replica, ClusterReplica, ClusterReplica);
1848}
1849
1850#[cfg(test)]
1851mod tests {
1852    use super::*;
1853    use mz_repr::{GlobalId, RelationDesc, RelationVersion, VersionedRelationDesc};
1854    use mz_sql::names::ResolvedIds;
1855    use std::collections::BTreeMap;
1856
1857    fn create_test_table(name: &str) -> Table {
1858        Table {
1859            desc: VersionedRelationDesc::new(
1860                RelationDesc::builder()
1861                    .with_column(name, mz_repr::SqlScalarType::String.nullable(false))
1862                    .finish(),
1863            ),
1864            create_sql: None,
1865            collections: BTreeMap::from([(RelationVersion::root(), GlobalId::System(1))]),
1866            conn_id: None,
1867            resolved_ids: ResolvedIds::empty(),
1868            custom_logical_compaction_window: None,
1869            is_retained_metrics_object: false,
1870            data_source: TableDataSource::TableWrites { defaults: vec![] },
1871        }
1872    }
1873
1874    #[mz_ore::test]
1875    fn test_item_state_transitions() {
1876        // Test None -> Added
1877        let mut state = CatalogImplicationKind::None;
1878        assert!(
1879            state
1880                .transition("item1".to_string(), None, StateDiff::Addition)
1881                .is_ok()
1882        );
1883        assert!(matches!(state, CatalogImplicationKind::Added(_)));
1884
1885        // Test Added -> Altered (via retraction)
1886        let mut state = CatalogImplicationKind::Added("new_item".to_string());
1887        assert!(
1888            state
1889                .transition("old_item".to_string(), None, StateDiff::Retraction)
1890                .is_ok()
1891        );
1892        match &state {
1893            CatalogImplicationKind::Altered { prev, new } => {
1894                // The retracted item is the OLD state
1895                assert_eq!(prev, "old_item");
1896                // The existing Added item is the NEW state
1897                assert_eq!(new, "new_item");
1898            }
1899            _ => panic!("Expected Altered state"),
1900        }
1901
1902        // Test None -> Dropped
1903        let mut state = CatalogImplicationKind::None;
1904        assert!(
1905            state
1906                .transition(
1907                    "item1".to_string(),
1908                    Some("test_name".to_string()),
1909                    StateDiff::Retraction
1910                )
1911                .is_ok()
1912        );
1913        assert!(matches!(state, CatalogImplicationKind::Dropped(_, _)));
1914
1915        // Test Dropped -> Altered (via addition)
1916        let mut state = CatalogImplicationKind::Dropped("old_item".to_string(), "name".to_string());
1917        assert!(
1918            state
1919                .transition("new_item".to_string(), None, StateDiff::Addition)
1920                .is_ok()
1921        );
1922        match &state {
1923            CatalogImplicationKind::Altered { prev, new } => {
1924                // The existing Dropped item is the OLD state
1925                assert_eq!(prev, "old_item");
1926                // The added item is the NEW state
1927                assert_eq!(new, "new_item");
1928            }
1929            _ => panic!("Expected Altered state"),
1930        }
1931
1932        // Test invalid transitions
1933        let mut state = CatalogImplicationKind::Added("item".to_string());
1934        assert!(
1935            state
1936                .transition("item2".to_string(), None, StateDiff::Addition)
1937                .is_err()
1938        );
1939
1940        let mut state = CatalogImplicationKind::Dropped("item".to_string(), "name".to_string());
1941        assert!(
1942            state
1943                .transition("item2".to_string(), None, StateDiff::Retraction)
1944                .is_err()
1945        );
1946    }
1947
1948    #[mz_ore::test]
1949    fn test_table_absorb_state_machine() {
1950        let table1 = create_test_table("table1");
1951        let table2 = create_test_table("table2");
1952
1953        // Test None -> AddTable
1954        let mut cmd = CatalogImplication::None;
1955        cmd.absorb_table(
1956            table1.clone(),
1957            Some("schema.table1".to_string()),
1958            StateDiff::Addition,
1959        );
1960        // Check that we have an Added state
1961        match &cmd {
1962            CatalogImplication::Table(state) => match state {
1963                CatalogImplicationKind::Added(t) => {
1964                    assert_eq!(t.desc.latest().arity(), table1.desc.latest().arity())
1965                }
1966                _ => panic!("Expected Added state"),
1967            },
1968            _ => panic!("Expected Table command"),
1969        }
1970
1971        // Test AddTable -> AlterTable (via retraction)
1972        // This tests the bug fix: when we have AddTable(table1) and receive Retraction(table2),
1973        // table2 is the old state being removed, table1 is the new state
1974        cmd.absorb_table(
1975            table2.clone(),
1976            Some("schema.table2".to_string()),
1977            StateDiff::Retraction,
1978        );
1979        match &cmd {
1980            CatalogImplication::Table(state) => match state {
1981                CatalogImplicationKind::Altered { prev, new } => {
1982                    // Verify the fix: prev should be the retracted table, new should be the added table
1983                    assert_eq!(prev.desc.latest().arity(), table2.desc.latest().arity());
1984                    assert_eq!(new.desc.latest().arity(), table1.desc.latest().arity());
1985                }
1986                _ => panic!("Expected Altered state"),
1987            },
1988            _ => panic!("Expected Table command"),
1989        }
1990
1991        // Test None -> DropTable
1992        let mut cmd = CatalogImplication::None;
1993        cmd.absorb_table(
1994            table1.clone(),
1995            Some("schema.table1".to_string()),
1996            StateDiff::Retraction,
1997        );
1998        match &cmd {
1999            CatalogImplication::Table(state) => match state {
2000                CatalogImplicationKind::Dropped(t, name) => {
2001                    assert_eq!(t.desc.latest().arity(), table1.desc.latest().arity());
2002                    assert_eq!(name, "schema.table1");
2003                }
2004                _ => panic!("Expected Dropped state"),
2005            },
2006            _ => panic!("Expected Table command"),
2007        }
2008
2009        // Test DropTable -> AlterTable (via addition)
2010        cmd.absorb_table(
2011            table2.clone(),
2012            Some("schema.table2".to_string()),
2013            StateDiff::Addition,
2014        );
2015        match &cmd {
2016            CatalogImplication::Table(state) => match state {
2017                CatalogImplicationKind::Altered { prev, new } => {
2018                    // prev should be the dropped table, new should be the added table
2019                    assert_eq!(prev.desc.latest().arity(), table1.desc.latest().arity());
2020                    assert_eq!(new.desc.latest().arity(), table2.desc.latest().arity());
2021                }
2022                _ => panic!("Expected Altered state"),
2023            },
2024            _ => panic!("Expected Table command"),
2025        }
2026    }
2027
2028    #[mz_ore::test]
2029    #[should_panic(expected = "Cannot add an already added object")]
2030    fn test_invalid_double_add() {
2031        let table = create_test_table("table");
2032        let mut cmd = CatalogImplication::None;
2033
2034        // First addition
2035        cmd.absorb_table(
2036            table.clone(),
2037            Some("schema.table".to_string()),
2038            StateDiff::Addition,
2039        );
2040
2041        // Second addition should panic
2042        cmd.absorb_table(
2043            table.clone(),
2044            Some("schema.table".to_string()),
2045            StateDiff::Addition,
2046        );
2047    }
2048
2049    #[mz_ore::test]
2050    #[should_panic(expected = "Cannot drop an already dropped object")]
2051    fn test_invalid_double_drop() {
2052        let table = create_test_table("table");
2053        let mut cmd = CatalogImplication::None;
2054
2055        // First drop
2056        cmd.absorb_table(
2057            table.clone(),
2058            Some("schema.table".to_string()),
2059            StateDiff::Retraction,
2060        );
2061
2062        // Second drop should panic
2063        cmd.absorb_table(
2064            table.clone(),
2065            Some("schema.table".to_string()),
2066            StateDiff::Retraction,
2067        );
2068    }
2069}