mz_adapter/coord/message_handler.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic for processing [`Coordinator`] messages. The [`Coordinator`] receives
//! messages from various sources (e.g., the controller, clients, and background tasks).

use std::collections::{BTreeMap, BTreeSet, btree_map};
use std::time::{Duration, Instant};

use futures::FutureExt;
use maplit::btreemap;
use mz_catalog::memory::objects::ClusterReplicaProcessStatus;
use mz_controller::ControllerResponse;
use mz_controller::clusters::{ClusterEvent, ClusterStatus};
use mz_ore::cast::CastFrom;
use mz_ore::instrument;
use mz_ore::now::EpochMillis;
use mz_ore::option::OptionExt;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{soft_assert_or_log, task};
use mz_persist_client::usage::ShardsUsageReferenced;
use mz_repr::{Datum, Diff, Row};
use mz_sql::ast::Statement;
use mz_sql::names::ResolvedIds;
use mz_sql::pure::PurifiedStatement;
use mz_storage_client::controller::IntrospectionType;
use opentelemetry::trace::TraceContextExt;
use rand::{Rng, SeedableRng, rngs};
use serde_json::json;
use tracing::{Instrument, Level, event, info_span, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
use crate::catalog::{BuiltinTableUpdate, Op};
use crate::command::Command;
use crate::coord::{
    AlterConnectionValidationReady, ClusterReplicaStatuses, Coordinator,
    CreateConnectionValidationReady, Message, PurifiedStatementReady, WatchSetResponse,
};
use crate::telemetry::{EventDetails, SegmentClientExt};
use crate::{AdapterNotice, TimestampContext};

impl Coordinator {
    /// BOXED FUTURE: As of Nov 2023 the Future returned from this function was 74KB. It would
    /// get stored on the stack, which is bad for runtime performance and blows up our stack usage.
    /// Because of that we purposefully move the Futures of inner function calls onto the heap
    /// (i.e., Box them).
    #[instrument]
    pub(crate) async fn handle_message(&mut self, msg: Message) -> () {
        match msg {
            Message::Command(otel_ctx, cmd) => {
                // TODO: We need a Span that is not none for the otel_ctx to attach the parent
                // relationship to. If we swap the otel_ctx in `Command::Message` for a Span, we
                // can downgrade this to a debug_span.
                let span = tracing::info_span!("message_command").or_current();
                span.in_scope(|| otel_ctx.attach_as_parent());
                self.message_command(cmd).instrument(span).await
            }
            Message::ControllerReady { controller: _ } => {
                let Coordinator {
                    controller,
                    catalog,
                    ..
                } = self;
                let storage_metadata = catalog.state().storage_metadata();
                if let Some(m) = controller
                    .process(storage_metadata)
                    .expect("`process` never returns an error")
                {
                    self.message_controller(m).boxed_local().await
                }
            }
            Message::PurifiedStatementReady(ready) => {
                self.message_purified_statement_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::CreateConnectionValidationReady(ready) => {
                self.message_create_connection_validation_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::AlterConnectionValidationReady(ready) => {
                self.message_alter_connection_validation_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::TryDeferred {
                conn_id,
                acquired_lock,
            } => self.try_deferred(conn_id, acquired_lock).await,
            Message::GroupCommitInitiate(span, permit) => {
                // Add an OpenTelemetry link to our current span.
                tracing::Span::current().add_link(span.context().span().span_context().clone());
                self.try_group_commit(permit)
                    .instrument(span)
                    .boxed_local()
                    .await
            }
            Message::AdvanceTimelines => {
                self.advance_timelines().boxed_local().await;
            }
            Message::ClusterEvent(event) => self.message_cluster_event(event).boxed_local().await,
            Message::CancelPendingPeeks { conn_id } => {
                self.cancel_pending_peeks(&conn_id);
            }
            Message::LinearizeReads => {
                self.message_linearize_reads().boxed_local().await;
            }
            Message::StagedBatches {
                conn_id,
                table_id,
                batches,
            } => {
                self.commit_staged_batches(conn_id, table_id, batches);
            }
            Message::StorageUsageSchedule => {
                self.schedule_storage_usage_collection().boxed_local().await;
            }
            Message::StorageUsageFetch => {
                self.storage_usage_fetch().boxed_local().await;
            }
            Message::StorageUsageUpdate(sizes) => {
                self.storage_usage_update(sizes).boxed_local().await;
            }
            Message::StorageUsagePrune(expired) => {
                self.storage_usage_prune(expired).boxed_local().await;
            }
            Message::ArrangementSizesSchedule => {
                self.schedule_arrangement_sizes_collection()
                    .boxed_local()
                    .await;
            }
            Message::ArrangementSizesSnapshot => {
                self.arrangement_sizes_snapshot().boxed_local().await;
            }
            Message::ArrangementSizesPrune(expired) => {
                self.arrangement_sizes_prune(expired).boxed_local().await;
            }
            Message::RetireExecute {
                otel_ctx,
                data,
                reason,
            } => {
                otel_ctx.attach_as_parent();
                self.retire_execution(reason, data);
            }
            Message::ExecuteSingleStatementTransaction {
                ctx,
                otel_ctx,
                stmt,
                params,
            } => {
                otel_ctx.attach_as_parent();
                self.sequence_execute_single_statement_transaction(ctx, stmt, params)
                    .boxed_local()
                    .await;
            }
            Message::PeekStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateIndexStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateViewStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateMaterializedViewStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::SubscribeStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::IntrospectionSubscribeStageReady { span, stage } => {
                self.sequence_staged((), span, stage).boxed_local().await;
            }
            Message::ExplainTimestampStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::SecretStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::ClusterStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::DrainStatementLog => {
                self.drain_statement_log();
            }
            Message::PrivateLinkVpcEndpointEvents(events) => {
                if !self.controller.read_only() {
                    self.controller.storage.append_introspection_updates(
                        IntrospectionType::PrivatelinkConnectionStatusHistory,
                        events
                            .into_iter()
                            .map(|e| (mz_repr::Row::from(e), Diff::ONE))
                            .collect(),
                    );
                }
            }
            Message::CheckSchedulingPolicies => {
                self.check_scheduling_policies().boxed_local().await;
            }
            Message::SchedulingDecisions(decisions) => {
                self.handle_scheduling_decisions(decisions)
                    .boxed_local()
                    .await;
            }
            Message::DeferredStatementReady => {
                self.handle_deferred_statement().boxed_local().await;
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub async fn storage_usage_fetch(&self) {
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        let client = self.storage_usage_client.clone();

        // Record the currently live shards.
        let live_shards: BTreeSet<_> = self
            .controller
            .storage
            .active_collection_metadatas()
            .into_iter()
            .map(|(_id, m)| m.data_shard)
            .collect();

        let collection_metric = self.metrics.storage_usage_collection_time_seconds.clone();

        // Spawn an asynchronous task to compute the storage usage, which
        // requires a slow scan of the underlying storage engine.
        task::spawn(|| "storage_usage_fetch", async move {
            let collection_metric_timer = collection_metric.start_timer();
            let shard_sizes = client.shards_usage_referenced(live_shards).await;
            collection_metric_timer.observe_duration();

            // It is not an error for shard sizes to become ready after
            // `internal_cmd_rx` is dropped.
            if let Err(e) = internal_cmd_tx.send(Message::StorageUsageUpdate(shard_sizes)) {
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
        });
    }

    #[mz_ore::instrument(level = "debug")]
    async fn storage_usage_update(&mut self, shards_usage: ShardsUsageReferenced) {
        // Similar to audit events, use the oracle ts so this is guaranteed to
        // increase. This is intentionally the timestamp of when collection
        // finished, not when it started, so that we don't write data with a
        // timestamp in the past.
        let collection_timestamp = if self.controller.read_only() {
            self.peek_local_write_ts().await.into()
        } else {
            // Getting a write timestamp bumps the write timestamp in the
            // oracle, which we're not allowed to do in read-only mode.
            self.get_local_write_ts().await.timestamp.into()
        };

        let ops = shards_usage
            .by_shard
            .into_iter()
            .map(|(shard_id, shard_usage)| Op::WeirdStorageUsageUpdates {
                object_id: Some(shard_id.to_string()),
                size_bytes: shard_usage.size_bytes(),
                collection_timestamp,
            })
            .collect();

        match self.catalog_transact_inner(None, ops).await {
            Ok((table_updates, catalog_updates)) => {
                assert!(
                    catalog_updates.is_empty(),
                    "applying builtin table updates does not produce catalog implications"
                );

                let internal_cmd_tx = self.internal_cmd_tx.clone();
                let task_span =
                    info_span!(parent: None, "coord::storage_usage_update::table_updates");
                OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
                task::spawn(|| "storage_usage_update_table_updates", async move {
                    table_updates.instrument(task_span).await;
                    // It is not an error for this task to be running after `internal_cmd_rx` is dropped.
                    if let Err(e) = internal_cmd_tx.send(Message::StorageUsageSchedule) {
                        warn!("internal_cmd_rx dropped before we could send: {e:?}");
                    }
                });
            }
            Err(err) => tracing::warn!("Failed to update storage metrics: {:?}", err),
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn storage_usage_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
        let (fut, _) = self.builtin_table_update().execute(expired).await;
        task::spawn(|| "storage_usage_pruning_apply", async move {
            fut.await;
        });
    }

    pub async fn schedule_storage_usage_collection(&self) {
        // Instead of using a `tokio::time::Interval`, we calculate the time until the next
        // usage collection and wait for that amount of time. This is so we can keep the intervals
        // consistent even across restarts. If collection takes too long, it is possible that
        // we miss an interval.

        // 1) Deterministically pick some offset within the collection interval to prevent
        // thundering herds across environments.
        const SEED_LEN: usize = 32;
        let mut seed = [0; SEED_LEN];
        for (i, byte) in self
            .catalog()
            .state()
            .config()
            .environment_id
            .organization_id()
            .as_bytes()
            .into_iter()
            .take(SEED_LEN)
            .enumerate()
        {
            seed[i] = *byte;
        }
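        // If the organization ID provides fewer than `SEED_LEN` bytes, the
        // remaining seed bytes stay zero. That's fine: we only need a stable
        // per-environment offset, not a high-quality random seed.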
        let storage_usage_collection_interval_ms: EpochMillis =
            EpochMillis::try_from(self.storage_usage_collection_interval.as_millis())
                .expect("storage usage collection interval must fit into u64");
        let offset =
            rngs::SmallRng::from_seed(seed).random_range(0..storage_usage_collection_interval_ms);
        let now_ts: EpochMillis = self.peek_local_write_ts().await.into();

        // 2) Determine the amount of ms between now and the next collection time.
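        // For example, with a 1 hour interval and a 10 minute offset, running
        // at 2:35 puts `previous_collection_ts` at 2:10, so the next collection
        // fires at 3:10; running at 2:05, the 2:10 slot is still in the future
        // and is used directly.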
        let previous_collection_ts =
            (now_ts - (now_ts % storage_usage_collection_interval_ms)) + offset;
        let next_collection_ts = if previous_collection_ts > now_ts {
            previous_collection_ts
        } else {
            previous_collection_ts + storage_usage_collection_interval_ms
        };
        let next_collection_interval = Duration::from_millis(next_collection_ts - now_ts);

        // 3) Sleep for that amount of time, then initiate another storage usage collection.
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        task::spawn(|| "storage_usage_collection", async move {
            tokio::time::sleep(next_collection_interval).await;
            if internal_cmd_tx.send(Message::StorageUsageFetch).is_err() {
                // If sending fails, the main thread has shut down.
            }
        });
    }

    /// Schedules the next per-object arrangement sizes snapshot.
    ///
    /// Aligns each fire to an `organization_id`-seeded offset within the
    /// interval so collections stay consistent across restarts and don't
    /// synchronize across environments. Sleeps are capped at `MAX_SLEEP`,
    /// so dyncfg changes (interval edits or the `0s` disable sentinel) take
    /// effect within one cap rather than after the full interval.
    pub async fn schedule_arrangement_sizes_collection(&self) {
        const MAX_SLEEP: Duration = Duration::from_secs(60);

        let interval_duration =
            mz_adapter_types::dyncfgs::ARRANGEMENT_SIZE_HISTORY_COLLECTION_INTERVAL
                .get(self.catalog().system_config().dyncfgs());

        // `0s` disables collection. Keep polling so re-enabling takes effect
        // within `MAX_SLEEP` rather than requiring an envd restart.
        if interval_duration.is_zero() {
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            task::spawn(|| "arrangement_sizes_collection_disabled", async move {
                tokio::time::sleep(MAX_SLEEP).await;
                let _ = internal_cmd_tx.send(Message::ArrangementSizesSchedule);
            });
            return;
        }

        const SEED_LEN: usize = 32;
        let mut seed = [0; SEED_LEN];
        for (i, byte) in self
            .catalog()
            .state()
            .config()
            .environment_id
            .organization_id()
            .as_bytes()
            .into_iter()
            .take(SEED_LEN)
            .enumerate()
        {
            seed[i] = *byte;
        }
        let interval_ms: EpochMillis = EpochMillis::try_from(interval_duration.as_millis())
            .expect("arrangement_size_history_collection_interval must fit into u64");
        // `rand::random_range` panics on an empty range.
        let interval_ms = interval_ms.max(1);
        let offset = rngs::SmallRng::from_seed(seed).random_range(0..interval_ms);
        let now_ts: EpochMillis = self.peek_local_write_ts().await.into();

        let previous_collection_ts = (now_ts - (now_ts % interval_ms)) + offset;
        let next_collection_ts = if previous_collection_ts > now_ts {
            previous_collection_ts
        } else {
            previous_collection_ts + interval_ms
        };
        let sleep_for = Duration::from_millis(next_collection_ts - now_ts);

        // Within one cap of the next fire we sleep the remainder and snapshot;
        // further out we sleep the cap and re-enter so a dyncfg change is
        // picked up before committing to a long sleep.
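        // For example, with a 6 hour interval and the next fire 3 hours away,
        // we repeatedly sleep `MAX_SLEEP` and re-enter (re-reading the dyncfg
        // each time) until we are within 60s of the fire time, then sleep the
        // remainder and take the snapshot.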
        let (capped_sleep, fire_snapshot) = if sleep_for <= MAX_SLEEP {
            (sleep_for, true)
        } else {
            (MAX_SLEEP, false)
        };

        let internal_cmd_tx = self.internal_cmd_tx.clone();
        task::spawn(|| "arrangement_sizes_collection", async move {
            tokio::time::sleep(capped_sleep).await;
            let msg = if fire_snapshot {
                Message::ArrangementSizesSnapshot
            } else {
                Message::ArrangementSizesSchedule
            };
            // Send is best-effort: if the coordinator is shutting down, drop.
            let _ = internal_cmd_tx.send(msg);
        });
    }

    /// Snapshots the current contents of `mz_object_arrangement_sizes` and
    /// appends them to `mz_object_arrangement_size_history`, tagged with a
    /// shared `collection_timestamp`. Reschedules on completion.
    ///
    /// Each `(replica_id, object_id)` pair is recorded with a
    /// `hydration_complete` flag derived from `mz_compute_hydration_times`:
    /// `true` once the pair's initial hydration on that replica is finished,
    /// `false` while still building. Consumers that want only stable sizes
    /// should filter `WHERE hydration_complete`.
    #[mz_ore::instrument(level = "debug")]
    async fn arrangement_sizes_snapshot(&mut self) {
        // The catalog server is not writable in read-only mode.
        if self.controller.read_only() {
            self.schedule_arrangement_sizes_collection().await;
            return;
        }

        let collection_timer = self
            .metrics
            .arrangement_sizes_collection_time_seconds
            .start_timer();

        let live_item_id = self.catalog().resolve_builtin_storage_collection(
            &mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZES_UNIFIED,
        );
        let live_global_id = self.catalog.get_entry(&live_item_id).latest_global_id();
        let hydration_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_COMPUTE_HYDRATION_TIMES);
        let hydration_global_id = self
            .catalog
            .get_entry(&hydration_item_id)
            .latest_global_id();
        let history_item_id = self
            .catalog()
            .resolve_builtin_table(&mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY);

        let read_ts = self.get_local_read_ts().await;
        let snapshot = match self
            .controller
            .storage_collections
            .snapshot(live_global_id, read_ts)
            .await
        {
            Ok(s) => s,
            Err(e) => {
                tracing::warn!("arrangement sizes snapshot failed: {e:?}");
                drop(collection_timer);
                self.schedule_arrangement_sizes_collection().await;
                return;
            }
        };
        let mut hydration_snapshot = match self
            .controller
            .storage_collections
            .snapshot(hydration_global_id, read_ts)
            .await
        {
            Ok(s) => s,
            Err(e) => {
                tracing::warn!("arrangement sizes hydration snapshot failed: {e:?}");
                drop(collection_timer);
                self.schedule_arrangement_sizes_collection().await;
                return;
            }
        };
        differential_dataflow::consolidation::consolidate(&mut hydration_snapshot);

        // Build the set of pairs whose initial hydration has finished
        // (`time_ns IS NOT NULL`). The set drives the `hydration_complete`
        // flag for each row we emit below.
        let mut datum_vec = mz_repr::DatumVec::new();
        let mut hydrated: BTreeSet<(String, String)> = BTreeSet::new();
        const HYDRATION_COL_REPLICA_ID: usize = 0;
        const HYDRATION_COL_OBJECT_ID: usize = 1;
        const HYDRATION_COL_TIME_NS: usize = 2;
        const HYDRATION_COL_COUNT: usize = 3;
        for (row, diff) in &hydration_snapshot {
            if *diff != 1 {
                continue;
            }
            let datums = datum_vec.borrow_with(row);
            if datums.len() < HYDRATION_COL_COUNT {
                continue;
            }
            if datums[HYDRATION_COL_TIME_NS].is_null() {
                continue;
            }
            hydrated.insert((
                datums[HYDRATION_COL_REPLICA_ID].unwrap_str().to_string(),
                datums[HYDRATION_COL_OBJECT_ID].unwrap_str().to_string(),
            ));
        }

        // `collection_ts` is stamped after the snapshot so it's always >= the
        // state the rows describe, and monotone across restarts. The snapshot
        // read and this stamp aren't atomic, but the resulting skew is bounded
        // by snapshot latency and negligible at this cadence.
        let collection_ts: EpochMillis = self.get_local_write_ts().await.timestamp.into();
        let collection_datum = Datum::TimestampTz(
            mz_ore::now::to_datetime(collection_ts)
                .try_into()
                .expect("collection_timestamp must fit into TimestampTz"),
        );

        let mut consolidated = snapshot;
        differential_dataflow::consolidation::consolidate(&mut consolidated);

        // Column positions in `mz_object_arrangement_sizes`.
        const LIVE_COL_REPLICA_ID: usize = 0;
        const LIVE_COL_OBJECT_ID: usize = 1;
        const LIVE_COL_SIZE: usize = 2;
        const LIVE_COL_COUNT: usize = 3;

        let mut skipped_malformed: u64 = 0;
        let mut skipped_null_size: u64 = 0;
        let mut updates: Vec<BuiltinTableUpdate> = Vec::with_capacity(consolidated.len());
        for (row, diff) in consolidated.iter() {
            if *diff != 1 {
                continue;
            }
            let datums = datum_vec.borrow_with(row);
            // Surface schema drift via a warn log below rather than silently
            // skipping entire snapshots.
            if datums.len() != LIVE_COL_COUNT {
                skipped_malformed += 1;
                continue;
            }
            let replica_id = datums[LIVE_COL_REPLICA_ID].unwrap_str();
            let object_id = datums[LIVE_COL_OBJECT_ID].unwrap_str();
            let size_datum = datums[LIVE_COL_SIZE];
            // The history table's `size` is non-null; fabricating zero would
            // be misleading, so drop.
            if size_datum.is_null() {
                skipped_null_size += 1;
                continue;
            }
            let size = size_datum.unwrap_int64();
            // Pairs whose hydration hasn't completed yet are still recorded,
            // tagged with `hydration_complete = false`. Consumers that care
            // only about stable sizes can filter on `hydration_complete`.
            let hydration_complete =
                hydrated.contains(&(replica_id.to_string(), object_id.to_string()));
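    ///
    /// For example, a consumer interested only in settled sizes might filter
    /// with `WHERE hydration_complete` and read the newest
    /// `collection_timestamp` per `(replica_id, object_id)` pair.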
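            // Pack columns in the order the history table expects:
            // (replica_id, object_id, size, collection_timestamp, hydration_complete).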
            let new_row = Row::pack_slice(&[
                Datum::String(replica_id),
                Datum::String(object_id),
                Datum::Int64(size),
                collection_datum,
                Datum::from(hydration_complete),
            ]);
            updates.push(BuiltinTableUpdate::row(history_item_id, new_row, Diff::ONE));
        }
        if skipped_malformed > 0 {
            warn!(
                "mz_object_arrangement_sizes schema drift: skipped {skipped_malformed} rows \
                 with unexpected arity"
            );
        }
        if skipped_null_size > 0 {
            tracing::debug!("skipped {skipped_null_size} live rows with null size");
        }

        let row_count = updates.len();
        // Captures snapshot + row construction. The async table-apply below
        // is captured separately by `mz_append_table_duration_seconds`.
        collection_timer.observe_duration();

        if !updates.is_empty() {
            self.metrics
                .arrangement_sizes_rows_written
                .inc_by(u64::cast_from(row_count));
            // TODO(arrangement-sizes): when the writeable-catalog-server plumbing
            // in https://github.com/MaterializeInc/materialize/pull/35436 lands,
            // append directly on `mz_catalog_server` instead of going through
            // the environmentd builtin-table-update path.
            let (fut, _) = self.builtin_table_update().execute(updates).await;
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let task_span =
                info_span!(parent: None, "coord::arrangement_sizes_snapshot::table_updates");
            OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
            task::spawn(|| "arrangement_sizes_snapshot_apply", async move {
                fut.instrument(task_span).await;
                if let Err(e) = internal_cmd_tx.send(Message::ArrangementSizesSchedule) {
                    warn!("internal_cmd_rx dropped before we could send: {e:?}");
                }
            });
        } else {
            self.schedule_arrangement_sizes_collection().await;
        }

        tracing::debug!(
            "appended {row_count} rows to mz_object_arrangement_size_history at ts {collection_ts}"
        );
    }

    #[mz_ore::instrument(level = "debug")]
    async fn arrangement_sizes_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
        let (fut, _) = self.builtin_table_update().execute(expired).await;
        task::spawn(|| "arrangement_sizes_pruning_apply", async move {
            fut.await;
        });
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_command(&mut self, cmd: Command) {
        self.handle_command(cmd).await;
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_controller(&mut self, message: ControllerResponse) {
        event!(Level::TRACE, message = format!("{:?}", message));
        match message {
            ControllerResponse::PeekNotification(uuid, response, otel_ctx) => {
                self.handle_peek_notification(uuid, response, otel_ctx);
            }
            ControllerResponse::SubscribeResponse(sink_id, response) => {
                if let Some(ActiveComputeSink::Subscribe(active_subscribe)) =
                    self.active_compute_sinks.get_mut(&sink_id)
                {
                    let finished = active_subscribe.process_response(response);
                    if finished {
                        self.retire_compute_sinks(btreemap! {
                            sink_id => ActiveComputeSinkRetireReason::Finished,
                        })
                        .await;
                    }

                    soft_assert_or_log!(
                        !self.introspection_subscribes.contains_key(&sink_id),
                        "`sink_id` {sink_id} unexpectedly found in both `active_subscribes` \
                         and `introspection_subscribes`",
                    );
                } else if self.introspection_subscribes.contains_key(&sink_id) {
                    self.handle_introspection_subscribe_batch(sink_id, response)
                        .await;
                } else {
                    // Cancellation may cause us to receive responses for subscribes no longer
                    // tracked, so we quietly ignore them.
                }
            }
            ControllerResponse::CopyToResponse(sink_id, response) => {
                match self.drop_compute_sink(sink_id).await {
                    Some(ActiveComputeSink::CopyTo(active_copy_to)) => {
                        active_copy_to.retire_with_response(response);
                    }
                    _ => {
                        // Cancellation may cause us to receive responses for COPY TO sinks that
                        // are no longer tracked, so we quietly ignore them.
                    }
                }
            }
            ControllerResponse::WatchSetFinished(ws_ids) => {
                let now = self.now();
                for ws_id in ws_ids {
                    let Some((conn_id, rsp)) = self.installed_watch_sets.remove(&ws_id) else {
                        continue;
                    };
                    self.connection_watch_sets
                        .get_mut(&conn_id)
                        .expect("corrupted coordinator state: unknown connection id")
                        .remove(&ws_id);
                    if self.connection_watch_sets[&conn_id].is_empty() {
                        self.connection_watch_sets.remove(&conn_id);
                    }

                    match rsp {
                        WatchSetResponse::StatementDependenciesReady(id, ev) => {
                            self.record_statement_lifecycle_event(&id, &ev, now);
                        }
                        WatchSetResponse::AlterSinkReady(ctx) => {
                            self.sequence_alter_sink_finish(ctx).await;
                        }
                        WatchSetResponse::AlterMaterializedViewReady(ctx) => {
                            self.sequence_alter_materialized_view_apply_replacement_finish(ctx)
                                .await;
                        }
                    }
                }
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_purified_statement_ready(
        &mut self,
        PurifiedStatementReady {
            ctx,
            result,
            params,
            mut plan_validity,
            original_stmt,
            otel_ctx,
        }: PurifiedStatementReady,
    ) {
        otel_ctx.attach_as_parent();

        // Ensure that all dependencies still exist after purification, as a
        // `DROP CONNECTION` or other `DROP` may have sneaked in. If any have gone missing, we
        // repurify the original statement. This will either produce a nice
        // "unknown connector" error, or pick up a new connector that has
        // replaced the dropped connector.
        //
        // n.b. an `ALTER CONNECTION` occurring during purification is OK
        // because we always look up/populate a connection's state after
        // committing to the catalog, so are guaranteed to see the connection's
        // most recent version.
        if plan_validity.check(self.catalog()).is_err() {
            self.handle_execute_inner(original_stmt, params, ctx).await;
            return;
        }

        let purified_statement = match result {
            Ok(ok) => ok,
            Err(e) => return ctx.retire(Err(e)),
        };

        let plan = match purified_statement {
            PurifiedStatement::PurifiedCreateSource {
                create_progress_subsource_stmt,
                create_source_stmt,
                subsources,
                available_source_references,
            } => self
                .plan_purified_create_source(
                    &ctx,
                    params,
                    create_progress_subsource_stmt,
                    create_source_stmt,
                    subsources,
                    available_source_references,
                )
                .await
                .map(|(plan, resolved_ids)| (plan, resolved_ids, ResolvedIds::empty())),
            PurifiedStatement::PurifiedAlterSourceAddSubsources {
                source_name,
                options,
                subsources,
            } => self
                .plan_purified_alter_source_add_subsource(
                    ctx.session(),
                    params,
                    source_name,
                    options,
                    subsources,
                )
                .await
                .map(|(plan, resolved_ids)| (plan, resolved_ids, ResolvedIds::empty())),
            PurifiedStatement::PurifiedAlterSourceRefreshReferences {
                source_name,
                available_source_references,
            } => self
                .plan_purified_alter_source_refresh_references(
                    ctx.session(),
                    params,
                    source_name,
                    available_source_references,
                )
                .map(|(plan, resolved_ids)| (plan, resolved_ids, ResolvedIds::empty())),
            o @ (PurifiedStatement::PurifiedAlterSource { .. }
            | PurifiedStatement::PurifiedCreateSink(..)
            | PurifiedStatement::PurifiedCreateTableFromSource { .. }) => {
                // Unify these into a `Statement`.
                let stmt = match o {
                    PurifiedStatement::PurifiedAlterSource { alter_source_stmt } => {
                        Statement::AlterSource(alter_source_stmt)
                    }
                    PurifiedStatement::PurifiedCreateTableFromSource { stmt } => {
                        Statement::CreateTableFromSource(stmt)
                    }
                    PurifiedStatement::PurifiedCreateSink(stmt) => Statement::CreateSink(stmt),
                    PurifiedStatement::PurifiedCreateSource { .. }
                    | PurifiedStatement::PurifiedAlterSourceAddSubsources { .. }
                    | PurifiedStatement::PurifiedAlterSourceRefreshReferences { .. } => {
                        unreachable!("not part of exterior match stmt")
                    }
                };

                // Determine all dependencies, not just those in the statement
                // itself.
                let catalog = self.catalog().for_session(ctx.session());
                let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &stmt);
                self.plan_statement(ctx.session(), stmt, &params, &resolved_ids)
                    .map(|(plan, sql_impl_ids)| (plan, resolved_ids, sql_impl_ids))
            }
        };

        match plan {
            Ok((plan, resolved_ids, sql_impl_ids)) => {
                self.sequence_plan(ctx, plan, resolved_ids, sql_impl_ids)
                    .await
            }
            Err(e) => ctx.retire(Err(e)),
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_create_connection_validation_ready(
        &mut self,
        CreateConnectionValidationReady {
            mut ctx,
            result,
            connection_id,
            connection_gid,
            mut plan_validity,
            otel_ctx,
            resolved_ids,
        }: CreateConnectionValidationReady,
    ) {
        otel_ctx.attach_as_parent();

        // Ensure that all dependencies still exist after validation, as a
        // `DROP SECRET` may have sneaked in.
        //
        // WARNING: If we support `ALTER SECRET`, we'll need to also check
        // for connectors that were altered while we were purifying.
        if let Err(e) = plan_validity.check(self.catalog()) {
            if self.secrets_controller.delete(connection_id).await.is_ok() {
                self.caching_secrets_reader.invalidate(connection_id);
            }
            return ctx.retire(Err(e));
        }

        let plan = match result {
            Ok(ok) => ok,
            Err(e) => {
                if self.secrets_controller.delete(connection_id).await.is_ok() {
                    self.caching_secrets_reader.invalidate(connection_id);
                }
                return ctx.retire(Err(e));
            }
        };

        let result = self
            .sequence_create_connection_stage_finish(
                &mut ctx,
                connection_id,
                connection_gid,
                plan,
                resolved_ids,
            )
            .await;
        ctx.retire(result);
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_alter_connection_validation_ready(
        &mut self,
        AlterConnectionValidationReady {
            mut ctx,
            result,
            connection_id,
            connection_gid: _,
            mut plan_validity,
            otel_ctx,
            resolved_ids: _,
        }: AlterConnectionValidationReady,
    ) {
        otel_ctx.attach_as_parent();

        // Ensure that all dependencies still exist after validation, as a
        // `DROP SECRET` may have sneaked in.
        //
        // WARNING: If we support `ALTER SECRET`, we'll need to also check
        // for connectors that were altered while we were purifying.
        if let Err(e) = plan_validity.check(self.catalog()) {
            return ctx.retire(Err(e));
        }

        let conn = match result {
            Ok(ok) => ok,
            Err(e) => {
                return ctx.retire(Err(e));
            }
        };

        let result = self
            .sequence_alter_connection_stage_finish(ctx.session_mut(), connection_id, conn)
            .await;
        ctx.retire(result);
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_cluster_event(&mut self, event: ClusterEvent) {
        event!(Level::TRACE, event = format!("{:?}", event));

        if let Some(segment_client) = &self.segment_client {
            let env_id = &self.catalog().config().environment_id;
            let mut properties = json!({
                "cluster_id": event.cluster_id.to_string(),
                "replica_id": event.replica_id.to_string(),
                "process_id": event.process_id,
                "status": event.status.as_kebab_case_str(),
            });
            match event.status {
                ClusterStatus::Online => (),
                ClusterStatus::Offline(reason) => {
                    let properties = match &mut properties {
                        serde_json::Value::Object(map) => map,
                        _ => unreachable!(),
                    };
                    properties.insert(
                        "reason".into(),
                        json!(reason.display_or("unknown").to_string()),
                    );
                }
            };
            segment_client.environment_track(
                env_id,
                "Cluster Changed Status",
                properties,
                EventDetails {
                    timestamp: Some(event.time),
                    ..Default::default()
                },
            );
        }

        // It is possible that we receive a status update for a replica that has
        // already been dropped from the catalog. Just ignore these events.
        let Some(replica_statuses) = self
            .cluster_replica_statuses
            .try_get_cluster_replica_statuses(event.cluster_id, event.replica_id)
        else {
            return;
        };

        if event.status != replica_statuses[&event.process_id].status {
            if !self.controller.read_only() {
                let offline_reason = match event.status {
                    ClusterStatus::Online => None,
                    ClusterStatus::Offline(None) => None,
                    ClusterStatus::Offline(Some(reason)) => Some(reason.to_string()),
                };
                let row = Row::pack_slice(&[
                    Datum::String(&event.replica_id.to_string()),
                    Datum::UInt64(event.process_id),
                    Datum::String(event.status.as_kebab_case_str()),
                    Datum::from(offline_reason.as_deref()),
                    Datum::TimestampTz(event.time.try_into().expect("must fit")),
                ]);
                self.controller.storage.append_introspection_updates(
                    IntrospectionType::ReplicaStatusHistory,
                    vec![(row, Diff::ONE)],
                );
            }

            let old_replica_status =
                ClusterReplicaStatuses::cluster_replica_status(replica_statuses);

            let new_process_status = ClusterReplicaProcessStatus {
                status: event.status,
                time: event.time,
            };
            self.cluster_replica_statuses.ensure_cluster_status(
                event.cluster_id,
                event.replica_id,
                event.process_id,
                new_process_status,
            );

            let cluster = self.catalog().get_cluster(event.cluster_id);
            let replica = cluster.replica(event.replica_id).expect("Replica exists");
            let new_replica_status = self
                .cluster_replica_statuses
                .get_cluster_replica_status(event.cluster_id, event.replica_id);

            if old_replica_status != new_replica_status {
                let notifier = self.broadcast_notice_tx();
                let notice = AdapterNotice::ClusterReplicaStatusChanged {
                    cluster: cluster.name.clone(),
                    replica: replica.name.clone(),
                    status: new_replica_status,
                    time: event.time,
                };
                notifier(notice);
            }
        }
    }

    /// Linearizes sending the results of a read transaction by:
    ///   1. Holding back any results that were executed at some point in the future, until the
    ///      containing timeline has advanced to that point in the future.
    ///   2. Confirming that we are still the current leader before sending results to the client.
    #[mz_ore::instrument(level = "debug")]
    async fn message_linearize_reads(&mut self) {
        let mut shortest_wait = Duration::MAX;
        let mut ready_txns = Vec::new();

        // Cache for `TimestampOracle::read_ts` calls. These are somewhat
        // expensive so we cache the value. This is correct since all we're
        // risking is being too conservative. We will not accidentally "release"
        // a result too early.
        let mut cached_oracle_ts = BTreeMap::new();

        for (conn_id, mut read_txn) in std::mem::take(&mut self.pending_linearize_read_txns) {
            if let TimestampContext::TimelineTimestamp {
                timeline,
                chosen_ts,
                oracle_ts,
            } = read_txn.timestamp_context()
            {
                let oracle_ts = match oracle_ts {
                    Some(oracle_ts) => oracle_ts,
                    None => {
                        // There was no oracle timestamp, so no need to delay.
                        ready_txns.push(read_txn);
                        continue;
                    }
                };

                if chosen_ts <= oracle_ts {
                    // Chosen ts was already <= the oracle ts, so we're good
                    // to go!
                    ready_txns.push(read_txn);
                    continue;
                }

                // See what the oracle timestamp is now and delay when needed.
                let current_oracle_ts = cached_oracle_ts.entry(timeline.clone());
                let current_oracle_ts = match current_oracle_ts {
                    btree_map::Entry::Vacant(entry) => {
                        let timestamp_oracle = self.get_timestamp_oracle(timeline);
                        let read_ts = timestamp_oracle.read_ts().await;
                        entry.insert(read_ts.clone());
                        read_ts
                    }
                    btree_map::Entry::Occupied(entry) => entry.get().clone(),
                };

                if *chosen_ts <= current_oracle_ts {
                    ready_txns.push(read_txn);
                } else {
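                    // The chosen timestamp is still ahead of the oracle, e.g.
                    // chosen_ts = 1_000 with the oracle at 400 yields a 600ms
                    // wait; the transaction is requeued and retried once the
                    // timeline has advanced far enough.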
                    let wait =
                        Duration::from_millis(chosen_ts.saturating_sub(current_oracle_ts).into());
                    if wait < shortest_wait {
                        shortest_wait = wait;
                    }
                    read_txn.num_requeues += 1;
                    self.pending_linearize_read_txns.insert(conn_id, read_txn);
                }
            } else {
                ready_txns.push(read_txn);
            }
        }

        if !ready_txns.is_empty() {
            // Sniff out one ctx; this is where tracing breaks down because we
            // process all outstanding txns as a batch here.
            let otel_ctx = ready_txns.first().expect("known to exist").otel_ctx.clone();
            let span = tracing::debug_span!("message_linearize_reads");
            otel_ctx.attach_as_parent_to(&span);

            let now = Instant::now();
            for ready_txn in ready_txns {
                let span = tracing::debug_span!("retire_read_results");
                ready_txn.otel_ctx.attach_as_parent_to(&span);
                let _entered = span.enter();
                self.metrics
                    .linearize_message_seconds
                    .with_label_values(&[
                        ready_txn.txn.label(),
                        if ready_txn.num_requeues == 0 {
                            "true"
                        } else {
                            "false"
                        },
                    ])
                    .observe((now - ready_txn.created).as_secs_f64());
                if let Some((ctx, result)) = ready_txn.txn.finish() {
                    ctx.retire(result);
                }
            }
        }

        if !self.pending_linearize_read_txns.is_empty() {
            // Cap wait time to 1s.
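            // The cap means requeued reads re-check the oracle at least once
            // per second, even if the chosen timestamp is far in the future.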
            let remaining_ms = std::cmp::min(shortest_wait, Duration::from_millis(1_000));
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            task::spawn(|| "deferred_read_txns", async move {
                tokio::time::sleep(remaining_ms).await;
                // It is not an error for this task to be running after `internal_cmd_rx` is dropped.
                let result = internal_cmd_tx.send(Message::LinearizeReads);
                if let Err(e) = result {
                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        }
    }
}