use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use std::time::{Duration, Instant};

use fail::fail_point;
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_catalog::builtin;
use mz_catalog::memory::objects::{
    CatalogItem, Cluster, ClusterReplica, Connection, ContinualTask, DataSourceDesc, Index,
    MaterializedView, Secret, Sink, Source, StateDiff, Table, TableDataSource, View,
};
use mz_compute_client::protocol::response::PeekResponse;
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::instrument;
use mz_ore::retry::Retry;
use mz_ore::str::StrExt;
use mz_ore::task;
use mz_repr::{CatalogItemId, GlobalId, RelationVersion, RelationVersionSelector, Timestamp};
use mz_sql::plan::ConnectionDetails;
use mz_storage_client::controller::{CollectionDescription, DataSource};
use mz_storage_types::connections::PostgresConnection;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::sources::{GenericSourceConnection, SourceExport};
use tracing::{Instrument, info_span, warn};

use crate::active_compute_sink::ActiveComputeSinkRetireReason;
use crate::coord::Coordinator;
use crate::coord::catalog_implications::parsed_state_updates::{
    ParsedStateUpdate, ParsedStateUpdateKind,
};
use crate::coord::statement_logging::StatementLoggingId;
use crate::coord::timeline::TimelineState;
use crate::statement_logging::StatementEndedExecutionReason;
use crate::{AdapterError, CollectionIdBundle, ExecuteContext, ResultExt};

pub mod parsed_state_updates;

impl Coordinator {
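    /// Applies the implications that the given parsed catalog state updates
    /// have on the rest of the system, for example creating or dropping
    /// controller collections.
    ///
    /// Updates are first folded into one [`CatalogImplication`] per item,
    /// cluster, or cluster replica, so that a retraction paired with a
    /// matching addition is handled as a single alter operation.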
    #[instrument(level = "debug")]
    pub async fn apply_catalog_implications(
        &mut self,
        ctx: Option<&mut ExecuteContext>,
        catalog_updates: Vec<ParsedStateUpdate>,
    ) -> Result<(), AdapterError> {
        let start = Instant::now();

        let mut catalog_implications: BTreeMap<CatalogItemId, CatalogImplication> = BTreeMap::new();
        let mut cluster_commands: BTreeMap<ClusterId, CatalogImplication> = BTreeMap::new();
        let mut cluster_replica_commands: BTreeMap<(ClusterId, ReplicaId), CatalogImplication> =
            BTreeMap::new();

        for update in catalog_updates {
            tracing::trace!(?update, "got parsed state update");
            match &update.kind {
                ParsedStateUpdateKind::Item {
                    durable_item,
                    parsed_item: _,
                    connection: _,
                    parsed_full_name: _,
                } => {
                    let entry = catalog_implications
                        .entry(durable_item.id.clone())
                        .or_insert_with(|| CatalogImplication::None);
                    entry.absorb(update);
                }
                ParsedStateUpdateKind::TemporaryItem {
                    durable_item,
                    parsed_item: _,
                    connection: _,
                    parsed_full_name: _,
                } => {
                    let entry = catalog_implications
                        .entry(durable_item.id.clone())
                        .or_insert_with(|| CatalogImplication::None);
                    entry.absorb(update);
                }
                ParsedStateUpdateKind::Cluster {
                    durable_cluster,
                    parsed_cluster: _,
                } => {
                    let entry = cluster_commands
                        .entry(durable_cluster.id)
                        .or_insert_with(|| CatalogImplication::None);
                    entry.absorb(update.clone());
                }
                ParsedStateUpdateKind::ClusterReplica {
                    durable_cluster_replica,
                    parsed_cluster_replica: _,
                } => {
                    let entry = cluster_replica_commands
                        .entry((
                            durable_cluster_replica.cluster_id,
                            durable_cluster_replica.replica_id,
                        ))
                        .or_insert_with(|| CatalogImplication::None);
                    entry.absorb(update.clone());
                }
            }
        }

        self.apply_catalog_implications_inner(
            ctx,
            catalog_implications.into_iter().collect_vec(),
            cluster_commands.into_iter().collect_vec(),
            cluster_replica_commands.into_iter().collect_vec(),
        )
        .await?;

        self.metrics
            .apply_catalog_implications_seconds
            .observe(start.elapsed().as_secs_f64());

        Ok(())
    }

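    /// Applies a batch of folded-up [`CatalogImplication`]s: stages storage
    /// collections to create, collects the IDs of everything that has to be
    /// dropped, and then performs the necessary side effects on the
    /// controllers and on in-flight work (compute sinks, peeks, and copies).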
    #[instrument(level = "debug")]
    async fn apply_catalog_implications_inner(
        &mut self,
        mut ctx: Option<&mut ExecuteContext>,
        implications: Vec<(CatalogItemId, CatalogImplication)>,
        cluster_commands: Vec<(ClusterId, CatalogImplication)>,
        cluster_replica_commands: Vec<((ClusterId, ReplicaId), CatalogImplication)>,
    ) -> Result<(), AdapterError> {
        let mut tables_to_drop = BTreeSet::new();
        let mut sources_to_drop = vec![];
        let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
        let mut storage_sink_gids_to_drop = vec![];
        let mut compute_gids_to_drop = vec![];
        let mut view_gids_to_drop = vec![];
        let mut secrets_to_drop = vec![];
        let mut vpc_endpoints_to_drop = vec![];
        let mut clusters_to_drop = vec![];
        let mut cluster_replicas_to_drop = vec![];
        let mut compute_sinks_to_drop = BTreeMap::new();
        let mut peeks_to_drop = vec![];
        let mut copies_to_drop = vec![];

        let mut dropped_item_names: BTreeMap<GlobalId, String> = BTreeMap::new();
        let mut dropped_cluster_names: BTreeMap<ClusterId, String> = BTreeMap::new();

        let mut table_collections_to_create = BTreeMap::new();
        let mut source_collections_to_create = BTreeMap::new();
        let mut storage_policies_to_initialize = BTreeMap::new();
        let mut execution_timestamps_to_set = BTreeSet::new();

        let mut replacement_gids = vec![];

        for (catalog_id, implication) in implications {
            tracing::trace!(?implication, "have to apply catalog implication");

            match implication {
                CatalogImplication::Table(CatalogImplicationKind::Added(table)) => {
                    self.handle_create_table(
                        &mut ctx,
                        &mut table_collections_to_create,
                        &mut storage_policies_to_initialize,
                        &mut execution_timestamps_to_set,
                        catalog_id,
                        table.clone(),
                    )
                    .await?
                }
                CatalogImplication::Table(CatalogImplicationKind::Altered {
                    prev: prev_table,
                    new: new_table,
                }) => self.handle_alter_table(prev_table, new_table).await?,

                CatalogImplication::Table(CatalogImplicationKind::Dropped(table, full_name)) => {
                    let global_ids = table.global_ids();
                    for global_id in global_ids {
                        tables_to_drop.insert((catalog_id, global_id));
                        dropped_item_names.insert(global_id, full_name.clone());
                    }
                }
                CatalogImplication::Source(CatalogImplicationKind::Added((
                    source,
                    _connection,
                ))) => {
                    let compaction_windows = self
                        .catalog()
                        .state()
                        .source_compaction_windows(vec![catalog_id]);

                    self.handle_create_source(
                        &mut source_collections_to_create,
                        &mut storage_policies_to_initialize,
                        catalog_id,
                        source,
                        compaction_windows,
                    )
                    .await?
                }
                CatalogImplication::Source(CatalogImplicationKind::Altered {
                    prev: (prev_source, _prev_connection),
                    new: (new_source, _new_connection),
                }) => {
                    tracing::debug!(
                        ?prev_source,
                        ?new_source,
                        "not handling AlterSource in here yet"
                    );
                }
                CatalogImplication::Source(CatalogImplicationKind::Dropped(
                    (source, connection),
                    full_name,
                )) => {
                    let global_id = source.global_id();
                    sources_to_drop.push((catalog_id, global_id));
                    dropped_item_names.insert(global_id, full_name);

                    if let DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } = &source.data_source
                    {
                        match &desc.connection {
                            GenericSourceConnection::Postgres(_referenced_conn) => {
                                // Dropping a Postgres source also requires dropping its
                                // replication slot upstream, which needs the inlined
                                // connection details.
                                let inline_conn = connection.expect("missing inlined connection");

                                let pg_conn = match inline_conn {
                                    GenericSourceConnection::Postgres(pg_conn) => pg_conn,
                                    other => {
                                        panic!("expected postgres connection, got: {:?}", other)
                                    }
                                };
                                let pending_drop = (
                                    pg_conn.connection.clone(),
                                    pg_conn.publication_details.slot.clone(),
                                );
                                replication_slots_to_drop.push(pending_drop);
                            }
                            _ => {}
                        }
                    }
                }
                CatalogImplication::Sink(CatalogImplicationKind::Added(sink)) => {
                    tracing::debug!(?sink, "not handling AddSink in here yet");
                }
                CatalogImplication::Sink(CatalogImplicationKind::Altered {
                    prev: prev_sink,
                    new: new_sink,
                }) => {
                    tracing::debug!(?prev_sink, ?new_sink, "not handling AlterSink in here yet");
                }
                CatalogImplication::Sink(CatalogImplicationKind::Dropped(sink, full_name)) => {
                    storage_sink_gids_to_drop.push(sink.global_id());
                    dropped_item_names.insert(sink.global_id(), full_name);
                }
                CatalogImplication::Index(CatalogImplicationKind::Added(index)) => {
                    tracing::debug!(?index, "not handling AddIndex in here yet");
                }
                CatalogImplication::Index(CatalogImplicationKind::Altered {
                    prev: prev_index,
                    new: new_index,
                }) => {
                    tracing::debug!(
                        ?prev_index,
                        ?new_index,
                        "not handling AlterIndex in here yet"
                    );
                }
                CatalogImplication::Index(CatalogImplicationKind::Dropped(index, full_name)) => {
                    compute_gids_to_drop.push((index.cluster_id, index.global_id()));
                    dropped_item_names.insert(index.global_id(), full_name);
                }
                CatalogImplication::MaterializedView(CatalogImplicationKind::Added(mv)) => {
                    tracing::debug!(?mv, "not handling AddMaterializedView in here yet");
                }
                CatalogImplication::MaterializedView(CatalogImplicationKind::Altered {
                    prev: prev_mv,
                    new: new_mv,
                }) => {
                    let old_gid = prev_mv.global_id_writes();
                    let new_gid = new_mv.global_id_writes();
                    if new_gid != old_gid {
                        replacement_gids.push((new_mv.cluster_id, new_gid));
                    }

                    self.handle_alter_materialized_view(prev_mv, new_mv)?;
                }
                CatalogImplication::MaterializedView(CatalogImplicationKind::Dropped(
                    mv,
                    full_name,
                )) => {
                    compute_gids_to_drop.push((mv.cluster_id, mv.global_id_writes()));
                    sources_to_drop.extend(mv.global_ids().map(|gid| (catalog_id, gid)));
                    dropped_item_names.insert(mv.global_id_writes(), full_name);
                }
                CatalogImplication::View(CatalogImplicationKind::Added(view)) => {
                    tracing::debug!(?view, "not handling AddView in here yet");
                }
                CatalogImplication::View(CatalogImplicationKind::Altered {
                    prev: prev_view,
                    new: new_view,
                }) => {
                    tracing::debug!(?prev_view, ?new_view, "not handling AlterView in here yet");
                }
                CatalogImplication::View(CatalogImplicationKind::Dropped(view, full_name)) => {
                    view_gids_to_drop.push(view.global_id());
                    dropped_item_names.insert(view.global_id(), full_name);
                }
                CatalogImplication::ContinualTask(CatalogImplicationKind::Added(ct)) => {
                    tracing::debug!(?ct, "not handling AddContinualTask in here yet");
                }
                CatalogImplication::ContinualTask(CatalogImplicationKind::Altered {
                    prev: prev_ct,
                    new: new_ct,
                }) => {
                    tracing::debug!(
                        ?prev_ct,
                        ?new_ct,
                        "not handling AlterContinualTask in here yet"
                    );
                }
                CatalogImplication::ContinualTask(CatalogImplicationKind::Dropped(
                    ct,
                    _full_name,
                )) => {
                    compute_gids_to_drop.push((ct.cluster_id, ct.global_id()));
                    sources_to_drop.push((catalog_id, ct.global_id()));
                }
                CatalogImplication::Secret(CatalogImplicationKind::Added(secret)) => {
                    tracing::debug!(?secret, "not handling AddSecret in here yet");
                }
                CatalogImplication::Secret(CatalogImplicationKind::Altered {
                    prev: prev_secret,
                    new: new_secret,
                }) => {
                    tracing::debug!(
                        ?prev_secret,
                        ?new_secret,
                        "not handling AlterSecret in here yet"
                    );
                }
                CatalogImplication::Secret(CatalogImplicationKind::Dropped(
                    _secret,
                    _full_name,
                )) => {
                    secrets_to_drop.push(catalog_id);
                }
                CatalogImplication::Connection(CatalogImplicationKind::Added(connection)) => {
                    tracing::debug!(?connection, "not handling AddConnection in here yet");
                }
                CatalogImplication::Connection(CatalogImplicationKind::Altered {
                    prev: prev_connection,
                    new: new_connection,
                }) => {
                    tracing::debug!(
                        ?prev_connection,
                        ?new_connection,
                        "not handling AlterConnection in here yet"
                    );
                }
                CatalogImplication::Connection(CatalogImplicationKind::Dropped(
                    connection,
                    _full_name,
                )) => {
                    match &connection.details {
                        ConnectionDetails::Ssh { .. } => {
                            secrets_to_drop.push(catalog_id);
                        }
                        ConnectionDetails::AwsPrivatelink(_) => {
                            vpc_endpoints_to_drop.push(catalog_id);
                        }
                        _ => (),
                    }
                }
                CatalogImplication::None => {}
                CatalogImplication::Cluster(_) | CatalogImplication::ClusterReplica(_) => {
                    unreachable!("clusters and cluster replicas are handled below")
                }
                CatalogImplication::Table(CatalogImplicationKind::None)
                | CatalogImplication::Source(CatalogImplicationKind::None)
                | CatalogImplication::Sink(CatalogImplicationKind::None)
                | CatalogImplication::Index(CatalogImplicationKind::None)
                | CatalogImplication::MaterializedView(CatalogImplicationKind::None)
                | CatalogImplication::View(CatalogImplicationKind::None)
                | CatalogImplication::ContinualTask(CatalogImplicationKind::None)
                | CatalogImplication::Secret(CatalogImplicationKind::None)
                | CatalogImplication::Connection(CatalogImplicationKind::None) => {
                    unreachable!("will never leave None in place");
                }
            }
        }

        for (cluster_id, command) in cluster_commands {
            tracing::trace!(?command, "have cluster command to apply");

            match command {
                CatalogImplication::Cluster(CatalogImplicationKind::Added(cluster)) => {
                    tracing::debug!(?cluster, "not handling AddCluster in here yet");
                }
                CatalogImplication::Cluster(CatalogImplicationKind::Altered {
                    prev: prev_cluster,
                    new: new_cluster,
                }) => {
                    tracing::debug!(
                        ?prev_cluster,
                        ?new_cluster,
                        "not handling AlterCluster in here yet"
                    );
                }
                CatalogImplication::Cluster(CatalogImplicationKind::Dropped(
                    cluster,
                    _full_name,
                )) => {
                    clusters_to_drop.push(cluster_id);
                    dropped_cluster_names.insert(cluster_id, cluster.name);
                }
                CatalogImplication::Cluster(CatalogImplicationKind::None) => {
                    unreachable!("will never leave None in place");
                }
                command => {
                    unreachable!(
                        "we only handle cluster commands in this map, got: {:?}",
                        command
                    );
                }
            }
        }

        for ((cluster_id, replica_id), command) in cluster_replica_commands {
            tracing::trace!(?command, "have cluster replica command to apply");

            match command {
                CatalogImplication::ClusterReplica(CatalogImplicationKind::Added(replica)) => {
                    tracing::debug!(?replica, "not handling AddClusterReplica in here yet");
                }
                CatalogImplication::ClusterReplica(CatalogImplicationKind::Altered {
                    prev: prev_replica,
                    new: new_replica,
                }) => {
                    tracing::debug!(
                        ?prev_replica,
                        ?new_replica,
                        "not handling AlterClusterReplica in here yet"
                    );
                }
                CatalogImplication::ClusterReplica(CatalogImplicationKind::Dropped(
                    _replica,
                    _full_name,
                )) => {
                    cluster_replicas_to_drop.push((cluster_id, replica_id));
                }
                CatalogImplication::ClusterReplica(CatalogImplicationKind::None) => {
                    unreachable!("will never leave None in place");
                }
                command => {
                    unreachable!(
                        "we only handle cluster replica commands in this map, got: {:?}",
                        command
                    );
                }
            }
        }

        // A replacement collection (e.g. from an altered materialized view)
        // must not be dropped, even if a retraction for it showed up in this
        // batch.
        for (cluster_id, gid) in replacement_gids {
            sources_to_drop.retain(|(_, id)| *id != gid);
            compute_gids_to_drop.retain(|id| *id != (cluster_id, gid));
        }

        if !source_collections_to_create.is_empty() {
            self.create_source_collections(source_collections_to_create)
                .await?;
        }

        if !table_collections_to_create.is_empty() {
            self.create_table_collections(table_collections_to_create, execution_timestamps_to_set)
                .await?;
        }
        self.initialize_storage_collections(storage_policies_to_initialize)
            .await?;

        let collections_to_drop: BTreeSet<_> = sources_to_drop
            .iter()
            .map(|(_, gid)| *gid)
            .chain(tables_to_drop.iter().map(|(_, gid)| *gid))
            .chain(storage_sink_gids_to_drop.iter().copied())
            .chain(compute_gids_to_drop.iter().map(|(_, gid)| *gid))
            .chain(view_gids_to_drop.iter().copied())
            .collect();

        // Retire any active compute sinks that depend on a dropped collection
        // or run on a dropped cluster.
        for (sink_id, sink) in &self.active_compute_sinks {
            let cluster_id = sink.cluster_id();
            if let Some(id) = sink
                .depends_on()
                .iter()
                .find(|id| collections_to_drop.contains(id))
            {
                let name = dropped_item_names
                    .get(id)
                    .map(|n| format!("relation {}", n.quoted()))
                    .expect("missing relation name");
                compute_sinks_to_drop.insert(
                    *sink_id,
                    ActiveComputeSinkRetireReason::DependencyDropped(name),
                );
            } else if clusters_to_drop.contains(&cluster_id) {
                let name = dropped_cluster_names
                    .get(&cluster_id)
                    .map(|n| format!("cluster {}", n.quoted()))
                    .expect("missing cluster name");
                compute_sinks_to_drop.insert(
                    *sink_id,
                    ActiveComputeSinkRetireReason::DependencyDropped(name),
                );
            }
        }

        // Likewise, cancel any pending peeks that depend on a dropped
        // collection or cluster.
        for (uuid, pending_peek) in &self.pending_peeks {
            if let Some(id) = pending_peek
                .depends_on
                .iter()
                .find(|id| collections_to_drop.contains(id))
            {
                let name = dropped_item_names
                    .get(id)
                    .map(|n| format!("relation {}", n.quoted()))
                    .expect("missing relation name");
                peeks_to_drop.push((name, uuid.clone()));
            } else if clusters_to_drop.contains(&pending_peek.cluster_id) {
                let name = dropped_cluster_names
                    .get(&pending_peek.cluster_id)
                    .map(|n| format!("cluster {}", n.quoted()))
                    .expect("missing cluster name");
                peeks_to_drop.push((name, uuid.clone()));
            }
        }

        // Cancel any active copies whose target table or cluster is dropped.
        for (conn_id, pending_copy) in &self.active_copies {
            let dropping_table = tables_to_drop
                .iter()
                .any(|(item_id, _gid)| pending_copy.table_id == *item_id);
            let dropping_cluster = clusters_to_drop.contains(&pending_copy.cluster_id);

            if dropping_table || dropping_cluster {
                copies_to_drop.push(conn_id.clone());
            }
        }

        let storage_gids_to_drop: BTreeSet<_> = sources_to_drop
            .iter()
            .map(|(_id, gid)| gid)
            .chain(storage_sink_gids_to_drop.iter())
            .chain(tables_to_drop.iter().map(|(_id, gid)| gid))
            .copied()
            .collect();

        let mut timeline_id_bundles = BTreeMap::new();

        for (timeline, TimelineState { read_holds, .. }) in &self.global_timelines {
            let mut id_bundle = CollectionIdBundle::default();

            for storage_id in read_holds.storage_ids() {
                if storage_gids_to_drop.contains(&storage_id) {
                    id_bundle.storage_ids.insert(storage_id);
                }
            }

            for (instance_id, id) in read_holds.compute_ids() {
                if compute_gids_to_drop.contains(&(instance_id, id))
                    || clusters_to_drop.contains(&instance_id)
                {
                    id_bundle
                        .compute_ids
                        .entry(instance_id)
                        .or_default()
                        .insert(id);
                }
            }

            timeline_id_bundles.insert(timeline.clone(), id_bundle);
        }

        let mut timeline_associations = BTreeMap::new();
        for (timeline, id_bundle) in timeline_id_bundles.into_iter() {
            let TimelineState { read_holds, .. } = self
                .global_timelines
                .get(&timeline)
                .expect("all timelines have a timestamp oracle");

            let empty = read_holds.id_bundle().difference(&id_bundle).is_empty();
            timeline_associations.insert(timeline, (empty, id_bundle));
        }

        let _: () = async {
            if !timeline_associations.is_empty() {
                for (timeline, (should_be_empty, id_bundle)) in timeline_associations {
                    let became_empty =
                        self.remove_resources_associated_with_timeline(timeline, id_bundle);
                    assert_eq!(should_be_empty, became_empty, "emptiness did not match!");
                }
            }

            if !tables_to_drop.is_empty() {
                // Dropping tables needs a fresh local write timestamp.
                let ts = self.get_local_write_ts().await;
                self.drop_tables(tables_to_drop.into_iter().collect_vec(), ts.timestamp);
            }

            if !sources_to_drop.is_empty() {
                self.drop_sources(sources_to_drop);
            }

            if !storage_sink_gids_to_drop.is_empty() {
                self.drop_storage_sinks(storage_sink_gids_to_drop);
            }

            if !compute_sinks_to_drop.is_empty() {
                self.retire_compute_sinks(compute_sinks_to_drop).await;
            }

            if !peeks_to_drop.is_empty() {
                for (dropped_name, uuid) in peeks_to_drop {
                    if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
                        let cancel_reason = PeekResponse::Error(format!(
                            "query could not complete because {dropped_name} was dropped"
                        ));
                        self.controller
                            .compute
                            .cancel_peek(pending_peek.cluster_id, uuid, cancel_reason)
                            .unwrap_or_terminate("unable to cancel peek");
                        self.retire_execution(
                            StatementEndedExecutionReason::Canceled,
                            pending_peek.ctx_extra,
                        );
                    }
                }
            }

            if !copies_to_drop.is_empty() {
                for conn_id in copies_to_drop {
                    self.cancel_pending_copy(&conn_id);
                }
            }

            if !compute_gids_to_drop.is_empty() {
                self.drop_compute_collections(compute_gids_to_drop);
            }

            if !vpc_endpoints_to_drop.is_empty() {
                self.drop_vpc_endpoints_in_background(vpc_endpoints_to_drop)
            }

            if !cluster_replicas_to_drop.is_empty() {
                fail_point!("after_catalog_drop_replica");

                for (cluster_id, replica_id) in cluster_replicas_to_drop {
                    self.drop_replica(cluster_id, replica_id);
                }
            }
            if !clusters_to_drop.is_empty() {
                for cluster_id in clusters_to_drop {
                    self.controller.drop_cluster(cluster_id);
                }
            }

            // Dropping replication slots and secrets can block for a long
            // time, so do it in a background task.
            task::spawn(|| "drop_replication_slots_and_secrets", {
                let ssh_tunnel_manager = self.connection_context().ssh_tunnel_manager.clone();
                let secrets_controller = Arc::clone(&self.secrets_controller);
                let secrets_reader = Arc::clone(self.secrets_reader());
                let storage_config = self.controller.storage.config().clone();

                async move {
                    for (connection, replication_slot_name) in replication_slots_to_drop {
                        tracing::info!(?replication_slot_name, "dropping replication slot");

                        let result: Result<(), anyhow::Error> = Retry::default()
                            .max_duration(Duration::from_secs(60))
                            .retry_async(|_state| async {
                                let config = connection
                                    .config(&secrets_reader, &storage_config, InTask::No)
                                    .await
                                    .map_err(|e| {
                                        anyhow::anyhow!(
                                            "error creating Postgres client for \
                                            dropping acquired slots: {}",
                                            e.display_with_causes()
                                        )
                                    })?;

                                mz_postgres_util::drop_replication_slots(
                                    &ssh_tunnel_manager,
                                    config.clone(),
                                    &[(&replication_slot_name, true)],
                                )
                                .await?;

                                Ok(())
                            })
                            .await;

                        if let Err(err) = result {
                            tracing::warn!(
                                ?replication_slot_name,
                                ?err,
                                "failed to drop replication slot"
                            );
                        }
                    }

                    fail_point!("drop_secrets");
                    for secret in secrets_to_drop {
                        if let Err(e) = secrets_controller.delete(secret).await {
                            warn!("dropping secrets has encountered an error: {}", e);
                        }
                    }
                }
            });
        }
        .instrument(info_span!(
            "coord::apply_catalog_implications_inner::finalize"
        ))
        .await;

        Ok(())
    }

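    /// Creates the given table collections in the storage controller at a
    /// freshly acquired local write timestamp.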
    #[instrument(level = "debug")]
    async fn create_table_collections(
        &mut self,
        table_collections_to_create: BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
        execution_timestamps_to_set: BTreeSet<StatementLoggingId>,
    ) -> Result<(), AdapterError> {
        let register_ts = self.get_local_write_ts().await.timestamp;

        // Confirm that we are still the leader before changing state in the
        // storage controller.
        self.catalog
            .confirm_leadership()
            .await
            .unwrap_or_terminate("unable to confirm leadership");

        for id in execution_timestamps_to_set {
            self.set_statement_execution_timestamp(id, register_ts);
        }

        let storage_metadata = self.catalog.state().storage_metadata();

        self.controller
            .storage
            .create_collections(
                storage_metadata,
                Some(register_ts),
                table_collections_to_create.into_iter().collect_vec(),
            )
            .await
            .unwrap_or_terminate("cannot fail to create collections");

        self.apply_local_write(register_ts).await;

        Ok(())
    }

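    /// Creates the given source collections in the storage controller.
    /// Unlike tables, sources do not need a register timestamp.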
    #[instrument(level = "debug")]
    async fn create_source_collections(
        &mut self,
        source_collections_to_create: BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
    ) -> Result<(), AdapterError> {
        let storage_metadata = self.catalog.state().storage_metadata();

        self.controller
            .storage
            .create_collections(
                storage_metadata,
                // Sources do not need a register timestamp.
                None,
                source_collections_to_create.into_iter().collect_vec(),
            )
            .await
            .unwrap_or_terminate("cannot fail to create collections");

        Ok(())
    }

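    /// Initializes read policies for the given storage collections, grouped
    /// by compaction window.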
    #[instrument(level = "debug")]
    async fn initialize_storage_collections(
        &mut self,
        storage_policies_to_initialize: BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
    ) -> Result<(), AdapterError> {
        for (compaction_window, global_ids) in storage_policies_to_initialize {
            self.initialize_read_policies(
                &CollectionIdBundle {
                    storage_ids: global_ids,
                    compute_ids: BTreeMap::new(),
                },
                compaction_window,
            )
            .await;
        }

        Ok(())
    }

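    /// Stages the storage collections and read policies required for a newly
    /// added table, covering both regular tables and tables backed by a data
    /// source (ingestion exports and webhooks).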
    #[instrument(level = "debug")]
    async fn handle_create_table(
        &mut self,
        ctx: &mut Option<&mut ExecuteContext>,
        storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
        storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
        execution_timestamps_to_set: &mut BTreeSet<StatementLoggingId>,
        table_id: CatalogItemId,
        table: Table,
    ) -> Result<(), AdapterError> {
        match &table.data_source {
            TableDataSource::TableWrites { defaults: _ } => {
                let versions: BTreeMap<_, _> = table
                    .collection_descs()
                    .map(|(gid, version, desc)| (version, (gid, desc)))
                    .collect();
                let collection_descs = versions.iter().map(|(_version, (gid, desc))| {
                    let collection_desc = CollectionDescription::for_table(desc.clone());

                    (*gid, collection_desc)
                });

                let compaction_window = table
                    .custom_logical_compaction_window
                    .unwrap_or(CompactionWindow::Default);
                let ids_to_initialize = storage_policies_to_initialize
                    .entry(compaction_window)
                    .or_default();

                for (gid, collection_desc) in collection_descs {
                    storage_collections_to_create.insert(gid, collection_desc);
                    ids_to_initialize.insert(gid);
                }

                if let Some(id) = ctx.as_ref().and_then(|ctx| ctx.extra().contents()) {
                    execution_timestamps_to_set.insert(id);
                }
            }
            TableDataSource::DataSource {
                desc: data_source_desc,
                timeline,
            } => {
                match data_source_desc {
                    DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference: _,
                        details,
                        data_config,
                    } => {
                        let status_collection_id = self
                            .catalog()
                            .resolve_builtin_storage_collection(&builtin::MZ_SOURCE_STATUS_HISTORY);

                        let global_ingestion_id =
                            self.catalog().get_entry(ingestion_id).latest_global_id();
                        let global_status_collection_id = self
                            .catalog()
                            .get_entry(&status_collection_id)
                            .latest_global_id();

                        let collection_desc = CollectionDescription::<Timestamp> {
                            desc: table.desc.latest(),
                            data_source: DataSource::IngestionExport {
                                ingestion_id: global_ingestion_id,
                                details: details.clone(),
                                data_config: data_config
                                    .clone()
                                    .into_inline_connection(self.catalog.state()),
                            },
                            since: None,
                            status_collection_id: Some(global_status_collection_id),
                            timeline: Some(timeline.clone()),
                            primary: None,
                        };

                        let global_id = table
                            .global_ids()
                            .expect_element(|| "subsources cannot have multiple versions");

                        storage_collections_to_create.insert(global_id, collection_desc);

                        let read_policies = self
                            .catalog()
                            .state()
                            .source_compaction_windows(vec![table_id]);
                        for (compaction_window, catalog_ids) in read_policies {
                            let compaction_ids = storage_policies_to_initialize
                                .entry(compaction_window)
                                .or_default();

                            let gids = catalog_ids
                                .into_iter()
                                .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids());
                            compaction_ids.extend(gids);
                        }
                    }
                    DataSourceDesc::Webhook {
                        validate_using: _,
                        body_format: _,
                        headers: _,
                        cluster_id: _,
                    } => {
                        assert_eq!(
                            table.desc.latest_version(),
                            RelationVersion::root(),
                            "found webhook with more than one relation version, {:?}",
                            table.desc
                        );
                        let desc = table.desc.latest();

                        let collection_desc = CollectionDescription::<Timestamp> {
                            desc,
                            data_source: DataSource::Webhook,
                            since: None,
                            status_collection_id: None,
                            timeline: Some(timeline.clone()),
                            primary: None,
                        };

                        let global_id = table
                            .global_ids()
                            .expect_element(|| "webhooks cannot have multiple versions");

                        storage_collections_to_create.insert(global_id, collection_desc);

                        let read_policies = self
                            .catalog()
                            .state()
                            .source_compaction_windows(vec![table_id]);

                        for (compaction_window, catalog_ids) in read_policies {
                            let compaction_ids = storage_policies_to_initialize
                                .entry(compaction_window)
                                .or_default();

                            let gids = catalog_ids
                                .into_iter()
                                .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids());
                            compaction_ids.extend(gids);
                        }
                    }
                    _ => unreachable!("CREATE TABLE data source got {:?}", data_source_desc),
                }
            }
        }

        Ok(())
    }

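    /// Handles an altered table by evolving its description in the storage
    /// controller from the previous write [`GlobalId`] to the new one.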
    #[instrument(level = "debug")]
    async fn handle_alter_table(
        &mut self,
        prev_table: Table,
        new_table: Table,
    ) -> Result<(), AdapterError> {
        let existing_gid = prev_table.global_id_writes();
        let new_gid = new_table.global_id_writes();

        if existing_gid == new_gid {
            // Nothing to do if the write collection did not change.
            return Ok(());
        }

        // Hold a read hold on the existing collection while we evolve its
        // description.
        let existing_table = crate::CollectionIdBundle {
            storage_ids: BTreeSet::from([existing_gid]),
            compute_ids: BTreeMap::new(),
        };
        let existing_table_read_hold = self.acquire_read_holds(&existing_table);

        let expected_version = prev_table.desc.latest_version();
        let new_version = new_table.desc.latest_version();
        let new_desc = new_table
            .desc
            .at_version(RelationVersionSelector::Specific(new_version));

        let register_ts = self.get_local_write_ts().await.timestamp;

        self.controller
            .storage
            .alter_table_desc(
                existing_gid,
                new_gid,
                new_desc,
                expected_version,
                register_ts,
            )
            .await
            .expect("failed to alter desc of table");

        let compaction_window = new_table
            .custom_logical_compaction_window
            .unwrap_or(CompactionWindow::Default);
        self.initialize_read_policies(
            &crate::CollectionIdBundle {
                storage_ids: BTreeSet::from([new_gid]),
                compute_ids: BTreeMap::new(),
            },
            compaction_window,
        )
        .await;

        self.apply_local_write(register_ts).await;

        drop(existing_table_read_hold);

        Ok(())
    }

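    /// Handles an altered materialized view whose write [`GlobalId`] changed:
    /// drops the compute collection backing the previous version and allows
    /// writes to the new one.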
    #[instrument(level = "debug")]
    fn handle_alter_materialized_view(
        &mut self,
        prev_mv: MaterializedView,
        new_mv: MaterializedView,
    ) -> Result<(), AdapterError> {
        let old_gid = prev_mv.global_id_writes();
        let new_gid = new_mv.global_id_writes();

        if old_gid == new_gid {
            // Nothing to do if the write collection did not change.
            return Ok(());
        }

        self.drop_compute_collections(vec![(prev_mv.cluster_id, old_gid)]);
        self.allow_writes(new_mv.cluster_id, new_gid);

        Ok(())
    }

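    /// Stages the storage collection and read policies required for a newly
    /// added source, inlining connection details as needed.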
    #[instrument(level = "debug")]
    async fn handle_create_source(
        &mut self,
        storage_collections_to_create: &mut BTreeMap<GlobalId, CollectionDescription<Timestamp>>,
        storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
        item_id: CatalogItemId,
        source: Source,
        compaction_windows: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>>,
    ) -> Result<(), AdapterError> {
        let source_status_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&builtin::MZ_SOURCE_STATUS_HISTORY);
        let source_status_collection_id = Some(
            self.catalog()
                .get_entry(&source_status_item_id)
                .latest_global_id(),
        );

        let (data_source, status_collection_id) = match source.data_source {
            DataSourceDesc::Ingestion { desc, cluster_id } => {
                let desc = desc.into_inline_connection(self.catalog().state());
                let item_global_id = self.catalog().get_entry(&item_id).latest_global_id();

                let ingestion = mz_storage_types::sources::IngestionDescription::new(
                    desc,
                    cluster_id,
                    item_global_id,
                );

                (
                    DataSource::Ingestion(ingestion),
                    source_status_collection_id,
                )
            }
            DataSourceDesc::OldSyntaxIngestion {
                desc,
                progress_subsource,
                data_config,
                details,
                cluster_id,
            } => {
                let desc = desc.into_inline_connection(self.catalog().state());
                let data_config = data_config.into_inline_connection(self.catalog().state());

                let progress_subsource = self
                    .catalog()
                    .get_entry(&progress_subsource)
                    .latest_global_id();

                let mut ingestion = mz_storage_types::sources::IngestionDescription::new(
                    desc,
                    cluster_id,
                    progress_subsource,
                );

                let legacy_export = SourceExport {
                    storage_metadata: (),
                    data_config,
                    details,
                };

                ingestion
                    .source_exports
                    .insert(source.global_id, legacy_export);

                (
                    DataSource::Ingestion(ingestion),
                    source_status_collection_id,
                )
            }
            DataSourceDesc::IngestionExport {
                ingestion_id,
                external_reference: _,
                details,
                data_config,
            } => {
                let ingestion_id = self.catalog().get_entry(&ingestion_id).latest_global_id();
                (
                    DataSource::IngestionExport {
                        ingestion_id,
                        details,
                        data_config: data_config.into_inline_connection(self.catalog().state()),
                    },
                    source_status_collection_id,
                )
            }
            DataSourceDesc::Progress => (DataSource::Progress, None),
            DataSourceDesc::Webhook { .. } => (DataSource::Webhook, None),
            DataSourceDesc::Introspection(_) => {
                unreachable!("cannot create sources with introspection data sources")
            }
        };

        storage_collections_to_create.insert(
            source.global_id,
            CollectionDescription::<Timestamp> {
                desc: source.desc.clone(),
                data_source,
                timeline: Some(source.timeline),
                since: None,
                status_collection_id,
                primary: None,
            },
        );

        for (compaction_window, catalog_ids) in compaction_windows {
            let compaction_ids = storage_policies_to_initialize
                .entry(compaction_window)
                .or_default();

            let gids = catalog_ids
                .into_iter()
                .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids());
            compaction_ids.extend(gids);
        }

        Ok(())
    }
}

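/// The folded-up implication of one or more catalog state updates for a
/// single catalog object, used to decide what action, if any, each object
/// requires.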
#[derive(Debug, Clone)]
enum CatalogImplication {
    None,
    Table(CatalogImplicationKind<Table>),
    Source(CatalogImplicationKind<(Source, Option<GenericSourceConnection>)>),
    Sink(CatalogImplicationKind<Sink>),
    Index(CatalogImplicationKind<Index>),
    MaterializedView(CatalogImplicationKind<MaterializedView>),
    View(CatalogImplicationKind<View>),
    ContinualTask(CatalogImplicationKind<ContinualTask>),
    Secret(CatalogImplicationKind<Secret>),
    Connection(CatalogImplicationKind<Connection>),
    Cluster(CatalogImplicationKind<Cluster>),
    ClusterReplica(CatalogImplicationKind<ClusterReplica>),
}

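/// The lifecycle state of a single object within a batch of updates: not yet
/// seen, added, dropped (together with its full name, for error messages), or
/// altered.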
#[derive(Debug, Clone)]
enum CatalogImplicationKind<T> {
    None,
    Added(T),
    Dropped(T, String),
    Altered { prev: T, new: T },
}

impl<T: Clone> CatalogImplicationKind<T> {
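    /// Transitions this state according to `diff`, folding a matching
    /// addition/retraction pair into [`CatalogImplicationKind::Altered`].
    /// Returns an error for impossible transitions, for example adding an
    /// already-added object.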
    fn transition(&mut self, item: T, name: Option<String>, diff: StateDiff) -> Result<(), String> {
        use CatalogImplicationKind::*;
        use StateDiff::*;

        let new_state = match (&*self, diff) {
            (None, Addition) => Added(item),
            (None, Retraction) => Dropped(item, name.unwrap_or_else(|| "<unknown>".to_string())),

            (Added(existing), Retraction) => {
                // A retraction meeting a pending addition means the object was
                // altered: the retracted state is the previous one.
                Altered {
                    prev: item,
                    new: existing.clone(),
                }
            }
            (Added(_), Addition) => {
                return Err("Cannot add an already added object".to_string());
            }

            (Dropped(existing, _), Addition) => {
                // An addition meeting a pending drop likewise means the object
                // was altered.
                Altered {
                    prev: existing.clone(),
                    new: item,
                }
            }
            (Dropped(_, _), Retraction) => {
                return Err("Cannot drop an already dropped object".to_string());
            }

            (Altered { .. }, _) => {
                return Err(format!(
                    "Cannot apply {:?} to an object in Altered state",
                    diff
                ));
            }
        };

        *self = new_state;
        Ok(())
    }
}

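/// Generates an `absorb_*` helper for [`CatalogImplication`] that routes an
/// item of the given type into the matching variant's
/// [`CatalogImplicationKind`] and applies the state transition, panicking on
/// mismatched variants or invalid transitions.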
macro_rules! impl_absorb_method {
    (
        $method_name:ident,
        $variant:ident,
        $item_type:ty
    ) => {
        fn $method_name(
            &mut self,
            item: $item_type,
            parsed_full_name: Option<String>,
            diff: StateDiff,
        ) {
            let state = match self {
                CatalogImplication::$variant(state) => state,
                CatalogImplication::None => {
                    *self = CatalogImplication::$variant(CatalogImplicationKind::None);
                    match self {
                        CatalogImplication::$variant(state) => state,
                        _ => unreachable!(),
                    }
                }
                _ => {
                    panic!(
                        "Unexpected command type for {:?}: {} {:?}",
                        self,
                        stringify!($variant),
                        diff,
                    );
                }
            };

            if let Err(e) = state.transition(item, parsed_full_name, diff) {
                panic!(
                    "Invalid state transition for {}: {}",
                    stringify!($variant),
                    e
                );
            }
        }
    };
}

impl CatalogImplication {
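    /// Absorbs a single parsed state update into this implication,
    /// dispatching on the type of the updated item.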
    fn absorb(&mut self, catalog_update: ParsedStateUpdate) {
        match catalog_update.kind {
            ParsedStateUpdateKind::Item {
                durable_item: _,
                parsed_item,
                connection,
                parsed_full_name,
            } => match parsed_item {
                CatalogItem::Table(table) => {
                    self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
                }
                CatalogItem::Source(source) => {
                    self.absorb_source(
                        (source, connection),
                        Some(parsed_full_name),
                        catalog_update.diff,
                    );
                }
                CatalogItem::Sink(sink) => {
                    self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::Index(index) => {
                    self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::MaterializedView(mv) => {
                    self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::View(view) => {
                    self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::ContinualTask(ct) => {
                    self.absorb_continual_task(ct, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::Secret(secret) => {
                    self.absorb_secret(secret, None, catalog_update.diff);
                }
                CatalogItem::Connection(connection) => {
                    self.absorb_connection(connection, None, catalog_update.diff);
                }
                CatalogItem::Log(_) => {}
                CatalogItem::Type(_) => {}
                CatalogItem::Func(_) => {}
            },
            ParsedStateUpdateKind::TemporaryItem {
                durable_item: _,
                parsed_item,
                connection,
                parsed_full_name,
            } => match parsed_item {
                CatalogItem::Table(table) => {
                    self.absorb_table(table, Some(parsed_full_name), catalog_update.diff)
                }
                CatalogItem::Source(source) => {
                    self.absorb_source(
                        (source, connection),
                        Some(parsed_full_name),
                        catalog_update.diff,
                    );
                }
                CatalogItem::Sink(sink) => {
                    self.absorb_sink(sink, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::Index(index) => {
                    self.absorb_index(index, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::MaterializedView(mv) => {
                    self.absorb_materialized_view(mv, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::View(view) => {
                    self.absorb_view(view, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::ContinualTask(ct) => {
                    self.absorb_continual_task(ct, Some(parsed_full_name), catalog_update.diff);
                }
                CatalogItem::Secret(secret) => {
                    self.absorb_secret(secret, None, catalog_update.diff);
                }
                CatalogItem::Connection(connection) => {
                    self.absorb_connection(connection, None, catalog_update.diff);
                }
                CatalogItem::Log(_) => {}
                CatalogItem::Type(_) => {}
                CatalogItem::Func(_) => {}
            },
            ParsedStateUpdateKind::Cluster {
                durable_cluster: _,
                parsed_cluster,
            } => {
                self.absorb_cluster(parsed_cluster, catalog_update.diff);
            }
            ParsedStateUpdateKind::ClusterReplica {
                durable_cluster_replica: _,
                parsed_cluster_replica,
            } => {
                self.absorb_cluster_replica(parsed_cluster_replica, catalog_update.diff);
            }
        }
    }

    impl_absorb_method!(absorb_table, Table, Table);
    impl_absorb_method!(
        absorb_source,
        Source,
        (Source, Option<GenericSourceConnection>)
    );
    impl_absorb_method!(absorb_sink, Sink, Sink);
    impl_absorb_method!(absorb_index, Index, Index);
    impl_absorb_method!(absorb_materialized_view, MaterializedView, MaterializedView);
    impl_absorb_method!(absorb_view, View, View);

    impl_absorb_method!(absorb_continual_task, ContinualTask, ContinualTask);
    impl_absorb_method!(absorb_secret, Secret, Secret);
    impl_absorb_method!(absorb_connection, Connection, Connection);

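    /// Like the generated `absorb_*` methods, but for clusters, which carry
    /// their name directly.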
    fn absorb_cluster(&mut self, cluster: Cluster, diff: StateDiff) {
        let state = match self {
            CatalogImplication::Cluster(state) => state,
            CatalogImplication::None => {
                *self = CatalogImplication::Cluster(CatalogImplicationKind::None);
                match self {
                    CatalogImplication::Cluster(state) => state,
                    _ => unreachable!(),
                }
            }
            _ => {
                panic!("Unexpected command type for {:?}: Cluster {:?}", self, diff);
            }
        };

        if let Err(e) = state.transition(cluster.clone(), Some(cluster.name), diff) {
            panic!("invalid state transition for cluster: {}", e);
        }
    }

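    /// Like [`CatalogImplication::absorb_cluster`], but for cluster replicas.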
    fn absorb_cluster_replica(&mut self, cluster_replica: ClusterReplica, diff: StateDiff) {
        let state = match self {
            CatalogImplication::ClusterReplica(state) => state,
            CatalogImplication::None => {
                *self = CatalogImplication::ClusterReplica(CatalogImplicationKind::None);
                match self {
                    CatalogImplication::ClusterReplica(state) => state,
                    _ => unreachable!(),
                }
            }
            _ => {
                panic!(
                    "Unexpected command type for {:?}: ClusterReplica {:?}",
                    self, diff
                );
            }
        };

        if let Err(e) = state.transition(cluster_replica.clone(), Some(cluster_replica.name), diff)
        {
            panic!("invalid state transition for cluster replica: {}", e);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use mz_repr::{GlobalId, RelationDesc, RelationVersion, VersionedRelationDesc};
    use mz_sql::names::ResolvedIds;
    use std::collections::BTreeMap;

    fn create_test_table(name: &str) -> Table {
        Table {
            desc: VersionedRelationDesc::new(
                RelationDesc::builder()
                    .with_column(name, mz_repr::SqlScalarType::String.nullable(false))
                    .finish(),
            ),
            create_sql: None,
            collections: BTreeMap::from([(RelationVersion::root(), GlobalId::System(1))]),
            conn_id: None,
            resolved_ids: ResolvedIds::empty(),
            custom_logical_compaction_window: None,
            is_retained_metrics_object: false,
            data_source: TableDataSource::TableWrites { defaults: vec![] },
        }
    }

    #[mz_ore::test]
    fn test_item_state_transitions() {
        // None + Addition => Added.
        let mut state = CatalogImplicationKind::None;
        assert!(
            state
                .transition("item1".to_string(), None, StateDiff::Addition)
                .is_ok()
        );
        assert!(matches!(state, CatalogImplicationKind::Added(_)));

        // Added + Retraction => Altered, where the retracted item is `prev`.
        let mut state = CatalogImplicationKind::Added("new_item".to_string());
        assert!(
            state
                .transition("old_item".to_string(), None, StateDiff::Retraction)
                .is_ok()
        );
        match &state {
            CatalogImplicationKind::Altered { prev, new } => {
                assert_eq!(prev, "old_item");
                assert_eq!(new, "new_item");
            }
            _ => panic!("Expected Altered state"),
        }

        // None + Retraction => Dropped.
        let mut state = CatalogImplicationKind::None;
        assert!(
            state
                .transition(
                    "item1".to_string(),
                    Some("test_name".to_string()),
                    StateDiff::Retraction
                )
                .is_ok()
        );
        assert!(matches!(state, CatalogImplicationKind::Dropped(_, _)));

        // Dropped + Addition => Altered, where the added item is `new`.
        let mut state = CatalogImplicationKind::Dropped("old_item".to_string(), "name".to_string());
        assert!(
            state
                .transition("new_item".to_string(), None, StateDiff::Addition)
                .is_ok()
        );
        match &state {
            CatalogImplicationKind::Altered { prev, new } => {
                assert_eq!(prev, "old_item");
                assert_eq!(new, "new_item");
            }
            _ => panic!("Expected Altered state"),
        }

        // Double additions and double retractions are rejected.
        let mut state = CatalogImplicationKind::Added("item".to_string());
        assert!(
            state
                .transition("item2".to_string(), None, StateDiff::Addition)
                .is_err()
        );

        let mut state = CatalogImplicationKind::Dropped("item".to_string(), "name".to_string());
        assert!(
            state
                .transition("item2".to_string(), None, StateDiff::Retraction)
                .is_err()
        );
    }

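    // Once in `Altered`, `transition` rejects any further diff.
    #[mz_ore::test]
    fn test_invalid_transition_from_altered() {
        let mut state = CatalogImplicationKind::Altered {
            prev: "old_item".to_string(),
            new: "new_item".to_string(),
        };
        assert!(
            state
                .transition("item".to_string(), None, StateDiff::Addition)
                .is_err()
        );
        assert!(
            state
                .transition("item".to_string(), None, StateDiff::Retraction)
                .is_err()
        );
    }
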
    #[mz_ore::test]
    fn test_table_absorb_state_machine() {
        let table1 = create_test_table("table1");
        let table2 = create_test_table("table2");

        // An addition puts the command into the Added state.
        let mut cmd = CatalogImplication::None;
        cmd.absorb_table(
            table1.clone(),
            Some("schema.table1".to_string()),
            StateDiff::Addition,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Added(t) => {
                    assert_eq!(t.desc.latest().arity(), table1.desc.latest().arity())
                }
                _ => panic!("Expected Added state"),
            },
            _ => panic!("Expected Table command"),
        }

        // A subsequent retraction folds the pair into Altered: the retracted
        // table becomes `prev` and the previously added one becomes `new`.
        cmd.absorb_table(
            table2.clone(),
            Some("schema.table2".to_string()),
            StateDiff::Retraction,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Altered { prev, new } => {
                    assert_eq!(prev.desc.latest().arity(), table2.desc.latest().arity());
                    assert_eq!(new.desc.latest().arity(), table1.desc.latest().arity());
                }
                _ => panic!("Expected Altered state"),
            },
            _ => panic!("Expected Table command"),
        }

        // A retraction puts the command into the Dropped state, remembering
        // the full name.
        let mut cmd = CatalogImplication::None;
        cmd.absorb_table(
            table1.clone(),
            Some("schema.table1".to_string()),
            StateDiff::Retraction,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Dropped(t, name) => {
                    assert_eq!(t.desc.latest().arity(), table1.desc.latest().arity());
                    assert_eq!(name, "schema.table1");
                }
                _ => panic!("Expected Dropped state"),
            },
            _ => panic!("Expected Table command"),
        }

        // A subsequent addition folds the pair into Altered the other way
        // around.
        cmd.absorb_table(
            table2.clone(),
            Some("schema.table2".to_string()),
            StateDiff::Addition,
        );
        match &cmd {
            CatalogImplication::Table(state) => match state {
                CatalogImplicationKind::Altered { prev, new } => {
                    assert_eq!(prev.desc.latest().arity(), table1.desc.latest().arity());
                    assert_eq!(new.desc.latest().arity(), table2.desc.latest().arity());
                }
                _ => panic!("Expected Altered state"),
            },
            _ => panic!("Expected Table command"),
        }
    }

    #[mz_ore::test]
    #[should_panic(expected = "Cannot add an already added object")]
    fn test_invalid_double_add() {
        let table = create_test_table("table");
        let mut cmd = CatalogImplication::None;

        // The first addition is fine.
        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Addition,
        );

        // The second addition must panic.
        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Addition,
        );
    }

    #[mz_ore::test]
    #[should_panic(expected = "Cannot drop an already dropped object")]
    fn test_invalid_double_drop() {
        let table = create_test_table("table");
        let mut cmd = CatalogImplication::None;

        // The first retraction is fine.
        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Retraction,
        );

        // The second retraction must panic.
        cmd.absorb_table(
            table.clone(),
            Some("schema.table".to_string()),
            StateDiff::Retraction,
        );
    }
}