use std::collections::{BTreeMap, BTreeSet};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use fail::fail_point;
use maplit::{btreemap, btreeset};
use mz_adapter_types::compaction::SINCE_GRANULARITY;
use mz_adapter_types::connection::ConnectionId;
use mz_audit_log::VersionedEvent;
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::memory::objects::{CatalogItem, Connection, DataSourceDesc, Sink};
use mz_compute_client::protocol::response::PeekResponse;
use mz_controller::clusters::ReplicaLocation;
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::instrument;
use mz_ore::now::to_datetime;
use mz_ore::retry::Retry;
use mz_ore::str::StrExt;
use mz_ore::task;
use mz_repr::adt::numeric::Numeric;
use mz_repr::{CatalogItemId, GlobalId, Timestamp};
use mz_sql::catalog::{CatalogCluster, CatalogClusterReplica, CatalogSchema};
use mz_sql::names::ResolvedDatabaseSpecifier;
use mz_sql::plan::ConnectionDetails;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::{
    self, MAX_AWS_PRIVATELINK_CONNECTIONS, MAX_CLUSTERS, MAX_CONTINUAL_TASKS,
    MAX_CREDIT_CONSUMPTION_RATE, MAX_DATABASES, MAX_KAFKA_CONNECTIONS, MAX_MATERIALIZED_VIEWS,
    MAX_MYSQL_CONNECTIONS, MAX_NETWORK_POLICIES, MAX_OBJECTS_PER_SCHEMA, MAX_POSTGRES_CONNECTIONS,
    MAX_REPLICAS_PER_CLUSTER, MAX_ROLES, MAX_SCHEMAS_PER_DATABASE, MAX_SECRETS, MAX_SINKS,
    MAX_SOURCES, MAX_SQL_SERVER_CONNECTIONS, MAX_TABLES, SystemVars, Var,
};
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::connections::PostgresConnection;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sources::GenericSourceConnection;
use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
use serde_json::json;
use tracing::{Instrument, Level, event, info_span, warn};

use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
use crate::catalog::{DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult};
use crate::coord::Coordinator;
use crate::coord::appends::BuiltinTableAppendNotify;
use crate::coord::timeline::{TimelineContext, TimelineState};
use crate::session::{Session, Transaction, TransactionOps};
use crate::statement_logging::StatementEndedExecutionReason;
use crate::telemetry::{EventDetails, SegmentClientExt};
use crate::util::ResultExt;
use crate::{AdapterError, ExecuteContext, catalog, flags};
impl Coordinator {
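    /// Executes a catalog transaction on behalf of `session`, if provided.
    ///
    /// A convenience wrapper around [`Self::catalog_transact_conn`].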
    #[instrument(name = "coord::catalog_transact")]
    pub(crate) async fn catalog_transact(
        &mut self,
        session: Option<&Session>,
        ops: Vec<catalog::Op>,
    ) -> Result<(), AdapterError> {
        self.catalog_transact_conn(session.map(|session| session.conn_id()), ops)
            .await
    }

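    /// Same as [`Self::catalog_transact_conn`], but additionally runs the
    /// given `side_effect` future to completion, concurrently with awaiting
    /// the builtin table updates.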
    #[instrument(name = "coord::catalog_transact_with_side_effects")]
    pub(crate) async fn catalog_transact_with_side_effects<F>(
        &mut self,
        ctx: Option<&mut ExecuteContext>,
        ops: Vec<catalog::Op>,
        side_effect: F,
    ) -> Result<(), AdapterError>
    where
        F: for<'a> FnOnce(
                &'a mut Coordinator,
                Option<&'a mut ExecuteContext>,
            ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
            + 'static,
    {
        let table_updates = self
            .catalog_transact_inner(ctx.as_ref().map(|ctx| ctx.session().conn_id()), ops)
            .await?;
        let side_effects_fut = side_effect(self, ctx);

        let ((), ()) = futures::future::join(
            side_effects_fut.instrument(info_span!(
                "coord::catalog_transact_with_side_effects::side_effects_fut"
            )),
            table_updates.instrument(info_span!(
                "coord::catalog_transact_with_side_effects::table_updates"
            )),
        )
        .await;

        Ok(())
    }

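    /// Same as [`Self::catalog_transact_inner`], but awaits the builtin table
    /// updates before returning.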
    #[instrument(name = "coord::catalog_transact_conn")]
    pub(crate) async fn catalog_transact_conn(
        &mut self,
        conn_id: Option<&ConnectionId>,
        ops: Vec<catalog::Op>,
    ) -> Result<(), AdapterError> {
        let table_updates = self.catalog_transact_inner(conn_id, ops).await?;
        table_updates
            .instrument(info_span!("coord::catalog_transact_conn::table_updates"))
            .await;
        Ok(())
    }

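    /// Executes DDL ops in the context of the session's transaction, if it is
    /// a DDL transaction: the combined ops are validated with a dry run and
    /// stashed on the session until commit. Otherwise the ops are applied
    /// immediately via [`Self::catalog_transact_with_side_effects`].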
    #[instrument(name = "coord::catalog_transact_with_ddl_transaction")]
    pub(crate) async fn catalog_transact_with_ddl_transaction<F>(
        &mut self,
        ctx: &mut ExecuteContext,
        ops: Vec<catalog::Op>,
        side_effect: F,
    ) -> Result<(), AdapterError>
    where
        F: for<'a> FnOnce(
                &'a mut Coordinator,
                Option<&'a mut ExecuteContext>,
            ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
            + Send
            + Sync
            + 'static,
    {
        let Some(Transaction {
            ops:
                TransactionOps::DDL {
                    ops: txn_ops,
                    revision: txn_revision,
                    side_effects: _,
                    state: _,
                },
            ..
        }) = ctx.session().transaction().inner()
        else {
            return self
                .catalog_transact_with_side_effects(Some(ctx), ops, side_effect)
                .await;
        };

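        // If the catalog has changed since the transaction's ops were staged,
        // they may no longer apply cleanly; abort the transaction.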
        if self.catalog().transient_revision() != *txn_revision {
            return Err(AdapterError::DDLTransactionRace);
        }

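        // Run the existing transaction ops together with the new ones, with a
        // trailing dry-run op so the catalog validates the batch without
        // committing it. A dry run never succeeds outright: it returns either
        // `TransactionDryRun` carrying the validated ops or a real error.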
        let mut all_ops = Vec::with_capacity(ops.len() + txn_ops.len() + 1);
        all_ops.extend(txn_ops.iter().cloned());
        all_ops.extend(ops.clone());
        all_ops.push(Op::TransactionDryRun);

        let result = self.catalog_transact(Some(ctx.session()), all_ops).await;

        match result {
            Err(AdapterError::TransactionDryRun { new_ops, new_state }) => {
                ctx.session_mut()
                    .transaction_mut()
                    .add_ops(TransactionOps::DDL {
                        ops: new_ops,
                        state: new_state,
                        side_effects: vec![Box::new(side_effect)],
                        revision: self.catalog().transient_revision(),
                    })?;
                Ok(())
            }
            Ok(_) => unreachable!("unexpected success!"),
            Err(e) => Err(e),
        }
    }

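    /// Executes a catalog transaction and performs the resulting cleanup:
    /// dropping the storage and compute resources of removed objects,
    /// cancelling dependent sinks, peeks, and copies, propagating
    /// configuration changes to the controllers, and emitting audit events.
    /// Returns a future that completes once the builtin table updates have
    /// been applied.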
    #[instrument(name = "coord::catalog_transact_inner")]
    pub(crate) async fn catalog_transact_inner<'a>(
        &mut self,
        conn_id: Option<&ConnectionId>,
        ops: Vec<catalog::Op>,
    ) -> Result<BuiltinTableAppendNotify, AdapterError> {
        if self.controller.read_only() {
            return Err(AdapterError::ReadOnly);
        }

        event!(Level::TRACE, ops = format!("{:?}", ops));

        let mut sources_to_drop = vec![];
        let mut webhook_sources_to_restart = BTreeSet::new();
        let mut table_gids_to_drop = vec![];
        let mut storage_sink_gids_to_drop = vec![];
        let mut indexes_to_drop = vec![];
        let mut materialized_views_to_drop = vec![];
        let mut continual_tasks_to_drop = vec![];
        let mut views_to_drop = vec![];
        let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
        let mut secrets_to_drop = vec![];
        let mut vpc_endpoints_to_drop = vec![];
        let mut clusters_to_drop = vec![];
        let mut cluster_replicas_to_drop = vec![];
        let mut compute_sinks_to_drop = BTreeMap::new();
        let mut peeks_to_drop = vec![];
        let mut copies_to_drop = vec![];
        let mut clusters_to_create = vec![];
        let mut cluster_replicas_to_create = vec![];
        let mut update_metrics_config = false;
        let mut update_tracing_config = false;
        let mut update_controller_config = false;
        let mut update_compute_config = false;
        let mut update_storage_config = false;
        let mut update_pg_timestamp_oracle_config = false;
        let mut update_metrics_retention = false;
        let mut update_secrets_caching_config = false;
        let mut update_cluster_scheduling_config = false;
        let mut update_http_config = false;

        for op in &ops {
            match op {
                catalog::Op::DropObjects(drop_object_infos) => {
                    for drop_object_info in drop_object_infos {
                        match &drop_object_info {
                            catalog::DropObjectInfo::Item(id) => {
                                match self.catalog().get_entry(id).item() {
                                    CatalogItem::Table(table) => {
                                        table_gids_to_drop
                                            .extend(table.global_ids().map(|gid| (*id, gid)));
                                    }
                                    CatalogItem::Source(source) => {
                                        sources_to_drop.push((*id, source.global_id()));
                                        if let DataSourceDesc::Ingestion {
                                            ingestion_desc, ..
                                        } = &source.data_source
                                        {
                                            match &ingestion_desc.desc.connection {
                                                GenericSourceConnection::Postgres(conn) => {
                                                    let conn = conn.clone().into_inline_connection(
                                                        self.catalog().state(),
                                                    );
                                                    let pending_drop = (
                                                        conn.connection.clone(),
                                                        conn.publication_details.slot.clone(),
                                                    );
                                                    replication_slots_to_drop.push(pending_drop);
                                                }
                                                _ => {}
                                            }
                                        }
                                    }
                                    CatalogItem::Sink(sink) => {
                                        storage_sink_gids_to_drop.push(sink.global_id());
                                    }
                                    CatalogItem::Index(index) => {
                                        indexes_to_drop.push((index.cluster_id, index.global_id()));
                                    }
                                    CatalogItem::MaterializedView(mv) => {
                                        materialized_views_to_drop
                                            .push((mv.cluster_id, mv.global_id()));
                                    }
                                    CatalogItem::View(view) => {
                                        views_to_drop.push((*id, view.clone()))
                                    }
                                    CatalogItem::ContinualTask(ct) => {
                                        continual_tasks_to_drop.push((
                                            *id,
                                            ct.cluster_id,
                                            ct.global_id(),
                                        ));
                                    }
                                    CatalogItem::Secret(_) => {
                                        secrets_to_drop.push(*id);
                                    }
                                    CatalogItem::Connection(Connection { details, .. }) => {
                                        match details {
                                            ConnectionDetails::Ssh { .. } => {
                                                secrets_to_drop.push(*id);
                                            }
                                            ConnectionDetails::AwsPrivatelink(_) => {
                                                vpc_endpoints_to_drop.push(*id);
                                            }
                                            _ => (),
                                        }
                                    }
                                    _ => (),
                                }
                            }
                            catalog::DropObjectInfo::Cluster(id) => {
                                clusters_to_drop.push(*id);
                            }
                            catalog::DropObjectInfo::ClusterReplica((
                                cluster_id,
                                replica_id,
                                _reason,
                            )) => {
                                cluster_replicas_to_drop.push((*cluster_id, *replica_id));
                            }
                            _ => (),
                        }
                    }
                }
                catalog::Op::ResetSystemConfiguration { name }
                | catalog::Op::UpdateSystemConfiguration { name, .. } => {
                    update_metrics_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_metrics_config_var(name);
                    update_tracing_config |= vars::is_tracing_var(name);
                    update_controller_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_controller_config_var(name);
                    update_compute_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_compute_config_var(name);
                    update_storage_config |= self
                        .catalog
                        .state()
                        .system_config()
                        .is_storage_config_var(name);
                    update_pg_timestamp_oracle_config |=
                        vars::is_pg_timestamp_oracle_config_var(name);
                    update_metrics_retention |= name == vars::METRICS_RETENTION.name();
                    update_secrets_caching_config |= vars::is_secrets_caching_var(name);
                    update_cluster_scheduling_config |= vars::is_cluster_scheduling_var(name);
                    update_http_config |= vars::is_http_config_var(name);
                }
                catalog::Op::ResetAllSystemConfiguration => {
                    update_tracing_config = true;
                    update_controller_config = true;
                    update_compute_config = true;
                    update_storage_config = true;
                    update_pg_timestamp_oracle_config = true;
                    update_metrics_retention = true;
                    update_secrets_caching_config = true;
                    update_cluster_scheduling_config = true;
                    update_http_config = true;
                }
                catalog::Op::RenameItem { id, .. } => {
                    let item = self.catalog().get_entry(id);
                    let is_webhook_source = item
                        .source()
                        .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
                        .unwrap_or(false);
                    if is_webhook_source {
                        webhook_sources_to_restart.insert(*id);
                    }
                }
                catalog::Op::RenameSchema {
                    database_spec,
                    schema_spec,
                    ..
                } => {
                    let schema = self.catalog().get_schema(
                        database_spec,
                        schema_spec,
                        conn_id.unwrap_or(&SYSTEM_CONN_ID),
                    );
                    let webhook_sources = schema.item_ids().filter(|id| {
                        let item = self.catalog().get_entry(id);
                        item.source()
                            .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
                            .unwrap_or(false)
                    });
                    webhook_sources_to_restart.extend(webhook_sources);
                }
                catalog::Op::CreateCluster { id, .. } => {
                    clusters_to_create.push(*id);
                }
                catalog::Op::CreateClusterReplica {
                    cluster_id,
                    name,
                    config,
                    ..
                } => {
                    cluster_replicas_to_create.push((
                        *cluster_id,
                        name.clone(),
                        config.location.num_processes(),
                    ));
                }
                _ => (),
            }
        }

        let collections_to_drop: BTreeSet<GlobalId> = sources_to_drop
            .iter()
            .map(|(_, gid)| *gid)
            .chain(table_gids_to_drop.iter().map(|(_, gid)| *gid))
            .chain(storage_sink_gids_to_drop.iter().copied())
            .chain(indexes_to_drop.iter().map(|(_, gid)| *gid))
            .chain(materialized_views_to_drop.iter().map(|(_, gid)| *gid))
            .chain(continual_tasks_to_drop.iter().map(|(_, _, gid)| *gid))
            .chain(views_to_drop.iter().map(|(_id, view)| view.global_id()))
            .collect();

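        // Retire any active compute sinks (e.g. SUBSCRIBEs) that depend on a
        // dropped collection or run on a dropped cluster.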
        for (sink_id, sink) in &self.active_compute_sinks {
            let cluster_id = sink.cluster_id();
            let conn_id = &sink.connection_id();
            if let Some(id) = sink
                .depends_on()
                .iter()
                .find(|id| collections_to_drop.contains(id))
            {
                let entry = self.catalog().get_entry_by_global_id(id);
                let name = self
                    .catalog()
                    .resolve_full_name(entry.name(), Some(conn_id))
                    .to_string();
                compute_sinks_to_drop.insert(
                    *sink_id,
                    ActiveComputeSinkRetireReason::DependencyDropped(format!(
                        "relation {}",
                        name.quoted()
                    )),
                );
            } else if clusters_to_drop.contains(&cluster_id) {
                let name = self.catalog().get_cluster(cluster_id).name();
                compute_sinks_to_drop.insert(
                    *sink_id,
                    ActiveComputeSinkRetireReason::DependencyDropped(format!(
                        "cluster {}",
                        name.quoted()
                    )),
                );
            }
        }

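        // Cancel any pending peeks that depend on a dropped collection or run
        // on a dropped cluster.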
        for (uuid, pending_peek) in &self.pending_peeks {
            if let Some(id) = pending_peek
                .depends_on
                .iter()
                .find(|id| collections_to_drop.contains(id))
            {
                let entry = self.catalog().get_entry_by_global_id(id);
                let name = self
                    .catalog()
                    .resolve_full_name(entry.name(), Some(&pending_peek.conn_id));
                peeks_to_drop.push((
                    format!("relation {}", name.to_string().quoted()),
                    uuid.clone(),
                ));
            } else if clusters_to_drop.contains(&pending_peek.cluster_id) {
                let name = self.catalog().get_cluster(pending_peek.cluster_id).name();
                peeks_to_drop.push((format!("cluster {}", name.quoted()), uuid.clone()));
            }
        }

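        // Cancel any active COPY FROM statements whose target table or cluster
        // is being dropped.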
        for (conn_id, pending_copy) in &self.active_copies {
            let dropping_table = table_gids_to_drop
                .iter()
                .any(|(item_id, _gid)| pending_copy.table_id == *item_id);
            let dropping_cluster = clusters_to_drop.contains(&pending_copy.cluster_id);

            if dropping_table || dropping_cluster {
                copies_to_drop.push(conn_id.clone());
            }
        }

        let storage_ids_to_drop = sources_to_drop
            .iter()
            .map(|(_, gid)| *gid)
            .chain(storage_sink_gids_to_drop.iter().copied())
            .chain(table_gids_to_drop.iter().map(|(_, gid)| *gid))
            .chain(materialized_views_to_drop.iter().map(|(_, gid)| *gid))
            .chain(continual_tasks_to_drop.iter().map(|(_, _, gid)| *gid));
        let compute_ids_to_drop = indexes_to_drop
            .iter()
            .copied()
            .chain(materialized_views_to_drop.iter().copied())
            .chain(
                continual_tasks_to_drop
                    .iter()
                    .map(|(_, cluster_id, gid)| (*cluster_id, *gid)),
            );

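        // Partition the dropped collections by timeline so that, per timeline,
        // we can tell whether this transaction removes the last collection and
        // the timeline's resources can be reclaimed.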
        let collection_id_bundle = self.build_collection_id_bundle(
            storage_ids_to_drop,
            compute_ids_to_drop,
            clusters_to_drop.clone(),
        );
        let timeline_associations: BTreeMap<_, _> = self
            .catalog()
            .partition_ids_by_timeline_context(&collection_id_bundle)
            .filter_map(|(context, bundle)| {
                let TimelineContext::TimelineDependent(timeline) = context else {
                    return None;
                };
                let TimelineState { read_holds, .. } = self
                    .global_timelines
                    .get(&timeline)
                    .expect("all timelines have a timestamp oracle");

                let empty = read_holds.id_bundle().difference(&bundle).is_empty();

                Some((timeline, (empty, bundle)))
            })
            .collect();

        self.validate_resource_limits(&ops, conn_id.unwrap_or(&SYSTEM_CONN_ID))?;

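        // Acquire a write timestamp from the timestamp oracle; the catalog
        // transaction and its audit events are recorded at this timestamp.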
        let oracle_write_ts = self.get_local_write_ts().await.timestamp;

        let Coordinator {
            catalog,
            active_conns,
            controller,
            cluster_replica_statuses,
            ..
        } = self;
        let catalog = Arc::make_mut(catalog);
        let conn = conn_id.map(|id| active_conns.get(id).expect("connection must exist"));

        let TransactionResult {
            builtin_table_updates,
            audit_events,
        } = catalog
            .transact(
                Some(&mut controller.storage_collections),
                oracle_write_ts,
                conn,
                ops,
            )
            .await?;

        for (cluster_id, replica_id) in &cluster_replicas_to_drop {
            cluster_replica_statuses.remove_cluster_replica_statuses(cluster_id, replica_id);
        }
        for cluster_id in &clusters_to_drop {
            cluster_replica_statuses.remove_cluster_statuses(cluster_id);
        }
        for cluster_id in clusters_to_create {
            cluster_replica_statuses.initialize_cluster_statuses(cluster_id);
        }
        let now = to_datetime((catalog.config().now)());
        for (cluster_id, replica_name, num_processes) in cluster_replicas_to_create {
            let replica_id = catalog
                .resolve_replica_in_cluster(&cluster_id, &replica_name)
                .expect("just created")
                .replica_id();
            cluster_replica_statuses.initialize_cluster_replica_statuses(
                cluster_id,
                replica_id,
                num_processes,
                now,
            );
        }

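        // Append the builtin table updates, keeping the notify future that
        // completes once they have been applied.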
        let (builtin_update_notify, _) = self
            .builtin_table_update()
            .execute(builtin_table_updates)
            .await;

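        // Now that the catalog transaction has committed, apply its side
        // effects to the rest of the system.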
        let _: () = async {
            if !timeline_associations.is_empty() {
                for (timeline, (should_be_empty, id_bundle)) in timeline_associations {
                    let became_empty =
                        self.remove_resources_associated_with_timeline(timeline, id_bundle);
                    assert_eq!(should_be_empty, became_empty, "emptiness did not match!");
                }
            }
            if !table_gids_to_drop.is_empty() {
                let ts = self.get_local_write_ts().await;
                self.drop_tables(table_gids_to_drop, ts.timestamp);
            }
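            // Tell the storage controller to drop the collections backing the
            // dropped sources.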
            if !sources_to_drop.is_empty() {
                self.drop_sources(sources_to_drop);
            }
            if !webhook_sources_to_restart.is_empty() {
                self.restart_webhook_sources(webhook_sources_to_restart);
            }
            if !storage_sink_gids_to_drop.is_empty() {
                self.drop_storage_sinks(storage_sink_gids_to_drop);
            }
            if !compute_sinks_to_drop.is_empty() {
                self.retire_compute_sinks(compute_sinks_to_drop).await;
            }
            if !peeks_to_drop.is_empty() {
                for (dropped_name, uuid) in peeks_to_drop {
                    if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
                        let cancel_reason = PeekResponse::Error(format!(
                            "query could not complete because {dropped_name} was dropped"
                        ));
                        self.controller
                            .compute
                            .cancel_peek(pending_peek.cluster_id, uuid, cancel_reason)
                            .unwrap_or_terminate("unable to cancel peek");
                        self.retire_execution(
                            StatementEndedExecutionReason::Canceled,
                            pending_peek.ctx_extra,
                        );
                    }
                }
            }
            if !copies_to_drop.is_empty() {
                for conn_id in copies_to_drop {
                    self.cancel_pending_copy(&conn_id);
                }
            }
            if !indexes_to_drop.is_empty() {
                self.drop_indexes(indexes_to_drop);
            }
            if !materialized_views_to_drop.is_empty() {
                self.drop_materialized_views(materialized_views_to_drop);
            }
            if !continual_tasks_to_drop.is_empty() {
                self.drop_continual_tasks(continual_tasks_to_drop);
            }
            if !vpc_endpoints_to_drop.is_empty() {
                self.drop_vpc_endpoints_in_background(vpc_endpoints_to_drop)
            }
            if !cluster_replicas_to_drop.is_empty() {
                fail::fail_point!("after_catalog_drop_replica");
                for (cluster_id, replica_id) in cluster_replicas_to_drop {
                    self.drop_replica(cluster_id, replica_id);
                }
            }
            if !clusters_to_drop.is_empty() {
                for cluster_id in clusters_to_drop {
                    self.controller.drop_cluster(cluster_id);
                }
            }

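            // Dropping replication slots and secrets requires network calls to
            // external services, so do it in a background task rather than
            // blocking the coordinator's main loop.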
            task::spawn(|| "drop_replication_slots_and_secrets", {
                let ssh_tunnel_manager = self.connection_context().ssh_tunnel_manager.clone();
                let secrets_controller = Arc::clone(&self.secrets_controller);
                let secrets_reader = Arc::clone(self.secrets_reader());
                let storage_config = self.controller.storage.config().clone();

                async move {
                    for (connection, replication_slot_name) in replication_slots_to_drop {
                        tracing::info!(?replication_slot_name, "dropping replication slot");

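                        // Retry for up to a minute; the upstream PostgreSQL
                        // server may be temporarily unreachable.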
                        let result: Result<(), anyhow::Error> = Retry::default()
                            .max_duration(Duration::from_secs(60))
                            .retry_async(|_state| async {
                                let config = connection
                                    .config(&secrets_reader, &storage_config, InTask::No)
                                    .await
                                    .map_err(|e| {
                                        anyhow::anyhow!(
                                            "error creating Postgres client for \
                                             dropping acquired slots: {}",
                                            e.display_with_causes()
                                        )
                                    })?;
                                mz_postgres_util::drop_replication_slots(
                                    &ssh_tunnel_manager,
                                    config.clone(),
                                    &[(&replication_slot_name, true)],
                                )
                                .await?;

                                Ok(())
                            })
                            .await;

                        if let Err(err) = result {
                            tracing::warn!(
                                ?replication_slot_name,
                                ?err,
                                "failed to drop replication slot"
                            );
                        }
                    }

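                    // Secret deletion is best effort; failures are logged and
                    // otherwise ignored.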
                    fail_point!("drop_secrets");
                    for secret in secrets_to_drop {
                        if let Err(e) = secrets_controller.delete(secret).await {
                            warn!("Dropping secrets has encountered an error: {}", e);
                        }
                    }
                }
            });

            if update_metrics_config {
                mz_metrics::update_dyncfg(&self.catalog().system_config().dyncfg_updates());
            }
            if update_controller_config {
                self.update_controller_config();
            }
            if update_compute_config {
                self.update_compute_config();
            }
            if update_storage_config {
                self.update_storage_config();
            }
            if update_pg_timestamp_oracle_config {
                self.update_pg_timestamp_oracle_config();
            }
            if update_metrics_retention {
                self.update_metrics_retention();
            }
            if update_tracing_config {
                self.update_tracing_config();
            }
            if update_secrets_caching_config {
                self.update_secrets_caching_config();
            }
            if update_cluster_scheduling_config {
                self.update_cluster_scheduling_config();
            }
            if update_http_config {
                self.update_http_config();
            }
        }
        .instrument(info_span!("coord::catalog_transact_with::finalize"))
        .await;

        let conn = conn_id.and_then(|id| self.active_conns.get(id));
        if let Some(segment_client) = &self.segment_client {
            for VersionedEvent::V1(event) in audit_events {
                let event_type = format!(
                    "{} {}",
                    event.object_type.as_title_case(),
                    event.event_type.as_title_case()
                );
                segment_client.environment_track(
                    &self.catalog().config().environment_id,
                    event_type,
                    json!({ "details": event.details.as_json() }),
                    EventDetails {
                        user_id: conn
                            .and_then(|c| c.user().external_metadata.as_ref())
                            .map(|m| m.user_id),
                        application_name: conn.map(|c| c.application_name()),
                        ..Default::default()
                    },
                );
            }
        }

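        // Consistency checks are expensive, so they are soft-asserted and only
        // run in builds where soft assertions are enabled.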
        mz_ore::soft_assert_eq_no_log!(
            self.check_consistency(),
            Ok(()),
            "coordinator inconsistency detected"
        );

        Ok(builtin_update_notify)
    }

    fn drop_replica(&mut self, cluster_id: ClusterId, replica_id: ReplicaId) {
        self.drop_introspection_subscribes(replica_id);

        self.controller
            .drop_replica(cluster_id, replica_id)
            .expect("dropping replica must not fail");
    }

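    /// Drops the storage collections backing the given sources, first removing
    /// any active webhook appenders for them.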
    fn drop_sources(&mut self, sources: Vec<(CatalogItemId, GlobalId)>) {
        for (item_id, _gid) in &sources {
            self.active_webhooks.remove(item_id);
        }
        let storage_metadata = self.catalog.state().storage_metadata();
        let source_gids = sources.into_iter().map(|(_id, gid)| gid).collect();
        self.controller
            .storage
            .drop_sources(storage_metadata, source_gids)
            .unwrap_or_terminate("cannot fail to drop sources");
    }

    fn drop_tables(&mut self, tables: Vec<(CatalogItemId, GlobalId)>, ts: Timestamp) {
        for (item_id, _gid) in &tables {
            self.active_webhooks.remove(item_id);
        }
        let storage_metadata = self.catalog.state().storage_metadata();
        let table_gids = tables.into_iter().map(|(_id, gid)| gid).collect();
        self.controller
            .storage
            .drop_tables(storage_metadata, table_gids, ts)
            .unwrap_or_terminate("cannot fail to drop tables");
    }

    fn restart_webhook_sources(&mut self, sources: impl IntoIterator<Item = CatalogItemId>) {
        for id in sources {
            self.active_webhooks.remove(&id);
        }
    }

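    /// Drops a single compute sink, returning it if it was active.
    ///
    /// A convenience wrapper around [`Self::drop_compute_sinks`].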
    #[must_use]
    pub async fn drop_compute_sink(&mut self, sink_id: GlobalId) -> Option<ActiveComputeSink> {
        self.drop_compute_sinks([sink_id]).await.remove(&sink_id)
    }

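    /// Removes the given compute sinks from the coordinator and instructs the
    /// compute controller to drop their collections, returning the sinks that
    /// were actually active, keyed by their IDs.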
    #[must_use]
    pub async fn drop_compute_sinks(
        &mut self,
        sink_ids: impl IntoIterator<Item = GlobalId>,
    ) -> BTreeMap<GlobalId, ActiveComputeSink> {
        let mut by_id = BTreeMap::new();
        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
        for sink_id in sink_ids {
            let sink = match self.remove_active_compute_sink(sink_id).await {
                None => {
                    tracing::error!(%sink_id, "drop_compute_sinks called on nonexistent sink");
                    continue;
                }
                Some(sink) => sink,
            };

            by_cluster
                .entry(sink.cluster_id())
                .or_default()
                .push(sink_id);
            by_id.insert(sink_id, sink);
        }
        for (cluster_id, ids) in by_cluster {
            let compute = &mut self.controller.compute;
            if compute.instance_exists(cluster_id) {
                compute
                    .drop_collections(cluster_id, ids)
                    .unwrap_or_terminate("cannot fail to drop collections");
            }
        }
        by_id
    }

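    /// Drops each of the given compute sinks and retires it with the
    /// corresponding reason.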
    pub async fn retire_compute_sinks(
        &mut self,
        mut reasons: BTreeMap<GlobalId, ActiveComputeSinkRetireReason>,
    ) {
        let sink_ids = reasons.keys().cloned();
        for (id, sink) in self.drop_compute_sinks(sink_ids).await {
            let reason = reasons
                .remove(&id)
                .expect("all returned IDs are in `reasons`");
            sink.retire(reason);
        }
    }

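    /// Drops all pending replicas (those created by an in-progress cluster
    /// reconfiguration) of the given clusters.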
    pub async fn drop_reconfiguration_replicas(
        &mut self,
        cluster_ids: BTreeSet<ClusterId>,
    ) -> Result<(), AdapterError> {
        let pending_cluster_ops: Vec<Op> = cluster_ids
            .iter()
            .map(|c| {
                self.catalog()
                    .get_cluster(c.clone())
                    .replicas()
                    .filter_map(|r| match r.config.location {
                        ReplicaLocation::Managed(ref l) if l.pending => {
                            Some(DropObjectInfo::ClusterReplica((
                                c.clone(),
                                r.replica_id,
                                ReplicaCreateDropReason::Manual,
                            )))
                        }
                        _ => None,
                    })
                    .collect::<Vec<DropObjectInfo>>()
            })
            .filter_map(|pending_replica_drop_ops_by_cluster| {
                match pending_replica_drop_ops_by_cluster.len() {
                    0 => None,
                    _ => Some(Op::DropObjects(pending_replica_drop_ops_by_cluster)),
                }
            })
            .collect();
        if !pending_cluster_ops.is_empty() {
            self.catalog_transact(None, pending_cluster_ops).await?;
        }
        Ok(())
    }

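    /// Cancels all active compute sinks for the identified connection.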
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn cancel_compute_sinks_for_conn(&mut self, conn_id: &ConnectionId) {
        self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Canceled)
            .await
    }

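    /// Cancels all pending cluster reconfigurations for the identified
    /// connection.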
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn cancel_cluster_reconfigurations_for_conn(
        &mut self,
        conn_id: &ConnectionId,
    ) {
        self.retire_cluster_reconfigurations_for_conn(conn_id).await
    }

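    /// Retires all active compute sinks for the identified connection with the
    /// given reason.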
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn retire_compute_sinks_for_conn(
        &mut self,
        conn_id: &ConnectionId,
        reason: ActiveComputeSinkRetireReason,
    ) {
        let drop_sinks = self
            .active_conns
            .get_mut(conn_id)
            .expect("must exist for active session")
            .drop_sinks
            .iter()
            .map(|sink_id| (*sink_id, reason.clone()))
            .collect();
        self.retire_compute_sinks(drop_sinks).await;
    }

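    /// Drops any pending replicas for clusters the identified connection was
    /// reconfiguring, then clears the connection's set of pending alters.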
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn retire_cluster_reconfigurations_for_conn(
        &mut self,
        conn_id: &ConnectionId,
    ) {
        let reconfiguring_clusters = self
            .active_conns
            .get(conn_id)
            .expect("must exist for active session")
            .pending_cluster_alters
            .clone();
        self.drop_reconfiguration_replicas(reconfiguring_clusters)
            .await
            .unwrap_or_terminate("cannot fail to drop reconfiguration replicas");

        self.active_conns
            .get_mut(conn_id)
            .expect("must exist for active session")
            .pending_cluster_alters
            .clear();
    }

    pub(crate) fn drop_storage_sinks(&mut self, sink_gids: Vec<GlobalId>) {
        let storage_metadata = self.catalog.state().storage_metadata();
        self.controller
            .storage
            .drop_sinks(storage_metadata, sink_gids)
            .unwrap_or_terminate("cannot fail to drop sinks");
    }

    pub(crate) fn drop_indexes(&mut self, indexes: Vec<(ClusterId, GlobalId)>) {
        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
        for (cluster_id, gid) in indexes {
            by_cluster.entry(cluster_id).or_default().push(gid);
        }
        for (cluster_id, gids) in by_cluster {
            let compute = &mut self.controller.compute;
            if compute.instance_exists(cluster_id) {
                compute
                    .drop_collections(cluster_id, gids)
                    .unwrap_or_terminate("cannot fail to drop collections");
            }
        }
    }

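    /// Drops the compute collections of the given materialized views, along
    /// with the storage collections backing them.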
    fn drop_materialized_views(&mut self, mviews: Vec<(ClusterId, GlobalId)>) {
        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
        let mut mv_gids = Vec::new();
        for (cluster_id, gid) in mviews {
            by_cluster.entry(cluster_id).or_default().push(gid);
            mv_gids.push(gid);
        }

        for (cluster_id, ids) in by_cluster {
            let compute = &mut self.controller.compute;
            if compute.instance_exists(cluster_id) {
                compute
                    .drop_collections(cluster_id, ids)
                    .unwrap_or_terminate("cannot fail to drop collections");
            }
        }

        let storage_metadata = self.catalog.state().storage_metadata();
        self.controller
            .storage
            .drop_sources(storage_metadata, mv_gids)
            .unwrap_or_terminate("cannot fail to drop sources");
    }

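    /// Drops the compute collections of the given continual tasks, along with
    /// the storage collections backing them.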
    fn drop_continual_tasks(&mut self, cts: Vec<(CatalogItemId, ClusterId, GlobalId)>) {
        let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
        let mut source_ids = Vec::new();
        for (item_id, cluster_id, gid) in cts {
            by_cluster.entry(cluster_id).or_default().push(gid);
            source_ids.push((item_id, gid));
        }

        for (cluster_id, ids) in by_cluster {
            let compute = &mut self.controller.compute;
            if compute.instance_exists(cluster_id) {
                compute
                    .drop_collections(cluster_id, ids)
                    .unwrap_or_terminate("cannot fail to drop collections");
            }
        }

        self.drop_sources(source_ids)
    }

    fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>) {
        let cloud_resource_controller = Arc::clone(
            self.cloud_resource_controller
                .as_ref()
                .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))
                .expect("vpc endpoints should only be dropped in CLOUD, where `cloud_resource_controller` is `Some`"),
        );
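        // Deleting VPC endpoints involves calls to the cloud provider, so do
        // it in a background task with retries rather than blocking the
        // coordinator.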
        task::spawn(
            || "drop_vpc_endpoints",
            async move {
                for vpc_endpoint in vpc_endpoints {
                    let _ = Retry::default()
                        .max_duration(Duration::from_secs(60))
                        .retry_async(|_state| async {
                            fail_point!("drop_vpc_endpoint", |r| {
                                Err(anyhow::anyhow!("Fail point error {:?}", r))
                            });
                            match cloud_resource_controller
                                .delete_vpc_endpoint(vpc_endpoint)
                                .await
                            {
                                Ok(_) => Ok(()),
                                Err(e) => {
                                    warn!("Dropping VPC Endpoints has encountered an error: {}", e);
                                    Err(e)
                                }
                            }
                        })
                        .await;
                }
            }
            .instrument(info_span!(
                "coord::catalog_transact_inner::drop_vpc_endpoints"
            )),
        );
    }

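    /// Drops all temporary items created by the identified connection, along
    /// with everything that depends on them.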
    pub(crate) async fn drop_temp_items(&mut self, conn_id: &ConnectionId) {
        let temp_items = self.catalog().state().get_temp_items(conn_id).collect();
        let all_items = self.catalog().object_dependents(&temp_items, conn_id);

        if all_items.is_empty() {
            return;
        }
        let op = Op::DropObjects(
            all_items
                .into_iter()
                .map(DropObjectInfo::manual_drop_from_object_id)
                .collect(),
        );

        self.catalog_transact_conn(Some(conn_id), vec![op])
            .await
            .expect("unable to drop temporary items for conn_id");
    }

    fn update_cluster_scheduling_config(&self) {
        let config = flags::orchestrator_scheduling_config(self.catalog.system_config());
        self.controller
            .update_orchestrator_scheduling_config(config);
    }

    fn update_secrets_caching_config(&self) {
        let config = flags::caching_config(self.catalog.system_config());
        self.caching_secrets_reader.set_policy(config);
    }

    fn update_tracing_config(&self) {
        let tracing = flags::tracing_config(self.catalog().system_config());
        tracing.apply(&self.tracing_handle);
    }

    fn update_compute_config(&mut self) {
        let config_params = flags::compute_config(self.catalog().system_config());
        self.controller.compute.update_configuration(config_params);
    }

    fn update_storage_config(&mut self) {
        let config_params = flags::storage_config(self.catalog().system_config());
        self.controller.storage.update_parameters(config_params);
    }

    fn update_pg_timestamp_oracle_config(&self) {
        let config_params = flags::pg_timstamp_oracle_config(self.catalog().system_config());
        if let Some(config) = self.pg_timestamp_oracle_config.as_ref() {
            config_params.apply(config)
        }
    }

    fn update_metrics_retention(&self) {
        let duration = self.catalog().system_config().metrics_retention();
        let policy = ReadPolicy::lag_writes_by(
            Timestamp::new(u64::try_from(duration.as_millis()).unwrap_or_else(|_e| {
                tracing::error!("Absurd metrics retention duration: {duration:?}.");
                u64::MAX
            })),
            SINCE_GRANULARITY,
        );
        let storage_policies = self
            .catalog()
            .entries()
            .filter(|entry| {
                entry.item().is_retained_metrics_object()
                    && entry.item().is_compute_object_on_cluster().is_none()
            })
            .map(|entry| (entry.id(), policy.clone()))
            .collect::<Vec<_>>();
        let compute_policies = self
            .catalog()
            .entries()
            .filter_map(|entry| {
                if let (true, Some(cluster_id)) = (
                    entry.item().is_retained_metrics_object(),
                    entry.item().is_compute_object_on_cluster(),
                ) {
                    Some((cluster_id, entry.id(), policy.clone()))
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        self.update_storage_read_policies(storage_policies);
        self.update_compute_read_policies(compute_policies);
    }

    fn update_controller_config(&mut self) {
        let sys_config = self.catalog().system_config();
        self.controller
            .update_configuration(sys_config.dyncfg_updates());
    }

    fn update_http_config(&mut self) {
        let webhook_request_limit = self
            .catalog()
            .system_config()
            .webhook_concurrent_request_limit();
        self.webhook_concurrency_limit
            .set_limit(webhook_request_limit);
    }

    pub(crate) async fn create_storage_export(
        &mut self,
        id: GlobalId,
        sink: &Sink,
    ) -> Result<(), AdapterError> {
        self.controller.storage.check_exists(sink.from)?;

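        // Build an id bundle for the sink's input and acquire read holds on
        // it, so the input's since cannot advance while we determine the
        // sink's as_of.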
        let id_bundle = crate::CollectionIdBundle {
            storage_ids: btreeset! {sink.from},
            compute_ids: btreemap! {},
        };

        let read_holds = self.acquire_read_holds(&id_bundle);
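        // The sink's as_of must be a time at which its input is readable; use
        // the earliest such time.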
        let as_of = read_holds.least_valid_read();

        let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
        let storage_sink_desc = mz_storage_types::sinks::StorageSinkDesc {
            from: sink.from,
            from_desc: storage_sink_from_entry
                .desc(&self.catalog().resolve_full_name(
                    storage_sink_from_entry.name(),
                    storage_sink_from_entry.conn_id(),
                ))
                .expect("sinks can only be built on items with descs")
                .into_owned(),
            connection: sink
                .connection
                .clone()
                .into_inline_connection(self.catalog().state()),
            envelope: sink.envelope,
            as_of,
            with_snapshot: sink.with_snapshot,
            version: sink.version,
            from_storage_metadata: (),
            to_storage_metadata: (),
        };

        let collection_desc = CollectionDescription {
            desc: KAFKA_PROGRESS_DESC.clone(),
            data_source: DataSource::Sink {
                desc: ExportDescription {
                    sink: storage_sink_desc,
                    instance_id: sink.cluster_id,
                },
            },
            since: None,
            status_collection_id: None,
            timeline: None,
        };
        let collections = vec![(id, collection_desc)];

        let storage_metadata = self.catalog.state().storage_metadata();
        let res = self
            .controller
            .storage
            .create_collections(storage_metadata, None, collections)
            .await;

        drop(read_holds);

        Ok(res?)
    }

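    /// Validates that applying `ops` would not push any resource count past
    /// its configured limit, returning [`AdapterError::ResourceExhaustion`]
    /// if it would.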
    fn validate_resource_limits(
        &self,
        ops: &Vec<catalog::Op>,
        conn_id: &ConnectionId,
    ) -> Result<(), AdapterError> {
        let mut new_kafka_connections = 0;
        let mut new_postgres_connections = 0;
        let mut new_mysql_connections = 0;
        let mut new_sql_server_connections = 0;
        let mut new_aws_privatelink_connections = 0;
        let mut new_tables = 0;
        let mut new_sources = 0;
        let mut new_sinks = 0;
        let mut new_materialized_views = 0;
        let mut new_clusters = 0;
        let mut new_replicas_per_cluster = BTreeMap::new();
        let mut new_credit_consumption_rate = Numeric::zero();
        let mut new_databases = 0;
        let mut new_schemas_per_database = BTreeMap::new();
        let mut new_objects_per_schema = BTreeMap::new();
        let mut new_secrets = 0;
        let mut new_roles = 0;
        let mut new_continual_tasks = 0;
        let mut new_network_policies = 0;
        for op in ops {
            match op {
                Op::CreateDatabase { .. } => {
                    new_databases += 1;
                }
                Op::CreateSchema { database_id, .. } => {
                    if let ResolvedDatabaseSpecifier::Id(database_id) = database_id {
                        *new_schemas_per_database.entry(database_id).or_insert(0) += 1;
                    }
                }
                Op::CreateRole { .. } => {
                    new_roles += 1;
                }
                Op::CreateNetworkPolicy { .. } => {
                    new_network_policies += 1;
                }
                Op::CreateCluster { .. } => {
                    new_clusters += 1;
                }
                Op::CreateClusterReplica {
                    cluster_id, config, ..
                } => {
                    if cluster_id.is_user() {
                        *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) += 1;
                        if let ReplicaLocation::Managed(location) = &config.location {
                            let replica_allocation = self
                                .catalog()
                                .cluster_replica_sizes()
                                .0
                                .get(location.size_for_billing())
                                .expect(
                                    "location size is validated against the cluster replica sizes",
                                );
                            new_credit_consumption_rate += replica_allocation.credits_per_hour
                        }
                    }
                }
                Op::CreateItem { name, item, .. } => {
                    *new_objects_per_schema
                        .entry((
                            name.qualifiers.database_spec.clone(),
                            name.qualifiers.schema_spec.clone(),
                        ))
                        .or_insert(0) += 1;
                    match item {
                        CatalogItem::Connection(connection) => match connection.details {
                            ConnectionDetails::Kafka(_) => new_kafka_connections += 1,
                            ConnectionDetails::Postgres(_) => new_postgres_connections += 1,
                            ConnectionDetails::MySql(_) => new_mysql_connections += 1,
                            ConnectionDetails::SqlServer(_) => new_sql_server_connections += 1,
                            ConnectionDetails::AwsPrivatelink(_) => {
                                new_aws_privatelink_connections += 1
                            }
                            ConnectionDetails::Csr(_)
                            | ConnectionDetails::Ssh { .. }
                            | ConnectionDetails::Aws(_)
                            | ConnectionDetails::IcebergCatalog(_) => {}
                        },
                        CatalogItem::Table(_) => {
                            new_tables += 1;
                        }
                        CatalogItem::Source(source) => {
                            new_sources += source.user_controllable_persist_shard_count()
                        }
                        CatalogItem::Sink(_) => new_sinks += 1,
                        CatalogItem::MaterializedView(_) => {
                            new_materialized_views += 1;
                        }
                        CatalogItem::Secret(_) => {
                            new_secrets += 1;
                        }
                        CatalogItem::ContinualTask(_) => {
                            new_continual_tasks += 1;
                        }
                        CatalogItem::Log(_)
                        | CatalogItem::View(_)
                        | CatalogItem::Index(_)
                        | CatalogItem::Type(_)
                        | CatalogItem::Func(_) => {}
                    }
                }
                Op::DropObjects(drop_object_infos) => {
                    for drop_object_info in drop_object_infos {
                        match drop_object_info {
                            DropObjectInfo::Cluster(_) => {
                                new_clusters -= 1;
                            }
                            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
                                if cluster_id.is_user() {
                                    *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) -= 1;
                                    let cluster = self
                                        .catalog()
                                        .get_cluster_replica(*cluster_id, *replica_id);
                                    if let ReplicaLocation::Managed(location) =
                                        &cluster.config.location
                                    {
                                        let replica_allocation = self
                                            .catalog()
                                            .cluster_replica_sizes()
                                            .0
                                            .get(location.size_for_billing())
                                            .expect(
                                                "location size is validated against the cluster replica sizes",
                                            );
                                        new_credit_consumption_rate -=
                                            replica_allocation.credits_per_hour
                                    }
                                }
                            }
                            DropObjectInfo::Database(_) => {
                                new_databases -= 1;
                            }
                            DropObjectInfo::Schema((database_spec, _)) => {
                                if let ResolvedDatabaseSpecifier::Id(database_id) = database_spec {
                                    *new_schemas_per_database.entry(database_id).or_insert(0) -= 1;
                                }
                            }
                            DropObjectInfo::Role(_) => {
                                new_roles -= 1;
                            }
                            DropObjectInfo::NetworkPolicy(_) => {
                                new_network_policies -= 1;
                            }
                            DropObjectInfo::Item(id) => {
                                let entry = self.catalog().get_entry(id);
                                *new_objects_per_schema
                                    .entry((
                                        entry.name().qualifiers.database_spec.clone(),
                                        entry.name().qualifiers.schema_spec.clone(),
                                    ))
                                    .or_insert(0) -= 1;
                                match entry.item() {
                                    CatalogItem::Connection(connection) => match connection.details
                                    {
                                        ConnectionDetails::AwsPrivatelink(_) => {
                                            new_aws_privatelink_connections -= 1;
                                        }
                                        _ => (),
                                    },
                                    CatalogItem::Table(_) => {
                                        new_tables -= 1;
                                    }
                                    CatalogItem::Source(source) => {
                                        new_sources -=
                                            source.user_controllable_persist_shard_count()
                                    }
                                    CatalogItem::Sink(_) => new_sinks -= 1,
                                    CatalogItem::MaterializedView(_) => {
                                        new_materialized_views -= 1;
                                    }
                                    CatalogItem::Secret(_) => {
                                        new_secrets -= 1;
                                    }
                                    CatalogItem::ContinualTask(_) => {
                                        new_continual_tasks -= 1;
                                    }
                                    CatalogItem::Log(_)
                                    | CatalogItem::View(_)
                                    | CatalogItem::Index(_)
                                    | CatalogItem::Type(_)
                                    | CatalogItem::Func(_) => {}
                                }
                            }
                        }
                    }
                }
                Op::UpdateItem {
                    name: _,
                    id,
                    to_item,
                } => match to_item {
                    CatalogItem::Source(source) => {
                        let current_source = self
                            .catalog()
                            .get_entry(id)
                            .source()
                            .expect("source update is for source item");

                        new_sources += source.user_controllable_persist_shard_count()
                            - current_source.user_controllable_persist_shard_count();
                    }
                    CatalogItem::Connection(_)
                    | CatalogItem::Table(_)
                    | CatalogItem::Sink(_)
                    | CatalogItem::MaterializedView(_)
                    | CatalogItem::Secret(_)
                    | CatalogItem::Log(_)
                    | CatalogItem::View(_)
                    | CatalogItem::Index(_)
                    | CatalogItem::Type(_)
                    | CatalogItem::Func(_)
                    | CatalogItem::ContinualTask(_) => {}
                },
                Op::AlterRole { .. }
                | Op::AlterRetainHistory { .. }
                | Op::AlterNetworkPolicy { .. }
                | Op::AlterAddColumn { .. }
                | Op::UpdatePrivilege { .. }
                | Op::UpdateDefaultPrivilege { .. }
                | Op::GrantRole { .. }
                | Op::RenameCluster { .. }
                | Op::RenameClusterReplica { .. }
                | Op::RenameItem { .. }
                | Op::RenameSchema { .. }
                | Op::UpdateOwner { .. }
                | Op::RevokeRole { .. }
                | Op::UpdateClusterConfig { .. }
                | Op::UpdateClusterReplicaConfig { .. }
                | Op::UpdateSourceReferences { .. }
                | Op::UpdateSystemConfiguration { .. }
                | Op::ResetSystemConfiguration { .. }
                | Op::ResetAllSystemConfiguration { .. }
                | Op::Comment { .. }
                | Op::WeirdStorageUsageUpdates { .. }
                | Op::TransactionDryRun => {}
            }
        }

        let mut current_aws_privatelink_connections = 0;
        let mut current_postgres_connections = 0;
        let mut current_mysql_connections = 0;
        let mut current_sql_server_connections = 0;
        let mut current_kafka_connections = 0;
        for c in self.catalog().user_connections() {
            let connection = c
                .connection()
                .expect("`user_connections()` only returns connection objects");

            match connection.details {
                ConnectionDetails::AwsPrivatelink(_) => current_aws_privatelink_connections += 1,
                ConnectionDetails::Postgres(_) => current_postgres_connections += 1,
                ConnectionDetails::MySql(_) => current_mysql_connections += 1,
                ConnectionDetails::SqlServer(_) => current_sql_server_connections += 1,
                ConnectionDetails::Kafka(_) => current_kafka_connections += 1,
                ConnectionDetails::Csr(_)
                | ConnectionDetails::Ssh { .. }
                | ConnectionDetails::Aws(_)
                | ConnectionDetails::IcebergCatalog(_) => {}
            }
        }
        self.validate_resource_limit(
            current_kafka_connections,
            new_kafka_connections,
            SystemVars::max_kafka_connections,
            "Kafka Connection",
            MAX_KAFKA_CONNECTIONS.name(),
        )?;
        self.validate_resource_limit(
            current_postgres_connections,
            new_postgres_connections,
            SystemVars::max_postgres_connections,
            "PostgreSQL Connection",
            MAX_POSTGRES_CONNECTIONS.name(),
        )?;
        self.validate_resource_limit(
            current_mysql_connections,
            new_mysql_connections,
            SystemVars::max_mysql_connections,
            "MySQL Connection",
            MAX_MYSQL_CONNECTIONS.name(),
        )?;
        self.validate_resource_limit(
            current_sql_server_connections,
            new_sql_server_connections,
            SystemVars::max_sql_server_connections,
            "SQL Server Connection",
            MAX_SQL_SERVER_CONNECTIONS.name(),
        )?;
        self.validate_resource_limit(
            current_aws_privatelink_connections,
            new_aws_privatelink_connections,
            SystemVars::max_aws_privatelink_connections,
            "AWS PrivateLink Connection",
            MAX_AWS_PRIVATELINK_CONNECTIONS.name(),
        )?;
        self.validate_resource_limit(
            self.catalog().user_tables().count(),
            new_tables,
            SystemVars::max_tables,
            "table",
            MAX_TABLES.name(),
        )?;

        let current_sources: usize = self
            .catalog()
            .user_sources()
            .filter_map(|source| source.source())
            .map(|source| source.user_controllable_persist_shard_count())
            .sum::<i64>()
            .try_into()
            .expect("non-negative sum of sources");

        self.validate_resource_limit(
            current_sources,
            new_sources,
            SystemVars::max_sources,
            "source",
            MAX_SOURCES.name(),
        )?;
        self.validate_resource_limit(
            self.catalog().user_sinks().count(),
            new_sinks,
            SystemVars::max_sinks,
            "sink",
            MAX_SINKS.name(),
        )?;
        self.validate_resource_limit(
            self.catalog().user_materialized_views().count(),
            new_materialized_views,
            SystemVars::max_materialized_views,
            "materialized view",
            MAX_MATERIALIZED_VIEWS.name(),
        )?;
        self.validate_resource_limit(
            self.catalog().user_clusters().count(),
            new_clusters,
            SystemVars::max_clusters,
            "cluster",
            MAX_CLUSTERS.name(),
        )?;
        for (cluster_id, new_replicas) in new_replicas_per_cluster {
            let current_amount = self
                .catalog()
                .try_get_cluster(cluster_id)
                .map(|instance| instance.user_replicas().count())
                .unwrap_or(0);
            self.validate_resource_limit(
                current_amount,
                new_replicas,
                SystemVars::max_replicas_per_cluster,
                "cluster replica",
                MAX_REPLICAS_PER_CLUSTER.name(),
            )?;
        }
        let current_credit_consumption_rate = self
            .catalog()
            .user_cluster_replicas()
            .filter_map(|replica| match &replica.config.location {
                ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
                ReplicaLocation::Unmanaged(_) => None,
            })
            .map(|size| {
                self.catalog()
                    .cluster_replica_sizes()
                    .0
                    .get(size)
                    .expect("location size is validated against the cluster replica sizes")
                    .credits_per_hour
            })
            .sum();
        self.validate_resource_limit_numeric(
            current_credit_consumption_rate,
            new_credit_consumption_rate,
            |system_vars| {
                self.license_key
                    .max_credit_consumption_rate()
                    .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
            },
            "cluster replica",
            MAX_CREDIT_CONSUMPTION_RATE.name(),
        )?;
        self.validate_resource_limit(
            self.catalog().databases().count(),
            new_databases,
            SystemVars::max_databases,
            "database",
            MAX_DATABASES.name(),
        )?;
        for (database_id, new_schemas) in new_schemas_per_database {
            self.validate_resource_limit(
                self.catalog().get_database(database_id).schemas_by_id.len(),
                new_schemas,
                SystemVars::max_schemas_per_database,
                "schema",
                MAX_SCHEMAS_PER_DATABASE.name(),
            )?;
        }
        for ((database_spec, schema_spec), new_objects) in new_objects_per_schema {
            self.validate_resource_limit(
                self.catalog()
                    .get_schema(&database_spec, &schema_spec, conn_id)
                    .items
                    .len(),
                new_objects,
                SystemVars::max_objects_per_schema,
                "object",
                MAX_OBJECTS_PER_SCHEMA.name(),
            )?;
        }
        self.validate_resource_limit(
            self.catalog().user_secrets().count(),
            new_secrets,
            SystemVars::max_secrets,
            "secret",
            MAX_SECRETS.name(),
        )?;
        self.validate_resource_limit(
            self.catalog().user_roles().count(),
            new_roles,
            SystemVars::max_roles,
            "role",
            MAX_ROLES.name(),
        )?;
        self.validate_resource_limit(
            self.catalog().user_continual_tasks().count(),
            new_continual_tasks,
            SystemVars::max_continual_tasks,
            "continual_task",
            MAX_CONTINUAL_TASKS.name(),
        )?;
        self.validate_resource_limit(
            self.catalog().user_continual_tasks().count(),
            new_network_policies,
            SystemVars::max_network_policies,
            "network_policy",
            MAX_NETWORK_POLICIES.name(),
        )?;
        Ok(())
    }

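    /// Validates that `current_amount` plus `new_instances` does not exceed
    /// the limit computed by `resource_limit`, returning
    /// [`AdapterError::ResourceExhaustion`] otherwise. Creating zero or fewer
    /// new instances always passes.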
    pub(crate) fn validate_resource_limit<F>(
        &self,
        current_amount: usize,
        new_instances: i64,
        resource_limit: F,
        resource_type: &str,
        limit_name: &str,
    ) -> Result<(), AdapterError>
    where
        F: Fn(&SystemVars) -> u32,
    {
        if new_instances <= 0 {
            return Ok(());
        }

        let limit: i64 = resource_limit(self.catalog().system_config()).into();
        let current_amount: Option<i64> = current_amount.try_into().ok();
        let desired =
            current_amount.and_then(|current_amount| current_amount.checked_add(new_instances));

        let exceeds_limit = if let Some(desired) = desired {
            desired > limit
        } else {
            true
        };

        let desired = desired
            .map(|desired| desired.to_string())
            .unwrap_or_else(|| format!("more than {}", i64::MAX));
        let current = current_amount
            .map(|current| current.to_string())
            .unwrap_or_else(|| format!("more than {}", i64::MAX));
        if exceeds_limit {
            Err(AdapterError::ResourceExhaustion {
                resource_type: resource_type.to_string(),
                limit_name: limit_name.to_string(),
                desired,
                limit: limit.to_string(),
                current,
            })
        } else {
            Ok(())
        }
    }

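    /// Like [`Self::validate_resource_limit`], but for limits expressed as
    /// arbitrary-precision [`Numeric`] values, such as credit consumption
    /// rates.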
    fn validate_resource_limit_numeric<F>(
        &self,
        current_amount: Numeric,
        new_amount: Numeric,
        resource_limit: F,
        resource_type: &str,
        limit_name: &str,
    ) -> Result<(), AdapterError>
    where
        F: Fn(&SystemVars) -> Numeric,
    {
        if new_amount <= Numeric::zero() {
            return Ok(());
        }

        let limit = resource_limit(self.catalog().system_config());
        let desired = current_amount + new_amount;
        if desired > limit {
            Err(AdapterError::ResourceExhaustion {
                resource_type: resource_type.to_string(),
                limit_name: limit_name.to_string(),
                desired: desired.to_string(),
                limit: limit.to_string(),
                current: current_amount.to_string(),
            })
        } else {
            Ok(())
        }
    }
}