Skip to main content

mz_adapter/coord/
catalog_implications.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to deriving and applying implications from [catalog
11//! changes](ParsedStateUpdate).
12//!
13//! The flow from "raw" catalog changes to [CatalogImplication] works like this:
14//!
15//! StateUpdateKind -> ParsedStateUpdate -> CatalogImplication
16//!
17//! [ParsedStateUpdate] adds context to a "raw" catalog change
18//! ([StateUpdateKind](mz_catalog::memory::objects::StateUpdateKind)). It
19//! includes an in-memory representation of the updated object, which can in
20//! theory be derived from the raw change but only when we have access to all
21//! the other raw changes or to an in-memory Catalog, which represents a
22//! "rollup" of all the raw changes.
23//!
24//! [CatalogImplication] is both the state machine that we use for absorbing
25//! multiple state updates for the same object and the final command that has to
26//! be applied to in-memory state and/or the controller(s) after absorbing all
27//! the state updates in a given batch of updates.
28
29use std::collections::{BTreeMap, BTreeSet};
30use std::sync::Arc;
31use std::time::{Duration, Instant};
32
33use fail::fail_point;
34use itertools::Itertools;
35use mz_adapter_types::compaction::CompactionWindow;
36use mz_catalog::memory::objects::{
37    CatalogItem, Cluster, ClusterReplica, Connection, DataSourceDesc, Index, MaterializedView,
38    Secret, Sink, Source, StateDiff, Table, TableDataSource, View,
39};
40use mz_cloud_resources::VpcEndpointConfig;
41use mz_compute_client::logging::LogVariant;
42use mz_compute_client::protocol::response::PeekResponse;
43use mz_controller::clusters::{ClusterRole, ReplicaConfig};
44use mz_controller_types::{ClusterId, ReplicaId};
45use mz_ore::collections::CollectionExt;
46use mz_ore::error::ErrorExt;
47use mz_ore::future::InTask;
48use mz_ore::instrument;
49use mz_ore::retry::Retry;
50use mz_ore::task;
51use mz_repr::{CatalogItemId, GlobalId, RelationVersion, RelationVersionSelector};
52use mz_sql::plan::ConnectionDetails;
53use mz_storage_client::controller::{CollectionDescription, DataSource};
54use mz_storage_types::connections::PostgresConnection;
55use mz_storage_types::connections::inline::{InlinedConnection, IntoInlineConnection};
56use mz_storage_types::sinks::StorageSinkConnection;
57use mz_storage_types::sources::{
58    GenericSourceConnection, SourceDesc, SourceExport, SourceExportDataConfig,
59};
60use tracing::{Instrument, info_span, warn};
61
62use crate::active_compute_sink::ActiveComputeSinkRetireReason;
63use crate::coord::Coordinator;
64use crate::coord::catalog_implications::parsed_state_updates::{
65    ParsedStateUpdate, ParsedStateUpdateKind,
66};
67use crate::coord::peek::DroppedDependency;
68use crate::coord::timeline::TimelineState;
69use crate::statement_logging::{StatementEndedExecutionReason, StatementLoggingId};
70use crate::{AdapterError, CollectionIdBundle, ExecuteContext, ResultExt};
71
72pub mod parsed_state_updates;
73
74impl Coordinator {
75    /// Applies implications from the given bucket of [ParsedStateUpdate] to our
76    /// in-memory state and our controllers. This also applies transitive
77    /// implications, for example, peeks and subscribes will be cancelled when
78    /// referenced objects are dropped.
79    ///
80    /// This _requires_ that the given updates are consolidated. There must be
81    /// at most one addition and/or one retraction for a given item, as
82    /// identified by that items ID type.
83    #[instrument(level = "debug")]
84    pub async fn apply_catalog_implications(
85        &mut self,
86        ctx: Option<&mut ExecuteContext>,
87        catalog_updates: Vec<ParsedStateUpdate>,
88    ) -> Result<(), AdapterError> {
89        let start = Instant::now();
90
91        let mut catalog_implications: BTreeMap<CatalogItemId, CatalogImplication> = BTreeMap::new();
92        let mut cluster_commands: BTreeMap<ClusterId, CatalogImplication> = BTreeMap::new();
93        let mut cluster_replica_commands: BTreeMap<(ClusterId, ReplicaId), CatalogImplication> =
94            BTreeMap::new();
95        // Introspection source index additions, collected separately and
96        // merged into the AddCluster handler. Not routed through the
97        // absorb machinery since they are simple additions that don't
98        // need Altered support.
99        let mut introspection_source_indexes: BTreeMap<ClusterId, BTreeMap<LogVariant, GlobalId>> =
100            BTreeMap::new();
101        // Whether any replica-scoped system-parameter override changed in this
102        // batch. The push re-pushes the complete per-replica dyncfg layer, so we
103        // only track that a change happened, not the individual rows.
104        let mut replica_scoped_config_changed = false;
105
106        for update in catalog_updates {
107            tracing::trace!(?update, "got parsed state update");
108            match &update.kind {
109                ParsedStateUpdateKind::Item {
110                    durable_item,
111                    parsed_item: _,
112                    connection: _,
113                    parsed_full_name: _,
114                } => {
115                    let entry = catalog_implications
116                        .entry(durable_item.id.clone())
117                        .or_insert_with(|| CatalogImplication::None);
118                    entry.absorb(update);
119                }
120                ParsedStateUpdateKind::TemporaryItem {
121                    durable_item,
122                    parsed_item: _,
123                    connection: _,
124                    parsed_full_name: _,
125                } => {
126                    let entry = catalog_implications
127                        .entry(durable_item.id.clone())
128                        .or_insert_with(|| CatalogImplication::None);
129                    entry.absorb(update);
130                }
131                ParsedStateUpdateKind::Cluster {
132                    durable_cluster,
133                    parsed_cluster: _,
134                } => {
135                    let entry = cluster_commands
136                        .entry(durable_cluster.id)
137                        .or_insert_with(|| CatalogImplication::None);
138                    entry.absorb(update.clone());
139                }
140                ParsedStateUpdateKind::ClusterReplica {
141                    durable_cluster_replica,
142                    parsed_cluster_replica: _,
143                } => {
144                    let entry = cluster_replica_commands
145                        .entry((
146                            durable_cluster_replica.cluster_id,
147                            durable_cluster_replica.replica_id,
148                        ))
149                        .or_insert_with(|| CatalogImplication::None);
150                    entry.absorb(update.clone());
151                }
152                ParsedStateUpdateKind::IntrospectionSourceIndex {
153                    cluster_id,
154                    log,
155                    index_id,
156                } => {
157                    if update.diff == StateDiff::Addition {
158                        introspection_source_indexes
159                            .entry(*cluster_id)
160                            .or_default()
161                            .insert(log.clone(), *index_id);
162                    }
163                    // Retractions don't need handling: introspection
164                    // source indexes are dropped with their cluster.
165                }
166                ParsedStateUpdateKind::ReplicaSystemConfiguration { durable: _ } => {
167                    // Additions and retractions both re-derive the full
168                    // per-replica layer from the working copy, so the diff sign
169                    // does not matter here.
170                    replica_scoped_config_changed = true;
171                }
172            }
173        }
174
175        self.apply_catalog_implications_inner(
176            ctx,
177            catalog_implications.into_iter().collect_vec(),
178            cluster_commands.into_iter().collect_vec(),
179            cluster_replica_commands.into_iter().collect_vec(),
180            introspection_source_indexes,
181            replica_scoped_config_changed,
182        )
183        .await?;
184
185        self.metrics
186            .apply_catalog_implications_seconds
187            .observe(start.elapsed().as_secs_f64());
188
189        Ok(())
190    }
191
192    #[instrument(level = "debug")]
193    async fn apply_catalog_implications_inner(
194        &mut self,
195        ctx: Option<&mut ExecuteContext>,
196        implications: Vec<(CatalogItemId, CatalogImplication)>,
197        cluster_commands: Vec<(ClusterId, CatalogImplication)>,
198        cluster_replica_commands: Vec<((ClusterId, ReplicaId), CatalogImplication)>,
199        mut introspection_source_indexes: BTreeMap<ClusterId, BTreeMap<LogVariant, GlobalId>>,
200        replica_scoped_config_changed: bool,
201    ) -> Result<(), AdapterError> {
202        let mut tables_to_drop = BTreeSet::new();
203        let mut sources_to_drop = vec![];
204        let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
205        let mut storage_sink_gids_to_drop = vec![];
206        let mut indexes_to_drop = vec![];
207        let mut compute_sinks_to_drop = vec![];
208        let mut view_gids_to_drop = vec![];
209        let mut secrets_to_drop = vec![];
210        let mut vpc_endpoints_to_drop = vec![];
211        let mut clusters_to_drop = vec![];
212        let mut cluster_replicas_to_drop = vec![];
213        let mut active_compute_sinks_to_drop = BTreeMap::new();
214        let mut peeks_to_drop = vec![];
215        let mut copies_to_drop = vec![];
216
217        // Maps for storing names of dropped objects for error messages.
218        let mut dropped_item_names: BTreeMap<GlobalId, String> = BTreeMap::new();
219        let mut dropped_cluster_names: BTreeMap<ClusterId, String> = BTreeMap::new();
220
221        // Separate collections for tables (which need write timestamps) and
222        // sources (which don't).
223        let mut table_collections_to_create = BTreeMap::new();
224        let mut source_collections_to_create = BTreeMap::new();
225        let mut storage_policies_to_initialize = BTreeMap::new();
226        let mut execution_timestamps_to_set = BTreeSet::new();
227        let mut vpc_endpoints_to_create: Vec<(CatalogItemId, VpcEndpointConfig)> = vec![];
228
229        // Sources that shouldn't be dropped, even if we saw a `Dropped` event.
230        // Used for correct handling of ALTER MV.
231        let mut source_gids_to_keep = BTreeSet::new();
232
233        // Collections for batching connection-related alterations.
234        let mut source_connections_to_alter: BTreeMap<
235            GlobalId,
236            GenericSourceConnection<InlinedConnection>,
237        > = BTreeMap::new();
238        let mut sink_connections_to_alter: BTreeMap<GlobalId, StorageSinkConnection> =
239            BTreeMap::new();
240        let mut source_export_data_configs_to_alter: BTreeMap<GlobalId, SourceExportDataConfig> =
241            BTreeMap::new();
242        let mut source_descs_to_alter: BTreeMap<GlobalId, SourceDesc> = BTreeMap::new();
243
244        // We're incrementally migrating the code that manipulates the
245        // controller from closures in the sequencer. For some types of catalog
246        // changes we haven't done this migration yet, so there you will see
247        // just a log message. Over the next couple of PRs all of these will go
248        // away.
249
250        for (catalog_id, implication) in implications {
251            tracing::trace!(?implication, "have to apply catalog implication");
252
253            match implication {
254                CatalogImplication::Table(CatalogImplicationKind::Added(table)) => {
255                    self.handle_create_table(
256                        &ctx,
257                        &mut table_collections_to_create,
258                        &mut storage_policies_to_initialize,
259                        &mut execution_timestamps_to_set,
260                        catalog_id,
261                        table.clone(),
262                    )
263                    .await?
264                }
265                CatalogImplication::Table(CatalogImplicationKind::Altered {
266                    prev: prev_table,
267                    new: new_table,
268                }) => {
269                    self.handle_alter_table(catalog_id, prev_table, new_table)
270                        .await?
271                }
272
273                CatalogImplication::Table(CatalogImplicationKind::Dropped(table, full_name)) => {
274                    let global_ids = table.global_ids();
275                    for global_id in global_ids {
276                        tables_to_drop.insert((catalog_id, global_id));
277                        dropped_item_names.insert(global_id, full_name.clone());
278                    }
279                }
280                CatalogImplication::Source(CatalogImplicationKind::Added((
281                    source,
282                    _connection,
283                ))) => {
284                    // Get the compaction windows for all sources with this
285                    // catalog_id This replicates the logic from
286                    // sequence_create_source where it collects all item_ids and
287                    // gets their compaction windows
288                    let compaction_windows = self
289                        .catalog()
290                        .state()
291                        .source_compaction_windows(vec![catalog_id]);
292
293                    self.handle_create_source(
294                        &mut source_collections_to_create,
295                        &mut storage_policies_to_initialize,
296                        catalog_id,
297                        source,
298                        compaction_windows,
299                    )
300                    .await?
301                }
302                CatalogImplication::Source(CatalogImplicationKind::Altered {
303                    prev: (prev_source, _prev_connection),
304                    new: (new_source, new_connection),
305                }) => {
306                    if prev_source.custom_logical_compaction_window
307                        != new_source.custom_logical_compaction_window
308                    {
309                        let new_window = new_source
310                            .custom_logical_compaction_window
311                            .unwrap_or(CompactionWindow::Default);
312                        self.update_storage_read_policies(vec![(catalog_id, new_window.into())]);
313                    }
314                    match (&prev_source.data_source, &new_source.data_source) {
315                        (
316                            DataSourceDesc::Ingestion {
317                                desc: prev_desc, ..
318                            }
319                            | DataSourceDesc::OldSyntaxIngestion {
320                                desc: prev_desc, ..
321                            },
322                            DataSourceDesc::Ingestion { desc: new_desc, .. }
323                            | DataSourceDesc::OldSyntaxIngestion { desc: new_desc, .. },
324                        ) => {
325                            if prev_desc != new_desc {
326                                let inlined_connection = new_connection
327                                    .expect("ingestion source should have inlined connection");
328                                let inlined_desc = SourceDesc {
329                                    connection: inlined_connection,
330                                    timestamp_interval: new_desc.timestamp_interval,
331                                };
332                                source_descs_to_alter.insert(new_source.global_id, inlined_desc);
333                            }
334                        }
335                        _ => {}
336                    }
337                }
338                CatalogImplication::Source(CatalogImplicationKind::Dropped(
339                    (source, connection),
340                    full_name,
341                )) => {
342                    let global_id = source.global_id();
343                    sources_to_drop.push((catalog_id, global_id));
344                    dropped_item_names.insert(global_id, full_name);
345
346                    if let DataSourceDesc::Ingestion { desc, .. }
347                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } = &source.data_source
348                    {
349                        match &desc.connection {
350                            GenericSourceConnection::Postgres(_referenced_conn) => {
351                                let inline_conn = connection.expect("missing inlined connection");
352
353                                let pg_conn = match inline_conn {
354                                    GenericSourceConnection::Postgres(pg_conn) => pg_conn,
355                                    other => {
356                                        panic!("expected postgres connection, got: {:?}", other)
357                                    }
358                                };
359                                let pending_drop = (
360                                    pg_conn.connection.clone(),
361                                    pg_conn.publication_details.slot.clone(),
362                                );
363                                replication_slots_to_drop.push(pending_drop);
364                            }
365                            _ => {}
366                        }
367                    }
368                }
369                CatalogImplication::Sink(CatalogImplicationKind::Added(sink)) => {
370                    tracing::debug!(?sink, "not handling AddSink in here yet");
371                }
372                CatalogImplication::Sink(CatalogImplicationKind::Altered {
373                    prev: prev_sink,
374                    new: new_sink,
375                }) => {
376                    tracing::debug!(?prev_sink, ?new_sink, "not handling AlterSink in here yet");
377                }
378                CatalogImplication::Sink(CatalogImplicationKind::Dropped(sink, full_name)) => {
379                    storage_sink_gids_to_drop.push(sink.global_id());
380                    dropped_item_names.insert(sink.global_id(), full_name);
381                }
382                CatalogImplication::Index(CatalogImplicationKind::Added(index)) => {
383                    tracing::debug!(?index, "not handling AddIndex in here yet");
384                }
385                CatalogImplication::Index(CatalogImplicationKind::Altered {
386                    prev: prev_index,
387                    new: new_index,
388                }) => {
389                    if prev_index.custom_logical_compaction_window
390                        != new_index.custom_logical_compaction_window
391                    {
392                        let new_window = new_index
393                            .custom_logical_compaction_window
394                            .unwrap_or(CompactionWindow::Default);
395                        self.update_compute_read_policy(
396                            new_index.cluster_id,
397                            catalog_id,
398                            new_window.into(),
399                        );
400                    }
401                }
402                CatalogImplication::Index(CatalogImplicationKind::Dropped(index, full_name)) => {
403                    indexes_to_drop.push((index.cluster_id, index.global_id()));
404                    dropped_item_names.insert(index.global_id(), full_name);
405                }
406                CatalogImplication::MaterializedView(CatalogImplicationKind::Added(mv)) => {
407                    tracing::debug!(?mv, "not handling AddMaterializedView in here yet");
408                }
409                CatalogImplication::MaterializedView(CatalogImplicationKind::Altered {
410                    prev: prev_mv,
411                    new: new_mv,
412                }) => {
413                    // We get here for three reasons:
414                    //  1. Name changes, like those caused by ALTER SCHEMA.
415                    //  2. Replacement application.
416                    //  3. Compaction window changes (ALTER ... SET (RETAIN HISTORY ...)).
417                    //
418                    // 1. Name changes: We don't have to do anything here.
419                    //
420                    // 2. Replacement application: This is tricky: It changes the `CatalogItemId` of
421                    // the target to that of the replacement and simultaneously drops the replacement.
422                    // Which means when we get here `prev_mv` is the replacement that should be
423                    // dropped, and `new_mv` is the target that already exists but under a different
424                    // ID (which will receive a `Dropped` event separately). We can sniff out this
425                    // case by checking for version differences.
426                    //
427                    // 3. Compaction window changes: We handle this in an `else if`, because if there
428                    // is also a replacement application, then the replacement's storage collections
429                    // already have the correct read policies from when they were created, so we
430                    // don't need to update them here.
431                    if prev_mv.collections != new_mv.collections {
432                        // Sanity check: The replacement's last (and only) version must be the same
433                        // as the new target's last version.
434                        assert_eq!(
435                            prev_mv.global_id_writes(),
436                            new_mv.global_id_writes(),
437                            "unexpected MV Altered implication: prev={prev_mv:?}, new={new_mv:?}",
438                        );
439
440                        let gid = new_mv.global_id_writes();
441                        self.allow_writes(new_mv.cluster_id, gid);
442
443                        // There will be a separate `Dropped` implication for the old definition of
444                        // the target MV. That will drop the old compute collection, as we desire,
445                        // but we need to prevent it from dropping the old storage collection as
446                        // well, since that might still be depended on.
447                        source_gids_to_keep.extend(new_mv.global_ids());
448                    } else if prev_mv.custom_logical_compaction_window
449                        != new_mv.custom_logical_compaction_window
450                    {
451                        let new_window = new_mv
452                            .custom_logical_compaction_window
453                            .unwrap_or(CompactionWindow::Default);
454                        self.update_storage_read_policies(vec![(catalog_id, new_window.into())]);
455                    }
456                }
457                CatalogImplication::MaterializedView(CatalogImplicationKind::Dropped(
458                    mv,
459                    full_name,
460                )) => {
461                    compute_sinks_to_drop.push((mv.cluster_id, mv.global_id_writes()));
462                    for gid in mv.global_ids() {
463                        sources_to_drop.push((catalog_id, gid));
464                        dropped_item_names.insert(gid, full_name.clone());
465                    }
466                }
467                CatalogImplication::View(CatalogImplicationKind::Added(_view)) => {
468                    // No action needed: views are catalog-only objects with no
469                    // storage collections or dataflows to create.
470                }
471                CatalogImplication::View(CatalogImplicationKind::Altered {
472                    prev: _prev_view,
473                    new: _new_view,
474                }) => {
475                    // No action needed: view alterations (e.g. renames) are
476                    // catalog-only and require no controller changes.
477                }
478                CatalogImplication::View(CatalogImplicationKind::Dropped(view, full_name)) => {
479                    view_gids_to_drop.push(view.global_id());
480                    dropped_item_names.insert(view.global_id(), full_name);
481                }
482                CatalogImplication::Secret(CatalogImplicationKind::Added(_secret)) => {
483                    // No action needed: the secret payload is stored in
484                    // secrets_controller.ensure() BEFORE the catalog transaction.
485                    // By the time we see this update, the secret is already stored.
486                }
487                CatalogImplication::Secret(CatalogImplicationKind::Altered {
488                    prev: _prev_secret,
489                    new: _new_secret,
490                }) => {
491                    // No action needed: altering a secret updates the payload via
492                    // secrets_controller.ensure() without a catalog transaction.
493                }
494                CatalogImplication::Secret(CatalogImplicationKind::Dropped(
495                    _secret,
496                    _full_name,
497                )) => {
498                    secrets_to_drop.push(catalog_id);
499                }
500                CatalogImplication::Connection(CatalogImplicationKind::Added(connection)) => {
501                    match &connection.details {
502                        // SSH connections: key pair is stored in secrets_controller
503                        // BEFORE the catalog transaction, so no action needed here.
504                        ConnectionDetails::Ssh { .. } => {}
505                        // AWS PrivateLink connections: create the VPC endpoint
506                        ConnectionDetails::AwsPrivatelink(privatelink) => {
507                            let spec = VpcEndpointConfig {
508                                aws_service_name: privatelink.service_name.to_owned(),
509                                availability_zone_ids: privatelink.availability_zones.to_owned(),
510                            };
511                            vpc_endpoints_to_create.push((catalog_id, spec));
512                        }
513                        // Other connection types don't require post-transaction actions
514                        _ => {}
515                    }
516                }
517                CatalogImplication::Connection(CatalogImplicationKind::Altered {
518                    prev: _prev_connection,
519                    new: new_connection,
520                }) => {
521                    self.handle_alter_connection(
522                        catalog_id,
523                        new_connection,
524                        &mut vpc_endpoints_to_create,
525                        &mut source_connections_to_alter,
526                        &mut sink_connections_to_alter,
527                        &mut source_export_data_configs_to_alter,
528                    );
529                }
530                CatalogImplication::Connection(CatalogImplicationKind::Dropped(
531                    connection,
532                    _full_name,
533                )) => {
534                    match &connection.details {
535                        // SSH connections have an associated secret that should be dropped
536                        ConnectionDetails::Ssh { .. } => {
537                            secrets_to_drop.push(catalog_id);
538                        }
539                        // AWS PrivateLink connections have an associated
540                        // VpcEndpoint K8S resource that should be dropped
541                        ConnectionDetails::AwsPrivatelink(_) => {
542                            vpc_endpoints_to_drop.push(catalog_id);
543                        }
544                        _ => (),
545                    }
546                }
547                CatalogImplication::None => {
548                    // Nothing to do for None commands
549                }
550                CatalogImplication::Cluster(_) | CatalogImplication::ClusterReplica(_) => {
551                    unreachable!("clusters and cluster replicas are handled below")
552                }
553                CatalogImplication::Table(CatalogImplicationKind::None)
554                | CatalogImplication::Source(CatalogImplicationKind::None)
555                | CatalogImplication::Sink(CatalogImplicationKind::None)
556                | CatalogImplication::Index(CatalogImplicationKind::None)
557                | CatalogImplication::MaterializedView(CatalogImplicationKind::None)
558                | CatalogImplication::View(CatalogImplicationKind::None)
559                | CatalogImplication::Secret(CatalogImplicationKind::None)
560                | CatalogImplication::Connection(CatalogImplicationKind::None) => {
561                    unreachable!("will never leave None in place");
562                }
563            }
564        }
565
566        for (cluster_id, command) in cluster_commands {
567            tracing::trace!(?command, "have cluster command to apply!");
568
569            match command {
570                CatalogImplication::Cluster(CatalogImplicationKind::Added(cluster)) => {
571                    // The Cluster's log_indexes is empty at parse time
572                    // because IntrospectionSourceIndex updates are applied
573                    // after the Cluster update. Use the separately collected
574                    // introspection_source_indexes instead.
575                    let arranged_logs = introspection_source_indexes
576                        .remove(&cluster_id)
577                        .unwrap_or_default();
578                    let introspection_source_ids: Vec<_> =
579                        arranged_logs.values().copied().collect();
580
581                    self.controller
582                        .create_cluster(
583                            cluster_id,
584                            mz_controller::clusters::ClusterConfig {
585                                arranged_logs,
586                                workload_class: cluster.config.workload_class.clone(),
587                            },
588                        )
589                        .expect("creating cluster must not fail");
590
591                    if !introspection_source_ids.is_empty() {
592                        self.initialize_compute_read_policies(
593                            introspection_source_ids,
594                            cluster_id,
595                            CompactionWindow::Default,
596                        )
597                        .await;
598                    }
599                }
600                CatalogImplication::Cluster(CatalogImplicationKind::Altered {
601                    prev: prev_cluster,
602                    new: new_cluster,
603                }) => {
604                    // Replica adds/drops/renames from config changes arrive as
605                    // separate AddClusterReplica/DroppedClusterReplica/
606                    // AlterClusterReplica events, so the only cluster-level
607                    // side effect here is updating the workload class on the
608                    // controller when it changes.
609                    if prev_cluster.config.workload_class != new_cluster.config.workload_class {
610                        self.controller.update_cluster_workload_class(
611                            cluster_id,
612                            new_cluster.config.workload_class.clone(),
613                        );
614                    }
615                }
616                CatalogImplication::Cluster(CatalogImplicationKind::Dropped(
617                    cluster,
618                    _full_name,
619                )) => {
620                    clusters_to_drop.push(cluster_id);
621                    dropped_cluster_names.insert(cluster_id, cluster.name);
622                }
623                CatalogImplication::Cluster(CatalogImplicationKind::None) => {
624                    unreachable!("will never leave None in place");
625                }
626                command => {
627                    unreachable!(
628                        "we only handle cluster commands in this map, got: {:?}",
629                        command
630                    );
631                }
632            }
633        }
634
635        // Apply replica-scoped overrides after clusters are created (so their
636        // compute instances exist) but before replicas are created below. The
637        // override layer must be set before `create_replica`, so the new
638        // replica's first configuration replays with its override. The push
639        // reads the catalog working copy, which already reflects this
640        // transaction's scoped-config changes.
641        if replica_scoped_config_changed {
642            self.push_replica_dyncfg_overrides();
643        }
644
645        for ((cluster_id, replica_id), command) in cluster_replica_commands {
646            tracing::trace!(?command, "have cluster replica command to apply!");
647
648            match command {
649                CatalogImplication::ClusterReplica(CatalogImplicationKind::Added(replica)) => {
650                    // Read the cluster name and role from the current catalog
651                    // state. This is correct as long as implications are
652                    // processed right after each catalog transaction. For a
653                    // more future-proof approach that tracks cluster info
654                    // locally across transactions, see the last commit of
655                    // https://github.com/ggevay/materialize/tree/implications-cluster-name-tracking
656                    // which removes that logic.
657                    let cluster = self.catalog().get_cluster(cluster_id);
658                    let cluster_name = cluster.name.clone();
659                    let cluster_role = cluster.role();
660                    self.handle_create_cluster_replica(
661                        cluster_id,
662                        replica_id,
663                        cluster_role,
664                        cluster_name,
665                        replica.name.clone(),
666                        replica.config.clone(),
667                    )
668                    .await;
669                }
670                CatalogImplication::ClusterReplica(CatalogImplicationKind::Altered {
671                    prev: _prev_replica,
672                    new: _new_replica,
673                }) => {
674                    // No action needed: cluster replica alterations (e.g.
675                    // renames, owner changes, pending flag changes) are
676                    // catalog-only and require no controller changes.
677                }
678                CatalogImplication::ClusterReplica(CatalogImplicationKind::Dropped(
679                    _replica,
680                    _full_name,
681                )) => {
682                    cluster_replicas_to_drop.push((cluster_id, replica_id));
683                }
684                CatalogImplication::ClusterReplica(CatalogImplicationKind::None) => {
685                    unreachable!("will never leave None in place");
686                }
687                command => {
688                    unreachable!(
689                        "we only handle cluster replica commands in this map, got: {:?}",
690                        command
691                    );
692                }
693            }
694        }
695
696        if !source_collections_to_create.is_empty() {
697            self.create_source_collections(source_collections_to_create)
698                .await?;
699        }
700
701        // Have to create sources first and then tables, because tables within
702        // one transaction can depend on sources.
703        if !table_collections_to_create.is_empty() {
704            self.create_table_collections(table_collections_to_create, execution_timestamps_to_set)
705                .await?;
706        }
707        // It is _very_ important that we only initialize read policies after we
708        // have created all the sources/collections. Some of the sources created
709        // in this collection might have dependencies on other sources, so the
710        // controller must get a chance to install read holds before we set a
711        // policy that might make the since advance.
712        self.initialize_storage_collections(storage_policies_to_initialize)
713            .await?;
714
715        // Create VPC endpoints for AWS PrivateLink connections
716        if !vpc_endpoints_to_create.is_empty() {
717            if let Some(cloud_resource_controller) = self.cloud_resource_controller.as_ref() {
718                for (connection_id, spec) in vpc_endpoints_to_create {
719                    if let Err(err) = cloud_resource_controller
720                        .ensure_vpc_endpoint(connection_id, spec)
721                        .await
722                    {
723                        tracing::error!(?err, "failed to ensure vpc endpoint!");
724                    }
725                }
726            } else {
727                tracing::error!(
728                    "AWS PrivateLink connections unsupported without cloud_resource_controller"
729                );
730            }
731        }
732
733        // Apply batched connection alterations to dependent sources/sinks/tables.
734        if !source_connections_to_alter.is_empty() {
735            self.controller
736                .storage
737                .alter_ingestion_connections(source_connections_to_alter)
738                .await
739                .unwrap_or_terminate("cannot fail to alter ingestion connections");
740        }
741
742        if !sink_connections_to_alter.is_empty() {
743            self.controller
744                .storage
745                .alter_export_connections(sink_connections_to_alter)
746                .await
747                .unwrap_or_terminate("altering export connections after txn must succeed");
748        }
749
750        if !source_export_data_configs_to_alter.is_empty() {
751            self.controller
752                .storage
753                .alter_ingestion_export_data_configs(source_export_data_configs_to_alter)
754                .await
755                .unwrap_or_terminate("altering source export data configs after txn must succeed");
756        }
757
758        if !source_descs_to_alter.is_empty() {
759            self.controller
760                .storage
761                .alter_ingestion_source_desc(source_descs_to_alter)
762                .await
763                .unwrap_or_terminate("cannot fail to alter ingestion source desc");
764        }
765
766        // Apply source drop overwrites.
767        sources_to_drop.retain(|(_, gid)| !source_gids_to_keep.contains(gid));
768
769        let readable_collections_to_drop: BTreeSet<_> = sources_to_drop
770            .iter()
771            .map(|(_, gid)| *gid)
772            .chain(tables_to_drop.iter().map(|(_, gid)| *gid))
773            .chain(indexes_to_drop.iter().map(|(_, gid)| *gid))
774            .chain(view_gids_to_drop.iter().copied())
775            .collect();
776
777        // Clean up any active compute sinks like subscribes or copy to-s that
778        // rely on dropped relations or clusters.
779        for (sink_id, sink) in &self.active_compute_sinks {
780            let cluster_id = sink.cluster_id();
781            if let Some(id) = sink
782                .depends_on()
783                .iter()
784                .find(|id| readable_collections_to_drop.contains(id))
785            {
786                let name = dropped_item_names
787                    .get(id)
788                    .cloned()
789                    .expect("missing relation name");
790                active_compute_sinks_to_drop.insert(
791                    *sink_id,
792                    ActiveComputeSinkRetireReason::DependencyDropped(DroppedDependency::Relation {
793                        name,
794                    }),
795                );
796            } else if clusters_to_drop.contains(&cluster_id) {
797                let name = dropped_cluster_names
798                    .get(&cluster_id)
799                    .cloned()
800                    .expect("missing cluster name");
801                active_compute_sinks_to_drop.insert(
802                    *sink_id,
803                    ActiveComputeSinkRetireReason::DependencyDropped(DroppedDependency::Cluster {
804                        name,
805                    }),
806                );
807            }
808        }
809
810        // Clean up any pending peeks that rely on dropped relations or clusters.
811        for (uuid, pending_peek) in &self.pending_peeks {
812            if let Some(id) = pending_peek
813                .depends_on
814                .iter()
815                .find(|id| readable_collections_to_drop.contains(id))
816            {
817                let name = dropped_item_names
818                    .get(id)
819                    .cloned()
820                    .expect("missing relation name");
821                peeks_to_drop.push((DroppedDependency::Relation { name }, uuid.clone()));
822            } else if clusters_to_drop.contains(&pending_peek.cluster_id) {
823                let name = dropped_cluster_names
824                    .get(&pending_peek.cluster_id)
825                    .cloned()
826                    .expect("missing cluster name");
827                peeks_to_drop.push((DroppedDependency::Cluster { name }, uuid.clone()));
828            }
829        }
830
831        // Clean up any pending `COPY` statements that rely on dropped relations or clusters.
832        for (conn_id, pending_copy) in &self.active_copies {
833            let dropping_table = tables_to_drop
834                .iter()
835                .any(|(item_id, _gid)| pending_copy.table_id == *item_id);
836            let dropping_cluster = clusters_to_drop.contains(&pending_copy.cluster_id);
837
838            if dropping_table || dropping_cluster {
839                copies_to_drop.push(conn_id.clone());
840            }
841        }
842
843        let storage_gids_to_drop: BTreeSet<_> = sources_to_drop
844            .iter()
845            .map(|(_id, gid)| gid)
846            .chain(storage_sink_gids_to_drop.iter())
847            .chain(tables_to_drop.iter().map(|(_id, gid)| gid))
848            .copied()
849            .collect();
850        let compute_gids_to_drop: Vec<_> = indexes_to_drop
851            .iter()
852            .chain(compute_sinks_to_drop.iter())
853            .copied()
854            .collect();
855
856        // Gather resources that we have to remove from timeline state and
857        // pre-check if any Timelines become empty, when we drop the specified
858        // storage and compute resources.
859        //
860        // Note: We only apply these changes below.
861        let mut timeline_id_bundles = BTreeMap::new();
862
863        for (timeline, TimelineState { read_holds, .. }) in &self.global_timelines {
864            let mut id_bundle = CollectionIdBundle::default();
865
866            for storage_id in read_holds.storage_ids() {
867                if storage_gids_to_drop.contains(&storage_id) {
868                    id_bundle.storage_ids.insert(storage_id);
869                }
870            }
871
872            for (instance_id, id) in read_holds.compute_ids() {
873                if compute_gids_to_drop.contains(&(instance_id, id))
874                    || clusters_to_drop.contains(&instance_id)
875                {
876                    id_bundle
877                        .compute_ids
878                        .entry(instance_id)
879                        .or_default()
880                        .insert(id);
881                }
882            }
883
884            timeline_id_bundles.insert(timeline.clone(), id_bundle);
885        }
886
887        let mut timeline_associations = BTreeMap::new();
888        for (timeline, id_bundle) in timeline_id_bundles.into_iter() {
889            let TimelineState { read_holds, .. } = self
890                .global_timelines
891                .get(&timeline)
892                .expect("all timelines have a timestamp oracle");
893
894            let empty = read_holds.id_bundle().difference(&id_bundle).is_empty();
895            timeline_associations.insert(timeline, (empty, id_bundle));
896        }
897
898        // No error returns are allowed after this point. Enforce this at compile time
899        // by using this odd structure so we don't accidentally add a stray `?`.
900        let _: () = async {
901            if !timeline_associations.is_empty() {
902                for (timeline, (should_be_empty, id_bundle)) in timeline_associations {
903                    let became_empty =
904                        self.remove_resources_associated_with_timeline(timeline, id_bundle);
905                    assert_eq!(should_be_empty, became_empty, "emptiness did not match!");
906                }
907            }
908
909            // Note that we drop tables before sources since there can be a weak
910            // dependency on sources from tables in the storage controller that
911            // will result in error logging that we'd prefer to avoid. This
912            // isn't an actual dependency issue but we'd like to keep that error
913            // logging around to indicate when an actual dependency error might
914            // occur.
915            if !tables_to_drop.is_empty() {
916                let ts = self.get_local_write_ts().await;
917                self.drop_tables(tables_to_drop.into_iter().collect_vec(), ts.timestamp);
918            }
919
920            if !sources_to_drop.is_empty() {
921                self.drop_sources(sources_to_drop);
922            }
923
924            if !storage_sink_gids_to_drop.is_empty() {
925                self.drop_storage_sinks(storage_sink_gids_to_drop);
926            }
927
928            if !active_compute_sinks_to_drop.is_empty() {
929                self.retire_compute_sinks(active_compute_sinks_to_drop)
930                    .await;
931            }
932
933            if !peeks_to_drop.is_empty() {
934                for (dep, uuid) in peeks_to_drop {
935                    if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
936                        let cancel_reason = PeekResponse::Error(dep.query_terminated_error());
937                        self.controller
938                            .compute
939                            .cancel_peek(pending_peek.cluster_id, uuid, cancel_reason)
940                            .unwrap_or_terminate("unable to cancel peek");
941                        self.retire_execution(
942                            StatementEndedExecutionReason::Canceled,
943                            pending_peek.ctx_extra.defuse(),
944                        );
945                    }
946                }
947            }
948
949            if !copies_to_drop.is_empty() {
950                for conn_id in copies_to_drop {
951                    self.cancel_pending_copy(&conn_id);
952                }
953            }
954
955            if !compute_gids_to_drop.is_empty() {
956                self.drop_compute_collections(compute_gids_to_drop);
957            }
958
959            if !vpc_endpoints_to_drop.is_empty() {
960                self.drop_vpc_endpoints_in_background(vpc_endpoints_to_drop)
961            }
962
963            if !cluster_replicas_to_drop.is_empty() {
964                fail::fail_point!("after_catalog_drop_replica");
965
966                for (cluster_id, replica_id) in cluster_replicas_to_drop {
967                    self.drop_replica(cluster_id, replica_id);
968                }
969            }
970            if !clusters_to_drop.is_empty() {
971                for cluster_id in clusters_to_drop {
972                    self.controller.drop_cluster(cluster_id);
973                }
974            }
975
976            // We don't want to block the main coordinator thread on cleaning
977            // up external resources (PostgreSQL replication slots and secrets),
978            // so we perform that cleanup in a background task.
979            //
980            // TODO(14551): This is inherently best effort. An ill-timed crash
981            // means we'll never clean these resources up. Safer cleanup for non-Materialize resources.
982            // See <https://github.com/MaterializeInc/materialize/issues/14551>
983            task::spawn(|| "drop_replication_slots_and_secrets", {
984                let ssh_tunnel_manager = self.connection_context().ssh_tunnel_manager.clone();
985                let caching_secrets_reader = self.caching_secrets_reader.clone();
986                let secrets_controller = Arc::clone(&self.secrets_controller);
987                let secrets_reader = Arc::clone(self.secrets_reader());
988                let storage_config = self.controller.storage.config().clone();
989
990                async move {
991                    for (connection, replication_slot_name) in replication_slots_to_drop {
992                        tracing::info!(?replication_slot_name, "dropping replication slot");
993
994                        // Try to drop the replication slots, but give up after
995                        // a while. The PostgreSQL server may no longer be
996                        // healthy. Users often drop PostgreSQL sources
997                        // *because* the PostgreSQL server has been
998                        // decomissioned.
999                        let result: Result<(), anyhow::Error> = Retry::default()
1000                            .max_duration(Duration::from_secs(60))
1001                            .retry_async(|_state| async {
1002                                let config = connection
1003                                    .config(&secrets_reader, &storage_config, InTask::No)
1004                                    .await
1005                                    .map_err(|e| {
1006                                        anyhow::anyhow!(
1007                                            "error creating Postgres client for \
1008                                            dropping acquired slots: {}",
1009                                            e.display_with_causes()
1010                                        )
1011                                    })?;
1012
1013                                mz_postgres_util::drop_replication_slots(
1014                                    &ssh_tunnel_manager,
1015                                    config.clone(),
1016                                    &[(&replication_slot_name, true)],
1017                                )
1018                                .await?;
1019
1020                                Ok(())
1021                            })
1022                            .await;
1023
1024                        if let Err(err) = result {
1025                            tracing::warn!(
1026                                ?replication_slot_name,
1027                                ?err,
1028                                "failed to drop replication slot"
1029                            );
1030                        }
1031                    }
1032
1033                    // Drop secrets *after* dropping the replication slots,
1034                    // because dropping replication slots may rely on those
1035                    // secrets still being present.
1036                    //
1037                    // It's okay if we crash before processing the secret drops,
1038                    // as we look for and remove any orphaned secrets during
1039                    // startup.
1040                    fail_point!("drop_secrets");
1041                    for secret in secrets_to_drop {
1042                        if let Err(e) = secrets_controller.delete(secret).await {
1043                            warn!("Dropping secrets has encountered an error: {}", e);
1044                        } else {
1045                            caching_secrets_reader.invalidate(secret);
1046                        }
1047                    }
1048                }
1049            });
1050        }
1051        .instrument(info_span!(
1052            "coord::apply_catalog_implications_inner::finalize"
1053        ))
1054        .await;
1055
1056        Ok(())
1057    }
1058
1059    #[instrument(level = "debug")]
1060    async fn create_table_collections(
1061        &mut self,
1062        table_collections_to_create: BTreeMap<GlobalId, CollectionDescription>,
1063        execution_timestamps_to_set: BTreeSet<StatementLoggingId>,
1064    ) -> Result<(), AdapterError> {
1065        // If we have tables, determine the initial validity for the table.
1066        let write_ts = self.get_local_write_ts().await;
1067        let register_ts = write_ts.timestamp;
1068
1069        // After acquiring `register_ts` but before using it, we need to
1070        // be sure we're still the leader. Otherwise a new generation
1071        // may also be trying to use `register_ts` for a different
1072        // purpose. See materialize#28216.
1073        //
1074        // We also should advance the upper of the catalog shard, to ensure it
1075        // is readable at the oracle read ts after we bump it to the
1076        // `register_ts` below. Both of these needs are served by calling
1077        // `advance_upper`.
1078        self.catalog
1079            .advance_upper(write_ts.advance_to)
1080            .await
1081            .unwrap_or_terminate("unable to advance catalog upper");
1082
1083        for id in execution_timestamps_to_set {
1084            self.set_statement_execution_timestamp(id, register_ts);
1085        }
1086
1087        let storage_metadata = self.catalog.state().storage_metadata();
1088
1089        self.controller
1090            .storage
1091            .create_collections(
1092                storage_metadata,
1093                Some(register_ts),
1094                table_collections_to_create.into_iter().collect_vec(),
1095            )
1096            .await
1097            .unwrap_or_terminate("cannot fail to create collections");
1098
1099        self.apply_local_write(register_ts).await;
1100
1101        Ok(())
1102    }
1103
1104    #[instrument(level = "debug")]
1105    async fn create_source_collections(
1106        &mut self,
1107        source_collections_to_create: BTreeMap<GlobalId, CollectionDescription>,
1108    ) -> Result<(), AdapterError> {
1109        let storage_metadata = self.catalog.state().storage_metadata();
1110
1111        self.controller
1112            .storage
1113            .create_collections(
1114                storage_metadata,
1115                None, // Sources don't need a write timestamp
1116                source_collections_to_create.into_iter().collect_vec(),
1117            )
1118            .await
1119            .unwrap_or_terminate("cannot fail to create collections");
1120
1121        Ok(())
1122    }
1123
1124    #[instrument(level = "debug")]
1125    async fn initialize_storage_collections(
1126        &mut self,
1127        storage_policies_to_initialize: BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1128    ) -> Result<(), AdapterError> {
1129        for (compaction_window, global_ids) in storage_policies_to_initialize {
1130            self.initialize_read_policies(
1131                &CollectionIdBundle {
1132                    storage_ids: global_ids,
1133                    compute_ids: BTreeMap::new(),
1134                },
1135                compaction_window,
1136            )
1137            .await;
1138        }
1139
1140        Ok(())
1141    }
1142
1143    #[instrument(level = "debug")]
1144    async fn handle_create_table(
1145        &self,
1146        ctx: &Option<&mut ExecuteContext>,
1147        storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription>,
1148        storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1149        execution_timestamps_to_set: &mut BTreeSet<StatementLoggingId>,
1150        table_id: CatalogItemId,
1151        table: Table,
1152    ) -> Result<(), AdapterError> {
1153        // The table data_source determines whether this table will be written to
1154        // by environmentd (e.g. with INSERT INTO statements) or by the storage layer
1155        // (e.g. a source-fed table).
1156        match &table.data_source {
1157            TableDataSource::TableWrites { defaults: _ } => {
1158                let versions: BTreeMap<_, _> = table
1159                    .collection_descs()
1160                    .map(|(gid, version, desc)| (version, (gid, desc)))
1161                    .collect();
1162                let collection_descs = versions.iter().map(|(_version, (gid, desc))| {
1163                    let collection_desc = CollectionDescription::for_table(desc.clone());
1164
1165                    (*gid, collection_desc)
1166                });
1167
1168                let compaction_window = table
1169                    .custom_logical_compaction_window
1170                    .unwrap_or(CompactionWindow::Default);
1171                let ids_to_initialize = storage_policies_to_initialize
1172                    .entry(compaction_window)
1173                    .or_default();
1174
1175                for (gid, collection_desc) in collection_descs {
1176                    storage_collections_to_create.insert(gid, collection_desc);
1177                    ids_to_initialize.insert(gid);
1178                }
1179
1180                if let Some(id) = ctx.as_ref().and_then(|ctx| ctx.extra().contents()) {
1181                    execution_timestamps_to_set.insert(id);
1182                }
1183            }
1184            TableDataSource::DataSource {
1185                desc: data_source_desc,
1186                timeline,
1187            } => {
1188                match data_source_desc {
1189                    DataSourceDesc::IngestionExport {
1190                        ingestion_id,
1191                        external_reference: _,
1192                        details,
1193                        data_config,
1194                    } => {
1195                        let global_ingestion_id =
1196                            self.catalog().get_entry(ingestion_id).latest_global_id();
1197
1198                        let collection_desc = CollectionDescription {
1199                            desc: table.desc.latest(),
1200                            data_source: DataSource::IngestionExport {
1201                                ingestion_id: global_ingestion_id,
1202                                details: details.clone(),
1203                                data_config: data_config
1204                                    .clone()
1205                                    .into_inline_connection(self.catalog.state()),
1206                            },
1207                            since: None,
1208                            timeline: Some(timeline.clone()),
1209                            primary: None,
1210                        };
1211
1212                        let global_id = table
1213                            .global_ids()
1214                            .expect_element(|| "subsources cannot have multiple versions");
1215
1216                        storage_collections_to_create.insert(global_id, collection_desc);
1217
1218                        let read_policies = self
1219                            .catalog()
1220                            .state()
1221                            .source_compaction_windows(vec![table_id]);
1222                        for (compaction_window, catalog_ids) in read_policies {
1223                            let compaction_ids = storage_policies_to_initialize
1224                                .entry(compaction_window)
1225                                .or_default();
1226
1227                            let gids = catalog_ids
1228                                .into_iter()
1229                                .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1230                                .flatten();
1231                            compaction_ids.extend(gids);
1232                        }
1233                    }
1234                    DataSourceDesc::Webhook {
1235                        validate_using: _,
1236                        body_format: _,
1237                        headers: _,
1238                        cluster_id: _,
1239                    } => {
1240                        // Create the underlying collection with the latest schema from the Table.
1241                        assert_eq!(
1242                            table.desc.latest_version(),
1243                            RelationVersion::root(),
1244                            "found webhook with more than 1 relation version, {:?}",
1245                            table.desc
1246                        );
1247                        let desc = table.desc.latest();
1248
1249                        let collection_desc = CollectionDescription {
1250                            desc,
1251                            data_source: DataSource::Webhook,
1252                            since: None,
1253                            timeline: Some(timeline.clone()),
1254                            primary: None,
1255                        };
1256
1257                        let global_id = table
1258                            .global_ids()
1259                            .expect_element(|| "webhooks cannot have multiple versions");
1260
1261                        storage_collections_to_create.insert(global_id, collection_desc);
1262
1263                        let read_policies = self
1264                            .catalog()
1265                            .state()
1266                            .source_compaction_windows(vec![table_id]);
1267
1268                        for (compaction_window, catalog_ids) in read_policies {
1269                            let compaction_ids = storage_policies_to_initialize
1270                                .entry(compaction_window)
1271                                .or_default();
1272
1273                            let gids = catalog_ids
1274                                .into_iter()
1275                                .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1276                                .flatten();
1277                            compaction_ids.extend(gids);
1278                        }
1279                    }
1280                    _ => unreachable!("CREATE TABLE data source got {:?}", data_source_desc),
1281                }
1282            }
1283        }
1284
1285        Ok(())
1286    }
1287
1288    #[instrument(level = "debug")]
1289    async fn handle_alter_table(
1290        &mut self,
1291        catalog_id: CatalogItemId,
1292        prev_table: Table,
1293        new_table: Table,
1294    ) -> Result<(), AdapterError> {
1295        let existing_gid = prev_table.global_id_writes();
1296        let new_gid = new_table.global_id_writes();
1297
1298        if existing_gid == new_gid {
1299            // It's not an ALTER TABLE ADD COLUMN, because we still have the
1300            // same GlobalId. It might be a compaction window change.
1301            if prev_table.custom_logical_compaction_window
1302                != new_table.custom_logical_compaction_window
1303            {
1304                let new_window = new_table
1305                    .custom_logical_compaction_window
1306                    .unwrap_or(CompactionWindow::Default);
1307                self.update_storage_read_policies(vec![(catalog_id, new_window.into())]);
1308            }
1309            return Ok(());
1310        }
1311
1312        // Acquire a read hold on the original table for the duration of
1313        // the alter to prevent the since of the original table from
1314        // getting advanced, while the ALTER is running.
1315        let existing_table = crate::CollectionIdBundle {
1316            storage_ids: BTreeSet::from([existing_gid]),
1317            compute_ids: BTreeMap::new(),
1318        };
1319        let existing_table_read_hold = self.acquire_read_holds(&existing_table);
1320
1321        let expected_version = prev_table.desc.latest_version();
1322        let new_version = new_table.desc.latest_version();
1323        let new_desc = new_table
1324            .desc
1325            .at_version(RelationVersionSelector::Specific(new_version));
1326
1327        let write_ts = self.get_local_write_ts().await;
1328        let register_ts = write_ts.timestamp;
1329
1330        // Ensure the catalog will be immediately readable at the read ts we're
1331        // about to bump.
1332        self.catalog
1333            .advance_upper(write_ts.advance_to)
1334            .await
1335            .unwrap_or_terminate("unable to advance catalog upper");
1336
1337        // Alter the table description, creating a "new" collection.
1338        self.controller
1339            .storage
1340            .alter_table_desc(
1341                existing_gid,
1342                new_gid,
1343                new_desc,
1344                expected_version,
1345                register_ts,
1346            )
1347            .await
1348            .expect("failed to alter desc of table");
1349
1350        // Initialize the ReadPolicy which ensures we have the correct read holds.
1351        let compaction_window = new_table
1352            .custom_logical_compaction_window
1353            .unwrap_or(CompactionWindow::Default);
1354        self.initialize_read_policies(
1355            &crate::CollectionIdBundle {
1356                storage_ids: BTreeSet::from([new_gid]),
1357                compute_ids: BTreeMap::new(),
1358            },
1359            compaction_window,
1360        )
1361        .await;
1362
1363        self.apply_local_write(register_ts).await;
1364
1365        // Alter is complete! We can drop our read hold.
1366        drop(existing_table_read_hold);
1367
1368        Ok(())
1369    }
1370
1371    #[instrument(level = "debug")]
1372    async fn handle_create_source(
1373        &self,
1374        storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription>,
1375        storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1376        item_id: CatalogItemId,
1377        source: Source,
1378        compaction_windows: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>>,
1379    ) -> Result<(), AdapterError> {
1380        let data_source = match source.data_source {
1381            DataSourceDesc::Ingestion { desc, cluster_id } => {
1382                let desc = desc.into_inline_connection(self.catalog().state());
1383                let item_global_id = self.catalog().get_entry(&item_id).latest_global_id();
1384
1385                let ingestion = mz_storage_types::sources::IngestionDescription::new(
1386                    desc,
1387                    cluster_id,
1388                    item_global_id,
1389                );
1390
1391                DataSource::Ingestion(ingestion)
1392            }
1393            DataSourceDesc::OldSyntaxIngestion {
1394                desc,
1395                progress_subsource,
1396                data_config,
1397                details,
1398                cluster_id,
1399            } => {
1400                let desc = desc.into_inline_connection(self.catalog().state());
1401                let data_config = data_config.into_inline_connection(self.catalog().state());
1402
1403                // TODO(parkmycar): We should probably check the type here, but I'm not
1404                // sure if this will always be a Source or a Table.
1405                let progress_subsource = self
1406                    .catalog()
1407                    .get_entry(&progress_subsource)
1408                    .latest_global_id();
1409
1410                let mut ingestion = mz_storage_types::sources::IngestionDescription::new(
1411                    desc,
1412                    cluster_id,
1413                    progress_subsource,
1414                );
1415
1416                let legacy_export = SourceExport {
1417                    storage_metadata: (),
1418                    data_config,
1419                    details,
1420                };
1421
1422                ingestion
1423                    .source_exports
1424                    .insert(source.global_id, legacy_export);
1425
1426                DataSource::Ingestion(ingestion)
1427            }
1428            DataSourceDesc::IngestionExport {
1429                ingestion_id,
1430                external_reference: _,
1431                details,
1432                data_config,
1433            } => {
1434                // TODO(parkmycar): We should probably check the type here, but I'm not sure if
1435                // this will always be a Source or a Table.
1436                let ingestion_id = self.catalog().get_entry(&ingestion_id).latest_global_id();
1437
1438                DataSource::IngestionExport {
1439                    ingestion_id,
1440                    details,
1441                    data_config: data_config.into_inline_connection(self.catalog().state()),
1442                }
1443            }
1444            DataSourceDesc::Progress => DataSource::Progress,
1445            DataSourceDesc::Webhook { .. } => DataSource::Webhook,
1446            DataSourceDesc::Introspection(_) | DataSourceDesc::Catalog => {
1447                unreachable!("cannot create sources with internal data sources")
1448            }
1449        };
1450
1451        storage_collections_to_create.insert(
1452            source.global_id,
1453            CollectionDescription {
1454                desc: source.desc.clone(),
1455                data_source,
1456                timeline: Some(source.timeline),
1457                since: None,
1458                primary: None,
1459            },
1460        );
1461
1462        // Initialize read policies for the source
1463        for (compaction_window, catalog_ids) in compaction_windows {
1464            let compaction_ids = storage_policies_to_initialize
1465                .entry(compaction_window)
1466                .or_default();
1467
1468            let gids = catalog_ids
1469                .into_iter()
1470                .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1471                .flatten();
1472            compaction_ids.extend(gids);
1473        }
1474
1475        Ok(())
1476    }
1477
1478    /// Handles altering a connection by collecting all the dependent sources,
1479    /// sinks, and tables that need their connection updated.
1480    ///
1481    /// This mirrors the logic from `sequence_alter_connection_stage_finish` but
1482    /// collects the changes into batched collections for application after all
1483    /// implications are processed.
1484    #[instrument(level = "debug")]
1485    fn handle_alter_connection(
1486        &self,
1487        connection_id: CatalogItemId,
1488        connection: Connection,
1489        vpc_endpoints_to_create: &mut Vec<(CatalogItemId, VpcEndpointConfig)>,
1490        source_connections_to_alter: &mut BTreeMap<
1491            GlobalId,
1492            GenericSourceConnection<InlinedConnection>,
1493        >,
1494        sink_connections_to_alter: &mut BTreeMap<GlobalId, StorageSinkConnection>,
1495        source_export_data_configs_to_alter: &mut BTreeMap<GlobalId, SourceExportDataConfig>,
1496    ) {
1497        use std::collections::VecDeque;
1498
1499        // Handle AWS PrivateLink connections by queueing VPC endpoint creation.
1500        if let ConnectionDetails::AwsPrivatelink(ref privatelink) = connection.details {
1501            let spec = VpcEndpointConfig {
1502                aws_service_name: privatelink.service_name.to_owned(),
1503                availability_zone_ids: privatelink.availability_zones.to_owned(),
1504            };
1505            vpc_endpoints_to_create.push((connection_id, spec));
1506        }
1507
1508        // Walk the dependency graph to find all sources, sinks, and tables
1509        // that depend on this connection (directly or transitively through
1510        // other connections).
1511        let mut connections_to_process = VecDeque::new();
1512        connections_to_process.push_front(connection_id.clone());
1513
1514        while let Some(id) = connections_to_process.pop_front() {
1515            for dependent_id in self.catalog().get_entry(&id).used_by() {
1516                let dependent_entry = self.catalog().get_entry(dependent_id);
1517                match dependent_entry.item() {
1518                    CatalogItem::Connection(_) => {
1519                        // Connections can depend on other connections (e.g., a
1520                        // Kafka connection using an SSH tunnel connection).
1521                        // Process these transitively.
1522                        connections_to_process.push_back(*dependent_id);
1523                    }
1524                    CatalogItem::Source(source) => {
1525                        let desc = match &dependent_entry
1526                            .source()
1527                            .expect("known to be source")
1528                            .data_source
1529                        {
1530                            DataSourceDesc::Ingestion { desc, .. }
1531                            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1532                                desc.clone().into_inline_connection(self.catalog().state())
1533                            }
1534                            DataSourceDesc::IngestionExport { .. }
1535                            | DataSourceDesc::Introspection(_)
1536                            | DataSourceDesc::Progress
1537                            | DataSourceDesc::Webhook { .. }
1538                            | DataSourceDesc::Catalog => {
1539                                // Only ingestions reference connections directly.
1540                                continue;
1541                            }
1542                        };
1543
1544                        source_connections_to_alter.insert(source.global_id, desc.connection);
1545                    }
1546                    CatalogItem::Sink(sink) => {
1547                        let export = dependent_entry.sink().expect("known to be sink");
1548                        sink_connections_to_alter.insert(
1549                            sink.global_id,
1550                            export
1551                                .connection
1552                                .clone()
1553                                .into_inline_connection(self.catalog().state()),
1554                        );
1555                    }
1556                    CatalogItem::Table(table) => {
1557                        // This is a source-fed table that references a schema
1558                        // registry connection as part of its encoding/data
1559                        // config.
1560                        if let Some((_, _, _, export_data_config)) =
1561                            dependent_entry.source_export_details()
1562                        {
1563                            let data_config = export_data_config.clone();
1564                            source_export_data_configs_to_alter.insert(
1565                                table.global_id_writes(),
1566                                data_config.into_inline_connection(self.catalog().state()),
1567                            );
1568                        }
1569                    }
1570                    CatalogItem::Log(_)
1571                    | CatalogItem::View(_)
1572                    | CatalogItem::MaterializedView(_)
1573                    | CatalogItem::Index(_)
1574                    | CatalogItem::Type(_)
1575                    | CatalogItem::Func(_)
1576                    | CatalogItem::Secret(_) => {
1577                        // Other item types don't have connection dependencies
1578                        // that need updating.
1579                    }
1580                }
1581            }
1582        }
1583    }
1584
1585    async fn handle_create_cluster_replica(
1586        &mut self,
1587        cluster_id: ClusterId,
1588        replica_id: ReplicaId,
1589        role: ClusterRole,
1590        cluster_name: String,
1591        replica_name: String,
1592        replica_config: ReplicaConfig,
1593    ) {
1594        let enable_worker_core_affinity =
1595            self.catalog().system_config().enable_worker_core_affinity();
1596        let enable_storage_introspection_logs = self
1597            .catalog()
1598            .system_config()
1599            .enable_storage_introspection_logs();
1600
1601        // This replica's scoped (replica-local) overrides were pushed into the
1602        // controller's per-replica layer before this loop, by the
1603        // replica-scoped-configuration implication, so the replica's first
1604        // configuration replays with them. Render-frozen flags make a later push
1605        // too late, which is why the push precedes `create_replica`.
1606
1607        self.controller
1608            .create_replica(
1609                cluster_id,
1610                replica_id,
1611                cluster_name,
1612                replica_name,
1613                role,
1614                replica_config,
1615                enable_worker_core_affinity,
1616                enable_storage_introspection_logs,
1617            )
1618            .expect("creating replicas must not fail");
1619
1620        self.install_introspection_subscribes(cluster_id, replica_id)
1621            .await;
1622    }
1623}
1624
/// A state machine for building catalog implications from catalog updates.
///
/// Once all [ParsedStateUpdate]s of a timestamp have been absorbed, this is a
/// command that may have to be applied to in-memory state and/or the
/// controller(s).
///
/// Every variant other than `None` tracks one kind of catalog object through a
/// [CatalogImplicationKind]. The `absorb_*` methods move `None` into the
/// matching variant on the first update. They panic if an update for a
/// different kind of object arrives afterwards.
#[derive(Debug, Clone)]
enum CatalogImplication {
    /// No update has been absorbed yet.
    None,
    Table(CatalogImplicationKind<Table>),
    /// A source paired with the `connection` carried by its parsed state
    /// update.
    Source(CatalogImplicationKind<(Source, Option<GenericSourceConnection>)>),
    Sink(CatalogImplicationKind<Sink>),
    Index(CatalogImplicationKind<Index>),
    MaterializedView(CatalogImplicationKind<MaterializedView>),
    View(CatalogImplicationKind<View>),
    Secret(CatalogImplicationKind<Secret>),
    Connection(CatalogImplicationKind<Connection>),
    Cluster(CatalogImplicationKind<Cluster>),
    ClusterReplica(CatalogImplicationKind<ClusterReplica>),
}
1644
1645#[derive(Debug, Clone)]
1646enum CatalogImplicationKind<T> {
1647    /// No operations seen yet.
1648    None,
1649    /// Item was added.
1650    Added(T),
1651    /// Item was dropped (with its name retained for error messages).
1652    Dropped(T, String),
1653    /// Item is being altered from one state to another.
1654    Altered { prev: T, new: T },
1655}
1656
1657impl<T: Clone> CatalogImplicationKind<T> {
1658    /// Apply a state transition based on a diff. Returns an error message if
1659    /// the transition is invalid.
1660    fn transition(&mut self, item: T, name: Option<String>, diff: StateDiff) -> Result<(), String> {
1661        use CatalogImplicationKind::*;
1662        use StateDiff::*;
1663
1664        let new_state = match (&*self, diff) {
1665            // Initial state transitions
1666            (None, Addition) => Added(item),
1667            (None, Retraction) => Dropped(item, name.unwrap_or_else(|| "<unknown>".to_string())),
1668
1669            // From Added state
1670            (Added(existing), Retraction) => {
1671                // Add -> Drop means the item is being altered
1672                Altered {
1673                    prev: item,
1674                    new: existing.clone(),
1675                }
1676            }
1677            (Added(_), Addition) => {
1678                return Err("Cannot add an already added object".to_string());
1679            }
1680
1681            // From Dropped state
1682            (Dropped(existing, _), Addition) => {
1683                // Drop -> Add means the item is being altered
1684                Altered {
1685                    prev: existing.clone(),
1686                    new: item,
1687                }
1688            }
1689            (Dropped(_, _), Retraction) => {
1690                return Err("Cannot drop an already dropped object".to_string());
1691            }
1692
1693            // From Altered state
1694            (Altered { .. }, _) => {
1695                return Err(format!(
1696                    "Cannot apply {:?} to an object in Altered state",
1697                    diff
1698                ));
1699            }
1700        };
1701
1702        *self = new_state;
1703        Ok(())
1704    }
1705}
1706
/// Generates one `absorb_*` method on [CatalogImplication] for a single item
/// type.
///
/// The generated method takes the item, its optional full name, and the
/// [StateDiff] of the update. A `CatalogImplication::None` is first promoted
/// to `$variant` holding a fresh `CatalogImplicationKind::None`. The update is
/// then fed to `CatalogImplicationKind::transition`.
///
/// The generated method panics if `self` already tracks a different variant,
/// or if `transition` rejects the update.
macro_rules! impl_absorb_method {
    (
        $method_name:ident,
        $variant:ident,
        $item_type:ty
    ) => {
        fn $method_name(
            &mut self,
            item: $item_type,
            parsed_full_name: Option<String>,
            diff: StateDiff,
        ) {
            // First update for this object: start tracking it as `$variant`.
            if matches!(self, CatalogImplication::None) {
                *self = CatalogImplication::$variant(CatalogImplicationKind::None);
            }

            let state = match self {
                CatalogImplication::$variant(state) => state,
                _ => {
                    panic!(
                        "Unexpected command type for {:?}: {} {:?}",
                        self,
                        stringify!($variant),
                        diff,
                    );
                }
            };

            state
                .transition(item, parsed_full_name, diff)
                .unwrap_or_else(|e| {
                    panic!(
                        "Invalid state transition for {}: {}",
                        stringify!($variant),
                        e
                    )
                });
        }
    };
}
1749
1750impl CatalogImplication {
1751    /// Absorbs the given catalog update into this [CatalogImplication], causing
1752    /// a state transition or error.
1753    fn absorb(&mut self, catalog_update: ParsedStateUpdate) {
1754        match catalog_update.kind {
1755            ParsedStateUpdateKind::Item {
1756                durable_item: _,
1757                parsed_item,
1758                connection,
1759                parsed_full_name,
1760            } => match parsed_item {
1761                CatalogItem::Table(table) => {
1762                    self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
1763                }
1764                CatalogItem::Source(source) => {
1765                    self.absorb_source(
1766                        (source, connection),
1767                        Some(parsed_full_name),
1768                        catalog_update.diff,
1769                    );
1770                }
1771                CatalogItem::Sink(sink) => {
1772                    self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
1773                }
1774                CatalogItem::Index(index) => {
1775                    self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
1776                }
1777                CatalogItem::MaterializedView(mv) => {
1778                    self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
1779                }
1780                CatalogItem::View(view) => {
1781                    self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
1782                }
1783
1784                CatalogItem::Secret(secret) => {
1785                    self.absorb_secret(secret, None, catalog_update.diff);
1786                }
1787                CatalogItem::Connection(connection) => {
1788                    self.absorb_connection(connection, None, catalog_update.diff);
1789                }
1790                CatalogItem::Log(_) => {}
1791                CatalogItem::Type(_) => {}
1792                CatalogItem::Func(_) => {}
1793            },
1794            ParsedStateUpdateKind::TemporaryItem {
1795                durable_item: _,
1796                parsed_item,
1797                connection,
1798                parsed_full_name,
1799            } => match parsed_item {
1800                CatalogItem::Table(table) => {
1801                    self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
1802                }
1803                CatalogItem::Source(source) => {
1804                    self.absorb_source(
1805                        (source, connection),
1806                        Some(parsed_full_name),
1807                        catalog_update.diff,
1808                    );
1809                }
1810                CatalogItem::Sink(sink) => {
1811                    self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
1812                }
1813                CatalogItem::Index(index) => {
1814                    self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
1815                }
1816                CatalogItem::MaterializedView(mv) => {
1817                    self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
1818                }
1819                CatalogItem::View(view) => {
1820                    self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
1821                }
1822
1823                CatalogItem::Secret(secret) => {
1824                    self.absorb_secret(secret, None, catalog_update.diff);
1825                }
1826                CatalogItem::Connection(connection) => {
1827                    self.absorb_connection(connection, None, catalog_update.diff);
1828                }
1829                CatalogItem::Log(_) => {}
1830                CatalogItem::Type(_) => {}
1831                CatalogItem::Func(_) => {}
1832            },
1833            ParsedStateUpdateKind::Cluster {
1834                durable_cluster: _,
1835                parsed_cluster,
1836            } => {
1837                let name = parsed_cluster.name.clone();
1838                self.absorb_cluster(parsed_cluster, Some(name), catalog_update.diff);
1839            }
1840            ParsedStateUpdateKind::ClusterReplica {
1841                durable_cluster_replica: _,
1842                parsed_cluster_replica,
1843            } => {
1844                let name = parsed_cluster_replica.name.clone();
1845                self.absorb_cluster_replica(
1846                    parsed_cluster_replica,
1847                    Some(name),
1848                    catalog_update.diff,
1849                );
1850            }
1851            ParsedStateUpdateKind::IntrospectionSourceIndex { .. } => {
1852                // IntrospectionSourceIndex updates are collected
1853                // separately in apply_catalog_implications and not
1854                // routed through absorb.
1855                unreachable!("IntrospectionSourceIndex should not be passed to absorb");
1856            }
1857            ParsedStateUpdateKind::ReplicaSystemConfiguration { .. } => {
1858                // ReplicaSystemConfiguration updates are collected separately in
1859                // apply_catalog_implications and not routed through absorb.
1860                unreachable!("ReplicaSystemConfiguration should not be passed to absorb");
1861            }
1862        }
1863    }
1864
1865    impl_absorb_method!(absorb_table, Table, Table);
1866    impl_absorb_method!(
1867        absorb_source,
1868        Source,
1869        (Source, Option<GenericSourceConnection>)
1870    );
1871    impl_absorb_method!(absorb_sink, Sink, Sink);
1872    impl_absorb_method!(absorb_index, Index, Index);
1873    impl_absorb_method!(absorb_materialized_view, MaterializedView, MaterializedView);
1874    impl_absorb_method!(absorb_view, View, View);
1875
1876    impl_absorb_method!(absorb_secret, Secret, Secret);
1877    impl_absorb_method!(absorb_connection, Connection, Connection);
1878
1879    impl_absorb_method!(absorb_cluster, Cluster, Cluster);
1880    impl_absorb_method!(absorb_cluster_replica, ClusterReplica, ClusterReplica);
1881}
1882
1883#[cfg(test)]
1884mod tests {
1885    use super::*;
1886    use mz_repr::{GlobalId, RelationDesc, RelationVersion, VersionedRelationDesc};
1887    use mz_sql::names::ResolvedIds;
1888    use std::collections::BTreeMap;
1889
1890    fn create_test_table(name: &str) -> Table {
1891        Table {
1892            desc: VersionedRelationDesc::new(
1893                RelationDesc::builder()
1894                    .with_column(name, mz_repr::SqlScalarType::String.nullable(false))
1895                    .finish(),
1896            ),
1897            create_sql: None,
1898            collections: BTreeMap::from([(RelationVersion::root(), GlobalId::System(1))]),
1899            conn_id: None,
1900            resolved_ids: ResolvedIds::empty(),
1901            custom_logical_compaction_window: None,
1902            is_retained_metrics_object: false,
1903            data_source: TableDataSource::TableWrites { defaults: vec![] },
1904        }
1905    }
1906
1907    #[mz_ore::test]
1908    fn test_item_state_transitions() {
1909        // Test None -> Added
1910        let mut state = CatalogImplicationKind::None;
1911        assert!(
1912            state
1913                .transition("item1".to_string(), None, StateDiff::Addition)
1914                .is_ok()
1915        );
1916        assert!(matches!(state, CatalogImplicationKind::Added(_)));
1917
1918        // Test Added -> Altered (via retraction)
1919        let mut state = CatalogImplicationKind::Added("new_item".to_string());
1920        assert!(
1921            state
1922                .transition("old_item".to_string(), None, StateDiff::Retraction)
1923                .is_ok()
1924        );
1925        match &state {
1926            CatalogImplicationKind::Altered { prev, new } => {
1927                // The retracted item is the OLD state
1928                assert_eq!(prev, "old_item");
1929                // The existing Added item is the NEW state
1930                assert_eq!(new, "new_item");
1931            }
1932            _ => panic!("Expected Altered state"),
1933        }
1934
1935        // Test None -> Dropped
1936        let mut state = CatalogImplicationKind::None;
1937        assert!(
1938            state
1939                .transition(
1940                    "item1".to_string(),
1941                    Some("test_name".to_string()),
1942                    StateDiff::Retraction
1943                )
1944                .is_ok()
1945        );
1946        assert!(matches!(state, CatalogImplicationKind::Dropped(_, _)));
1947
1948        // Test Dropped -> Altered (via addition)
1949        let mut state = CatalogImplicationKind::Dropped("old_item".to_string(), "name".to_string());
1950        assert!(
1951            state
1952                .transition("new_item".to_string(), None, StateDiff::Addition)
1953                .is_ok()
1954        );
1955        match &state {
1956            CatalogImplicationKind::Altered { prev, new } => {
1957                // The existing Dropped item is the OLD state
1958                assert_eq!(prev, "old_item");
1959                // The added item is the NEW state
1960                assert_eq!(new, "new_item");
1961            }
1962            _ => panic!("Expected Altered state"),
1963        }
1964
1965        // Test invalid transitions
1966        let mut state = CatalogImplicationKind::Added("item".to_string());
1967        assert!(
1968            state
1969                .transition("item2".to_string(), None, StateDiff::Addition)
1970                .is_err()
1971        );
1972
1973        let mut state = CatalogImplicationKind::Dropped("item".to_string(), "name".to_string());
1974        assert!(
1975            state
1976                .transition("item2".to_string(), None, StateDiff::Retraction)
1977                .is_err()
1978        );
1979    }
1980
1981    #[mz_ore::test]
1982    fn test_table_absorb_state_machine() {
1983        let table1 = create_test_table("table1");
1984        let table2 = create_test_table("table2");
1985
1986        // Test None -> AddTable
1987        let mut cmd = CatalogImplication::None;
1988        cmd.absorb_table(
1989            table1.clone(),
1990            Some("schema.table1".to_string()),
1991            StateDiff::Addition,
1992        );
1993        // Check that we have an Added state
1994        match &cmd {
1995            CatalogImplication::Table(state) => match state {
1996                CatalogImplicationKind::Added(t) => {
1997                    assert_eq!(t.desc.latest().arity(), table1.desc.latest().arity())
1998                }
1999                _ => panic!("Expected Added state"),
2000            },
2001            _ => panic!("Expected Table command"),
2002        }
2003
2004        // Test AddTable -> AlterTable (via retraction)
2005        // This tests the bug fix: when we have AddTable(table1) and receive Retraction(table2),
2006        // table2 is the old state being removed, table1 is the new state
2007        cmd.absorb_table(
2008            table2.clone(),
2009            Some("schema.table2".to_string()),
2010            StateDiff::Retraction,
2011        );
2012        match &cmd {
2013            CatalogImplication::Table(state) => match state {
2014                CatalogImplicationKind::Altered { prev, new } => {
2015                    // Verify the fix: prev should be the retracted table, new should be the added table
2016                    assert_eq!(prev.desc.latest().arity(), table2.desc.latest().arity());
2017                    assert_eq!(new.desc.latest().arity(), table1.desc.latest().arity());
2018                }
2019                _ => panic!("Expected Altered state"),
2020            },
2021            _ => panic!("Expected Table command"),
2022        }
2023
2024        // Test None -> DropTable
2025        let mut cmd = CatalogImplication::None;
2026        cmd.absorb_table(
2027            table1.clone(),
2028            Some("schema.table1".to_string()),
2029            StateDiff::Retraction,
2030        );
2031        match &cmd {
2032            CatalogImplication::Table(state) => match state {
2033                CatalogImplicationKind::Dropped(t, name) => {
2034                    assert_eq!(t.desc.latest().arity(), table1.desc.latest().arity());
2035                    assert_eq!(name, "schema.table1");
2036                }
2037                _ => panic!("Expected Dropped state"),
2038            },
2039            _ => panic!("Expected Table command"),
2040        }
2041
2042        // Test DropTable -> AlterTable (via addition)
2043        cmd.absorb_table(
2044            table2.clone(),
2045            Some("schema.table2".to_string()),
2046            StateDiff::Addition,
2047        );
2048        match &cmd {
2049            CatalogImplication::Table(state) => match state {
2050                CatalogImplicationKind::Altered { prev, new } => {
2051                    // prev should be the dropped table, new should be the added table
2052                    assert_eq!(prev.desc.latest().arity(), table1.desc.latest().arity());
2053                    assert_eq!(new.desc.latest().arity(), table2.desc.latest().arity());
2054                }
2055                _ => panic!("Expected Altered state"),
2056            },
2057            _ => panic!("Expected Table command"),
2058        }
2059    }
2060
2061    #[mz_ore::test]
2062    #[should_panic(expected = "Cannot add an already added object")]
2063    fn test_invalid_double_add() {
2064        let table = create_test_table("table");
2065        let mut cmd = CatalogImplication::None;
2066
2067        // First addition
2068        cmd.absorb_table(
2069            table.clone(),
2070            Some("schema.table".to_string()),
2071            StateDiff::Addition,
2072        );
2073
2074        // Second addition should panic
2075        cmd.absorb_table(
2076            table.clone(),
2077            Some("schema.table".to_string()),
2078            StateDiff::Addition,
2079        );
2080    }
2081
2082    #[mz_ore::test]
2083    #[should_panic(expected = "Cannot drop an already dropped object")]
2084    fn test_invalid_double_drop() {
2085        let table = create_test_table("table");
2086        let mut cmd = CatalogImplication::None;
2087
2088        // First drop
2089        cmd.absorb_table(
2090            table.clone(),
2091            Some("schema.table".to_string()),
2092            StateDiff::Retraction,
2093        );
2094
2095        // Second drop should panic
2096        cmd.absorb_table(
2097            table.clone(),
2098            Some("schema.table".to_string()),
2099            StateDiff::Retraction,
2100        );
2101    }
2102}