1use std::collections::{BTreeMap, BTreeSet, btree_map};
14use std::time::{Duration, Instant};
15
16use futures::FutureExt;
17use maplit::btreemap;
18use mz_catalog::memory::objects::ClusterReplicaProcessStatus;
19use mz_controller::ControllerResponse;
20use mz_controller::clusters::{ClusterEvent, ClusterStatus};
21use mz_ore::cast::CastFrom;
22use mz_ore::instrument;
23use mz_ore::now::EpochMillis;
24use mz_ore::option::OptionExt;
25use mz_ore::tracing::OpenTelemetryContext;
26use mz_ore::{soft_assert_or_log, task};
27use mz_persist_client::usage::ShardsUsageReferenced;
28use mz_repr::{Datum, Diff, Row};
29use mz_sql::ast::Statement;
30use mz_sql::pure::PurifiedStatement;
31use mz_storage_client::controller::IntrospectionType;
32use opentelemetry::trace::TraceContextExt;
33use rand::{Rng, SeedableRng, rngs};
34use serde_json::json;
35use tracing::{Instrument, Level, event, info_span, warn};
36use tracing_opentelemetry::OpenTelemetrySpanExt;
37
38use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
39use crate::catalog::{BuiltinTableUpdate, Op};
40use crate::command::Command;
41use crate::coord::{
42 AlterConnectionValidationReady, ClusterReplicaStatuses, Coordinator,
43 CreateConnectionValidationReady, Message, PurifiedStatementReady, WatchSetResponse,
44};
45use crate::telemetry::{EventDetails, SegmentClientExt};
46use crate::{AdapterNotice, TimestampContext};
47
48impl Coordinator {
49 #[instrument]
54 pub(crate) async fn handle_message(&mut self, msg: Message) -> () {
55 match msg {
56 Message::Command(otel_ctx, cmd) => {
57 let span = tracing::info_span!("message_command").or_current();
61 span.in_scope(|| otel_ctx.attach_as_parent());
62 self.message_command(cmd).instrument(span).await
63 }
64 Message::ControllerReady { controller: _ } => {
65 let Coordinator {
66 controller,
67 catalog,
68 ..
69 } = self;
70 let storage_metadata = catalog.state().storage_metadata();
71 if let Some(m) = controller
72 .process(storage_metadata)
73 .expect("`process` never returns an error")
74 {
75 self.message_controller(m).boxed_local().await
76 }
77 }
78 Message::PurifiedStatementReady(ready) => {
79 self.message_purified_statement_ready(ready)
80 .boxed_local()
81 .await
82 }
83 Message::CreateConnectionValidationReady(ready) => {
84 self.message_create_connection_validation_ready(ready)
85 .boxed_local()
86 .await
87 }
88 Message::AlterConnectionValidationReady(ready) => {
89 self.message_alter_connection_validation_ready(ready)
90 .boxed_local()
91 .await
92 }
93 Message::TryDeferred {
94 conn_id,
95 acquired_lock,
96 } => self.try_deferred(conn_id, acquired_lock).await,
97 Message::GroupCommitInitiate(span, permit) => {
98 tracing::Span::current().add_link(span.context().span().span_context().clone());
100 self.try_group_commit(permit)
101 .instrument(span)
102 .boxed_local()
103 .await
104 }
105 Message::AdvanceTimelines => {
106 self.advance_timelines().boxed_local().await;
107 }
108 Message::ClusterEvent(event) => self.message_cluster_event(event).boxed_local().await,
109 Message::CancelPendingPeeks { conn_id } => {
110 self.cancel_pending_peeks(&conn_id);
111 }
112 Message::LinearizeReads => {
113 self.message_linearize_reads().boxed_local().await;
114 }
115 Message::StagedBatches {
116 conn_id,
117 table_id,
118 batches,
119 } => {
120 self.commit_staged_batches(conn_id, table_id, batches);
121 }
122 Message::StorageUsageSchedule => {
123 self.schedule_storage_usage_collection().boxed_local().await;
124 }
125 Message::StorageUsageFetch => {
126 self.storage_usage_fetch().boxed_local().await;
127 }
128 Message::StorageUsageUpdate(sizes) => {
129 self.storage_usage_update(sizes).boxed_local().await;
130 }
131 Message::StorageUsagePrune(expired) => {
132 self.storage_usage_prune(expired).boxed_local().await;
133 }
134 Message::ArrangementSizesSchedule => {
135 self.schedule_arrangement_sizes_collection()
136 .boxed_local()
137 .await;
138 }
139 Message::ArrangementSizesSnapshot => {
140 self.arrangement_sizes_snapshot().boxed_local().await;
141 }
142 Message::ArrangementSizesPrune(expired) => {
143 self.arrangement_sizes_prune(expired).boxed_local().await;
144 }
145 Message::RetireExecute {
146 otel_ctx,
147 data,
148 reason,
149 } => {
150 otel_ctx.attach_as_parent();
151 self.retire_execution(reason, data);
152 }
153 Message::ExecuteSingleStatementTransaction {
154 ctx,
155 otel_ctx,
156 stmt,
157 params,
158 } => {
159 otel_ctx.attach_as_parent();
160 self.sequence_execute_single_statement_transaction(ctx, stmt, params)
161 .boxed_local()
162 .await;
163 }
164 Message::PeekStageReady { ctx, span, stage } => {
165 self.sequence_staged(ctx, span, stage).boxed_local().await;
166 }
167 Message::CreateIndexStageReady { ctx, span, stage } => {
168 self.sequence_staged(ctx, span, stage).boxed_local().await;
169 }
170 Message::CreateViewStageReady { ctx, span, stage } => {
171 self.sequence_staged(ctx, span, stage).boxed_local().await;
172 }
173 Message::CreateMaterializedViewStageReady { ctx, span, stage } => {
174 self.sequence_staged(ctx, span, stage).boxed_local().await;
175 }
176 Message::SubscribeStageReady { ctx, span, stage } => {
177 self.sequence_staged(ctx, span, stage).boxed_local().await;
178 }
179 Message::IntrospectionSubscribeStageReady { span, stage } => {
180 self.sequence_staged((), span, stage).boxed_local().await;
181 }
182 Message::ExplainTimestampStageReady { ctx, span, stage } => {
183 self.sequence_staged(ctx, span, stage).boxed_local().await;
184 }
185 Message::SecretStageReady { ctx, span, stage } => {
186 self.sequence_staged(ctx, span, stage).boxed_local().await;
187 }
188 Message::ClusterStageReady { ctx, span, stage } => {
189 self.sequence_staged(ctx, span, stage).boxed_local().await;
190 }
191 Message::DrainStatementLog => {
192 self.drain_statement_log();
193 }
194 Message::PrivateLinkVpcEndpointEvents(events) => {
195 if !self.controller.read_only() {
196 self.controller.storage.append_introspection_updates(
197 IntrospectionType::PrivatelinkConnectionStatusHistory,
198 events
199 .into_iter()
200 .map(|e| (mz_repr::Row::from(e), Diff::ONE))
201 .collect(),
202 );
203 }
204 }
205 Message::CheckSchedulingPolicies => {
206 self.check_scheduling_policies().boxed_local().await;
207 }
208 Message::SchedulingDecisions(decisions) => {
209 self.handle_scheduling_decisions(decisions)
210 .boxed_local()
211 .await;
212 }
213 Message::DeferredStatementReady => {
214 self.handle_deferred_statement().boxed_local().await;
215 }
216 }
217 }
218
219 #[mz_ore::instrument(level = "debug")]
220 pub async fn storage_usage_fetch(&self) {
221 let internal_cmd_tx = self.internal_cmd_tx.clone();
222 let client = self.storage_usage_client.clone();
223
224 let live_shards: BTreeSet<_> = self
226 .controller
227 .storage
228 .active_collection_metadatas()
229 .into_iter()
230 .map(|(_id, m)| m.data_shard)
231 .collect();
232
233 let collection_metric = self.metrics.storage_usage_collection_time_seconds.clone();
234
235 task::spawn(|| "storage_usage_fetch", async move {
238 let collection_metric_timer = collection_metric.start_timer();
239 let shard_sizes = client.shards_usage_referenced(live_shards).await;
240 collection_metric_timer.observe_duration();
241
242 if let Err(e) = internal_cmd_tx.send(Message::StorageUsageUpdate(shard_sizes)) {
245 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
246 }
247 });
248 }
249
    /// Records the given shard usage measurements in the catalog and, once
    /// the resulting builtin table updates have been applied, schedules the
    /// next storage usage collection.
    #[mz_ore::instrument(level = "debug")]
    async fn storage_usage_update(&mut self, shards_usage: ShardsUsageReferenced) {
        // Choose the timestamp to record the measurements at. In read-only
        // mode we must not mint a new write timestamp, so we only peek.
        let collection_timestamp = if self.controller.read_only() {
            self.peek_local_write_ts().await.into()
        } else {
            self.get_local_write_ts().await.timestamp.into()
        };

        // One catalog op per shard measurement.
        let ops = shards_usage
            .by_shard
            .into_iter()
            .map(|(shard_id, shard_usage)| Op::WeirdStorageUsageUpdates {
                object_id: Some(shard_id.to_string()),
                size_bytes: shard_usage.size_bytes(),
                collection_timestamp,
            })
            .collect();

        match self.catalog_transact_inner(None, ops).await {
            Ok((table_updates, catalog_updates)) => {
                assert!(
                    catalog_updates.is_empty(),
                    "applying builtin table updates does not produce catalog implications"
                );

                // Apply the builtin table updates on a spawned task so the
                // coordinator loop is not blocked, then schedule the next
                // collection cycle.
                let internal_cmd_tx = self.internal_cmd_tx.clone();
                let task_span =
                    info_span!(parent: None, "coord::storage_usage_update::table_updates");
                OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
                task::spawn(|| "storage_usage_update_table_updates", async move {
                    table_updates.instrument(task_span).await;
                    // A send error means the coordinator is shutting down.
                    if let Err(e) = internal_cmd_tx.send(Message::StorageUsageSchedule) {
                        warn!("internal_cmd_rx dropped before we could send: {e:?}");
                    }
                });
            }
            Err(err) => tracing::warn!("Failed to update storage metrics: {:?}", err),
        }
    }
296
297 #[mz_ore::instrument(level = "debug")]
298 async fn storage_usage_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
299 let (fut, _) = self.builtin_table_update().execute(expired).await;
300 task::spawn(|| "storage_usage_pruning_apply", async move {
301 fut.await;
302 });
303 }
304
305 pub async fn schedule_storage_usage_collection(&self) {
306 const SEED_LEN: usize = 32;
314 let mut seed = [0; SEED_LEN];
315 for (i, byte) in self
316 .catalog()
317 .state()
318 .config()
319 .environment_id
320 .organization_id()
321 .as_bytes()
322 .into_iter()
323 .take(SEED_LEN)
324 .enumerate()
325 {
326 seed[i] = *byte;
327 }
328 let storage_usage_collection_interval_ms: EpochMillis =
329 EpochMillis::try_from(self.storage_usage_collection_interval.as_millis())
330 .expect("storage usage collection interval must fit into u64");
331 let offset =
332 rngs::SmallRng::from_seed(seed).random_range(0..storage_usage_collection_interval_ms);
333 let now_ts: EpochMillis = self.peek_local_write_ts().await.into();
334
335 let previous_collection_ts =
337 (now_ts - (now_ts % storage_usage_collection_interval_ms)) + offset;
338 let next_collection_ts = if previous_collection_ts > now_ts {
339 previous_collection_ts
340 } else {
341 previous_collection_ts + storage_usage_collection_interval_ms
342 };
343 let next_collection_interval = Duration::from_millis(next_collection_ts - now_ts);
344
345 let internal_cmd_tx = self.internal_cmd_tx.clone();
347 task::spawn(|| "storage_usage_collection", async move {
348 tokio::time::sleep(next_collection_interval).await;
349 if internal_cmd_tx.send(Message::StorageUsageFetch).is_err() {
350 }
352 });
353 }
354
    /// Schedules the next arrangement sizes collection.
    ///
    /// The collection interval comes from a dyncfg; an interval of zero
    /// disables collection (but we keep re-checking the config). The next
    /// collection time is phased by an offset derived deterministically from
    /// the organization ID.
    pub async fn schedule_arrangement_sizes_collection(&self) {
        // Never sleep longer than this, so that changes to the dyncfg
        // interval are picked up reasonably quickly.
        const MAX_SLEEP: Duration = Duration::from_secs(60);

        let interval_duration =
            mz_adapter_types::dyncfgs::ARRANGEMENT_SIZE_HISTORY_COLLECTION_INTERVAL
                .get(self.catalog().system_config().dyncfgs());

        if interval_duration.is_zero() {
            // Collection is disabled; wake up later to re-check the config.
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            task::spawn(|| "arrangement_sizes_collection_disabled", async move {
                tokio::time::sleep(MAX_SLEEP).await;
                let _ = internal_cmd_tx.send(Message::ArrangementSizesSchedule);
            });
            return;
        }

        // Seed a small RNG from the organization ID so the per-environment
        // offset is deterministic.
        const SEED_LEN: usize = 32;
        let mut seed = [0; SEED_LEN];
        for (i, byte) in self
            .catalog()
            .state()
            .config()
            .environment_id
            .organization_id()
            .as_bytes()
            .into_iter()
            .take(SEED_LEN)
            .enumerate()
        {
            seed[i] = *byte;
        }
        let interval_ms: EpochMillis = EpochMillis::try_from(interval_duration.as_millis())
            .expect("arrangement_size_history_collection_interval must fit into u64");
        // Guard against a sub-millisecond interval, which would truncate to 0
        // and make the `random_range` below panic on an empty range.
        let interval_ms = interval_ms.max(1);
        let offset = rngs::SmallRng::from_seed(seed).random_range(0..interval_ms);
        let now_ts: EpochMillis = self.peek_local_write_ts().await.into();

        // Most recent offset-adjusted boundary; step forward one interval if
        // it has already passed.
        let previous_collection_ts = (now_ts - (now_ts % interval_ms)) + offset;
        let next_collection_ts = if previous_collection_ts > now_ts {
            previous_collection_ts
        } else {
            previous_collection_ts + interval_ms
        };
        let sleep_for = Duration::from_millis(next_collection_ts - now_ts);

        // Cap the sleep at MAX_SLEEP. If the cap kicked in, we only
        // re-schedule on wake-up instead of taking a snapshot.
        let (capped_sleep, fire_snapshot) = if sleep_for <= MAX_SLEEP {
            (sleep_for, true)
        } else {
            (MAX_SLEEP, false)
        };

        let internal_cmd_tx = self.internal_cmd_tx.clone();
        task::spawn(|| "arrangement_sizes_collection", async move {
            tokio::time::sleep(capped_sleep).await;
            let msg = if fire_snapshot {
                Message::ArrangementSizesSnapshot
            } else {
                Message::ArrangementSizesSchedule
            };
            // A send error means the coordinator is shutting down.
            let _ = internal_cmd_tx.send(msg);
        });
    }
431
    /// Takes a point-in-time snapshot of the live arrangement sizes
    /// collection, joins it against compute hydration information, and
    /// appends one row per (replica, object) to the arrangement size history
    /// table. On completion (or failure) the next collection is scheduled.
    ///
    /// In read-only mode no snapshot is taken; the collection is simply
    /// re-scheduled.
    #[mz_ore::instrument(level = "debug")]
    async fn arrangement_sizes_snapshot(&mut self) {
        if self.controller.read_only() {
            self.schedule_arrangement_sizes_collection().await;
            return;
        }

        let collection_timer = self
            .metrics
            .arrangement_sizes_collection_time_seconds
            .start_timer();

        // Resolve the builtin collections we read from and the builtin table
        // we write to.
        let live_item_id = self.catalog().resolve_builtin_storage_collection(
            &mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZES_UNIFIED,
        );
        let live_global_id = self.catalog.get_entry(&live_item_id).latest_global_id();
        let hydration_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_COMPUTE_HYDRATION_TIMES);
        let hydration_global_id = self
            .catalog
            .get_entry(&hydration_item_id)
            .latest_global_id();
        let history_item_id = self
            .catalog()
            .resolve_builtin_table(&mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY);

        // Snapshot both collections at the same read timestamp. On failure,
        // re-schedule and bail.
        let read_ts = self.get_local_read_ts().await;
        let snapshot = match self
            .controller
            .storage_collections
            .snapshot(live_global_id, read_ts)
            .await
        {
            Ok(s) => s,
            Err(e) => {
                tracing::warn!("arrangement sizes snapshot failed: {e:?}");
                drop(collection_timer);
                self.schedule_arrangement_sizes_collection().await;
                return;
            }
        };
        let mut hydration_snapshot = match self
            .controller
            .storage_collections
            .snapshot(hydration_global_id, read_ts)
            .await
        {
            Ok(s) => s,
            Err(e) => {
                tracing::warn!("arrangement sizes hydration snapshot failed: {e:?}");
                drop(collection_timer);
                self.schedule_arrangement_sizes_collection().await;
                return;
            }
        };
        differential_dataflow::consolidation::consolidate(&mut hydration_snapshot);

        // Build the set of (replica_id, object_id) pairs that have a
        // non-null hydration time recorded.
        let mut datum_vec = mz_repr::DatumVec::new();
        let mut hydrated: BTreeSet<(String, String)> = BTreeSet::new();
        // Column positions in the hydration collection.
        const HYDRATION_COL_REPLICA_ID: usize = 0;
        const HYDRATION_COL_OBJECT_ID: usize = 1;
        const HYDRATION_COL_TIME_NS: usize = 2;
        const HYDRATION_COL_COUNT: usize = 3;
        for (row, diff) in &hydration_snapshot {
            // Skip anything that doesn't consolidate to a single copy.
            if *diff != 1 {
                continue;
            }
            let datums = datum_vec.borrow_with(row);
            if datums.len() < HYDRATION_COL_COUNT {
                continue;
            }
            if datums[HYDRATION_COL_TIME_NS].is_null() {
                continue;
            }
            hydrated.insert((
                datums[HYDRATION_COL_REPLICA_ID].unwrap_str().to_string(),
                datums[HYDRATION_COL_OBJECT_ID].unwrap_str().to_string(),
            ));
        }

        // Mint a write timestamp to stamp the collected rows with.
        let collection_ts: EpochMillis = self.get_local_write_ts().await.timestamp.into();
        let collection_datum = Datum::TimestampTz(
            mz_ore::now::to_datetime(collection_ts)
                .try_into()
                .expect("collection_timestamp must fit into TimestampTz"),
        );

        let mut consolidated = snapshot;
        differential_dataflow::consolidation::consolidate(&mut consolidated);

        // Column positions in the live arrangement sizes collection.
        const LIVE_COL_REPLICA_ID: usize = 0;
        const LIVE_COL_OBJECT_ID: usize = 1;
        const LIVE_COL_SIZE: usize = 2;
        const LIVE_COL_COUNT: usize = 3;

        let mut skipped_malformed: u64 = 0;
        let mut skipped_null_size: u64 = 0;
        let mut updates: Vec<BuiltinTableUpdate> = Vec::with_capacity(consolidated.len());
        for (row, diff) in consolidated.iter() {
            // Skip anything that doesn't consolidate to a single copy.
            if *diff != 1 {
                continue;
            }
            let datums = datum_vec.borrow_with(row);
            // Tolerate (but count) rows whose arity doesn't match the schema
            // we expect, rather than panicking on an indexing/unwrap below.
            if datums.len() != LIVE_COL_COUNT {
                skipped_malformed += 1;
                continue;
            }
            let replica_id = datums[LIVE_COL_REPLICA_ID].unwrap_str();
            let object_id = datums[LIVE_COL_OBJECT_ID].unwrap_str();
            let size_datum = datums[LIVE_COL_SIZE];
            if size_datum.is_null() {
                skipped_null_size += 1;
                continue;
            }
            let size = size_datum.unwrap_int64();
            let hydration_complete =
                hydrated.contains(&(replica_id.to_string(), object_id.to_string()));
            let new_row = Row::pack_slice(&[
                Datum::String(replica_id),
                Datum::String(object_id),
                Datum::Int64(size),
                collection_datum,
                Datum::from(hydration_complete),
            ]);
            updates.push(BuiltinTableUpdate::row(history_item_id, new_row, Diff::ONE));
        }
        if skipped_malformed > 0 {
            warn!(
                "mz_object_arrangement_sizes schema drift: skipped {skipped_malformed} rows \
                with unexpected arity"
            );
        }
        if skipped_null_size > 0 {
            tracing::debug!("skipped {skipped_null_size} live rows with null size");
        }

        let row_count = updates.len();
        collection_timer.observe_duration();

        if !updates.is_empty() {
            self.metrics
                .arrangement_sizes_rows_written
                .inc_by(u64::cast_from(row_count));
            // Apply the table updates on a spawned task, then schedule the
            // next collection once they have landed.
            let (fut, _) = self.builtin_table_update().execute(updates).await;
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let task_span =
                info_span!(parent: None, "coord::arrangement_sizes_snapshot::table_updates");
            OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
            task::spawn(|| "arrangement_sizes_snapshot_apply", async move {
                fut.instrument(task_span).await;
                // A send error means the coordinator is shutting down.
                if let Err(e) = internal_cmd_tx.send(Message::ArrangementSizesSchedule) {
                    warn!("internal_cmd_rx dropped before we could send: {e:?}");
                }
            });
        } else {
            // Nothing to write; schedule the next collection directly.
            self.schedule_arrangement_sizes_collection().await;
        }

        tracing::debug!(
            "appended {row_count} rows to mz_object_arrangement_size_history at ts {collection_ts}"
        );
    }
626
627 #[mz_ore::instrument(level = "debug")]
628 async fn arrangement_sizes_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
629 let (fut, _) = self.builtin_table_update().execute(expired).await;
630 task::spawn(|| "arrangement_sizes_pruning_apply", async move {
631 fut.await;
632 });
633 }
634
    /// Handles an external client [`Command`] by delegating to the command
    /// handler.
    #[mz_ore::instrument(level = "debug")]
    async fn message_command(&mut self, cmd: Command) {
        self.handle_command(cmd).await;
    }
639
    /// Handles a [`ControllerResponse`] from the storage/compute controller.
    #[mz_ore::instrument(level = "debug")]
    async fn message_controller(&mut self, message: ControllerResponse) {
        event!(Level::TRACE, message = format!("{:?}", message));
        match message {
            ControllerResponse::PeekNotification(uuid, response, otel_ctx) => {
                self.handle_peek_notification(uuid, response, otel_ctx);
            }
            ControllerResponse::SubscribeResponse(sink_id, response) => {
                if let Some(ActiveComputeSink::Subscribe(active_subscribe)) =
                    self.active_compute_sinks.get_mut(&sink_id)
                {
                    let finished = active_subscribe.process_response(response);
                    if finished {
                        self.retire_compute_sinks(btreemap! {
                            sink_id => ActiveComputeSinkRetireReason::Finished,
                        })
                        .await;
                    }

                    // A sink ID must belong to either an active subscribe or
                    // an introspection subscribe, never both.
                    soft_assert_or_log!(
                        !self.introspection_subscribes.contains_key(&sink_id),
                        "`sink_id` {sink_id} unexpectedly found in both `active_subscribes` \
                        and `introspection_subscribes`",
                    );
                } else if self.introspection_subscribes.contains_key(&sink_id) {
                    self.handle_introspection_subscribe_batch(sink_id, response)
                        .await;
                } else {
                    // No subscribe is registered under this sink ID
                    // (presumably it was already retired); drop the response.
                }
            }
            ControllerResponse::CopyToResponse(sink_id, response) => {
                match self.drop_compute_sink(sink_id).await {
                    Some(ActiveComputeSink::CopyTo(active_copy_to)) => {
                        active_copy_to.retire_with_response(response);
                    }
                    _ => {
                        // No active COPY TO sink under this ID (presumably it
                        // was already retired); drop the response.
                    }
                }
            }
            ControllerResponse::WatchSetFinished(ws_ids) => {
                let now = self.now();
                for ws_id in ws_ids {
                    // The watch set may have been removed already (e.g. by
                    // connection cleanup); skip it in that case.
                    let Some((conn_id, rsp)) = self.installed_watch_sets.remove(&ws_id) else {
                        continue;
                    };
                    // Drop the reverse mapping, and the connection's entry
                    // entirely once its last watch set finishes.
                    self.connection_watch_sets
                        .get_mut(&conn_id)
                        .expect("corrupted coordinator state: unknown connection id")
                        .remove(&ws_id);
                    if self.connection_watch_sets[&conn_id].is_empty() {
                        self.connection_watch_sets.remove(&conn_id);
                    }

                    match rsp {
                        WatchSetResponse::StatementDependenciesReady(id, ev) => {
                            self.record_statement_lifecycle_event(&id, &ev, now);
                        }
                        WatchSetResponse::AlterSinkReady(ctx) => {
                            self.sequence_alter_sink_finish(ctx).await;
                        }
                        WatchSetResponse::AlterMaterializedViewReady(ctx) => {
                            self.sequence_alter_materialized_view_apply_replacement_finish(ctx)
                                .await;
                        }
                    }
                }
            }
        }
    }
713
    /// Continues sequencing a statement whose purification completed on a
    /// background task: plans the purified statement and hands the plan to
    /// `sequence_plan`, retiring the client context on any error.
    #[mz_ore::instrument(level = "debug")]
    async fn message_purified_statement_ready(
        &mut self,
        PurifiedStatementReady {
            ctx,
            result,
            params,
            mut plan_validity,
            original_stmt,
            otel_ctx,
        }: PurifiedStatementReady,
    ) {
        otel_ctx.attach_as_parent();

        // If the plan is no longer valid (the catalog changed while
        // purification was running), restart execution of the original
        // statement from the beginning.
        if plan_validity.check(self.catalog()).is_err() {
            self.handle_execute_inner(original_stmt, params, ctx).await;
            return;
        }

        let purified_statement = match result {
            Ok(ok) => ok,
            Err(e) => return ctx.retire(Err(e)),
        };

        let plan = match purified_statement {
            PurifiedStatement::PurifiedCreateSource {
                create_progress_subsource_stmt,
                create_source_stmt,
                subsources,
                available_source_references,
            } => {
                self.plan_purified_create_source(
                    &ctx,
                    params,
                    create_progress_subsource_stmt,
                    create_source_stmt,
                    subsources,
                    available_source_references,
                )
                .await
            }
            PurifiedStatement::PurifiedAlterSourceAddSubsources {
                source_name,
                options,
                subsources,
            } => {
                self.plan_purified_alter_source_add_subsource(
                    ctx.session(),
                    params,
                    source_name,
                    options,
                    subsources,
                )
                .await
            }
            PurifiedStatement::PurifiedAlterSourceRefreshReferences {
                source_name,
                available_source_references,
            } => self.plan_purified_alter_source_refresh_references(
                ctx.session(),
                params,
                source_name,
                available_source_references,
            ),
            // The remaining variants are converted back into plain statements
            // and planned through the generic statement planner.
            o @ (PurifiedStatement::PurifiedAlterSource { .. }
            | PurifiedStatement::PurifiedCreateSink(..)
            | PurifiedStatement::PurifiedCreateTableFromSource { .. }) => {
                let stmt = match o {
                    PurifiedStatement::PurifiedAlterSource { alter_source_stmt } => {
                        Statement::AlterSource(alter_source_stmt)
                    }
                    PurifiedStatement::PurifiedCreateTableFromSource { stmt } => {
                        Statement::CreateTableFromSource(stmt)
                    }
                    PurifiedStatement::PurifiedCreateSink(stmt) => Statement::CreateSink(stmt),
                    PurifiedStatement::PurifiedCreateSource { .. }
                    | PurifiedStatement::PurifiedAlterSourceAddSubsources { .. }
                    | PurifiedStatement::PurifiedAlterSourceRefreshReferences { .. } => {
                        unreachable!("not part of exterior match stmt")
                    }
                };

                let catalog = self.catalog().for_session(ctx.session());
                let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &stmt);
                self.plan_statement(ctx.session(), stmt, &params, &resolved_ids)
                    .map(|plan| (plan, resolved_ids))
            }
        };

        match plan {
            Ok((plan, resolved_ids)) => self.sequence_plan(ctx, plan, resolved_ids).await,
            Err(e) => ctx.retire(Err(e)),
        }
    }
821
    /// Finishes creating a connection whose validation completed on a
    /// background task.
    ///
    /// On any failure path, the secret stored under the connection's ID is
    /// deleted (and the secrets cache invalidated) before the client context
    /// is retired with the error.
    #[mz_ore::instrument(level = "debug")]
    async fn message_create_connection_validation_ready(
        &mut self,
        CreateConnectionValidationReady {
            mut ctx,
            result,
            connection_id,
            connection_gid,
            mut plan_validity,
            otel_ctx,
            resolved_ids,
        }: CreateConnectionValidationReady,
    ) {
        otel_ctx.attach_as_parent();

        // The catalog may have changed while validation was running; if the
        // plan is stale, clean up the connection's secret and bail.
        if let Err(e) = plan_validity.check(self.catalog()) {
            if self.secrets_controller.delete(connection_id).await.is_ok() {
                self.caching_secrets_reader.invalidate(connection_id);
            }
            return ctx.retire(Err(e));
        }

        let plan = match result {
            Ok(ok) => ok,
            Err(e) => {
                // Validation failed; clean up the connection's secret.
                if self.secrets_controller.delete(connection_id).await.is_ok() {
                    self.caching_secrets_reader.invalidate(connection_id);
                }
                return ctx.retire(Err(e));
            }
        };

        let result = self
            .sequence_create_connection_stage_finish(
                &mut ctx,
                connection_id,
                connection_gid,
                plan,
                resolved_ids,
            )
            .await;
        ctx.retire(result);
    }
870
    /// Finishes altering a connection whose validation completed on a
    /// background task, retiring the client context with the outcome.
    #[mz_ore::instrument(level = "debug")]
    async fn message_alter_connection_validation_ready(
        &mut self,
        AlterConnectionValidationReady {
            mut ctx,
            result,
            connection_id,
            connection_gid: _,
            mut plan_validity,
            otel_ctx,
            resolved_ids: _,
        }: AlterConnectionValidationReady,
    ) {
        otel_ctx.attach_as_parent();

        // The catalog may have changed while validation was running; bail if
        // the plan is no longer valid.
        if let Err(e) = plan_validity.check(self.catalog()) {
            return ctx.retire(Err(e));
        }

        let conn = match result {
            Ok(ok) => ok,
            Err(e) => {
                return ctx.retire(Err(e));
            }
        };

        let result = self
            .sequence_alter_connection_stage_finish(ctx.session_mut(), connection_id, conn)
            .await;
        ctx.retire(result);
    }
907
908 #[mz_ore::instrument(level = "debug")]
909 async fn message_cluster_event(&mut self, event: ClusterEvent) {
910 event!(Level::TRACE, event = format!("{:?}", event));
911
912 if let Some(segment_client) = &self.segment_client {
913 let env_id = &self.catalog().config().environment_id;
914 let mut properties = json!({
915 "cluster_id": event.cluster_id.to_string(),
916 "replica_id": event.replica_id.to_string(),
917 "process_id": event.process_id,
918 "status": event.status.as_kebab_case_str(),
919 });
920 match event.status {
921 ClusterStatus::Online => (),
922 ClusterStatus::Offline(reason) => {
923 let properties = match &mut properties {
924 serde_json::Value::Object(map) => map,
925 _ => unreachable!(),
926 };
927 properties.insert(
928 "reason".into(),
929 json!(reason.display_or("unknown").to_string()),
930 );
931 }
932 };
933 segment_client.environment_track(
934 env_id,
935 "Cluster Changed Status",
936 properties,
937 EventDetails {
938 timestamp: Some(event.time),
939 ..Default::default()
940 },
941 );
942 }
943
944 let Some(replica_statues) = self
947 .cluster_replica_statuses
948 .try_get_cluster_replica_statuses(event.cluster_id, event.replica_id)
949 else {
950 return;
951 };
952
953 if event.status != replica_statues[&event.process_id].status {
954 if !self.controller.read_only() {
955 let offline_reason = match event.status {
956 ClusterStatus::Online => None,
957 ClusterStatus::Offline(None) => None,
958 ClusterStatus::Offline(Some(reason)) => Some(reason.to_string()),
959 };
960 let row = Row::pack_slice(&[
961 Datum::String(&event.replica_id.to_string()),
962 Datum::UInt64(event.process_id),
963 Datum::String(event.status.as_kebab_case_str()),
964 Datum::from(offline_reason.as_deref()),
965 Datum::TimestampTz(event.time.try_into().expect("must fit")),
966 ]);
967 self.controller.storage.append_introspection_updates(
968 IntrospectionType::ReplicaStatusHistory,
969 vec![(row, Diff::ONE)],
970 );
971 }
972
973 let old_replica_status =
974 ClusterReplicaStatuses::cluster_replica_status(replica_statues);
975
976 let new_process_status = ClusterReplicaProcessStatus {
977 status: event.status,
978 time: event.time,
979 };
980 self.cluster_replica_statuses.ensure_cluster_status(
981 event.cluster_id,
982 event.replica_id,
983 event.process_id,
984 new_process_status,
985 );
986
987 let cluster = self.catalog().get_cluster(event.cluster_id);
988 let replica = cluster.replica(event.replica_id).expect("Replica exists");
989 let new_replica_status = self
990 .cluster_replica_statuses
991 .get_cluster_replica_status(event.cluster_id, event.replica_id);
992
993 if old_replica_status != new_replica_status {
994 let notifier = self.broadcast_notice_tx();
995 let notice = AdapterNotice::ClusterReplicaStatusChanged {
996 cluster: cluster.name.clone(),
997 replica: replica.name.clone(),
998 status: new_replica_status,
999 time: event.time,
1000 };
1001 notifier(notice);
1002 }
1003 }
1004 }
1005
1006 #[mz_ore::instrument(level = "debug")]
1007 async fn message_linearize_reads(&mut self) {
1012 let mut shortest_wait = Duration::MAX;
1013 let mut ready_txns = Vec::new();
1014
1015 let mut cached_oracle_ts = BTreeMap::new();
1020
1021 for (conn_id, mut read_txn) in std::mem::take(&mut self.pending_linearize_read_txns) {
1022 if let TimestampContext::TimelineTimestamp {
1023 timeline,
1024 chosen_ts,
1025 oracle_ts,
1026 } = read_txn.timestamp_context()
1027 {
1028 let oracle_ts = match oracle_ts {
1029 Some(oracle_ts) => oracle_ts,
1030 None => {
1031 ready_txns.push(read_txn);
1033 continue;
1034 }
1035 };
1036
1037 if chosen_ts <= oracle_ts {
1038 ready_txns.push(read_txn);
1041 continue;
1042 }
1043
1044 let current_oracle_ts = cached_oracle_ts.entry(timeline.clone());
1046 let current_oracle_ts = match current_oracle_ts {
1047 btree_map::Entry::Vacant(entry) => {
1048 let timestamp_oracle = self.get_timestamp_oracle(timeline);
1049 let read_ts = timestamp_oracle.read_ts().await;
1050 entry.insert(read_ts.clone());
1051 read_ts
1052 }
1053 btree_map::Entry::Occupied(entry) => entry.get().clone(),
1054 };
1055
1056 if *chosen_ts <= current_oracle_ts {
1057 ready_txns.push(read_txn);
1058 } else {
1059 let wait =
1060 Duration::from_millis(chosen_ts.saturating_sub(current_oracle_ts).into());
1061 if wait < shortest_wait {
1062 shortest_wait = wait;
1063 }
1064 read_txn.num_requeues += 1;
1065 self.pending_linearize_read_txns.insert(conn_id, read_txn);
1066 }
1067 } else {
1068 ready_txns.push(read_txn);
1069 }
1070 }
1071
1072 if !ready_txns.is_empty() {
1073 let otel_ctx = ready_txns.first().expect("known to exist").otel_ctx.clone();
1076 let span = tracing::debug_span!("message_linearize_reads");
1077 otel_ctx.attach_as_parent_to(&span);
1078
1079 let now = Instant::now();
1080 for ready_txn in ready_txns {
1081 let span = tracing::debug_span!("retire_read_results");
1082 ready_txn.otel_ctx.attach_as_parent_to(&span);
1083 let _entered = span.enter();
1084 self.metrics
1085 .linearize_message_seconds
1086 .with_label_values(&[
1087 ready_txn.txn.label(),
1088 if ready_txn.num_requeues == 0 {
1089 "true"
1090 } else {
1091 "false"
1092 },
1093 ])
1094 .observe((now - ready_txn.created).as_secs_f64());
1095 if let Some((ctx, result)) = ready_txn.txn.finish() {
1096 ctx.retire(result);
1097 }
1098 }
1099 }
1100
1101 if !self.pending_linearize_read_txns.is_empty() {
1102 let remaining_ms = std::cmp::min(shortest_wait, Duration::from_millis(1_000));
1104 let internal_cmd_tx = self.internal_cmd_tx.clone();
1105 task::spawn(|| "deferred_read_txns", async move {
1106 tokio::time::sleep(remaining_ms).await;
1107 let result = internal_cmd_tx.send(Message::LinearizeReads);
1109 if let Err(e) = result {
1110 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1111 }
1112 });
1113 }
1114 }
1115}