1use std::collections::{BTreeMap, BTreeSet, btree_map};
14use std::time::{Duration, Instant};
15
16use futures::FutureExt;
17use maplit::btreemap;
18use mz_audit_log::VersionedStorageUsage;
19use mz_catalog::memory::objects::ClusterReplicaProcessStatus;
20use mz_controller::ControllerResponse;
21use mz_controller::clusters::{ClusterEvent, ClusterStatus};
22use mz_ore::cast::CastFrom;
23use mz_ore::instrument;
24use mz_ore::now::EpochMillis;
25use mz_ore::option::OptionExt;
26use mz_ore::tracing::OpenTelemetryContext;
27use mz_ore::{soft_assert_or_log, task};
28use mz_persist_client::usage::ShardsUsageReferenced;
29use mz_repr::{Datum, Diff, Row};
30use mz_sql::ast::Statement;
31use mz_sql::names::ResolvedIds;
32use mz_sql::pure::PurifiedStatement;
33use mz_storage_client::controller::IntrospectionType;
34use opentelemetry::trace::TraceContextExt;
35use rand::{Rng, SeedableRng, rngs};
36use serde_json::json;
37use tracing::{Instrument, Level, event, info_span, warn};
38use tracing_opentelemetry::OpenTelemetrySpanExt;
39
40use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
41use crate::catalog::BuiltinTableUpdate;
42use crate::command::Command;
43use crate::coord::{
44 AlterConnectionValidationReady, ClusterReplicaStatuses, Coordinator,
45 CreateConnectionValidationReady, Message, PurifiedStatementReady, WatchSetResponse,
46};
47use crate::telemetry::{EventDetails, SegmentClientExt};
48use crate::{AdapterNotice, TimestampContext};
49
50impl Coordinator {
    /// Dispatches one internal coordinator [`Message`] to the handler responsible for it.
    ///
    /// Most arms await their handler through `boxed_local()` (from `futures::FutureExt`),
    /// which heap-allocates the handler's future instead of embedding it in this
    /// function's future — presumably to keep the future produced by this large `match`
    /// from growing to the size of its largest arm.
    #[instrument]
    pub(crate) async fn handle_message(&mut self, msg: Message) -> () {
        match msg {
            Message::Command(otel_ctx, cmd) => {
                // Run the command inside a `message_command` span (or the current span if
                // that one is disabled), parented to the OpenTelemetry context the command
                // was sent with.
                let span = tracing::info_span!("message_command").or_current();
                span.in_scope(|| otel_ctx.attach_as_parent());
                self.message_command(cmd).instrument(span).await
            }
            Message::ControllerReady { controller: _ } => {
                // Split `self` into separate field borrows so `controller` can be used
                // while `catalog` is also borrowed.
                let Coordinator {
                    controller,
                    catalog,
                    ..
                } = self;
                let storage_metadata = catalog.state().storage_metadata();
                // Let the controller process whatever it has ready; if that yields a
                // response, handle it.
                if let Some(m) = controller
                    .process(storage_metadata)
                    .expect("`process` never returns an error")
                {
                    self.message_controller(m).boxed_local().await
                }
            }
            Message::PurifiedStatementReady(ready) => {
                self.message_purified_statement_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::CreateConnectionValidationReady(ready) => {
                self.message_create_connection_validation_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::AlterConnectionValidationReady(ready) => {
                self.message_alter_connection_validation_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::TryDeferred {
                conn_id,
                acquired_lock,
            } => self.try_deferred(conn_id, acquired_lock).await,
            Message::GroupCommitInitiate(span, permit) => {
                // Link the current span to the span that initiated the group commit, then
                // run the commit inside that span.
                tracing::Span::current().add_link(span.context().span().span_context().clone());
                self.try_group_commit(permit)
                    .instrument(span)
                    .boxed_local()
                    .await
            }
            Message::AdvanceTimelines => {
                self.advance_timelines().boxed_local().await;
            }
            Message::ClusterEvent(event) => self.message_cluster_event(event).boxed_local().await,
            Message::CancelPendingPeeks { conn_id } => {
                self.cancel_pending_peeks(&conn_id);
            }
            Message::LinearizeReads => {
                self.message_linearize_reads().boxed_local().await;
            }
            Message::StagedBatches {
                conn_id,
                table_id,
                batches,
            } => {
                self.commit_staged_batches(conn_id, table_id, batches);
            }
            Message::StorageUsageSchedule => {
                self.schedule_storage_usage_collection().boxed_local().await;
            }
            Message::StorageUsageFetch => {
                self.storage_usage_fetch().boxed_local().await;
            }
            Message::StorageUsageUpdate(sizes) => {
                self.storage_usage_update(sizes).boxed_local().await;
            }
            Message::StorageUsagePrune(expired) => {
                self.storage_usage_prune(expired).boxed_local().await;
            }
            Message::ArrangementSizesSchedule => {
                self.schedule_arrangement_sizes_collection()
                    .boxed_local()
                    .await;
            }
            Message::ArrangementSizesSnapshot => {
                self.arrangement_sizes_snapshot().boxed_local().await;
            }
            Message::ArrangementSizesPrune(expired) => {
                self.arrangement_sizes_prune(expired).boxed_local().await;
            }
            Message::RetireExecute {
                otel_ctx,
                data,
                reason,
            } => {
                // Parent the retirement on the OpenTelemetry context carried by the message.
                otel_ctx.attach_as_parent();
                self.retire_execution(reason, data);
            }
            Message::ExecuteSingleStatementTransaction {
                ctx,
                otel_ctx,
                stmt,
                params,
            } => {
                otel_ctx.attach_as_parent();
                self.sequence_execute_single_statement_transaction(ctx, stmt, params)
                    .boxed_local()
                    .await;
            }
            // Staged sequencing: each `*StageReady` message resumes a multi-stage
            // operation at `stage` through the shared `sequence_staged` driver.
            Message::PeekStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateIndexStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateViewStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateMaterializedViewStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::SubscribeStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::IntrospectionSubscribeStageReady { span, stage } => {
                // Introspection subscribes have no client execution context, hence `()`.
                self.sequence_staged((), span, stage).boxed_local().await;
            }
            Message::ExplainTimestampStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::SecretStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::ClusterStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::DrainStatementLog => {
                self.drain_statement_log();
            }
            Message::PrivateLinkVpcEndpointEvents(events) => {
                // Introspection updates are only written when not in read-only mode.
                if !self.controller.read_only() {
                    self.controller.storage.append_introspection_updates(
                        IntrospectionType::PrivatelinkConnectionStatusHistory,
                        events
                            .into_iter()
                            .map(|e| (mz_repr::Row::from(e), Diff::ONE))
                            .collect(),
                    );
                }
            }
            Message::CheckSchedulingPolicies => {
                self.check_scheduling_policies().boxed_local().await;
            }
            Message::SchedulingDecisions(decisions) => {
                self.handle_scheduling_decisions(decisions)
                    .boxed_local()
                    .await;
            }
            Message::DeferredStatementReady => {
                self.handle_deferred_statement().boxed_local().await;
            }
        }
    }
220
221 #[mz_ore::instrument(level = "debug")]
222 pub async fn storage_usage_fetch(&self) {
223 if self.controller.read_only() {
230 tracing::info!("skipping storage usage collection in read-only mode");
231 if let Err(e) = self.internal_cmd_tx.send(Message::StorageUsageSchedule) {
232 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
233 }
234 return;
235 }
236
237 let internal_cmd_tx = self.internal_cmd_tx.clone();
238 let client = self.storage_usage_client.clone();
239
240 let live_shards: BTreeSet<_> = self
242 .controller
243 .storage
244 .active_collection_metadatas()
245 .into_iter()
246 .map(|(_id, m)| m.data_shard)
247 .collect();
248
249 let collection_metric = self.metrics.storage_usage_collection_time_seconds.clone();
250
251 task::spawn(|| "storage_usage_fetch", async move {
254 let collection_metric_timer = collection_metric.start_timer();
255 let shard_sizes = client.shards_usage_referenced(live_shards).await;
256 collection_metric_timer.observe_duration();
257
258 if let Err(e) = internal_cmd_tx.send(Message::StorageUsageUpdate(shard_sizes)) {
261 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
262 }
263 });
264 }
265
266 #[mz_ore::instrument(level = "debug")]
267 async fn storage_usage_update(&mut self, shards_usage: ShardsUsageReferenced) {
268 let write_ts = self.get_local_write_ts().await.timestamp;
276 let collection_timestamp: EpochMillis = write_ts.into();
277
278 let batch_id = match self.catalog().allocate_storage_usage_id(write_ts).await {
284 Ok(id) => id,
285 Err(err) => {
286 tracing::warn!("failed to allocate storage usage batch id: {:?}", err);
287 return;
288 }
289 };
290
291 let updates: Vec<_> = shards_usage
292 .by_shard
293 .into_iter()
294 .map(|(shard_id, shard_usage)| {
295 let event = VersionedStorageUsage::new(
296 batch_id,
297 Some(shard_id.to_string()),
298 shard_usage.size_bytes(),
299 collection_timestamp,
300 );
301 self.catalog().pack_storage_usage_update(event, Diff::ONE)
302 })
303 .collect();
304
305 let (table_updates, _) = self.builtin_table_update().execute(updates).await;
306
307 let internal_cmd_tx = self.internal_cmd_tx.clone();
308 let task_span = info_span!(parent: None, "coord::storage_usage_update::table_updates");
309 OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
310 task::spawn(|| "storage_usage_update_table_updates", async move {
311 table_updates.instrument(task_span).await;
312 if let Err(e) = internal_cmd_tx.send(Message::StorageUsageSchedule) {
314 warn!("internal_cmd_rx dropped before we could send: {e:?}");
315 }
316 });
317 }
318
319 #[mz_ore::instrument(level = "debug")]
320 async fn storage_usage_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
321 let (fut, _) = self.builtin_table_update().execute(expired).await;
322 task::spawn(|| "storage_usage_pruning_apply", async move {
323 fut.await;
324 });
325 }
326
327 pub async fn schedule_storage_usage_collection(&self) {
328 const SEED_LEN: usize = 32;
336 let mut seed = [0; SEED_LEN];
337 for (i, byte) in self
338 .catalog()
339 .state()
340 .config()
341 .environment_id
342 .organization_id()
343 .as_bytes()
344 .into_iter()
345 .take(SEED_LEN)
346 .enumerate()
347 {
348 seed[i] = *byte;
349 }
350 let storage_usage_collection_interval_ms: EpochMillis =
351 EpochMillis::try_from(self.storage_usage_collection_interval.as_millis())
352 .expect("storage usage collection interval must fit into u64");
353 let offset =
354 rngs::SmallRng::from_seed(seed).random_range(0..storage_usage_collection_interval_ms);
355 let now_ts: EpochMillis = self.peek_local_write_ts().await.into();
356
357 let previous_collection_ts =
359 (now_ts - (now_ts % storage_usage_collection_interval_ms)) + offset;
360 let next_collection_ts = if previous_collection_ts > now_ts {
361 previous_collection_ts
362 } else {
363 previous_collection_ts + storage_usage_collection_interval_ms
364 };
365 let next_collection_interval = Duration::from_millis(next_collection_ts - now_ts);
366
367 let internal_cmd_tx = self.internal_cmd_tx.clone();
369 task::spawn(|| "storage_usage_collection", async move {
370 tokio::time::sleep(next_collection_interval).await;
371 if internal_cmd_tx.send(Message::StorageUsageFetch).is_err() {
372 }
374 });
375 }
376
377 pub async fn schedule_arrangement_sizes_collection(&self) {
385 const MAX_SLEEP: Duration = Duration::from_secs(60);
386
387 let interval_duration =
388 mz_adapter_types::dyncfgs::ARRANGEMENT_SIZE_HISTORY_COLLECTION_INTERVAL
389 .get(self.catalog().system_config().dyncfgs());
390
391 if interval_duration.is_zero() {
394 let internal_cmd_tx = self.internal_cmd_tx.clone();
395 task::spawn(|| "arrangement_sizes_collection_disabled", async move {
396 tokio::time::sleep(MAX_SLEEP).await;
397 let _ = internal_cmd_tx.send(Message::ArrangementSizesSchedule);
398 });
399 return;
400 }
401
402 const SEED_LEN: usize = 32;
403 let mut seed = [0; SEED_LEN];
404 for (i, byte) in self
405 .catalog()
406 .state()
407 .config()
408 .environment_id
409 .organization_id()
410 .as_bytes()
411 .into_iter()
412 .take(SEED_LEN)
413 .enumerate()
414 {
415 seed[i] = *byte;
416 }
417 let interval_ms: EpochMillis = EpochMillis::try_from(interval_duration.as_millis())
418 .expect("arrangement_size_history_collection_interval must fit into u64");
419 let interval_ms = interval_ms.max(1);
421 let offset = rngs::SmallRng::from_seed(seed).random_range(0..interval_ms);
422 let now_ts: EpochMillis = self.peek_local_write_ts().await.into();
423
424 let previous_collection_ts = (now_ts - (now_ts % interval_ms)) + offset;
425 let next_collection_ts = if previous_collection_ts > now_ts {
426 previous_collection_ts
427 } else {
428 previous_collection_ts + interval_ms
429 };
430 let sleep_for = Duration::from_millis(next_collection_ts - now_ts);
431
432 let (capped_sleep, fire_snapshot) = if sleep_for <= MAX_SLEEP {
436 (sleep_for, true)
437 } else {
438 (MAX_SLEEP, false)
439 };
440
441 let internal_cmd_tx = self.internal_cmd_tx.clone();
442 task::spawn(|| "arrangement_sizes_collection", async move {
443 tokio::time::sleep(capped_sleep).await;
444 let msg = if fire_snapshot {
445 Message::ArrangementSizesSnapshot
446 } else {
447 Message::ArrangementSizesSchedule
448 };
449 let _ = internal_cmd_tx.send(msg);
451 });
452 }
453
    /// Takes one snapshot of the live arrangement sizes and appends it to the history table.
    ///
    /// The live rows come from `MZ_OBJECT_ARRANGEMENT_SIZES_UNIFIED`. Each row is
    /// annotated with whether its (replica, object) pair has a non-null hydration time
    /// in `MZ_COMPUTE_HYDRATION_TIMES`. The result is appended to
    /// `MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY`, with every row stamped by one collection
    /// timestamp.
    ///
    /// In read-only mode nothing is written and the next collection is simply
    /// scheduled. The next collection is always scheduled again:
    /// - directly, after a snapshot error;
    /// - directly, when there is nothing to write;
    /// - from the background task, once the table write has been applied.
    #[mz_ore::instrument(level = "debug")]
    async fn arrangement_sizes_snapshot(&mut self) {
        if self.controller.read_only() {
            self.schedule_arrangement_sizes_collection().await;
            return;
        }

        // Times the snapshot reads and row construction. It is observed explicitly on
        // success and dropped on the error paths below.
        // NOTE(review): with the `prometheus` crate, dropping a `HistogramTimer` also
        // records the elapsed time, so failed attempts are presumably recorded too —
        // confirm that is intended.
        let collection_timer = self
            .metrics
            .arrangement_sizes_collection_time_seconds
            .start_timer();

        // Resolve the live sizes collection and the hydration-times collection to their
        // latest global ids, and the history table to its item id.
        let live_item_id = self.catalog().resolve_builtin_storage_collection(
            &mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZES_UNIFIED,
        );
        let live_global_id = self.catalog.get_entry(&live_item_id).latest_global_id();
        let hydration_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_COMPUTE_HYDRATION_TIMES);
        let hydration_global_id = self
            .catalog
            .get_entry(&hydration_item_id)
            .latest_global_id();
        let history_item_id = self
            .catalog()
            .resolve_builtin_table(&mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY);

        // Both collections are read at the same read timestamp.
        let read_ts = self.get_local_read_ts().await;
        let snapshot = match self
            .controller
            .storage_collections
            .snapshot(live_global_id, read_ts)
            .await
        {
            Ok(s) => s,
            Err(e) => {
                tracing::warn!("arrangement sizes snapshot failed: {e:?}");
                drop(collection_timer);
                self.schedule_arrangement_sizes_collection().await;
                return;
            }
        };
        let mut hydration_snapshot = match self
            .controller
            .storage_collections
            .snapshot(hydration_global_id, read_ts)
            .await
        {
            Ok(s) => s,
            Err(e) => {
                tracing::warn!("arrangement sizes hydration snapshot failed: {e:?}");
                drop(collection_timer);
                self.schedule_arrangement_sizes_collection().await;
                return;
            }
        };
        differential_dataflow::consolidation::consolidate(&mut hydration_snapshot);

        let mut datum_vec = mz_repr::DatumVec::new();
        // (replica id, object id) pairs whose hydration time is non-null. The column
        // positions below are assumed to match the `MZ_COMPUTE_HYDRATION_TIMES`
        // schema — TODO confirm if that schema changes.
        let mut hydrated: BTreeSet<(String, String)> = BTreeSet::new();
        const HYDRATION_COL_REPLICA_ID: usize = 0;
        const HYDRATION_COL_OBJECT_ID: usize = 1;
        const HYDRATION_COL_TIME_NS: usize = 2;
        const HYDRATION_COL_COUNT: usize = 3;
        for (row, diff) in &hydration_snapshot {
            // Only rows with a consolidated multiplicity of exactly 1 are considered.
            if *diff != 1 {
                continue;
            }
            let datums = datum_vec.borrow_with(row);
            // Rows with fewer columns than expected are skipped.
            if datums.len() < HYDRATION_COL_COUNT {
                continue;
            }
            // A null hydration time means the object is not hydrated on that replica.
            if datums[HYDRATION_COL_TIME_NS].is_null() {
                continue;
            }
            hydrated.insert((
                datums[HYDRATION_COL_REPLICA_ID].unwrap_str().to_string(),
                datums[HYDRATION_COL_OBJECT_ID].unwrap_str().to_string(),
            ));
        }

        // A single write timestamp stamps every row in this batch.
        let collection_ts: EpochMillis = self.get_local_write_ts().await.timestamp.into();
        let collection_datum = Datum::TimestampTz(
            mz_ore::now::to_datetime(collection_ts)
                .try_into()
                .expect("collection_timestamp must fit into TimestampTz"),
        );

        let mut consolidated = snapshot;
        differential_dataflow::consolidation::consolidate(&mut consolidated);

        // Expected layout of live rows: (replica_id, object_id, size).
        const LIVE_COL_REPLICA_ID: usize = 0;
        const LIVE_COL_OBJECT_ID: usize = 1;
        const LIVE_COL_SIZE: usize = 2;
        const LIVE_COL_COUNT: usize = 3;

        // Rows with an unexpected arity or a null size are skipped and counted; both
        // counts are reported after the loop.
        let mut skipped_malformed: u64 = 0;
        let mut skipped_null_size: u64 = 0;
        let mut updates: Vec<BuiltinTableUpdate> = Vec::with_capacity(consolidated.len());
        for (row, diff) in consolidated.iter() {
            if *diff != 1 {
                continue;
            }
            let datums = datum_vec.borrow_with(row);
            if datums.len() != LIVE_COL_COUNT {
                skipped_malformed += 1;
                continue;
            }
            let replica_id = datums[LIVE_COL_REPLICA_ID].unwrap_str();
            let object_id = datums[LIVE_COL_OBJECT_ID].unwrap_str();
            let size_datum = datums[LIVE_COL_SIZE];
            if size_datum.is_null() {
                skipped_null_size += 1;
                continue;
            }
            let size = size_datum.unwrap_int64();
            // NOTE(review): this allocates two `String`s per live row just for the lookup.
            let hydration_complete =
                hydrated.contains(&(replica_id.to_string(), object_id.to_string()));
            // History row: (replica_id, object_id, size, collection time, hydrated).
            let new_row = Row::pack_slice(&[
                Datum::String(replica_id),
                Datum::String(object_id),
                Datum::Int64(size),
                collection_datum,
                Datum::from(hydration_complete),
            ]);
            updates.push(BuiltinTableUpdate::row(history_item_id, new_row, Diff::ONE));
        }
        if skipped_malformed > 0 {
            warn!(
                "mz_object_arrangement_sizes schema drift: skipped {skipped_malformed} rows \
                 with unexpected arity"
            );
        }
        if skipped_null_size > 0 {
            tracing::debug!("skipped {skipped_null_size} live rows with null size");
        }

        let row_count = updates.len();
        // Stop timing before the table write is issued.
        collection_timer.observe_duration();

        if !updates.is_empty() {
            self.metrics
                .arrangement_sizes_rows_written
                .inc_by(u64::cast_from(row_count));
            // The write is driven by a background task. Once it completes, the task
            // requests the next schedule. Its span is a new root linked to the current
            // OpenTelemetry context.
            let (fut, _) = self.builtin_table_update().execute(updates).await;
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let task_span =
                info_span!(parent: None, "coord::arrangement_sizes_snapshot::table_updates");
            OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
            task::spawn(|| "arrangement_sizes_snapshot_apply", async move {
                fut.instrument(task_span).await;
                if let Err(e) = internal_cmd_tx.send(Message::ArrangementSizesSchedule) {
                    warn!("internal_cmd_rx dropped before we could send: {e:?}");
                }
            });
        } else {
            self.schedule_arrangement_sizes_collection().await;
        }

        // NOTE(review): this is logged before the background write has been applied,
        // and also when `row_count` is zero.
        tracing::debug!(
            "appended {row_count} rows to mz_object_arrangement_size_history at ts {collection_ts}"
        );
    }
648
649 #[mz_ore::instrument(level = "debug")]
650 async fn arrangement_sizes_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
651 let (fut, _) = self.builtin_table_update().execute(expired).await;
652 task::spawn(|| "arrangement_sizes_pruning_apply", async move {
653 fut.await;
654 });
655 }
656
657 #[mz_ore::instrument(level = "debug")]
658 async fn message_command(&mut self, cmd: Command) {
659 self.handle_command(cmd).await;
660 }
661
    /// Handles a single [`ControllerResponse`].
    ///
    /// - `PeekNotification`: forwarded to `handle_peek_notification`.
    /// - `SubscribeResponse`: routed to the matching active subscribe, or else to an
    ///   introspection subscribe. An active subscribe that reports completion is
    ///   retired as `Finished`. Responses for sink ids tracked in neither map are
    ///   dropped.
    /// - `CopyToResponse`: `drop_compute_sink` is called for the sink. If it returned
    ///   an active `COPY TO`, that sink is retired with the response.
    /// - `WatchSetFinished`: each still-installed watch set is unregistered and its
    ///   continuation is run.
    #[mz_ore::instrument(level = "debug")]
    async fn message_controller(&mut self, message: ControllerResponse) {
        event!(Level::TRACE, message = format!("{:?}", message));
        match message {
            ControllerResponse::PeekNotification(uuid, response, otel_ctx) => {
                self.handle_peek_notification(uuid, response, otel_ctx);
            }
            ControllerResponse::SubscribeResponse(sink_id, response) => {
                if let Some(ActiveComputeSink::Subscribe(active_subscribe)) =
                    self.active_compute_sinks.get_mut(&sink_id)
                {
                    let finished = active_subscribe.process_response(response);
                    if finished {
                        self.retire_compute_sinks(btreemap! {
                            sink_id => ActiveComputeSinkRetireReason::Finished,
                        })
                        .await;
                    }

                    // A sink id must never be tracked both as a user subscribe and as an
                    // introspection subscribe.
                    soft_assert_or_log!(
                        !self.introspection_subscribes.contains_key(&sink_id),
                        "`sink_id` {sink_id} unexpectedly found in both `active_subscribes` \
                         and `introspection_subscribes`",
                    );
                } else if self.introspection_subscribes.contains_key(&sink_id) {
                    self.handle_introspection_subscribe_batch(sink_id, response)
                        .await;
                } else {
                    // No subscribe of either kind is registered under this id, so the
                    // response is dropped.
                    // NOTE(review): presumably this covers responses arriving after the
                    // subscribe was retired or cancelled — confirm.
                }
            }
            ControllerResponse::CopyToResponse(sink_id, response) => {
                match self.drop_compute_sink(sink_id).await {
                    Some(ActiveComputeSink::CopyTo(active_copy_to)) => {
                        active_copy_to.retire_with_response(response);
                    }
                    _ => {
                        // No active `COPY TO` sink was returned for this id; the response
                        // is dropped.
                    }
                }
            }
            ControllerResponse::WatchSetFinished(ws_ids) => {
                let now = self.now();
                for ws_id in ws_ids {
                    // Watch sets that are no longer installed are skipped.
                    let Some((conn_id, rsp)) = self.installed_watch_sets.remove(&ws_id) else {
                        continue;
                    };
                    // Unregister the watch set from its connection, and drop the
                    // connection's entry once it has no watch sets left.
                    self.connection_watch_sets
                        .get_mut(&conn_id)
                        .expect("corrupted coordinator state: unknown connection id")
                        .remove(&ws_id);
                    if self.connection_watch_sets[&conn_id].is_empty() {
                        self.connection_watch_sets.remove(&conn_id);
                    }

                    // Run the continuation attached to the finished watch set.
                    match rsp {
                        WatchSetResponse::StatementDependenciesReady(id, ev) => {
                            self.record_statement_lifecycle_event(&id, &ev, now);
                        }
                        WatchSetResponse::AlterSinkReady(ctx) => {
                            self.sequence_alter_sink_finish(ctx).await;
                        }
                        WatchSetResponse::AlterMaterializedViewReady(ctx) => {
                            self.sequence_alter_materialized_view_apply_replacement_finish(ctx)
                                .await;
                        }
                    }
                }
            }
        }
    }
735
736 #[mz_ore::instrument(level = "debug")]
737 async fn message_purified_statement_ready(
738 &mut self,
739 PurifiedStatementReady {
740 ctx,
741 result,
742 params,
743 mut plan_validity,
744 original_stmt,
745 otel_ctx,
746 }: PurifiedStatementReady,
747 ) {
748 otel_ctx.attach_as_parent();
749
750 if plan_validity.check(self.catalog()).is_err() {
761 self.handle_execute_inner(original_stmt, params, ctx).await;
762 return;
763 }
764
765 let purified_statement = match result {
766 Ok(ok) => ok,
767 Err(e) => return ctx.retire(Err(e)),
768 };
769
770 let plan = match purified_statement {
771 PurifiedStatement::PurifiedCreateSource {
772 create_progress_subsource_stmt,
773 create_source_stmt,
774 subsources,
775 available_source_references,
776 } => self
777 .plan_purified_create_source(
778 &ctx,
779 params,
780 create_progress_subsource_stmt,
781 create_source_stmt,
782 subsources,
783 available_source_references,
784 )
785 .await
786 .map(|(plan, resolved_ids)| (plan, resolved_ids, ResolvedIds::empty())),
787 PurifiedStatement::PurifiedAlterSourceAddSubsources {
788 source_name,
789 options,
790 subsources,
791 } => self
792 .plan_purified_alter_source_add_subsource(
793 ctx.session(),
794 params,
795 source_name,
796 options,
797 subsources,
798 )
799 .await
800 .map(|(plan, resolved_ids)| (plan, resolved_ids, ResolvedIds::empty())),
801 PurifiedStatement::PurifiedAlterSourceRefreshReferences {
802 source_name,
803 available_source_references,
804 } => self
805 .plan_purified_alter_source_refresh_references(
806 ctx.session(),
807 params,
808 source_name,
809 available_source_references,
810 )
811 .map(|(plan, resolved_ids)| (plan, resolved_ids, ResolvedIds::empty())),
812 o @ (PurifiedStatement::PurifiedAlterSource { .. }
813 | PurifiedStatement::PurifiedCreateSink(..)
814 | PurifiedStatement::PurifiedCreateTableFromSource { .. }) => {
815 let stmt = match o {
817 PurifiedStatement::PurifiedAlterSource { alter_source_stmt } => {
818 Statement::AlterSource(alter_source_stmt)
819 }
820 PurifiedStatement::PurifiedCreateTableFromSource { stmt } => {
821 Statement::CreateTableFromSource(stmt)
822 }
823 PurifiedStatement::PurifiedCreateSink(stmt) => Statement::CreateSink(stmt),
824 PurifiedStatement::PurifiedCreateSource { .. }
825 | PurifiedStatement::PurifiedAlterSourceAddSubsources { .. }
826 | PurifiedStatement::PurifiedAlterSourceRefreshReferences { .. } => {
827 unreachable!("not part of exterior match stmt")
828 }
829 };
830
831 let catalog = self.catalog().for_session(ctx.session());
834 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &stmt);
835 self.plan_statement(ctx.session(), stmt, ¶ms, &resolved_ids)
836 .map(|(plan, sql_impl_ids)| (plan, resolved_ids, sql_impl_ids))
837 }
838 };
839
840 match plan {
841 Ok((plan, resolved_ids, sql_impl_ids)) => {
842 self.sequence_plan(ctx, plan, resolved_ids, sql_impl_ids)
843 .await
844 }
845 Err(e) => ctx.retire(Err(e)),
846 }
847 }
848
849 #[mz_ore::instrument(level = "debug")]
850 async fn message_create_connection_validation_ready(
851 &mut self,
852 CreateConnectionValidationReady {
853 mut ctx,
854 result,
855 connection_id,
856 connection_gid,
857 mut plan_validity,
858 otel_ctx,
859 resolved_ids,
860 }: CreateConnectionValidationReady,
861 ) {
862 otel_ctx.attach_as_parent();
863
864 if let Err(e) = plan_validity.check(self.catalog()) {
870 if self.secrets_controller.delete(connection_id).await.is_ok() {
871 self.caching_secrets_reader.invalidate(connection_id);
872 }
873 return ctx.retire(Err(e));
874 }
875
876 let plan = match result {
877 Ok(ok) => ok,
878 Err(e) => {
879 if self.secrets_controller.delete(connection_id).await.is_ok() {
880 self.caching_secrets_reader.invalidate(connection_id);
881 }
882 return ctx.retire(Err(e));
883 }
884 };
885
886 let result = self
887 .sequence_create_connection_stage_finish(
888 &mut ctx,
889 connection_id,
890 connection_gid,
891 plan,
892 resolved_ids,
893 )
894 .await;
895 ctx.retire(result);
896 }
897
898 #[mz_ore::instrument(level = "debug")]
899 async fn message_alter_connection_validation_ready(
900 &mut self,
901 AlterConnectionValidationReady {
902 mut ctx,
903 result,
904 connection_id,
905 connection_gid: _,
906 mut plan_validity,
907 otel_ctx,
908 resolved_ids: _,
909 }: AlterConnectionValidationReady,
910 ) {
911 otel_ctx.attach_as_parent();
912
913 if let Err(e) = plan_validity.check(self.catalog()) {
919 return ctx.retire(Err(e));
920 }
921
922 let conn = match result {
923 Ok(ok) => ok,
924 Err(e) => {
925 return ctx.retire(Err(e));
926 }
927 };
928
929 let result = self
930 .sequence_alter_connection_stage_finish(ctx.session_mut(), connection_id, conn)
931 .await;
932 ctx.retire(result);
933 }
934
935 #[mz_ore::instrument(level = "debug")]
936 async fn message_cluster_event(&mut self, event: ClusterEvent) {
937 event!(Level::TRACE, event = format!("{:?}", event));
938
939 if let Some(segment_client) = &self.segment_client {
940 let env_id = &self.catalog().config().environment_id;
941 let mut properties = json!({
942 "cluster_id": event.cluster_id.to_string(),
943 "replica_id": event.replica_id.to_string(),
944 "process_id": event.process_id,
945 "status": event.status.as_kebab_case_str(),
946 });
947 match event.status {
948 ClusterStatus::Online => (),
949 ClusterStatus::Offline(reason) => {
950 let properties = match &mut properties {
951 serde_json::Value::Object(map) => map,
952 _ => unreachable!(),
953 };
954 properties.insert(
955 "reason".into(),
956 json!(reason.display_or("unknown").to_string()),
957 );
958 }
959 };
960 segment_client.environment_track(
961 env_id,
962 "Cluster Changed Status",
963 properties,
964 EventDetails {
965 timestamp: Some(event.time),
966 ..Default::default()
967 },
968 );
969 }
970
971 let Some(replica_statues) = self
974 .cluster_replica_statuses
975 .try_get_cluster_replica_statuses(event.cluster_id, event.replica_id)
976 else {
977 return;
978 };
979
980 if event.status != replica_statues[&event.process_id].status {
981 if !self.controller.read_only() {
982 let offline_reason = match event.status {
983 ClusterStatus::Online => None,
984 ClusterStatus::Offline(None) => None,
985 ClusterStatus::Offline(Some(reason)) => Some(reason.to_string()),
986 };
987 let row = Row::pack_slice(&[
988 Datum::String(&event.replica_id.to_string()),
989 Datum::UInt64(event.process_id),
990 Datum::String(event.status.as_kebab_case_str()),
991 Datum::from(offline_reason.as_deref()),
992 Datum::TimestampTz(event.time.try_into().expect("must fit")),
993 ]);
994 self.controller.storage.append_introspection_updates(
995 IntrospectionType::ReplicaStatusHistory,
996 vec![(row, Diff::ONE)],
997 );
998 }
999
1000 let old_replica_status =
1001 ClusterReplicaStatuses::cluster_replica_status(replica_statues);
1002
1003 let new_process_status = ClusterReplicaProcessStatus {
1004 status: event.status,
1005 time: event.time,
1006 };
1007 self.cluster_replica_statuses.ensure_cluster_status(
1008 event.cluster_id,
1009 event.replica_id,
1010 event.process_id,
1011 new_process_status,
1012 );
1013
1014 let cluster = self.catalog().get_cluster(event.cluster_id);
1015 let replica = cluster.replica(event.replica_id).expect("Replica exists");
1016 let new_replica_status = self
1017 .cluster_replica_statuses
1018 .get_cluster_replica_status(event.cluster_id, event.replica_id);
1019
1020 if old_replica_status != new_replica_status {
1021 let notifier = self.broadcast_notice_tx();
1022 let notice = AdapterNotice::ClusterReplicaStatusChanged {
1023 cluster: cluster.name.clone(),
1024 replica: replica.name.clone(),
1025 status: new_replica_status,
1026 time: event.time,
1027 };
1028 notifier(notice);
1029 }
1030 }
1031 }
1032
    /// Retires pending linearized reads that are ready and re-queues the rest.
    ///
    /// A read is ready when any of these holds:
    /// - its timestamp context is not a timeline timestamp;
    /// - it has no recorded oracle timestamp;
    /// - its chosen timestamp is at or below the oracle timestamp it recorded;
    /// - its chosen timestamp is at or below the timeline oracle's current read
    ///   timestamp.
    ///
    /// Oracle read timestamps are fetched at most once per timeline per call. Reads
    /// that are not ready go back into `pending_linearize_read_txns` with
    /// `num_requeues` incremented. In that case another `LinearizeReads` message is
    /// scheduled after the shortest remaining wait, capped at one second.
    #[mz_ore::instrument(level = "debug")]
    async fn message_linearize_reads(&mut self) {
        let mut shortest_wait = Duration::MAX;
        let mut ready_txns = Vec::new();

        // Per-timeline cache of oracle read timestamps for this call.
        let mut cached_oracle_ts = BTreeMap::new();

        for (conn_id, mut read_txn) in std::mem::take(&mut self.pending_linearize_read_txns) {
            if let TimestampContext::TimelineTimestamp {
                timeline,
                chosen_ts,
                oracle_ts,
            } = read_txn.timestamp_context()
            {
                let oracle_ts = match oracle_ts {
                    Some(oracle_ts) => oracle_ts,
                    None => {
                        ready_txns.push(read_txn);
                        continue;
                    }
                };

                // Already linearized with respect to the oracle timestamp it recorded.
                if chosen_ts <= oracle_ts {
                    ready_txns.push(read_txn);
                    continue;
                }

                let current_oracle_ts = cached_oracle_ts.entry(timeline.clone());
                let current_oracle_ts = match current_oracle_ts {
                    btree_map::Entry::Vacant(entry) => {
                        let timestamp_oracle = self.get_timestamp_oracle(timeline);
                        let read_ts = timestamp_oracle.read_ts().await;
                        entry.insert(read_ts.clone());
                        read_ts
                    }
                    btree_map::Entry::Occupied(entry) => entry.get().clone(),
                };

                if *chosen_ts <= current_oracle_ts {
                    ready_txns.push(read_txn);
                } else {
                    // The timestamp gap is treated as a number of milliseconds to wait.
                    let wait =
                        Duration::from_millis(chosen_ts.saturating_sub(current_oracle_ts).into());
                    if wait < shortest_wait {
                        shortest_wait = wait;
                    }
                    read_txn.num_requeues += 1;
                    self.pending_linearize_read_txns.insert(conn_id, read_txn);
                }
            } else {
                ready_txns.push(read_txn);
            }
        }

        if !ready_txns.is_empty() {
            // All ready reads are handled as one batch, so a single otel context is
            // picked for the batch-level span.
            // NOTE(review): `span` is created and parented here but never entered, so it
            // does not appear to wrap the work below; only the per-read
            // `retire_read_results` spans are entered.
            let otel_ctx = ready_txns.first().expect("known to exist").otel_ctx.clone();
            let span = tracing::debug_span!("message_linearize_reads");
            otel_ctx.attach_as_parent_to(&span);

            let now = Instant::now();
            for ready_txn in ready_txns {
                let span = tracing::debug_span!("retire_read_results");
                ready_txn.otel_ctx.attach_as_parent_to(&span);
                let _entered = span.enter();
                // Labels: transaction kind, and whether the read became ready without
                // ever being requeued.
                self.metrics
                    .linearize_message_seconds
                    .with_label_values(&[
                        ready_txn.txn.label(),
                        if ready_txn.num_requeues == 0 {
                            "true"
                        } else {
                            "false"
                        },
                    ])
                    .observe((now - ready_txn.created).as_secs_f64());
                if let Some((ctx, result)) = ready_txn.txn.finish() {
                    ctx.retire(result);
                }
            }
        }

        // Re-check the remaining reads after the shortest wait, but at least once a second.
        if !self.pending_linearize_read_txns.is_empty() {
            let remaining_ms = std::cmp::min(shortest_wait, Duration::from_millis(1_000));
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            task::spawn(|| "deferred_read_txns", async move {
                tokio::time::sleep(remaining_ms).await;
                let result = internal_cmd_tx.send(Message::LinearizeReads);
                if let Err(e) = result {
                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        }
    }
1142}