// mz_adapter/coord/catalog_implications.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic related to deriving and applying implications from [catalog
//! changes](ParsedStateUpdate).
//!
//! The flow from "raw" catalog changes to [CatalogImplication] works like this:
//!
//! StateUpdateKind -> ParsedStateUpdate -> CatalogImplication
//!
//! [ParsedStateUpdate] adds context to a "raw" catalog change
//! ([StateUpdateKind](mz_catalog::memory::objects::StateUpdateKind)). It
//! includes an in-memory representation of the updated object, which can in
//! theory be derived from the raw change but only when we have access to all
//! the other raw changes or to an in-memory Catalog, which represents a
//! "rollup" of all the raw changes.
//!
//! [CatalogImplication] is both the state machine that we use for absorbing
//! multiple state updates for the same object and the final command that has to
//! be applied to in-memory state and or the controller(s) after absorbing all
//! the state updates in a given batch of updates.

29use std::collections::{BTreeMap, BTreeSet};
30use std::sync::Arc;
31use std::time::{Duration, Instant};
32
33use fail::fail_point;
34use itertools::Itertools;
35use mz_adapter_types::compaction::CompactionWindow;
36use mz_catalog::memory::objects::{
37    CatalogItem, Cluster, ClusterReplica, Connection, ContinualTask, DataSourceDesc, Index,
38    MaterializedView, Secret, Sink, Source, StateDiff, Table, TableDataSource, View,
39};
40use mz_cloud_resources::VpcEndpointConfig;
41use mz_compute_client::logging::LogVariant;
42use mz_compute_client::protocol::response::PeekResponse;
43use mz_controller::clusters::{ClusterRole, ReplicaConfig};
44use mz_controller_types::{ClusterId, ReplicaId};
45use mz_ore::collections::CollectionExt;
46use mz_ore::error::ErrorExt;
47use mz_ore::future::InTask;
48use mz_ore::instrument;
49use mz_ore::retry::Retry;
50use mz_ore::str::StrExt;
51use mz_ore::task;
52use mz_repr::{CatalogItemId, GlobalId, RelationVersion, RelationVersionSelector, Timestamp};
53use mz_sql::plan::ConnectionDetails;
54use mz_storage_client::controller::{CollectionDescription, DataSource};
55use mz_storage_types::connections::PostgresConnection;
56use mz_storage_types::connections::inline::{InlinedConnection, IntoInlineConnection};
57use mz_storage_types::sinks::StorageSinkConnection;
58use mz_storage_types::sources::{
59    GenericSourceConnection, SourceDesc, SourceExport, SourceExportDataConfig,
60};
61use tracing::{Instrument, info_span, warn};
62
63use crate::active_compute_sink::ActiveComputeSinkRetireReason;
64use crate::coord::Coordinator;
65use crate::coord::catalog_implications::parsed_state_updates::{
66    ParsedStateUpdate, ParsedStateUpdateKind,
67};
68use crate::coord::timeline::TimelineState;
69use crate::statement_logging::{StatementEndedExecutionReason, StatementLoggingId};
70use crate::{AdapterError, CollectionIdBundle, ExecuteContext, ResultExt};
71
72pub mod parsed_state_updates;
73
74impl Coordinator {
75    /// Applies implications from the given bucket of [ParsedStateUpdate] to our
76    /// in-memory state and our controllers. This also applies transitive
77    /// implications, for example, peeks and subscribes will be cancelled when
78    /// referenced objects are dropped.
79    ///
80    /// This _requires_ that the given updates are consolidated. There must be
81    /// at most one addition and/or one retraction for a given item, as
82    /// identified by that items ID type.
83    #[instrument(level = "debug")]
84    pub async fn apply_catalog_implications(
85        &mut self,
86        ctx: Option<&mut ExecuteContext>,
87        catalog_updates: Vec<ParsedStateUpdate>,
88    ) -> Result<(), AdapterError> {
89        let start = Instant::now();
90
91        let mut catalog_implications: BTreeMap<CatalogItemId, CatalogImplication> = BTreeMap::new();
92        let mut cluster_commands: BTreeMap<ClusterId, CatalogImplication> = BTreeMap::new();
93        let mut cluster_replica_commands: BTreeMap<(ClusterId, ReplicaId), CatalogImplication> =
94            BTreeMap::new();
95        // Introspection source index additions, collected separately and
96        // merged into the AddCluster handler. Not routed through the
97        // absorb machinery since they are simple additions that don't
98        // need Altered support.
99        let mut introspection_source_indexes: BTreeMap<ClusterId, BTreeMap<LogVariant, GlobalId>> =
100            BTreeMap::new();
101
102        for update in catalog_updates {
103            tracing::trace!(?update, "got parsed state update");
104            match &update.kind {
105                ParsedStateUpdateKind::Item {
106                    durable_item,
107                    parsed_item: _,
108                    connection: _,
109                    parsed_full_name: _,
110                } => {
111                    let entry = catalog_implications
112                        .entry(durable_item.id.clone())
113                        .or_insert_with(|| CatalogImplication::None);
114                    entry.absorb(update);
115                }
116                ParsedStateUpdateKind::TemporaryItem {
117                    durable_item,
118                    parsed_item: _,
119                    connection: _,
120                    parsed_full_name: _,
121                } => {
122                    let entry = catalog_implications
123                        .entry(durable_item.id.clone())
124                        .or_insert_with(|| CatalogImplication::None);
125                    entry.absorb(update);
126                }
127                ParsedStateUpdateKind::Cluster {
128                    durable_cluster,
129                    parsed_cluster: _,
130                } => {
131                    let entry = cluster_commands
132                        .entry(durable_cluster.id)
133                        .or_insert_with(|| CatalogImplication::None);
134                    entry.absorb(update.clone());
135                }
136                ParsedStateUpdateKind::ClusterReplica {
137                    durable_cluster_replica,
138                    parsed_cluster_replica: _,
139                } => {
140                    let entry = cluster_replica_commands
141                        .entry((
142                            durable_cluster_replica.cluster_id,
143                            durable_cluster_replica.replica_id,
144                        ))
145                        .or_insert_with(|| CatalogImplication::None);
146                    entry.absorb(update.clone());
147                }
148                ParsedStateUpdateKind::IntrospectionSourceIndex {
149                    cluster_id,
150                    log,
151                    index_id,
152                } => {
153                    if update.diff == StateDiff::Addition {
154                        introspection_source_indexes
155                            .entry(*cluster_id)
156                            .or_default()
157                            .insert(log.clone(), *index_id);
158                    }
159                    // Retractions don't need handling: introspection
160                    // source indexes are dropped with their cluster.
161                }
162            }
163        }
164
165        self.apply_catalog_implications_inner(
166            ctx,
167            catalog_implications.into_iter().collect_vec(),
168            cluster_commands.into_iter().collect_vec(),
169            cluster_replica_commands.into_iter().collect_vec(),
170            introspection_source_indexes,
171        )
172        .await?;
173
174        self.metrics
175            .apply_catalog_implications_seconds
176            .observe(start.elapsed().as_secs_f64());
177
178        Ok(())
179    }
180
181    #[instrument(level = "debug")]
182    async fn apply_catalog_implications_inner(
183        &mut self,
184        ctx: Option<&mut ExecuteContext>,
185        implications: Vec<(CatalogItemId, CatalogImplication)>,
186        cluster_commands: Vec<(ClusterId, CatalogImplication)>,
187        cluster_replica_commands: Vec<((ClusterId, ReplicaId), CatalogImplication)>,
188        mut introspection_source_indexes: BTreeMap<ClusterId, BTreeMap<LogVariant, GlobalId>>,
189    ) -> Result<(), AdapterError> {
190        let mut tables_to_drop = BTreeSet::new();
191        let mut sources_to_drop = vec![];
192        let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
193        let mut storage_sink_gids_to_drop = vec![];
194        let mut indexes_to_drop = vec![];
195        let mut compute_sinks_to_drop = vec![];
196        let mut view_gids_to_drop = vec![];
197        let mut secrets_to_drop = vec![];
198        let mut vpc_endpoints_to_drop = vec![];
199        let mut clusters_to_drop = vec![];
200        let mut cluster_replicas_to_drop = vec![];
201        let mut active_compute_sinks_to_drop = BTreeMap::new();
202        let mut peeks_to_drop = vec![];
203        let mut copies_to_drop = vec![];
204
205        // Maps for storing names of dropped objects for error messages.
206        let mut dropped_item_names: BTreeMap<GlobalId, String> = BTreeMap::new();
207        let mut dropped_cluster_names: BTreeMap<ClusterId, String> = BTreeMap::new();
208
209        // Separate collections for tables (which need write timestamps) and
210        // sources (which don't).
211        let mut table_collections_to_create = BTreeMap::new();
212        let mut source_collections_to_create = BTreeMap::new();
213        let mut storage_policies_to_initialize = BTreeMap::new();
214        let mut execution_timestamps_to_set = BTreeSet::new();
215        let mut vpc_endpoints_to_create: Vec<(CatalogItemId, VpcEndpointConfig)> = vec![];
216
217        // Sources that shouldn't be dropped, even if we saw a `Dropped` event.
218        // Used for correct handling of ALTER MV.
219        let mut source_gids_to_keep = BTreeSet::new();
220
221        // Collections for batching connection-related alterations.
222        let mut source_connections_to_alter: BTreeMap<
223            GlobalId,
224            GenericSourceConnection<InlinedConnection>,
225        > = BTreeMap::new();
226        let mut sink_connections_to_alter: BTreeMap<GlobalId, StorageSinkConnection> =
227            BTreeMap::new();
228        let mut source_export_data_configs_to_alter: BTreeMap<GlobalId, SourceExportDataConfig> =
229            BTreeMap::new();
230        let mut source_descs_to_alter: BTreeMap<GlobalId, SourceDesc> = BTreeMap::new();
231
232        // We're incrementally migrating the code that manipulates the
233        // controller from closures in the sequencer. For some types of catalog
234        // changes we haven't done this migration yet, so there you will see
235        // just a log message. Over the next couple of PRs all of these will go
236        // away.
237
238        for (catalog_id, implication) in implications {
239            tracing::trace!(?implication, "have to apply catalog implication");
240
241            match implication {
242                CatalogImplication::Table(CatalogImplicationKind::Added(table)) => {
243                    self.handle_create_table(
244                        &ctx,
245                        &mut table_collections_to_create,
246                        &mut storage_policies_to_initialize,
247                        &mut execution_timestamps_to_set,
248                        catalog_id,
249                        table.clone(),
250                    )
251                    .await?
252                }
253                CatalogImplication::Table(CatalogImplicationKind::Altered {
254                    prev: prev_table,
255                    new: new_table,
256                }) => {
257                    self.handle_alter_table(catalog_id, prev_table, new_table)
258                        .await?
259                }
260
261                CatalogImplication::Table(CatalogImplicationKind::Dropped(table, full_name)) => {
262                    let global_ids = table.global_ids();
263                    for global_id in global_ids {
264                        tables_to_drop.insert((catalog_id, global_id));
265                        dropped_item_names.insert(global_id, full_name.clone());
266                    }
267                }
268                CatalogImplication::Source(CatalogImplicationKind::Added((
269                    source,
270                    _connection,
271                ))) => {
272                    // Get the compaction windows for all sources with this
273                    // catalog_id This replicates the logic from
274                    // sequence_create_source where it collects all item_ids and
275                    // gets their compaction windows
276                    let compaction_windows = self
277                        .catalog()
278                        .state()
279                        .source_compaction_windows(vec![catalog_id]);
280
281                    self.handle_create_source(
282                        &mut source_collections_to_create,
283                        &mut storage_policies_to_initialize,
284                        catalog_id,
285                        source,
286                        compaction_windows,
287                    )
288                    .await?
289                }
290                CatalogImplication::Source(CatalogImplicationKind::Altered {
291                    prev: (prev_source, _prev_connection),
292                    new: (new_source, new_connection),
293                }) => {
294                    if prev_source.custom_logical_compaction_window
295                        != new_source.custom_logical_compaction_window
296                    {
297                        let new_window = new_source
298                            .custom_logical_compaction_window
299                            .unwrap_or(CompactionWindow::Default);
300                        self.update_storage_read_policies(vec![(catalog_id, new_window.into())]);
301                    }
302                    match (&prev_source.data_source, &new_source.data_source) {
303                        (
304                            DataSourceDesc::Ingestion {
305                                desc: prev_desc, ..
306                            }
307                            | DataSourceDesc::OldSyntaxIngestion {
308                                desc: prev_desc, ..
309                            },
310                            DataSourceDesc::Ingestion { desc: new_desc, .. }
311                            | DataSourceDesc::OldSyntaxIngestion { desc: new_desc, .. },
312                        ) => {
313                            if prev_desc != new_desc {
314                                let inlined_connection = new_connection
315                                    .expect("ingestion source should have inlined connection");
316                                let inlined_desc = SourceDesc {
317                                    connection: inlined_connection,
318                                    timestamp_interval: new_desc.timestamp_interval,
319                                };
320                                source_descs_to_alter.insert(new_source.global_id, inlined_desc);
321                            }
322                        }
323                        _ => {}
324                    }
325                }
326                CatalogImplication::Source(CatalogImplicationKind::Dropped(
327                    (source, connection),
328                    full_name,
329                )) => {
330                    let global_id = source.global_id();
331                    sources_to_drop.push((catalog_id, global_id));
332                    dropped_item_names.insert(global_id, full_name);
333
334                    if let DataSourceDesc::Ingestion { desc, .. }
335                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } = &source.data_source
336                    {
337                        match &desc.connection {
338                            GenericSourceConnection::Postgres(_referenced_conn) => {
339                                let inline_conn = connection.expect("missing inlined connection");
340
341                                let pg_conn = match inline_conn {
342                                    GenericSourceConnection::Postgres(pg_conn) => pg_conn,
343                                    other => {
344                                        panic!("expected postgres connection, got: {:?}", other)
345                                    }
346                                };
347                                let pending_drop = (
348                                    pg_conn.connection.clone(),
349                                    pg_conn.publication_details.slot.clone(),
350                                );
351                                replication_slots_to_drop.push(pending_drop);
352                            }
353                            _ => {}
354                        }
355                    }
356                }
357                CatalogImplication::Sink(CatalogImplicationKind::Added(sink)) => {
358                    tracing::debug!(?sink, "not handling AddSink in here yet");
359                }
360                CatalogImplication::Sink(CatalogImplicationKind::Altered {
361                    prev: prev_sink,
362                    new: new_sink,
363                }) => {
364                    tracing::debug!(?prev_sink, ?new_sink, "not handling AlterSink in here yet");
365                }
366                CatalogImplication::Sink(CatalogImplicationKind::Dropped(sink, full_name)) => {
367                    storage_sink_gids_to_drop.push(sink.global_id());
368                    dropped_item_names.insert(sink.global_id(), full_name);
369                }
370                CatalogImplication::Index(CatalogImplicationKind::Added(index)) => {
371                    tracing::debug!(?index, "not handling AddIndex in here yet");
372                }
373                CatalogImplication::Index(CatalogImplicationKind::Altered {
374                    prev: prev_index,
375                    new: new_index,
376                }) => {
377                    if prev_index.custom_logical_compaction_window
378                        != new_index.custom_logical_compaction_window
379                    {
380                        let new_window = new_index
381                            .custom_logical_compaction_window
382                            .unwrap_or(CompactionWindow::Default);
383                        self.update_compute_read_policy(
384                            new_index.cluster_id,
385                            catalog_id,
386                            new_window.into(),
387                        );
388                    }
389                }
390                CatalogImplication::Index(CatalogImplicationKind::Dropped(index, full_name)) => {
391                    indexes_to_drop.push((index.cluster_id, index.global_id()));
392                    dropped_item_names.insert(index.global_id(), full_name);
393                }
394                CatalogImplication::MaterializedView(CatalogImplicationKind::Added(mv)) => {
395                    tracing::debug!(?mv, "not handling AddMaterializedView in here yet");
396                }
397                CatalogImplication::MaterializedView(CatalogImplicationKind::Altered {
398                    prev: prev_mv,
399                    new: new_mv,
400                }) => {
401                    // We get here for three reasons:
402                    //  1. Name changes, like those caused by ALTER SCHEMA.
403                    //  2. Replacement application.
404                    //  3. Compaction window changes (ALTER ... SET (RETAIN HISTORY ...)).
405                    //
406                    // 1. Name changes: We don't have to do anything here.
407                    //
408                    // 2. Replacement application: This is tricky: It changes the `CatalogItemId` of
409                    // the target to that of the replacement and simultaneously drops the replacement.
410                    // Which means when we get here `prev_mv` is the replacement that should be
411                    // dropped, and `new_mv` is the target that already exists but under a different
412                    // ID (which will receive a `Dropped` event separately). We can sniff out this
413                    // case by checking for version differences.
414                    //
415                    // 3. Compaction window changes: We handle this in an `else if`, because if there
416                    // is also a replacement application, then the replacement's storage collections
417                    // already have the correct read policies from when they were created, so we
418                    // don't need to update them here.
419                    if prev_mv.collections != new_mv.collections {
420                        // Sanity check: The replacement's last (and only) version must be the same
421                        // as the new target's last version.
422                        assert_eq!(
423                            prev_mv.global_id_writes(),
424                            new_mv.global_id_writes(),
425                            "unexpected MV Altered implication: prev={prev_mv:?}, new={new_mv:?}",
426                        );
427
428                        let gid = new_mv.global_id_writes();
429                        self.allow_writes(new_mv.cluster_id, gid);
430
431                        // There will be a separate `Dropped` implication for the old definition of
432                        // the target MV. That will drop the old compute collection, as we desire,
433                        // but we need to prevent it from dropping the old storage collection as
434                        // well, since that might still be depended on.
435                        source_gids_to_keep.extend(new_mv.global_ids());
436                    } else if prev_mv.custom_logical_compaction_window
437                        != new_mv.custom_logical_compaction_window
438                    {
439                        let new_window = new_mv
440                            .custom_logical_compaction_window
441                            .unwrap_or(CompactionWindow::Default);
442                        self.update_storage_read_policies(vec![(catalog_id, new_window.into())]);
443                    }
444                }
445                CatalogImplication::MaterializedView(CatalogImplicationKind::Dropped(
446                    mv,
447                    full_name,
448                )) => {
449                    compute_sinks_to_drop.push((mv.cluster_id, mv.global_id_writes()));
450                    for gid in mv.global_ids() {
451                        sources_to_drop.push((catalog_id, gid));
452                        dropped_item_names.insert(gid, full_name.clone());
453                    }
454                }
455                CatalogImplication::View(CatalogImplicationKind::Added(_view)) => {
456                    // No action needed: views are catalog-only objects with no
457                    // storage collections or dataflows to create.
458                }
459                CatalogImplication::View(CatalogImplicationKind::Altered {
460                    prev: _prev_view,
461                    new: _new_view,
462                }) => {
463                    // No action needed: view alterations (e.g. renames) are
464                    // catalog-only and require no controller changes.
465                }
466                CatalogImplication::View(CatalogImplicationKind::Dropped(view, full_name)) => {
467                    view_gids_to_drop.push(view.global_id());
468                    dropped_item_names.insert(view.global_id(), full_name);
469                }
470                CatalogImplication::ContinualTask(CatalogImplicationKind::Added(ct)) => {
471                    tracing::debug!(?ct, "not handling AddContinualTask in here yet");
472                }
473                CatalogImplication::ContinualTask(CatalogImplicationKind::Altered {
474                    prev: _prev_ct,
475                    new: _new_ct,
476                }) => {
477                    // No action needed: continual task alterations (e.g.
478                    // renames, owner changes) are catalog-only and require
479                    // no controller changes.
480                }
481                CatalogImplication::ContinualTask(CatalogImplicationKind::Dropped(
482                    ct,
483                    full_name,
484                )) => {
485                    compute_sinks_to_drop.push((ct.cluster_id, ct.global_id()));
486                    sources_to_drop.push((catalog_id, ct.global_id()));
487                    dropped_item_names.insert(ct.global_id(), full_name);
488                }
489                CatalogImplication::Secret(CatalogImplicationKind::Added(_secret)) => {
490                    // No action needed: the secret payload is stored in
491                    // secrets_controller.ensure() BEFORE the catalog transaction.
492                    // By the time we see this update, the secret is already stored.
493                }
494                CatalogImplication::Secret(CatalogImplicationKind::Altered {
495                    prev: _prev_secret,
496                    new: _new_secret,
497                }) => {
498                    // No action needed: altering a secret updates the payload via
499                    // secrets_controller.ensure() without a catalog transaction.
500                }
501                CatalogImplication::Secret(CatalogImplicationKind::Dropped(
502                    _secret,
503                    _full_name,
504                )) => {
505                    secrets_to_drop.push(catalog_id);
506                }
507                CatalogImplication::Connection(CatalogImplicationKind::Added(connection)) => {
508                    match &connection.details {
509                        // SSH connections: key pair is stored in secrets_controller
510                        // BEFORE the catalog transaction, so no action needed here.
511                        ConnectionDetails::Ssh { .. } => {}
512                        // AWS PrivateLink connections: create the VPC endpoint
513                        ConnectionDetails::AwsPrivatelink(privatelink) => {
514                            let spec = VpcEndpointConfig {
515                                aws_service_name: privatelink.service_name.to_owned(),
516                                availability_zone_ids: privatelink.availability_zones.to_owned(),
517                            };
518                            vpc_endpoints_to_create.push((catalog_id, spec));
519                        }
520                        // Other connection types don't require post-transaction actions
521                        _ => {}
522                    }
523                }
524                CatalogImplication::Connection(CatalogImplicationKind::Altered {
525                    prev: _prev_connection,
526                    new: new_connection,
527                }) => {
528                    self.handle_alter_connection(
529                        catalog_id,
530                        new_connection,
531                        &mut vpc_endpoints_to_create,
532                        &mut source_connections_to_alter,
533                        &mut sink_connections_to_alter,
534                        &mut source_export_data_configs_to_alter,
535                    );
536                }
537                CatalogImplication::Connection(CatalogImplicationKind::Dropped(
538                    connection,
539                    _full_name,
540                )) => {
541                    match &connection.details {
542                        // SSH connections have an associated secret that should be dropped
543                        ConnectionDetails::Ssh { .. } => {
544                            secrets_to_drop.push(catalog_id);
545                        }
546                        // AWS PrivateLink connections have an associated
547                        // VpcEndpoint K8S resource that should be dropped
548                        ConnectionDetails::AwsPrivatelink(_) => {
549                            vpc_endpoints_to_drop.push(catalog_id);
550                        }
551                        _ => (),
552                    }
553                }
554                CatalogImplication::None => {
555                    // Nothing to do for None commands
556                }
557                CatalogImplication::Cluster(_) | CatalogImplication::ClusterReplica(_) => {
558                    unreachable!("clusters and cluster replicas are handled below")
559                }
560                CatalogImplication::Table(CatalogImplicationKind::None)
561                | CatalogImplication::Source(CatalogImplicationKind::None)
562                | CatalogImplication::Sink(CatalogImplicationKind::None)
563                | CatalogImplication::Index(CatalogImplicationKind::None)
564                | CatalogImplication::MaterializedView(CatalogImplicationKind::None)
565                | CatalogImplication::View(CatalogImplicationKind::None)
566                | CatalogImplication::ContinualTask(CatalogImplicationKind::None)
567                | CatalogImplication::Secret(CatalogImplicationKind::None)
568                | CatalogImplication::Connection(CatalogImplicationKind::None) => {
569                    unreachable!("will never leave None in place");
570                }
571            }
572        }
573
574        for (cluster_id, command) in cluster_commands {
575            tracing::trace!(?command, "have cluster command to apply!");
576
577            match command {
578                CatalogImplication::Cluster(CatalogImplicationKind::Added(cluster)) => {
579                    // The Cluster's log_indexes is empty at parse time
580                    // because IntrospectionSourceIndex updates are applied
581                    // after the Cluster update. Use the separately collected
582                    // introspection_source_indexes instead.
583                    let arranged_logs = introspection_source_indexes
584                        .remove(&cluster_id)
585                        .unwrap_or_default();
586                    let introspection_source_ids: Vec<_> =
587                        arranged_logs.values().copied().collect();
588
589                    self.controller
590                        .create_cluster(
591                            cluster_id,
592                            mz_controller::clusters::ClusterConfig {
593                                arranged_logs,
594                                workload_class: cluster.config.workload_class.clone(),
595                            },
596                        )
597                        .expect("creating cluster must not fail");
598
599                    if !introspection_source_ids.is_empty() {
600                        self.initialize_compute_read_policies(
601                            introspection_source_ids,
602                            cluster_id,
603                            CompactionWindow::Default,
604                        )
605                        .await;
606                    }
607                }
608                CatalogImplication::Cluster(CatalogImplicationKind::Altered {
609                    prev: prev_cluster,
610                    new: new_cluster,
611                }) => {
612                    // Replica adds/drops/renames from config changes arrive as
613                    // separate AddClusterReplica/DroppedClusterReplica/
614                    // AlterClusterReplica events, so the only cluster-level
615                    // side effect here is updating the workload class on the
616                    // controller when it changes.
617                    if prev_cluster.config.workload_class != new_cluster.config.workload_class {
618                        self.controller.update_cluster_workload_class(
619                            cluster_id,
620                            new_cluster.config.workload_class.clone(),
621                        );
622                    }
623                }
624                CatalogImplication::Cluster(CatalogImplicationKind::Dropped(
625                    cluster,
626                    _full_name,
627                )) => {
628                    clusters_to_drop.push(cluster_id);
629                    dropped_cluster_names.insert(cluster_id, cluster.name);
630                }
631                CatalogImplication::Cluster(CatalogImplicationKind::None) => {
632                    unreachable!("will never leave None in place");
633                }
634                command => {
635                    unreachable!(
636                        "we only handle cluster commands in this map, got: {:?}",
637                        command
638                    );
639                }
640            }
641        }
642
643        for ((cluster_id, replica_id), command) in cluster_replica_commands {
644            tracing::trace!(?command, "have cluster replica command to apply!");
645
646            match command {
647                CatalogImplication::ClusterReplica(CatalogImplicationKind::Added(replica)) => {
648                    // Read the cluster name and role from the current catalog
649                    // state. This is correct as long as implications are
650                    // processed right after each catalog transaction. For a
651                    // more future-proof approach that tracks cluster info
652                    // locally across transactions, see the last commit of
653                    // https://github.com/ggevay/materialize/tree/implications-cluster-name-tracking
654                    // which removes that logic.
655                    let cluster = self.catalog().get_cluster(cluster_id);
656                    let cluster_name = cluster.name.clone();
657                    let cluster_role = cluster.role();
658                    let replica_name = format!("{}.{}", cluster_name, replica.name);
659                    self.handle_create_cluster_replica(
660                        cluster_id,
661                        replica_id,
662                        cluster_role,
663                        cluster_name,
664                        replica_name,
665                        replica.config.clone(),
666                    )
667                    .await;
668                }
669                CatalogImplication::ClusterReplica(CatalogImplicationKind::Altered {
670                    prev: _prev_replica,
671                    new: _new_replica,
672                }) => {
673                    // No action needed: cluster replica alterations (e.g.
674                    // renames, owner changes, pending flag changes) are
675                    // catalog-only and require no controller changes.
676                }
677                CatalogImplication::ClusterReplica(CatalogImplicationKind::Dropped(
678                    _replica,
679                    _full_name,
680                )) => {
681                    cluster_replicas_to_drop.push((cluster_id, replica_id));
682                }
683                CatalogImplication::ClusterReplica(CatalogImplicationKind::None) => {
684                    unreachable!("will never leave None in place");
685                }
686                command => {
687                    unreachable!(
688                        "we only handle cluster replica commands in this map, got: {:?}",
689                        command
690                    );
691                }
692            }
693        }
694
695        if !source_collections_to_create.is_empty() {
696            self.create_source_collections(source_collections_to_create)
697                .await?;
698        }
699
700        // Have to create sources first and then tables, because tables within
701        // one transaction can depend on sources.
702        if !table_collections_to_create.is_empty() {
703            self.create_table_collections(table_collections_to_create, execution_timestamps_to_set)
704                .await?;
705        }
706        // It is _very_ important that we only initialize read policies after we
707        // have created all the sources/collections. Some of the sources created
708        // in this collection might have dependencies on other sources, so the
709        // controller must get a chance to install read holds before we set a
710        // policy that might make the since advance.
711        self.initialize_storage_collections(storage_policies_to_initialize)
712            .await?;
713
714        // Create VPC endpoints for AWS PrivateLink connections
715        if !vpc_endpoints_to_create.is_empty() {
716            if let Some(cloud_resource_controller) = self.cloud_resource_controller.as_ref() {
717                for (connection_id, spec) in vpc_endpoints_to_create {
718                    if let Err(err) = cloud_resource_controller
719                        .ensure_vpc_endpoint(connection_id, spec)
720                        .await
721                    {
722                        tracing::error!(?err, "failed to ensure vpc endpoint!");
723                    }
724                }
725            } else {
726                tracing::error!(
727                    "AWS PrivateLink connections unsupported without cloud_resource_controller"
728                );
729            }
730        }
731
732        // Apply batched connection alterations to dependent sources/sinks/tables.
733        if !source_connections_to_alter.is_empty() {
734            self.controller
735                .storage
736                .alter_ingestion_connections(source_connections_to_alter)
737                .await
738                .unwrap_or_terminate("cannot fail to alter ingestion connections");
739        }
740
741        if !sink_connections_to_alter.is_empty() {
742            self.controller
743                .storage
744                .alter_export_connections(sink_connections_to_alter)
745                .await
746                .unwrap_or_terminate("altering export connections after txn must succeed");
747        }
748
749        if !source_export_data_configs_to_alter.is_empty() {
750            self.controller
751                .storage
752                .alter_ingestion_export_data_configs(source_export_data_configs_to_alter)
753                .await
754                .unwrap_or_terminate("altering source export data configs after txn must succeed");
755        }
756
757        if !source_descs_to_alter.is_empty() {
758            self.controller
759                .storage
760                .alter_ingestion_source_desc(source_descs_to_alter)
761                .await
762                .unwrap_or_terminate("cannot fail to alter ingestion source desc");
763        }
764
765        // Apply source drop overwrites.
766        sources_to_drop.retain(|(_, gid)| !source_gids_to_keep.contains(gid));
767
768        let readable_collections_to_drop: BTreeSet<_> = sources_to_drop
769            .iter()
770            .map(|(_, gid)| *gid)
771            .chain(tables_to_drop.iter().map(|(_, gid)| *gid))
772            .chain(indexes_to_drop.iter().map(|(_, gid)| *gid))
773            .chain(view_gids_to_drop.iter().copied())
774            .collect();
775
776        // Clean up any active compute sinks like subscribes or copy to-s that
777        // rely on dropped relations or clusters.
778        for (sink_id, sink) in &self.active_compute_sinks {
779            let cluster_id = sink.cluster_id();
780            if let Some(id) = sink
781                .depends_on()
782                .iter()
783                .find(|id| readable_collections_to_drop.contains(id))
784            {
785                let name = dropped_item_names
786                    .get(id)
787                    .map(|n| format!("relation {}", n.quoted()))
788                    .expect("missing relation name");
789                active_compute_sinks_to_drop.insert(
790                    *sink_id,
791                    ActiveComputeSinkRetireReason::DependencyDropped(name),
792                );
793            } else if clusters_to_drop.contains(&cluster_id) {
794                let name = dropped_cluster_names
795                    .get(&cluster_id)
796                    .map(|n| format!("cluster {}", n.quoted()))
797                    .expect("missing cluster name");
798                active_compute_sinks_to_drop.insert(
799                    *sink_id,
800                    ActiveComputeSinkRetireReason::DependencyDropped(name),
801                );
802            }
803        }
804
805        // Clean up any pending peeks that rely on dropped relations or clusters.
806        for (uuid, pending_peek) in &self.pending_peeks {
807            if let Some(id) = pending_peek
808                .depends_on
809                .iter()
810                .find(|id| readable_collections_to_drop.contains(id))
811            {
812                let name = dropped_item_names
813                    .get(id)
814                    .map(|n| format!("relation {}", n.quoted()))
815                    .expect("missing relation name");
816                peeks_to_drop.push((name, uuid.clone()));
817            } else if clusters_to_drop.contains(&pending_peek.cluster_id) {
818                let name = dropped_cluster_names
819                    .get(&pending_peek.cluster_id)
820                    .map(|n| format!("cluster {}", n.quoted()))
821                    .expect("missing cluster name");
822                peeks_to_drop.push((name, uuid.clone()));
823            }
824        }
825
826        // Clean up any pending `COPY` statements that rely on dropped relations or clusters.
827        for (conn_id, pending_copy) in &self.active_copies {
828            let dropping_table = tables_to_drop
829                .iter()
830                .any(|(item_id, _gid)| pending_copy.table_id == *item_id);
831            let dropping_cluster = clusters_to_drop.contains(&pending_copy.cluster_id);
832
833            if dropping_table || dropping_cluster {
834                copies_to_drop.push(conn_id.clone());
835            }
836        }
837
838        let storage_gids_to_drop: BTreeSet<_> = sources_to_drop
839            .iter()
840            .map(|(_id, gid)| gid)
841            .chain(storage_sink_gids_to_drop.iter())
842            .chain(tables_to_drop.iter().map(|(_id, gid)| gid))
843            .copied()
844            .collect();
845        let compute_gids_to_drop: Vec<_> = indexes_to_drop
846            .iter()
847            .chain(compute_sinks_to_drop.iter())
848            .copied()
849            .collect();
850
851        // Gather resources that we have to remove from timeline state and
852        // pre-check if any Timelines become empty, when we drop the specified
853        // storage and compute resources.
854        //
855        // Note: We only apply these changes below.
856        let mut timeline_id_bundles = BTreeMap::new();
857
858        for (timeline, TimelineState { read_holds, .. }) in &self.global_timelines {
859            let mut id_bundle = CollectionIdBundle::default();
860
861            for storage_id in read_holds.storage_ids() {
862                if storage_gids_to_drop.contains(&storage_id) {
863                    id_bundle.storage_ids.insert(storage_id);
864                }
865            }
866
867            for (instance_id, id) in read_holds.compute_ids() {
868                if compute_gids_to_drop.contains(&(instance_id, id))
869                    || clusters_to_drop.contains(&instance_id)
870                {
871                    id_bundle
872                        .compute_ids
873                        .entry(instance_id)
874                        .or_default()
875                        .insert(id);
876                }
877            }
878
879            timeline_id_bundles.insert(timeline.clone(), id_bundle);
880        }
881
882        let mut timeline_associations = BTreeMap::new();
883        for (timeline, id_bundle) in timeline_id_bundles.into_iter() {
884            let TimelineState { read_holds, .. } = self
885                .global_timelines
886                .get(&timeline)
887                .expect("all timelines have a timestamp oracle");
888
889            let empty = read_holds.id_bundle().difference(&id_bundle).is_empty();
890            timeline_associations.insert(timeline, (empty, id_bundle));
891        }
892
893        // No error returns are allowed after this point. Enforce this at compile time
894        // by using this odd structure so we don't accidentally add a stray `?`.
895        let _: () = async {
896            if !timeline_associations.is_empty() {
897                for (timeline, (should_be_empty, id_bundle)) in timeline_associations {
898                    let became_empty =
899                        self.remove_resources_associated_with_timeline(timeline, id_bundle);
900                    assert_eq!(should_be_empty, became_empty, "emptiness did not match!");
901                }
902            }
903
904            // Note that we drop tables before sources since there can be a weak
905            // dependency on sources from tables in the storage controller that
906            // will result in error logging that we'd prefer to avoid. This
907            // isn't an actual dependency issue but we'd like to keep that error
908            // logging around to indicate when an actual dependency error might
909            // occur.
910            if !tables_to_drop.is_empty() {
911                let ts = self.get_local_write_ts().await;
912                self.drop_tables(tables_to_drop.into_iter().collect_vec(), ts.timestamp);
913            }
914
915            if !sources_to_drop.is_empty() {
916                self.drop_sources(sources_to_drop);
917            }
918
919            if !storage_sink_gids_to_drop.is_empty() {
920                self.drop_storage_sinks(storage_sink_gids_to_drop);
921            }
922
923            if !active_compute_sinks_to_drop.is_empty() {
924                self.retire_compute_sinks(active_compute_sinks_to_drop)
925                    .await;
926            }
927
928            if !peeks_to_drop.is_empty() {
929                for (dropped_name, uuid) in peeks_to_drop {
930                    if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
931                        let cancel_reason = PeekResponse::Error(format!(
932                            "query could not complete because {dropped_name} was dropped"
933                        ));
934                        self.controller
935                            .compute
936                            .cancel_peek(pending_peek.cluster_id, uuid, cancel_reason)
937                            .unwrap_or_terminate("unable to cancel peek");
938                        self.retire_execution(
939                            StatementEndedExecutionReason::Canceled,
940                            pending_peek.ctx_extra.defuse(),
941                        );
942                    }
943                }
944            }
945
946            if !copies_to_drop.is_empty() {
947                for conn_id in copies_to_drop {
948                    self.cancel_pending_copy(&conn_id);
949                }
950            }
951
952            if !compute_gids_to_drop.is_empty() {
953                self.drop_compute_collections(compute_gids_to_drop);
954            }
955
956            if !vpc_endpoints_to_drop.is_empty() {
957                self.drop_vpc_endpoints_in_background(vpc_endpoints_to_drop)
958            }
959
960            if !cluster_replicas_to_drop.is_empty() {
961                fail::fail_point!("after_catalog_drop_replica");
962
963                for (cluster_id, replica_id) in cluster_replicas_to_drop {
964                    self.drop_replica(cluster_id, replica_id);
965                }
966            }
967            if !clusters_to_drop.is_empty() {
968                for cluster_id in clusters_to_drop {
969                    self.controller.drop_cluster(cluster_id);
970                }
971            }
972
973            // We don't want to block the main coordinator thread on cleaning
974            // up external resources (PostgreSQL replication slots and secrets),
975            // so we perform that cleanup in a background task.
976            //
977            // TODO(14551): This is inherently best effort. An ill-timed crash
978            // means we'll never clean these resources up. Safer cleanup for non-Materialize resources.
979            // See <https://github.com/MaterializeInc/materialize/issues/14551>
980            task::spawn(|| "drop_replication_slots_and_secrets", {
981                let ssh_tunnel_manager = self.connection_context().ssh_tunnel_manager.clone();
982                let caching_secrets_reader = self.caching_secrets_reader.clone();
983                let secrets_controller = Arc::clone(&self.secrets_controller);
984                let secrets_reader = Arc::clone(self.secrets_reader());
985                let storage_config = self.controller.storage.config().clone();
986
987                async move {
988                    for (connection, replication_slot_name) in replication_slots_to_drop {
989                        tracing::info!(?replication_slot_name, "dropping replication slot");
990
991                        // Try to drop the replication slots, but give up after
992                        // a while. The PostgreSQL server may no longer be
993                        // healthy. Users often drop PostgreSQL sources
994                        // *because* the PostgreSQL server has been
995                        // decomissioned.
996                        let result: Result<(), anyhow::Error> = Retry::default()
997                            .max_duration(Duration::from_secs(60))
998                            .retry_async(|_state| async {
999                                let config = connection
1000                                    .config(&secrets_reader, &storage_config, InTask::No)
1001                                    .await
1002                                    .map_err(|e| {
1003                                        anyhow::anyhow!(
1004                                            "error creating Postgres client for \
1005                                            dropping acquired slots: {}",
1006                                            e.display_with_causes()
1007                                        )
1008                                    })?;
1009
1010                                mz_postgres_util::drop_replication_slots(
1011                                    &ssh_tunnel_manager,
1012                                    config.clone(),
1013                                    &[(&replication_slot_name, true)],
1014                                )
1015                                .await?;
1016
1017                                Ok(())
1018                            })
1019                            .await;
1020
1021                        if let Err(err) = result {
1022                            tracing::warn!(
1023                                ?replication_slot_name,
1024                                ?err,
1025                                "failed to drop replication slot"
1026                            );
1027                        }
1028                    }
1029
1030                    // Drop secrets *after* dropping the replication slots,
1031                    // because dropping replication slots may rely on those
1032                    // secrets still being present.
1033                    //
1034                    // It's okay if we crash before processing the secret drops,
1035                    // as we look for and remove any orphaned secrets during
1036                    // startup.
1037                    fail_point!("drop_secrets");
1038                    for secret in secrets_to_drop {
1039                        if let Err(e) = secrets_controller.delete(secret).await {
1040                            warn!("Dropping secrets has encountered an error: {}", e);
1041                        } else {
1042                            caching_secrets_reader.invalidate(secret);
1043                        }
1044                    }
1045                }
1046            });
1047        }
1048        .instrument(info_span!(
1049            "coord::apply_catalog_implications_inner::finalize"
1050        ))
1051        .await;
1052
1053        Ok(())
1054    }
1055
    /// Registers the given table collections with the storage controller at a
    /// freshly acquired local write timestamp, and records that timestamp as
    /// the execution timestamp for the statements in
    /// `execution_timestamps_to_set`.
    ///
    /// The acquired `register_ts` is handed to the storage controller so the
    /// new collections become valid at that time, and is then applied as a
    /// local write so the write timestamp advances past it.
    #[instrument(level = "debug")]
    async fn create_table_collections(
        &mut self,
        table_collections_to_create: BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
        execution_timestamps_to_set: BTreeSet<StatementLoggingId>,
    ) -> Result<(), AdapterError> {
        // If we have tables, determine the initial validity for the table.
        let write_ts = self.get_local_write_ts().await;
        let register_ts = write_ts.timestamp;

        // After acquiring `register_ts` but before using it, we need to
        // be sure we're still the leader. Otherwise a new generation
        // may also be trying to use `register_ts` for a different
        // purpose. See materialize#28216.
        //
        // We also should advance the upper of the catalog shard, to ensure it
        // is readable at the oracle read ts after we bump it to the
        // `register_ts` below. Both of these needs are served by calling
        // `advance_upper`.
        self.catalog
            .advance_upper(write_ts.advance_to)
            .await
            .unwrap_or_terminate("unable to advance catalog upper");

        // Stamp the statements that caused these tables to be created (for
        // statement logging) with the same timestamp at which the
        // collections are registered.
        for id in execution_timestamps_to_set {
            self.set_statement_execution_timestamp(id, register_ts);
        }

        let storage_metadata = self.catalog.state().storage_metadata();

        self.controller
            .storage
            .create_collections(
                storage_metadata,
                Some(register_ts),
                table_collections_to_create.into_iter().collect_vec(),
            )
            .await
            .unwrap_or_terminate("cannot fail to create collections");

        // Mark the write at `register_ts` as applied so the local write
        // timestamp can advance beyond it.
        self.apply_local_write(register_ts).await;

        Ok(())
    }
1100
1101    #[instrument(level = "debug")]
1102    async fn create_source_collections(
1103        &mut self,
1104        source_collections_to_create: BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
1105    ) -> Result<(), AdapterError> {
1106        let storage_metadata = self.catalog.state().storage_metadata();
1107
1108        self.controller
1109            .storage
1110            .create_collections(
1111                storage_metadata,
1112                None, // Sources don't need a write timestamp
1113                source_collections_to_create.into_iter().collect_vec(),
1114            )
1115            .await
1116            .unwrap_or_terminate("cannot fail to create collections");
1117
1118        Ok(())
1119    }
1120
1121    #[instrument(level = "debug")]
1122    async fn initialize_storage_collections(
1123        &mut self,
1124        storage_policies_to_initialize: BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1125    ) -> Result<(), AdapterError> {
1126        for (compaction_window, global_ids) in storage_policies_to_initialize {
1127            self.initialize_read_policies(
1128                &CollectionIdBundle {
1129                    storage_ids: global_ids,
1130                    compute_ids: BTreeMap::new(),
1131                },
1132                compaction_window,
1133            )
1134            .await;
1135        }
1136
1137        Ok(())
1138    }
1139
1140    #[instrument(level = "debug")]
1141    async fn handle_create_table(
1142        &self,
1143        ctx: &Option<&mut ExecuteContext>,
1144        storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
1145        storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1146        execution_timestamps_to_set: &mut BTreeSet<StatementLoggingId>,
1147        table_id: CatalogItemId,
1148        table: Table,
1149    ) -> Result<(), AdapterError> {
1150        // The table data_source determines whether this table will be written to
1151        // by environmentd (e.g. with INSERT INTO statements) or by the storage layer
1152        // (e.g. a source-fed table).
1153        match &table.data_source {
1154            TableDataSource::TableWrites { defaults: _ } => {
1155                let versions: BTreeMap<_, _> = table
1156                    .collection_descs()
1157                    .map(|(gid, version, desc)| (version, (gid, desc)))
1158                    .collect();
1159                let collection_descs = versions.iter().map(|(_version, (gid, desc))| {
1160                    let collection_desc = CollectionDescription::for_table(desc.clone());
1161
1162                    (*gid, collection_desc)
1163                });
1164
1165                let compaction_window = table
1166                    .custom_logical_compaction_window
1167                    .unwrap_or(CompactionWindow::Default);
1168                let ids_to_initialize = storage_policies_to_initialize
1169                    .entry(compaction_window)
1170                    .or_default();
1171
1172                for (gid, collection_desc) in collection_descs {
1173                    storage_collections_to_create.insert(gid, collection_desc);
1174                    ids_to_initialize.insert(gid);
1175                }
1176
1177                if let Some(id) = ctx.as_ref().and_then(|ctx| ctx.extra().contents()) {
1178                    execution_timestamps_to_set.insert(id);
1179                }
1180            }
1181            TableDataSource::DataSource {
1182                desc: data_source_desc,
1183                timeline,
1184            } => {
1185                match data_source_desc {
1186                    DataSourceDesc::IngestionExport {
1187                        ingestion_id,
1188                        external_reference: _,
1189                        details,
1190                        data_config,
1191                    } => {
1192                        let global_ingestion_id =
1193                            self.catalog().get_entry(ingestion_id).latest_global_id();
1194
1195                        let collection_desc = CollectionDescription::<Timestamp> {
1196                            desc: table.desc.latest(),
1197                            data_source: DataSource::IngestionExport {
1198                                ingestion_id: global_ingestion_id,
1199                                details: details.clone(),
1200                                data_config: data_config
1201                                    .clone()
1202                                    .into_inline_connection(self.catalog.state()),
1203                            },
1204                            since: None,
1205                            timeline: Some(timeline.clone()),
1206                            primary: None,
1207                        };
1208
1209                        let global_id = table
1210                            .global_ids()
1211                            .expect_element(|| "subsources cannot have multiple versions");
1212
1213                        storage_collections_to_create.insert(global_id, collection_desc);
1214
1215                        let read_policies = self
1216                            .catalog()
1217                            .state()
1218                            .source_compaction_windows(vec![table_id]);
1219                        for (compaction_window, catalog_ids) in read_policies {
1220                            let compaction_ids = storage_policies_to_initialize
1221                                .entry(compaction_window)
1222                                .or_default();
1223
1224                            let gids = catalog_ids
1225                                .into_iter()
1226                                .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1227                                .flatten();
1228                            compaction_ids.extend(gids);
1229                        }
1230                    }
1231                    DataSourceDesc::Webhook {
1232                        validate_using: _,
1233                        body_format: _,
1234                        headers: _,
1235                        cluster_id: _,
1236                    } => {
1237                        // Create the underlying collection with the latest schema from the Table.
1238                        assert_eq!(
1239                            table.desc.latest_version(),
1240                            RelationVersion::root(),
1241                            "found webhook with more than 1 relation version, {:?}",
1242                            table.desc
1243                        );
1244                        let desc = table.desc.latest();
1245
1246                        let collection_desc = CollectionDescription::<Timestamp> {
1247                            desc,
1248                            data_source: DataSource::Webhook,
1249                            since: None,
1250                            timeline: Some(timeline.clone()),
1251                            primary: None,
1252                        };
1253
1254                        let global_id = table
1255                            .global_ids()
1256                            .expect_element(|| "webhooks cannot have multiple versions");
1257
1258                        storage_collections_to_create.insert(global_id, collection_desc);
1259
1260                        let read_policies = self
1261                            .catalog()
1262                            .state()
1263                            .source_compaction_windows(vec![table_id]);
1264
1265                        for (compaction_window, catalog_ids) in read_policies {
1266                            let compaction_ids = storage_policies_to_initialize
1267                                .entry(compaction_window)
1268                                .or_default();
1269
1270                            let gids = catalog_ids
1271                                .into_iter()
1272                                .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1273                                .flatten();
1274                            compaction_ids.extend(gids);
1275                        }
1276                    }
1277                    _ => unreachable!("CREATE TABLE data source got {:?}", data_source_desc),
1278                }
1279            }
1280        }
1281
1282        Ok(())
1283    }
1284
    /// Applies a `Table` alteration absorbed from catalog updates.
    ///
    /// Two cases are distinguished by comparing the tables' write `GlobalId`s:
    /// * Same `GlobalId`: no new relation version was created (so this was not
    ///   an `ALTER TABLE ... ADD COLUMN`). The only controller-relevant change
    ///   handled here is a differing custom compaction window, which updates
    ///   the storage read policies.
    /// * Different `GlobalId`: a new relation version exists. We register the
    ///   new collection with the storage controller at a fresh local write
    ///   timestamp while holding a read hold on the old collection.
    ///
    /// # Errors
    ///
    /// Returns `AdapterError` per the signature, though the current
    /// implementation treats controller failures as fatal (via `expect` /
    /// `unwrap_or_terminate`) rather than returning them.
    #[instrument(level = "debug")]
    async fn handle_alter_table(
        &mut self,
        catalog_id: CatalogItemId,
        prev_table: Table,
        new_table: Table,
    ) -> Result<(), AdapterError> {
        let existing_gid = prev_table.global_id_writes();
        let new_gid = new_table.global_id_writes();

        if existing_gid == new_gid {
            // It's not an ALTER TABLE ADD COLUMN, because we still have the
            // same GlobalId. It might be a compaction window change.
            if prev_table.custom_logical_compaction_window
                != new_table.custom_logical_compaction_window
            {
                // A removed custom window (None) falls back to the default
                // compaction window.
                let new_window = new_table
                    .custom_logical_compaction_window
                    .unwrap_or(CompactionWindow::Default);
                self.update_storage_read_policies(vec![(catalog_id, new_window.into())]);
            }
            return Ok(());
        }

        // Acquire a read hold on the original table for the duration of
        // the alter to prevent the since of the original table from
        // getting advanced, while the ALTER is running.
        let existing_table = crate::CollectionIdBundle {
            storage_ids: BTreeSet::from([existing_gid]),
            compute_ids: BTreeMap::new(),
        };
        let existing_table_read_hold = self.acquire_read_holds(&existing_table);

        let expected_version = prev_table.desc.latest_version();
        let new_version = new_table.desc.latest_version();
        // Resolve the versioned desc to the concrete relation desc of the new
        // version; this is what the storage controller registers.
        let new_desc = new_table
            .desc
            .at_version(RelationVersionSelector::Specific(new_version));

        // The new collection is registered at the local write timestamp so
        // that reads at or after it observe the new relation version.
        let write_ts = self.get_local_write_ts().await;
        let register_ts = write_ts.timestamp;

        // Ensure the catalog will be immediately readable at the read ts we're
        // about to bump.
        self.catalog
            .advance_upper(write_ts.advance_to)
            .await
            .unwrap_or_terminate("unable to advance catalog upper");

        // Alter the table description, creating a "new" collection.
        self.controller
            .storage
            .alter_table_desc(
                existing_gid,
                new_gid,
                new_desc,
                expected_version,
                register_ts,
            )
            .await
            .expect("failed to alter desc of table");

        // Initialize the ReadPolicy which ensures we have the correct read holds.
        let compaction_window = new_table
            .custom_logical_compaction_window
            .unwrap_or(CompactionWindow::Default);
        self.initialize_read_policies(
            &crate::CollectionIdBundle {
                storage_ids: BTreeSet::from([new_gid]),
                compute_ids: BTreeMap::new(),
            },
            compaction_window,
        )
        .await;

        // NOTE(review): presumably this publishes/releases the write
        // timestamp taken above so the table write is visible — confirm.
        self.apply_local_write(register_ts).await;

        // Alter is complete! We can drop our read hold.
        drop(existing_table_read_hold);

        Ok(())
    }
1367
1368    #[instrument(level = "debug")]
1369    async fn handle_create_source(
1370        &self,
1371        storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
1372        storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1373        item_id: CatalogItemId,
1374        source: Source,
1375        compaction_windows: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>>,
1376    ) -> Result<(), AdapterError> {
1377        let data_source = match source.data_source {
1378            DataSourceDesc::Ingestion { desc, cluster_id } => {
1379                let desc = desc.into_inline_connection(self.catalog().state());
1380                let item_global_id = self.catalog().get_entry(&item_id).latest_global_id();
1381
1382                let ingestion = mz_storage_types::sources::IngestionDescription::new(
1383                    desc,
1384                    cluster_id,
1385                    item_global_id,
1386                );
1387
1388                DataSource::Ingestion(ingestion)
1389            }
1390            DataSourceDesc::OldSyntaxIngestion {
1391                desc,
1392                progress_subsource,
1393                data_config,
1394                details,
1395                cluster_id,
1396            } => {
1397                let desc = desc.into_inline_connection(self.catalog().state());
1398                let data_config = data_config.into_inline_connection(self.catalog().state());
1399
1400                // TODO(parkmycar): We should probably check the type here, but I'm not
1401                // sure if this will always be a Source or a Table.
1402                let progress_subsource = self
1403                    .catalog()
1404                    .get_entry(&progress_subsource)
1405                    .latest_global_id();
1406
1407                let mut ingestion = mz_storage_types::sources::IngestionDescription::new(
1408                    desc,
1409                    cluster_id,
1410                    progress_subsource,
1411                );
1412
1413                let legacy_export = SourceExport {
1414                    storage_metadata: (),
1415                    data_config,
1416                    details,
1417                };
1418
1419                ingestion
1420                    .source_exports
1421                    .insert(source.global_id, legacy_export);
1422
1423                DataSource::Ingestion(ingestion)
1424            }
1425            DataSourceDesc::IngestionExport {
1426                ingestion_id,
1427                external_reference: _,
1428                details,
1429                data_config,
1430            } => {
1431                // TODO(parkmycar): We should probably check the type here, but I'm not sure if
1432                // this will always be a Source or a Table.
1433                let ingestion_id = self.catalog().get_entry(&ingestion_id).latest_global_id();
1434
1435                DataSource::IngestionExport {
1436                    ingestion_id,
1437                    details,
1438                    data_config: data_config.into_inline_connection(self.catalog().state()),
1439                }
1440            }
1441            DataSourceDesc::Progress => DataSource::Progress,
1442            DataSourceDesc::Webhook { .. } => DataSource::Webhook,
1443            DataSourceDesc::Introspection(_) | DataSourceDesc::Catalog => {
1444                unreachable!("cannot create sources with internal data sources")
1445            }
1446        };
1447
1448        storage_collections_to_create.insert(
1449            source.global_id,
1450            CollectionDescription::<Timestamp> {
1451                desc: source.desc.clone(),
1452                data_source,
1453                timeline: Some(source.timeline),
1454                since: None,
1455                primary: None,
1456            },
1457        );
1458
1459        // Initialize read policies for the source
1460        for (compaction_window, catalog_ids) in compaction_windows {
1461            let compaction_ids = storage_policies_to_initialize
1462                .entry(compaction_window)
1463                .or_default();
1464
1465            let gids = catalog_ids
1466                .into_iter()
1467                .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1468                .flatten();
1469            compaction_ids.extend(gids);
1470        }
1471
1472        Ok(())
1473    }
1474
1475    /// Handles altering a connection by collecting all the dependent sources,
1476    /// sinks, and tables that need their connection updated.
1477    ///
1478    /// This mirrors the logic from `sequence_alter_connection_stage_finish` but
1479    /// collects the changes into batched collections for application after all
1480    /// implications are processed.
1481    #[instrument(level = "debug")]
1482    fn handle_alter_connection(
1483        &self,
1484        connection_id: CatalogItemId,
1485        connection: Connection,
1486        vpc_endpoints_to_create: &mut Vec<(CatalogItemId, VpcEndpointConfig)>,
1487        source_connections_to_alter: &mut BTreeMap<
1488            GlobalId,
1489            GenericSourceConnection<InlinedConnection>,
1490        >,
1491        sink_connections_to_alter: &mut BTreeMap<GlobalId, StorageSinkConnection>,
1492        source_export_data_configs_to_alter: &mut BTreeMap<GlobalId, SourceExportDataConfig>,
1493    ) {
1494        use std::collections::VecDeque;
1495
1496        // Handle AWS PrivateLink connections by queueing VPC endpoint creation.
1497        if let ConnectionDetails::AwsPrivatelink(ref privatelink) = connection.details {
1498            let spec = VpcEndpointConfig {
1499                aws_service_name: privatelink.service_name.to_owned(),
1500                availability_zone_ids: privatelink.availability_zones.to_owned(),
1501            };
1502            vpc_endpoints_to_create.push((connection_id, spec));
1503        }
1504
1505        // Walk the dependency graph to find all sources, sinks, and tables
1506        // that depend on this connection (directly or transitively through
1507        // other connections).
1508        let mut connections_to_process = VecDeque::new();
1509        connections_to_process.push_front(connection_id.clone());
1510
1511        while let Some(id) = connections_to_process.pop_front() {
1512            for dependent_id in self.catalog().get_entry(&id).used_by() {
1513                let dependent_entry = self.catalog().get_entry(dependent_id);
1514                match dependent_entry.item() {
1515                    CatalogItem::Connection(_) => {
1516                        // Connections can depend on other connections (e.g., a
1517                        // Kafka connection using an SSH tunnel connection).
1518                        // Process these transitively.
1519                        connections_to_process.push_back(*dependent_id);
1520                    }
1521                    CatalogItem::Source(source) => {
1522                        let desc = match &dependent_entry
1523                            .source()
1524                            .expect("known to be source")
1525                            .data_source
1526                        {
1527                            DataSourceDesc::Ingestion { desc, .. }
1528                            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1529                                desc.clone().into_inline_connection(self.catalog().state())
1530                            }
1531                            DataSourceDesc::IngestionExport { .. }
1532                            | DataSourceDesc::Introspection(_)
1533                            | DataSourceDesc::Progress
1534                            | DataSourceDesc::Webhook { .. }
1535                            | DataSourceDesc::Catalog => {
1536                                // Only ingestions reference connections directly.
1537                                continue;
1538                            }
1539                        };
1540
1541                        source_connections_to_alter.insert(source.global_id, desc.connection);
1542                    }
1543                    CatalogItem::Sink(sink) => {
1544                        let export = dependent_entry.sink().expect("known to be sink");
1545                        sink_connections_to_alter.insert(
1546                            sink.global_id,
1547                            export
1548                                .connection
1549                                .clone()
1550                                .into_inline_connection(self.catalog().state()),
1551                        );
1552                    }
1553                    CatalogItem::Table(table) => {
1554                        // This is a source-fed table that references a schema
1555                        // registry connection as part of its encoding/data
1556                        // config.
1557                        if let Some((_, _, _, export_data_config)) =
1558                            dependent_entry.source_export_details()
1559                        {
1560                            let data_config = export_data_config.clone();
1561                            source_export_data_configs_to_alter.insert(
1562                                table.global_id_writes(),
1563                                data_config.into_inline_connection(self.catalog().state()),
1564                            );
1565                        }
1566                    }
1567                    CatalogItem::Log(_)
1568                    | CatalogItem::View(_)
1569                    | CatalogItem::MaterializedView(_)
1570                    | CatalogItem::Index(_)
1571                    | CatalogItem::Type(_)
1572                    | CatalogItem::Func(_)
1573                    | CatalogItem::Secret(_)
1574                    | CatalogItem::ContinualTask(_) => {
1575                        // Other item types don't have connection dependencies
1576                        // that need updating.
1577                    }
1578                }
1579            }
1580        }
1581    }
1582
1583    async fn handle_create_cluster_replica(
1584        &mut self,
1585        cluster_id: ClusterId,
1586        replica_id: ReplicaId,
1587        role: ClusterRole,
1588        cluster_name: String,
1589        replica_name: String,
1590        replica_config: ReplicaConfig,
1591    ) {
1592        let enable_worker_core_affinity =
1593            self.catalog().system_config().enable_worker_core_affinity();
1594
1595        self.controller
1596            .create_replica(
1597                cluster_id,
1598                replica_id,
1599                cluster_name,
1600                replica_name,
1601                role,
1602                replica_config,
1603                enable_worker_core_affinity,
1604            )
1605            .expect("creating replicas must not fail");
1606
1607        self.install_introspection_subscribes(cluster_id, replica_id)
1608            .await;
1609    }
1610}
1611
/// A state machine for building catalog implications from catalog updates.
///
/// Once all [ParsedStateUpdate] of a timestamp are ingested this is a command
/// that has to potentially be applied to in-memory state and/or the
/// controller(s).
#[derive(Debug, Clone)]
enum CatalogImplication {
    /// No updates absorbed yet; the object's type is not yet known.
    None,
    /// Implications for a table.
    Table(CatalogImplicationKind<Table>),
    /// Implications for a source, paired with its source connection (if any).
    Source(CatalogImplicationKind<(Source, Option<GenericSourceConnection>)>),
    /// Implications for a sink.
    Sink(CatalogImplicationKind<Sink>),
    /// Implications for an index.
    Index(CatalogImplicationKind<Index>),
    /// Implications for a materialized view.
    MaterializedView(CatalogImplicationKind<MaterializedView>),
    /// Implications for a view.
    View(CatalogImplicationKind<View>),
    /// Implications for a continual task.
    ContinualTask(CatalogImplicationKind<ContinualTask>),
    /// Implications for a secret.
    Secret(CatalogImplicationKind<Secret>),
    /// Implications for a connection.
    Connection(CatalogImplicationKind<Connection>),
    /// Implications for a cluster.
    Cluster(CatalogImplicationKind<Cluster>),
    /// Implications for a cluster replica.
    ClusterReplica(CatalogImplicationKind<ClusterReplica>),
}
1632
/// The per-object state machine tracked inside a [CatalogImplication] variant.
///
/// `T` is the in-memory representation of the catalog object (e.g. [Table]).
#[derive(Debug, Clone)]
enum CatalogImplicationKind<T> {
    /// No operations seen yet.
    None,
    /// Item was added.
    Added(T),
    /// Item was dropped (with its name retained for error messages).
    Dropped(T, String),
    /// Item is being altered from one state to another.
    ///
    /// Reached by absorbing both an addition and a retraction for the same
    /// object, in either order: the retracted value is `prev`, the added
    /// value is `new`.
    Altered { prev: T, new: T },
}
1644
1645impl<T: Clone> CatalogImplicationKind<T> {
1646    /// Apply a state transition based on a diff. Returns an error message if
1647    /// the transition is invalid.
1648    fn transition(&mut self, item: T, name: Option<String>, diff: StateDiff) -> Result<(), String> {
1649        use CatalogImplicationKind::*;
1650        use StateDiff::*;
1651
1652        let new_state = match (&*self, diff) {
1653            // Initial state transitions
1654            (None, Addition) => Added(item),
1655            (None, Retraction) => Dropped(item, name.unwrap_or_else(|| "<unknown>".to_string())),
1656
1657            // From Added state
1658            (Added(existing), Retraction) => {
1659                // Add -> Drop means the item is being altered
1660                Altered {
1661                    prev: item,
1662                    new: existing.clone(),
1663                }
1664            }
1665            (Added(_), Addition) => {
1666                return Err("Cannot add an already added object".to_string());
1667            }
1668
1669            // From Dropped state
1670            (Dropped(existing, _), Addition) => {
1671                // Drop -> Add means the item is being altered
1672                Altered {
1673                    prev: existing.clone(),
1674                    new: item,
1675                }
1676            }
1677            (Dropped(_, _), Retraction) => {
1678                return Err("Cannot drop an already dropped object".to_string());
1679            }
1680
1681            // From Altered state
1682            (Altered { .. }, _) => {
1683                return Err(format!(
1684                    "Cannot apply {:?} to an object in Altered state",
1685                    diff
1686                ));
1687            }
1688        };
1689
1690        *self = new_state;
1691        Ok(())
1692    }
1693}
1694
/// Macro to generate absorb methods for each item type.
///
/// Each generated method routes an `$item_type` update into the
/// `CatalogImplication::$variant` state machine:
/// * if `self` is still `CatalogImplication::None`, it is first initialized
///   to `$variant` holding an empty `CatalogImplicationKind::None`;
/// * if `self` already holds a *different* variant, the batch of updates is
///   inconsistent and we panic;
/// * the update is then fed to `CatalogImplicationKind::transition`, and an
///   invalid transition also panics.
macro_rules! impl_absorb_method {
    (
        $method_name:ident,
        $variant:ident,
        $item_type:ty
    ) => {
        fn $method_name(
            &mut self,
            item: $item_type,
            parsed_full_name: Option<String>,
            diff: StateDiff,
        ) {
            // Borrow the inner state mutably, initializing it on the first
            // update for this object.
            let state = match self {
                CatalogImplication::$variant(state) => state,
                CatalogImplication::None => {
                    *self = CatalogImplication::$variant(CatalogImplicationKind::None);
                    // Re-match to borrow the state we just stored.
                    match self {
                        CatalogImplication::$variant(state) => state,
                        _ => unreachable!(),
                    }
                }
                _ => {
                    // Updates for two different object types were routed to
                    // the same implication; that is a logic error upstream.
                    panic!(
                        "Unexpected command type for {:?}: {} {:?}",
                        self,
                        stringify!($variant),
                        diff,
                    );
                }
            };

            if let Err(e) = state.transition(item, parsed_full_name, diff) {
                panic!(
                    "Invalid state transition for {}: {}",
                    stringify!($variant),
                    e
                );
            }
        }
    };
}
1737
1738impl CatalogImplication {
1739    /// Absorbs the given catalog update into this [CatalogImplication], causing
1740    /// a state transition or error.
1741    fn absorb(&mut self, catalog_update: ParsedStateUpdate) {
1742        match catalog_update.kind {
1743            ParsedStateUpdateKind::Item {
1744                durable_item: _,
1745                parsed_item,
1746                connection,
1747                parsed_full_name,
1748            } => match parsed_item {
1749                CatalogItem::Table(table) => {
1750                    self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
1751                }
1752                CatalogItem::Source(source) => {
1753                    self.absorb_source(
1754                        (source, connection),
1755                        Some(parsed_full_name),
1756                        catalog_update.diff,
1757                    );
1758                }
1759                CatalogItem::Sink(sink) => {
1760                    self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
1761                }
1762                CatalogItem::Index(index) => {
1763                    self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
1764                }
1765                CatalogItem::MaterializedView(mv) => {
1766                    self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
1767                }
1768                CatalogItem::View(view) => {
1769                    self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
1770                }
1771                CatalogItem::ContinualTask(ct) => {
1772                    self.absorb_continual_task(ct, Some(parsed_full_name), catalog_update.diff);
1773                }
1774                CatalogItem::Secret(secret) => {
1775                    self.absorb_secret(secret, None, catalog_update.diff);
1776                }
1777                CatalogItem::Connection(connection) => {
1778                    self.absorb_connection(connection, None, catalog_update.diff);
1779                }
1780                CatalogItem::Log(_) => {}
1781                CatalogItem::Type(_) => {}
1782                CatalogItem::Func(_) => {}
1783            },
1784            ParsedStateUpdateKind::TemporaryItem {
1785                durable_item: _,
1786                parsed_item,
1787                connection,
1788                parsed_full_name,
1789            } => match parsed_item {
1790                CatalogItem::Table(table) => {
1791                    self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
1792                }
1793                CatalogItem::Source(source) => {
1794                    self.absorb_source(
1795                        (source, connection),
1796                        Some(parsed_full_name),
1797                        catalog_update.diff,
1798                    );
1799                }
1800                CatalogItem::Sink(sink) => {
1801                    self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
1802                }
1803                CatalogItem::Index(index) => {
1804                    self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
1805                }
1806                CatalogItem::MaterializedView(mv) => {
1807                    self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
1808                }
1809                CatalogItem::View(view) => {
1810                    self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
1811                }
1812                CatalogItem::ContinualTask(ct) => {
1813                    self.absorb_continual_task(ct, None, catalog_update.diff);
1814                }
1815                CatalogItem::Secret(secret) => {
1816                    self.absorb_secret(secret, None, catalog_update.diff);
1817                }
1818                CatalogItem::Connection(connection) => {
1819                    self.absorb_connection(connection, None, catalog_update.diff);
1820                }
1821                CatalogItem::Log(_) => {}
1822                CatalogItem::Type(_) => {}
1823                CatalogItem::Func(_) => {}
1824            },
1825            ParsedStateUpdateKind::Cluster {
1826                durable_cluster: _,
1827                parsed_cluster,
1828            } => {
1829                let name = parsed_cluster.name.clone();
1830                self.absorb_cluster(parsed_cluster, Some(name), catalog_update.diff);
1831            }
1832            ParsedStateUpdateKind::ClusterReplica {
1833                durable_cluster_replica: _,
1834                parsed_cluster_replica,
1835            } => {
1836                let name = parsed_cluster_replica.name.clone();
1837                self.absorb_cluster_replica(
1838                    parsed_cluster_replica,
1839                    Some(name),
1840                    catalog_update.diff,
1841                );
1842            }
1843            ParsedStateUpdateKind::IntrospectionSourceIndex { .. } => {
1844                // IntrospectionSourceIndex updates are collected
1845                // separately in apply_catalog_implications and not
1846                // routed through absorb.
1847                unreachable!("IntrospectionSourceIndex should not be passed to absorb");
1848            }
1849        }
1850    }
1851
1852    impl_absorb_method!(absorb_table, Table, Table);
1853    impl_absorb_method!(
1854        absorb_source,
1855        Source,
1856        (Source, Option<GenericSourceConnection>)
1857    );
1858    impl_absorb_method!(absorb_sink, Sink, Sink);
1859    impl_absorb_method!(absorb_index, Index, Index);
1860    impl_absorb_method!(absorb_materialized_view, MaterializedView, MaterializedView);
1861    impl_absorb_method!(absorb_view, View, View);
1862
1863    impl_absorb_method!(absorb_continual_task, ContinualTask, ContinualTask);
1864    impl_absorb_method!(absorb_secret, Secret, Secret);
1865    impl_absorb_method!(absorb_connection, Connection, Connection);
1866
1867    impl_absorb_method!(absorb_cluster, Cluster, Cluster);
1868    impl_absorb_method!(absorb_cluster_replica, ClusterReplica, ClusterReplica);
1869}
1870
1871#[cfg(test)]
1872mod tests {
1873    use super::*;
1874    use mz_repr::{GlobalId, RelationDesc, RelationVersion, VersionedRelationDesc};
1875    use mz_sql::names::ResolvedIds;
1876    use std::collections::BTreeMap;
1877
1878    fn create_test_table(name: &str) -> Table {
1879        Table {
1880            desc: VersionedRelationDesc::new(
1881                RelationDesc::builder()
1882                    .with_column(name, mz_repr::SqlScalarType::String.nullable(false))
1883                    .finish(),
1884            ),
1885            create_sql: None,
1886            collections: BTreeMap::from([(RelationVersion::root(), GlobalId::System(1))]),
1887            conn_id: None,
1888            resolved_ids: ResolvedIds::empty(),
1889            custom_logical_compaction_window: None,
1890            is_retained_metrics_object: false,
1891            data_source: TableDataSource::TableWrites { defaults: vec![] },
1892        }
1893    }
1894
1895    #[mz_ore::test]
1896    fn test_item_state_transitions() {
1897        // Test None -> Added
1898        let mut state = CatalogImplicationKind::None;
1899        assert!(
1900            state
1901                .transition("item1".to_string(), None, StateDiff::Addition)
1902                .is_ok()
1903        );
1904        assert!(matches!(state, CatalogImplicationKind::Added(_)));
1905
1906        // Test Added -> Altered (via retraction)
1907        let mut state = CatalogImplicationKind::Added("new_item".to_string());
1908        assert!(
1909            state
1910                .transition("old_item".to_string(), None, StateDiff::Retraction)
1911                .is_ok()
1912        );
1913        match &state {
1914            CatalogImplicationKind::Altered { prev, new } => {
1915                // The retracted item is the OLD state
1916                assert_eq!(prev, "old_item");
1917                // The existing Added item is the NEW state
1918                assert_eq!(new, "new_item");
1919            }
1920            _ => panic!("Expected Altered state"),
1921        }
1922
1923        // Test None -> Dropped
1924        let mut state = CatalogImplicationKind::None;
1925        assert!(
1926            state
1927                .transition(
1928                    "item1".to_string(),
1929                    Some("test_name".to_string()),
1930                    StateDiff::Retraction
1931                )
1932                .is_ok()
1933        );
1934        assert!(matches!(state, CatalogImplicationKind::Dropped(_, _)));
1935
1936        // Test Dropped -> Altered (via addition)
1937        let mut state = CatalogImplicationKind::Dropped("old_item".to_string(), "name".to_string());
1938        assert!(
1939            state
1940                .transition("new_item".to_string(), None, StateDiff::Addition)
1941                .is_ok()
1942        );
1943        match &state {
1944            CatalogImplicationKind::Altered { prev, new } => {
1945                // The existing Dropped item is the OLD state
1946                assert_eq!(prev, "old_item");
1947                // The added item is the NEW state
1948                assert_eq!(new, "new_item");
1949            }
1950            _ => panic!("Expected Altered state"),
1951        }
1952
1953        // Test invalid transitions
1954        let mut state = CatalogImplicationKind::Added("item".to_string());
1955        assert!(
1956            state
1957                .transition("item2".to_string(), None, StateDiff::Addition)
1958                .is_err()
1959        );
1960
1961        let mut state = CatalogImplicationKind::Dropped("item".to_string(), "name".to_string());
1962        assert!(
1963            state
1964                .transition("item2".to_string(), None, StateDiff::Retraction)
1965                .is_err()
1966        );
1967    }
1968
1969    #[mz_ore::test]
1970    fn test_table_absorb_state_machine() {
1971        let table1 = create_test_table("table1");
1972        let table2 = create_test_table("table2");
1973
1974        // Test None -> AddTable
1975        let mut cmd = CatalogImplication::None;
1976        cmd.absorb_table(
1977            table1.clone(),
1978            Some("schema.table1".to_string()),
1979            StateDiff::Addition,
1980        );
1981        // Check that we have an Added state
1982        match &cmd {
1983            CatalogImplication::Table(state) => match state {
1984                CatalogImplicationKind::Added(t) => {
1985                    assert_eq!(t.desc.latest().arity(), table1.desc.latest().arity())
1986                }
1987                _ => panic!("Expected Added state"),
1988            },
1989            _ => panic!("Expected Table command"),
1990        }
1991
1992        // Test AddTable -> AlterTable (via retraction)
1993        // This tests the bug fix: when we have AddTable(table1) and receive Retraction(table2),
1994        // table2 is the old state being removed, table1 is the new state
1995        cmd.absorb_table(
1996            table2.clone(),
1997            Some("schema.table2".to_string()),
1998            StateDiff::Retraction,
1999        );
2000        match &cmd {
2001            CatalogImplication::Table(state) => match state {
2002                CatalogImplicationKind::Altered { prev, new } => {
2003                    // Verify the fix: prev should be the retracted table, new should be the added table
2004                    assert_eq!(prev.desc.latest().arity(), table2.desc.latest().arity());
2005                    assert_eq!(new.desc.latest().arity(), table1.desc.latest().arity());
2006                }
2007                _ => panic!("Expected Altered state"),
2008            },
2009            _ => panic!("Expected Table command"),
2010        }
2011
2012        // Test None -> DropTable
2013        let mut cmd = CatalogImplication::None;
2014        cmd.absorb_table(
2015            table1.clone(),
2016            Some("schema.table1".to_string()),
2017            StateDiff::Retraction,
2018        );
2019        match &cmd {
2020            CatalogImplication::Table(state) => match state {
2021                CatalogImplicationKind::Dropped(t, name) => {
2022                    assert_eq!(t.desc.latest().arity(), table1.desc.latest().arity());
2023                    assert_eq!(name, "schema.table1");
2024                }
2025                _ => panic!("Expected Dropped state"),
2026            },
2027            _ => panic!("Expected Table command"),
2028        }
2029
2030        // Test DropTable -> AlterTable (via addition)
2031        cmd.absorb_table(
2032            table2.clone(),
2033            Some("schema.table2".to_string()),
2034            StateDiff::Addition,
2035        );
2036        match &cmd {
2037            CatalogImplication::Table(state) => match state {
2038                CatalogImplicationKind::Altered { prev, new } => {
2039                    // prev should be the dropped table, new should be the added table
2040                    assert_eq!(prev.desc.latest().arity(), table1.desc.latest().arity());
2041                    assert_eq!(new.desc.latest().arity(), table2.desc.latest().arity());
2042                }
2043                _ => panic!("Expected Altered state"),
2044            },
2045            _ => panic!("Expected Table command"),
2046        }
2047    }
2048
2049    #[mz_ore::test]
2050    #[should_panic(expected = "Cannot add an already added object")]
2051    fn test_invalid_double_add() {
2052        let table = create_test_table("table");
2053        let mut cmd = CatalogImplication::None;
2054
2055        // First addition
2056        cmd.absorb_table(
2057            table.clone(),
2058            Some("schema.table".to_string()),
2059            StateDiff::Addition,
2060        );
2061
2062        // Second addition should panic
2063        cmd.absorb_table(
2064            table.clone(),
2065            Some("schema.table".to_string()),
2066            StateDiff::Addition,
2067        );
2068    }
2069
2070    #[mz_ore::test]
2071    #[should_panic(expected = "Cannot drop an already dropped object")]
2072    fn test_invalid_double_drop() {
2073        let table = create_test_table("table");
2074        let mut cmd = CatalogImplication::None;
2075
2076        // First drop
2077        cmd.absorb_table(
2078            table.clone(),
2079            Some("schema.table".to_string()),
2080            StateDiff::Retraction,
2081        );
2082
2083        // Second drop should panic
2084        cmd.absorb_table(
2085            table.clone(),
2086            Some("schema.table".to_string()),
2087            StateDiff::Retraction,
2088        );
2089    }
2090}