mz_adapter/coord/
catalog_implications.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to deriving and applying implications from [catalog
11//! changes](ParsedStateUpdate).
12//!
13//! The flow from "raw" catalog changes to [CatalogImplication] works like this:
14//!
15//! StateUpdateKind -> ParsedStateUpdate -> CatalogImplication
16//!
17//! [ParsedStateUpdate] adds context to a "raw" catalog change
18//! ([StateUpdateKind](mz_catalog::memory::objects::StateUpdateKind)). It
19//! includes an in-memory representation of the updated object, which can in
20//! theory be derived from the raw change but only when we have access to all
21//! the other raw changes or to an in-memory Catalog, which represents a
22//! "rollup" of all the raw changes.
23//!
//! [CatalogImplication] is both the state machine that we use for absorbing
//! multiple state updates for the same object and the final command that has to
//! be applied to in-memory state and/or the controller(s) after absorbing all
//! the state updates in a given batch of updates.
28
29use std::collections::{BTreeMap, BTreeSet};
30use std::sync::Arc;
31use std::time::{Duration, Instant};
32
33use fail::fail_point;
34use itertools::Itertools;
35use mz_adapter_types::compaction::CompactionWindow;
36use mz_catalog::builtin;
37use mz_catalog::memory::objects::{
38    CatalogItem, Cluster, ClusterReplica, Connection, ContinualTask, DataSourceDesc, Index,
39    MaterializedView, Secret, Sink, Source, StateDiff, Table, TableDataSource, View,
40};
41use mz_compute_client::protocol::response::PeekResponse;
42use mz_controller_types::{ClusterId, ReplicaId};
43use mz_ore::collections::CollectionExt;
44use mz_ore::error::ErrorExt;
45use mz_ore::future::InTask;
46use mz_ore::instrument;
47use mz_ore::retry::Retry;
48use mz_ore::str::StrExt;
49use mz_ore::task;
50use mz_repr::{CatalogItemId, GlobalId, RelationVersion, RelationVersionSelector, Timestamp};
51use mz_sql::plan::ConnectionDetails;
52use mz_storage_client::controller::{CollectionDescription, DataSource};
53use mz_storage_types::connections::PostgresConnection;
54use mz_storage_types::connections::inline::IntoInlineConnection;
55use mz_storage_types::sources::{GenericSourceConnection, SourceExport};
56use tracing::{Instrument, info_span, warn};
57
58use crate::active_compute_sink::ActiveComputeSinkRetireReason;
59use crate::coord::Coordinator;
60use crate::coord::catalog_implications::parsed_state_updates::{
61    ParsedStateUpdate, ParsedStateUpdateKind,
62};
63use crate::coord::statement_logging::StatementLoggingId;
64use crate::coord::timeline::TimelineState;
65use crate::statement_logging::StatementEndedExecutionReason;
66use crate::{AdapterError, CollectionIdBundle, ExecuteContext, ResultExt};
67
68pub mod parsed_state_updates;
69
70impl Coordinator {
71    /// Applies implications from the given bucket of [ParsedStateUpdate] to our
72    /// in-memory state and our controllers. This also applies transitive
73    /// implications, for example, peeks and subscribes will be cancelled when
74    /// referenced objects are dropped.
75    ///
76    /// This _requires_ that the given updates are consolidated. There must be
77    /// at most one addition and/or one retraction for a given item, as
78    /// identified by that items ID type.
79    #[instrument(level = "debug")]
80    pub async fn apply_catalog_implications(
81        &mut self,
82        ctx: Option<&mut ExecuteContext>,
83        catalog_updates: Vec<ParsedStateUpdate>,
84    ) -> Result<(), AdapterError> {
85        let start = Instant::now();
86
87        let mut catalog_implications: BTreeMap<CatalogItemId, CatalogImplication> = BTreeMap::new();
88        let mut cluster_commands: BTreeMap<ClusterId, CatalogImplication> = BTreeMap::new();
89        let mut cluster_replica_commands: BTreeMap<(ClusterId, ReplicaId), CatalogImplication> =
90            BTreeMap::new();
91
92        for update in catalog_updates {
93            tracing::trace!(?update, "got parsed state update");
94            match &update.kind {
95                ParsedStateUpdateKind::Item {
96                    durable_item,
97                    parsed_item: _,
98                    connection: _,
99                    parsed_full_name: _,
100                } => {
101                    let entry = catalog_implications
102                        .entry(durable_item.id.clone())
103                        .or_insert_with(|| CatalogImplication::None);
104                    entry.absorb(update);
105                }
106                ParsedStateUpdateKind::TemporaryItem {
107                    durable_item,
108                    parsed_item: _,
109                    connection: _,
110                    parsed_full_name: _,
111                } => {
112                    let entry = catalog_implications
113                        .entry(durable_item.id.clone())
114                        .or_insert_with(|| CatalogImplication::None);
115                    entry.absorb(update);
116                }
117                ParsedStateUpdateKind::Cluster {
118                    durable_cluster,
119                    parsed_cluster: _,
120                } => {
121                    let entry = cluster_commands
122                        .entry(durable_cluster.id)
123                        .or_insert_with(|| CatalogImplication::None);
124                    entry.absorb(update.clone());
125                }
126                ParsedStateUpdateKind::ClusterReplica {
127                    durable_cluster_replica,
128                    parsed_cluster_replica: _,
129                } => {
130                    let entry = cluster_replica_commands
131                        .entry((
132                            durable_cluster_replica.cluster_id,
133                            durable_cluster_replica.replica_id,
134                        ))
135                        .or_insert_with(|| CatalogImplication::None);
136                    entry.absorb(update.clone());
137                }
138            }
139        }
140
141        self.apply_catalog_implications_inner(
142            ctx,
143            catalog_implications.into_iter().collect_vec(),
144            cluster_commands.into_iter().collect_vec(),
145            cluster_replica_commands.into_iter().collect_vec(),
146        )
147        .await?;
148
149        self.metrics
150            .apply_catalog_implications_seconds
151            .observe(start.elapsed().as_secs_f64());
152
153        Ok(())
154    }
155
156    #[instrument(level = "debug")]
157    async fn apply_catalog_implications_inner(
158        &mut self,
159        mut ctx: Option<&mut ExecuteContext>,
160        implications: Vec<(CatalogItemId, CatalogImplication)>,
161        cluster_commands: Vec<(ClusterId, CatalogImplication)>,
162        cluster_replica_commands: Vec<((ClusterId, ReplicaId), CatalogImplication)>,
163    ) -> Result<(), AdapterError> {
164        let mut tables_to_drop = BTreeSet::new();
165        let mut sources_to_drop = vec![];
166        let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
167        let mut storage_sink_gids_to_drop = vec![];
168        let mut compute_gids_to_drop = vec![];
169        let mut view_gids_to_drop = vec![];
170        let mut secrets_to_drop = vec![];
171        let mut vpc_endpoints_to_drop = vec![];
172        let mut clusters_to_drop = vec![];
173        let mut cluster_replicas_to_drop = vec![];
174        let mut compute_sinks_to_drop = BTreeMap::new();
175        let mut peeks_to_drop = vec![];
176        let mut copies_to_drop = vec![];
177
178        // Maps for storing names of dropped objects for error messages.
179        let mut dropped_item_names: BTreeMap<GlobalId, String> = BTreeMap::new();
180        let mut dropped_cluster_names: BTreeMap<ClusterId, String> = BTreeMap::new();
181
182        // Separate collections for tables (which need write timestamps) and
183        // sources (which don't).
184        let mut table_collections_to_create = BTreeMap::new();
185        let mut source_collections_to_create = BTreeMap::new();
186        let mut storage_policies_to_initialize = BTreeMap::new();
187        let mut execution_timestamps_to_set = BTreeSet::new();
188
189        // Replacing a materialized view causes the replacement's catalog entry
190        // to be dropped. Its compute and storage collections are transferred to
191        // the target MV though, so we must make sure not to drop those.
192        let mut replacement_gids = vec![];
193
194        // We're incrementally migrating the code that manipulates the
195        // controller from closures in the sequencer. For some types of catalog
196        // changes we haven't done this migration yet, so there you will see
197        // just a log message. Over the next couple of PRs all of these will go
198        // away.
199
200        for (catalog_id, implication) in implications {
201            tracing::trace!(?implication, "have to apply catalog implication");
202
203            match implication {
204                CatalogImplication::Table(CatalogImplicationKind::Added(table)) => {
205                    self.handle_create_table(
206                        &mut ctx,
207                        &mut table_collections_to_create,
208                        &mut storage_policies_to_initialize,
209                        &mut execution_timestamps_to_set,
210                        catalog_id,
211                        table.clone(),
212                    )
213                    .await?
214                }
215                CatalogImplication::Table(CatalogImplicationKind::Altered {
216                    prev: prev_table,
217                    new: new_table,
218                }) => self.handle_alter_table(prev_table, new_table).await?,
219
220                CatalogImplication::Table(CatalogImplicationKind::Dropped(table, full_name)) => {
221                    let global_ids = table.global_ids();
222                    for global_id in global_ids {
223                        tables_to_drop.insert((catalog_id, global_id));
224                        dropped_item_names.insert(global_id, full_name.clone());
225                    }
226                }
227                CatalogImplication::Source(CatalogImplicationKind::Added((
228                    source,
229                    _connection,
230                ))) => {
231                    // Get the compaction windows for all sources with this
232                    // catalog_id This replicates the logic from
233                    // sequence_create_source where it collects all item_ids and
234                    // gets their compaction windows
235                    let compaction_windows = self
236                        .catalog()
237                        .state()
238                        .source_compaction_windows(vec![catalog_id]);
239
240                    self.handle_create_source(
241                        &mut source_collections_to_create,
242                        &mut storage_policies_to_initialize,
243                        catalog_id,
244                        source,
245                        compaction_windows,
246                    )
247                    .await?
248                }
249                CatalogImplication::Source(CatalogImplicationKind::Altered {
250                    prev: (prev_source, _prev_connection),
251                    new: (new_source, _new_connection),
252                }) => {
253                    tracing::debug!(
254                        ?prev_source,
255                        ?new_source,
256                        "not handling AlterSource in here yet"
257                    );
258                }
259                CatalogImplication::Source(CatalogImplicationKind::Dropped(
260                    (source, connection),
261                    full_name,
262                )) => {
263                    let global_id = source.global_id();
264                    sources_to_drop.push((catalog_id, global_id));
265                    dropped_item_names.insert(global_id, full_name);
266
267                    if let DataSourceDesc::Ingestion { desc, .. }
268                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } = &source.data_source
269                    {
270                        match &desc.connection {
271                            GenericSourceConnection::Postgres(_referenced_conn) => {
272                                let inline_conn = connection.expect("missing inlined connection");
273
274                                let pg_conn = match inline_conn {
275                                    GenericSourceConnection::Postgres(pg_conn) => pg_conn,
276                                    other => {
277                                        panic!("expected postgres connection, got: {:?}", other)
278                                    }
279                                };
280                                let pending_drop = (
281                                    pg_conn.connection.clone(),
282                                    pg_conn.publication_details.slot.clone(),
283                                );
284                                replication_slots_to_drop.push(pending_drop);
285                            }
286                            _ => {}
287                        }
288                    }
289                }
290                CatalogImplication::Sink(CatalogImplicationKind::Added(sink)) => {
291                    tracing::debug!(?sink, "not handling AddSink in here yet");
292                }
293                CatalogImplication::Sink(CatalogImplicationKind::Altered {
294                    prev: prev_sink,
295                    new: new_sink,
296                }) => {
297                    tracing::debug!(?prev_sink, ?new_sink, "not handling AlterSink in here yet");
298                }
299                CatalogImplication::Sink(CatalogImplicationKind::Dropped(sink, full_name)) => {
300                    storage_sink_gids_to_drop.push(sink.global_id());
301                    dropped_item_names.insert(sink.global_id(), full_name);
302                }
303                CatalogImplication::Index(CatalogImplicationKind::Added(index)) => {
304                    tracing::debug!(?index, "not handling AddIndex in here yet");
305                }
306                CatalogImplication::Index(CatalogImplicationKind::Altered {
307                    prev: prev_index,
308                    new: new_index,
309                }) => {
310                    tracing::debug!(
311                        ?prev_index,
312                        ?new_index,
313                        "not handling AlterIndex in here yet"
314                    );
315                }
316                CatalogImplication::Index(CatalogImplicationKind::Dropped(index, full_name)) => {
317                    compute_gids_to_drop.push((index.cluster_id, index.global_id()));
318                    dropped_item_names.insert(index.global_id(), full_name);
319                }
320                CatalogImplication::MaterializedView(CatalogImplicationKind::Added(mv)) => {
321                    tracing::debug!(?mv, "not handling AddMaterializedView in here yet");
322                }
323                CatalogImplication::MaterializedView(CatalogImplicationKind::Altered {
324                    prev: prev_mv,
325                    new: new_mv,
326                }) => {
327                    let old_gid = prev_mv.global_id_writes();
328                    let new_gid = new_mv.global_id_writes();
329                    if new_gid != old_gid {
330                        replacement_gids.push((new_mv.cluster_id, new_gid));
331                    }
332
333                    self.handle_alter_materialized_view(prev_mv, new_mv)?;
334                }
335                CatalogImplication::MaterializedView(CatalogImplicationKind::Dropped(
336                    mv,
337                    full_name,
338                )) => {
339                    compute_gids_to_drop.push((mv.cluster_id, mv.global_id_writes()));
340                    sources_to_drop.extend(mv.global_ids().map(|gid| (catalog_id, gid)));
341                    dropped_item_names.insert(mv.global_id_writes(), full_name);
342                }
343                CatalogImplication::View(CatalogImplicationKind::Added(view)) => {
344                    tracing::debug!(?view, "not handling AddView in here yet");
345                }
346                CatalogImplication::View(CatalogImplicationKind::Altered {
347                    prev: prev_view,
348                    new: new_view,
349                }) => {
350                    tracing::debug!(?prev_view, ?new_view, "not handling AlterView in here yet");
351                }
352                CatalogImplication::View(CatalogImplicationKind::Dropped(view, full_name)) => {
353                    view_gids_to_drop.push(view.global_id());
354                    dropped_item_names.insert(view.global_id(), full_name);
355                }
356                CatalogImplication::ContinualTask(CatalogImplicationKind::Added(ct)) => {
357                    tracing::debug!(?ct, "not handling AddContinualTask in here yet");
358                }
359                CatalogImplication::ContinualTask(CatalogImplicationKind::Altered {
360                    prev: prev_ct,
361                    new: new_ct,
362                }) => {
363                    tracing::debug!(
364                        ?prev_ct,
365                        ?new_ct,
366                        "not handling AlterContinualTask in here yet"
367                    );
368                }
369                CatalogImplication::ContinualTask(CatalogImplicationKind::Dropped(
370                    ct,
371                    _full_name,
372                )) => {
373                    compute_gids_to_drop.push((ct.cluster_id, ct.global_id()));
374                    sources_to_drop.push((catalog_id, ct.global_id()));
375                }
376                CatalogImplication::Secret(CatalogImplicationKind::Added(secret)) => {
377                    tracing::debug!(?secret, "not handling AddSecret in here yet");
378                }
379                CatalogImplication::Secret(CatalogImplicationKind::Altered {
380                    prev: prev_secret,
381                    new: new_secret,
382                }) => {
383                    tracing::debug!(
384                        ?prev_secret,
385                        ?new_secret,
386                        "not handling AlterSecret in here yet"
387                    );
388                }
389                CatalogImplication::Secret(CatalogImplicationKind::Dropped(
390                    _secret,
391                    _full_name,
392                )) => {
393                    secrets_to_drop.push(catalog_id);
394                }
395                CatalogImplication::Connection(CatalogImplicationKind::Added(connection)) => {
396                    tracing::debug!(?connection, "not handling AddConnection in here yet");
397                }
398                CatalogImplication::Connection(CatalogImplicationKind::Altered {
399                    prev: prev_connection,
400                    new: new_connection,
401                }) => {
402                    tracing::debug!(
403                        ?prev_connection,
404                        ?new_connection,
405                        "not handling AlterConnection in here yet"
406                    );
407                }
408                CatalogImplication::Connection(CatalogImplicationKind::Dropped(
409                    connection,
410                    _full_name,
411                )) => {
412                    match &connection.details {
413                        // SSH connections have an associated secret that should be dropped
414                        ConnectionDetails::Ssh { .. } => {
415                            secrets_to_drop.push(catalog_id);
416                        }
417                        // AWS PrivateLink connections have an associated
418                        // VpcEndpoint K8S resource that should be dropped
419                        ConnectionDetails::AwsPrivatelink(_) => {
420                            vpc_endpoints_to_drop.push(catalog_id);
421                        }
422                        _ => (),
423                    }
424                }
425                CatalogImplication::None => {
426                    // Nothing to do for None commands
427                }
428                CatalogImplication::Cluster(_) | CatalogImplication::ClusterReplica(_) => {
429                    unreachable!("clusters and cluster replicas are handled below")
430                }
431                CatalogImplication::Table(CatalogImplicationKind::None)
432                | CatalogImplication::Source(CatalogImplicationKind::None)
433                | CatalogImplication::Sink(CatalogImplicationKind::None)
434                | CatalogImplication::Index(CatalogImplicationKind::None)
435                | CatalogImplication::MaterializedView(CatalogImplicationKind::None)
436                | CatalogImplication::View(CatalogImplicationKind::None)
437                | CatalogImplication::ContinualTask(CatalogImplicationKind::None)
438                | CatalogImplication::Secret(CatalogImplicationKind::None)
439                | CatalogImplication::Connection(CatalogImplicationKind::None) => {
440                    unreachable!("will never leave None in place");
441                }
442            }
443        }
444
445        for (cluster_id, command) in cluster_commands {
446            tracing::trace!(?command, "have cluster command to apply!");
447
448            match command {
449                CatalogImplication::Cluster(CatalogImplicationKind::Added(cluster)) => {
450                    tracing::debug!(?cluster, "not handling AddCluster in here yet");
451                }
452                CatalogImplication::Cluster(CatalogImplicationKind::Altered {
453                    prev: prev_cluster,
454                    new: new_cluster,
455                }) => {
456                    tracing::debug!(
457                        ?prev_cluster,
458                        ?new_cluster,
459                        "not handling AlterCluster in here yet"
460                    );
461                }
462                CatalogImplication::Cluster(CatalogImplicationKind::Dropped(
463                    cluster,
464                    _full_name,
465                )) => {
466                    clusters_to_drop.push(cluster_id);
467                    dropped_cluster_names.insert(cluster_id, cluster.name);
468                }
469                CatalogImplication::Cluster(CatalogImplicationKind::None) => {
470                    unreachable!("will never leave None in place");
471                }
472                command => {
473                    unreachable!(
474                        "we only handle cluster commands in this map, got: {:?}",
475                        command
476                    );
477                }
478            }
479        }
480
481        for ((cluster_id, replica_id), command) in cluster_replica_commands {
482            tracing::trace!(?command, "have cluster replica command to apply!");
483
484            match command {
485                CatalogImplication::ClusterReplica(CatalogImplicationKind::Added(replica)) => {
486                    tracing::debug!(?replica, "not handling AddClusterReplica in here yet");
487                }
488                CatalogImplication::ClusterReplica(CatalogImplicationKind::Altered {
489                    prev: prev_replica,
490                    new: new_replica,
491                }) => {
492                    tracing::debug!(
493                        ?prev_replica,
494                        ?new_replica,
495                        "not handling AlterClusterReplica in here yet"
496                    );
497                }
498                CatalogImplication::ClusterReplica(CatalogImplicationKind::Dropped(
499                    _replica,
500                    _full_name,
501                )) => {
502                    cluster_replicas_to_drop.push((cluster_id, replica_id));
503                }
504                CatalogImplication::ClusterReplica(CatalogImplicationKind::None) => {
505                    unreachable!("will never leave None in place");
506                }
507                command => {
508                    unreachable!(
509                        "we only handle cluster replica commands in this map, got: {:?}",
510                        command
511                    );
512                }
513            }
514        }
515
516        // Cancel out drops against replacements.
517        for (cluster_id, gid) in replacement_gids {
518            sources_to_drop.retain(|(_, id)| *id != gid);
519            compute_gids_to_drop.retain(|id| *id != (cluster_id, gid));
520        }
521
522        if !source_collections_to_create.is_empty() {
523            self.create_source_collections(source_collections_to_create)
524                .await?;
525        }
526
527        // Have to create sources first and then tables, because tables within
528        // one transaction can depend on sources.
529        if !table_collections_to_create.is_empty() {
530            self.create_table_collections(table_collections_to_create, execution_timestamps_to_set)
531                .await?;
532        }
533        // It is _very_ important that we only initialize read policies after we
534        // have created all the sources/collections. Some of the sources created
535        // in this collection might have dependencies on other sources, so the
536        // controller must get a chance to install read holds before we set a
537        // policy that might make the since advance.
538        self.initialize_storage_collections(storage_policies_to_initialize)
539            .await?;
540
541        let collections_to_drop: BTreeSet<_> = sources_to_drop
542            .iter()
543            .map(|(_, gid)| *gid)
544            .chain(tables_to_drop.iter().map(|(_, gid)| *gid))
545            .chain(storage_sink_gids_to_drop.iter().copied())
546            .chain(compute_gids_to_drop.iter().map(|(_, gid)| *gid))
547            .chain(view_gids_to_drop.iter().copied())
548            .collect();
549
550        // Clean up any active compute sinks like subscribes or copy to-s that
551        // rely on dropped relations or clusters.
552        for (sink_id, sink) in &self.active_compute_sinks {
553            let cluster_id = sink.cluster_id();
554            if let Some(id) = sink
555                .depends_on()
556                .iter()
557                .find(|id| collections_to_drop.contains(id))
558            {
559                let name = dropped_item_names
560                    .get(id)
561                    .map(|n| format!("relation {}", n.quoted()))
562                    .expect("missing relation name");
563                compute_sinks_to_drop.insert(
564                    *sink_id,
565                    ActiveComputeSinkRetireReason::DependencyDropped(name),
566                );
567            } else if clusters_to_drop.contains(&cluster_id) {
568                let name = dropped_cluster_names
569                    .get(&cluster_id)
570                    .map(|n| format!("cluster {}", n.quoted()))
571                    .expect("missing cluster name");
572                compute_sinks_to_drop.insert(
573                    *sink_id,
574                    ActiveComputeSinkRetireReason::DependencyDropped(name),
575                );
576            }
577        }
578
579        // Clean up any pending peeks that rely on dropped relations or clusters.
580        for (uuid, pending_peek) in &self.pending_peeks {
581            if let Some(id) = pending_peek
582                .depends_on
583                .iter()
584                .find(|id| collections_to_drop.contains(id))
585            {
586                let name = dropped_item_names
587                    .get(id)
588                    .map(|n| format!("relation {}", n.quoted()))
589                    .expect("missing relation name");
590                peeks_to_drop.push((name, uuid.clone()));
591            } else if clusters_to_drop.contains(&pending_peek.cluster_id) {
592                let name = dropped_cluster_names
593                    .get(&pending_peek.cluster_id)
594                    .map(|n| format!("cluster {}", n.quoted()))
595                    .expect("missing cluster name");
596                peeks_to_drop.push((name, uuid.clone()));
597            }
598        }
599
600        // Clean up any pending `COPY` statements that rely on dropped relations or clusters.
601        for (conn_id, pending_copy) in &self.active_copies {
602            let dropping_table = tables_to_drop
603                .iter()
604                .any(|(item_id, _gid)| pending_copy.table_id == *item_id);
605            let dropping_cluster = clusters_to_drop.contains(&pending_copy.cluster_id);
606
607            if dropping_table || dropping_cluster {
608                copies_to_drop.push(conn_id.clone());
609            }
610        }
611
612        let storage_gids_to_drop: BTreeSet<_> = sources_to_drop
613            .iter()
614            .map(|(_id, gid)| gid)
615            .chain(storage_sink_gids_to_drop.iter())
616            .chain(tables_to_drop.iter().map(|(_id, gid)| gid))
617            .copied()
618            .collect();
619
620        // Gather resources that we have to remove from timeline state and
621        // pre-check if any Timelines become empty, when we drop the specified
622        // storage and compute resources.
623        //
624        // Note: We only apply these changes below.
625        let mut timeline_id_bundles = BTreeMap::new();
626
627        for (timeline, TimelineState { read_holds, .. }) in &self.global_timelines {
628            let mut id_bundle = CollectionIdBundle::default();
629
630            for storage_id in read_holds.storage_ids() {
631                if storage_gids_to_drop.contains(&storage_id) {
632                    id_bundle.storage_ids.insert(storage_id);
633                }
634            }
635
636            for (instance_id, id) in read_holds.compute_ids() {
637                if compute_gids_to_drop.contains(&(instance_id, id))
638                    || clusters_to_drop.contains(&instance_id)
639                {
640                    id_bundle
641                        .compute_ids
642                        .entry(instance_id)
643                        .or_default()
644                        .insert(id);
645                }
646            }
647
648            timeline_id_bundles.insert(timeline.clone(), id_bundle);
649        }
650
651        let mut timeline_associations = BTreeMap::new();
652        for (timeline, id_bundle) in timeline_id_bundles.into_iter() {
653            let TimelineState { read_holds, .. } = self
654                .global_timelines
655                .get(&timeline)
656                .expect("all timelines have a timestamp oracle");
657
658            let empty = read_holds.id_bundle().difference(&id_bundle).is_empty();
659            timeline_associations.insert(timeline, (empty, id_bundle));
660        }
661
662        // No error returns are allowed after this point. Enforce this at compile time
663        // by using this odd structure so we don't accidentally add a stray `?`.
664        let _: () = async {
665            if !timeline_associations.is_empty() {
666                for (timeline, (should_be_empty, id_bundle)) in timeline_associations {
667                    let became_empty =
668                        self.remove_resources_associated_with_timeline(timeline, id_bundle);
669                    assert_eq!(should_be_empty, became_empty, "emptiness did not match!");
670                }
671            }
672
673            // Note that we drop tables before sources since there can be a weak
674            // dependency on sources from tables in the storage controller that
675            // will result in error logging that we'd prefer to avoid. This
676            // isn't an actual dependency issue but we'd like to keep that error
677            // logging around to indicate when an actual dependency error might
678            // occur.
679            if !tables_to_drop.is_empty() {
680                let ts = self.get_local_write_ts().await;
681                self.drop_tables(tables_to_drop.into_iter().collect_vec(), ts.timestamp);
682            }
683
684            if !sources_to_drop.is_empty() {
685                self.drop_sources(sources_to_drop);
686            }
687
688            if !storage_sink_gids_to_drop.is_empty() {
689                self.drop_storage_sinks(storage_sink_gids_to_drop);
690            }
691
692            if !compute_sinks_to_drop.is_empty() {
693                self.retire_compute_sinks(compute_sinks_to_drop).await;
694            }
695
696            if !peeks_to_drop.is_empty() {
697                for (dropped_name, uuid) in peeks_to_drop {
698                    if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
699                        let cancel_reason = PeekResponse::Error(format!(
700                            "query could not complete because {dropped_name} was dropped"
701                        ));
702                        self.controller
703                            .compute
704                            .cancel_peek(pending_peek.cluster_id, uuid, cancel_reason)
705                            .unwrap_or_terminate("unable to cancel peek");
706                        self.retire_execution(
707                            StatementEndedExecutionReason::Canceled,
708                            pending_peek.ctx_extra,
709                        );
710                    }
711                }
712            }
713
714            if !copies_to_drop.is_empty() {
715                for conn_id in copies_to_drop {
716                    self.cancel_pending_copy(&conn_id);
717                }
718            }
719
720            if !compute_gids_to_drop.is_empty() {
721                self.drop_compute_collections(compute_gids_to_drop);
722            }
723
724            if !vpc_endpoints_to_drop.is_empty() {
725                self.drop_vpc_endpoints_in_background(vpc_endpoints_to_drop)
726            }
727
728            if !cluster_replicas_to_drop.is_empty() {
729                fail::fail_point!("after_catalog_drop_replica");
730
731                for (cluster_id, replica_id) in cluster_replicas_to_drop {
732                    self.drop_replica(cluster_id, replica_id);
733                }
734            }
735            if !clusters_to_drop.is_empty() {
736                for cluster_id in clusters_to_drop {
737                    self.controller.drop_cluster(cluster_id);
738                }
739            }
740
741            // We don't want to block the main coordinator thread on cleaning
742            // up external resources (PostgreSQL replication slots and secrets),
743            // so we perform that cleanup in a background task.
744            //
745            // TODO(14551): This is inherently best effort. An ill-timed crash
746            // means we'll never clean these resources up. Safer cleanup for non-Materialize resources.
747            // See <https://github.com/MaterializeInc/materialize/issues/14551>
748            task::spawn(|| "drop_replication_slots_and_secrets", {
749                let ssh_tunnel_manager = self.connection_context().ssh_tunnel_manager.clone();
750                let secrets_controller = Arc::clone(&self.secrets_controller);
751                let secrets_reader = Arc::clone(self.secrets_reader());
752                let storage_config = self.controller.storage.config().clone();
753
754                async move {
755                    for (connection, replication_slot_name) in replication_slots_to_drop {
756                        tracing::info!(?replication_slot_name, "dropping replication slot");
757
758                        // Try to drop the replication slots, but give up after
759                        // a while. The PostgreSQL server may no longer be
760                        // healthy. Users often drop PostgreSQL sources
761                        // *because* the PostgreSQL server has been
762                        // decomissioned.
763                        let result: Result<(), anyhow::Error> = Retry::default()
764                            .max_duration(Duration::from_secs(60))
765                            .retry_async(|_state| async {
766                                let config = connection
767                                    .config(&secrets_reader, &storage_config, InTask::No)
768                                    .await
769                                    .map_err(|e| {
770                                        anyhow::anyhow!(
771                                            "error creating Postgres client for \
772                                            dropping acquired slots: {}",
773                                            e.display_with_causes()
774                                        )
775                                    })?;
776
777                                mz_postgres_util::drop_replication_slots(
778                                    &ssh_tunnel_manager,
779                                    config.clone(),
780                                    &[(&replication_slot_name, true)],
781                                )
782                                .await?;
783
784                                Ok(())
785                            })
786                            .await;
787
788                        if let Err(err) = result {
789                            tracing::warn!(
790                                ?replication_slot_name,
791                                ?err,
792                                "failed to drop replication slot"
793                            );
794                        }
795                    }
796
797                    // Drop secrets *after* dropping the replication slots,
798                    // because dropping replication slots may rely on those
799                    // secrets still being present.
800                    //
801                    // It's okay if we crash before processing the secret drops,
802                    // as we look for and remove any orphaned secrets during
803                    // startup.
804                    fail_point!("drop_secrets");
805                    for secret in secrets_to_drop {
806                        if let Err(e) = secrets_controller.delete(secret).await {
807                            warn!("Dropping secrets has encountered an error: {}", e);
808                        }
809                    }
810                }
811            });
812        }
813        .instrument(info_span!(
814            "coord::apply_catalog_implications_inner::finalize"
815        ))
816        .await;
817
818        Ok(())
819    }
820
821    #[instrument(level = "debug")]
822    async fn create_table_collections(
823        &mut self,
824        table_collections_to_create: BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
825        execution_timestamps_to_set: BTreeSet<StatementLoggingId>,
826    ) -> Result<(), AdapterError> {
827        // If we have tables, determine the initial validity for the table.
828        let register_ts = self.get_local_write_ts().await.timestamp;
829
830        // After acquiring `register_ts` but before using it, we need to
831        // be sure we're still the leader. Otherwise a new generation
832        // may also be trying to use `register_ts` for a different
833        // purpose.
834        //
835        // See #28216.
836        self.catalog
837            .confirm_leadership()
838            .await
839            .unwrap_or_terminate("unable to confirm leadership");
840
841        for id in execution_timestamps_to_set {
842            self.set_statement_execution_timestamp(id, register_ts);
843        }
844
845        let storage_metadata = self.catalog.state().storage_metadata();
846
847        self.controller
848            .storage
849            .create_collections(
850                storage_metadata,
851                Some(register_ts),
852                table_collections_to_create.into_iter().collect_vec(),
853            )
854            .await
855            .unwrap_or_terminate("cannot fail to create collections");
856
857        self.apply_local_write(register_ts).await;
858
859        Ok(())
860    }
861
862    #[instrument(level = "debug")]
863    async fn create_source_collections(
864        &mut self,
865        source_collections_to_create: BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
866    ) -> Result<(), AdapterError> {
867        let storage_metadata = self.catalog.state().storage_metadata();
868
869        self.controller
870            .storage
871            .create_collections(
872                storage_metadata,
873                None, // Sources don't need a write timestamp
874                source_collections_to_create.into_iter().collect_vec(),
875            )
876            .await
877            .unwrap_or_terminate("cannot fail to create collections");
878
879        Ok(())
880    }
881
882    #[instrument(level = "debug")]
883    async fn initialize_storage_collections(
884        &mut self,
885        storage_policies_to_initialize: BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
886    ) -> Result<(), AdapterError> {
887        for (compaction_window, global_ids) in storage_policies_to_initialize {
888            self.initialize_read_policies(
889                &CollectionIdBundle {
890                    storage_ids: global_ids,
891                    compute_ids: BTreeMap::new(),
892                },
893                compaction_window,
894            )
895            .await;
896        }
897
898        Ok(())
899    }
900
901    #[instrument(level = "debug")]
902    async fn handle_create_table(
903        &mut self,
904        ctx: &mut Option<&mut ExecuteContext>,
905        storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
906        storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
907        execution_timestamps_to_set: &mut BTreeSet<StatementLoggingId>,
908        table_id: CatalogItemId,
909        table: Table,
910    ) -> Result<(), AdapterError> {
911        // The table data_source determines whether this table will be written to
912        // by environmentd (e.g. with INSERT INTO statements) or by the storage layer
913        // (e.g. a source-fed table).
914        match &table.data_source {
915            TableDataSource::TableWrites { defaults: _ } => {
916                let versions: BTreeMap<_, _> = table
917                    .collection_descs()
918                    .map(|(gid, version, desc)| (version, (gid, desc)))
919                    .collect();
920                let collection_descs = versions.iter().map(|(_version, (gid, desc))| {
921                    let collection_desc = CollectionDescription::for_table(desc.clone());
922
923                    (*gid, collection_desc)
924                });
925
926                let compaction_window = table
927                    .custom_logical_compaction_window
928                    .unwrap_or(CompactionWindow::Default);
929                let ids_to_initialize = storage_policies_to_initialize
930                    .entry(compaction_window)
931                    .or_default();
932
933                for (gid, collection_desc) in collection_descs {
934                    storage_collections_to_create.insert(gid, collection_desc);
935                    ids_to_initialize.insert(gid);
936                }
937
938                if let Some(id) = ctx.as_ref().and_then(|ctx| ctx.extra().contents()) {
939                    execution_timestamps_to_set.insert(id);
940                }
941            }
942            TableDataSource::DataSource {
943                desc: data_source_desc,
944                timeline,
945            } => {
946                match data_source_desc {
947                    DataSourceDesc::IngestionExport {
948                        ingestion_id,
949                        external_reference: _,
950                        details,
951                        data_config,
952                    } => {
953                        // TODO: It's a little weird that a table will be present in this
954                        // source status collection, we might want to split out into a separate
955                        // status collection.
956                        let status_collection_id = self
957                            .catalog()
958                            .resolve_builtin_storage_collection(&builtin::MZ_SOURCE_STATUS_HISTORY);
959
960                        let global_ingestion_id =
961                            self.catalog().get_entry(ingestion_id).latest_global_id();
962                        let global_status_collection_id = self
963                            .catalog()
964                            .get_entry(&status_collection_id)
965                            .latest_global_id();
966
967                        let collection_desc = CollectionDescription::<Timestamp> {
968                            desc: table.desc.latest(),
969                            data_source: DataSource::IngestionExport {
970                                ingestion_id: global_ingestion_id,
971                                details: details.clone(),
972                                data_config: data_config
973                                    .clone()
974                                    .into_inline_connection(self.catalog.state()),
975                            },
976                            since: None,
977                            status_collection_id: Some(global_status_collection_id),
978                            timeline: Some(timeline.clone()),
979                            primary: None,
980                        };
981
982                        let global_id = table
983                            .global_ids()
984                            .expect_element(|| "subsources cannot have multiple versions");
985
986                        storage_collections_to_create.insert(global_id, collection_desc);
987
988                        let read_policies = self
989                            .catalog()
990                            .state()
991                            .source_compaction_windows(vec![table_id]);
992                        for (compaction_window, catalog_ids) in read_policies {
993                            let compaction_ids = storage_policies_to_initialize
994                                .entry(compaction_window)
995                                .or_default();
996
997                            let gids = catalog_ids
998                                .into_iter()
999                                .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1000                                .flatten();
1001                            compaction_ids.extend(gids);
1002                        }
1003                    }
1004                    DataSourceDesc::Webhook {
1005                        validate_using: _,
1006                        body_format: _,
1007                        headers: _,
1008                        cluster_id: _,
1009                    } => {
1010                        // Create the underlying collection with the latest schema from the Table.
1011                        assert_eq!(
1012                            table.desc.latest_version(),
1013                            RelationVersion::root(),
1014                            "found webhook with more than 1 relation version, {:?}",
1015                            table.desc
1016                        );
1017                        let desc = table.desc.latest();
1018
1019                        let collection_desc = CollectionDescription::<Timestamp> {
1020                            desc,
1021                            data_source: DataSource::Webhook,
1022                            since: None,
1023                            status_collection_id: None, // Webhook tables don't use status collections
1024                            timeline: Some(timeline.clone()),
1025                            primary: None,
1026                        };
1027
1028                        let global_id = table
1029                            .global_ids()
1030                            .expect_element(|| "webhooks cannot have multiple versions");
1031
1032                        storage_collections_to_create.insert(global_id, collection_desc);
1033
1034                        let read_policies = self
1035                            .catalog()
1036                            .state()
1037                            .source_compaction_windows(vec![table_id]);
1038
1039                        for (compaction_window, catalog_ids) in read_policies {
1040                            let compaction_ids = storage_policies_to_initialize
1041                                .entry(compaction_window)
1042                                .or_default();
1043
1044                            let gids = catalog_ids
1045                                .into_iter()
1046                                .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1047                                .flatten();
1048                            compaction_ids.extend(gids);
1049                        }
1050                    }
1051                    _ => unreachable!("CREATE TABLE data source got {:?}", data_source_desc),
1052                }
1053            }
1054        }
1055
1056        Ok(())
1057    }
1058
1059    #[instrument(level = "debug")]
1060    async fn handle_alter_table(
1061        &mut self,
1062        prev_table: Table,
1063        new_table: Table,
1064    ) -> Result<(), AdapterError> {
1065        let existing_gid = prev_table.global_id_writes();
1066        let new_gid = new_table.global_id_writes();
1067
1068        if existing_gid == new_gid {
1069            // It's not an ALTER TABLE as far as the controller is concerned,
1070            // because we still have the same GlobalId. This is likely a change
1071            // from an ALTER SWAP.
1072            return Ok(());
1073        }
1074
1075        // Acquire a read hold on the original table for the duration of
1076        // the alter to prevent the since of the original table from
1077        // getting advanced, while the ALTER is running.
1078        let existing_table = crate::CollectionIdBundle {
1079            storage_ids: BTreeSet::from([existing_gid]),
1080            compute_ids: BTreeMap::new(),
1081        };
1082        let existing_table_read_hold = self.acquire_read_holds(&existing_table);
1083
1084        let expected_version = prev_table.desc.latest_version();
1085        let new_version = new_table.desc.latest_version();
1086        let new_desc = new_table
1087            .desc
1088            .at_version(RelationVersionSelector::Specific(new_version));
1089
1090        let register_ts = self.get_local_write_ts().await.timestamp;
1091
1092        // Alter the table description, creating a "new" collection.
1093        self.controller
1094            .storage
1095            .alter_table_desc(
1096                existing_gid,
1097                new_gid,
1098                new_desc,
1099                expected_version,
1100                register_ts,
1101            )
1102            .await
1103            .expect("failed to alter desc of table");
1104
1105        // Initialize the ReadPolicy which ensures we have the correct read holds.
1106        let compaction_window = new_table
1107            .custom_logical_compaction_window
1108            .unwrap_or(CompactionWindow::Default);
1109        self.initialize_read_policies(
1110            &crate::CollectionIdBundle {
1111                storage_ids: BTreeSet::from([new_gid]),
1112                compute_ids: BTreeMap::new(),
1113            },
1114            compaction_window,
1115        )
1116        .await;
1117
1118        self.apply_local_write(register_ts).await;
1119
1120        // Alter is complete! We can drop our read hold.
1121        drop(existing_table_read_hold);
1122
1123        Ok(())
1124    }
1125
1126    #[instrument(level = "debug")]
1127    fn handle_alter_materialized_view(
1128        &mut self,
1129        prev_mv: MaterializedView,
1130        new_mv: MaterializedView,
1131    ) -> Result<(), AdapterError> {
1132        let old_gid = prev_mv.global_id_writes();
1133        let new_gid = new_mv.global_id_writes();
1134
1135        if old_gid == new_gid {
1136            // It's not an ALTER MATERIALIZED VIEW as far as the controller is
1137            // concerned, because we still have the same GlobalId. This is
1138            // likely a change from an ALTER SWAP.
1139            return Ok(());
1140        }
1141
1142        // Cut over the MV computation, by shutting down the old dataflow and allowing the
1143        // new dataflow to start writing.
1144        //
1145        // Both commands are applied to their respective clusters asynchronously and they
1146        // race, so it is possible that we allow writes by the replacement before having
1147        // dropped the old dataflow. Thus there might be a period where the two dataflows
1148        // are competing in trying to commit conflicting outputs. Eventually the old
1149        // dataflow will be dropped and the replacement takes over.
1150        self.drop_compute_collections(vec![(prev_mv.cluster_id, old_gid)]);
1151        self.allow_writes(new_mv.cluster_id, new_gid);
1152
1153        Ok(())
1154    }
1155
1156    #[instrument(level = "debug")]
1157    async fn handle_create_source(
1158        &mut self,
1159        storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
1160        storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1161        item_id: CatalogItemId,
1162        source: Source,
1163        compaction_windows: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>>,
1164    ) -> Result<(), AdapterError> {
1165        let source_status_item_id = self
1166            .catalog()
1167            .resolve_builtin_storage_collection(&builtin::MZ_SOURCE_STATUS_HISTORY);
1168        let source_status_collection_id = Some(
1169            self.catalog()
1170                .get_entry(&source_status_item_id)
1171                .latest_global_id(),
1172        );
1173
1174        let (data_source, status_collection_id) = match source.data_source {
1175            DataSourceDesc::Ingestion { desc, cluster_id } => {
1176                let desc = desc.into_inline_connection(self.catalog().state());
1177                let item_global_id = self.catalog().get_entry(&item_id).latest_global_id();
1178
1179                let ingestion = mz_storage_types::sources::IngestionDescription::new(
1180                    desc,
1181                    cluster_id,
1182                    item_global_id,
1183                );
1184
1185                (
1186                    DataSource::Ingestion(ingestion),
1187                    source_status_collection_id,
1188                )
1189            }
1190            DataSourceDesc::OldSyntaxIngestion {
1191                desc,
1192                progress_subsource,
1193                data_config,
1194                details,
1195                cluster_id,
1196            } => {
1197                let desc = desc.into_inline_connection(self.catalog().state());
1198                let data_config = data_config.into_inline_connection(self.catalog().state());
1199
1200                // TODO(parkmycar): We should probably check the type here, but I'm not
1201                // sure if this will always be a Source or a Table.
1202                let progress_subsource = self
1203                    .catalog()
1204                    .get_entry(&progress_subsource)
1205                    .latest_global_id();
1206
1207                let mut ingestion = mz_storage_types::sources::IngestionDescription::new(
1208                    desc,
1209                    cluster_id,
1210                    progress_subsource,
1211                );
1212
1213                let legacy_export = SourceExport {
1214                    storage_metadata: (),
1215                    data_config,
1216                    details,
1217                };
1218
1219                ingestion
1220                    .source_exports
1221                    .insert(source.global_id, legacy_export);
1222
1223                (
1224                    DataSource::Ingestion(ingestion),
1225                    source_status_collection_id,
1226                )
1227            }
1228            DataSourceDesc::IngestionExport {
1229                ingestion_id,
1230                external_reference: _,
1231                details,
1232                data_config,
1233            } => {
1234                // TODO(parkmycar): We should probably check the type here, but I'm not sure if
1235                // this will always be a Source or a Table.
1236                let ingestion_id = self.catalog().get_entry(&ingestion_id).latest_global_id();
1237                (
1238                    DataSource::IngestionExport {
1239                        ingestion_id,
1240                        details,
1241                        data_config: data_config.into_inline_connection(self.catalog().state()),
1242                    },
1243                    source_status_collection_id,
1244                )
1245            }
1246            DataSourceDesc::Progress => (DataSource::Progress, None),
1247            DataSourceDesc::Webhook { .. } => (DataSource::Webhook, None),
1248            DataSourceDesc::Introspection(_) => {
1249                unreachable!("cannot create sources with introspection data sources")
1250            }
1251        };
1252
1253        storage_collections_to_create.insert(
1254            source.global_id,
1255            CollectionDescription::<Timestamp> {
1256                desc: source.desc.clone(),
1257                data_source,
1258                timeline: Some(source.timeline),
1259                since: None,
1260                status_collection_id,
1261                primary: None,
1262            },
1263        );
1264
1265        // Initialize read policies for the source
1266        for (compaction_window, catalog_ids) in compaction_windows {
1267            let compaction_ids = storage_policies_to_initialize
1268                .entry(compaction_window)
1269                .or_default();
1270
1271            let gids = catalog_ids
1272                .into_iter()
1273                .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1274                .flatten();
1275            compaction_ids.extend(gids);
1276        }
1277
1278        Ok(())
1279    }
1280}
1281
/// A state machine for building catalog implications from catalog updates.
///
/// Once all [ParsedStateUpdate]s for a timestamp have been absorbed, this is
/// the command that may have to be applied to in-memory state and/or the
/// controller(s).
///
/// Apart from `None`, each variant tracks the changes absorbed for one object
/// of the corresponding catalog item (or cluster/replica) type.
#[derive(Debug, Clone)]
enum CatalogImplication {
    /// Nothing absorbed yet; the first absorbed update determines the variant.
    None,
    Table(CatalogImplicationKind<Table>),
    /// A source, paired with the optional `GenericSourceConnection` carried by
    /// its parsed state update.
    Source(CatalogImplicationKind<(Source, Option<GenericSourceConnection>)>),
    Sink(CatalogImplicationKind<Sink>),
    Index(CatalogImplicationKind<Index>),
    MaterializedView(CatalogImplicationKind<MaterializedView>),
    View(CatalogImplicationKind<View>),
    ContinualTask(CatalogImplicationKind<ContinualTask>),
    Secret(CatalogImplicationKind<Secret>),
    Connection(CatalogImplicationKind<Connection>),
    Cluster(CatalogImplicationKind<Cluster>),
    ClusterReplica(CatalogImplicationKind<ClusterReplica>),
}
1302
/// The changes absorbed so far for a single object of type `T`.
///
/// State transitions are driven by `CatalogImplicationKind::transition`.
#[derive(Debug, Clone)]
enum CatalogImplicationKind<T> {
    /// No operations seen yet.
    None,
    /// Item was added.
    Added(T),
    /// Item was dropped (with its name retained for error messages). The name
    /// is `"<unknown>"` when the update carried no name.
    Dropped(T, String),
    /// Item is being altered from one state to another: `prev` is the retracted
    /// version and `new` is the added version.
    Altered { prev: T, new: T },
}
1314
1315impl<T: Clone> CatalogImplicationKind<T> {
1316    /// Apply a state transition based on a diff. Returns an error message if
1317    /// the transition is invalid.
1318    fn transition(&mut self, item: T, name: Option<String>, diff: StateDiff) -> Result<(), String> {
1319        use CatalogImplicationKind::*;
1320        use StateDiff::*;
1321
1322        let new_state = match (&*self, diff) {
1323            // Initial state transitions
1324            (None, Addition) => Added(item),
1325            (None, Retraction) => Dropped(item, name.unwrap_or_else(|| "<unknown>".to_string())),
1326
1327            // From Added state
1328            (Added(existing), Retraction) => {
1329                // Add -> Drop means the item is being altered
1330                Altered {
1331                    prev: item,
1332                    new: existing.clone(),
1333                }
1334            }
1335            (Added(_), Addition) => {
1336                return Err("Cannot add an already added object".to_string());
1337            }
1338
1339            // From Dropped state
1340            (Dropped(existing, _), Addition) => {
1341                // Drop -> Add means the item is being altered
1342                Altered {
1343                    prev: existing.clone(),
1344                    new: item,
1345                }
1346            }
1347            (Dropped(_, _), Retraction) => {
1348                return Err("Cannot drop an already dropped object".to_string());
1349            }
1350
1351            // From Altered state
1352            (Altered { .. }, _) => {
1353                return Err(format!(
1354                    "Cannot apply {:?} to an object in Altered state",
1355                    diff
1356                ));
1357            }
1358        };
1359
1360        *self = new_state;
1361        Ok(())
1362    }
1363}
1364
/// Generates a `CatalogImplication` method that absorbs one update for a
/// single item type.
///
/// The generated method `$method_name` takes the item, its optional full
/// name, and the diff. If `self` is `CatalogImplication::None`, it is first
/// turned into `CatalogImplication::$variant` holding an empty
/// `CatalogImplicationKind`. The update is then fed to
/// `CatalogImplicationKind::transition`.
///
/// The generated method panics if `self` holds a different variant, or if the
/// state transition is invalid.
macro_rules! impl_absorb_method {
    (
        $method_name:ident,
        $variant:ident,
        $item_type:ty
    ) => {
        fn $method_name(
            &mut self,
            item: $item_type,
            parsed_full_name: Option<String>,
            diff: StateDiff,
        ) {
            // The first absorbed update decides which kind of object this
            // implication tracks.
            if matches!(self, CatalogImplication::None) {
                *self = CatalogImplication::$variant(CatalogImplicationKind::None);
            }

            let state = match self {
                CatalogImplication::$variant(state) => state,
                _ => panic!(
                    "Unexpected command type for {:?}: {} {:?}",
                    self,
                    stringify!($variant),
                    diff,
                ),
            };

            state
                .transition(item, parsed_full_name, diff)
                .unwrap_or_else(|e| {
                    panic!(
                        "Invalid state transition for {}: {}",
                        stringify!($variant),
                        e
                    )
                });
        }
    };
}
1407
1408impl CatalogImplication {
1409    /// Absorbs the given catalog update into this [CatalogImplication], causing
1410    /// a state transition or error.
1411    fn absorb(&mut self, catalog_update: ParsedStateUpdate) {
1412        match catalog_update.kind {
1413            ParsedStateUpdateKind::Item {
1414                durable_item: _,
1415                parsed_item,
1416                connection,
1417                parsed_full_name,
1418            } => match parsed_item {
1419                CatalogItem::Table(table) => {
1420                    self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
1421                }
1422                CatalogItem::Source(source) => {
1423                    self.absorb_source(
1424                        (source, connection),
1425                        Some(parsed_full_name),
1426                        catalog_update.diff,
1427                    );
1428                }
1429                CatalogItem::Sink(sink) => {
1430                    self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
1431                }
1432                CatalogItem::Index(index) => {
1433                    self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
1434                }
1435                CatalogItem::MaterializedView(mv) => {
1436                    self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
1437                }
1438                CatalogItem::View(view) => {
1439                    self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
1440                }
1441                CatalogItem::ContinualTask(ct) => {
1442                    self.absorb_continual_task(ct, Some(parsed_full_name), catalog_update.diff);
1443                }
1444                CatalogItem::Secret(secret) => {
1445                    self.absorb_secret(secret, None, catalog_update.diff);
1446                }
1447                CatalogItem::Connection(connection) => {
1448                    self.absorb_connection(connection, None, catalog_update.diff);
1449                }
1450                CatalogItem::Log(_) => {}
1451                CatalogItem::Type(_) => {}
1452                CatalogItem::Func(_) => {}
1453            },
1454            ParsedStateUpdateKind::TemporaryItem {
1455                durable_item: _,
1456                parsed_item,
1457                connection,
1458                parsed_full_name,
1459            } => match parsed_item {
1460                CatalogItem::Table(table) => {
1461                    self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
1462                }
1463                CatalogItem::Source(source) => {
1464                    self.absorb_source(
1465                        (source, connection),
1466                        Some(parsed_full_name),
1467                        catalog_update.diff,
1468                    );
1469                }
1470                CatalogItem::Sink(sink) => {
1471                    self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
1472                }
1473                CatalogItem::Index(index) => {
1474                    self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
1475                }
1476                CatalogItem::MaterializedView(mv) => {
1477                    self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
1478                }
1479                CatalogItem::View(view) => {
1480                    self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
1481                }
1482                CatalogItem::ContinualTask(ct) => {
1483                    self.absorb_continual_task(ct, None, catalog_update.diff);
1484                }
1485                CatalogItem::Secret(secret) => {
1486                    self.absorb_secret(secret, None, catalog_update.diff);
1487                }
1488                CatalogItem::Connection(connection) => {
1489                    self.absorb_connection(connection, None, catalog_update.diff);
1490                }
1491                CatalogItem::Log(_) => {}
1492                CatalogItem::Type(_) => {}
1493                CatalogItem::Func(_) => {}
1494            },
1495            ParsedStateUpdateKind::Cluster {
1496                durable_cluster: _,
1497                parsed_cluster,
1498            } => {
1499                self.absorb_cluster(parsed_cluster, catalog_update.diff);
1500            }
1501            ParsedStateUpdateKind::ClusterReplica {
1502                durable_cluster_replica: _,
1503                parsed_cluster_replica,
1504            } => {
1505                self.absorb_cluster_replica(parsed_cluster_replica, catalog_update.diff);
1506            }
1507        }
1508    }
1509
1510    impl_absorb_method!(absorb_table, Table, Table);
1511    impl_absorb_method!(
1512        absorb_source,
1513        Source,
1514        (Source, Option<GenericSourceConnection>)
1515    );
1516    impl_absorb_method!(absorb_sink, Sink, Sink);
1517    impl_absorb_method!(absorb_index, Index, Index);
1518    impl_absorb_method!(absorb_materialized_view, MaterializedView, MaterializedView);
1519    impl_absorb_method!(absorb_view, View, View);
1520
1521    impl_absorb_method!(absorb_continual_task, ContinualTask, ContinualTask);
1522    impl_absorb_method!(absorb_secret, Secret, Secret);
1523    impl_absorb_method!(absorb_connection, Connection, Connection);
1524
1525    // Special case for cluster which uses the cluster's name field.
1526    fn absorb_cluster(&mut self, cluster: Cluster, diff: StateDiff) {
1527        let state = match self {
1528            CatalogImplication::Cluster(state) => state,
1529            CatalogImplication::None => {
1530                *self = CatalogImplication::Cluster(CatalogImplicationKind::None);
1531                match self {
1532                    CatalogImplication::Cluster(state) => state,
1533                    _ => unreachable!(),
1534                }
1535            }
1536            _ => {
1537                panic!("Unexpected command type for {:?}: Cluster {:?}", self, diff);
1538            }
1539        };
1540
1541        if let Err(e) = state.transition(cluster.clone(), Some(cluster.name), diff) {
1542            panic!("invalid state transition for cluster: {}", e);
1543        }
1544    }
1545
1546    // Special case for cluster replica which uses the cluster replica's name field.
1547    fn absorb_cluster_replica(&mut self, cluster_replica: ClusterReplica, diff: StateDiff) {
1548        let state = match self {
1549            CatalogImplication::ClusterReplica(state) => state,
1550            CatalogImplication::None => {
1551                *self = CatalogImplication::ClusterReplica(CatalogImplicationKind::None);
1552                match self {
1553                    CatalogImplication::ClusterReplica(state) => state,
1554                    _ => unreachable!(),
1555                }
1556            }
1557            _ => {
1558                panic!(
1559                    "Unexpected command type for {:?}: ClusterReplica {:?}",
1560                    self, diff
1561                );
1562            }
1563        };
1564
1565        if let Err(e) = state.transition(cluster_replica.clone(), Some(cluster_replica.name), diff)
1566        {
1567            panic!("invalid state transition for cluster replica: {}", e);
1568        }
1569    }
1570}
1571
#[cfg(test)]
mod tests {
    use super::*;
    use mz_repr::{GlobalId, RelationDesc, RelationVersion, VersionedRelationDesc};
    use mz_sql::names::ResolvedIds;
    use std::collections::BTreeMap;

    /// Builds a minimal table with a single non-nullable string column named
    /// `name`, whose root relation version maps to `GlobalId::System(id)`.
    ///
    /// Give each table a distinct `id` so assertions can tell tables apart by
    /// their `collections`. Column arity alone cannot distinguish them, because
    /// every table built here has exactly one column.
    fn create_test_table(name: &str, id: u64) -> Table {
        Table {
            desc: VersionedRelationDesc::new(
                RelationDesc::builder()
                    .with_column(name, mz_repr::SqlScalarType::String.nullable(false))
                    .finish(),
            ),
            create_sql: None,
            collections: BTreeMap::from([(RelationVersion::root(), GlobalId::System(id))]),
            conn_id: None,
            resolved_ids: ResolvedIds::empty(),
            custom_logical_compaction_window: None,
            is_retained_metrics_object: false,
            data_source: TableDataSource::TableWrites { defaults: vec![] },
        }
    }

    #[mz_ore::test]
    fn test_item_state_transitions() {
        // Test None -> Added
        let mut state = CatalogImplicationKind::None;
        assert!(
            state
                .transition("item1".to_string(), None, StateDiff::Addition)
                .is_ok()
        );
        assert!(matches!(state, CatalogImplicationKind::Added(_)));

        // Test Added -> Altered (via retraction)
        let mut state = CatalogImplicationKind::Added("new_item".to_string());
        assert!(
            state
                .transition("old_item".to_string(), None, StateDiff::Retraction)
                .is_ok()
        );
        match &state {
            CatalogImplicationKind::Altered { prev, new } => {
                // The retracted item is the OLD state
                assert_eq!(prev, "old_item");
                // The existing Added item is the NEW state
                assert_eq!(new, "new_item");
            }
            _ => panic!("Expected Altered state"),
        }

        // Test None -> Dropped
        let mut state = CatalogImplicationKind::None;
        assert!(
            state
                .transition(
                    "item1".to_string(),
                    Some("test_name".to_string()),
                    StateDiff::Retraction
                )
                .is_ok()
        );
        assert!(matches!(state, CatalogImplicationKind::Dropped(_, _)));

        // Test Dropped -> Altered (via addition)
        let mut state = CatalogImplicationKind::Dropped("old_item".to_string(), "name".to_string());
        assert!(
            state
                .transition("new_item".to_string(), None, StateDiff::Addition)
                .is_ok()
        );
        match &state {
            CatalogImplicationKind::Altered { prev, new } => {
                // The existing Dropped item is the OLD state
                assert_eq!(prev, "old_item");
                // The added item is the NEW state
                assert_eq!(new, "new_item");
            }
            _ => panic!("Expected Altered state"),
        }

        // Test invalid transitions
        let mut state = CatalogImplicationKind::Added("item".to_string());
        assert!(
            state
                .transition("item2".to_string(), None, StateDiff::Addition)
                .is_err()
        );

        let mut state = CatalogImplicationKind::Dropped("item".to_string(), "name".to_string());
        assert!(
            state
                .transition("item2".to_string(), None, StateDiff::Retraction)
                .is_err()
        );

        // An Altered object accepts no further updates of either kind.
        for diff in [StateDiff::Addition, StateDiff::Retraction] {
            let mut state = CatalogImplicationKind::Altered {
                prev: "old_item".to_string(),
                new: "new_item".to_string(),
            };
            assert!(state.transition("item3".to_string(), None, diff).is_err());
        }
    }

    #[mz_ore::test]
    fn test_table_absorb_state_machine() {
        let table1 = create_test_table("table1", 1);
        let table2 = create_test_table("table2", 2);

        // Test None -> AddTable
        let mut cmd = CatalogImplication::None;
        cmd.absorb_table(
            table1.clone(),
            Some("schema.table1".to_string()),
            StateDiff::Addition,
        );
        // Check that we have an Added state holding table1
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Added(t) => {
                    assert_eq!(t.collections, table1.collections)
                }
                _ => panic!("Expected Added state"),
            },
            _ => panic!("Expected Table command"),
        }

        // Test AddTable -> AlterTable (via retraction).
        // With AddTable(table1) followed by Retraction(table2), table2 is the
        // old state being removed and table1 is the new state.
        cmd.absorb_table(
            table2.clone(),
            Some("schema.table2".to_string()),
            StateDiff::Retraction,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Altered { prev, new } => {
                    assert_eq!(prev.collections, table2.collections);
                    assert_eq!(new.collections, table1.collections);
                }
                _ => panic!("Expected Altered state"),
            },
            _ => panic!("Expected Table command"),
        }

        // Test None -> DropTable
        let mut cmd = CatalogImplication::None;
        cmd.absorb_table(
            table1.clone(),
            Some("schema.table1".to_string()),
            StateDiff::Retraction,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Dropped(t, name) => {
                    assert_eq!(t.collections, table1.collections);
                    assert_eq!(name, "schema.table1");
                }
                _ => panic!("Expected Dropped state"),
            },
            _ => panic!("Expected Table command"),
        }

        // Test DropTable -> AlterTable (via addition)
        cmd.absorb_table(
            table2.clone(),
            Some("schema.table2".to_string()),
            StateDiff::Addition,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Altered { prev, new } => {
                    // prev should be the dropped table, new should be the added table
                    assert_eq!(prev.collections, table1.collections);
                    assert_eq!(new.collections, table2.collections);
                }
                _ => panic!("Expected Altered state"),
            },
            _ => panic!("Expected Table command"),
        }
    }

    #[mz_ore::test]
    #[should_panic(expected = "Cannot add an already added object")]
    fn test_invalid_double_add() {
        let table = create_test_table("table", 1);
        let mut cmd = CatalogImplication::None;

        // First addition
        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Addition,
        );

        // Second addition should panic
        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Addition,
        );
    }

    #[mz_ore::test]
    #[should_panic(expected = "Cannot drop an already dropped object")]
    fn test_invalid_double_drop() {
        let table = create_test_table("table", 1);
        let mut cmd = CatalogImplication::None;

        // First drop
        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Retraction,
        );

        // Second drop should panic
        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Retraction,
        );
    }
}