mz_adapter/coord/message_handler.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic for processing [`Coordinator`] messages. The [`Coordinator`] receives
//! messages from various sources (ex: controller, clients, background tasks, etc).

use std::collections::{BTreeMap, BTreeSet, btree_map};
use std::time::{Duration, Instant};

use futures::FutureExt;
use maplit::btreemap;
use mz_catalog::memory::objects::ClusterReplicaProcessStatus;
use mz_controller::ControllerResponse;
use mz_controller::clusters::{ClusterEvent, ClusterStatus};
use mz_ore::cast::CastFrom;
use mz_ore::instrument;
use mz_ore::now::EpochMillis;
use mz_ore::option::OptionExt;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{soft_assert_or_log, task};
use mz_persist_client::usage::ShardsUsageReferenced;
use mz_repr::{Datum, Diff, Row};
use mz_sql::ast::Statement;
use mz_sql::pure::PurifiedStatement;
use mz_storage_client::controller::IntrospectionType;
use opentelemetry::trace::TraceContextExt;
use rand::{Rng, SeedableRng, rngs};
use serde_json::json;
use tracing::{Instrument, Level, event, info_span, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
use crate::catalog::{BuiltinTableUpdate, Op};
use crate::command::Command;
use crate::coord::{
    AlterConnectionValidationReady, ClusterReplicaStatuses, Coordinator,
    CreateConnectionValidationReady, Message, PurifiedStatementReady, WatchSetResponse,
};
use crate::telemetry::{EventDetails, SegmentClientExt};
use crate::{AdapterNotice, TimestampContext};

impl Coordinator {
    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 74KB. This would
    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
    /// Because of that we purposefully move Futures of inner function calls onto the heap
    /// (i.e. Box it).
    #[instrument]
    pub(crate) async fn handle_message(&mut self, msg: Message) -> () {
        match msg {
            Message::Command(otel_ctx, cmd) => {
                // TODO: We need a Span that is not none for the otel_ctx to attach the parent
                // relationship to. If we swap the otel_ctx in `Command::Message` for a Span, we
                // can downgrade this to a debug_span.
                let span = tracing::info_span!("message_command").or_current();
                span.in_scope(|| otel_ctx.attach_as_parent());
                self.message_command(cmd).instrument(span).await
            }
            Message::ControllerReady { controller: _ } => {
                let Coordinator {
                    controller,
                    catalog,
                    ..
                } = self;
                let storage_metadata = catalog.state().storage_metadata();
                if let Some(m) = controller
                    .process(storage_metadata)
                    .expect("`process` never returns an error")
                {
                    self.message_controller(m).boxed_local().await
                }
            }
            Message::PurifiedStatementReady(ready) => {
                self.message_purified_statement_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::CreateConnectionValidationReady(ready) => {
                self.message_create_connection_validation_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::AlterConnectionValidationReady(ready) => {
                self.message_alter_connection_validation_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::TryDeferred {
                conn_id,
                acquired_lock,
            } => self.try_deferred(conn_id, acquired_lock).await,
            Message::GroupCommitInitiate(span, permit) => {
                // Add an OpenTelemetry link to our current span.
                tracing::Span::current().add_link(span.context().span().span_context().clone());
                self.try_group_commit(permit)
                    .instrument(span)
                    .boxed_local()
                    .await
            }
            Message::AdvanceTimelines => {
                self.advance_timelines().boxed_local().await;
            }
            Message::ClusterEvent(event) => self.message_cluster_event(event).boxed_local().await,
            Message::CancelPendingPeeks { conn_id } => {
                self.cancel_pending_peeks(&conn_id);
            }
            Message::LinearizeReads => {
                self.message_linearize_reads().boxed_local().await;
            }
            Message::StagedBatches {
                conn_id,
                table_id,
                batches,
            } => {
                self.commit_staged_batches(conn_id, table_id, batches);
            }
            Message::StorageUsageSchedule => {
                self.schedule_storage_usage_collection().boxed_local().await;
            }
            Message::StorageUsageFetch => {
                self.storage_usage_fetch().boxed_local().await;
            }
            Message::StorageUsageUpdate(sizes) => {
                self.storage_usage_update(sizes).boxed_local().await;
            }
            Message::StorageUsagePrune(expired) => {
                self.storage_usage_prune(expired).boxed_local().await;
            }
            Message::ArrangementSizesSchedule => {
                self.schedule_arrangement_sizes_collection()
                    .boxed_local()
                    .await;
            }
            Message::ArrangementSizesSnapshot => {
                self.arrangement_sizes_snapshot().boxed_local().await;
            }
            Message::ArrangementSizesPrune(expired) => {
                self.arrangement_sizes_prune(expired).boxed_local().await;
            }
            Message::RetireExecute {
                otel_ctx,
                data,
                reason,
            } => {
                otel_ctx.attach_as_parent();
                self.retire_execution(reason, data);
            }
            Message::ExecuteSingleStatementTransaction {
                ctx,
                otel_ctx,
                stmt,
                params,
            } => {
                otel_ctx.attach_as_parent();
                self.sequence_execute_single_statement_transaction(ctx, stmt, params)
                    .boxed_local()
                    .await;
            }
            Message::PeekStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateIndexStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateViewStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateMaterializedViewStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::SubscribeStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::IntrospectionSubscribeStageReady { span, stage } => {
                self.sequence_staged((), span, stage).boxed_local().await;
            }
            Message::ExplainTimestampStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::SecretStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::ClusterStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::DrainStatementLog => {
                self.drain_statement_log();
            }
            Message::PrivateLinkVpcEndpointEvents(events) => {
                if !self.controller.read_only() {
                    self.controller.storage.append_introspection_updates(
                        IntrospectionType::PrivatelinkConnectionStatusHistory,
                        events
                            .into_iter()
                            .map(|e| (mz_repr::Row::from(e), Diff::ONE))
                            .collect(),
                    );
                }
            }
            Message::CheckSchedulingPolicies => {
                self.check_scheduling_policies().boxed_local().await;
            }
            Message::SchedulingDecisions(decisions) => {
                self.handle_scheduling_decisions(decisions)
                    .boxed_local()
                    .await;
            }
            Message::DeferredStatementReady => {
                self.handle_deferred_statement().boxed_local().await;
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub async fn storage_usage_fetch(&self) {
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        let client = self.storage_usage_client.clone();

        // Record the currently live shards.
        let live_shards: BTreeSet<_> = self
            .controller
            .storage
            .active_collection_metadatas()
            .into_iter()
            .map(|(_id, m)| m.data_shard)
            .collect();

        let collection_metric = self.metrics.storage_usage_collection_time_seconds.clone();

        // Spawn an asynchronous task to compute the storage usage, which
        // requires a slow scan of the underlying storage engine.
        task::spawn(|| "storage_usage_fetch", async move {
            let collection_metric_timer = collection_metric.start_timer();
            let shard_sizes = client.shards_usage_referenced(live_shards).await;
            collection_metric_timer.observe_duration();

            // It is not an error for shard sizes to become ready after
            // `internal_cmd_rx` is dropped.
            if let Err(e) = internal_cmd_tx.send(Message::StorageUsageUpdate(shard_sizes)) {
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
        });
    }

    #[mz_ore::instrument(level = "debug")]
    async fn storage_usage_update(&mut self, shards_usage: ShardsUsageReferenced) {
        // Similar to audit events, use the oracle ts so this is guaranteed to
        // increase. This is intentionally the timestamp of when collection
        // finished, not when it started, so that we don't write data with a
        // timestamp in the past.
        let collection_timestamp = if self.controller.read_only() {
            self.peek_local_write_ts().await.into()
        } else {
            // Getting a write timestamp bumps the write timestamp in the
            // oracle, which we're not allowed to do in read-only mode.
            self.get_local_write_ts().await.timestamp.into()
        };

        let ops = shards_usage
            .by_shard
            .into_iter()
            .map(|(shard_id, shard_usage)| Op::WeirdStorageUsageUpdates {
                object_id: Some(shard_id.to_string()),
                size_bytes: shard_usage.size_bytes(),
                collection_timestamp,
            })
            .collect();

        match self.catalog_transact_inner(None, ops).await {
            Ok((table_updates, catalog_updates)) => {
                assert!(
                    catalog_updates.is_empty(),
                    "applying builtin table updates does not produce catalog implications"
                );

                let internal_cmd_tx = self.internal_cmd_tx.clone();
                let task_span =
                    info_span!(parent: None, "coord::storage_usage_update::table_updates");
                OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
                task::spawn(|| "storage_usage_update_table_updates", async move {
                    table_updates.instrument(task_span).await;
                    // It is not an error for this task to be running after `internal_cmd_rx` is dropped.
                    if let Err(e) = internal_cmd_tx.send(Message::StorageUsageSchedule) {
                        warn!("internal_cmd_rx dropped before we could send: {e:?}");
                    }
                });
            }
            Err(err) => tracing::warn!("Failed to update storage metrics: {:?}", err),
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn storage_usage_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
        let (fut, _) = self.builtin_table_update().execute(expired).await;
        task::spawn(|| "storage_usage_pruning_apply", async move {
            fut.await;
        });
    }

    pub async fn schedule_storage_usage_collection(&self) {
        // Instead of using a `tokio::time::Interval`, we calculate the time until the next
        // usage collection and wait for that amount of time. This is so we can keep the intervals
        // consistent even across restarts. If collection takes too long, it is possible that
        // we miss an interval.

        // 1) Deterministically pick some offset within the collection interval to prevent
        // thundering herds across environments.
        const SEED_LEN: usize = 32;
        let mut seed = [0; SEED_LEN];
        for (i, byte) in self
            .catalog()
            .state()
            .config()
            .environment_id
            .organization_id()
            .as_bytes()
            .into_iter()
            .take(SEED_LEN)
            .enumerate()
        {
            seed[i] = *byte;
        }
        let storage_usage_collection_interval_ms: EpochMillis =
            EpochMillis::try_from(self.storage_usage_collection_interval.as_millis())
                .expect("storage usage collection interval must fit into u64");
        let offset =
            rngs::SmallRng::from_seed(seed).random_range(0..storage_usage_collection_interval_ms);
        let now_ts: EpochMillis = self.peek_local_write_ts().await.into();

        // 2) Determine the amount of ms between now and the next collection time.
        let previous_collection_ts =
            (now_ts - (now_ts % storage_usage_collection_interval_ms)) + offset;
        let next_collection_ts = if previous_collection_ts > now_ts {
            previous_collection_ts
        } else {
            previous_collection_ts + storage_usage_collection_interval_ms
        };
        let next_collection_interval = Duration::from_millis(next_collection_ts - now_ts);
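        // Worked example (illustrative numbers only): with a 1-hour interval
        // (3_600_000 ms), an offset of 1_200_000 ms, and now_ts = 10_000_000 ms,
        // the interval boundary below now is 7_200_000 ms, so
        // previous_collection_ts = 8_400_000 ms; since that is before now_ts,
        // the next collection fires at 12_000_000 ms, i.e. in 2_000_000 ms.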

        // 3) Sleep for that amount of time, then initiate another storage usage collection.
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        task::spawn(|| "storage_usage_collection", async move {
            tokio::time::sleep(next_collection_interval).await;
            if internal_cmd_tx.send(Message::StorageUsageFetch).is_err() {
                // If sending fails, the main thread has shut down.
            }
        });
    }

    /// Schedules the next per-object arrangement sizes snapshot.
    ///
    /// Aligns each fire to an `organization_id`-seeded offset within the
    /// interval so collections stay consistent across restarts and don't
    /// synchronize across environments. Sleeps are capped at `MAX_SLEEP`,
    /// so dyncfg changes (interval edits or the `0s` disable sentinel) take
    /// effect within one cap rather than after the full interval.
    pub async fn schedule_arrangement_sizes_collection(&self) {
        const MAX_SLEEP: Duration = Duration::from_secs(60);

        let interval_duration =
            mz_adapter_types::dyncfgs::ARRANGEMENT_SIZE_HISTORY_COLLECTION_INTERVAL
                .get(self.catalog().system_config().dyncfgs());

        // `0s` disables collection. Keep polling so re-enabling takes effect
        // within `MAX_SLEEP` rather than requiring an envd restart.
        if interval_duration.is_zero() {
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            task::spawn(|| "arrangement_sizes_collection_disabled", async move {
                tokio::time::sleep(MAX_SLEEP).await;
                let _ = internal_cmd_tx.send(Message::ArrangementSizesSchedule);
            });
            return;
        }

        const SEED_LEN: usize = 32;
        let mut seed = [0; SEED_LEN];
        for (i, byte) in self
            .catalog()
            .state()
            .config()
            .environment_id
            .organization_id()
            .as_bytes()
            .into_iter()
            .take(SEED_LEN)
            .enumerate()
        {
            seed[i] = *byte;
        }
        let interval_ms: EpochMillis = EpochMillis::try_from(interval_duration.as_millis())
            .expect("arrangement_size_history_collection_interval must fit into u64");
        // `rand::random_range` panics on an empty range.
        let interval_ms = interval_ms.max(1);
        let offset = rngs::SmallRng::from_seed(seed).random_range(0..interval_ms);
        let now_ts: EpochMillis = self.peek_local_write_ts().await.into();

        let previous_collection_ts = (now_ts - (now_ts % interval_ms)) + offset;
        let next_collection_ts = if previous_collection_ts > now_ts {
            previous_collection_ts
        } else {
            previous_collection_ts + interval_ms
        };
        let sleep_for = Duration::from_millis(next_collection_ts - now_ts);

        // Within one cap of the next fire we sleep the remainder and snapshot;
        // further out we sleep the cap and re-enter so a dyncfg change is
        // picked up before committing to a long sleep.
        let (capped_sleep, fire_snapshot) = if sleep_for <= MAX_SLEEP {
            (sleep_for, true)
        } else {
            (MAX_SLEEP, false)
        };
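        // Worked example (illustrative numbers only): if the next fire is
        // 3_600 s away, we sleep MAX_SLEEP (60 s) and re-enter via
        // `ArrangementSizesSchedule`, re-reading the dyncfg each time; once the
        // remaining sleep drops to <= 60 s we sleep exactly that remainder and
        // send `ArrangementSizesSnapshot`.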

        let internal_cmd_tx = self.internal_cmd_tx.clone();
        task::spawn(|| "arrangement_sizes_collection", async move {
            tokio::time::sleep(capped_sleep).await;
            let msg = if fire_snapshot {
                Message::ArrangementSizesSnapshot
            } else {
                Message::ArrangementSizesSchedule
            };
            // Send is best-effort: if the coordinator is shutting down, drop.
            let _ = internal_cmd_tx.send(msg);
        });
    }

    /// Snapshots the current contents of `mz_object_arrangement_sizes` and
    /// appends them to `mz_object_arrangement_size_history`, tagged with a
    /// shared `collection_timestamp`. Reschedules on completion.
    ///
    /// Each `(replica_id, object_id)` pair is recorded with a
    /// `hydration_complete` flag derived from `mz_compute_hydration_times`:
    /// `true` once the pair's initial hydration on that replica is finished,
    /// `false` while still building. Consumers that want only stable sizes
    /// should filter `WHERE hydration_complete`.
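    ///
    /// For example, a consumer might run a query along these lines (illustrative
    /// only; column names are assumed from the row packing order used below):
    /// `SELECT replica_id, object_id, size FROM mz_object_arrangement_size_history
    /// WHERE hydration_complete;`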
    #[mz_ore::instrument(level = "debug")]
    async fn arrangement_sizes_snapshot(&mut self) {
        // The catalog server is not writable in read-only mode.
        if self.controller.read_only() {
            self.schedule_arrangement_sizes_collection().await;
            return;
        }

        let collection_timer = self
            .metrics
            .arrangement_sizes_collection_time_seconds
            .start_timer();

        let live_item_id = self.catalog().resolve_builtin_storage_collection(
            &mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZES_UNIFIED,
        );
        let live_global_id = self.catalog.get_entry(&live_item_id).latest_global_id();
        let hydration_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_COMPUTE_HYDRATION_TIMES);
        let hydration_global_id = self
            .catalog
            .get_entry(&hydration_item_id)
            .latest_global_id();
        let history_item_id = self
            .catalog()
            .resolve_builtin_table(&mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY);

        let read_ts = self.get_local_read_ts().await;
        let snapshot = match self
            .controller
            .storage_collections
            .snapshot(live_global_id, read_ts)
            .await
        {
            Ok(s) => s,
            Err(e) => {
                tracing::warn!("arrangement sizes snapshot failed: {e:?}");
                drop(collection_timer);
                self.schedule_arrangement_sizes_collection().await;
                return;
            }
        };
        let mut hydration_snapshot = match self
            .controller
            .storage_collections
            .snapshot(hydration_global_id, read_ts)
            .await
        {
            Ok(s) => s,
            Err(e) => {
                tracing::warn!("arrangement sizes hydration snapshot failed: {e:?}");
                drop(collection_timer);
                self.schedule_arrangement_sizes_collection().await;
                return;
            }
        };
        differential_dataflow::consolidation::consolidate(&mut hydration_snapshot);

        // Build the set of pairs whose initial hydration has finished
        // (`time_ns IS NOT NULL`). The set drives the `hydration_complete`
        // flag for each row we emit below.
        let mut datum_vec = mz_repr::DatumVec::new();
        let mut hydrated: BTreeSet<(String, String)> = BTreeSet::new();
        const HYDRATION_COL_REPLICA_ID: usize = 0;
        const HYDRATION_COL_OBJECT_ID: usize = 1;
        const HYDRATION_COL_TIME_NS: usize = 2;
        const HYDRATION_COL_COUNT: usize = 3;
        for (row, diff) in &hydration_snapshot {
            if *diff != 1 {
                continue;
            }
            let datums = datum_vec.borrow_with(row);
            if datums.len() < HYDRATION_COL_COUNT {
                continue;
            }
            if datums[HYDRATION_COL_TIME_NS].is_null() {
                continue;
            }
            hydrated.insert((
                datums[HYDRATION_COL_REPLICA_ID].unwrap_str().to_string(),
                datums[HYDRATION_COL_OBJECT_ID].unwrap_str().to_string(),
            ));
        }

        // `collection_ts` is stamped after the snapshot so it's always >= the
        // state the rows describe, and monotone across restarts. The snapshot
        // read and this stamp aren't atomic, but the resulting skew is bounded
        // by snapshot latency and negligible at this cadence.
        let collection_ts: EpochMillis = self.get_local_write_ts().await.timestamp.into();
        let collection_datum = Datum::TimestampTz(
            mz_ore::now::to_datetime(collection_ts)
                .try_into()
                .expect("collection_timestamp must fit into TimestampTz"),
        );

        let mut consolidated = snapshot;
        differential_dataflow::consolidation::consolidate(&mut consolidated);

        // Column positions in `mz_object_arrangement_sizes`.
        const LIVE_COL_REPLICA_ID: usize = 0;
        const LIVE_COL_OBJECT_ID: usize = 1;
        const LIVE_COL_SIZE: usize = 2;
        const LIVE_COL_COUNT: usize = 3;

        let mut skipped_malformed: u64 = 0;
        let mut skipped_null_size: u64 = 0;
        let mut updates: Vec<BuiltinTableUpdate> = Vec::with_capacity(consolidated.len());
        for (row, diff) in consolidated.iter() {
            if *diff != 1 {
                continue;
            }
            let datums = datum_vec.borrow_with(row);
            // Surface schema drift via a warn log below rather than silently
            // skipping entire snapshots.
            if datums.len() != LIVE_COL_COUNT {
                skipped_malformed += 1;
                continue;
            }
            let replica_id = datums[LIVE_COL_REPLICA_ID].unwrap_str();
            let object_id = datums[LIVE_COL_OBJECT_ID].unwrap_str();
            let size_datum = datums[LIVE_COL_SIZE];
            // The history table's `size` is non-null; fabricating zero would
            // be misleading, so drop.
            if size_datum.is_null() {
                skipped_null_size += 1;
                continue;
            }
            let size = size_datum.unwrap_int64();
            // Pairs whose hydration hasn't completed yet are still recorded,
            // tagged with `hydration_complete = false`. Consumers that care
            // only about stable sizes can filter on `hydration_complete`.
            let hydration_complete =
                hydrated.contains(&(replica_id.to_string(), object_id.to_string()));
            let new_row = Row::pack_slice(&[
                Datum::String(replica_id),
                Datum::String(object_id),
                Datum::Int64(size),
                collection_datum,
                Datum::from(hydration_complete),
            ]);
            updates.push(BuiltinTableUpdate::row(history_item_id, new_row, Diff::ONE));
        }
        if skipped_malformed > 0 {
            warn!(
                "mz_object_arrangement_sizes schema drift: skipped {skipped_malformed} rows \
                 with unexpected arity"
            );
        }
        if skipped_null_size > 0 {
            tracing::debug!("skipped {skipped_null_size} live rows with null size");
        }

        let row_count = updates.len();
        // Captures snapshot + row construction. The async table-apply below
        // is captured separately by `mz_append_table_duration_seconds`.
        collection_timer.observe_duration();

        if !updates.is_empty() {
            self.metrics
                .arrangement_sizes_rows_written
                .inc_by(u64::cast_from(row_count));
            // TODO(arrangement-sizes): when the writeable-catalog-server plumbing
            // in https://github.com/MaterializeInc/materialize/pull/35436 lands,
            // append directly on `mz_catalog_server` instead of going through
            // the environmentd builtin-table-update path.
            let (fut, _) = self.builtin_table_update().execute(updates).await;
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let task_span =
                info_span!(parent: None, "coord::arrangement_sizes_snapshot::table_updates");
            OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
            task::spawn(|| "arrangement_sizes_snapshot_apply", async move {
                fut.instrument(task_span).await;
                if let Err(e) = internal_cmd_tx.send(Message::ArrangementSizesSchedule) {
                    warn!("internal_cmd_rx dropped before we could send: {e:?}");
                }
            });
        } else {
            self.schedule_arrangement_sizes_collection().await;
        }

        tracing::debug!(
            "appended {row_count} rows to mz_object_arrangement_size_history at ts {collection_ts}"
        );
    }

    #[mz_ore::instrument(level = "debug")]
    async fn arrangement_sizes_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
        let (fut, _) = self.builtin_table_update().execute(expired).await;
        task::spawn(|| "arrangement_sizes_pruning_apply", async move {
            fut.await;
        });
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_command(&mut self, cmd: Command) {
        self.handle_command(cmd).await;
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_controller(&mut self, message: ControllerResponse) {
        event!(Level::TRACE, message = format!("{:?}", message));
        match message {
            ControllerResponse::PeekNotification(uuid, response, otel_ctx) => {
                self.handle_peek_notification(uuid, response, otel_ctx);
            }
            ControllerResponse::SubscribeResponse(sink_id, response) => {
                if let Some(ActiveComputeSink::Subscribe(active_subscribe)) =
                    self.active_compute_sinks.get_mut(&sink_id)
                {
                    let finished = active_subscribe.process_response(response);
                    if finished {
                        self.retire_compute_sinks(btreemap! {
                            sink_id => ActiveComputeSinkRetireReason::Finished,
                        })
                        .await;
                    }

                    soft_assert_or_log!(
                        !self.introspection_subscribes.contains_key(&sink_id),
                        "`sink_id` {sink_id} unexpectedly found in both `active_subscribes` \
                         and `introspection_subscribes`",
                    );
                } else if self.introspection_subscribes.contains_key(&sink_id) {
                    self.handle_introspection_subscribe_batch(sink_id, response)
                        .await;
                } else {
                    // Cancellation may cause us to receive responses for subscribes no longer
                    // tracked, so we quietly ignore them.
                }
            }
            ControllerResponse::CopyToResponse(sink_id, response) => {
                match self.drop_compute_sink(sink_id).await {
                    Some(ActiveComputeSink::CopyTo(active_copy_to)) => {
                        active_copy_to.retire_with_response(response);
                    }
                    _ => {
                        // Cancellation may cause us to receive responses for subscribes no longer
                        // tracked, so we quietly ignore them.
                    }
                }
            }
            ControllerResponse::WatchSetFinished(ws_ids) => {
                let now = self.now();
                for ws_id in ws_ids {
                    let Some((conn_id, rsp)) = self.installed_watch_sets.remove(&ws_id) else {
                        continue;
                    };
                    self.connection_watch_sets
                        .get_mut(&conn_id)
                        .expect("corrupted coordinator state: unknown connection id")
                        .remove(&ws_id);
                    if self.connection_watch_sets[&conn_id].is_empty() {
                        self.connection_watch_sets.remove(&conn_id);
                    }

                    match rsp {
                        WatchSetResponse::StatementDependenciesReady(id, ev) => {
                            self.record_statement_lifecycle_event(&id, &ev, now);
                        }
                        WatchSetResponse::AlterSinkReady(ctx) => {
                            self.sequence_alter_sink_finish(ctx).await;
                        }
                        WatchSetResponse::AlterMaterializedViewReady(ctx) => {
                            self.sequence_alter_materialized_view_apply_replacement_finish(ctx)
                                .await;
                        }
                    }
                }
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_purified_statement_ready(
        &mut self,
        PurifiedStatementReady {
            ctx,
            result,
            params,
            mut plan_validity,
            original_stmt,
            otel_ctx,
        }: PurifiedStatementReady,
    ) {
        otel_ctx.attach_as_parent();

        // Ensure that all dependencies still exist after purification, as a
        // `DROP CONNECTION` or other `DROP` may have sneaked in. If any have gone missing, we
        // repurify the original statement. This will either produce a nice
        // "unknown connector" error, or pick up a new connector that has
        // replaced the dropped connector.
        //
        // n.b. an `ALTER CONNECTION` occurring during purification is OK
        // because we always look up/populate a connection's state after
        // committing to the catalog, so are guaranteed to see the connection's
        // most recent version.
        if plan_validity.check(self.catalog()).is_err() {
            self.handle_execute_inner(original_stmt, params, ctx).await;
            return;
        }

        let purified_statement = match result {
            Ok(ok) => ok,
            Err(e) => return ctx.retire(Err(e)),
        };

        let plan = match purified_statement {
            PurifiedStatement::PurifiedCreateSource {
                create_progress_subsource_stmt,
                create_source_stmt,
                subsources,
                available_source_references,
            } => {
                self.plan_purified_create_source(
                    &ctx,
                    params,
                    create_progress_subsource_stmt,
                    create_source_stmt,
                    subsources,
                    available_source_references,
                )
                .await
            }
            PurifiedStatement::PurifiedAlterSourceAddSubsources {
                source_name,
                options,
                subsources,
            } => {
                self.plan_purified_alter_source_add_subsource(
                    ctx.session(),
                    params,
                    source_name,
                    options,
                    subsources,
                )
                .await
            }
            PurifiedStatement::PurifiedAlterSourceRefreshReferences {
                source_name,
                available_source_references,
            } => self.plan_purified_alter_source_refresh_references(
                ctx.session(),
                params,
                source_name,
                available_source_references,
            ),
            o @ (PurifiedStatement::PurifiedAlterSource { .. }
            | PurifiedStatement::PurifiedCreateSink(..)
            | PurifiedStatement::PurifiedCreateTableFromSource { .. }) => {
                // Unify these into a `Statement`.
                let stmt = match o {
                    PurifiedStatement::PurifiedAlterSource { alter_source_stmt } => {
                        Statement::AlterSource(alter_source_stmt)
                    }
                    PurifiedStatement::PurifiedCreateTableFromSource { stmt } => {
                        Statement::CreateTableFromSource(stmt)
                    }
                    PurifiedStatement::PurifiedCreateSink(stmt) => Statement::CreateSink(stmt),
                    PurifiedStatement::PurifiedCreateSource { .. }
                    | PurifiedStatement::PurifiedAlterSourceAddSubsources { .. }
                    | PurifiedStatement::PurifiedAlterSourceRefreshReferences { .. } => {
                        unreachable!("not part of exterior match stmt")
                    }
                };

                // Determine all dependencies, not just those in the statement
                // itself.
                let catalog = self.catalog().for_session(ctx.session());
                let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &stmt);
                self.plan_statement(ctx.session(), stmt, &params, &resolved_ids)
                    .map(|plan| (plan, resolved_ids))
            }
        };

        match plan {
            Ok((plan, resolved_ids)) => self.sequence_plan(ctx, plan, resolved_ids).await,
            Err(e) => ctx.retire(Err(e)),
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_create_connection_validation_ready(
        &mut self,
        CreateConnectionValidationReady {
            mut ctx,
            result,
            connection_id,
            connection_gid,
            mut plan_validity,
            otel_ctx,
            resolved_ids,
        }: CreateConnectionValidationReady,
    ) {
        otel_ctx.attach_as_parent();

        // Ensure that all dependencies still exist after validation, as a
        // `DROP SECRET` may have sneaked in.
        //
        // WARNING: If we support `ALTER SECRET`, we'll need to also check
        // for connectors that were altered while we were purifying.
        if let Err(e) = plan_validity.check(self.catalog()) {
            if self.secrets_controller.delete(connection_id).await.is_ok() {
                self.caching_secrets_reader.invalidate(connection_id);
            }
            return ctx.retire(Err(e));
        }

        let plan = match result {
            Ok(ok) => ok,
            Err(e) => {
                if self.secrets_controller.delete(connection_id).await.is_ok() {
                    self.caching_secrets_reader.invalidate(connection_id);
                }
                return ctx.retire(Err(e));
            }
        };

        let result = self
            .sequence_create_connection_stage_finish(
                &mut ctx,
                connection_id,
                connection_gid,
                plan,
                resolved_ids,
            )
            .await;
        ctx.retire(result);
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_alter_connection_validation_ready(
        &mut self,
        AlterConnectionValidationReady {
            mut ctx,
            result,
            connection_id,
            connection_gid: _,
            mut plan_validity,
            otel_ctx,
            resolved_ids: _,
        }: AlterConnectionValidationReady,
    ) {
        otel_ctx.attach_as_parent();

        // Ensure that all dependencies still exist after validation, as a
        // `DROP SECRET` may have sneaked in.
        //
        // WARNING: If we support `ALTER SECRET`, we'll need to also check
        // for connectors that were altered while we were purifying.
        if let Err(e) = plan_validity.check(self.catalog()) {
            return ctx.retire(Err(e));
        }

        let conn = match result {
            Ok(ok) => ok,
            Err(e) => {
                return ctx.retire(Err(e));
            }
        };

        let result = self
            .sequence_alter_connection_stage_finish(ctx.session_mut(), connection_id, conn)
            .await;
        ctx.retire(result);
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_cluster_event(&mut self, event: ClusterEvent) {
        event!(Level::TRACE, event = format!("{:?}", event));

        if let Some(segment_client) = &self.segment_client {
            let env_id = &self.catalog().config().environment_id;
            let mut properties = json!({
                "cluster_id": event.cluster_id.to_string(),
                "replica_id": event.replica_id.to_string(),
                "process_id": event.process_id,
                "status": event.status.as_kebab_case_str(),
            });
            match event.status {
                ClusterStatus::Online => (),
                ClusterStatus::Offline(reason) => {
                    let properties = match &mut properties {
                        serde_json::Value::Object(map) => map,
                        _ => unreachable!(),
                    };
                    properties.insert(
                        "reason".into(),
                        json!(reason.display_or("unknown").to_string()),
                    );
                }
            };
            segment_client.environment_track(
                env_id,
                "Cluster Changed Status",
                properties,
                EventDetails {
                    timestamp: Some(event.time),
                    ..Default::default()
                },
            );
        }

        // It is possible that we receive a status update for a replica that has
        // already been dropped from the catalog. Just ignore these events.
        let Some(replica_statuses) = self
            .cluster_replica_statuses
            .try_get_cluster_replica_statuses(event.cluster_id, event.replica_id)
        else {
            return;
        };

        if event.status != replica_statuses[&event.process_id].status {
            if !self.controller.read_only() {
                let offline_reason = match event.status {
                    ClusterStatus::Online => None,
                    ClusterStatus::Offline(None) => None,
                    ClusterStatus::Offline(Some(reason)) => Some(reason.to_string()),
                };
                let row = Row::pack_slice(&[
                    Datum::String(&event.replica_id.to_string()),
                    Datum::UInt64(event.process_id),
                    Datum::String(event.status.as_kebab_case_str()),
                    Datum::from(offline_reason.as_deref()),
                    Datum::TimestampTz(event.time.try_into().expect("must fit")),
                ]);
                self.controller.storage.append_introspection_updates(
                    IntrospectionType::ReplicaStatusHistory,
                    vec![(row, Diff::ONE)],
                );
            }

            let old_replica_status =
                ClusterReplicaStatuses::cluster_replica_status(replica_statuses);

            let new_process_status = ClusterReplicaProcessStatus {
                status: event.status,
                time: event.time,
            };
            self.cluster_replica_statuses.ensure_cluster_status(
                event.cluster_id,
                event.replica_id,
                event.process_id,
                new_process_status,
            );

            let cluster = self.catalog().get_cluster(event.cluster_id);
            let replica = cluster.replica(event.replica_id).expect("Replica exists");
            let new_replica_status = self
                .cluster_replica_statuses
                .get_cluster_replica_status(event.cluster_id, event.replica_id);

            if old_replica_status != new_replica_status {
                let notifier = self.broadcast_notice_tx();
                let notice = AdapterNotice::ClusterReplicaStatusChanged {
                    cluster: cluster.name.clone(),
                    replica: replica.name.clone(),
                    status: new_replica_status,
                    time: event.time,
                };
                notifier(notice);
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    /// Linearizes sending the results of a read transaction by,
    ///   1. Holding back any results that were executed at some point in the future, until the
    ///   containing timeline has advanced to that point in the future.
    ///   2. Confirming that we are still the current leader before sending results to the client.
    async fn message_linearize_reads(&mut self) {
        let mut shortest_wait = Duration::MAX;
        let mut ready_txns = Vec::new();

        // Cache for `TimestampOracle::read_ts` calls. These are somewhat
        // expensive so we cache the value. This is correct since all we're
        // risking is being too conservative. We will not accidentally "release"
        // a result too early.
        let mut cached_oracle_ts = BTreeMap::new();

        for (conn_id, mut read_txn) in std::mem::take(&mut self.pending_linearize_read_txns) {
            if let TimestampContext::TimelineTimestamp {
                timeline,
                chosen_ts,
                oracle_ts,
            } = read_txn.timestamp_context()
            {
                let oracle_ts = match oracle_ts {
                    Some(oracle_ts) => oracle_ts,
                    None => {
                        // There was no oracle timestamp, so no need to delay.
                        ready_txns.push(read_txn);
                        continue;
                    }
                };

                if chosen_ts <= oracle_ts {
                    // Chosen ts was already <= the oracle ts, so we're good
                    // to go!
                    ready_txns.push(read_txn);
                    continue;
                }

                // See what the oracle timestamp is now and delay when needed.
                let current_oracle_ts = cached_oracle_ts.entry(timeline.clone());
                let current_oracle_ts = match current_oracle_ts {
                    btree_map::Entry::Vacant(entry) => {
                        let timestamp_oracle = self.get_timestamp_oracle(timeline);
                        let read_ts = timestamp_oracle.read_ts().await;
                        entry.insert(read_ts.clone());
                        read_ts
                    }
                    btree_map::Entry::Occupied(entry) => entry.get().clone(),
                };

                if *chosen_ts <= current_oracle_ts {
                    ready_txns.push(read_txn);
                } else {
                    let wait =
                        Duration::from_millis(chosen_ts.saturating_sub(current_oracle_ts).into());
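                    // E.g. (illustrative numbers): chosen_ts = 1_050 and a
                    // current oracle read_ts of 1_000 gives a 50ms wait, so the
                    // txn is requeued and retried once the timeline has had a
                    // chance to advance past its chosen timestamp.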
                    if wait < shortest_wait {
                        shortest_wait = wait;
                    }
                    read_txn.num_requeues += 1;
                    self.pending_linearize_read_txns.insert(conn_id, read_txn);
                }
            } else {
                ready_txns.push(read_txn);
            }
        }

        if !ready_txns.is_empty() {
            // Sniff out one ctx; this is where tracing breaks down, because we
            // process all outstanding txns as a batch here.
            let otel_ctx = ready_txns.first().expect("known to exist").otel_ctx.clone();
            let span = tracing::debug_span!("message_linearize_reads");
            otel_ctx.attach_as_parent_to(&span);

            let now = Instant::now();
            for ready_txn in ready_txns {
                let span = tracing::debug_span!("retire_read_results");
                ready_txn.otel_ctx.attach_as_parent_to(&span);
                let _entered = span.enter();
                self.metrics
                    .linearize_message_seconds
                    .with_label_values(&[
                        ready_txn.txn.label(),
                        if ready_txn.num_requeues == 0 {
                            "true"
                        } else {
                            "false"
                        },
                    ])
                    .observe((now - ready_txn.created).as_secs_f64());
                if let Some((ctx, result)) = ready_txn.txn.finish() {
                    ctx.retire(result);
                }
            }
        }

        if !self.pending_linearize_read_txns.is_empty() {
            // Cap wait time to 1s.
            let remaining_ms = std::cmp::min(shortest_wait, Duration::from_millis(1_000));
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            task::spawn(|| "deferred_read_txns", async move {
                tokio::time::sleep(remaining_ms).await;
                // It is not an error for this task to be running after `internal_cmd_rx` is dropped.
                let result = internal_cmd_tx.send(Message::LinearizeReads);
                if let Err(e) = result {
                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        }
    }
}