1use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use fail::fail_point;
18use futures::Future;
19use maplit::{btreemap, btreeset};
20use mz_adapter_types::compaction::SINCE_GRANULARITY;
21use mz_adapter_types::connection::ConnectionId;
22use mz_audit_log::VersionedEvent;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::memory::objects::{CatalogItem, Connection, DataSourceDesc, Sink};
25use mz_compute_client::protocol::response::PeekResponse;
26use mz_controller::clusters::ReplicaLocation;
27use mz_controller_types::{ClusterId, ReplicaId};
28use mz_ore::error::ErrorExt;
29use mz_ore::future::InTask;
30use mz_ore::instrument;
31use mz_ore::now::to_datetime;
32use mz_ore::retry::Retry;
33use mz_ore::str::StrExt;
34use mz_ore::task;
35use mz_postgres_util::tunnel::PostgresFlavor;
36use mz_repr::adt::numeric::Numeric;
37use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
38use mz_sql::catalog::{CatalogCluster, CatalogClusterReplica, CatalogSchema};
39use mz_sql::names::ResolvedDatabaseSpecifier;
40use mz_sql::plan::ConnectionDetails;
41use mz_sql::session::metadata::SessionMetadata;
42use mz_sql::session::vars::{
43 self, MAX_AWS_PRIVATELINK_CONNECTIONS, MAX_CLUSTERS, MAX_CONTINUAL_TASKS,
44 MAX_CREDIT_CONSUMPTION_RATE, MAX_DATABASES, MAX_KAFKA_CONNECTIONS, MAX_MATERIALIZED_VIEWS,
45 MAX_MYSQL_CONNECTIONS, MAX_NETWORK_POLICIES, MAX_OBJECTS_PER_SCHEMA, MAX_POSTGRES_CONNECTIONS,
46 MAX_REPLICAS_PER_CLUSTER, MAX_ROLES, MAX_SCHEMAS_PER_DATABASE, MAX_SECRETS, MAX_SINKS,
47 MAX_SOURCES, MAX_SQL_SERVER_CONNECTIONS, MAX_TABLES, SystemVars, Var,
48};
49use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
50use mz_storage_types::connections::PostgresConnection;
51use mz_storage_types::connections::inline::IntoInlineConnection;
52use mz_storage_types::read_policy::ReadPolicy;
53use mz_storage_types::sources::GenericSourceConnection;
54use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
55use serde_json::json;
56use tracing::{Instrument, Level, event, info_span, warn};
57
58use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
59use crate::catalog::{DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult};
60use crate::coord::appends::BuiltinTableAppendNotify;
61use crate::coord::timeline::{TimelineContext, TimelineState};
62use crate::coord::{Coordinator, ReplicaMetadata};
63use crate::session::{Session, Transaction, TransactionOps};
64use crate::statement_logging::StatementEndedExecutionReason;
65use crate::telemetry::{EventDetails, SegmentClientExt};
66use crate::util::ResultExt;
67use crate::{AdapterError, TimestampProvider, catalog, flags};
68
69impl Coordinator {
70 #[instrument(name = "coord::catalog_transact")]
72 pub(crate) async fn catalog_transact(
73 &mut self,
74 session: Option<&Session>,
75 ops: Vec<catalog::Op>,
76 ) -> Result<(), AdapterError> {
77 self.catalog_transact_conn(session.map(|session| session.conn_id()), ops)
78 .await
79 }
80
81 #[instrument(name = "coord::catalog_transact_with_side_effects")]
85 pub(crate) async fn catalog_transact_with_side_effects<'c, F, Fut>(
86 &'c mut self,
87 session: Option<&Session>,
88 ops: Vec<catalog::Op>,
89 side_effect: F,
90 ) -> Result<(), AdapterError>
91 where
92 F: FnOnce(&'c mut Coordinator) -> Fut,
93 Fut: Future<Output = ()>,
94 {
95 let table_updates = self
96 .catalog_transact_inner(session.map(|session| session.conn_id()), ops)
97 .await?;
98 let side_effects_fut = side_effect(self);
99
100 let ((), ()) = futures::future::join(
102 side_effects_fut.instrument(info_span!(
103 "coord::catalog_transact_with_side_effects::side_effects_fut"
104 )),
105 table_updates.instrument(info_span!(
106 "coord::catalog_transact_with_side_effects::table_updates"
107 )),
108 )
109 .await;
110
111 Ok(())
112 }
113
114 #[instrument(name = "coord::catalog_transact_conn")]
116 pub(crate) async fn catalog_transact_conn(
117 &mut self,
118 conn_id: Option<&ConnectionId>,
119 ops: Vec<catalog::Op>,
120 ) -> Result<(), AdapterError> {
121 let table_updates = self.catalog_transact_inner(conn_id, ops).await?;
122 table_updates
123 .instrument(info_span!("coord::catalog_transact_conn::table_updates"))
124 .await;
125 Ok(())
126 }
127
128 #[instrument(name = "coord::catalog_transact_with_ddl_transaction")]
131 pub(crate) async fn catalog_transact_with_ddl_transaction(
132 &mut self,
133 session: &mut Session,
134 ops: Vec<catalog::Op>,
135 ) -> Result<(), AdapterError> {
136 let Some(Transaction {
137 ops:
138 TransactionOps::DDL {
139 ops: txn_ops,
140 revision: txn_revision,
141 state: _,
142 },
143 ..
144 }) = session.transaction().inner()
145 else {
146 return self.catalog_transact(Some(session), ops).await;
147 };
148
149 if self.catalog().transient_revision() != *txn_revision {
151 return Err(AdapterError::DDLTransactionRace);
152 }
153
154 let mut all_ops = Vec::with_capacity(ops.len() + txn_ops.len() + 1);
156 all_ops.extend(txn_ops.iter().cloned());
157 all_ops.extend(ops.clone());
158 all_ops.push(Op::TransactionDryRun);
159
160 let result = self.catalog_transact(Some(session), all_ops).await;
162
163 match result {
164 Err(AdapterError::TransactionDryRun { new_ops, new_state }) => {
166 session.transaction_mut().add_ops(TransactionOps::DDL {
169 ops: new_ops,
170 state: new_state,
171 revision: self.catalog().transient_revision(),
172 })?;
173 Ok(())
174 }
175 Ok(_) => unreachable!("unexpected success!"),
176 Err(e) => Err(e),
177 }
178 }
179
180 #[instrument(name = "coord::catalog_transact_inner")]
184 pub(crate) async fn catalog_transact_inner<'a>(
185 &mut self,
186 conn_id: Option<&ConnectionId>,
187 ops: Vec<catalog::Op>,
188 ) -> Result<BuiltinTableAppendNotify, AdapterError> {
189 if self.controller.read_only() {
190 return Err(AdapterError::ReadOnly);
191 }
192
193 event!(Level::TRACE, ops = format!("{:?}", ops));
194
195 let mut sources_to_drop = vec![];
196 let mut webhook_sources_to_restart = BTreeSet::new();
197 let mut table_gids_to_drop = vec![];
198 let mut storage_sink_gids_to_drop = vec![];
199 let mut indexes_to_drop = vec![];
200 let mut materialized_views_to_drop = vec![];
201 let mut continual_tasks_to_drop = vec![];
202 let mut views_to_drop = vec![];
203 let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
204 let mut secrets_to_drop = vec![];
205 let mut vpc_endpoints_to_drop = vec![];
206 let mut clusters_to_drop = vec![];
207 let mut cluster_replicas_to_drop = vec![];
208 let mut compute_sinks_to_drop = BTreeMap::new();
209 let mut peeks_to_drop = vec![];
210 let mut copies_to_drop = vec![];
211 let mut clusters_to_create = vec![];
212 let mut cluster_replicas_to_create = vec![];
213 let mut update_metrics_config = false;
214 let mut update_tracing_config = false;
215 let mut update_compute_config = false;
216 let mut update_storage_config = false;
217 let mut update_pg_timestamp_oracle_config = false;
218 let mut update_metrics_retention = false;
219 let mut update_secrets_caching_config = false;
220 let mut update_cluster_scheduling_config = false;
221 let mut update_arrangement_exert_proportionality = false;
222 let mut update_http_config = false;
223
224 for op in &ops {
225 match op {
226 catalog::Op::DropObjects(drop_object_infos) => {
227 for drop_object_info in drop_object_infos {
228 match &drop_object_info {
229 catalog::DropObjectInfo::Item(id) => {
230 match self.catalog().get_entry(id).item() {
231 CatalogItem::Table(table) => {
232 table_gids_to_drop
233 .extend(table.global_ids().map(|gid| (*id, gid)));
234 }
235 CatalogItem::Source(source) => {
236 sources_to_drop.push((*id, source.global_id()));
237 if let DataSourceDesc::Ingestion {
238 ingestion_desc, ..
239 } = &source.data_source
240 {
241 match &ingestion_desc.desc.connection {
242 GenericSourceConnection::Postgres(conn) => {
243 let conn = conn.clone().into_inline_connection(
244 self.catalog().state(),
245 );
246 let pending_drop = (
247 conn.connection.clone(),
248 conn.publication_details.slot.clone(),
249 );
250 replication_slots_to_drop.push(pending_drop);
251 }
252 _ => {}
253 }
254 }
255 }
256 CatalogItem::Sink(sink) => {
257 storage_sink_gids_to_drop.push(sink.global_id());
258 }
259 CatalogItem::Index(index) => {
260 indexes_to_drop.push((index.cluster_id, index.global_id()));
261 }
262 CatalogItem::MaterializedView(mv) => {
263 materialized_views_to_drop
264 .push((mv.cluster_id, mv.global_id()));
265 }
266 CatalogItem::View(view) => {
267 views_to_drop.push((*id, view.clone()))
268 }
269 CatalogItem::ContinualTask(ct) => {
270 continual_tasks_to_drop.push((
271 *id,
272 ct.cluster_id,
273 ct.global_id(),
274 ));
275 }
276 CatalogItem::Secret(_) => {
277 secrets_to_drop.push(*id);
278 }
279 CatalogItem::Connection(Connection { details, .. }) => {
280 match details {
281 ConnectionDetails::Ssh { .. } => {
283 secrets_to_drop.push(*id);
284 }
285 ConnectionDetails::AwsPrivatelink(_) => {
288 vpc_endpoints_to_drop.push(*id);
289 }
290 _ => (),
291 }
292 }
293 _ => (),
294 }
295 }
296 catalog::DropObjectInfo::Cluster(id) => {
297 clusters_to_drop.push(*id);
298 }
299 catalog::DropObjectInfo::ClusterReplica((
300 cluster_id,
301 replica_id,
302 _reason,
303 )) => {
304 cluster_replicas_to_drop.push((*cluster_id, *replica_id));
306 }
307 _ => (),
308 }
309 }
310 }
311 catalog::Op::ResetSystemConfiguration { name }
312 | catalog::Op::UpdateSystemConfiguration { name, .. } => {
313 update_metrics_config |= self
314 .catalog
315 .state()
316 .system_config()
317 .is_metrics_config_var(name);
318 update_tracing_config |= vars::is_tracing_var(name);
319 update_compute_config |= self
320 .catalog
321 .state()
322 .system_config()
323 .is_compute_config_var(name);
324 update_storage_config |= self
325 .catalog
326 .state()
327 .system_config()
328 .is_storage_config_var(name);
329 update_pg_timestamp_oracle_config |=
330 vars::is_pg_timestamp_oracle_config_var(name);
331 update_metrics_retention |= name == vars::METRICS_RETENTION.name();
332 update_secrets_caching_config |= vars::is_secrets_caching_var(name);
333 update_cluster_scheduling_config |= vars::is_cluster_scheduling_var(name);
334 update_arrangement_exert_proportionality |=
335 name == vars::ARRANGEMENT_EXERT_PROPORTIONALITY.name();
336 update_http_config |= vars::is_http_config_var(name);
337 }
338 catalog::Op::ResetAllSystemConfiguration => {
339 update_tracing_config = true;
343 update_compute_config = true;
344 update_storage_config = true;
345 update_pg_timestamp_oracle_config = true;
346 update_metrics_retention = true;
347 update_secrets_caching_config = true;
348 update_cluster_scheduling_config = true;
349 update_arrangement_exert_proportionality = true;
350 update_http_config = true;
351 }
352 catalog::Op::RenameItem { id, .. } => {
353 let item = self.catalog().get_entry(id);
354 let is_webhook_source = item
355 .source()
356 .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
357 .unwrap_or(false);
358 if is_webhook_source {
359 webhook_sources_to_restart.insert(*id);
360 }
361 }
362 catalog::Op::RenameSchema {
363 database_spec,
364 schema_spec,
365 ..
366 } => {
367 let schema = self.catalog().get_schema(
368 database_spec,
369 schema_spec,
370 conn_id.unwrap_or(&SYSTEM_CONN_ID),
371 );
372 let webhook_sources = schema.item_ids().filter(|id| {
373 let item = self.catalog().get_entry(id);
374 item.source()
375 .map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
376 .unwrap_or(false)
377 });
378 webhook_sources_to_restart.extend(webhook_sources);
379 }
380 catalog::Op::CreateCluster { id, .. } => {
381 clusters_to_create.push(*id);
382 }
383 catalog::Op::CreateClusterReplica {
384 cluster_id,
385 name,
386 config,
387 ..
388 } => {
389 cluster_replicas_to_create.push((
390 *cluster_id,
391 name.clone(),
392 config.location.num_processes(),
393 ));
394 }
395 _ => (),
396 }
397 }
398
399 let collections_to_drop: BTreeSet<GlobalId> = sources_to_drop
400 .iter()
401 .map(|(_, gid)| *gid)
402 .chain(table_gids_to_drop.iter().map(|(_, gid)| *gid))
403 .chain(storage_sink_gids_to_drop.iter().copied())
404 .chain(indexes_to_drop.iter().map(|(_, gid)| *gid))
405 .chain(materialized_views_to_drop.iter().map(|(_, gid)| *gid))
406 .chain(continual_tasks_to_drop.iter().map(|(_, _, gid)| *gid))
407 .chain(views_to_drop.iter().map(|(_id, view)| view.global_id()))
408 .collect();
409
410 for (sink_id, sink) in &self.active_compute_sinks {
412 let cluster_id = sink.cluster_id();
413 let conn_id = &sink.connection_id();
414 if let Some(id) = sink
415 .depends_on()
416 .iter()
417 .find(|id| collections_to_drop.contains(id))
418 {
419 let entry = self.catalog().get_entry_by_global_id(id);
420 let name = self
421 .catalog()
422 .resolve_full_name(entry.name(), Some(conn_id))
423 .to_string();
424 compute_sinks_to_drop.insert(
425 *sink_id,
426 ActiveComputeSinkRetireReason::DependencyDropped(format!(
427 "relation {}",
428 name.quoted()
429 )),
430 );
431 } else if clusters_to_drop.contains(&cluster_id) {
432 let name = self.catalog().get_cluster(cluster_id).name();
433 compute_sinks_to_drop.insert(
434 *sink_id,
435 ActiveComputeSinkRetireReason::DependencyDropped(format!(
436 "cluster {}",
437 name.quoted()
438 )),
439 );
440 }
441 }
442
443 for (uuid, pending_peek) in &self.pending_peeks {
445 if let Some(id) = pending_peek
446 .depends_on
447 .iter()
448 .find(|id| collections_to_drop.contains(id))
449 {
450 let entry = self.catalog().get_entry_by_global_id(id);
451 let name = self
452 .catalog()
453 .resolve_full_name(entry.name(), Some(&pending_peek.conn_id));
454 peeks_to_drop.push((
455 format!("relation {}", name.to_string().quoted()),
456 uuid.clone(),
457 ));
458 } else if clusters_to_drop.contains(&pending_peek.cluster_id) {
459 let name = self.catalog().get_cluster(pending_peek.cluster_id).name();
460 peeks_to_drop.push((format!("cluster {}", name.quoted()), uuid.clone()));
461 }
462 }
463
464 for (conn_id, pending_copy) in &self.active_copies {
466 let dropping_table = table_gids_to_drop
467 .iter()
468 .any(|(item_id, _gid)| pending_copy.table_id == *item_id);
469 let dropping_cluster = clusters_to_drop.contains(&pending_copy.cluster_id);
470
471 if dropping_table || dropping_cluster {
472 copies_to_drop.push(conn_id.clone());
473 }
474 }
475
476 let storage_ids_to_drop = sources_to_drop
477 .iter()
478 .map(|(_, gid)| *gid)
479 .chain(storage_sink_gids_to_drop.iter().copied())
480 .chain(table_gids_to_drop.iter().map(|(_, gid)| *gid))
481 .chain(materialized_views_to_drop.iter().map(|(_, gid)| *gid))
482 .chain(continual_tasks_to_drop.iter().map(|(_, _, gid)| *gid));
483 let compute_ids_to_drop = indexes_to_drop
484 .iter()
485 .copied()
486 .chain(materialized_views_to_drop.iter().copied())
487 .chain(
488 continual_tasks_to_drop
489 .iter()
490 .map(|(_, cluster_id, gid)| (*cluster_id, *gid)),
491 );
492
493 let collection_id_bundle = self.build_collection_id_bundle(
498 storage_ids_to_drop,
499 compute_ids_to_drop,
500 clusters_to_drop.clone(),
501 );
502 let timeline_associations: BTreeMap<_, _> = self
503 .partition_ids_by_timeline_context(&collection_id_bundle)
504 .filter_map(|(context, bundle)| {
505 let TimelineContext::TimelineDependent(timeline) = context else {
506 return None;
507 };
508 let TimelineState { read_holds, .. } = self
509 .global_timelines
510 .get(&timeline)
511 .expect("all timeslines have a timestamp oracle");
512
513 let empty = read_holds.id_bundle().difference(&bundle).is_empty();
514
515 Some((timeline, (empty, bundle)))
516 })
517 .collect();
518
519 self.validate_resource_limits(&ops, conn_id.unwrap_or(&SYSTEM_CONN_ID))?;
520
521 let oracle_write_ts = self.get_local_write_ts().await.timestamp;
530
531 let Coordinator {
532 catalog,
533 active_conns,
534 controller,
535 cluster_replica_statuses,
536 ..
537 } = self;
538 let catalog = Arc::make_mut(catalog);
539 let conn = conn_id.map(|id| active_conns.get(id).expect("connection must exist"));
540
541 let TransactionResult {
542 mut builtin_table_updates,
543 audit_events,
544 } = catalog
545 .transact(
546 Some(&mut controller.storage_collections),
547 oracle_write_ts,
548 conn,
549 ops,
550 )
551 .await?;
552
553 for (cluster_id, replica_id) in &cluster_replicas_to_drop {
557 let replica_statuses =
558 cluster_replica_statuses.remove_cluster_replica_statuses(cluster_id, replica_id);
559 for (process_id, status) in replica_statuses {
560 let builtin_table_update = catalog.state().pack_cluster_replica_status_update(
561 *replica_id,
562 process_id,
563 &status,
564 Diff::MINUS_ONE,
565 );
566 let builtin_table_update = catalog
567 .state()
568 .resolve_builtin_table_update(builtin_table_update);
569 builtin_table_updates.push(builtin_table_update);
570 }
571 }
572 for cluster_id in &clusters_to_drop {
573 let cluster_statuses = cluster_replica_statuses.remove_cluster_statuses(cluster_id);
574 for (replica_id, replica_statuses) in cluster_statuses {
575 for (process_id, status) in replica_statuses {
576 let builtin_table_update = catalog.state().pack_cluster_replica_status_update(
577 replica_id,
578 process_id,
579 &status,
580 Diff::MINUS_ONE,
581 );
582 let builtin_table_update = catalog
583 .state()
584 .resolve_builtin_table_update(builtin_table_update);
585 builtin_table_updates.push(builtin_table_update);
586 }
587 }
588 }
589 for cluster_id in clusters_to_create {
590 cluster_replica_statuses.initialize_cluster_statuses(cluster_id);
591 for (replica_id, replica_statuses) in
592 cluster_replica_statuses.get_cluster_statuses(cluster_id)
593 {
594 for (process_id, status) in replica_statuses {
595 let builtin_table_update = catalog.state().pack_cluster_replica_status_update(
596 *replica_id,
597 *process_id,
598 status,
599 Diff::ONE,
600 );
601 let builtin_table_update = catalog
602 .state()
603 .resolve_builtin_table_update(builtin_table_update);
604 builtin_table_updates.push(builtin_table_update);
605 }
606 }
607 }
608 let now = to_datetime((catalog.config().now)());
609 for (cluster_id, replica_name, num_processes) in cluster_replicas_to_create {
610 let replica_id = catalog
611 .resolve_replica_in_cluster(&cluster_id, &replica_name)
612 .expect("just created")
613 .replica_id();
614 cluster_replica_statuses.initialize_cluster_replica_statuses(
615 cluster_id,
616 replica_id,
617 num_processes,
618 now,
619 );
620 for (process_id, status) in
621 cluster_replica_statuses.get_cluster_replica_statuses(cluster_id, replica_id)
622 {
623 let builtin_table_update = catalog.state().pack_cluster_replica_status_update(
624 replica_id,
625 *process_id,
626 status,
627 Diff::ONE,
628 );
629 let builtin_table_update = catalog
630 .state()
631 .resolve_builtin_table_update(builtin_table_update);
632 builtin_table_updates.push(builtin_table_update);
633 }
634 }
635
636 let (builtin_update_notify, _) = self
639 .builtin_table_update()
640 .execute(builtin_table_updates)
641 .await;
642
643 let _: () = async {
646 if !timeline_associations.is_empty() {
647 for (timeline, (should_be_empty, id_bundle)) in timeline_associations {
648 let became_empty =
649 self.remove_resources_associated_with_timeline(timeline, id_bundle);
650 assert_eq!(should_be_empty, became_empty, "emptiness did not match!");
651 }
652 }
653 if !table_gids_to_drop.is_empty() {
654 let ts = self.get_local_write_ts().await;
655 self.drop_tables(table_gids_to_drop, ts.timestamp);
656 }
657 if !sources_to_drop.is_empty() {
663 self.drop_sources(sources_to_drop);
664 }
665 if !webhook_sources_to_restart.is_empty() {
666 self.restart_webhook_sources(webhook_sources_to_restart);
667 }
668 if !storage_sink_gids_to_drop.is_empty() {
669 self.drop_storage_sinks(storage_sink_gids_to_drop);
670 }
671 if !compute_sinks_to_drop.is_empty() {
672 self.retire_compute_sinks(compute_sinks_to_drop).await;
673 }
674 if !peeks_to_drop.is_empty() {
675 for (dropped_name, uuid) in peeks_to_drop {
676 if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
677 let cancel_reason = PeekResponse::Error(format!(
678 "query could not complete because {dropped_name} was dropped"
679 ));
680 self.controller
681 .compute
682 .cancel_peek(pending_peek.cluster_id, uuid, cancel_reason)
683 .unwrap_or_terminate("unable to cancel peek");
684 self.retire_execution(
685 StatementEndedExecutionReason::Canceled,
686 pending_peek.ctx_extra,
687 );
688 }
689 }
690 }
691 if !copies_to_drop.is_empty() {
692 for conn_id in copies_to_drop {
693 self.cancel_pending_copy(&conn_id);
694 }
695 }
696 if !indexes_to_drop.is_empty() {
697 self.drop_indexes(indexes_to_drop);
698 }
699 if !materialized_views_to_drop.is_empty() {
700 self.drop_materialized_views(materialized_views_to_drop);
701 }
702 if !continual_tasks_to_drop.is_empty() {
703 self.drop_continual_tasks(continual_tasks_to_drop);
704 }
705 if !vpc_endpoints_to_drop.is_empty() {
706 self.drop_vpc_endpoints_in_background(vpc_endpoints_to_drop)
707 }
708 if !cluster_replicas_to_drop.is_empty() {
709 fail::fail_point!("after_catalog_drop_replica");
710 for (cluster_id, replica_id) in cluster_replicas_to_drop {
711 self.drop_replica(cluster_id, replica_id);
712 }
713 }
714 if !clusters_to_drop.is_empty() {
715 for cluster_id in clusters_to_drop {
716 self.controller.drop_cluster(cluster_id);
717 }
718 }
719
720 task::spawn(|| "drop_replication_slots_and_secrets", {
728 let ssh_tunnel_manager = self.connection_context().ssh_tunnel_manager.clone();
729 let secrets_controller = Arc::clone(&self.secrets_controller);
730 let secrets_reader = Arc::clone(self.secrets_reader());
731 let storage_config = self.controller.storage.config().clone();
732
733 async move {
734 for (connection, replication_slot_name) in replication_slots_to_drop {
735 tracing::info!(?replication_slot_name, "dropping replication slot");
736
737 let result: Result<(), anyhow::Error> = Retry::default()
743 .max_duration(Duration::from_secs(60))
744 .retry_async(|_state| async {
745 let config = connection
746 .config(&secrets_reader, &storage_config, InTask::No)
747 .await
748 .map_err(|e| {
749 anyhow::anyhow!(
750 "error creating Postgres client for \
751 dropping acquired slots: {}",
752 e.display_with_causes()
753 )
754 })?;
755
756 let should_wait = match connection.flavor {
761 PostgresFlavor::Vanilla => true,
762 PostgresFlavor::Yugabyte => false,
763 };
764 mz_postgres_util::drop_replication_slots(
765 &ssh_tunnel_manager,
766 config.clone(),
767 &[(&replication_slot_name, should_wait)],
768 )
769 .await?;
770
771 Ok(())
772 })
773 .await;
774
775 if let Err(err) = result {
776 tracing::warn!(
777 ?replication_slot_name,
778 ?err,
779 "failed to drop replication slot"
780 );
781 }
782 }
783
784 fail_point!("drop_secrets");
791 for secret in secrets_to_drop {
792 if let Err(e) = secrets_controller.delete(secret).await {
793 warn!("Dropping secrets has encountered an error: {}", e);
794 }
795 }
796 }
797 });
798
799 if update_metrics_config {
800 mz_metrics::update_dyncfg(&self.catalog().system_config().dyncfg_updates());
801 }
802 if update_compute_config {
803 self.update_compute_config();
804 }
805 if update_storage_config {
806 self.update_storage_config();
807 }
808 if update_pg_timestamp_oracle_config {
809 self.update_pg_timestamp_oracle_config();
810 }
811 if update_metrics_retention {
812 self.update_metrics_retention();
813 }
814 if update_tracing_config {
815 self.update_tracing_config();
816 }
817 if update_secrets_caching_config {
818 self.update_secrets_caching_config();
819 }
820 if update_cluster_scheduling_config {
821 self.update_cluster_scheduling_config();
822 }
823 if update_arrangement_exert_proportionality {
824 self.update_arrangement_exert_proportionality();
825 }
826 if update_http_config {
827 self.update_http_config();
828 }
829 }
830 .instrument(info_span!("coord::catalog_transact_with::finalize"))
831 .await;
832
833 let conn = conn_id.and_then(|id| self.active_conns.get(id));
834 if let Some(segment_client) = &self.segment_client {
835 for VersionedEvent::V1(event) in audit_events {
836 let event_type = format!(
837 "{} {}",
838 event.object_type.as_title_case(),
839 event.event_type.as_title_case()
840 );
841 segment_client.environment_track(
842 &self.catalog().config().environment_id,
843 event_type,
844 json!({ "details": event.details.as_json() }),
845 EventDetails {
846 user_id: conn
847 .and_then(|c| c.user().external_metadata.as_ref())
848 .map(|m| m.user_id),
849 application_name: conn.map(|c| c.application_name()),
850 ..Default::default()
851 },
852 );
853 }
854 }
855
856 mz_ore::soft_assert_eq_no_log!(
859 self.check_consistency(),
860 Ok(()),
861 "coordinator inconsistency detected"
862 );
863
864 Ok(builtin_update_notify)
865 }
866
867 fn drop_replica(&mut self, cluster_id: ClusterId, replica_id: ReplicaId) {
868 if let Some(Some(ReplicaMetadata { metrics })) =
869 self.transient_replica_metadata.insert(replica_id, None)
870 {
871 let mut updates = vec![];
872 if let Some(metrics) = metrics {
873 let retractions = self.catalog().state().pack_replica_metric_updates(
874 replica_id,
875 &metrics,
876 Diff::MINUS_ONE,
877 );
878 let retractions = self
879 .catalog()
880 .state()
881 .resolve_builtin_table_updates(retractions);
882 updates.extend(retractions);
883 }
884 let _notify = self.builtin_table_update().background(updates);
886 }
887
888 self.drop_introspection_subscribes(replica_id);
889
890 self.controller
891 .drop_replica(cluster_id, replica_id)
892 .expect("dropping replica must not fail");
893 }
894
895 fn drop_sources(&mut self, sources: Vec<(CatalogItemId, GlobalId)>) {
897 for (item_id, _gid) in &sources {
898 self.active_webhooks.remove(item_id);
899 }
900 let storage_metadata = self.catalog.state().storage_metadata();
901 let source_gids = sources.into_iter().map(|(_id, gid)| gid).collect();
902 self.controller
903 .storage
904 .drop_sources(storage_metadata, source_gids)
905 .unwrap_or_terminate("cannot fail to drop sources");
906 }
907
908 fn drop_tables(&mut self, tables: Vec<(CatalogItemId, GlobalId)>, ts: Timestamp) {
909 for (item_id, _gid) in &tables {
910 self.active_webhooks.remove(item_id);
911 }
912 let storage_metadata = self.catalog.state().storage_metadata();
913 let table_gids = tables.into_iter().map(|(_id, gid)| gid).collect();
914 self.controller
915 .storage
916 .drop_tables(storage_metadata, table_gids, ts)
917 .unwrap_or_terminate("cannot fail to drop tables");
918 }
919
920 fn restart_webhook_sources(&mut self, sources: impl IntoIterator<Item = CatalogItemId>) {
921 for id in sources {
922 self.active_webhooks.remove(&id);
923 }
924 }
925
926 #[must_use]
932 pub async fn drop_compute_sink(&mut self, sink_id: GlobalId) -> Option<ActiveComputeSink> {
933 self.drop_compute_sinks([sink_id]).await.remove(&sink_id)
934 }
935
936 #[must_use]
945 pub async fn drop_compute_sinks(
946 &mut self,
947 sink_ids: impl IntoIterator<Item = GlobalId>,
948 ) -> BTreeMap<GlobalId, ActiveComputeSink> {
949 let mut by_id = BTreeMap::new();
950 let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
951 for sink_id in sink_ids {
952 let sink = match self.remove_active_compute_sink(sink_id).await {
953 None => {
954 tracing::error!(%sink_id, "drop_compute_sinks called on nonexistent sink");
955 continue;
956 }
957 Some(sink) => sink,
958 };
959
960 by_cluster
961 .entry(sink.cluster_id())
962 .or_default()
963 .push(sink_id);
964 by_id.insert(sink_id, sink);
965 }
966 for (cluster_id, ids) in by_cluster {
967 let compute = &mut self.controller.compute;
968 if compute.instance_exists(cluster_id) {
970 compute
971 .drop_collections(cluster_id, ids)
972 .unwrap_or_terminate("cannot fail to drop collections");
973 }
974 }
975 by_id
976 }
977
978 pub async fn retire_compute_sinks(
983 &mut self,
984 mut reasons: BTreeMap<GlobalId, ActiveComputeSinkRetireReason>,
985 ) {
986 let sink_ids = reasons.keys().cloned();
987 for (id, sink) in self.drop_compute_sinks(sink_ids).await {
988 let reason = reasons
989 .remove(&id)
990 .expect("all returned IDs are in `reasons`");
991 sink.retire(reason);
992 }
993 }
994
995 pub async fn drop_reconfiguration_replicas(
998 &mut self,
999 cluster_ids: BTreeSet<ClusterId>,
1000 ) -> Result<(), AdapterError> {
1001 let pending_cluster_ops: Vec<Op> = cluster_ids
1002 .iter()
1003 .map(|c| {
1004 self.catalog()
1005 .get_cluster(c.clone())
1006 .replicas()
1007 .filter_map(|r| match r.config.location {
1008 ReplicaLocation::Managed(ref l) if l.pending => {
1009 Some(DropObjectInfo::ClusterReplica((
1010 c.clone(),
1011 r.replica_id,
1012 ReplicaCreateDropReason::Manual,
1013 )))
1014 }
1015 _ => None,
1016 })
1017 .collect::<Vec<DropObjectInfo>>()
1018 })
1019 .filter_map(|pending_replica_drop_ops_by_cluster| {
1020 match pending_replica_drop_ops_by_cluster.len() {
1021 0 => None,
1022 _ => Some(Op::DropObjects(pending_replica_drop_ops_by_cluster)),
1023 }
1024 })
1025 .collect();
1026 if !pending_cluster_ops.is_empty() {
1027 self.catalog_transact(None, pending_cluster_ops).await?;
1028 }
1029 Ok(())
1030 }
1031
1032 #[mz_ore::instrument(level = "debug")]
1034 pub(crate) async fn cancel_compute_sinks_for_conn(&mut self, conn_id: &ConnectionId) {
1035 self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Canceled)
1036 .await
1037 }
1038
1039 #[mz_ore::instrument(level = "debug")]
1041 pub(crate) async fn cancel_cluster_reconfigurations_for_conn(
1042 &mut self,
1043 conn_id: &ConnectionId,
1044 ) {
1045 self.retire_cluster_reconfigurations_for_conn(conn_id).await
1046 }
1047
1048 #[mz_ore::instrument(level = "debug")]
1051 pub(crate) async fn retire_compute_sinks_for_conn(
1052 &mut self,
1053 conn_id: &ConnectionId,
1054 reason: ActiveComputeSinkRetireReason,
1055 ) {
1056 let drop_sinks = self
1057 .active_conns
1058 .get_mut(conn_id)
1059 .expect("must exist for active session")
1060 .drop_sinks
1061 .iter()
1062 .map(|sink_id| (*sink_id, reason.clone()))
1063 .collect();
1064 self.retire_compute_sinks(drop_sinks).await;
1065 }
1066
1067 #[mz_ore::instrument(level = "debug")]
1069 pub(crate) async fn retire_cluster_reconfigurations_for_conn(
1070 &mut self,
1071 conn_id: &ConnectionId,
1072 ) {
1073 let reconfiguring_clusters = self
1074 .active_conns
1075 .get(conn_id)
1076 .expect("must exist for active session")
1077 .pending_cluster_alters
1078 .clone();
1079 self.drop_reconfiguration_replicas(reconfiguring_clusters)
1081 .await
1082 .unwrap_or_terminate("cannot fail to drop reconfiguration replicas");
1083
1084 self.active_conns
1085 .get_mut(conn_id)
1086 .expect("must exist for active session")
1087 .pending_cluster_alters
1088 .clear();
1089 }
1090
1091 pub(crate) fn drop_storage_sinks(&mut self, sink_gids: Vec<GlobalId>) {
1092 let storage_metadata = self.catalog.state().storage_metadata();
1093 self.controller
1094 .storage
1095 .drop_sinks(storage_metadata, sink_gids)
1096 .unwrap_or_terminate("cannot fail to drop sinks");
1097 }
1098
1099 pub(crate) fn drop_indexes(&mut self, indexes: Vec<(ClusterId, GlobalId)>) {
1100 let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
1101 for (cluster_id, gid) in indexes {
1102 by_cluster.entry(cluster_id).or_default().push(gid);
1103 }
1104 for (cluster_id, gids) in by_cluster {
1105 let compute = &mut self.controller.compute;
1106 if compute.instance_exists(cluster_id) {
1108 compute
1109 .drop_collections(cluster_id, gids)
1110 .unwrap_or_terminate("cannot fail to drop collections");
1111 }
1112 }
1113 }
1114
1115 fn drop_materialized_views(&mut self, mviews: Vec<(ClusterId, GlobalId)>) {
1117 let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
1118 let mut mv_gids = Vec::new();
1119 for (cluster_id, gid) in mviews {
1120 by_cluster.entry(cluster_id).or_default().push(gid);
1121 mv_gids.push(gid);
1122 }
1123
1124 for (cluster_id, ids) in by_cluster {
1126 let compute = &mut self.controller.compute;
1127 if compute.instance_exists(cluster_id) {
1129 compute
1130 .drop_collections(cluster_id, ids)
1131 .unwrap_or_terminate("cannot fail to drop collections");
1132 }
1133 }
1134
1135 let storage_metadata = self.catalog.state().storage_metadata();
1137 self.controller
1138 .storage
1139 .drop_sources(storage_metadata, mv_gids)
1140 .unwrap_or_terminate("cannot fail to drop sources");
1141 }
1142
1143 fn drop_continual_tasks(&mut self, cts: Vec<(CatalogItemId, ClusterId, GlobalId)>) {
1145 let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
1146 let mut source_ids = Vec::new();
1147 for (item_id, cluster_id, gid) in cts {
1148 by_cluster.entry(cluster_id).or_default().push(gid);
1149 source_ids.push((item_id, gid));
1150 }
1151
1152 for (cluster_id, ids) in by_cluster {
1154 let compute = &mut self.controller.compute;
1155 if compute.instance_exists(cluster_id) {
1157 compute
1158 .drop_collections(cluster_id, ids)
1159 .unwrap_or_terminate("cannot fail to drop collections");
1160 }
1161 }
1162
1163 self.drop_sources(source_ids)
1165 }
1166
1167 fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>) {
1168 let cloud_resource_controller = Arc::clone(self.cloud_resource_controller
1169 .as_ref()
1170 .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))
1171 .expect("vpc endpoints should only be dropped in CLOUD, where `cloud_resource_controller` is `Some`"));
1172 task::spawn(
1180 || "drop_vpc_endpoints",
1181 async move {
1182 for vpc_endpoint in vpc_endpoints {
1183 let _ = Retry::default()
1184 .max_duration(Duration::from_secs(60))
1185 .retry_async(|_state| async {
1186 fail_point!("drop_vpc_endpoint", |r| {
1187 Err(anyhow::anyhow!("Fail point error {:?}", r))
1188 });
1189 match cloud_resource_controller
1190 .delete_vpc_endpoint(vpc_endpoint)
1191 .await
1192 {
1193 Ok(_) => Ok(()),
1194 Err(e) => {
1195 warn!("Dropping VPC Endpoints has encountered an error: {}", e);
1196 Err(e)
1197 }
1198 }
1199 })
1200 .await;
1201 }
1202 }
1203 .instrument(info_span!(
1204 "coord::catalog_transact_inner::drop_vpc_endpoints"
1205 )),
1206 );
1207 }
1208
1209 pub(crate) async fn drop_temp_items(&mut self, conn_id: &ConnectionId) {
1212 let temp_items = self.catalog().state().get_temp_items(conn_id).collect();
1213 let all_items = self.catalog().object_dependents(&temp_items, conn_id);
1214
1215 if all_items.is_empty() {
1216 return;
1217 }
1218 let op = Op::DropObjects(
1219 all_items
1220 .into_iter()
1221 .map(DropObjectInfo::manual_drop_from_object_id)
1222 .collect(),
1223 );
1224
1225 self.catalog_transact_conn(Some(conn_id), vec![op])
1226 .await
1227 .expect("unable to drop temporary items for conn_id");
1228 }
1229
1230 fn update_cluster_scheduling_config(&self) {
1231 let config = flags::orchestrator_scheduling_config(self.catalog.system_config());
1232 self.controller
1233 .update_orchestrator_scheduling_config(config);
1234 }
1235
1236 fn update_secrets_caching_config(&self) {
1237 let config = flags::caching_config(self.catalog.system_config());
1238 self.caching_secrets_reader.set_policy(config);
1239 }
1240
1241 fn update_tracing_config(&self) {
1242 let tracing = flags::tracing_config(self.catalog().system_config());
1243 tracing.apply(&self.tracing_handle);
1244 }
1245
1246 fn update_compute_config(&mut self) {
1247 let config_params = flags::compute_config(self.catalog().system_config());
1248 self.controller.compute.update_configuration(config_params);
1249 }
1250
1251 fn update_storage_config(&mut self) {
1252 let config_params = flags::storage_config(self.catalog().system_config());
1253 self.controller.storage.update_parameters(config_params);
1254 }
1255
1256 fn update_pg_timestamp_oracle_config(&self) {
1257 let config_params = flags::pg_timstamp_oracle_config(self.catalog().system_config());
1258 if let Some(config) = self.pg_timestamp_oracle_config.as_ref() {
1259 config_params.apply(config)
1260 }
1261 }
1262
1263 fn update_metrics_retention(&self) {
1264 let duration = self.catalog().system_config().metrics_retention();
1265 let policy = ReadPolicy::lag_writes_by(
1266 Timestamp::new(u64::try_from(duration.as_millis()).unwrap_or_else(|_e| {
1267 tracing::error!("Absurd metrics retention duration: {duration:?}.");
1268 u64::MAX
1269 })),
1270 SINCE_GRANULARITY,
1271 );
1272 let storage_policies = self
1273 .catalog()
1274 .entries()
1275 .filter(|entry| {
1276 entry.item().is_retained_metrics_object()
1277 && entry.item().is_compute_object_on_cluster().is_none()
1278 })
1279 .map(|entry| (entry.id(), policy.clone()))
1280 .collect::<Vec<_>>();
1281 let compute_policies = self
1282 .catalog()
1283 .entries()
1284 .filter_map(|entry| {
1285 if let (true, Some(cluster_id)) = (
1286 entry.item().is_retained_metrics_object(),
1287 entry.item().is_compute_object_on_cluster(),
1288 ) {
1289 Some((cluster_id, entry.id(), policy.clone()))
1290 } else {
1291 None
1292 }
1293 })
1294 .collect::<Vec<_>>();
1295 self.update_storage_read_policies(storage_policies);
1296 self.update_compute_read_policies(compute_policies);
1297 }
1298
1299 fn update_arrangement_exert_proportionality(&mut self) {
1300 let prop = self
1301 .catalog()
1302 .system_config()
1303 .arrangement_exert_proportionality();
1304 self.controller
1305 .compute
1306 .set_arrangement_exert_proportionality(prop);
1307 }
1308
1309 fn update_http_config(&mut self) {
1310 let webhook_request_limit = self
1311 .catalog()
1312 .system_config()
1313 .webhook_concurrent_request_limit();
1314 self.webhook_concurrency_limit
1315 .set_limit(webhook_request_limit);
1316 }
1317
1318 pub(crate) async fn create_storage_export(
1319 &mut self,
1320 id: GlobalId,
1321 sink: &Sink,
1322 ) -> Result<(), AdapterError> {
1323 self.controller.storage.check_exists(sink.from)?;
1325
1326 let id_bundle = crate::CollectionIdBundle {
1333 storage_ids: btreeset! {sink.from},
1334 compute_ids: btreemap! {},
1335 };
1336
1337 let read_holds = self.acquire_read_holds(&id_bundle);
1345 let as_of = self.least_valid_read(&read_holds);
1346
1347 let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
1348 let storage_sink_desc = mz_storage_types::sinks::StorageSinkDesc {
1349 from: sink.from,
1350 from_desc: storage_sink_from_entry
1351 .desc(&self.catalog().resolve_full_name(
1352 storage_sink_from_entry.name(),
1353 storage_sink_from_entry.conn_id(),
1354 ))
1355 .expect("sinks can only be built on items with descs")
1356 .into_owned(),
1357 connection: sink
1358 .connection
1359 .clone()
1360 .into_inline_connection(self.catalog().state()),
1361 envelope: sink.envelope,
1362 as_of,
1363 with_snapshot: sink.with_snapshot,
1364 version: sink.version,
1365 from_storage_metadata: (),
1366 to_storage_metadata: (),
1367 };
1368
1369 let collection_desc = CollectionDescription {
1370 desc: KAFKA_PROGRESS_DESC.clone(),
1372 data_source: DataSource::Sink {
1373 desc: ExportDescription {
1374 sink: storage_sink_desc,
1375 instance_id: sink.cluster_id,
1376 },
1377 },
1378 since: None,
1379 status_collection_id: None,
1380 timeline: None,
1381 };
1382 let collections = vec![(id, collection_desc)];
1383
1384 let storage_metadata = self.catalog.state().storage_metadata();
1386 let res = self
1387 .controller
1388 .storage
1389 .create_collections(storage_metadata, None, collections)
1390 .await;
1391
1392 drop(read_holds);
1395
1396 Ok(res?)
1397 }
1398
1399 fn validate_resource_limits(
1402 &self,
1403 ops: &Vec<catalog::Op>,
1404 conn_id: &ConnectionId,
1405 ) -> Result<(), AdapterError> {
1406 let mut new_kafka_connections = 0;
1407 let mut new_postgres_connections = 0;
1408 let mut new_mysql_connections = 0;
1409 let mut new_sql_server_connections = 0;
1410 let mut new_aws_privatelink_connections = 0;
1411 let mut new_tables = 0;
1412 let mut new_sources = 0;
1413 let mut new_sinks = 0;
1414 let mut new_materialized_views = 0;
1415 let mut new_clusters = 0;
1416 let mut new_replicas_per_cluster = BTreeMap::new();
1417 let mut new_credit_consumption_rate = Numeric::zero();
1418 let mut new_databases = 0;
1419 let mut new_schemas_per_database = BTreeMap::new();
1420 let mut new_objects_per_schema = BTreeMap::new();
1421 let mut new_secrets = 0;
1422 let mut new_roles = 0;
1423 let mut new_continual_tasks = 0;
1424 let mut new_network_policies = 0;
1425 for op in ops {
1426 match op {
1427 Op::CreateDatabase { .. } => {
1428 new_databases += 1;
1429 }
1430 Op::CreateSchema { database_id, .. } => {
1431 if let ResolvedDatabaseSpecifier::Id(database_id) = database_id {
1432 *new_schemas_per_database.entry(database_id).or_insert(0) += 1;
1433 }
1434 }
1435 Op::CreateRole { .. } => {
1436 new_roles += 1;
1437 }
1438 Op::CreateNetworkPolicy { .. } => {
1439 new_network_policies += 1;
1440 }
1441 Op::CreateCluster { .. } => {
1442 new_clusters += 1;
1446 }
1447 Op::CreateClusterReplica {
1448 cluster_id, config, ..
1449 } => {
1450 if cluster_id.is_user() {
1451 *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) += 1;
1452 if let ReplicaLocation::Managed(location) = &config.location {
1453 let replica_allocation = self
1454 .catalog()
1455 .cluster_replica_sizes()
1456 .0
1457 .get(location.size_for_billing())
1458 .expect(
1459 "location size is validated against the cluster replica sizes",
1460 );
1461 new_credit_consumption_rate += replica_allocation.credits_per_hour
1462 }
1463 }
1464 }
1465 Op::CreateItem { name, item, .. } => {
1466 *new_objects_per_schema
1467 .entry((
1468 name.qualifiers.database_spec.clone(),
1469 name.qualifiers.schema_spec.clone(),
1470 ))
1471 .or_insert(0) += 1;
1472 match item {
1473 CatalogItem::Connection(connection) => match connection.details {
1474 ConnectionDetails::Kafka(_) => new_kafka_connections += 1,
1475 ConnectionDetails::Postgres(_) => new_postgres_connections += 1,
1476 ConnectionDetails::MySql(_) => new_mysql_connections += 1,
1477 ConnectionDetails::SqlServer(_) => new_sql_server_connections += 1,
1478 ConnectionDetails::AwsPrivatelink(_) => {
1479 new_aws_privatelink_connections += 1
1480 }
1481 ConnectionDetails::Csr(_)
1482 | ConnectionDetails::Ssh { .. }
1483 | ConnectionDetails::Aws(_) => {}
1484 },
1485 CatalogItem::Table(_) => {
1486 new_tables += 1;
1487 }
1488 CatalogItem::Source(source) => {
1489 new_sources += source.user_controllable_persist_shard_count()
1490 }
1491 CatalogItem::Sink(_) => new_sinks += 1,
1492 CatalogItem::MaterializedView(_) => {
1493 new_materialized_views += 1;
1494 }
1495 CatalogItem::Secret(_) => {
1496 new_secrets += 1;
1497 }
1498 CatalogItem::ContinualTask(_) => {
1499 new_continual_tasks += 1;
1500 }
1501 CatalogItem::Log(_)
1502 | CatalogItem::View(_)
1503 | CatalogItem::Index(_)
1504 | CatalogItem::Type(_)
1505 | CatalogItem::Func(_) => {}
1506 }
1507 }
1508 Op::DropObjects(drop_object_infos) => {
1509 for drop_object_info in drop_object_infos {
1510 match drop_object_info {
1511 DropObjectInfo::Cluster(_) => {
1512 new_clusters -= 1;
1513 }
1514 DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
1515 if cluster_id.is_user() {
1516 *new_replicas_per_cluster.entry(*cluster_id).or_insert(0) -= 1;
1517 let cluster = self
1518 .catalog()
1519 .get_cluster_replica(*cluster_id, *replica_id);
1520 if let ReplicaLocation::Managed(location) =
1521 &cluster.config.location
1522 {
1523 let replica_allocation = self
1524 .catalog()
1525 .cluster_replica_sizes()
1526 .0
1527 .get(location.size_for_billing())
1528 .expect(
1529 "location size is validated against the cluster replica sizes",
1530 );
1531 new_credit_consumption_rate -=
1532 replica_allocation.credits_per_hour
1533 }
1534 }
1535 }
1536 DropObjectInfo::Database(_) => {
1537 new_databases -= 1;
1538 }
1539 DropObjectInfo::Schema((database_spec, _)) => {
1540 if let ResolvedDatabaseSpecifier::Id(database_id) = database_spec {
1541 *new_schemas_per_database.entry(database_id).or_insert(0) -= 1;
1542 }
1543 }
1544 DropObjectInfo::Role(_) => {
1545 new_roles -= 1;
1546 }
1547 DropObjectInfo::NetworkPolicy(_) => {
1548 new_network_policies -= 1;
1549 }
1550 DropObjectInfo::Item(id) => {
1551 let entry = self.catalog().get_entry(id);
1552 *new_objects_per_schema
1553 .entry((
1554 entry.name().qualifiers.database_spec.clone(),
1555 entry.name().qualifiers.schema_spec.clone(),
1556 ))
1557 .or_insert(0) -= 1;
1558 match entry.item() {
1559 CatalogItem::Connection(connection) => match connection.details
1560 {
1561 ConnectionDetails::AwsPrivatelink(_) => {
1562 new_aws_privatelink_connections -= 1;
1563 }
1564 _ => (),
1565 },
1566 CatalogItem::Table(_) => {
1567 new_tables -= 1;
1568 }
1569 CatalogItem::Source(source) => {
1570 new_sources -=
1571 source.user_controllable_persist_shard_count()
1572 }
1573 CatalogItem::Sink(_) => new_sinks -= 1,
1574 CatalogItem::MaterializedView(_) => {
1575 new_materialized_views -= 1;
1576 }
1577 CatalogItem::Secret(_) => {
1578 new_secrets -= 1;
1579 }
1580 CatalogItem::ContinualTask(_) => {
1581 new_continual_tasks -= 1;
1582 }
1583 CatalogItem::Log(_)
1584 | CatalogItem::View(_)
1585 | CatalogItem::Index(_)
1586 | CatalogItem::Type(_)
1587 | CatalogItem::Func(_) => {}
1588 }
1589 }
1590 }
1591 }
1592 }
1593 Op::UpdateItem {
1594 name: _,
1595 id,
1596 to_item,
1597 } => match to_item {
1598 CatalogItem::Source(source) => {
1599 let current_source = self
1600 .catalog()
1601 .get_entry(id)
1602 .source()
1603 .expect("source update is for source item");
1604
1605 new_sources += source.user_controllable_persist_shard_count()
1606 - current_source.user_controllable_persist_shard_count();
1607 }
1608 CatalogItem::Connection(_)
1609 | CatalogItem::Table(_)
1610 | CatalogItem::Sink(_)
1611 | CatalogItem::MaterializedView(_)
1612 | CatalogItem::Secret(_)
1613 | CatalogItem::Log(_)
1614 | CatalogItem::View(_)
1615 | CatalogItem::Index(_)
1616 | CatalogItem::Type(_)
1617 | CatalogItem::Func(_)
1618 | CatalogItem::ContinualTask(_) => {}
1619 },
1620 Op::AlterRole { .. }
1621 | Op::AlterRetainHistory { .. }
1622 | Op::AlterNetworkPolicy { .. }
1623 | Op::AlterAddColumn { .. }
1624 | Op::UpdatePrivilege { .. }
1625 | Op::UpdateDefaultPrivilege { .. }
1626 | Op::GrantRole { .. }
1627 | Op::RenameCluster { .. }
1628 | Op::RenameClusterReplica { .. }
1629 | Op::RenameItem { .. }
1630 | Op::RenameSchema { .. }
1631 | Op::UpdateOwner { .. }
1632 | Op::RevokeRole { .. }
1633 | Op::UpdateClusterConfig { .. }
1634 | Op::UpdateClusterReplicaConfig { .. }
1635 | Op::UpdateSourceReferences { .. }
1636 | Op::UpdateSystemConfiguration { .. }
1637 | Op::ResetSystemConfiguration { .. }
1638 | Op::ResetAllSystemConfiguration { .. }
1639 | Op::Comment { .. }
1640 | Op::WeirdStorageUsageUpdates { .. }
1641 | Op::TransactionDryRun => {}
1642 }
1643 }
1644
1645 let mut current_aws_privatelink_connections = 0;
1646 let mut current_postgres_connections = 0;
1647 let mut current_mysql_connections = 0;
1648 let mut current_sql_server_connections = 0;
1649 let mut current_kafka_connections = 0;
1650 for c in self.catalog().user_connections() {
1651 let connection = c
1652 .connection()
1653 .expect("`user_connections()` only returns connection objects");
1654
1655 match connection.details {
1656 ConnectionDetails::AwsPrivatelink(_) => current_aws_privatelink_connections += 1,
1657 ConnectionDetails::Postgres(_) => current_postgres_connections += 1,
1658 ConnectionDetails::MySql(_) => current_mysql_connections += 1,
1659 ConnectionDetails::SqlServer(_) => current_sql_server_connections += 1,
1660 ConnectionDetails::Kafka(_) => current_kafka_connections += 1,
1661 ConnectionDetails::Csr(_)
1662 | ConnectionDetails::Ssh { .. }
1663 | ConnectionDetails::Aws(_) => {}
1664 }
1665 }
1666 self.validate_resource_limit(
1667 current_kafka_connections,
1668 new_kafka_connections,
1669 SystemVars::max_kafka_connections,
1670 "Kafka Connection",
1671 MAX_KAFKA_CONNECTIONS.name(),
1672 )?;
1673 self.validate_resource_limit(
1674 current_postgres_connections,
1675 new_postgres_connections,
1676 SystemVars::max_postgres_connections,
1677 "PostgreSQL Connection",
1678 MAX_POSTGRES_CONNECTIONS.name(),
1679 )?;
1680 self.validate_resource_limit(
1681 current_mysql_connections,
1682 new_mysql_connections,
1683 SystemVars::max_mysql_connections,
1684 "MySQL Connection",
1685 MAX_MYSQL_CONNECTIONS.name(),
1686 )?;
1687 self.validate_resource_limit(
1688 current_sql_server_connections,
1689 new_sql_server_connections,
1690 SystemVars::max_sql_server_connections,
1691 "SQL Server Connection",
1692 MAX_SQL_SERVER_CONNECTIONS.name(),
1693 )?;
1694 self.validate_resource_limit(
1695 current_aws_privatelink_connections,
1696 new_aws_privatelink_connections,
1697 SystemVars::max_aws_privatelink_connections,
1698 "AWS PrivateLink Connection",
1699 MAX_AWS_PRIVATELINK_CONNECTIONS.name(),
1700 )?;
1701 self.validate_resource_limit(
1702 self.catalog().user_tables().count(),
1703 new_tables,
1704 SystemVars::max_tables,
1705 "table",
1706 MAX_TABLES.name(),
1707 )?;
1708
1709 let current_sources: usize = self
1710 .catalog()
1711 .user_sources()
1712 .filter_map(|source| source.source())
1713 .map(|source| source.user_controllable_persist_shard_count())
1714 .sum::<i64>()
1715 .try_into()
1716 .expect("non-negative sum of sources");
1717
1718 self.validate_resource_limit(
1719 current_sources,
1720 new_sources,
1721 SystemVars::max_sources,
1722 "source",
1723 MAX_SOURCES.name(),
1724 )?;
1725 self.validate_resource_limit(
1726 self.catalog().user_sinks().count(),
1727 new_sinks,
1728 SystemVars::max_sinks,
1729 "sink",
1730 MAX_SINKS.name(),
1731 )?;
1732 self.validate_resource_limit(
1733 self.catalog().user_materialized_views().count(),
1734 new_materialized_views,
1735 SystemVars::max_materialized_views,
1736 "materialized view",
1737 MAX_MATERIALIZED_VIEWS.name(),
1738 )?;
1739 self.validate_resource_limit(
1740 self.catalog().user_clusters().count(),
1746 new_clusters,
1747 SystemVars::max_clusters,
1748 "cluster",
1749 MAX_CLUSTERS.name(),
1750 )?;
1751 for (cluster_id, new_replicas) in new_replicas_per_cluster {
1752 let current_amount = self
1754 .catalog()
1755 .try_get_cluster(cluster_id)
1756 .map(|instance| instance.user_replicas().count())
1757 .unwrap_or(0);
1758 self.validate_resource_limit(
1759 current_amount,
1760 new_replicas,
1761 SystemVars::max_replicas_per_cluster,
1762 "cluster replica",
1763 MAX_REPLICAS_PER_CLUSTER.name(),
1764 )?;
1765 }
1766 let current_credit_consumption_rate = self
1767 .catalog()
1768 .user_cluster_replicas()
1769 .filter_map(|replica| match &replica.config.location {
1770 ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
1771 ReplicaLocation::Unmanaged(_) => None,
1772 })
1773 .map(|size| {
1774 self.catalog()
1775 .cluster_replica_sizes()
1776 .0
1777 .get(size)
1778 .expect("location size is validated against the cluster replica sizes")
1779 .credits_per_hour
1780 })
1781 .sum();
1782 self.validate_resource_limit_numeric(
1783 current_credit_consumption_rate,
1784 new_credit_consumption_rate,
1785 |system_vars| {
1786 self.license_key
1787 .max_credit_consumption_rate()
1788 .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
1789 },
1790 "cluster replica",
1791 MAX_CREDIT_CONSUMPTION_RATE.name(),
1792 )?;
1793 self.validate_resource_limit(
1794 self.catalog().databases().count(),
1795 new_databases,
1796 SystemVars::max_databases,
1797 "database",
1798 MAX_DATABASES.name(),
1799 )?;
1800 for (database_id, new_schemas) in new_schemas_per_database {
1801 self.validate_resource_limit(
1802 self.catalog().get_database(database_id).schemas_by_id.len(),
1803 new_schemas,
1804 SystemVars::max_schemas_per_database,
1805 "schema",
1806 MAX_SCHEMAS_PER_DATABASE.name(),
1807 )?;
1808 }
1809 for ((database_spec, schema_spec), new_objects) in new_objects_per_schema {
1810 self.validate_resource_limit(
1811 self.catalog()
1812 .get_schema(&database_spec, &schema_spec, conn_id)
1813 .items
1814 .len(),
1815 new_objects,
1816 SystemVars::max_objects_per_schema,
1817 "object",
1818 MAX_OBJECTS_PER_SCHEMA.name(),
1819 )?;
1820 }
1821 self.validate_resource_limit(
1822 self.catalog().user_secrets().count(),
1823 new_secrets,
1824 SystemVars::max_secrets,
1825 "secret",
1826 MAX_SECRETS.name(),
1827 )?;
1828 self.validate_resource_limit(
1829 self.catalog().user_roles().count(),
1830 new_roles,
1831 SystemVars::max_roles,
1832 "role",
1833 MAX_ROLES.name(),
1834 )?;
1835 self.validate_resource_limit(
1836 self.catalog().user_continual_tasks().count(),
1837 new_continual_tasks,
1838 SystemVars::max_continual_tasks,
1839 "continual_task",
1840 MAX_CONTINUAL_TASKS.name(),
1841 )?;
1842 self.validate_resource_limit(
1843 self.catalog().user_continual_tasks().count(),
1844 new_network_policies,
1845 SystemVars::max_network_policies,
1846 "network_policy",
1847 MAX_NETWORK_POLICIES.name(),
1848 )?;
1849 Ok(())
1850 }
1851
1852 pub(crate) fn validate_resource_limit<F>(
1854 &self,
1855 current_amount: usize,
1856 new_instances: i64,
1857 resource_limit: F,
1858 resource_type: &str,
1859 limit_name: &str,
1860 ) -> Result<(), AdapterError>
1861 where
1862 F: Fn(&SystemVars) -> u32,
1863 {
1864 if new_instances <= 0 {
1865 return Ok(());
1866 }
1867
1868 let limit: i64 = resource_limit(self.catalog().system_config()).into();
1869 let current_amount: Option<i64> = current_amount.try_into().ok();
1870 let desired =
1871 current_amount.and_then(|current_amount| current_amount.checked_add(new_instances));
1872
1873 let exceeds_limit = if let Some(desired) = desired {
1874 desired > limit
1875 } else {
1876 true
1877 };
1878
1879 let desired = desired
1880 .map(|desired| desired.to_string())
1881 .unwrap_or_else(|| format!("more than {}", i64::MAX));
1882 let current = current_amount
1883 .map(|current| current.to_string())
1884 .unwrap_or_else(|| format!("more than {}", i64::MAX));
1885 if exceeds_limit {
1886 Err(AdapterError::ResourceExhaustion {
1887 resource_type: resource_type.to_string(),
1888 limit_name: limit_name.to_string(),
1889 desired,
1890 limit: limit.to_string(),
1891 current,
1892 })
1893 } else {
1894 Ok(())
1895 }
1896 }
1897
1898 fn validate_resource_limit_numeric<F>(
1902 &self,
1903 current_amount: Numeric,
1904 new_amount: Numeric,
1905 resource_limit: F,
1906 resource_type: &str,
1907 limit_name: &str,
1908 ) -> Result<(), AdapterError>
1909 where
1910 F: Fn(&SystemVars) -> Numeric,
1911 {
1912 if new_amount <= Numeric::zero() {
1913 return Ok(());
1914 }
1915
1916 let limit = resource_limit(self.catalog().system_config());
1917 let desired = current_amount + new_amount;
1921 if desired > limit {
1922 Err(AdapterError::ResourceExhaustion {
1923 resource_type: resource_type.to_string(),
1924 limit_name: limit_name.to_string(),
1925 desired: desired.to_string(),
1926 limit: limit.to_string(),
1927 current: current_amount.to_string(),
1928 })
1929 } else {
1930 Ok(())
1931 }
1932 }
1933}