use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use std::time::{Duration, Instant};

use fail::fail_point;
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_catalog::memory::objects::{
    CatalogItem, Cluster, ClusterReplica, Connection, ContinualTask, DataSourceDesc, Index,
    MaterializedView, Secret, Sink, Source, StateDiff, Table, TableDataSource, View,
};
use mz_cloud_resources::VpcEndpointConfig;
use mz_compute_client::protocol::response::PeekResponse;
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::instrument;
use mz_ore::retry::Retry;
use mz_ore::str::StrExt;
use mz_ore::task;
use mz_repr::{CatalogItemId, GlobalId, RelationVersion, RelationVersionSelector, Timestamp};
use mz_sql::plan::ConnectionDetails;
use mz_storage_client::controller::{CollectionDescription, DataSource};
use mz_storage_types::connections::PostgresConnection;
use mz_storage_types::connections::inline::{InlinedConnection, IntoInlineConnection};
use mz_storage_types::sinks::StorageSinkConnection;
use mz_storage_types::sources::{GenericSourceConnection, SourceExport, SourceExportDataConfig};
use tracing::{Instrument, info_span, warn};

use crate::active_compute_sink::ActiveComputeSinkRetireReason;
use crate::coord::Coordinator;
use crate::coord::catalog_implications::parsed_state_updates::{
    ParsedStateUpdate, ParsedStateUpdateKind,
};
use crate::coord::timeline::TimelineState;
use crate::statement_logging::{StatementEndedExecutionReason, StatementLoggingId};
use crate::{AdapterError, CollectionIdBundle, ExecuteContext, ResultExt};

pub mod parsed_state_updates;

impl Coordinator {
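    /// Applies the downstream implications of a batch of parsed catalog state
    /// updates. Updates are first folded into per-item, per-cluster, and
    /// per-replica [`CatalogImplication`]s, which are then applied by
    /// [`Self::apply_catalog_implications_inner`].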
    #[instrument(level = "debug")]
    pub async fn apply_catalog_implications(
        &mut self,
        ctx: Option<&mut ExecuteContext>,
        catalog_updates: Vec<ParsedStateUpdate>,
    ) -> Result<(), AdapterError> {
        let start = Instant::now();

        let mut catalog_implications: BTreeMap<CatalogItemId, CatalogImplication> = BTreeMap::new();
        let mut cluster_commands: BTreeMap<ClusterId, CatalogImplication> = BTreeMap::new();
        let mut cluster_replica_commands: BTreeMap<(ClusterId, ReplicaId), CatalogImplication> =
            BTreeMap::new();

        for update in catalog_updates {
            tracing::trace!(?update, "got parsed state update");
            match &update.kind {
                ParsedStateUpdateKind::Item {
                    durable_item,
                    parsed_item: _,
                    connection: _,
                    parsed_full_name: _,
                } => {
                    let entry = catalog_implications
                        .entry(durable_item.id.clone())
                        .or_insert_with(|| CatalogImplication::None);
                    entry.absorb(update);
                }
                ParsedStateUpdateKind::TemporaryItem {
                    durable_item,
                    parsed_item: _,
                    connection: _,
                    parsed_full_name: _,
                } => {
                    let entry = catalog_implications
                        .entry(durable_item.id.clone())
                        .or_insert_with(|| CatalogImplication::None);
                    entry.absorb(update);
                }
                ParsedStateUpdateKind::Cluster {
                    durable_cluster,
                    parsed_cluster: _,
                } => {
                    let entry = cluster_commands
                        .entry(durable_cluster.id)
                        .or_insert_with(|| CatalogImplication::None);
                    entry.absorb(update.clone());
                }
                ParsedStateUpdateKind::ClusterReplica {
                    durable_cluster_replica,
                    parsed_cluster_replica: _,
                } => {
                    let entry = cluster_replica_commands
                        .entry((
                            durable_cluster_replica.cluster_id,
                            durable_cluster_replica.replica_id,
                        ))
                        .or_insert_with(|| CatalogImplication::None);
                    entry.absorb(update.clone());
                }
            }
        }

        self.apply_catalog_implications_inner(
            ctx,
            catalog_implications.into_iter().collect_vec(),
            cluster_commands.into_iter().collect_vec(),
            cluster_replica_commands.into_iter().collect_vec(),
        )
        .await?;

        self.metrics
            .apply_catalog_implications_seconds
            .observe(start.elapsed().as_secs_f64());

        Ok(())
    }

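    /// Applies the given implications to the storage and compute controllers
    /// and to coordinator state: new collections are created first, then
    /// dependent sinks, peeks, and copies are retired, and finally all pending
    /// drops are executed.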
    #[instrument(level = "debug")]
    async fn apply_catalog_implications_inner(
        &mut self,
        ctx: Option<&mut ExecuteContext>,
        implications: Vec<(CatalogItemId, CatalogImplication)>,
        cluster_commands: Vec<(ClusterId, CatalogImplication)>,
        cluster_replica_commands: Vec<((ClusterId, ReplicaId), CatalogImplication)>,
    ) -> Result<(), AdapterError> {
        let mut tables_to_drop = BTreeSet::new();
        let mut sources_to_drop = vec![];
        let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
        let mut storage_sink_gids_to_drop = vec![];
        let mut compute_gids_to_drop = vec![];
        let mut view_gids_to_drop = vec![];
        let mut secrets_to_drop = vec![];
        let mut vpc_endpoints_to_drop = vec![];
        let mut clusters_to_drop = vec![];
        let mut cluster_replicas_to_drop = vec![];
        let mut compute_sinks_to_drop = BTreeMap::new();
        let mut peeks_to_drop = vec![];
        let mut copies_to_drop = vec![];

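        // Names of dropped items and clusters, so that we can report them in
        // the error messages of retired sinks, peeks, and copies.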
        let mut dropped_item_names: BTreeMap<GlobalId, String> = BTreeMap::new();
        let mut dropped_cluster_names: BTreeMap<ClusterId, String> = BTreeMap::new();

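        // Collections and read policies that have to be created or initialized
        // for newly added items.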
        let mut table_collections_to_create = BTreeMap::new();
        let mut source_collections_to_create = BTreeMap::new();
        let mut storage_policies_to_initialize = BTreeMap::new();
        let mut execution_timestamps_to_set = BTreeSet::new();
        let mut vpc_endpoints_to_create: Vec<(CatalogItemId, VpcEndpointConfig)> = vec![];

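        // Global IDs whose collections are replaced, rather than dropped, by
        // an ALTER. These are filtered out of the drop lists below.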
        let mut replacement_gids = vec![];

        let mut source_connections_to_alter: BTreeMap<
            GlobalId,
            GenericSourceConnection<InlinedConnection>,
        > = BTreeMap::new();
        let mut sink_connections_to_alter: BTreeMap<GlobalId, StorageSinkConnection> =
            BTreeMap::new();
        let mut source_export_data_configs_to_alter: BTreeMap<GlobalId, SourceExportDataConfig> =
            BTreeMap::new();

        for (catalog_id, implication) in implications {
            tracing::trace!(?implication, "have to apply catalog implication");

            match implication {
                CatalogImplication::Table(CatalogImplicationKind::Added(table)) => {
                    self.handle_create_table(
                        &ctx,
                        &mut table_collections_to_create,
                        &mut storage_policies_to_initialize,
                        &mut execution_timestamps_to_set,
                        catalog_id,
                        table.clone(),
                    )
                    .await?
                }
                CatalogImplication::Table(CatalogImplicationKind::Altered {
                    prev: prev_table,
                    new: new_table,
                }) => self.handle_alter_table(prev_table, new_table).await?,

                CatalogImplication::Table(CatalogImplicationKind::Dropped(table, full_name)) => {
                    let global_ids = table.global_ids();
                    for global_id in global_ids {
                        tables_to_drop.insert((catalog_id, global_id));
                        dropped_item_names.insert(global_id, full_name.clone());
                    }
                }
                CatalogImplication::Source(CatalogImplicationKind::Added((
                    source,
                    _connection,
                ))) => {
                    let compaction_windows = self
                        .catalog()
                        .state()
                        .source_compaction_windows(vec![catalog_id]);

                    self.handle_create_source(
                        &mut source_collections_to_create,
                        &mut storage_policies_to_initialize,
                        catalog_id,
                        source,
                        compaction_windows,
                    )
                    .await?
                }
                CatalogImplication::Source(CatalogImplicationKind::Altered {
                    prev: (prev_source, _prev_connection),
                    new: (new_source, _new_connection),
                }) => {
                    tracing::debug!(
                        ?prev_source,
                        ?new_source,
                        "not handling AlterSource in here yet"
                    );
                }
                CatalogImplication::Source(CatalogImplicationKind::Dropped(
                    (source, connection),
                    full_name,
                )) => {
                    let global_id = source.global_id();
                    sources_to_drop.push((catalog_id, global_id));
                    dropped_item_names.insert(global_id, full_name);

                    if let DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } = &source.data_source
                    {
                        match &desc.connection {
                            GenericSourceConnection::Postgres(_referenced_conn) => {
                                let inline_conn = connection.expect("missing inlined connection");

                                let pg_conn = match inline_conn {
                                    GenericSourceConnection::Postgres(pg_conn) => pg_conn,
                                    other => {
                                        panic!("expected postgres connection, got: {:?}", other)
                                    }
                                };
                                let pending_drop = (
                                    pg_conn.connection.clone(),
                                    pg_conn.publication_details.slot.clone(),
                                );
                                replication_slots_to_drop.push(pending_drop);
                            }
                            _ => {}
                        }
                    }
                }
                CatalogImplication::Sink(CatalogImplicationKind::Added(sink)) => {
                    tracing::debug!(?sink, "not handling AddSink in here yet");
                }
                CatalogImplication::Sink(CatalogImplicationKind::Altered {
                    prev: prev_sink,
                    new: new_sink,
                }) => {
                    tracing::debug!(?prev_sink, ?new_sink, "not handling AlterSink in here yet");
                }
                CatalogImplication::Sink(CatalogImplicationKind::Dropped(sink, full_name)) => {
                    storage_sink_gids_to_drop.push(sink.global_id());
                    dropped_item_names.insert(sink.global_id(), full_name);
                }
                CatalogImplication::Index(CatalogImplicationKind::Added(index)) => {
                    tracing::debug!(?index, "not handling AddIndex in here yet");
                }
                CatalogImplication::Index(CatalogImplicationKind::Altered {
                    prev: prev_index,
                    new: new_index,
                }) => {
                    tracing::debug!(
                        ?prev_index,
                        ?new_index,
                        "not handling AlterIndex in here yet"
                    );
                }
                CatalogImplication::Index(CatalogImplicationKind::Dropped(index, full_name)) => {
                    compute_gids_to_drop.push((index.cluster_id, index.global_id()));
                    dropped_item_names.insert(index.global_id(), full_name);
                }
                CatalogImplication::MaterializedView(CatalogImplicationKind::Added(mv)) => {
                    tracing::debug!(?mv, "not handling AddMaterializedView in here yet");
                }
                CatalogImplication::MaterializedView(CatalogImplicationKind::Altered {
                    prev: prev_mv,
                    new: new_mv,
                }) => {
                    let old_gid = prev_mv.global_id_writes();
                    let new_gid = new_mv.global_id_writes();
                    if new_gid != old_gid {
                        replacement_gids.push((new_mv.cluster_id, new_gid));
                    }

                    self.handle_alter_materialized_view(prev_mv, new_mv)?;
                }
                CatalogImplication::MaterializedView(CatalogImplicationKind::Dropped(
                    mv,
                    full_name,
                )) => {
                    compute_gids_to_drop.push((mv.cluster_id, mv.global_id_writes()));
                    sources_to_drop.extend(mv.global_ids().map(|gid| (catalog_id, gid)));
                    dropped_item_names.insert(mv.global_id_writes(), full_name);
                }
                CatalogImplication::View(CatalogImplicationKind::Added(view)) => {
                    tracing::debug!(?view, "not handling AddView in here yet");
                }
                CatalogImplication::View(CatalogImplicationKind::Altered {
                    prev: prev_view,
                    new: new_view,
                }) => {
                    tracing::debug!(?prev_view, ?new_view, "not handling AlterView in here yet");
                }
                CatalogImplication::View(CatalogImplicationKind::Dropped(view, full_name)) => {
                    view_gids_to_drop.push(view.global_id());
                    dropped_item_names.insert(view.global_id(), full_name);
                }
                CatalogImplication::ContinualTask(CatalogImplicationKind::Added(ct)) => {
                    tracing::debug!(?ct, "not handling AddContinualTask in here yet");
                }
                CatalogImplication::ContinualTask(CatalogImplicationKind::Altered {
                    prev: prev_ct,
                    new: new_ct,
                }) => {
                    tracing::debug!(
                        ?prev_ct,
                        ?new_ct,
                        "not handling AlterContinualTask in here yet"
                    );
                }
                CatalogImplication::ContinualTask(CatalogImplicationKind::Dropped(
                    ct,
                    _full_name,
                )) => {
                    compute_gids_to_drop.push((ct.cluster_id, ct.global_id()));
                    sources_to_drop.push((catalog_id, ct.global_id()));
                }
                CatalogImplication::Secret(CatalogImplicationKind::Added(_secret)) => {}
                CatalogImplication::Secret(CatalogImplicationKind::Altered {
                    prev: _prev_secret,
                    new: _new_secret,
                }) => {}
                CatalogImplication::Secret(CatalogImplicationKind::Dropped(
                    _secret,
                    _full_name,
                )) => {
                    secrets_to_drop.push(catalog_id);
                }
                CatalogImplication::Connection(CatalogImplicationKind::Added(connection)) => {
                    match &connection.details {
                        ConnectionDetails::Ssh { .. } => {}
                        ConnectionDetails::AwsPrivatelink(privatelink) => {
                            let spec = VpcEndpointConfig {
                                aws_service_name: privatelink.service_name.to_owned(),
                                availability_zone_ids: privatelink.availability_zones.to_owned(),
                            };
                            vpc_endpoints_to_create.push((catalog_id, spec));
                        }
                        _ => {}
                    }
                }
                CatalogImplication::Connection(CatalogImplicationKind::Altered {
                    prev: _prev_connection,
                    new: new_connection,
                }) => {
                    self.handle_alter_connection(
                        catalog_id,
                        new_connection,
                        &mut vpc_endpoints_to_create,
                        &mut source_connections_to_alter,
                        &mut sink_connections_to_alter,
                        &mut source_export_data_configs_to_alter,
                    );
                }
                CatalogImplication::Connection(CatalogImplicationKind::Dropped(
                    connection,
                    _full_name,
                )) => {
                    match &connection.details {
                        ConnectionDetails::Ssh { .. } => {
                            secrets_to_drop.push(catalog_id);
                        }
                        ConnectionDetails::AwsPrivatelink(_) => {
                            vpc_endpoints_to_drop.push(catalog_id);
                        }
                        _ => (),
                    }
                }
                CatalogImplication::None => {}
                CatalogImplication::Cluster(_) | CatalogImplication::ClusterReplica(_) => {
                    unreachable!("clusters and cluster replicas are handled below")
                }
                CatalogImplication::Table(CatalogImplicationKind::None)
                | CatalogImplication::Source(CatalogImplicationKind::None)
                | CatalogImplication::Sink(CatalogImplicationKind::None)
                | CatalogImplication::Index(CatalogImplicationKind::None)
                | CatalogImplication::MaterializedView(CatalogImplicationKind::None)
                | CatalogImplication::View(CatalogImplicationKind::None)
                | CatalogImplication::ContinualTask(CatalogImplicationKind::None)
                | CatalogImplication::Secret(CatalogImplicationKind::None)
                | CatalogImplication::Connection(CatalogImplicationKind::None) => {
                    unreachable!("will never leave None in place");
                }
            }
        }

        for (cluster_id, command) in cluster_commands {
            tracing::trace!(?command, "have cluster command to apply!");

            match command {
                CatalogImplication::Cluster(CatalogImplicationKind::Added(cluster)) => {
                    tracing::debug!(?cluster, "not handling AddCluster in here yet");
                }
                CatalogImplication::Cluster(CatalogImplicationKind::Altered {
                    prev: prev_cluster,
                    new: new_cluster,
                }) => {
                    tracing::debug!(
                        ?prev_cluster,
                        ?new_cluster,
                        "not handling AlterCluster in here yet"
                    );
                }
                CatalogImplication::Cluster(CatalogImplicationKind::Dropped(
                    cluster,
                    _full_name,
                )) => {
                    clusters_to_drop.push(cluster_id);
                    dropped_cluster_names.insert(cluster_id, cluster.name);
                }
                CatalogImplication::Cluster(CatalogImplicationKind::None) => {
                    unreachable!("will never leave None in place");
                }
                command => {
                    unreachable!(
                        "we only handle cluster commands in this map, got: {:?}",
                        command
                    );
                }
            }
        }

        for ((cluster_id, replica_id), command) in cluster_replica_commands {
            tracing::trace!(?command, "have cluster replica command to apply!");

            match command {
                CatalogImplication::ClusterReplica(CatalogImplicationKind::Added(replica)) => {
                    tracing::debug!(?replica, "not handling AddClusterReplica in here yet");
                }
                CatalogImplication::ClusterReplica(CatalogImplicationKind::Altered {
                    prev: prev_replica,
                    new: new_replica,
                }) => {
                    tracing::debug!(
                        ?prev_replica,
                        ?new_replica,
                        "not handling AlterClusterReplica in here yet"
                    );
                }
                CatalogImplication::ClusterReplica(CatalogImplicationKind::Dropped(
                    _replica,
                    _full_name,
                )) => {
                    cluster_replicas_to_drop.push((cluster_id, replica_id));
                }
                CatalogImplication::ClusterReplica(CatalogImplicationKind::None) => {
                    unreachable!("will never leave None in place");
                }
                command => {
                    unreachable!(
                        "we only handle cluster replica commands in this map, got: {:?}",
                        command
                    );
                }
            }
        }

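        // Remove replaced collections from the drop lists, so that an ALTER is
        // not treated as a drop.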
        for (cluster_id, gid) in replacement_gids {
            sources_to_drop.retain(|(_, id)| *id != gid);
            compute_gids_to_drop.retain(|id| *id != (cluster_id, gid));
        }

        if !source_collections_to_create.is_empty() {
            self.create_source_collections(source_collections_to_create)
                .await?;
        }

        if !table_collections_to_create.is_empty() {
            self.create_table_collections(table_collections_to_create, execution_timestamps_to_set)
                .await?;
        }
        self.initialize_storage_collections(storage_policies_to_initialize)
            .await?;

        if !vpc_endpoints_to_create.is_empty() {
            if let Some(cloud_resource_controller) = self.cloud_resource_controller.as_ref() {
                for (connection_id, spec) in vpc_endpoints_to_create {
                    if let Err(err) = cloud_resource_controller
                        .ensure_vpc_endpoint(connection_id, spec)
                        .await
                    {
                        tracing::error!(?err, "failed to ensure vpc endpoint!");
                    }
                }
            } else {
                tracing::error!(
                    "AWS PrivateLink connections unsupported without cloud_resource_controller"
                );
            }
        }

        if !source_connections_to_alter.is_empty() {
            self.controller
                .storage
                .alter_ingestion_connections(source_connections_to_alter)
                .await
                .unwrap_or_terminate("cannot fail to alter ingestion connections");
        }

        if !sink_connections_to_alter.is_empty() {
            self.controller
                .storage
                .alter_export_connections(sink_connections_to_alter)
                .await
                .unwrap_or_terminate("altering export connections after txn must succeed");
        }

        if !source_export_data_configs_to_alter.is_empty() {
            self.controller
                .storage
                .alter_ingestion_export_data_configs(source_export_data_configs_to_alter)
                .await
                .unwrap_or_terminate("altering source export data configs after txn must succeed");
        }

        let collections_to_drop: BTreeSet<_> = sources_to_drop
            .iter()
            .map(|(_, gid)| *gid)
            .chain(tables_to_drop.iter().map(|(_, gid)| *gid))
            .chain(storage_sink_gids_to_drop.iter().copied())
            .chain(compute_gids_to_drop.iter().map(|(_, gid)| *gid))
            .chain(view_gids_to_drop.iter().copied())
            .collect();

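        // Retire active compute sinks that depend on a dropped collection or
        // run on a dropped cluster.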
        for (sink_id, sink) in &self.active_compute_sinks {
            let cluster_id = sink.cluster_id();
            if let Some(id) = sink
                .depends_on()
                .iter()
                .find(|id| collections_to_drop.contains(id))
            {
                let name = dropped_item_names
                    .get(id)
                    .map(|n| format!("relation {}", n.quoted()))
                    .expect("missing relation name");
                compute_sinks_to_drop.insert(
                    *sink_id,
                    ActiveComputeSinkRetireReason::DependencyDropped(name),
                );
            } else if clusters_to_drop.contains(&cluster_id) {
                let name = dropped_cluster_names
                    .get(&cluster_id)
                    .map(|n| format!("cluster {}", n.quoted()))
                    .expect("missing cluster name");
                compute_sinks_to_drop.insert(
                    *sink_id,
                    ActiveComputeSinkRetireReason::DependencyDropped(name),
                );
            }
        }

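        // Cancel pending peeks that depend on a dropped collection or run on a
        // dropped cluster.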
        for (uuid, pending_peek) in &self.pending_peeks {
            if let Some(id) = pending_peek
                .depends_on
                .iter()
                .find(|id| collections_to_drop.contains(id))
            {
                let name = dropped_item_names
                    .get(id)
                    .map(|n| format!("relation {}", n.quoted()))
                    .expect("missing relation name");
                peeks_to_drop.push((name, uuid.clone()));
            } else if clusters_to_drop.contains(&pending_peek.cluster_id) {
                let name = dropped_cluster_names
                    .get(&pending_peek.cluster_id)
                    .map(|n| format!("cluster {}", n.quoted()))
                    .expect("missing cluster name");
                peeks_to_drop.push((name, uuid.clone()));
            }
        }

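        // Cancel in-progress COPYs whose target table or cluster is dropped.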
        for (conn_id, pending_copy) in &self.active_copies {
            let dropping_table = tables_to_drop
                .iter()
                .any(|(item_id, _gid)| pending_copy.table_id == *item_id);
            let dropping_cluster = clusters_to_drop.contains(&pending_copy.cluster_id);

            if dropping_table || dropping_cluster {
                copies_to_drop.push(conn_id.clone());
            }
        }

        let storage_gids_to_drop: BTreeSet<_> = sources_to_drop
            .iter()
            .map(|(_id, gid)| gid)
            .chain(storage_sink_gids_to_drop.iter())
            .chain(tables_to_drop.iter().map(|(_id, gid)| gid))
            .copied()
            .collect();

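        // For each timeline, collect the read holds on collections that are
        // about to be dropped, so the corresponding timeline resources can be
        // released.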
        let mut timeline_id_bundles = BTreeMap::new();

        for (timeline, TimelineState { read_holds, .. }) in &self.global_timelines {
            let mut id_bundle = CollectionIdBundle::default();

            for storage_id in read_holds.storage_ids() {
                if storage_gids_to_drop.contains(&storage_id) {
                    id_bundle.storage_ids.insert(storage_id);
                }
            }

            for (instance_id, id) in read_holds.compute_ids() {
                if compute_gids_to_drop.contains(&(instance_id, id))
                    || clusters_to_drop.contains(&instance_id)
                {
                    id_bundle
                        .compute_ids
                        .entry(instance_id)
                        .or_default()
                        .insert(id);
                }
            }

            timeline_id_bundles.insert(timeline.clone(), id_bundle);
        }

        let mut timeline_associations = BTreeMap::new();
        for (timeline, id_bundle) in timeline_id_bundles.into_iter() {
            let TimelineState { read_holds, .. } = self
                .global_timelines
                .get(&timeline)
                .expect("all timelines have a timestamp oracle");

            let empty = read_holds.id_bundle().difference(&id_bundle).is_empty();
            timeline_associations.insert(timeline, (empty, id_bundle));
        }

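        // Execute all pending drops. The async block only exists so that the
        // whole finalization step runs inside a single tracing span.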
        let _: () = async {
            if !timeline_associations.is_empty() {
                for (timeline, (should_be_empty, id_bundle)) in timeline_associations {
                    let became_empty =
                        self.remove_resources_associated_with_timeline(timeline, id_bundle);
                    assert_eq!(should_be_empty, became_empty, "emptiness did not match!");
                }
            }

            if !tables_to_drop.is_empty() {
                let ts = self.get_local_write_ts().await;
                self.drop_tables(tables_to_drop.into_iter().collect_vec(), ts.timestamp);
            }

            if !sources_to_drop.is_empty() {
                self.drop_sources(sources_to_drop);
            }

            if !storage_sink_gids_to_drop.is_empty() {
                self.drop_storage_sinks(storage_sink_gids_to_drop);
            }

            if !compute_sinks_to_drop.is_empty() {
                self.retire_compute_sinks(compute_sinks_to_drop).await;
            }

            if !peeks_to_drop.is_empty() {
                for (dropped_name, uuid) in peeks_to_drop {
                    if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
                        let cancel_reason = PeekResponse::Error(format!(
                            "query could not complete because {dropped_name} was dropped"
                        ));
                        self.controller
                            .compute
                            .cancel_peek(pending_peek.cluster_id, uuid, cancel_reason)
                            .unwrap_or_terminate("unable to cancel peek");
                        self.retire_execution(
                            StatementEndedExecutionReason::Canceled,
                            pending_peek.ctx_extra.defuse(),
                        );
                    }
                }
            }

            if !copies_to_drop.is_empty() {
                for conn_id in copies_to_drop {
                    self.cancel_pending_copy(&conn_id);
                }
            }

            if !compute_gids_to_drop.is_empty() {
                self.drop_compute_collections(compute_gids_to_drop);
            }

            if !vpc_endpoints_to_drop.is_empty() {
                self.drop_vpc_endpoints_in_background(vpc_endpoints_to_drop)
            }

            if !cluster_replicas_to_drop.is_empty() {
                fail::fail_point!("after_catalog_drop_replica");

                for (cluster_id, replica_id) in cluster_replicas_to_drop {
                    self.drop_replica(cluster_id, replica_id);
                }
            }
            if !clusters_to_drop.is_empty() {
                for cluster_id in clusters_to_drop {
                    self.controller.drop_cluster(cluster_id);
                }
            }

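            // Drop replication slots and secrets from a background task, so
            // that slow or unreachable external systems cannot block the
            // coordinator.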
            task::spawn(|| "drop_replication_slots_and_secrets", {
                let ssh_tunnel_manager = self.connection_context().ssh_tunnel_manager.clone();
                let secrets_controller = Arc::clone(&self.secrets_controller);
                let secrets_reader = Arc::clone(self.secrets_reader());
                let storage_config = self.controller.storage.config().clone();

                async move {
                    for (connection, replication_slot_name) in replication_slots_to_drop {
                        tracing::info!(?replication_slot_name, "dropping replication slot");

                        let result: Result<(), anyhow::Error> = Retry::default()
                            .max_duration(Duration::from_secs(60))
                            .retry_async(|_state| async {
                                let config = connection
                                    .config(&secrets_reader, &storage_config, InTask::No)
                                    .await
                                    .map_err(|e| {
                                        anyhow::anyhow!(
                                            "error creating Postgres client for \
                                            dropping acquired slots: {}",
                                            e.display_with_causes()
                                        )
                                    })?;

                                mz_postgres_util::drop_replication_slots(
                                    &ssh_tunnel_manager,
                                    config.clone(),
                                    &[(&replication_slot_name, true)],
                                )
                                .await?;

                                Ok(())
                            })
                            .await;

                        if let Err(err) = result {
                            tracing::warn!(
                                ?replication_slot_name,
                                ?err,
                                "failed to drop replication slot"
                            );
                        }
                    }

                    fail_point!("drop_secrets");
                    for secret in secrets_to_drop {
                        if let Err(e) = secrets_controller.delete(secret).await {
                            warn!("Dropping secrets has encountered an error: {}", e);
                        }
                    }
                }
            });
        }
        .instrument(info_span!(
            "coord::apply_catalog_implications_inner::finalize"
        ))
        .await;

        Ok(())
    }

    #[instrument(level = "debug")]
    async fn create_table_collections(
        &mut self,
        table_collections_to_create: BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
        execution_timestamps_to_set: BTreeSet<StatementLoggingId>,
    ) -> Result<(), AdapterError> {
        let register_ts = self.get_local_write_ts().await.timestamp;

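        // Confirm that we are still the leader before registering the new
        // collections at the chosen write timestamp.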
        self.catalog
            .confirm_leadership()
            .await
            .unwrap_or_terminate("unable to confirm leadership");

        for id in execution_timestamps_to_set {
            self.set_statement_execution_timestamp(id, register_ts);
        }

        let storage_metadata = self.catalog.state().storage_metadata();

        self.controller
            .storage
            .create_collections(
                storage_metadata,
                Some(register_ts),
                table_collections_to_create.into_iter().collect_vec(),
            )
            .await
            .unwrap_or_terminate("cannot fail to create collections");

        self.apply_local_write(register_ts).await;

        Ok(())
    }

    #[instrument(level = "debug")]
    async fn create_source_collections(
        &mut self,
        source_collections_to_create: BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
    ) -> Result<(), AdapterError> {
        let storage_metadata = self.catalog.state().storage_metadata();

        self.controller
            .storage
            .create_collections(
                storage_metadata,
                None,
                source_collections_to_create.into_iter().collect_vec(),
            )
            .await
            .unwrap_or_terminate("cannot fail to create collections");

        Ok(())
    }

    #[instrument(level = "debug")]
    async fn initialize_storage_collections(
        &mut self,
        storage_policies_to_initialize: BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
    ) -> Result<(), AdapterError> {
        for (compaction_window, global_ids) in storage_policies_to_initialize {
            self.initialize_read_policies(
                &CollectionIdBundle {
                    storage_ids: global_ids,
                    compute_ids: BTreeMap::new(),
                },
                compaction_window,
            )
            .await;
        }

        Ok(())
    }

    #[instrument(level = "debug")]
    async fn handle_create_table(
        &self,
        ctx: &Option<&mut ExecuteContext>,
        storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
        storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
        execution_timestamps_to_set: &mut BTreeSet<StatementLoggingId>,
        table_id: CatalogItemId,
        table: Table,
    ) -> Result<(), AdapterError> {
        match &table.data_source {
            TableDataSource::TableWrites { defaults: _ } => {
                let versions: BTreeMap<_, _> = table
                    .collection_descs()
                    .map(|(gid, version, desc)| (version, (gid, desc)))
                    .collect();
                let collection_descs = versions.iter().map(|(_version, (gid, desc))| {
                    let collection_desc = CollectionDescription::for_table(desc.clone());

                    (*gid, collection_desc)
                });

                let compaction_window = table
                    .custom_logical_compaction_window
                    .unwrap_or(CompactionWindow::Default);
                let ids_to_initialize = storage_policies_to_initialize
                    .entry(compaction_window)
                    .or_default();

                for (gid, collection_desc) in collection_descs {
                    storage_collections_to_create.insert(gid, collection_desc);
                    ids_to_initialize.insert(gid);
                }

                if let Some(id) = ctx.as_ref().and_then(|ctx| ctx.extra().contents()) {
                    execution_timestamps_to_set.insert(id);
                }
            }
            TableDataSource::DataSource {
                desc: data_source_desc,
                timeline,
            } => {
                match data_source_desc {
                    DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference: _,
                        details,
                        data_config,
                    } => {
                        let global_ingestion_id =
                            self.catalog().get_entry(ingestion_id).latest_global_id();

                        let collection_desc = CollectionDescription::<Timestamp> {
                            desc: table.desc.latest(),
                            data_source: DataSource::IngestionExport {
                                ingestion_id: global_ingestion_id,
                                details: details.clone(),
                                data_config: data_config
                                    .clone()
                                    .into_inline_connection(self.catalog.state()),
                            },
                            since: None,
                            timeline: Some(timeline.clone()),
                            primary: None,
                        };

                        let global_id = table
                            .global_ids()
                            .expect_element(|| "subsources cannot have multiple versions");

                        storage_collections_to_create.insert(global_id, collection_desc);

                        let read_policies = self
                            .catalog()
                            .state()
                            .source_compaction_windows(vec![table_id]);
                        for (compaction_window, catalog_ids) in read_policies {
                            let compaction_ids = storage_policies_to_initialize
                                .entry(compaction_window)
                                .or_default();

                            let gids = catalog_ids.into_iter().flat_map(|item_id| {
                                self.catalog().get_entry(&item_id).global_ids()
                            });
                            compaction_ids.extend(gids);
                        }
                    }
                    DataSourceDesc::Webhook {
                        validate_using: _,
                        body_format: _,
                        headers: _,
                        cluster_id: _,
                    } => {
                        assert_eq!(
                            table.desc.latest_version(),
                            RelationVersion::root(),
                            "found webhook with more than 1 relation version, {:?}",
                            table.desc
                        );
                        let desc = table.desc.latest();

                        let collection_desc = CollectionDescription::<Timestamp> {
                            desc,
                            data_source: DataSource::Webhook,
                            since: None,
                            timeline: Some(timeline.clone()),
                            primary: None,
                        };

                        let global_id = table
                            .global_ids()
                            .expect_element(|| "webhooks cannot have multiple versions");

                        storage_collections_to_create.insert(global_id, collection_desc);

                        let read_policies = self
                            .catalog()
                            .state()
                            .source_compaction_windows(vec![table_id]);

                        for (compaction_window, catalog_ids) in read_policies {
                            let compaction_ids = storage_policies_to_initialize
                                .entry(compaction_window)
                                .or_default();

                            let gids = catalog_ids.into_iter().flat_map(|item_id| {
                                self.catalog().get_entry(&item_id).global_ids()
                            });
                            compaction_ids.extend(gids);
                        }
                    }
                    _ => unreachable!("CREATE TABLE data source got {:?}", data_source_desc),
                }
            }
        }

        Ok(())
    }

    #[instrument(level = "debug")]
    async fn handle_alter_table(
        &mut self,
        prev_table: Table,
        new_table: Table,
    ) -> Result<(), AdapterError> {
        let existing_gid = prev_table.global_id_writes();
        let new_gid = new_table.global_id_writes();

        if existing_gid == new_gid {
            return Ok(());
        }

        let existing_table = crate::CollectionIdBundle {
            storage_ids: BTreeSet::from([existing_gid]),
            compute_ids: BTreeMap::new(),
        };
        let existing_table_read_hold = self.acquire_read_holds(&existing_table);

        let expected_version = prev_table.desc.latest_version();
        let new_version = new_table.desc.latest_version();
        let new_desc = new_table
            .desc
            .at_version(RelationVersionSelector::Specific(new_version));

        let register_ts = self.get_local_write_ts().await.timestamp;

        self.controller
            .storage
            .alter_table_desc(
                existing_gid,
                new_gid,
                new_desc,
                expected_version,
                register_ts,
            )
            .await
            .expect("failed to alter desc of table");

        let compaction_window = new_table
            .custom_logical_compaction_window
            .unwrap_or(CompactionWindow::Default);
        self.initialize_read_policies(
            &crate::CollectionIdBundle {
                storage_ids: BTreeSet::from([new_gid]),
                compute_ids: BTreeMap::new(),
            },
            compaction_window,
        )
        .await;

        self.apply_local_write(register_ts).await;

        drop(existing_table_read_hold);

        Ok(())
    }

    #[instrument(level = "debug")]
    fn handle_alter_materialized_view(
        &mut self,
        prev_mv: MaterializedView,
        new_mv: MaterializedView,
    ) -> Result<(), AdapterError> {
        let old_gid = prev_mv.global_id_writes();
        let new_gid = new_mv.global_id_writes();

        if old_gid == new_gid {
            return Ok(());
        }

        self.drop_compute_collections(vec![(prev_mv.cluster_id, old_gid)]);
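        // The old version's compute collection is dropped; allow writes for
        // the collection backing the new version.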
        self.allow_writes(new_mv.cluster_id, new_gid);

        Ok(())
    }

    #[instrument(level = "debug")]
    async fn handle_create_source(
        &self,
        storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
        storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
        item_id: CatalogItemId,
        source: Source,
        compaction_windows: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>>,
    ) -> Result<(), AdapterError> {
        let data_source = match source.data_source {
            DataSourceDesc::Ingestion { desc, cluster_id } => {
                let desc = desc.into_inline_connection(self.catalog().state());
                let item_global_id = self.catalog().get_entry(&item_id).latest_global_id();

                let ingestion = mz_storage_types::sources::IngestionDescription::new(
                    desc,
                    cluster_id,
                    item_global_id,
                );

                DataSource::Ingestion(ingestion)
            }
            DataSourceDesc::OldSyntaxIngestion {
                desc,
                progress_subsource,
                data_config,
                details,
                cluster_id,
            } => {
                let desc = desc.into_inline_connection(self.catalog().state());
                let data_config = data_config.into_inline_connection(self.catalog().state());

                let progress_subsource = self
                    .catalog()
                    .get_entry(&progress_subsource)
                    .latest_global_id();

                let mut ingestion = mz_storage_types::sources::IngestionDescription::new(
                    desc,
                    cluster_id,
                    progress_subsource,
                );

                let legacy_export = SourceExport {
                    storage_metadata: (),
                    data_config,
                    details,
                };

                ingestion
                    .source_exports
                    .insert(source.global_id, legacy_export);

                DataSource::Ingestion(ingestion)
            }
            DataSourceDesc::IngestionExport {
                ingestion_id,
                external_reference: _,
                details,
                data_config,
            } => {
                let ingestion_id = self.catalog().get_entry(&ingestion_id).latest_global_id();

                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config: data_config.into_inline_connection(self.catalog().state()),
                }
            }
            DataSourceDesc::Progress => DataSource::Progress,
            DataSourceDesc::Webhook { .. } => DataSource::Webhook,
            DataSourceDesc::Introspection(_) => {
                unreachable!("cannot create sources with introspection data sources")
            }
        };

        storage_collections_to_create.insert(
            source.global_id,
            CollectionDescription::<Timestamp> {
                desc: source.desc.clone(),
                data_source,
                timeline: Some(source.timeline),
                since: None,
                primary: None,
            },
        );

        for (compaction_window, catalog_ids) in compaction_windows {
            let compaction_ids = storage_policies_to_initialize
                .entry(compaction_window)
                .or_default();

            let gids = catalog_ids
                .into_iter()
                .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids());
            compaction_ids.extend(gids);
        }

        Ok(())
    }

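    /// Handles an altered connection: (re-)ensures an AWS PrivateLink VPC
    /// endpoint if needed, and walks the dependency graph to collect all
    /// sources, sinks, and source exports whose inlined connections must be
    /// updated in the storage controller.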
    #[instrument(level = "debug")]
    fn handle_alter_connection(
        &self,
        connection_id: CatalogItemId,
        connection: Connection,
        vpc_endpoints_to_create: &mut Vec<(CatalogItemId, VpcEndpointConfig)>,
        source_connections_to_alter: &mut BTreeMap<
            GlobalId,
            GenericSourceConnection<InlinedConnection>,
        >,
        sink_connections_to_alter: &mut BTreeMap<GlobalId, StorageSinkConnection>,
        source_export_data_configs_to_alter: &mut BTreeMap<GlobalId, SourceExportDataConfig>,
    ) {
        use std::collections::VecDeque;

        if let ConnectionDetails::AwsPrivatelink(ref privatelink) = connection.details {
            let spec = VpcEndpointConfig {
                aws_service_name: privatelink.service_name.to_owned(),
                availability_zone_ids: privatelink.availability_zones.to_owned(),
            };
            vpc_endpoints_to_create.push((connection_id, spec));
        }

        let mut connections_to_process = VecDeque::new();
        connections_to_process.push_front(connection_id.clone());

        while let Some(id) = connections_to_process.pop_front() {
            for dependent_id in self.catalog().get_entry(&id).used_by() {
                let dependent_entry = self.catalog().get_entry(dependent_id);
                match dependent_entry.item() {
                    CatalogItem::Connection(_) => {
                        connections_to_process.push_back(*dependent_id);
                    }
                    CatalogItem::Source(source) => {
                        let desc = match &dependent_entry
                            .source()
                            .expect("known to be source")
                            .data_source
                        {
                            DataSourceDesc::Ingestion { desc, .. }
                            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
                                desc.clone().into_inline_connection(self.catalog().state())
                            }
                            _ => {
                                continue;
                            }
                        };

                        source_connections_to_alter.insert(source.global_id, desc.connection);
                    }
                    CatalogItem::Sink(sink) => {
                        let export = dependent_entry.sink().expect("known to be sink");
                        sink_connections_to_alter.insert(
                            sink.global_id,
                            export
                                .connection
                                .clone()
                                .into_inline_connection(self.catalog().state()),
                        );
                    }
                    CatalogItem::Table(table) => {
                        if let Some((_, _, _, export_data_config)) =
                            dependent_entry.source_export_details()
                        {
                            let data_config = export_data_config.clone();
                            source_export_data_configs_to_alter.insert(
                                table.global_id_writes(),
                                data_config.into_inline_connection(self.catalog().state()),
                            );
                        }
                    }
                    _ => {}
                }
            }
        }
    }
}

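/// The accumulated effect that a batch of catalog state updates has on a
/// single catalog object.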
#[derive(Debug, Clone)]
enum CatalogImplication {
    None,
    Table(CatalogImplicationKind<Table>),
    Source(CatalogImplicationKind<(Source, Option<GenericSourceConnection>)>),
    Sink(CatalogImplicationKind<Sink>),
    Index(CatalogImplicationKind<Index>),
    MaterializedView(CatalogImplicationKind<MaterializedView>),
    View(CatalogImplicationKind<View>),
    ContinualTask(CatalogImplicationKind<ContinualTask>),
    Secret(CatalogImplicationKind<Secret>),
    Connection(CatalogImplicationKind<Connection>),
    Cluster(CatalogImplicationKind<Cluster>),
    ClusterReplica(CatalogImplicationKind<ClusterReplica>),
}

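/// The kind of change that a batch of updates amounts to for a single object.
/// An addition and a retraction for the same object collapse into `Altered`;
/// for example, a retraction of the old definition combined with an addition
/// of the new one yields `Altered { prev: old, new: new }`.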
#[derive(Debug, Clone)]
enum CatalogImplicationKind<T> {
    None,
    Added(T),
    Dropped(T, String),
    Altered { prev: T, new: T },
}

impl<T: Clone> CatalogImplicationKind<T> {
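    /// Folds one more state update into the current state, for example `None`
    /// + addition => `Added`, and `Added` + retraction => `Altered` (the
    /// retraction carries the object's previous state). Returns an error for
    /// invalid transitions, such as two additions in a row.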
    fn transition(&mut self, item: T, name: Option<String>, diff: StateDiff) -> Result<(), String> {
        use CatalogImplicationKind::*;
        use StateDiff::*;

        let new_state = match (&*self, diff) {
            (None, Addition) => Added(item),
            (None, Retraction) => Dropped(item, name.unwrap_or_else(|| "<unknown>".to_string())),

            (Added(existing), Retraction) => Altered {
                prev: item,
                new: existing.clone(),
            },
            (Added(_), Addition) => {
                return Err("Cannot add an already added object".to_string());
            }

            (Dropped(existing, _), Addition) => Altered {
                prev: existing.clone(),
                new: item,
            },
            (Dropped(_, _), Retraction) => {
                return Err("Cannot drop an already dropped object".to_string());
            }

            (Altered { .. }, _) => {
                return Err(format!(
                    "Cannot apply {:?} to an object in Altered state",
                    diff
                ));
            }
        };

        *self = new_state;
        Ok(())
    }
}

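/// Generates an absorb method for the given `CatalogImplication` variant. The
/// generated method moves `self` from `None` into the variant if necessary,
/// then delegates to [`CatalogImplicationKind::transition`], panicking on a
/// variant mismatch or an invalid transition.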
macro_rules! impl_absorb_method {
    (
        $method_name:ident,
        $variant:ident,
        $item_type:ty
    ) => {
        fn $method_name(
            &mut self,
            item: $item_type,
            parsed_full_name: Option<String>,
            diff: StateDiff,
        ) {
            let state = match self {
                CatalogImplication::$variant(state) => state,
                CatalogImplication::None => {
                    *self = CatalogImplication::$variant(CatalogImplicationKind::None);
                    match self {
                        CatalogImplication::$variant(state) => state,
                        _ => unreachable!(),
                    }
                }
                _ => {
                    panic!(
                        "Unexpected command type for {:?}: {} {:?}",
                        self,
                        stringify!($variant),
                        diff,
                    );
                }
            };

            if let Err(e) = state.transition(item, parsed_full_name, diff) {
                panic!(
                    "Invalid state transition for {}: {}",
                    stringify!($variant),
                    e
                );
            }
        }
    };
}

impl CatalogImplication {
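    /// Absorbs a single parsed state update into this implication, dispatching
    /// on the type of the updated object. Logs, types, and functions have no
    /// controller-side implications and are ignored.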
    fn absorb(&mut self, catalog_update: ParsedStateUpdate) {
        match catalog_update.kind {
            ParsedStateUpdateKind::Item {
                durable_item: _,
                parsed_item,
                connection,
                parsed_full_name,
            } => match parsed_item {
                CatalogItem::Table(table) => {
                    self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
                }
                CatalogItem::Source(source) => {
                    self.absorb_source(
                        (source, connection),
                        Some(parsed_full_name),
                        catalog_update.diff,
                    );
                }
                CatalogItem::Sink(sink) => {
                    self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::Index(index) => {
                    self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::MaterializedView(mv) => {
                    self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::View(view) => {
                    self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::ContinualTask(ct) => {
                    self.absorb_continual_task(ct, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::Secret(secret) => {
                    self.absorb_secret(secret, None, catalog_update.diff);
                }
                CatalogItem::Connection(connection) => {
                    self.absorb_connection(connection, None, catalog_update.diff);
                }
                CatalogItem::Log(_) => {}
                CatalogItem::Type(_) => {}
                CatalogItem::Func(_) => {}
            },
            ParsedStateUpdateKind::TemporaryItem {
                durable_item: _,
                parsed_item,
                connection,
                parsed_full_name,
            } => match parsed_item {
                CatalogItem::Table(table) => {
                    self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
                }
                CatalogItem::Source(source) => {
                    self.absorb_source(
                        (source, connection),
                        Some(parsed_full_name),
                        catalog_update.diff,
                    );
                }
                CatalogItem::Sink(sink) => {
                    self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::Index(index) => {
                    self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::MaterializedView(mv) => {
                    self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::View(view) => {
                    self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::ContinualTask(ct) => {
                    self.absorb_continual_task(ct, None, catalog_update.diff);
                }
                CatalogItem::Secret(secret) => {
                    self.absorb_secret(secret, None, catalog_update.diff);
                }
                CatalogItem::Connection(connection) => {
                    self.absorb_connection(connection, None, catalog_update.diff);
                }
                CatalogItem::Log(_) => {}
                CatalogItem::Type(_) => {}
                CatalogItem::Func(_) => {}
            },
            ParsedStateUpdateKind::Cluster {
                durable_cluster: _,
                parsed_cluster,
            } => {
                self.absorb_cluster(parsed_cluster, catalog_update.diff);
            }
            ParsedStateUpdateKind::ClusterReplica {
                durable_cluster_replica: _,
                parsed_cluster_replica,
            } => {
                self.absorb_cluster_replica(parsed_cluster_replica, catalog_update.diff);
            }
        }
    }

    impl_absorb_method!(absorb_table, Table, Table);
    impl_absorb_method!(
        absorb_source,
        Source,
        (Source, Option<GenericSourceConnection>)
    );
    impl_absorb_method!(absorb_sink, Sink, Sink);
    impl_absorb_method!(absorb_index, Index, Index);
    impl_absorb_method!(absorb_materialized_view, MaterializedView, MaterializedView);
    impl_absorb_method!(absorb_view, View, View);

    impl_absorb_method!(absorb_continual_task, ContinualTask, ContinualTask);
    impl_absorb_method!(absorb_secret, Secret, Secret);
    impl_absorb_method!(absorb_connection, Connection, Connection);

    fn absorb_cluster(&mut self, cluster: Cluster, diff: StateDiff) {
        let state = match self {
            CatalogImplication::Cluster(state) => state,
            CatalogImplication::None => {
                *self = CatalogImplication::Cluster(CatalogImplicationKind::None);
                match self {
                    CatalogImplication::Cluster(state) => state,
                    _ => unreachable!(),
                }
            }
            _ => {
                panic!("Unexpected command type for {:?}: Cluster {:?}", self, diff);
            }
        };

        if let Err(e) = state.transition(cluster.clone(), Some(cluster.name), diff) {
            panic!("invalid state transition for cluster: {}", e);
        }
    }

    fn absorb_cluster_replica(&mut self, cluster_replica: ClusterReplica, diff: StateDiff) {
        let state = match self {
            CatalogImplication::ClusterReplica(state) => state,
            CatalogImplication::None => {
                *self = CatalogImplication::ClusterReplica(CatalogImplicationKind::None);
                match self {
                    CatalogImplication::ClusterReplica(state) => state,
                    _ => unreachable!(),
                }
            }
            _ => {
                panic!(
                    "Unexpected command type for {:?}: ClusterReplica {:?}",
                    self, diff
                );
            }
        };

        if let Err(e) = state.transition(cluster_replica.clone(), Some(cluster_replica.name), diff)
        {
            panic!("invalid state transition for cluster replica: {}", e);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use mz_repr::{GlobalId, RelationDesc, RelationVersion, VersionedRelationDesc};
    use mz_sql::names::ResolvedIds;
    use std::collections::BTreeMap;

    fn create_test_table(name: &str) -> Table {
        Table {
            desc: VersionedRelationDesc::new(
                RelationDesc::builder()
                    .with_column(name, mz_repr::SqlScalarType::String.nullable(false))
                    .finish(),
            ),
            create_sql: None,
            collections: BTreeMap::from([(RelationVersion::root(), GlobalId::System(1))]),
            conn_id: None,
            resolved_ids: ResolvedIds::empty(),
            custom_logical_compaction_window: None,
            is_retained_metrics_object: false,
            data_source: TableDataSource::TableWrites { defaults: vec![] },
        }
    }

    #[mz_ore::test]
    fn test_item_state_transitions() {
        let mut state = CatalogImplicationKind::None;
        assert!(
            state
                .transition("item1".to_string(), None, StateDiff::Addition)
                .is_ok()
        );
        assert!(matches!(state, CatalogImplicationKind::Added(_)));

        let mut state = CatalogImplicationKind::Added("new_item".to_string());
        assert!(
            state
                .transition("old_item".to_string(), None, StateDiff::Retraction)
                .is_ok()
        );
        match &state {
            CatalogImplicationKind::Altered { prev, new } => {
                assert_eq!(prev, "old_item");
                assert_eq!(new, "new_item");
            }
            _ => panic!("Expected Altered state"),
        }

        let mut state = CatalogImplicationKind::None;
        assert!(
            state
                .transition(
                    "item1".to_string(),
                    Some("test_name".to_string()),
                    StateDiff::Retraction
                )
                .is_ok()
        );
        assert!(matches!(state, CatalogImplicationKind::Dropped(_, _)));

        let mut state = CatalogImplicationKind::Dropped("old_item".to_string(), "name".to_string());
        assert!(
            state
                .transition("new_item".to_string(), None, StateDiff::Addition)
                .is_ok()
        );
        match &state {
            CatalogImplicationKind::Altered { prev, new } => {
                assert_eq!(prev, "old_item");
                assert_eq!(new, "new_item");
            }
            _ => panic!("Expected Altered state"),
        }

        let mut state = CatalogImplicationKind::Added("item".to_string());
        assert!(
            state
                .transition("item2".to_string(), None, StateDiff::Addition)
                .is_err()
        );

        let mut state = CatalogImplicationKind::Dropped("item".to_string(), "name".to_string());
        assert!(
            state
                .transition("item2".to_string(), None, StateDiff::Retraction)
                .is_err()
        );
    }

    #[mz_ore::test]
    fn test_table_absorb_state_machine() {
        let table1 = create_test_table("table1");
        let table2 = create_test_table("table2");

        let mut cmd = CatalogImplication::None;
        cmd.absorb_table(
            table1.clone(),
            Some("schema.table1".to_string()),
            StateDiff::Addition,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Added(t) => {
                    assert_eq!(t.desc.latest().arity(), table1.desc.latest().arity())
                }
                _ => panic!("Expected Added state"),
            },
            _ => panic!("Expected Table command"),
        }

        cmd.absorb_table(
            table2.clone(),
            Some("schema.table2".to_string()),
            StateDiff::Retraction,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Altered { prev, new } => {
                    assert_eq!(prev.desc.latest().arity(), table2.desc.latest().arity());
                    assert_eq!(new.desc.latest().arity(), table1.desc.latest().arity());
                }
                _ => panic!("Expected Altered state"),
            },
            _ => panic!("Expected Table command"),
        }

        let mut cmd = CatalogImplication::None;
        cmd.absorb_table(
            table1.clone(),
            Some("schema.table1".to_string()),
            StateDiff::Retraction,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Dropped(t, name) => {
                    assert_eq!(t.desc.latest().arity(), table1.desc.latest().arity());
                    assert_eq!(name, "schema.table1");
                }
                _ => panic!("Expected Dropped state"),
            },
            _ => panic!("Expected Table command"),
        }

        cmd.absorb_table(
            table2.clone(),
            Some("schema.table2".to_string()),
            StateDiff::Addition,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Altered { prev, new } => {
                    assert_eq!(prev.desc.latest().arity(), table1.desc.latest().arity());
                    assert_eq!(new.desc.latest().arity(), table2.desc.latest().arity());
                }
                _ => panic!("Expected Altered state"),
            },
            _ => panic!("Expected Table command"),
        }
    }

    #[mz_ore::test]
    #[should_panic(expected = "Cannot add an already added object")]
    fn test_invalid_double_add() {
        let table = create_test_table("table");
        let mut cmd = CatalogImplication::None;

        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Addition,
        );

        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Addition,
        );
    }

    #[mz_ore::test]
    #[should_panic(expected = "Cannot drop an already dropped object")]
    fn test_invalid_double_drop() {
        let table = create_test_table("table");
        let mut cmd = CatalogImplication::None;

        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Retraction,
        );

        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Retraction,
        );
    }
}
1927}