1use std::collections::{BTreeMap, BTreeSet};
30use std::sync::Arc;
31use std::time::{Duration, Instant};
32
33use fail::fail_point;
34use itertools::Itertools;
35use mz_adapter_types::compaction::CompactionWindow;
36use mz_catalog::memory::objects::{
37 CatalogItem, Cluster, ClusterReplica, Connection, ContinualTask, DataSourceDesc, Index,
38 MaterializedView, Secret, Sink, Source, StateDiff, Table, TableDataSource, View,
39};
40use mz_cloud_resources::VpcEndpointConfig;
41use mz_compute_client::protocol::response::PeekResponse;
42use mz_controller_types::{ClusterId, ReplicaId};
43use mz_ore::collections::CollectionExt;
44use mz_ore::error::ErrorExt;
45use mz_ore::future::InTask;
46use mz_ore::instrument;
47use mz_ore::retry::Retry;
48use mz_ore::str::StrExt;
49use mz_ore::task;
50use mz_repr::{CatalogItemId, GlobalId, RelationVersion, RelationVersionSelector, Timestamp};
51use mz_sql::plan::ConnectionDetails;
52use mz_storage_client::controller::{CollectionDescription, DataSource};
53use mz_storage_types::connections::PostgresConnection;
54use mz_storage_types::connections::inline::{InlinedConnection, IntoInlineConnection};
55use mz_storage_types::sinks::StorageSinkConnection;
56use mz_storage_types::sources::{GenericSourceConnection, SourceExport, SourceExportDataConfig};
57use tracing::{Instrument, info_span, warn};
58
59use crate::active_compute_sink::ActiveComputeSinkRetireReason;
60use crate::coord::Coordinator;
61use crate::coord::catalog_implications::parsed_state_updates::{
62 ParsedStateUpdate, ParsedStateUpdateKind,
63};
64use crate::coord::timeline::TimelineState;
65use crate::statement_logging::{StatementEndedExecutionReason, StatementLoggingId};
66use crate::{AdapterError, CollectionIdBundle, ExecuteContext, ResultExt};
67
68pub mod parsed_state_updates;
69
70impl Coordinator {
71 #[instrument(level = "debug")]
80 pub async fn apply_catalog_implications(
81 &mut self,
82 ctx: Option<&mut ExecuteContext>,
83 catalog_updates: Vec<ParsedStateUpdate>,
84 ) -> Result<(), AdapterError> {
85 let start = Instant::now();
86
87 let mut catalog_implications: BTreeMap<CatalogItemId, CatalogImplication> = BTreeMap::new();
88 let mut cluster_commands: BTreeMap<ClusterId, CatalogImplication> = BTreeMap::new();
89 let mut cluster_replica_commands: BTreeMap<(ClusterId, ReplicaId), CatalogImplication> =
90 BTreeMap::new();
91
92 for update in catalog_updates {
93 tracing::trace!(?update, "got parsed state update");
94 match &update.kind {
95 ParsedStateUpdateKind::Item {
96 durable_item,
97 parsed_item: _,
98 connection: _,
99 parsed_full_name: _,
100 } => {
101 let entry = catalog_implications
102 .entry(durable_item.id.clone())
103 .or_insert_with(|| CatalogImplication::None);
104 entry.absorb(update);
105 }
106 ParsedStateUpdateKind::TemporaryItem {
107 durable_item,
108 parsed_item: _,
109 connection: _,
110 parsed_full_name: _,
111 } => {
112 let entry = catalog_implications
113 .entry(durable_item.id.clone())
114 .or_insert_with(|| CatalogImplication::None);
115 entry.absorb(update);
116 }
117 ParsedStateUpdateKind::Cluster {
118 durable_cluster,
119 parsed_cluster: _,
120 } => {
121 let entry = cluster_commands
122 .entry(durable_cluster.id)
123 .or_insert_with(|| CatalogImplication::None);
124 entry.absorb(update.clone());
125 }
126 ParsedStateUpdateKind::ClusterReplica {
127 durable_cluster_replica,
128 parsed_cluster_replica: _,
129 } => {
130 let entry = cluster_replica_commands
131 .entry((
132 durable_cluster_replica.cluster_id,
133 durable_cluster_replica.replica_id,
134 ))
135 .or_insert_with(|| CatalogImplication::None);
136 entry.absorb(update.clone());
137 }
138 }
139 }
140
141 self.apply_catalog_implications_inner(
142 ctx,
143 catalog_implications.into_iter().collect_vec(),
144 cluster_commands.into_iter().collect_vec(),
145 cluster_replica_commands.into_iter().collect_vec(),
146 )
147 .await?;
148
149 self.metrics
150 .apply_catalog_implications_seconds
151 .observe(start.elapsed().as_secs_f64());
152
153 Ok(())
154 }
155
    /// Applies coalesced catalog implications to the rest of the system:
    /// creates storage collections for new tables/sources, forwards
    /// connection changes to the storage controller, and drops storage and
    /// compute objects, in-flight work (active compute sinks, pending peeks,
    /// pending copies), clusters, replicas, and external resources
    /// (VPC endpoints, Postgres replication slots, secrets).
    ///
    /// Work is first accumulated from the per-item, per-cluster, and
    /// per-replica implications, and then applied together afterwards, with
    /// creations happening before drops.
    #[instrument(level = "debug")]
    async fn apply_catalog_implications_inner(
        &mut self,
        ctx: Option<&mut ExecuteContext>,
        implications: Vec<(CatalogItemId, CatalogImplication)>,
        cluster_commands: Vec<(ClusterId, CatalogImplication)>,
        cluster_replica_commands: Vec<((ClusterId, ReplicaId), CatalogImplication)>,
    ) -> Result<(), AdapterError> {
        // Drop-side work, accumulated while walking the implications below.
        let mut tables_to_drop = BTreeSet::new();
        let mut sources_to_drop = vec![];
        let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
        let mut storage_sink_gids_to_drop = vec![];
        let mut indexes_to_drop = vec![];
        let mut compute_sinks_to_drop = vec![];
        let mut view_gids_to_drop = vec![];
        let mut secrets_to_drop = vec![];
        let mut vpc_endpoints_to_drop = vec![];
        let mut clusters_to_drop = vec![];
        let mut cluster_replicas_to_drop = vec![];
        let mut active_compute_sinks_to_drop = BTreeMap::new();
        let mut peeks_to_drop = vec![];
        let mut copies_to_drop = vec![];

        // Names of dropped items/clusters, used to build error messages for
        // in-flight work (sinks, peeks) that depended on them.
        let mut dropped_item_names: BTreeMap<GlobalId, String> = BTreeMap::new();
        let mut dropped_cluster_names: BTreeMap<ClusterId, String> = BTreeMap::new();

        // Create-side work for newly added objects.
        let mut table_collections_to_create = BTreeMap::new();
        let mut source_collections_to_create = BTreeMap::new();
        let mut storage_policies_to_initialize = BTreeMap::new();
        let mut execution_timestamps_to_set = BTreeSet::new();
        let mut vpc_endpoints_to_create: Vec<(CatalogItemId, VpcEndpointConfig)> = vec![];

        // Global IDs that must be protected from being dropped, e.g. because
        // an altered materialized view keeps writing to them.
        let mut source_gids_to_keep = BTreeSet::new();

        // Connection changes that have to be forwarded to the storage
        // controller.
        let mut source_connections_to_alter: BTreeMap<
            GlobalId,
            GenericSourceConnection<InlinedConnection>,
        > = BTreeMap::new();
        let mut sink_connections_to_alter: BTreeMap<GlobalId, StorageSinkConnection> =
            BTreeMap::new();
        let mut source_export_data_configs_to_alter: BTreeMap<GlobalId, SourceExportDataConfig> =
            BTreeMap::new();

        // Walk the per-item implications and translate each into create/drop
        // work recorded in the collections above.
        for (catalog_id, implication) in implications {
            tracing::trace!(?implication, "have to apply catalog implication");

            match implication {
                CatalogImplication::Table(CatalogImplicationKind::Added(table)) => {
                    self.handle_create_table(
                        &ctx,
                        &mut table_collections_to_create,
                        &mut storage_policies_to_initialize,
                        &mut execution_timestamps_to_set,
                        catalog_id,
                        table.clone(),
                    )
                    .await?
                }
                CatalogImplication::Table(CatalogImplicationKind::Altered {
                    prev: prev_table,
                    new: new_table,
                }) => self.handle_alter_table(prev_table, new_table).await?,

                CatalogImplication::Table(CatalogImplicationKind::Dropped(table, full_name)) => {
                    // A table may have multiple versioned collections; drop
                    // all of them.
                    let global_ids = table.global_ids();
                    for global_id in global_ids {
                        tables_to_drop.insert((catalog_id, global_id));
                        dropped_item_names.insert(global_id, full_name.clone());
                    }
                }
                CatalogImplication::Source(CatalogImplicationKind::Added((
                    source,
                    _connection,
                ))) => {
                    let compaction_windows = self
                        .catalog()
                        .state()
                        .source_compaction_windows(vec![catalog_id]);

                    self.handle_create_source(
                        &mut source_collections_to_create,
                        &mut storage_policies_to_initialize,
                        catalog_id,
                        source,
                        compaction_windows,
                    )
                    .await?
                }
                CatalogImplication::Source(CatalogImplicationKind::Altered {
                    prev: (prev_source, _prev_connection),
                    new: (new_source, _new_connection),
                }) => {
                    tracing::debug!(
                        ?prev_source,
                        ?new_source,
                        "not handling AlterSource in here yet"
                    );
                }
                CatalogImplication::Source(CatalogImplicationKind::Dropped(
                    (source, connection),
                    full_name,
                )) => {
                    let global_id = source.global_id();
                    sources_to_drop.push((catalog_id, global_id));
                    dropped_item_names.insert(global_id, full_name);

                    // Dropped Postgres ingestions also need their upstream
                    // replication slot cleaned up (done in a background task
                    // below).
                    if let DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } = &source.data_source
                    {
                        match &desc.connection {
                            GenericSourceConnection::Postgres(_referenced_conn) => {
                                let inline_conn = connection.expect("missing inlined connection");

                                let pg_conn = match inline_conn {
                                    GenericSourceConnection::Postgres(pg_conn) => pg_conn,
                                    other => {
                                        panic!("expected postgres connection, got: {:?}", other)
                                    }
                                };
                                let pending_drop = (
                                    pg_conn.connection.clone(),
                                    pg_conn.publication_details.slot.clone(),
                                );
                                replication_slots_to_drop.push(pending_drop);
                            }
                            _ => {}
                        }
                    }
                }
                CatalogImplication::Sink(CatalogImplicationKind::Added(sink)) => {
                    tracing::debug!(?sink, "not handling AddSink in here yet");
                }
                CatalogImplication::Sink(CatalogImplicationKind::Altered {
                    prev: prev_sink,
                    new: new_sink,
                }) => {
                    tracing::debug!(?prev_sink, ?new_sink, "not handling AlterSink in here yet");
                }
                CatalogImplication::Sink(CatalogImplicationKind::Dropped(sink, full_name)) => {
                    storage_sink_gids_to_drop.push(sink.global_id());
                    dropped_item_names.insert(sink.global_id(), full_name);
                }
                CatalogImplication::Index(CatalogImplicationKind::Added(index)) => {
                    tracing::debug!(?index, "not handling AddIndex in here yet");
                }
                CatalogImplication::Index(CatalogImplicationKind::Altered {
                    prev: prev_index,
                    new: new_index,
                }) => {
                    tracing::debug!(
                        ?prev_index,
                        ?new_index,
                        "not handling AlterIndex in here yet"
                    );
                }
                CatalogImplication::Index(CatalogImplicationKind::Dropped(index, full_name)) => {
                    indexes_to_drop.push((index.cluster_id, index.global_id()));
                    dropped_item_names.insert(index.global_id(), full_name);
                }
                CatalogImplication::MaterializedView(CatalogImplicationKind::Added(mv)) => {
                    tracing::debug!(?mv, "not handling AddMaterializedView in here yet");
                }
                CatalogImplication::MaterializedView(CatalogImplicationKind::Altered {
                    prev: prev_mv,
                    new: new_mv,
                }) => {
                    if prev_mv.collections != new_mv.collections {
                        // We only expect alterations that keep the write
                        // collection stable; anything else is a bug.
                        assert_eq!(
                            prev_mv.global_id_writes(),
                            new_mv.global_id_writes(),
                            "unexpected MV Altered implication: prev={prev_mv:?}, new={new_mv:?}",
                        );

                        let gid = new_mv.global_id_writes();
                        self.allow_writes(new_mv.cluster_id, gid);

                        // The MV keeps using its collections, so protect them
                        // from the source drops computed below.
                        source_gids_to_keep.extend(new_mv.global_ids());
                    }
                }
                CatalogImplication::MaterializedView(CatalogImplicationKind::Dropped(
                    mv,
                    full_name,
                )) => {
                    // An MV is both a compute sink (its dataflow) and one
                    // storage collection per output.
                    compute_sinks_to_drop.push((mv.cluster_id, mv.global_id_writes()));
                    for gid in mv.global_ids() {
                        sources_to_drop.push((catalog_id, gid));
                        dropped_item_names.insert(gid, full_name.clone());
                    }
                }
                CatalogImplication::View(CatalogImplicationKind::Added(view)) => {
                    tracing::debug!(?view, "not handling AddView in here yet");
                }
                CatalogImplication::View(CatalogImplicationKind::Altered {
                    prev: prev_view,
                    new: new_view,
                }) => {
                    tracing::debug!(?prev_view, ?new_view, "not handling AlterView in here yet");
                }
                CatalogImplication::View(CatalogImplicationKind::Dropped(view, full_name)) => {
                    view_gids_to_drop.push(view.global_id());
                    dropped_item_names.insert(view.global_id(), full_name);
                }
                CatalogImplication::ContinualTask(CatalogImplicationKind::Added(ct)) => {
                    tracing::debug!(?ct, "not handling AddContinualTask in here yet");
                }
                CatalogImplication::ContinualTask(CatalogImplicationKind::Altered {
                    prev: prev_ct,
                    new: new_ct,
                }) => {
                    tracing::debug!(
                        ?prev_ct,
                        ?new_ct,
                        "not handling AlterContinualTask in here yet"
                    );
                }
                CatalogImplication::ContinualTask(CatalogImplicationKind::Dropped(
                    ct,
                    _full_name,
                )) => {
                    // Like an MV: drop both the compute sink and the storage
                    // collection.
                    compute_sinks_to_drop.push((ct.cluster_id, ct.global_id()));
                    sources_to_drop.push((catalog_id, ct.global_id()));
                }
                CatalogImplication::Secret(CatalogImplicationKind::Added(_secret)) => {
                    // Nothing to apply for added secrets here.
                }
                CatalogImplication::Secret(CatalogImplicationKind::Altered {
                    prev: _prev_secret,
                    new: _new_secret,
                }) => {
                    // Nothing to apply for altered secrets here.
                }
                CatalogImplication::Secret(CatalogImplicationKind::Dropped(
                    _secret,
                    _full_name,
                )) => {
                    secrets_to_drop.push(catalog_id);
                }
                CatalogImplication::Connection(CatalogImplicationKind::Added(connection)) => {
                    match &connection.details {
                        // Nothing to do for added SSH connections here.
                        ConnectionDetails::Ssh { .. } => {}
                        // AWS PrivateLink connections need a VPC endpoint
                        // provisioned (done below).
                        ConnectionDetails::AwsPrivatelink(privatelink) => {
                            let spec = VpcEndpointConfig {
                                aws_service_name: privatelink.service_name.to_owned(),
                                availability_zone_ids: privatelink.availability_zones.to_owned(),
                            };
                            vpc_endpoints_to_create.push((catalog_id, spec));
                        }
                        _ => {}
                    }
                }
                CatalogImplication::Connection(CatalogImplicationKind::Altered {
                    prev: _prev_connection,
                    new: new_connection,
                }) => {
                    self.handle_alter_connection(
                        catalog_id,
                        new_connection,
                        &mut vpc_endpoints_to_create,
                        &mut source_connections_to_alter,
                        &mut sink_connections_to_alter,
                        &mut source_export_data_configs_to_alter,
                    );
                }
                CatalogImplication::Connection(CatalogImplicationKind::Dropped(
                    connection,
                    _full_name,
                )) => {
                    match &connection.details {
                        // SSH connections have an associated secret that must
                        // be dropped along with them.
                        ConnectionDetails::Ssh { .. } => {
                            secrets_to_drop.push(catalog_id);
                        }
                        // PrivateLink connections have an associated VPC
                        // endpoint that must be dropped.
                        ConnectionDetails::AwsPrivatelink(_) => {
                            vpc_endpoints_to_drop.push(catalog_id);
                        }
                        _ => (),
                    }
                }
                CatalogImplication::None => {
                    // Updates cancelled each other out; nothing to apply.
                }
                CatalogImplication::Cluster(_) | CatalogImplication::ClusterReplica(_) => {
                    unreachable!("clusters and cluster replicas are handled below")
                }
                CatalogImplication::Table(CatalogImplicationKind::None)
                | CatalogImplication::Source(CatalogImplicationKind::None)
                | CatalogImplication::Sink(CatalogImplicationKind::None)
                | CatalogImplication::Index(CatalogImplicationKind::None)
                | CatalogImplication::MaterializedView(CatalogImplicationKind::None)
                | CatalogImplication::View(CatalogImplicationKind::None)
                | CatalogImplication::ContinualTask(CatalogImplicationKind::None)
                | CatalogImplication::Secret(CatalogImplicationKind::None)
                | CatalogImplication::Connection(CatalogImplicationKind::None) => {
                    unreachable!("will never leave None in place");
                }
            }
        }

        // Cluster-level commands.
        for (cluster_id, command) in cluster_commands {
            tracing::trace!(?command, "have cluster command to apply!");

            match command {
                CatalogImplication::Cluster(CatalogImplicationKind::Added(cluster)) => {
                    tracing::debug!(?cluster, "not handling AddCluster in here yet");
                }
                CatalogImplication::Cluster(CatalogImplicationKind::Altered {
                    prev: prev_cluster,
                    new: new_cluster,
                }) => {
                    tracing::debug!(
                        ?prev_cluster,
                        ?new_cluster,
                        "not handling AlterCluster in here yet"
                    );
                }
                CatalogImplication::Cluster(CatalogImplicationKind::Dropped(
                    cluster,
                    _full_name,
                )) => {
                    clusters_to_drop.push(cluster_id);
                    dropped_cluster_names.insert(cluster_id, cluster.name);
                }
                CatalogImplication::Cluster(CatalogImplicationKind::None) => {
                    unreachable!("will never leave None in place");
                }
                command => {
                    unreachable!(
                        "we only handle cluster commands in this map, got: {:?}",
                        command
                    );
                }
            }
        }

        // Cluster-replica-level commands.
        for ((cluster_id, replica_id), command) in cluster_replica_commands {
            tracing::trace!(?command, "have cluster replica command to apply!");

            match command {
                CatalogImplication::ClusterReplica(CatalogImplicationKind::Added(replica)) => {
                    tracing::debug!(?replica, "not handling AddClusterReplica in here yet");
                }
                CatalogImplication::ClusterReplica(CatalogImplicationKind::Altered {
                    prev: prev_replica,
                    new: new_replica,
                }) => {
                    tracing::debug!(
                        ?prev_replica,
                        ?new_replica,
                        "not handling AlterClusterReplica in here yet"
                    );
                }
                CatalogImplication::ClusterReplica(CatalogImplicationKind::Dropped(
                    _replica,
                    _full_name,
                )) => {
                    cluster_replicas_to_drop.push((cluster_id, replica_id));
                }
                CatalogImplication::ClusterReplica(CatalogImplicationKind::None) => {
                    unreachable!("will never leave None in place");
                }
                command => {
                    unreachable!(
                        "we only handle cluster replica commands in this map, got: {:?}",
                        command
                    );
                }
            }
        }

        // Apply create-side work first: new storage collections, read
        // policies, and VPC endpoints.
        if !source_collections_to_create.is_empty() {
            self.create_source_collections(source_collections_to_create)
                .await?;
        }

        if !table_collections_to_create.is_empty() {
            self.create_table_collections(table_collections_to_create, execution_timestamps_to_set)
                .await?;
        }
        self.initialize_storage_collections(storage_policies_to_initialize)
            .await?;

        if !vpc_endpoints_to_create.is_empty() {
            if let Some(cloud_resource_controller) = self.cloud_resource_controller.as_ref() {
                for (connection_id, spec) in vpc_endpoints_to_create {
                    // Best effort: failures are logged, not propagated.
                    if let Err(err) = cloud_resource_controller
                        .ensure_vpc_endpoint(connection_id, spec)
                        .await
                    {
                        tracing::error!(?err, "failed to ensure vpc endpoint!");
                    }
                }
            } else {
                tracing::error!(
                    "AWS PrivateLink connections unsupported without cloud_resource_controller"
                );
            }
        }

        // Forward connection alterations to the storage controller.
        if !source_connections_to_alter.is_empty() {
            self.controller
                .storage
                .alter_ingestion_connections(source_connections_to_alter)
                .await
                .unwrap_or_terminate("cannot fail to alter ingestion connections");
        }

        if !sink_connections_to_alter.is_empty() {
            self.controller
                .storage
                .alter_export_connections(sink_connections_to_alter)
                .await
                .unwrap_or_terminate("altering export connections after txn must succeed");
        }

        if !source_export_data_configs_to_alter.is_empty() {
            self.controller
                .storage
                .alter_ingestion_export_data_configs(source_export_data_configs_to_alter)
                .await
                .unwrap_or_terminate("altering source export data configs after txn must succeed");
        }

        // Protect collections that altered objects (e.g. MVs) still use.
        sources_to_drop.retain(|(_, gid)| !source_gids_to_keep.contains(gid));

        // The full set of readable collections going away, used to find
        // in-flight work that depends on them.
        let readable_collections_to_drop: BTreeSet<_> = sources_to_drop
            .iter()
            .map(|(_, gid)| *gid)
            .chain(tables_to_drop.iter().map(|(_, gid)| *gid))
            .chain(indexes_to_drop.iter().map(|(_, gid)| *gid))
            .chain(view_gids_to_drop.iter().copied())
            .collect();

        // Retire active compute sinks whose dependencies or cluster are being
        // dropped, with a human-readable reason.
        for (sink_id, sink) in &self.active_compute_sinks {
            let cluster_id = sink.cluster_id();
            if let Some(id) = sink
                .depends_on()
                .iter()
                .find(|id| readable_collections_to_drop.contains(id))
            {
                let name = dropped_item_names
                    .get(id)
                    .map(|n| format!("relation {}", n.quoted()))
                    .expect("missing relation name");
                active_compute_sinks_to_drop.insert(
                    *sink_id,
                    ActiveComputeSinkRetireReason::DependencyDropped(name),
                );
            } else if clusters_to_drop.contains(&cluster_id) {
                let name = dropped_cluster_names
                    .get(&cluster_id)
                    .map(|n| format!("cluster {}", n.quoted()))
                    .expect("missing cluster name");
                active_compute_sinks_to_drop.insert(
                    *sink_id,
                    ActiveComputeSinkRetireReason::DependencyDropped(name),
                );
            }
        }

        // Likewise, cancel pending peeks that depend on dropped collections
        // or clusters.
        for (uuid, pending_peek) in &self.pending_peeks {
            if let Some(id) = pending_peek
                .depends_on
                .iter()
                .find(|id| readable_collections_to_drop.contains(id))
            {
                let name = dropped_item_names
                    .get(id)
                    .map(|n| format!("relation {}", n.quoted()))
                    .expect("missing relation name");
                peeks_to_drop.push((name, uuid.clone()));
            } else if clusters_to_drop.contains(&pending_peek.cluster_id) {
                let name = dropped_cluster_names
                    .get(&pending_peek.cluster_id)
                    .map(|n| format!("cluster {}", n.quoted()))
                    .expect("missing cluster name");
                peeks_to_drop.push((name, uuid.clone()));
            }
        }

        // And cancel pending COPYs targeting a dropped table or cluster.
        for (conn_id, pending_copy) in &self.active_copies {
            let dropping_table = tables_to_drop
                .iter()
                .any(|(item_id, _gid)| pending_copy.table_id == *item_id);
            let dropping_cluster = clusters_to_drop.contains(&pending_copy.cluster_id);

            if dropping_table || dropping_cluster {
                copies_to_drop.push(conn_id.clone());
            }
        }

        // IDs used for timeline bookkeeping below.
        let storage_gids_to_drop: BTreeSet<_> = sources_to_drop
            .iter()
            .map(|(_id, gid)| gid)
            .chain(storage_sink_gids_to_drop.iter())
            .chain(tables_to_drop.iter().map(|(_id, gid)| gid))
            .copied()
            .collect();
        let compute_gids_to_drop: Vec<_> = indexes_to_drop
            .iter()
            .chain(compute_sinks_to_drop.iter())
            .copied()
            .collect();

        // For each timeline, collect the read holds that belong to dropped
        // objects so they can be released from the timeline.
        let mut timeline_id_bundles = BTreeMap::new();

        for (timeline, TimelineState { read_holds, .. }) in &self.global_timelines {
            let mut id_bundle = CollectionIdBundle::default();

            for storage_id in read_holds.storage_ids() {
                if storage_gids_to_drop.contains(&storage_id) {
                    id_bundle.storage_ids.insert(storage_id);
                }
            }

            for (instance_id, id) in read_holds.compute_ids() {
                if compute_gids_to_drop.contains(&(instance_id, id))
                    || clusters_to_drop.contains(&instance_id)
                {
                    id_bundle
                        .compute_ids
                        .entry(instance_id)
                        .or_default()
                        .insert(id);
                }
            }

            timeline_id_bundles.insert(timeline.clone(), id_bundle);
        }

        // Precompute, per timeline, whether removing these IDs should leave
        // the timeline empty; asserted against the actual result below.
        let mut timeline_associations = BTreeMap::new();
        for (timeline, id_bundle) in timeline_id_bundles.into_iter() {
            let TimelineState { read_holds, .. } = self
                .global_timelines
                .get(&timeline)
                .expect("all timelines have a timestamp oracle");

            let empty = read_holds.id_bundle().difference(&id_bundle).is_empty();
            timeline_associations.insert(timeline, (empty, id_bundle));
        }

        // Apply all the drops. The async block exists so the whole
        // finalization runs under a single dedicated tracing span.
        let _: () = async {
            if !timeline_associations.is_empty() {
                for (timeline, (should_be_empty, id_bundle)) in timeline_associations {
                    let became_empty =
                        self.remove_resources_associated_with_timeline(timeline, id_bundle);
                    assert_eq!(should_be_empty, became_empty, "emptiness did not match!");
                }
            }

            if !tables_to_drop.is_empty() {
                // Table drops are sequenced at a fresh local write timestamp.
                let ts = self.get_local_write_ts().await;
                self.drop_tables(tables_to_drop.into_iter().collect_vec(), ts.timestamp);
            }

            if !sources_to_drop.is_empty() {
                self.drop_sources(sources_to_drop);
            }

            if !storage_sink_gids_to_drop.is_empty() {
                self.drop_storage_sinks(storage_sink_gids_to_drop);
            }

            if !active_compute_sinks_to_drop.is_empty() {
                self.retire_compute_sinks(active_compute_sinks_to_drop)
                    .await;
            }

            if !peeks_to_drop.is_empty() {
                for (dropped_name, uuid) in peeks_to_drop {
                    if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
                        // Tell the client why their query can never finish.
                        let cancel_reason = PeekResponse::Error(format!(
                            "query could not complete because {dropped_name} was dropped"
                        ));
                        self.controller
                            .compute
                            .cancel_peek(pending_peek.cluster_id, uuid, cancel_reason)
                            .unwrap_or_terminate("unable to cancel peek");
                        self.retire_execution(
                            StatementEndedExecutionReason::Canceled,
                            pending_peek.ctx_extra.defuse(),
                        );
                    }
                }
            }

            if !copies_to_drop.is_empty() {
                for conn_id in copies_to_drop {
                    self.cancel_pending_copy(&conn_id);
                }
            }

            if !compute_gids_to_drop.is_empty() {
                self.drop_compute_collections(compute_gids_to_drop);
            }

            if !vpc_endpoints_to_drop.is_empty() {
                self.drop_vpc_endpoints_in_background(vpc_endpoints_to_drop)
            }

            if !cluster_replicas_to_drop.is_empty() {
                fail::fail_point!("after_catalog_drop_replica");

                for (cluster_id, replica_id) in cluster_replicas_to_drop {
                    self.drop_replica(cluster_id, replica_id);
                }
            }
            if !clusters_to_drop.is_empty() {
                for cluster_id in clusters_to_drop {
                    self.controller.drop_cluster(cluster_id);
                }
            }

            // Replication slots live in external Postgres instances and
            // secrets in an external controller; clean them up in a spawned
            // task, off the coordinator's main loop.
            task::spawn(|| "drop_replication_slots_and_secrets", {
                let ssh_tunnel_manager = self.connection_context().ssh_tunnel_manager.clone();
                let secrets_controller = Arc::clone(&self.secrets_controller);
                let secrets_reader = Arc::clone(self.secrets_reader());
                let storage_config = self.controller.storage.config().clone();

                async move {
                    for (connection, replication_slot_name) in replication_slots_to_drop {
                        tracing::info!(?replication_slot_name, "dropping replication slot");

                        // Retry for up to a minute; the upstream Postgres may
                        // be temporarily unreachable.
                        let result: Result<(), anyhow::Error> = Retry::default()
                            .max_duration(Duration::from_secs(60))
                            .retry_async(|_state| async {
                                let config = connection
                                    .config(&secrets_reader, &storage_config, InTask::No)
                                    .await
                                    .map_err(|e| {
                                        anyhow::anyhow!(
                                            "error creating Postgres client for \
                                            dropping acquired slots: {}",
                                            e.display_with_causes()
                                        )
                                    })?;

                                mz_postgres_util::drop_replication_slots(
                                    &ssh_tunnel_manager,
                                    config.clone(),
                                    &[(&replication_slot_name, true)],
                                )
                                .await?;

                                Ok(())
                            })
                            .await;

                        if let Err(err) = result {
                            // Best effort: log and move on.
                            tracing::warn!(
                                ?replication_slot_name,
                                ?err,
                                "failed to drop replication slot"
                            );
                        }
                    }

                    fail_point!("drop_secrets");
                    for secret in secrets_to_drop {
                        if let Err(e) = secrets_controller.delete(secret).await {
                            warn!("Dropping secrets has encountered an error: {}", e);
                        }
                    }
                }
            });
        }
        .instrument(info_span!(
            "coord::apply_catalog_implications_inner::finalize"
        ))
        .await;

        Ok(())
    }
916
    /// Creates the given table collections in the storage controller,
    /// registered at a freshly acquired local write timestamp.
    ///
    /// Also sets the statement execution timestamp for the given statement
    /// logging IDs to that same timestamp. The statement order here is
    /// load-bearing: the write timestamp is acquired first, leadership is
    /// confirmed before it is used, and `apply_local_write` runs only after
    /// the collections exist.
    #[instrument(level = "debug")]
    async fn create_table_collections(
        &mut self,
        table_collections_to_create: BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
        execution_timestamps_to_set: BTreeSet<StatementLoggingId>,
    ) -> Result<(), AdapterError> {
        let register_ts = self.get_local_write_ts().await.timestamp;

        // Make sure we are still the leader before acting on the timestamp.
        self.catalog
            .confirm_leadership()
            .await
            .unwrap_or_terminate("unable to confirm leadership");

        for id in execution_timestamps_to_set {
            self.set_statement_execution_timestamp(id, register_ts);
        }

        let storage_metadata = self.catalog.state().storage_metadata();

        self.controller
            .storage
            .create_collections(
                storage_metadata,
                Some(register_ts),
                table_collections_to_create.into_iter().collect_vec(),
            )
            .await
            .unwrap_or_terminate("cannot fail to create collections");

        self.apply_local_write(register_ts).await;

        Ok(())
    }
957
958 #[instrument(level = "debug")]
959 async fn create_source_collections(
960 &mut self,
961 source_collections_to_create: BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
962 ) -> Result<(), AdapterError> {
963 let storage_metadata = self.catalog.state().storage_metadata();
964
965 self.controller
966 .storage
967 .create_collections(
968 storage_metadata,
969 None, source_collections_to_create.into_iter().collect_vec(),
971 )
972 .await
973 .unwrap_or_terminate("cannot fail to create collections");
974
975 Ok(())
976 }
977
978 #[instrument(level = "debug")]
979 async fn initialize_storage_collections(
980 &mut self,
981 storage_policies_to_initialize: BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
982 ) -> Result<(), AdapterError> {
983 for (compaction_window, global_ids) in storage_policies_to_initialize {
984 self.initialize_read_policies(
985 &CollectionIdBundle {
986 storage_ids: global_ids,
987 compute_ids: BTreeMap::new(),
988 },
989 compaction_window,
990 )
991 .await;
992 }
993
994 Ok(())
995 }
996
997 #[instrument(level = "debug")]
998 async fn handle_create_table(
999 &self,
1000 ctx: &Option<&mut ExecuteContext>,
1001 storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
1002 storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1003 execution_timestamps_to_set: &mut BTreeSet<StatementLoggingId>,
1004 table_id: CatalogItemId,
1005 table: Table,
1006 ) -> Result<(), AdapterError> {
1007 match &table.data_source {
1011 TableDataSource::TableWrites { defaults: _ } => {
1012 let versions: BTreeMap<_, _> = table
1013 .collection_descs()
1014 .map(|(gid, version, desc)| (version, (gid, desc)))
1015 .collect();
1016 let collection_descs = versions.iter().map(|(_version, (gid, desc))| {
1017 let collection_desc = CollectionDescription::for_table(desc.clone());
1018
1019 (*gid, collection_desc)
1020 });
1021
1022 let compaction_window = table
1023 .custom_logical_compaction_window
1024 .unwrap_or(CompactionWindow::Default);
1025 let ids_to_initialize = storage_policies_to_initialize
1026 .entry(compaction_window)
1027 .or_default();
1028
1029 for (gid, collection_desc) in collection_descs {
1030 storage_collections_to_create.insert(gid, collection_desc);
1031 ids_to_initialize.insert(gid);
1032 }
1033
1034 if let Some(id) = ctx.as_ref().and_then(|ctx| ctx.extra().contents()) {
1035 execution_timestamps_to_set.insert(id);
1036 }
1037 }
1038 TableDataSource::DataSource {
1039 desc: data_source_desc,
1040 timeline,
1041 } => {
1042 match data_source_desc {
1043 DataSourceDesc::IngestionExport {
1044 ingestion_id,
1045 external_reference: _,
1046 details,
1047 data_config,
1048 } => {
1049 let global_ingestion_id =
1050 self.catalog().get_entry(ingestion_id).latest_global_id();
1051
1052 let collection_desc = CollectionDescription::<Timestamp> {
1053 desc: table.desc.latest(),
1054 data_source: DataSource::IngestionExport {
1055 ingestion_id: global_ingestion_id,
1056 details: details.clone(),
1057 data_config: data_config
1058 .clone()
1059 .into_inline_connection(self.catalog.state()),
1060 },
1061 since: None,
1062 timeline: Some(timeline.clone()),
1063 primary: None,
1064 };
1065
1066 let global_id = table
1067 .global_ids()
1068 .expect_element(|| "subsources cannot have multiple versions");
1069
1070 storage_collections_to_create.insert(global_id, collection_desc);
1071
1072 let read_policies = self
1073 .catalog()
1074 .state()
1075 .source_compaction_windows(vec![table_id]);
1076 for (compaction_window, catalog_ids) in read_policies {
1077 let compaction_ids = storage_policies_to_initialize
1078 .entry(compaction_window)
1079 .or_default();
1080
1081 let gids = catalog_ids
1082 .into_iter()
1083 .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1084 .flatten();
1085 compaction_ids.extend(gids);
1086 }
1087 }
1088 DataSourceDesc::Webhook {
1089 validate_using: _,
1090 body_format: _,
1091 headers: _,
1092 cluster_id: _,
1093 } => {
1094 assert_eq!(
1096 table.desc.latest_version(),
1097 RelationVersion::root(),
1098 "found webhook with more than 1 relation version, {:?}",
1099 table.desc
1100 );
1101 let desc = table.desc.latest();
1102
1103 let collection_desc = CollectionDescription::<Timestamp> {
1104 desc,
1105 data_source: DataSource::Webhook,
1106 since: None,
1107 timeline: Some(timeline.clone()),
1108 primary: None,
1109 };
1110
1111 let global_id = table
1112 .global_ids()
1113 .expect_element(|| "webhooks cannot have multiple versions");
1114
1115 storage_collections_to_create.insert(global_id, collection_desc);
1116
1117 let read_policies = self
1118 .catalog()
1119 .state()
1120 .source_compaction_windows(vec![table_id]);
1121
1122 for (compaction_window, catalog_ids) in read_policies {
1123 let compaction_ids = storage_policies_to_initialize
1124 .entry(compaction_window)
1125 .or_default();
1126
1127 let gids = catalog_ids
1128 .into_iter()
1129 .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1130 .flatten();
1131 compaction_ids.extend(gids);
1132 }
1133 }
1134 _ => unreachable!("CREATE TABLE data source got {:?}", data_source_desc),
1135 }
1136 }
1137 }
1138
1139 Ok(())
1140 }
1141
    /// Applies an `ALTER TABLE` that produced a new version of the table:
    /// evolves the table's schema in the storage controller and installs read
    /// policies for the new write collection.
    ///
    /// The sequencing here is load-bearing: a read hold on the existing
    /// collection is held across the alteration and only released at the end,
    /// and `apply_local_write` runs after `alter_table_desc` succeeded at the
    /// acquired register timestamp.
    #[instrument(level = "debug")]
    async fn handle_alter_table(
        &mut self,
        prev_table: Table,
        new_table: Table,
    ) -> Result<(), AdapterError> {
        let existing_gid = prev_table.global_id_writes();
        let new_gid = new_table.global_id_writes();

        // Nothing to do when the write collection did not change.
        if existing_gid == new_gid {
            return Ok(());
        }

        // Hold back compaction of the existing collection while we evolve its
        // schema.
        let existing_table = crate::CollectionIdBundle {
            storage_ids: BTreeSet::from([existing_gid]),
            compute_ids: BTreeMap::new(),
        };
        let existing_table_read_hold = self.acquire_read_holds(&existing_table);

        let expected_version = prev_table.desc.latest_version();
        let new_version = new_table.desc.latest_version();
        let new_desc = new_table
            .desc
            .at_version(RelationVersionSelector::Specific(new_version));

        let register_ts = self.get_local_write_ts().await.timestamp;

        self.controller
            .storage
            .alter_table_desc(
                existing_gid,
                new_gid,
                new_desc,
                expected_version,
                register_ts,
            )
            .await
            .expect("failed to alter desc of table");

        // Install read policies for the new collection, honoring any custom
        // compaction window configured on the table.
        let compaction_window = new_table
            .custom_logical_compaction_window
            .unwrap_or(CompactionWindow::Default);
        self.initialize_read_policies(
            &crate::CollectionIdBundle {
                storage_ids: BTreeSet::from([new_gid]),
                compute_ids: BTreeMap::new(),
            },
            compaction_window,
        )
        .await;

        self.apply_local_write(register_ts).await;

        // Only now release the read hold on the previous collection.
        drop(existing_table_read_hold);

        Ok(())
    }
1208
1209 #[instrument(level = "debug")]
1210 async fn handle_create_source(
1211 &self,
1212 storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
1213 storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1214 item_id: CatalogItemId,
1215 source: Source,
1216 compaction_windows: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>>,
1217 ) -> Result<(), AdapterError> {
1218 let data_source = match source.data_source {
1219 DataSourceDesc::Ingestion { desc, cluster_id } => {
1220 let desc = desc.into_inline_connection(self.catalog().state());
1221 let item_global_id = self.catalog().get_entry(&item_id).latest_global_id();
1222
1223 let ingestion = mz_storage_types::sources::IngestionDescription::new(
1224 desc,
1225 cluster_id,
1226 item_global_id,
1227 );
1228
1229 DataSource::Ingestion(ingestion)
1230 }
1231 DataSourceDesc::OldSyntaxIngestion {
1232 desc,
1233 progress_subsource,
1234 data_config,
1235 details,
1236 cluster_id,
1237 } => {
1238 let desc = desc.into_inline_connection(self.catalog().state());
1239 let data_config = data_config.into_inline_connection(self.catalog().state());
1240
1241 let progress_subsource = self
1244 .catalog()
1245 .get_entry(&progress_subsource)
1246 .latest_global_id();
1247
1248 let mut ingestion = mz_storage_types::sources::IngestionDescription::new(
1249 desc,
1250 cluster_id,
1251 progress_subsource,
1252 );
1253
1254 let legacy_export = SourceExport {
1255 storage_metadata: (),
1256 data_config,
1257 details,
1258 };
1259
1260 ingestion
1261 .source_exports
1262 .insert(source.global_id, legacy_export);
1263
1264 DataSource::Ingestion(ingestion)
1265 }
1266 DataSourceDesc::IngestionExport {
1267 ingestion_id,
1268 external_reference: _,
1269 details,
1270 data_config,
1271 } => {
1272 let ingestion_id = self.catalog().get_entry(&ingestion_id).latest_global_id();
1275
1276 DataSource::IngestionExport {
1277 ingestion_id,
1278 details,
1279 data_config: data_config.into_inline_connection(self.catalog().state()),
1280 }
1281 }
1282 DataSourceDesc::Progress => DataSource::Progress,
1283 DataSourceDesc::Webhook { .. } => DataSource::Webhook,
1284 DataSourceDesc::Introspection(_) | DataSourceDesc::Catalog => {
1285 unreachable!("cannot create sources with internal data sources")
1286 }
1287 };
1288
1289 storage_collections_to_create.insert(
1290 source.global_id,
1291 CollectionDescription::<Timestamp> {
1292 desc: source.desc.clone(),
1293 data_source,
1294 timeline: Some(source.timeline),
1295 since: None,
1296 primary: None,
1297 },
1298 );
1299
1300 for (compaction_window, catalog_ids) in compaction_windows {
1302 let compaction_ids = storage_policies_to_initialize
1303 .entry(compaction_window)
1304 .or_default();
1305
1306 let gids = catalog_ids
1307 .into_iter()
1308 .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1309 .flatten();
1310 compaction_ids.extend(gids);
1311 }
1312
1313 Ok(())
1314 }
1315
1316 #[instrument(level = "debug")]
1323 fn handle_alter_connection(
1324 &self,
1325 connection_id: CatalogItemId,
1326 connection: Connection,
1327 vpc_endpoints_to_create: &mut Vec<(CatalogItemId, VpcEndpointConfig)>,
1328 source_connections_to_alter: &mut BTreeMap<
1329 GlobalId,
1330 GenericSourceConnection<InlinedConnection>,
1331 >,
1332 sink_connections_to_alter: &mut BTreeMap<GlobalId, StorageSinkConnection>,
1333 source_export_data_configs_to_alter: &mut BTreeMap<GlobalId, SourceExportDataConfig>,
1334 ) {
1335 use std::collections::VecDeque;
1336
1337 if let ConnectionDetails::AwsPrivatelink(ref privatelink) = connection.details {
1339 let spec = VpcEndpointConfig {
1340 aws_service_name: privatelink.service_name.to_owned(),
1341 availability_zone_ids: privatelink.availability_zones.to_owned(),
1342 };
1343 vpc_endpoints_to_create.push((connection_id, spec));
1344 }
1345
1346 let mut connections_to_process = VecDeque::new();
1350 connections_to_process.push_front(connection_id.clone());
1351
1352 while let Some(id) = connections_to_process.pop_front() {
1353 for dependent_id in self.catalog().get_entry(&id).used_by() {
1354 let dependent_entry = self.catalog().get_entry(dependent_id);
1355 match dependent_entry.item() {
1356 CatalogItem::Connection(_) => {
1357 connections_to_process.push_back(*dependent_id);
1361 }
1362 CatalogItem::Source(source) => {
1363 let desc = match &dependent_entry
1364 .source()
1365 .expect("known to be source")
1366 .data_source
1367 {
1368 DataSourceDesc::Ingestion { desc, .. }
1369 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1370 desc.clone().into_inline_connection(self.catalog().state())
1371 }
1372 _ => {
1373 continue;
1375 }
1376 };
1377
1378 source_connections_to_alter.insert(source.global_id, desc.connection);
1379 }
1380 CatalogItem::Sink(sink) => {
1381 let export = dependent_entry.sink().expect("known to be sink");
1382 sink_connections_to_alter.insert(
1383 sink.global_id,
1384 export
1385 .connection
1386 .clone()
1387 .into_inline_connection(self.catalog().state()),
1388 );
1389 }
1390 CatalogItem::Table(table) => {
1391 if let Some((_, _, _, export_data_config)) =
1395 dependent_entry.source_export_details()
1396 {
1397 let data_config = export_data_config.clone();
1398 source_export_data_configs_to_alter.insert(
1399 table.global_id_writes(),
1400 data_config.into_inline_connection(self.catalog().state()),
1401 );
1402 }
1403 }
1404 _ => {
1405 }
1408 }
1409 }
1410 }
1411 }
1412}
1413
/// The pending coordinator-side action implied by absorbed catalog updates
/// for a single object, discriminated by object type.
///
/// Starts out as [`CatalogImplication::None`] and is specialized to one of
/// the typed variants by the first absorbed update; subsequent updates must
/// target the same object type.
#[derive(Debug, Clone)]
enum CatalogImplication {
    /// No update absorbed yet; the object type is still unknown.
    None,
    Table(CatalogImplicationKind<Table>),
    /// Sources carry their optional resolved source connection alongside.
    Source(CatalogImplicationKind<(Source, Option<GenericSourceConnection>)>),
    Sink(CatalogImplicationKind<Sink>),
    Index(CatalogImplicationKind<Index>),
    MaterializedView(CatalogImplicationKind<MaterializedView>),
    View(CatalogImplicationKind<View>),
    ContinualTask(CatalogImplicationKind<ContinualTask>),
    Secret(CatalogImplicationKind<Secret>),
    Connection(CatalogImplicationKind<Connection>),
    Cluster(CatalogImplicationKind<Cluster>),
    ClusterReplica(CatalogImplicationKind<ClusterReplica>),
}
1434
/// The net effect of absorbed updates on a single object of type `T`.
///
/// A retraction followed by an addition (or vice versa) collapses into
/// [`CatalogImplicationKind::Altered`]; see `transition` for the full state
/// machine.
#[derive(Debug, Clone)]
enum CatalogImplicationKind<T> {
    /// No diff absorbed yet.
    None,
    /// The object was added.
    Added(T),
    /// The object was dropped; the `String` is a resolved name used when
    /// reporting on the drop.
    Dropped(T, String),
    /// The object was replaced: `prev` is the retracted state, `new` the
    /// added one.
    Altered { prev: T, new: T },
}
1446
1447impl<T: Clone> CatalogImplicationKind<T> {
1448 fn transition(&mut self, item: T, name: Option<String>, diff: StateDiff) -> Result<(), String> {
1451 use CatalogImplicationKind::*;
1452 use StateDiff::*;
1453
1454 let new_state = match (&*self, diff) {
1455 (None, Addition) => Added(item),
1457 (None, Retraction) => Dropped(item, name.unwrap_or_else(|| "<unknown>".to_string())),
1458
1459 (Added(existing), Retraction) => {
1461 Altered {
1463 prev: item,
1464 new: existing.clone(),
1465 }
1466 }
1467 (Added(_), Addition) => {
1468 return Err("Cannot add an already added object".to_string());
1469 }
1470
1471 (Dropped(existing, _), Addition) => {
1473 Altered {
1475 prev: existing.clone(),
1476 new: item,
1477 }
1478 }
1479 (Dropped(_, _), Retraction) => {
1480 return Err("Cannot drop an already dropped object".to_string());
1481 }
1482
1483 (Altered { .. }, _) => {
1485 return Err(format!(
1486 "Cannot apply {:?} to an object in Altered state",
1487 diff
1488 ));
1489 }
1490 };
1491
1492 *self = new_state;
1493 Ok(())
1494 }
1495}
1496
/// Generates a typed absorb method on [`CatalogImplication`] for one variant.
///
/// The generated method coerces `self` into `CatalogImplication::$variant`
/// (initializing it from `CatalogImplication::None` on first use), panics if
/// `self` already holds a different variant, and then applies the diff via
/// [`CatalogImplicationKind::transition`], panicking on an invalid
/// transition.
macro_rules! impl_absorb_method {
    (
        $method_name:ident,
        $variant:ident,
        $item_type:ty
    ) => {
        fn $method_name(
            &mut self,
            item: $item_type,
            parsed_full_name: Option<String>,
            diff: StateDiff,
        ) {
            let state = match self {
                CatalogImplication::$variant(state) => state,
                CatalogImplication::None => {
                    // First update for this object: specialize to the variant,
                    // then re-match to borrow the freshly stored state.
                    *self = CatalogImplication::$variant(CatalogImplicationKind::None);
                    match self {
                        CatalogImplication::$variant(state) => state,
                        _ => unreachable!(),
                    }
                }
                _ => {
                    // Mixing object types for one implication is a bug.
                    panic!(
                        "Unexpected command type for {:?}: {} {:?}",
                        self,
                        stringify!($variant),
                        diff,
                    );
                }
            };

            if let Err(e) = state.transition(item, parsed_full_name, diff) {
                panic!(
                    "Invalid state transition for {}: {}",
                    stringify!($variant),
                    e
                );
            }
        }
    };
}
1539
1540impl CatalogImplication {
1541 fn absorb(&mut self, catalog_update: ParsedStateUpdate) {
1544 match catalog_update.kind {
1545 ParsedStateUpdateKind::Item {
1546 durable_item: _,
1547 parsed_item,
1548 connection,
1549 parsed_full_name,
1550 } => match parsed_item {
1551 CatalogItem::Table(table) => {
1552 self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
1553 }
1554 CatalogItem::Source(source) => {
1555 self.absorb_source(
1556 (source, connection),
1557 Some(parsed_full_name),
1558 catalog_update.diff,
1559 );
1560 }
1561 CatalogItem::Sink(sink) => {
1562 self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
1563 }
1564 CatalogItem::Index(index) => {
1565 self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
1566 }
1567 CatalogItem::MaterializedView(mv) => {
1568 self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
1569 }
1570 CatalogItem::View(view) => {
1571 self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
1572 }
1573 CatalogItem::ContinualTask(ct) => {
1574 self.absorb_continual_task(ct, Some(parsed_full_name), catalog_update.diff);
1575 }
1576 CatalogItem::Secret(secret) => {
1577 self.absorb_secret(secret, None, catalog_update.diff);
1578 }
1579 CatalogItem::Connection(connection) => {
1580 self.absorb_connection(connection, None, catalog_update.diff);
1581 }
1582 CatalogItem::Log(_) => {}
1583 CatalogItem::Type(_) => {}
1584 CatalogItem::Func(_) => {}
1585 },
1586 ParsedStateUpdateKind::TemporaryItem {
1587 durable_item: _,
1588 parsed_item,
1589 connection,
1590 parsed_full_name,
1591 } => match parsed_item {
1592 CatalogItem::Table(table) => {
1593 self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
1594 }
1595 CatalogItem::Source(source) => {
1596 self.absorb_source(
1597 (source, connection),
1598 Some(parsed_full_name),
1599 catalog_update.diff,
1600 );
1601 }
1602 CatalogItem::Sink(sink) => {
1603 self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
1604 }
1605 CatalogItem::Index(index) => {
1606 self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
1607 }
1608 CatalogItem::MaterializedView(mv) => {
1609 self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
1610 }
1611 CatalogItem::View(view) => {
1612 self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
1613 }
1614 CatalogItem::ContinualTask(ct) => {
1615 self.absorb_continual_task(ct, None, catalog_update.diff);
1616 }
1617 CatalogItem::Secret(secret) => {
1618 self.absorb_secret(secret, None, catalog_update.diff);
1619 }
1620 CatalogItem::Connection(connection) => {
1621 self.absorb_connection(connection, None, catalog_update.diff);
1622 }
1623 CatalogItem::Log(_) => {}
1624 CatalogItem::Type(_) => {}
1625 CatalogItem::Func(_) => {}
1626 },
1627 ParsedStateUpdateKind::Cluster {
1628 durable_cluster: _,
1629 parsed_cluster,
1630 } => {
1631 self.absorb_cluster(parsed_cluster, catalog_update.diff);
1632 }
1633 ParsedStateUpdateKind::ClusterReplica {
1634 durable_cluster_replica: _,
1635 parsed_cluster_replica,
1636 } => {
1637 self.absorb_cluster_replica(parsed_cluster_replica, catalog_update.diff);
1638 }
1639 }
1640 }
1641
1642 impl_absorb_method!(absorb_table, Table, Table);
1643 impl_absorb_method!(
1644 absorb_source,
1645 Source,
1646 (Source, Option<GenericSourceConnection>)
1647 );
1648 impl_absorb_method!(absorb_sink, Sink, Sink);
1649 impl_absorb_method!(absorb_index, Index, Index);
1650 impl_absorb_method!(absorb_materialized_view, MaterializedView, MaterializedView);
1651 impl_absorb_method!(absorb_view, View, View);
1652
1653 impl_absorb_method!(absorb_continual_task, ContinualTask, ContinualTask);
1654 impl_absorb_method!(absorb_secret, Secret, Secret);
1655 impl_absorb_method!(absorb_connection, Connection, Connection);
1656
1657 fn absorb_cluster(&mut self, cluster: Cluster, diff: StateDiff) {
1659 let state = match self {
1660 CatalogImplication::Cluster(state) => state,
1661 CatalogImplication::None => {
1662 *self = CatalogImplication::Cluster(CatalogImplicationKind::None);
1663 match self {
1664 CatalogImplication::Cluster(state) => state,
1665 _ => unreachable!(),
1666 }
1667 }
1668 _ => {
1669 panic!("Unexpected command type for {:?}: Cluster {:?}", self, diff);
1670 }
1671 };
1672
1673 if let Err(e) = state.transition(cluster.clone(), Some(cluster.name), diff) {
1674 panic!("invalid state transition for cluster: {}", e);
1675 }
1676 }
1677
1678 fn absorb_cluster_replica(&mut self, cluster_replica: ClusterReplica, diff: StateDiff) {
1680 let state = match self {
1681 CatalogImplication::ClusterReplica(state) => state,
1682 CatalogImplication::None => {
1683 *self = CatalogImplication::ClusterReplica(CatalogImplicationKind::None);
1684 match self {
1685 CatalogImplication::ClusterReplica(state) => state,
1686 _ => unreachable!(),
1687 }
1688 }
1689 _ => {
1690 panic!(
1691 "Unexpected command type for {:?}: ClusterReplica {:?}",
1692 self, diff
1693 );
1694 }
1695 };
1696
1697 if let Err(e) = state.transition(cluster_replica.clone(), Some(cluster_replica.name), diff)
1698 {
1699 panic!("invalid state transition for cluster replica: {}", e);
1700 }
1701 }
1702}
1703
#[cfg(test)]
mod tests {
    use super::*;
    use mz_repr::{GlobalId, RelationDesc, RelationVersion, VersionedRelationDesc};
    use mz_sql::names::ResolvedIds;
    use std::collections::BTreeMap;

    /// Builds a minimal single-column table for exercising the absorb state
    /// machine; only the column name varies between calls.
    fn create_test_table(name: &str) -> Table {
        Table {
            desc: VersionedRelationDesc::new(
                RelationDesc::builder()
                    .with_column(name, mz_repr::SqlScalarType::String.nullable(false))
                    .finish(),
            ),
            create_sql: None,
            collections: BTreeMap::from([(RelationVersion::root(), GlobalId::System(1))]),
            conn_id: None,
            resolved_ids: ResolvedIds::empty(),
            custom_logical_compaction_window: None,
            is_retained_metrics_object: false,
            data_source: TableDataSource::TableWrites { defaults: vec![] },
        }
    }

    /// Exercises `CatalogImplicationKind::transition` directly with `String`
    /// items: valid transitions into Added/Dropped/Altered, and the invalid
    /// double-add and double-drop cases.
    #[mz_ore::test]
    fn test_item_state_transitions() {
        // None + Addition => Added.
        let mut state = CatalogImplicationKind::None;
        assert!(
            state
                .transition("item1".to_string(), None, StateDiff::Addition)
                .is_ok()
        );
        assert!(matches!(state, CatalogImplicationKind::Added(_)));

        // Added + Retraction => Altered, where the retracted item becomes
        // `prev` and the previously added one becomes `new`.
        let mut state = CatalogImplicationKind::Added("new_item".to_string());
        assert!(
            state
                .transition("old_item".to_string(), None, StateDiff::Retraction)
                .is_ok()
        );
        match &state {
            CatalogImplicationKind::Altered { prev, new } => {
                assert_eq!(prev, "old_item");
                assert_eq!(new, "new_item");
            }
            _ => panic!("Expected Altered state"),
        }

        // None + Retraction => Dropped (carrying the provided name).
        let mut state = CatalogImplicationKind::None;
        assert!(
            state
                .transition(
                    "item1".to_string(),
                    Some("test_name".to_string()),
                    StateDiff::Retraction
                )
                .is_ok()
        );
        assert!(matches!(state, CatalogImplicationKind::Dropped(_, _)));

        // Dropped + Addition => Altered, where the dropped item is `prev`
        // and the newly added one is `new`.
        let mut state = CatalogImplicationKind::Dropped("old_item".to_string(), "name".to_string());
        assert!(
            state
                .transition("new_item".to_string(), None, StateDiff::Addition)
                .is_ok()
        );
        match &state {
            CatalogImplicationKind::Altered { prev, new } => {
                assert_eq!(prev, "old_item");
                assert_eq!(new, "new_item");
            }
            _ => panic!("Expected Altered state"),
        }

        // Added + Addition is invalid.
        let mut state = CatalogImplicationKind::Added("item".to_string());
        assert!(
            state
                .transition("item2".to_string(), None, StateDiff::Addition)
                .is_err()
        );

        // Dropped + Retraction is invalid.
        let mut state = CatalogImplicationKind::Dropped("item".to_string(), "name".to_string());
        assert!(
            state
                .transition("item2".to_string(), None, StateDiff::Retraction)
                .is_err()
        );
    }

    /// Exercises the same state machine through `CatalogImplication`'s typed
    /// absorb methods, using real `Table` values.
    #[mz_ore::test]
    fn test_table_absorb_state_machine() {
        let table1 = create_test_table("table1");
        let table2 = create_test_table("table2");

        // Addition on a fresh implication => Table(Added).
        let mut cmd = CatalogImplication::None;
        cmd.absorb_table(
            table1.clone(),
            Some("schema.table1".to_string()),
            StateDiff::Addition,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Added(t) => {
                    assert_eq!(t.desc.latest().arity(), table1.desc.latest().arity())
                }
                _ => panic!("Expected Added state"),
            },
            _ => panic!("Expected Table command"),
        }

        // A following retraction collapses into Altered: the retracted table
        // is `prev`, the previously added one is `new`.
        cmd.absorb_table(
            table2.clone(),
            Some("schema.table2".to_string()),
            StateDiff::Retraction,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Altered { prev, new } => {
                    assert_eq!(prev.desc.latest().arity(), table2.desc.latest().arity());
                    assert_eq!(new.desc.latest().arity(), table1.desc.latest().arity());
                }
                _ => panic!("Expected Altered state"),
            },
            _ => panic!("Expected Table command"),
        }

        // Retraction on a fresh implication => Table(Dropped) with the
        // resolved name preserved.
        let mut cmd = CatalogImplication::None;
        cmd.absorb_table(
            table1.clone(),
            Some("schema.table1".to_string()),
            StateDiff::Retraction,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Dropped(t, name) => {
                    assert_eq!(t.desc.latest().arity(), table1.desc.latest().arity());
                    assert_eq!(name, "schema.table1");
                }
                _ => panic!("Expected Dropped state"),
            },
            _ => panic!("Expected Table command"),
        }

        // A following addition collapses into Altered: the dropped table is
        // `prev`, the newly added one is `new`.
        cmd.absorb_table(
            table2.clone(),
            Some("schema.table2".to_string()),
            StateDiff::Addition,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Altered { prev, new } => {
                    assert_eq!(prev.desc.latest().arity(), table1.desc.latest().arity());
                    assert_eq!(new.desc.latest().arity(), table2.desc.latest().arity());
                }
                _ => panic!("Expected Altered state"),
            },
            _ => panic!("Expected Table command"),
        }
    }

    /// Adding the same object twice must panic via the absorb machinery.
    #[mz_ore::test]
    #[should_panic(expected = "Cannot add an already added object")]
    fn test_invalid_double_add() {
        let table = create_test_table("table");
        let mut cmd = CatalogImplication::None;

        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Addition,
        );

        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Addition,
        );
    }

    /// Dropping the same object twice must panic via the absorb machinery.
    #[mz_ore::test]
    #[should_panic(expected = "Cannot drop an already dropped object")]
    fn test_invalid_double_drop() {
        let table = create_test_table("table");
        let mut cmd = CatalogImplication::None;

        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Retraction,
        );

        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Retraction,
        );
    }
}