1use std::collections::{BTreeMap, BTreeSet};
30use std::sync::Arc;
31use std::time::{Duration, Instant};
32
33use fail::fail_point;
34use itertools::Itertools;
35use mz_adapter_types::compaction::CompactionWindow;
36use mz_catalog::memory::objects::{
37 CatalogItem, Cluster, ClusterReplica, Connection, DataSourceDesc, Index, MaterializedView,
38 Secret, Sink, Source, StateDiff, Table, TableDataSource, View,
39};
40use mz_cloud_resources::VpcEndpointConfig;
41use mz_compute_client::logging::LogVariant;
42use mz_compute_client::protocol::response::PeekResponse;
43use mz_controller::clusters::{ClusterRole, ReplicaConfig};
44use mz_controller_types::{ClusterId, ReplicaId};
45use mz_ore::collections::CollectionExt;
46use mz_ore::error::ErrorExt;
47use mz_ore::future::InTask;
48use mz_ore::instrument;
49use mz_ore::retry::Retry;
50use mz_ore::task;
51use mz_repr::{CatalogItemId, GlobalId, RelationVersion, RelationVersionSelector};
52use mz_sql::plan::ConnectionDetails;
53use mz_storage_client::controller::{CollectionDescription, DataSource};
54use mz_storage_types::connections::PostgresConnection;
55use mz_storage_types::connections::inline::{InlinedConnection, IntoInlineConnection};
56use mz_storage_types::sinks::StorageSinkConnection;
57use mz_storage_types::sources::{
58 GenericSourceConnection, SourceDesc, SourceExport, SourceExportDataConfig,
59};
60use tracing::{Instrument, info_span, warn};
61
62use crate::active_compute_sink::ActiveComputeSinkRetireReason;
63use crate::coord::Coordinator;
64use crate::coord::catalog_implications::parsed_state_updates::{
65 ParsedStateUpdate, ParsedStateUpdateKind,
66};
67use crate::coord::peek::DroppedDependency;
68use crate::coord::timeline::TimelineState;
69use crate::statement_logging::{StatementEndedExecutionReason, StatementLoggingId};
70use crate::{AdapterError, CollectionIdBundle, ExecuteContext, ResultExt};
71
72pub mod parsed_state_updates;
73
74impl Coordinator {
75 #[instrument(level = "debug")]
84 pub async fn apply_catalog_implications(
85 &mut self,
86 ctx: Option<&mut ExecuteContext>,
87 catalog_updates: Vec<ParsedStateUpdate>,
88 ) -> Result<(), AdapterError> {
89 let start = Instant::now();
90
91 let mut catalog_implications: BTreeMap<CatalogItemId, CatalogImplication> = BTreeMap::new();
92 let mut cluster_commands: BTreeMap<ClusterId, CatalogImplication> = BTreeMap::new();
93 let mut cluster_replica_commands: BTreeMap<(ClusterId, ReplicaId), CatalogImplication> =
94 BTreeMap::new();
95 let mut introspection_source_indexes: BTreeMap<ClusterId, BTreeMap<LogVariant, GlobalId>> =
100 BTreeMap::new();
101
102 for update in catalog_updates {
103 tracing::trace!(?update, "got parsed state update");
104 match &update.kind {
105 ParsedStateUpdateKind::Item {
106 durable_item,
107 parsed_item: _,
108 connection: _,
109 parsed_full_name: _,
110 } => {
111 let entry = catalog_implications
112 .entry(durable_item.id.clone())
113 .or_insert_with(|| CatalogImplication::None);
114 entry.absorb(update);
115 }
116 ParsedStateUpdateKind::TemporaryItem {
117 durable_item,
118 parsed_item: _,
119 connection: _,
120 parsed_full_name: _,
121 } => {
122 let entry = catalog_implications
123 .entry(durable_item.id.clone())
124 .or_insert_with(|| CatalogImplication::None);
125 entry.absorb(update);
126 }
127 ParsedStateUpdateKind::Cluster {
128 durable_cluster,
129 parsed_cluster: _,
130 } => {
131 let entry = cluster_commands
132 .entry(durable_cluster.id)
133 .or_insert_with(|| CatalogImplication::None);
134 entry.absorb(update.clone());
135 }
136 ParsedStateUpdateKind::ClusterReplica {
137 durable_cluster_replica,
138 parsed_cluster_replica: _,
139 } => {
140 let entry = cluster_replica_commands
141 .entry((
142 durable_cluster_replica.cluster_id,
143 durable_cluster_replica.replica_id,
144 ))
145 .or_insert_with(|| CatalogImplication::None);
146 entry.absorb(update.clone());
147 }
148 ParsedStateUpdateKind::IntrospectionSourceIndex {
149 cluster_id,
150 log,
151 index_id,
152 } => {
153 if update.diff == StateDiff::Addition {
154 introspection_source_indexes
155 .entry(*cluster_id)
156 .or_default()
157 .insert(log.clone(), *index_id);
158 }
159 }
162 }
163 }
164
165 self.apply_catalog_implications_inner(
166 ctx,
167 catalog_implications.into_iter().collect_vec(),
168 cluster_commands.into_iter().collect_vec(),
169 cluster_replica_commands.into_iter().collect_vec(),
170 introspection_source_indexes,
171 )
172 .await?;
173
174 self.metrics
175 .apply_catalog_implications_seconds
176 .observe(start.elapsed().as_secs_f64());
177
178 Ok(())
179 }
180
181 #[instrument(level = "debug")]
182 async fn apply_catalog_implications_inner(
183 &mut self,
184 ctx: Option<&mut ExecuteContext>,
185 implications: Vec<(CatalogItemId, CatalogImplication)>,
186 cluster_commands: Vec<(ClusterId, CatalogImplication)>,
187 cluster_replica_commands: Vec<((ClusterId, ReplicaId), CatalogImplication)>,
188 mut introspection_source_indexes: BTreeMap<ClusterId, BTreeMap<LogVariant, GlobalId>>,
189 ) -> Result<(), AdapterError> {
190 let mut tables_to_drop = BTreeSet::new();
191 let mut sources_to_drop = vec![];
192 let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
193 let mut storage_sink_gids_to_drop = vec![];
194 let mut indexes_to_drop = vec![];
195 let mut compute_sinks_to_drop = vec![];
196 let mut view_gids_to_drop = vec![];
197 let mut secrets_to_drop = vec![];
198 let mut vpc_endpoints_to_drop = vec![];
199 let mut clusters_to_drop = vec![];
200 let mut cluster_replicas_to_drop = vec![];
201 let mut active_compute_sinks_to_drop = BTreeMap::new();
202 let mut peeks_to_drop = vec![];
203 let mut copies_to_drop = vec![];
204
205 let mut dropped_item_names: BTreeMap<GlobalId, String> = BTreeMap::new();
207 let mut dropped_cluster_names: BTreeMap<ClusterId, String> = BTreeMap::new();
208
209 let mut table_collections_to_create = BTreeMap::new();
212 let mut source_collections_to_create = BTreeMap::new();
213 let mut storage_policies_to_initialize = BTreeMap::new();
214 let mut execution_timestamps_to_set = BTreeSet::new();
215 let mut vpc_endpoints_to_create: Vec<(CatalogItemId, VpcEndpointConfig)> = vec![];
216
217 let mut source_gids_to_keep = BTreeSet::new();
220
221 let mut source_connections_to_alter: BTreeMap<
223 GlobalId,
224 GenericSourceConnection<InlinedConnection>,
225 > = BTreeMap::new();
226 let mut sink_connections_to_alter: BTreeMap<GlobalId, StorageSinkConnection> =
227 BTreeMap::new();
228 let mut source_export_data_configs_to_alter: BTreeMap<GlobalId, SourceExportDataConfig> =
229 BTreeMap::new();
230 let mut source_descs_to_alter: BTreeMap<GlobalId, SourceDesc> = BTreeMap::new();
231
232 for (catalog_id, implication) in implications {
239 tracing::trace!(?implication, "have to apply catalog implication");
240
241 match implication {
242 CatalogImplication::Table(CatalogImplicationKind::Added(table)) => {
243 self.handle_create_table(
244 &ctx,
245 &mut table_collections_to_create,
246 &mut storage_policies_to_initialize,
247 &mut execution_timestamps_to_set,
248 catalog_id,
249 table.clone(),
250 )
251 .await?
252 }
253 CatalogImplication::Table(CatalogImplicationKind::Altered {
254 prev: prev_table,
255 new: new_table,
256 }) => {
257 self.handle_alter_table(catalog_id, prev_table, new_table)
258 .await?
259 }
260
261 CatalogImplication::Table(CatalogImplicationKind::Dropped(table, full_name)) => {
262 let global_ids = table.global_ids();
263 for global_id in global_ids {
264 tables_to_drop.insert((catalog_id, global_id));
265 dropped_item_names.insert(global_id, full_name.clone());
266 }
267 }
268 CatalogImplication::Source(CatalogImplicationKind::Added((
269 source,
270 _connection,
271 ))) => {
272 let compaction_windows = self
277 .catalog()
278 .state()
279 .source_compaction_windows(vec![catalog_id]);
280
281 self.handle_create_source(
282 &mut source_collections_to_create,
283 &mut storage_policies_to_initialize,
284 catalog_id,
285 source,
286 compaction_windows,
287 )
288 .await?
289 }
290 CatalogImplication::Source(CatalogImplicationKind::Altered {
291 prev: (prev_source, _prev_connection),
292 new: (new_source, new_connection),
293 }) => {
294 if prev_source.custom_logical_compaction_window
295 != new_source.custom_logical_compaction_window
296 {
297 let new_window = new_source
298 .custom_logical_compaction_window
299 .unwrap_or(CompactionWindow::Default);
300 self.update_storage_read_policies(vec![(catalog_id, new_window.into())]);
301 }
302 match (&prev_source.data_source, &new_source.data_source) {
303 (
304 DataSourceDesc::Ingestion {
305 desc: prev_desc, ..
306 }
307 | DataSourceDesc::OldSyntaxIngestion {
308 desc: prev_desc, ..
309 },
310 DataSourceDesc::Ingestion { desc: new_desc, .. }
311 | DataSourceDesc::OldSyntaxIngestion { desc: new_desc, .. },
312 ) => {
313 if prev_desc != new_desc {
314 let inlined_connection = new_connection
315 .expect("ingestion source should have inlined connection");
316 let inlined_desc = SourceDesc {
317 connection: inlined_connection,
318 timestamp_interval: new_desc.timestamp_interval,
319 };
320 source_descs_to_alter.insert(new_source.global_id, inlined_desc);
321 }
322 }
323 _ => {}
324 }
325 }
326 CatalogImplication::Source(CatalogImplicationKind::Dropped(
327 (source, connection),
328 full_name,
329 )) => {
330 let global_id = source.global_id();
331 sources_to_drop.push((catalog_id, global_id));
332 dropped_item_names.insert(global_id, full_name);
333
334 if let DataSourceDesc::Ingestion { desc, .. }
335 | DataSourceDesc::OldSyntaxIngestion { desc, .. } = &source.data_source
336 {
337 match &desc.connection {
338 GenericSourceConnection::Postgres(_referenced_conn) => {
339 let inline_conn = connection.expect("missing inlined connection");
340
341 let pg_conn = match inline_conn {
342 GenericSourceConnection::Postgres(pg_conn) => pg_conn,
343 other => {
344 panic!("expected postgres connection, got: {:?}", other)
345 }
346 };
347 let pending_drop = (
348 pg_conn.connection.clone(),
349 pg_conn.publication_details.slot.clone(),
350 );
351 replication_slots_to_drop.push(pending_drop);
352 }
353 _ => {}
354 }
355 }
356 }
357 CatalogImplication::Sink(CatalogImplicationKind::Added(sink)) => {
358 tracing::debug!(?sink, "not handling AddSink in here yet");
359 }
360 CatalogImplication::Sink(CatalogImplicationKind::Altered {
361 prev: prev_sink,
362 new: new_sink,
363 }) => {
364 tracing::debug!(?prev_sink, ?new_sink, "not handling AlterSink in here yet");
365 }
366 CatalogImplication::Sink(CatalogImplicationKind::Dropped(sink, full_name)) => {
367 storage_sink_gids_to_drop.push(sink.global_id());
368 dropped_item_names.insert(sink.global_id(), full_name);
369 }
370 CatalogImplication::Index(CatalogImplicationKind::Added(index)) => {
371 tracing::debug!(?index, "not handling AddIndex in here yet");
372 }
373 CatalogImplication::Index(CatalogImplicationKind::Altered {
374 prev: prev_index,
375 new: new_index,
376 }) => {
377 if prev_index.custom_logical_compaction_window
378 != new_index.custom_logical_compaction_window
379 {
380 let new_window = new_index
381 .custom_logical_compaction_window
382 .unwrap_or(CompactionWindow::Default);
383 self.update_compute_read_policy(
384 new_index.cluster_id,
385 catalog_id,
386 new_window.into(),
387 );
388 }
389 }
390 CatalogImplication::Index(CatalogImplicationKind::Dropped(index, full_name)) => {
391 indexes_to_drop.push((index.cluster_id, index.global_id()));
392 dropped_item_names.insert(index.global_id(), full_name);
393 }
394 CatalogImplication::MaterializedView(CatalogImplicationKind::Added(mv)) => {
395 tracing::debug!(?mv, "not handling AddMaterializedView in here yet");
396 }
397 CatalogImplication::MaterializedView(CatalogImplicationKind::Altered {
398 prev: prev_mv,
399 new: new_mv,
400 }) => {
401 if prev_mv.collections != new_mv.collections {
420 assert_eq!(
423 prev_mv.global_id_writes(),
424 new_mv.global_id_writes(),
425 "unexpected MV Altered implication: prev={prev_mv:?}, new={new_mv:?}",
426 );
427
428 let gid = new_mv.global_id_writes();
429 self.allow_writes(new_mv.cluster_id, gid);
430
431 source_gids_to_keep.extend(new_mv.global_ids());
436 } else if prev_mv.custom_logical_compaction_window
437 != new_mv.custom_logical_compaction_window
438 {
439 let new_window = new_mv
440 .custom_logical_compaction_window
441 .unwrap_or(CompactionWindow::Default);
442 self.update_storage_read_policies(vec![(catalog_id, new_window.into())]);
443 }
444 }
445 CatalogImplication::MaterializedView(CatalogImplicationKind::Dropped(
446 mv,
447 full_name,
448 )) => {
449 compute_sinks_to_drop.push((mv.cluster_id, mv.global_id_writes()));
450 for gid in mv.global_ids() {
451 sources_to_drop.push((catalog_id, gid));
452 dropped_item_names.insert(gid, full_name.clone());
453 }
454 }
455 CatalogImplication::View(CatalogImplicationKind::Added(_view)) => {
456 }
459 CatalogImplication::View(CatalogImplicationKind::Altered {
460 prev: _prev_view,
461 new: _new_view,
462 }) => {
463 }
466 CatalogImplication::View(CatalogImplicationKind::Dropped(view, full_name)) => {
467 view_gids_to_drop.push(view.global_id());
468 dropped_item_names.insert(view.global_id(), full_name);
469 }
470 CatalogImplication::Secret(CatalogImplicationKind::Added(_secret)) => {
471 }
475 CatalogImplication::Secret(CatalogImplicationKind::Altered {
476 prev: _prev_secret,
477 new: _new_secret,
478 }) => {
479 }
482 CatalogImplication::Secret(CatalogImplicationKind::Dropped(
483 _secret,
484 _full_name,
485 )) => {
486 secrets_to_drop.push(catalog_id);
487 }
488 CatalogImplication::Connection(CatalogImplicationKind::Added(connection)) => {
489 match &connection.details {
490 ConnectionDetails::Ssh { .. } => {}
493 ConnectionDetails::AwsPrivatelink(privatelink) => {
495 let spec = VpcEndpointConfig {
496 aws_service_name: privatelink.service_name.to_owned(),
497 availability_zone_ids: privatelink.availability_zones.to_owned(),
498 };
499 vpc_endpoints_to_create.push((catalog_id, spec));
500 }
501 _ => {}
503 }
504 }
505 CatalogImplication::Connection(CatalogImplicationKind::Altered {
506 prev: _prev_connection,
507 new: new_connection,
508 }) => {
509 self.handle_alter_connection(
510 catalog_id,
511 new_connection,
512 &mut vpc_endpoints_to_create,
513 &mut source_connections_to_alter,
514 &mut sink_connections_to_alter,
515 &mut source_export_data_configs_to_alter,
516 );
517 }
518 CatalogImplication::Connection(CatalogImplicationKind::Dropped(
519 connection,
520 _full_name,
521 )) => {
522 match &connection.details {
523 ConnectionDetails::Ssh { .. } => {
525 secrets_to_drop.push(catalog_id);
526 }
527 ConnectionDetails::AwsPrivatelink(_) => {
530 vpc_endpoints_to_drop.push(catalog_id);
531 }
532 _ => (),
533 }
534 }
535 CatalogImplication::None => {
536 }
538 CatalogImplication::Cluster(_) | CatalogImplication::ClusterReplica(_) => {
539 unreachable!("clusters and cluster replicas are handled below")
540 }
541 CatalogImplication::Table(CatalogImplicationKind::None)
542 | CatalogImplication::Source(CatalogImplicationKind::None)
543 | CatalogImplication::Sink(CatalogImplicationKind::None)
544 | CatalogImplication::Index(CatalogImplicationKind::None)
545 | CatalogImplication::MaterializedView(CatalogImplicationKind::None)
546 | CatalogImplication::View(CatalogImplicationKind::None)
547 | CatalogImplication::Secret(CatalogImplicationKind::None)
548 | CatalogImplication::Connection(CatalogImplicationKind::None) => {
549 unreachable!("will never leave None in place");
550 }
551 }
552 }
553
554 for (cluster_id, command) in cluster_commands {
555 tracing::trace!(?command, "have cluster command to apply!");
556
557 match command {
558 CatalogImplication::Cluster(CatalogImplicationKind::Added(cluster)) => {
559 let arranged_logs = introspection_source_indexes
564 .remove(&cluster_id)
565 .unwrap_or_default();
566 let introspection_source_ids: Vec<_> =
567 arranged_logs.values().copied().collect();
568
569 self.controller
570 .create_cluster(
571 cluster_id,
572 mz_controller::clusters::ClusterConfig {
573 arranged_logs,
574 workload_class: cluster.config.workload_class.clone(),
575 },
576 )
577 .expect("creating cluster must not fail");
578
579 if !introspection_source_ids.is_empty() {
580 self.initialize_compute_read_policies(
581 introspection_source_ids,
582 cluster_id,
583 CompactionWindow::Default,
584 )
585 .await;
586 }
587 }
588 CatalogImplication::Cluster(CatalogImplicationKind::Altered {
589 prev: prev_cluster,
590 new: new_cluster,
591 }) => {
592 if prev_cluster.config.workload_class != new_cluster.config.workload_class {
598 self.controller.update_cluster_workload_class(
599 cluster_id,
600 new_cluster.config.workload_class.clone(),
601 );
602 }
603 }
604 CatalogImplication::Cluster(CatalogImplicationKind::Dropped(
605 cluster,
606 _full_name,
607 )) => {
608 clusters_to_drop.push(cluster_id);
609 dropped_cluster_names.insert(cluster_id, cluster.name);
610 }
611 CatalogImplication::Cluster(CatalogImplicationKind::None) => {
612 unreachable!("will never leave None in place");
613 }
614 command => {
615 unreachable!(
616 "we only handle cluster commands in this map, got: {:?}",
617 command
618 );
619 }
620 }
621 }
622
623 for ((cluster_id, replica_id), command) in cluster_replica_commands {
624 tracing::trace!(?command, "have cluster replica command to apply!");
625
626 match command {
627 CatalogImplication::ClusterReplica(CatalogImplicationKind::Added(replica)) => {
628 let cluster = self.catalog().get_cluster(cluster_id);
636 let cluster_name = cluster.name.clone();
637 let cluster_role = cluster.role();
638 self.handle_create_cluster_replica(
639 cluster_id,
640 replica_id,
641 cluster_role,
642 cluster_name,
643 replica.name.clone(),
644 replica.config.clone(),
645 )
646 .await;
647 }
648 CatalogImplication::ClusterReplica(CatalogImplicationKind::Altered {
649 prev: _prev_replica,
650 new: _new_replica,
651 }) => {
652 }
656 CatalogImplication::ClusterReplica(CatalogImplicationKind::Dropped(
657 _replica,
658 _full_name,
659 )) => {
660 cluster_replicas_to_drop.push((cluster_id, replica_id));
661 }
662 CatalogImplication::ClusterReplica(CatalogImplicationKind::None) => {
663 unreachable!("will never leave None in place");
664 }
665 command => {
666 unreachable!(
667 "we only handle cluster replica commands in this map, got: {:?}",
668 command
669 );
670 }
671 }
672 }
673
674 if !source_collections_to_create.is_empty() {
675 self.create_source_collections(source_collections_to_create)
676 .await?;
677 }
678
679 if !table_collections_to_create.is_empty() {
682 self.create_table_collections(table_collections_to_create, execution_timestamps_to_set)
683 .await?;
684 }
685 self.initialize_storage_collections(storage_policies_to_initialize)
691 .await?;
692
693 if !vpc_endpoints_to_create.is_empty() {
695 if let Some(cloud_resource_controller) = self.cloud_resource_controller.as_ref() {
696 for (connection_id, spec) in vpc_endpoints_to_create {
697 if let Err(err) = cloud_resource_controller
698 .ensure_vpc_endpoint(connection_id, spec)
699 .await
700 {
701 tracing::error!(?err, "failed to ensure vpc endpoint!");
702 }
703 }
704 } else {
705 tracing::error!(
706 "AWS PrivateLink connections unsupported without cloud_resource_controller"
707 );
708 }
709 }
710
711 if !source_connections_to_alter.is_empty() {
713 self.controller
714 .storage
715 .alter_ingestion_connections(source_connections_to_alter)
716 .await
717 .unwrap_or_terminate("cannot fail to alter ingestion connections");
718 }
719
720 if !sink_connections_to_alter.is_empty() {
721 self.controller
722 .storage
723 .alter_export_connections(sink_connections_to_alter)
724 .await
725 .unwrap_or_terminate("altering export connections after txn must succeed");
726 }
727
728 if !source_export_data_configs_to_alter.is_empty() {
729 self.controller
730 .storage
731 .alter_ingestion_export_data_configs(source_export_data_configs_to_alter)
732 .await
733 .unwrap_or_terminate("altering source export data configs after txn must succeed");
734 }
735
736 if !source_descs_to_alter.is_empty() {
737 self.controller
738 .storage
739 .alter_ingestion_source_desc(source_descs_to_alter)
740 .await
741 .unwrap_or_terminate("cannot fail to alter ingestion source desc");
742 }
743
744 sources_to_drop.retain(|(_, gid)| !source_gids_to_keep.contains(gid));
746
747 let readable_collections_to_drop: BTreeSet<_> = sources_to_drop
748 .iter()
749 .map(|(_, gid)| *gid)
750 .chain(tables_to_drop.iter().map(|(_, gid)| *gid))
751 .chain(indexes_to_drop.iter().map(|(_, gid)| *gid))
752 .chain(view_gids_to_drop.iter().copied())
753 .collect();
754
755 for (sink_id, sink) in &self.active_compute_sinks {
758 let cluster_id = sink.cluster_id();
759 if let Some(id) = sink
760 .depends_on()
761 .iter()
762 .find(|id| readable_collections_to_drop.contains(id))
763 {
764 let name = dropped_item_names
765 .get(id)
766 .cloned()
767 .expect("missing relation name");
768 active_compute_sinks_to_drop.insert(
769 *sink_id,
770 ActiveComputeSinkRetireReason::DependencyDropped(DroppedDependency::Relation {
771 name,
772 }),
773 );
774 } else if clusters_to_drop.contains(&cluster_id) {
775 let name = dropped_cluster_names
776 .get(&cluster_id)
777 .cloned()
778 .expect("missing cluster name");
779 active_compute_sinks_to_drop.insert(
780 *sink_id,
781 ActiveComputeSinkRetireReason::DependencyDropped(DroppedDependency::Cluster {
782 name,
783 }),
784 );
785 }
786 }
787
788 for (uuid, pending_peek) in &self.pending_peeks {
790 if let Some(id) = pending_peek
791 .depends_on
792 .iter()
793 .find(|id| readable_collections_to_drop.contains(id))
794 {
795 let name = dropped_item_names
796 .get(id)
797 .cloned()
798 .expect("missing relation name");
799 peeks_to_drop.push((DroppedDependency::Relation { name }, uuid.clone()));
800 } else if clusters_to_drop.contains(&pending_peek.cluster_id) {
801 let name = dropped_cluster_names
802 .get(&pending_peek.cluster_id)
803 .cloned()
804 .expect("missing cluster name");
805 peeks_to_drop.push((DroppedDependency::Cluster { name }, uuid.clone()));
806 }
807 }
808
809 for (conn_id, pending_copy) in &self.active_copies {
811 let dropping_table = tables_to_drop
812 .iter()
813 .any(|(item_id, _gid)| pending_copy.table_id == *item_id);
814 let dropping_cluster = clusters_to_drop.contains(&pending_copy.cluster_id);
815
816 if dropping_table || dropping_cluster {
817 copies_to_drop.push(conn_id.clone());
818 }
819 }
820
821 let storage_gids_to_drop: BTreeSet<_> = sources_to_drop
822 .iter()
823 .map(|(_id, gid)| gid)
824 .chain(storage_sink_gids_to_drop.iter())
825 .chain(tables_to_drop.iter().map(|(_id, gid)| gid))
826 .copied()
827 .collect();
828 let compute_gids_to_drop: Vec<_> = indexes_to_drop
829 .iter()
830 .chain(compute_sinks_to_drop.iter())
831 .copied()
832 .collect();
833
834 let mut timeline_id_bundles = BTreeMap::new();
840
841 for (timeline, TimelineState { read_holds, .. }) in &self.global_timelines {
842 let mut id_bundle = CollectionIdBundle::default();
843
844 for storage_id in read_holds.storage_ids() {
845 if storage_gids_to_drop.contains(&storage_id) {
846 id_bundle.storage_ids.insert(storage_id);
847 }
848 }
849
850 for (instance_id, id) in read_holds.compute_ids() {
851 if compute_gids_to_drop.contains(&(instance_id, id))
852 || clusters_to_drop.contains(&instance_id)
853 {
854 id_bundle
855 .compute_ids
856 .entry(instance_id)
857 .or_default()
858 .insert(id);
859 }
860 }
861
862 timeline_id_bundles.insert(timeline.clone(), id_bundle);
863 }
864
865 let mut timeline_associations = BTreeMap::new();
866 for (timeline, id_bundle) in timeline_id_bundles.into_iter() {
867 let TimelineState { read_holds, .. } = self
868 .global_timelines
869 .get(&timeline)
870 .expect("all timelines have a timestamp oracle");
871
872 let empty = read_holds.id_bundle().difference(&id_bundle).is_empty();
873 timeline_associations.insert(timeline, (empty, id_bundle));
874 }
875
876 let _: () = async {
879 if !timeline_associations.is_empty() {
880 for (timeline, (should_be_empty, id_bundle)) in timeline_associations {
881 let became_empty =
882 self.remove_resources_associated_with_timeline(timeline, id_bundle);
883 assert_eq!(should_be_empty, became_empty, "emptiness did not match!");
884 }
885 }
886
887 if !tables_to_drop.is_empty() {
894 let ts = self.get_local_write_ts().await;
895 self.drop_tables(tables_to_drop.into_iter().collect_vec(), ts.timestamp);
896 }
897
898 if !sources_to_drop.is_empty() {
899 self.drop_sources(sources_to_drop);
900 }
901
902 if !storage_sink_gids_to_drop.is_empty() {
903 self.drop_storage_sinks(storage_sink_gids_to_drop);
904 }
905
906 if !active_compute_sinks_to_drop.is_empty() {
907 self.retire_compute_sinks(active_compute_sinks_to_drop)
908 .await;
909 }
910
911 if !peeks_to_drop.is_empty() {
912 for (dep, uuid) in peeks_to_drop {
913 if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
914 let cancel_reason = PeekResponse::Error(dep.query_terminated_error());
915 self.controller
916 .compute
917 .cancel_peek(pending_peek.cluster_id, uuid, cancel_reason)
918 .unwrap_or_terminate("unable to cancel peek");
919 self.retire_execution(
920 StatementEndedExecutionReason::Canceled,
921 pending_peek.ctx_extra.defuse(),
922 );
923 }
924 }
925 }
926
927 if !copies_to_drop.is_empty() {
928 for conn_id in copies_to_drop {
929 self.cancel_pending_copy(&conn_id);
930 }
931 }
932
933 if !compute_gids_to_drop.is_empty() {
934 self.drop_compute_collections(compute_gids_to_drop);
935 }
936
937 if !vpc_endpoints_to_drop.is_empty() {
938 self.drop_vpc_endpoints_in_background(vpc_endpoints_to_drop)
939 }
940
941 if !cluster_replicas_to_drop.is_empty() {
942 fail::fail_point!("after_catalog_drop_replica");
943
944 for (cluster_id, replica_id) in cluster_replicas_to_drop {
945 self.drop_replica(cluster_id, replica_id);
946 }
947 }
948 if !clusters_to_drop.is_empty() {
949 for cluster_id in clusters_to_drop {
950 self.controller.drop_cluster(cluster_id);
951 }
952 }
953
954 task::spawn(|| "drop_replication_slots_and_secrets", {
962 let ssh_tunnel_manager = self.connection_context().ssh_tunnel_manager.clone();
963 let caching_secrets_reader = self.caching_secrets_reader.clone();
964 let secrets_controller = Arc::clone(&self.secrets_controller);
965 let secrets_reader = Arc::clone(self.secrets_reader());
966 let storage_config = self.controller.storage.config().clone();
967
968 async move {
969 for (connection, replication_slot_name) in replication_slots_to_drop {
970 tracing::info!(?replication_slot_name, "dropping replication slot");
971
972 let result: Result<(), anyhow::Error> = Retry::default()
978 .max_duration(Duration::from_secs(60))
979 .retry_async(|_state| async {
980 let config = connection
981 .config(&secrets_reader, &storage_config, InTask::No)
982 .await
983 .map_err(|e| {
984 anyhow::anyhow!(
985 "error creating Postgres client for \
986 dropping acquired slots: {}",
987 e.display_with_causes()
988 )
989 })?;
990
991 mz_postgres_util::drop_replication_slots(
992 &ssh_tunnel_manager,
993 config.clone(),
994 &[(&replication_slot_name, true)],
995 )
996 .await?;
997
998 Ok(())
999 })
1000 .await;
1001
1002 if let Err(err) = result {
1003 tracing::warn!(
1004 ?replication_slot_name,
1005 ?err,
1006 "failed to drop replication slot"
1007 );
1008 }
1009 }
1010
1011 fail_point!("drop_secrets");
1019 for secret in secrets_to_drop {
1020 if let Err(e) = secrets_controller.delete(secret).await {
1021 warn!("Dropping secrets has encountered an error: {}", e);
1022 } else {
1023 caching_secrets_reader.invalidate(secret);
1024 }
1025 }
1026 }
1027 });
1028 }
1029 .instrument(info_span!(
1030 "coord::apply_catalog_implications_inner::finalize"
1031 ))
1032 .await;
1033
1034 Ok(())
1035 }
1036
1037 #[instrument(level = "debug")]
1038 async fn create_table_collections(
1039 &mut self,
1040 table_collections_to_create: BTreeMap<GlobalId, CollectionDescription>,
1041 execution_timestamps_to_set: BTreeSet<StatementLoggingId>,
1042 ) -> Result<(), AdapterError> {
1043 let write_ts = self.get_local_write_ts().await;
1045 let register_ts = write_ts.timestamp;
1046
1047 self.catalog
1057 .advance_upper(write_ts.advance_to)
1058 .await
1059 .unwrap_or_terminate("unable to advance catalog upper");
1060
1061 for id in execution_timestamps_to_set {
1062 self.set_statement_execution_timestamp(id, register_ts);
1063 }
1064
1065 let storage_metadata = self.catalog.state().storage_metadata();
1066
1067 self.controller
1068 .storage
1069 .create_collections(
1070 storage_metadata,
1071 Some(register_ts),
1072 table_collections_to_create.into_iter().collect_vec(),
1073 )
1074 .await
1075 .unwrap_or_terminate("cannot fail to create collections");
1076
1077 self.apply_local_write(register_ts).await;
1078
1079 Ok(())
1080 }
1081
1082 #[instrument(level = "debug")]
1083 async fn create_source_collections(
1084 &mut self,
1085 source_collections_to_create: BTreeMap<GlobalId, CollectionDescription>,
1086 ) -> Result<(), AdapterError> {
1087 let storage_metadata = self.catalog.state().storage_metadata();
1088
1089 self.controller
1090 .storage
1091 .create_collections(
1092 storage_metadata,
1093 None, source_collections_to_create.into_iter().collect_vec(),
1095 )
1096 .await
1097 .unwrap_or_terminate("cannot fail to create collections");
1098
1099 Ok(())
1100 }
1101
1102 #[instrument(level = "debug")]
1103 async fn initialize_storage_collections(
1104 &mut self,
1105 storage_policies_to_initialize: BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1106 ) -> Result<(), AdapterError> {
1107 for (compaction_window, global_ids) in storage_policies_to_initialize {
1108 self.initialize_read_policies(
1109 &CollectionIdBundle {
1110 storage_ids: global_ids,
1111 compute_ids: BTreeMap::new(),
1112 },
1113 compaction_window,
1114 )
1115 .await;
1116 }
1117
1118 Ok(())
1119 }
1120
1121 #[instrument(level = "debug")]
1122 async fn handle_create_table(
1123 &self,
1124 ctx: &Option<&mut ExecuteContext>,
1125 storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription>,
1126 storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1127 execution_timestamps_to_set: &mut BTreeSet<StatementLoggingId>,
1128 table_id: CatalogItemId,
1129 table: Table,
1130 ) -> Result<(), AdapterError> {
1131 match &table.data_source {
1135 TableDataSource::TableWrites { defaults: _ } => {
1136 let versions: BTreeMap<_, _> = table
1137 .collection_descs()
1138 .map(|(gid, version, desc)| (version, (gid, desc)))
1139 .collect();
1140 let collection_descs = versions.iter().map(|(_version, (gid, desc))| {
1141 let collection_desc = CollectionDescription::for_table(desc.clone());
1142
1143 (*gid, collection_desc)
1144 });
1145
1146 let compaction_window = table
1147 .custom_logical_compaction_window
1148 .unwrap_or(CompactionWindow::Default);
1149 let ids_to_initialize = storage_policies_to_initialize
1150 .entry(compaction_window)
1151 .or_default();
1152
1153 for (gid, collection_desc) in collection_descs {
1154 storage_collections_to_create.insert(gid, collection_desc);
1155 ids_to_initialize.insert(gid);
1156 }
1157
1158 if let Some(id) = ctx.as_ref().and_then(|ctx| ctx.extra().contents()) {
1159 execution_timestamps_to_set.insert(id);
1160 }
1161 }
1162 TableDataSource::DataSource {
1163 desc: data_source_desc,
1164 timeline,
1165 } => {
1166 match data_source_desc {
1167 DataSourceDesc::IngestionExport {
1168 ingestion_id,
1169 external_reference: _,
1170 details,
1171 data_config,
1172 } => {
1173 let global_ingestion_id =
1174 self.catalog().get_entry(ingestion_id).latest_global_id();
1175
1176 let collection_desc = CollectionDescription {
1177 desc: table.desc.latest(),
1178 data_source: DataSource::IngestionExport {
1179 ingestion_id: global_ingestion_id,
1180 details: details.clone(),
1181 data_config: data_config
1182 .clone()
1183 .into_inline_connection(self.catalog.state()),
1184 },
1185 since: None,
1186 timeline: Some(timeline.clone()),
1187 primary: None,
1188 };
1189
1190 let global_id = table
1191 .global_ids()
1192 .expect_element(|| "subsources cannot have multiple versions");
1193
1194 storage_collections_to_create.insert(global_id, collection_desc);
1195
1196 let read_policies = self
1197 .catalog()
1198 .state()
1199 .source_compaction_windows(vec![table_id]);
1200 for (compaction_window, catalog_ids) in read_policies {
1201 let compaction_ids = storage_policies_to_initialize
1202 .entry(compaction_window)
1203 .or_default();
1204
1205 let gids = catalog_ids
1206 .into_iter()
1207 .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1208 .flatten();
1209 compaction_ids.extend(gids);
1210 }
1211 }
1212 DataSourceDesc::Webhook {
1213 validate_using: _,
1214 body_format: _,
1215 headers: _,
1216 cluster_id: _,
1217 } => {
1218 assert_eq!(
1220 table.desc.latest_version(),
1221 RelationVersion::root(),
1222 "found webhook with more than 1 relation version, {:?}",
1223 table.desc
1224 );
1225 let desc = table.desc.latest();
1226
1227 let collection_desc = CollectionDescription {
1228 desc,
1229 data_source: DataSource::Webhook,
1230 since: None,
1231 timeline: Some(timeline.clone()),
1232 primary: None,
1233 };
1234
1235 let global_id = table
1236 .global_ids()
1237 .expect_element(|| "webhooks cannot have multiple versions");
1238
1239 storage_collections_to_create.insert(global_id, collection_desc);
1240
1241 let read_policies = self
1242 .catalog()
1243 .state()
1244 .source_compaction_windows(vec![table_id]);
1245
1246 for (compaction_window, catalog_ids) in read_policies {
1247 let compaction_ids = storage_policies_to_initialize
1248 .entry(compaction_window)
1249 .or_default();
1250
1251 let gids = catalog_ids
1252 .into_iter()
1253 .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1254 .flatten();
1255 compaction_ids.extend(gids);
1256 }
1257 }
1258 _ => unreachable!("CREATE TABLE data source got {:?}", data_source_desc),
1259 }
1260 }
1261 }
1262
1263 Ok(())
1264 }
1265
1266 #[instrument(level = "debug")]
1267 async fn handle_alter_table(
1268 &mut self,
1269 catalog_id: CatalogItemId,
1270 prev_table: Table,
1271 new_table: Table,
1272 ) -> Result<(), AdapterError> {
1273 let existing_gid = prev_table.global_id_writes();
1274 let new_gid = new_table.global_id_writes();
1275
1276 if existing_gid == new_gid {
1277 if prev_table.custom_logical_compaction_window
1280 != new_table.custom_logical_compaction_window
1281 {
1282 let new_window = new_table
1283 .custom_logical_compaction_window
1284 .unwrap_or(CompactionWindow::Default);
1285 self.update_storage_read_policies(vec![(catalog_id, new_window.into())]);
1286 }
1287 return Ok(());
1288 }
1289
1290 let existing_table = crate::CollectionIdBundle {
1294 storage_ids: BTreeSet::from([existing_gid]),
1295 compute_ids: BTreeMap::new(),
1296 };
1297 let existing_table_read_hold = self.acquire_read_holds(&existing_table);
1298
1299 let expected_version = prev_table.desc.latest_version();
1300 let new_version = new_table.desc.latest_version();
1301 let new_desc = new_table
1302 .desc
1303 .at_version(RelationVersionSelector::Specific(new_version));
1304
1305 let write_ts = self.get_local_write_ts().await;
1306 let register_ts = write_ts.timestamp;
1307
1308 self.catalog
1311 .advance_upper(write_ts.advance_to)
1312 .await
1313 .unwrap_or_terminate("unable to advance catalog upper");
1314
1315 self.controller
1317 .storage
1318 .alter_table_desc(
1319 existing_gid,
1320 new_gid,
1321 new_desc,
1322 expected_version,
1323 register_ts,
1324 )
1325 .await
1326 .expect("failed to alter desc of table");
1327
1328 let compaction_window = new_table
1330 .custom_logical_compaction_window
1331 .unwrap_or(CompactionWindow::Default);
1332 self.initialize_read_policies(
1333 &crate::CollectionIdBundle {
1334 storage_ids: BTreeSet::from([new_gid]),
1335 compute_ids: BTreeMap::new(),
1336 },
1337 compaction_window,
1338 )
1339 .await;
1340
1341 self.apply_local_write(register_ts).await;
1342
1343 drop(existing_table_read_hold);
1345
1346 Ok(())
1347 }
1348
1349 #[instrument(level = "debug")]
1350 async fn handle_create_source(
1351 &self,
1352 storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription>,
1353 storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1354 item_id: CatalogItemId,
1355 source: Source,
1356 compaction_windows: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>>,
1357 ) -> Result<(), AdapterError> {
1358 let data_source = match source.data_source {
1359 DataSourceDesc::Ingestion { desc, cluster_id } => {
1360 let desc = desc.into_inline_connection(self.catalog().state());
1361 let item_global_id = self.catalog().get_entry(&item_id).latest_global_id();
1362
1363 let ingestion = mz_storage_types::sources::IngestionDescription::new(
1364 desc,
1365 cluster_id,
1366 item_global_id,
1367 );
1368
1369 DataSource::Ingestion(ingestion)
1370 }
1371 DataSourceDesc::OldSyntaxIngestion {
1372 desc,
1373 progress_subsource,
1374 data_config,
1375 details,
1376 cluster_id,
1377 } => {
1378 let desc = desc.into_inline_connection(self.catalog().state());
1379 let data_config = data_config.into_inline_connection(self.catalog().state());
1380
1381 let progress_subsource = self
1384 .catalog()
1385 .get_entry(&progress_subsource)
1386 .latest_global_id();
1387
1388 let mut ingestion = mz_storage_types::sources::IngestionDescription::new(
1389 desc,
1390 cluster_id,
1391 progress_subsource,
1392 );
1393
1394 let legacy_export = SourceExport {
1395 storage_metadata: (),
1396 data_config,
1397 details,
1398 };
1399
1400 ingestion
1401 .source_exports
1402 .insert(source.global_id, legacy_export);
1403
1404 DataSource::Ingestion(ingestion)
1405 }
1406 DataSourceDesc::IngestionExport {
1407 ingestion_id,
1408 external_reference: _,
1409 details,
1410 data_config,
1411 } => {
1412 let ingestion_id = self.catalog().get_entry(&ingestion_id).latest_global_id();
1415
1416 DataSource::IngestionExport {
1417 ingestion_id,
1418 details,
1419 data_config: data_config.into_inline_connection(self.catalog().state()),
1420 }
1421 }
1422 DataSourceDesc::Progress => DataSource::Progress,
1423 DataSourceDesc::Webhook { .. } => DataSource::Webhook,
1424 DataSourceDesc::Introspection(_) | DataSourceDesc::Catalog => {
1425 unreachable!("cannot create sources with internal data sources")
1426 }
1427 };
1428
1429 storage_collections_to_create.insert(
1430 source.global_id,
1431 CollectionDescription {
1432 desc: source.desc.clone(),
1433 data_source,
1434 timeline: Some(source.timeline),
1435 since: None,
1436 primary: None,
1437 },
1438 );
1439
1440 for (compaction_window, catalog_ids) in compaction_windows {
1442 let compaction_ids = storage_policies_to_initialize
1443 .entry(compaction_window)
1444 .or_default();
1445
1446 let gids = catalog_ids
1447 .into_iter()
1448 .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1449 .flatten();
1450 compaction_ids.extend(gids);
1451 }
1452
1453 Ok(())
1454 }
1455
1456 #[instrument(level = "debug")]
1463 fn handle_alter_connection(
1464 &self,
1465 connection_id: CatalogItemId,
1466 connection: Connection,
1467 vpc_endpoints_to_create: &mut Vec<(CatalogItemId, VpcEndpointConfig)>,
1468 source_connections_to_alter: &mut BTreeMap<
1469 GlobalId,
1470 GenericSourceConnection<InlinedConnection>,
1471 >,
1472 sink_connections_to_alter: &mut BTreeMap<GlobalId, StorageSinkConnection>,
1473 source_export_data_configs_to_alter: &mut BTreeMap<GlobalId, SourceExportDataConfig>,
1474 ) {
1475 use std::collections::VecDeque;
1476
1477 if let ConnectionDetails::AwsPrivatelink(ref privatelink) = connection.details {
1479 let spec = VpcEndpointConfig {
1480 aws_service_name: privatelink.service_name.to_owned(),
1481 availability_zone_ids: privatelink.availability_zones.to_owned(),
1482 };
1483 vpc_endpoints_to_create.push((connection_id, spec));
1484 }
1485
1486 let mut connections_to_process = VecDeque::new();
1490 connections_to_process.push_front(connection_id.clone());
1491
1492 while let Some(id) = connections_to_process.pop_front() {
1493 for dependent_id in self.catalog().get_entry(&id).used_by() {
1494 let dependent_entry = self.catalog().get_entry(dependent_id);
1495 match dependent_entry.item() {
1496 CatalogItem::Connection(_) => {
1497 connections_to_process.push_back(*dependent_id);
1501 }
1502 CatalogItem::Source(source) => {
1503 let desc = match &dependent_entry
1504 .source()
1505 .expect("known to be source")
1506 .data_source
1507 {
1508 DataSourceDesc::Ingestion { desc, .. }
1509 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1510 desc.clone().into_inline_connection(self.catalog().state())
1511 }
1512 DataSourceDesc::IngestionExport { .. }
1513 | DataSourceDesc::Introspection(_)
1514 | DataSourceDesc::Progress
1515 | DataSourceDesc::Webhook { .. }
1516 | DataSourceDesc::Catalog => {
1517 continue;
1519 }
1520 };
1521
1522 source_connections_to_alter.insert(source.global_id, desc.connection);
1523 }
1524 CatalogItem::Sink(sink) => {
1525 let export = dependent_entry.sink().expect("known to be sink");
1526 sink_connections_to_alter.insert(
1527 sink.global_id,
1528 export
1529 .connection
1530 .clone()
1531 .into_inline_connection(self.catalog().state()),
1532 );
1533 }
1534 CatalogItem::Table(table) => {
1535 if let Some((_, _, _, export_data_config)) =
1539 dependent_entry.source_export_details()
1540 {
1541 let data_config = export_data_config.clone();
1542 source_export_data_configs_to_alter.insert(
1543 table.global_id_writes(),
1544 data_config.into_inline_connection(self.catalog().state()),
1545 );
1546 }
1547 }
1548 CatalogItem::Log(_)
1549 | CatalogItem::View(_)
1550 | CatalogItem::MaterializedView(_)
1551 | CatalogItem::Index(_)
1552 | CatalogItem::Type(_)
1553 | CatalogItem::Func(_)
1554 | CatalogItem::Secret(_) => {
1555 }
1558 }
1559 }
1560 }
1561 }
1562
1563 async fn handle_create_cluster_replica(
1564 &mut self,
1565 cluster_id: ClusterId,
1566 replica_id: ReplicaId,
1567 role: ClusterRole,
1568 cluster_name: String,
1569 replica_name: String,
1570 replica_config: ReplicaConfig,
1571 ) {
1572 let enable_worker_core_affinity =
1573 self.catalog().system_config().enable_worker_core_affinity();
1574 let enable_storage_introspection_logs = self
1575 .catalog()
1576 .system_config()
1577 .enable_storage_introspection_logs();
1578
1579 self.controller
1580 .create_replica(
1581 cluster_id,
1582 replica_id,
1583 cluster_name,
1584 replica_name,
1585 role,
1586 replica_config,
1587 enable_worker_core_affinity,
1588 enable_storage_introspection_logs,
1589 )
1590 .expect("creating replicas must not fail");
1591
1592 self.install_introspection_subscribes(cluster_id, replica_id)
1593 .await;
1594 }
1595}
1596
1597#[derive(Debug, Clone)]
1603enum CatalogImplication {
1604 None,
1605 Table(CatalogImplicationKind<Table>),
1606 Source(CatalogImplicationKind<(Source, Option<GenericSourceConnection>)>),
1607 Sink(CatalogImplicationKind<Sink>),
1608 Index(CatalogImplicationKind<Index>),
1609 MaterializedView(CatalogImplicationKind<MaterializedView>),
1610 View(CatalogImplicationKind<View>),
1611 Secret(CatalogImplicationKind<Secret>),
1612 Connection(CatalogImplicationKind<Connection>),
1613 Cluster(CatalogImplicationKind<Cluster>),
1614 ClusterReplica(CatalogImplicationKind<ClusterReplica>),
1615}
1616
1617#[derive(Debug, Clone)]
1618enum CatalogImplicationKind<T> {
1619 None,
1621 Added(T),
1623 Dropped(T, String),
1625 Altered { prev: T, new: T },
1627}
1628
1629impl<T: Clone> CatalogImplicationKind<T> {
1630 fn transition(&mut self, item: T, name: Option<String>, diff: StateDiff) -> Result<(), String> {
1633 use CatalogImplicationKind::*;
1634 use StateDiff::*;
1635
1636 let new_state = match (&*self, diff) {
1637 (None, Addition) => Added(item),
1639 (None, Retraction) => Dropped(item, name.unwrap_or_else(|| "<unknown>".to_string())),
1640
1641 (Added(existing), Retraction) => {
1643 Altered {
1645 prev: item,
1646 new: existing.clone(),
1647 }
1648 }
1649 (Added(_), Addition) => {
1650 return Err("Cannot add an already added object".to_string());
1651 }
1652
1653 (Dropped(existing, _), Addition) => {
1655 Altered {
1657 prev: existing.clone(),
1658 new: item,
1659 }
1660 }
1661 (Dropped(_, _), Retraction) => {
1662 return Err("Cannot drop an already dropped object".to_string());
1663 }
1664
1665 (Altered { .. }, _) => {
1667 return Err(format!(
1668 "Cannot apply {:?} to an object in Altered state",
1669 diff
1670 ));
1671 }
1672 };
1673
1674 *self = new_state;
1675 Ok(())
1676 }
1677}
1678
1679macro_rules! impl_absorb_method {
1681 (
1682 $method_name:ident,
1683 $variant:ident,
1684 $item_type:ty
1685 ) => {
1686 fn $method_name(
1687 &mut self,
1688 item: $item_type,
1689 parsed_full_name: Option<String>,
1690 diff: StateDiff,
1691 ) {
1692 let state = match self {
1693 CatalogImplication::$variant(state) => state,
1694 CatalogImplication::None => {
1695 *self = CatalogImplication::$variant(CatalogImplicationKind::None);
1696 match self {
1697 CatalogImplication::$variant(state) => state,
1698 _ => unreachable!(),
1699 }
1700 }
1701 _ => {
1702 panic!(
1703 "Unexpected command type for {:?}: {} {:?}",
1704 self,
1705 stringify!($variant),
1706 diff,
1707 );
1708 }
1709 };
1710
1711 if let Err(e) = state.transition(item, parsed_full_name, diff) {
1712 panic!(
1713 "Invalid state transition for {}: {}",
1714 stringify!($variant),
1715 e
1716 );
1717 }
1718 }
1719 };
1720}
1721
1722impl CatalogImplication {
1723 fn absorb(&mut self, catalog_update: ParsedStateUpdate) {
1726 match catalog_update.kind {
1727 ParsedStateUpdateKind::Item {
1728 durable_item: _,
1729 parsed_item,
1730 connection,
1731 parsed_full_name,
1732 } => match parsed_item {
1733 CatalogItem::Table(table) => {
1734 self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
1735 }
1736 CatalogItem::Source(source) => {
1737 self.absorb_source(
1738 (source, connection),
1739 Some(parsed_full_name),
1740 catalog_update.diff,
1741 );
1742 }
1743 CatalogItem::Sink(sink) => {
1744 self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
1745 }
1746 CatalogItem::Index(index) => {
1747 self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
1748 }
1749 CatalogItem::MaterializedView(mv) => {
1750 self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
1751 }
1752 CatalogItem::View(view) => {
1753 self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
1754 }
1755
1756 CatalogItem::Secret(secret) => {
1757 self.absorb_secret(secret, None, catalog_update.diff);
1758 }
1759 CatalogItem::Connection(connection) => {
1760 self.absorb_connection(connection, None, catalog_update.diff);
1761 }
1762 CatalogItem::Log(_) => {}
1763 CatalogItem::Type(_) => {}
1764 CatalogItem::Func(_) => {}
1765 },
1766 ParsedStateUpdateKind::TemporaryItem {
1767 durable_item: _,
1768 parsed_item,
1769 connection,
1770 parsed_full_name,
1771 } => match parsed_item {
1772 CatalogItem::Table(table) => {
1773 self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
1774 }
1775 CatalogItem::Source(source) => {
1776 self.absorb_source(
1777 (source, connection),
1778 Some(parsed_full_name),
1779 catalog_update.diff,
1780 );
1781 }
1782 CatalogItem::Sink(sink) => {
1783 self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
1784 }
1785 CatalogItem::Index(index) => {
1786 self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
1787 }
1788 CatalogItem::MaterializedView(mv) => {
1789 self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
1790 }
1791 CatalogItem::View(view) => {
1792 self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
1793 }
1794
1795 CatalogItem::Secret(secret) => {
1796 self.absorb_secret(secret, None, catalog_update.diff);
1797 }
1798 CatalogItem::Connection(connection) => {
1799 self.absorb_connection(connection, None, catalog_update.diff);
1800 }
1801 CatalogItem::Log(_) => {}
1802 CatalogItem::Type(_) => {}
1803 CatalogItem::Func(_) => {}
1804 },
1805 ParsedStateUpdateKind::Cluster {
1806 durable_cluster: _,
1807 parsed_cluster,
1808 } => {
1809 let name = parsed_cluster.name.clone();
1810 self.absorb_cluster(parsed_cluster, Some(name), catalog_update.diff);
1811 }
1812 ParsedStateUpdateKind::ClusterReplica {
1813 durable_cluster_replica: _,
1814 parsed_cluster_replica,
1815 } => {
1816 let name = parsed_cluster_replica.name.clone();
1817 self.absorb_cluster_replica(
1818 parsed_cluster_replica,
1819 Some(name),
1820 catalog_update.diff,
1821 );
1822 }
1823 ParsedStateUpdateKind::IntrospectionSourceIndex { .. } => {
1824 unreachable!("IntrospectionSourceIndex should not be passed to absorb");
1828 }
1829 }
1830 }
1831
1832 impl_absorb_method!(absorb_table, Table, Table);
1833 impl_absorb_method!(
1834 absorb_source,
1835 Source,
1836 (Source, Option<GenericSourceConnection>)
1837 );
1838 impl_absorb_method!(absorb_sink, Sink, Sink);
1839 impl_absorb_method!(absorb_index, Index, Index);
1840 impl_absorb_method!(absorb_materialized_view, MaterializedView, MaterializedView);
1841 impl_absorb_method!(absorb_view, View, View);
1842
1843 impl_absorb_method!(absorb_secret, Secret, Secret);
1844 impl_absorb_method!(absorb_connection, Connection, Connection);
1845
1846 impl_absorb_method!(absorb_cluster, Cluster, Cluster);
1847 impl_absorb_method!(absorb_cluster_replica, ClusterReplica, ClusterReplica);
1848}
1849
1850#[cfg(test)]
1851mod tests {
1852 use super::*;
1853 use mz_repr::{GlobalId, RelationDesc, RelationVersion, VersionedRelationDesc};
1854 use mz_sql::names::ResolvedIds;
1855 use std::collections::BTreeMap;
1856
1857 fn create_test_table(name: &str) -> Table {
1858 Table {
1859 desc: VersionedRelationDesc::new(
1860 RelationDesc::builder()
1861 .with_column(name, mz_repr::SqlScalarType::String.nullable(false))
1862 .finish(),
1863 ),
1864 create_sql: None,
1865 collections: BTreeMap::from([(RelationVersion::root(), GlobalId::System(1))]),
1866 conn_id: None,
1867 resolved_ids: ResolvedIds::empty(),
1868 custom_logical_compaction_window: None,
1869 is_retained_metrics_object: false,
1870 data_source: TableDataSource::TableWrites { defaults: vec![] },
1871 }
1872 }
1873
1874 #[mz_ore::test]
1875 fn test_item_state_transitions() {
1876 let mut state = CatalogImplicationKind::None;
1878 assert!(
1879 state
1880 .transition("item1".to_string(), None, StateDiff::Addition)
1881 .is_ok()
1882 );
1883 assert!(matches!(state, CatalogImplicationKind::Added(_)));
1884
1885 let mut state = CatalogImplicationKind::Added("new_item".to_string());
1887 assert!(
1888 state
1889 .transition("old_item".to_string(), None, StateDiff::Retraction)
1890 .is_ok()
1891 );
1892 match &state {
1893 CatalogImplicationKind::Altered { prev, new } => {
1894 assert_eq!(prev, "old_item");
1896 assert_eq!(new, "new_item");
1898 }
1899 _ => panic!("Expected Altered state"),
1900 }
1901
1902 let mut state = CatalogImplicationKind::None;
1904 assert!(
1905 state
1906 .transition(
1907 "item1".to_string(),
1908 Some("test_name".to_string()),
1909 StateDiff::Retraction
1910 )
1911 .is_ok()
1912 );
1913 assert!(matches!(state, CatalogImplicationKind::Dropped(_, _)));
1914
1915 let mut state = CatalogImplicationKind::Dropped("old_item".to_string(), "name".to_string());
1917 assert!(
1918 state
1919 .transition("new_item".to_string(), None, StateDiff::Addition)
1920 .is_ok()
1921 );
1922 match &state {
1923 CatalogImplicationKind::Altered { prev, new } => {
1924 assert_eq!(prev, "old_item");
1926 assert_eq!(new, "new_item");
1928 }
1929 _ => panic!("Expected Altered state"),
1930 }
1931
1932 let mut state = CatalogImplicationKind::Added("item".to_string());
1934 assert!(
1935 state
1936 .transition("item2".to_string(), None, StateDiff::Addition)
1937 .is_err()
1938 );
1939
1940 let mut state = CatalogImplicationKind::Dropped("item".to_string(), "name".to_string());
1941 assert!(
1942 state
1943 .transition("item2".to_string(), None, StateDiff::Retraction)
1944 .is_err()
1945 );
1946 }
1947
1948 #[mz_ore::test]
1949 fn test_table_absorb_state_machine() {
1950 let table1 = create_test_table("table1");
1951 let table2 = create_test_table("table2");
1952
1953 let mut cmd = CatalogImplication::None;
1955 cmd.absorb_table(
1956 table1.clone(),
1957 Some("schema.table1".to_string()),
1958 StateDiff::Addition,
1959 );
1960 match &cmd {
1962 CatalogImplication::Table(state) => match state {
1963 CatalogImplicationKind::Added(t) => {
1964 assert_eq!(t.desc.latest().arity(), table1.desc.latest().arity())
1965 }
1966 _ => panic!("Expected Added state"),
1967 },
1968 _ => panic!("Expected Table command"),
1969 }
1970
1971 cmd.absorb_table(
1975 table2.clone(),
1976 Some("schema.table2".to_string()),
1977 StateDiff::Retraction,
1978 );
1979 match &cmd {
1980 CatalogImplication::Table(state) => match state {
1981 CatalogImplicationKind::Altered { prev, new } => {
1982 assert_eq!(prev.desc.latest().arity(), table2.desc.latest().arity());
1984 assert_eq!(new.desc.latest().arity(), table1.desc.latest().arity());
1985 }
1986 _ => panic!("Expected Altered state"),
1987 },
1988 _ => panic!("Expected Table command"),
1989 }
1990
1991 let mut cmd = CatalogImplication::None;
1993 cmd.absorb_table(
1994 table1.clone(),
1995 Some("schema.table1".to_string()),
1996 StateDiff::Retraction,
1997 );
1998 match &cmd {
1999 CatalogImplication::Table(state) => match state {
2000 CatalogImplicationKind::Dropped(t, name) => {
2001 assert_eq!(t.desc.latest().arity(), table1.desc.latest().arity());
2002 assert_eq!(name, "schema.table1");
2003 }
2004 _ => panic!("Expected Dropped state"),
2005 },
2006 _ => panic!("Expected Table command"),
2007 }
2008
2009 cmd.absorb_table(
2011 table2.clone(),
2012 Some("schema.table2".to_string()),
2013 StateDiff::Addition,
2014 );
2015 match &cmd {
2016 CatalogImplication::Table(state) => match state {
2017 CatalogImplicationKind::Altered { prev, new } => {
2018 assert_eq!(prev.desc.latest().arity(), table1.desc.latest().arity());
2020 assert_eq!(new.desc.latest().arity(), table2.desc.latest().arity());
2021 }
2022 _ => panic!("Expected Altered state"),
2023 },
2024 _ => panic!("Expected Table command"),
2025 }
2026 }
2027
2028 #[mz_ore::test]
2029 #[should_panic(expected = "Cannot add an already added object")]
2030 fn test_invalid_double_add() {
2031 let table = create_test_table("table");
2032 let mut cmd = CatalogImplication::None;
2033
2034 cmd.absorb_table(
2036 table.clone(),
2037 Some("schema.table".to_string()),
2038 StateDiff::Addition,
2039 );
2040
2041 cmd.absorb_table(
2043 table.clone(),
2044 Some("schema.table".to_string()),
2045 StateDiff::Addition,
2046 );
2047 }
2048
2049 #[mz_ore::test]
2050 #[should_panic(expected = "Cannot drop an already dropped object")]
2051 fn test_invalid_double_drop() {
2052 let table = create_test_table("table");
2053 let mut cmd = CatalogImplication::None;
2054
2055 cmd.absorb_table(
2057 table.clone(),
2058 Some("schema.table".to_string()),
2059 StateDiff::Retraction,
2060 );
2061
2062 cmd.absorb_table(
2064 table.clone(),
2065 Some("schema.table".to_string()),
2066 StateDiff::Retraction,
2067 );
2068 }
2069}