1use std::collections::{BTreeMap, BTreeSet};
30use std::sync::Arc;
31use std::time::{Duration, Instant};
32
33use fail::fail_point;
34use itertools::Itertools;
35use mz_adapter_types::compaction::CompactionWindow;
36use mz_catalog::memory::objects::{
37 CatalogItem, Cluster, ClusterReplica, Connection, ContinualTask, DataSourceDesc, Index,
38 MaterializedView, Secret, Sink, Source, StateDiff, Table, TableDataSource, View,
39};
40use mz_cloud_resources::VpcEndpointConfig;
41use mz_compute_client::logging::LogVariant;
42use mz_compute_client::protocol::response::PeekResponse;
43use mz_controller::clusters::{ClusterRole, ReplicaConfig};
44use mz_controller_types::{ClusterId, ReplicaId};
45use mz_ore::collections::CollectionExt;
46use mz_ore::error::ErrorExt;
47use mz_ore::future::InTask;
48use mz_ore::instrument;
49use mz_ore::retry::Retry;
50use mz_ore::str::StrExt;
51use mz_ore::task;
52use mz_repr::{CatalogItemId, GlobalId, RelationVersion, RelationVersionSelector, Timestamp};
53use mz_sql::plan::ConnectionDetails;
54use mz_storage_client::controller::{CollectionDescription, DataSource};
55use mz_storage_types::connections::PostgresConnection;
56use mz_storage_types::connections::inline::{InlinedConnection, IntoInlineConnection};
57use mz_storage_types::sinks::StorageSinkConnection;
58use mz_storage_types::sources::{
59 GenericSourceConnection, SourceDesc, SourceExport, SourceExportDataConfig,
60};
61use tracing::{Instrument, info_span, warn};
62
63use crate::active_compute_sink::ActiveComputeSinkRetireReason;
64use crate::coord::Coordinator;
65use crate::coord::catalog_implications::parsed_state_updates::{
66 ParsedStateUpdate, ParsedStateUpdateKind,
67};
68use crate::coord::timeline::TimelineState;
69use crate::statement_logging::{StatementEndedExecutionReason, StatementLoggingId};
70use crate::{AdapterError, CollectionIdBundle, ExecuteContext, ResultExt};
71
72pub mod parsed_state_updates;
73
74impl Coordinator {
75 #[instrument(level = "debug")]
84 pub async fn apply_catalog_implications(
85 &mut self,
86 ctx: Option<&mut ExecuteContext>,
87 catalog_updates: Vec<ParsedStateUpdate>,
88 ) -> Result<(), AdapterError> {
89 let start = Instant::now();
90
91 let mut catalog_implications: BTreeMap<CatalogItemId, CatalogImplication> = BTreeMap::new();
92 let mut cluster_commands: BTreeMap<ClusterId, CatalogImplication> = BTreeMap::new();
93 let mut cluster_replica_commands: BTreeMap<(ClusterId, ReplicaId), CatalogImplication> =
94 BTreeMap::new();
95 let mut introspection_source_indexes: BTreeMap<ClusterId, BTreeMap<LogVariant, GlobalId>> =
100 BTreeMap::new();
101
102 for update in catalog_updates {
103 tracing::trace!(?update, "got parsed state update");
104 match &update.kind {
105 ParsedStateUpdateKind::Item {
106 durable_item,
107 parsed_item: _,
108 connection: _,
109 parsed_full_name: _,
110 } => {
111 let entry = catalog_implications
112 .entry(durable_item.id.clone())
113 .or_insert_with(|| CatalogImplication::None);
114 entry.absorb(update);
115 }
116 ParsedStateUpdateKind::TemporaryItem {
117 durable_item,
118 parsed_item: _,
119 connection: _,
120 parsed_full_name: _,
121 } => {
122 let entry = catalog_implications
123 .entry(durable_item.id.clone())
124 .or_insert_with(|| CatalogImplication::None);
125 entry.absorb(update);
126 }
127 ParsedStateUpdateKind::Cluster {
128 durable_cluster,
129 parsed_cluster: _,
130 } => {
131 let entry = cluster_commands
132 .entry(durable_cluster.id)
133 .or_insert_with(|| CatalogImplication::None);
134 entry.absorb(update.clone());
135 }
136 ParsedStateUpdateKind::ClusterReplica {
137 durable_cluster_replica,
138 parsed_cluster_replica: _,
139 } => {
140 let entry = cluster_replica_commands
141 .entry((
142 durable_cluster_replica.cluster_id,
143 durable_cluster_replica.replica_id,
144 ))
145 .or_insert_with(|| CatalogImplication::None);
146 entry.absorb(update.clone());
147 }
148 ParsedStateUpdateKind::IntrospectionSourceIndex {
149 cluster_id,
150 log,
151 index_id,
152 } => {
153 if update.diff == StateDiff::Addition {
154 introspection_source_indexes
155 .entry(*cluster_id)
156 .or_default()
157 .insert(log.clone(), *index_id);
158 }
159 }
162 }
163 }
164
165 self.apply_catalog_implications_inner(
166 ctx,
167 catalog_implications.into_iter().collect_vec(),
168 cluster_commands.into_iter().collect_vec(),
169 cluster_replica_commands.into_iter().collect_vec(),
170 introspection_source_indexes,
171 )
172 .await?;
173
174 self.metrics
175 .apply_catalog_implications_seconds
176 .observe(start.elapsed().as_secs_f64());
177
178 Ok(())
179 }
180
    /// Applies the per-object `CatalogImplication`s produced by
    /// [`Coordinator::apply_catalog_implications`] to controller and
    /// coordinator state.
    ///
    /// Work happens in phases: implications are first translated into
    /// create/alter/drop work lists, then creations are applied, then
    /// in-place alterations, then in-flight work (compute sinks, peeks,
    /// copies) that depends on dropped objects is identified, and finally all
    /// drops are carried out under a dedicated tracing span. Statement order
    /// within and between phases is significant.
    #[instrument(level = "debug")]
    async fn apply_catalog_implications_inner(
        &mut self,
        ctx: Option<&mut ExecuteContext>,
        implications: Vec<(CatalogItemId, CatalogImplication)>,
        cluster_commands: Vec<(ClusterId, CatalogImplication)>,
        cluster_replica_commands: Vec<((ClusterId, ReplicaId), CatalogImplication)>,
        mut introspection_source_indexes: BTreeMap<ClusterId, BTreeMap<LogVariant, GlobalId>>,
    ) -> Result<(), AdapterError> {
        // Work lists for objects that have to be dropped.
        let mut tables_to_drop = BTreeSet::new();
        let mut sources_to_drop = vec![];
        let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
        let mut storage_sink_gids_to_drop = vec![];
        let mut indexes_to_drop = vec![];
        let mut compute_sinks_to_drop = vec![];
        let mut view_gids_to_drop = vec![];
        let mut secrets_to_drop = vec![];
        let mut vpc_endpoints_to_drop = vec![];
        let mut clusters_to_drop = vec![];
        let mut cluster_replicas_to_drop = vec![];
        let mut active_compute_sinks_to_drop = BTreeMap::new();
        let mut peeks_to_drop = vec![];
        let mut copies_to_drop = vec![];

        // Names of dropped objects, used to build human-readable reasons when
        // cancelling in-flight work that depended on them.
        let mut dropped_item_names: BTreeMap<GlobalId, String> = BTreeMap::new();
        let mut dropped_cluster_names: BTreeMap<ClusterId, String> = BTreeMap::new();

        // Work lists for objects that have to be created/initialized.
        let mut table_collections_to_create = BTreeMap::new();
        let mut source_collections_to_create = BTreeMap::new();
        let mut storage_policies_to_initialize = BTreeMap::new();
        let mut execution_timestamps_to_set = BTreeSet::new();
        let mut vpc_endpoints_to_create: Vec<(CatalogItemId, VpcEndpointConfig)> = vec![];

        // GlobalIds that must survive even though a drop was recorded for
        // them (an altered materialized view keeps its collections; see the
        // MV `Altered` arm below).
        let mut source_gids_to_keep = BTreeSet::new();

        // Work lists for objects that have to be altered in place.
        let mut source_connections_to_alter: BTreeMap<
            GlobalId,
            GenericSourceConnection<InlinedConnection>,
        > = BTreeMap::new();
        let mut sink_connections_to_alter: BTreeMap<GlobalId, StorageSinkConnection> =
            BTreeMap::new();
        let mut source_export_data_configs_to_alter: BTreeMap<GlobalId, SourceExportDataConfig> =
            BTreeMap::new();
        let mut source_descs_to_alter: BTreeMap<GlobalId, SourceDesc> = BTreeMap::new();

        // Phase 1: translate item implications into the work lists above.
        for (catalog_id, implication) in implications {
            tracing::trace!(?implication, "have to apply catalog implication");

            match implication {
                CatalogImplication::Table(CatalogImplicationKind::Added(table)) => {
                    self.handle_create_table(
                        &ctx,
                        &mut table_collections_to_create,
                        &mut storage_policies_to_initialize,
                        &mut execution_timestamps_to_set,
                        catalog_id,
                        table.clone(),
                    )
                    .await?
                }
                CatalogImplication::Table(CatalogImplicationKind::Altered {
                    prev: prev_table,
                    new: new_table,
                }) => {
                    self.handle_alter_table(catalog_id, prev_table, new_table)
                        .await?
                }

                CatalogImplication::Table(CatalogImplicationKind::Dropped(table, full_name)) => {
                    // A table can have multiple versions, each with its own
                    // GlobalId; all of them are dropped.
                    let global_ids = table.global_ids();
                    for global_id in global_ids {
                        tables_to_drop.insert((catalog_id, global_id));
                        dropped_item_names.insert(global_id, full_name.clone());
                    }
                }
                CatalogImplication::Source(CatalogImplicationKind::Added((
                    source,
                    _connection,
                ))) => {
                    // Compaction windows are resolved via the catalog, which
                    // may return windows for more than just this source.
                    let compaction_windows = self
                        .catalog()
                        .state()
                        .source_compaction_windows(vec![catalog_id]);

                    self.handle_create_source(
                        &mut source_collections_to_create,
                        &mut storage_policies_to_initialize,
                        catalog_id,
                        source,
                        compaction_windows,
                    )
                    .await?
                }
                CatalogImplication::Source(CatalogImplicationKind::Altered {
                    prev: (prev_source, _prev_connection),
                    new: (new_source, new_connection),
                }) => {
                    // A changed compaction window only needs a read-policy
                    // update.
                    if prev_source.custom_logical_compaction_window
                        != new_source.custom_logical_compaction_window
                    {
                        let new_window = new_source
                            .custom_logical_compaction_window
                            .unwrap_or(CompactionWindow::Default);
                        self.update_storage_read_policies(vec![(catalog_id, new_window.into())]);
                    }
                    // If the ingestion description itself changed, forward
                    // the new (inlined) source desc to the storage controller
                    // in phase 5.
                    match (&prev_source.data_source, &new_source.data_source) {
                        (
                            DataSourceDesc::Ingestion {
                                desc: prev_desc, ..
                            }
                            | DataSourceDesc::OldSyntaxIngestion {
                                desc: prev_desc, ..
                            },
                            DataSourceDesc::Ingestion { desc: new_desc, .. }
                            | DataSourceDesc::OldSyntaxIngestion { desc: new_desc, .. },
                        ) => {
                            if prev_desc != new_desc {
                                let inlined_connection = new_connection
                                    .expect("ingestion source should have inlined connection");
                                let inlined_desc = SourceDesc {
                                    connection: inlined_connection,
                                    timestamp_interval: new_desc.timestamp_interval,
                                };
                                source_descs_to_alter.insert(new_source.global_id, inlined_desc);
                            }
                        }
                        _ => {}
                    }
                }
                CatalogImplication::Source(CatalogImplicationKind::Dropped(
                    (source, connection),
                    full_name,
                )) => {
                    let global_id = source.global_id();
                    sources_to_drop.push((catalog_id, global_id));
                    dropped_item_names.insert(global_id, full_name);

                    // Postgres ingestions own a replication slot on the
                    // upstream database that has to be cleaned up as well
                    // (done in a background task in phase 8).
                    if let DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } = &source.data_source
                    {
                        match &desc.connection {
                            GenericSourceConnection::Postgres(_referenced_conn) => {
                                let inline_conn = connection.expect("missing inlined connection");

                                let pg_conn = match inline_conn {
                                    GenericSourceConnection::Postgres(pg_conn) => pg_conn,
                                    other => {
                                        panic!("expected postgres connection, got: {:?}", other)
                                    }
                                };
                                let pending_drop = (
                                    pg_conn.connection.clone(),
                                    pg_conn.publication_details.slot.clone(),
                                );
                                replication_slots_to_drop.push(pending_drop);
                            }
                            _ => {}
                        }
                    }
                }
                CatalogImplication::Sink(CatalogImplicationKind::Added(sink)) => {
                    tracing::debug!(?sink, "not handling AddSink in here yet");
                }
                CatalogImplication::Sink(CatalogImplicationKind::Altered {
                    prev: prev_sink,
                    new: new_sink,
                }) => {
                    tracing::debug!(?prev_sink, ?new_sink, "not handling AlterSink in here yet");
                }
                CatalogImplication::Sink(CatalogImplicationKind::Dropped(sink, full_name)) => {
                    storage_sink_gids_to_drop.push(sink.global_id());
                    dropped_item_names.insert(sink.global_id(), full_name);
                }
                CatalogImplication::Index(CatalogImplicationKind::Added(index)) => {
                    tracing::debug!(?index, "not handling AddIndex in here yet");
                }
                CatalogImplication::Index(CatalogImplicationKind::Altered {
                    prev: prev_index,
                    new: new_index,
                }) => {
                    // Only a compaction-window change is acted upon here.
                    if prev_index.custom_logical_compaction_window
                        != new_index.custom_logical_compaction_window
                    {
                        let new_window = new_index
                            .custom_logical_compaction_window
                            .unwrap_or(CompactionWindow::Default);
                        self.update_compute_read_policy(
                            new_index.cluster_id,
                            catalog_id,
                            new_window.into(),
                        );
                    }
                }
                CatalogImplication::Index(CatalogImplicationKind::Dropped(index, full_name)) => {
                    indexes_to_drop.push((index.cluster_id, index.global_id()));
                    dropped_item_names.insert(index.global_id(), full_name);
                }
                CatalogImplication::MaterializedView(CatalogImplicationKind::Added(mv)) => {
                    tracing::debug!(?mv, "not handling AddMaterializedView in here yet");
                }
                CatalogImplication::MaterializedView(CatalogImplicationKind::Altered {
                    prev: prev_mv,
                    new: new_mv,
                }) => {
                    if prev_mv.collections != new_mv.collections {
                        // The MV's set of backing collections changed. The
                        // write collection must be unchanged; its cluster is
                        // re-allowed to write and none of the MV's
                        // collections may be dropped below.
                        assert_eq!(
                            prev_mv.global_id_writes(),
                            new_mv.global_id_writes(),
                            "unexpected MV Altered implication: prev={prev_mv:?}, new={new_mv:?}",
                        );

                        let gid = new_mv.global_id_writes();
                        self.allow_writes(new_mv.cluster_id, gid);

                        source_gids_to_keep.extend(new_mv.global_ids());
                    } else if prev_mv.custom_logical_compaction_window
                        != new_mv.custom_logical_compaction_window
                    {
                        let new_window = new_mv
                            .custom_logical_compaction_window
                            .unwrap_or(CompactionWindow::Default);
                        self.update_storage_read_policies(vec![(catalog_id, new_window.into())]);
                    }
                }
                CatalogImplication::MaterializedView(CatalogImplicationKind::Dropped(
                    mv,
                    full_name,
                )) => {
                    // An MV has a compute sink writing into storage
                    // collections; both sides are torn down.
                    compute_sinks_to_drop.push((mv.cluster_id, mv.global_id_writes()));
                    for gid in mv.global_ids() {
                        sources_to_drop.push((catalog_id, gid));
                        dropped_item_names.insert(gid, full_name.clone());
                    }
                }
                CatalogImplication::View(CatalogImplicationKind::Added(_view)) => {
                    // Nothing to do when adding a view.
                }
                CatalogImplication::View(CatalogImplicationKind::Altered {
                    prev: _prev_view,
                    new: _new_view,
                }) => {
                    // Nothing to do when altering a view.
                }
                CatalogImplication::View(CatalogImplicationKind::Dropped(view, full_name)) => {
                    // Recorded so dependent in-flight work can be cancelled.
                    view_gids_to_drop.push(view.global_id());
                    dropped_item_names.insert(view.global_id(), full_name);
                }
                CatalogImplication::ContinualTask(CatalogImplicationKind::Added(ct)) => {
                    tracing::debug!(?ct, "not handling AddContinualTask in here yet");
                }
                CatalogImplication::ContinualTask(CatalogImplicationKind::Altered {
                    prev: _prev_ct,
                    new: _new_ct,
                }) => {
                    // Nothing to do when altering a continual task.
                }
                CatalogImplication::ContinualTask(CatalogImplicationKind::Dropped(
                    ct,
                    full_name,
                )) => {
                    // Like an MV: drop both the compute sink and the storage
                    // collection.
                    compute_sinks_to_drop.push((ct.cluster_id, ct.global_id()));
                    sources_to_drop.push((catalog_id, ct.global_id()));
                    dropped_item_names.insert(ct.global_id(), full_name);
                }
                CatalogImplication::Secret(CatalogImplicationKind::Added(_secret)) => {
                    // Nothing to do when adding a secret.
                }
                CatalogImplication::Secret(CatalogImplicationKind::Altered {
                    prev: _prev_secret,
                    new: _new_secret,
                }) => {
                    // Nothing to do when altering a secret.
                }
                CatalogImplication::Secret(CatalogImplicationKind::Dropped(
                    _secret,
                    _full_name,
                )) => {
                    secrets_to_drop.push(catalog_id);
                }
                CatalogImplication::Connection(CatalogImplicationKind::Added(connection)) => {
                    match &connection.details {
                        ConnectionDetails::Ssh { .. } => {}
                        // AWS PrivateLink connections need a VPC endpoint
                        // provisioned (done in phase 4).
                        ConnectionDetails::AwsPrivatelink(privatelink) => {
                            let spec = VpcEndpointConfig {
                                aws_service_name: privatelink.service_name.to_owned(),
                                availability_zone_ids: privatelink.availability_zones.to_owned(),
                            };
                            vpc_endpoints_to_create.push((catalog_id, spec));
                        }
                        _ => {}
                    }
                }
                CatalogImplication::Connection(CatalogImplicationKind::Altered {
                    prev: _prev_connection,
                    new: new_connection,
                }) => {
                    self.handle_alter_connection(
                        catalog_id,
                        new_connection,
                        &mut vpc_endpoints_to_create,
                        &mut source_connections_to_alter,
                        &mut sink_connections_to_alter,
                        &mut source_export_data_configs_to_alter,
                    );
                }
                CatalogImplication::Connection(CatalogImplicationKind::Dropped(
                    connection,
                    _full_name,
                )) => {
                    match &connection.details {
                        ConnectionDetails::Ssh { .. } => {
                            // SSH connections have an associated secret
                            // stored under the connection's id; drop it too.
                            secrets_to_drop.push(catalog_id);
                        }
                        ConnectionDetails::AwsPrivatelink(_) => {
                            vpc_endpoints_to_drop.push(catalog_id);
                        }
                        _ => (),
                    }
                }
                CatalogImplication::None => {
                    // NOTE(review): a bare `None` presumably means all updates
                    // for this item cancelled each other out — nothing to do.
                }
                CatalogImplication::Cluster(_) | CatalogImplication::ClusterReplica(_) => {
                    unreachable!("clusters and cluster replicas are handled below")
                }
                CatalogImplication::Table(CatalogImplicationKind::None)
                | CatalogImplication::Source(CatalogImplicationKind::None)
                | CatalogImplication::Sink(CatalogImplicationKind::None)
                | CatalogImplication::Index(CatalogImplicationKind::None)
                | CatalogImplication::MaterializedView(CatalogImplicationKind::None)
                | CatalogImplication::View(CatalogImplicationKind::None)
                | CatalogImplication::ContinualTask(CatalogImplicationKind::None)
                | CatalogImplication::Secret(CatalogImplicationKind::None)
                | CatalogImplication::Connection(CatalogImplicationKind::None) => {
                    unreachable!("will never leave None in place");
                }
            }
        }

        // Phase 2: apply cluster commands directly (clusters are not dropped
        // until phase 8, though).
        for (cluster_id, command) in cluster_commands {
            tracing::trace!(?command, "have cluster command to apply!");

            match command {
                CatalogImplication::Cluster(CatalogImplicationKind::Added(cluster)) => {
                    // A new cluster also receives its introspection source
                    // indexes (arranged logs), collected by the caller.
                    let arranged_logs = introspection_source_indexes
                        .remove(&cluster_id)
                        .unwrap_or_default();
                    let introspection_source_ids: Vec<_> =
                        arranged_logs.values().copied().collect();

                    self.controller
                        .create_cluster(
                            cluster_id,
                            mz_controller::clusters::ClusterConfig {
                                arranged_logs,
                                workload_class: cluster.config.workload_class.clone(),
                            },
                        )
                        .expect("creating cluster must not fail");

                    if !introspection_source_ids.is_empty() {
                        self.initialize_compute_read_policies(
                            introspection_source_ids,
                            cluster_id,
                            CompactionWindow::Default,
                        )
                        .await;
                    }
                }
                CatalogImplication::Cluster(CatalogImplicationKind::Altered {
                    prev: prev_cluster,
                    new: new_cluster,
                }) => {
                    // Of the cluster config, only a workload-class change is
                    // forwarded to the controller here.
                    if prev_cluster.config.workload_class != new_cluster.config.workload_class {
                        self.controller.update_cluster_workload_class(
                            cluster_id,
                            new_cluster.config.workload_class.clone(),
                        );
                    }
                }
                CatalogImplication::Cluster(CatalogImplicationKind::Dropped(
                    cluster,
                    _full_name,
                )) => {
                    clusters_to_drop.push(cluster_id);
                    dropped_cluster_names.insert(cluster_id, cluster.name);
                }
                CatalogImplication::Cluster(CatalogImplicationKind::None) => {
                    unreachable!("will never leave None in place");
                }
                command => {
                    unreachable!(
                        "we only handle cluster commands in this map, got: {:?}",
                        command
                    );
                }
            }
        }

        // Phase 3: apply cluster replica commands.
        for ((cluster_id, replica_id), command) in cluster_replica_commands {
            tracing::trace!(?command, "have cluster replica command to apply!");

            match command {
                CatalogImplication::ClusterReplica(CatalogImplicationKind::Added(replica)) => {
                    let cluster = self.catalog().get_cluster(cluster_id);
                    let cluster_name = cluster.name.clone();
                    let cluster_role = cluster.role();
                    let replica_name = format!("{}.{}", cluster_name, replica.name);
                    self.handle_create_cluster_replica(
                        cluster_id,
                        replica_id,
                        cluster_role,
                        cluster_name,
                        replica_name,
                        replica.config.clone(),
                    )
                    .await;
                }
                CatalogImplication::ClusterReplica(CatalogImplicationKind::Altered {
                    prev: _prev_replica,
                    new: _new_replica,
                }) => {
                    // Nothing to do when altering a cluster replica.
                }
                CatalogImplication::ClusterReplica(CatalogImplicationKind::Dropped(
                    _replica,
                    _full_name,
                )) => {
                    cluster_replicas_to_drop.push((cluster_id, replica_id));
                }
                CatalogImplication::ClusterReplica(CatalogImplicationKind::None) => {
                    unreachable!("will never leave None in place");
                }
                command => {
                    unreachable!(
                        "we only handle cluster replica commands in this map, got: {:?}",
                        command
                    );
                }
            }
        }

        // Phase 4: create new storage collections, initialize their read
        // policies, and provision VPC endpoints.
        if !source_collections_to_create.is_empty() {
            self.create_source_collections(source_collections_to_create)
                .await?;
        }

        if !table_collections_to_create.is_empty() {
            self.create_table_collections(table_collections_to_create, execution_timestamps_to_set)
                .await?;
        }
        self.initialize_storage_collections(storage_policies_to_initialize)
            .await?;

        if !vpc_endpoints_to_create.is_empty() {
            if let Some(cloud_resource_controller) = self.cloud_resource_controller.as_ref() {
                for (connection_id, spec) in vpc_endpoints_to_create {
                    // Failure to provision an endpoint is logged but does not
                    // fail the whole catalog application.
                    if let Err(err) = cloud_resource_controller
                        .ensure_vpc_endpoint(connection_id, spec)
                        .await
                    {
                        tracing::error!(?err, "failed to ensure vpc endpoint!");
                    }
                }
            } else {
                tracing::error!(
                    "AWS PrivateLink connections unsupported without cloud_resource_controller"
                );
            }
        }

        // Phase 5: forward in-place alterations to the storage controller.
        if !source_connections_to_alter.is_empty() {
            self.controller
                .storage
                .alter_ingestion_connections(source_connections_to_alter)
                .await
                .unwrap_or_terminate("cannot fail to alter ingestion connections");
        }

        if !sink_connections_to_alter.is_empty() {
            self.controller
                .storage
                .alter_export_connections(sink_connections_to_alter)
                .await
                .unwrap_or_terminate("altering export connections after txn must succeed");
        }

        if !source_export_data_configs_to_alter.is_empty() {
            self.controller
                .storage
                .alter_ingestion_export_data_configs(source_export_data_configs_to_alter)
                .await
                .unwrap_or_terminate("altering source export data configs after txn must succeed");
        }

        if !source_descs_to_alter.is_empty() {
            self.controller
                .storage
                .alter_ingestion_source_desc(source_descs_to_alter)
                .await
                .unwrap_or_terminate("cannot fail to alter ingestion source desc");
        }

        // Phase 6: work out which in-flight operations depend on objects that
        // are about to be dropped.

        // Collections kept alive by an altered MV must not be dropped.
        sources_to_drop.retain(|(_, gid)| !source_gids_to_keep.contains(gid));

        let readable_collections_to_drop: BTreeSet<_> = sources_to_drop
            .iter()
            .map(|(_, gid)| *gid)
            .chain(tables_to_drop.iter().map(|(_, gid)| *gid))
            .chain(indexes_to_drop.iter().map(|(_, gid)| *gid))
            .chain(view_gids_to_drop.iter().copied())
            .collect();

        // Active compute sinks are retired when a dependency or their cluster
        // is dropped, with a reason naming the dropped object.
        for (sink_id, sink) in &self.active_compute_sinks {
            let cluster_id = sink.cluster_id();
            if let Some(id) = sink
                .depends_on()
                .iter()
                .find(|id| readable_collections_to_drop.contains(id))
            {
                let name = dropped_item_names
                    .get(id)
                    .map(|n| format!("relation {}", n.quoted()))
                    .expect("missing relation name");
                active_compute_sinks_to_drop.insert(
                    *sink_id,
                    ActiveComputeSinkRetireReason::DependencyDropped(name),
                );
            } else if clusters_to_drop.contains(&cluster_id) {
                let name = dropped_cluster_names
                    .get(&cluster_id)
                    .map(|n| format!("cluster {}", n.quoted()))
                    .expect("missing cluster name");
                active_compute_sinks_to_drop.insert(
                    *sink_id,
                    ActiveComputeSinkRetireReason::DependencyDropped(name),
                );
            }
        }

        // The same for pending peeks ...
        for (uuid, pending_peek) in &self.pending_peeks {
            if let Some(id) = pending_peek
                .depends_on
                .iter()
                .find(|id| readable_collections_to_drop.contains(id))
            {
                let name = dropped_item_names
                    .get(id)
                    .map(|n| format!("relation {}", n.quoted()))
                    .expect("missing relation name");
                peeks_to_drop.push((name, uuid.clone()));
            } else if clusters_to_drop.contains(&pending_peek.cluster_id) {
                let name = dropped_cluster_names
                    .get(&pending_peek.cluster_id)
                    .map(|n| format!("cluster {}", n.quoted()))
                    .expect("missing cluster name");
                peeks_to_drop.push((name, uuid.clone()));
            }
        }

        // ... and active copies, which depend on a target table and a
        // cluster.
        for (conn_id, pending_copy) in &self.active_copies {
            let dropping_table = tables_to_drop
                .iter()
                .any(|(item_id, _gid)| pending_copy.table_id == *item_id);
            let dropping_cluster = clusters_to_drop.contains(&pending_copy.cluster_id);

            if dropping_table || dropping_cluster {
                copies_to_drop.push(conn_id.clone());
            }
        }

        let storage_gids_to_drop: BTreeSet<_> = sources_to_drop
            .iter()
            .map(|(_id, gid)| gid)
            .chain(storage_sink_gids_to_drop.iter())
            .chain(tables_to_drop.iter().map(|(_id, gid)| gid))
            .copied()
            .collect();
        let compute_gids_to_drop: Vec<_> = indexes_to_drop
            .iter()
            .chain(compute_sinks_to_drop.iter())
            .copied()
            .collect();

        // Phase 7: collect, per timeline, the read holds that reference
        // to-be-dropped collections, so they can be released below.
        let mut timeline_id_bundles = BTreeMap::new();

        for (timeline, TimelineState { read_holds, .. }) in &self.global_timelines {
            let mut id_bundle = CollectionIdBundle::default();

            for storage_id in read_holds.storage_ids() {
                if storage_gids_to_drop.contains(&storage_id) {
                    id_bundle.storage_ids.insert(storage_id);
                }
            }

            for (instance_id, id) in read_holds.compute_ids() {
                if compute_gids_to_drop.contains(&(instance_id, id))
                    || clusters_to_drop.contains(&instance_id)
                {
                    id_bundle
                        .compute_ids
                        .entry(instance_id)
                        .or_default()
                        .insert(id);
                }
            }

            timeline_id_bundles.insert(timeline.clone(), id_bundle);
        }

        let mut timeline_associations = BTreeMap::new();
        for (timeline, id_bundle) in timeline_id_bundles.into_iter() {
            let TimelineState { read_holds, .. } = self
                .global_timelines
                .get(&timeline)
                .expect("all timelines have a timestamp oracle");

            // Record whether removing this bundle is expected to leave the
            // timeline without any read holds at all; checked in phase 8.
            let empty = read_holds.id_bundle().difference(&id_bundle).is_empty();
            timeline_associations.insert(timeline, (empty, id_bundle));
        }

        // Phase 8: actually perform all drops, under a dedicated tracing
        // span.
        let _: () = async {
            if !timeline_associations.is_empty() {
                for (timeline, (should_be_empty, id_bundle)) in timeline_associations {
                    let became_empty =
                        self.remove_resources_associated_with_timeline(timeline, id_bundle);
                    assert_eq!(should_be_empty, became_empty, "emptiness did not match!");
                }
            }

            if !tables_to_drop.is_empty() {
                // Tables are dropped at a fresh local write timestamp so the
                // drop is ordered with respect to table writes.
                let ts = self.get_local_write_ts().await;
                self.drop_tables(tables_to_drop.into_iter().collect_vec(), ts.timestamp);
            }

            if !sources_to_drop.is_empty() {
                self.drop_sources(sources_to_drop);
            }

            if !storage_sink_gids_to_drop.is_empty() {
                self.drop_storage_sinks(storage_sink_gids_to_drop);
            }

            if !active_compute_sinks_to_drop.is_empty() {
                self.retire_compute_sinks(active_compute_sinks_to_drop)
                    .await;
            }

            if !peeks_to_drop.is_empty() {
                for (dropped_name, uuid) in peeks_to_drop {
                    if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
                        // Tell the client why their query can never complete.
                        let cancel_reason = PeekResponse::Error(format!(
                            "query could not complete because {dropped_name} was dropped"
                        ));
                        self.controller
                            .compute
                            .cancel_peek(pending_peek.cluster_id, uuid, cancel_reason)
                            .unwrap_or_terminate("unable to cancel peek");
                        self.retire_execution(
                            StatementEndedExecutionReason::Canceled,
                            pending_peek.ctx_extra.defuse(),
                        );
                    }
                }
            }

            if !copies_to_drop.is_empty() {
                for conn_id in copies_to_drop {
                    self.cancel_pending_copy(&conn_id);
                }
            }

            if !compute_gids_to_drop.is_empty() {
                self.drop_compute_collections(compute_gids_to_drop);
            }

            if !vpc_endpoints_to_drop.is_empty() {
                self.drop_vpc_endpoints_in_background(vpc_endpoints_to_drop)
            }

            if !cluster_replicas_to_drop.is_empty() {
                fail::fail_point!("after_catalog_drop_replica");

                for (cluster_id, replica_id) in cluster_replicas_to_drop {
                    self.drop_replica(cluster_id, replica_id);
                }
            }
            if !clusters_to_drop.is_empty() {
                for cluster_id in clusters_to_drop {
                    self.controller.drop_cluster(cluster_id);
                }
            }

            // Replication slots live in external Postgres instances and
            // secrets in an external store; both are cleaned up from a
            // background task so the coordinator is not blocked on them.
            task::spawn(|| "drop_replication_slots_and_secrets", {
                let ssh_tunnel_manager = self.connection_context().ssh_tunnel_manager.clone();
                let caching_secrets_reader = self.caching_secrets_reader.clone();
                let secrets_controller = Arc::clone(&self.secrets_controller);
                let secrets_reader = Arc::clone(self.secrets_reader());
                let storage_config = self.controller.storage.config().clone();

                async move {
                    for (connection, replication_slot_name) in replication_slots_to_drop {
                        tracing::info!(?replication_slot_name, "dropping replication slot");

                        // Best-effort with a bounded retry: the upstream
                        // database may be temporarily unreachable.
                        let result: Result<(), anyhow::Error> = Retry::default()
                            .max_duration(Duration::from_secs(60))
                            .retry_async(|_state| async {
                                let config = connection
                                    .config(&secrets_reader, &storage_config, InTask::No)
                                    .await
                                    .map_err(|e| {
                                        anyhow::anyhow!(
                                            "error creating Postgres client for \
                                            dropping acquired slots: {}",
                                            e.display_with_causes()
                                        )
                                    })?;

                                mz_postgres_util::drop_replication_slots(
                                    &ssh_tunnel_manager,
                                    config.clone(),
                                    &[(&replication_slot_name, true)],
                                )
                                .await?;

                                Ok(())
                            })
                            .await;

                        if let Err(err) = result {
                            tracing::warn!(
                                ?replication_slot_name,
                                ?err,
                                "failed to drop replication slot"
                            );
                        }
                    }

                    fail_point!("drop_secrets");
                    for secret in secrets_to_drop {
                        if let Err(e) = secrets_controller.delete(secret).await {
                            warn!("Dropping secrets has encountered an error: {}", e);
                        } else {
                            caching_secrets_reader.invalidate(secret);
                        }
                    }
                }
            });
        }
        .instrument(info_span!(
            "coord::apply_catalog_implications_inner::finalize"
        ))
        .await;

        Ok(())
    }
1055
1056 #[instrument(level = "debug")]
1057 async fn create_table_collections(
1058 &mut self,
1059 table_collections_to_create: BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
1060 execution_timestamps_to_set: BTreeSet<StatementLoggingId>,
1061 ) -> Result<(), AdapterError> {
1062 let write_ts = self.get_local_write_ts().await;
1064 let register_ts = write_ts.timestamp;
1065
1066 self.catalog
1076 .advance_upper(write_ts.advance_to)
1077 .await
1078 .unwrap_or_terminate("unable to advance catalog upper");
1079
1080 for id in execution_timestamps_to_set {
1081 self.set_statement_execution_timestamp(id, register_ts);
1082 }
1083
1084 let storage_metadata = self.catalog.state().storage_metadata();
1085
1086 self.controller
1087 .storage
1088 .create_collections(
1089 storage_metadata,
1090 Some(register_ts),
1091 table_collections_to_create.into_iter().collect_vec(),
1092 )
1093 .await
1094 .unwrap_or_terminate("cannot fail to create collections");
1095
1096 self.apply_local_write(register_ts).await;
1097
1098 Ok(())
1099 }
1100
1101 #[instrument(level = "debug")]
1102 async fn create_source_collections(
1103 &mut self,
1104 source_collections_to_create: BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
1105 ) -> Result<(), AdapterError> {
1106 let storage_metadata = self.catalog.state().storage_metadata();
1107
1108 self.controller
1109 .storage
1110 .create_collections(
1111 storage_metadata,
1112 None, source_collections_to_create.into_iter().collect_vec(),
1114 )
1115 .await
1116 .unwrap_or_terminate("cannot fail to create collections");
1117
1118 Ok(())
1119 }
1120
1121 #[instrument(level = "debug")]
1122 async fn initialize_storage_collections(
1123 &mut self,
1124 storage_policies_to_initialize: BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1125 ) -> Result<(), AdapterError> {
1126 for (compaction_window, global_ids) in storage_policies_to_initialize {
1127 self.initialize_read_policies(
1128 &CollectionIdBundle {
1129 storage_ids: global_ids,
1130 compute_ids: BTreeMap::new(),
1131 },
1132 compaction_window,
1133 )
1134 .await;
1135 }
1136
1137 Ok(())
1138 }
1139
1140 #[instrument(level = "debug")]
1141 async fn handle_create_table(
1142 &self,
1143 ctx: &Option<&mut ExecuteContext>,
1144 storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
1145 storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1146 execution_timestamps_to_set: &mut BTreeSet<StatementLoggingId>,
1147 table_id: CatalogItemId,
1148 table: Table,
1149 ) -> Result<(), AdapterError> {
1150 match &table.data_source {
1154 TableDataSource::TableWrites { defaults: _ } => {
1155 let versions: BTreeMap<_, _> = table
1156 .collection_descs()
1157 .map(|(gid, version, desc)| (version, (gid, desc)))
1158 .collect();
1159 let collection_descs = versions.iter().map(|(_version, (gid, desc))| {
1160 let collection_desc = CollectionDescription::for_table(desc.clone());
1161
1162 (*gid, collection_desc)
1163 });
1164
1165 let compaction_window = table
1166 .custom_logical_compaction_window
1167 .unwrap_or(CompactionWindow::Default);
1168 let ids_to_initialize = storage_policies_to_initialize
1169 .entry(compaction_window)
1170 .or_default();
1171
1172 for (gid, collection_desc) in collection_descs {
1173 storage_collections_to_create.insert(gid, collection_desc);
1174 ids_to_initialize.insert(gid);
1175 }
1176
1177 if let Some(id) = ctx.as_ref().and_then(|ctx| ctx.extra().contents()) {
1178 execution_timestamps_to_set.insert(id);
1179 }
1180 }
1181 TableDataSource::DataSource {
1182 desc: data_source_desc,
1183 timeline,
1184 } => {
1185 match data_source_desc {
1186 DataSourceDesc::IngestionExport {
1187 ingestion_id,
1188 external_reference: _,
1189 details,
1190 data_config,
1191 } => {
1192 let global_ingestion_id =
1193 self.catalog().get_entry(ingestion_id).latest_global_id();
1194
1195 let collection_desc = CollectionDescription::<Timestamp> {
1196 desc: table.desc.latest(),
1197 data_source: DataSource::IngestionExport {
1198 ingestion_id: global_ingestion_id,
1199 details: details.clone(),
1200 data_config: data_config
1201 .clone()
1202 .into_inline_connection(self.catalog.state()),
1203 },
1204 since: None,
1205 timeline: Some(timeline.clone()),
1206 primary: None,
1207 };
1208
1209 let global_id = table
1210 .global_ids()
1211 .expect_element(|| "subsources cannot have multiple versions");
1212
1213 storage_collections_to_create.insert(global_id, collection_desc);
1214
1215 let read_policies = self
1216 .catalog()
1217 .state()
1218 .source_compaction_windows(vec![table_id]);
1219 for (compaction_window, catalog_ids) in read_policies {
1220 let compaction_ids = storage_policies_to_initialize
1221 .entry(compaction_window)
1222 .or_default();
1223
1224 let gids = catalog_ids
1225 .into_iter()
1226 .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1227 .flatten();
1228 compaction_ids.extend(gids);
1229 }
1230 }
1231 DataSourceDesc::Webhook {
1232 validate_using: _,
1233 body_format: _,
1234 headers: _,
1235 cluster_id: _,
1236 } => {
1237 assert_eq!(
1239 table.desc.latest_version(),
1240 RelationVersion::root(),
1241 "found webhook with more than 1 relation version, {:?}",
1242 table.desc
1243 );
1244 let desc = table.desc.latest();
1245
1246 let collection_desc = CollectionDescription::<Timestamp> {
1247 desc,
1248 data_source: DataSource::Webhook,
1249 since: None,
1250 timeline: Some(timeline.clone()),
1251 primary: None,
1252 };
1253
1254 let global_id = table
1255 .global_ids()
1256 .expect_element(|| "webhooks cannot have multiple versions");
1257
1258 storage_collections_to_create.insert(global_id, collection_desc);
1259
1260 let read_policies = self
1261 .catalog()
1262 .state()
1263 .source_compaction_windows(vec![table_id]);
1264
1265 for (compaction_window, catalog_ids) in read_policies {
1266 let compaction_ids = storage_policies_to_initialize
1267 .entry(compaction_window)
1268 .or_default();
1269
1270 let gids = catalog_ids
1271 .into_iter()
1272 .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1273 .flatten();
1274 compaction_ids.extend(gids);
1275 }
1276 }
1277 _ => unreachable!("CREATE TABLE data source got {:?}", data_source_desc),
1278 }
1279 }
1280 }
1281
1282 Ok(())
1283 }
1284
    /// Applies the storage-level consequences of an `ALTER TABLE`.
    ///
    /// If the table's write collection is unchanged (same write global id),
    /// only the read policy may need updating. Otherwise the new relation
    /// version is registered with the storage controller at a freshly chosen
    /// local write timestamp.
    ///
    /// NOTE: the ordering below (acquire read hold -> pick write ts ->
    /// advance catalog upper -> alter table desc -> init read policies ->
    /// apply local write -> drop read hold) appears deliberate; do not
    /// reorder without understanding the timestamp protocol.
    #[instrument(level = "debug")]
    async fn handle_alter_table(
        &mut self,
        catalog_id: CatalogItemId,
        prev_table: Table,
        new_table: Table,
    ) -> Result<(), AdapterError> {
        let existing_gid = prev_table.global_id_writes();
        let new_gid = new_table.global_id_writes();

        // Same write collection: the only storage-relevant change can be the
        // compaction window, so update the read policy (if it changed) and
        // return early.
        if existing_gid == new_gid {
            if prev_table.custom_logical_compaction_window
                != new_table.custom_logical_compaction_window
            {
                let new_window = new_table
                    .custom_logical_compaction_window
                    .unwrap_or(CompactionWindow::Default);
                self.update_storage_read_policies(vec![(catalog_id, new_window.into())]);
            }
            return Ok(());
        }

        // Hold back compaction on the existing collection while the new
        // version is being registered.
        let existing_table = crate::CollectionIdBundle {
            storage_ids: BTreeSet::from([existing_gid]),
            compute_ids: BTreeMap::new(),
        };
        let existing_table_read_hold = self.acquire_read_holds(&existing_table);

        let expected_version = prev_table.desc.latest_version();
        let new_version = new_table.desc.latest_version();
        let new_desc = new_table
            .desc
            .at_version(RelationVersionSelector::Specific(new_version));

        // A single local write timestamp is used both to advance the catalog
        // upper and to register the new collection with storage.
        let write_ts = self.get_local_write_ts().await;
        let register_ts = write_ts.timestamp;

        self.catalog
            .advance_upper(write_ts.advance_to)
            .await
            .unwrap_or_terminate("unable to advance catalog upper");

        // `expected_version` guards against concurrent schema changes.
        self.controller
            .storage
            .alter_table_desc(
                existing_gid,
                new_gid,
                new_desc,
                expected_version,
                register_ts,
            )
            .await
            .expect("failed to alter desc of table");

        // The new collection inherits the table's (possibly custom)
        // compaction window.
        let compaction_window = new_table
            .custom_logical_compaction_window
            .unwrap_or(CompactionWindow::Default);
        self.initialize_read_policies(
            &crate::CollectionIdBundle {
                storage_ids: BTreeSet::from([new_gid]),
                compute_ids: BTreeMap::new(),
            },
            compaction_window,
        )
        .await;

        self.apply_local_write(register_ts).await;

        // Only now is it safe to let the old collection's since advance.
        drop(existing_table_read_hold);

        Ok(())
    }
1367
1368 #[instrument(level = "debug")]
1369 async fn handle_create_source(
1370 &self,
1371 storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
1372 storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
1373 item_id: CatalogItemId,
1374 source: Source,
1375 compaction_windows: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>>,
1376 ) -> Result<(), AdapterError> {
1377 let data_source = match source.data_source {
1378 DataSourceDesc::Ingestion { desc, cluster_id } => {
1379 let desc = desc.into_inline_connection(self.catalog().state());
1380 let item_global_id = self.catalog().get_entry(&item_id).latest_global_id();
1381
1382 let ingestion = mz_storage_types::sources::IngestionDescription::new(
1383 desc,
1384 cluster_id,
1385 item_global_id,
1386 );
1387
1388 DataSource::Ingestion(ingestion)
1389 }
1390 DataSourceDesc::OldSyntaxIngestion {
1391 desc,
1392 progress_subsource,
1393 data_config,
1394 details,
1395 cluster_id,
1396 } => {
1397 let desc = desc.into_inline_connection(self.catalog().state());
1398 let data_config = data_config.into_inline_connection(self.catalog().state());
1399
1400 let progress_subsource = self
1403 .catalog()
1404 .get_entry(&progress_subsource)
1405 .latest_global_id();
1406
1407 let mut ingestion = mz_storage_types::sources::IngestionDescription::new(
1408 desc,
1409 cluster_id,
1410 progress_subsource,
1411 );
1412
1413 let legacy_export = SourceExport {
1414 storage_metadata: (),
1415 data_config,
1416 details,
1417 };
1418
1419 ingestion
1420 .source_exports
1421 .insert(source.global_id, legacy_export);
1422
1423 DataSource::Ingestion(ingestion)
1424 }
1425 DataSourceDesc::IngestionExport {
1426 ingestion_id,
1427 external_reference: _,
1428 details,
1429 data_config,
1430 } => {
1431 let ingestion_id = self.catalog().get_entry(&ingestion_id).latest_global_id();
1434
1435 DataSource::IngestionExport {
1436 ingestion_id,
1437 details,
1438 data_config: data_config.into_inline_connection(self.catalog().state()),
1439 }
1440 }
1441 DataSourceDesc::Progress => DataSource::Progress,
1442 DataSourceDesc::Webhook { .. } => DataSource::Webhook,
1443 DataSourceDesc::Introspection(_) | DataSourceDesc::Catalog => {
1444 unreachable!("cannot create sources with internal data sources")
1445 }
1446 };
1447
1448 storage_collections_to_create.insert(
1449 source.global_id,
1450 CollectionDescription::<Timestamp> {
1451 desc: source.desc.clone(),
1452 data_source,
1453 timeline: Some(source.timeline),
1454 since: None,
1455 primary: None,
1456 },
1457 );
1458
1459 for (compaction_window, catalog_ids) in compaction_windows {
1461 let compaction_ids = storage_policies_to_initialize
1462 .entry(compaction_window)
1463 .or_default();
1464
1465 let gids = catalog_ids
1466 .into_iter()
1467 .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
1468 .flatten();
1469 compaction_ids.extend(gids);
1470 }
1471
1472 Ok(())
1473 }
1474
1475 #[instrument(level = "debug")]
1482 fn handle_alter_connection(
1483 &self,
1484 connection_id: CatalogItemId,
1485 connection: Connection,
1486 vpc_endpoints_to_create: &mut Vec<(CatalogItemId, VpcEndpointConfig)>,
1487 source_connections_to_alter: &mut BTreeMap<
1488 GlobalId,
1489 GenericSourceConnection<InlinedConnection>,
1490 >,
1491 sink_connections_to_alter: &mut BTreeMap<GlobalId, StorageSinkConnection>,
1492 source_export_data_configs_to_alter: &mut BTreeMap<GlobalId, SourceExportDataConfig>,
1493 ) {
1494 use std::collections::VecDeque;
1495
1496 if let ConnectionDetails::AwsPrivatelink(ref privatelink) = connection.details {
1498 let spec = VpcEndpointConfig {
1499 aws_service_name: privatelink.service_name.to_owned(),
1500 availability_zone_ids: privatelink.availability_zones.to_owned(),
1501 };
1502 vpc_endpoints_to_create.push((connection_id, spec));
1503 }
1504
1505 let mut connections_to_process = VecDeque::new();
1509 connections_to_process.push_front(connection_id.clone());
1510
1511 while let Some(id) = connections_to_process.pop_front() {
1512 for dependent_id in self.catalog().get_entry(&id).used_by() {
1513 let dependent_entry = self.catalog().get_entry(dependent_id);
1514 match dependent_entry.item() {
1515 CatalogItem::Connection(_) => {
1516 connections_to_process.push_back(*dependent_id);
1520 }
1521 CatalogItem::Source(source) => {
1522 let desc = match &dependent_entry
1523 .source()
1524 .expect("known to be source")
1525 .data_source
1526 {
1527 DataSourceDesc::Ingestion { desc, .. }
1528 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1529 desc.clone().into_inline_connection(self.catalog().state())
1530 }
1531 DataSourceDesc::IngestionExport { .. }
1532 | DataSourceDesc::Introspection(_)
1533 | DataSourceDesc::Progress
1534 | DataSourceDesc::Webhook { .. }
1535 | DataSourceDesc::Catalog => {
1536 continue;
1538 }
1539 };
1540
1541 source_connections_to_alter.insert(source.global_id, desc.connection);
1542 }
1543 CatalogItem::Sink(sink) => {
1544 let export = dependent_entry.sink().expect("known to be sink");
1545 sink_connections_to_alter.insert(
1546 sink.global_id,
1547 export
1548 .connection
1549 .clone()
1550 .into_inline_connection(self.catalog().state()),
1551 );
1552 }
1553 CatalogItem::Table(table) => {
1554 if let Some((_, _, _, export_data_config)) =
1558 dependent_entry.source_export_details()
1559 {
1560 let data_config = export_data_config.clone();
1561 source_export_data_configs_to_alter.insert(
1562 table.global_id_writes(),
1563 data_config.into_inline_connection(self.catalog().state()),
1564 );
1565 }
1566 }
1567 CatalogItem::Log(_)
1568 | CatalogItem::View(_)
1569 | CatalogItem::MaterializedView(_)
1570 | CatalogItem::Index(_)
1571 | CatalogItem::Type(_)
1572 | CatalogItem::Func(_)
1573 | CatalogItem::Secret(_)
1574 | CatalogItem::ContinualTask(_) => {
1575 }
1578 }
1579 }
1580 }
1581 }
1582
1583 async fn handle_create_cluster_replica(
1584 &mut self,
1585 cluster_id: ClusterId,
1586 replica_id: ReplicaId,
1587 role: ClusterRole,
1588 cluster_name: String,
1589 replica_name: String,
1590 replica_config: ReplicaConfig,
1591 ) {
1592 let enable_worker_core_affinity =
1593 self.catalog().system_config().enable_worker_core_affinity();
1594
1595 self.controller
1596 .create_replica(
1597 cluster_id,
1598 replica_id,
1599 cluster_name,
1600 replica_name,
1601 role,
1602 replica_config,
1603 enable_worker_core_affinity,
1604 )
1605 .expect("creating replicas must not fail");
1606
1607 self.install_introspection_subscribes(cluster_id, replica_id)
1608 .await;
1609 }
1610}
1611
/// The aggregated implication of a batch of catalog state updates for a single
/// catalog object: which kind of object it is, and whether it was added,
/// dropped, or altered (see [`CatalogImplicationKind`]).
#[derive(Debug, Clone)]
enum CatalogImplication {
    /// No update has been absorbed for this object yet.
    None,
    Table(CatalogImplicationKind<Table>),
    /// Sources additionally carry their (optional) resolved generic source
    /// connection, captured at parse time.
    Source(CatalogImplicationKind<(Source, Option<GenericSourceConnection>)>),
    Sink(CatalogImplicationKind<Sink>),
    Index(CatalogImplicationKind<Index>),
    MaterializedView(CatalogImplicationKind<MaterializedView>),
    View(CatalogImplicationKind<View>),
    ContinualTask(CatalogImplicationKind<ContinualTask>),
    Secret(CatalogImplicationKind<Secret>),
    Connection(CatalogImplicationKind<Connection>),
    Cluster(CatalogImplicationKind<Cluster>),
    ClusterReplica(CatalogImplicationKind<ClusterReplica>),
}
1632
/// The add/drop/alter state machine for a single catalog object of type `T`.
///
/// A retraction absorbed after an addition (or an addition after a
/// retraction) within the same batch is interpreted as an alteration; see
/// [`CatalogImplicationKind::transition`].
#[derive(Debug, Clone)]
enum CatalogImplicationKind<T> {
    /// No update absorbed yet.
    None,
    /// The object was added.
    Added(T),
    /// The object was dropped; the `String` is its parsed full name (used in
    /// messages), or `"<unknown>"` when no name was available.
    Dropped(T, String),
    /// The object was altered from `prev` to `new`.
    Altered { prev: T, new: T },
}
1644
1645impl<T: Clone> CatalogImplicationKind<T> {
1646 fn transition(&mut self, item: T, name: Option<String>, diff: StateDiff) -> Result<(), String> {
1649 use CatalogImplicationKind::*;
1650 use StateDiff::*;
1651
1652 let new_state = match (&*self, diff) {
1653 (None, Addition) => Added(item),
1655 (None, Retraction) => Dropped(item, name.unwrap_or_else(|| "<unknown>".to_string())),
1656
1657 (Added(existing), Retraction) => {
1659 Altered {
1661 prev: item,
1662 new: existing.clone(),
1663 }
1664 }
1665 (Added(_), Addition) => {
1666 return Err("Cannot add an already added object".to_string());
1667 }
1668
1669 (Dropped(existing, _), Addition) => {
1671 Altered {
1673 prev: existing.clone(),
1674 new: item,
1675 }
1676 }
1677 (Dropped(_, _), Retraction) => {
1678 return Err("Cannot drop an already dropped object".to_string());
1679 }
1680
1681 (Altered { .. }, _) => {
1683 return Err(format!(
1684 "Cannot apply {:?} to an object in Altered state",
1685 diff
1686 ));
1687 }
1688 };
1689
1690 *self = new_state;
1691 Ok(())
1692 }
1693}
1694
/// Generates an `absorb_*` method on [`CatalogImplication`] for the given
/// variant: the method coerces `self` into that variant (initializing from
/// `CatalogImplication::None` on first use) and delegates to
/// [`CatalogImplicationKind::transition`].
///
/// Panics if `self` already holds a *different* variant, or if the state
/// transition is invalid — both indicate bugs in the update stream.
macro_rules! impl_absorb_method {
    (
        $method_name:ident,
        $variant:ident,
        $item_type:ty
    ) => {
        fn $method_name(
            &mut self,
            item: $item_type,
            parsed_full_name: Option<String>,
            diff: StateDiff,
        ) {
            // Obtain a mutable reference to the variant's inner state
            // machine. The nested re-match after assigning to `*self` is how
            // we get a borrow of the just-created variant's payload.
            let state = match self {
                CatalogImplication::$variant(state) => state,
                CatalogImplication::None => {
                    *self = CatalogImplication::$variant(CatalogImplicationKind::None);
                    match self {
                        CatalogImplication::$variant(state) => state,
                        _ => unreachable!(),
                    }
                }
                _ => {
                    panic!(
                        "Unexpected command type for {:?}: {} {:?}",
                        self,
                        stringify!($variant),
                        diff,
                    );
                }
            };

            if let Err(e) = state.transition(item, parsed_full_name, diff) {
                panic!(
                    "Invalid state transition for {}: {}",
                    stringify!($variant),
                    e
                );
            }
        }
    };
}
1737
impl CatalogImplication {
    /// Folds a single [`ParsedStateUpdate`] into this implication,
    /// dispatching on the kind of catalog object the update describes.
    ///
    /// Log, type, and func items have no coordinator-side implications and
    /// are ignored. Introspection source indexes must be handled elsewhere.
    fn absorb(&mut self, catalog_update: ParsedStateUpdate) {
        match catalog_update.kind {
            ParsedStateUpdateKind::Item {
                durable_item: _,
                parsed_item,
                connection,
                parsed_full_name,
            } => match parsed_item {
                CatalogItem::Table(table) => {
                    self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
                }
                CatalogItem::Source(source) => {
                    // Sources carry their resolved connection alongside.
                    self.absorb_source(
                        (source, connection),
                        Some(parsed_full_name),
                        catalog_update.diff,
                    );
                }
                CatalogItem::Sink(sink) => {
                    self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::Index(index) => {
                    self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::MaterializedView(mv) => {
                    self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::View(view) => {
                    self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::ContinualTask(ct) => {
                    self.absorb_continual_task(ct, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::Secret(secret) => {
                    self.absorb_secret(secret, None, catalog_update.diff);
                }
                CatalogItem::Connection(connection) => {
                    self.absorb_connection(connection, None, catalog_update.diff);
                }
                // No coordinator-side implications for these item kinds.
                CatalogItem::Log(_) => {}
                CatalogItem::Type(_) => {}
                CatalogItem::Func(_) => {}
            },
            ParsedStateUpdateKind::TemporaryItem {
                durable_item: _,
                parsed_item,
                connection,
                parsed_full_name,
            } => match parsed_item {
                CatalogItem::Table(table) => {
                    self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
                }
                CatalogItem::Source(source) => {
                    self.absorb_source(
                        (source, connection),
                        Some(parsed_full_name),
                        catalog_update.diff,
                    );
                }
                CatalogItem::Sink(sink) => {
                    self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::Index(index) => {
                    self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::MaterializedView(mv) => {
                    self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::View(view) => {
                    self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::ContinualTask(ct) => {
                    // NOTE(review): unlike the `Item` arm above, this passes
                    // `None` rather than `Some(parsed_full_name)` — confirm
                    // whether this asymmetry is intentional.
                    self.absorb_continual_task(ct, None, catalog_update.diff);
                }
                CatalogItem::Secret(secret) => {
                    self.absorb_secret(secret, None, catalog_update.diff);
                }
                CatalogItem::Connection(connection) => {
                    self.absorb_connection(connection, None, catalog_update.diff);
                }
                // No coordinator-side implications for these item kinds.
                CatalogItem::Log(_) => {}
                CatalogItem::Type(_) => {}
                CatalogItem::Func(_) => {}
            },
            ParsedStateUpdateKind::Cluster {
                durable_cluster: _,
                parsed_cluster,
            } => {
                let name = parsed_cluster.name.clone();
                self.absorb_cluster(parsed_cluster, Some(name), catalog_update.diff);
            }
            ParsedStateUpdateKind::ClusterReplica {
                durable_cluster_replica: _,
                parsed_cluster_replica,
            } => {
                let name = parsed_cluster_replica.name.clone();
                self.absorb_cluster_replica(
                    parsed_cluster_replica,
                    Some(name),
                    catalog_update.diff,
                );
            }
            ParsedStateUpdateKind::IntrospectionSourceIndex { .. } => {
                unreachable!("IntrospectionSourceIndex should not be passed to absorb");
            }
        }
    }

    // One `absorb_*` method per `CatalogImplication` variant, generated by
    // the macro above.
    impl_absorb_method!(absorb_table, Table, Table);
    impl_absorb_method!(
        absorb_source,
        Source,
        (Source, Option<GenericSourceConnection>)
    );
    impl_absorb_method!(absorb_sink, Sink, Sink);
    impl_absorb_method!(absorb_index, Index, Index);
    impl_absorb_method!(absorb_materialized_view, MaterializedView, MaterializedView);
    impl_absorb_method!(absorb_view, View, View);

    impl_absorb_method!(absorb_continual_task, ContinualTask, ContinualTask);
    impl_absorb_method!(absorb_secret, Secret, Secret);
    impl_absorb_method!(absorb_connection, Connection, Connection);

    impl_absorb_method!(absorb_cluster, Cluster, Cluster);
    impl_absorb_method!(absorb_cluster_replica, ClusterReplica, ClusterReplica);
}
1870
#[cfg(test)]
mod tests {
    use super::*;
    use mz_repr::{GlobalId, RelationDesc, RelationVersion, VersionedRelationDesc};
    use mz_sql::names::ResolvedIds;
    use std::collections::BTreeMap;

    /// Builds a minimal single-column table (at the root relation version)
    /// for exercising the `CatalogImplication` state machines.
    fn create_test_table(name: &str) -> Table {
        Table {
            desc: VersionedRelationDesc::new(
                RelationDesc::builder()
                    .with_column(name, mz_repr::SqlScalarType::String.nullable(false))
                    .finish(),
            ),
            create_sql: None,
            collections: BTreeMap::from([(RelationVersion::root(), GlobalId::System(1))]),
            conn_id: None,
            resolved_ids: ResolvedIds::empty(),
            custom_logical_compaction_window: None,
            is_retained_metrics_object: false,
            data_source: TableDataSource::TableWrites { defaults: vec![] },
        }
    }

    /// Exercises `CatalogImplicationKind::transition` directly (with `String`
    /// items): the four valid transitions and the two invalid ones.
    #[mz_ore::test]
    fn test_item_state_transitions() {
        // None + Addition => Added.
        let mut state = CatalogImplicationKind::None;
        assert!(
            state
                .transition("item1".to_string(), None, StateDiff::Addition)
                .is_ok()
        );
        assert!(matches!(state, CatalogImplicationKind::Added(_)));

        // Added + Retraction => Altered; the retracted item becomes `prev`,
        // the previously added item becomes `new`.
        let mut state = CatalogImplicationKind::Added("new_item".to_string());
        assert!(
            state
                .transition("old_item".to_string(), None, StateDiff::Retraction)
                .is_ok()
        );
        match &state {
            CatalogImplicationKind::Altered { prev, new } => {
                assert_eq!(prev, "old_item");
                assert_eq!(new, "new_item");
            }
            _ => panic!("Expected Altered state"),
        }

        // None + Retraction => Dropped (carrying the provided name).
        let mut state = CatalogImplicationKind::None;
        assert!(
            state
                .transition(
                    "item1".to_string(),
                    Some("test_name".to_string()),
                    StateDiff::Retraction
                )
                .is_ok()
        );
        assert!(matches!(state, CatalogImplicationKind::Dropped(_, _)));

        // Dropped + Addition => Altered.
        let mut state = CatalogImplicationKind::Dropped("old_item".to_string(), "name".to_string());
        assert!(
            state
                .transition("new_item".to_string(), None, StateDiff::Addition)
                .is_ok()
        );
        match &state {
            CatalogImplicationKind::Altered { prev, new } => {
                assert_eq!(prev, "old_item");
                assert_eq!(new, "new_item");
            }
            _ => panic!("Expected Altered state"),
        }

        // Added + Addition is invalid.
        let mut state = CatalogImplicationKind::Added("item".to_string());
        assert!(
            state
                .transition("item2".to_string(), None, StateDiff::Addition)
                .is_err()
        );

        // Dropped + Retraction is invalid.
        let mut state = CatalogImplicationKind::Dropped("item".to_string(), "name".to_string());
        assert!(
            state
                .transition("item2".to_string(), None, StateDiff::Retraction)
                .is_err()
        );
    }

    /// Exercises the generated `absorb_table` method end-to-end through the
    /// `CatalogImplication` wrapper, including both Altered paths.
    #[mz_ore::test]
    fn test_table_absorb_state_machine() {
        let table1 = create_test_table("table1");
        let table2 = create_test_table("table2");

        // Addition into a fresh implication => Added.
        let mut cmd = CatalogImplication::None;
        cmd.absorb_table(
            table1.clone(),
            Some("schema.table1".to_string()),
            StateDiff::Addition,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Added(t) => {
                    assert_eq!(t.desc.latest().arity(), table1.desc.latest().arity())
                }
                _ => panic!("Expected Added state"),
            },
            _ => panic!("Expected Table command"),
        }

        // Retraction after an addition => Altered (retracted is `prev`).
        cmd.absorb_table(
            table2.clone(),
            Some("schema.table2".to_string()),
            StateDiff::Retraction,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Altered { prev, new } => {
                    assert_eq!(prev.desc.latest().arity(), table2.desc.latest().arity());
                    assert_eq!(new.desc.latest().arity(), table1.desc.latest().arity());
                }
                _ => panic!("Expected Altered state"),
            },
            _ => panic!("Expected Table command"),
        }

        // Retraction into a fresh implication => Dropped (with name).
        let mut cmd = CatalogImplication::None;
        cmd.absorb_table(
            table1.clone(),
            Some("schema.table1".to_string()),
            StateDiff::Retraction,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Dropped(t, name) => {
                    assert_eq!(t.desc.latest().arity(), table1.desc.latest().arity());
                    assert_eq!(name, "schema.table1");
                }
                _ => panic!("Expected Dropped state"),
            },
            _ => panic!("Expected Table command"),
        }

        // Addition after a drop => Altered (dropped is `prev`).
        cmd.absorb_table(
            table2.clone(),
            Some("schema.table2".to_string()),
            StateDiff::Addition,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Altered { prev, new } => {
                    assert_eq!(prev.desc.latest().arity(), table1.desc.latest().arity());
                    assert_eq!(new.desc.latest().arity(), table2.desc.latest().arity());
                }
                _ => panic!("Expected Altered state"),
            },
            _ => panic!("Expected Table command"),
        }
    }

    /// A double addition must panic via the generated absorb method.
    #[mz_ore::test]
    #[should_panic(expected = "Cannot add an already added object")]
    fn test_invalid_double_add() {
        let table = create_test_table("table");
        let mut cmd = CatalogImplication::None;

        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Addition,
        );

        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Addition,
        );
    }

    /// A double retraction must panic via the generated absorb method.
    #[mz_ore::test]
    #[should_panic(expected = "Cannot drop an already dropped object")]
    fn test_invalid_double_drop() {
        let table = create_test_table("table");
        let mut cmd = CatalogImplication::None;

        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Retraction,
        );

        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Retraction,
        );
    }
}