use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use std::time::{Duration, Instant};

use fail::fail_point;
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_catalog::memory::objects::{
    CatalogItem, Cluster, ClusterReplica, Connection, ContinualTask, DataSourceDesc, Index,
    MaterializedView, Secret, Sink, Source, StateDiff, Table, TableDataSource, View,
};
use mz_cloud_resources::VpcEndpointConfig;
use mz_compute_client::protocol::response::PeekResponse;
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::instrument;
use mz_ore::retry::Retry;
use mz_ore::str::StrExt;
use mz_ore::task;
use mz_repr::{CatalogItemId, GlobalId, RelationVersion, RelationVersionSelector, Timestamp};
use mz_sql::plan::ConnectionDetails;
use mz_storage_client::controller::{CollectionDescription, DataSource};
use mz_storage_types::connections::PostgresConnection;
use mz_storage_types::connections::inline::{InlinedConnection, IntoInlineConnection};
use mz_storage_types::sinks::StorageSinkConnection;
use mz_storage_types::sources::{GenericSourceConnection, SourceExport, SourceExportDataConfig};
use tracing::{Instrument, info_span, warn};

use crate::active_compute_sink::ActiveComputeSinkRetireReason;
use crate::coord::Coordinator;
use crate::coord::catalog_implications::parsed_state_updates::{
    ParsedStateUpdate, ParsedStateUpdateKind,
};
use crate::coord::timeline::TimelineState;
use crate::statement_logging::{StatementEndedExecutionReason, StatementLoggingId};
use crate::{AdapterError, CollectionIdBundle, ExecuteContext, ResultExt};

pub mod parsed_state_updates;

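// Handlers that translate parsed catalog state updates into actions against the
// storage and compute controllers: creating and dropping collections, retiring
// sinks and peeks, and dropping clusters, replicas, and cloud resources.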
impl Coordinator {
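    /// Applies the given parsed catalog state updates to the rest of the system.
    ///
    /// Updates are first grouped per item, cluster, and cluster replica, so that
    /// an addition and a retraction for the same object collapse into a single
    /// `Altered` implication before anything is applied.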
    #[instrument(level = "debug")]
    pub async fn apply_catalog_implications(
        &mut self,
        ctx: Option<&mut ExecuteContext>,
        catalog_updates: Vec<ParsedStateUpdate>,
    ) -> Result<(), AdapterError> {
        let start = Instant::now();

        let mut catalog_implications: BTreeMap<CatalogItemId, CatalogImplication> = BTreeMap::new();
        let mut cluster_commands: BTreeMap<ClusterId, CatalogImplication> = BTreeMap::new();
        let mut cluster_replica_commands: BTreeMap<(ClusterId, ReplicaId), CatalogImplication> =
            BTreeMap::new();

        for update in catalog_updates {
            tracing::trace!(?update, "got parsed state update");
            match &update.kind {
                ParsedStateUpdateKind::Item {
                    durable_item,
                    parsed_item: _,
                    connection: _,
                    parsed_full_name: _,
                } => {
                    let entry = catalog_implications
                        .entry(durable_item.id.clone())
                        .or_insert_with(|| CatalogImplication::None);
                    entry.absorb(update);
                }
                ParsedStateUpdateKind::TemporaryItem {
                    durable_item,
                    parsed_item: _,
                    connection: _,
                    parsed_full_name: _,
                } => {
                    let entry = catalog_implications
                        .entry(durable_item.id.clone())
                        .or_insert_with(|| CatalogImplication::None);
                    entry.absorb(update);
                }
                ParsedStateUpdateKind::Cluster {
                    durable_cluster,
                    parsed_cluster: _,
                } => {
                    let entry = cluster_commands
                        .entry(durable_cluster.id)
                        .or_insert_with(|| CatalogImplication::None);
                    entry.absorb(update.clone());
                }
                ParsedStateUpdateKind::ClusterReplica {
                    durable_cluster_replica,
                    parsed_cluster_replica: _,
                } => {
                    let entry = cluster_replica_commands
                        .entry((
                            durable_cluster_replica.cluster_id,
                            durable_cluster_replica.replica_id,
                        ))
                        .or_insert_with(|| CatalogImplication::None);
                    entry.absorb(update.clone());
                }
            }
        }

        self.apply_catalog_implications_inner(
            ctx,
            catalog_implications.into_iter().collect_vec(),
            cluster_commands.into_iter().collect_vec(),
            cluster_replica_commands.into_iter().collect_vec(),
        )
        .await?;

        self.metrics
            .apply_catalog_implications_seconds
            .observe(start.elapsed().as_secs_f64());

        Ok(())
    }

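    /// Applies pre-grouped catalog implications: collects the controller-facing
    /// work implied by each change (collections to create or drop, read policies
    /// to initialize, sinks and peeks to retire, replicas and clusters to drop)
    /// and then executes it.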
    #[instrument(level = "debug")]
    async fn apply_catalog_implications_inner(
        &mut self,
        ctx: Option<&mut ExecuteContext>,
        implications: Vec<(CatalogItemId, CatalogImplication)>,
        cluster_commands: Vec<(ClusterId, CatalogImplication)>,
        cluster_replica_commands: Vec<((ClusterId, ReplicaId), CatalogImplication)>,
    ) -> Result<(), AdapterError> {
        let mut tables_to_drop = BTreeSet::new();
        let mut sources_to_drop = vec![];
        let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
        let mut storage_sink_gids_to_drop = vec![];
        let mut indexes_to_drop = vec![];
        let mut compute_sinks_to_drop = vec![];
        let mut view_gids_to_drop = vec![];
        let mut secrets_to_drop = vec![];
        let mut vpc_endpoints_to_drop = vec![];
        let mut clusters_to_drop = vec![];
        let mut cluster_replicas_to_drop = vec![];
        let mut active_compute_sinks_to_drop = BTreeMap::new();
        let mut peeks_to_drop = vec![];
        let mut copies_to_drop = vec![];

        let mut dropped_item_names: BTreeMap<GlobalId, String> = BTreeMap::new();
        let mut dropped_cluster_names: BTreeMap<ClusterId, String> = BTreeMap::new();

        let mut table_collections_to_create = BTreeMap::new();
        let mut source_collections_to_create = BTreeMap::new();
        let mut storage_policies_to_initialize = BTreeMap::new();
        let mut execution_timestamps_to_set = BTreeSet::new();
        let mut vpc_endpoints_to_create: Vec<(CatalogItemId, VpcEndpointConfig)> = vec![];

        let mut source_gids_to_keep = BTreeSet::new();

        let mut source_connections_to_alter: BTreeMap<
            GlobalId,
            GenericSourceConnection<InlinedConnection>,
        > = BTreeMap::new();
        let mut sink_connections_to_alter: BTreeMap<GlobalId, StorageSinkConnection> =
            BTreeMap::new();
        let mut source_export_data_configs_to_alter: BTreeMap<GlobalId, SourceExportDataConfig> =
            BTreeMap::new();

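        // Translate each item-level implication into the pending work collected
        // in the maps and vectors above.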
        for (catalog_id, implication) in implications {
            tracing::trace!(?implication, "have to apply catalog implication");

            match implication {
                CatalogImplication::Table(CatalogImplicationKind::Added(table)) => {
                    self.handle_create_table(
                        &ctx,
                        &mut table_collections_to_create,
                        &mut storage_policies_to_initialize,
                        &mut execution_timestamps_to_set,
                        catalog_id,
                        table.clone(),
                    )
                    .await?
                }
                CatalogImplication::Table(CatalogImplicationKind::Altered {
                    prev: prev_table,
                    new: new_table,
                }) => self.handle_alter_table(prev_table, new_table).await?,

                CatalogImplication::Table(CatalogImplicationKind::Dropped(table, full_name)) => {
                    let global_ids = table.global_ids();
                    for global_id in global_ids {
                        tables_to_drop.insert((catalog_id, global_id));
                        dropped_item_names.insert(global_id, full_name.clone());
                    }
                }
                CatalogImplication::Source(CatalogImplicationKind::Added((
                    source,
                    _connection,
                ))) => {
                    let compaction_windows = self
                        .catalog()
                        .state()
                        .source_compaction_windows(vec![catalog_id]);

                    self.handle_create_source(
                        &mut source_collections_to_create,
                        &mut storage_policies_to_initialize,
                        catalog_id,
                        source,
                        compaction_windows,
                    )
                    .await?
                }
                CatalogImplication::Source(CatalogImplicationKind::Altered {
                    prev: (prev_source, _prev_connection),
                    new: (new_source, _new_connection),
                }) => {
                    tracing::debug!(
                        ?prev_source,
                        ?new_source,
                        "not handling AlterSource in here yet"
                    );
                }
                CatalogImplication::Source(CatalogImplicationKind::Dropped(
                    (source, connection),
                    full_name,
                )) => {
                    let global_id = source.global_id();
                    sources_to_drop.push((catalog_id, global_id));
                    dropped_item_names.insert(global_id, full_name);

                    if let DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } = &source.data_source
                    {
                        match &desc.connection {
                            GenericSourceConnection::Postgres(_referenced_conn) => {
                                let inline_conn = connection.expect("missing inlined connection");

                                let pg_conn = match inline_conn {
                                    GenericSourceConnection::Postgres(pg_conn) => pg_conn,
                                    other => {
                                        panic!("expected postgres connection, got: {:?}", other)
                                    }
                                };
                                let pending_drop = (
                                    pg_conn.connection.clone(),
                                    pg_conn.publication_details.slot.clone(),
                                );
                                replication_slots_to_drop.push(pending_drop);
                            }
                            _ => {}
                        }
                    }
                }
                CatalogImplication::Sink(CatalogImplicationKind::Added(sink)) => {
                    tracing::debug!(?sink, "not handling AddSink in here yet");
                }
                CatalogImplication::Sink(CatalogImplicationKind::Altered {
                    prev: prev_sink,
                    new: new_sink,
                }) => {
                    tracing::debug!(?prev_sink, ?new_sink, "not handling AlterSink in here yet");
                }
                CatalogImplication::Sink(CatalogImplicationKind::Dropped(sink, full_name)) => {
                    storage_sink_gids_to_drop.push(sink.global_id());
                    dropped_item_names.insert(sink.global_id(), full_name);
                }
                CatalogImplication::Index(CatalogImplicationKind::Added(index)) => {
                    tracing::debug!(?index, "not handling AddIndex in here yet");
                }
                CatalogImplication::Index(CatalogImplicationKind::Altered {
                    prev: prev_index,
                    new: new_index,
                }) => {
                    tracing::debug!(
                        ?prev_index,
                        ?new_index,
                        "not handling AlterIndex in here yet"
                    );
                }
                CatalogImplication::Index(CatalogImplicationKind::Dropped(index, full_name)) => {
                    indexes_to_drop.push((index.cluster_id, index.global_id()));
                    dropped_item_names.insert(index.global_id(), full_name);
                }
                CatalogImplication::MaterializedView(CatalogImplicationKind::Added(mv)) => {
                    tracing::debug!(?mv, "not handling AddMaterializedView in here yet");
                }
                CatalogImplication::MaterializedView(CatalogImplicationKind::Altered {
                    prev: prev_mv,
                    new: new_mv,
                }) => {
                    if prev_mv.collections != new_mv.collections {
                        assert_eq!(
                            prev_mv.global_id_writes(),
                            new_mv.global_id_writes(),
                            "unexpected MV Altered implication: prev={prev_mv:?}, new={new_mv:?}",
                        );

                        let gid = new_mv.global_id_writes();
                        self.allow_writes(new_mv.cluster_id, gid);

                        source_gids_to_keep.extend(new_mv.global_ids());
                    }
                }
                CatalogImplication::MaterializedView(CatalogImplicationKind::Dropped(
                    mv,
                    full_name,
                )) => {
                    compute_sinks_to_drop.push((mv.cluster_id, mv.global_id_writes()));
                    sources_to_drop.extend(mv.global_ids().map(|gid| (catalog_id, gid)));
                    dropped_item_names.insert(mv.global_id_writes(), full_name);
                }
                CatalogImplication::View(CatalogImplicationKind::Added(view)) => {
                    tracing::debug!(?view, "not handling AddView in here yet");
                }
                CatalogImplication::View(CatalogImplicationKind::Altered {
                    prev: prev_view,
                    new: new_view,
                }) => {
                    tracing::debug!(?prev_view, ?new_view, "not handling AlterView in here yet");
                }
                CatalogImplication::View(CatalogImplicationKind::Dropped(view, full_name)) => {
                    view_gids_to_drop.push(view.global_id());
                    dropped_item_names.insert(view.global_id(), full_name);
                }
                CatalogImplication::ContinualTask(CatalogImplicationKind::Added(ct)) => {
                    tracing::debug!(?ct, "not handling AddContinualTask in here yet");
                }
                CatalogImplication::ContinualTask(CatalogImplicationKind::Altered {
                    prev: prev_ct,
                    new: new_ct,
                }) => {
                    tracing::debug!(
                        ?prev_ct,
                        ?new_ct,
                        "not handling AlterContinualTask in here yet"
                    );
                }
                CatalogImplication::ContinualTask(CatalogImplicationKind::Dropped(
                    ct,
                    _full_name,
                )) => {
                    compute_sinks_to_drop.push((ct.cluster_id, ct.global_id()));
                    sources_to_drop.push((catalog_id, ct.global_id()));
                }
                CatalogImplication::Secret(CatalogImplicationKind::Added(_secret)) => {
                }
                CatalogImplication::Secret(CatalogImplicationKind::Altered {
                    prev: _prev_secret,
                    new: _new_secret,
                }) => {
                }
                CatalogImplication::Secret(CatalogImplicationKind::Dropped(
                    _secret,
                    _full_name,
                )) => {
                    secrets_to_drop.push(catalog_id);
                }
                CatalogImplication::Connection(CatalogImplicationKind::Added(connection)) => {
                    match &connection.details {
                        ConnectionDetails::Ssh { .. } => {}
                        ConnectionDetails::AwsPrivatelink(privatelink) => {
                            let spec = VpcEndpointConfig {
                                aws_service_name: privatelink.service_name.to_owned(),
                                availability_zone_ids: privatelink.availability_zones.to_owned(),
                            };
                            vpc_endpoints_to_create.push((catalog_id, spec));
                        }
                        _ => {}
                    }
                }
                CatalogImplication::Connection(CatalogImplicationKind::Altered {
                    prev: _prev_connection,
                    new: new_connection,
                }) => {
                    self.handle_alter_connection(
                        catalog_id,
                        new_connection,
                        &mut vpc_endpoints_to_create,
                        &mut source_connections_to_alter,
                        &mut sink_connections_to_alter,
                        &mut source_export_data_configs_to_alter,
                    );
                }
                CatalogImplication::Connection(CatalogImplicationKind::Dropped(
                    connection,
                    _full_name,
                )) => {
                    match &connection.details {
                        ConnectionDetails::Ssh { .. } => {
                            secrets_to_drop.push(catalog_id);
                        }
                        ConnectionDetails::AwsPrivatelink(_) => {
                            vpc_endpoints_to_drop.push(catalog_id);
                        }
                        _ => (),
                    }
                }
                CatalogImplication::None => {
                }
                CatalogImplication::Cluster(_) | CatalogImplication::ClusterReplica(_) => {
                    unreachable!("clusters and cluster replicas are handled below")
                }
                CatalogImplication::Table(CatalogImplicationKind::None)
                | CatalogImplication::Source(CatalogImplicationKind::None)
                | CatalogImplication::Sink(CatalogImplicationKind::None)
                | CatalogImplication::Index(CatalogImplicationKind::None)
                | CatalogImplication::MaterializedView(CatalogImplicationKind::None)
                | CatalogImplication::View(CatalogImplicationKind::None)
                | CatalogImplication::ContinualTask(CatalogImplicationKind::None)
                | CatalogImplication::Secret(CatalogImplicationKind::None)
                | CatalogImplication::Connection(CatalogImplicationKind::None) => {
                    unreachable!("will never leave None in place");
                }
            }
        }

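        // Collect cluster and cluster replica drops. Additions and alterations
        // are not handled here yet.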
        for (cluster_id, command) in cluster_commands {
            tracing::trace!(?command, "have cluster command to apply!");

            match command {
                CatalogImplication::Cluster(CatalogImplicationKind::Added(cluster)) => {
                    tracing::debug!(?cluster, "not handling AddCluster in here yet");
                }
                CatalogImplication::Cluster(CatalogImplicationKind::Altered {
                    prev: prev_cluster,
                    new: new_cluster,
                }) => {
                    tracing::debug!(
                        ?prev_cluster,
                        ?new_cluster,
                        "not handling AlterCluster in here yet"
                    );
                }
                CatalogImplication::Cluster(CatalogImplicationKind::Dropped(
                    cluster,
                    _full_name,
                )) => {
                    clusters_to_drop.push(cluster_id);
                    dropped_cluster_names.insert(cluster_id, cluster.name);
                }
                CatalogImplication::Cluster(CatalogImplicationKind::None) => {
                    unreachable!("will never leave None in place");
                }
                command => {
                    unreachable!(
                        "we only handle cluster commands in this map, got: {:?}",
                        command
                    );
                }
            }
        }

        for ((cluster_id, replica_id), command) in cluster_replica_commands {
            tracing::trace!(?command, "have cluster replica command to apply!");

            match command {
                CatalogImplication::ClusterReplica(CatalogImplicationKind::Added(replica)) => {
                    tracing::debug!(?replica, "not handling AddClusterReplica in here yet");
                }
                CatalogImplication::ClusterReplica(CatalogImplicationKind::Altered {
                    prev: prev_replica,
                    new: new_replica,
                }) => {
                    tracing::debug!(
                        ?prev_replica,
                        ?new_replica,
                        "not handling AlterClusterReplica in here yet"
                    );
                }
                CatalogImplication::ClusterReplica(CatalogImplicationKind::Dropped(
                    _replica,
                    _full_name,
                )) => {
                    cluster_replicas_to_drop.push((cluster_id, replica_id));
                }
                CatalogImplication::ClusterReplica(CatalogImplicationKind::None) => {
                    unreachable!("will never leave None in place");
                }
                command => {
                    unreachable!(
                        "we only handle cluster replica commands in this map, got: {:?}",
                        command
                    );
                }
            }
        }

        if !source_collections_to_create.is_empty() {
            self.create_source_collections(source_collections_to_create)
                .await?;
        }

        if !table_collections_to_create.is_empty() {
            self.create_table_collections(table_collections_to_create, execution_timestamps_to_set)
                .await?;
        }
        self.initialize_storage_collections(storage_policies_to_initialize)
            .await?;

        if !vpc_endpoints_to_create.is_empty() {
            if let Some(cloud_resource_controller) = self.cloud_resource_controller.as_ref() {
                for (connection_id, spec) in vpc_endpoints_to_create {
                    if let Err(err) = cloud_resource_controller
                        .ensure_vpc_endpoint(connection_id, spec)
                        .await
                    {
                        tracing::error!(?err, "failed to ensure vpc endpoint!");
                    }
                }
            } else {
                tracing::error!(
                    "AWS PrivateLink connections unsupported without cloud_resource_controller"
                );
            }
        }

        if !source_connections_to_alter.is_empty() {
            self.controller
                .storage
                .alter_ingestion_connections(source_connections_to_alter)
                .await
                .unwrap_or_terminate("cannot fail to alter ingestion connections");
        }

        if !sink_connections_to_alter.is_empty() {
            self.controller
                .storage
                .alter_export_connections(sink_connections_to_alter)
                .await
                .unwrap_or_terminate("altering export connections after txn must succeed");
        }

        if !source_export_data_configs_to_alter.is_empty() {
            self.controller
                .storage
                .alter_ingestion_export_data_configs(source_export_data_configs_to_alter)
                .await
                .unwrap_or_terminate("altering source export data configs after txn must succeed");
        }

        sources_to_drop.retain(|(_, gid)| !source_gids_to_keep.contains(gid));

        let readable_collections_to_drop: BTreeSet<_> = sources_to_drop
            .iter()
            .map(|(_, gid)| *gid)
            .chain(tables_to_drop.iter().map(|(_, gid)| *gid))
            .chain(indexes_to_drop.iter().map(|(_, gid)| *gid))
            .chain(view_gids_to_drop.iter().copied())
            .collect();

        for (sink_id, sink) in &self.active_compute_sinks {
            let cluster_id = sink.cluster_id();
            if let Some(id) = sink
                .depends_on()
                .iter()
                .find(|id| readable_collections_to_drop.contains(id))
            {
                let name = dropped_item_names
                    .get(id)
                    .map(|n| format!("relation {}", n.quoted()))
                    .expect("missing relation name");
                active_compute_sinks_to_drop.insert(
                    *sink_id,
                    ActiveComputeSinkRetireReason::DependencyDropped(name),
                );
            } else if clusters_to_drop.contains(&cluster_id) {
                let name = dropped_cluster_names
                    .get(&cluster_id)
                    .map(|n| format!("cluster {}", n.quoted()))
                    .expect("missing cluster name");
                active_compute_sinks_to_drop.insert(
                    *sink_id,
                    ActiveComputeSinkRetireReason::DependencyDropped(name),
                );
            }
        }

        for (uuid, pending_peek) in &self.pending_peeks {
            if let Some(id) = pending_peek
                .depends_on
                .iter()
                .find(|id| readable_collections_to_drop.contains(id))
            {
                let name = dropped_item_names
                    .get(id)
                    .map(|n| format!("relation {}", n.quoted()))
                    .expect("missing relation name");
                peeks_to_drop.push((name, uuid.clone()));
            } else if clusters_to_drop.contains(&pending_peek.cluster_id) {
                let name = dropped_cluster_names
                    .get(&pending_peek.cluster_id)
                    .map(|n| format!("cluster {}", n.quoted()))
                    .expect("missing cluster name");
                peeks_to_drop.push((name, uuid.clone()));
            }
        }

        for (conn_id, pending_copy) in &self.active_copies {
            let dropping_table = tables_to_drop
                .iter()
                .any(|(item_id, _gid)| pending_copy.table_id == *item_id);
            let dropping_cluster = clusters_to_drop.contains(&pending_copy.cluster_id);

            if dropping_table || dropping_cluster {
                copies_to_drop.push(conn_id.clone());
            }
        }

        let storage_gids_to_drop: BTreeSet<_> = sources_to_drop
            .iter()
            .map(|(_id, gid)| gid)
            .chain(storage_sink_gids_to_drop.iter())
            .chain(tables_to_drop.iter().map(|(_id, gid)| gid))
            .copied()
            .collect();
        let compute_gids_to_drop: Vec<_> = indexes_to_drop
            .iter()
            .chain(compute_sinks_to_drop.iter())
            .copied()
            .collect();

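        // For each timeline, determine which of the collections we are about to
        // drop currently hold read holds in that timeline.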
        let mut timeline_id_bundles = BTreeMap::new();

        for (timeline, TimelineState { read_holds, .. }) in &self.global_timelines {
            let mut id_bundle = CollectionIdBundle::default();

            for storage_id in read_holds.storage_ids() {
                if storage_gids_to_drop.contains(&storage_id) {
                    id_bundle.storage_ids.insert(storage_id);
                }
            }

            for (instance_id, id) in read_holds.compute_ids() {
                if compute_gids_to_drop.contains(&(instance_id, id))
                    || clusters_to_drop.contains(&instance_id)
                {
                    id_bundle
                        .compute_ids
                        .entry(instance_id)
                        .or_default()
                        .insert(id);
                }
            }

            timeline_id_bundles.insert(timeline.clone(), id_bundle);
        }

        let mut timeline_associations = BTreeMap::new();
        for (timeline, id_bundle) in timeline_id_bundles.into_iter() {
            let TimelineState { read_holds, .. } = self
                .global_timelines
                .get(&timeline)
                .expect("all timelines have a timestamp oracle");

            let empty = read_holds.id_bundle().difference(&id_bundle).is_empty();
            timeline_associations.insert(timeline, (empty, id_bundle));
        }

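        // Carry out the collected drops and retirements, then clean up
        // replication slots and secrets on a background task.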
        let _: () = async {
            if !timeline_associations.is_empty() {
                for (timeline, (should_be_empty, id_bundle)) in timeline_associations {
                    let became_empty =
                        self.remove_resources_associated_with_timeline(timeline, id_bundle);
                    assert_eq!(should_be_empty, became_empty, "emptiness did not match!");
                }
            }

            if !tables_to_drop.is_empty() {
                let ts = self.get_local_write_ts().await;
                self.drop_tables(tables_to_drop.into_iter().collect_vec(), ts.timestamp);
            }

            if !sources_to_drop.is_empty() {
                self.drop_sources(sources_to_drop);
            }

            if !storage_sink_gids_to_drop.is_empty() {
                self.drop_storage_sinks(storage_sink_gids_to_drop);
            }

            if !active_compute_sinks_to_drop.is_empty() {
                self.retire_compute_sinks(active_compute_sinks_to_drop)
                    .await;
            }

            if !peeks_to_drop.is_empty() {
                for (dropped_name, uuid) in peeks_to_drop {
                    if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
                        let cancel_reason = PeekResponse::Error(format!(
                            "query could not complete because {dropped_name} was dropped"
                        ));
                        self.controller
                            .compute
                            .cancel_peek(pending_peek.cluster_id, uuid, cancel_reason)
                            .unwrap_or_terminate("unable to cancel peek");
                        self.retire_execution(
                            StatementEndedExecutionReason::Canceled,
                            pending_peek.ctx_extra.defuse(),
                        );
                    }
                }
            }

            if !copies_to_drop.is_empty() {
                for conn_id in copies_to_drop {
                    self.cancel_pending_copy(&conn_id);
                }
            }

            if !compute_gids_to_drop.is_empty() {
                self.drop_compute_collections(compute_gids_to_drop);
            }

            if !vpc_endpoints_to_drop.is_empty() {
                self.drop_vpc_endpoints_in_background(vpc_endpoints_to_drop)
            }

            if !cluster_replicas_to_drop.is_empty() {
                fail::fail_point!("after_catalog_drop_replica");

                for (cluster_id, replica_id) in cluster_replicas_to_drop {
                    self.drop_replica(cluster_id, replica_id);
                }
            }
            if !clusters_to_drop.is_empty() {
                for cluster_id in clusters_to_drop {
                    self.controller.drop_cluster(cluster_id);
                }
            }

            task::spawn(|| "drop_replication_slots_and_secrets", {
                let ssh_tunnel_manager = self.connection_context().ssh_tunnel_manager.clone();
                let secrets_controller = Arc::clone(&self.secrets_controller);
                let secrets_reader = Arc::clone(self.secrets_reader());
                let storage_config = self.controller.storage.config().clone();

                async move {
                    for (connection, replication_slot_name) in replication_slots_to_drop {
                        tracing::info!(?replication_slot_name, "dropping replication slot");

                        let result: Result<(), anyhow::Error> = Retry::default()
                            .max_duration(Duration::from_secs(60))
                            .retry_async(|_state| async {
                                let config = connection
                                    .config(&secrets_reader, &storage_config, InTask::No)
                                    .await
                                    .map_err(|e| {
                                        anyhow::anyhow!(
                                            "error creating Postgres client for \
                                             dropping acquired slots: {}",
                                            e.display_with_causes()
                                        )
                                    })?;

                                mz_postgres_util::drop_replication_slots(
                                    &ssh_tunnel_manager,
                                    config.clone(),
                                    &[(&replication_slot_name, true)],
                                )
                                .await?;

                                Ok(())
                            })
                            .await;

                        if let Err(err) = result {
                            tracing::warn!(
                                ?replication_slot_name,
                                ?err,
                                "failed to drop replication slot"
                            );
                        }
                    }

                    fail_point!("drop_secrets");
                    for secret in secrets_to_drop {
                        if let Err(e) = secrets_controller.delete(secret).await {
                            warn!("Dropping secrets has encountered an error: {}", e);
                        }
                    }
                }
            });
        }
        .instrument(info_span!(
            "coord::apply_catalog_implications_inner::finalize"
        ))
        .await;

        Ok(())
    }

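    /// Creates the given table collections in the storage controller at a newly
    /// minted local write timestamp, after re-confirming catalog leadership, and
    /// records that timestamp for statement logging.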
    #[instrument(level = "debug")]
    async fn create_table_collections(
        &mut self,
        table_collections_to_create: BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
        execution_timestamps_to_set: BTreeSet<StatementLoggingId>,
    ) -> Result<(), AdapterError> {
        let register_ts = self.get_local_write_ts().await.timestamp;

        self.catalog
            .confirm_leadership()
            .await
            .unwrap_or_terminate("unable to confirm leadership");

        for id in execution_timestamps_to_set {
            self.set_statement_execution_timestamp(id, register_ts);
        }

        let storage_metadata = self.catalog.state().storage_metadata();

        self.controller
            .storage
            .create_collections(
                storage_metadata,
                Some(register_ts),
                table_collections_to_create.into_iter().collect_vec(),
            )
            .await
            .unwrap_or_terminate("cannot fail to create collections");

        self.apply_local_write(register_ts).await;

        Ok(())
    }

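    /// Creates the given source collections in the storage controller. Unlike
    /// tables, sources are not registered at a specific local write timestamp.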
    #[instrument(level = "debug")]
    async fn create_source_collections(
        &mut self,
        source_collections_to_create: BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
    ) -> Result<(), AdapterError> {
        let storage_metadata = self.catalog.state().storage_metadata();

        self.controller
            .storage
            .create_collections(
                storage_metadata,
                None,
                source_collections_to_create.into_iter().collect_vec(),
            )
            .await
            .unwrap_or_terminate("cannot fail to create collections");

        Ok(())
    }

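    /// Initializes read policies for the given storage collections, one batch
    /// per compaction window.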
    #[instrument(level = "debug")]
    async fn initialize_storage_collections(
        &mut self,
        storage_policies_to_initialize: BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
    ) -> Result<(), AdapterError> {
        for (compaction_window, global_ids) in storage_policies_to_initialize {
            self.initialize_read_policies(
                &CollectionIdBundle {
                    storage_ids: global_ids,
                    compute_ids: BTreeMap::new(),
                },
                compaction_window,
            )
            .await;
        }

        Ok(())
    }

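    /// Stages the storage collections and read policies needed for a newly
    /// created table, covering both regular tables (`TableWrites`) and tables
    /// backed by a data source (ingestion exports and webhooks).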
    #[instrument(level = "debug")]
    async fn handle_create_table(
        &self,
        ctx: &Option<&mut ExecuteContext>,
        storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
        storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
        execution_timestamps_to_set: &mut BTreeSet<StatementLoggingId>,
        table_id: CatalogItemId,
        table: Table,
    ) -> Result<(), AdapterError> {
        match &table.data_source {
            TableDataSource::TableWrites { defaults: _ } => {
                let versions: BTreeMap<_, _> = table
                    .collection_descs()
                    .map(|(gid, version, desc)| (version, (gid, desc)))
                    .collect();
                let collection_descs = versions.iter().map(|(_version, (gid, desc))| {
                    let collection_desc = CollectionDescription::for_table(desc.clone());

                    (*gid, collection_desc)
                });

                let compaction_window = table
                    .custom_logical_compaction_window
                    .unwrap_or(CompactionWindow::Default);
                let ids_to_initialize = storage_policies_to_initialize
                    .entry(compaction_window)
                    .or_default();

                for (gid, collection_desc) in collection_descs {
                    storage_collections_to_create.insert(gid, collection_desc);
                    ids_to_initialize.insert(gid);
                }

                if let Some(id) = ctx.as_ref().and_then(|ctx| ctx.extra().contents()) {
                    execution_timestamps_to_set.insert(id);
                }
            }
            TableDataSource::DataSource {
                desc: data_source_desc,
                timeline,
            } => {
                match data_source_desc {
                    DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference: _,
                        details,
                        data_config,
                    } => {
                        let global_ingestion_id =
                            self.catalog().get_entry(ingestion_id).latest_global_id();

                        let collection_desc = CollectionDescription::<Timestamp> {
                            desc: table.desc.latest(),
                            data_source: DataSource::IngestionExport {
                                ingestion_id: global_ingestion_id,
                                details: details.clone(),
                                data_config: data_config
                                    .clone()
                                    .into_inline_connection(self.catalog.state()),
                            },
                            since: None,
                            timeline: Some(timeline.clone()),
                            primary: None,
                        };

                        let global_id = table
                            .global_ids()
                            .expect_element(|| "subsources cannot have multiple versions");

                        storage_collections_to_create.insert(global_id, collection_desc);

                        let read_policies = self
                            .catalog()
                            .state()
                            .source_compaction_windows(vec![table_id]);
                        for (compaction_window, catalog_ids) in read_policies {
                            let compaction_ids = storage_policies_to_initialize
                                .entry(compaction_window)
                                .or_default();

                            let gids = catalog_ids
                                .into_iter()
                                .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
                                .flatten();
                            compaction_ids.extend(gids);
                        }
                    }
                    DataSourceDesc::Webhook {
                        validate_using: _,
                        body_format: _,
                        headers: _,
                        cluster_id: _,
                    } => {
                        assert_eq!(
                            table.desc.latest_version(),
                            RelationVersion::root(),
                            "found webhook with more than 1 relation version, {:?}",
                            table.desc
                        );
                        let desc = table.desc.latest();

                        let collection_desc = CollectionDescription::<Timestamp> {
                            desc,
                            data_source: DataSource::Webhook,
                            since: None,
                            timeline: Some(timeline.clone()),
                            primary: None,
                        };

                        let global_id = table
                            .global_ids()
                            .expect_element(|| "webhooks cannot have multiple versions");

                        storage_collections_to_create.insert(global_id, collection_desc);

                        let read_policies = self
                            .catalog()
                            .state()
                            .source_compaction_windows(vec![table_id]);

                        for (compaction_window, catalog_ids) in read_policies {
                            let compaction_ids = storage_policies_to_initialize
                                .entry(compaction_window)
                                .or_default();

                            let gids = catalog_ids
                                .into_iter()
                                .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
                                .flatten();
                            compaction_ids.extend(gids);
                        }
                    }
                    _ => unreachable!("CREATE TABLE data source got {:?}", data_source_desc),
                }
            }
        }

        Ok(())
    }

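    /// Handles an altered table that produced a new global ID: alters the table
    /// description in the storage controller at a new write timestamp and
    /// initializes read policies for the new collection, while holding a read
    /// hold on the existing collection.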
    #[instrument(level = "debug")]
    async fn handle_alter_table(
        &mut self,
        prev_table: Table,
        new_table: Table,
    ) -> Result<(), AdapterError> {
        let existing_gid = prev_table.global_id_writes();
        let new_gid = new_table.global_id_writes();

        if existing_gid == new_gid {
            return Ok(());
        }

        let existing_table = crate::CollectionIdBundle {
            storage_ids: BTreeSet::from([existing_gid]),
            compute_ids: BTreeMap::new(),
        };
        let existing_table_read_hold = self.acquire_read_holds(&existing_table);

        let expected_version = prev_table.desc.latest_version();
        let new_version = new_table.desc.latest_version();
        let new_desc = new_table
            .desc
            .at_version(RelationVersionSelector::Specific(new_version));

        let register_ts = self.get_local_write_ts().await.timestamp;

        self.controller
            .storage
            .alter_table_desc(
                existing_gid,
                new_gid,
                new_desc,
                expected_version,
                register_ts,
            )
            .await
            .expect("failed to alter desc of table");

        let compaction_window = new_table
            .custom_logical_compaction_window
            .unwrap_or(CompactionWindow::Default);
        self.initialize_read_policies(
            &crate::CollectionIdBundle {
                storage_ids: BTreeSet::from([new_gid]),
                compute_ids: BTreeMap::new(),
            },
            compaction_window,
        )
        .await;

        self.apply_local_write(register_ts).await;

        drop(existing_table_read_hold);

        Ok(())
    }

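    /// Stages the storage collection and read policies for a newly created
    /// source, translating the catalog's `DataSourceDesc` into the storage
    /// controller's `DataSource`.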
    #[instrument(level = "debug")]
    async fn handle_create_source(
        &self,
        storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
        storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
        item_id: CatalogItemId,
        source: Source,
        compaction_windows: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>>,
    ) -> Result<(), AdapterError> {
        let data_source = match source.data_source {
            DataSourceDesc::Ingestion { desc, cluster_id } => {
                let desc = desc.into_inline_connection(self.catalog().state());
                let item_global_id = self.catalog().get_entry(&item_id).latest_global_id();

                let ingestion = mz_storage_types::sources::IngestionDescription::new(
                    desc,
                    cluster_id,
                    item_global_id,
                );

                DataSource::Ingestion(ingestion)
            }
            DataSourceDesc::OldSyntaxIngestion {
                desc,
                progress_subsource,
                data_config,
                details,
                cluster_id,
            } => {
                let desc = desc.into_inline_connection(self.catalog().state());
                let data_config = data_config.into_inline_connection(self.catalog().state());

                let progress_subsource = self
                    .catalog()
                    .get_entry(&progress_subsource)
                    .latest_global_id();

                let mut ingestion = mz_storage_types::sources::IngestionDescription::new(
                    desc,
                    cluster_id,
                    progress_subsource,
                );

                let legacy_export = SourceExport {
                    storage_metadata: (),
                    data_config,
                    details,
                };

                ingestion
                    .source_exports
                    .insert(source.global_id, legacy_export);

                DataSource::Ingestion(ingestion)
            }
            DataSourceDesc::IngestionExport {
                ingestion_id,
                external_reference: _,
                details,
                data_config,
            } => {
                let ingestion_id = self.catalog().get_entry(&ingestion_id).latest_global_id();

                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config: data_config.into_inline_connection(self.catalog().state()),
                }
            }
            DataSourceDesc::Progress => DataSource::Progress,
            DataSourceDesc::Webhook { .. } => DataSource::Webhook,
            DataSourceDesc::Introspection(_) => {
                unreachable!("cannot create sources with introspection data sources")
            }
        };

        storage_collections_to_create.insert(
            source.global_id,
            CollectionDescription::<Timestamp> {
                desc: source.desc.clone(),
                data_source,
                timeline: Some(source.timeline),
                since: None,
                primary: None,
            },
        );

        for (compaction_window, catalog_ids) in compaction_windows {
            let compaction_ids = storage_policies_to_initialize
                .entry(compaction_window)
                .or_default();

            let gids = catalog_ids
                .into_iter()
                .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
                .flatten();
            compaction_ids.extend(gids);
        }

        Ok(())
    }

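    /// Handles an altered connection: re-provisions any AWS PrivateLink VPC
    /// endpoint and collects the sources, sinks, and source-fed tables that
    /// (transitively) depend on the connection so their connection details can
    /// be updated in the storage controller.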
    #[instrument(level = "debug")]
    fn handle_alter_connection(
        &self,
        connection_id: CatalogItemId,
        connection: Connection,
        vpc_endpoints_to_create: &mut Vec<(CatalogItemId, VpcEndpointConfig)>,
        source_connections_to_alter: &mut BTreeMap<
            GlobalId,
            GenericSourceConnection<InlinedConnection>,
        >,
        sink_connections_to_alter: &mut BTreeMap<GlobalId, StorageSinkConnection>,
        source_export_data_configs_to_alter: &mut BTreeMap<GlobalId, SourceExportDataConfig>,
    ) {
        use std::collections::VecDeque;

        if let ConnectionDetails::AwsPrivatelink(ref privatelink) = connection.details {
            let spec = VpcEndpointConfig {
                aws_service_name: privatelink.service_name.to_owned(),
                availability_zone_ids: privatelink.availability_zones.to_owned(),
            };
            vpc_endpoints_to_create.push((connection_id, spec));
        }

        let mut connections_to_process = VecDeque::new();
        connections_to_process.push_front(connection_id.clone());

        while let Some(id) = connections_to_process.pop_front() {
            for dependent_id in self.catalog().get_entry(&id).used_by() {
                let dependent_entry = self.catalog().get_entry(dependent_id);
                match dependent_entry.item() {
                    CatalogItem::Connection(_) => {
                        connections_to_process.push_back(*dependent_id);
                    }
                    CatalogItem::Source(source) => {
                        let desc = match &dependent_entry
                            .source()
                            .expect("known to be source")
                            .data_source
                        {
                            DataSourceDesc::Ingestion { desc, .. }
                            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
                                desc.clone().into_inline_connection(self.catalog().state())
                            }
                            _ => {
                                continue;
                            }
                        };

                        source_connections_to_alter.insert(source.global_id, desc.connection);
                    }
                    CatalogItem::Sink(sink) => {
                        let export = dependent_entry.sink().expect("known to be sink");
                        sink_connections_to_alter.insert(
                            sink.global_id,
                            export
                                .connection
                                .clone()
                                .into_inline_connection(self.catalog().state()),
                        );
                    }
                    CatalogItem::Table(table) => {
                        if let Some((_, _, _, export_data_config)) =
                            dependent_entry.source_export_details()
                        {
                            let data_config = export_data_config.clone();
                            source_export_data_configs_to_alter.insert(
                                table.global_id_writes(),
                                data_config.into_inline_connection(self.catalog().state()),
                            );
                        }
                    }
                    _ => {
                    }
                }
            }
        }
    }
}

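/// The net effect that a batch of catalog state updates has on a single catalog
/// object (or cluster / cluster replica), keyed by object type.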
#[derive(Debug, Clone)]
enum CatalogImplication {
    None,
    Table(CatalogImplicationKind<Table>),
    Source(CatalogImplicationKind<(Source, Option<GenericSourceConnection>)>),
    Sink(CatalogImplicationKind<Sink>),
    Index(CatalogImplicationKind<Index>),
    MaterializedView(CatalogImplicationKind<MaterializedView>),
    View(CatalogImplicationKind<View>),
    ContinualTask(CatalogImplicationKind<ContinualTask>),
    Secret(CatalogImplicationKind<Secret>),
    Connection(CatalogImplicationKind<Connection>),
    Cluster(CatalogImplicationKind<Cluster>),
    ClusterReplica(CatalogImplicationKind<ClusterReplica>),
}

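/// The kind of net change for one object: not yet determined (`None`), newly
/// added, dropped (together with its resolved full name), or altered from
/// `prev` to `new`.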
#[derive(Debug, Clone)]
enum CatalogImplicationKind<T> {
    None,
    Added(T),
    Dropped(T, String),
    Altered { prev: T, new: T },
}

impl<T: Clone> CatalogImplicationKind<T> {
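    /// Folds one more state diff into the current implication, returning an
    /// error for impossible transitions (for example, adding an already-added
    /// object).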
    fn transition(&mut self, item: T, name: Option<String>, diff: StateDiff) -> Result<(), String> {
        use CatalogImplicationKind::*;
        use StateDiff::*;

        let new_state = match (&*self, diff) {
            (None, Addition) => Added(item),
            (None, Retraction) => Dropped(item, name.unwrap_or_else(|| "<unknown>".to_string())),

            (Added(existing), Retraction) => {
                Altered {
                    prev: item,
                    new: existing.clone(),
                }
            }
            (Added(_), Addition) => {
                return Err("Cannot add an already added object".to_string());
            }

            (Dropped(existing, _), Addition) => {
                Altered {
                    prev: existing.clone(),
                    new: item,
                }
            }
            (Dropped(_, _), Retraction) => {
                return Err("Cannot drop an already dropped object".to_string());
            }

            (Altered { .. }, _) => {
                return Err(format!(
                    "Cannot apply {:?} to an object in Altered state",
                    diff
                ));
            }
        };

        *self = new_state;
        Ok(())
    }
}

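/// Generates an `absorb_*` method on [`CatalogImplication`] for one item type,
/// wrapping [`CatalogImplicationKind::transition`] and panicking on invalid
/// transitions or mismatched variants.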
macro_rules! impl_absorb_method {
    (
        $method_name:ident,
        $variant:ident,
        $item_type:ty
    ) => {
        fn $method_name(
            &mut self,
            item: $item_type,
            parsed_full_name: Option<String>,
            diff: StateDiff,
        ) {
            let state = match self {
                CatalogImplication::$variant(state) => state,
                CatalogImplication::None => {
                    *self = CatalogImplication::$variant(CatalogImplicationKind::None);
                    match self {
                        CatalogImplication::$variant(state) => state,
                        _ => unreachable!(),
                    }
                }
                _ => {
                    panic!(
                        "Unexpected command type for {:?}: {} {:?}",
                        self,
                        stringify!($variant),
                        diff,
                    );
                }
            };

            if let Err(e) = state.transition(item, parsed_full_name, diff) {
                panic!(
                    "Invalid state transition for {}: {}",
                    stringify!($variant),
                    e
                );
            }
        }
    };
}

impl CatalogImplication {
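    /// Absorbs a single parsed state update into this implication, dispatching
    /// on the kind of catalog object it refers to.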
    fn absorb(&mut self, catalog_update: ParsedStateUpdate) {
        match catalog_update.kind {
            ParsedStateUpdateKind::Item {
                durable_item: _,
                parsed_item,
                connection,
                parsed_full_name,
            } => match parsed_item {
                CatalogItem::Table(table) => {
                    self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
                }
                CatalogItem::Source(source) => {
                    self.absorb_source(
                        (source, connection),
                        Some(parsed_full_name),
                        catalog_update.diff,
                    );
                }
                CatalogItem::Sink(sink) => {
                    self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::Index(index) => {
                    self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::MaterializedView(mv) => {
                    self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::View(view) => {
                    self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::ContinualTask(ct) => {
                    self.absorb_continual_task(ct, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::Secret(secret) => {
                    self.absorb_secret(secret, None, catalog_update.diff);
                }
                CatalogItem::Connection(connection) => {
                    self.absorb_connection(connection, None, catalog_update.diff);
                }
                CatalogItem::Log(_) => {}
                CatalogItem::Type(_) => {}
                CatalogItem::Func(_) => {}
            },
            ParsedStateUpdateKind::TemporaryItem {
                durable_item: _,
                parsed_item,
                connection,
                parsed_full_name,
            } => match parsed_item {
                CatalogItem::Table(table) => {
                    self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
                }
                CatalogItem::Source(source) => {
                    self.absorb_source(
                        (source, connection),
                        Some(parsed_full_name),
                        catalog_update.diff,
                    );
                }
                CatalogItem::Sink(sink) => {
                    self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::Index(index) => {
                    self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::MaterializedView(mv) => {
                    self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::View(view) => {
                    self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::ContinualTask(ct) => {
                    self.absorb_continual_task(ct, None, catalog_update.diff);
                }
                CatalogItem::Secret(secret) => {
                    self.absorb_secret(secret, None, catalog_update.diff);
                }
                CatalogItem::Connection(connection) => {
                    self.absorb_connection(connection, None, catalog_update.diff);
                }
                CatalogItem::Log(_) => {}
                CatalogItem::Type(_) => {}
                CatalogItem::Func(_) => {}
            },
            ParsedStateUpdateKind::Cluster {
                durable_cluster: _,
                parsed_cluster,
            } => {
                self.absorb_cluster(parsed_cluster, catalog_update.diff);
            }
            ParsedStateUpdateKind::ClusterReplica {
                durable_cluster_replica: _,
                parsed_cluster_replica,
            } => {
                self.absorb_cluster_replica(parsed_cluster_replica, catalog_update.diff);
            }
        }
    }

    impl_absorb_method!(absorb_table, Table, Table);
    impl_absorb_method!(
        absorb_source,
        Source,
        (Source, Option<GenericSourceConnection>)
    );
    impl_absorb_method!(absorb_sink, Sink, Sink);
    impl_absorb_method!(absorb_index, Index, Index);
    impl_absorb_method!(absorb_materialized_view, MaterializedView, MaterializedView);
    impl_absorb_method!(absorb_view, View, View);

    impl_absorb_method!(absorb_continual_task, ContinualTask, ContinualTask);
    impl_absorb_method!(absorb_secret, Secret, Secret);
    impl_absorb_method!(absorb_connection, Connection, Connection);

    fn absorb_cluster(&mut self, cluster: Cluster, diff: StateDiff) {
        let state = match self {
            CatalogImplication::Cluster(state) => state,
            CatalogImplication::None => {
                *self = CatalogImplication::Cluster(CatalogImplicationKind::None);
                match self {
                    CatalogImplication::Cluster(state) => state,
                    _ => unreachable!(),
                }
            }
            _ => {
                panic!("Unexpected command type for {:?}: Cluster {:?}", self, diff);
            }
        };

        if let Err(e) = state.transition(cluster.clone(), Some(cluster.name), diff) {
            panic!("invalid state transition for cluster: {}", e);
        }
    }

    fn absorb_cluster_replica(&mut self, cluster_replica: ClusterReplica, diff: StateDiff) {
        let state = match self {
            CatalogImplication::ClusterReplica(state) => state,
            CatalogImplication::None => {
                *self = CatalogImplication::ClusterReplica(CatalogImplicationKind::None);
                match self {
                    CatalogImplication::ClusterReplica(state) => state,
                    _ => unreachable!(),
                }
            }
            _ => {
                panic!(
                    "Unexpected command type for {:?}: ClusterReplica {:?}",
                    self, diff
                );
            }
        };

        if let Err(e) = state.transition(cluster_replica.clone(), Some(cluster_replica.name), diff)
        {
            panic!("invalid state transition for cluster replica: {}", e);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use mz_repr::{GlobalId, RelationDesc, RelationVersion, VersionedRelationDesc};
    use mz_sql::names::ResolvedIds;
    use std::collections::BTreeMap;

    fn create_test_table(name: &str) -> Table {
        Table {
            desc: VersionedRelationDesc::new(
                RelationDesc::builder()
                    .with_column(name, mz_repr::SqlScalarType::String.nullable(false))
                    .finish(),
            ),
            create_sql: None,
            collections: BTreeMap::from([(RelationVersion::root(), GlobalId::System(1))]),
            conn_id: None,
            resolved_ids: ResolvedIds::empty(),
            custom_logical_compaction_window: None,
            is_retained_metrics_object: false,
            data_source: TableDataSource::TableWrites { defaults: vec![] },
        }
    }

    #[mz_ore::test]
    fn test_item_state_transitions() {
        let mut state = CatalogImplicationKind::None;
        assert!(
            state
                .transition("item1".to_string(), None, StateDiff::Addition)
                .is_ok()
        );
        assert!(matches!(state, CatalogImplicationKind::Added(_)));

        let mut state = CatalogImplicationKind::Added("new_item".to_string());
        assert!(
            state
                .transition("old_item".to_string(), None, StateDiff::Retraction)
                .is_ok()
        );
        match &state {
            CatalogImplicationKind::Altered { prev, new } => {
                assert_eq!(prev, "old_item");
                assert_eq!(new, "new_item");
            }
            _ => panic!("Expected Altered state"),
        }

        let mut state = CatalogImplicationKind::None;
        assert!(
            state
                .transition(
                    "item1".to_string(),
                    Some("test_name".to_string()),
                    StateDiff::Retraction
                )
                .is_ok()
        );
        assert!(matches!(state, CatalogImplicationKind::Dropped(_, _)));

        let mut state = CatalogImplicationKind::Dropped("old_item".to_string(), "name".to_string());
        assert!(
            state
                .transition("new_item".to_string(), None, StateDiff::Addition)
                .is_ok()
        );
        match &state {
            CatalogImplicationKind::Altered { prev, new } => {
                assert_eq!(prev, "old_item");
                assert_eq!(new, "new_item");
            }
            _ => panic!("Expected Altered state"),
        }

        let mut state = CatalogImplicationKind::Added("item".to_string());
        assert!(
            state
                .transition("item2".to_string(), None, StateDiff::Addition)
                .is_err()
        );

        let mut state = CatalogImplicationKind::Dropped("item".to_string(), "name".to_string());
        assert!(
            state
                .transition("item2".to_string(), None, StateDiff::Retraction)
                .is_err()
        );
    }

    #[mz_ore::test]
    fn test_table_absorb_state_machine() {
        let table1 = create_test_table("table1");
        let table2 = create_test_table("table2");

        let mut cmd = CatalogImplication::None;
        cmd.absorb_table(
            table1.clone(),
            Some("schema.table1".to_string()),
            StateDiff::Addition,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Added(t) => {
                    assert_eq!(t.desc.latest().arity(), table1.desc.latest().arity())
                }
                _ => panic!("Expected Added state"),
            },
            _ => panic!("Expected Table command"),
        }

        cmd.absorb_table(
            table2.clone(),
            Some("schema.table2".to_string()),
            StateDiff::Retraction,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Altered { prev, new } => {
                    assert_eq!(prev.desc.latest().arity(), table2.desc.latest().arity());
                    assert_eq!(new.desc.latest().arity(), table1.desc.latest().arity());
                }
                _ => panic!("Expected Altered state"),
            },
            _ => panic!("Expected Table command"),
        }

        let mut cmd = CatalogImplication::None;
        cmd.absorb_table(
            table1.clone(),
            Some("schema.table1".to_string()),
            StateDiff::Retraction,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Dropped(t, name) => {
                    assert_eq!(t.desc.latest().arity(), table1.desc.latest().arity());
                    assert_eq!(name, "schema.table1");
                }
                _ => panic!("Expected Dropped state"),
            },
            _ => panic!("Expected Table command"),
        }

        cmd.absorb_table(
            table2.clone(),
            Some("schema.table2".to_string()),
            StateDiff::Addition,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Altered { prev, new } => {
                    assert_eq!(prev.desc.latest().arity(), table1.desc.latest().arity());
                    assert_eq!(new.desc.latest().arity(), table2.desc.latest().arity());
                }
                _ => panic!("Expected Altered state"),
            },
            _ => panic!("Expected Table command"),
        }
    }

    #[mz_ore::test]
    #[should_panic(expected = "Cannot add an already added object")]
    fn test_invalid_double_add() {
        let table = create_test_table("table");
        let mut cmd = CatalogImplication::None;

        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Addition,
        );

        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Addition,
        );
    }

    #[mz_ore::test]
    #[should_panic(expected = "Cannot drop an already dropped object")]
    fn test_invalid_double_drop() {
        let table = create_test_table("table");
        let mut cmd = CatalogImplication::None;

        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Retraction,
        );

        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Retraction,
        );
    }
}