use std::collections::{BTreeMap, BTreeSet, btree_map};
use std::time::{Duration, Instant};

use futures::FutureExt;
use maplit::btreemap;
use mz_catalog::memory::objects::ClusterReplicaProcessStatus;
use mz_controller::ControllerResponse;
use mz_controller::clusters::{ClusterEvent, ClusterStatus};
use mz_ore::cast::CastFrom;
use mz_ore::instrument;
use mz_ore::now::EpochMillis;
use mz_ore::option::OptionExt;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{soft_assert_or_log, task};
use mz_persist_client::usage::ShardsUsageReferenced;
use mz_repr::{Datum, Diff, Row};
use mz_sql::ast::Statement;
use mz_sql::names::ResolvedIds;
use mz_sql::pure::PurifiedStatement;
use mz_storage_client::controller::IntrospectionType;
use opentelemetry::trace::TraceContextExt;
use rand::{Rng, SeedableRng, rngs};
use serde_json::json;
use tracing::{Instrument, Level, event, info_span, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
use crate::catalog::{BuiltinTableUpdate, Op};
use crate::command::Command;
use crate::coord::{
    AlterConnectionValidationReady, ClusterReplicaStatuses, Coordinator,
    CreateConnectionValidationReady, Message, PurifiedStatementReady, WatchSetResponse,
};
use crate::telemetry::{EventDetails, SegmentClientExt};
use crate::{AdapterNotice, TimestampContext};
impl Coordinator {
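    /// Dispatches an internal coordinator [`Message`] to its handler.
    ///
    /// Long-running work is not performed inline; handlers spawn tasks that
    /// report back by sending follow-up messages on `internal_cmd_tx`.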
    #[instrument]
    pub(crate) async fn handle_message(&mut self, msg: Message) {
        match msg {
            Message::Command(otel_ctx, cmd) => {
                let span = tracing::info_span!("message_command").or_current();
                span.in_scope(|| otel_ctx.attach_as_parent());
                self.message_command(cmd).instrument(span).await
            }
            Message::ControllerReady { controller: _ } => {
                let Coordinator {
                    controller,
                    catalog,
                    ..
                } = self;
                let storage_metadata = catalog.state().storage_metadata();
                if let Some(m) = controller
                    .process(storage_metadata)
                    .expect("`process` never returns an error")
                {
                    self.message_controller(m).boxed_local().await
                }
            }
            Message::PurifiedStatementReady(ready) => {
                self.message_purified_statement_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::CreateConnectionValidationReady(ready) => {
                self.message_create_connection_validation_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::AlterConnectionValidationReady(ready) => {
                self.message_alter_connection_validation_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::TryDeferred {
                conn_id,
                acquired_lock,
            } => self.try_deferred(conn_id, acquired_lock).await,
            Message::GroupCommitInitiate(span, permit) => {
                tracing::Span::current().add_link(span.context().span().span_context().clone());
                self.try_group_commit(permit)
                    .instrument(span)
                    .boxed_local()
                    .await
            }
            Message::AdvanceTimelines => {
                self.advance_timelines().boxed_local().await;
            }
            Message::ClusterEvent(event) => self.message_cluster_event(event).boxed_local().await,
            Message::CancelPendingPeeks { conn_id } => {
                self.cancel_pending_peeks(&conn_id);
            }
            Message::LinearizeReads => {
                self.message_linearize_reads().boxed_local().await;
            }
            Message::StagedBatches {
                conn_id,
                table_id,
                batches,
            } => {
                self.commit_staged_batches(conn_id, table_id, batches);
            }
            Message::StorageUsageSchedule => {
                self.schedule_storage_usage_collection().boxed_local().await;
            }
            Message::StorageUsageFetch => {
                self.storage_usage_fetch().boxed_local().await;
            }
            Message::StorageUsageUpdate(sizes) => {
                self.storage_usage_update(sizes).boxed_local().await;
            }
            Message::StorageUsagePrune(expired) => {
                self.storage_usage_prune(expired).boxed_local().await;
            }
            Message::ArrangementSizesSchedule => {
                self.schedule_arrangement_sizes_collection()
                    .boxed_local()
                    .await;
            }
            Message::ArrangementSizesSnapshot => {
                self.arrangement_sizes_snapshot().boxed_local().await;
            }
            Message::ArrangementSizesPrune(expired) => {
                self.arrangement_sizes_prune(expired).boxed_local().await;
            }
            Message::RetireExecute {
                otel_ctx,
                data,
                reason,
            } => {
                otel_ctx.attach_as_parent();
                self.retire_execution(reason, data);
            }
            Message::ExecuteSingleStatementTransaction {
                ctx,
                otel_ctx,
                stmt,
                params,
            } => {
                otel_ctx.attach_as_parent();
                self.sequence_execute_single_statement_transaction(ctx, stmt, params)
                    .boxed_local()
                    .await;
            }
            Message::PeekStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateIndexStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateViewStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateMaterializedViewStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::SubscribeStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::IntrospectionSubscribeStageReady { span, stage } => {
                self.sequence_staged((), span, stage).boxed_local().await;
            }
            Message::ExplainTimestampStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::SecretStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::ClusterStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::DrainStatementLog => {
                self.drain_statement_log();
            }
            Message::PrivateLinkVpcEndpointEvents(events) => {
                if !self.controller.read_only() {
                    self.controller.storage.append_introspection_updates(
                        IntrospectionType::PrivatelinkConnectionStatusHistory,
                        events
                            .into_iter()
                            .map(|e| (mz_repr::Row::from(e), Diff::ONE))
                            .collect(),
                    );
                }
            }
            Message::CheckSchedulingPolicies => {
                self.check_scheduling_policies().boxed_local().await;
            }
            Message::SchedulingDecisions(decisions) => {
                self.handle_scheduling_decisions(decisions)
                    .boxed_local()
                    .await;
            }
            Message::DeferredStatementReady => {
                self.handle_deferred_statement().boxed_local().await;
            }
        }
    }

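    /// Spawns a task that fetches the persist usage of all live storage
    /// shards and reports the result back as a
    /// [`Message::StorageUsageUpdate`].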
    #[mz_ore::instrument(level = "debug")]
    pub async fn storage_usage_fetch(&self) {
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        let client = self.storage_usage_client.clone();

        let live_shards: BTreeSet<_> = self
            .controller
            .storage
            .active_collection_metadatas()
            .into_iter()
            .map(|(_id, m)| m.data_shard)
            .collect();

        let collection_metric = self.metrics.storage_usage_collection_time_seconds.clone();

        task::spawn(|| "storage_usage_fetch", async move {
            let collection_metric_timer = collection_metric.start_timer();
            let shard_sizes = client.shards_usage_referenced(live_shards).await;
            collection_metric_timer.observe_duration();

            if let Err(e) = internal_cmd_tx.send(Message::StorageUsageUpdate(shard_sizes)) {
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
        });
    }

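    /// Writes the fetched shard sizes to the storage usage builtin collection
    /// via a catalog transaction, then schedules the next collection once the
    /// resulting table updates have been applied.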
    #[mz_ore::instrument(level = "debug")]
    async fn storage_usage_update(&mut self, shards_usage: ShardsUsageReferenced) {
        let collection_timestamp = if self.controller.read_only() {
            self.peek_local_write_ts().await.into()
        } else {
            self.get_local_write_ts().await.timestamp.into()
        };

        let ops = shards_usage
            .by_shard
            .into_iter()
            .map(|(shard_id, shard_usage)| Op::WeirdStorageUsageUpdates {
                object_id: Some(shard_id.to_string()),
                size_bytes: shard_usage.size_bytes(),
                collection_timestamp,
            })
            .collect();

        match self.catalog_transact_inner(None, ops).await {
            Ok((table_updates, catalog_updates)) => {
                assert!(
                    catalog_updates.is_empty(),
                    "applying builtin table updates does not produce catalog implications"
                );

                let internal_cmd_tx = self.internal_cmd_tx.clone();
                let task_span =
                    info_span!(parent: None, "coord::storage_usage_update::table_updates");
                OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
                task::spawn(|| "storage_usage_update_table_updates", async move {
                    table_updates.instrument(task_span).await;
                    if let Err(e) = internal_cmd_tx.send(Message::StorageUsageSchedule) {
                        warn!("internal_cmd_rx dropped before we could send: {e:?}");
                    }
                });
            }
            Err(err) => tracing::warn!("Failed to update storage metrics: {:?}", err),
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn storage_usage_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
        let (fut, _) = self.builtin_table_update().execute(expired).await;
        task::spawn(|| "storage_usage_pruning_apply", async move {
            fut.await;
        });
    }

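    /// Schedules the next storage usage collection.
    ///
    /// Collections are aligned to multiples of the collection interval, plus
    /// an offset derived deterministically from the organization ID, so that
    /// collection times are stable across restarts and spread across
    /// environments. For example, with an interval of 3_600_000 ms and a
    /// derived offset of 1_000 ms, collections fire at timestamps of the form
    /// `n * 3_600_000 + 1_000`; the code below picks the smallest such
    /// timestamp strictly after `now_ts` and sleeps until then.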
    pub async fn schedule_storage_usage_collection(&self) {
        const SEED_LEN: usize = 32;
        let mut seed = [0; SEED_LEN];
        for (i, byte) in self
            .catalog()
            .state()
            .config()
            .environment_id
            .organization_id()
            .as_bytes()
            .into_iter()
            .take(SEED_LEN)
            .enumerate()
        {
            seed[i] = *byte;
        }
        let storage_usage_collection_interval_ms: EpochMillis =
            EpochMillis::try_from(self.storage_usage_collection_interval.as_millis())
                .expect("storage usage collection interval must fit into u64");
        let offset =
            rngs::SmallRng::from_seed(seed).random_range(0..storage_usage_collection_interval_ms);
        let now_ts: EpochMillis = self.peek_local_write_ts().await.into();

        let previous_collection_ts =
            (now_ts - (now_ts % storage_usage_collection_interval_ms)) + offset;
        let next_collection_ts = if previous_collection_ts > now_ts {
            previous_collection_ts
        } else {
            previous_collection_ts + storage_usage_collection_interval_ms
        };
        let next_collection_interval = Duration::from_millis(next_collection_ts - now_ts);

        let internal_cmd_tx = self.internal_cmd_tx.clone();
        task::spawn(|| "storage_usage_collection", async move {
            tokio::time::sleep(next_collection_interval).await;
            if internal_cmd_tx.send(Message::StorageUsageFetch).is_err() {
                // If the send fails, the coordinator has shut down and there
                // is nothing left to schedule.
            }
        });
    }

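    /// Schedules the next arrangement sizes collection, using the same
    /// deterministic interval-plus-offset alignment as storage usage
    /// collection. Because the interval is a dyncfg that may change at
    /// runtime, the sleep is capped at `MAX_SLEEP`: when the cap fires first,
    /// the scheduling logic re-runs instead of taking a snapshot, so
    /// configuration changes take effect within a minute.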
    pub async fn schedule_arrangement_sizes_collection(&self) {
        const MAX_SLEEP: Duration = Duration::from_secs(60);

        let interval_duration =
            mz_adapter_types::dyncfgs::ARRANGEMENT_SIZE_HISTORY_COLLECTION_INTERVAL
                .get(self.catalog().system_config().dyncfgs());

        if interval_duration.is_zero() {
            // A zero interval disables collection. Check back periodically in
            // case the dyncfg is changed to re-enable it.
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            task::spawn(|| "arrangement_sizes_collection_disabled", async move {
                tokio::time::sleep(MAX_SLEEP).await;
                let _ = internal_cmd_tx.send(Message::ArrangementSizesSchedule);
            });
            return;
        }

        const SEED_LEN: usize = 32;
        let mut seed = [0; SEED_LEN];
        for (i, byte) in self
            .catalog()
            .state()
            .config()
            .environment_id
            .organization_id()
            .as_bytes()
            .into_iter()
            .take(SEED_LEN)
            .enumerate()
        {
            seed[i] = *byte;
        }
        let interval_ms: EpochMillis = EpochMillis::try_from(interval_duration.as_millis())
            .expect("arrangement_size_history_collection_interval must fit into u64");
        let interval_ms = interval_ms.max(1);
        let offset = rngs::SmallRng::from_seed(seed).random_range(0..interval_ms);
        let now_ts: EpochMillis = self.peek_local_write_ts().await.into();

        let previous_collection_ts = (now_ts - (now_ts % interval_ms)) + offset;
        let next_collection_ts = if previous_collection_ts > now_ts {
            previous_collection_ts
        } else {
            previous_collection_ts + interval_ms
        };
        let sleep_for = Duration::from_millis(next_collection_ts - now_ts);

        let (capped_sleep, fire_snapshot) = if sleep_for <= MAX_SLEEP {
            (sleep_for, true)
        } else {
            (MAX_SLEEP, false)
        };

        let internal_cmd_tx = self.internal_cmd_tx.clone();
        task::spawn(|| "arrangement_sizes_collection", async move {
            tokio::time::sleep(capped_sleep).await;
            let msg = if fire_snapshot {
                Message::ArrangementSizesSnapshot
            } else {
                Message::ArrangementSizesSchedule
            };
            let _ = internal_cmd_tx.send(msg);
        });
    }

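    /// Takes a point-in-time snapshot of the unified arrangement sizes
    /// collection, marks each (replica, object) pair whose hydration has
    /// completed, and appends one row per pair to
    /// `mz_object_arrangement_size_history`. In read-only mode this is a
    /// no-op that only reschedules the next collection.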
    #[mz_ore::instrument(level = "debug")]
    async fn arrangement_sizes_snapshot(&mut self) {
        if self.controller.read_only() {
            self.schedule_arrangement_sizes_collection().await;
            return;
        }

        let collection_timer = self
            .metrics
            .arrangement_sizes_collection_time_seconds
            .start_timer();

        let live_item_id = self.catalog().resolve_builtin_storage_collection(
            &mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZES_UNIFIED,
        );
        let live_global_id = self.catalog.get_entry(&live_item_id).latest_global_id();
        let hydration_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_COMPUTE_HYDRATION_TIMES);
        let hydration_global_id = self
            .catalog
            .get_entry(&hydration_item_id)
            .latest_global_id();
        let history_item_id = self
            .catalog()
            .resolve_builtin_table(&mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY);

        let read_ts = self.get_local_read_ts().await;
        let snapshot = match self
            .controller
            .storage_collections
            .snapshot(live_global_id, read_ts)
            .await
        {
            Ok(s) => s,
            Err(e) => {
                tracing::warn!("arrangement sizes snapshot failed: {e:?}");
                drop(collection_timer);
                self.schedule_arrangement_sizes_collection().await;
                return;
            }
        };
        let mut hydration_snapshot = match self
            .controller
            .storage_collections
            .snapshot(hydration_global_id, read_ts)
            .await
        {
            Ok(s) => s,
            Err(e) => {
                tracing::warn!("arrangement sizes hydration snapshot failed: {e:?}");
                drop(collection_timer);
                self.schedule_arrangement_sizes_collection().await;
                return;
            }
        };
        differential_dataflow::consolidation::consolidate(&mut hydration_snapshot);

        let mut datum_vec = mz_repr::DatumVec::new();
        let mut hydrated: BTreeSet<(String, String)> = BTreeSet::new();
        const HYDRATION_COL_REPLICA_ID: usize = 0;
        const HYDRATION_COL_OBJECT_ID: usize = 1;
        const HYDRATION_COL_TIME_NS: usize = 2;
        const HYDRATION_COL_COUNT: usize = 3;
        for (row, diff) in &hydration_snapshot {
            if *diff != 1 {
                continue;
            }
            let datums = datum_vec.borrow_with(row);
            if datums.len() < HYDRATION_COL_COUNT {
                continue;
            }
            if datums[HYDRATION_COL_TIME_NS].is_null() {
                continue;
            }
            hydrated.insert((
                datums[HYDRATION_COL_REPLICA_ID].unwrap_str().to_string(),
                datums[HYDRATION_COL_OBJECT_ID].unwrap_str().to_string(),
            ));
        }

        let collection_ts: EpochMillis = self.get_local_write_ts().await.timestamp.into();
        let collection_datum = Datum::TimestampTz(
            mz_ore::now::to_datetime(collection_ts)
                .try_into()
                .expect("collection_timestamp must fit into TimestampTz"),
        );

        let mut consolidated = snapshot;
        differential_dataflow::consolidation::consolidate(&mut consolidated);

        const LIVE_COL_REPLICA_ID: usize = 0;
        const LIVE_COL_OBJECT_ID: usize = 1;
        const LIVE_COL_SIZE: usize = 2;
        const LIVE_COL_COUNT: usize = 3;

        let mut skipped_malformed: u64 = 0;
        let mut skipped_null_size: u64 = 0;
        let mut updates: Vec<BuiltinTableUpdate> = Vec::with_capacity(consolidated.len());
        for (row, diff) in consolidated.iter() {
            if *diff != 1 {
                continue;
            }
            let datums = datum_vec.borrow_with(row);
            if datums.len() != LIVE_COL_COUNT {
                skipped_malformed += 1;
                continue;
            }
            let replica_id = datums[LIVE_COL_REPLICA_ID].unwrap_str();
            let object_id = datums[LIVE_COL_OBJECT_ID].unwrap_str();
            let size_datum = datums[LIVE_COL_SIZE];
            if size_datum.is_null() {
                skipped_null_size += 1;
                continue;
            }
            let size = size_datum.unwrap_int64();
            let hydration_complete =
                hydrated.contains(&(replica_id.to_string(), object_id.to_string()));
            let new_row = Row::pack_slice(&[
                Datum::String(replica_id),
                Datum::String(object_id),
                Datum::Int64(size),
                collection_datum,
                Datum::from(hydration_complete),
            ]);
            updates.push(BuiltinTableUpdate::row(history_item_id, new_row, Diff::ONE));
        }
        if skipped_malformed > 0 {
            warn!(
                "mz_object_arrangement_sizes schema drift: skipped {skipped_malformed} rows \
                 with unexpected arity"
            );
        }
        if skipped_null_size > 0 {
            tracing::debug!("skipped {skipped_null_size} live rows with null size");
        }

        let row_count = updates.len();
        collection_timer.observe_duration();

        if !updates.is_empty() {
            self.metrics
                .arrangement_sizes_rows_written
                .inc_by(u64::cast_from(row_count));
            let (fut, _) = self.builtin_table_update().execute(updates).await;
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let task_span =
                info_span!(parent: None, "coord::arrangement_sizes_snapshot::table_updates");
            OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
            task::spawn(|| "arrangement_sizes_snapshot_apply", async move {
                fut.instrument(task_span).await;
                if let Err(e) = internal_cmd_tx.send(Message::ArrangementSizesSchedule) {
                    warn!("internal_cmd_rx dropped before we could send: {e:?}");
                }
            });
        } else {
            self.schedule_arrangement_sizes_collection().await;
        }

        tracing::debug!(
            "collected {row_count} rows for mz_object_arrangement_size_history at ts {collection_ts}"
        );
    }

    #[mz_ore::instrument(level = "debug")]
    async fn arrangement_sizes_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
        let (fut, _) = self.builtin_table_update().execute(expired).await;
        task::spawn(|| "arrangement_sizes_pruning_apply", async move {
            fut.await;
        });
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_command(&mut self, cmd: Command) {
        self.handle_command(cmd).await;
    }

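    /// Handles a response from the storage or compute controller: peek
    /// notifications, `SUBSCRIBE` and `COPY TO` results, and finished watch
    /// sets.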
    #[mz_ore::instrument(level = "debug")]
    async fn message_controller(&mut self, message: ControllerResponse) {
        event!(Level::TRACE, message = format!("{:?}", message));
        match message {
            ControllerResponse::PeekNotification(uuid, response, otel_ctx) => {
                self.handle_peek_notification(uuid, response, otel_ctx);
            }
            ControllerResponse::SubscribeResponse(sink_id, response) => {
                if let Some(ActiveComputeSink::Subscribe(active_subscribe)) =
                    self.active_compute_sinks.get_mut(&sink_id)
                {
                    let finished = active_subscribe.process_response(response);
                    if finished {
                        self.retire_compute_sinks(btreemap! {
                            sink_id => ActiveComputeSinkRetireReason::Finished,
                        })
                        .await;
                    }

                    soft_assert_or_log!(
                        !self.introspection_subscribes.contains_key(&sink_id),
                        "`sink_id` {sink_id} unexpectedly found in both `active_subscribes` \
                         and `introspection_subscribes`",
                    );
                } else if self.introspection_subscribes.contains_key(&sink_id) {
                    self.handle_introspection_subscribe_batch(sink_id, response)
                        .await;
                } else {
                    // The subscribe may have already been retired (e.g., by
                    // cancellation), in which case the response is dropped.
                }
            }
            ControllerResponse::CopyToResponse(sink_id, response) => {
                match self.drop_compute_sink(sink_id).await {
                    Some(ActiveComputeSink::CopyTo(active_copy_to)) => {
                        active_copy_to.retire_with_response(response);
                    }
                    _ => {
                        // The copy-to sink may have already been retired, in
                        // which case the response is dropped.
                    }
                }
            }
            ControllerResponse::WatchSetFinished(ws_ids) => {
                let now = self.now();
                for ws_id in ws_ids {
                    let Some((conn_id, rsp)) = self.installed_watch_sets.remove(&ws_id) else {
                        continue;
                    };
                    self.connection_watch_sets
                        .get_mut(&conn_id)
                        .expect("corrupted coordinator state: unknown connection id")
                        .remove(&ws_id);
                    if self.connection_watch_sets[&conn_id].is_empty() {
                        self.connection_watch_sets.remove(&conn_id);
                    }

                    match rsp {
                        WatchSetResponse::StatementDependenciesReady(id, ev) => {
                            self.record_statement_lifecycle_event(&id, &ev, now);
                        }
                        WatchSetResponse::AlterSinkReady(ctx) => {
                            self.sequence_alter_sink_finish(ctx).await;
                        }
                        WatchSetResponse::AlterMaterializedViewReady(ctx) => {
                            self.sequence_alter_materialized_view_apply_replacement_finish(ctx)
                                .await;
                        }
                    }
                }
            }
        }
    }

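    /// Finishes sequencing a statement whose purification ran on a separate
    /// task. If the catalog changed while purification was in flight, the
    /// original statement is re-executed from scratch instead of being
    /// planned against stale state.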
    #[mz_ore::instrument(level = "debug")]
    async fn message_purified_statement_ready(
        &mut self,
        PurifiedStatementReady {
            ctx,
            result,
            params,
            mut plan_validity,
            original_stmt,
            otel_ctx,
        }: PurifiedStatementReady,
    ) {
        otel_ctx.attach_as_parent();

        if plan_validity.check(self.catalog()).is_err() {
            self.handle_execute_inner(original_stmt, params, ctx).await;
            return;
        }

        let purified_statement = match result {
            Ok(ok) => ok,
            Err(e) => return ctx.retire(Err(e)),
        };

        let plan = match purified_statement {
            PurifiedStatement::PurifiedCreateSource {
                create_progress_subsource_stmt,
                create_source_stmt,
                subsources,
                available_source_references,
            } => self
                .plan_purified_create_source(
                    &ctx,
                    params,
                    create_progress_subsource_stmt,
                    create_source_stmt,
                    subsources,
                    available_source_references,
                )
                .await
                .map(|(plan, resolved_ids)| (plan, resolved_ids, ResolvedIds::empty())),
            PurifiedStatement::PurifiedAlterSourceAddSubsources {
                source_name,
                options,
                subsources,
            } => self
                .plan_purified_alter_source_add_subsource(
                    ctx.session(),
                    params,
                    source_name,
                    options,
                    subsources,
                )
                .await
                .map(|(plan, resolved_ids)| (plan, resolved_ids, ResolvedIds::empty())),
            PurifiedStatement::PurifiedAlterSourceRefreshReferences {
                source_name,
                available_source_references,
            } => self
                .plan_purified_alter_source_refresh_references(
                    ctx.session(),
                    params,
                    source_name,
                    available_source_references,
                )
                .map(|(plan, resolved_ids)| (plan, resolved_ids, ResolvedIds::empty())),
            o @ (PurifiedStatement::PurifiedAlterSource { .. }
            | PurifiedStatement::PurifiedCreateSink(..)
            | PurifiedStatement::PurifiedCreateTableFromSource { .. }) => {
                let stmt = match o {
                    PurifiedStatement::PurifiedAlterSource { alter_source_stmt } => {
                        Statement::AlterSource(alter_source_stmt)
                    }
                    PurifiedStatement::PurifiedCreateTableFromSource { stmt } => {
                        Statement::CreateTableFromSource(stmt)
                    }
                    PurifiedStatement::PurifiedCreateSink(stmt) => Statement::CreateSink(stmt),
                    PurifiedStatement::PurifiedCreateSource { .. }
                    | PurifiedStatement::PurifiedAlterSourceAddSubsources { .. }
                    | PurifiedStatement::PurifiedAlterSourceRefreshReferences { .. } => {
                        unreachable!("not part of exterior match stmt")
                    }
                };

                let catalog = self.catalog().for_session(ctx.session());
                let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &stmt);
                self.plan_statement(ctx.session(), stmt, &params, &resolved_ids)
                    .map(|(plan, sql_impl_ids)| (plan, resolved_ids, sql_impl_ids))
            }
        };

        match plan {
            Ok((plan, resolved_ids, sql_impl_ids)) => {
                self.sequence_plan(ctx, plan, resolved_ids, sql_impl_ids)
                    .await
            }
            Err(e) => ctx.retire(Err(e)),
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_create_connection_validation_ready(
        &mut self,
        CreateConnectionValidationReady {
            mut ctx,
            result,
            connection_id,
            connection_gid,
            mut plan_validity,
            otel_ctx,
            resolved_ids,
        }: CreateConnectionValidationReady,
    ) {
        otel_ctx.attach_as_parent();

        if let Err(e) = plan_validity.check(self.catalog()) {
            if self.secrets_controller.delete(connection_id).await.is_ok() {
                self.caching_secrets_reader.invalidate(connection_id);
            }
            return ctx.retire(Err(e));
        }

        let plan = match result {
            Ok(ok) => ok,
            Err(e) => {
                if self.secrets_controller.delete(connection_id).await.is_ok() {
                    self.caching_secrets_reader.invalidate(connection_id);
                }
                return ctx.retire(Err(e));
            }
        };

        let result = self
            .sequence_create_connection_stage_finish(
                &mut ctx,
                connection_id,
                connection_gid,
                plan,
                resolved_ids,
            )
            .await;
        ctx.retire(result);
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_alter_connection_validation_ready(
        &mut self,
        AlterConnectionValidationReady {
            mut ctx,
            result,
            connection_id,
            connection_gid: _,
            mut plan_validity,
            otel_ctx,
            resolved_ids: _,
        }: AlterConnectionValidationReady,
    ) {
        otel_ctx.attach_as_parent();

        if let Err(e) = plan_validity.check(self.catalog()) {
            return ctx.retire(Err(e));
        }

        let conn = match result {
            Ok(ok) => ok,
            Err(e) => {
                return ctx.retire(Err(e));
            }
        };

        let result = self
            .sequence_alter_connection_stage_finish(ctx.session_mut(), connection_id, conn)
            .await;
        ctx.retire(result);
    }

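    /// Handles a cluster replica status change reported by the controller:
    /// emits a telemetry event, appends to the replica status history
    /// introspection collection (unless in read-only mode), updates the
    /// in-memory process statuses, and notifies sessions when the overall
    /// replica status changes.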
    #[mz_ore::instrument(level = "debug")]
    async fn message_cluster_event(&mut self, event: ClusterEvent) {
        event!(Level::TRACE, event = format!("{:?}", event));

        if let Some(segment_client) = &self.segment_client {
            let env_id = &self.catalog().config().environment_id;
            let mut properties = json!({
                "cluster_id": event.cluster_id.to_string(),
                "replica_id": event.replica_id.to_string(),
                "process_id": event.process_id,
                "status": event.status.as_kebab_case_str(),
            });
            match event.status {
                ClusterStatus::Online => (),
                ClusterStatus::Offline(reason) => {
                    let properties = match &mut properties {
                        serde_json::Value::Object(map) => map,
                        _ => unreachable!(),
                    };
                    properties.insert(
                        "reason".into(),
                        json!(reason.display_or("unknown").to_string()),
                    );
                }
            };
            segment_client.environment_track(
                env_id,
                "Cluster Changed Status",
                properties,
                EventDetails {
                    timestamp: Some(event.time),
                    ..Default::default()
                },
            );
        }

        let Some(replica_statuses) = self
            .cluster_replica_statuses
            .try_get_cluster_replica_statuses(event.cluster_id, event.replica_id)
        else {
            return;
        };

        if event.status != replica_statuses[&event.process_id].status {
            if !self.controller.read_only() {
                let offline_reason = match event.status {
                    ClusterStatus::Online => None,
                    ClusterStatus::Offline(None) => None,
                    ClusterStatus::Offline(Some(reason)) => Some(reason.to_string()),
                };
                let row = Row::pack_slice(&[
                    Datum::String(&event.replica_id.to_string()),
                    Datum::UInt64(event.process_id),
                    Datum::String(event.status.as_kebab_case_str()),
                    Datum::from(offline_reason.as_deref()),
                    Datum::TimestampTz(event.time.try_into().expect("must fit")),
                ]);
                self.controller.storage.append_introspection_updates(
                    IntrospectionType::ReplicaStatusHistory,
                    vec![(row, Diff::ONE)],
                );
            }

            let old_replica_status =
                ClusterReplicaStatuses::cluster_replica_status(replica_statuses);

            let new_process_status = ClusterReplicaProcessStatus {
                status: event.status,
                time: event.time,
            };
            self.cluster_replica_statuses.ensure_cluster_status(
                event.cluster_id,
                event.replica_id,
                event.process_id,
                new_process_status,
            );

            let cluster = self.catalog().get_cluster(event.cluster_id);
            let replica = cluster.replica(event.replica_id).expect("Replica exists");
            let new_replica_status = self
                .cluster_replica_statuses
                .get_cluster_replica_status(event.cluster_id, event.replica_id);

            if old_replica_status != new_replica_status {
                let notifier = self.broadcast_notice_tx();
                let notice = AdapterNotice::ClusterReplicaStatusChanged {
                    cluster: cluster.name.clone(),
                    replica: replica.name.clone(),
                    status: new_replica_status,
                    time: event.time,
                };
                notifier(notice);
            }
        }
    }

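    /// Attempts to retire pending linearized read transactions.
    ///
    /// A read at `chosen_ts` may only be released once the timeline's oracle
    /// read timestamp has advanced to at least `chosen_ts`; transactions that
    /// are not yet releasable are requeued, and a retry is scheduled after
    /// the shortest outstanding wait (capped at one second).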
    #[mz_ore::instrument(level = "debug")]
    async fn message_linearize_reads(&mut self) {
        let mut shortest_wait = Duration::MAX;
        let mut ready_txns = Vec::new();

        // Cache the oracle read timestamp per timeline so that each timestamp
        // oracle is consulted at most once per batch.
        let mut cached_oracle_ts = BTreeMap::new();

        for (conn_id, mut read_txn) in std::mem::take(&mut self.pending_linearize_read_txns) {
            if let TimestampContext::TimelineTimestamp {
                timeline,
                chosen_ts,
                oracle_ts,
            } = read_txn.timestamp_context()
            {
                let oracle_ts = match oracle_ts {
                    Some(oracle_ts) => oracle_ts,
                    None => {
                        ready_txns.push(read_txn);
                        continue;
                    }
                };

                if chosen_ts <= oracle_ts {
                    ready_txns.push(read_txn);
                    continue;
                }

                let current_oracle_ts = cached_oracle_ts.entry(timeline.clone());
                let current_oracle_ts = match current_oracle_ts {
                    btree_map::Entry::Vacant(entry) => {
                        let timestamp_oracle = self.get_timestamp_oracle(timeline);
                        let read_ts = timestamp_oracle.read_ts().await;
                        entry.insert(read_ts.clone());
                        read_ts
                    }
                    btree_map::Entry::Occupied(entry) => entry.get().clone(),
                };

                if *chosen_ts <= current_oracle_ts {
                    ready_txns.push(read_txn);
                } else {
                    let wait =
                        Duration::from_millis(chosen_ts.saturating_sub(current_oracle_ts).into());
                    if wait < shortest_wait {
                        shortest_wait = wait;
                    }
                    read_txn.num_requeues += 1;
                    self.pending_linearize_read_txns.insert(conn_id, read_txn);
                }
            } else {
                ready_txns.push(read_txn);
            }
        }

        if !ready_txns.is_empty() {
            let otel_ctx = ready_txns.first().expect("known to exist").otel_ctx.clone();
            let span = tracing::debug_span!("message_linearize_reads");
            otel_ctx.attach_as_parent_to(&span);

            let now = Instant::now();
            for ready_txn in ready_txns {
                let span = tracing::debug_span!("retire_read_results");
                ready_txn.otel_ctx.attach_as_parent_to(&span);
                let _entered = span.enter();
                self.metrics
                    .linearize_message_seconds
                    .with_label_values(&[
                        ready_txn.txn.label(),
                        if ready_txn.num_requeues == 0 {
                            "true"
                        } else {
                            "false"
                        },
                    ])
                    .observe((now - ready_txn.created).as_secs_f64());
                if let Some((ctx, result)) = ready_txn.txn.finish() {
                    ctx.retire(result);
                }
            }
        }

        if !self.pending_linearize_read_txns.is_empty() {
            let remaining_ms = std::cmp::min(shortest_wait, Duration::from_millis(1_000));
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            task::spawn(|| "deferred_read_txns", async move {
                tokio::time::sleep(remaining_ms).await;
                let result = internal_cmd_tx.send(Message::LinearizeReads);
                if let Err(e) = result {
                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        }
    }
}