1use std::collections::{BTreeMap, BTreeSet};
30use std::sync::Arc;
31use std::time::{Duration, Instant};
32
33use fail::fail_point;
34use itertools::Itertools;
35use mz_adapter_types::compaction::CompactionWindow;
36use mz_catalog::memory::objects::{
37 CatalogItem, Cluster, ClusterReplica, Connection, DataSourceDesc, Index, MaterializedView,
38 Secret, Sink, Source, StateDiff, Table, TableDataSource, View,
39};
40use mz_cloud_resources::VpcEndpointConfig;
41use mz_compute_client::logging::LogVariant;
42use mz_compute_client::protocol::response::PeekResponse;
43use mz_controller::clusters::{ClusterRole, ReplicaConfig};
44use mz_controller_types::{ClusterId, ReplicaId};
45use mz_ore::collections::CollectionExt;
46use mz_ore::error::ErrorExt;
47use mz_ore::future::InTask;
48use mz_ore::instrument;
49use mz_ore::retry::Retry;
50use mz_ore::task;
51use mz_repr::{CatalogItemId, GlobalId, RelationVersion, RelationVersionSelector};
52use mz_sql::plan::ConnectionDetails;
53use mz_storage_client::controller::{CollectionDescription, DataSource};
54use mz_storage_types::connections::PostgresConnection;
55use mz_storage_types::connections::inline::{InlinedConnection, IntoInlineConnection};
56use mz_storage_types::sinks::StorageSinkConnection;
57use mz_storage_types::sources::{
58 GenericSourceConnection, SourceDesc, SourceExport, SourceExportDataConfig,
59};
60use tracing::{Instrument, info_span, warn};
61
62use crate::active_compute_sink::ActiveComputeSinkRetireReason;
63use crate::coord::Coordinator;
64use crate::coord::catalog_implications::parsed_state_updates::{
65 ParsedStateUpdate, ParsedStateUpdateKind,
66};
67use crate::coord::peek::DroppedDependency;
68use crate::coord::timeline::TimelineState;
69use crate::statement_logging::{StatementEndedExecutionReason, StatementLoggingId};
70use crate::{AdapterError, CollectionIdBundle, ExecuteContext, ResultExt};
71
72pub mod parsed_state_updates;
73
74impl Coordinator {
75 #[instrument(level = "debug")]
84 pub async fn apply_catalog_implications(
85 &mut self,
86 ctx: Option<&mut ExecuteContext>,
87 catalog_updates: Vec<ParsedStateUpdate>,
88 ) -> Result<(), AdapterError> {
89 let start = Instant::now();
90
91 let mut catalog_implications: BTreeMap<CatalogItemId, CatalogImplication> = BTreeMap::new();
92 let mut cluster_commands: BTreeMap<ClusterId, CatalogImplication> = BTreeMap::new();
93 let mut cluster_replica_commands: BTreeMap<(ClusterId, ReplicaId), CatalogImplication> =
94 BTreeMap::new();
95 let mut introspection_source_indexes: BTreeMap<ClusterId, BTreeMap<LogVariant, GlobalId>> =
100 BTreeMap::new();
101 let mut replica_scoped_config_changed = false;
105
106 for update in catalog_updates {
107 tracing::trace!(?update, "got parsed state update");
108 match &update.kind {
109 ParsedStateUpdateKind::Item {
110 durable_item,
111 parsed_item: _,
112 connection: _,
113 parsed_full_name: _,
114 } => {
115 let entry = catalog_implications
116 .entry(durable_item.id.clone())
117 .or_insert_with(|| CatalogImplication::None);
118 entry.absorb(update);
119 }
120 ParsedStateUpdateKind::TemporaryItem {
121 durable_item,
122 parsed_item: _,
123 connection: _,
124 parsed_full_name: _,
125 } => {
126 let entry = catalog_implications
127 .entry(durable_item.id.clone())
128 .or_insert_with(|| CatalogImplication::None);
129 entry.absorb(update);
130 }
131 ParsedStateUpdateKind::Cluster {
132 durable_cluster,
133 parsed_cluster: _,
134 } => {
135 let entry = cluster_commands
136 .entry(durable_cluster.id)
137 .or_insert_with(|| CatalogImplication::None);
138 entry.absorb(update.clone());
139 }
140 ParsedStateUpdateKind::ClusterReplica {
141 durable_cluster_replica,
142 parsed_cluster_replica: _,
143 } => {
144 let entry = cluster_replica_commands
145 .entry((
146 durable_cluster_replica.cluster_id,
147 durable_cluster_replica.replica_id,
148 ))
149 .or_insert_with(|| CatalogImplication::None);
150 entry.absorb(update.clone());
151 }
152 ParsedStateUpdateKind::IntrospectionSourceIndex {
153 cluster_id,
154 log,
155 index_id,
156 } => {
157 if update.diff == StateDiff::Addition {
158 introspection_source_indexes
159 .entry(*cluster_id)
160 .or_default()
161 .insert(log.clone(), *index_id);
162 }
163 }
166 ParsedStateUpdateKind::ReplicaSystemConfiguration { durable: _ } => {
167 replica_scoped_config_changed = true;
171 }
172 }
173 }
174
175 self.apply_catalog_implications_inner(
176 ctx,
177 catalog_implications.into_iter().collect_vec(),
178 cluster_commands.into_iter().collect_vec(),
179 cluster_replica_commands.into_iter().collect_vec(),
180 introspection_source_indexes,
181 replica_scoped_config_changed,
182 )
183 .await?;
184
185 self.metrics
186 .apply_catalog_implications_seconds
187 .observe(start.elapsed().as_secs_f64());
188
189 Ok(())
190 }
191
192 #[instrument(level = "debug")]
193 async fn apply_catalog_implications_inner(
194 &mut self,
195 ctx: Option<&mut ExecuteContext>,
196 implications: Vec<(CatalogItemId, CatalogImplication)>,
197 cluster_commands: Vec<(ClusterId, CatalogImplication)>,
198 cluster_replica_commands: Vec<((ClusterId, ReplicaId), CatalogImplication)>,
199 mut introspection_source_indexes: BTreeMap<ClusterId, BTreeMap<LogVariant, GlobalId>>,
200 replica_scoped_config_changed: bool,
201 ) -> Result<(), AdapterError> {
202 let mut tables_to_drop = BTreeSet::new();
203 let mut sources_to_drop = vec![];
204 let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
205 let mut storage_sink_gids_to_drop = vec![];
206 let mut indexes_to_drop = vec![];
207 let mut compute_sinks_to_drop = vec![];
208 let mut view_gids_to_drop = vec![];
209 let mut secrets_to_drop = vec![];
210 let mut vpc_endpoints_to_drop = vec![];
211 let mut clusters_to_drop = vec![];
212 let mut cluster_replicas_to_drop = vec![];
213 let mut active_compute_sinks_to_drop = BTreeMap::new();
214 let mut peeks_to_drop = vec![];
215 let mut copies_to_drop = vec![];
216
217 let mut dropped_item_names: BTreeMap<GlobalId, String> = BTreeMap::new();
219 let mut dropped_cluster_names: BTreeMap<ClusterId, String> = BTreeMap::new();
220
221 let mut table_collections_to_create = BTreeMap::new();
224 let mut source_collections_to_create = BTreeMap::new();
225 let mut storage_policies_to_initialize = BTreeMap::new();
226 let mut execution_timestamps_to_set = BTreeSet::new();
227 let mut vpc_endpoints_to_create: Vec<(CatalogItemId, VpcEndpointConfig)> = vec![];
228
229 let mut source_gids_to_keep = BTreeSet::new();
232
233 let mut source_connections_to_alter: BTreeMap<
235 GlobalId,
236 GenericSourceConnection<InlinedConnection>,
237 > = BTreeMap::new();
238 let mut sink_connections_to_alter: BTreeMap<GlobalId, StorageSinkConnection> =
239 BTreeMap::new();
240 let mut source_export_data_configs_to_alter: BTreeMap<GlobalId, SourceExportDataConfig> =
241 BTreeMap::new();
242 let mut source_descs_to_alter: BTreeMap<GlobalId, SourceDesc> = BTreeMap::new();
243
244 for (catalog_id, implication) in implications {
251 tracing::trace!(?implication, "have to apply catalog implication");
252
253 match implication {
254 CatalogImplication::Table(CatalogImplicationKind::Added(table)) => {
255 self.handle_create_table(
256 &ctx,
257 &mut table_collections_to_create,
258 &mut storage_policies_to_initialize,
259 &mut execution_timestamps_to_set,
260 catalog_id,
261 table.clone(),
262 )
263 .await?
264 }
265 CatalogImplication::Table(CatalogImplicationKind::Altered {
266 prev: prev_table,
267 new: new_table,
268 }) => {
269 self.handle_alter_table(catalog_id, prev_table, new_table)
270 .await?
271 }
272
273 CatalogImplication::Table(CatalogImplicationKind::Dropped(table, full_name)) => {
274 let global_ids = table.global_ids();
275 for global_id in global_ids {
276 tables_to_drop.insert((catalog_id, global_id));
277 dropped_item_names.insert(global_id, full_name.clone());
278 }
279 }
280 CatalogImplication::Source(CatalogImplicationKind::Added((
281 source,
282 _connection,
283 ))) => {
284 let compaction_windows = self
289 .catalog()
290 .state()
291 .source_compaction_windows(vec![catalog_id]);
292
293 self.handle_create_source(
294 &mut source_collections_to_create,
295 &mut storage_policies_to_initialize,
296 catalog_id,
297 source,
298 compaction_windows,
299 )
300 .await?
301 }
302 CatalogImplication::Source(CatalogImplicationKind::Altered {
303 prev: (prev_source, _prev_connection),
304 new: (new_source, new_connection),
305 }) => {
306 if prev_source.custom_logical_compaction_window
307 != new_source.custom_logical_compaction_window
308 {
309 let new_window = new_source
310 .custom_logical_compaction_window
311 .unwrap_or(CompactionWindow::Default);
312 self.update_storage_read_policies(vec![(catalog_id, new_window.into())]);
313 }
314 match (&prev_source.data_source, &new_source.data_source) {
315 (
316 DataSourceDesc::Ingestion {
317 desc: prev_desc, ..
318 }
319 | DataSourceDesc::OldSyntaxIngestion {
320 desc: prev_desc, ..
321 },
322 DataSourceDesc::Ingestion { desc: new_desc, .. }
323 | DataSourceDesc::OldSyntaxIngestion { desc: new_desc, .. },
324 ) => {
325 if prev_desc != new_desc {
326 let inlined_connection = new_connection
327 .expect("ingestion source should have inlined connection");
328 let inlined_desc = SourceDesc {
329 connection: inlined_connection,
330 timestamp_interval: new_desc.timestamp_interval,
331 };
332 source_descs_to_alter.insert(new_source.global_id, inlined_desc);
333 }
334 }
335 _ => {}
336 }
337 }
338 CatalogImplication::Source(CatalogImplicationKind::Dropped(
339 (source, connection),
340 full_name,
341 )) => {
342 let global_id = source.global_id();
343 sources_to_drop.push((catalog_id, global_id));
344 dropped_item_names.insert(global_id, full_name);
345
346 if let DataSourceDesc::Ingestion { desc, .. }
347 | DataSourceDesc::OldSyntaxIngestion { desc, .. } = &source.data_source
348 {
349 match &desc.connection {
350 GenericSourceConnection::Postgres(_referenced_conn) => {
351 let inline_conn = connection.expect("missing inlined connection");
352
353 let pg_conn = match inline_conn {
354 GenericSourceConnection::Postgres(pg_conn) => pg_conn,
355 other => {
356 panic!("expected postgres connection, got: {:?}", other)
357 }
358 };
359 let pending_drop = (
360 pg_conn.connection.clone(),
361 pg_conn.publication_details.slot.clone(),
362 );
363 replication_slots_to_drop.push(pending_drop);
364 }
365 _ => {}
366 }
367 }
368 }
369 CatalogImplication::Sink(CatalogImplicationKind::Added(sink)) => {
370 tracing::debug!(?sink, "not handling AddSink in here yet");
371 }
372 CatalogImplication::Sink(CatalogImplicationKind::Altered {
373 prev: prev_sink,
374 new: new_sink,
375 }) => {
376 tracing::debug!(?prev_sink, ?new_sink, "not handling AlterSink in here yet");
377 }
378 CatalogImplication::Sink(CatalogImplicationKind::Dropped(sink, full_name)) => {
379 storage_sink_gids_to_drop.push(sink.global_id());
380 dropped_item_names.insert(sink.global_id(), full_name);
381 }
382 CatalogImplication::Index(CatalogImplicationKind::Added(index)) => {
383 tracing::debug!(?index, "not handling AddIndex in here yet");
384 }
385 CatalogImplication::Index(CatalogImplicationKind::Altered {
386 prev: prev_index,
387 new: new_index,
388 }) => {
389 if prev_index.custom_logical_compaction_window
390 != new_index.custom_logical_compaction_window
391 {
392 let new_window = new_index
393 .custom_logical_compaction_window
394 .unwrap_or(CompactionWindow::Default);
395 self.update_compute_read_policy(
396 new_index.cluster_id,
397 catalog_id,
398 new_window.into(),
399 );
400 }
401 }
402 CatalogImplication::Index(CatalogImplicationKind::Dropped(index, full_name)) => {
403 indexes_to_drop.push((index.cluster_id, index.global_id()));
404 dropped_item_names.insert(index.global_id(), full_name);
405 }
406 CatalogImplication::MaterializedView(CatalogImplicationKind::Added(mv)) => {
407 tracing::debug!(?mv, "not handling AddMaterializedView in here yet");
408 }
409 CatalogImplication::MaterializedView(CatalogImplicationKind::Altered {
410 prev: prev_mv,
411 new: new_mv,
412 }) => {
413 if prev_mv.collections != new_mv.collections {
432 assert_eq!(
435 prev_mv.global_id_writes(),
436 new_mv.global_id_writes(),
437 "unexpected MV Altered implication: prev={prev_mv:?}, new={new_mv:?}",
438 );
439
440 let gid = new_mv.global_id_writes();
441 self.allow_writes(new_mv.cluster_id, gid);
442
443 source_gids_to_keep.extend(new_mv.global_ids());
448 } else if prev_mv.custom_logical_compaction_window
449 != new_mv.custom_logical_compaction_window
450 {
451 let new_window = new_mv
452 .custom_logical_compaction_window
453 .unwrap_or(CompactionWindow::Default);
454 self.update_storage_read_policies(vec![(catalog_id, new_window.into())]);
455 }
456 }
457 CatalogImplication::MaterializedView(CatalogImplicationKind::Dropped(
458 mv,
459 full_name,
460 )) => {
461 compute_sinks_to_drop.push((mv.cluster_id, mv.global_id_writes()));
462 for gid in mv.global_ids() {
463 sources_to_drop.push((catalog_id, gid));
464 dropped_item_names.insert(gid, full_name.clone());
465 }
466 }
467 CatalogImplication::View(CatalogImplicationKind::Added(_view)) => {
468 }
471 CatalogImplication::View(CatalogImplicationKind::Altered {
472 prev: _prev_view,
473 new: _new_view,
474 }) => {
475 }
478 CatalogImplication::View(CatalogImplicationKind::Dropped(view, full_name)) => {
479 view_gids_to_drop.push(view.global_id());
480 dropped_item_names.insert(view.global_id(), full_name);
481 }
482 CatalogImplication::Secret(CatalogImplicationKind::Added(_secret)) => {
483 }
487 CatalogImplication::Secret(CatalogImplicationKind::Altered {
488 prev: _prev_secret,
489 new: _new_secret,
490 }) => {
491 }
494 CatalogImplication::Secret(CatalogImplicationKind::Dropped(
495 _secret,
496 _full_name,
497 )) => {
498 secrets_to_drop.push(catalog_id);
499 }
500 CatalogImplication::Connection(CatalogImplicationKind::Added(connection)) => {
501 match &connection.details {
502 ConnectionDetails::Ssh { .. } => {}
505 ConnectionDetails::AwsPrivatelink(privatelink) => {
507 let spec = VpcEndpointConfig {
508 aws_service_name: privatelink.service_name.to_owned(),
509 availability_zone_ids: privatelink.availability_zones.to_owned(),
510 };
511 vpc_endpoints_to_create.push((catalog_id, spec));
512 }
513 _ => {}
515 }
516 }
517 CatalogImplication::Connection(CatalogImplicationKind::Altered {
518 prev: _prev_connection,
519 new: new_connection,
520 }) => {
521 self.handle_alter_connection(
522 catalog_id,
523 new_connection,
524 &mut vpc_endpoints_to_create,
525 &mut source_connections_to_alter,
526 &mut sink_connections_to_alter,
527 &mut source_export_data_configs_to_alter,
528 );
529 }
530 CatalogImplication::Connection(CatalogImplicationKind::Dropped(
531 connection,
532 _full_name,
533 )) => {
534 match &connection.details {
535 ConnectionDetails::Ssh { .. } => {
537 secrets_to_drop.push(catalog_id);
538 }
539 ConnectionDetails::AwsPrivatelink(_) => {
542 vpc_endpoints_to_drop.push(catalog_id);
543 }
544 _ => (),
545 }
546 }
547 CatalogImplication::None => {
548 }
550 CatalogImplication::Cluster(_) | CatalogImplication::ClusterReplica(_) => {
551 unreachable!("clusters and cluster replicas are handled below")
552 }
553 CatalogImplication::Table(CatalogImplicationKind::None)
554 | CatalogImplication::Source(CatalogImplicationKind::None)
555 | CatalogImplication::Sink(CatalogImplicationKind::None)
556 | CatalogImplication::Index(CatalogImplicationKind::None)
557 | CatalogImplication::MaterializedView(CatalogImplicationKind::None)
558 | CatalogImplication::View(CatalogImplicationKind::None)
559 | CatalogImplication::Secret(CatalogImplicationKind::None)
560 | CatalogImplication::Connection(CatalogImplicationKind::None) => {
561 unreachable!("will never leave None in place");
562 }
563 }
564 }
565
566 for (cluster_id, command) in cluster_commands {
567 tracing::trace!(?command, "have cluster command to apply!");
568
569 match command {
570 CatalogImplication::Cluster(CatalogImplicationKind::Added(cluster)) => {
571 let arranged_logs = introspection_source_indexes
576 .remove(&cluster_id)
577 .unwrap_or_default();
578 let introspection_source_ids: Vec<_> =
579 arranged_logs.values().copied().collect();
580
581 self.controller
582 .create_cluster(
583 cluster_id,
584 mz_controller::clusters::ClusterConfig {
585 arranged_logs,
586 workload_class: cluster.config.workload_class.clone(),
587 },
588 )
589 .expect("creating cluster must not fail");
590
591 if !introspection_source_ids.is_empty() {
592 self.initialize_compute_read_policies(
593 introspection_source_ids,
594 cluster_id,
595 CompactionWindow::Default,
596 )
597 .await;
598 }
599 }
600 CatalogImplication::Cluster(CatalogImplicationKind::Altered {
601 prev: prev_cluster,
602 new: new_cluster,
603 }) => {
604 if prev_cluster.config.workload_class != new_cluster.config.workload_class {
610 self.controller.update_cluster_workload_class(
611 cluster_id,
612 new_cluster.config.workload_class.clone(),
613 );
614 }
615 }
616 CatalogImplication::Cluster(CatalogImplicationKind::Dropped(
617 cluster,
618 _full_name,
619 )) => {
620 clusters_to_drop.push(cluster_id);
621 dropped_cluster_names.insert(cluster_id, cluster.name);
622 }
623 CatalogImplication::Cluster(CatalogImplicationKind::None) => {
624 unreachable!("will never leave None in place");
625 }
626 command => {
627 unreachable!(
628 "we only handle cluster commands in this map, got: {:?}",
629 command
630 );
631 }
632 }
633 }
634
635 if replica_scoped_config_changed {
642 self.push_replica_dyncfg_overrides();
643 }
644
645 for ((cluster_id, replica_id), command) in cluster_replica_commands {
646 tracing::trace!(?command, "have cluster replica command to apply!");
647
648 match command {
649 CatalogImplication::ClusterReplica(CatalogImplicationKind::Added(replica)) => {
650 let cluster = self.catalog().get_cluster(cluster_id);
658 let cluster_name = cluster.name.clone();
659 let cluster_role = cluster.role();
660 self.handle_create_cluster_replica(
661 cluster_id,
662 replica_id,
663 cluster_role,
664 cluster_name,
665 replica.name.clone(),
666 replica.config.clone(),
667 )
668 .await;
669 }
670 CatalogImplication::ClusterReplica(CatalogImplicationKind::Altered {
671 prev: _prev_replica,
672 new: _new_replica,
673 }) => {
674 }
678 CatalogImplication::ClusterReplica(CatalogImplicationKind::Dropped(
679 _replica,
680 _full_name,
681 )) => {
682 cluster_replicas_to_drop.push((cluster_id, replica_id));
683 }
684 CatalogImplication::ClusterReplica(CatalogImplicationKind::None) => {
685 unreachable!("will never leave None in place");
686 }
687 command => {
688 unreachable!(
689 "we only handle cluster replica commands in this map, got: {:?}",
690 command
691 );
692 }
693 }
694 }
695
696 if !source_collections_to_create.is_empty() {
697 self.create_source_collections(source_collections_to_create)
698 .await?;
699 }
700
701 if !table_collections_to_create.is_empty() {
704 self.create_table_collections(table_collections_to_create, execution_timestamps_to_set)
705 .await?;
706 }
707 self.initialize_storage_collections(storage_policies_to_initialize)
713 .await?;
714
715 if !vpc_endpoints_to_create.is_empty() {
717 if let Some(cloud_resource_controller) = self.cloud_resource_controller.as_ref() {
718 for (connection_id, spec) in vpc_endpoints_to_create {
719 if let Err(err) = cloud_resource_controller
720 .ensure_vpc_endpoint(connection_id, spec)
721 .await
722 {
723 tracing::error!(?err, "failed to ensure vpc endpoint!");
724 }
725 }
726 } else {
727 tracing::error!(
728 "AWS PrivateLink connections unsupported without cloud_resource_controller"
729 );
730 }
731 }
732
733 if !source_connections_to_alter.is_empty() {
735 self.controller
736 .storage
737 .alter_ingestion_connections(source_connections_to_alter)
738 .await
739 .unwrap_or_terminate("cannot fail to alter ingestion connections");
740 }
741
742 if !sink_connections_to_alter.is_empty() {
743 self.controller
744 .storage
745 .alter_export_connections(sink_connections_to_alter)
746 .await
747 .unwrap_or_terminate("altering export connections after txn must succeed");
748 }
749
750 if !source_export_data_configs_to_alter.is_empty() {
751 self.controller
752 .storage
753 .alter_ingestion_export_data_configs(source_export_data_configs_to_alter)
754 .await
755 .unwrap_or_terminate("altering source export data configs after txn must succeed");
756 }
757
758 if !source_descs_to_alter.is_empty() {
759 self.controller
760 .storage
761 .alter_ingestion_source_desc(source_descs_to_alter)
762 .await
763 .unwrap_or_terminate("cannot fail to alter ingestion source desc");
764 }
765
766 sources_to_drop.retain(|(_, gid)| !source_gids_to_keep.contains(gid));
768
769 let readable_collections_to_drop: BTreeSet<_> = sources_to_drop
770 .iter()
771 .map(|(_, gid)| *gid)
772 .chain(tables_to_drop.iter().map(|(_, gid)| *gid))
773 .chain(indexes_to_drop.iter().map(|(_, gid)| *gid))
774 .chain(view_gids_to_drop.iter().copied())
775 .collect();
776
777 for (sink_id, sink) in &self.active_compute_sinks {
780 let cluster_id = sink.cluster_id();
781 if let Some(id) = sink
782 .depends_on()
783 .iter()
784 .find(|id| readable_collections_to_drop.contains(id))
785 {
786 let name = dropped_item_names
787 .get(id)
788 .cloned()
789 .expect("missing relation name");
790 active_compute_sinks_to_drop.insert(
791 *sink_id,
792 ActiveComputeSinkRetireReason::DependencyDropped(DroppedDependency::Relation {
793 name,
794 }),
795 );
796 } else if clusters_to_drop.contains(&cluster_id) {
797 let name = dropped_cluster_names
798 .get(&cluster_id)
799 .cloned()
800 .expect("missing cluster name");
801 active_compute_sinks_to_drop.insert(
802 *sink_id,
803 ActiveComputeSinkRetireReason::DependencyDropped(DroppedDependency::Cluster {
804 name,
805 }),
806 );
807 }
808 }
809
810 for (uuid, pending_peek) in &self.pending_peeks {
812 if let Some(id) = pending_peek
813 .depends_on
814 .iter()
815 .find(|id| readable_collections_to_drop.contains(id))
816 {
817 let name = dropped_item_names
818 .get(id)
819 .cloned()
820 .expect("missing relation name");
821 peeks_to_drop.push((DroppedDependency::Relation { name }, uuid.clone()));
822 } else if clusters_to_drop.contains(&pending_peek.cluster_id) {
823 let name = dropped_cluster_names
824 .get(&pending_peek.cluster_id)
825 .cloned()
826 .expect("missing cluster name");
827 peeks_to_drop.push((DroppedDependency::Cluster { name }, uuid.clone()));
828 }
829 }
830
831 for (conn_id, pending_copy) in &self.active_copies {
833 let dropping_table = tables_to_drop
834 .iter()
835 .any(|(item_id, _gid)| pending_copy.table_id == *item_id);
836 let dropping_cluster = clusters_to_drop.contains(&pending_copy.cluster_id);
837
838 if dropping_table || dropping_cluster {
839 copies_to_drop.push(conn_id.clone());
840 }
841 }
842
843 let storage_gids_to_drop: BTreeSet<_> = sources_to_drop
844 .iter()
845 .map(|(_id, gid)| gid)
846 .chain(storage_sink_gids_to_drop.iter())
847 .chain(tables_to_drop.iter().map(|(_id, gid)| gid))
848 .copied()
849 .collect();
850 let compute_gids_to_drop: Vec<_> = indexes_to_drop
851 .iter()
852 .chain(compute_sinks_to_drop.iter())
853 .copied()
854 .collect();
855
856 let mut timeline_id_bundles = BTreeMap::new();
862
863 for (timeline, TimelineState { read_holds, .. }) in &self.global_timelines {
864 let mut id_bundle = CollectionIdBundle::default();
865
866 for storage_id in read_holds.storage_ids() {
867 if storage_gids_to_drop.contains(&storage_id) {
868 id_bundle.storage_ids.insert(storage_id);
869 }
870 }
871
872 for (instance_id, id) in read_holds.compute_ids() {
873 if compute_gids_to_drop.contains(&(instance_id, id))
874 || clusters_to_drop.contains(&instance_id)
875 {
876 id_bundle
877 .compute_ids
878 .entry(instance_id)
879 .or_default()
880 .insert(id);
881 }
882 }
883
884 timeline_id_bundles.insert(timeline.clone(), id_bundle);
885 }
886
887 let mut timeline_associations = BTreeMap::new();
888 for (timeline, id_bundle) in timeline_id_bundles.into_iter() {
889 let TimelineState { read_holds, .. } = self
890 .global_timelines
891 .get(&timeline)
892 .expect("all timelines have a timestamp oracle");
893
894 let empty = read_holds.id_bundle().difference(&id_bundle).is_empty();
895 timeline_associations.insert(timeline, (empty, id_bundle));
896 }
897
898 let _: () = async {
901 if !timeline_associations.is_empty() {
902 for (timeline, (should_be_empty, id_bundle)) in timeline_associations {
903 let became_empty =
904 self.remove_resources_associated_with_timeline(timeline, id_bundle);
905 assert_eq!(should_be_empty, became_empty, "emptiness did not match!");
906 }
907 }
908
909 if !tables_to_drop.is_empty() {
916 let ts = self.get_local_write_ts().await;
917 self.drop_tables(tables_to_drop.into_iter().collect_vec(), ts.timestamp);
918 }
919
920 if !sources_to_drop.is_empty() {
921 self.drop_sources(sources_to_drop);
922 }
923
924 if !storage_sink_gids_to_drop.is_empty() {
925 self.drop_storage_sinks(storage_sink_gids_to_drop);
926 }
927
928 if !active_compute_sinks_to_drop.is_empty() {
929 self.retire_compute_sinks(active_compute_sinks_to_drop)
930 .await;
931 }
932
933 if !peeks_to_drop.is_empty() {
934 for (dep, uuid) in peeks_to_drop {
935 if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
936 let cancel_reason = PeekResponse::Error(dep.query_terminated_error());
937 self.controller
938 .compute
939 .cancel_peek(pending_peek.cluster_id, uuid, cancel_reason)
940 .unwrap_or_terminate("unable to cancel peek");
941 self.retire_execution(
942 StatementEndedExecutionReason::Canceled,
943 pending_peek.ctx_extra.defuse(),
944 );
945 }
946 }
947 }
948
949 if !copies_to_drop.is_empty() {
950 for conn_id in copies_to_drop {
951 self.cancel_pending_copy(&conn_id);
952 }
953 }
954
955 if !compute_gids_to_drop.is_empty() {
956 self.drop_compute_collections(compute_gids_to_drop);
957 }
958
959 if !vpc_endpoints_to_drop.is_empty() {
960 self.drop_vpc_endpoints_in_background(vpc_endpoints_to_drop)
961 }
962
963 if !cluster_replicas_to_drop.is_empty() {
964 fail::fail_point!("after_catalog_drop_replica");
965
966 for (cluster_id, replica_id) in cluster_replicas_to_drop {
967 self.drop_replica(cluster_id, replica_id);
968 }
969 }
970 if !clusters_to_drop.is_empty() {
971 for cluster_id in clusters_to_drop {
972 self.controller.drop_cluster(cluster_id);
973 }
974 }
975
976 task::spawn(|| "drop_replication_slots_and_secrets", {
984 let ssh_tunnel_manager = self.connection_context().ssh_tunnel_manager.clone();
985 let caching_secrets_reader = self.caching_secrets_reader.clone();
986 let secrets_controller = Arc::clone(&self.secrets_controller);
987 let secrets_reader = Arc::clone(self.secrets_reader());
988 let storage_config = self.controller.storage.config().clone();
989
990 async move {
991 for (connection, replication_slot_name) in replication_slots_to_drop {
992 tracing::info!(?replication_slot_name, "dropping replication slot");
993
994 let result: Result<(), anyhow::Error> = Retry::default()
1000 .max_duration(Duration::from_secs(60))
1001 .retry_async(|_state| async {
1002 let config = connection
1003 .config(&secrets_reader, &storage_config, InTask::No)
1004 .await
1005 .map_err(|e| {
1006 anyhow::anyhow!(
1007 "error creating Postgres client for \
1008 dropping acquired slots: {}",
1009 e.display_with_causes()
1010 )
1011 })?;
1012
1013 mz_postgres_util::drop_replication_slots(
1014 &ssh_tunnel_manager,
1015 config.clone(),
1016 &[(&replication_slot_name, true)],
1017 )
1018 .await?;
1019
1020 Ok(())
1021 })
1022 .await;
1023
1024 if let Err(err) = result {
1025 tracing::warn!(
1026 ?replication_slot_name,
1027 ?err,
1028 "failed to drop replication slot"
1029 );
1030 }
1031 }
1032
1033 fail_point!("drop_secrets");
1041 for secret in secrets_to_drop {
1042 if let Err(e) = secrets_controller.delete(secret).await {
1043 warn!("Dropping secrets has encountered an error: {}", e);
1044 } else {
1045 caching_secrets_reader.invalidate(secret);
1046 }
1047 }
1048 }
1049 });
1050 }
1051 .instrument(info_span!(
1052 "coord::apply_catalog_implications_inner::finalize"
1053 ))
1054 .await;
1055
1056 Ok(())
1057 }
1058
1059 #[instrument(level = "debug")]
1060 async fn create_table_collections(
1061 &mut self,
1062 table_collections_to_create: BTreeMap<GlobalId, CollectionDescription>,
1063 execution_timestamps_to_set: BTreeSet<StatementLoggingId>,
1064 ) -> Result<(), AdapterError> {
1065 let write_ts = self.get_local_write_ts().await;
1067 let register_ts = write_ts.timestamp;
1068
1069 self.catalog
1079 .advance_upper(write_ts.advance_to)
1080 .await
1081 .unwrap_or_terminate("unable to advance catalog upper");
1082
1083 for id in execution_timestamps_to_set {
1084 self.set_statement_execution_timestamp(id, register_ts);
1085 }
1086
1087 let storage_metadata = self.catalog.state().storage_metadata();
1088
1089 self.controller
1090 .storage
1091 .create_collections(
1092 storage_metadata,
1093 Some(register_ts),
1094 table_collections_to_create.into_iter().collect_vec(),
1095 )
1096 .await
1097 .unwrap_or_terminate("cannot fail to create collections");
1098
1099 self.apply_local_write(register_ts).await;
1100
1101 Ok(())
1102 }
1103
1104 #[instrument(level = "debug")]
1105 async fn create_source_collections(
1106 &mut self,
1107 source_collections_to_create: BTreeMap<GlobalId, CollectionDescription>,
1108 ) -> Result<(), AdapterError> {
1109 let storage_metadata = self.catalog.state().storage_metadata();
1110
1111 self.controller
1112 .storage
1113 .create_collections(
1114 storage_metadata,
1115 None, source_collections_to_create.into_iter().collect_vec(),
1117 )
1118 .await
1119 .unwrap_or_terminate("cannot fail to create collections");
1120
1121 Ok(())
1122 }
1123
1124 #[instrument(level = "debug")]
1125 async fn initialize_storage_collections(
1126 &mut self,
1127 storage_policies_to_initialize: BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1128 ) -> Result<(), AdapterError> {
1129 for (compaction_window, global_ids) in storage_policies_to_initialize {
1130 self.initialize_read_policies(
1131 &CollectionIdBundle {
1132 storage_ids: global_ids,
1133 compute_ids: BTreeMap::new(),
1134 },
1135 compaction_window,
1136 )
1137 .await;
1138 }
1139
1140 Ok(())
1141 }
1142
1143 #[instrument(level = "debug")]
1144 async fn handle_create_table(
1145 &self,
1146 ctx: &Option<&mut ExecuteContext>,
1147 storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription>,
1148 storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1149 execution_timestamps_to_set: &mut BTreeSet<StatementLoggingId>,
1150 table_id: CatalogItemId,
1151 table: Table,
1152 ) -> Result<(), AdapterError> {
1153 match &table.data_source {
1157 TableDataSource::TableWrites { defaults: _ } => {
1158 let versions: BTreeMap<_, _> = table
1159 .collection_descs()
1160 .map(|(gid, version, desc)| (version, (gid, desc)))
1161 .collect();
1162 let collection_descs = versions.iter().map(|(_version, (gid, desc))| {
1163 let collection_desc = CollectionDescription::for_table(desc.clone());
1164
1165 (*gid, collection_desc)
1166 });
1167
1168 let compaction_window = table
1169 .custom_logical_compaction_window
1170 .unwrap_or(CompactionWindow::Default);
1171 let ids_to_initialize = storage_policies_to_initialize
1172 .entry(compaction_window)
1173 .or_default();
1174
1175 for (gid, collection_desc) in collection_descs {
1176 storage_collections_to_create.insert(gid, collection_desc);
1177 ids_to_initialize.insert(gid);
1178 }
1179
1180 if let Some(id) = ctx.as_ref().and_then(|ctx| ctx.extra().contents()) {
1181 execution_timestamps_to_set.insert(id);
1182 }
1183 }
1184 TableDataSource::DataSource {
1185 desc: data_source_desc,
1186 timeline,
1187 } => {
1188 match data_source_desc {
1189 DataSourceDesc::IngestionExport {
1190 ingestion_id,
1191 external_reference: _,
1192 details,
1193 data_config,
1194 } => {
1195 let global_ingestion_id =
1196 self.catalog().get_entry(ingestion_id).latest_global_id();
1197
1198 let collection_desc = CollectionDescription {
1199 desc: table.desc.latest(),
1200 data_source: DataSource::IngestionExport {
1201 ingestion_id: global_ingestion_id,
1202 details: details.clone(),
1203 data_config: data_config
1204 .clone()
1205 .into_inline_connection(self.catalog.state()),
1206 },
1207 since: None,
1208 timeline: Some(timeline.clone()),
1209 primary: None,
1210 };
1211
1212 let global_id = table
1213 .global_ids()
1214 .expect_element(|| "subsources cannot have multiple versions");
1215
1216 storage_collections_to_create.insert(global_id, collection_desc);
1217
1218 let read_policies = self
1219 .catalog()
1220 .state()
1221 .source_compaction_windows(vec![table_id]);
1222 for (compaction_window, catalog_ids) in read_policies {
1223 let compaction_ids = storage_policies_to_initialize
1224 .entry(compaction_window)
1225 .or_default();
1226
1227 let gids = catalog_ids
1228 .into_iter()
1229 .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1230 .flatten();
1231 compaction_ids.extend(gids);
1232 }
1233 }
1234 DataSourceDesc::Webhook {
1235 validate_using: _,
1236 body_format: _,
1237 headers: _,
1238 cluster_id: _,
1239 } => {
1240 assert_eq!(
1242 table.desc.latest_version(),
1243 RelationVersion::root(),
1244 "found webhook with more than 1 relation version, {:?}",
1245 table.desc
1246 );
1247 let desc = table.desc.latest();
1248
1249 let collection_desc = CollectionDescription {
1250 desc,
1251 data_source: DataSource::Webhook,
1252 since: None,
1253 timeline: Some(timeline.clone()),
1254 primary: None,
1255 };
1256
1257 let global_id = table
1258 .global_ids()
1259 .expect_element(|| "webhooks cannot have multiple versions");
1260
1261 storage_collections_to_create.insert(global_id, collection_desc);
1262
1263 let read_policies = self
1264 .catalog()
1265 .state()
1266 .source_compaction_windows(vec![table_id]);
1267
1268 for (compaction_window, catalog_ids) in read_policies {
1269 let compaction_ids = storage_policies_to_initialize
1270 .entry(compaction_window)
1271 .or_default();
1272
1273 let gids = catalog_ids
1274 .into_iter()
1275 .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1276 .flatten();
1277 compaction_ids.extend(gids);
1278 }
1279 }
1280 _ => unreachable!("CREATE TABLE data source got {:?}", data_source_desc),
1281 }
1282 }
1283 }
1284
1285 Ok(())
1286 }
1287
1288 #[instrument(level = "debug")]
1289 async fn handle_alter_table(
1290 &mut self,
1291 catalog_id: CatalogItemId,
1292 prev_table: Table,
1293 new_table: Table,
1294 ) -> Result<(), AdapterError> {
1295 let existing_gid = prev_table.global_id_writes();
1296 let new_gid = new_table.global_id_writes();
1297
1298 if existing_gid == new_gid {
1299 if prev_table.custom_logical_compaction_window
1302 != new_table.custom_logical_compaction_window
1303 {
1304 let new_window = new_table
1305 .custom_logical_compaction_window
1306 .unwrap_or(CompactionWindow::Default);
1307 self.update_storage_read_policies(vec![(catalog_id, new_window.into())]);
1308 }
1309 return Ok(());
1310 }
1311
1312 let existing_table = crate::CollectionIdBundle {
1316 storage_ids: BTreeSet::from([existing_gid]),
1317 compute_ids: BTreeMap::new(),
1318 };
1319 let existing_table_read_hold = self.acquire_read_holds(&existing_table);
1320
1321 let expected_version = prev_table.desc.latest_version();
1322 let new_version = new_table.desc.latest_version();
1323 let new_desc = new_table
1324 .desc
1325 .at_version(RelationVersionSelector::Specific(new_version));
1326
1327 let write_ts = self.get_local_write_ts().await;
1328 let register_ts = write_ts.timestamp;
1329
1330 self.catalog
1333 .advance_upper(write_ts.advance_to)
1334 .await
1335 .unwrap_or_terminate("unable to advance catalog upper");
1336
1337 self.controller
1339 .storage
1340 .alter_table_desc(
1341 existing_gid,
1342 new_gid,
1343 new_desc,
1344 expected_version,
1345 register_ts,
1346 )
1347 .await
1348 .expect("failed to alter desc of table");
1349
1350 let compaction_window = new_table
1352 .custom_logical_compaction_window
1353 .unwrap_or(CompactionWindow::Default);
1354 self.initialize_read_policies(
1355 &crate::CollectionIdBundle {
1356 storage_ids: BTreeSet::from([new_gid]),
1357 compute_ids: BTreeMap::new(),
1358 },
1359 compaction_window,
1360 )
1361 .await;
1362
1363 self.apply_local_write(register_ts).await;
1364
1365 drop(existing_table_read_hold);
1367
1368 Ok(())
1369 }
1370
1371 #[instrument(level = "debug")]
1372 async fn handle_create_source(
1373 &self,
1374 storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription>,
1375 storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1376 item_id: CatalogItemId,
1377 source: Source,
1378 compaction_windows: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>>,
1379 ) -> Result<(), AdapterError> {
1380 let data_source = match source.data_source {
1381 DataSourceDesc::Ingestion { desc, cluster_id } => {
1382 let desc = desc.into_inline_connection(self.catalog().state());
1383 let item_global_id = self.catalog().get_entry(&item_id).latest_global_id();
1384
1385 let ingestion = mz_storage_types::sources::IngestionDescription::new(
1386 desc,
1387 cluster_id,
1388 item_global_id,
1389 );
1390
1391 DataSource::Ingestion(ingestion)
1392 }
1393 DataSourceDesc::OldSyntaxIngestion {
1394 desc,
1395 progress_subsource,
1396 data_config,
1397 details,
1398 cluster_id,
1399 } => {
1400 let desc = desc.into_inline_connection(self.catalog().state());
1401 let data_config = data_config.into_inline_connection(self.catalog().state());
1402
1403 let progress_subsource = self
1406 .catalog()
1407 .get_entry(&progress_subsource)
1408 .latest_global_id();
1409
1410 let mut ingestion = mz_storage_types::sources::IngestionDescription::new(
1411 desc,
1412 cluster_id,
1413 progress_subsource,
1414 );
1415
1416 let legacy_export = SourceExport {
1417 storage_metadata: (),
1418 data_config,
1419 details,
1420 };
1421
1422 ingestion
1423 .source_exports
1424 .insert(source.global_id, legacy_export);
1425
1426 DataSource::Ingestion(ingestion)
1427 }
1428 DataSourceDesc::IngestionExport {
1429 ingestion_id,
1430 external_reference: _,
1431 details,
1432 data_config,
1433 } => {
1434 let ingestion_id = self.catalog().get_entry(&ingestion_id).latest_global_id();
1437
1438 DataSource::IngestionExport {
1439 ingestion_id,
1440 details,
1441 data_config: data_config.into_inline_connection(self.catalog().state()),
1442 }
1443 }
1444 DataSourceDesc::Progress => DataSource::Progress,
1445 DataSourceDesc::Webhook { .. } => DataSource::Webhook,
1446 DataSourceDesc::Introspection(_) | DataSourceDesc::Catalog => {
1447 unreachable!("cannot create sources with internal data sources")
1448 }
1449 };
1450
1451 storage_collections_to_create.insert(
1452 source.global_id,
1453 CollectionDescription {
1454 desc: source.desc.clone(),
1455 data_source,
1456 timeline: Some(source.timeline),
1457 since: None,
1458 primary: None,
1459 },
1460 );
1461
1462 for (compaction_window, catalog_ids) in compaction_windows {
1464 let compaction_ids = storage_policies_to_initialize
1465 .entry(compaction_window)
1466 .or_default();
1467
1468 let gids = catalog_ids
1469 .into_iter()
1470 .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1471 .flatten();
1472 compaction_ids.extend(gids);
1473 }
1474
1475 Ok(())
1476 }
1477
1478 #[instrument(level = "debug")]
1485 fn handle_alter_connection(
1486 &self,
1487 connection_id: CatalogItemId,
1488 connection: Connection,
1489 vpc_endpoints_to_create: &mut Vec<(CatalogItemId, VpcEndpointConfig)>,
1490 source_connections_to_alter: &mut BTreeMap<
1491 GlobalId,
1492 GenericSourceConnection<InlinedConnection>,
1493 >,
1494 sink_connections_to_alter: &mut BTreeMap<GlobalId, StorageSinkConnection>,
1495 source_export_data_configs_to_alter: &mut BTreeMap<GlobalId, SourceExportDataConfig>,
1496 ) {
1497 use std::collections::VecDeque;
1498
1499 if let ConnectionDetails::AwsPrivatelink(ref privatelink) = connection.details {
1501 let spec = VpcEndpointConfig {
1502 aws_service_name: privatelink.service_name.to_owned(),
1503 availability_zone_ids: privatelink.availability_zones.to_owned(),
1504 };
1505 vpc_endpoints_to_create.push((connection_id, spec));
1506 }
1507
1508 let mut connections_to_process = VecDeque::new();
1512 connections_to_process.push_front(connection_id.clone());
1513
1514 while let Some(id) = connections_to_process.pop_front() {
1515 for dependent_id in self.catalog().get_entry(&id).used_by() {
1516 let dependent_entry = self.catalog().get_entry(dependent_id);
1517 match dependent_entry.item() {
1518 CatalogItem::Connection(_) => {
1519 connections_to_process.push_back(*dependent_id);
1523 }
1524 CatalogItem::Source(source) => {
1525 let desc = match &dependent_entry
1526 .source()
1527 .expect("known to be source")
1528 .data_source
1529 {
1530 DataSourceDesc::Ingestion { desc, .. }
1531 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1532 desc.clone().into_inline_connection(self.catalog().state())
1533 }
1534 DataSourceDesc::IngestionExport { .. }
1535 | DataSourceDesc::Introspection(_)
1536 | DataSourceDesc::Progress
1537 | DataSourceDesc::Webhook { .. }
1538 | DataSourceDesc::Catalog => {
1539 continue;
1541 }
1542 };
1543
1544 source_connections_to_alter.insert(source.global_id, desc.connection);
1545 }
1546 CatalogItem::Sink(sink) => {
1547 let export = dependent_entry.sink().expect("known to be sink");
1548 sink_connections_to_alter.insert(
1549 sink.global_id,
1550 export
1551 .connection
1552 .clone()
1553 .into_inline_connection(self.catalog().state()),
1554 );
1555 }
1556 CatalogItem::Table(table) => {
1557 if let Some((_, _, _, export_data_config)) =
1561 dependent_entry.source_export_details()
1562 {
1563 let data_config = export_data_config.clone();
1564 source_export_data_configs_to_alter.insert(
1565 table.global_id_writes(),
1566 data_config.into_inline_connection(self.catalog().state()),
1567 );
1568 }
1569 }
1570 CatalogItem::Log(_)
1571 | CatalogItem::View(_)
1572 | CatalogItem::MaterializedView(_)
1573 | CatalogItem::Index(_)
1574 | CatalogItem::Type(_)
1575 | CatalogItem::Func(_)
1576 | CatalogItem::Secret(_) => {
1577 }
1580 }
1581 }
1582 }
1583 }
1584
1585 async fn handle_create_cluster_replica(
1586 &mut self,
1587 cluster_id: ClusterId,
1588 replica_id: ReplicaId,
1589 role: ClusterRole,
1590 cluster_name: String,
1591 replica_name: String,
1592 replica_config: ReplicaConfig,
1593 ) {
1594 let enable_worker_core_affinity =
1595 self.catalog().system_config().enable_worker_core_affinity();
1596 let enable_storage_introspection_logs = self
1597 .catalog()
1598 .system_config()
1599 .enable_storage_introspection_logs();
1600
1601 self.controller
1608 .create_replica(
1609 cluster_id,
1610 replica_id,
1611 cluster_name,
1612 replica_name,
1613 role,
1614 replica_config,
1615 enable_worker_core_affinity,
1616 enable_storage_introspection_logs,
1617 )
1618 .expect("creating replicas must not fail");
1619
1620 self.install_introspection_subscribes(cluster_id, replica_id)
1621 .await;
1622 }
1623}
1624
1625#[derive(Debug, Clone)]
1631enum CatalogImplication {
1632 None,
1633 Table(CatalogImplicationKind<Table>),
1634 Source(CatalogImplicationKind<(Source, Option<GenericSourceConnection>)>),
1635 Sink(CatalogImplicationKind<Sink>),
1636 Index(CatalogImplicationKind<Index>),
1637 MaterializedView(CatalogImplicationKind<MaterializedView>),
1638 View(CatalogImplicationKind<View>),
1639 Secret(CatalogImplicationKind<Secret>),
1640 Connection(CatalogImplicationKind<Connection>),
1641 Cluster(CatalogImplicationKind<Cluster>),
1642 ClusterReplica(CatalogImplicationKind<ClusterReplica>),
1643}
1644
1645#[derive(Debug, Clone)]
1646enum CatalogImplicationKind<T> {
1647 None,
1649 Added(T),
1651 Dropped(T, String),
1653 Altered { prev: T, new: T },
1655}
1656
1657impl<T: Clone> CatalogImplicationKind<T> {
1658 fn transition(&mut self, item: T, name: Option<String>, diff: StateDiff) -> Result<(), String> {
1661 use CatalogImplicationKind::*;
1662 use StateDiff::*;
1663
1664 let new_state = match (&*self, diff) {
1665 (None, Addition) => Added(item),
1667 (None, Retraction) => Dropped(item, name.unwrap_or_else(|| "<unknown>".to_string())),
1668
1669 (Added(existing), Retraction) => {
1671 Altered {
1673 prev: item,
1674 new: existing.clone(),
1675 }
1676 }
1677 (Added(_), Addition) => {
1678 return Err("Cannot add an already added object".to_string());
1679 }
1680
1681 (Dropped(existing, _), Addition) => {
1683 Altered {
1685 prev: existing.clone(),
1686 new: item,
1687 }
1688 }
1689 (Dropped(_, _), Retraction) => {
1690 return Err("Cannot drop an already dropped object".to_string());
1691 }
1692
1693 (Altered { .. }, _) => {
1695 return Err(format!(
1696 "Cannot apply {:?} to an object in Altered state",
1697 diff
1698 ));
1699 }
1700 };
1701
1702 *self = new_state;
1703 Ok(())
1704 }
1705}
1706
1707macro_rules! impl_absorb_method {
1709 (
1710 $method_name:ident,
1711 $variant:ident,
1712 $item_type:ty
1713 ) => {
1714 fn $method_name(
1715 &mut self,
1716 item: $item_type,
1717 parsed_full_name: Option<String>,
1718 diff: StateDiff,
1719 ) {
1720 let state = match self {
1721 CatalogImplication::$variant(state) => state,
1722 CatalogImplication::None => {
1723 *self = CatalogImplication::$variant(CatalogImplicationKind::None);
1724 match self {
1725 CatalogImplication::$variant(state) => state,
1726 _ => unreachable!(),
1727 }
1728 }
1729 _ => {
1730 panic!(
1731 "Unexpected command type for {:?}: {} {:?}",
1732 self,
1733 stringify!($variant),
1734 diff,
1735 );
1736 }
1737 };
1738
1739 if let Err(e) = state.transition(item, parsed_full_name, diff) {
1740 panic!(
1741 "Invalid state transition for {}: {}",
1742 stringify!($variant),
1743 e
1744 );
1745 }
1746 }
1747 };
1748}
1749
1750impl CatalogImplication {
1751 fn absorb(&mut self, catalog_update: ParsedStateUpdate) {
1754 match catalog_update.kind {
1755 ParsedStateUpdateKind::Item {
1756 durable_item: _,
1757 parsed_item,
1758 connection,
1759 parsed_full_name,
1760 } => match parsed_item {
1761 CatalogItem::Table(table) => {
1762 self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
1763 }
1764 CatalogItem::Source(source) => {
1765 self.absorb_source(
1766 (source, connection),
1767 Some(parsed_full_name),
1768 catalog_update.diff,
1769 );
1770 }
1771 CatalogItem::Sink(sink) => {
1772 self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
1773 }
1774 CatalogItem::Index(index) => {
1775 self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
1776 }
1777 CatalogItem::MaterializedView(mv) => {
1778 self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
1779 }
1780 CatalogItem::View(view) => {
1781 self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
1782 }
1783
1784 CatalogItem::Secret(secret) => {
1785 self.absorb_secret(secret, None, catalog_update.diff);
1786 }
1787 CatalogItem::Connection(connection) => {
1788 self.absorb_connection(connection, None, catalog_update.diff);
1789 }
1790 CatalogItem::Log(_) => {}
1791 CatalogItem::Type(_) => {}
1792 CatalogItem::Func(_) => {}
1793 },
1794 ParsedStateUpdateKind::TemporaryItem {
1795 durable_item: _,
1796 parsed_item,
1797 connection,
1798 parsed_full_name,
1799 } => match parsed_item {
1800 CatalogItem::Table(table) => {
1801 self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
1802 }
1803 CatalogItem::Source(source) => {
1804 self.absorb_source(
1805 (source, connection),
1806 Some(parsed_full_name),
1807 catalog_update.diff,
1808 );
1809 }
1810 CatalogItem::Sink(sink) => {
1811 self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
1812 }
1813 CatalogItem::Index(index) => {
1814 self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
1815 }
1816 CatalogItem::MaterializedView(mv) => {
1817 self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
1818 }
1819 CatalogItem::View(view) => {
1820 self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
1821 }
1822
1823 CatalogItem::Secret(secret) => {
1824 self.absorb_secret(secret, None, catalog_update.diff);
1825 }
1826 CatalogItem::Connection(connection) => {
1827 self.absorb_connection(connection, None, catalog_update.diff);
1828 }
1829 CatalogItem::Log(_) => {}
1830 CatalogItem::Type(_) => {}
1831 CatalogItem::Func(_) => {}
1832 },
1833 ParsedStateUpdateKind::Cluster {
1834 durable_cluster: _,
1835 parsed_cluster,
1836 } => {
1837 let name = parsed_cluster.name.clone();
1838 self.absorb_cluster(parsed_cluster, Some(name), catalog_update.diff);
1839 }
1840 ParsedStateUpdateKind::ClusterReplica {
1841 durable_cluster_replica: _,
1842 parsed_cluster_replica,
1843 } => {
1844 let name = parsed_cluster_replica.name.clone();
1845 self.absorb_cluster_replica(
1846 parsed_cluster_replica,
1847 Some(name),
1848 catalog_update.diff,
1849 );
1850 }
1851 ParsedStateUpdateKind::IntrospectionSourceIndex { .. } => {
1852 unreachable!("IntrospectionSourceIndex should not be passed to absorb");
1856 }
1857 ParsedStateUpdateKind::ReplicaSystemConfiguration { .. } => {
1858 unreachable!("ReplicaSystemConfiguration should not be passed to absorb");
1861 }
1862 }
1863 }
1864
1865 impl_absorb_method!(absorb_table, Table, Table);
1866 impl_absorb_method!(
1867 absorb_source,
1868 Source,
1869 (Source, Option<GenericSourceConnection>)
1870 );
1871 impl_absorb_method!(absorb_sink, Sink, Sink);
1872 impl_absorb_method!(absorb_index, Index, Index);
1873 impl_absorb_method!(absorb_materialized_view, MaterializedView, MaterializedView);
1874 impl_absorb_method!(absorb_view, View, View);
1875
1876 impl_absorb_method!(absorb_secret, Secret, Secret);
1877 impl_absorb_method!(absorb_connection, Connection, Connection);
1878
1879 impl_absorb_method!(absorb_cluster, Cluster, Cluster);
1880 impl_absorb_method!(absorb_cluster_replica, ClusterReplica, ClusterReplica);
1881}
1882
1883#[cfg(test)]
1884mod tests {
1885 use super::*;
1886 use mz_repr::{GlobalId, RelationDesc, RelationVersion, VersionedRelationDesc};
1887 use mz_sql::names::ResolvedIds;
1888 use std::collections::BTreeMap;
1889
1890 fn create_test_table(name: &str) -> Table {
1891 Table {
1892 desc: VersionedRelationDesc::new(
1893 RelationDesc::builder()
1894 .with_column(name, mz_repr::SqlScalarType::String.nullable(false))
1895 .finish(),
1896 ),
1897 create_sql: None,
1898 collections: BTreeMap::from([(RelationVersion::root(), GlobalId::System(1))]),
1899 conn_id: None,
1900 resolved_ids: ResolvedIds::empty(),
1901 custom_logical_compaction_window: None,
1902 is_retained_metrics_object: false,
1903 data_source: TableDataSource::TableWrites { defaults: vec![] },
1904 }
1905 }
1906
1907 #[mz_ore::test]
1908 fn test_item_state_transitions() {
1909 let mut state = CatalogImplicationKind::None;
1911 assert!(
1912 state
1913 .transition("item1".to_string(), None, StateDiff::Addition)
1914 .is_ok()
1915 );
1916 assert!(matches!(state, CatalogImplicationKind::Added(_)));
1917
1918 let mut state = CatalogImplicationKind::Added("new_item".to_string());
1920 assert!(
1921 state
1922 .transition("old_item".to_string(), None, StateDiff::Retraction)
1923 .is_ok()
1924 );
1925 match &state {
1926 CatalogImplicationKind::Altered { prev, new } => {
1927 assert_eq!(prev, "old_item");
1929 assert_eq!(new, "new_item");
1931 }
1932 _ => panic!("Expected Altered state"),
1933 }
1934
1935 let mut state = CatalogImplicationKind::None;
1937 assert!(
1938 state
1939 .transition(
1940 "item1".to_string(),
1941 Some("test_name".to_string()),
1942 StateDiff::Retraction
1943 )
1944 .is_ok()
1945 );
1946 assert!(matches!(state, CatalogImplicationKind::Dropped(_, _)));
1947
1948 let mut state = CatalogImplicationKind::Dropped("old_item".to_string(), "name".to_string());
1950 assert!(
1951 state
1952 .transition("new_item".to_string(), None, StateDiff::Addition)
1953 .is_ok()
1954 );
1955 match &state {
1956 CatalogImplicationKind::Altered { prev, new } => {
1957 assert_eq!(prev, "old_item");
1959 assert_eq!(new, "new_item");
1961 }
1962 _ => panic!("Expected Altered state"),
1963 }
1964
1965 let mut state = CatalogImplicationKind::Added("item".to_string());
1967 assert!(
1968 state
1969 .transition("item2".to_string(), None, StateDiff::Addition)
1970 .is_err()
1971 );
1972
1973 let mut state = CatalogImplicationKind::Dropped("item".to_string(), "name".to_string());
1974 assert!(
1975 state
1976 .transition("item2".to_string(), None, StateDiff::Retraction)
1977 .is_err()
1978 );
1979 }
1980
1981 #[mz_ore::test]
1982 fn test_table_absorb_state_machine() {
1983 let table1 = create_test_table("table1");
1984 let table2 = create_test_table("table2");
1985
1986 let mut cmd = CatalogImplication::None;
1988 cmd.absorb_table(
1989 table1.clone(),
1990 Some("schema.table1".to_string()),
1991 StateDiff::Addition,
1992 );
1993 match &cmd {
1995 CatalogImplication::Table(state) => match state {
1996 CatalogImplicationKind::Added(t) => {
1997 assert_eq!(t.desc.latest().arity(), table1.desc.latest().arity())
1998 }
1999 _ => panic!("Expected Added state"),
2000 },
2001 _ => panic!("Expected Table command"),
2002 }
2003
2004 cmd.absorb_table(
2008 table2.clone(),
2009 Some("schema.table2".to_string()),
2010 StateDiff::Retraction,
2011 );
2012 match &cmd {
2013 CatalogImplication::Table(state) => match state {
2014 CatalogImplicationKind::Altered { prev, new } => {
2015 assert_eq!(prev.desc.latest().arity(), table2.desc.latest().arity());
2017 assert_eq!(new.desc.latest().arity(), table1.desc.latest().arity());
2018 }
2019 _ => panic!("Expected Altered state"),
2020 },
2021 _ => panic!("Expected Table command"),
2022 }
2023
2024 let mut cmd = CatalogImplication::None;
2026 cmd.absorb_table(
2027 table1.clone(),
2028 Some("schema.table1".to_string()),
2029 StateDiff::Retraction,
2030 );
2031 match &cmd {
2032 CatalogImplication::Table(state) => match state {
2033 CatalogImplicationKind::Dropped(t, name) => {
2034 assert_eq!(t.desc.latest().arity(), table1.desc.latest().arity());
2035 assert_eq!(name, "schema.table1");
2036 }
2037 _ => panic!("Expected Dropped state"),
2038 },
2039 _ => panic!("Expected Table command"),
2040 }
2041
2042 cmd.absorb_table(
2044 table2.clone(),
2045 Some("schema.table2".to_string()),
2046 StateDiff::Addition,
2047 );
2048 match &cmd {
2049 CatalogImplication::Table(state) => match state {
2050 CatalogImplicationKind::Altered { prev, new } => {
2051 assert_eq!(prev.desc.latest().arity(), table1.desc.latest().arity());
2053 assert_eq!(new.desc.latest().arity(), table2.desc.latest().arity());
2054 }
2055 _ => panic!("Expected Altered state"),
2056 },
2057 _ => panic!("Expected Table command"),
2058 }
2059 }
2060
2061 #[mz_ore::test]
2062 #[should_panic(expected = "Cannot add an already added object")]
2063 fn test_invalid_double_add() {
2064 let table = create_test_table("table");
2065 let mut cmd = CatalogImplication::None;
2066
2067 cmd.absorb_table(
2069 table.clone(),
2070 Some("schema.table".to_string()),
2071 StateDiff::Addition,
2072 );
2073
2074 cmd.absorb_table(
2076 table.clone(),
2077 Some("schema.table".to_string()),
2078 StateDiff::Addition,
2079 );
2080 }
2081
2082 #[mz_ore::test]
2083 #[should_panic(expected = "Cannot drop an already dropped object")]
2084 fn test_invalid_double_drop() {
2085 let table = create_test_table("table");
2086 let mut cmd = CatalogImplication::None;
2087
2088 cmd.absorb_table(
2090 table.clone(),
2091 Some("schema.table".to_string()),
2092 StateDiff::Retraction,
2093 );
2094
2095 cmd.absorb_table(
2097 table.clone(),
2098 Some("schema.table".to_string()),
2099 StateDiff::Retraction,
2100 );
2101 }
2102}